Skip to main content

alopex_sql/executor/query/
iterator.rs

1//! Iterator-based query execution pipeline.
2//!
3//! This module provides an iterator-based execution model for SQL queries,
4//! enabling streaming execution and reduced memory usage for large datasets.
5//!
6//! # Architecture
7//!
8//! The execution pipeline is built from composable iterators:
9//! - [`ScanIterator`]: Reads rows from storage
10//! - [`FilterIterator`]: Filters rows based on predicates
11//! - [`SortIterator`]: Sorts rows (requires materialization)
12//! - [`LimitIterator`]: Applies LIMIT/OFFSET constraints
13//!
14//! Each iterator implements the [`RowIterator`] trait, allowing them to be
15//! composed into a pipeline that processes rows one at a time.
16
17use std::cmp::Ordering;
18use std::collections::BinaryHeap;
19use std::fs::{self, File, OpenOptions};
20use std::io::{BufReader, BufWriter, Read, Write};
21use std::marker::PhantomData;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
25
26use crate::catalog::{ColumnMetadata, TableMetadata};
27use crate::executor::evaluator::EvalContext;
28use crate::executor::memory::{MemoryPolicy, MemoryTracker};
29use crate::executor::{ExecutorError, Result, Row};
30use crate::planner::typed_expr::{SortExpr, TypedExpr};
31use crate::storage::{RowCodec, SqlValue, TableScanIterator};
32
33/// A trait for row-producing iterators in the query execution pipeline.
34///
35/// This trait abstracts over different types of iterators (scan, filter, sort, etc.)
36/// allowing them to be composed into execution pipelines.
37pub trait RowIterator {
38    /// Advances the iterator and returns the next row, or `None` if exhausted.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error if the underlying operation fails (e.g., storage errors,
43    /// evaluation errors).
44    fn next_row(&mut self) -> Option<Result<Row>>;
45
46    /// Returns the schema of rows produced by this iterator.
47    fn schema(&self) -> &[ColumnMetadata];
48}
49
50// Implement RowIterator for Box<dyn RowIterator> to allow dynamic dispatch.
51impl RowIterator for Box<dyn RowIterator + '_> {
52    fn next_row(&mut self) -> Option<Result<Row>> {
53        (**self).next_row()
54    }
55
56    fn schema(&self) -> &[ColumnMetadata] {
57        (**self).schema()
58    }
59}
60
61// ============================================================================
62// ScanIterator - Reads rows from storage for true streaming execution
63// ============================================================================
64
65/// Iterator that reads rows from table storage.
66///
67/// This is the leaf node in the iterator tree, providing rows from the
68/// underlying storage layer. Used for FR-7 streaming output compliance.
69pub struct ScanIterator<'a> {
70    inner: TableScanIterator<'a>,
71    schema: Vec<ColumnMetadata>,
72}
73
74impl<'a> ScanIterator<'a> {
75    /// Creates a new scan iterator from a table scan iterator and metadata.
76    pub fn new(inner: TableScanIterator<'a>, table_meta: &TableMetadata) -> Self {
77        Self {
78            inner,
79            schema: table_meta.columns.clone(),
80        }
81    }
82}
83
84impl RowIterator for ScanIterator<'_> {
85    fn next_row(&mut self) -> Option<Result<Row>> {
86        self.inner.next().map(|result| {
87            result
88                .map(|(row_id, values)| Row::new(row_id, values))
89                .map_err(ExecutorError::from)
90        })
91    }
92
93    fn schema(&self) -> &[ColumnMetadata] {
94        &self.schema
95    }
96}
97
98// ============================================================================
99// FilterIterator - Filters rows based on a predicate
100// ============================================================================
101
102/// Iterator that filters rows based on a predicate expression.
103///
104/// Only rows where the predicate evaluates to `true` are yielded.
105/// Rows where the predicate evaluates to `false` or `NULL` are skipped.
106pub struct FilterIterator<I: RowIterator> {
107    input: I,
108    predicate: TypedExpr,
109}
110
111impl<I: RowIterator> FilterIterator<I> {
112    /// Creates a new filter iterator with the given input and predicate.
113    pub fn new(input: I, predicate: TypedExpr) -> Self {
114        Self { input, predicate }
115    }
116}
117
118impl<I: RowIterator> RowIterator for FilterIterator<I> {
119    fn next_row(&mut self) -> Option<Result<Row>> {
120        loop {
121            match self.input.next_row()? {
122                Ok(row) => {
123                    let ctx = EvalContext::new(&row.values);
124                    match crate::executor::evaluator::evaluate(&self.predicate, &ctx) {
125                        Ok(SqlValue::Boolean(true)) => return Some(Ok(row)),
126                        Ok(_) => continue, // false or null - skip this row
127                        Err(e) => return Some(Err(e)),
128                    }
129                }
130                Err(e) => return Some(Err(e)),
131            }
132        }
133    }
134
135    fn schema(&self) -> &[ColumnMetadata] {
136        self.input.schema()
137    }
138}
139
140// ============================================================================
141// SortIterator - Sorts rows (materializes all input)
142// ============================================================================
143
144/// Iterator that sorts rows according to ORDER BY expressions.
145///
146/// **Note**: Sorting requires materializing all input rows into memory.
147/// This iterator collects all rows from its input, sorts them, and then
148/// yields them one at a time.
149pub struct SortIterator<I: RowIterator> {
150    output: SortOutput,
151    /// Schema from input.
152    schema: Vec<ColumnMetadata>,
153    /// Marker for input iterator type.
154    _marker: PhantomData<I>,
155}
156
157enum SortOutput {
158    InMemory(std::vec::IntoIter<Row>),
159    External(ExternalSortState),
160}
161
162impl<I: RowIterator> SortIterator<I> {
163    /// Creates a new sort iterator.
164    ///
165    /// This constructor immediately materializes all input rows and sorts them.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if reading from input fails or if sort key evaluation fails.
170    pub fn new(input: I, order_by: &[SortExpr]) -> Result<Self> {
171        Self::new_with_policy(input, order_by, None)
172    }
173
174    /// Creates a new sort iterator with an optional memory policy.
175    pub fn new_with_policy(
176        mut input: I,
177        order_by: &[SortExpr],
178        policy: Option<MemoryPolicy>,
179    ) -> Result<Self> {
180        let schema = input.schema().to_vec();
181        let mut tracker = policy.clone().map(MemoryTracker::new);
182
183        if order_by.is_empty() {
184            let mut rows = Vec::new();
185            while let Some(result) = input.next_row() {
186                rows.push(result?);
187                if let Some(tracker) = &mut tracker {
188                    let row = rows.last().expect("row just pushed");
189                    tracker.add_row(&row.values)?;
190                }
191            }
192            return Ok(Self {
193                output: SortOutput::InMemory(rows.into_iter()),
194                schema,
195                _marker: PhantomData,
196            });
197        }
198
199        let allow_spill = policy
200            .as_ref()
201            .and_then(|policy| policy.spill_directory())
202            .is_some();
203        let mut runs: Vec<PathBuf> = Vec::new();
204        let mut keyed: Vec<(Row, Vec<SqlValue>)> = Vec::new();
205
206        while let Some(result) = input.next_row() {
207            let row = result?;
208            let mut keys = Vec::with_capacity(order_by.len());
209            for expr in order_by {
210                let ctx = EvalContext::new(&row.values);
211                keys.push(crate::executor::evaluator::evaluate(&expr.expr, &ctx)?);
212            }
213            if let Some(tracker) = &mut tracker {
214                tracker.add_row(&row.values)?;
215                tracker.add_values(&keys)?;
216            }
217            keyed.push((row, keys));
218
219            if allow_spill && tracker.as_ref().map(|t| t.over_limit()).unwrap_or(false) {
220                let policy = policy
221                    .as_ref()
222                    .ok_or_else(|| ExecutorError::InvalidOperation {
223                        operation: "sort spill".into(),
224                        reason: "spill policy missing".into(),
225                    })?;
226                let path = spill_run(&mut keyed, order_by, policy)?;
227                runs.push(path);
228                if let Some(tracker) = &mut tracker {
229                    tracker.reset();
230                }
231            }
232        }
233
234        if runs.is_empty() {
235            keyed.sort_by(|a, b| compare_key_values(&a.1, &b.1, order_by));
236            let sorted: Vec<Row> = keyed.into_iter().map(|(row, _)| row).collect();
237            return Ok(Self {
238                output: SortOutput::InMemory(sorted.into_iter()),
239                schema,
240                _marker: PhantomData,
241            });
242        }
243
244        if !keyed.is_empty() {
245            let policy = policy
246                .as_ref()
247                .ok_or_else(|| ExecutorError::InvalidOperation {
248                    operation: "sort spill".into(),
249                    reason: "spill policy missing".into(),
250                })?;
251            let path = spill_run(&mut keyed, order_by, policy)?;
252            runs.push(path);
253        }
254
255        let external = ExternalSortState::new(order_by.to_vec(), runs)?;
256
257        Ok(Self {
258            output: SortOutput::External(external),
259            schema,
260            _marker: PhantomData,
261        })
262    }
263}
264
265impl<I: RowIterator> RowIterator for SortIterator<I> {
266    fn next_row(&mut self) -> Option<Result<Row>> {
267        match &mut self.output {
268            SortOutput::InMemory(iter) => iter.next().map(Ok),
269            SortOutput::External(state) => state.next_row(),
270        }
271    }
272
273    fn schema(&self) -> &[ColumnMetadata] {
274        &self.schema
275    }
276}
277
278static SPILL_COUNTER: AtomicU64 = AtomicU64::new(0);
279
280fn spill_run(
281    entries: &mut Vec<(Row, Vec<SqlValue>)>,
282    order_by: &[SortExpr],
283    policy: &MemoryPolicy,
284) -> Result<PathBuf> {
285    let directory = policy
286        .spill_directory()
287        .ok_or_else(|| ExecutorError::InvalidOperation {
288            operation: "sort spill".into(),
289            reason: "spill directory not configured".into(),
290        })?;
291    ensure_spill_dir(directory)?;
292    let (path, file) = create_spill_file(directory, "sort-run")?;
293    let mut writer = BufWriter::new(file);
294
295    entries.sort_by(|a, b| compare_key_values(&a.1, &b.1, order_by));
296
297    let mut bytes_written = 0u64;
298    for (row, keys) in entries.iter() {
299        let key_bytes = RowCodec::encode(keys);
300        let row_bytes = RowCodec::encode(&row.values);
301        let key_len =
302            u32::try_from(key_bytes.len()).map_err(|_| ExecutorError::InvalidOperation {
303                operation: "sort spill".into(),
304                reason: "sort key size exceeds u32::MAX".into(),
305            })?;
306        let row_len =
307            u32::try_from(row_bytes.len()).map_err(|_| ExecutorError::InvalidOperation {
308                operation: "sort spill".into(),
309                reason: "row size exceeds u32::MAX".into(),
310            })?;
311
312        writer
313            .write_all(&row.row_id.to_le_bytes())
314            .map_err(|err| spill_io_error("sort spill", err))?;
315        writer
316            .write_all(&key_len.to_le_bytes())
317            .map_err(|err| spill_io_error("sort spill", err))?;
318        writer
319            .write_all(&row_len.to_le_bytes())
320            .map_err(|err| spill_io_error("sort spill", err))?;
321        writer
322            .write_all(&key_bytes)
323            .map_err(|err| spill_io_error("sort spill", err))?;
324        writer
325            .write_all(&row_bytes)
326            .map_err(|err| spill_io_error("sort spill", err))?;
327        bytes_written = bytes_written
328            .saturating_add(8)
329            .saturating_add(4)
330            .saturating_add(4)
331            .saturating_add(key_bytes.len() as u64)
332            .saturating_add(row_bytes.len() as u64);
333    }
334
335    writer
336        .flush()
337        .map_err(|err| spill_io_error("sort spill", err))?;
338    policy.record_spill(bytes_written, 1);
339    entries.clear();
340
341    Ok(path)
342}
343
344fn ensure_spill_dir(directory: &Path) -> Result<()> {
345    fs::create_dir_all(directory).map_err(|err| spill_io_error("sort spill", err))?;
346    Ok(())
347}
348
349fn create_spill_file(directory: &Path, prefix: &str) -> Result<(PathBuf, File)> {
350    for _ in 0..16 {
351        let counter = SPILL_COUNTER.fetch_add(1, AtomicOrdering::Relaxed);
352        let timestamp = std::time::SystemTime::now()
353            .duration_since(std::time::UNIX_EPOCH)
354            .unwrap_or_default()
355            .as_nanos();
356        let path = directory.join(format!("{prefix}-{timestamp}-{counter}.bin"));
357        match OpenOptions::new().create_new(true).write(true).open(&path) {
358            Ok(file) => return Ok((path, file)),
359            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
360            Err(err) => return Err(spill_io_error("sort spill", err)),
361        }
362    }
363    Err(ExecutorError::InvalidOperation {
364        operation: "sort spill".into(),
365        reason: "failed to allocate spill file".into(),
366    })
367}
368
369fn spill_io_error(operation: &str, err: impl std::fmt::Display) -> ExecutorError {
370    ExecutorError::InvalidOperation {
371        operation: operation.into(),
372        reason: err.to_string(),
373    }
374}
375
376struct SpillEntry {
377    row: Row,
378    keys: Vec<SqlValue>,
379}
380
381struct SpillRunReader {
382    path: PathBuf,
383    reader: BufReader<File>,
384}
385
386impl SpillRunReader {
387    fn open(path: PathBuf) -> Result<Self> {
388        let file = File::open(&path).map_err(|err| spill_io_error("sort spill", err))?;
389        Ok(Self {
390            path,
391            reader: BufReader::new(file),
392        })
393    }
394
395    fn next_entry(&mut self) -> Result<Option<SpillEntry>> {
396        let mut row_id_buf = [0u8; 8];
397        match self.reader.read_exact(&mut row_id_buf) {
398            Ok(()) => {}
399            Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
400            Err(err) => return Err(spill_io_error("sort spill", err)),
401        }
402        let row_id = u64::from_le_bytes(row_id_buf);
403        let key_len = self.read_u32()?;
404        let row_len = self.read_u32()?;
405
406        let mut key_bytes = vec![0u8; key_len as usize];
407        self.reader
408            .read_exact(&mut key_bytes)
409            .map_err(|err| spill_io_error("sort spill", err))?;
410        let mut row_bytes = vec![0u8; row_len as usize];
411        self.reader
412            .read_exact(&mut row_bytes)
413            .map_err(|err| spill_io_error("sort spill", err))?;
414
415        let keys = RowCodec::decode(&key_bytes).map_err(ExecutorError::Storage)?;
416        let values = RowCodec::decode(&row_bytes).map_err(ExecutorError::Storage)?;
417
418        Ok(Some(SpillEntry {
419            row: Row::new(row_id, values),
420            keys,
421        }))
422    }
423
424    fn read_u32(&mut self) -> Result<u32> {
425        let mut buf = [0u8; 4];
426        self.reader
427            .read_exact(&mut buf)
428            .map_err(|err| spill_io_error("sort spill", err))?;
429        Ok(u32::from_le_bytes(buf))
430    }
431}
432
433impl Drop for SpillRunReader {
434    fn drop(&mut self) {
435        let _ = fs::remove_file(&self.path);
436    }
437}
438
439struct ExternalSortState {
440    order_by: Arc<Vec<SortExpr>>,
441    readers: Vec<SpillRunReader>,
442    heap: BinaryHeap<SpillHeapItem>,
443}
444
445impl ExternalSortState {
446    fn new(order_by: Vec<SortExpr>, runs: Vec<PathBuf>) -> Result<Self> {
447        let order_by = Arc::new(order_by);
448        let mut readers = Vec::with_capacity(runs.len());
449        let mut heap = BinaryHeap::new();
450
451        for (idx, path) in runs.into_iter().enumerate() {
452            let mut reader = SpillRunReader::open(path)?;
453            if let Some(entry) = reader.next_entry()? {
454                heap.push(SpillHeapItem {
455                    run_idx: idx,
456                    row: entry.row,
457                    keys: entry.keys,
458                    order_by: Arc::clone(&order_by),
459                });
460            }
461            readers.push(reader);
462        }
463
464        Ok(Self {
465            order_by,
466            readers,
467            heap,
468        })
469    }
470
471    fn next_row(&mut self) -> Option<Result<Row>> {
472        let item = self.heap.pop()?;
473        let row = item.row;
474        let run_idx = item.run_idx;
475
476        match self.readers[run_idx].next_entry() {
477            Ok(Some(entry)) => {
478                self.heap.push(SpillHeapItem {
479                    run_idx,
480                    row: entry.row,
481                    keys: entry.keys,
482                    order_by: Arc::clone(&self.order_by),
483                });
484            }
485            Ok(None) => {}
486            Err(err) => return Some(Err(err)),
487        }
488
489        Some(Ok(row))
490    }
491}
492
493#[derive(Clone)]
494struct SpillHeapItem {
495    run_idx: usize,
496    row: Row,
497    keys: Vec<SqlValue>,
498    order_by: Arc<Vec<SortExpr>>,
499}
500
501impl PartialEq for SpillHeapItem {
502    fn eq(&self, other: &Self) -> bool {
503        compare_key_values(&self.keys, &other.keys, &self.order_by) == Ordering::Equal
504            && self.run_idx == other.run_idx
505            && self.row.row_id == other.row.row_id
506    }
507}
508
509impl Eq for SpillHeapItem {}
510
511impl PartialOrd for SpillHeapItem {
512    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
513        Some(self.cmp(other))
514    }
515}
516
517impl Ord for SpillHeapItem {
518    fn cmp(&self, other: &Self) -> Ordering {
519        let order = compare_key_values(&self.keys, &other.keys, &self.order_by);
520        let order = if order == Ordering::Equal {
521            self.run_idx
522                .cmp(&other.run_idx)
523                .then_with(|| self.row.row_id.cmp(&other.row.row_id))
524        } else {
525            order
526        };
527        order.reverse()
528    }
529}
530
531fn compare_key_values(a: &[SqlValue], b: &[SqlValue], order_by: &[SortExpr]) -> Ordering {
532    for (i, sort_expr) in order_by.iter().enumerate() {
533        let left = &a[i];
534        let right = &b[i];
535        let cmp = compare_single(left, right, sort_expr.asc, sort_expr.nulls_first);
536        if cmp != Ordering::Equal {
537            return cmp;
538        }
539    }
540    Ordering::Equal
541}
542
543/// Compare two SqlValues according to sort direction and NULL ordering.
544fn compare_single(left: &SqlValue, right: &SqlValue, asc: bool, nulls_first: bool) -> Ordering {
545    match (left, right) {
546        (SqlValue::Null, SqlValue::Null) => Ordering::Equal,
547        (SqlValue::Null, _) => {
548            if nulls_first {
549                Ordering::Less
550            } else {
551                Ordering::Greater
552            }
553        }
554        (_, SqlValue::Null) => {
555            if nulls_first {
556                Ordering::Greater
557            } else {
558                Ordering::Less
559            }
560        }
561        _ => match left.partial_cmp(right).unwrap_or(Ordering::Equal) {
562            Ordering::Equal => Ordering::Equal,
563            ord if asc => ord,
564            ord => ord.reverse(),
565        },
566    }
567}
568
569// ============================================================================
570// LimitIterator - Applies LIMIT and OFFSET
571// ============================================================================
572
573/// Iterator that applies LIMIT and OFFSET constraints.
574///
575/// This iterator skips the first `offset` rows and yields at most `limit` rows.
576/// It provides early termination - once the limit is reached, no more rows
577/// are requested from the input.
578pub struct LimitIterator<I: RowIterator> {
579    input: I,
580    limit: Option<u64>,
581    offset: u64,
582    /// Number of rows skipped so far (for OFFSET).
583    skipped: u64,
584    /// Number of rows yielded so far (for LIMIT).
585    yielded: u64,
586}
587
588impl<I: RowIterator> LimitIterator<I> {
589    /// Creates a new limit iterator with the given LIMIT and OFFSET.
590    pub fn new(input: I, limit: Option<u64>, offset: Option<u64>) -> Self {
591        Self {
592            input,
593            limit,
594            offset: offset.unwrap_or(0),
595            skipped: 0,
596            yielded: 0,
597        }
598    }
599}
600
601impl<I: RowIterator> RowIterator for LimitIterator<I> {
602    fn next_row(&mut self) -> Option<Result<Row>> {
603        // Check if limit already reached
604        if let Some(limit) = self.limit
605            && self.yielded >= limit
606        {
607            return None;
608        }
609
610        loop {
611            match self.input.next_row()? {
612                Ok(row) => {
613                    // Skip rows for OFFSET
614                    if self.skipped < self.offset {
615                        self.skipped += 1;
616                        continue;
617                    }
618
619                    // Check limit again after skipping
620                    if let Some(limit) = self.limit
621                        && self.yielded >= limit
622                    {
623                        return None;
624                    }
625
626                    self.yielded += 1;
627                    return Some(Ok(row));
628                }
629                Err(e) => return Some(Err(e)),
630            }
631        }
632    }
633
634    fn schema(&self) -> &[ColumnMetadata] {
635        self.input.schema()
636    }
637}
638
639// ============================================================================
640// VecIterator - Wraps a Vec<Row> for testing and compatibility
641// ============================================================================
642
643/// Iterator that wraps a `Vec<Row>` for testing and compatibility.
644///
645/// This is useful for converting materialized results back into an iterator
646/// or for testing iterator-based code with fixed data.
647pub struct VecIterator {
648    rows: std::vec::IntoIter<Row>,
649    schema: Vec<ColumnMetadata>,
650}
651
652impl VecIterator {
653    /// Creates a new vec iterator from rows and schema.
654    pub fn new(rows: Vec<Row>, schema: Vec<ColumnMetadata>) -> Self {
655        Self {
656            rows: rows.into_iter(),
657            schema,
658        }
659    }
660}
661
662impl RowIterator for VecIterator {
663    fn next_row(&mut self) -> Option<Result<Row>> {
664        self.rows.next().map(Ok)
665    }
666
667    fn schema(&self) -> &[ColumnMetadata] {
668        &self.schema
669    }
670}
671
672// ============================================================================
673// Tests
674// ============================================================================
675
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Span;
    use crate::ast::expr::{BinaryOp, Literal};
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;

    fn sample_schema() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ]
    }

    fn sample_rows() -> Vec<Row> {
        vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(2, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
            Row::new(
                3,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(4, vec![SqlValue::Integer(4), SqlValue::Text("dave".into())]),
            Row::new(5, vec![SqlValue::Integer(5), SqlValue::Text("eve".into())]),
        ]
    }

    /// Reference to the integer `id` column (index 0) of the sample schema.
    fn id_column() -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::ColumnRef {
                table: "test".into(),
                column: "id".into(),
                column_index: 0,
            },
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        }
    }

    /// Predicate `id > bound`.
    fn id_greater_than(bound: &'static str) -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(id_column()),
                op: BinaryOp::Gt,
                right: Box::new(TypedExpr::literal(
                    Literal::Number(bound.into()),
                    ResolvedType::Integer,
                    Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: Span::default(),
        }
    }

    /// `ORDER BY id`, ascending or descending, NULLs last.
    fn order_by_id(asc: bool) -> Vec<SortExpr> {
        vec![SortExpr {
            expr: id_column(),
            asc,
            nulls_first: false,
        }]
    }

    /// Drains an iterator, panicking on the first error so that a failing
    /// stage cannot pass for an early end of input.
    fn collect_rows(mut iter: impl RowIterator) -> Vec<Row> {
        let mut rows = Vec::new();
        while let Some(result) = iter.next_row() {
            rows.push(result.expect("iterator yielded an error"));
        }
        rows
    }

    #[test]
    fn vec_iterator_returns_all_rows() {
        let rows = sample_rows();
        let expected_len = rows.len();
        let iter = VecIterator::new(rows, sample_schema());

        assert_eq!(collect_rows(iter).len(), expected_len);
    }

    #[test]
    fn filter_iterator_filters_rows() {
        let input = VecIterator::new(sample_rows(), sample_schema());

        // Filter: id > 2
        let filter = FilterIterator::new(input, id_greater_than("2"));
        let results = collect_rows(filter);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
        assert_eq!(results[2].row_id, 5);
    }

    #[test]
    fn limit_iterator_limits_rows() {
        let input = VecIterator::new(sample_rows(), sample_schema());

        let limit = LimitIterator::new(input, Some(2), None);
        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 1);
        assert_eq!(results[1].row_id, 2);
    }

    #[test]
    fn limit_iterator_applies_offset() {
        let input = VecIterator::new(sample_rows(), sample_schema());

        let limit = LimitIterator::new(input, Some(2), Some(2));
        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
    }

    #[test]
    fn limit_iterator_offset_only() {
        let input = VecIterator::new(sample_rows(), sample_schema());

        let limit = LimitIterator::new(input, None, Some(3));
        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 4);
        assert_eq!(results[1].row_id, 5);
    }

    #[test]
    fn sort_iterator_sorts_rows() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        // Sort by id ASC
        let sort = SortIterator::new(input, &order_by_id(true)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(1));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(3));
    }

    #[test]
    fn sort_iterator_sorts_descending() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        // Sort by id DESC
        let sort = SortIterator::new(input, &order_by_id(false)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(3));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(1));
    }

    #[test]
    fn composed_pipeline_filter_then_limit() {
        let input = VecIterator::new(sample_rows(), sample_schema());

        // Filter: id > 1, then LIMIT 2
        let filtered = FilterIterator::new(input, id_greater_than("1"));
        let limited = LimitIterator::new(filtered, Some(2), None);
        let results = collect_rows(limited);

        // Should get rows 2, 3 (id > 1, then limit 2)
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 2);
        assert_eq!(results[1].row_id, 3);
    }
}