1use super::binding::{Binding, Value, Var};
13use std::collections::HashSet;
14use std::fmt::Debug;
15
/// Result of pulling one row from a [`BindingIterator`].
///
/// `Ok(Some(binding))` carries the next row, `Ok(None)` signals that no more
/// rows are available, and `Err(_)` reports an [`IterError`].
pub type IterResult = Result<Option<Binding>, IterError>;
18
/// Errors that a [`BindingIterator`] can report while producing rows.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can check
/// for a specific failure with `==` / `assert_eq!` instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IterError {
    /// Execution failed; the payload is a human-readable message.
    Execution(String),
    /// Displayed as "Query timeout".
    Timeout,
    /// Returned by the iterators in this module once `cancel()` has been called.
    Cancelled,
    /// A resource limit was hit; the payload is a human-readable message.
    ResourceExhausted(String),
}

impl std::fmt::Display for IterError {
    /// Writes the user-facing message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            IterError::Execution(msg) => write!(f, "Execution error: {}", msg),
            // Fixed messages need no formatting machinery.
            IterError::Timeout => f.write_str("Query timeout"),
            IterError::Cancelled => f.write_str("Query cancelled"),
            IterError::ResourceExhausted(msg) => write!(f, "Resource exhausted: {}", msg),
        }
    }
}

impl std::error::Error for IterError {}
44
/// Pull-based source of [`Binding`] rows that query operators are built from.
///
/// Implementors must be `Debug` and `Send`.
pub trait BindingIterator: Debug + Send {
    /// Produces the next row.
    ///
    /// Returns `Ok(Some(_))` for a row, `Ok(None)` when there are no more
    /// rows, and `Err(_)` on failure. The implementations in this module
    /// return `Err(IterError::Cancelled)` after `cancel()` has been called.
    fn next_binding(&mut self) -> IterResult;

    /// Reports whether another row may be available.
    ///
    /// The default implementation always answers `true`; implementors that
    /// can tell cheaply should override it.
    fn has_next(&self) -> bool {
        true
    }

    /// Returns the variables this iterator reports for its rows.
    fn vars(&self) -> Vec<Var>;

    /// Requests cancellation of the iterator.
    fn cancel(&mut self);

