use std::sync::Arc;
use datafusion::{
dataframe::DataFrame,
error::DataFusionError,
execution::{context::SessionState, TaskContext},
logical_expr::LogicalPlan,
prelude::SessionContext,
};
use object_store::ObjectStore;
use crate::object_storage::{AwsOptions, GcpOptions};
#[async_trait::async_trait]
/// Session operations expressed as a trait, so code can be written against
/// this interface instead of a concrete context type.
///
/// This file provides an implementation for [`SessionContext`].
pub trait CliSessionContext {
/// Returns a shared handle to a [`TaskContext`] for this session.
fn task_ctx(&self) -> Arc<TaskContext>;
/// Returns the [`SessionState`] of this session by value.
fn session_state(&self) -> SessionState;
/// Registers `object_store` under `url`.
///
/// NOTE(review): the returned `Option` is presumably the store that was
/// previously registered for `url`, if any — confirm against the implementor.
fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>>;
/// Registers a table-options extension selected by the URL `scheme`.
///
/// Which schemes are recognized, and what happens for unrecognized ones,
/// is decided by the implementor.
fn register_table_options_extension_from_scheme(&self, scheme: &str);
/// Executes `plan` asynchronously, consuming it.
///
/// Yields a [`DataFrame`] on success or a [`DataFusionError`] on failure.
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError>;
}
#[async_trait::async_trait]
impl CliSessionContext for SessionContext {
fn task_ctx(&self) -> Arc<TaskContext> {
self.task_ctx()
}
fn session_state(&self) -> SessionState {
self.state()
}
fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>> {
self.register_object_store(url, object_store)
}
fn register_table_options_extension_from_scheme(&self, scheme: &str) {
match scheme {
"s3" | "oss" | "cos" => {
self.register_table_options_extension(AwsOptions::default())
}
"gs" | "gcs" => {
self.register_table_options_extension(GcpOptions::default())
}
_ => {}
}
}
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
self.execute_logical_plan(plan).await
}
}