spark_connect_core/
conf.rsuse std::collections::HashMap;
use crate::spark;
use crate::client::{MetadataInterceptor, SparkConnectClient};
use crate::errors::SparkError;
use tonic::service::interceptor::InterceptedService;
#[cfg(not(feature = "wasm"))]
use tonic::transport::Channel;
#[cfg(feature = "wasm")]
use tonic_web_wasm_client::Client;
pub struct RunTimeConfig {
#[cfg(not(feature = "wasm"))]
pub(crate) client: SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>,
#[cfg(feature = "wasm")]
pub(crate) client: SparkConnectClient<InterceptedService<Client, MetadataInterceptor>>,
}
impl RunTimeConfig {
#[cfg(not(feature = "wasm"))]
pub fn new(
client: &SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>,
) -> RunTimeConfig {
RunTimeConfig {
client: client.clone(),
}
}
#[cfg(feature = "wasm")]
pub fn new(
client: &SparkConnectClient<InterceptedService<Client, MetadataInterceptor>>,
) -> RunTimeConfig {
RunTimeConfig {
client: client.clone(),
}
}
pub(crate) async fn set_configs(
&mut self,
map: &HashMap<String, String>,
) -> Result<(), SparkError> {
for (key, value) in map {
self.set(key.as_str(), value.as_str()).await?
}
Ok(())
}
pub async fn set(&mut self, key: &str, value: &str) -> Result<(), SparkError> {
let op_type = spark::config_request::operation::OpType::Set(spark::config_request::Set {
pairs: vec![spark::KeyValue {
key: key.into(),
value: Some(value.into()),
}],
});
let operation = spark::config_request::Operation {
op_type: Some(op_type),
};
let _ = self.client.config_request(operation).await?;
Ok(())
}
pub async fn unset(&mut self, key: &str) -> Result<(), SparkError> {
let op_type =
spark::config_request::operation::OpType::Unset(spark::config_request::Unset {
keys: vec![key.to_string()],
});
let operation = spark::config_request::Operation {
op_type: Some(op_type),
};
let _ = self.client.config_request(operation).await?;
Ok(())
}
pub async fn get(&mut self, key: &str, default: Option<&str>) -> Result<String, SparkError> {
let operation = match default {
Some(default) => {
let op_type = spark::config_request::operation::OpType::GetWithDefault(
spark::config_request::GetWithDefault {
pairs: vec![spark::KeyValue {
key: key.into(),
value: Some(default.into()),
}],
},
);
spark::config_request::Operation {
op_type: Some(op_type),
}
}
None => {
let op_type =
spark::config_request::operation::OpType::Get(spark::config_request::Get {
keys: vec![key.to_string()],
});
spark::config_request::Operation {
op_type: Some(op_type),
}
}
};
let resp = self.client.config_request(operation).await?;
let val = resp.pairs.first().unwrap().value().to_string();
Ok(val)
}
pub async fn is_modifable(&mut self, key: &str) -> Result<bool, SparkError> {
let op_type = spark::config_request::operation::OpType::IsModifiable(
spark::config_request::IsModifiable {
keys: vec![key.to_string()],
},
);
let operation = spark::config_request::Operation {
op_type: Some(op_type),
};
let resp = self.client.config_request(operation).await?;
let val = resp.pairs.first().unwrap().value();
match val {
"true" => Ok(true),
"false" => Ok(false),
_ => Err(SparkError::AnalysisException(
"Unexpected response value for boolean".to_string(),
)),
}
}
}