1use std::cmp::Ordering;
18use std::collections::BinaryHeap;
19use std::fs::{self, File, OpenOptions};
20use std::io::{BufReader, BufWriter, Read, Write};
21use std::marker::PhantomData;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
25
26use crate::catalog::{ColumnMetadata, TableMetadata};
27use crate::executor::evaluator::EvalContext;
28use crate::executor::memory::{MemoryPolicy, MemoryTracker};
29use crate::executor::{ExecutorError, Result, Row};
30use crate::planner::typed_expr::{SortExpr, TypedExpr};
31use crate::storage::{RowCodec, SqlValue, TableScanIterator};
32
/// Pull-based iterator over query result rows.
///
/// Implementations form the executor's physical operator tree: each operator
/// wraps its input and produces rows on demand.
pub trait RowIterator {
    /// Returns the next row, `Some(Err(_))` on a runtime error, or `None`
    /// once the operator is exhausted.
    fn next_row(&mut self) -> Option<Result<Row>>;

    /// Column metadata describing the rows this operator produces.
    fn schema(&self) -> &[ColumnMetadata];
}
49
// Forwarding impl so a boxed, type-erased operator can be used wherever a
// concrete `RowIterator` is expected (e.g. dynamically composed pipelines).
impl RowIterator for Box<dyn RowIterator + '_> {
    fn next_row(&mut self) -> Option<Result<Row>> {
        // Deref through the box to the underlying operator.
        (**self).next_row()
    }

    fn schema(&self) -> &[ColumnMetadata] {
        (**self).schema()
    }
}
60
/// Leaf operator: adapts a storage-layer [`TableScanIterator`] to the
/// executor's [`RowIterator`] interface.
pub struct ScanIterator<'a> {
    inner: TableScanIterator<'a>,
    // Owned copy of the table's column metadata, captured at construction.
    schema: Vec<ColumnMetadata>,
}
73
74impl<'a> ScanIterator<'a> {
75 pub fn new(inner: TableScanIterator<'a>, table_meta: &TableMetadata) -> Self {
77 Self {
78 inner,
79 schema: table_meta.columns.clone(),
80 }
81 }
82}
83
84impl RowIterator for ScanIterator<'_> {
85 fn next_row(&mut self) -> Option<Result<Row>> {
86 self.inner.next().map(|result| {
87 result
88 .map(|(row_id, values)| Row::new(row_id, values))
89 .map_err(ExecutorError::from)
90 })
91 }
92
93 fn schema(&self) -> &[ColumnMetadata] {
94 &self.schema
95 }
96}
97
/// Filter operator: forwards only the rows of `input` for which `predicate`
/// evaluates to boolean `true`.
pub struct FilterIterator<I: RowIterator> {
    input: I,
    // Typed predicate expression, evaluated against each row's values.
    predicate: TypedExpr,
}
110
111impl<I: RowIterator> FilterIterator<I> {
112 pub fn new(input: I, predicate: TypedExpr) -> Self {
114 Self { input, predicate }
115 }
116}
117
118impl<I: RowIterator> RowIterator for FilterIterator<I> {
119 fn next_row(&mut self) -> Option<Result<Row>> {
120 loop {
121 match self.input.next_row()? {
122 Ok(row) => {
123 let ctx = EvalContext::new(&row.values);
124 match crate::executor::evaluator::evaluate(&self.predicate, &ctx) {
125 Ok(SqlValue::Boolean(true)) => return Some(Ok(row)),
126 Ok(_) => continue, Err(e) => return Some(Err(e)),
128 }
129 }
130 Err(e) => return Some(Err(e)),
131 }
132 }
133 }
134
135 fn schema(&self) -> &[ColumnMetadata] {
136 self.input.schema()
137 }
138}
139
/// Sorting operator: fully materializes its input during construction, sorts
/// it by the `ORDER BY` key expressions, then replays the sorted rows.
pub struct SortIterator<I: RowIterator> {
    // Sorted rows, either held in memory or merged from spilled runs on disk.
    output: SortOutput,
    schema: Vec<ColumnMetadata>,
    // Input is consumed entirely in the constructor; the type parameter only
    // survives in the signature.
    _marker: PhantomData<I>,
}
156
/// Where the sorted rows live after construction.
enum SortOutput {
    /// Everything fit in memory: replay from a sorted vector.
    InMemory(std::vec::IntoIter<Row>),
    /// Rows were spilled to disk: k-way merge of sorted run files.
    External(ExternalSortState),
}
161
impl<I: RowIterator> SortIterator<I> {
    /// Sorts `input` by `order_by` with no memory policy (fully in memory).
    pub fn new(input: I, order_by: &[SortExpr]) -> Result<Self> {
        Self::new_with_policy(input, order_by, None)
    }

