Skip to main content

reddb_server/storage/query/engine/
iterator.rs

1//! Query Result Iterators
2//!
3//! Lazy evaluation of query results through binding streams.
4//!
5//! # Design
6//!
7//! - Pull-based iteration (demand-driven)
8//! - Composable iterator wrappers
9//! - Memory-efficient streaming
10//! - Early termination support
11
12use super::binding::{Binding, Value, Var};
13use super::cancel::CancelToken;
14use std::collections::HashSet;
15use std::fmt::Debug;
16
17/// Issue #808 / 750d — cooperative cancellation check shared by every
18/// pull-based operator. Returns [`IterError::Cancelled`] the moment the
19/// stream layer raises the token, so a long-running scan / filter / join
20/// loop stops within one row of the cancel signal rather than running to
21/// completion. A `None` token (the default) never cancels, so operators
22/// built without a token behave exactly as before.
23#[inline]
24fn check_shared_cancel(token: &Option<CancelToken>) -> Result<(), IterError> {
25    if let Some(token) = token {
26        if token.is_cancelled() {
27            return Err(IterError::Cancelled);
28        }
29    }
30    Ok(())
31}
32
33/// Result from iterator operations
34pub type IterResult = Result<Option<Binding>, IterError>;
35
/// Failure modes surfaced by binding iterators.
#[derive(Debug, Clone)]
pub enum IterError {
    /// Generic execution failure, with a human-readable message.
    Execution(String),
    /// The query exceeded its time budget.
    Timeout,
    /// The query was cancelled (locally or via a shared cancel token).
    Cancelled,
    /// A resource limit was hit; the message names the resource.
    ResourceExhausted(String),
}

impl std::fmt::Display for IterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Execution(detail) => write!(f, "Execution error: {}", detail),
            Self::Timeout => f.write_str("Query timeout"),
            Self::Cancelled => f.write_str("Query cancelled"),
            Self::ResourceExhausted(detail) => write!(f, "Resource exhausted: {}", detail),
        }
    }
}

