clickhouse_datafusion/udfs/
clickhouse.rs1use datafusion::arrow::datatypes::{DataType, FieldRef};
5use datafusion::common::{internal_err, not_impl_err};
6use datafusion::error::Result;
7use datafusion::logical_expr::{
8 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
9 Volatility,
10};
11
/// Every name under which the ClickHouse pushdown UDF can be referenced.
///
/// The first entry doubles as the canonical function name; the complete list (canonical name
/// included) is what the UDF reports as its aliases.
pub const CLICKHOUSE_UDF_ALIASES: [&str; 4] = [
    "clickhouse",
    "clickhouse_udf",
    "clickhouse_pushdown",
    "clickhouse_pushdown_udf",
];
14
15pub fn clickhouse_udf() -> ScalarUDF { ScalarUDF::new_from_impl(ClickHouseUDF::new()) }
16
17#[derive(Debug, PartialEq, Eq, Hash)]
19pub struct ClickHouseUDF {
20 signature: Signature,
21 aliases: Vec<String>,
22}
23
24impl Default for ClickHouseUDF {
25 fn default() -> Self {
26 Self {
27 signature: Signature::any(Self::ARG_LEN, Volatility::Immutable),
28 aliases: CLICKHOUSE_UDF_ALIASES.iter().map(ToString::to_string).collect(),
29 }
30 }
31}
32
33impl ClickHouseUDF {
34 pub const ARG_LEN: usize = 2;
35
36 pub fn new() -> Self { Self::default() }
37}
38
39impl ScalarUDFImpl for ClickHouseUDF {
40 fn as_any(&self) -> &dyn std::any::Any { self }
41
42 fn name(&self) -> &str { CLICKHOUSE_UDF_ALIASES[0] }
43
44 fn aliases(&self) -> &[String] { &self.aliases }
45
46 fn signature(&self) -> &Signature { &self.signature }
47
48 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
49 internal_err!("return_type_from_args used")
50 }
51
52 fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
53 super::extract_return_field_from_args(self.name(), &args)
54 }
55
56 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
57 not_impl_err!(
58 "ClickHouseUDF is for planning only, a syntax error may have occurred. Sometimes, \
59 ClickHouse functions need to be backticked"
60 )
61 }
62
63 fn short_circuits(&self) -> bool { true }
66}
67
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field};
    use datafusion::common::ScalarValue;
    use datafusion::config::ConfigOptions;
    use datafusion::logical_expr::{
        ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, TypeSignature,
    };

    use super::*;

    /// Runs `return_field_from_args` on a fresh UDF with two non-null Utf8 literal arguments:
    /// the ClickHouse syntax string and the requested return type name.
    fn resolve_return_field(
        syntax: &str,
        return_type: &str,
    ) -> datafusion::error::Result<Arc<Field>> {
        let arg_fields = [
            Arc::new(Field::new("syntax", DataType::Utf8, false)),
            Arc::new(Field::new("type", DataType::Utf8, false)),
        ];
        let literals = [
            ScalarValue::Utf8(Some(syntax.to_string())),
            ScalarValue::Utf8(Some(return_type.to_string())),
        ];
        let scalar_arguments = [Some(&literals[0]), Some(&literals[1])];
        let args =
            ReturnFieldArgs { arg_fields: &arg_fields, scalar_arguments: &scalar_arguments };

        ClickHouseUDF::new().return_field_from_args(args)
    }

    // The wrapped `ScalarUDF` reports the canonical name.
    #[test]
    fn test_clickhouse_udf_pushdown_udf_creation() {
        let wrapped = clickhouse_udf();
        assert_eq!(wrapped.name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    // `new` yields the canonical name and the full alias list.
    #[test]
    fn test_clickhouse_pushdown_udf_new() {
        let udf = ClickHouseUDF::new();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(udf.aliases(), &CLICKHOUSE_UDF_ALIASES);
    }

    // `default` yields the canonical name as well.
    #[test]
    fn test_clickhouse_pushdown_udf_default() {
        let udf = ClickHouseUDF::default();
        assert_eq!(udf.name(), CLICKHOUSE_UDF_ALIASES[0]);
    }

    // The declared arity stays at two arguments.
    #[test]
    fn test_clickhouse_pushdown_udf_constants() {
        assert_eq!(ClickHouseUDF::ARG_LEN, 2);
    }

    // A recognised type name resolves to a field named after the function.
    #[test]
    fn test_return_field_from_args_valid() {
        let field = resolve_return_field("count()", "Int64")
            .expect("a valid return type must resolve to a field");
        assert_eq!(field.name(), CLICKHOUSE_UDF_ALIASES[0]);
        assert_eq!(field.data_type(), &DataType::Int64);
    }

    // An unrecognised type name is rejected with an "Invalid return type" error.
    #[test]
    fn test_return_field_from_args_invalid_type() {
        let error = resolve_return_field("count()", "InvalidType")
            .expect_err("an unknown return type must be rejected");
        assert!(error.to_string().contains("Invalid return type"));
    }

    // Direct invocation is refused because the function is planning-only.
    #[test]
    fn test_invoke_with_args_not_implemented() {
        let invocation = ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new("", DataType::Int32, false)),
            config_options: Arc::new(ConfigOptions::default()),
        };

        let error = ClickHouseUDF::new()
            .invoke_with_args(invocation)
            .expect_err("invoking the planning-only UDF must fail");
        assert!(error.to_string().contains("planning only"));
    }

    // `as_any` allows downcasting back to the concrete type.
    #[test]
    fn test_as_any() {
        let udf = ClickHouseUDF::new();
        let downcast = udf.as_any().downcast_ref::<ClickHouseUDF>();
        assert!(downcast.is_some());
    }

    // The signature is `Any` with the declared arity.
    #[test]
    fn test_signature() {
        let udf = ClickHouseUDF::new();
        let type_signature = &udf.signature().type_signature;

        assert!(matches!(type_signature, TypeSignature::Any(ClickHouseUDF::ARG_LEN)));
    }
}