Skip to main content

uni_query/query/
pushdown.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Predicate pushdown and index-aware query routing.
//!
//! Routes WHERE predicates to the most selective execution path:
//! UID index lookup → BTree prefix scan → JSON FTS → Lance columnar filter → residual.
//! Includes SQL injection prevention for LIKE patterns (CWE-89) and UID validation (CWE-345).
9
10use std::collections::{HashMap, HashSet};
11use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr, UnaryOp};
12
13use uni_common::core::id::UniId;
14use uni_common::core::schema::{
15    IndexDefinition, IndexStatus, PropertyMeta, ScalarIndexType, Schema,
16};
17
18/// Categorized pushdown strategy for predicates with index awareness.
19///
20/// This struct represents the optimal execution path for predicates,
21/// routing them to the most selective index when available.
22#[derive(Debug, Clone, Default)]
23pub struct PushdownStrategy {
24    /// UID lookup predicate: WHERE n._uid = 'base32string'
25    /// Contains the UniId parsed from the predicate value.
26    pub uid_lookup: Option<UniId>,
27
28    /// BTree index prefix scans for STARTS WITH predicates.
29    /// When a property has a scalar BTree index, STARTS WITH 'prefix' can be
30    /// converted to a range scan: column >= 'prefix' AND column < 'prefix_next'.
31    /// Vec of: (column_name, lower_bound, upper_bound)
32    pub btree_prefix_scans: Vec<(String, String, String)>,
33
34    /// JSON FTS predicates for full-text search on JSON columns.
35    /// Vec of: (column_name, search_term, optional_path_filter)
36    pub json_fts_predicates: Vec<(String, String, Option<String>)>,
37
38    /// Predicates pushable to Lance scan filter.
39    pub lance_predicates: Vec<Expr>,
40
41    /// Residual predicates (not pushable to storage).
42    pub residual: Vec<Expr>,
43}
44
45/// Analyzer that considers available indexes when categorizing predicates.
46///
47/// Unlike `PredicateAnalyzer` which only categorizes into pushable/residual,
48/// this analyzer routes predicates to the most optimal execution path:
49/// 1. UID index lookup (most selective, O(1) lookup)
50/// 2. BTree prefix scan (STARTS WITH on scalar-indexed properties)
51/// 3. JSON FTS lookup (BM25 full-text search)
52/// 4. Lance scan filter (columnar scan with filter)
53/// 5. Residual (post-scan evaluation)
54// M-PUBLIC-DEBUG: Schema implements Debug, so the derived impl is sound.
55#[derive(Debug)]
56pub struct IndexAwareAnalyzer<'a> {
57    schema: &'a Schema,
58}
59
60impl<'a> IndexAwareAnalyzer<'a> {
61    /// Create an analyzer bound to the given schema for index-aware predicate routing.
62    pub fn new(schema: &'a Schema) -> Self {
63        Self { schema }
64    }
65
66    /// Analyze predicates and determine optimal pushdown strategy.
67    ///
68    /// Predicates are categorized in order of selectivity:
69    /// 1. `_uid = 'xxx'` -> UID index lookup
70    /// 2. BTree prefix scans for STARTS WITH predicates
71    /// 3. Pushable to Lance -> Lance filter
72    /// 4. Everything else -> Residual
73    pub fn analyze(&self, predicate: &Expr, variable: &str, label_id: u16) -> PushdownStrategy {
74        let mut strategy = PushdownStrategy::default();
75        let conjuncts = Self::split_conjuncts(predicate);
76        let lance_analyzer = PredicateAnalyzer::new();
77
78        for conj in conjuncts {
79            // 1. Check for _uid = 'xxx' pattern (most selective)
80            if let Some(uid) = self.extract_uid_predicate(&conj, variable) {
81                strategy.uid_lookup = Some(uid);
82                continue;
83            }
84
85            // 2. Check for BTree-indexed STARTS WITH predicates
86            if let Some((column, lower, upper)) =
87                self.extract_btree_prefix_scan(&conj, variable, label_id)
88            {
89                strategy.btree_prefix_scans.push((column, lower, upper));
90                continue;
91            }
92
93            // 3. Check for JSON FTS predicates (CONTAINS on FTS-indexed columns)
94            if let Some((column, term, path)) =
95                self.extract_json_fts_predicate(&conj, variable, label_id)
96            {
97                strategy.json_fts_predicates.push((column, term, path));
98                continue;
99            }
100
101            // 4. Check if pushable to Lance
102            if lance_analyzer.is_pushable(&conj, variable) {
103                strategy.lance_predicates.push(conj);
104            } else {
105                strategy.residual.push(conj);
106            }
107        }
108
109        strategy
110    }
111
112    /// Extract UniId from `_uid = 'xxx'` predicate.
113    ///
114    /// # Security
115    ///
116    /// **CWE-345 (Insufficient Verification)**: The UID value is validated using
117    /// `UniId::from_multibase()` which enforces Base32Lower encoding and 32-byte
118    /// length. Invalid UIDs are rejected and the predicate becomes residual.
119    fn extract_uid_predicate(&self, expr: &Expr, variable: &str) -> Option<UniId> {
120        if let Expr::BinaryOp {
121            left,
122            op: BinaryOp::Eq,
123            right,
124        } = expr
125            && let Expr::Property(var_expr, prop) = left.as_ref()
126            && let Expr::Variable(v) = var_expr.as_ref()
127            && v == variable
128            && prop == "_uid"
129            && let Expr::Literal(CypherLiteral::String(s)) = right.as_ref()
130        {
131            // Security: UniId::from_multibase validates Base32Lower and 32-byte length
132            return UniId::from_multibase(s).ok();
133        }
134        None
135    }
136
137    /// Extract BTree prefix scan for STARTS WITH predicates on scalar-indexed properties.
138    ///
139    /// Returns `Some((column, lower_bound, upper_bound))` if:
140    /// - The predicate is `variable.property STARTS WITH 'prefix'`
141    /// - The property has a scalar BTree index
142    /// - The prefix is non-empty (empty prefix matches all, not worth optimizing)
143    ///
144    /// Converts `column STARTS WITH 'John'` to:
145    /// `column >= 'John' AND column < 'Joho'`
146    fn extract_btree_prefix_scan(
147        &self,
148        expr: &Expr,
149        variable: &str,
150        label_id: u16,
151    ) -> Option<(String, String, String)> {
152        if let Expr::BinaryOp {
153            left,
154            op: BinaryOp::StartsWith,
155            right,
156        } = expr
157            && let Expr::Property(var_expr, prop) = left.as_ref()
158            && let Expr::Variable(v) = var_expr.as_ref()
159            && v == variable
160            && let Expr::Literal(CypherLiteral::String(prefix)) = right.as_ref()
161        {
162            // Skip empty prefix (matches all, no optimization benefit)
163            if prefix.is_empty() {
164                return None;
165            }
166
167            // Check if property has a scalar BTree index
168            let label_name = self.schema.label_name_by_id(label_id)?;
169
170            for idx in &self.schema.indexes {
171                if let IndexDefinition::Scalar(cfg) = idx
172                    && cfg.label == *label_name
173                    && cfg.properties.contains(prop)
174                    && cfg.index_type == ScalarIndexType::BTree
175                    && cfg.metadata.status == IndexStatus::Online
176                {
177                    // Calculate the upper bound by incrementing the last character
178                    // For "John" -> "Joho"
179                    // This works for ASCII and most UTF-8 strings
180                    if let Some(upper) = increment_last_char(prefix) {
181                        return Some((prop.clone(), prefix.clone(), upper));
182                    }
183                }
184            }
185        }
186        None
187    }
188
189    /// Extract JSON FTS predicate from CONTAINS on an FTS-indexed column.
190    ///
191    /// Returns `Some((column, search_term, optional_path))` if:
192    /// - The predicate is `variable.column CONTAINS 'term'`
193    /// - The column has a `JsonFullText` index
194    fn extract_json_fts_predicate(
195        &self,
196        expr: &Expr,
197        variable: &str,
198        label_id: u16,
199    ) -> Option<(String, String, Option<String>)> {
200        if let Expr::BinaryOp {
201            left,
202            op: BinaryOp::Contains,
203            right,
204        } = expr
205            && let Expr::Property(var_expr, prop) = left.as_ref()
206            && let Expr::Variable(v) = var_expr.as_ref()
207            && v == variable
208            && let Expr::Literal(CypherLiteral::String(term)) = right.as_ref()
209        {
210            let label_name = self.schema.label_name_by_id(label_id)?;
211
212            // Check if property has a JsonFullText index
213            for idx in &self.schema.indexes {
214                if let IndexDefinition::JsonFullText(cfg) = idx
215                    && cfg.label == *label_name
216                    && cfg.column == *prop
217                    && cfg.metadata.status == IndexStatus::Online
218                {
219                    return Some((prop.clone(), term.clone(), None));
220                }
221            }
222        }
223        None
224    }
225
226    /// Split AND-connected predicates into a list.
227    fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
228        match expr {
229            Expr::BinaryOp {
230                left,
231                op: BinaryOp::And,
232                right,
233            } => {
234                let mut result = Self::split_conjuncts(left);
235                result.extend(Self::split_conjuncts(right));
236                result
237            }
238            _ => vec![expr.clone()],
239        }
240    }
241}
242
243/// Split result of predicate analysis: pushable vs residual.
244#[derive(Debug)]
245pub struct PredicateAnalysis {
246    /// Predicates that can be pushed to storage
247    pub pushable: Vec<Expr>,
248    /// Predicates that must be evaluated post-scan
249    pub residual: Vec<Expr>,
250    /// Properties needed for residual evaluation
251    pub required_properties: Vec<String>,
252}
253
254/// Classifies predicates as pushable to Lance or residual (post-scan).
255#[derive(Debug, Default)]
256pub struct PredicateAnalyzer;
257
258impl PredicateAnalyzer {
259    /// Create a new analyzer for classifying predicates.
260    pub fn new() -> Self {
261        Self
262    }
263
264    /// Split a predicate into pushable (Lance) and residual (post-scan) parts.
265    pub fn analyze(&self, predicate: &Expr, scan_variable: &str) -> PredicateAnalysis {
266        let mut pushable = Vec::new();
267        let mut residual = Vec::new();
268
269        self.split_conjuncts(predicate, scan_variable, &mut pushable, &mut residual);
270
271        let required_properties = self.extract_properties(&residual, scan_variable);
272
273        PredicateAnalysis {
274            pushable,
275            residual,
276            required_properties,
277        }
278    }
279
280    /// Split AND-connected predicates
281    fn split_conjuncts(
282        &self,
283        expr: &Expr,
284        variable: &str,
285        pushable: &mut Vec<Expr>,
286        residual: &mut Vec<Expr>,
287    ) {
288        // Try OR-to-IN conversion first
289        if let Some(in_expr) = try_or_to_in(expr, variable)
290            && self.is_pushable(&in_expr, variable)
291        {
292            pushable.push(in_expr);
293            return;
294        }
295
296        match expr {
297            Expr::BinaryOp {
298                left,
299                op: BinaryOp::And,
300                right,
301            } => {
302                self.split_conjuncts(left, variable, pushable, residual);
303                self.split_conjuncts(right, variable, pushable, residual);
304            }
305            _ => {
306                if self.is_pushable(expr, variable) {
307                    pushable.push(expr.clone());
308                } else {
309                    residual.push(expr.clone());
310                }
311            }
312        }
313    }
314
315    /// Returns `true` if a predicate can be pushed down to Lance storage.
316    pub fn is_pushable(&self, expr: &Expr, variable: &str) -> bool {
317        match expr {
318            Expr::In {
319                expr: left,
320                list: right,
321            } => {
322                // Check left side is a property of the scan variable
323                let left_is_property = matches!(
324                    left.as_ref(),
325                    Expr::Property(box_expr, _) if matches!(box_expr.as_ref(), Expr::Variable(v) if v == variable)
326                );
327                // Check right side is list or parameter
328                let right_valid = matches!(right.as_ref(), Expr::List(_) | Expr::Parameter(_));
329                left_is_property && right_valid
330            }
331            Expr::BinaryOp { left, op, right } => {
332                // Check operator is supported
333                let op_supported = matches!(
334                    op,
335                    BinaryOp::Eq
336                        | BinaryOp::NotEq
337                        | BinaryOp::Lt
338                        | BinaryOp::LtEq
339                        | BinaryOp::Gt
340                        | BinaryOp::GtEq
341                        | BinaryOp::Contains
342                        | BinaryOp::StartsWith
343                        | BinaryOp::EndsWith
344                );
345
346                if !op_supported {
347                    return false;
348                }
349
350                // Check left side is a property of the scan variable
351                // Structure: Property(Identifier(var), prop_name)
352                let left_is_property = matches!(
353                    left.as_ref(),
354                    Expr::Property(box_expr, _) if matches!(box_expr.as_ref(), Expr::Variable(v) if v == variable)
355                );
356
357                // Check right side is a literal or parameter or list of literals
358                // For string operators, strict requirement on String Literal
359                let right_valid = if matches!(
360                    op,
361                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith
362                ) {
363                    matches!(right.as_ref(), Expr::Literal(CypherLiteral::String(_)))
364                } else {
365                    matches!(
366                        right.as_ref(),
367                        Expr::Literal(_) | Expr::Parameter(_) | Expr::List(_)
368                    )
369                };
370
371                left_is_property && right_valid
372            }
373            Expr::UnaryOp {
374                op: UnaryOp::Not,
375                expr,
376            } => self.is_pushable(expr, variable),
377
378            Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
379                // Check if inner is a property of the scan variable
380                matches!(
381                    inner.as_ref(),
382                    Expr::Property(var_expr, _)
383                        if matches!(var_expr.as_ref(), Expr::Variable(v) if v == variable)
384                )
385            }
386
387            _ => false,
388        }
389    }
390
391    /// Extract property names required by residual predicates
392    fn extract_properties(&self, exprs: &[Expr], variable: &str) -> Vec<String> {
393        let mut props = HashSet::new();
394        for expr in exprs {
395            collect_properties(expr, variable, &mut props);
396        }
397        props.into_iter().collect()
398    }
399}
400
401/// Attempt to convert OR disjunctions to IN predicates
402fn try_or_to_in(expr: &Expr, variable: &str) -> Option<Expr> {
403    match expr {
404        Expr::BinaryOp {
405            op: BinaryOp::Or, ..
406        } => {
407            // Collect all equality comparisons on the same property
408            let mut property: Option<String> = None;
409            let mut values: Vec<Expr> = Vec::new();
410
411            if collect_or_equals(expr, variable, &mut property, &mut values)
412                && let Some(prop) = property
413                && values.len() >= 2
414            {
415                return Some(Expr::In {
416                    expr: Box::new(Expr::Property(
417                        Box::new(Expr::Variable(variable.to_string())),
418                        prop,
419                    )),
420                    list: Box::new(Expr::List(values)),
421                });
422            }
423            None
424        }
425        _ => None,
426    }
427}
428
429fn collect_or_equals(
430    expr: &Expr,
431    variable: &str,
432    property: &mut Option<String>,
433    values: &mut Vec<Expr>,
434) -> bool {
435    match expr {
436        Expr::BinaryOp {
437            left,
438            op: BinaryOp::Or,
439            right,
440        } => {
441            collect_or_equals(left, variable, property, values)
442                && collect_or_equals(right, variable, property, values)
443        }
444        Expr::BinaryOp {
445            left,
446            op: BinaryOp::Eq,
447            right,
448        } => {
449            if let Expr::Property(var_expr, prop) = left.as_ref()
450                && let Expr::Variable(v) = var_expr.as_ref()
451                && v == variable
452            {
453                match property {
454                    None => {
455                        *property = Some(prop.clone());
456                        values.push(right.as_ref().clone());
457                        return true;
458                    }
459                    Some(p) if p == prop => {
460                        values.push(right.as_ref().clone());
461                        return true;
462                    }
463                    _ => return false, // Different properties
464                }
465            }
466            false
467        }
468        _ => false,
469    }
470}
471
472fn collect_properties(expr: &Expr, variable: &str, props: &mut HashSet<String>) {
473    match expr {
474        Expr::Property(box_expr, prop) => {
475            if let Expr::Variable(v) = box_expr.as_ref()
476                && v == variable
477            {
478                props.insert(prop.clone());
479            }
480        }
481        Expr::BinaryOp { left, right, .. } => {
482            collect_properties(left, variable, props);
483            collect_properties(right, variable, props);
484        }
485        Expr::UnaryOp { expr, .. } => {
486            collect_properties(expr, variable, props);
487        }
488        Expr::IsNull(expr) | Expr::IsNotNull(expr) => {
489            collect_properties(expr, variable, props);
490        }
491        Expr::List(items) => {
492            for item in items {
493                collect_properties(item, variable, props);
494            }
495        }
496        Expr::Map(items) => {
497            for (_, item) in items {
498                collect_properties(item, variable, props);
499            }
500        }
501        Expr::FunctionCall { args, .. } => {
502            for arg in args {
503                collect_properties(arg, variable, props);
504            }
505        }
506        Expr::ArrayIndex {
507            array: arr,
508            index: idx,
509        } => {
510            collect_properties(arr, variable, props);
511            collect_properties(idx, variable, props);
512        }
513        _ => {}
514    }
515}
516
/// Compute the smallest string that sorts after every string starting with `s`,
/// for use as an exclusive upper bound of a prefix range scan.
///
/// Normally this increments the last character, e.g. `"John"` -> `"Joho"`.
/// Two edge cases are handled so a bound is still produced:
/// - U+D7FF is followed by the surrogate range, which holds no valid `char`,
///   so its successor is U+E000.
/// - A trailing U+10FFFF (the largest `char`) cannot be incremented; it is
///   dropped and the preceding character is incremented instead
///   (`"a\u{10FFFF}"` -> `"b"`).
///
/// Returns `None` if `s` is empty or consists solely of U+10FFFF characters,
/// because then no finite upper bound exists.
fn increment_last_char(s: &str) -> Option<String> {
    let mut bound = s.to_owned();

    // Strip characters from the end until one can be incremented.
    while let Some(last) = bound.pop() {
        let successor = match last {
            // Largest scalar value: carry into the previous character.
            '\u{10FFFF}' => None,
            // Jump over the surrogate gap U+D800..=U+DFFF.
            '\u{D7FF}' => Some('\u{E000}'),
            other => char::from_u32(other as u32 + 1),
        };

        if let Some(next) = successor {
            bound.push(next);
            return Some(bound);
        }
    }

    None
}
542
543/// Flatten nested AND expressions into a vector
544fn flatten_ands(expr: &Expr) -> Vec<&Expr> {
545    match expr {
546        Expr::BinaryOp {
547            left,
548            op: BinaryOp::And,
549            right,
550        } => {
551            let mut result = flatten_ands(left);
552            result.extend(flatten_ands(right));
553            result
554        }
555        _ => vec![expr],
556    }
557}
558
/// Renders pushable predicates as SQL filter strings for Lance scans.
#[derive(Debug)]
pub struct LanceFilterGenerator;
562
563impl LanceFilterGenerator {
564    /// Checks if a string contains SQL LIKE wildcard characters.
565    ///
566    /// # Security
567    ///
568    /// **CWE-89 (SQL Injection)**: Predicates containing wildcards are NOT pushed
569    /// to storage because Lance DataFusion doesn't support the ESCAPE clause.
570    /// Instead, they're evaluated at the application layer where we have full
571    /// control over string matching semantics.
572    fn contains_sql_wildcards(s: &str) -> bool {
573        s.contains('%') || s.contains('_')
574    }
575
576    /// Escapes special characters in LIKE patterns.
577    ///
578    /// **Note**: This function is kept for documentation and potential future use,
579    /// but currently we do not push down LIKE patterns containing wildcards
580    /// because Lance DataFusion doesn't support the ESCAPE clause.
581    #[expect(
582        dead_code,
583        reason = "Reserved for future use when Lance supports ESCAPE"
584    )]
585    fn escape_like_pattern(s: &str) -> String {
586        s.replace('\\', "\\\\")
587            .replace('%', "\\%")
588            .replace('_', "\\_")
589            .replace('\'', "''")
590    }
591
592    /// Converts pushable predicates to Lance SQL filter string.
593    ///
594    /// When `schema_props` is provided, properties not in the schema (overflow properties)
595    /// are skipped since they don't exist as physical columns in Lance.
596    pub fn generate(
597        predicates: &[Expr],
598        variable: &str,
599        schema_props: Option<&HashMap<String, PropertyMeta>>,
600    ) -> Option<String> {
601        if predicates.is_empty() {
602            return None;
603        }
604
605        // Flatten nested ANDs first
606        let flattened: Vec<&Expr> = predicates.iter().flat_map(|p| flatten_ands(p)).collect();
607
608        // Optimize Ranges: Group predicates by column and combine into >= AND <= if possible
609        let mut by_column: HashMap<String, Vec<&Expr>> = HashMap::new();
610        let mut optimized_filters: Vec<String> = Vec::new();
611        let mut used_expressions: HashSet<*const Expr> = HashSet::new();
612
613        for expr in &flattened {
614            if let Some(col) = Self::extract_column_from_range(expr, variable, schema_props) {
615                by_column.entry(col).or_default().push(expr);
616            }
617        }
618
619        for (col, exprs) in &by_column {
620            if exprs.len() < 2 {
621                continue;
622            }
623
624            // Try to find pairs of >/>= and </<=
625            // Very naive: find ONE pair and emit range expression.
626            // Complex ranges (e.g. >10 AND >20) are not merged but valid.
627            // We look for: (col > L OR col >= L) AND (col < R OR col <= R)
628
629            let mut lower: Option<(bool, &Expr, &Expr)> = None; // (inclusive, val_expr, original_expr)
630            let mut upper: Option<(bool, &Expr, &Expr)> = None;
631
632            for expr in exprs {
633                if let Expr::BinaryOp { op, right, .. } = expr {
634                    match op {
635                        BinaryOp::Gt => {
636                            // If we have multiple lower bounds, pick the last one (arbitrary for now, intersection handles logic)
637                            lower = Some((false, right, expr));
638                        }
639                        BinaryOp::GtEq => {
640                            lower = Some((true, right, expr));
641                        }
642                        BinaryOp::Lt => {
643                            upper = Some((false, right, expr));
644                        }
645                        BinaryOp::LtEq => {
646                            upper = Some((true, right, expr));
647                        }
648                        _ => {}
649                    }
650                }
651            }
652
653            if let (Some((true, l_val, l_expr)), Some((true, u_val, u_expr))) = (lower, upper) {
654                // Both inclusive -> use >= AND <= (Lance doesn't support BETWEEN)
655                if let (Some(l_str), Some(u_str)) =
656                    (Self::value_to_lance(l_val), Self::value_to_lance(u_val))
657                {
658                    optimized_filters.push(format!(
659                        "\"{}\" >= {} AND \"{}\" <= {}",
660                        col, l_str, col, u_str
661                    ));
662                    used_expressions.insert(l_expr as *const Expr);
663                    used_expressions.insert(u_expr as *const Expr);
664                }
665            }
666        }
667
668        let mut filters = optimized_filters;
669
670        for expr in flattened {
671            if used_expressions.contains(&(expr as *const Expr)) {
672                continue;
673            }
674            if let Some(s) = Self::expr_to_lance(expr, variable, schema_props) {
675                filters.push(s);
676            }
677        }
678
679        if filters.is_empty() {
680            None
681        } else {
682            Some(filters.join(" AND "))
683        }
684    }
685
686    fn extract_column_from_range(
687        expr: &Expr,
688        variable: &str,
689        schema_props: Option<&HashMap<String, PropertyMeta>>,
690    ) -> Option<String> {
691        match expr {
692            Expr::BinaryOp { left, op, .. } => {
693                if matches!(
694                    op,
695                    BinaryOp::Gt | BinaryOp::GtEq | BinaryOp::Lt | BinaryOp::LtEq
696                ) {
697                    return Self::extract_column(left, variable, schema_props);
698                }
699                None
700            }
701            _ => None,
702        }
703    }
704
705    fn expr_to_lance(
706        expr: &Expr,
707        variable: &str,
708        schema_props: Option<&HashMap<String, PropertyMeta>>,
709    ) -> Option<String> {
710        match expr {
711            Expr::In {
712                expr: left,
713                list: right,
714            } => {
715                let column = Self::extract_column(left, variable, schema_props)?;
716                let value = Self::value_to_lance(right)?;
717                Some(format!("{} IN {}", column, value))
718            }
719            Expr::BinaryOp { left, op, right } => {
720                let column = Self::extract_column(left, variable, schema_props)?;
721
722                // Special handling for string operators
723                // Security: CWE-89 - Prevent SQL wildcard injection
724                //
725                // Lance DataFusion doesn't support the ESCAPE clause, so we cannot
726                // safely push down LIKE predicates containing SQL wildcards (% or _).
727                // If the input contains these characters, we return None to keep
728                // the predicate as a residual for application-level evaluation.
729                match op {
730                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith => {
731                        let raw_value = Self::get_string_value(right)?;
732
733                        // If the value contains SQL wildcards, don't push down
734                        // to prevent wildcard injection attacks
735                        if Self::contains_sql_wildcards(&raw_value) {
736                            return None;
737                        }
738
739                        // Escape single quotes for the SQL string
740                        let escaped = raw_value.replace('\'', "''");
741
742                        match op {
743                            BinaryOp::Contains => Some(format!("{} LIKE '%{}%'", column, escaped)),
744                            BinaryOp::StartsWith => Some(format!("{} LIKE '{}%'", column, escaped)),
745                            BinaryOp::EndsWith => Some(format!("{} LIKE '%{}'", column, escaped)),
746                            _ => unreachable!(),
747                        }
748                    }
749                    _ => {
750                        let op_str = Self::op_to_lance(op)?;
751                        let value = Self::value_to_lance(right)?;
752                        // Use unquoted column name for DataFusion compatibility
753                        // DataFusion treats unquoted identifiers case-insensitively
754                        Some(format!("{} {} {}", column, op_str, value))
755                    }
756                }
757            }
758            Expr::UnaryOp {
759                op: UnaryOp::Not,
760                expr,
761            } => {
762                let inner = Self::expr_to_lance(expr, variable, schema_props)?;
763                Some(format!("NOT ({})", inner))
764            }
765            Expr::IsNull(inner) => {
766                let column = Self::extract_column(inner, variable, schema_props)?;
767                Some(format!("{} IS NULL", column))
768            }
769            Expr::IsNotNull(inner) => {
770                let column = Self::extract_column(inner, variable, schema_props)?;
771                Some(format!("{} IS NOT NULL", column))
772            }
773            _ => None,
774        }
775    }
776
777    fn extract_column(
778        expr: &Expr,
779        variable: &str,
780        schema_props: Option<&HashMap<String, PropertyMeta>>,
781    ) -> Option<String> {
782        match expr {
783            Expr::Property(box_expr, prop) => {
784                if let Expr::Variable(var) = box_expr.as_ref()
785                    && var == variable
786                {
787                    // System columns (starting with _) are always physical Lance columns
788                    if prop.starts_with('_') {
789                        return Some(prop.clone());
790                    }
791                    // If schema_props is provided, only allow properties that are
792                    // physical columns in Lance. Overflow properties (not in schema)
793                    // don't exist as Lance columns.
794                    // If schema_props is Some but empty (schemaless label), ALL
795                    // non-system properties are overflow.
796                    // If schema_props is None, no filtering is applied (caller
797                    // doesn't have schema info).
798                    if let Some(props) = schema_props
799                        && !props.contains_key(prop.as_str())
800                    {
801                        return None;
802                    }
803                    return Some(prop.clone());
804                }
805                None
806            }
807            _ => None,
808        }
809    }
810
811    fn op_to_lance(op: &BinaryOp) -> Option<&'static str> {
812        match op {
813            BinaryOp::Eq => Some("="),
814            BinaryOp::NotEq => Some("!="),
815            BinaryOp::Lt => Some("<"),
816            BinaryOp::LtEq => Some("<="),
817            BinaryOp::Gt => Some(">"),
818            BinaryOp::GtEq => Some(">="),
819            _ => None,
820        }
821    }
822
823    fn value_to_lance(expr: &Expr) -> Option<String> {
824        match expr {
825            Expr::Literal(CypherLiteral::String(s)) => {
826                // Normalize datetime strings to include seconds for Arrow timestamp parsing.
827                // Our Cypher datetime formatting omits `:00` seconds (e.g. `2021-06-01T00:00Z`)
828                // but Arrow/Lance requires full `HH:MM:SS` for timestamp parsing.
829                let s = super::df_expr::normalize_datetime_str(s).unwrap_or_else(|| s.clone());
830                Some(format!("'{}'", s.replace("'", "''")))
831            }
832            Expr::Literal(CypherLiteral::Integer(i)) => Some(i.to_string()),
833            Expr::Literal(CypherLiteral::Float(f)) => Some(f.to_string()),
834            Expr::Literal(CypherLiteral::Bool(b)) => Some(b.to_string()),
835            Expr::Literal(CypherLiteral::Null) => Some("NULL".to_string()),
836            Expr::List(items) => {
837                let values: Option<Vec<String>> = items.iter().map(Self::value_to_lance).collect();
838                values.map(|v| format!("({})", v.join(", ")))
839            }
840            // Security: CWE-89 - Parameters are NOT pushed to storage layer.
841            // Parameterized predicates stay in the application layer where the
842            // query executor can safely substitute values with proper type handling.
843            // This prevents potential SQL injection if Lance doesn't support the $name syntax.
844            Expr::Parameter(_) => None,
845            _ => None,
846        }
847    }
848
849    /// Extracts raw string value from expression for LIKE pattern use.
850    ///
851    /// Returns the raw string without escaping - escaping is handled by
852    /// `escape_like_pattern` for LIKE clauses.
853    fn get_string_value(expr: &Expr) -> Option<String> {
854        match expr {
855            Expr::Literal(CypherLiteral::String(s)) => Some(s.clone()),
856            _ => None,
857        }
858    }
859}
860
#[cfg(test)]
mod security_tests {
    use super::*;