    /// Sorts `input` by `order_by`, optionally bounded by a [`MemoryPolicy`].
    ///
    /// Consumes the entire input eagerly. If the policy configures a spill
    /// directory and tracked memory exceeds the limit, sorted runs are
    /// written to disk and merged on replay; with no spill directory,
    /// exceeding the limit surfaces as an error from the tracker itself
    /// (via `add_row`/`add_values`).
    pub fn new_with_policy(
        mut input: I,
        order_by: &[SortExpr],
        policy: Option<MemoryPolicy>,
    ) -> Result<Self> {
        let schema = input.schema().to_vec();
        let mut tracker = policy.clone().map(MemoryTracker::new);

        // No sort keys: pass rows through in input order, still charging
        // them to the memory tracker since the input is materialized.
        if order_by.is_empty() {
            let mut rows = Vec::new();
            while let Some(result) = input.next_row() {
                rows.push(result?);
                if let Some(tracker) = &mut tracker {
                    let row = rows.last().expect("row just pushed");
                    tracker.add_row(&row.values)?;
                }
            }
            return Ok(Self {
                output: SortOutput::InMemory(rows.into_iter()),
                schema,
                _marker: PhantomData,
            });
        }

        // Spilling is only possible when the policy provides a directory.
        let allow_spill = policy
            .as_ref()
            .and_then(|policy| policy.spill_directory())
            .is_some();
        let mut runs: Vec<PathBuf> = Vec::new();
        // Rows paired with their pre-computed sort keys, so keys are
        // evaluated exactly once per row.
        let mut keyed: Vec<(Row, Vec<SqlValue>)> = Vec::new();

        while let Some(result) = input.next_row() {
            let row = result?;
            let mut keys = Vec::with_capacity(order_by.len());
            for expr in order_by {
                let ctx = EvalContext::new(&row.values);
                keys.push(crate::executor::evaluator::evaluate(&expr.expr, &ctx)?);
            }
            // Charge both the row and its key values before buffering them.
            if let Some(tracker) = &mut tracker {
                tracker.add_row(&row.values)?;
                tracker.add_values(&keys)?;
            }
            keyed.push((row, keys));

            // Over the limit and spilling allowed: flush the current buffer
            // as a sorted on-disk run and start accounting from zero.
            if allow_spill && tracker.as_ref().map(|t| t.over_limit()).unwrap_or(false) {
                let policy = policy
                    .as_ref()
                    .ok_or_else(|| ExecutorError::InvalidOperation {
                        operation: "sort spill".into(),
                        reason: "spill policy missing".into(),
                    })?;
                let path = spill_run(&mut keyed, order_by, policy)?;
                runs.push(path);
                if let Some(tracker) = &mut tracker {
                    tracker.reset();
                }
            }
        }

        // Nothing spilled: sort the whole buffer in memory and drop the keys.
        if runs.is_empty() {
            keyed.sort_by(|a, b| compare_key_values(&a.1, &b.1, order_by));
            let sorted: Vec<Row> = keyed.into_iter().map(|(row, _)| row).collect();
            return Ok(Self {
                output: SortOutput::InMemory(sorted.into_iter()),
                schema,
                _marker: PhantomData,
            });
        }

        // At least one run on disk: flush the tail of the buffer as a final
        // run so every row lives in exactly one run file.
        if !keyed.is_empty() {
            let policy = policy
                .as_ref()
                .ok_or_else(|| ExecutorError::InvalidOperation {
                    operation: "sort spill".into(),
                    reason: "spill policy missing".into(),
                })?;
            let path = spill_run(&mut keyed, order_by, policy)?;
            runs.push(path);
        }

        let external = ExternalSortState::new(order_by.to_vec(), runs)?;

        Ok(Self {
            output: SortOutput::External(external),
            schema,
            _marker: PhantomData,
        })
    }
}
264
265impl<I: RowIterator> RowIterator for SortIterator<I> {
266 fn next_row(&mut self) -> Option<Result<Row>> {
267 match &mut self.output {
268 SortOutput::InMemory(iter) => iter.next().map(Ok),
269 SortOutput::External(state) => state.next_row(),
270 }
271 }
272
273 fn schema(&self) -> &[ColumnMetadata] {
274 &self.schema
275 }
276}
277
// Process-wide counter folded into spill file names so concurrent sorts
// never collide on a file name.
static SPILL_COUNTER: AtomicU64 = AtomicU64::new(0);
279
/// Sorts the buffered `entries` and writes them to a new run file in the
/// policy's spill directory, clearing the buffer on success.
///
/// On-disk record layout (all integers little-endian):
///   row_id: u64 | key_len: u32 | row_len: u32 | key bytes | row bytes
///
/// Returns the path of the run file; the caller owns its cleanup (handled by
/// `SpillRunReader`'s `Drop`). Spilled bytes and run count are reported to
/// the policy for observability.
fn spill_run(
    entries: &mut Vec<(Row, Vec<SqlValue>)>,
    order_by: &[SortExpr],
    policy: &MemoryPolicy,
) -> Result<PathBuf> {
    let directory = policy
        .spill_directory()
        .ok_or_else(|| ExecutorError::InvalidOperation {
            operation: "sort spill".into(),
            reason: "spill directory not configured".into(),
        })?;
    ensure_spill_dir(directory)?;
    let (path, file) = create_spill_file(directory, "sort-run")?;
    let mut writer = BufWriter::new(file);

    // Each run must be internally sorted so the merge phase only needs to
    // compare run heads.
    entries.sort_by(|a, b| compare_key_values(&a.1, &b.1, order_by));

    let mut bytes_written = 0u64;
    for (row, keys) in entries.iter() {
        let key_bytes = RowCodec::encode(keys);
        let row_bytes = RowCodec::encode(&row.values);
        // Lengths are stored as u32; reject pathological oversized payloads
        // instead of silently truncating.
        let key_len =
            u32::try_from(key_bytes.len()).map_err(|_| ExecutorError::InvalidOperation {
                operation: "sort spill".into(),
                reason: "sort key size exceeds u32::MAX".into(),
            })?;
        let row_len =
            u32::try_from(row_bytes.len()).map_err(|_| ExecutorError::InvalidOperation {
                operation: "sort spill".into(),
                reason: "row size exceeds u32::MAX".into(),
            })?;

        writer
            .write_all(&row.row_id.to_le_bytes())
            .map_err(|err| spill_io_error("sort spill", err))?;
        writer
            .write_all(&key_len.to_le_bytes())
            .map_err(|err| spill_io_error("sort spill", err))?;
        writer
            .write_all(&row_len.to_le_bytes())
            .map_err(|err| spill_io_error("sort spill", err))?;
        writer
            .write_all(&key_bytes)
            .map_err(|err| spill_io_error("sort spill", err))?;
        writer
            .write_all(&row_bytes)
            .map_err(|err| spill_io_error("sort spill", err))?;
        // 8-byte row id + two 4-byte lengths + both payloads.
        bytes_written = bytes_written
            .saturating_add(8)
            .saturating_add(4)
            .saturating_add(4)
            .saturating_add(key_bytes.len() as u64)
            .saturating_add(row_bytes.len() as u64);
    }

    // BufWriter's Drop swallows flush errors; flush explicitly so failures
    // are reported.
    writer
        .flush()
        .map_err(|err| spill_io_error("sort spill", err))?;
    policy.record_spill(bytes_written, 1);
    entries.clear();

    Ok(path)
}
343
344fn ensure_spill_dir(directory: &Path) -> Result<()> {
345 fs::create_dir_all(directory).map_err(|err| spill_io_error("sort spill", err))?;
346 Ok(())
347}
348
349fn create_spill_file(directory: &Path, prefix: &str) -> Result<(PathBuf, File)> {
350 for _ in 0..16 {
351 let counter = SPILL_COUNTER.fetch_add(1, AtomicOrdering::Relaxed);
352 let timestamp = std::time::SystemTime::now()
353 .duration_since(std::time::UNIX_EPOCH)
354 .unwrap_or_default()
355 .as_nanos();
356 let path = directory.join(format!("{prefix}-{timestamp}-{counter}.bin"));
357 match OpenOptions::new().create_new(true).write(true).open(&path) {
358 Ok(file) => return Ok((path, file)),
359 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
360 Err(err) => return Err(spill_io_error("sort spill", err)),
361 }
362 }
363 Err(ExecutorError::InvalidOperation {
364 operation: "sort spill".into(),
365 reason: "failed to allocate spill file".into(),
366 })
367}
368
369fn spill_io_error(operation: &str, err: impl std::fmt::Display) -> ExecutorError {
370 ExecutorError::InvalidOperation {
371 operation: operation.into(),
372 reason: err.to_string(),
373 }
374}
375
/// One decoded record from a spill run file: the row plus its pre-computed
/// sort keys (kept so the merge never re-evaluates key expressions).
struct SpillEntry {
    row: Row,
    keys: Vec<SqlValue>,
}
380
/// Sequential reader over one spill run file.
///
/// Deletes the file on drop — run files are single-use temporaries.
struct SpillRunReader {
    path: PathBuf,
    reader: BufReader<File>,
}
385
impl SpillRunReader {
    /// Opens a run file for reading; takes ownership of `path` so `Drop`
    /// can remove the file afterwards.
    fn open(path: PathBuf) -> Result<Self> {
        let file = File::open(&path).map_err(|err| spill_io_error("sort spill", err))?;
        Ok(Self {
            path,
            reader: BufReader::new(file),
        })
    }

