Skip to main content

uni_query_functions/
pushdown.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Predicate pushdown and index-aware query routing.
5//!
6//! Routes WHERE predicates to the most selective execution path:
7//! UID index lookup → BTree prefix scan → JSON FTS → Lance columnar filter → residual.
8//! Includes SQL injection prevention for LIKE patterns (CWE-89) and UID validation (CWE-345).
9
10use std::collections::{HashMap, HashSet};
11use uni_cypher::ast::{BinaryOp, CypherLiteral, Expr, UnaryOp};
12
13use uni_common::Value;
14use uni_common::core::id::UniId;
15use uni_common::core::schema::{
16    IndexDefinition, IndexStatus, PropertyMeta, ScalarIndexType, Schema,
17};
18
19/// Categorized pushdown strategy for predicates with index awareness.
20///
21/// This struct represents the optimal execution path for predicates,
22/// routing them to the most selective index when available.
23#[derive(Debug, Clone, Default)]
24pub struct PushdownStrategy {
25    /// UID lookup predicate: WHERE n._uid = 'base32string'
26    /// Contains the UniId parsed from the predicate value.
27    pub uid_lookup: Option<UniId>,
28
29    /// BTree index prefix scans for STARTS WITH predicates.
30    /// When a property has a scalar BTree index, STARTS WITH 'prefix' can be
31    /// converted to a range scan: column >= 'prefix' AND column < 'prefix_next'.
32    /// Vec of: (column_name, lower_bound, upper_bound)
33    pub btree_prefix_scans: Vec<(String, String, String)>,
34
35    /// JSON FTS predicates for full-text search on JSON columns.
36    /// Vec of: (column_name, search_term, optional_path_filter)
37    pub json_fts_predicates: Vec<(String, String, Option<String>)>,
38
39    /// Predicates pushable to Lance scan filter.
40    pub lance_predicates: Vec<Expr>,
41
42    /// Property columns that have an Online Hash scalar index AND a pushable
43    /// equality / IN predicate in this scan. Recorded for two reasons:
44    ///   1. Telemetry — EXPLAIN reports these as `IndexUsage { used: true }`.
45    ///   2. Routing — the planner pushes the matching `lance_predicates` into
46    ///      `GraphScanExec`'s scan-time filter (rather than a `FilterExec` on
47    ///      top), so Lance's scalar-hash index turns the lookup into O(1).
48    ///
49    /// See issue #57.
50    pub hash_index_columns: Vec<String>,
51
52    /// Residual predicates (not pushable to storage).
53    pub residual: Vec<Expr>,
54}
55
56/// Analyzer that considers available indexes when categorizing predicates.
57///
58/// Unlike `PredicateAnalyzer` which only categorizes into pushable/residual,
59/// this analyzer routes predicates to the most optimal execution path:
60/// 1. UID index lookup (most selective, O(1) lookup)
61/// 2. BTree prefix scan (STARTS WITH on scalar-indexed properties)
62/// 3. JSON FTS lookup (BM25 full-text search)
63/// 4. Lance scan filter (columnar scan with filter)
64/// 5. Residual (post-scan evaluation)
65// M-PUBLIC-DEBUG: Schema implements Debug, so the derived impl is sound.
66#[derive(Debug)]
67pub struct IndexAwareAnalyzer<'a> {
68    schema: &'a Schema,
69}
70
71impl<'a> IndexAwareAnalyzer<'a> {
72    /// Create an analyzer bound to the given schema for index-aware predicate routing.
73    pub fn new(schema: &'a Schema) -> Self {
74        Self { schema }
75    }
76
77    /// Analyze predicates and determine optimal pushdown strategy.
78    ///
79    /// Predicates are categorized in order of selectivity:
80    /// 1. `_uid = 'xxx'` -> UID index lookup
81    /// 2. BTree prefix scans for STARTS WITH predicates
82    /// 3. Pushable to Lance -> Lance filter
83    /// 4. Everything else -> Residual
84    pub fn analyze(&self, predicate: &Expr, variable: &str, label_id: u16) -> PushdownStrategy {
85        let mut strategy = PushdownStrategy::default();
86        let conjuncts = Self::split_conjuncts(predicate);
87        let lance_analyzer = PredicateAnalyzer::new();
88
89        for conj in conjuncts {
90            // 1. Check for _uid = 'xxx' pattern (most selective)
91            if let Some(uid) = self.extract_uid_predicate(&conj, variable) {
92                strategy.uid_lookup = Some(uid);
93                continue;
94            }
95
96            // 2. Check for BTree-indexed STARTS WITH predicates
97            if let Some((column, lower, upper)) =
98                self.extract_btree_prefix_scan(&conj, variable, label_id)
99            {
100                strategy.btree_prefix_scans.push((column, lower, upper));
101                continue;
102            }
103
104            // 3. Check for JSON FTS predicates (CONTAINS on FTS-indexed columns)
105            if let Some((column, term, path)) =
106                self.extract_json_fts_predicate(&conj, variable, label_id)
107            {
108                strategy.json_fts_predicates.push((column, term, path));
109                continue;
110            }
111
112            // 4. Check if pushable to Lance
113            if lance_analyzer.is_pushable(&conj, variable) {
114                // 4a. Hash-index point lookup: equality / IN against an
115                // Online hash-indexed property. Record the column so the
116                // planner can push this predicate into the scan filter
117                // (Lance turns it into an O(1) hash-index lookup) and
118                // EXPLAIN can report `used: true`.
119                if let Some(col) = self.hash_index_column(&conj, variable, label_id)
120                    && !strategy.hash_index_columns.contains(&col)
121                {
122                    strategy.hash_index_columns.push(col);
123                }
124                strategy.lance_predicates.push(conj);
125            } else {
126                strategy.residual.push(conj);
127            }
128        }
129
130        strategy
131    }
132
133    /// If `expr` is an equality or IN predicate of the form
134    /// `variable.prop = ...` / `variable.prop IN ...` where `(label, prop)`
135    /// has an Online `ScalarIndexType::Hash` index, return the column name.
136    fn hash_index_column(&self, expr: &Expr, variable: &str, label_id: u16) -> Option<String> {
137        let prop = match expr {
138            Expr::BinaryOp {
139                left,
140                op: BinaryOp::Eq,
141                ..
142            } => match left.as_ref() {
143                Expr::Property(var_expr, prop) => match var_expr.as_ref() {
144                    Expr::Variable(v) if v == variable => prop.clone(),
145                    _ => return None,
146                },
147                _ => return None,
148            },
149            Expr::In { expr: left, .. } => match left.as_ref() {
150                Expr::Property(var_expr, prop) => match var_expr.as_ref() {
151                    Expr::Variable(v) if v == variable => prop.clone(),
152                    _ => return None,
153                },
154                _ => return None,
155            },
156            _ => return None,
157        };
158
159        let label_name = self.schema.label_name_by_id(label_id)?;
160        for idx in &self.schema.indexes {
161            if let IndexDefinition::Scalar(cfg) = idx
162                && cfg.label == *label_name
163                && cfg.properties.contains(&prop)
164                && cfg.index_type == ScalarIndexType::Hash
165                && cfg.metadata.status == IndexStatus::Online
166            {
167                return Some(prop);
168            }
169        }
170        None
171    }
172
173    /// Extract UniId from `_uid = 'xxx'` predicate.
174    ///
175    /// # Security
176    ///
177    /// **CWE-345 (Insufficient Verification)**: The UID value is validated using
178    /// `UniId::from_multibase()` which enforces Base32Lower encoding and 32-byte
179    /// length. Invalid UIDs are rejected and the predicate becomes residual.
180    fn extract_uid_predicate(&self, expr: &Expr, variable: &str) -> Option<UniId> {
181        if let Expr::BinaryOp {
182            left,
183            op: BinaryOp::Eq,
184            right,
185        } = expr
186            && let Expr::Property(var_expr, prop) = left.as_ref()
187            && let Expr::Variable(v) = var_expr.as_ref()
188            && v == variable
189            && prop == "_uid"
190            && let Expr::Literal(CypherLiteral::String(s)) = right.as_ref()
191        {
192            // Security: UniId::from_multibase validates Base32Lower and 32-byte length
193            return UniId::from_multibase(s).ok();
194        }
195        None
196    }
197
198    /// Extract BTree prefix scan for STARTS WITH predicates on scalar-indexed properties.
199    ///
200    /// Returns `Some((column, lower_bound, upper_bound))` if:
201    /// - The predicate is `variable.property STARTS WITH 'prefix'`
202    /// - The property has a scalar BTree index
203    /// - The prefix is non-empty (empty prefix matches all, not worth optimizing)
204    ///
205    /// Converts `column STARTS WITH 'John'` to:
206    /// `column >= 'John' AND column < 'Joho'`
207    fn extract_btree_prefix_scan(
208        &self,
209        expr: &Expr,
210        variable: &str,
211        label_id: u16,
212    ) -> Option<(String, String, String)> {
213        if let Expr::BinaryOp {
214            left,
215            op: BinaryOp::StartsWith,
216            right,
217        } = expr
218            && let Expr::Property(var_expr, prop) = left.as_ref()
219            && let Expr::Variable(v) = var_expr.as_ref()
220            && v == variable
221            && let Expr::Literal(CypherLiteral::String(prefix)) = right.as_ref()
222        {
223            // Skip empty prefix (matches all, no optimization benefit)
224            if prefix.is_empty() {
225                return None;
226            }
227
228            // Check if property has a scalar BTree index
229            let label_name = self.schema.label_name_by_id(label_id)?;
230
231            for idx in &self.schema.indexes {
232                if let IndexDefinition::Scalar(cfg) = idx
233                    && cfg.label == *label_name
234                    && cfg.properties.contains(prop)
235                    && cfg.index_type == ScalarIndexType::BTree
236                    && cfg.metadata.status == IndexStatus::Online
237                {
238                    // Calculate the upper bound by incrementing the last character
239                    // For "John" -> "Joho"
240                    // This works for ASCII and most UTF-8 strings
241                    if let Some(upper) = increment_last_char(prefix) {
242                        return Some((prop.clone(), prefix.clone(), upper));
243                    }
244                }
245            }
246        }
247        None
248    }
249
250    /// Extract JSON FTS predicate from CONTAINS on an FTS-indexed column.
251    ///
252    /// Returns `Some((column, search_term, optional_path))` if:
253    /// - The predicate is `variable.column CONTAINS 'term'`
254    /// - The column has a `JsonFullText` index
255    fn extract_json_fts_predicate(
256        &self,
257        expr: &Expr,
258        variable: &str,
259        label_id: u16,
260    ) -> Option<(String, String, Option<String>)> {
261        if let Expr::BinaryOp {
262            left,
263            op: BinaryOp::Contains,
264            right,
265        } = expr
266            && let Expr::Property(var_expr, prop) = left.as_ref()
267            && let Expr::Variable(v) = var_expr.as_ref()
268            && v == variable
269            && let Expr::Literal(CypherLiteral::String(term)) = right.as_ref()
270        {
271            let label_name = self.schema.label_name_by_id(label_id)?;
272
273            // Check if property has a JsonFullText index
274            for idx in &self.schema.indexes {
275                if let IndexDefinition::JsonFullText(cfg) = idx
276                    && cfg.label == *label_name
277                    && cfg.column == *prop
278                    && cfg.metadata.status == IndexStatus::Online
279                {
280                    return Some((prop.clone(), term.clone(), None));
281                }
282            }
283        }
284        None
285    }
286
287    /// Split AND-connected predicates into a list.
288    fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
289        match expr {
290            Expr::BinaryOp {
291                left,
292                op: BinaryOp::And,
293                right,
294            } => {
295                let mut result = Self::split_conjuncts(left);
296                result.extend(Self::split_conjuncts(right));
297                result
298            }
299            _ => vec![expr.clone()],
300        }
301    }
302}
303
304/// Split result of predicate analysis: pushable vs residual.
305#[derive(Debug)]
306pub struct PredicateAnalysis {
307    /// Predicates that can be pushed to storage
308    pub pushable: Vec<Expr>,
309    /// Predicates that must be evaluated post-scan
310    pub residual: Vec<Expr>,
311    /// Properties needed for residual evaluation
312    pub required_properties: Vec<String>,
313}
314
315/// Classifies predicates as pushable to Lance or residual (post-scan).
316#[derive(Debug, Default)]
317pub struct PredicateAnalyzer;
318
319impl PredicateAnalyzer {
320    /// Create a new analyzer for classifying predicates.
321    pub fn new() -> Self {
322        Self
323    }
324
325    /// Split a predicate into pushable (Lance) and residual (post-scan) parts.
326    pub fn analyze(&self, predicate: &Expr, scan_variable: &str) -> PredicateAnalysis {
327        let mut pushable = Vec::new();
328        let mut residual = Vec::new();
329
330        self.split_conjuncts(predicate, scan_variable, &mut pushable, &mut residual);
331
332        let required_properties = self.extract_properties(&residual, scan_variable);
333
334        PredicateAnalysis {
335            pushable,
336            residual,
337            required_properties,
338        }
339    }
340
341    /// Split AND-connected predicates
342    fn split_conjuncts(
343        &self,
344        expr: &Expr,
345        variable: &str,
346        pushable: &mut Vec<Expr>,
347        residual: &mut Vec<Expr>,
348    ) {
349        // Try OR-to-IN conversion first
350        if let Some(in_expr) = try_or_to_in(expr, variable)
351            && self.is_pushable(&in_expr, variable)
352        {
353            pushable.push(in_expr);
354            return;
355        }
356
357        match expr {
358            Expr::BinaryOp {
359                left,
360                op: BinaryOp::And,
361                right,
362            } => {
363                self.split_conjuncts(left, variable, pushable, residual);
364                self.split_conjuncts(right, variable, pushable, residual);
365            }
366            _ => {
367                if self.is_pushable(expr, variable) {
368                    pushable.push(expr.clone());
369                } else {
370                    residual.push(expr.clone());
371                }
372            }
373        }
374    }
375
376    /// Returns `true` if a predicate can be pushed down to Lance storage.
377    pub fn is_pushable(&self, expr: &Expr, variable: &str) -> bool {
378        match expr {
379            Expr::In {
380                expr: left,
381                list: right,
382            } => {
383                // Check left side is a property of the scan variable
384                let left_is_property = matches!(
385                    left.as_ref(),
386                    Expr::Property(box_expr, _) if matches!(box_expr.as_ref(), Expr::Variable(v) if v == variable)
387                );
388                // Check right side is list or parameter
389                let right_valid = matches!(right.as_ref(), Expr::List(_) | Expr::Parameter(_));
390                left_is_property && right_valid
391            }
392            Expr::BinaryOp { left, op, right } => {
393                // Check operator is supported
394                let op_supported = matches!(
395                    op,
396                    BinaryOp::Eq
397                        | BinaryOp::NotEq
398                        | BinaryOp::Lt
399                        | BinaryOp::LtEq
400                        | BinaryOp::Gt
401                        | BinaryOp::GtEq
402                        | BinaryOp::Contains
403                        | BinaryOp::StartsWith
404                        | BinaryOp::EndsWith
405                );
406
407                if !op_supported {
408                    return false;
409                }
410
411                // Check left side is a property of the scan variable
412                // Structure: Property(Identifier(var), prop_name)
413                let left_is_property = matches!(
414                    left.as_ref(),
415                    Expr::Property(box_expr, _) if matches!(box_expr.as_ref(), Expr::Variable(v) if v == variable)
416                );
417
418                // Check right side is a literal or parameter or list of literals
419                // For string operators, strict requirement on String Literal
420                let right_valid = if matches!(
421                    op,
422                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith
423                ) {
424                    matches!(right.as_ref(), Expr::Literal(CypherLiteral::String(_)))
425                } else {
426                    matches!(
427                        right.as_ref(),
428                        Expr::Literal(_) | Expr::Parameter(_) | Expr::List(_)
429                    )
430                };
431
432                left_is_property && right_valid
433            }
434            Expr::UnaryOp {
435                op: UnaryOp::Not,
436                expr,
437            } => self.is_pushable(expr, variable),
438
439            Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
440                // Check if inner is a property of the scan variable
441                matches!(
442                    inner.as_ref(),
443                    Expr::Property(var_expr, _)
444                        if matches!(var_expr.as_ref(), Expr::Variable(v) if v == variable)
445                )
446            }
447
448            _ => false,
449        }
450    }
451
452    /// Extract property names required by residual predicates
453    fn extract_properties(&self, exprs: &[Expr], variable: &str) -> Vec<String> {
454        let mut props = HashSet::new();
455        for expr in exprs {
456            collect_properties(expr, variable, &mut props);
457        }
458        props.into_iter().collect()
459    }
460}
461
462/// Detect a chain of single-label `LabelCheck`s combined with `OR` over
463/// the same variable, collecting the labels into a flat list.
464///
465/// Example: `n:Person OR n:Organization` → `Some(["Person", "Organization"])`.
466///
467/// Returns `None` if the predicate isn't a pure OR-tree of single-label
468/// label checks on `variable` (mixed predicates, multi-label conjunctions,
469/// or different variables abort the rewrite). The `LabelCheck` AST node
470/// uses `labels: Vec<String>` with conjunction semantics (`n:A:B`); we
471/// only accept single-element lists since a conjunctive leaf can't be
472/// expressed as a label-scoped scan without an additional residual filter.
473pub fn try_label_or_to_union(expr: &Expr, variable: &str) -> Option<Vec<String>> {
474    let mut labels: Vec<String> = Vec::new();
475    if collect_or_branches(expr, variable, &mut labels, &label_leaf) && labels.len() >= 2 {
476        Some(labels)
477    } else {
478        None
479    }
480}
481
482/// Walk an `OR`-tree, requiring every leaf to satisfy `leaf` (push into
483/// `out` and return true). Shared by `try_label_or_to_union` and
484/// `try_type_or_to_union`; the predicates differ only in what shape they
485/// recognize at the leaf.
486fn collect_or_branches<F>(expr: &Expr, variable: &str, out: &mut Vec<String>, leaf: &F) -> bool
487where
488    F: Fn(&Expr, &str, &mut Vec<String>) -> bool,
489{
490    match expr {
491        Expr::BinaryOp {
492            left,
493            op: BinaryOp::Or,
494            right,
495        } => {
496            collect_or_branches(left, variable, out, leaf)
497                && collect_or_branches(right, variable, out, leaf)
498        }
499        _ => leaf(expr, variable, out),
500    }
501}
502
503fn label_leaf(expr: &Expr, variable: &str, out: &mut Vec<String>) -> bool {
504    let Expr::LabelCheck {
505        expr: target,
506        labels,
507    } = expr
508    else {
509        return false;
510    };
511    if labels.len() != 1 {
512        // Conjunction-of-multiple is not pushable as a single label scan
513        // branch — fall back to residual filter.
514        return false;
515    }
516    if let Expr::Variable(v) = target.as_ref()
517        && v == variable
518    {
519        out.push(labels[0].clone());
520        return true;
521    }
522    false
523}
524
525/// Detect a chain of `type(r) = 'A'` equality checks combined with `OR`
526/// over the same relationship variable, collecting the type names.
527///
528/// Example: `type(r) = 'KNOWS' OR type(r) = 'FOLLOWS'` →
529/// `Some(["KNOWS", "FOLLOWS"])`.
530pub fn try_type_or_to_union(expr: &Expr, variable: &str) -> Option<Vec<String>> {
531    let mut types: Vec<String> = Vec::new();
532    if collect_or_branches(expr, variable, &mut types, &type_eq_leaf) && types.len() >= 2 {
533        Some(types)
534    } else {
535        None
536    }
537}
538
539fn type_eq_leaf(expr: &Expr, variable: &str, out: &mut Vec<String>) -> bool {
540    let Expr::BinaryOp {
541        left,
542        op: BinaryOp::Eq,
543        right,
544    } = expr
545    else {
546        return false;
547    };
548    is_type_eq_string(left, right, variable, out) || is_type_eq_string(right, left, variable, out)
549}
550
551fn is_type_eq_string(
552    fn_side: &Expr,
553    str_side: &Expr,
554    variable: &str,
555    out: &mut Vec<String>,
556) -> bool {
557    if let Expr::FunctionCall { name, args, .. } = fn_side
558        && name.eq_ignore_ascii_case("type")
559        && args.len() == 1
560        && let Expr::Variable(v) = &args[0]
561        && v == variable
562        && let Expr::Literal(CypherLiteral::String(s)) = str_side
563    {
564        out.push(s.clone());
565        return true;
566    }
567    false
568}
569
570/// Attempt to convert OR disjunctions to IN predicates
571fn try_or_to_in(expr: &Expr, variable: &str) -> Option<Expr> {
572    match expr {
573        Expr::BinaryOp {
574            op: BinaryOp::Or, ..
575        } => {
576            // Collect all equality comparisons on the same property
577            let mut property: Option<String> = None;
578            let mut values: Vec<Expr> = Vec::new();
579
580            if collect_or_equals(expr, variable, &mut property, &mut values)
581                && let Some(prop) = property
582                && values.len() >= 2
583            {
584                return Some(Expr::In {
585                    expr: Box::new(Expr::Property(
586                        Box::new(Expr::Variable(variable.to_string())),
587                        prop,
588                    )),
589                    list: Box::new(Expr::List(values)),
590                });
591            }
592            None
593        }
594        _ => None,
595    }
596}
597
598fn collect_or_equals(
599    expr: &Expr,
600    variable: &str,
601    property: &mut Option<String>,
602    values: &mut Vec<Expr>,
603) -> bool {
604    match expr {
605        Expr::BinaryOp {
606            left,
607            op: BinaryOp::Or,
608            right,
609        } => {
610            collect_or_equals(left, variable, property, values)
611                && collect_or_equals(right, variable, property, values)
612        }
613        Expr::BinaryOp {
614            left,
615            op: BinaryOp::Eq,
616            right,
617        } => {
618            if let Expr::Property(var_expr, prop) = left.as_ref()
619                && let Expr::Variable(v) = var_expr.as_ref()
620                && v == variable
621            {
622                match property {
623                    None => {
624                        *property = Some(prop.clone());
625                        values.push(right.as_ref().clone());
626                        return true;
627                    }
628                    Some(p) if p == prop => {
629                        values.push(right.as_ref().clone());
630                        return true;
631                    }
632                    _ => return false, // Different properties
633                }
634            }
635            false
636        }
637        _ => false,
638    }
639}
640
641fn collect_properties(expr: &Expr, variable: &str, props: &mut HashSet<String>) {
642    match expr {
643        Expr::Property(box_expr, prop) => {
644            if let Expr::Variable(v) = box_expr.as_ref()
645                && v == variable
646            {
647                props.insert(prop.clone());
648            }
649        }
650        Expr::BinaryOp { left, right, .. } => {
651            collect_properties(left, variable, props);
652            collect_properties(right, variable, props);
653        }
654        Expr::UnaryOp { expr, .. } => {
655            collect_properties(expr, variable, props);
656        }
657        Expr::IsNull(expr) | Expr::IsNotNull(expr) => {
658            collect_properties(expr, variable, props);
659        }
660        Expr::List(items) => {
661            for item in items {
662                collect_properties(item, variable, props);
663            }
664        }
665        Expr::Map(items) => {
666            for (_, item) in items {
667                collect_properties(item, variable, props);
668            }
669        }
670        Expr::FunctionCall { args, .. } => {
671            for arg in args {
672                collect_properties(arg, variable, props);
673            }
674        }
675        Expr::ArrayIndex {
676            array: arr,
677            index: idx,
678        } => {
679            collect_properties(arr, variable, props);
680            collect_properties(idx, variable, props);
681        }
682        _ => {}
683    }
684}
685
/// Build the smallest string that sorts after every string starting with
/// `s`, for use as an exclusive upper bound of a prefix range scan.
///
/// The last character is replaced by its successor, e.g. "John" -> "Joho".
/// Two edge cases are handled so a bound is found whenever one exists:
/// - U+D7FF is followed by the surrogate range, which holds no valid
///   `char`; its successor is taken to be U+E000.
/// - A trailing U+10FFFF has no successor, so it is dropped and the
///   preceding character is incremented instead ("a\u{10FFFF}" -> "b").
///
/// Returns `None` if `s` is empty or consists solely of U+10FFFF, because
/// no finite upper bound exists then.
fn increment_last_char(s: &str) -> Option<String> {
    let mut chars: Vec<char> = s.chars().collect();

    // Strip trailing characters that cannot be incremented until one can.
    while let Some(last) = chars.pop() {
        let successor = match last {
            // Skip the surrogate gap U+D800..=U+DFFF.
            '\u{D7FF}' => Some('\u{E000}'),
            // `None` only for char::MAX (U+10FFFF); the addition cannot
            // overflow because a char never exceeds 0x10FFFF.
            other => char::from_u32(other as u32 + 1),
        };
        if let Some(next) = successor {
            chars.push(next);
            return Some(chars.into_iter().collect());
        }
    }

    None
}
711
712/// Flatten nested AND expressions into a vector
713fn flatten_ands(expr: &Expr) -> Vec<&Expr> {
714    match expr {
715        Expr::BinaryOp {
716            left,
717            op: BinaryOp::And,
718            right,
719        } => {
720            let mut result = flatten_ands(left);
721            result.extend(flatten_ands(right));
722            result
723        }
724        _ => vec![expr],
725    }
726}
727
/// Stateless generator that renders pushable predicates as Lance SQL
/// filter strings.
#[derive(Debug)]
pub struct LanceFilterGenerator;
731
732impl LanceFilterGenerator {
733    /// Checks if a string contains SQL LIKE wildcard characters.
734    ///
735    /// # Security
736    ///
737    /// **CWE-89 (SQL Injection)**: Predicates containing wildcards are NOT pushed
738    /// to storage because Lance DataFusion doesn't support the ESCAPE clause.
739    /// Instead, they're evaluated at the application layer where we have full
740    /// control over string matching semantics.
741    fn contains_sql_wildcards(s: &str) -> bool {
742        s.contains('%') || s.contains('_')
743    }
744
745    /// Escapes special characters in LIKE patterns.
746    ///
747    /// **Note**: This function is kept for documentation and potential future use,
748    /// but currently we do not push down LIKE patterns containing wildcards
749    /// because Lance DataFusion doesn't support the ESCAPE clause.
750    #[expect(
751        dead_code,
752        reason = "Reserved for future use when Lance supports ESCAPE"
753    )]
754    fn escape_like_pattern(s: &str) -> String {
755        s.replace('\\', "\\\\")
756            .replace('%', "\\%")
757            .replace('_', "\\_")
758            .replace('\'', "''")
759    }
760
761    /// Converts pushable predicates to Lance SQL filter string.
762    ///
763    /// When `schema_props` is provided, properties not in the schema (overflow properties)
764    /// are skipped since they don't exist as physical columns in Lance.
765    pub fn generate(
766        predicates: &[Expr],
767        variable: &str,
768        schema_props: Option<&HashMap<String, PropertyMeta>>,
769    ) -> Option<String> {
770        if predicates.is_empty() {
771            return None;
772        }
773
774        // Flatten nested ANDs first
775        let flattened: Vec<&Expr> = predicates.iter().flat_map(|p| flatten_ands(p)).collect();
776
777        // Optimize Ranges: Group predicates by column and combine into >= AND <= if possible
778        let mut by_column: HashMap<String, Vec<&Expr>> = HashMap::new();
779        let mut optimized_filters: Vec<String> = Vec::new();
780        let mut used_expressions: HashSet<*const Expr> = HashSet::new();
781
782        for expr in &flattened {
783            if let Some(col) = Self::extract_column_from_range(expr, variable, schema_props) {
784                by_column.entry(col).or_default().push(expr);
785            }
786        }
787
788        for (col, exprs) in &by_column {
789            if exprs.len() < 2 {
790                continue;
791            }
792
793            // Try to find pairs of >/>= and </<=
794            // Very naive: find ONE pair and emit range expression.
795            // Complex ranges (e.g. >10 AND >20) are not merged but valid.
796            // We look for: (col > L OR col >= L) AND (col < R OR col <= R)
797
798            let mut lower: Option<(bool, &Expr, &Expr)> = None; // (inclusive, val_expr, original_expr)
799            let mut upper: Option<(bool, &Expr, &Expr)> = None;
800
801            for expr in exprs {
802                if let Expr::BinaryOp { op, right, .. } = expr {
803                    match op {
804                        BinaryOp::Gt => {
805                            // If we have multiple lower bounds, pick the last one (arbitrary for now, intersection handles logic)
806                            lower = Some((false, right, expr));
807                        }
808                        BinaryOp::GtEq => {
809                            lower = Some((true, right, expr));
810                        }
811                        BinaryOp::Lt => {
812                            upper = Some((false, right, expr));
813                        }
814                        BinaryOp::LtEq => {
815                            upper = Some((true, right, expr));
816                        }
817                        _ => {}
818                    }
819                }
820            }
821
822            if let (Some((true, l_val, l_expr)), Some((true, u_val, u_expr))) = (lower, upper) {
823                // Both inclusive -> use >= AND <= (Lance doesn't support BETWEEN)
824                if let (Some(l_str), Some(u_str)) =
825                    (Self::value_to_lance(l_val), Self::value_to_lance(u_val))
826                {
827                    optimized_filters.push(format!(
828                        "\"{}\" >= {} AND \"{}\" <= {}",
829                        col, l_str, col, u_str
830                    ));
831                    used_expressions.insert(l_expr as *const Expr);
832                    used_expressions.insert(u_expr as *const Expr);
833                }
834            }
835        }
836
837        let mut filters = optimized_filters;
838
839        for expr in flattened {
840            if used_expressions.contains(&(expr as *const Expr)) {
841                continue;
842            }
843            if let Some(s) = Self::expr_to_lance(expr, variable, schema_props) {
844                filters.push(s);
845            }
846        }
847
848        if filters.is_empty() {
849            None
850        } else {
851            Some(filters.join(" AND "))
852        }
853    }
854
855    fn extract_column_from_range(
856        expr: &Expr,
857        variable: &str,
858        schema_props: Option<&HashMap<String, PropertyMeta>>,
859    ) -> Option<String> {
860        match expr {
861            Expr::BinaryOp { left, op, .. } => {
862                if matches!(
863                    op,
864                    BinaryOp::Gt | BinaryOp::GtEq | BinaryOp::Lt | BinaryOp::LtEq
865                ) {
866                    return Self::extract_column(left, variable, schema_props);
867                }
868                None
869            }
870            _ => None,
871        }
872    }
873
874    fn expr_to_lance(
875        expr: &Expr,
876        variable: &str,
877        schema_props: Option<&HashMap<String, PropertyMeta>>,
878    ) -> Option<String> {
879        match expr {
880            Expr::In {
881                expr: left,
882                list: right,
883            } => {
884                let column = Self::extract_column(left, variable, schema_props)?;
885                let value = Self::value_to_lance(right)?;
886                Some(format!("{} IN {}", column, value))
887            }
888            Expr::BinaryOp { left, op, right } => {
889                let column = Self::extract_column(left, variable, schema_props)?;
890
891                // Special handling for string operators
892                // Security: CWE-89 - Prevent SQL wildcard injection
893                //
894                // Lance DataFusion doesn't support the ESCAPE clause, so we cannot
895                // safely push down LIKE predicates containing SQL wildcards (% or _).
896                // If the input contains these characters, we return None to keep
897                // the predicate as a residual for application-level evaluation.
898                match op {
899                    BinaryOp::Contains | BinaryOp::StartsWith | BinaryOp::EndsWith => {
900                        let raw_value = Self::get_string_value(right)?;
901
902                        // If the value contains SQL wildcards, don't push down
903                        // to prevent wildcard injection attacks
904                        if Self::contains_sql_wildcards(&raw_value) {
905                            return None;
906                        }
907
908                        // Escape single quotes for the SQL string
909                        let escaped = raw_value.replace('\'', "''");
910
911                        match op {
912                            BinaryOp::Contains => Some(format!("{} LIKE '%{}%'", column, escaped)),
913                            BinaryOp::StartsWith => Some(format!("{} LIKE '{}%'", column, escaped)),
914                            BinaryOp::EndsWith => Some(format!("{} LIKE '%{}'", column, escaped)),
915                            _ => unreachable!(),
916                        }
917                    }
918                    _ => {
919                        let op_str = Self::op_to_lance(op)?;
920                        let value = Self::value_to_lance(right)?;
921                        // Use unquoted column name for DataFusion compatibility
922                        // DataFusion treats unquoted identifiers case-insensitively
923                        Some(format!("{} {} {}", column, op_str, value))
924                    }
925                }
926            }
927            Expr::UnaryOp {
928                op: UnaryOp::Not,
929                expr,
930            } => {
931                let inner = Self::expr_to_lance(expr, variable, schema_props)?;
932                Some(format!("NOT ({})", inner))
933            }
934            Expr::IsNull(inner) => {
935                let column = Self::extract_column(inner, variable, schema_props)?;
936                Some(format!("{} IS NULL", column))
937            }
938            Expr::IsNotNull(inner) => {
939                let column = Self::extract_column(inner, variable, schema_props)?;
940                Some(format!("{} IS NOT NULL", column))
941            }
942            _ => None,
943        }
944    }
945
946    fn extract_column(
947        expr: &Expr,
948        variable: &str,
949        schema_props: Option<&HashMap<String, PropertyMeta>>,
950    ) -> Option<String> {
951        match expr {
952            Expr::Property(box_expr, prop) => {
953                if let Expr::Variable(var) = box_expr.as_ref()
954                    && var == variable
955                {
956                    // System columns (starting with _) are always physical Lance columns
957                    if prop.starts_with('_') {
958                        return Some(prop.clone());
959                    }
960                    // If schema_props is provided, only allow properties that are
961                    // physical columns in Lance. Overflow properties (not in schema)
962                    // don't exist as Lance columns.
963                    // If schema_props is Some but empty (schemaless label), ALL
964                    // non-system properties are overflow.
965                    // If schema_props is None, no filtering is applied (caller
966                    // doesn't have schema info).
967                    if let Some(props) = schema_props
968                        && !props.contains_key(prop.as_str())
969                    {
970                        return None;
971                    }
972                    return Some(prop.clone());
973                }
974                None
975            }
976            _ => None,
977        }
978    }
979
980    fn op_to_lance(op: &BinaryOp) -> Option<&'static str> {
981        match op {
982            BinaryOp::Eq => Some("="),
983            BinaryOp::NotEq => Some("!="),
984            BinaryOp::Lt => Some("<"),
985            BinaryOp::LtEq => Some("<="),
986            BinaryOp::Gt => Some(">"),
987            BinaryOp::GtEq => Some(">="),
988            _ => None,
989        }
990    }
991
992    fn value_to_lance(expr: &Expr) -> Option<String> {
993        match expr {
994            Expr::Literal(CypherLiteral::String(s)) => {
995                // Normalize datetime strings to include seconds for Arrow timestamp parsing.
996                // Our Cypher datetime formatting omits `:00` seconds (e.g. `2021-06-01T00:00Z`)
997                // but Arrow/Lance requires full `HH:MM:SS` for timestamp parsing.
998                let s = super::df_expr::normalize_datetime_str(s).unwrap_or_else(|| s.clone());
999                Some(format!("'{}'", s.replace("'", "''")))
1000            }
1001            Expr::Literal(CypherLiteral::Integer(i)) => Some(i.to_string()),
1002            Expr::Literal(CypherLiteral::Float(f)) => Some(f.to_string()),
1003            Expr::Literal(CypherLiteral::Bool(b)) => Some(b.to_string()),
1004            Expr::Literal(CypherLiteral::Null) => Some("NULL".to_string()),
1005            Expr::List(items) => {
1006                let values: Option<Vec<String>> = items.iter().map(Self::value_to_lance).collect();
1007                values.map(|v| format!("({})", v.join(", ")))
1008            }
1009            // Security: CWE-89 - Parameters are NOT pushed to storage layer.
1010            // Parameterized predicates stay in the application layer where the
1011            // query executor can safely substitute values with proper type handling.
1012            // This prevents potential SQL injection if Lance doesn't support the $name syntax.
1013            Expr::Parameter(_) => None,
1014            _ => None,
1015        }
1016    }
1017
1018    /// Extracts raw string value from expression for LIKE pattern use.
1019    ///
1020    /// Returns the raw string without escaping - escaping is handled by
1021    /// `escape_like_pattern` for LIKE clauses.
1022    fn get_string_value(expr: &Expr) -> Option<String> {
1023        match expr {
1024            Expr::Literal(CypherLiteral::String(s)) => Some(s.clone()),
1025            _ => None,
1026        }
1027    }
1028}
1029
1030/// If `expr` is a property predicate (`Property(var, p) <op> _` or
1031/// `Property(var, p) IN _`) on `variable`, return the property name.
1032/// Used by the planner to match analyzer-detected hash-index columns back
1033/// to the originating predicate.
1034pub fn predicate_target_column(expr: &Expr, variable: &str) -> Option<String> {
1035    let prop_side = match expr {
1036        Expr::BinaryOp { left, .. } => left.as_ref(),
1037        Expr::In { expr: left, .. } => left.as_ref(),
1038        Expr::IsNull(inner) | Expr::IsNotNull(inner) => inner.as_ref(),
1039        _ => return None,
1040    };
1041    if let Expr::Property(var_expr, prop) = prop_side
1042        && let Expr::Variable(v) = var_expr.as_ref()
1043        && v == variable
1044    {
1045        return Some(prop.clone());
1046    }
1047    None
1048}
1049
1050/// Convert a runtime `Value` to a Cypher AST `Expr` literal.
1051///
1052/// Returns `None` for variants we cannot represent inline in a pushed-down
1053/// Lance filter (e.g. nodes/edges/paths). Maps and lists nest.
1054fn value_to_expr(v: &Value) -> Option<Expr> {
1055    Some(match v {
1056        Value::Null => Expr::Literal(CypherLiteral::Null),
1057        Value::Bool(b) => Expr::Literal(CypherLiteral::Bool(*b)),
1058        Value::Int(i) => Expr::Literal(CypherLiteral::Integer(*i)),
1059        Value::Float(f) => Expr::Literal(CypherLiteral::Float(*f)),
1060        Value::String(s) => Expr::Literal(CypherLiteral::String(s.clone())),
1061        Value::List(items) => {
1062            let items: Option<Vec<Expr>> = items.iter().map(value_to_expr).collect();
1063            Expr::List(items?)
1064        }
1065        // Bytes / Map / Node / Edge / Path / Vector / Temporal can't be
1066        // represented as a Lance literal here. Bail out and let the caller
1067        // fall back to FilterExec.
1068        _ => return None,
1069    })
1070}
1071
1072/// Recursively replace `Expr::Parameter(name)` with a literal `Expr` resolved
1073/// from `params`. Returns `None` if any parameter is missing or its `Value`
1074/// cannot be represented as a Cypher literal (so the predicate cannot be
1075/// safely pushed to storage and the caller should fall back).
1076///
1077/// `LanceFilterGenerator::value_to_lance` deliberately rejects
1078/// `Expr::Parameter` to prevent SQL injection (CWE-89). Substituting at plan
1079/// time with the resolved value sidesteps that — values come from already-
1080/// authenticated query params and are emitted via the same string-escaping
1081/// path as inline literals.
1082pub fn substitute_params(expr: &Expr, params: &HashMap<String, Value>) -> Option<Expr> {
1083    Some(match expr {
1084        Expr::Parameter(name) => value_to_expr(params.get(name)?)?,
1085        Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
1086            left: Box::new(substitute_params(left, params)?),
1087            op: *op,
1088            right: Box::new(substitute_params(right, params)?),
1089        },
1090        Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
1091            op: *op,
1092            expr: Box::new(substitute_params(inner, params)?),
1093        },
1094        Expr::In { expr: left, list } => Expr::In {
1095            expr: Box::new(substitute_params(left, params)?),
1096            list: Box::new(substitute_params(list, params)?),
1097        },
1098        Expr::IsNull(inner) => Expr::IsNull(Box::new(substitute_params(inner, params)?)),
1099        Expr::IsNotNull(inner) => Expr::IsNotNull(Box::new(substitute_params(inner, params)?)),
1100        Expr::List(items) => {
1101            let items: Option<Vec<Expr>> =
1102                items.iter().map(|i| substitute_params(i, params)).collect();
1103            Expr::List(items?)
1104        }
1105        // Leaves with no parameter references — passthrough.
1106        _ => expr.clone(),
1107    })
1108}
1109
#[cfg(test)]
mod security_tests {
    use super::*;

