clickhouse_datafusion/udfs/
clickhouse.rs

1// TODO: Remove - better docs
2//! Custom `clickhouse` UDF implementation that allows intelligent "pushdown" to execute
3//! `ClickHouse` functions directly on the remote server.
4use datafusion::arrow::datatypes::{DataType, FieldRef};
5use datafusion::common::{internal_err, not_impl_err};
6use datafusion::error::Result;
7use datafusion::logical_expr::{
8    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
9    Volatility,
10};
11
/// Every name the `ClickHouse` pushdown UDF answers to.
///
/// The first entry is the primary function name reported by the UDF; the complete list
/// (primary name included) is what the UDF exposes as its aliases.
pub const CLICKHOUSE_UDF_ALIASES: [&str; 4] = [
    "clickhouse",
    "clickhouse_udf",
    "clickhouse_pushdown",
    "clickhouse_pushdown_udf",
];
14
15pub fn clickhouse_udf() -> ScalarUDF { ScalarUDF::new_from_impl(ClickHouseUDF::new()) }
16
17// TODO: Remove - docs
18#[derive(Debug)]
19pub struct ClickHouseUDF {
20    signature: Signature,
21    aliases:   Vec<String>,
22}
23
24impl Default for ClickHouseUDF {
25    fn default() -> Self {
26        Self {
27            signature: Signature::any(Self::ARG_LEN, Volatility::Immutable),
28            aliases:   CLICKHOUSE_UDF_ALIASES.iter().map(ToString::to_string).collect(),
29        }
30    }
31}
32
33impl ClickHouseUDF {
34    pub const ARG_LEN: usize = 2;
35
36    pub fn new() -> Self { Self::default() }
37}
38
39impl ScalarUDFImpl for ClickHouseUDF {
40    fn as_any(&self) -> &dyn std::any::Any { self }
41
42    fn name(&self) -> &str { CLICKHOUSE_UDF_ALIASES[0] }
43
44    fn aliases(&self) -> &[String] { &self.aliases }
45
46    fn signature(&self) -> &Signature { &self.signature }
47
48    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
49        internal_err!("return_type_from_args used")
50    }
51
52    fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
53        super::extract_return_field_from_args(self.name(), &args)
54    }
55
56    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
57        not_impl_err!(
58            "ClickHouseUDF is for planning only, a syntax error may have occurred. Sometimes, \
59             ClickHouse functions need to be backticked"
60        )
61    }
62
63    /// Set to true to prevent optimizations. There is no way to know what the function will
64    /// produce, so these settings must be conservative.
65    fn short_circuits(&self) -> bool { true }
66}
67
68// Helper functions for identifying ClickHouse functions
69
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field};
    use datafusion::common::ScalarValue;
    use datafusion::logical_expr::{
        ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, TypeSignature,
    };

    use super::*;

    /// Calls `return_field_from_args` on a fresh UDF with the literal arguments
    /// `("count()", return_type)`, both described as non-nullable `Utf8` fields.
    fn resolve_return_field(return_type: &str) -> datafusion::error::Result<Arc<Field>> {
        let arg_fields = [
            Arc::new(Field::new("syntax", DataType::Utf8, false)),
            Arc::new(Field::new("type", DataType::Utf8, false)),
        ];
        let syntax = ScalarValue::Utf8(Some("count()".to_string()));
        let type_name = ScalarValue::Utf8(Some(return_type.to_string()));
        let scalar_arguments = [Some(&syntax), Some(&type_name)];

        ClickHouseUDF::new().return_field_from_args(ReturnFieldArgs {
            arg_fields:       &arg_fields,
            scalar_arguments: &scalar_arguments,
        })
    }

    #[test]
    fn test_clickhouse_udf_pushdown_udf_creation() {
        assert_eq!(clickhouse_udf().name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_new() {
        let created = ClickHouseUDF::new();
        assert_eq!(created.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(created.aliases(), &CLICKHOUSE_UDF_ALIASES);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_default() {
        let defaulted = ClickHouseUDF::default();
        assert_eq!(defaulted.name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_constants() {
        assert_eq!(ClickHouseUDF::ARG_LEN, 2);
    }

    #[test]
    fn test_return_field_from_args_valid() {
        let field = resolve_return_field("Int64").expect("`Int64` should resolve to a field");
        assert_eq!(field.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(field.data_type(), &DataType::Int64);
    }

    #[test]
    fn test_return_field_from_args_invalid_type() {
        let error = resolve_return_field("InvalidType")
            .expect_err("an unknown type string should be rejected");
        assert!(error.to_string().contains("Invalid return type"));
    }

    #[test]
    fn test_invoke_with_args_not_implemented() {
        let call = ScalarFunctionArgs {
            args:         vec![],
            arg_fields:   vec![],
            number_rows:  1,
            return_field: Arc::new(Field::new("", DataType::Int32, false)),
        };
        let error = ClickHouseUDF::new()
            .invoke_with_args(call)
            .expect_err("local invocation should never succeed");
        assert!(error.to_string().contains("planning only"));
    }

    #[test]
    fn test_as_any() {
        let udf = ClickHouseUDF::new();
        assert!(udf.as_any().is::<ClickHouseUDF>());
    }

    #[test]
    fn test_signature() {
        let udf = ClickHouseUDF::new();
        let takes_arg_len_of_any = matches!(
            udf.signature().type_signature,
            TypeSignature::Any(ClickHouseUDF::ARG_LEN)
        );
        assert!(takes_arg_len_of_any);
    }
}