Skip to main content

reddb_server/storage/query/executors/
subquery.rs

1//! Subquery Executor
2//!
3//! Provides support for nested queries within SQL expressions.
4//!
5//! # Subquery Types
6//!
7//! - **Scalar**: Single-value subquery `x = (SELECT max(y) FROM ...)`
8//! - **EXISTS**: Existence check `EXISTS (SELECT * FROM ... WHERE ...)`
9//! - **IN**: Set membership `x IN (SELECT y FROM ...)`
10//! - **NOT IN**: Set non-membership
11//! - **ANY/ALL**: Comparison with set `x > ANY (SELECT ...)`
12//!
13//! # Correlation
14//!
15//! - **Correlated**: References outer query columns, evaluated per row
16//! - **Non-correlated**: Independent, can be evaluated once and cached
17//!
18//! # Optimization
19//!
20//! - Non-correlated subqueries use once-only evaluation
21//! - IN subqueries build hash index for O(1) lookups
//! - EXISTS only tests whether the subquery returned any row, so the subquery callback is free to stop at the first match
23
24use std::collections::HashSet;
25
26use super::super::engine::binding::{Binding, Value, Var};
27use super::value_compare::{partial_compare_values, values_equal};
28
29// ============================================================================
30// Subquery Types
31// ============================================================================
32
33/// Type of subquery
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum SubqueryType {
36    /// Scalar subquery returning single value
37    /// Example: `WHERE x = (SELECT max(y) FROM t)`
38    Scalar,
39
40    /// EXISTS check - true if any rows returned
41    /// Example: `WHERE EXISTS (SELECT * FROM t WHERE ...)`
42    Exists,
43
44    /// NOT EXISTS check
45    NotExists,
46
47    /// IN membership test
48    /// Example: `WHERE x IN (SELECT y FROM t)`
49    In,
50
51    /// NOT IN membership test
52    NotIn,
53
54    /// ANY comparison (at least one match)
55    /// Example: `WHERE x > ANY (SELECT y FROM t)`
56    Any(CompareOp),
57
58    /// ALL comparison (all must match)
59    /// Example: `WHERE x > ALL (SELECT y FROM t)`
60    All(CompareOp),
61}
62
/// Comparison operator carried by the ANY / ALL subquery forms.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// `=`
    Eq,
    /// `<>`
    Ne,
    /// `<`
    Lt,
    /// `<=`
    Le,
    /// `>`
    Gt,
    /// `>=`
    Ge,
}
73
74// ============================================================================
75// Subquery Definition
76// ============================================================================
77
78/// A subquery definition
79#[derive(Debug, Clone)]
80pub struct SubqueryDef {
81    /// Unique identifier for this subquery
82    pub id: usize,
83
84    /// Type of subquery
85    pub subquery_type: SubqueryType,
86
87    /// Variable to compare against (for IN, ANY, ALL)
88    pub compare_var: Option<Var>,
89
90    /// Result variable name (for scalar subqueries)
91    pub result_var: Option<Var>,
92
93    /// Variables referenced from outer query (for correlation detection)
94    pub outer_refs: Vec<Var>,
95
96    /// Whether this subquery is correlated (references outer variables)
97    pub is_correlated: bool,
98}
99
100impl SubqueryDef {
101    /// Create a scalar subquery
102    pub fn scalar(id: usize, result_var: Var) -> Self {
103        Self {
104            id,
105            subquery_type: SubqueryType::Scalar,
106            compare_var: None,
107            result_var: Some(result_var),
108            outer_refs: Vec::new(),
109            is_correlated: false,
110        }
111    }
112
113    /// Create an EXISTS subquery
114    pub fn exists(id: usize, negated: bool) -> Self {
115        Self {
116            id,
117            subquery_type: if negated {
118                SubqueryType::NotExists
119            } else {
120                SubqueryType::Exists
121            },
122            compare_var: None,
123            result_var: None,
124            outer_refs: Vec::new(),
125            is_correlated: false,
126        }
127    }
128
129    /// Create an IN subquery
130    pub fn in_list(id: usize, compare_var: Var, negated: bool) -> Self {
131        Self {
132            id,
133            subquery_type: if negated {
134                SubqueryType::NotIn
135            } else {
136                SubqueryType::In
137            },
138            compare_var: Some(compare_var),
139            result_var: None,
140            outer_refs: Vec::new(),
141            is_correlated: false,
142        }
143    }
144
145    /// Mark as correlated with outer references
146    pub fn with_outer_refs(mut self, refs: Vec<Var>) -> Self {
147        self.outer_refs = refs;
148        self.is_correlated = !self.outer_refs.is_empty();
149        self
150    }
151}
152
153// ============================================================================
154// Subquery Result Cache
155// ============================================================================
156
157/// Cached result of a non-correlated subquery
158#[derive(Debug, Clone)]
159pub enum SubqueryCache {
160    /// Not yet evaluated
161    Unevaluated,
162
163    /// Scalar result
164    Scalar(Option<Value>),
165
166    /// Boolean result (EXISTS)
167    Boolean(bool),
168
169    /// Set of values (for IN)
170    ValueSet(HashSet<ValueHash>),
171
172    /// Ordered list of values (for ANY/ALL with ordering)
173    ValueList(Vec<Value>),
174}
175
176/// Hashable wrapper for Value (for HashSet storage)
177#[derive(Debug, Clone, PartialEq, Eq, Hash)]
178pub struct ValueHash(String);
179
180impl From<&Value> for ValueHash {
181    fn from(v: &Value) -> Self {
182        ValueHash(value_to_key(v))
183    }
184}
185
186fn value_to_key(value: &Value) -> String {
187    match value {
188        Value::Node(id) => format!("N:{}", id),
189        Value::Edge(id) => format!("E:{}", id),
190        Value::String(s) => format!("S:{}", s),
191        Value::Integer(i) => format!("I:{}", i),
192        Value::Float(f) => format!("F:{}", f.to_bits()),
193        Value::Boolean(b) => format!("B:{}", b),
194        Value::Uri(u) => format!("U:{}", u),
195        Value::Null => "NULL".to_string(),
196    }
197}
198
199// ============================================================================
200// Subquery Executor
201// ============================================================================
202
203/// Subquery executor handles evaluation of nested queries
204pub struct SubqueryExecutor {
205    /// Cache for non-correlated subqueries
206    cache: Vec<SubqueryCache>,
207}
208
209impl SubqueryExecutor {
210    /// Create new executor with space for n subqueries
211    pub fn new(num_subqueries: usize) -> Self {
212        Self {
213            cache: vec![SubqueryCache::Unevaluated; num_subqueries],
214        }
215    }
216
217    /// Evaluate an EXISTS subquery
218    pub fn eval_exists<F>(
219        &mut self,
220        def: &SubqueryDef,
221        outer_binding: &Binding,
222        execute_subquery: F,
223    ) -> bool
224    where
225        F: FnOnce(&Binding) -> Vec<Binding>,
226    {
227        let negated = matches!(def.subquery_type, SubqueryType::NotExists);
228
229        // Check cache for non-correlated
230        if !def.is_correlated {
231            if let SubqueryCache::Boolean(result) = &self.cache[def.id] {
232                return if negated { !*result } else { *result };
233            }
234        }
235
236        // Execute subquery
237        let results = execute_subquery(outer_binding);
238        let exists = !results.is_empty();
239
240        // Cache if non-correlated
241        if !def.is_correlated {
242            self.cache[def.id] = SubqueryCache::Boolean(exists);
243        }
244
245        if negated {
246            !exists
247        } else {
248            exists
249        }
250    }
251
252    /// Evaluate a scalar subquery
253    pub fn eval_scalar<F>(
254        &mut self,
255        def: &SubqueryDef,
256        outer_binding: &Binding,
257        result_var: &Var,
258        execute_subquery: F,
259    ) -> Option<Value>
260    where
261        F: FnOnce(&Binding) -> Vec<Binding>,
262    {
263        // Check cache for non-correlated
264        if !def.is_correlated {
265            if let SubqueryCache::Scalar(result) = &self.cache[def.id] {
266                return result.clone();
267            }
268        }
269
270        // Execute subquery
271        let results = execute_subquery(outer_binding);
272
273        // Get first result's value
274        let value = results.first().and_then(|b| b.get(result_var)).cloned();
275
276        // Cache if non-correlated
277        if !def.is_correlated {
278            self.cache[def.id] = SubqueryCache::Scalar(value.clone());
279        }
280
281        value
282    }
283
284    /// Evaluate an IN subquery
285    pub fn eval_in<F>(
286        &mut self,
287        def: &SubqueryDef,
288        outer_binding: &Binding,
289        check_value: &Value,
290        result_var: &Var,
291        execute_subquery: F,
292    ) -> bool
293    where
294        F: FnOnce(&Binding) -> Vec<Binding>,
295    {
296        let negated = matches!(def.subquery_type, SubqueryType::NotIn);
297
298        // Check cache for non-correlated (build hash set once)
299        if !def.is_correlated {
300            if let SubqueryCache::ValueSet(set) = &self.cache[def.id] {
301                let hash = ValueHash::from(check_value);
302                let in_set = set.contains(&hash);
303                return if negated { !in_set } else { in_set };
304            }
305        }
306
307        // Execute subquery and build set
308        let results = execute_subquery(outer_binding);
309        let set: HashSet<ValueHash> = results
310            .iter()
311            .filter_map(|b| b.get(result_var))
312            .map(ValueHash::from)
313            .collect();
314
315        let hash = ValueHash::from(check_value);
316        let in_set = set.contains(&hash);
317
318        // Cache if non-correlated
319        if !def.is_correlated {
320            self.cache[def.id] = SubqueryCache::ValueSet(set);
321        }
322
323        if negated {
324            !in_set
325        } else {
326            in_set
327        }
328    }
329
330    /// Evaluate an ANY subquery
331    pub fn eval_any<F>(
332        &mut self,
333        def: &SubqueryDef,
334        outer_binding: &Binding,
335        check_value: &Value,
336        op: CompareOp,
337        result_var: &Var,
338        execute_subquery: F,
339    ) -> bool
340    where
341        F: FnOnce(&Binding) -> Vec<Binding>,
342    {
343        // Check cache for non-correlated
344        if !def.is_correlated {
345            if let SubqueryCache::ValueList(list) = &self.cache[def.id] {
346                return list.iter().any(|v| compare_with_op(check_value, v, op));
347            }
348        }
349
350        // Execute subquery
351        let results = execute_subquery(outer_binding);
352        let list: Vec<Value> = results
353            .iter()
354            .filter_map(|b| b.get(result_var).cloned())
355            .collect();
356
357        let result = list.iter().any(|v| compare_with_op(check_value, v, op));
358
359        // Cache if non-correlated
360        if !def.is_correlated {
361            self.cache[def.id] = SubqueryCache::ValueList(list);
362        }
363
364        result
365    }
366
367    /// Evaluate an ALL subquery
368    pub fn eval_all<F>(
369        &mut self,
370        def: &SubqueryDef,
371        outer_binding: &Binding,
372        check_value: &Value,
373        op: CompareOp,
374        result_var: &Var,
375        execute_subquery: F,
376    ) -> bool
377    where
378        F: FnOnce(&Binding) -> Vec<Binding>,
379    {
380        // Check cache for non-correlated
381        if !def.is_correlated {
382            if let SubqueryCache::ValueList(list) = &self.cache[def.id] {
383                // Empty set: ALL is vacuously true
384                if list.is_empty() {
385                    return true;
386                }
387                return list.iter().all(|v| compare_with_op(check_value, v, op));
388            }
389        }
390
391        // Execute subquery
392        let results = execute_subquery(outer_binding);
393        let list: Vec<Value> = results
394            .iter()
395            .filter_map(|b| b.get(result_var).cloned())
396            .collect();
397
398        // Empty set: ALL is vacuously true
399        let result = if list.is_empty() {
400            true
401        } else {
402            list.iter().all(|v| compare_with_op(check_value, v, op))
403        };
404
405        // Cache if non-correlated
406        if !def.is_correlated {
407            self.cache[def.id] = SubqueryCache::ValueList(list);
408        }
409
410        result
411    }
412
413    /// Reset cache (for new execution)
414    pub fn reset(&mut self) {
415        for cache in &mut self.cache {
416            *cache = SubqueryCache::Unevaluated;
417        }
418    }
419
420    /// Check if a subquery is cached
421    pub fn is_cached(&self, id: usize) -> bool {
422        !matches!(self.cache.get(id), Some(SubqueryCache::Unevaluated) | None)
423    }
424}
425
426// ============================================================================
427// Correlation Detection
428// ============================================================================
429
430/// Detect outer variable references in a subquery
431pub fn detect_correlation(subquery_vars: &[Var], outer_vars: &[Var]) -> Vec<Var> {
432    subquery_vars
433        .iter()
434        .filter(|v| outer_vars.contains(v))
435        .cloned()
436        .collect()
437}
438
439/// Check if a binding satisfies correlation constraints
440pub fn bind_outer_refs(outer_binding: &Binding, outer_refs: &[Var]) -> Binding {
441    let mut result = Binding::empty();
442    for var in outer_refs {
443        if let Some(value) = outer_binding.get(var) {
444            let partial = Binding::one(var.clone(), value.clone());
445            result = result.merge(&partial).unwrap_or(result);
446        }
447    }
448    result
449}
450
451// ============================================================================
452// Helper Functions
453// ============================================================================
454
455fn compare_with_op(left: &Value, right: &Value, op: CompareOp) -> bool {
456    match op {
457        CompareOp::Eq => values_equal(left, right),
458        CompareOp::Ne => !values_equal(left, right),
459        CompareOp::Lt => partial_compare_values(left, right) == Some(std::cmp::Ordering::Less),
460        CompareOp::Le => matches!(
461            partial_compare_values(left, right),
462            Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
463        ),
464        CompareOp::Gt => partial_compare_values(left, right) == Some(std::cmp::Ordering::Greater),
465        CompareOp::Ge => matches!(
466            partial_compare_values(left, right),
467            Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
468        ),
469    }
470}
471
472// ============================================================================
473// Tests
474// ============================================================================
475
#[cfg(test)]
mod tests {
    use super::*;