impl std::error::Error for IterError {}
61
62/// Core trait for binding iterators
63pub trait BindingIterator: Debug + Send {
64    /// Get next binding
65    fn next_binding(&mut self) -> IterResult;
66
67    /// Check if there are more results (optional hint)
68    fn has_next(&self) -> bool {
69        true // Default: unknown, caller should try next()
70    }
71
72    /// Get variables produced by this iterator
73    fn vars(&self) -> Vec<Var>;
74
75    /// Cancel iteration
76    fn cancel(&mut self);
77
78    /// Reset iterator to beginning (if supported)
79    fn reset(&mut self) -> bool {
80        false // Default: not supported
81    }
82}
83
84/// Base query iterator wrapping a binding source
85#[derive(Debug)]
86pub struct QueryIterBase {
87    bindings: Vec<Binding>,
88    index: usize,
89    vars: Vec<Var>,
90    cancelled: bool,
91    /// Issue #808 / 750d — shared cancel token observed on every pull so a
92    /// scan over a large binding set stops promptly when the stream is
93    /// cancelled. `None` (the default) preserves the legacy behaviour.
94    cancel: Option<CancelToken>,
95}
96
97impl QueryIterBase {
98    /// Create from binding list
99    pub fn new(bindings: Vec<Binding>) -> Self {
100        let vars = if let Some(first) = bindings.first() {
101            first.all_vars().into_iter().cloned().collect()
102        } else {
103            Vec::new()
104        };
105
106        Self {
107            bindings,
108            index: 0,
109            vars,
110            cancelled: false,
111            cancel: None,
112        }
113    }
114
115    /// Create empty iterator
116    pub fn empty() -> Self {
117        Self {
118            bindings: Vec::new(),
119            index: 0,
120            vars: Vec::new(),
121            cancelled: false,
122            cancel: None,
123        }
124    }
125
126    /// Create single-result iterator
127    pub fn single(binding: Binding) -> Self {
128        let vars = binding.all_vars().into_iter().cloned().collect();
129        Self {
130            bindings: vec![binding],
131            index: 0,
132            vars,
133            cancelled: false,
134            cancel: None,
135        }
136    }
137
138    /// Issue #808 / 750d — attach a shared cancel token. The scan checks it
139    /// on every `next_binding`, so cancelling the token (from the stream
140    /// layer on disconnect or explicit cancel) halts production within one
141    /// row regardless of how many bindings remain.
142    pub fn with_cancel(mut self, token: CancelToken) -> Self {
143        self.cancel = Some(token);
144        self
145    }
146}
147
148impl BindingIterator for QueryIterBase {
149    fn next_binding(&mut self) -> IterResult {
150        if self.cancelled {
151            return Err(IterError::Cancelled);
152        }
153        check_shared_cancel(&self.cancel)?;
154
155        if self.index < self.bindings.len() {
156            let binding = self.bindings[self.index].clone();
157            self.index += 1;
158            Ok(Some(binding))
159        } else {
160            Ok(None)
161        }
162    }
163
164    fn has_next(&self) -> bool {
165        !self.cancelled && self.index < self.bindings.len()
166    }
167
168    fn vars(&self) -> Vec<Var> {
169        self.vars.clone()
170    }
171
172    fn cancel(&mut self) {
173        self.cancelled = true;
174    }
175
176    fn reset(&mut self) -> bool {
177        self.index = 0;
178        self.cancelled = false;
179        true
180    }
181}
182
183/// Filter iterator - applies predicate to upstream
184pub struct QueryIterFilter {
185    upstream: Box<dyn BindingIterator>,
186    predicate: Box<dyn Fn(&Binding) -> bool + Send + Sync>,
187    cancelled: bool,
188    /// Issue #808 / 750d — shared cancel token. A highly selective filter
189    /// can pull many upstream rows before yielding one; the token is
190    /// re-checked on every upstream pull so the rejection loop cannot
191    /// outrun the cancel signal.
192    cancel: Option<CancelToken>,
193}
194
195impl std::fmt::Debug for QueryIterFilter {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        f.debug_struct("QueryIterFilter")
198            .field("upstream", &self.upstream)
199            .field("predicate", &"<filter fn>")
200            .field("cancelled", &self.cancelled)
201            .finish()
202    }
203}
204
205impl QueryIterFilter {
206    /// Create filter iterator
207    pub fn new<F>(upstream: Box<dyn BindingIterator>, predicate: F) -> Self
208    where
209        F: Fn(&Binding) -> bool + Send + Sync + 'static,
210    {
211        Self {
212            upstream,
213            predicate: Box::new(predicate),
214            cancelled: false,
215            cancel: None,
216        }
217    }
218
219    /// Issue #808 / 750d — attach a shared cancel token (see [`QueryIterBase::with_cancel`]).
220    pub fn with_cancel(mut self, token: CancelToken) -> Self {
221        self.cancel = Some(token);
222        self
223    }
224}
225
226impl BindingIterator for QueryIterFilter {
227    fn next_binding(&mut self) -> IterResult {
228        if self.cancelled {
229            return Err(IterError::Cancelled);
230        }
231
232        loop {
233            check_shared_cancel(&self.cancel)?;
234            match self.upstream.next_binding()? {
235                Some(binding) => {
236                    if (self.predicate)(&binding) {
237                        return Ok(Some(binding));
238                    }
239                    // Continue to next
240                }
241                None => return Ok(None),
242            }
243        }
244    }
245
246    fn vars(&self) -> Vec<Var> {
247        self.upstream.vars()
248    }
249
250    fn cancel(&mut self) {
251        self.cancelled = true;
252        self.upstream.cancel();
253    }
254}
255
256/// Join iterator - nested loop join
257pub struct QueryIterJoin {
258    left: Box<dyn BindingIterator>,
259    right_factory: Box<dyn Fn() -> Box<dyn BindingIterator> + Send + Sync>,
260    current_left: Option<Binding>,
261    current_right: Option<Box<dyn BindingIterator>>,
262    vars: Vec<Var>,
263    cancelled: bool,
264    /// Issue #808 / 750d — shared cancel token. A nested-loop join is the
265    /// worst case for unbounded work (|left| × |right|); the token is
266    /// re-checked on every iteration of the merge loop so a cancel stops
267    /// the join without draining the cross product.
268    cancel: Option<CancelToken>,
269}
270
271impl std::fmt::Debug for QueryIterJoin {
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        f.debug_struct("QueryIterJoin")
274            .field("left", &self.left)
275            .field("right_factory", &"<factory fn>")
276            .field("current_left", &self.current_left)
277            .field("current_right", &self.current_right)
278            .field("vars", &self.vars)
279            .field("cancelled", &self.cancelled)
280            .finish()
281    }
282}
283
284impl QueryIterJoin {
285    /// Create join iterator
286    pub fn new<F>(left: Box<dyn BindingIterator>, right_factory: F, right_vars: Vec<Var>) -> Self
287    where
288        F: Fn() -> Box<dyn BindingIterator> + Send + Sync + 'static,
289    {
290        let mut vars = left.vars();
291        for v in right_vars {
292            if !vars.contains(&v) {
293                vars.push(v);
294            }
295        }
296
297        Self {
298            left,
299            right_factory: Box::new(right_factory),
300            current_left: None,
301            current_right: None,
302            vars,
303            cancelled: false,
304            cancel: None,
305        }
306    }
307
308    /// Issue #808 / 750d — attach a shared cancel token (see [`QueryIterBase::with_cancel`]).
309    pub fn with_cancel(mut self, token: CancelToken) -> Self {
310        self.cancel = Some(token);
311        self
312    }
313}
314
315impl BindingIterator for QueryIterJoin {
316    fn next_binding(&mut self) -> IterResult {
317        if self.cancelled {
318            return Err(IterError::Cancelled);
319        }
320
321        loop {
322            check_shared_cancel(&self.cancel)?;
323            // Try to get next from current right
324            if let Some(ref mut right) = self.current_right {
325                if let Some(right_binding) = right.next_binding()? {
326                    // Merge with current left
327                    if let Some(ref left_binding) = self.current_left {
328                        if let Some(merged) = left_binding.merge(&right_binding) {
329                            return Ok(Some(merged));
330                        }
331                        // Conflict, continue to next right
332                        continue;
333                    }
334                }
335            }
336
337            // Need new left binding
338            match self.left.next_binding()? {
339                Some(left_binding) => {
340                    self.current_left = Some(left_binding);
341                    self.current_right = Some((self.right_factory)());
342                }
343                None => return Ok(None),
344            }
345        }
346    }
347
348    fn vars(&self) -> Vec<Var> {
349        self.vars.clone()
350    }
351
352    fn cancel(&mut self) {
353        self.cancelled = true;
354        self.left.cancel();
355        if let Some(ref mut right) = self.current_right {
356            right.cancel();
357        }
358    }
359}
360
361/// Union iterator - concatenates multiple iterators
362#[derive(Debug)]
363pub struct QueryIterUnion {
364    iterators: Vec<Box<dyn BindingIterator>>,
365    current_index: usize,
366    vars: Vec<Var>,
367    cancelled: bool,
368}
369
370impl QueryIterUnion {
371    /// Create union iterator
372    pub fn new(iterators: Vec<Box<dyn BindingIterator>>) -> Self {
373        let mut vars = Vec::new();
374        for iter in &iterators {
375            for v in iter.vars() {
376                if !vars.contains(&v) {
377                    vars.push(v);
378                }
379            }
380        }
381
382        Self {
383            iterators,
384            current_index: 0,
385            vars,
386            cancelled: false,
387        }
388    }
389}
390
391impl BindingIterator for QueryIterUnion {
392    fn next_binding(&mut self) -> IterResult {
393        if self.cancelled {
394            return Err(IterError::Cancelled);
395        }
396
397        while self.current_index < self.iterators.len() {
398            match self.iterators[self.current_index].next_binding()? {
399                Some(binding) => return Ok(Some(binding)),
400                None => {
401                    self.current_index += 1;
402                }
403            }
404        }
405
406        Ok(None)
407    }
408
409    fn vars(&self) -> Vec<Var> {
410        self.vars.clone()
411    }
412
413    fn cancel(&mut self) {
414        self.cancelled = true;
415        for iter in &mut self.iterators {
416            iter.cancel();
417        }
418    }
419}
420
421/// Project iterator - selects subset of variables
422#[derive(Debug)]
423pub struct QueryIterProject {
424    upstream: Box<dyn BindingIterator>,
425    project_vars: Vec<Var>,
426    cancelled: bool,
427}
428
429impl QueryIterProject {
430    /// Create project iterator
431    pub fn new(upstream: Box<dyn BindingIterator>, vars: Vec<Var>) -> Self {
432        Self {
433            upstream,
434            project_vars: vars,
435            cancelled: false,
436        }
437    }
438}
439
440impl BindingIterator for QueryIterProject {
441    fn next_binding(&mut self) -> IterResult {
442        if self.cancelled {
443            return Err(IterError::Cancelled);
444        }
445
446        match self.upstream.next_binding()? {
447            Some(binding) => Ok(Some(binding.project(&self.project_vars))),
448            None => Ok(None),
449        }
450    }
451
452    fn vars(&self) -> Vec<Var> {
453        self.project_vars.clone()
454    }
455
456    fn cancel(&mut self) {
457        self.cancelled = true;
458        self.upstream.cancel();
459    }
460}
461
462/// Slice iterator - limit and offset
463#[derive(Debug)]
464pub struct QueryIterSlice {
465    upstream: Box<dyn BindingIterator>,
466    offset: u64,
467    limit: Option<u64>,
468    skipped: u64,
469    returned: u64,
470    cancelled: bool,
471}
472
473impl QueryIterSlice {
474    /// Create slice iterator
475    pub fn new(upstream: Box<dyn BindingIterator>, offset: u64, limit: Option<u64>) -> Self {
476        Self {
477            upstream,
478            offset,
479            limit,
480            skipped: 0,
481            returned: 0,
482            cancelled: false,
483        }
484    }
485
486    /// Create limit-only iterator
487    pub fn limit(upstream: Box<dyn BindingIterator>, limit: u64) -> Self {
488        Self::new(upstream, 0, Some(limit))
489    }
490
491    /// Create offset-only iterator
492    pub fn offset(upstream: Box<dyn BindingIterator>, offset: u64) -> Self {
493        Self::new(upstream, offset, None)
494    }
495}
496
497impl BindingIterator for QueryIterSlice {
498    fn next_binding(&mut self) -> IterResult {
499        if self.cancelled {
500            return Err(IterError::Cancelled);
501        }
502
503        // Check limit
504        if let Some(limit) = self.limit {
505            if self.returned >= limit {
506                return Ok(None);
507            }
508        }
509
510        // Skip offset
511        while self.skipped < self.offset {
512            match self.upstream.next_binding()? {
513                Some(_) => {
514                    self.skipped += 1;
515                }
516                None => return Ok(None),
517            }
518        }
519
520        // Return result
521        match self.upstream.next_binding()? {
522            Some(binding) => {
523                self.returned += 1;
524                Ok(Some(binding))
525            }
526            None => Ok(None),
527        }
528    }
529
530    fn vars(&self) -> Vec<Var> {
531        self.upstream.vars()
532    }
533
534    fn cancel(&mut self) {
535        self.cancelled = true;
536        self.upstream.cancel();
537    }
538}
539
540/// Sort iterator - orders results
541#[derive(Debug)]
542pub struct QueryIterSort {
543    upstream: Box<dyn BindingIterator>,
544    comparators: Vec<SortKey>,
545    sorted: Option<Vec<Binding>>,
546    index: usize,
547    cancelled: bool,
548}
549
550/// Sort key specification
551#[derive(Debug, Clone)]
552pub struct SortKey {
553    /// Variable to sort by
554    pub var: Var,
555    /// Ascending order
556    pub ascending: bool,
557}
558
559impl SortKey {
560    /// Create ascending sort key
561    pub fn asc(var: Var) -> Self {
562        Self {
563            var,
564            ascending: true,
565        }
566    }
567
568    /// Create descending sort key
569    pub fn desc(var: Var) -> Self {
570        Self {
571            var,
572            ascending: false,
573        }
574    }
575}
576
577impl QueryIterSort {
578    /// Create sort iterator
579    pub fn new(upstream: Box<dyn BindingIterator>, comparators: Vec<SortKey>) -> Self {
580        Self {
581            upstream,
582            comparators,
583            sorted: None,
584            index: 0,
585            cancelled: false,
586        }
587    }
588
589    /// Materialize and sort all results
590    fn materialize(&mut self) -> Result<(), IterError> {
591        if self.sorted.is_some() {
592            return Ok(());
593        }
594
595        let mut bindings = Vec::new();
596        while let Some(b) = self.upstream.next_binding()? {
597            bindings.push(b);
598        }
599
600        // Sort by comparators
601        let comparators = self.comparators.clone();
602        bindings.sort_by(|a, b| {
603            for key in &comparators {
604                let a_val = a.get(&key.var);
605                let b_val = b.get(&key.var);
606
607                let ordering = compare_values(a_val, b_val);
608                if ordering != std::cmp::Ordering::Equal {
609                    return if key.ascending {
610                        ordering
611                    } else {
612                        ordering.reverse()
613                    };
614                }
615            }
616            std::cmp::Ordering::Equal
617        });
618
619        self.sorted = Some(bindings);
620        Ok(())
621    }
622}
623
624impl BindingIterator for QueryIterSort {
625    fn next_binding(&mut self) -> IterResult {
626        if self.cancelled {
627            return Err(IterError::Cancelled);
628        }
629
630        self.materialize()?;
631
632        if let Some(ref sorted) = self.sorted {
633            if self.index < sorted.len() {
634                let binding = sorted[self.index].clone();
635                self.index += 1;
636                return Ok(Some(binding));
637            }
638        }
639
640        Ok(None)
641    }
642
643    fn vars(&self) -> Vec<Var> {
644        self.upstream.vars()
645    }
646
647    fn cancel(&mut self) {
648        self.cancelled = true;
649        self.upstream.cancel();
650    }
651}
652
653/// Compare two optional values
654fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
655    match (a, b) {
656        (None, None) => std::cmp::Ordering::Equal,
657        (None, Some(_)) => std::cmp::Ordering::Less,
658        (Some(_), None) => std::cmp::Ordering::Greater,
659        (Some(a_val), Some(b_val)) => compare_value(a_val, b_val),
660    }
661}
662
663/// Compare two values
664fn compare_value(a: &Value, b: &Value) -> std::cmp::Ordering {
665    match (a, b) {
666        (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
667        (Value::Float(a), Value::Float(b)) => a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal),
668        (Value::String(a), Value::String(b)) => a.cmp(b),
669        (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
670        (Value::Node(a), Value::Node(b)) => a.cmp(b),
671        (Value::Edge(a), Value::Edge(b)) => a.cmp(b),
672        (Value::Uri(a), Value::Uri(b)) => a.cmp(b),
673        (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
674        // Cross-type comparison: types differ, so we return a consistent ordering
675        _ => {
676            let type_order = |v: &Value| -> u8 {
677                match v {
678                    Value::Null => 0,
679                    Value::Boolean(_) => 1,
680                    Value::Integer(_) => 2,
681                    Value::Float(_) => 3,
682                    Value::String(_) => 4,
683                    Value::Node(_) => 5,
684                    Value::Edge(_) => 6,
685                    Value::Uri(_) => 7,
686                }
687            };
688            type_order(a).cmp(&type_order(b))
689        }
690    }
691}
692
693/// Distinct iterator - removes duplicates
694#[derive(Debug)]
695pub struct QueryIterDistinct {
696    upstream: Box<dyn BindingIterator>,
697    seen: HashSet<u64>,
698    cancelled: bool,
699}
700
701impl QueryIterDistinct {
702    /// Create distinct iterator
703    pub fn new(upstream: Box<dyn BindingIterator>) -> Self {
704        Self {
705            upstream,
706            seen: HashSet::new(),
707            cancelled: false,
708        }
709    }
710
711    /// Hash a binding for deduplication
712    fn hash_binding(binding: &Binding) -> u64 {
713        use std::hash::{Hash, Hasher};
714        let mut hasher = std::collections::hash_map::DefaultHasher::new();
715        binding.hash(&mut hasher);
716        hasher.finish()
717    }
718}
719
720impl BindingIterator for QueryIterDistinct {
721    fn next_binding(&mut self) -> IterResult {
722        if self.cancelled {
723            return Err(IterError::Cancelled);
724        }
725
726        loop {
727            match self.upstream.next_binding()? {
728                Some(binding) => {
729                    let hash = Self::hash_binding(&binding);
730                    if self.seen.insert(hash) {
731                        return Ok(Some(binding));
732                    }
733                    // Already seen, continue
734                }
735                None => return Ok(None),
736            }
737        }
738    }
739
740    fn vars(&self) -> Vec<Var> {
741        self.upstream.vars()
742    }
743
744    fn cancel(&mut self) {
745        self.cancelled = true;
746        self.upstream.cancel();
747    }
748}
749
750/// Wrapper for boxed iterator to implement Iterator trait
751pub struct QueryIter {
752    inner: Box<dyn BindingIterator>,
753}
754
755impl QueryIter {
756    /// Create from binding iterator
757    pub fn new(inner: Box<dyn BindingIterator>) -> Self {
758        Self { inner }
759    }
760
761    /// Get variables
762    pub fn vars(&self) -> Vec<Var> {
763        self.inner.vars()
764    }
765
766    /// Cancel iteration
767    pub fn cancel(&mut self) {
768        self.inner.cancel();
769    }
770}
771
772impl Iterator for QueryIter {
773    type Item = Result<Binding, IterError>;
774
775    fn next(&mut self) -> Option<Self::Item> {
776        match self.inner.next_binding() {
777            Ok(Some(binding)) => Some(Ok(binding)),
778            Ok(None) => None,
779            Err(e) => Some(Err(e)),
780        }
781    }
782}
783
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::engine::binding::BindingBuilder;
    use std::time::{Duration, Instant};

    /// Builds a one-variable binding `?x = value`.
    fn make_binding(value: i64) -> Binding {
        BindingBuilder::new()
            .add_named("x", Value::Integer(value))
            .build()
    }

    /// Pulls every remaining binding out of `iter`, panicking on any error.
    fn drain(iter: &mut dyn BindingIterator) -> Vec<Binding> {
        std::iter::from_fn(|| iter.next_binding().unwrap()).collect()
    }

    /// Raises `token` from a background thread once `delay` has elapsed.
    fn cancel_after(token: &CancelToken, delay: Duration) {
        let canceller = token.clone();
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            canceller.cancel();
        });
    }

    #[test]
    fn test_base_iterator() {
        let mut iter = QueryIterBase::new((1..=3).map(make_binding).collect());

        assert!(iter.has_next());
        for _ in 0..3 {
            assert!(iter.next_binding().unwrap().is_some());
        }
        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_filter_iterator() {
        let base = Box::new(QueryIterBase::new(vec![
            make_binding(1),
            make_binding(2),
            make_binding(3),
        ]));

        let mut iter = QueryIterFilter::new(base, |b| {
            matches!(
                b.get(&Var::new("x")).and_then(|v| v.as_integer()),
                Some(i) if i > 1
            )
        });

        // 1 is rejected; 2 and 3 come through in order.
        for expected in [2, 3] {
            let row = iter.next_binding().unwrap().unwrap();
            assert_eq!(row.get(&Var::new("x")), Some(&Value::Integer(expected)));
        }
        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_slice_iterator() {
        let base = Box::new(QueryIterBase::new((1..=10).map(make_binding).collect()));

        // OFFSET 2 LIMIT 3 over 1..=10 selects 3, 4, 5.
        let mut iter = QueryIterSlice::new(base, 2, Some(3));

        for expected in 3..=5 {
            let row = iter.next_binding().unwrap().unwrap();
            assert_eq!(row.get(&Var::new("x")), Some(&Value::Integer(expected)));
        }
        assert!(iter.next_binding().unwrap().is_none());
    }

    #[test]
    fn test_project_iterator() {
        let row = BindingBuilder::new()
            .add_named("x", Value::Integer(1))
            .add_named("y", Value::Integer(2))
            .add_named("z", Value::Integer(3))
            .build();

        let mut iter = QueryIterProject::new(
            Box::new(QueryIterBase::single(row)),
            vec![Var::new("x"), Var::new("z")],
        );

        let projected = iter.next_binding().unwrap().unwrap();
        assert!(projected.contains(&Var::new("x")));
        assert!(!projected.contains(&Var::new("y")));
        assert!(projected.contains(&Var::new("z")));
    }

    #[test]
    fn test_union_iterator() {
        let first = Box::new(QueryIterBase::new(vec![make_binding(1), make_binding(2)]));
        let second = Box::new(QueryIterBase::new(vec![make_binding(3), make_binding(4)]));

        let mut union = QueryIterUnion::new(vec![first, second]);

        let xs: Vec<_> = drain(&mut union)
            .iter()
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(xs, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_sort_iterator() {
        let base = Box::new(QueryIterBase::new(vec![
            make_binding(3),
            make_binding(1),
            make_binding(2),
        ]));

        let mut iter = QueryIterSort::new(base, vec![SortKey::asc(Var::new("x"))]);

        let xs: Vec<_> = drain(&mut iter)
            .iter()
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(xs, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_iterator() {
        let base = Box::new(QueryIterBase::new(
            [1, 2, 1, 3, 2].iter().copied().map(make_binding).collect(),
        ));

        let mut iter = QueryIterDistinct::new(base);

        let xs: Vec<_> = drain(&mut iter)
            .iter()
            .map(|b| b.get(&Var::new("x")).unwrap().as_integer().unwrap())
            .collect();

        assert_eq!(xs, vec![1, 2, 3]);
    }

    #[test]
    fn test_cancel_iterator() {
        let mut iter = QueryIterBase::new((1..=100).map(make_binding).collect());

        // Consume a couple of rows, then cancel mid-stream.
        for _ in 0..2 {
            iter.next_binding().unwrap();
        }
        iter.cancel();

        assert!(matches!(iter.next_binding(), Err(IterError::Cancelled)));
    }

    #[test]
    fn test_query_iter_wrapper() {
        let base = Box::new(QueryIterBase::new(vec![make_binding(1), make_binding(2)]));

        let results: Vec<_> = QueryIter::new(base).collect();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(Result::is_ok));
    }

    // ---- Issue #808 / 750d — cooperative shared-token cancellation. ----

    /// Upstream that never runs dry, so a downstream operator's loop can
    /// only end by observing the shared cancel token. This pins the latency
    /// of the cancel signal without building a large finite input.
    #[derive(Debug)]
    struct InfiniteScan {
        n: i64,
    }

    impl BindingIterator for InfiniteScan {
        fn next_binding(&mut self) -> IterResult {
            self.n += 1;
            Ok(Some(make_binding(self.n)))
        }

        fn vars(&self) -> Vec<Var> {
            vec![Var::new("x")]
        }

        fn cancel(&mut self) {}
    }

    #[test]
    fn shared_token_cancels_scan() {
        let token = CancelToken::new();
        let mut scan = QueryIterBase::new((1..=1000).map(make_binding).collect())
            .with_cancel(token.clone());

        assert!(scan.next_binding().unwrap().is_some());
        token.cancel();
        // Hundreds of rows remain buffered, yet the shared token
        // short-circuits the very next pull.
        assert!(matches!(scan.next_binding(), Err(IterError::Cancelled)));
    }

    #[test]
    fn shared_token_cancels_filter_within_budget() {
        let token = CancelToken::new();
        // Always-false predicate over an infinite scan: the filter would
        // spin forever; only the shared token can stop it.
        let mut filter = QueryIterFilter::new(Box::new(InfiniteScan { n: 0 }), |_| false)
            .with_cancel(token.clone());

        cancel_after(&token, Duration::from_millis(20));

        let start = Instant::now();
        let result = filter.next_binding();
        let elapsed = start.elapsed();

        assert!(matches!(result, Err(IterError::Cancelled)));
        // Soft latency budget: well under a second even though the filter
        // would otherwise never return.
        assert!(
            elapsed < Duration::from_secs(1),
            "cancel took too long to take effect: {elapsed:?}"
        );
    }

    #[test]
    fn shared_token_cancels_join_within_budget() {
        let token = CancelToken::new();
        // The left side never yields a row (always-false filter over an
        // infinite scan), so the join's merge loop spins until the shared
        // token stops it.
        let left = Box::new(
            QueryIterFilter::new(Box::new(InfiniteScan { n: 0 }), |_| false)
                .with_cancel(token.clone()),
        );
        let mut join = QueryIterJoin::new(
            left,
            || Box::new(QueryIterBase::empty()),
            vec![Var::new("y")],
        )
        .with_cancel(token.clone());

        cancel_after(&token, Duration::from_millis(20));

        let start = Instant::now();
        let result = join.next_binding();
        let elapsed = start.elapsed();

        assert!(matches!(result, Err(IterError::Cancelled)));
        assert!(
            elapsed < Duration::from_secs(1),
            "join cancel took too long: {elapsed:?}"
        );
    }

    #[test]
    fn cancellation_propagates_through_aggregate_consumer() {
        // An aggregate such as COUNT drains its upstream to completion. With
        // a cancel-aware upstream, the aggregate's own pull surfaces
        // `Cancelled` instead of looping forever — the signal propagates
        // through the aggregate without it needing a token of its own.
        let token = CancelToken::new();
        let upstream: Box<dyn BindingIterator> = Box::new(
            QueryIterFilter::new(Box::new(InfiniteScan { n: 0 }), |_| false)
                .with_cancel(token.clone()),
        );

        /// Stand-in for an aggregate's drain loop.
        fn count_all(mut it: Box<dyn BindingIterator>) -> Result<u64, IterError> {
            let mut n = 0;
            while it.next_binding()?.is_some() {
                n += 1;
            }
            Ok(n)
        }

        token.cancel();
        assert!(matches!(count_all(upstream), Err(IterError::Cancelled)));
    }
}