clickhouse_datafusion/udfs/
clickhouse.rs1use datafusion::arrow::datatypes::{DataType, FieldRef};
5use datafusion::common::{internal_err, not_impl_err};
6use datafusion::error::Result;
7use datafusion::logical_expr::{
8 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
9 Volatility,
10};
11
/// Names under which the ClickHouse pushdown UDF can be called.
///
/// The first entry doubles as the canonical function name; the whole list
/// (first entry included) is what the UDF reports as its aliases.
pub const CLICKHOUSE_UDF_ALIASES: [&str; 4] = [
    "clickhouse",
    "clickhouse_udf",
    "clickhouse_pushdown",
    "clickhouse_pushdown_udf",
];
14
15pub fn clickhouse_udf() -> ScalarUDF { ScalarUDF::new_from_impl(ClickHouseUDF::new()) }
16
17#[derive(Debug)]
19pub struct ClickHouseUDF {
20 signature: Signature,
21 aliases: Vec<String>,
22}
23
24impl Default for ClickHouseUDF {
25 fn default() -> Self {
26 Self {
27 signature: Signature::any(Self::ARG_LEN, Volatility::Immutable),
28 aliases: CLICKHOUSE_UDF_ALIASES.iter().map(ToString::to_string).collect(),
29 }
30 }
31}
32
33impl ClickHouseUDF {
34 pub const ARG_LEN: usize = 2;
35
36 pub fn new() -> Self { Self::default() }
37}
38
39impl ScalarUDFImpl for ClickHouseUDF {
40 fn as_any(&self) -> &dyn std::any::Any { self }
41
42 fn name(&self) -> &str { CLICKHOUSE_UDF_ALIASES[0] }
43
44 fn aliases(&self) -> &[String] { &self.aliases }
45
46 fn signature(&self) -> &Signature { &self.signature }
47
48 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
49 internal_err!("return_type_from_args used")
50 }
51
52 fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
53 super::extract_return_field_from_args(self.name(), &args)
54 }
55
56 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
57 not_impl_err!(
58 "ClickHouseUDF is for planning only, a syntax error may have occurred. Sometimes, \
59 ClickHouse functions need to be backticked"
60 )
61 }
62
63 fn short_circuits(&self) -> bool { true }
66}
67
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field};
    use datafusion::common::ScalarValue;
    use datafusion::logical_expr::{
        ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, TypeSignature,
    };

    use super::*;

    /// Calls `return_field_from_args` on a fresh UDF with literal `syntax` and
    /// `type_name` string arguments.
    fn return_field_for(syntax: &str, type_name: &str) -> Result<FieldRef> {
        let udf = ClickHouseUDF::new();
        let arg_fields = [
            Arc::new(Field::new("syntax", DataType::Utf8, false)),
            Arc::new(Field::new("type", DataType::Utf8, false)),
        ];
        let scalars = [
            ScalarValue::Utf8(Some(syntax.to_string())),
            ScalarValue::Utf8(Some(type_name.to_string())),
        ];
        let args = ReturnFieldArgs {
            arg_fields: &arg_fields,
            scalar_arguments: &[Some(&scalars[0]), Some(&scalars[1])],
        };
        udf.return_field_from_args(args)
    }

    #[test]
    fn test_clickhouse_udf_pushdown_udf_creation() {
        let udf = clickhouse_udf();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_new() {
        let udf = ClickHouseUDF::new();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(udf.aliases(), &CLICKHOUSE_UDF_ALIASES);
    }

    #[test]
    fn test_clickhouse_pushdown_udf_default() {
        let udf = ClickHouseUDF::default();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(udf.aliases(), &CLICKHOUSE_UDF_ALIASES);
        assert!(matches!(
            udf.signature().type_signature,
            TypeSignature::Any(ClickHouseUDF::ARG_LEN)
        ));
    }

    #[test]
    fn test_clickhouse_pushdown_udf_constants() {
        assert_eq!(ClickHouseUDF::ARG_LEN, 2);
    }

    #[test]
    fn test_return_field_from_args_valid() {
        let result = return_field_for("count()", "Int64");
        assert!(result.is_ok());
        let field = result.unwrap();
        assert_eq!(field.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(field.data_type(), &DataType::Int64);
    }

    #[test]
    fn test_return_field_from_args_invalid_type() {
        let result = return_field_for("count()", "InvalidType");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Invalid return type"));
    }

    #[test]
    fn test_return_type_is_not_supported() {
        // The return type must come from `return_field_from_args`; the
        // argument-type-only entry point always errors.
        let udf = ClickHouseUDF::new();
        assert!(udf.return_type(&[DataType::Utf8, DataType::Utf8]).is_err());
    }

    #[test]
    fn test_invoke_with_args_not_implemented() {
        let udf = ClickHouseUDF::new();
        let args = ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new("", DataType::Int32, false)),
        };
        let result = udf.invoke_with_args(args);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("planning only"));
    }

    #[test]
    fn test_short_circuits() {
        assert!(ClickHouseUDF::new().short_circuits());
    }

    #[test]
    fn test_as_any() {
        let udf = ClickHouseUDF::new();
        let any_ref = udf.as_any();
        assert!(any_ref.downcast_ref::<ClickHouseUDF>().is_some());
    }

    #[test]
    fn test_signature() {
        let udf = ClickHouseUDF::new();
        let signature = udf.signature();

        assert!(matches!(signature.type_signature, TypeSignature::Any(ClickHouseUDF::ARG_LEN)));
    }
}