kyu_executor/operators/
limit.rs1use kyu_common::KyuResult;
4
5use crate::context::ExecutionContext;
6use crate::data_chunk::DataChunk;
7use crate::physical_plan::PhysicalOperator;
8
9pub struct LimitOp {
10 pub child: Box<PhysicalOperator>,
11 pub skip: u64,
12 pub limit: u64,
13 skipped: u64,
14 emitted: u64,
15}
16
17impl LimitOp {
18 pub fn new(child: PhysicalOperator, skip: u64, limit: u64) -> Self {
19 Self {
20 child: Box::new(child),
21 skip,
22 limit,
23 skipped: 0,
24 emitted: 0,
25 }
26 }
27
28 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
29 if self.emitted >= self.limit {
30 return Ok(None);
31 }
32
33 loop {
34 let chunk = match self.child.next(ctx)? {
35 Some(c) => c,
36 None => return Ok(None),
37 };
38
39 let n = chunk.num_rows();
40
41 let skip_remaining = (self.skip - self.skipped) as usize;
43 if skip_remaining >= n {
44 self.skipped += n as u64;
45 continue;
46 }
47
48 let start = skip_remaining;
49 self.skipped = self.skip;
50
51 let remaining = (self.limit - self.emitted) as usize;
52 let take = remaining.min(n - start);
53
54 if take == 0 {
55 return Ok(None);
56 }
57
58 let sel = chunk.selection();
60 let indices: Vec<u32> = (start..start + take).map(|i| sel.get(i) as u32).collect();
61 self.emitted += take as u64;
62
63 return Ok(Some(chunk.with_selection(
64 crate::value_vector::SelectionVector::from_indices(indices),
65 )));
66 }
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;

    /// Builds mock storage holding table 0 with five single-column rows
    /// carrying the Int64 values 1 through 5.
    fn make_storage() -> MockStorage {
        let rows: Vec<Vec<TypedValue>> = (1_i64..=5)
            .map(|value| vec![TypedValue::Int64(value)])
            .collect();

        let mut storage = MockStorage::new();
        storage.insert_table(kyu_common::id::TableId(0), rows);
        storage
    }

    /// Scan operator over table 0, the table `make_storage` populates.
    fn scan_table_zero() -> PhysicalOperator {
        PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
            kyu_common::id::TableId(0),
        ))
    }

    /// A limit without skip yields the first rows, then ends the stream.
    #[test]
    fn limit_only() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = LimitOp::new(scan_table_zero(), 0, 3);

        let first = op.next(&ctx).unwrap().unwrap();
        assert_eq!(first.num_rows(), 3);

        let second = op.next(&ctx).unwrap();
        assert!(second.is_none());
    }

    /// Skipping two rows and taking two yields the values 3 and 4.
    #[test]
    fn skip_and_limit() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = LimitOp::new(scan_table_zero(), 2, 2);

        let window = op.next(&ctx).unwrap().unwrap();
        assert_eq!(window.num_rows(), 2);
        assert_eq!(window.get_value(0, 0), TypedValue::Int64(3));
        assert_eq!(window.get_value(1, 0), TypedValue::Int64(4));
    }

    /// A skip larger than the table yields no chunk at all.
    #[test]
    fn skip_all() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = LimitOp::new(scan_table_zero(), 10, 5);

        let result = op.next(&ctx).unwrap();
        assert!(result.is_none());
    }
}