clickhouse_datafusion/context/
planner.rs

1//! An `ExtensionPlanner` implementation for executing [`ClickHouseFunctionNode`]s
2use std::sync::Arc;
3
4use datafusion::common::plan_datafusion_err;
5use datafusion::error::Result;
6use datafusion::execution::SessionState;
7use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
8use datafusion::physical_plan::ExecutionPlan;
9use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
10
11use super::plan_node::{CLICKHOUSE_FUNCTION_NODE_NAME, ClickHouseFunctionNode};
12
/// Stateless `DataFusion` [`ExtensionPlanner`] that handles [`ClickHouseFunctionNode`]s.
///
/// The type carries no data, so it is `Copy` and can be freely duplicated wherever a planner
/// instance is required.
#[derive(Debug, Clone, Copy)]
pub struct ClickHouseExtensionPlanner;
16
17#[async_trait::async_trait]
18impl ExtensionPlanner for ClickHouseExtensionPlanner {
19    async fn plan_extension(
20        &self,
21        _planner: &dyn PhysicalPlanner,
22        node: &dyn UserDefinedLogicalNode,
23        _logical_inputs: &[&LogicalPlan],
24        _physical_inputs: &[Arc<dyn ExecutionPlan>],
25        _session_state: &SessionState,
26    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
27        if node.name() == CLICKHOUSE_FUNCTION_NODE_NAME {
28            let clickhouse_node = node
29                .as_any()
30                .downcast_ref::<ClickHouseFunctionNode>()
31                .ok_or(plan_datafusion_err!("Failed to downcast to ClickHouseFunctionNode"))?;
32            clickhouse_node.execute().map(Some)
33        } else {
34            Ok(None)
35        }
36    }
37}