Skip to main content

kyu_executor/operators/
limit.rs

1//! Limit operator — SKIP + LIMIT row counting.
2
3use kyu_common::KyuResult;
4
5use crate::context::ExecutionContext;
6use crate::data_chunk::DataChunk;
7use crate::physical_plan::PhysicalOperator;
8
9pub struct LimitOp {
10    pub child: Box<PhysicalOperator>,
11    pub skip: u64,
12    pub limit: u64,
13    skipped: u64,
14    emitted: u64,
15}
16
17impl LimitOp {
18    pub fn new(child: PhysicalOperator, skip: u64, limit: u64) -> Self {
19        Self {
20            child: Box::new(child),
21            skip,
22            limit,
23            skipped: 0,
24            emitted: 0,
25        }
26    }
27
28    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
29        if self.emitted >= self.limit {
30            return Ok(None);
31        }
32
33        loop {
34            let chunk = match self.child.next(ctx)? {
35                Some(c) => c,
36                None => return Ok(None),
37            };
38
39            let n = chunk.num_rows();
40
41            // Fast skip: entire chunk can be skipped
42            let skip_remaining = (self.skip - self.skipped) as usize;
43            if skip_remaining >= n {
44                self.skipped += n as u64;
45                continue;
46            }
47
48            let start = skip_remaining;
49            self.skipped = self.skip;
50
51            let remaining = (self.limit - self.emitted) as usize;
52            let take = remaining.min(n - start);
53
54            if take == 0 {
55                return Ok(None);
56            }
57
58            // Build selection vector for the slice [start..start+take]
59            let sel = chunk.selection();
60            let indices: Vec<u32> = (start..start + take)
61                .map(|i| sel.get(i) as u32)
62                .collect();
63            self.emitted += take as u64;
64
65            return Ok(Some(
66                chunk.with_selection(crate::value_vector::SelectionVector::from_indices(indices)),
67            ));
68        }
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75    use crate::context::MockStorage;
76    use kyu_types::TypedValue;
77
78    fn make_storage() -> MockStorage {
79        let mut storage = MockStorage::new();
80        storage.insert_table(
81            kyu_common::id::TableId(0),
82            vec![
83                vec![TypedValue::Int64(1)],
84                vec![TypedValue::Int64(2)],
85                vec![TypedValue::Int64(3)],
86                vec![TypedValue::Int64(4)],
87                vec![TypedValue::Int64(5)],
88            ],
89        );
90        storage
91    }
92
93    #[test]
94    fn limit_only() {
95        let storage = make_storage();
96        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
97        let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
98            kyu_common::id::TableId(0),
99        ));
100        let mut limit = LimitOp::new(scan, 0, 3);
101        let chunk = limit.next(&ctx).unwrap().unwrap();
102        assert_eq!(chunk.num_rows(), 3);
103        assert!(limit.next(&ctx).unwrap().is_none());
104    }
105
106    #[test]
107    fn skip_and_limit() {
108        let storage = make_storage();
109        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
110        let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
111            kyu_common::id::TableId(0),
112        ));
113        let mut limit = LimitOp::new(scan, 2, 2);
114        let chunk = limit.next(&ctx).unwrap().unwrap();
115        assert_eq!(chunk.num_rows(), 2);
116        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(3));
117        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(4));
118    }
119
120    #[test]
121    fn skip_all() {
122        let storage = make_storage();
123        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
124        let scan = PhysicalOperator::ScanNode(crate::operators::scan::ScanNodeOp::new(
125            kyu_common::id::TableId(0),
126        ));
127        let mut limit = LimitOp::new(scan, 10, 5);
128        assert!(limit.next(&ctx).unwrap().is_none());
129    }
130}