    /// Reads the next record, mirroring the layout written by `spill_run`:
    /// row_id u64 LE | key_len u32 LE | row_len u32 LE | key bytes | row bytes.
    ///
    /// Returns `Ok(None)` on a clean end of file. EOF is only tolerated at a
    /// record boundary (before the row id); a truncated record mid-field is
    /// reported as an error.
    fn next_entry(&mut self) -> Result<Option<SpillEntry>> {
        let mut row_id_buf = [0u8; 8];
        match self.reader.read_exact(&mut row_id_buf) {
            Ok(()) => {}
            // EOF before a new record begins: the run is exhausted.
            Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(err) => return Err(spill_io_error("sort spill", err)),
        }
        let row_id = u64::from_le_bytes(row_id_buf);
        let key_len = self.read_u32()?;
        let row_len = self.read_u32()?;

        let mut key_bytes = vec![0u8; key_len as usize];
        self.reader
            .read_exact(&mut key_bytes)
            .map_err(|err| spill_io_error("sort spill", err))?;
        let mut row_bytes = vec![0u8; row_len as usize];
        self.reader
            .read_exact(&mut row_bytes)
            .map_err(|err| spill_io_error("sort spill", err))?;

        let keys = RowCodec::decode(&key_bytes).map_err(ExecutorError::Storage)?;
        let values = RowCodec::decode(&row_bytes).map_err(ExecutorError::Storage)?;

        Ok(Some(SpillEntry {
            row: Row::new(row_id, values),
            keys,
        }))
    }

