clickhouse_datafusion/udfs/
clickhouse.rs

1// TODO: Remove - better docs
2//! Custom `clickhouse` UDF implementation that allows intelligent "pushdown" to execute
3//! `ClickHouse` functions directly on the remote server.
4use datafusion::arrow::datatypes::{DataType, FieldRef};
5use datafusion::common::{internal_err, not_impl_err};
6use datafusion::error::Result;
7use datafusion::logical_expr::{
8    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
9    Volatility,
10};
11
/// Every name the ClickHouse pushdown UDF answers to.
///
/// Index `0` (`"clickhouse"`) is used as the canonical function name; the full list is
/// reported as the UDF's aliases.
pub const CLICKHOUSE_UDF_ALIASES: [&str; 4] = [
    "clickhouse",
    "clickhouse_udf",
    "clickhouse_pushdown",
    "clickhouse_pushdown_udf",
];
14
15pub fn clickhouse_udf() -> ScalarUDF { ScalarUDF::new_from_impl(ClickHouseUDF::new()) }
16
17// TODO: Remove - docs
18#[derive(Debug, PartialEq, Eq, Hash)]
19pub struct ClickHouseUDF {
20    signature: Signature,
21    aliases:   Vec<String>,
22}
23
24impl Default for ClickHouseUDF {
25    fn default() -> Self {
26        Self {
27            signature: Signature::any(Self::ARG_LEN, Volatility::Immutable),
28            aliases:   CLICKHOUSE_UDF_ALIASES.iter().map(ToString::to_string).collect(),
29        }
30    }
31}
32
33impl ClickHouseUDF {
34    pub const ARG_LEN: usize = 2;
35
36    pub fn new() -> Self { Self::default() }
37}
38
39impl ScalarUDFImpl for ClickHouseUDF {
40    fn as_any(&self) -> &dyn std::any::Any { self }
41
42    fn name(&self) -> &str { CLICKHOUSE_UDF_ALIASES[0] }
43
44    fn aliases(&self) -> &[String] { &self.aliases }
45
46    fn signature(&self) -> &Signature { &self.signature }
47
48    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
49        internal_err!("return_type_from_args used")
50    }
51
52    fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
53        super::extract_return_field_from_args(self.name(), &args)
54    }
55
56    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
57        not_impl_err!(
58            "ClickHouseUDF is for planning only, a syntax error may have occurred. Sometimes, \
59             ClickHouse functions need to be backticked"
60        )
61    }
62
63    /// Set to true to prevent optimizations. There is no way to know what the function will
64    /// produce, so these settings must be conservative.
65    fn short_circuits(&self) -> bool { true }
66}
67
68// Helper functions for identifying ClickHouse functions
69
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field};
    use datafusion::common::ScalarValue;
    use datafusion::config::ConfigOptions;
    use datafusion::logical_expr::{
        ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, TypeSignature,
    };

    use super::*;

    #[test]
    fn test_clickhouse_udf_pushdown_udf_creation() {
        let udf = clickhouse_udf();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_new() {
        let udf = ClickHouseUDF::new();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(udf.aliases(), &CLICKHOUSE_UDF_ALIASES);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_default() {
        let udf = ClickHouseUDF::default();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_constants() {
        assert_eq!(ClickHouseUDF::ARG_LEN, 2);
    }

    #[test]
    fn test_return_field_from_args_valid() {
        let udf = ClickHouseUDF::new();
        let field1 = Arc::new(Field::new("syntax", DataType::Utf8, false));
        let field2 = Arc::new(Field::new("type", DataType::Utf8, false));
        let scalar = [
            Some(ScalarValue::Utf8(Some("count()".to_string()))),
            Some(ScalarValue::Utf8(Some("Int64".to_string()))),
        ];
        let args = ReturnFieldArgs {
            arg_fields:       &[field1, field2],
            scalar_arguments: &[scalar[0].as_ref(), scalar[1].as_ref()],
        };

        // `expect` surfaces the underlying error on failure instead of a bare `is_ok()` assert.
        let field = udf
            .return_field_from_args(args)
            .expect("valid (syntax, type) arguments should resolve a return field");
        assert_eq!(field.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(field.data_type(), &DataType::Int64);
    }

    #[test]
    fn test_return_field_from_args_invalid_type() {
        let udf = ClickHouseUDF::new();
        let field1 = Arc::new(Field::new("syntax", DataType::Utf8, false));
        let field2 = Arc::new(Field::new("type", DataType::Utf8, false));
        let scalar = [
            Some(ScalarValue::Utf8(Some("count()".to_string()))),
            Some(ScalarValue::Utf8(Some("InvalidType".to_string()))),
        ];
        let args = ReturnFieldArgs {
            arg_fields:       &[field1, field2],
            scalar_arguments: &[scalar[0].as_ref(), scalar[1].as_ref()],
        };

        let err = udf
            .return_field_from_args(args)
            .expect_err("an unknown type name must be rejected");
        assert!(err.to_string().contains("Invalid return type"));
    }

    #[test]
    fn test_return_type_is_unsupported() {
        let udf = ClickHouseUDF::new();
        let result = udf.return_type(&[DataType::Utf8, DataType::Utf8]);
        assert!(result.is_err(), "return_type must not be used; return_field_from_args is");
    }

    #[test]
    fn test_invoke_with_args_not_implemented() {
        let udf = ClickHouseUDF::new();
        let args = ScalarFunctionArgs {
            args:           vec![],
            arg_fields:     vec![],
            number_rows:    1,
            return_field:   Arc::new(Field::new("", DataType::Int32, false)),
            config_options: Arc::new(ConfigOptions::default()),
        };
        let err = udf
            .invoke_with_args(args)
            .expect_err("invocation is planning-only and must fail");
        assert!(err.to_string().contains("planning only"));
    }

    #[test]
    fn test_short_circuits_is_conservative() {
        assert!(ClickHouseUDF::new().short_circuits());
    }

    #[test]
    fn test_as_any() {
        let udf = ClickHouseUDF::new();
        let any_ref = udf.as_any();
        assert!(any_ref.downcast_ref::<ClickHouseUDF>().is_some());
    }

    #[test]
    fn test_signature() {
        let udf = ClickHouseUDF::new();
        let signature = udf.signature();

        assert!(matches!(signature.type_signature, TypeSignature::Any(ClickHouseUDF::ARG_LEN)));
    }
}