liquid_cache/cache/
liquid_expr.rs1use arrow_schema::DataType;
2use datafusion_common::ScalarValue;
3use datafusion_expr_common::operator::Operator;
4use datafusion_physical_expr::expressions::{
5 BinaryExpr, CastColumnExpr, CastExpr, Column, DynamicFilterPhysicalExpr, LikeExpr, Literal,
6 TryCastExpr,
7};
8use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
9
10use crate::cache::CacheExpression;
11use crate::sync::Arc;
12use crate::utils::get_bytes_needle;
13
14#[derive(Clone)]
16pub struct LiquidExpr {
17 expr: Arc<dyn PhysicalExpr>,
18}
19
20impl std::fmt::Debug for LiquidExpr {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("LiquidExpr")
23 .field("expr", &self.expr.to_string())
24 .finish()
25 }
26}
27
28impl LiquidExpr {
29 pub fn try_new(
34 expr: Arc<dyn PhysicalExpr>,
35 data_type: &DataType,
36 expression_hint: Option<&CacheExpression>,
37 ) -> Option<Self> {
38 let normalized = unwrap_dynamic_filter(&expr)?;
39 if supports_expr(&normalized, data_type, expression_hint) {
40 Some(Self { expr: normalized })
41 } else {
42 None
43 }
44 }
45
46 pub fn physical_expr(&self) -> &Arc<dyn PhysicalExpr> {
48 &self.expr
49 }
50
51 #[cfg(test)]
52 pub(crate) fn new_unchecked(expr: Arc<dyn PhysicalExpr>) -> Self {
53 Self { expr }
54 }
55}
56
57fn unwrap_dynamic_filter(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
58 if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
59 dynamic_filter.current().ok()
60 } else {
61 Some(expr.clone())
62 }
63}
64
65fn supports_expr(
66 expr: &Arc<dyn PhysicalExpr>,
67 data_type: &DataType,
68 expression_hint: Option<&CacheExpression>,
69) -> bool {
70 if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
71 return supports_binary_expr(binary, data_type, expression_hint);
72 }
73
74 if let Some(like_expr) = expr.as_any().downcast_ref::<LikeExpr>() {
75 return supports_like_expr(like_expr, data_type, expression_hint);
76 }
77
78 if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
79 return matches!(literal.value(), ScalarValue::Boolean(Some(_))) && is_byte_like(data_type);
80 }
81
82 false
83}
84
85fn supports_binary_expr(
86 binary: &BinaryExpr,
87 data_type: &DataType,
88 expression_hint: Option<&CacheExpression>,
89) -> bool {
90 let Some(literal) = binary.right().as_any().downcast_ref::<Literal>() else {
91 return false;
92 };
93 let op = binary.op();
94 if is_byte_like(data_type) {
95 if !is_column_like(binary.left()) {
96 return false;
97 }
98
99 match op {
100 Operator::Eq
101 | Operator::NotEq
102 | Operator::Lt
103 | Operator::LtEq
104 | Operator::Gt
105 | Operator::GtEq => get_bytes_needle(literal.value()).is_some(),
106 Operator::LikeMatch | Operator::NotLikeMatch => {
107 get_bytes_needle(literal.value()).is_some()
108 && is_substring_hint_enabled(expression_hint)
109 }
110 _ => false,
111 }
112 } else if is_numeric_like(data_type) {
113 matches!(
114 op,
115 Operator::Eq
116 | Operator::NotEq
117 | Operator::Lt
118 | Operator::LtEq
119 | Operator::Gt
120 | Operator::GtEq
121 ) && (is_column_like(binary.left()) || is_to_timestamp_seconds_column(binary.left()))
122 } else {
123 false
124 }
125}
126
127fn supports_like_expr(
128 like_expr: &LikeExpr,
129 data_type: &DataType,
130 expression_hint: Option<&CacheExpression>,
131) -> bool {
132 if !is_byte_like(data_type) || like_expr.case_insensitive() {
133 return false;
134 }
135 if !is_column_like(like_expr.expr()) || !is_substring_hint_enabled(expression_hint) {
136 return false;
137 }
138 like_expr
139 .pattern()
140 .as_any()
141 .downcast_ref::<Literal>()
142 .and_then(|literal| get_bytes_needle(literal.value()))
143 .is_some()
144}
145
146fn is_substring_hint_enabled(expression_hint: Option<&CacheExpression>) -> bool {
147 matches!(expression_hint, Some(CacheExpression::SubstringSearch))
148}
149
150fn is_column_like(expr: &Arc<dyn PhysicalExpr>) -> bool {
151 if expr.as_any().downcast_ref::<Column>().is_some() {
152 return true;
153 }
154 if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
155 return is_column_like(cast_expr.expr());
156 }
157 if let Some(cast_column_expr) = expr.as_any().downcast_ref::<CastColumnExpr>() {
158 return is_column_like(cast_column_expr.expr());
159 }
160 if let Some(try_cast_expr) = expr.as_any().downcast_ref::<TryCastExpr>() {
161 return is_column_like(try_cast_expr.expr());
162 }
163 false
164}
165
166fn is_to_timestamp_seconds_column(expr: &Arc<dyn PhysicalExpr>) -> bool {
167 if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
168 && func.name() == "to_timestamp_seconds"
169 && let [arg] = func.args()
170 {
171 return is_column_like(arg);
172 }
173 false
174}
175
176fn is_byte_like(data_type: &DataType) -> bool {
177 match data_type {
178 DataType::Utf8 | DataType::Utf8View | DataType::Binary | DataType::BinaryView => true,
179 DataType::Dictionary(_, value_type) => is_byte_like(value_type.as_ref()),
180 _ => false,
181 }
182}
183
184fn is_numeric_like(data_type: &DataType) -> bool {
185 matches!(
186 data_type,
187 DataType::Int8
188 | DataType::Int16
189 | DataType::Int32
190 | DataType::Int64
191 | DataType::UInt8
192 | DataType::UInt16
193 | DataType::UInt32
194 | DataType::UInt64
195 | DataType::Float32
196 | DataType::Float64
197 | DataType::Date32
198 | DataType::Date64
199 | DataType::Decimal128(_, _)
200 | DataType::Decimal256(_, _)
201 ) || matches!(data_type, DataType::Timestamp(_, None))
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::ScalarValue;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, LikeExpr};

    #[test]
    fn validates_byte_comparison_with_literal() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string())))),
        ));
        let liquid_expr = LiquidExpr::try_new(expr, &DataType::Utf8, None);
        assert!(liquid_expr.is_some());
    }

    #[test]
    fn rejects_byte_like_without_substring_hint() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(LikeExpr::new(
            false,
            false,
            Arc::new(Column::new("c", 0)),
            Arc::new(Literal::new(ScalarValue::Utf8(Some("%abc%".to_string())))),
        ));
        let liquid_expr = LiquidExpr::try_new(expr, &DataType::Utf8, None);
        assert!(liquid_expr.is_none());
    }

    #[test]
    fn accepts_byte_like_with_substring_hint() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(LikeExpr::new(
            false,
            false,
            Arc::new(Column::new("c", 0)),
            Arc::new(Literal::new(ScalarValue::Utf8(Some("%abc%".to_string())))),
        ));
        let liquid_expr = LiquidExpr::try_new(
            expr,
            &DataType::Utf8,
            Some(&CacheExpression::SubstringSearch),
        );
        assert!(liquid_expr.is_some());
    }

    #[test]
    fn validates_numeric_comparison() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
        ));
        let liquid_expr = LiquidExpr::try_new(expr, &DataType::Int32, None);
        assert!(liquid_expr.is_some());
    }

    /// Only `column <op> literal` is recognised; the mirrored form is not normalised.
    #[test]
    fn rejects_literal_on_left_of_comparison() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string())))),
            Operator::Eq,
            Arc::new(Column::new("c", 0)),
        ));
        assert!(LiquidExpr::try_new(expr, &DataType::Utf8, None).is_none());
    }

    /// Cast wrappers around a column still count as column-like.
    #[test]
    fn accepts_comparison_on_cast_column() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(CastExpr::new(
                Arc::new(Column::new("c", 0)),
                DataType::Utf8,
                None,
            )),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string())))),
        ));
        assert!(LiquidExpr::try_new(expr, &DataType::Utf8, None).is_some());
    }

    /// Dictionary-encoded string columns are treated as byte-like.
    #[test]
    fn accepts_byte_comparison_on_dictionary_column() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string())))),
        ));
        let dictionary =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        assert!(LiquidExpr::try_new(expr, &dictionary, None).is_some());
    }

    /// The binary `LikeMatch` operator is gated on the substring-search hint.
    #[test]
    fn binary_like_match_requires_substring_hint() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            Operator::LikeMatch,
            Arc::new(Literal::new(ScalarValue::Utf8(Some("%abc%".to_string())))),
        ));
        assert!(LiquidExpr::try_new(Arc::clone(&expr), &DataType::Utf8, None).is_none());
        assert!(
            LiquidExpr::try_new(
                expr,
                &DataType::Utf8,
                Some(&CacheExpression::SubstringSearch),
            )
            .is_some()
        );
    }

    /// Case-insensitive LIKE is never supported, even with the hint.
    #[test]
    fn rejects_case_insensitive_like() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(LikeExpr::new(
            false,
            true,
            Arc::new(Column::new("c", 0)),
            Arc::new(Literal::new(ScalarValue::Utf8(Some("%abc%".to_string())))),
        ));
        let liquid_expr = LiquidExpr::try_new(
            expr,
            &DataType::Utf8,
            Some(&CacheExpression::SubstringSearch),
        );
        assert!(liquid_expr.is_none());
    }

    /// Numeric columns only accept the six comparison operators.
    #[test]
    fn rejects_like_operator_on_numeric_column() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            Operator::LikeMatch,
            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
        ));
        let liquid_expr = LiquidExpr::try_new(
            expr,
            &DataType::Int32,
            Some(&CacheExpression::SubstringSearch),
        );
        assert!(liquid_expr.is_none());
    }

    /// Data types that are neither byte-like nor numeric-like are rejected.
    #[test]
    fn rejects_unsupported_data_type() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
        ));
        assert!(LiquidExpr::try_new(expr, &DataType::Boolean, None).is_none());
    }

    /// A bare boolean literal is accepted only for byte-like columns.
    #[test]
    fn boolean_literal_only_supported_for_byte_columns() {
        let literal: Arc<dyn PhysicalExpr> =
            Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
        assert!(LiquidExpr::try_new(Arc::clone(&literal), &DataType::Utf8, None).is_some());
        assert!(LiquidExpr::try_new(literal, &DataType::Int32, None).is_none());
    }
}