    /// Builds the predicate `n.name <op> <rhs>` shared by the generator tests.
    fn name_predicate(op: BinaryOp, rhs: Expr) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Property(
                Box::new(Expr::Variable("n".to_string())),
                "name".to_string(),
            )),
            op,
            right: Box::new(rhs),
        }
    }

    /// Wraps `text` in a Cypher string literal expression.
    fn string_literal(text: &str) -> Expr {
        Expr::Literal(CypherLiteral::String(text.to_string()))
    }

    /// Runs the filter generator for a single predicate on `n` without schema info.
    fn lance_filter(predicate: Expr) -> Option<String> {
        LanceFilterGenerator::generate(&[predicate], "n", None)
    }

    /// Tests for CWE-89 (SQL Injection) prevention in LIKE patterns.
    mod wildcard_protection {
        use super::*;

        #[test]
        fn test_contains_sql_wildcards_detects_percent() {
            for input in ["admin%", "%admin", "ad%min"] {
                assert!(LanceFilterGenerator::contains_sql_wildcards(input));
            }
        }

        #[test]
        fn test_contains_sql_wildcards_detects_underscore() {
            for input in ["a_min", "_admin", "admin_"] {
                assert!(LanceFilterGenerator::contains_sql_wildcards(input));
            }
        }

        #[test]
        fn test_contains_sql_wildcards_safe_strings() {
            for input in ["admin", "John Smith", "test@example.com"] {
                assert!(!LanceFilterGenerator::contains_sql_wildcards(input));
            }
        }

        #[test]
        fn test_wildcard_in_contains_not_pushed_down() {
            // A value carrying `%` must stay out of the storage filter.
            let predicate = name_predicate(BinaryOp::Contains, string_literal("admin%"));

            let filter = lance_filter(predicate);
            assert!(
                filter.is_none(),
                "CONTAINS with wildcard should not be pushed to storage"
            );
        }

        #[test]
        fn test_underscore_in_startswith_not_pushed_down() {
            // A value carrying `_` must stay out of the storage filter.
            let predicate = name_predicate(BinaryOp::StartsWith, string_literal("user_"));

            let filter = lance_filter(predicate);
            assert!(
                filter.is_none(),
                "STARTSWITH with underscore should not be pushed to storage"
            );
        }

        #[test]
        fn test_safe_contains_is_pushed_down() {
            // A wildcard-free value is expected to reach the storage filter.
            let predicate = name_predicate(BinaryOp::Contains, string_literal("admin"));

            let filter = lance_filter(predicate);
            assert!(filter.is_some(), "Safe CONTAINS should be pushed down");
            assert!(
                filter.as_ref().unwrap().contains("LIKE '%admin%'"),
                "Generated filter: {:?}",
                filter
            );
        }

        #[test]
        fn test_single_quotes_escaped_in_safe_string() {
            // A single quote inside the value must come out doubled.
            let predicate = name_predicate(BinaryOp::Contains, string_literal("O'Brien"));

            let filter = lance_filter(predicate).unwrap();
            assert!(
                filter.contains("O''Brien"),
                "Single quotes should be doubled: {}",
                filter
            );
        }
    }

    /// Tests for parameter handling (not pushed to storage).
    mod parameter_safety {
        use super::*;

        #[test]
        fn test_parameters_not_pushed_down() {
            let predicate =
                name_predicate(BinaryOp::Eq, Expr::Parameter("userInput".to_string()));

            let filter = lance_filter(predicate);
            assert!(
                filter.is_none(),
                "Parameterized predicates should not be pushed to storage"
            );
        }
    }
}