alopex_sql/executor/query/
iterator.rs

1//! Iterator-based query execution pipeline.
2//!
3//! This module provides an iterator-based execution model for SQL queries,
4//! enabling streaming execution and reduced memory usage for large datasets.
5//!
6//! # Architecture
7//!
8//! The execution pipeline is built from composable iterators:
9//! - [`ScanIterator`]: Reads rows from storage
10//! - [`FilterIterator`]: Filters rows based on predicates
11//! - [`SortIterator`]: Sorts rows (requires materialization)
12//! - [`LimitIterator`]: Applies LIMIT/OFFSET constraints
13//!
14//! Each iterator implements the [`RowIterator`] trait, allowing them to be
15//! composed into a pipeline that processes rows one at a time.
16
17use std::cmp::Ordering;
18use std::marker::PhantomData;
19
20use crate::catalog::{ColumnMetadata, TableMetadata};
21use crate::executor::evaluator::EvalContext;
22use crate::executor::{ExecutorError, Result, Row};
23use crate::planner::typed_expr::{SortExpr, TypedExpr};
24use crate::storage::{SqlValue, TableScanIterator};
25
/// A trait for row-producing iterators in the query execution pipeline.
///
/// This trait abstracts over different types of iterators (scan, filter, sort, etc.)
/// allowing them to be composed into execution pipelines.
///
/// In addition to producing rows, every implementor reports the column
/// metadata of those rows through [`RowIterator::schema`].
pub trait RowIterator {
    /// Advances the iterator and returns the next row, or `None` if exhausted.
    ///
    /// The return value takes one of three shapes:
    /// - `Some(Ok(row))`: the next row was produced;
    /// - `Some(Err(e))`: producing the next row failed;
    /// - `None`: there are no more rows.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying operation fails (e.g., storage errors,
    /// evaluation errors).
    fn next_row(&mut self) -> Option<Result<Row>>;

