1use super::binding::{Binding, Value, Var};
13use super::cancel::CancelToken;
14use std::collections::HashSet;
15use std::fmt::Debug;
16
17#[inline]
24fn check_shared_cancel(token: &Option<CancelToken>) -> Result<(), IterError> {
25 if let Some(token) = token {
26 if token.is_cancelled() {
27 return Err(IterError::Cancelled);
28 }
29 }
30 Ok(())
31}
32
33pub type IterResult = Result<Option<Binding>, IterError>;
35
/// Errors that a binding iterator can report instead of a row.
#[derive(Debug, Clone)]
pub enum IterError {
    /// Execution failed; the payload is a human-readable message.
    Execution(String),
    /// The query timed out.
    Timeout,
    /// The query was cancelled.
    Cancelled,
    /// A resource limit was hit; the payload is a human-readable message.
    ResourceExhausted(String),
}

impl std::fmt::Display for IterError {
    /// Writes the user-facing message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            IterError::Timeout => f.write_str("Query timeout"),
            IterError::Cancelled => f.write_str("Query cancelled"),
            // Messages carrying a payload.
            IterError::Execution(detail) => write!(f, "Execution error: {}", detail),
            IterError::ResourceExhausted(detail) => write!(f, "Resource exhausted: {}", detail),
        }
    }
}

