1use std::collections::HashSet;
25
26use super::super::engine::binding::{Binding, Value, Var};
27use super::value_compare::{partial_compare_values, values_equal};
28
/// The kind of subquery expression a [`SubqueryDef`] represents.
///
/// The negated variants (`NotExists`, `NotIn`) are what
/// `SubqueryExecutor::eval_exists` / `SubqueryExecutor::eval_in` inspect to
/// decide whether to invert their result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubqueryType {
    /// Subquery used as a single value; built by `SubqueryDef::scalar`.
    Scalar,

    /// Holds when the subquery produces at least one row.
    Exists,

    /// Negation of `Exists`: holds when the subquery produces no rows.
    NotExists,

    /// Holds when the checked value is among the subquery's result values.
    In,

    /// Negation of `In`.
    NotIn,

    /// Quantified comparison that should hold for at least one result value.
    ///
    /// NOTE(review): `SubqueryExecutor::eval_any` receives its operator as a
    /// separate argument and does not read it from this variant.
    Any(CompareOp),

    /// Quantified comparison that should hold for every result value.
    ///
    /// NOTE(review): `SubqueryExecutor::eval_all` receives its operator as a
    /// separate argument and does not read it from this variant.
    All(CompareOp),
}
62
/// Comparison operator applied between a checked value (left-hand side) and a
/// subquery result value (right-hand side) by `compare_with_op`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// Equal, decided by `values_equal`.
    Eq,
    /// Not equal: the negation of `values_equal`.
    Ne,
    /// Strictly less than, decided by `partial_compare_values`.
    Lt,
    /// Less than or equal, decided by `partial_compare_values`.
    Le,
    /// Strictly greater than, decided by `partial_compare_values`.
    Gt,
    /// Greater than or equal, decided by `partial_compare_values`.
    Ge,
}
73
/// Static description of one subquery.
#[derive(Debug, Clone)]
pub struct SubqueryDef {
    /// Index of this subquery's slot in the `SubqueryExecutor` cache.
    pub id: usize,

    /// Which kind of subquery expression this is.
    pub subquery_type: SubqueryType,

    /// Variable recorded by `SubqueryDef::in_list`; `None` for the other
    /// constructors in this file.
    ///
    /// NOTE(review): the executor methods take the value to check as an
    /// explicit argument and never read this field.
    pub compare_var: Option<Var>,

    /// Variable recorded by `SubqueryDef::scalar`; `None` for the other
    /// constructors in this file.
    ///
    /// NOTE(review): the executor methods take the result variable as an
    /// explicit argument and never read this field.
    pub result_var: Option<Var>,

    /// Variables attached through `SubqueryDef::with_outer_refs`.
    pub outer_refs: Vec<Var>,

    /// When `true`, the executor re-runs the subquery on every evaluation and
    /// never touches its cache slot. Set by `with_outer_refs` to whether
    /// `outer_refs` is non-empty.
    pub is_correlated: bool,
}
99
100impl SubqueryDef {
101 pub fn scalar(id: usize, result_var: Var) -> Self {
103 Self {
104 id,
105 subquery_type: SubqueryType::Scalar,
106 compare_var: None,
107 result_var: Some(result_var),
108 outer_refs: Vec::new(),
109 is_correlated: false,
110 }
111 }
112
113 pub fn exists(id: usize, negated: bool) -> Self {
115 Self {
116 id,
117 subquery_type: if negated {
118 SubqueryType::NotExists
119 } else {
120 SubqueryType::Exists
121 },
122 compare_var: None,
123 result_var: None,
124 outer_refs: Vec::new(),
125 is_correlated: false,
126 }
127 }
128
129 pub fn in_list(id: usize, compare_var: Var, negated: bool) -> Self {
131 Self {
132 id,
133 subquery_type: if negated {
134 SubqueryType::NotIn
135 } else {
136 SubqueryType::In
137 },
138 compare_var: Some(compare_var),
139 result_var: None,
140 outer_refs: Vec::new(),
141 is_correlated: false,
142 }
143 }
144
145 pub fn with_outer_refs(mut self, refs: Vec<Var>) -> Self {
147 self.outer_refs = refs;
148 self.is_correlated = !self.outer_refs.is_empty();
149 self
150 }
151}
152
/// Cached outcome of an uncorrelated subquery, one slot per subquery id.
#[derive(Debug, Clone)]
pub enum SubqueryCache {
    /// Nothing stored yet; the initial state and the state after
    /// `SubqueryExecutor::reset`.
    Unevaluated,

    /// Stored by `SubqueryExecutor::eval_scalar`: the first row's value for
    /// the result variable, or `None` when there was no such value.
    Scalar(Option<Value>),

    /// Stored by `SubqueryExecutor::eval_exists`: whether the subquery
    /// produced any rows (before any `NotExists` negation is applied).
    Boolean(bool),

