Skip to main content

liquid_cache/cache/
liquid_expr.rs

1use arrow_schema::DataType;
2use datafusion_common::ScalarValue;
3use datafusion_expr_common::operator::Operator;
4use datafusion_physical_expr::expressions::{
5    BinaryExpr, CastColumnExpr, CastExpr, Column, DynamicFilterPhysicalExpr, LikeExpr, Literal,
6    TryCastExpr,
7};
8use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
9
10use crate::cache::CacheExpression;
11use crate::sync::Arc;
12use crate::utils::get_bytes_needle;
13
14/// A predicate expression validated for LiquidCache predicate evaluation.
15#[derive(Clone)]
16pub struct LiquidExpr {
17    expr: Arc<dyn PhysicalExpr>,
18}
19
20impl std::fmt::Debug for LiquidExpr {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("LiquidExpr")
23            .field("expr", &self.expr.to_string())
24            .finish()
25    }
26}
27
28impl LiquidExpr {
29    /// Validate and wrap a physical expression for LiquidCache predicate evaluation.
30    ///
31    /// Returns `None` when the expression shape or operator is unsupported for the
32    /// provided column type and expression hint.
33    pub fn try_new(
34        expr: Arc<dyn PhysicalExpr>,
35        data_type: &DataType,
36        expression_hint: Option<&CacheExpression>,
37    ) -> Option<Self> {
38        let normalized = unwrap_dynamic_filter(&expr)?;
39        if supports_expr(&normalized, data_type, expression_hint) {
40            Some(Self { expr: normalized })
41        } else {
42            None
43        }
44    }
45
46    /// Get the underlying validated physical expression.
47    pub fn physical_expr(&self) -> &Arc<dyn PhysicalExpr> {
48        &self.expr
49    }
50
51    #[cfg(test)]
52    pub(crate) fn new_unchecked(expr: Arc<dyn PhysicalExpr>) -> Self {
53        Self { expr }
54    }
55}
56
57fn unwrap_dynamic_filter(expr: &Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
58    if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
59        dynamic_filter.current().ok()
60    } else {
61        Some(expr.clone())
62    }
63}
64
65fn supports_expr(
66    expr: &Arc<dyn PhysicalExpr>,
67    data_type: &DataType,
68    expression_hint: Option<&CacheExpression>,
69) -> bool {
70    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
71        return supports_binary_expr(binary, data_type, expression_hint);
72    }
73
74    if let Some(like_expr) = expr.as_any().downcast_ref::<LikeExpr>() {
75        return supports_like_expr(like_expr, data_type, expression_hint);
76    }
77
78    if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
79        return matches!(literal.value(), ScalarValue::Boolean(Some(_))) && is_byte_like(data_type);
80    }
81
82    false
83}
84
85fn supports_binary_expr(
86    binary: &BinaryExpr,
87    data_type: &DataType,
88    expression_hint: Option<&CacheExpression>,
89) -> bool {
90    let Some(literal) = binary.right().as_any().downcast_ref::<Literal>() else {
91        return false;
92    };
93    let op = binary.op();
94    if is_byte_like(data_type) {
95        if !is_column_like(binary.left()) {
96            return false;
97        }
98
99        match op {
100            Operator::Eq
101            | Operator::NotEq
102            | Operator::Lt
103            | Operator::LtEq
104            | Operator::Gt
105            | Operator::GtEq => get_bytes_needle(literal.value()).is_some(),
106            Operator::LikeMatch | Operator::NotLikeMatch => {
107                get_bytes_needle(literal.value()).is_some()
108                    && is_substring_hint_enabled(expression_hint)
109            }
110            _ => false,
111        }
112    } else if is_numeric_like(data_type) {
113        matches!(
114            op,
115            Operator::Eq
116                | Operator::NotEq
117                | Operator::Lt
118                | Operator::LtEq
119                | Operator::Gt
120                | Operator::GtEq
121        ) && (is_column_like(binary.left()) || is_to_timestamp_seconds_column(binary.left()))
122    } else {
123        false
124    }
125}
126
127fn supports_like_expr(
128    like_expr: &LikeExpr,
129    data_type: &DataType,
130    expression_hint: Option<&CacheExpression>,
131) -> bool {
132    if !is_byte_like(data_type) || like_expr.case_insensitive() {
133        return false;
134    }
135    if !is_column_like(like_expr.expr()) || !is_substring_hint_enabled(expression_hint) {
136        return false;
137    }
138    like_expr
139        .pattern()
140        .as_any()
141        .downcast_ref::<Literal>()
142        .and_then(|literal| get_bytes_needle(literal.value()))
143        .is_some()
144}
145
146fn is_substring_hint_enabled(expression_hint: Option<&CacheExpression>) -> bool {
147    matches!(expression_hint, Some(CacheExpression::SubstringSearch))
148}
149
150fn is_column_like(expr: &Arc<dyn PhysicalExpr>) -> bool {
151    if expr.as_any().downcast_ref::<Column>().is_some() {
152        return true;
153    }
154    if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
155        return is_column_like(cast_expr.expr());
156    }
157    if let Some(cast_column_expr) = expr.as_any().downcast_ref::<CastColumnExpr>() {
158        return is_column_like(cast_column_expr.expr());
159    }
160    if let Some(try_cast_expr) = expr.as_any().downcast_ref::<TryCastExpr>() {
161        return is_column_like(try_cast_expr.expr());
162    }
163    false
164}
165
166fn is_to_timestamp_seconds_column(expr: &Arc<dyn PhysicalExpr>) -> bool {
167    if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
168        && func.name() == "to_timestamp_seconds"
169        && let [arg] = func.args()
170    {
171        return is_column_like(arg);
172    }
173    false
174}
175
176fn is_byte_like(data_type: &DataType) -> bool {
177    match data_type {
178        DataType::Utf8 | DataType::Utf8View | DataType::Binary | DataType::BinaryView => true,
179        DataType::Dictionary(_, value_type) => is_byte_like(value_type.as_ref()),
180        _ => false,
181    }
182}
183
184fn is_numeric_like(data_type: &DataType) -> bool {
185    matches!(
186        data_type,
187        DataType::Int8
188            | DataType::Int16
189            | DataType::Int32
190            | DataType::Int64
191            | DataType::UInt8
192            | DataType::UInt16
193            | DataType::UInt32
194            | DataType::UInt64
195            | DataType::Float32
196            | DataType::Float64
197            | DataType::Date32
198            | DataType::Date64
199            | DataType::Decimal128(_, _)
200            | DataType::Decimal256(_, _)
201    ) || matches!(data_type, DataType::Timestamp(_, None))
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::TimeUnit;
    use datafusion_common::ScalarValue;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, LikeExpr};

    /// Build `c <op> <value>` where `c` is column 0.
    fn column_op_literal(op: Operator, value: ScalarValue) -> Arc<dyn PhysicalExpr> {
        Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 0)),
            op,
            Arc::new(Literal::new(value)),
        ))
    }

    /// Build a non-negated `c LIKE <pattern>` (or ILIKE when `case_insensitive`).
    fn column_like(case_insensitive: bool, pattern: &str) -> Arc<dyn PhysicalExpr> {
        Arc::new(LikeExpr::new(
            false,
            case_insensitive,
            Arc::new(Column::new("c", 0)),
            Arc::new(Literal::new(ScalarValue::Utf8(Some(pattern.to_string())))),
        ))
    }

    fn utf8(value: &str) -> ScalarValue {
        ScalarValue::Utf8(Some(value.to_string()))
    }

    #[test]
    fn validates_byte_comparison_with_literal() {
        let expr = column_op_literal(Operator::Eq, utf8("x"));
        let liquid_expr = LiquidExpr::try_new(expr, &DataType::Utf8, None);
        assert!(liquid_expr.is_some());
    }

    #[test]
    fn rejects_byte_like_without_substring_hint() {
        let expr = column_like(false, "%abc%");
        let liquid_expr = LiquidExpr::try_new(expr, &DataType::Utf8, None);
        assert!(liquid_expr.is_none());
    }

    #[test]
    fn accepts_byte_like_with_substring_hint() {
        let expr = column_like(false, "%abc%");
        let liquid_expr = LiquidExpr::try_new(
            expr,
            &DataType::Utf8,
            Some(&CacheExpression::SubstringSearch),
        );
        assert!(liquid_expr.is_some());
    }

    #[test]
    fn rejects_case_insensitive_like_even_with_hint() {
        let expr = column_like(true, "%abc%");
        let liquid_expr = LiquidExpr::try_new(
            expr,
            &DataType::Utf8,
            Some(&CacheExpression::SubstringSearch),
        );
        assert!(liquid_expr.is_none());
    }

    #[test]
    fn like_match_operator_requires_substring_hint() {
        let hint = CacheExpression::SubstringSearch;
        let without_hint = column_op_literal(Operator::LikeMatch, utf8("%abc%"));
        assert!(LiquidExpr::try_new(without_hint, &DataType::Utf8, None).is_none());

        let with_hint = column_op_literal(Operator::LikeMatch, utf8("%abc%"));
        assert!(LiquidExpr::try_new(with_hint, &DataType::Utf8, Some(&hint)).is_some());
    }

    #[test]
    fn rejects_literal_on_left_side() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Literal::new(utf8("x"))),
            Operator::Eq,
            Arc::new(Column::new("c", 0)),
        ));
        assert!(LiquidExpr::try_new(expr, &DataType::Utf8, None).is_none());
    }

    #[test]
    fn accepts_dictionary_encoded_utf8() {
        let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let expr = column_op_literal(Operator::Eq, utf8("x"));
        assert!(LiquidExpr::try_new(expr, &dict, None).is_some());
    }

    #[test]
    fn boolean_literal_is_only_accepted_for_byte_like_columns() {
        let make = || -> Arc<dyn PhysicalExpr> {
            Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
        };
        assert!(LiquidExpr::try_new(make(), &DataType::Utf8, None).is_some());
        assert!(LiquidExpr::try_new(make(), &DataType::Int32, None).is_none());
    }

    #[test]
    fn validates_numeric_comparison() {
        let expr = column_op_literal(Operator::Gt, ScalarValue::Int32(Some(42)));
        let liquid_expr = LiquidExpr::try_new(expr, &DataType::Int32, None);
        assert!(liquid_expr.is_some());
    }

    #[test]
    fn rejects_non_comparison_operator_for_numeric() {
        let expr = column_op_literal(Operator::Plus, ScalarValue::Int32(Some(1)));
        assert!(LiquidExpr::try_new(expr, &DataType::Int32, None).is_none());
    }

    #[test]
    fn timestamps_are_numeric_only_without_time_zone() {
        let naive = DataType::Timestamp(TimeUnit::Second, None);
        let zoned = DataType::Timestamp(TimeUnit::Second, Some("UTC".into()));

        let expr = column_op_literal(Operator::Gt, ScalarValue::Int64(Some(0)));
        assert!(LiquidExpr::try_new(expr, &naive, None).is_some());

        let expr = column_op_literal(Operator::Gt, ScalarValue::Int64(Some(0)));
        assert!(LiquidExpr::try_new(expr, &zoned, None).is_none());
    }
}