use std::collections::HashMap;

use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;

use crate::catalog::Catalog;
pub use crate::client::SparkSessionBuilder;
use crate::client::{MetadataInterceptor, SparkConnectClient};
use crate::dataframe::{DataFrame, DataFrameReader};
use crate::errors::SparkError;
use crate::plan::LogicalPlanBuilder;
use crate::spark;
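
/// The entry point for connecting to a remote Spark cluster over the Spark
/// Connect gRPC protocol. A `SparkSession` wraps the [SparkConnectClient]
/// used to submit logical plans and commands to the server.
///
/// A session is normally obtained from the re-exported [SparkSessionBuilder].
/// A minimal sketch, assuming the crate is consumed as `spark_connect_rs` and
/// a Spark Connect server is listening at the address shown:
///
/// ```no_run
/// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
/// use spark_connect_rs::{SparkSession, SparkSessionBuilder};
///
/// let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```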
#[derive(Clone, Debug)]
pub struct SparkSession {
    pub client: SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>,
}

impl SparkSession {
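    /// Create a session from an already-connected [SparkConnectClient].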
    pub fn new(
        client: SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>,
    ) -> Self {
        Self { client }
    }
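
    /// Create a [DataFrame] with a single column named `id`, containing
    /// elements from `start` (defaults to 0 when `None`) up to, but not
    /// including, `end`, incremented by `step`, optionally split across
    /// `num_partitions`.
    ///
    /// For example, `spark.range(None, 10, 2, None)` yields 0, 2, 4, 6, 8.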
    pub fn range(
        self,
        start: Option<i64>,
        end: i64,
        step: i64,
        num_partitions: Option<i32>,
    ) -> DataFrame {
        let range_relation = spark::relation::RelType::Range(spark::Range {
            start,
            end,
            step,
            num_partitions,
        });

        DataFrame::new(self, LogicalPlanBuilder::from(range_relation))
    }
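
    /// Returns a [DataFrameReader] that can be used to load data from an
    /// external source into a [DataFrame].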
    pub fn read(self) -> DataFrameReader {
        DataFrameReader::new(self)
    }
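
    /// Interface through which the user can create, drop, alter, or query
    /// underlying databases, tables, and functions.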
    pub fn catalog(self) -> Catalog {
        Catalog::new(self)
    }
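
    /// Run a SQL query on the cluster and return the result as a [DataFrame].
    ///
    /// The query is executed eagerly as a command so the server can resolve
    /// it into a relation; for example, `spark.sql("SELECT * FROM range(10)").await?`
    /// produces a ten-row [DataFrame].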
    pub async fn sql(&mut self, sql_query: &str) -> Result<DataFrame, SparkError> {
        let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand {
            sql: sql_query.to_string(),
            args: HashMap::default(),
            pos_args: vec![],
        });

        let plan = LogicalPlanBuilder::build_plan_cmd(sql_cmd);
        self.client.execute_command(plan).await?;

        // The server parses the query and responds with a relation, which
        // becomes the root of the new DataFrame's logical plan.
        let cmd = self
            .client
            .handler
            .sql_command_result
            .clone()
            .expect("SqlCommandResult should be set after executing a SQL command");
        let relation = cmd
            .relation
            .expect("SqlCommandResult should contain a relation");

        Ok(DataFrame::new(self.clone(), LogicalPlanBuilder::new(relation)))
    }
}