    /// Merge `(name, value)` pairs into one binding; no pairs gives the
    /// empty binding.
    fn make_binding(pairs: &[(&str, Value)]) -> Binding {
        let mut entries = pairs.iter();

        let (name, value) = match entries.next() {
            Some(first) => first,
            None => return Binding::empty(),
        };
        let seed = Binding::one(Var::new(*name), value.clone());

        entries.fold(seed, |acc, (name, value)| {
            let next = Binding::one(Var::new(*name), value.clone());
            acc.merge(&next).unwrap_or(acc)
        })
    }

    /// One row per value, each binding `var` to that value.
    fn rows_of(var: &str, values: Vec<Value>) -> Vec<Binding> {
        values
            .into_iter()
            .map(|value| make_binding(&[(var, value)]))
            .collect()
    }

    /// Non-correlated ANY/ALL definition with id 0 comparing against `x`.
    fn quantified_def(subquery_type: SubqueryType) -> SubqueryDef {
        SubqueryDef {
            id: 0,
            subquery_type,
            compare_var: Some(Var::new("x")),
            result_var: None,
            outer_refs: Vec::new(),
            is_correlated: false,
        }
    }

    #[test]
    fn test_exists_uncorrelated() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::exists(0, false);
        let outer = Binding::empty();

        // The first evaluation runs the subquery.
        let first = executor.eval_exists(&def, &outer, |_| rows_of("x", vec![Value::Integer(1)]));
        assert!(first);