    /// Reads one little-endian u32 length prefix.
    fn read_u32(&mut self) -> Result<u32> {
        let mut buf = [0u8; 4];
        self.reader
            .read_exact(&mut buf)
            .map_err(|err| spill_io_error("sort spill", err))?;
        Ok(u32::from_le_bytes(buf))
    }
}
432
impl Drop for SpillRunReader {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary run file; a failed removal
        // must not panic during drop.
        let _ = fs::remove_file(&self.path);
    }
}
438
/// K-way merge over the sorted spill runs of an external sort.
struct ExternalSortState {
    // Shared with every heap item so each can compare keys by the same
    // ORDER BY specification.
    order_by: Arc<Vec<SortExpr>>,
    // One reader per run; indexed by the heap items' `run_idx`.
    readers: Vec<SpillRunReader>,
    // Holds at most one entry (the current head) per non-exhausted run.
    heap: BinaryHeap<SpillHeapItem>,
}
444
impl ExternalSortState {
    /// Opens every run file and seeds the merge heap with each run's first
    /// entry (empty runs simply contribute nothing).
    fn new(order_by: Vec<SortExpr>, runs: Vec<PathBuf>) -> Result<Self> {
        let order_by = Arc::new(order_by);
        let mut readers = Vec::with_capacity(runs.len());
        let mut heap = BinaryHeap::new();

        for (idx, path) in runs.into_iter().enumerate() {
            let mut reader = SpillRunReader::open(path)?;
            if let Some(entry) = reader.next_entry()? {
                heap.push(SpillHeapItem {
                    run_idx: idx,
                    row: entry.row,
                    keys: entry.keys,
                    order_by: Arc::clone(&order_by),
                });
            }
            // Reader is kept even when empty so `run_idx` stays a valid index.
            readers.push(reader);
        }

        Ok(Self {
            order_by,
            readers,
            heap,
        })
    }