    /// Stored by `SubqueryExecutor::eval_in`: hashable keys of every result
    /// value.
    ValueSet(HashSet<ValueHash>),

    /// Stored by `SubqueryExecutor::eval_any` / `eval_all`: the result values
    /// in row order.
    ValueList(Vec<Value>),
}
175
/// Hashable, equality-comparable stand-in for a `Value`.
///
/// Wraps the string key produced by `value_to_key`, so two values are treated
/// as the same set member exactly when their keys are identical strings.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ValueHash(String);
179
180impl From<&Value> for ValueHash {
181 fn from(v: &Value) -> Self {
182 ValueHash(value_to_key(v))
183 }
184}
185
186fn value_to_key(value: &Value) -> String {
187 match value {
188 Value::Node(id) => format!("N:{}", id),
189 Value::Edge(id) => format!("E:{}", id),
190 Value::String(s) => format!("S:{}", s),
191 Value::Integer(i) => format!("I:{}", i),
192 Value::Float(f) => format!("F:{}", f.to_bits()),
193 Value::Boolean(b) => format!("B:{}", b),
194 Value::Uri(u) => format!("U:{}", u),
195 Value::Null => "NULL".to_string(),
196 }
197}
198
/// Evaluates subquery expressions and caches the results of uncorrelated
/// subqueries.
pub struct SubqueryExecutor {
    /// One cache slot per subquery, indexed by `SubqueryDef::id`.
    cache: Vec<SubqueryCache>,
}
208
209impl SubqueryExecutor {
210 pub fn new(num_subqueries: usize) -> Self {
212 Self {
213 cache: vec![SubqueryCache::Unevaluated; num_subqueries],
214 }
215 }
216
217 pub fn eval_exists<F>(
219 &mut self,
220 def: &SubqueryDef,
221 outer_binding: &Binding,
222 execute_subquery: F,
223 ) -> bool
224 where
225 F: FnOnce(&Binding) -> Vec<Binding>,
226 {
227 let negated = matches!(def.subquery_type, SubqueryType::NotExists);
228
229 if !def.is_correlated {
231 if let SubqueryCache::Boolean(result) = &self.cache[def.id] {
232 return if negated { !*result } else { *result };
233 }
234 }
235
236 let results = execute_subquery(outer_binding);
238 let exists = !results.is_empty();
239
240 if !def.is_correlated {
242 self.cache[def.id] = SubqueryCache::Boolean(exists);
243 }
244
245 if negated {
246 !exists
247 } else {
248 exists
249 }
250 }
251
252 pub fn eval_scalar<F>(
254 &mut self,
255 def: &SubqueryDef,
256 outer_binding: &Binding,
257 result_var: &Var,
258 execute_subquery: F,
259 ) -> Option<Value>
260 where
261 F: FnOnce(&Binding) -> Vec<Binding>,
262 {
263 if !def.is_correlated {
265 if let SubqueryCache::Scalar(result) = &self.cache[def.id] {
266 return result.clone();
267 }
268 }
269
270 let results = execute_subquery(outer_binding);
272
273 let value = results.first().and_then(|b| b.get(result_var)).cloned();
275
276 if !def.is_correlated {
278 self.cache[def.id] = SubqueryCache::Scalar(value.clone());
279 }
280
281 value
282 }
283
284 pub fn eval_in<F>(
286 &mut self,
287 def: &SubqueryDef,
288 outer_binding: &Binding,
289 check_value: &Value,
290 result_var: &Var,
291 execute_subquery: F,
292 ) -> bool
293 where
294 F: FnOnce(&Binding) -> Vec<Binding>,
295 {
296 let negated = matches!(def.subquery_type, SubqueryType::NotIn);
297
298 if !def.is_correlated {
300 if let SubqueryCache::ValueSet(set) = &self.cache[def.id] {
301 let hash = ValueHash::from(check_value);
302 let in_set = set.contains(&hash);
303 return if negated { !in_set } else { in_set };
304 }
305 }
306
307 let results = execute_subquery(outer_binding);
309 let set: HashSet<ValueHash> = results
310 .iter()
311 .filter_map(|b| b.get(result_var))
312 .map(ValueHash::from)
313 .collect();
314
315 let hash = ValueHash::from(check_value);
316 let in_set = set.contains(&hash);
317
318 if !def.is_correlated {
320 self.cache[def.id] = SubqueryCache::ValueSet(set);
321 }
322
323 if negated {
324 !in_set
325 } else {
326 in_set
327 }
328 }
329
330 pub fn eval_any<F>(
332 &mut self,
333 def: &SubqueryDef,
334 outer_binding: &Binding,
335 check_value: &Value,
336 op: CompareOp,
337 result_var: &Var,
338 execute_subquery: F,
339 ) -> bool
340 where
341 F: FnOnce(&Binding) -> Vec<Binding>,
342 {
343 if !def.is_correlated {
345 if let SubqueryCache::ValueList(list) = &self.cache[def.id] {
346 return list.iter().any(|v| compare_with_op(check_value, v, op));
347 }
348 }
349
350 let results = execute_subquery(outer_binding);
352 let list: Vec<Value> = results
353 .iter()
354 .filter_map(|b| b.get(result_var).cloned())
355 .collect();
356
357 let result = list.iter().any(|v| compare_with_op(check_value, v, op));
358
359 if !def.is_correlated {
361 self.cache[def.id] = SubqueryCache::ValueList(list);
362 }
363
364 result
365 }
366
367 pub fn eval_all<F>(
369 &mut self,
370 def: &SubqueryDef,
371 outer_binding: &Binding,
372 check_value: &Value,
373 op: CompareOp,
374 result_var: &Var,
375 execute_subquery: F,
376 ) -> bool
377 where
378 F: FnOnce(&Binding) -> Vec<Binding>,
379 {
380 if !def.is_correlated {
382 if let SubqueryCache::ValueList(list) = &self.cache[def.id] {
383 if list.is_empty() {
385 return true;
386 }
387 return list.iter().all(|v| compare_with_op(check_value, v, op));
388 }
389 }
390
391 let results = execute_subquery(outer_binding);
393 let list: Vec<Value> = results
394 .iter()
395 .filter_map(|b| b.get(result_var).cloned())
396 .collect();
397
398 let result = if list.is_empty() {
400 true
401 } else {
402 list.iter().all(|v| compare_with_op(check_value, v, op))
403 };
404
405 if !def.is_correlated {
407 self.cache[def.id] = SubqueryCache::ValueList(list);
408 }
409
410 result
411 }
412
413 pub fn reset(&mut self) {
415 for cache in &mut self.cache {
416 *cache = SubqueryCache::Unevaluated;
417 }
418 }
419
420 pub fn is_cached(&self, id: usize) -> bool {
422 !matches!(self.cache.get(id), Some(SubqueryCache::Unevaluated) | None)
423 }
424}
425
426pub fn detect_correlation(subquery_vars: &[Var], outer_vars: &[Var]) -> Vec<Var> {
432 subquery_vars
433 .iter()
434 .filter(|v| outer_vars.contains(v))
435 .cloned()
436 .collect()
437}
438
439pub fn bind_outer_refs(outer_binding: &Binding, outer_refs: &[Var]) -> Binding {
441 let mut result = Binding::empty();
442 for var in outer_refs {
443 if let Some(value) = outer_binding.get(var) {
444 let partial = Binding::one(var.clone(), value.clone());
445 result = result.merge(&partial).unwrap_or(result);
446 }
447 }
448 result
449}
450
451fn compare_with_op(left: &Value, right: &Value, op: CompareOp) -> bool {
456 match op {
457 CompareOp::Eq => values_equal(left, right),
458 CompareOp::Ne => !values_equal(left, right),
459 CompareOp::Lt => partial_compare_values(left, right) == Some(std::cmp::Ordering::Less),
460 CompareOp::Le => matches!(
461 partial_compare_values(left, right),
462 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
463 ),
464 CompareOp::Gt => partial_compare_values(left, right) == Some(std::cmp::Ordering::Greater),
465 CompareOp::Ge => matches!(
466 partial_compare_values(left, right),
467 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
468 ),
469 }
470}
471
/// Unit tests for the subquery executor and its helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a binding from `(name, value)` pairs by merging single-variable
    /// bindings one at a time; a failed merge leaves the accumulated binding
    /// unchanged.
    fn make_binding(pairs: &[(&str, Value)]) -> Binding {
        if pairs.is_empty() {
            return Binding::empty();
        }

        let mut result = Binding::one(Var::new(pairs[0].0), pairs[0].1.clone());

        for (k, v) in pairs.iter().skip(1) {
            let next = Binding::one(Var::new(k), v.clone());
            result = result.merge(&next).unwrap_or(result);
        }

        result
    }

    /// An uncorrelated EXISTS is executed once; the second evaluation must be
    /// served from the cache.
    #[test]
    fn test_exists_uncorrelated() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::exists(0, false);
        let outer = Binding::empty();

        let result = executor.eval_exists(&def, &outer, |_| {
            vec![make_binding(&[("x", Value::Integer(1))])]
        });
        assert!(result);

        // The closure panics if invoked, proving the cached answer is used.
        let result2 = executor.eval_exists(&def, &outer, |_| {
            panic!("Should use cache!");
        });
        assert!(result2);
    }

    /// NOT EXISTS over an empty result holds.
    #[test]
    fn test_not_exists() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::exists(0, true);
        let result = executor.eval_exists(&def, &Binding::empty(), |_| {
            vec![]
        });
        assert!(result);
    }

    /// IN finds a present value, and a later lookup for an absent value reuses
    /// the cached set.
    #[test]
    fn test_in_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::in_list(0, Var::new("x"), false);

        let check = Value::Integer(2);
        let result_var = Var::new("y");

        let in_set = executor.eval_in(&def, &Binding::empty(), &check, &result_var, |_| {
            vec![
                make_binding(&[("y", Value::Integer(1))]),
                make_binding(&[("y", Value::Integer(2))]),
                make_binding(&[("y", Value::Integer(3))]),
            ]
        });
        assert!(in_set);

        let check2 = Value::Integer(5);
        // The closure panics if invoked, proving the cached set is used.
        let not_in = executor.eval_in(&def, &Binding::empty(), &check2, &result_var, |_| {
            panic!("Should use cache!");
        });
        assert!(!not_in);
    }

    /// A scalar subquery yields the first row's value for the result variable
    /// passed to `eval_scalar`.
    #[test]
    fn test_scalar_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::scalar(0, Var::new("result"));
        let result_var = Var::new("max_val");

        let value = executor.eval_scalar(&def, &Binding::empty(), &result_var, |_| {
            vec![make_binding(&[("max_val", Value::Integer(100))])]
        });

        assert_eq!(value, Some(Value::Integer(100)));
    }

    /// `5 > ANY {1, 3, 10}` holds because 5 exceeds at least one element.
    #[test]
    fn test_any_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef {
            id: 0,
            subquery_type: SubqueryType::Any(CompareOp::Gt),
            compare_var: Some(Var::new("x")),
            result_var: None,
            outer_refs: Vec::new(),
            is_correlated: false,
        };

        let check = Value::Integer(5);
        let result_var = Var::new("y");

        let result = executor.eval_any(
            &def,
            &Binding::empty(),
            &check,
            CompareOp::Gt,
            &result_var,
            |_| {
                vec![
                    make_binding(&[("y", Value::Integer(1))]),
                    make_binding(&[("y", Value::Integer(3))]),
                    make_binding(&[("y", Value::Integer(10))]),
                ]
            },
        );
        assert!(result);
    }

    /// `5 > ALL {1, 2, 3}` holds; after a reset, `2 > ALL {1, 3}` does not.
    #[test]
    fn test_all_subquery() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef {
            id: 0,
            subquery_type: SubqueryType::All(CompareOp::Gt),
            compare_var: Some(Var::new("x")),
            result_var: None,
            outer_refs: Vec::new(),
            is_correlated: false,
        };

        let check = Value::Integer(5);
        let result_var = Var::new("y");

        let result = executor.eval_all(
            &def,
            &Binding::empty(),
            &check,
            CompareOp::Gt,
            &result_var,
            |_| {
                vec![
                    make_binding(&[("y", Value::Integer(1))]),
                    make_binding(&[("y", Value::Integer(2))]),
                    make_binding(&[("y", Value::Integer(3))]),
                ]
            },
        );
        assert!(result);

        // Clear the cached list so the second closure actually runs.
        executor.reset();
        let check2 = Value::Integer(2);
        let result2 = executor.eval_all(
            &def,
            &Binding::empty(),
            &check2,
            CompareOp::Gt,
            &result_var,
            |_| {
                vec![
                    make_binding(&[("y", Value::Integer(1))]),
                    make_binding(&[("y", Value::Integer(3))]),
                ]
            },
        );
        assert!(!result2);
    }

    /// A correlated subquery is executed on every evaluation, never cached.
    #[test]
    fn test_correlated_no_cache() {
        let mut executor = SubqueryExecutor::new(1);
        let def = SubqueryDef::exists(0, false).with_outer_refs(vec![Var::new("outer_id")]);

        let mut call_count = 0;

        let outer1 = make_binding(&[("outer_id", Value::Integer(1))]);
        let _ = executor.eval_exists(&def, &outer1, |_| {
            call_count += 1;
            vec![make_binding(&[("x", Value::Integer(1))])]
        });

        let outer2 = make_binding(&[("outer_id", Value::Integer(2))]);
        let _ = executor.eval_exists(&def, &outer2, |_| {
            call_count += 1;
            vec![]
        });

        // Both closures ran, so nothing was served from the cache.
        assert_eq!(call_count, 2);
    }

    /// Only the variable present in both lists is reported as correlated.
    #[test]
    fn test_correlation_detection() {
        let subquery_vars = vec![Var::new("x"), Var::new("outer_id"), Var::new("y")];
        let outer_vars = vec![Var::new("outer_id"), Var::new("z")];

        let refs = detect_correlation(&subquery_vars, &outer_vars);
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].name(), "outer_id");
    }
}