Skip to main content

uni_query/query/
pushdown.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use std::collections::{HashMap, HashSet};
5use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr, UnaryOp};
6
7use uni_common::core::id::UniId;
8use uni_common::core::schema::{IndexDefinition, PropertyMeta, Schema};
9
10/// Categorized pushdown strategy for predicates with index awareness.
11///
12/// This struct represents the optimal execution path for predicates,
13/// routing them to the most selective index when available.
14#[derive(Debug, Clone, Default)]
15pub struct PushdownStrategy {
16    /// UID lookup predicate: WHERE n._uid = 'base32string'
17    /// Contains the UniId parsed from the predicate value.
18    pub uid_lookup: Option<UniId>,
19
20    /// BTree index prefix scans for STARTS WITH predicates.
21    /// When a property has a scalar BTree index, STARTS WITH 'prefix' can be
22    /// converted to a range scan: column >= 'prefix' AND column < 'prefix_next'.
23    /// Vec of: (column_name, lower_bound, upper_bound)
24    pub btree_prefix_scans: Vec<(String, String, String)>,
25
26    /// JSON FTS predicates for full-text search on JSON columns.
27    /// Vec of: (column_name, search_term, optional_path_filter)
28    pub json_fts_predicates: Vec<(String, String, Option<String>)>,
29
30    /// Predicates pushable to Lance scan filter.
31    pub lance_predicates: Vec<Expr>,
32
33    /// Residual predicates (not pushable to storage).
34    pub residual: Vec<Expr>,
35}
36
37/// Analyzer that considers available indexes when categorizing predicates.
38///
39/// Unlike `PredicateAnalyzer` which only categorizes into pushable/residual,
40/// this analyzer routes predicates to the most optimal execution path:
41/// 1. UID index lookup (most selective, O(1) lookup)
42/// 2. BTree prefix scan (STARTS WITH on scalar-indexed properties)
43/// 3. JSON FTS lookup (BM25 full-text search)
44/// 4. Lance scan filter (columnar scan with filter)
45/// 5. Residual (post-scan evaluation)
46pub struct IndexAwareAnalyzer<'a> {
47    schema: &'a Schema,
48}
49
50impl<'a> IndexAwareAnalyzer<'a> {
51    pub fn new(schema: &'a Schema) -> Self {
52        Self { schema }
53    }
54
55    /// Analyze predicates and determine optimal pushdown strategy.
56    ///
57    /// Predicates are categorized in order of selectivity:
58    /// 1. `_uid = 'xxx'` -> UID index lookup
59    /// 2. BTree prefix scans for STARTS WITH predicates
60    /// 3. Pushable to Lance -> Lance filter
61    /// 4. Everything else -> Residual
62    pub fn analyze(&self, predicate: &Expr, variable: &str, label_id: u16) -> PushdownStrategy {
63        let mut strategy = PushdownStrategy::default();
64        let conjuncts = Self::split_conjuncts(predicate);
65        let lance_analyzer = PredicateAnalyzer::new();
66
67        for conj in conjuncts {
68            // 1. Check for _uid = 'xxx' pattern (most selective)
69            if let Some(uid) = self.extract_uid_predicate(&conj, variable) {
70                strategy.uid_lookup = Some(uid);
71                continue;
72            }
73
74            // 2. Check for BTree-indexed STARTS WITH predicates
75            if let Some((column, lower, upper)) =
76                self.extract_btree_prefix_scan(&conj, variable, label_id)
77            {
78                strategy.btree_prefix_scans.push((column, lower, upper));
79                continue;
80            }
81
82            // 3. Check for JSON FTS predicates (CONTAINS on FTS-indexed columns)
83            if let Some((column, term, path)) =
84                self.extract_json_fts_predicate(&conj, variable, label_id)
85            {
86                strategy.json_fts_predicates.push((column, term, path));
87                continue;
88            }
89
90            // 4. Check if pushable to Lance
91            if lance_analyzer.is_pushable(&conj, variable) {
92                strategy.lance_predicates.push(conj);
93            } else {
94                strategy.residual.push(conj);
95            }
96        }
97
98        strategy
99    }
100
101    /// Extract UniId from `_uid = 'xxx'` predicate.
102    ///
103    /// # Security
104    ///
105    /// **CWE-345 (Insufficient Verification)**: The UID value is validated using
106    /// `UniId::from_multibase()` which enforces Base32Lower encoding and 32-byte
107    /// length. Invalid UIDs are rejected and the predicate becomes residual.
108    fn extract_uid_predicate(&self, expr: &Expr, variable: &str) -> Option<UniId> {
109        if let Expr::BinaryOp {
110            left,
111            op: BinaryOp::Eq,
112            right,
113        } = expr
114            && let Expr::Property(var_expr, prop) = left.as_ref()
115            && let Expr::Variable(v) = var_expr.as_ref()
116            && v == variable
117            && prop == "_uid"
118            && let Expr::Literal(CypherLiteral::String(s)) = right.as_ref()
119        {
120            // Security: UniId::from_multibase validates Base32Lower and 32-byte length
121            return UniId::from_multibase(s).ok();
122        }
123        None
124    }
125
126    /// Extract BTree prefix scan for STARTS WITH predicates on scalar-indexed properties.
127    ///
128    /// Returns `Some((column, lower_bound, upper_bound))` if:
129    /// - The predicate is `variable.property STARTS WITH 'prefix'`
130    /// - The property has a scalar BTree index
131    /// - The prefix is non-empty (empty prefix matches all, not worth optimizing)
132    ///
133    /// Converts `column STARTS WITH 'John'` to:
134    /// `column >= 'John' AND column < 'Joho'`
135    fn extract_btree_prefix_scan(
136        &self,
137        expr: &Expr,
138        variable: &str,
139        label_id: u16,
140    ) -> Option<(String, String, String)> {
141        if let Expr::BinaryOp {
142            left,
143            op: BinaryOp::StartsWith,
144            right,
145        } = expr
146            && let Expr::Property(var_expr, prop) = left.as_ref()
147            && let Expr::Variable(v) = var_expr.as_ref()
148            && v == variable
149            && let Expr::Literal(CypherLiteral::String(prefix)) = right.as_ref()
150        {
151            // Skip empty prefix (matches all, no optimization benefit)
152            if prefix.is_empty() {
153                return None;
154            }
155
156            // Check if property has a scalar BTree index
157            let label_name = self.schema.label_name_by_id(label_id)?;
158
159            for idx in &self.schema.indexes {
160                if let uni_common::core::schema::IndexDefinition::Scalar(cfg) = idx
161                    && cfg.label == *label_name
162                    && cfg.properties.contains(prop)
163                    && cfg.index_type == uni_common::core::schema::ScalarIndexType::BTree
164                {
165                    // Calculate the upper bound by incrementing the last character
166                    // For "John" -> "Joho"
167                    // This works for ASCII and most UTF-8 strings
168                    if let Some(upper) = increment_last_char(prefix) {
169                        return Some((prop.clone(), prefix.clone(), upper));
170                    }
171                }
172            }
173        }
174        None
175    }
176
177    /// Extract JSON FTS predicate from CONTAINS on an FTS-indexed column.
178    ///
179    /// Returns `Some((column, search_term, optional_path))` if:
180    /// - The predicate is `variable.column CONTAINS 'term'`
181    /// - The column has a `JsonFullText` index
182    fn extract_json_fts_predicate(
183        &self,
184        expr: &Expr,
185        variable: &str,
186        label_id: u16,
187    ) -> Option<(String, String, Option<String>)> {
188        if let Expr::BinaryOp {
189            left,
190            op: BinaryOp::Contains,
191            right,
192        } = expr
193            && let Expr::Property(var_expr, prop) = left.as_ref()
194            && let Expr::Variable(v) = var_expr.as_ref()
195            && v == variable
196            && let Expr::Literal(CypherLiteral::String(term)) = right.as_ref()
197        {
198            let label_name = self.schema.label_name_by_id(label_id)?;
199
200            // Check if property has a JsonFullText index
201            for idx in &self.schema.indexes {
202                if let IndexDefinition::JsonFullText(cfg) = idx
203                    && cfg.label == *label_name
204                    && cfg.column == *prop
205                {
206                    return Some((prop.clone(), term.clone(), None));
207                }
208            }
209        }
210        None
211    }
212
213    /// Split AND-connected predicates into a list.
214    fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
215        match expr {
216            Expr::BinaryOp {
217                left,
218                op: BinaryOp::And,
219                right,
220            } => {
221                let mut result = Self::split_conjuncts(left);
222                result.extend(Self::split_conjuncts(right));
223                result
224            }
225            _ => vec![expr.clone()],
226        }
227    }
228}
229
230pub struct PredicateAnalysis {
231    /// Predicates that can be pushed to storage
232    pub pushable: Vec<Expr>,
233    /// Predicates that must be evaluated post-scan
234    pub residual: Vec<Expr>,
235    /// Properties needed for residual evaluation
236    pub required_properties: Vec<String>,
237}
238
239#[derive(Default)]
240pub struct PredicateAnalyzer;
241
242impl PredicateAnalyzer {
243    pub fn new() -> Self {
244        Self
245    }
246
247    /// Analyze a predicate and determine pushdown strategy
248    pub fn analyze(&self, predicate: &Expr, scan_variable: &str) -> PredicateAnalysis {
249        let mut pushable = Vec::new();
250        let mut residual = Vec::new();
251
252        self.split_conjuncts(predicate, scan_variable, &mut pushable, &mut residual);
253
254        let required_properties = self.extract_properties(&residual, scan_variable);
255
256        PredicateAnalysis {
257            pushable,
258            residual,
259            required_properties,
260        }
261    }
262
263    /// Split AND-connected predicates
264    fn split_conjuncts(
265        &self,
266        expr: &Expr,
267        variable: &str,
268        pushable: &mut Vec<Expr>,
269        residual: &mut Vec<Expr>,
270    ) {
271        // Try OR-to-IN conversion first
272        if let Some(in_expr) = try_or_to_in(expr, variable)
273            && self.is_pushable(&in_expr, variable)
274        {
275            pushable.push(in_expr);
276            return;
277        }
278
279        match expr {
280            Expr::BinaryOp {
281                left,
282                op: BinaryOp::And,
283                right,
284            } => {
285                self.split_conjuncts(left, variable, pushable, residual);
286                self.split_conjuncts(right, variable, pushable, residual);
287            }
288            _ => {
289                if self.is_pushable(expr, variable) {
290                    pushable.push(expr.clone());
291                } else {
292                    residual.push(expr.clone());
293                }
294            }
295        }
296    }
297
298    /// Check if a predicate can be pushed to Lance
299    pub fn is_pushable(&self, expr: &Expr, variable: &str) -> bool {
300        match expr {
301            Expr::In {
302                expr: left,
303                list: right,
304            } => {
305                // Check left side is a property of the scan variable
306                let left_is_property = matches!(
307                    left.as_ref(),
308                    Expr::Property(box_expr, _) if matches!(box_expr.as_ref(), Expr::Variable(v) if v == variable)
309                );
310                // Check right side is list or parameter
311                let right_valid = matches!(right.as_ref(), Expr::List(_) | Expr::Parameter(_));
312                left_is_property && right_valid
313            }
314            Expr::BinaryOp { left, op, right } => {
315                // Check operator is supported
316                let op_supported = matches!(
317                    op,
318                    BinaryOp::Eq
319                        | BinaryOp::NotEq
320                        | BinaryOp::Lt
321                        | BinaryOp::LtEq
322                        | BinaryOp::Gt
323                        | BinaryOp::GtEq
324                        | BinaryOp::Contains
325                        | BinaryOp::StartsWith
326                        | BinaryOp::EndsWith
327                );
328
329                if !op_supported {
330                    return false;
331                }
332
333                // Check left side is a property of the scan variable
334                // Structure: Property(Identifier(var), prop_name)
335                let left_is_property = matches!(
336                    left.as_ref(),
337                    Expr::Property(box_expr, _) if matches!(box_expr.as_ref(), Expr::Variable(v) if v == variable)
338                );
339
340                // Check right side is a literal or parameter or list of literals
341                // For string operators, strict requirement on String Literal
342                let right_valid = if matches!(
343                    op,
344                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith
345                ) {
346                    matches!(right.as_ref(), Expr::Literal(CypherLiteral::String(_)))
347                } else {
348                    matches!(
349                        right.as_ref(),
350                        Expr::Literal(_) | Expr::Parameter(_) | Expr::List(_)
351                    )
352                };
353
354                left_is_property && right_valid
355            }
356            Expr::UnaryOp {
357                op: UnaryOp::Not,
358                expr,
359            } => self.is_pushable(expr, variable),
360
361            Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
362                // Check if inner is a property of the scan variable
363                matches!(
364                    inner.as_ref(),
365                    Expr::Property(var_expr, _)
366                        if matches!(var_expr.as_ref(), Expr::Variable(v) if v == variable)
367                )
368            }
369
370            _ => false,
371        }
372    }
373
374    /// Extract property names required by residual predicates
375    fn extract_properties(&self, exprs: &[Expr], variable: &str) -> Vec<String> {
376        let mut props = HashSet::new();
377        for expr in exprs {
378            collect_properties(expr, variable, &mut props);
379        }
380        props.into_iter().collect()
381    }
382}
383
384/// Attempt to convert OR disjunctions to IN predicates
385fn try_or_to_in(expr: &Expr, variable: &str) -> Option<Expr> {
386    match expr {
387        Expr::BinaryOp {
388            op: BinaryOp::Or, ..
389        } => {
390            // Collect all equality comparisons on the same property
391            let mut property: Option<String> = None;
392            let mut values: Vec<Expr> = Vec::new();
393
394            if collect_or_equals(expr, variable, &mut property, &mut values)
395                && let Some(prop) = property
396                && values.len() >= 2
397            {
398                return Some(Expr::In {
399                    expr: Box::new(Expr::Property(
400                        Box::new(Expr::Variable(variable.to_string())),
401                        prop,
402                    )),
403                    list: Box::new(Expr::List(values)),
404                });
405            }
406            None
407        }
408        _ => None,
409    }
410}
411
412fn collect_or_equals(
413    expr: &Expr,
414    variable: &str,
415    property: &mut Option<String>,
416    values: &mut Vec<Expr>,
417) -> bool {
418    match expr {
419        Expr::BinaryOp {
420            left,
421            op: BinaryOp::Or,
422            right,
423        } => {
424            collect_or_equals(left, variable, property, values)
425                && collect_or_equals(right, variable, property, values)
426        }
427        Expr::BinaryOp {
428            left,
429            op: BinaryOp::Eq,
430            right,
431        } => {
432            if let Expr::Property(var_expr, prop) = left.as_ref()
433                && let Expr::Variable(v) = var_expr.as_ref()
434                && v == variable
435            {
436                match property {
437                    None => {
438                        *property = Some(prop.clone());
439                        values.push(right.as_ref().clone());
440                        return true;
441                    }
442                    Some(p) if p == prop => {
443                        values.push(right.as_ref().clone());
444                        return true;
445                    }
446                    _ => return false, // Different properties
447                }
448            }
449            false
450        }
451        _ => false,
452    }
453}
454
455fn collect_properties(expr: &Expr, variable: &str, props: &mut HashSet<String>) {
456    match expr {
457        Expr::Property(box_expr, prop) => {
458            if let Expr::Variable(v) = box_expr.as_ref()
459                && v == variable
460            {
461                props.insert(prop.clone());
462            }
463        }
464        Expr::BinaryOp { left, right, .. } => {
465            collect_properties(left, variable, props);
466            collect_properties(right, variable, props);
467        }
468        Expr::UnaryOp { expr, .. } => {
469            collect_properties(expr, variable, props);
470        }
471        Expr::IsNull(expr) | Expr::IsNotNull(expr) => {
472            collect_properties(expr, variable, props);
473        }
474        Expr::List(items) => {
475            for item in items {
476                collect_properties(item, variable, props);
477            }
478        }
479        Expr::Map(items) => {
480            for (_, item) in items {
481                collect_properties(item, variable, props);
482            }
483        }
484        Expr::FunctionCall { args, .. } => {
485            for arg in args {
486                collect_properties(arg, variable, props);
487            }
488        }
489        Expr::ArrayIndex {
490            array: arr,
491            index: idx,
492        } => {
493            collect_properties(arr, variable, props);
494            collect_properties(idx, variable, props);
495        }
496        _ => {}
497    }
498}
499
/// Compute the exclusive upper bound of the range of strings starting with `s`.
///
/// The last character is replaced by its successor, e.g. `"John"` -> `"Joho"`.
/// Two cases need extra care:
/// - U+D7FF is followed by the surrogate range, which holds no valid `char`;
///   its successor is U+E000.
/// - `char::MAX` (U+10FFFF) has no successor; it is dropped and the preceding
///   character is incremented instead (`"a\u{10FFFF}"` -> `"b"`), which is
///   still the smallest string greater than every string with that prefix.
///
/// Returns `None` if `s` is empty or consists only of `char::MAX`, because no
/// upper bound exists in those cases.
fn increment_last_char(s: &str) -> Option<String> {
    let mut chars: Vec<char> = s.chars().collect();

    while let Some(last) = chars.pop() {
        let successor = match last {
            // Skip the surrogate gap U+D800..=U+DFFF.
            '\u{D7FF}' => Some('\u{E000}'),
            // `from_u32` yields `None` only for `char::MAX` here.
            c => char::from_u32(c as u32 + 1),
        };

        if let Some(next) = successor {
            chars.push(next);
            return Some(chars.into_iter().collect());
        }
        // `last` was `char::MAX`: leave it popped and carry into the previous char.
    }

    None
}
525
526/// Flatten nested AND expressions into a vector
527fn flatten_ands(expr: &Expr) -> Vec<&Expr> {
528    match expr {
529        Expr::BinaryOp {
530            left,
531            op: BinaryOp::And,
532            right,
533        } => {
534            let mut result = flatten_ands(left);
535            result.extend(flatten_ands(right));
536            result
537        }
538        _ => vec![expr],
539    }
540}
541
542pub struct LanceFilterGenerator;
543
544impl LanceFilterGenerator {
545    /// Checks if a string contains SQL LIKE wildcard characters.
546    ///
547    /// # Security
548    ///
549    /// **CWE-89 (SQL Injection)**: Predicates containing wildcards are NOT pushed
550    /// to storage because Lance DataFusion doesn't support the ESCAPE clause.
551    /// Instead, they're evaluated at the application layer where we have full
552    /// control over string matching semantics.
553    fn contains_sql_wildcards(s: &str) -> bool {
554        s.contains('%') || s.contains('_')
555    }
556
557    /// Escapes special characters in LIKE patterns.
558    ///
559    /// **Note**: This function is kept for documentation and potential future use,
560    /// but currently we do not push down LIKE patterns containing wildcards
561    /// because Lance DataFusion doesn't support the ESCAPE clause.
562    #[expect(
563        dead_code,
564        reason = "Reserved for future use when Lance supports ESCAPE"
565    )]
566    fn escape_like_pattern(s: &str) -> String {
567        s.replace('\\', "\\\\")
568            .replace('%', "\\%")
569            .replace('_', "\\_")
570            .replace('\'', "''")
571    }
572
573    /// Converts pushable predicates to Lance SQL filter string.
574    ///
575    /// When `schema_props` is provided, properties not in the schema (overflow properties)
576    /// are skipped since they don't exist as physical columns in Lance.
577    pub fn generate(
578        predicates: &[Expr],
579        variable: &str,
580        schema_props: Option<&HashMap<String, PropertyMeta>>,
581    ) -> Option<String> {
582        if predicates.is_empty() {
583            return None;
584        }
585
586        // Flatten nested ANDs first
587        let flattened: Vec<&Expr> = predicates.iter().flat_map(|p| flatten_ands(p)).collect();
588
589        // Optimize Ranges: Group predicates by column and combine into >= AND <= if possible
590        let mut by_column: HashMap<String, Vec<&Expr>> = HashMap::new();
591        let mut optimized_filters: Vec<String> = Vec::new();
592        let mut used_expressions: HashSet<*const Expr> = HashSet::new();
593
594        for expr in &flattened {
595            if let Some(col) = Self::extract_column_from_range(expr, variable, schema_props) {
596                by_column.entry(col).or_default().push(expr);
597            }
598        }
599
600        for (col, exprs) in &by_column {
601            if exprs.len() < 2 {
602                continue;
603            }
604
605            // Try to find pairs of >/>= and </<=
606            // Very naive: find ONE pair and emit range expression.
607            // Complex ranges (e.g. >10 AND >20) are not merged but valid.
608            // We look for: (col > L OR col >= L) AND (col < R OR col <= R)
609
610            let mut lower: Option<(bool, &Expr, &Expr)> = None; // (inclusive, val_expr, original_expr)
611            let mut upper: Option<(bool, &Expr, &Expr)> = None;
612
613            for expr in exprs {
614                if let Expr::BinaryOp { op, right, .. } = expr {
615                    match op {
616                        BinaryOp::Gt => {
617                            // If we have multiple lower bounds, pick the last one (arbitrary for now, intersection handles logic)
618                            lower = Some((false, right, expr));
619                        }
620                        BinaryOp::GtEq => {
621                            lower = Some((true, right, expr));
622                        }
623                        BinaryOp::Lt => {
624                            upper = Some((false, right, expr));
625                        }
626                        BinaryOp::LtEq => {
627                            upper = Some((true, right, expr));
628                        }
629                        _ => {}
630                    }
631                }
632            }
633
634            if let (Some((true, l_val, l_expr)), Some((true, u_val, u_expr))) = (lower, upper) {
635                // Both inclusive -> use >= AND <= (Lance doesn't support BETWEEN)
636                if let (Some(l_str), Some(u_str)) =
637                    (Self::value_to_lance(l_val), Self::value_to_lance(u_val))
638                {
639                    optimized_filters.push(format!(
640                        "\"{}\" >= {} AND \"{}\" <= {}",
641                        col, l_str, col, u_str
642                    ));
643                    used_expressions.insert(l_expr as *const Expr);
644                    used_expressions.insert(u_expr as *const Expr);
645                }
646            }
647        }
648
649        let mut filters = optimized_filters;
650
651        for expr in flattened {
652            if used_expressions.contains(&(expr as *const Expr)) {
653                continue;
654            }
655            if let Some(s) = Self::expr_to_lance(expr, variable, schema_props) {
656                filters.push(s);
657            }
658        }
659
660        if filters.is_empty() {
661            None
662        } else {
663            Some(filters.join(" AND "))
664        }
665    }
666
667    fn extract_column_from_range(
668        expr: &Expr,
669        variable: &str,
670        schema_props: Option<&HashMap<String, PropertyMeta>>,
671    ) -> Option<String> {
672        match expr {
673            Expr::BinaryOp { left, op, .. } => {
674                if matches!(
675                    op,
676                    BinaryOp::Gt | BinaryOp::GtEq | BinaryOp::Lt | BinaryOp::LtEq
677                ) {
678                    return Self::extract_column(left, variable, schema_props);
679                }
680                None
681            }
682            _ => None,
683        }
684    }
685
686    fn expr_to_lance(
687        expr: &Expr,
688        variable: &str,
689        schema_props: Option<&HashMap<String, PropertyMeta>>,
690    ) -> Option<String> {
691        match expr {
692            Expr::In {
693                expr: left,
694                list: right,
695            } => {
696                let column = Self::extract_column(left, variable, schema_props)?;
697                let value = Self::value_to_lance(right)?;
698                Some(format!("{} IN {}", column, value))
699            }
700            Expr::BinaryOp { left, op, right } => {
701                let column = Self::extract_column(left, variable, schema_props)?;
702
703                // Special handling for string operators
704                // Security: CWE-89 - Prevent SQL wildcard injection
705                //
706                // Lance DataFusion doesn't support the ESCAPE clause, so we cannot
707                // safely push down LIKE predicates containing SQL wildcards (% or _).
708                // If the input contains these characters, we return None to keep
709                // the predicate as a residual for application-level evaluation.
710                match op {
711                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith => {
712                        let raw_value = Self::get_string_value(right)?;
713
714                        // If the value contains SQL wildcards, don't push down
715                        // to prevent wildcard injection attacks
716                        if Self::contains_sql_wildcards(&raw_value) {
717                            return None;
718                        }
719
720                        // Escape single quotes for the SQL string
721                        let escaped = raw_value.replace('\'', "''");
722
723                        match op {
724                            BinaryOp::Contains => Some(format!("{} LIKE '%{}%'", column, escaped)),
725                            BinaryOp::StartsWith => Some(format!("{} LIKE '{}%'", column, escaped)),
726                            BinaryOp::EndsWith => Some(format!("{} LIKE '%{}'", column, escaped)),
727                            _ => unreachable!(),
728                        }
729                    }
730                    _ => {
731                        let op_str = Self::op_to_lance(op)?;
732                        let value = Self::value_to_lance(right)?;
733                        // Use unquoted column name for DataFusion compatibility
734                        // DataFusion treats unquoted identifiers case-insensitively
735                        Some(format!("{} {} {}", column, op_str, value))
736                    }
737                }
738            }
739            Expr::UnaryOp {
740                op: UnaryOp::Not,
741                expr,
742            } => {
743                let inner = Self::expr_to_lance(expr, variable, schema_props)?;
744                Some(format!("NOT ({})", inner))
745            }
746            Expr::IsNull(inner) => {
747                let column = Self::extract_column(inner, variable, schema_props)?;
748                Some(format!("{} IS NULL", column))
749            }
750            Expr::IsNotNull(inner) => {
751                let column = Self::extract_column(inner, variable, schema_props)?;
752                Some(format!("{} IS NOT NULL", column))
753            }
754            _ => None,
755        }
756    }
757
758    fn extract_column(
759        expr: &Expr,
760        variable: &str,
761        schema_props: Option<&HashMap<String, PropertyMeta>>,
762    ) -> Option<String> {
763        match expr {
764            Expr::Property(box_expr, prop) => {
765                if let Expr::Variable(var) = box_expr.as_ref()
766                    && var == variable
767                {
768                    // System columns (starting with _) are always physical Lance columns
769                    if prop.starts_with('_') {
770                        return Some(prop.clone());
771                    }
772                    // If schema_props is provided, only allow properties that are
773                    // physical columns in Lance. Overflow properties (not in schema)
774                    // don't exist as Lance columns.
775                    // If schema_props is Some but empty (schemaless label), ALL
776                    // non-system properties are overflow.
777                    // If schema_props is None, no filtering is applied (caller
778                    // doesn't have schema info).
779                    if let Some(props) = schema_props
780                        && !props.contains_key(prop.as_str())
781                    {
782                        return None;
783                    }
784                    return Some(prop.clone());
785                }
786                None
787            }
788            _ => None,
789        }
790    }
791
792    fn op_to_lance(op: &BinaryOp) -> Option<&'static str> {
793        match op {
794            BinaryOp::Eq => Some("="),
795            BinaryOp::NotEq => Some("!="),
796            BinaryOp::Lt => Some("<"),
797            BinaryOp::LtEq => Some("<="),
798            BinaryOp::Gt => Some(">"),
799            BinaryOp::GtEq => Some(">="),
800            _ => None,
801        }
802    }
803
804    fn value_to_lance(expr: &Expr) -> Option<String> {
805        match expr {
806            Expr::Literal(CypherLiteral::String(s)) => {
807                // Normalize datetime strings to include seconds for Arrow timestamp parsing.
808                // Our Cypher datetime formatting omits `:00` seconds (e.g. `2021-06-01T00:00Z`)
809                // but Arrow/Lance requires full `HH:MM:SS` for timestamp parsing.
810                let s = super::df_expr::normalize_datetime_str(s).unwrap_or_else(|| s.clone());
811                Some(format!("'{}'", s.replace("'", "''")))
812            }
813            Expr::Literal(CypherLiteral::Integer(i)) => Some(i.to_string()),
814            Expr::Literal(CypherLiteral::Float(f)) => Some(f.to_string()),
815            Expr::Literal(CypherLiteral::Bool(b)) => Some(b.to_string()),
816            Expr::Literal(CypherLiteral::Null) => Some("NULL".to_string()),
817            Expr::List(items) => {
818                let values: Option<Vec<String>> = items.iter().map(Self::value_to_lance).collect();
819                values.map(|v| format!("({})", v.join(", ")))
820            }
821            // Security: CWE-89 - Parameters are NOT pushed to storage layer.
822            // Parameterized predicates stay in the application layer where the
823            // query executor can safely substitute values with proper type handling.
824            // This prevents potential SQL injection if Lance doesn't support the $name syntax.
825            Expr::Parameter(_) => None,
826            _ => None,
827        }
828    }
829
830    /// Extracts raw string value from expression for LIKE pattern use.
831    ///
832    /// Returns the raw string without escaping - escaping is handled by
833    /// `escape_like_pattern` for LIKE clauses.
834    fn get_string_value(expr: &Expr) -> Option<String> {
835        match expr {
836            Expr::Literal(CypherLiteral::String(s)) => Some(s.clone()),
837            _ => None,
838        }
839    }
840}
841
#[cfg(test)]
mod security_tests {
    use super::*;

