Skip to main content

graphos_core/execution/operators/push/
limit.rs

//! Push-based limit, skip, and combined skip+limit operators.
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7
/// Push-based limit operator.
///
/// Forwards rows downstream until `limit` rows have been passed, after which
/// it signals that no further input is needed.
pub struct LimitPushOperator {
    /// Upper bound on the number of rows forwarded downstream.
    limit: usize,
    /// Running count of rows forwarded so far.
    passed: usize,
}

impl LimitPushOperator {
    /// Builds a limit operator that has not forwarded any rows yet.
    pub fn new(limit: usize) -> Self {
        LimitPushOperator { passed: 0, limit }
    }

    /// Returns the configured row limit.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// Returns how many rows have been forwarded so far.
    pub fn passed(&self) -> usize {
        self.passed
    }

    /// Returns `true` once at least `limit` rows have been forwarded.
    ///
    /// A limit of zero is exhausted from the start.
    pub fn is_exhausted(&self) -> bool {
        self.limit <= self.passed
    }
}
39
40impl PushOperator for LimitPushOperator {
41    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
42        if self.passed >= self.limit {
43            // Already at limit, signal termination
44            return Ok(false);
45        }
46
47        let chunk_len = chunk.len();
48        let remaining = self.limit - self.passed;
49
50        if chunk_len <= remaining {
51            // Pass entire chunk
52            self.passed += chunk_len;
53            let should_continue = sink.consume(chunk)?;
54            Ok(should_continue && self.passed < self.limit)
55        } else {
56            // Need to truncate chunk
57            self.passed += remaining;
58
59            // Create selection for first `remaining` rows
60            let selection = SelectionVector::new_all(remaining);
61            let truncated = chunk.filter(&selection);
62
63            sink.consume(truncated)?;
64            Ok(false) // Limit reached
65        }
66    }
67
68    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
69        // Nothing to finalize
70        Ok(())
71    }
72
73    fn preferred_chunk_size(&self) -> ChunkSizeHint {
74        // If limit is small, use small chunks to avoid processing extra data
75        if self.limit < 256 {
76            ChunkSizeHint::AtMost(self.limit)
77        } else if self.limit < 1000 {
78            ChunkSizeHint::Small
79        } else {
80            ChunkSizeHint::Default
81        }
82    }
83
84    fn name(&self) -> &'static str {
85        "LimitPush"
86    }
87}
88
/// Push-based skip operator.
///
/// Drops the first `skip` rows it sees and forwards everything after them.
pub struct SkipPushOperator {
    /// Total number of rows that must be dropped.
    skip: usize,
    /// Running count of rows dropped so far.
    skipped: usize,
}

impl SkipPushOperator {
    /// Builds a skip operator that has not dropped any rows yet.
    pub fn new(skip: usize) -> Self {
        SkipPushOperator { skipped: 0, skip }
    }

    /// Returns the configured number of rows to skip.
    pub fn skip(&self) -> usize {
        self.skip
    }

