1use std::cmp::Ordering;
18use std::marker::PhantomData;
19
20use crate::catalog::{ColumnMetadata, TableMetadata};
21use crate::executor::evaluator::EvalContext;
22use crate::executor::{ExecutorError, Result, Row};
23use crate::planner::typed_expr::{SortExpr, TypedExpr};
24use crate::storage::{SqlValue, TableScanIterator};
25
26pub trait RowIterator {
31 fn next_row(&mut self) -> Option<Result<Row>>;
38
39 fn schema(&self) -> &[ColumnMetadata];
41}
42
43impl RowIterator for Box<dyn RowIterator + '_> {
45 fn next_row(&mut self) -> Option<Result<Row>> {
46 (**self).next_row()
47 }
48
49 fn schema(&self) -> &[ColumnMetadata] {
50 (**self).schema()
51 }
52}
53
54pub struct ScanIterator<'a> {
63 inner: TableScanIterator<'a>,
64 schema: Vec<ColumnMetadata>,
65}
66
67impl<'a> ScanIterator<'a> {
68 pub fn new(inner: TableScanIterator<'a>, table_meta: &TableMetadata) -> Self {
70 Self {
71 inner,
72 schema: table_meta.columns.clone(),
73 }
74 }
75}
76
77impl RowIterator for ScanIterator<'_> {
78 fn next_row(&mut self) -> Option<Result<Row>> {
79 self.inner.next().map(|result| {
80 result
81 .map(|(row_id, values)| Row::new(row_id, values))
82 .map_err(ExecutorError::from)
83 })
84 }
85
86 fn schema(&self) -> &[ColumnMetadata] {
87 &self.schema
88 }
89}
90
91pub struct FilterIterator<I: RowIterator> {
100 input: I,
101 predicate: TypedExpr,
102}
103
104impl<I: RowIterator> FilterIterator<I> {
105 pub fn new(input: I, predicate: TypedExpr) -> Self {
107 Self { input, predicate }
108 }
109}
110
111impl<I: RowIterator> RowIterator for FilterIterator<I> {
112 fn next_row(&mut self) -> Option<Result<Row>> {
113 loop {
114 match self.input.next_row()? {
115 Ok(row) => {
116 let ctx = EvalContext::new(&row.values);
117 match crate::executor::evaluator::evaluate(&self.predicate, &ctx) {
118 Ok(SqlValue::Boolean(true)) => return Some(Ok(row)),
119 Ok(_) => continue, Err(e) => return Some(Err(e)),
121 }
122 }
123 Err(e) => return Some(Err(e)),
124 }
125 }
126 }
127
128 fn schema(&self) -> &[ColumnMetadata] {
129 self.input.schema()
130 }
131}
132
133pub struct SortIterator<I: RowIterator> {
143 sorted_rows: std::vec::IntoIter<Row>,
145 schema: Vec<ColumnMetadata>,
147 _marker: PhantomData<I>,
149}
150
151impl<I: RowIterator> SortIterator<I> {
152 pub fn new(mut input: I, order_by: &[SortExpr]) -> Result<Self> {
160 let schema = input.schema().to_vec();
161
162 let mut rows = Vec::new();
164 while let Some(result) = input.next_row() {
165 rows.push(result?);
166 }
167
168 if order_by.is_empty() {
169 return Ok(Self {
170 sorted_rows: rows.into_iter(),
171 schema,
172 _marker: PhantomData,
173 });
174 }
175
176 let mut keyed: Vec<(Row, Vec<SqlValue>)> = Vec::with_capacity(rows.len());
178 for row in rows {
179 let mut keys = Vec::with_capacity(order_by.len());
180 for expr in order_by {
181 let ctx = EvalContext::new(&row.values);
182 keys.push(crate::executor::evaluator::evaluate(&expr.expr, &ctx)?);
183 }
184 keyed.push((row, keys));
185 }
186
187 keyed.sort_by(|a, b| compare_keys(a, b, order_by));
189
190 let sorted: Vec<Row> = keyed.into_iter().map(|(row, _)| row).collect();
191
192 Ok(Self {
193 sorted_rows: sorted.into_iter(),
194 schema,
195 _marker: PhantomData,
196 })
197 }
198}
199
200impl<I: RowIterator> RowIterator for SortIterator<I> {
201 fn next_row(&mut self) -> Option<Result<Row>> {
202 self.sorted_rows.next().map(Ok)
203 }
204
205 fn schema(&self) -> &[ColumnMetadata] {
206 &self.schema
207 }
208}
209
210fn compare_keys(
212 a: &(Row, Vec<SqlValue>),
213 b: &(Row, Vec<SqlValue>),
214 order_by: &[SortExpr],
215) -> Ordering {
216 for (i, sort_expr) in order_by.iter().enumerate() {
217 let left = &a.1[i];
218 let right = &b.1[i];
219 let cmp = compare_single(left, right, sort_expr.asc, sort_expr.nulls_first);
220 if cmp != Ordering::Equal {
221 return cmp;
222 }
223 }
224 Ordering::Equal
225}
226
227fn compare_single(left: &SqlValue, right: &SqlValue, asc: bool, nulls_first: bool) -> Ordering {
229 match (left, right) {
230 (SqlValue::Null, SqlValue::Null) => Ordering::Equal,
231 (SqlValue::Null, _) => {
232 if nulls_first {
233 Ordering::Less
234 } else {
235 Ordering::Greater
236 }
237 }
238 (_, SqlValue::Null) => {
239 if nulls_first {
240 Ordering::Greater
241 } else {
242 Ordering::Less
243 }
244 }
245 _ => match left.partial_cmp(right).unwrap_or(Ordering::Equal) {
246 Ordering::Equal => Ordering::Equal,
247 ord if asc => ord,
248 ord => ord.reverse(),
249 },
250 }
251}
252
253pub struct LimitIterator<I: RowIterator> {
263 input: I,
264 limit: Option<u64>,
265 offset: u64,
266 skipped: u64,
268 yielded: u64,
270}
271
272impl<I: RowIterator> LimitIterator<I> {
273 pub fn new(input: I, limit: Option<u64>, offset: Option<u64>) -> Self {
275 Self {
276 input,
277 limit,
278 offset: offset.unwrap_or(0),
279 skipped: 0,
280 yielded: 0,
281 }
282 }
283}
284
285impl<I: RowIterator> RowIterator for LimitIterator<I> {
286 fn next_row(&mut self) -> Option<Result<Row>> {
287 if let Some(limit) = self.limit
289 && self.yielded >= limit
290 {
291 return None;
292 }
293
294 loop {
295 match self.input.next_row()? {
296 Ok(row) => {
297 if self.skipped < self.offset {
299 self.skipped += 1;
300 continue;
301 }
302
303 if let Some(limit) = self.limit
305 && self.yielded >= limit
306 {
307 return None;
308 }
309
310 self.yielded += 1;
311 return Some(Ok(row));
312 }
313 Err(e) => return Some(Err(e)),
314 }
315 }
316 }
317
318 fn schema(&self) -> &[ColumnMetadata] {
319 self.input.schema()
320 }
321}
322
323pub struct VecIterator {
332 rows: std::vec::IntoIter<Row>,
333 schema: Vec<ColumnMetadata>,
334}
335
336impl VecIterator {
337 pub fn new(rows: Vec<Row>, schema: Vec<ColumnMetadata>) -> Self {
339 Self {
340 rows: rows.into_iter(),
341 schema,
342 }
343 }
344}
345
346impl RowIterator for VecIterator {
347 fn next_row(&mut self) -> Option<Result<Row>> {
348 self.rows.next().map(Ok)
349 }
350
351 fn schema(&self) -> &[ColumnMetadata] {
352 &self.schema
353 }
354}
355
356#[cfg(test)]
361mod tests {
362 use super::*;
363 use crate::Span;
364 use crate::planner::types::ResolvedType;
365
366 fn sample_schema() -> Vec<ColumnMetadata> {
367 vec![
368 ColumnMetadata::new("id", ResolvedType::Integer),
369 ColumnMetadata::new("name", ResolvedType::Text),
370 ]
371 }
372
373 fn sample_rows() -> Vec<Row> {
374 vec![
375 Row::new(
376 1,
377 vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
378 ),
379 Row::new(2, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
380 Row::new(
381 3,
382 vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
383 ),
384 Row::new(4, vec![SqlValue::Integer(4), SqlValue::Text("dave".into())]),
385 Row::new(5, vec![SqlValue::Integer(5), SqlValue::Text("eve".into())]),
386 ]
387 }
388
389 #[test]
390 fn vec_iterator_returns_all_rows() {
391 let rows = sample_rows();
392 let expected_len = rows.len();
393 let mut iter = VecIterator::new(rows, sample_schema());
394
395 let mut count = 0;
396 while let Some(Ok(_)) = iter.next_row() {
397 count += 1;
398 }
399 assert_eq!(count, expected_len);
400 }
401
402 #[test]
403 fn filter_iterator_filters_rows() {
404 use crate::ast::expr::BinaryOp;
405 use crate::planner::typed_expr::{TypedExpr, TypedExprKind};
406
407 let rows = sample_rows();
408 let schema = sample_schema();
409 let input = VecIterator::new(rows, schema);
410
411 let predicate = TypedExpr {
413 kind: TypedExprKind::BinaryOp {
414 left: Box::new(TypedExpr {
415 kind: TypedExprKind::ColumnRef {
416 table: "test".into(),
417 column: "id".into(),
418 column_index: 0,
419 },
420 resolved_type: ResolvedType::Integer,
421 span: Span::default(),
422 }),
423 op: BinaryOp::Gt,
424 right: Box::new(TypedExpr::literal(
425 crate::ast::expr::Literal::Number("2".into()),
426 ResolvedType::Integer,
427 Span::default(),
428 )),
429 },
430 resolved_type: ResolvedType::Boolean,
431 span: Span::default(),
432 };
433
434 let mut filter = FilterIterator::new(input, predicate);
435
436 let mut results = Vec::new();
437 while let Some(Ok(row)) = filter.next_row() {
438 results.push(row);
439 }
440
441 assert_eq!(results.len(), 3);
442 assert_eq!(results[0].row_id, 3);
443 assert_eq!(results[1].row_id, 4);
444 assert_eq!(results[2].row_id, 5);
445 }
446
447 #[test]
448 fn limit_iterator_limits_rows() {
449 let rows = sample_rows();
450 let schema = sample_schema();
451 let input = VecIterator::new(rows, schema);
452
453 let mut limit = LimitIterator::new(input, Some(2), None);
454
455 let mut results = Vec::new();
456 while let Some(Ok(row)) = limit.next_row() {
457 results.push(row);
458 }
459
460 assert_eq!(results.len(), 2);
461 assert_eq!(results[0].row_id, 1);
462 assert_eq!(results[1].row_id, 2);
463 }
464
465 #[test]
466 fn limit_iterator_applies_offset() {
467 let rows = sample_rows();
468 let schema = sample_schema();
469 let input = VecIterator::new(rows, schema);
470
471 let mut limit = LimitIterator::new(input, Some(2), Some(2));
472
473 let mut results = Vec::new();
474 while let Some(Ok(row)) = limit.next_row() {
475 results.push(row);
476 }
477
478 assert_eq!(results.len(), 2);
479 assert_eq!(results[0].row_id, 3);
480 assert_eq!(results[1].row_id, 4);
481 }
482
483 #[test]
484 fn limit_iterator_offset_only() {
485 let rows = sample_rows();
486 let schema = sample_schema();
487 let input = VecIterator::new(rows, schema);
488
489 let mut limit = LimitIterator::new(input, None, Some(3));
490
491 let mut results = Vec::new();
492 while let Some(Ok(row)) = limit.next_row() {
493 results.push(row);
494 }
495
496 assert_eq!(results.len(), 2);
497 assert_eq!(results[0].row_id, 4);
498 assert_eq!(results[1].row_id, 5);
499 }
500
501 #[test]
502 fn sort_iterator_sorts_rows() {
503 use crate::planner::typed_expr::{SortExpr, TypedExpr, TypedExprKind};
504
505 let rows = vec![
506 Row::new(
507 1,
508 vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
509 ),
510 Row::new(
511 2,
512 vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
513 ),
514 Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
515 ];
516 let schema = sample_schema();
517 let input = VecIterator::new(rows, schema);
518
519 let order_by = vec![SortExpr {
521 expr: TypedExpr {
522 kind: TypedExprKind::ColumnRef {
523 table: "test".into(),
524 column: "id".into(),
525 column_index: 0,
526 },
527 resolved_type: ResolvedType::Integer,
528 span: Span::default(),
529 },
530 asc: true,
531 nulls_first: false,
532 }];
533
534 let mut sort = SortIterator::new(input, &order_by).unwrap();
535
536 let mut results = Vec::new();
537 while let Some(Ok(row)) = sort.next_row() {
538 results.push(row);
539 }
540
541 assert_eq!(results.len(), 3);
542 assert_eq!(results[0].values[0], SqlValue::Integer(1));
543 assert_eq!(results[1].values[0], SqlValue::Integer(2));
544 assert_eq!(results[2].values[0], SqlValue::Integer(3));
545 }
546
547 #[test]
548 fn sort_iterator_sorts_descending() {
549 use crate::planner::typed_expr::{SortExpr, TypedExpr, TypedExprKind};
550
551 let rows = vec![
552 Row::new(
553 1,
554 vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
555 ),
556 Row::new(
557 2,
558 vec![SqlValue::Integer(3), SqlValue::Text("carol".into())],
559 ),
560 Row::new(3, vec![SqlValue::Integer(2), SqlValue::Text("bob".into())]),
561 ];
562 let schema = sample_schema();
563 let input = VecIterator::new(rows, schema);
564
565 let order_by = vec![SortExpr {
567 expr: TypedExpr {
568 kind: TypedExprKind::ColumnRef {
569 table: "test".into(),
570 column: "id".into(),
571 column_index: 0,
572 },
573 resolved_type: ResolvedType::Integer,
574 span: Span::default(),
575 },
576 asc: false,
577 nulls_first: false,
578 }];
579
580 let mut sort = SortIterator::new(input, &order_by).unwrap();
581
582 let mut results = Vec::new();
583 while let Some(Ok(row)) = sort.next_row() {
584 results.push(row);
585 }
586
587 assert_eq!(results.len(), 3);
588 assert_eq!(results[0].values[0], SqlValue::Integer(3));
589 assert_eq!(results[1].values[0], SqlValue::Integer(2));
590 assert_eq!(results[2].values[0], SqlValue::Integer(1));
591 }
592
593 #[test]
594 fn composed_pipeline_filter_then_limit() {
595 use crate::ast::expr::BinaryOp;
596 use crate::planner::typed_expr::{TypedExpr, TypedExprKind};
597
598 let rows = sample_rows();
599 let schema = sample_schema();
600 let input = VecIterator::new(rows, schema);
601
602 let predicate = TypedExpr {
604 kind: TypedExprKind::BinaryOp {
605 left: Box::new(TypedExpr {
606 kind: TypedExprKind::ColumnRef {
607 table: "test".into(),
608 column: "id".into(),
609 column_index: 0,
610 },
611 resolved_type: ResolvedType::Integer,
612 span: Span::default(),
613 }),
614 op: BinaryOp::Gt,
615 right: Box::new(TypedExpr::literal(
616 crate::ast::expr::Literal::Number("1".into()),
617 ResolvedType::Integer,
618 Span::default(),
619 )),
620 },
621 resolved_type: ResolvedType::Boolean,
622 span: Span::default(),
623 };
624
625 let filtered = FilterIterator::new(input, predicate);
626 let mut limited = LimitIterator::new(filtered, Some(2), None);
627
628 let mut results = Vec::new();
629 while let Some(Ok(row)) = limited.next_row() {
630 results.push(row);
631 }
632
633 assert_eq!(results.len(), 2);
635 assert_eq!(results[0].row_id, 2);
636 assert_eq!(results[1].row_id, 3);
637 }
638}