use std::collections::HashMap;
use crate::catalog::Catalog;
pub use crate::client::SparkSessionBuilder;
use crate::client::{MetadataInterceptor, SparkConnectClient};
use crate::dataframe::{DataFrame, DataFrameReader};
use crate::errors::SparkError;
use crate::plan::LogicalPlanBuilder;
use crate::spark;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
#[derive(Clone, Debug)]
pub struct SparkSession {
    pub client: SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>,
}
impl SparkSession {
    pub fn new(
        client: SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>,
    ) -> Self {
        Self { client }
    }
    pub fn range(
        self,
        start: Option<i64>,
        end: i64,
        step: i64,
        num_partitions: Option<i32>,
    ) -> DataFrame {
        let range_relation = spark::relation::RelType::Range(spark::Range {
            start,
            end,
            step,
            num_partitions,
        });
        DataFrame::new(self, LogicalPlanBuilder::from(range_relation))
    }
    pub fn read(self) -> DataFrameReader {
        DataFrameReader::new(self)
    }
    pub fn catalog(self) -> Catalog {
        Catalog::new(self)
    }
    pub async fn sql(&mut self, sql_query: &str) -> Result<DataFrame, SparkError> {
        let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand {
            sql: sql_query.to_string(),
            args: HashMap::default(),
            pos_args: vec![],
        });
        let plan = LogicalPlanBuilder::build_plan_cmd(sql_cmd);
        self.client.execute_command(plan).await?;
        let cmd = self.client.handler.sql_command_result.to_owned().unwrap();
        let logical_plan = LogicalPlanBuilder::new(cmd.relation.unwrap());
        Ok(DataFrame::new(self.clone(), logical_plan))
    }
}