    /// Builds the predicate `n.name <op> <rhs>` shared by the pushdown tests.
    fn name_predicate(op: BinaryOp, rhs: Expr) -> Expr {
        let name_of_n = Expr::Property(
            Box::new(Expr::Variable("n".to_string())),
            "name".to_string(),
        );
        Expr::BinaryOp {
            left: Box::new(name_of_n),
            op,
            right: Box::new(rhs),
        }
    }

    /// Wraps `value` as a Cypher string literal expression.
    fn string_literal(value: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(value.to_string()))
    }

    /// Tests for CWE-89 (SQL Injection) prevention in LIKE patterns.
    mod wildcard_protection {
        use super::*;

        #[test]
        fn test_contains_sql_wildcards_detects_percent() {
            // `%` at the end, the start, and in the middle.
            for sample in ["admin%", "%admin", "ad%min"] {
                assert!(LanceFilterGenerator::contains_sql_wildcards(sample));
            }
        }

        #[test]
        fn test_contains_sql_wildcards_detects_underscore() {
            // `_` in the middle, at the start, and at the end.
            for sample in ["a_min", "_admin", "admin_"] {
                assert!(LanceFilterGenerator::contains_sql_wildcards(sample));
            }
        }

        #[test]
        fn test_contains_sql_wildcards_safe_strings() {
            // None of these contain `%` or `_`.
            for sample in ["admin", "John Smith", "test@example.com"] {
                assert!(!LanceFilterGenerator::contains_sql_wildcards(sample));
            }
        }

        #[test]
        fn test_wildcard_in_contains_not_pushed_down() {
            // A `%` in the search term must keep the predicate out of storage.
            let predicate = name_predicate(BinaryOp::Contains, string_literal("admin%"));

            let filter = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(
                filter.is_none(),
                "CONTAINS with wildcard should not be pushed to storage"
            );
        }

        #[test]
        fn test_underscore_in_startswith_not_pushed_down() {
            // A `_` in the prefix must keep the predicate out of storage.
            let predicate = name_predicate(BinaryOp::StartsWith, string_literal("user_"));

            let filter = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(
                filter.is_none(),
                "STARTSWITH with underscore should not be pushed to storage"
            );
        }

        #[test]
        fn test_safe_contains_is_pushed_down() {
            // A wildcard-free term is expected to become a LIKE filter.
            let predicate = name_predicate(BinaryOp::Contains, string_literal("admin"));

            let filter = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(filter.is_some(), "Safe CONTAINS should be pushed down");
            assert!(
                filter.as_ref().unwrap().contains("LIKE '%admin%'"),
                "Generated filter: {:?}",
                filter
            );
        }

        #[test]
        fn test_single_quotes_escaped_in_safe_string() {
            // An embedded single quote must appear doubled in the generated filter.
            let predicate = name_predicate(BinaryOp::Contains, string_literal("O'Brien"));

            let filter = LanceFilterGenerator::generate(&[predicate], "n", None).unwrap();
            assert!(
                filter.contains("O''Brien"),
                "Single quotes should be doubled: {}",
                filter
            );
        }
    }

    /// Tests for parameter handling (not pushed to storage).
    mod parameter_safety {
        use super::*;

        #[test]
        fn test_parameters_not_pushed_down() {
            // Comparing against a query parameter must not produce a storage filter.
            let predicate =
                name_predicate(BinaryOp::Eq, Expr::Parameter("userInput".to_string()));

            let filter = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(
                filter.is_none(),
                "Parameterized predicates should not be pushed to storage"
            );
        }
    }
}