    /// Attempts to rewind the iterator, returning whether that succeeded.
    ///
    /// The default implementation does nothing and returns `false`.
    fn reset(&mut self) -> bool {
        false
    }
}
66
67#[derive(Debug)]
69pub struct QueryIterBase {
70 bindings: Vec<Binding>,
71 index: usize,
72 vars: Vec<Var>,
73 cancelled: bool,
74}
75
76impl QueryIterBase {
77 pub fn new(bindings: Vec<Binding>) -> Self {
79 let vars = if let Some(first) = bindings.first() {
80 first.all_vars().into_iter().cloned().collect()
81 } else {
82 Vec::new()
83 };
84
85 Self {
86 bindings,
87 index: 0,
88 vars,
89 cancelled: false,
90 }
91 }
92
93 pub fn empty() -> Self {
95 Self {
96 bindings: Vec::new(),
97 index: 0,
98 vars: Vec::new(),
99 cancelled: false,
100 }
101 }
102
103 pub fn single(binding: Binding) -> Self {
105 let vars = binding.all_vars().into_iter().cloned().collect();
106 Self {
107 bindings: vec![binding],
108 index: 0,
109 vars,
110 cancelled: false,
111 }
112 }
113}
114
115impl BindingIterator for QueryIterBase {
116 fn next_binding(&mut self) -> IterResult {
117 if self.cancelled {
118 return Err(IterError::Cancelled);
119 }
120
121 if self.index < self.bindings.len() {
122 let binding = self.bindings[self.index].clone();
123 self.index += 1;
124 Ok(Some(binding))
125 } else {
126 Ok(None)
127 }
128 }
129
130 fn has_next(&self) -> bool {
131 !self.cancelled && self.index < self.bindings.len()
132 }
133
134 fn vars(&self) -> Vec<Var> {
135 self.vars.clone()
136 }
137
138 fn cancel(&mut self) {
139 self.cancelled = true;
140 }
141
142 fn reset(&mut self) -> bool {
143 self.index = 0;
144 self.cancelled = false;
145 true
146 }
147}
148
149pub struct QueryIterFilter {
151 upstream: Box<dyn BindingIterator>,
152 predicate: Box<dyn Fn(&Binding) -> bool + Send + Sync>,
153 cancelled: bool,
154}
155
156impl std::fmt::Debug for QueryIterFilter {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("QueryIterFilter")
159 .field("upstream", &self.upstream)
160 .field("predicate", &"<filter fn>")
161 .field("cancelled", &self.cancelled)
162 .finish()
163 }
164}
165
166impl QueryIterFilter {
167 pub fn new<F>(upstream: Box<dyn BindingIterator>, predicate: F) -> Self
169 where
170 F: Fn(&Binding) -> bool + Send + Sync + 'static,
171 {
172 Self {
173 upstream,
174 predicate: Box::new(predicate),
175 cancelled: false,
176 }
177 }
178}
179
180impl BindingIterator for QueryIterFilter {
181 fn next_binding(&mut self) -> IterResult {
182 if self.cancelled {
183 return Err(IterError::Cancelled);
184 }
185
186 loop {
187 match self.upstream.next_binding()? {
188 Some(binding) => {
189 if (self.predicate)(&binding) {
190 return Ok(Some(binding));
191 }
192 }
194 None => return Ok(None),
195 }
196 }
197 }
198
199 fn vars(&self) -> Vec<Var> {
200 self.upstream.vars()
201 }
202
203 fn cancel(&mut self) {
204 self.cancelled = true;
205 self.upstream.cancel();
206 }
207}
208
209pub struct QueryIterJoin {
211 left: Box<dyn BindingIterator>,
212 right_factory: Box<dyn Fn() -> Box<dyn BindingIterator> + Send + Sync>,
213 current_left: Option<Binding>,
214 current_right: Option<Box<dyn BindingIterator>>,
215 vars: Vec<Var>,
216 cancelled: bool,
217}
218
219impl std::fmt::Debug for QueryIterJoin {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("QueryIterJoin")
222 .field("left", &self.left)
223 .field("right_factory", &"<factory fn>")
224 .field("current_left", &self.current_left)
225 .field("current_right", &self.current_right)
226 .field("vars", &self.vars)
227 .field("cancelled", &self.cancelled)
228 .finish()
229 }
230}
231
232impl QueryIterJoin {
233 pub fn new<F>(left: Box<dyn BindingIterator>, right_factory: F, right_vars: Vec<Var>) -> Self
235 where
236 F: Fn() -> Box<dyn BindingIterator> + Send + Sync + 'static,
237 {
238 let mut vars = left.vars();
239 for v in right_vars {
240 if !vars.contains(&v) {
241 vars.push(v);
242 }
243 }
244
245 Self {
246 left,
247 right_factory: Box::new(right_factory),
248 current_left: None,
249 current_right: None,
250 vars,
251 cancelled: false,
252 }
253 }
254}
255
256impl BindingIterator for QueryIterJoin {
257 fn next_binding(&mut self) -> IterResult {
258 if self.cancelled {
259 return Err(IterError::Cancelled);
260 }
261
262 loop {
263 if let Some(ref mut right) = self.current_right {
265 if let Some(right_binding) = right.next_binding()? {
266 if let Some(ref left_binding) = self.current_left {
268 if let Some(merged) = left_binding.merge(&right_binding) {
269 return Ok(Some(merged));
270 }
271 continue;
273 }
274 }
275 }
276
277 match self.left.next_binding()? {
279 Some(left_binding) => {
280 self.current_left = Some(left_binding);
281 self.current_right = Some((self.right_factory)());
282 }
283 None => return Ok(None),
284 }
285 }
286 }
287
288 fn vars(&self) -> Vec<Var> {
289 self.vars.clone()
290 }
291
292 fn cancel(&mut self) {
293 self.cancelled = true;
294 self.left.cancel();
295 if let Some(ref mut right) = self.current_right {
296 right.cancel();
297 }
298 }
299}
300
301#[derive(Debug)]
303pub struct QueryIterUnion {
304 iterators: Vec<Box<dyn BindingIterator>>,
305 current_index: usize,
306 vars: Vec<Var>,
307 cancelled: bool,
308}
309
310impl QueryIterUnion {
311 pub fn new(iterators: Vec<Box<dyn BindingIterator>>) -> Self {
313 let mut vars = Vec::new();
314 for iter in &iterators {
315 for v in iter.vars() {
316 if !vars.contains(&v) {
317 vars.push(v);
318 }
319 }
320 }
321
322 Self {
323 iterators,
324 current_index: 0,
325 vars,
326 cancelled: false,
327 }
328 }
329}
330
331impl BindingIterator for QueryIterUnion {
332 fn next_binding(&mut self) -> IterResult {
333 if self.cancelled {
334 return Err(IterError::Cancelled);
335 }
336
337 while self.current_index < self.iterators.len() {
338 match self.iterators[self.current_index].next_binding()? {
339 Some(binding) => return Ok(Some(binding)),
340 None => {
341 self.current_index += 1;
342 }
343 }
344 }
345
346 Ok(None)
347 }
348
349 fn vars(&self) -> Vec<Var> {
350 self.vars.clone()
351 }
352
353 fn cancel(&mut self) {
354 self.cancelled = true;
355 for iter in &mut self.iterators {
356 iter.cancel();
357 }
358 }
359}
360
361#[derive(Debug)]
363pub struct QueryIterProject {
364 upstream: Box<dyn BindingIterator>,
365 project_vars: Vec<Var>,
366 cancelled: bool,
367}
368
369impl QueryIterProject {
370 pub fn new(upstream: Box<dyn BindingIterator>, vars: Vec<Var>) -> Self {
372 Self {
373 upstream,
374 project_vars: vars,
375 cancelled: false,
376 }
377 }
378}
379
380impl BindingIterator for QueryIterProject {
381 fn next_binding(&mut self) -> IterResult {
382 if self.cancelled {
383 return Err(IterError::Cancelled);
384 }
385
386 match self.upstream.next_binding()? {
387 Some(binding) => Ok(Some(binding.project(&self.project_vars))),
388 None => Ok(None),
389 }
390 }
391
392 fn vars(&self) -> Vec<Var> {
393 self.project_vars.clone()
394 }
395
396 fn cancel(&mut self) {
397 self.cancelled = true;
398 self.upstream.cancel();
399 }
400}
401
402#[derive(Debug)]
404pub struct QueryIterSlice {
405 upstream: Box<dyn BindingIterator>,
406 offset: u64,
407 limit: Option<u64>,
408 skipped: u64,
409 returned: u64,
410 cancelled: bool,
411}
412
413impl QueryIterSlice {
414 pub fn new(upstream: Box<dyn BindingIterator>, offset: u64, limit: Option<u64>) -> Self {
416 Self {
417 upstream,
418 offset,
419 limit,
420 skipped: 0,
421 returned: 0,
422 cancelled: false,
423 }
424 }
425
426 pub fn limit(upstream: Box<dyn BindingIterator>, limit: u64) -> Self {
428 Self::new(upstream, 0, Some(limit))
429 }
430
431 pub fn offset(upstream: Box<dyn BindingIterator>, offset: u64) -> Self {
433 Self::new(upstream, offset, None)
434 }
435}
436
437impl BindingIterator for QueryIterSlice {
438 fn next_binding(&mut self) -> IterResult {
439 if self.cancelled {
440 return Err(IterError::Cancelled);
441 }
442
443 if let Some(limit) = self.limit {
445 if self.returned >= limit {
446 return Ok(None);
447 }
448 }
449
450 while self.skipped < self.offset {
452 match self.upstream.next_binding()? {
453 Some(_) => {
454 self.skipped += 1;
455 }
456 None => return Ok(None),
457 }
458 }
459
460 match self.upstream.next_binding()? {
462 Some(binding) => {
463 self.returned += 1;
464 Ok(Some(binding))
465 }
466 None => Ok(None),
467 }
468 }
469
470 fn vars(&self) -> Vec<Var> {
471 self.upstream.vars()
472 }
473
474 fn cancel(&mut self) {
475 self.cancelled = true;
476 self.upstream.cancel();
477 }
478}
479
/// Operator that returns the upstream rows ordered by a list of sort keys.
///
/// The whole upstream is read and sorted on the first call to
/// `next_binding`; later calls hand out rows from the sorted buffer.
#[derive(Debug)]
pub struct QueryIterSort {
    /// Source of rows; drained completely when the sort is materialized.
    upstream: Box<dyn BindingIterator>,
    /// Sort keys, applied in order; later keys break ties of earlier ones.
    comparators: Vec<SortKey>,
    /// Sorted rows, or `None` until the upstream has been materialized.
    sorted: Option<Vec<Binding>>,
    /// Position of the next row to return from `sorted`.
    index: usize,
    /// Set by `cancel()`.
    cancelled: bool,
}
489
490#[derive(Debug, Clone)]
492pub struct SortKey {
493 pub var: Var,
495 pub ascending: bool,
497}
498
499impl SortKey {
500 pub fn asc(var: Var) -> Self {
502 Self {
503 var,
504 ascending: true,
505 }
506 }
507
508 pub fn desc(var: Var) -> Self {
510 Self {
511 var,
512 ascending: false,
513 }
514 }
515}
516
517impl QueryIterSort {
518 pub fn new(upstream: Box<dyn BindingIterator>, comparators: Vec<SortKey>) -> Self {
520 Self {
521 upstream,
522 comparators,
523 sorted: None,
524 index: 0,
525 cancelled: false,
526 }
527 }
528
529 fn materialize(&mut self) -> Result<(), IterError> {
531 if self.sorted.is_some() {
532 return Ok(());
533 }
534
535 let mut bindings = Vec::new();
536 while let Some(b) = self.upstream.next_binding()? {
537 bindings.push(b);
538 }
539
540 let comparators = self.comparators.clone();
542 bindings.sort_by(|a, b| {
543 for key in &comparators {
544 let a_val = a.get(&key.var);
545 let b_val = b.get(&key.var);
546
547 let ordering = compare_values(a_val, b_val);
548 if ordering != std::cmp::Ordering::Equal {
549 return if key.ascending {
550 ordering
551 } else {
552 ordering.reverse()
553 };
554 }
555 }
556 std::cmp::Ordering::Equal
557 });
558
559 self.sorted = Some(bindings);
560 Ok(())
561 }
562}
563
564impl BindingIterator for QueryIterSort {
565 fn next_binding(&mut self) -> IterResult {
566 if self.cancelled {
567 return Err(IterError::Cancelled);
568 }
569
570 self.materialize()?;
571
572 if let Some(ref sorted) = self.sorted {
573 if self.index < sorted.len() {
574 let binding = sorted[self.index].clone();
575 self.index += 1;
576 return Ok(Some(binding));
577 }
578 }
579
580 Ok(None)
581 }
582
583 fn vars(&self) -> Vec<Var> {
584 self.upstream.vars()
585 }
586
587 fn cancel(&mut self) {
588 self.cancelled = true;
589 self.upstream.cancel();
590 }
591}
592
593fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
595 match (a, b) {
596 (None, None) => std::cmp::Ordering::Equal,
597 (None, Some(_)) => std::cmp::Ordering::Less,
598 (Some(_), None) => std::cmp::Ordering::Greater,
599 (Some(a_val), Some(b_val)) => compare_value(a_val, b_val),
600 }
601}
602
603fn compare_value(a: &Value, b: &Value) -> std::cmp::Ordering {
605 match (a, b) {
606 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
607 (Value::Float(a), Value::Float(b)) => a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal),
608 (Value::String(a), Value::String(b)) => a.cmp(b),
609 (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
610 (Value::Node(a), Value::Node(b)) => a.cmp(b),
611 (Value::Edge(a), Value::Edge(b)) => a.cmp(b),
612 (Value::Uri(a), Value::Uri(b)) => a.cmp(b),
613 (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
614 _ => {
616 let type_order = |v: &Value| -> u8 {
617 match v {
618 Value::Null => 0,
619 Value::Boolean(_) => 1,
620 Value::Integer(_) => 2,
621 Value::Float(_) => 3,
622 Value::String(_) => 4,
623 Value::Node(_) => 5,
624 Value::Edge(_) => 6,
625 Value::Uri(_) => 7,
626 }
627 };
628 type_order(a).cmp(&type_order(b))
629 }
630 }
631}
632
633#[derive(Debug)]
635pub struct QueryIterDistinct {
636 upstream: Box<dyn BindingIterator>,
637 seen: HashSet<u64>,
638 cancelled: bool,
639}
640
641impl QueryIterDistinct {
642 pub fn new(upstream: Box<dyn BindingIterator>) -> Self {
644 Self {
645 upstream,
646 seen: HashSet::new(),
647 cancelled: false,
648 }
649 }
650
651 fn hash_binding(binding: &Binding) -> u64 {
653 use std::hash::{Hash, Hasher};
654 let mut hasher = std::collections::hash_map::DefaultHasher::new();
655 binding.hash(&mut hasher);
656 hasher.finish()
657 }
658}
659
660impl BindingIterator for QueryIterDistinct {
661 fn next_binding(&mut self) -> IterResult {
662 if self.cancelled {
663 return Err(IterError::Cancelled);
664 }
665
666 loop {
667 match self.upstream.next_binding()? {
668 Some(binding) => {
669 let hash = Self::hash_binding(&binding);
670 if self.seen.insert(hash) {
671 return Ok(Some(binding));
672 }
673 }
675 None => return Ok(None),
676 }
677 }
678 }
679
680 fn vars(&self) -> Vec<Var> {
681 self.upstream.vars()
682 }
683
684 fn cancel(&mut self) {
685 self.cancelled = true;
686 self.upstream.cancel();
687 }
688}
689
690pub struct QueryIter {
692 inner: Box<dyn BindingIterator>,
693}
694
695impl QueryIter {
696 pub fn new(inner: Box<dyn BindingIterator>) -> Self {
698 Self { inner }
699 }
700
701 pub fn vars(&self) -> Vec<Var> {
703 self.inner.vars()
704 }
705
706 pub fn cancel(&mut self) {
708 self.inner.cancel();
709 }
710}
711
712impl Iterator for QueryIter {
713 type Item = Result<Binding, IterError>;
714
715 fn next(&mut self) -> Option<Self::Item> {
716 match self.inner.next_binding() {
717 Ok(Some(binding)) => Some(Ok(binding)),
718 Ok(None) => None,
719 Err(e) => Some(Err(e)),
720 }
721 }
722}
723
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::engine::binding::BindingBuilder;

    /// Builds a row that binds the variable `x` to the integer `x`.
    fn make_binding(x: i64) -> Binding {
        BindingBuilder::new()
            .add_named("x", Value::Integer(x))
            .build()
    }

    /// Pulls every remaining row out of `iter`, panicking on any error.
    fn drain(iter: &mut dyn BindingIterator) -> Vec<Binding> {
        std::iter::from_fn(|| iter.next_binding().unwrap()).collect()
    }

    /// Asserts that `row` binds `x` to the integer `expected`.
    fn assert_x(row: &Binding, expected: i64) {
        assert_eq!(row.get(&Var::new("x")), Some(&Value::Integer(expected)));
    }

    #[test]
    fn test_base_iterator() {
        let mut iter = QueryIterBase::new((1..=3).map(make_binding).collect());

        assert!(iter.has_next());
        for _ in 0..3 {
            assert!(iter.next_binding().unwrap().is_some());
        }
        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_filter_iterator() {
        let source = Box::new(QueryIterBase::new((1..=3).map(make_binding).collect()));

        // Keep only rows whose `x` is an integer greater than one.
        let mut iter = QueryIterFilter::new(source, |b| {
            b.get(&Var::new("x"))
                .and_then(|v| v.as_integer())
                .map(|i| i > 1)
                .unwrap_or(false)
        });

        for expected in 2..=3 {
            let row = iter.next_binding().unwrap().unwrap();
            assert_x(&row, expected);
        }

        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_slice_iterator() {
        let source = Box::new(QueryIterBase::new((1..=10).map(make_binding).collect()));

        // Skip two rows, then take three: expect 3, 4, 5.
        let mut iter = QueryIterSlice::new(source, 2, Some(3));

        for expected in 3..=5 {
            let row = iter.next_binding().unwrap().unwrap();
            assert_x(&row, expected);
        }

        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_project_iterator() {
        let row = BindingBuilder::new()
            .add_named("x", Value::Integer(1))
            .add_named("y", Value::Integer(2))
            .add_named("z", Value::Integer(3))
            .build();

        let source = Box::new(QueryIterBase::single(row));
        let kept = vec![Var::new("x"), Var::new("z")];
        let mut iter = QueryIterProject::new(source, kept);

        let projected = iter.next_binding().unwrap().unwrap();
        assert!(projected.contains(&Var::new("x")));
        assert!(!projected.contains(&Var::new("y")));
        assert!(projected.contains(&Var::new("z")));
    }

    #[test]
    fn test_union_iterator() {
        let first = Box::new(QueryIterBase::new(vec![make_binding(1), make_binding(2)]));
        let second = Box::new(QueryIterBase::new(vec![make_binding(3), make_binding(4)]));

        let mut union = QueryIterUnion::new(vec![first, second]);

        let results: Vec<_> = drain(&mut union)
            .iter()
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(results, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_sort_iterator() {
        let unsorted = vec![make_binding(3), make_binding(1), make_binding(2)];
        let source = Box::new(QueryIterBase::new(unsorted));

        let mut iter = QueryIterSort::new(source, vec![SortKey::asc(Var::new("x"))]);

        let results: Vec<_> = drain(&mut iter)
            .iter()
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_iterator() {
        let with_duplicates: Vec<_> = [1, 2, 1, 3, 2]
            .iter()
            .copied()
            .map(make_binding)
            .collect();
        let source = Box::new(QueryIterBase::new(with_duplicates));

        let mut iter = QueryIterDistinct::new(source);

        let results: Vec<_> = drain(&mut iter)
            .iter()
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_cancel_iterator() {
        let mut iter = QueryIterBase::new((1..=100).map(make_binding).collect());

        // Consume a couple of rows before cancelling.
        for _ in 0..2 {
            iter.next_binding().unwrap();
        }

        iter.cancel();

        // Every pull after cancellation must fail with `Cancelled`.
        assert!(matches!(iter.next_binding(), Err(IterError::Cancelled)));
    }

    #[test]
    fn test_query_iter_wrapper() {
        let source = Box::new(QueryIterBase::new(vec![make_binding(1), make_binding(2)]));

        let results: Vec<_> = QueryIter::new(source).collect();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(Result::is_ok));
    }
}