// clickhouse_datafusion/context/plan_node.rs

//! A `UserDefinedLogicalNodeCore` implementation for wrapping the largest sub-trees of a
//! `DataFusion` logical plan for execution on `ClickHouse` directly.
3use std::sync::Arc;
4
5use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
6use datafusion::common::{DFSchemaRef, plan_err};
7use datafusion::datasource::source_as_provider;
8use datafusion::error::Result;
9use datafusion::logical_expr::{InvariantLevel, LogicalPlan, UserDefinedLogicalNodeCore};
10use datafusion::physical_plan::ExecutionPlan;
11use datafusion::prelude::Expr;
12use datafusion::sql::unparser::Unparser;
13
14use crate::ClickHouseConnectionPool;
15use crate::dialect::ClickHouseDialect;
16use crate::providers::table::ClickHouseTableProvider;
17use crate::sql::ClickHouseSqlExec;
18
/// Name reported by [`ClickHouseFunctionNode`] via `UserDefinedLogicalNodeCore::name`.
pub const CLICKHOUSE_FUNCTION_NODE_NAME: &str = "ClickHouseFunctionNode";
20
/// Extension node for `ClickHouse` function pushdown
///
/// This extension node serves as a wrapper only, so that during planner execution, the input plan
/// can be unparsed into sql and executed on `ClickHouse`.
#[derive(Clone, Debug)]
pub struct ClickHouseFunctionNode {
    /// The input plan that this node wraps
    pub(super) input: LogicalPlan,
    /// Schema of the wrapped plan, captured at construction time in `try_new`.
    schema:           DFSchemaRef,
    /// Connection pool taken from the `ClickHouseTableProvider` found inside `input`.
    pool:             Arc<ClickHouseConnectionPool>,
    /// Mirrors the provider's `coerce_schema` flag; forwarded to the execution plan via
    /// `with_coercion` in `execute`.
    coerce_schema:    bool,
}
33
34impl ClickHouseFunctionNode {
35    /// Create a new `ClickHouseFunctionNode`
36    ///
37    /// # Errors
38    /// - Returns an error if the `TableProvider` cannot be found or downcast.
39    ///
40    /// # Panics
41    /// - Does not panic. No errors are thrown in plan recursion.
42    pub fn try_new(input: LogicalPlan) -> Result<Self> {
43        let schema = Arc::clone(input.schema());
44        // If the table provider is set to coerce schemas, ensure the execution plan coerces
45        let mut coerce_schema = false;
46        let mut pool = None;
47        let _ = input
48            .apply(|plan| {
49                if let LogicalPlan::TableScan(scan) = plan {
50                    // Convert to TableProvider
51                    let provider = source_as_provider(&scan.source)?;
52                    if let Some(provider) =
53                        provider.as_any().downcast_ref::<ClickHouseTableProvider>()
54                    {
55                        coerce_schema = provider.coerce_schema();
56                        pool = Some(Arc::clone(provider.pool()));
57                        return Ok(TreeNodeRecursion::Stop);
58                    }
59                }
60                Ok(TreeNodeRecursion::Continue)
61            })
62            // No error thrown
63            .unwrap();
64
65        let Some(pool) = pool else {
66            return plan_err!(
67                "ClickHouseFunctionNode: cannot execute without a connection pool, most likely a \
68                 ClickHouseTableProvider was never found in the plan."
69            );
70        };
71
72        Ok(Self { input, schema, pool, coerce_schema })
73    }
74
75    pub(crate) fn execute(&self) -> Result<Arc<dyn ExecutionPlan>> {
76        let sql = Unparser::new(&ClickHouseDialect).plan_to_sql(&self.input)?.to_string();
77        ClickHouseSqlExec::try_new(None, self.input.schema().inner(), Arc::clone(&self.pool), sql)
78            .map(|ex| ex.with_coercion(self.coerce_schema))
79            .map(|ex| Arc::new(ex) as Arc<dyn ExecutionPlan>)
80    }
81}
82
83impl UserDefinedLogicalNodeCore for ClickHouseFunctionNode {
84    fn name(&self) -> &str { CLICKHOUSE_FUNCTION_NODE_NAME }
85
86    fn inputs(&self) -> Vec<&LogicalPlan> {
87        // NOTE: This is disabled currently, see note in `with_exprs_and_inputs`.
88        //
89        // // Pass through to input so optimizations can be applied
90        // vec![&self.input]
91        vec![]
92    }
93
94    fn schema(&self) -> &DFSchemaRef { &self.schema }
95
96    // NOTE: This is disabled currently, see note in `with_exprs_and_inputs`.
97    fn expressions(&self) -> Vec<Expr> { vec![] }
98
99    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "ClickHouseFunctionNode")
101    }
102
103    // Pass through to input so optimizations can be applied
104    fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
105        if !exprs.is_empty() {
106            return plan_err!("ClickHouseFunctionNode expects no expressions");
107        }
108        if !inputs.is_empty() {
109            return plan_err!("ClickHouseFunctionNode expects no inputs");
110        }
111
112        Ok(self.clone())
113        // NOTE: The following logic has been commented out. Originally the idea was to allow
114        // optimization of the input logical plan by DataFusion's optimizers, and for almost all
115        // cases, this works fine.
116        //
117        // But, until the strange output seen when Unparsing in the case when the input to a
118        // `Projection` is a `TableScan` wrapped by another node, like `Limit` or `Sort`, then this
119        // must be disabled. In those aforementioned cases, a subquery will be produced that
120        // uses a wildcard but the aliases of the projection will NOT be updated, causing an
121        // error.
122        //
123        // if inputs.len() != 1 {
124        //     return plan_err!("ClickHouseFunctionNode expects exactly one input");
125        // }
126        // let input = inputs.remove(0);
127        // // If not coercing schema, check input schema
128        // if !self.coerce_schema {
129        //     self.schema
130        //         .has_equivalent_names_and_types(input.schema())
131        //         .map_err(|e| e.context("Invalid input schema for ClickHouseFunctionNode"))?;
132        // }
133        // Ok(Self {
134        //     input,
135        //     schema: Arc::clone(&self.schema),
136        //     pool: self.pool.clone(),
137        //     coerce_schema: self.coerce_schema,
138        // })
139    }
140
141    fn check_invariants(&self, _check: InvariantLevel, _plan: &LogicalPlan) -> Result<()> {
142        // No invariant checks needed for now
143        Ok(())
144    }
145
146    fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
147        // No special column requirements
148        None
149    }
150
151    /// `false` is the default behavior. It's included here specifically to note that pushing limits
152    /// down causes a bug in `Unparsing` where subquery aliases are wrapped in parentheses in a way
153    /// that causes an error in `ClickHouse`. But, since the query will run on `ClickHouse`,
154    /// optimization should be done there if possible.
155    fn supports_limit_pushdown(&self) -> bool { false }
156}
157
158// Implement required traits for LogicalPlan integration
159impl PartialEq for ClickHouseFunctionNode {
160    fn eq(&self, other: &Self) -> bool {
161        self.input == other.input
162            && self.name() == other.name()
163            && self.pool.join_push_down() == other.pool.join_push_down()
164    }
165}
166
// Marker impl: asserts the `eq` above is a full equivalence relation over the compared fields.
impl Eq for ClickHouseFunctionNode {}
168
169impl std::hash::Hash for ClickHouseFunctionNode {
170    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
171        self.name().hash(state);
172        self.input.hash(state);
173        self.pool.join_push_down().hash(state);
174    }
175}
176
177impl PartialOrd for ClickHouseFunctionNode {
178    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
179        self.input.partial_cmp(&other.input)
180    }
181}
182
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use datafusion::arrow::datatypes::Schema;
    use datafusion::datasource::empty::EmptyTable;
    use datafusion::datasource::provider_as_source;
    use datafusion::logical_expr::LogicalPlanBuilder;
    use datafusion::sql::TableReference;

    use super::*;

    /// A scan backed by a plain `EmptyTable` must be rejected: the node needs a
    /// `ClickHouseTableProvider` to obtain its connection pool.
    #[test]
    fn test_plan_node_requires_provider() {
        let source = provider_as_source(Arc::new(EmptyTable::new(Arc::new(Schema::empty()))));
        let err_plan = LogicalPlanBuilder::scan(TableReference::bare("test"), source, None)
            .and_then(LogicalPlanBuilder::build)
            .unwrap();
        assert!(
            ClickHouseFunctionNode::try_new(err_plan).is_err(),
            "ClickHouseTableProvider required for a ClickHouseFunctionNode"
        );
    }

    /// Verifies the node rejects expressions/inputs, refuses limit pushdown, and that equality
    /// and ordering agree for two nodes built from the same plan.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_verify_no_exprs_input() {
        use datafusion::common::Column;

        let pool = Arc::new(ClickHouseConnectionPool::new("pool".to_string(), ()));
        let provider = Arc::new(ClickHouseTableProvider::new_with_schema_unchecked(
            Arc::clone(&pool),
            "table1".into(),
            Arc::new(Schema::empty()),
        ));
        let source = provider_as_source(provider);
        let plan = LogicalPlanBuilder::scan(TableReference::bare("test"), source, None)
            .and_then(LogicalPlanBuilder::build)
            .unwrap();

        let node = ClickHouseFunctionNode::try_new(plan.clone()).unwrap();

        // Any expression must be rejected.
        assert!(
            node.with_exprs_and_inputs(vec![Expr::Column(Column::from_name("test_col"))], vec![])
                .is_err(),
            "ClickHouseFunctionNode must take no exprs"
        );

        // Any input must be rejected.
        assert!(
            node.with_exprs_and_inputs(vec![], vec![plan.clone()]).is_err(),
            "ClickHouseFunctionNode must take no inputs"
        );

        assert!(
            !node.supports_limit_pushdown(),
            "ClickHouseFunctionNode does not support limit pushdown"
        );

        // Two nodes built from the same plan compare equal and ordered-equal.
        let other = ClickHouseFunctionNode::try_new(plan).unwrap();
        assert_eq!(node, other, "PartialEq must be consistent");
        assert_eq!(node.partial_cmp(&other), Some(std::cmp::Ordering::Equal), "Same node");
    }
}
246}