kyu_executor/operators/
limit.rs1use kyu_common::KyuResult;
4
5use crate::context::ExecutionContext;
6use crate::data_chunk::DataChunk;
7use crate::physical_plan::PhysicalOperator;
8
9pub struct LimitOp {
10 pub child: Box<PhysicalOperator>,
11 pub skip: u64,
12 pub limit: u64,
13 skipped: u64,
14 emitted: u64,
15}
16
17impl LimitOp {
18 pub fn new(child: PhysicalOperator, skip: u64, limit: u64) -> Self {
19 Self {
20 child: Box::new(child),
21 skip,
22 limit,
23 skipped: 0,
24 emitted: 0,
25 }
26 }
27
28 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
29 if self.emitted >= self.limit {
30 return Ok(None);
31 }
32
33 loop {
34 let chunk = match self.child.next(ctx)? {
35 Some(c) => c,
36 None => return Ok(None),
37 };
38
39 let n = chunk.num_rows();
40
41 let skip_remaining = (self.skip - self.skipped) as usize;
43 if skip_remaining >= n {
44 self.skipped += n as u64;
45 continue;
46 }
47
48 let start = skip_remaining;
49 self.skipped = self.skip;
50
51 let remaining = (self.limit - self.emitted) as usize;
52 let take = remaining.min(n - start);
53
54 if take == 0 {
55 return Ok(None);
56 }
57
58 let sel = chunk.selection();
60 let indices: Vec<u32> = (start..start + take)
61 .map(|i| sel.get(i) as u32)
62 .collect();
63 self.emitted += take as u64;
64
65 return Ok(Some(
66 chunk.with_selection(crate::value_vector::SelectionVector::from_indices(indices)),
67 ));
68 }
69 }
70}
71
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;

    /// Builds a mock storage holding table 0 with five single-column rows
    /// containing the integers 1 through 5.
    fn make_storage() -> MockStorage {
        let rows: Vec<Vec<TypedValue>> = (1..=5)
            .map(|value| vec![TypedValue::Int64(value)])
            .collect();
        let mut storage = MockStorage::new();
        storage.insert_table(kyu_common::id::TableId(0), rows);
        storage
    }

    /// Scan operator over table 0, used as the child in every test.
    fn scan_table_zero() -> PhysicalOperator {
        PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
            kyu_common::id::TableId(0),
        ))
    }

    // LIMIT 3 with no skip: three rows come out, then the stream ends.
    #[test]
    fn limit_only() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = LimitOp::new(scan_table_zero(), 0, 3);

        let first = op.next(&ctx).unwrap().unwrap();
        assert_eq!(first.num_rows(), 3);
        assert!(op.next(&ctx).unwrap().is_none());
    }

    // SKIP 2 LIMIT 2: the surviving rows are the third and fourth values.
    #[test]
    fn skip_and_limit() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = LimitOp::new(scan_table_zero(), 2, 2);

        let out = op.next(&ctx).unwrap().unwrap();
        assert_eq!(out.num_rows(), 2);
        assert_eq!(out.get_value(0, 0), TypedValue::Int64(3));
        assert_eq!(out.get_value(1, 0), TypedValue::Int64(4));
    }

    // Skipping more rows than the table holds yields nothing.
    #[test]
    fn skip_all() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = LimitOp::new(scan_table_zero(), 10, 5);

        assert!(op.next(&ctx).unwrap().is_none());
    }
}