    /// Pops the globally smallest head, then refills the heap with the next
    /// entry from the same run (standard k-way merge step).
    fn next_row(&mut self) -> Option<Result<Row>> {
        let item = self.heap.pop()?;
        let row = item.row;
        let run_idx = item.run_idx;

        match self.readers[run_idx].next_entry() {
            Ok(Some(entry)) => {
                self.heap.push(SpillHeapItem {
                    run_idx,
                    row: entry.row,
                    keys: entry.keys,
                    order_by: Arc::clone(&self.order_by),
                });
            }
            // Run exhausted: it simply stops contributing to the heap.
            Ok(None) => {}
            Err(err) => return Some(Err(err)),
        }

        Some(Ok(row))
    }
}
492
/// Head-of-run entry stored in the merge heap.
#[derive(Clone)]
struct SpillHeapItem {
    // Which run this entry came from; used to refill after popping.
    run_idx: usize,
    row: Row,
    keys: Vec<SqlValue>,
    // Shared ORDER BY spec so `Ord` can compare keys without extra context.
    order_by: Arc<Vec<SortExpr>>,
}
500
impl PartialEq for SpillHeapItem {
    // Equality mirrors `Ord`'s full comparison (keys, then run index, then
    // row id) so Eq/Ord stay consistent, as BinaryHeap expects.
    fn eq(&self, other: &Self) -> bool {
        compare_key_values(&self.keys, &other.keys, &self.order_by) == Ordering::Equal
            && self.run_idx == other.run_idx
            && self.row.row_id == other.row.row_id
    }
}

