Skip to main content

kyu_executor/operators/
limit.rs

1//! Limit operator — SKIP + LIMIT row counting.
2
3use kyu_common::KyuResult;
4
5use crate::context::ExecutionContext;
6use crate::data_chunk::DataChunk;
7use crate::physical_plan::PhysicalOperator;
8
9pub struct LimitOp {
10    pub child: Box<PhysicalOperator>,
11    pub skip: u64,
12    pub limit: u64,
13    skipped: u64,
14    emitted: u64,
15}
16
17impl LimitOp {
18    pub fn new(child: PhysicalOperator, skip: u64, limit: u64) -> Self {
19        Self {
20            child: Box::new(child),
21            skip,
22            limit,
23            skipped: 0,
24            emitted: 0,
25        }
26    }
27
28    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
29        if self.emitted >= self.limit {
30            return Ok(None);
31        }
32
33        loop {
34            let chunk = match self.child.next(ctx)? {
35                Some(c) => c,
36                None => return Ok(None),
37            };
38
39            let n = chunk.num_rows();
40
41            // Fast skip: entire chunk can be skipped
42            let skip_remaining = (self.skip - self.skipped) as usize;
43            if skip_remaining >= n {
44                self.skipped += n as u64;
45                continue;
46            }
47
48            let start = skip_remaining;
49            self.skipped = self.skip;
50
51            let remaining = (self.limit - self.emitted) as usize;
52            let take = remaining.min(n - start);
53
54            if take == 0 {
55                return Ok(None);
56            }
57
58            // Build selection vector for the slice [start..start+take]
59            let sel = chunk.selection();
60            let indices: Vec<u32> = (start..start + take).map(|i| sel.get(i) as u32).collect();
61            self.emitted += take as u64;
62
63            return Ok(Some(chunk.with_selection(
64                crate::value_vector::SelectionVector::from_indices(indices),
65            )));
66        }
67    }
68}
69
70#[cfg(test)]
71mod tests {
72    use super::*;
73    use crate::context::MockStorage;
74    use kyu_types::TypedValue;
75
76    fn make_storage() -> MockStorage {
77        let mut storage = MockStorage::new();
78        storage.insert_table(
79            kyu_common::id::TableId(0),
80            vec![
81                vec![TypedValue::Int64(1)],
82                vec![TypedValue::Int64(2)],
83                vec![TypedValue::Int64(3)],
84                vec![TypedValue::Int64(4)],
85                vec![TypedValue::Int64(5)],
86            ],
87        );
88        storage
89    }
90
91    #[test]
92    fn limit_only() {
93        let storage = make_storage();
94        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
95        let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
96            kyu_common::id::TableId(0),
97        ));
98        let mut limit = LimitOp::new(scan, 0, 3);
99        let chunk = limit.next(&ctx).unwrap().unwrap();
100        assert_eq!(chunk.num_rows(), 3);
101        assert!(limit.next(&ctx).unwrap().is_none());
102    }
103
104    #[test]
105    fn skip_and_limit() {
106        let storage = make_storage();
107        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
108        let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
109            kyu_common::id::TableId(0),
110        ));
111        let mut limit = LimitOp::new(scan, 2, 2);
112        let chunk = limit.next(&ctx).unwrap().unwrap();
113        assert_eq!(chunk.num_rows(), 2);
114        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(3));
115        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(4));
116    }
117
118    #[test]
119    fn skip_all() {
120        let storage = make_storage();
121        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
122        let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
123            kyu_common::id::TableId(0),
124        ));
125        let mut limit = LimitOp::new(scan, 10, 5);
126        assert!(limit.next(&ctx).unwrap().is_none());
127    }
128}