        // The second evaluation must come from the cache.
        let second = executor.eval_exists(&def, &outer, |_| panic!("Should use cache!"));
        assert!(second);
    }

    #[test]
    fn test_not_exists() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::exists(0, true);

        // NOT EXISTS over an empty result holds.
        let holds = executor.eval_exists(&def, &Binding::empty(), |_| Vec::new());
        assert!(holds);
    }

    #[test]
    fn test_in_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::in_list(0, Var::new("x"), false);
        let result_var = Var::new("y");
        let outer = Binding::empty();

        let present = Value::Integer(2);
        let found = executor.eval_in(&def, &outer, &present, &result_var, |_| {
            rows_of(
                "y",
                vec![Value::Integer(1), Value::Integer(2), Value::Integer(3)],
            )
        });
        assert!(found);

        // A value outside the set, answered from the cached set.
        let absent = Value::Integer(5);
        let found_absent = executor.eval_in(&def, &outer, &absent, &result_var, |_| {
            panic!("Should use cache!")
        });
        assert!(!found_absent);
    }

    #[test]
    fn test_scalar_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::scalar(0, Var::new("result"));
        let result_var = Var::new("max_val");

        let value = executor.eval_scalar(&def, &Binding::empty(), &result_var, |_| {
            rows_of("max_val", vec![Value::Integer(100)])
        });

        assert_eq!(value, Some(Value::Integer(100)));
    }

    #[test]
    fn test_any_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = quantified_def(SubqueryType::Any(CompareOp::Gt));
        let result_var = Var::new("y");
        let check = Value::Integer(5);

        // 5 > ANY (1, 3, 10) holds because 5 > 1 and 5 > 3.
        let holds = executor.eval_any(
            &def,
            &Binding::empty(),
            &check,
            CompareOp::Gt,
            &result_var,
            |_| {
                rows_of(
                    "y",
                    vec![Value::Integer(1), Value::Integer(3), Value::Integer(10)],
                )
            },
        );
        assert!(holds);
    }

    #[test]
    fn test_all_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = quantified_def(SubqueryType::All(CompareOp::Gt));
        let result_var = Var::new("y");
        let outer = Binding::empty();

        // 5 > ALL (1, 2, 3) holds.
        let five = Value::Integer(5);
        let all_smaller = executor.eval_all(&def, &outer, &five, CompareOp::Gt, &result_var, |_| {
            rows_of(
                "y",
                vec![Value::Integer(1), Value::Integer(2), Value::Integer(3)],
            )
        });
        assert!(all_smaller);

        // After a reset the subquery runs again; 2 > 3 fails.
        executor.reset();
        let two = Value::Integer(2);
        let still_all = executor.eval_all(&def, &outer, &two, CompareOp::Gt, &result_var, |_| {
            rows_of("y", vec![Value::Integer(1), Value::Integer(3)])
        });
        assert!(!still_all);
    }

    #[test]
    fn test_correlated_no_cache() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::exists(0, false).with_outer_refs(vec![Var::new("outer_id")]);

        let mut executions = 0;

        // Two outer rows; a correlated subquery must run once for each.
        let runs = vec![
            (Value::Integer(1), rows_of("x", vec![Value::Integer(1)])),
            (Value::Integer(2), Vec::new()),
        ];
        for (outer_id, rows) in runs {
            let outer = make_binding(&[("outer_id", outer_id)]);
            let _ = executor.eval_exists(&def, &outer, |_| {
                executions += 1;
                rows
            });
        }

        assert_eq!(executions, 2);
    }

    #[test]
    fn test_correlation_detection() {
        let subquery_vars = vec![Var::new("x"), Var::new("outer_id"), Var::new("y")];
        let outer_vars = vec![Var::new("outer_id"), Var::new("z")];

        let shared = detect_correlation(&subquery_vars, &outer_vars);
        assert_eq!(shared.len(), 1);
        assert_eq!(shared[0].name(), "outer_id");
    }
}