impl Eq for SpillHeapItem {}
510
impl PartialOrd for SpillHeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SpillHeapItem {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare by sort keys, breaking ties deterministically by run
        // index and then row id.
        let order = compare_key_values(&self.keys, &other.keys, &self.order_by);
        let order = if order == Ordering::Equal {
            self.run_idx
                .cmp(&other.run_idx)
                .then_with(|| self.row.row_id.cmp(&other.row.row_id))
        } else {
            order
        };
        // BinaryHeap is a max-heap; reversing turns it into the min-heap the
        // merge needs (smallest key pops first).
        order.reverse()
    }
}
530
531fn compare_key_values(a: &[SqlValue], b: &[SqlValue], order_by: &[SortExpr]) -> Ordering {
532 for (i, sort_expr) in order_by.iter().enumerate() {
533 let left = &a[i];
534 let right = &b[i];
535 let cmp = compare_single(left, right, sort_expr.asc, sort_expr.nulls_first);
536 if cmp != Ordering::Equal {
537 return cmp;
538 }
539 }
540 Ordering::Equal
541}
542
543fn compare_single(left: &SqlValue, right: &SqlValue, asc: bool, nulls_first: bool) -> Ordering {
545 match (left, right) {
546 (SqlValue::Null, SqlValue::Null) => Ordering::Equal,
547 (SqlValue::Null, _) => {
548 if nulls_first {
549 Ordering::Less
550 } else {
551 Ordering::Greater
552 }
553 }
554 (_, SqlValue::Null) => {
555 if nulls_first {
556 Ordering::Greater
557 } else {
558 Ordering::Less
559 }
560 }
561 _ => match left.partial_cmp(right).unwrap_or(Ordering::Equal) {
562 Ordering::Equal => Ordering::Equal,
563 ord if asc => ord,
564 ord => ord.reverse(),
565 },
566 }
567}
568
/// LIMIT/OFFSET operator: skips the first `offset` rows of its input, then
/// yields at most `limit` rows (unlimited when `limit` is `None`).
pub struct LimitIterator<I: RowIterator> {
    input: I,
    limit: Option<u64>,
    offset: u64,
    // How many rows have been skipped toward `offset` so far.
    skipped: u64,
    // How many rows have been returned toward `limit` so far.
    yielded: u64,
}
587
588impl<I: RowIterator> LimitIterator<I> {
589 pub fn new(input: I, limit: Option<u64>, offset: Option<u64>) -> Self {
591 Self {
592 input,
593 limit,
594 offset: offset.unwrap_or(0),
595 skipped: 0,
596 yielded: 0,
597 }
598 }
599}
600
601impl<I: RowIterator> RowIterator for LimitIterator<I> {
602 fn next_row(&mut self) -> Option<Result<Row>> {
603 if let Some(limit) = self.limit
605 && self.yielded >= limit
606 {
607 return None;
608 }
609
610 loop {
611 match self.input.next_row()? {
612 Ok(row) => {
613 if self.skipped < self.offset {
615 self.skipped += 1;
616 continue;
617 }
618
619 if let Some(limit) = self.limit
621 && self.yielded >= limit
622 {
623 return None;
624 }
625
626 self.yielded += 1;
627 return Some(Ok(row));
628 }
629 Err(e) => return Some(Err(e)),
630 }
631 }
632 }
633
634 fn schema(&self) -> &[ColumnMetadata] {
635 self.input.schema()
636 }
637}
638
/// Iterator over a pre-materialized vector of rows; used for constant-style
/// inputs and as a building block in tests.
pub struct VecIterator {
    rows: std::vec::IntoIter<Row>,
    schema: Vec<ColumnMetadata>,
}
651
652impl VecIterator {
653 pub fn new(rows: Vec<Row>, schema: Vec<ColumnMetadata>) -> Self {
655 Self {
656 rows: rows.into_iter(),
657 schema,
658 }
659 }
660}
661
662impl RowIterator for VecIterator {
663 fn next_row(&mut self) -> Option<Result<Row>> {
664 self.rows.next().map(Ok)
665 }
666
667 fn schema(&self) -> &[ColumnMetadata] {
668 &self.schema
669 }
670}
671
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Span;
    use crate::planner::types::ResolvedType;

    // Two-column (id: Integer, name: Text) schema shared by most tests.
    fn sample_schema() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ]
    }

    // Five rows with row_id == id, already in ascending order.
    fn sample_rows() -> Vec<Row> {
        vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(2, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
            Row::new(
                3,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(4, vec![SqlValue::Integer(4), SqlValue::Text("dave".into())]),
            Row::new(5, vec![SqlValue::Integer(5), SqlValue::Text("eve".into())]),
        ]
    }

    // VecIterator replays every row exactly once.
    #[test]
    fn vec_iterator_returns_all_rows() {
        let rows = sample_rows();
        let expected_len = rows.len();
        let mut iter = VecIterator::new(rows, sample_schema());

        let mut count = 0;
        while let Some(Ok(_)) = iter.next_row() {
            count += 1;
        }
        assert_eq!(count, expected_len);
    }

    // FilterIterator keeps only rows satisfying `id > 2`.
    #[test]
    fn filter_iterator_filters_rows() {
        use crate::ast::expr::BinaryOp;
        use crate::planner::typed_expr::{TypedExpr, TypedExprKind};

        let rows = sample_rows();
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        // Predicate: id > 2
        let predicate = TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(TypedExpr {
                    kind: TypedExprKind::ColumnRef {
                        table: "test".into(),
                        column: "id".into(),
                        column_index: 0,
                    },
                    resolved_type: ResolvedType::Integer,
                    span: Span::default(),
                }),
                op: BinaryOp::Gt,
                right: Box::new(TypedExpr::literal(
                    crate::ast::expr::Literal::Number("2".into()),
                    ResolvedType::Integer,
                    Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: Span::default(),
        };

        let mut filter = FilterIterator::new(input, predicate);

        let mut results = Vec::new();
        while let Some(Ok(row)) = filter.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
        assert_eq!(results[2].row_id, 5);
    }

    // LIMIT 2 with no offset yields only the first two rows.
    #[test]
    fn limit_iterator_limits_rows() {
        let rows = sample_rows();
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        let mut limit = LimitIterator::new(input, Some(2), None);

        let mut results = Vec::new();
        while let Some(Ok(row)) = limit.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 1);
        assert_eq!(results[1].row_id, 2);
    }

    // LIMIT 2 OFFSET 2 skips two rows, then yields the next two.
    #[test]
    fn limit_iterator_applies_offset() {
        let rows = sample_rows();
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        let mut limit = LimitIterator::new(input, Some(2), Some(2));

        let mut results = Vec::new();
        while let Some(Ok(row)) = limit.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
    }

    // OFFSET without LIMIT drops the prefix and yields the remainder.
    #[test]
    fn limit_iterator_offset_only() {
        let rows = sample_rows();
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        let mut limit = LimitIterator::new(input, None, Some(3));

        let mut results = Vec::new();
        while let Some(Ok(row)) = limit.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 4);
        assert_eq!(results[1].row_id, 5);
    }

    // ORDER BY id ASC reorders out-of-order input ascending.
    #[test]
    fn sort_iterator_sorts_rows() {
        use crate::planner::typed_expr::{SortExpr, TypedExpr, TypedExprKind};

        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        // ORDER BY id ASC NULLS LAST
        let order_by = vec![SortExpr {
            expr: TypedExpr {
                kind: TypedExprKind::ColumnRef {
                    table: "test".into(),
                    column: "id".into(),
                    column_index: 0,
                },
                resolved_type: ResolvedType::Integer,
                span: Span::default(),
            },
            asc: true,
            nulls_first: false,
        }];

        let mut sort = SortIterator::new(input, &order_by).unwrap();

        let mut results = Vec::new();
        while let Some(Ok(row)) = sort.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(1));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(3));
    }

    // ORDER BY id DESC reorders input descending.
    #[test]
    fn sort_iterator_sorts_descending() {
        use crate::planner::typed_expr::{SortExpr, TypedExpr, TypedExprKind};

        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        // ORDER BY id DESC NULLS LAST
        let order_by = vec![SortExpr {
            expr: TypedExpr {
                kind: TypedExprKind::ColumnRef {
                    table: "test".into(),
                    column: "id".into(),
                    column_index: 0,
                },
                resolved_type: ResolvedType::Integer,
                span: Span::default(),
            },
            asc: false,
            nulls_first: false,
        }];

        let mut sort = SortIterator::new(input, &order_by).unwrap();

        let mut results = Vec::new();
        while let Some(Ok(row)) = sort.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(3));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(1));
    }

    // Operators compose: WHERE id > 1 LIMIT 2 yields rows 2 and 3.
    #[test]
    fn composed_pipeline_filter_then_limit() {
        use crate::ast::expr::BinaryOp;
        use crate::planner::typed_expr::{TypedExpr, TypedExprKind};

        let rows = sample_rows();
        let schema = sample_schema();
        let input = VecIterator::new(rows, schema);

        // Predicate: id > 1
        let predicate = TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(TypedExpr {
                    kind: TypedExprKind::ColumnRef {
                        table: "test".into(),
                        column: "id".into(),
                        column_index: 0,
                    },
                    resolved_type: ResolvedType::Integer,
                    span: Span::default(),
                }),
                op: BinaryOp::Gt,
                right: Box::new(TypedExpr::literal(
                    crate::ast::expr::Literal::Number("1".into()),
                    ResolvedType::Integer,
                    Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: Span::default(),
        };

        let filtered = FilterIterator::new(input, predicate);
        let mut limited = LimitIterator::new(filtered, Some(2), None);

        let mut results = Vec::new();
        while let Some(Ok(row)) = limited.next_row() {
            results.push(row);
        }

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 2);
        assert_eq!(results[1].row_id, 3);
    }
}