1use std::cmp::Ordering;
18use std::marker::PhantomData;
19
20use crate::catalog::{ColumnMetadata, TableMetadata};
21use crate::executor::evaluator::EvalContext;
22use crate::executor::{ExecutorError, Result, Row};
23use crate::planner::typed_expr::{SortExpr, TypedExpr};
24use crate::storage::{SqlValue, TableScanIterator};
25
/// Pull-based source of rows shared by every operator in this module.
///
/// Implementors hand out rows one at a time through
/// [`next_row`](RowIterator::next_row) and describe the columns of those rows
/// through [`schema`](RowIterator::schema).
pub trait RowIterator {
    /// Produces the next row of the stream.
    ///
    /// Returns `None` once the stream is exhausted, `Some(Ok(row))` for each
    /// row produced, and `Some(Err(_))` when producing a row failed.
    fn next_row(&mut self) -> Option<Result<Row>>;

    /// Column metadata describing the rows this iterator yields.
    fn schema(&self) -> &[ColumnMetadata];
}
42
43impl RowIterator for Box<dyn RowIterator + '_> {
45 fn next_row(&mut self) -> Option<Result<Row>> {
46 (**self).next_row()
47 }
48
49 fn schema(&self) -> &[ColumnMetadata] {
50 (**self).schema()
51 }
52}
53
54#[allow(dead_code)]
67pub struct ScanIterator<'a> {
68 inner: TableScanIterator<'a>,
69 schema: Vec<ColumnMetadata>,
70}
71
72#[allow(dead_code)]
73impl<'a> ScanIterator<'a> {
74 pub fn new(inner: TableScanIterator<'a>, table_meta: &TableMetadata) -> Self {
76 Self {
77 inner,
78 schema: table_meta.columns.clone(),
79 }
80 }
81}
82
83impl RowIterator for ScanIterator<'_> {
84 fn next_row(&mut self) -> Option<Result<Row>> {
85 self.inner.next().map(|result| {
86 result
87 .map(|(row_id, values)| Row::new(row_id, values))
88 .map_err(ExecutorError::from)
89 })
90 }
91
92 fn schema(&self) -> &[ColumnMetadata] {
93 &self.schema
94 }
95}
96
97pub struct FilterIterator<I: RowIterator> {
106 input: I,
107 predicate: TypedExpr,
108}
109
110impl<I: RowIterator> FilterIterator<I> {
111 pub fn new(input: I, predicate: TypedExpr) -> Self {
113 Self { input, predicate }
114 }
115}
116
117impl<I: RowIterator> RowIterator for FilterIterator<I> {
118 fn next_row(&mut self) -> Option<Result<Row>> {
119 loop {
120 match self.input.next_row()? {
121 Ok(row) => {
122 let ctx = EvalContext::new(&row.values);
123 match crate::executor::evaluator::evaluate(&self.predicate, &ctx) {
124 Ok(SqlValue::Boolean(true)) => return Some(Ok(row)),
125 Ok(_) => continue, Err(e) => return Some(Err(e)),
127 }
128 }
129 Err(e) => return Some(Err(e)),
130 }
131 }
132 }
133
134 fn schema(&self) -> &[ColumnMetadata] {
135 self.input.schema()
136 }
137}
138
139pub struct SortIterator<I: RowIterator> {
149 sorted_rows: std::vec::IntoIter<Row>,
151 schema: Vec<ColumnMetadata>,
153 _marker: PhantomData<I>,
155}
156
157impl<I: RowIterator> SortIterator<I> {
158 pub fn new(mut input: I, order_by: &[SortExpr]) -> Result<Self> {
166 let schema = input.schema().to_vec();
167
168 let mut rows = Vec::new();
170 while let Some(result) = input.next_row() {
171 rows.push(result?);
172 }
173
174 if order_by.is_empty() {
175 return Ok(Self {
176 sorted_rows: rows.into_iter(),
177 schema,
178 _marker: PhantomData,
179 });
180 }
181
182 let mut keyed: Vec<(Row, Vec<SqlValue>)> = Vec::with_capacity(rows.len());
184 for row in rows {
185 let mut keys = Vec::with_capacity(order_by.len());
186 for expr in order_by {
187 let ctx = EvalContext::new(&row.values);
188 keys.push(crate::executor::evaluator::evaluate(&expr.expr, &ctx)?);
189 }
190 keyed.push((row, keys));
191 }
192
193 keyed.sort_by(|a, b| compare_keys(a, b, order_by));
195
196 let sorted: Vec<Row> = keyed.into_iter().map(|(row, _)| row).collect();
197
198 Ok(Self {
199 sorted_rows: sorted.into_iter(),
200 schema,
201 _marker: PhantomData,
202 })
203 }
204}
205
206impl<I: RowIterator> RowIterator for SortIterator<I> {
207 fn next_row(&mut self) -> Option<Result<Row>> {
208 self.sorted_rows.next().map(Ok)
209 }
210
211 fn schema(&self) -> &[ColumnMetadata] {
212 &self.schema
213 }
214}
215
216fn compare_keys(
218 a: &(Row, Vec<SqlValue>),
219 b: &(Row, Vec<SqlValue>),
220 order_by: &[SortExpr],
221) -> Ordering {
222 for (i, sort_expr) in order_by.iter().enumerate() {
223 let left = &a.1[i];
224 let right = &b.1[i];
225 let cmp = compare_single(left, right, sort_expr.asc, sort_expr.nulls_first);
226 if cmp != Ordering::Equal {
227 return cmp;
228 }
229 }
230 Ordering::Equal
231}
232
233fn compare_single(left: &SqlValue, right: &SqlValue, asc: bool, nulls_first: bool) -> Ordering {
235 match (left, right) {
236 (SqlValue::Null, SqlValue::Null) => Ordering::Equal,
237 (SqlValue::Null, _) => {
238 if nulls_first {
239 Ordering::Less
240 } else {
241 Ordering::Greater
242 }
243 }
244 (_, SqlValue::Null) => {
245 if nulls_first {
246 Ordering::Greater
247 } else {
248 Ordering::Less
249 }
250 }
251 _ => match left.partial_cmp(right).unwrap_or(Ordering::Equal) {
252 Ordering::Equal => Ordering::Equal,
253 ord if asc => ord,
254 ord => ord.reverse(),
255 },
256 }
257}
258
259pub struct LimitIterator<I: RowIterator> {
269 input: I,
270 limit: Option<u64>,
271 offset: u64,
272 skipped: u64,
274 yielded: u64,
276}
277
278impl<I: RowIterator> LimitIterator<I> {
279 pub fn new(input: I, limit: Option<u64>, offset: Option<u64>) -> Self {
281 Self {
282 input,
283 limit,
284 offset: offset.unwrap_or(0),
285 skipped: 0,
286 yielded: 0,
287 }
288 }
289}
290
291impl<I: RowIterator> RowIterator for LimitIterator<I> {
292 fn next_row(&mut self) -> Option<Result<Row>> {
293 if let Some(limit) = self.limit
295 && self.yielded >= limit
296 {
297 return None;
298 }
299
300 loop {
301 match self.input.next_row()? {
302 Ok(row) => {
303 if self.skipped < self.offset {
305 self.skipped += 1;
306 continue;
307 }
308
309 if let Some(limit) = self.limit
311 && self.yielded >= limit
312 {
313 return None;
314 }
315
316 self.yielded += 1;
317 return Some(Ok(row));
318 }
319 Err(e) => return Some(Err(e)),
320 }
321 }
322 }
323
324 fn schema(&self) -> &[ColumnMetadata] {
325 self.input.schema()
326 }
327}
328
329pub struct VecIterator {
338 rows: std::vec::IntoIter<Row>,
339 schema: Vec<ColumnMetadata>,
340}
341
342impl VecIterator {
343 pub fn new(rows: Vec<Row>, schema: Vec<ColumnMetadata>) -> Self {
345 Self {
346 rows: rows.into_iter(),
347 schema,
348 }
349 }
350}
351
352impl RowIterator for VecIterator {
353 fn next_row(&mut self) -> Option<Result<Row>> {
354 self.rows.next().map(Ok)
355 }
356
357 fn schema(&self) -> &[ColumnMetadata] {
358 &self.schema
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Span;
    use crate::ast::expr::{BinaryOp, Literal};
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;

    /// Two-column schema (`id`, `name`) shared by every test.
    fn sample_schema() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata::new("id", ResolvedType::Integer),
            ColumnMetadata::new("name", ResolvedType::Text),
        ]
    }

    /// Five rows whose `row_id` equals the value of their `id` column (1..=5).
    fn sample_rows() -> Vec<Row> {
        vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(2, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
            Row::new(
                3,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(4, vec![SqlValue::Integer(4), SqlValue::Text("dave".into())]),
            Row::new(5, vec![SqlValue::Integer(5), SqlValue::Text("eve".into())]),
        ]
    }

    /// Typed reference to column 0 (`id`) of the sample schema.
    fn id_column() -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::ColumnRef {
                table: "test".into(),
                column: "id".into(),
                column_index: 0,
            },
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        }
    }

    /// Builds the predicate `id > <threshold>`.
    fn id_greater_than(threshold: &'static str) -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(id_column()),
                op: BinaryOp::Gt,
                right: Box::new(TypedExpr::literal(
                    Literal::Number(threshold.into()),
                    ResolvedType::Integer,
                    Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: Span::default(),
        }
    }

    /// Builds `ORDER BY id` in the requested direction, with NULLs last.
    fn order_by_id(asc: bool) -> Vec<SortExpr> {
        vec![SortExpr {
            expr: id_column(),
            asc,
            nulls_first: false,
        }]
    }

    /// Drains `iter` into a vector, failing the test on the first error
    /// instead of silently treating it as end-of-stream.
    fn collect_rows<I: RowIterator>(mut iter: I) -> Vec<Row> {
        let mut rows = Vec::new();
        while let Some(result) = iter.next_row() {
            rows.push(result.expect("iterator yielded an unexpected error"));
        }
        rows
    }

    #[test]
    fn vec_iterator_returns_all_rows() {
        let rows = sample_rows();
        let expected_len = rows.len();
        let iter = VecIterator::new(rows, sample_schema());

        assert_eq!(collect_rows(iter).len(), expected_len);
    }

    #[test]
    fn filter_iterator_filters_rows() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let filter = FilterIterator::new(input, id_greater_than("2"));

        let results = collect_rows(filter);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
        assert_eq!(results[2].row_id, 5);
    }

    #[test]
    fn limit_iterator_limits_rows() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let limit = LimitIterator::new(input, Some(2), None);

        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 1);
        assert_eq!(results[1].row_id, 2);
    }

    #[test]
    fn limit_iterator_applies_offset() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let limit = LimitIterator::new(input, Some(2), Some(2));

        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 3);
        assert_eq!(results[1].row_id, 4);
    }

    #[test]
    fn limit_iterator_offset_only() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let limit = LimitIterator::new(input, None, Some(3));

        let results = collect_rows(limit);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 4);
        assert_eq!(results[1].row_id, 5);
    }

    #[test]
    fn sort_iterator_sorts_rows() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        let sort = SortIterator::new(input, &order_by_id(true)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(1));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(3));
    }

    #[test]
    fn sort_iterator_sorts_descending() {
        let rows = vec![
            Row::new(
                1,
                vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            ),
            Row::new(
                2,
                vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
            ),
            Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
        ];
        let input = VecIterator::new(rows, sample_schema());

        let sort = SortIterator::new(input, &order_by_id(false)).unwrap();
        let results = collect_rows(sort);

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].values[0], SqlValue::Integer(3));
        assert_eq!(results[1].values[0], SqlValue::Integer(2));
        assert_eq!(results[2].values[0], SqlValue::Integer(1));
    }

    #[test]
    fn composed_pipeline_filter_then_limit() {
        let input = VecIterator::new(sample_rows(), sample_schema());
        let filtered = FilterIterator::new(input, id_greater_than("1"));
        let limited = LimitIterator::new(filtered, Some(2), None);

        let results = collect_rows(limited);

        assert_eq!(results.len(), 2);
        // Rows 2..=5 pass the filter; the limit keeps the first two of them.
        assert_eq!(results[0].row_id, 2);
        assert_eq!(results[1].row_id, 3);
    }
}