    /// Returns the schema of rows produced by this iterator.
    ///
    /// The slice borrows from the iterator, so callers that need the schema
    /// after consuming the iterator must copy it first.
    fn schema(&self) -> &[ColumnMetadata];
}
42
43// Implement RowIterator for Box<dyn RowIterator> to allow dynamic dispatch.
44impl RowIterator for Box<dyn RowIterator + '_> {
45    fn next_row(&mut self) -> Option<Result<Row>> {
46        (**self).next_row()
47    }
48
49    fn schema(&self) -> &[ColumnMetadata] {
50        (**self).schema()
51    }
52}
53
54// ============================================================================
55// ScanIterator - Reads rows from storage for true streaming execution
56// ============================================================================
57
58/// Iterator that reads rows from table storage.
59///
60/// This is the leaf node in the iterator tree, providing rows from the
61/// underlying storage layer. Used for FR-7 streaming output compliance.
62pub struct ScanIterator<'a> {
63    inner: TableScanIterator<'a>,
64    schema: Vec<ColumnMetadata>,
65}
66
67impl<'a> ScanIterator<'a> {
68    /// Creates a new scan iterator from a table scan iterator and metadata.
69    pub fn new(inner: TableScanIterator<'a>, table_meta: &TableMetadata) -> Self {
70        Self {
71            inner,
72            schema: table_meta.columns.clone(),
73        }
74    }
75}
76
77impl RowIterator for ScanIterator<'_> {
78    fn next_row(&mut self) -> Option<Result<Row>> {
79        self.inner.next().map(|result| {
80            result
81                .map(|(row_id, values)| Row::new(row_id, values))
82                .map_err(ExecutorError::from)
83        })
84    }
85
86    fn schema(&self) -> &[ColumnMetadata] {
87        &self.schema
88    }
89}
90
91// ============================================================================
92// FilterIterator - Filters rows based on a predicate
93// ============================================================================
94
95/// Iterator that filters rows based on a predicate expression.
96///
97/// Only rows where the predicate evaluates to `true` are yielded.
98/// Rows where the predicate evaluates to `false` or `NULL` are skipped.
99pub struct FilterIterator<I: RowIterator> {
100    input: I,
101    predicate: TypedExpr,
102}
103
104impl<I: RowIterator> FilterIterator<I> {
105    /// Creates a new filter iterator with the given input and predicate.
106    pub fn new(input: I, predicate: TypedExpr) -> Self {
107        Self { input, predicate }
108    }
109}
110
111impl<I: RowIterator> RowIterator for FilterIterator<I> {
112    fn next_row(&mut self) -> Option<Result<Row>> {
113        loop {
114            match self.input.next_row()? {
115                Ok(row) => {
116                    let ctx = EvalContext::new(&row.values);
117                    match crate::executor::evaluator::evaluate(&self.predicate, &ctx) {
118                        Ok(SqlValue::Boolean(true)) => return Some(Ok(row)),
119                        Ok(_) => continue, // false or null - skip this row
120                        Err(e) => return Some(Err(e)),
121                    }
122                }
123                Err(e) => return Some(Err(e)),
124            }
125        }
126    }
127
128    fn schema(&self) -> &[ColumnMetadata] {
129        self.input.schema()
130    }
131}
132
133// ============================================================================
134// SortIterator - Sorts rows (materializes all input)
135// ============================================================================
136
137/// Iterator that sorts rows according to ORDER BY expressions.
138///
139/// **Note**: Sorting requires materializing all input rows into memory.
140/// This iterator collects all rows from its input, sorts them, and then
141/// yields them one at a time.
142pub struct SortIterator<I: RowIterator> {
143    /// Sorted rows ready for iteration.
144    sorted_rows: std::vec::IntoIter<Row>,
145    /// Schema from input.
146    schema: Vec<ColumnMetadata>,
147    /// Marker for input iterator type.
148    _marker: PhantomData<I>,
149}
150
151impl<I: RowIterator> SortIterator<I> {
152    /// Creates a new sort iterator.
153    ///
154    /// This constructor immediately materializes all input rows and sorts them.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if reading from input fails or if sort key evaluation fails.
159    pub fn new(mut input: I, order_by: &[SortExpr]) -> Result<Self> {
160        let schema = input.schema().to_vec();
161
162        // Collect all rows from input
163        let mut rows = Vec::new();
164        while let Some(result) = input.next_row() {
165            rows.push(result?);
166        }
167
168        if order_by.is_empty() {
169            return Ok(Self {
170                sorted_rows: rows.into_iter(),
171                schema,
172                _marker: PhantomData,
173            });
174        }
175
176        // Precompute sort keys to avoid repeated evaluation during comparisons
177        let mut keyed: Vec<(Row, Vec<SqlValue>)> = Vec::with_capacity(rows.len());
178        for row in rows {
179            let mut keys = Vec::with_capacity(order_by.len());
180            for expr in order_by {
181                let ctx = EvalContext::new(&row.values);
182                keys.push(crate::executor::evaluator::evaluate(&expr.expr, &ctx)?);
183            }
184            keyed.push((row, keys));
185        }
186
187        // Sort by keys
188        keyed.sort_by(|a, b| compare_keys(a, b, order_by));
189
190        let sorted: Vec<Row> = keyed.into_iter().map(|(row, _)| row).collect();
191
192        Ok(Self {
193            sorted_rows: sorted.into_iter(),
194            schema,
195            _marker: PhantomData,
196        })
197    }
198}
199
200impl<I: RowIterator> RowIterator for SortIterator<I> {
201    fn next_row(&mut self) -> Option<Result<Row>> {
202        self.sorted_rows.next().map(Ok)
203    }
204
205    fn schema(&self) -> &[ColumnMetadata] {
206        &self.schema
207    }
208}
209
210/// Compare two rows by their precomputed sort keys.
211fn compare_keys(
212    a: &(Row, Vec<SqlValue>),
213    b: &(Row, Vec<SqlValue>),
214    order_by: &[SortExpr],
215) -> Ordering {
216    for (i, sort_expr) in order_by.iter().enumerate() {
217        let left = &a.1[i];
218        let right = &b.1[i];
219        let cmp = compare_single(left, right, sort_expr.asc, sort_expr.nulls_first);
220        if cmp != Ordering::Equal {
221            return cmp;
222        }
223    }
224    Ordering::Equal
225}
226
227/// Compare two SqlValues according to sort direction and NULL ordering.
228fn compare_single(left: &SqlValue, right: &SqlValue, asc: bool, nulls_first: bool) -> Ordering {
229    match (left, right) {
230        (SqlValue::Null, SqlValue::Null) => Ordering::Equal,
231        (SqlValue::Null, _) => {
232            if nulls_first {
233                Ordering::Less
234            } else {
235                Ordering::Greater
236            }
237        }
238        (_, SqlValue::Null) => {
239            if nulls_first {
240                Ordering::Greater
241            } else {
242                Ordering::Less
243            }
244        }
245        _ => match left.partial_cmp(right).unwrap_or(Ordering::Equal) {
246            Ordering::Equal => Ordering::Equal,
247            ord if asc => ord,
248            ord => ord.reverse(),
249        },
250    }
251}
252
253// ============================================================================
254// LimitIterator - Applies LIMIT and OFFSET
255// ============================================================================
256
257/// Iterator that applies LIMIT and OFFSET constraints.
258///
259/// This iterator skips the first `offset` rows and yields at most `limit` rows.
260/// It provides early termination - once the limit is reached, no more rows
261/// are requested from the input.
262pub struct LimitIterator<I: RowIterator> {
263    input: I,
264    limit: Option<u64>,
265    offset: u64,
266    /// Number of rows skipped so far (for OFFSET).
267    skipped: u64,
268    /// Number of rows yielded so far (for LIMIT).
269    yielded: u64,
270}
271
272impl<I: RowIterator> LimitIterator<I> {
273    /// Creates a new limit iterator with the given LIMIT and OFFSET.
274    pub fn new(input: I, limit: Option<u64>, offset: Option<u64>) -> Self {
275        Self {
276            input,
277            limit,
278            offset: offset.unwrap_or(0),
279            skipped: 0,
280            yielded: 0,
281        }
282    }
283}
284
285impl<I: RowIterator> RowIterator for LimitIterator<I> {
286    fn next_row(&mut self) -> Option<Result<Row>> {
287        // Check if limit already reached
288        if let Some(limit) = self.limit
289            && self.yielded >= limit
290        {
291            return None;
292        }
293
294        loop {
295            match self.input.next_row()? {
296                Ok(row) => {
297                    // Skip rows for OFFSET
298                    if self.skipped < self.offset {
299                        self.skipped += 1;
300                        continue;
301                    }
302
303                    // Check limit again after skipping
304                    if let Some(limit) = self.limit
305                        && self.yielded >= limit
306                    {
307                        return None;
308                    }
309
310                    self.yielded += 1;
311                    return Some(Ok(row));
312                }
313                Err(e) => return Some(Err(e)),
314            }
315        }
316    }
317
318    fn schema(&self) -> &[ColumnMetadata] {
319        self.input.schema()
320    }
321}
322
323// ============================================================================
324// VecIterator - Wraps a Vec<Row> for testing and compatibility
325// ============================================================================
326
327/// Iterator that wraps a `Vec<Row>` for testing and compatibility.
328///
329/// This is useful for converting materialized results back into an iterator
330/// or for testing iterator-based code with fixed data.
331pub struct VecIterator {
332    rows: std::vec::IntoIter<Row>,
333    schema: Vec<ColumnMetadata>,
334}
335
336impl VecIterator {
337    /// Creates a new vec iterator from rows and schema.
338    pub fn new(rows: Vec<Row>, schema: Vec<ColumnMetadata>) -> Self {
339        Self {
340            rows: rows.into_iter(),
341            schema,
342        }
343    }
344}
345
346impl RowIterator for VecIterator {
347    fn next_row(&mut self) -> Option<Result<Row>> {
348        self.rows.next().map(Ok)
349    }
350
351    fn schema(&self) -> &[ColumnMetadata] {
352        &self.schema
353    }
354}
355
356// ============================================================================
357// Tests
358// ============================================================================
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Span;
    use crate::ast::expr::{BinaryOp, Literal};
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;

    /// Two-column schema (`id`, `name`) shared by all tests.
    fn sample_schema() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ]
    }

    /// Five rows whose row id equals the value of their `id` column.
    fn sample_rows() -> Vec<Row> {
        vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(2, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
            Row::new(
                3,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(4, vec![SqlValue::Integer(4), SqlValue::Text("dave".into())]),
            Row::new(5, vec![SqlValue::Integer(5), SqlValue::Text("eve".into())]),
        ]
    }

    /// Iterator over [`sample_rows`] with [`sample_schema`].
    fn sample_input() -> VecIterator {
        VecIterator::new(sample_rows(), sample_schema())
    }

    /// Typed reference to the `id` column (index 0) of the sample schema.
    fn id_column() -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::ColumnRef {
                table: "test".into(),
                column: "id".into(),
                column_index: 0,
            },
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        }
    }

    /// Builds the predicate `id > <threshold>`.
    fn id_greater_than(threshold: &'static str) -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(id_column()),
                op: BinaryOp::Gt,
                right: Box::new(TypedExpr::literal(
                    Literal::Number(threshold.into()),
                    ResolvedType::Integer,
                    Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: Span::default(),
        }
    }

    /// Builds `ORDER BY id` in the requested direction, NULLs last.
    fn order_by_id(asc: bool) -> Vec<SortExpr> {
        vec![SortExpr {
            expr: id_column(),
            asc,
            nulls_first: false,
        }]
    }

    /// Drains `iter` into a vector.
    ///
    /// Panics if the iterator yields an error, so a failing pipeline cannot
    /// pass for a merely shorter result.
    fn collect_rows(mut iter: impl RowIterator) -> Vec<Row> {
        let mut rows = Vec::new();
        while let Some(item) = iter.next_row() {
            rows.push(item.expect("iterator yielded an error"));
        }
        rows
    }

    #[test]
    fn vec_iterator_returns_all_rows() {
        let rows = sample_rows();
        let expected_len = rows.len();
        let iter = VecIterator::new(rows, sample_schema());

        assert_eq!(collect_rows(iter).len(), expected_len);
    }

    #[test]
    fn filter_iterator_filters_rows() {
        // Filter: id > 2
        let filter = FilterIterator::new(sample_input(), id_greater_than("2"));

        let results = collect_rows(filter);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
        assert_eq!(results[2].row_id, 5);
    }

    #[test]
    fn limit_iterator_limits_rows() {
        let limit = LimitIterator::new(sample_input(), Some(2), None);

        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 1);
        assert_eq!(results[1].row_id, 2);
    }

    #[test]
    fn limit_iterator_applies_offset() {
        let limit = LimitIterator::new(sample_input(), Some(2), Some(2));

        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
    }

    #[test]
    fn limit_iterator_offset_only() {
        let limit = LimitIterator::new(sample_input(), None, Some(3));

        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 4);
        assert_eq!(results[1].row_id, 5);
    }

    #[test]
    fn limit_iterator_zero_limit_yields_nothing() {
        let mut limit = LimitIterator::new(sample_input(), Some(0), Some(1));

        assert!(limit.next_row().is_none());
    }

    #[test]
    fn sort_iterator_sorts_rows() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        // Sort by id ASC
        let sort = SortIterator::new(input, &order_by_id(true)).unwrap();

        let results = collect_rows(sort);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(1));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(3));
    }

    #[test]
    fn sort_iterator_sorts_descending() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        // Sort by id DESC
        let sort = SortIterator::new(input, &order_by_id(false)).unwrap();

        let results = collect_rows(sort);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(3));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(1));
    }

    #[test]
    fn sort_iterator_without_order_by_keeps_input_order() {
        let sort = SortIterator::new(sample_input(), &[]).unwrap();

        let results = collect_rows(sort);

        assert_eq!(results.len(), 5);
        assert_eq!(results[0].row_id, 1);
        assert_eq!(results[4].row_id, 5);
    }

    #[test]
    fn composed_pipeline_filter_then_limit() {
        // Filter: id > 1
        let filtered = FilterIterator::new(sample_input(), id_greater_than("1"));
        let limited = LimitIterator::new(filtered, Some(2), None);

        let results = collect_rows(limited);

        // Should get rows 2, 3 (id > 1, then limit 2)
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 2);
        assert_eq!(results[1].row_id, 3);
    }
}