alopex_sql/executor/query/
iterator.rs

//! Iterator-based query execution pipeline.
//!
//! This module provides an iterator-based execution model for SQL queries,
//! enabling streaming execution and reduced memory usage for large datasets.
//!
//! # Architecture
//!
//! The execution pipeline is built from composable iterators:
//! - [`ScanIterator`]: Reads rows from storage (reserved; not yet used by the executor)
//! - [`FilterIterator`]: Filters rows based on predicates
//! - [`SortIterator`]: Sorts rows (requires materialization)
//! - [`LimitIterator`]: Applies LIMIT/OFFSET constraints
//! - [`VecIterator`]: Replays already-materialized rows (currently used as the pipeline source)
//!
//! Each iterator implements the [`RowIterator`] trait, allowing them to be
//! composed into a pipeline that processes rows one at a time.
16
17use std::cmp::Ordering;
18use std::marker::PhantomData;
19
20use crate::catalog::{ColumnMetadata, TableMetadata};
21use crate::executor::evaluator::EvalContext;
22use crate::executor::{ExecutorError, Result, Row};
23use crate::planner::typed_expr::{SortExpr, TypedExpr};
24use crate::storage::{SqlValue, TableScanIterator};
25
26/// A trait for row-producing iterators in the query execution pipeline.
27///
28/// This trait abstracts over different types of iterators (scan, filter, sort, etc.)
29/// allowing them to be composed into execution pipelines.
30pub trait RowIterator {
31    /// Advances the iterator and returns the next row, or `None` if exhausted.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if the underlying operation fails (e.g., storage errors,
36    /// evaluation errors).
37    fn next_row(&mut self) -> Option<Result<Row>>;
38
39    /// Returns the schema of rows produced by this iterator.
40    fn schema(&self) -> &[ColumnMetadata];
41}
42
43// Implement RowIterator for Box<dyn RowIterator> to allow dynamic dispatch.
44impl RowIterator for Box<dyn RowIterator + '_> {
45    fn next_row(&mut self) -> Option<Result<Row>> {
46        (**self).next_row()
47    }
48
49    fn schema(&self) -> &[ColumnMetadata] {
50        (**self).schema()
51    }
52}
53
54// ============================================================================
55// ScanIterator - Reads rows from storage (reserved for future streaming)
56// ============================================================================
57
58/// Iterator that reads rows from table storage.
59///
60/// This is the leaf node in the iterator tree, providing rows from the
61/// underlying storage layer.
62///
63/// **Note**: Currently unused due to lifetime constraints with SqlTransaction.
64/// Reserved for future true streaming execution when those constraints are resolved.
65/// The current implementation uses VecIterator after collecting all rows from scan.
66#[allow(dead_code)]
67pub struct ScanIterator<'a> {
68    inner: TableScanIterator<'a>,
69    schema: Vec<ColumnMetadata>,
70}
71
72#[allow(dead_code)]
73impl<'a> ScanIterator<'a> {
74    /// Creates a new scan iterator from a table scan iterator and metadata.
75    pub fn new(inner: TableScanIterator<'a>, table_meta: &TableMetadata) -> Self {
76        Self {
77            inner,
78            schema: table_meta.columns.clone(),
79        }
80    }
81}
82
83impl RowIterator for ScanIterator<'_> {
84    fn next_row(&mut self) -> Option<Result<Row>> {
85        self.inner.next().map(|result| {
86            result
87                .map(|(row_id, values)| Row::new(row_id, values))
88                .map_err(ExecutorError::from)
89        })
90    }
91
92    fn schema(&self) -> &[ColumnMetadata] {
93        &self.schema
94    }
95}
96
97// ============================================================================
98// FilterIterator - Filters rows based on a predicate
99// ============================================================================
100
101/// Iterator that filters rows based on a predicate expression.
102///
103/// Only rows where the predicate evaluates to `true` are yielded.
104/// Rows where the predicate evaluates to `false` or `NULL` are skipped.
105pub struct FilterIterator<I: RowIterator> {
106    input: I,
107    predicate: TypedExpr,
108}
109
110impl<I: RowIterator> FilterIterator<I> {
111    /// Creates a new filter iterator with the given input and predicate.
112    pub fn new(input: I, predicate: TypedExpr) -> Self {
113        Self { input, predicate }
114    }
115}
116
117impl<I: RowIterator> RowIterator for FilterIterator<I> {
118    fn next_row(&mut self) -> Option<Result<Row>> {
119        loop {
120            match self.input.next_row()? {
121                Ok(row) => {
122                    let ctx = EvalContext::new(&row.values);
123                    match crate::executor::evaluator::evaluate(&self.predicate, &ctx) {
124                        Ok(SqlValue::Boolean(true)) => return Some(Ok(row)),
125                        Ok(_) => continue, // false or null - skip this row
126                        Err(e) => return Some(Err(e)),
127                    }
128                }
129                Err(e) => return Some(Err(e)),
130            }
131        }
132    }
133
134    fn schema(&self) -> &[ColumnMetadata] {
135        self.input.schema()
136    }
137}
138
139// ============================================================================
140// SortIterator - Sorts rows (materializes all input)
141// ============================================================================
142
143/// Iterator that sorts rows according to ORDER BY expressions.
144///
145/// **Note**: Sorting requires materializing all input rows into memory.
146/// This iterator collects all rows from its input, sorts them, and then
147/// yields them one at a time.
148pub struct SortIterator<I: RowIterator> {
149    /// Sorted rows ready for iteration.
150    sorted_rows: std::vec::IntoIter<Row>,
151    /// Schema from input.
152    schema: Vec<ColumnMetadata>,
153    /// Marker for input iterator type.
154    _marker: PhantomData<I>,
155}
156
157impl<I: RowIterator> SortIterator<I> {
158    /// Creates a new sort iterator.
159    ///
160    /// This constructor immediately materializes all input rows and sorts them.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if reading from input fails or if sort key evaluation fails.
165    pub fn new(mut input: I, order_by: &[SortExpr]) -> Result<Self> {
166        let schema = input.schema().to_vec();
167
168        // Collect all rows from input
169        let mut rows = Vec::new();
170        while let Some(result) = input.next_row() {
171            rows.push(result?);
172        }
173
174        if order_by.is_empty() {
175            return Ok(Self {
176                sorted_rows: rows.into_iter(),
177                schema,
178                _marker: PhantomData,
179            });
180        }
181
182        // Precompute sort keys to avoid repeated evaluation during comparisons
183        let mut keyed: Vec<(Row, Vec<SqlValue>)> = Vec::with_capacity(rows.len());
184        for row in rows {
185            let mut keys = Vec::with_capacity(order_by.len());
186            for expr in order_by {
187                let ctx = EvalContext::new(&row.values);
188                keys.push(crate::executor::evaluator::evaluate(&expr.expr, &ctx)?);
189            }
190            keyed.push((row, keys));
191        }
192
193        // Sort by keys
194        keyed.sort_by(|a, b| compare_keys(a, b, order_by));
195
196        let sorted: Vec<Row> = keyed.into_iter().map(|(row, _)| row).collect();
197
198        Ok(Self {
199            sorted_rows: sorted.into_iter(),
200            schema,
201            _marker: PhantomData,
202        })
203    }
204}
205
206impl<I: RowIterator> RowIterator for SortIterator<I> {
207    fn next_row(&mut self) -> Option<Result<Row>> {
208        self.sorted_rows.next().map(Ok)
209    }
210
211    fn schema(&self) -> &[ColumnMetadata] {
212        &self.schema
213    }
214}
215
216/// Compare two rows by their precomputed sort keys.
217fn compare_keys(
218    a: &(Row, Vec<SqlValue>),
219    b: &(Row, Vec<SqlValue>),
220    order_by: &[SortExpr],
221) -> Ordering {
222    for (i, sort_expr) in order_by.iter().enumerate() {
223        let left = &a.1[i];
224        let right = &b.1[i];
225        let cmp = compare_single(left, right, sort_expr.asc, sort_expr.nulls_first);
226        if cmp != Ordering::Equal {
227            return cmp;
228        }
229    }
230    Ordering::Equal
231}
232
233/// Compare two SqlValues according to sort direction and NULL ordering.
234fn compare_single(left: &SqlValue, right: &SqlValue, asc: bool, nulls_first: bool) -> Ordering {
235    match (left, right) {
236        (SqlValue::Null, SqlValue::Null) => Ordering::Equal,
237        (SqlValue::Null, _) => {
238            if nulls_first {
239                Ordering::Less
240            } else {
241                Ordering::Greater
242            }
243        }
244        (_, SqlValue::Null) => {
245            if nulls_first {
246                Ordering::Greater
247            } else {
248                Ordering::Less
249            }
250        }
251        _ => match left.partial_cmp(right).unwrap_or(Ordering::Equal) {
252            Ordering::Equal => Ordering::Equal,
253            ord if asc => ord,
254            ord => ord.reverse(),
255        },
256    }
257}
258
259// ============================================================================
260// LimitIterator - Applies LIMIT and OFFSET
261// ============================================================================
262
263/// Iterator that applies LIMIT and OFFSET constraints.
264///
265/// This iterator skips the first `offset` rows and yields at most `limit` rows.
266/// It provides early termination - once the limit is reached, no more rows
267/// are requested from the input.
268pub struct LimitIterator<I: RowIterator> {
269    input: I,
270    limit: Option<u64>,
271    offset: u64,
272    /// Number of rows skipped so far (for OFFSET).
273    skipped: u64,
274    /// Number of rows yielded so far (for LIMIT).
275    yielded: u64,
276}
277
278impl<I: RowIterator> LimitIterator<I> {
279    /// Creates a new limit iterator with the given LIMIT and OFFSET.
280    pub fn new(input: I, limit: Option<u64>, offset: Option<u64>) -> Self {
281        Self {
282            input,
283            limit,
284            offset: offset.unwrap_or(0),
285            skipped: 0,
286            yielded: 0,
287        }
288    }
289}
290
291impl<I: RowIterator> RowIterator for LimitIterator<I> {
292    fn next_row(&mut self) -> Option<Result<Row>> {
293        // Check if limit already reached
294        if let Some(limit) = self.limit
295            && self.yielded >= limit
296        {
297            return None;
298        }
299
300        loop {
301            match self.input.next_row()? {
302                Ok(row) => {
303                    // Skip rows for OFFSET
304                    if self.skipped < self.offset {
305                        self.skipped += 1;
306                        continue;
307                    }
308
309                    // Check limit again after skipping
310                    if let Some(limit) = self.limit
311                        && self.yielded >= limit
312                    {
313                        return None;
314                    }
315
316                    self.yielded += 1;
317                    return Some(Ok(row));
318                }
319                Err(e) => return Some(Err(e)),
320            }
321        }
322    }
323
324    fn schema(&self) -> &[ColumnMetadata] {
325        self.input.schema()
326    }
327}
328
329// ============================================================================
330// VecIterator - Wraps a Vec<Row> for testing and compatibility
331// ============================================================================
332
333/// Iterator that wraps a `Vec<Row>` for testing and compatibility.
334///
335/// This is useful for converting materialized results back into an iterator
336/// or for testing iterator-based code with fixed data.
337pub struct VecIterator {
338    rows: std::vec::IntoIter<Row>,
339    schema: Vec<ColumnMetadata>,
340}
341
342impl VecIterator {
343    /// Creates a new vec iterator from rows and schema.
344    pub fn new(rows: Vec<Row>, schema: Vec<ColumnMetadata>) -> Self {
345        Self {
346            rows: rows.into_iter(),
347            schema,
348        }
349    }
350}
351
352impl RowIterator for VecIterator {
353    fn next_row(&mut self) -> Option<Result<Row>> {
354        self.rows.next().map(Ok)
355    }
356
357    fn schema(&self) -> &[ColumnMetadata] {
358        &self.schema
359    }
360}
361
362// ============================================================================
363// Tests
364// ============================================================================
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Span;
    use crate::ast::expr::{BinaryOp, Literal};
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;

    fn sample_schema() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ]
    }

    fn sample_rows() -> Vec<Row> {
        vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(2, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
            Row::new(
                3,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(4, vec![SqlValue::Integer(4), SqlValue::Text("dave".into())]),
            Row::new(5, vec![SqlValue::Integer(5), SqlValue::Text("eve".into())]),
        ]
    }

    /// Rows whose `id` column contains a NULL, in deliberately unsorted order.
    fn rows_with_null_id() -> Vec<Row> {
        vec![
            Row::new(1, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
            Row::new(2, vec![SqlValue::Null, SqlValue::Text("nobody".into())]),
            Row::new(
                3,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
        ]
    }

    /// Drains `iter`, failing the test on the first error.
    ///
    /// A `while let Some(Ok(row))` loop would instead stop silently at an
    /// error, making a failure look like a short result set.
    fn collect_rows<I: RowIterator>(mut iter: I) -> Vec<Row> {
        let mut rows = Vec::new();
        while let Some(result) = iter.next_row() {
            rows.push(result.expect("row iterator yielded an error"));
        }
        rows
    }

    /// First-column values of `rows`, in order.
    fn first_column(rows: &[Row]) -> Vec<&SqlValue> {
        rows.iter().map(|row| &row.values[0]).collect()
    }

    /// Reference to `test.id` (column 0 of `sample_schema`).
    fn id_column() -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::ColumnRef {
                table: "test".into(),
                column: "id".into(),
                column_index: 0,
            },
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        }
    }

    /// Predicate `test.id > threshold`, with `threshold` given as a numeric literal.
    fn id_greater_than(threshold: &str) -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(id_column()),
                op: BinaryOp::Gt,
                right: Box::new(TypedExpr::literal(
                    Literal::Number(threshold.into()),
                    ResolvedType::Integer,
                    Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: Span::default(),
        }
    }

    /// Single-key `ORDER BY test.id` with the given direction and NULL placement.
    fn order_by_id(asc: bool, nulls_first: bool) -> Vec<SortExpr> {
        vec![SortExpr {
            expr: id_column(),
            asc,
            nulls_first,
        }]
    }

    #[test]
    fn vec_iterator_returns_all_rows() {
        let rows = sample_rows();
        let expected_len = rows.len();
        let iter = VecIterator::new(rows, sample_schema());
        assert_eq!(iter.schema().len(), 2);

        assert_eq!(collect_rows(iter).len(), expected_len);
    }

    #[test]
    fn filter_iterator_filters_rows() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let results = collect_rows(FilterIterator::new(input, id_greater_than("2")));

        let ids: Vec<_> = results.iter().map(|row| row.row_id).collect();
        assert_eq!(ids, vec![3, 4, 5]);
    }

    #[test]
    fn limit_iterator_limits_rows() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let results = collect_rows(LimitIterator::new(input, Some(2), None));

        let ids: Vec<_> = results.iter().map(|row| row.row_id).collect();
        assert_eq!(ids, vec![1, 2]);
    }

    #[test]
    fn limit_iterator_applies_offset() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let results = collect_rows(LimitIterator::new(input, Some(2), Some(2)));

        let ids: Vec<_> = results.iter().map(|row| row.row_id).collect();
        assert_eq!(ids, vec![3, 4]);
    }

    #[test]
    fn limit_iterator_offset_only() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let results = collect_rows(LimitIterator::new(input, None, Some(3)));

        let ids: Vec<_> = results.iter().map(|row| row.row_id).collect();
        assert_eq!(ids, vec![4, 5]);
    }

    #[test]
    fn limit_iterator_zero_limit_yields_nothing() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        assert!(collect_rows(LimitIterator::new(input, Some(0), None)).is_empty());
    }

    #[test]
    fn limit_iterator_offset_past_end_yields_nothing() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        assert!(collect_rows(LimitIterator::new(input, Some(2), Some(10))).is_empty());
    }

    #[test]
    fn sort_iterator_sorts_rows() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        let sort = SortIterator::new(input, &order_by_id(true, false)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(
            first_column(&results),
            [
                &SqlValue::Integer(1),
                &SqlValue::Integer(2),
                &SqlValue::Integer(3)
            ]
        );
    }

    #[test]
    fn sort_iterator_sorts_descending() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        let sort = SortIterator::new(input, &order_by_id(false, false)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(
            first_column(&results),
            [
                &SqlValue::Integer(3),
                &SqlValue::Integer(2),
                &SqlValue::Integer(1)
            ]
        );
    }

    #[test]
    fn sort_iterator_places_nulls_last_ascending() {
        let input = VecIterator::new(rows_with_null_id(), sample_schema());

        let sort = SortIterator::new(input, &order_by_id(true, false)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(
            first_column(&results),
            [&SqlValue::Integer(1), &SqlValue::Integer(2), &SqlValue::Null]
        );
    }

    #[test]
    fn sort_iterator_places_nulls_first_descending() {
        let input = VecIterator::new(rows_with_null_id(), sample_schema());

        let sort = SortIterator::new(input, &order_by_id(false, true)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(
            first_column(&results),
            [&SqlValue::Null, &SqlValue::Integer(2), &SqlValue::Integer(1)]
        );
    }

    #[test]
    fn sort_iterator_without_keys_preserves_input_order() {
        let input = VecIterator::new(sample_rows(), sample_schema());

        let sort = SortIterator::new(input, &[]).unwrap();
        let results = collect_rows(sort);

        let ids: Vec<_> = results.iter().map(|row| row.row_id).collect();
        assert_eq!(ids, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn composed_pipeline_filter_then_limit() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let filtered = FilterIterator::new(input, id_greater_than("1"));
        let results = collect_rows(LimitIterator::new(filtered, Some(2), None));

        // id > 1 keeps rows 2..=5; LIMIT 2 then keeps the first two of those.
        let ids: Vec<_> = results.iter().map(|row| row.row_id).collect();
        assert_eq!(ids, vec![2, 3]);
    }
}