impl std::error::Error for IterError {}
61
62pub trait BindingIterator: Debug + Send {
64 fn next_binding(&mut self) -> IterResult;
66
67 fn has_next(&self) -> bool {
69 true }
71
72 fn vars(&self) -> Vec<Var>;
74
75 fn cancel(&mut self);
77
78 fn reset(&mut self) -> bool {
80 false }
82}
83
84#[derive(Debug)]
86pub struct QueryIterBase {
87 bindings: Vec<Binding>,
88 index: usize,
89 vars: Vec<Var>,
90 cancelled: bool,
91 cancel: Option<CancelToken>,
95}
96
97impl QueryIterBase {
98 pub fn new(bindings: Vec<Binding>) -> Self {
100 let vars = if let Some(first) = bindings.first() {
101 first.all_vars().into_iter().cloned().collect()
102 } else {
103 Vec::new()
104 };
105
106 Self {
107 bindings,
108 index: 0,
109 vars,
110 cancelled: false,
111 cancel: None,
112 }
113 }
114
115 pub fn empty() -> Self {
117 Self {
118 bindings: Vec::new(),
119 index: 0,
120 vars: Vec::new(),
121 cancelled: false,
122 cancel: None,
123 }
124 }
125
126 pub fn single(binding: Binding) -> Self {
128 let vars = binding.all_vars().into_iter().cloned().collect();
129 Self {
130 bindings: vec![binding],
131 index: 0,
132 vars,
133 cancelled: false,
134 cancel: None,
135 }
136 }
137
138 pub fn with_cancel(mut self, token: CancelToken) -> Self {
143 self.cancel = Some(token);
144 self
145 }
146}
147
148impl BindingIterator for QueryIterBase {
149 fn next_binding(&mut self) -> IterResult {
150 if self.cancelled {
151 return Err(IterError::Cancelled);
152 }
153 check_shared_cancel(&self.cancel)?;
154
155 if self.index < self.bindings.len() {
156 let binding = self.bindings[self.index].clone();
157 self.index += 1;
158 Ok(Some(binding))
159 } else {
160 Ok(None)
161 }
162 }
163
164 fn has_next(&self) -> bool {
165 !self.cancelled && self.index < self.bindings.len()
166 }
167
168 fn vars(&self) -> Vec<Var> {
169 self.vars.clone()
170 }
171
172 fn cancel(&mut self) {
173 self.cancelled = true;
174 }
175
176 fn reset(&mut self) -> bool {
177 self.index = 0;
178 self.cancelled = false;
179 true
180 }
181}
182
183pub struct QueryIterFilter {
185 upstream: Box<dyn BindingIterator>,
186 predicate: Box<dyn Fn(&Binding) -> bool + Send + Sync>,
187 cancelled: bool,
188 cancel: Option<CancelToken>,
193}
194
195impl std::fmt::Debug for QueryIterFilter {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 f.debug_struct("QueryIterFilter")
198 .field("upstream", &self.upstream)
199 .field("predicate", &"<filter fn>")
200 .field("cancelled", &self.cancelled)
201 .finish()
202 }
203}
204
205impl QueryIterFilter {
206 pub fn new<F>(upstream: Box<dyn BindingIterator>, predicate: F) -> Self
208 where
209 F: Fn(&Binding) -> bool + Send + Sync + 'static,
210 {
211 Self {
212 upstream,
213 predicate: Box::new(predicate),
214 cancelled: false,
215 cancel: None,
216 }
217 }
218
219 pub fn with_cancel(mut self, token: CancelToken) -> Self {
221 self.cancel = Some(token);
222 self
223 }
224}
225
226impl BindingIterator for QueryIterFilter {
227 fn next_binding(&mut self) -> IterResult {
228 if self.cancelled {
229 return Err(IterError::Cancelled);
230 }
231
232 loop {
233 check_shared_cancel(&self.cancel)?;
234 match self.upstream.next_binding()? {
235 Some(binding) => {
236 if (self.predicate)(&binding) {
237 return Ok(Some(binding));
238 }
239 }
241 None => return Ok(None),
242 }
243 }
244 }
245
246 fn vars(&self) -> Vec<Var> {
247 self.upstream.vars()
248 }
249
250 fn cancel(&mut self) {
251 self.cancelled = true;
252 self.upstream.cancel();
253 }
254}
255
256pub struct QueryIterJoin {
258 left: Box<dyn BindingIterator>,
259 right_factory: Box<dyn Fn() -> Box<dyn BindingIterator> + Send + Sync>,
260 current_left: Option<Binding>,
261 current_right: Option<Box<dyn BindingIterator>>,
262 vars: Vec<Var>,
263 cancelled: bool,
264 cancel: Option<CancelToken>,
269}
270
271impl std::fmt::Debug for QueryIterJoin {
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 f.debug_struct("QueryIterJoin")
274 .field("left", &self.left)
275 .field("right_factory", &"<factory fn>")
276 .field("current_left", &self.current_left)
277 .field("current_right", &self.current_right)
278 .field("vars", &self.vars)
279 .field("cancelled", &self.cancelled)
280 .finish()
281 }
282}
283
284impl QueryIterJoin {
285 pub fn new<F>(left: Box<dyn BindingIterator>, right_factory: F, right_vars: Vec<Var>) -> Self
287 where
288 F: Fn() -> Box<dyn BindingIterator> + Send + Sync + 'static,
289 {
290 let mut vars = left.vars();
291 for v in right_vars {
292 if !vars.contains(&v) {
293 vars.push(v);
294 }
295 }
296
297 Self {
298 left,
299 right_factory: Box::new(right_factory),
300 current_left: None,
301 current_right: None,
302 vars,
303 cancelled: false,
304 cancel: None,
305 }
306 }
307
308 pub fn with_cancel(mut self, token: CancelToken) -> Self {
310 self.cancel = Some(token);
311 self
312 }
313}
314
315impl BindingIterator for QueryIterJoin {
316 fn next_binding(&mut self) -> IterResult {
317 if self.cancelled {
318 return Err(IterError::Cancelled);
319 }
320
321 loop {
322 check_shared_cancel(&self.cancel)?;
323 if let Some(ref mut right) = self.current_right {
325 if let Some(right_binding) = right.next_binding()? {
326 if let Some(ref left_binding) = self.current_left {
328 if let Some(merged) = left_binding.merge(&right_binding) {
329 return Ok(Some(merged));
330 }
331 continue;
333 }
334 }
335 }
336
337 match self.left.next_binding()? {
339 Some(left_binding) => {
340 self.current_left = Some(left_binding);
341 self.current_right = Some((self.right_factory)());
342 }
343 None => return Ok(None),
344 }
345 }
346 }
347
348 fn vars(&self) -> Vec<Var> {
349 self.vars.clone()
350 }
351
352 fn cancel(&mut self) {
353 self.cancelled = true;
354 self.left.cancel();
355 if let Some(ref mut right) = self.current_right {
356 right.cancel();
357 }
358 }
359}
360
361#[derive(Debug)]
363pub struct QueryIterUnion {
364 iterators: Vec<Box<dyn BindingIterator>>,
365 current_index: usize,
366 vars: Vec<Var>,
367 cancelled: bool,
368}
369
370impl QueryIterUnion {
371 pub fn new(iterators: Vec<Box<dyn BindingIterator>>) -> Self {
373 let mut vars = Vec::new();
374 for iter in &iterators {
375 for v in iter.vars() {
376 if !vars.contains(&v) {
377 vars.push(v);
378 }
379 }
380 }
381
382 Self {
383 iterators,
384 current_index: 0,
385 vars,
386 cancelled: false,
387 }
388 }
389}
390
391impl BindingIterator for QueryIterUnion {
392 fn next_binding(&mut self) -> IterResult {
393 if self.cancelled {
394 return Err(IterError::Cancelled);
395 }
396
397 while self.current_index < self.iterators.len() {
398 match self.iterators[self.current_index].next_binding()? {
399 Some(binding) => return Ok(Some(binding)),
400 None => {
401 self.current_index += 1;
402 }
403 }
404 }
405
406 Ok(None)
407 }
408
409 fn vars(&self) -> Vec<Var> {
410 self.vars.clone()
411 }
412
413 fn cancel(&mut self) {
414 self.cancelled = true;
415 for iter in &mut self.iterators {
416 iter.cancel();
417 }
418 }
419}
420
421#[derive(Debug)]
423pub struct QueryIterProject {
424 upstream: Box<dyn BindingIterator>,
425 project_vars: Vec<Var>,
426 cancelled: bool,
427}
428
429impl QueryIterProject {
430 pub fn new(upstream: Box<dyn BindingIterator>, vars: Vec<Var>) -> Self {
432 Self {
433 upstream,
434 project_vars: vars,
435 cancelled: false,
436 }
437 }
438}
439
440impl BindingIterator for QueryIterProject {
441 fn next_binding(&mut self) -> IterResult {
442 if self.cancelled {
443 return Err(IterError::Cancelled);
444 }
445
446 match self.upstream.next_binding()? {
447 Some(binding) => Ok(Some(binding.project(&self.project_vars))),
448 None => Ok(None),
449 }
450 }
451
452 fn vars(&self) -> Vec<Var> {
453 self.project_vars.clone()
454 }
455
456 fn cancel(&mut self) {
457 self.cancelled = true;
458 self.upstream.cancel();
459 }
460}
461
462#[derive(Debug)]
464pub struct QueryIterSlice {
465 upstream: Box<dyn BindingIterator>,
466 offset: u64,
467 limit: Option<u64>,
468 skipped: u64,
469 returned: u64,
470 cancelled: bool,
471}
472
473impl QueryIterSlice {
474 pub fn new(upstream: Box<dyn BindingIterator>, offset: u64, limit: Option<u64>) -> Self {
476 Self {
477 upstream,
478 offset,
479 limit,
480 skipped: 0,
481 returned: 0,
482 cancelled: false,
483 }
484 }
485
486 pub fn limit(upstream: Box<dyn BindingIterator>, limit: u64) -> Self {
488 Self::new(upstream, 0, Some(limit))
489 }
490
491 pub fn offset(upstream: Box<dyn BindingIterator>, offset: u64) -> Self {
493 Self::new(upstream, offset, None)
494 }
495}
496
497impl BindingIterator for QueryIterSlice {
498 fn next_binding(&mut self) -> IterResult {
499 if self.cancelled {
500 return Err(IterError::Cancelled);
501 }
502
503 if let Some(limit) = self.limit {
505 if self.returned >= limit {
506 return Ok(None);
507 }
508 }
509
510 while self.skipped < self.offset {
512 match self.upstream.next_binding()? {
513 Some(_) => {
514 self.skipped += 1;
515 }
516 None => return Ok(None),
517 }
518 }
519
520 match self.upstream.next_binding()? {
522 Some(binding) => {
523 self.returned += 1;
524 Ok(Some(binding))
525 }
526 None => Ok(None),
527 }
528 }
529
530 fn vars(&self) -> Vec<Var> {
531 self.upstream.vars()
532 }
533
534 fn cancel(&mut self) {
535 self.cancelled = true;
536 self.upstream.cancel();
537 }
538}
539
540#[derive(Debug)]
542pub struct QueryIterSort {
543 upstream: Box<dyn BindingIterator>,
544 comparators: Vec<SortKey>,
545 sorted: Option<Vec<Binding>>,
546 index: usize,
547 cancelled: bool,
548}
549
550#[derive(Debug, Clone)]
552pub struct SortKey {
553 pub var: Var,
555 pub ascending: bool,
557}
558
559impl SortKey {
560 pub fn asc(var: Var) -> Self {
562 Self {
563 var,
564 ascending: true,
565 }
566 }
567
568 pub fn desc(var: Var) -> Self {
570 Self {
571 var,
572 ascending: false,
573 }
574 }
575}
576
577impl QueryIterSort {
578 pub fn new(upstream: Box<dyn BindingIterator>, comparators: Vec<SortKey>) -> Self {
580 Self {
581 upstream,
582 comparators,
583 sorted: None,
584 index: 0,
585 cancelled: false,
586 }
587 }
588
589 fn materialize(&mut self) -> Result<(), IterError> {
591 if self.sorted.is_some() {
592 return Ok(());
593 }
594
595 let mut bindings = Vec::new();
596 while let Some(b) = self.upstream.next_binding()? {
597 bindings.push(b);
598 }
599
600 let comparators = self.comparators.clone();
602 bindings.sort_by(|a, b| {
603 for key in &comparators {
604 let a_val = a.get(&key.var);
605 let b_val = b.get(&key.var);
606
607 let ordering = compare_values(a_val, b_val);
608 if ordering != std::cmp::Ordering::Equal {
609 return if key.ascending {
610 ordering
611 } else {
612 ordering.reverse()
613 };
614 }
615 }
616 std::cmp::Ordering::Equal
617 });
618
619 self.sorted = Some(bindings);
620 Ok(())
621 }
622}
623
624impl BindingIterator for QueryIterSort {
625 fn next_binding(&mut self) -> IterResult {
626 if self.cancelled {
627 return Err(IterError::Cancelled);
628 }
629
630 self.materialize()?;
631
632 if let Some(ref sorted) = self.sorted {
633 if self.index < sorted.len() {
634 let binding = sorted[self.index].clone();
635 self.index += 1;
636 return Ok(Some(binding));
637 }
638 }
639
640 Ok(None)
641 }
642
643 fn vars(&self) -> Vec<Var> {
644 self.upstream.vars()
645 }
646
647 fn cancel(&mut self) {
648 self.cancelled = true;
649 self.upstream.cancel();
650 }
651}
652
653fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
655 match (a, b) {
656 (None, None) => std::cmp::Ordering::Equal,
657 (None, Some(_)) => std::cmp::Ordering::Less,
658 (Some(_), None) => std::cmp::Ordering::Greater,
659 (Some(a_val), Some(b_val)) => compare_value(a_val, b_val),
660 }
661}
662
663fn compare_value(a: &Value, b: &Value) -> std::cmp::Ordering {
665 match (a, b) {
666 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
667 (Value::Float(a), Value::Float(b)) => a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal),
668 (Value::String(a), Value::String(b)) => a.cmp(b),
669 (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
670 (Value::Node(a), Value::Node(b)) => a.cmp(b),
671 (Value::Edge(a), Value::Edge(b)) => a.cmp(b),
672 (Value::Uri(a), Value::Uri(b)) => a.cmp(b),
673 (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
674 _ => {
676 let type_order = |v: &Value| -> u8 {
677 match v {
678 Value::Null => 0,
679 Value::Boolean(_) => 1,
680 Value::Integer(_) => 2,
681 Value::Float(_) => 3,
682 Value::String(_) => 4,
683 Value::Node(_) => 5,
684 Value::Edge(_) => 6,
685 Value::Uri(_) => 7,
686 }
687 };
688 type_order(a).cmp(&type_order(b))
689 }
690 }
691}
692
693#[derive(Debug)]
695pub struct QueryIterDistinct {
696 upstream: Box<dyn BindingIterator>,
697 seen: HashSet<u64>,
698 cancelled: bool,
699}
700
701impl QueryIterDistinct {
702 pub fn new(upstream: Box<dyn BindingIterator>) -> Self {
704 Self {
705 upstream,
706 seen: HashSet::new(),
707 cancelled: false,
708 }
709 }
710
711 fn hash_binding(binding: &Binding) -> u64 {
713 use std::hash::{Hash, Hasher};
714 let mut hasher = std::collections::hash_map::DefaultHasher::new();
715 binding.hash(&mut hasher);
716 hasher.finish()
717 }
718}
719
720impl BindingIterator for QueryIterDistinct {
721 fn next_binding(&mut self) -> IterResult {
722 if self.cancelled {
723 return Err(IterError::Cancelled);
724 }
725
726 loop {
727 match self.upstream.next_binding()? {
728 Some(binding) => {
729 let hash = Self::hash_binding(&binding);
730 if self.seen.insert(hash) {
731 return Ok(Some(binding));
732 }
733 }
735 None => return Ok(None),
736 }
737 }
738 }
739
740 fn vars(&self) -> Vec<Var> {
741 self.upstream.vars()
742 }
743
744 fn cancel(&mut self) {
745 self.cancelled = true;
746 self.upstream.cancel();
747 }
748}
749
750pub struct QueryIter {
752 inner: Box<dyn BindingIterator>,
753}
754
755impl QueryIter {
756 pub fn new(inner: Box<dyn BindingIterator>) -> Self {
758 Self { inner }
759 }
760
761 pub fn vars(&self) -> Vec<Var> {
763 self.inner.vars()
764 }
765
766 pub fn cancel(&mut self) {
768 self.inner.cancel();
769 }
770}
771
772impl Iterator for QueryIter {
773 type Item = Result<Binding, IterError>;
774
775 fn next(&mut self) -> Option<Self::Item> {
776 match self.inner.next_binding() {
777 Ok(Some(binding)) => Some(Ok(binding)),
778 Ok(None) => None,
779 Err(e) => Some(Err(e)),
780 }
781 }
782}
783
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::engine::binding::BindingBuilder;
    use std::time::{Duration, Instant};

    /// Builds a binding with the single variable `x` set to `x`.
    fn make_binding(x: i64) -> Binding {
        BindingBuilder::new()
            .add_named("x", Value::Integer(x))
            .build()
    }

    /// Boxed in-memory scan over `x = v` for each `v` in `values`.
    fn scan_of(values: impl IntoIterator<Item = i64>) -> Box<QueryIterBase> {
        let rows: Vec<Binding> = values.into_iter().map(make_binding).collect();
        Box::new(QueryIterBase::new(rows))
    }

    /// Asserts that `binding` holds `x = expected`.
    fn assert_x(binding: &Binding, expected: i64) {
        assert_eq!(binding.get(&Var::new("x")), Some(&Value::Integer(expected)));
    }

    /// Cancels a clone of `token` from a background thread after `delay`.
    fn cancel_after(token: &CancelToken, delay: Duration) {
        let canceller = token.clone();
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            canceller.cancel();
        });
    }

    #[test]
    fn test_base_iterator() {
        let rows = vec![make_binding(1), make_binding(2), make_binding(3)];
        let mut iter = QueryIterBase::new(rows);

        assert!(iter.has_next());
        for _ in 0..3 {
            assert!(iter.next_binding().unwrap().is_some());
        }
        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_filter_iterator() {
        let base = scan_of(vec![1, 2, 3]);

        // Keep rows whose `x` is an integer greater than one.
        let mut iter = QueryIterFilter::new(base, |b| {
            matches!(
                b.get(&Var::new("x")).and_then(|v| v.as_integer()),
                Some(i) if i > 1
            )
        });

        for expected in vec![2, 3] {
            let row = iter.next_binding().unwrap().unwrap();
            assert_x(&row, expected);
        }

        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_slice_iterator() {
        let base = scan_of(1..=10);

        // Skip two rows, then take three.
        let mut iter = QueryIterSlice::new(base, 2, Some(3));

        for expected in 3..=5 {
            let row = iter.next_binding().unwrap().unwrap();
            assert_x(&row, expected);
        }

        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_project_iterator() {
        let binding = BindingBuilder::new()
            .add_named("x", Value::Integer(1))
            .add_named("y", Value::Integer(2))
            .add_named("z", Value::Integer(3))
            .build();

        let base = Box::new(QueryIterBase::single(binding));
        let kept = vec![Var::new("x"), Var::new("z")];
        let mut iter = QueryIterProject::new(base, kept);

        let result = iter.next_binding().unwrap().unwrap();
        assert!(result.contains(&Var::new("x")));
        assert!(!result.contains(&Var::new("y")));
        assert!(result.contains(&Var::new("z")));
    }

    #[test]
    fn test_union_iterator() {
        let first = scan_of(vec![1, 2]);
        let second = scan_of(vec![3, 4]);

        let mut union = QueryIterUnion::new(vec![first, second]);

        let results: Vec<_> = std::iter::from_fn(|| union.next_binding().unwrap())
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(results, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_sort_iterator() {
        let base = scan_of(vec![3, 1, 2]);

        let mut iter = QueryIterSort::new(base, vec![SortKey::asc(Var::new("x"))]);

        let results: Vec<_> = std::iter::from_fn(|| iter.next_binding().unwrap())
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_iterator() {
        let base = scan_of(vec![1, 2, 1, 3, 2]);

        let mut iter = QueryIterDistinct::new(base);

        let results: Vec<_> = std::iter::from_fn(|| iter.next_binding().unwrap())
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_cancel_iterator() {
        let rows: Vec<_> = (1..=100).map(make_binding).collect();
        let mut iter = QueryIterBase::new(rows);

        // Two successful pulls before cancelling.
        for _ in 0..2 {
            iter.next_binding().unwrap();
        }

        iter.cancel();

        assert!(matches!(iter.next_binding(), Err(IterError::Cancelled)));
    }

    #[test]
    fn test_query_iter_wrapper() {
        let base = scan_of(vec![1, 2]);

        let results: Vec<_> = QueryIter::new(base).collect();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    /// Endless source: every pull produces `x = n` for the next `n`.
    #[derive(Debug)]
    struct InfiniteScan {
        n: i64,
    }

    impl BindingIterator for InfiniteScan {
        fn next_binding(&mut self) -> IterResult {
            self.n += 1;
            let row = make_binding(self.n);
            Ok(Some(row))
        }

        fn vars(&self) -> Vec<Var> {
            vec![Var::new("x")]
        }

        // Cancellation is deliberately ignored by this source.
        fn cancel(&mut self) {}
    }

    #[test]
    fn shared_token_cancels_scan() {
        let token = CancelToken::new();
        let rows: Vec<_> = (1..=1000).map(make_binding).collect();
        let mut scan = QueryIterBase::new(rows).with_cancel(token.clone());

        assert!(scan.next_binding().unwrap().is_some());
        token.cancel();
        assert!(matches!(scan.next_binding(), Err(IterError::Cancelled)));
    }

    #[test]
    fn shared_token_cancels_filter_within_budget() {
        let token = CancelToken::new();
        // The predicate rejects everything, so only cancellation can end the pull.
        let upstream = Box::new(InfiniteScan { n: 0 });
        let mut filter = QueryIterFilter::new(upstream, |_| false).with_cancel(token.clone());

        cancel_after(&token, Duration::from_millis(20));

        let start = Instant::now();
        let result = filter.next_binding();
        let elapsed = start.elapsed();

        assert!(matches!(result, Err(IterError::Cancelled)));
        assert!(
            elapsed < Duration::from_secs(1),
            "cancel took too long to take effect: {elapsed:?}"
        );
    }

    #[test]
    fn shared_token_cancels_join_within_budget() {
        let token = CancelToken::new();
        // The left side never yields a row, so the join blocks inside it.
        let left = Box::new(
            QueryIterFilter::new(Box::new(InfiniteScan { n: 0 }), |_| false)
                .with_cancel(token.clone()),
        );
        let mut join = QueryIterJoin::new(
            left,
            || Box::new(QueryIterBase::empty()),
            vec![Var::new("y")],
        )
        .with_cancel(token.clone());

        cancel_after(&token, Duration::from_millis(20));

        let start = Instant::now();
        let result = join.next_binding();
        let elapsed = start.elapsed();

        assert!(matches!(result, Err(IterError::Cancelled)));
        assert!(
            elapsed < Duration::from_secs(1),
            "join cancel took too long: {elapsed:?}"
        );
    }

    #[test]
    fn cancellation_propagates_through_aggregate_consumer() {
        let token = CancelToken::new();
        let upstream: Box<dyn BindingIterator> = Box::new(
            QueryIterFilter::new(Box::new(InfiniteScan { n: 0 }), |_| false)
                .with_cancel(token.clone()),
        );

        /// Counts rows until the iterator ends, propagating the first error.
        fn count_all(mut it: Box<dyn BindingIterator>) -> Result<u64, IterError> {
            let mut seen = 0;
            loop {
                if it.next_binding()?.is_none() {
                    return Ok(seen);
                }
                seen += 1;
            }
        }

        token.cancel();
        assert!(matches!(count_all(upstream), Err(IterError::Cancelled)));
    }
}