    /// Builds the predicate `n.name <op> <rhs>` used by the pushdown tests.
    fn name_predicate(op: BinaryOp, rhs: Expr) -> Expr {
        let subject = Expr::Property(
            Box::new(Expr::Variable("n".to_string())),
            "name".to_string(),
        );
        Expr::BinaryOp {
            left: Box::new(subject),
            op,
            right: Box::new(rhs),
        }
    }

    /// Wraps `text` in a Cypher string literal expression.
    fn string_literal(text: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(text.to_string()))
    }

    /// Tests for CWE-89 (SQL Injection) prevention in LIKE patterns.
    mod wildcard_protection {
        use super::*;

        #[test]
        fn test_contains_sql_wildcards_detects_percent() {
            // `%` at the end, start, and middle of the input.
            for sample in ["admin%", "%admin", "ad%min"] {
                assert!(
                    LanceFilterGenerator::contains_sql_wildcards(sample),
                    "expected a wildcard to be detected in {sample:?}"
                );
            }
        }

        #[test]
        fn test_contains_sql_wildcards_detects_underscore() {
            // `_` in the middle, start, and end of the input.
            for sample in ["a_min", "_admin", "admin_"] {
                assert!(
                    LanceFilterGenerator::contains_sql_wildcards(sample),
                    "expected a wildcard to be detected in {sample:?}"
                );
            }
        }

        #[test]
        fn test_contains_sql_wildcards_safe_strings() {
            for sample in ["admin", "John Smith", "test@example.com"] {
                assert!(
                    !LanceFilterGenerator::contains_sql_wildcards(sample),
                    "expected no wildcard to be detected in {sample:?}"
                );
            }
        }

        #[test]
        fn test_wildcard_in_contains_not_pushed_down() {
            // Input with % should NOT be pushed to storage
            let predicate = name_predicate(BinaryOp::Contains, string_literal("admin%"));

            let pushed = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(
                pushed.is_none(),
                "CONTAINS with wildcard should not be pushed to storage"
            );
        }

        #[test]
        fn test_underscore_in_startswith_not_pushed_down() {
            // Input with _ should NOT be pushed to storage
            let predicate = name_predicate(BinaryOp::StartsWith, string_literal("user_"));

            let pushed = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(
                pushed.is_none(),
                "STARTSWITH with underscore should not be pushed to storage"
            );
        }

        #[test]
        fn test_safe_contains_is_pushed_down() {
            // Input without wildcards SHOULD be pushed to storage
            let predicate = name_predicate(BinaryOp::Contains, string_literal("admin"));

            let pushed = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(pushed.is_some(), "Safe CONTAINS should be pushed down");

            let has_like_clause = pushed.as_ref().unwrap().contains("LIKE '%admin%'");
            assert!(has_like_clause, "Generated filter: {:?}", pushed);
        }

        #[test]
        fn test_single_quotes_escaped_in_safe_string() {
            // Single quotes should be doubled in safe strings
            let predicate = name_predicate(BinaryOp::Contains, string_literal("O'Brien"));

            let sql = LanceFilterGenerator::generate(&[predicate], "n", None).unwrap();
            assert!(
                sql.contains("O''Brien"),
                "Single quotes should be doubled: {}",
                sql
            );
        }
    }

    /// Tests for parameter handling (not pushed to storage).
    mod parameter_safety {
        use super::*;

        #[test]
        fn test_parameters_not_pushed_down() {
            let predicate =
                name_predicate(BinaryOp::Eq, Expr::Parameter("userInput".to_string()));

            let pushed = LanceFilterGenerator::generate(&[predicate], "n", None);
            assert!(
                pushed.is_none(),
                "Parameterized predicates should not be pushed to storage"
            );
        }
    }
}