    /// Returns `true` once at least `skip` rows have been dropped.
    ///
    /// A skip count of zero is complete from the start.
    pub fn skip_complete(&self) -> bool {
        self.skip <= self.skipped
    }
}
115
116impl PushOperator for SkipPushOperator {
117    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
118        if self.skipped >= self.skip {
119            // Skip phase complete, pass everything through
120            return sink.consume(chunk);
121        }
122
123        let chunk_len = chunk.len();
124        let remaining_to_skip = self.skip - self.skipped;
125
126        if chunk_len <= remaining_to_skip {
127            // Skip entire chunk
128            self.skipped += chunk_len;
129            Ok(true)
130        } else {
131            // Skip first `remaining_to_skip` rows, pass the rest
132            self.skipped = self.skip;
133
134            let start = remaining_to_skip;
135            let selection = SelectionVector::from_predicate(chunk_len, |i| i >= start);
136            let passed = chunk.filter(&selection);
137
138            sink.consume(passed)
139        }
140    }
141
142    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
143        Ok(())
144    }
145
146    fn name(&self) -> &'static str {
147        "SkipPush"
148    }
149}
150
151/// Combined skip and limit operator.
152///
153/// Skips the first N rows, then passes through at most M rows.
154pub struct SkipLimitPushOperator {
155    skip: SkipPushOperator,
156    limit: LimitPushOperator,
157}
158
159impl SkipLimitPushOperator {
160    /// Create a new skip+limit operator.
161    pub fn new(skip: usize, limit: usize) -> Self {
162        Self {
163            skip: SkipPushOperator::new(skip),
164            limit: LimitPushOperator::new(limit),
165        }
166    }
167}
168
169impl PushOperator for SkipLimitPushOperator {
170    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
171        if self.limit.is_exhausted() {
172            return Ok(false);
173        }
174
175        if !self.skip.skip_complete() {
176            // Still in skip phase
177            // We need to handle partial skip
178            let chunk_len = chunk.len();
179            let remaining_to_skip = self.skip.skip - self.skip.skipped;
180
181            if chunk_len <= remaining_to_skip {
182                // Skip entire chunk
183                self.skip.skipped += chunk_len;
184                return Ok(true);
185            }
186
187            // Partial skip
188            self.skip.skipped = self.skip.skip;
189            let start = remaining_to_skip;
190            let selection = SelectionVector::from_predicate(chunk_len, |i| i >= start);
191            let passed = chunk.filter(&selection);
192
193            return self.limit.push(passed, sink);
194        }
195
196        // Skip complete, apply limit
197        self.limit.push(chunk, sink)
198    }
199
200    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
201        self.skip.finalize(sink)?;
202        self.limit.finalize(sink)
203    }
204
205    fn preferred_chunk_size(&self) -> ChunkSizeHint {
206        self.limit.preferred_chunk_size()
207    }
208
209    fn name(&self) -> &'static str {
210        "SkipLimitPush"
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;
    use crate::execution::vector::ValueVector;
    use graphos_common::types::Value;

    /// Builds a single-column chunk holding the given integers.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let column: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&column)])
    }

    // A chunk smaller than the limit passes through untouched.
    #[test]
    fn test_limit_under_limit() {
        let mut sink = CollectorSink::new();
        let mut op = LimitPushOperator::new(10);

        op.push(create_test_chunk(&[1, 2, 3]), &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 3);
        assert!(!op.is_exhausted());
    }

    // A chunk exactly as large as the limit exhausts the operator.
    #[test]
    fn test_limit_exact_limit() {
        let mut sink = CollectorSink::new();
        let mut op = LimitPushOperator::new(5);

        let input = create_test_chunk(&[1, 2, 3, 4, 5]);
        let keep_going = op.push(input, &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5);
        assert!(!keep_going);
        assert!(op.is_exhausted());
    }

    // A chunk larger than the limit is truncated.
    #[test]
    fn test_limit_over_limit() {
        let mut sink = CollectorSink::new();
        let mut op = LimitPushOperator::new(3);

        let input = create_test_chunk(&[1, 2, 3, 4, 5]);
        let keep_going = op.push(input, &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 3);
        assert!(!keep_going);
    }

    // The limit is tracked across several pushes.
    #[test]
    fn test_limit_multiple_chunks() {
        let mut sink = CollectorSink::new();
        let mut op = LimitPushOperator::new(5);

        op.push(create_test_chunk(&[1, 2]), &mut sink).unwrap();
        op.push(create_test_chunk(&[3, 4]), &mut sink).unwrap();
        let keep_going = op.push(create_test_chunk(&[5, 6, 7]), &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5);
        assert!(!keep_going);
    }

    // A chunk smaller than the skip count is dropped entirely.
    #[test]
    fn test_skip_under_skip() {
        let mut sink = CollectorSink::new();
        let mut op = SkipPushOperator::new(10);

        op.push(create_test_chunk(&[1, 2, 3]), &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 0);
        assert!(!op.skip_complete());
    }

    // After skipping exactly one chunk, the next chunk passes through.
    #[test]
    fn test_skip_exact_skip() {
        let mut sink = CollectorSink::new();
        let mut op = SkipPushOperator::new(3);

        op.push(create_test_chunk(&[1, 2, 3]), &mut sink).unwrap();
        op.push(create_test_chunk(&[4, 5, 6]), &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 3); // 4, 5, 6
        assert!(op.skip_complete());
    }

    // A chunk straddling the skip boundary is partially forwarded.
    #[test]
    fn test_skip_partial() {
        let mut sink = CollectorSink::new();
        let mut op = SkipPushOperator::new(2);

        op.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 3); // 3, 4, 5
    }

    // Skip and limit applied together within a single chunk.
    #[test]
    fn test_skip_limit_combined() {
        let mut sink = CollectorSink::new();
        let mut op = SkipLimitPushOperator::new(2, 3);

        op.push(create_test_chunk(&[1, 2, 3, 4, 5, 6, 7]), &mut sink).unwrap();
        op.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 3); // 3, 4, 5 (skip 1,2; limit 3)
    }
}