use std::sync::Arc;
use async_trait::async_trait;
use datafusion::{
common::DFSchema,
datasource::file_format::{csv::CsvSink, json::JsonSink, parquet::ParquetSink},
error::Result,
execution::context::SessionState,
logical_expr::{Expr, LogicalPlan},
physical_plan::{insert::DataSinkExec, ExecutionPlan, PhysicalExpr},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
};
use crate::ExonRuntimeEnvExt;
use super::exon_extension_planner::ExomeExtensionPlanner;
/// Physical planner that wraps DataFusion's [`DefaultPhysicalPlanner`].
///
/// Planning is delegated to the wrapped planner; the `PhysicalPlanner`
/// implementation in this file additionally inspects the resulting plan for
/// file sinks and registers their object store URLs with the session runtime.
pub struct ExonPhysicalPlanner {
/// The wrapped DataFusion planner that performs the actual planning work.
planner: DefaultPhysicalPlanner,
}
impl Default for ExonPhysicalPlanner {
    /// Builds a planner whose wrapped [`DefaultPhysicalPlanner`] is configured
    /// with a single extension planner: a default [`ExomeExtensionPlanner`].
    fn default() -> Self {
        Self {
            planner: DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
                ExomeExtensionPlanner::default(),
            )]),
        }
    }
}
#[async_trait]
impl PhysicalPlanner for ExonPhysicalPlanner {
    /// Creates the physical plan for `logical_plan` using the wrapped planner.
    ///
    /// When the root of the resulting plan is a [`DataSinkExec`] whose sink is
    /// a [`ParquetSink`], [`JsonSink`], or [`CsvSink`], the sink's configured
    /// object store URL is registered with the session's runtime environment
    /// through `exon_register_object_store_url` before the plan is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error from the wrapped planner or from the object store
    /// URL registration.
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let physical_plan = self
            .planner
            .create_physical_plan(logical_plan, session_state)
            .await?;

        if let Some(sink_exec) = physical_plan.as_any().downcast_ref::<DataSinkExec>() {
            // Resolve the destination URL inside its own block so that the
            // `&dyn Any` used for downcasting is gone before the `.await` below.
            let store_url = {
                let sink = sink_exec.sink().as_any();
                if let Some(parquet) = sink.downcast_ref::<ParquetSink>() {
                    Some(&parquet.config().object_store_url)
                } else if let Some(json) = sink.downcast_ref::<JsonSink>() {
                    Some(&json.config().object_store_url)
                } else if let Some(csv) = sink.downcast_ref::<CsvSink>() {
                    Some(&csv.config().object_store_url)
                } else {
                    // Any other sink type is returned without registration.
                    None
                }
            };

            if let Some(url) = store_url {
                session_state
                    .runtime_env()
                    .exon_register_object_store_url(url.as_ref())
                    .await?;
            }
        }

        Ok(physical_plan)
    }

    /// Creates a physical expression for `expr` against `input_dfschema` by
    /// delegating directly to the wrapped planner.
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.planner
            .create_physical_expr(expr, input_dfschema, session_state)
    }
}