Skip to main content

grafeo_core/execution/operators/
limit.rs

//! Limit and Skip operators for result pagination.
//!
//! This module provides:
//! - `LimitOperator`: caps the number of output rows.
//! - `SkipOperator`: discards a number of leading input rows.
//! - `LimitSkipOperator`: combined OFFSET/SKIP followed by LIMIT, in one pass.
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13/// Limit operator.
14///
15/// Returns at most `limit` rows from the input.
16pub struct LimitOperator {
17    /// Child operator.
18    child: Box<dyn Operator>,
19    /// Maximum number of rows to return.
20    limit: usize,
21    /// Output schema.
22    output_schema: Vec<LogicalType>,
23    /// Number of rows returned so far.
24    returned: usize,
25}
26
27impl LimitOperator {
28    /// Creates a new limit operator.
29    pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
30        Self {
31            child,
32            limit,
33            output_schema,
34            returned: 0,
35        }
36    }
37}
38
39impl Operator for LimitOperator {
40    fn next(&mut self) -> OperatorResult {
41        if self.returned >= self.limit {
42            return Ok(None);
43        }
44
45        let remaining = self.limit - self.returned;
46
47        loop {
48            let Some(chunk) = self.child.next()? else {
49                return Ok(None);
50            };
51
52            let row_count = chunk.row_count();
53            if row_count == 0 {
54                continue;
55            }
56
57            if row_count <= remaining {
58                // Return entire chunk
59                self.returned += row_count;
60                return Ok(Some(chunk));
61            }
62
63            // Return partial chunk
64            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, remaining);
65
66            let mut count = 0;
67            for row in chunk.selected_indices() {
68                if count >= remaining {
69                    break;
70                }
71
72                for col_idx in 0..chunk.column_count() {
73                    if let (Some(src_col), Some(dst_col)) =
74                        (chunk.column(col_idx), builder.column_mut(col_idx))
75                    {
76                        if let Some(value) = src_col.get_value(row) {
77                            dst_col.push_value(value);
78                        } else {
79                            dst_col.push_value(Value::Null);
80                        }
81                    }
82                }
83                builder.advance_row();
84                count += 1;
85            }
86
87            self.returned += count;
88            return Ok(Some(builder.finish()));
89        }
90    }
91
92    fn reset(&mut self) {
93        self.child.reset();
94        self.returned = 0;
95    }
96
97    fn name(&self) -> &'static str {
98        "Limit"
99    }
100}
101
102/// Skip operator.
103///
104/// Skips the first `skip` rows from the input.
105pub struct SkipOperator {
106    /// Child operator.
107    child: Box<dyn Operator>,
108    /// Number of rows to skip.
109    skip: usize,
110    /// Output schema.
111    output_schema: Vec<LogicalType>,
112    /// Number of rows skipped so far.
113    skipped: usize,
114}
115
116impl SkipOperator {
117    /// Creates a new skip operator.
118    pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
119        Self {
120            child,
121            skip,
122            output_schema,
123            skipped: 0,
124        }
125    }
126}
127
128impl Operator for SkipOperator {
129    fn next(&mut self) -> OperatorResult {
130        // Skip rows until we've skipped enough
131        while self.skipped < self.skip {
132            let Some(chunk) = self.child.next()? else {
133                return Ok(None);
134            };
135
136            let row_count = chunk.row_count();
137            let to_skip = (self.skip - self.skipped).min(row_count);
138
139            if to_skip >= row_count {
140                // Skip entire chunk
141                self.skipped += row_count;
142                continue;
143            }
144
145            // Skip partial chunk
146            self.skipped = self.skip;
147
148            let mut builder =
149                DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
150
151            let rows: Vec<usize> = chunk.selected_indices().collect();
152            for &row in rows.iter().skip(to_skip) {
153                for col_idx in 0..chunk.column_count() {
154                    if let (Some(src_col), Some(dst_col)) =
155                        (chunk.column(col_idx), builder.column_mut(col_idx))
156                    {
157                        if let Some(value) = src_col.get_value(row) {
158                            dst_col.push_value(value);
159                        } else {
160                            dst_col.push_value(Value::Null);
161                        }
162                    }
163                }
164                builder.advance_row();
165            }
166
167            return Ok(Some(builder.finish()));
168        }
169
170        // After skipping, just pass through
171        self.child.next()
172    }
173
174    fn reset(&mut self) {
175        self.child.reset();
176        self.skipped = 0;
177    }
178
179    fn name(&self) -> &'static str {
180        "Skip"
181    }
182}
183
184/// Combined Limit and Skip operator.
185///
186/// Equivalent to OFFSET skip LIMIT limit.
187pub struct LimitSkipOperator {
188    /// Child operator.
189    child: Box<dyn Operator>,
190    /// Number of rows to skip.
191    skip: usize,
192    /// Maximum number of rows to return.
193    limit: usize,
194    /// Output schema.
195    output_schema: Vec<LogicalType>,
196    /// Number of rows skipped so far.
197    skipped: usize,
198    /// Number of rows returned so far.
199    returned: usize,
200}
201
202impl LimitSkipOperator {
203    /// Creates a new limit/skip operator.
204    pub fn new(
205        child: Box<dyn Operator>,
206        skip: usize,
207        limit: usize,
208        output_schema: Vec<LogicalType>,
209    ) -> Self {
210        Self {
211            child,
212            skip,
213            limit,
214            output_schema,
215            skipped: 0,
216            returned: 0,
217        }
218    }
219}
220
221impl Operator for LimitSkipOperator {
222    fn next(&mut self) -> OperatorResult {
223        // Check if we've returned enough
224        if self.returned >= self.limit {
225            return Ok(None);
226        }
227
228        loop {
229            let Some(chunk) = self.child.next()? else {
230                return Ok(None);
231            };
232
233            let row_count = chunk.row_count();
234            if row_count == 0 {
235                continue;
236            }
237
238            let rows: Vec<usize> = chunk.selected_indices().collect();
239            let mut start_idx = 0;
240
241            // Skip rows if needed
242            if self.skipped < self.skip {
243                let to_skip = (self.skip - self.skipped).min(row_count);
244                if to_skip >= row_count {
245                    self.skipped += row_count;
246                    continue;
247                }
248                self.skipped = self.skip;
249                start_idx = to_skip;
250            }
251
252            // Calculate how many rows to return
253            let remaining_in_chunk = row_count - start_idx;
254            let remaining_to_return = self.limit - self.returned;
255            let to_return = remaining_in_chunk.min(remaining_to_return);
256
257            if to_return == 0 {
258                return Ok(None);
259            }
260
261            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
262
263            for &row in rows.iter().skip(start_idx).take(to_return) {
264                for col_idx in 0..chunk.column_count() {
265                    if let (Some(src_col), Some(dst_col)) =
266                        (chunk.column(col_idx), builder.column_mut(col_idx))
267                    {
268                        if let Some(value) = src_col.get_value(row) {
269                            dst_col.push_value(value);
270                        } else {
271                            dst_col.push_value(Value::Null);
272                        }
273                    }
274                }
275                builder.advance_row();
276            }
277
278            self.returned += to_return;
279            return Ok(Some(builder.finish()));
280        }
281    }
282
283    fn reset(&mut self) {
284        self.child.reset();
285        self.skipped = 0;
286        self.returned = 0;
287    }
288
289    fn name(&self) -> &'static str {
290        "LimitSkip"
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test source that replays fixed batches of `Int64` values, one chunk per
    /// batch.
    ///
    /// Chunks are rebuilt from the stored values on every `next` call rather
    /// than moved out of a `Vec<DataChunk>`, so `reset` genuinely rewinds the
    /// stream and operators' `reset` paths can be exercised.
    struct MockOperator {
        batches: Vec<Vec<i64>>,
        position: usize,
    }

    impl MockOperator {
        fn new(batches: Vec<Vec<i64>>) -> Self {
            Self {
                batches,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(values) = self.batches.get(self.position) else {
                return Ok(None);
            };
            self.position += 1;
            Ok(Some(create_numbered_chunk(values)))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op`, returning every selected value of its first column.
    fn collect_int64(op: &mut dyn Operator) -> Vec<i64> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        results
    }

    fn mock(batches: &[&[i64]]) -> Box<dyn Operator> {
        Box::new(MockOperator::new(
            batches.iter().map(|b| b.to_vec()).collect(),
        ))
    }

    #[test]
    fn test_limit() {
        let mut limit = LimitOperator::new(mock(&[&[1, 2, 3, 4, 5]]), 3, vec![LogicalType::Int64]);
        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let mut limit = LimitOperator::new(mock(&[&[1, 2, 3]]), 10, vec![LogicalType::Int64]);
        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_zero() {
        let mut limit = LimitOperator::new(mock(&[&[1, 2, 3]]), 0, vec![LogicalType::Int64]);
        assert!(limit.next().unwrap().is_none());
    }

    #[test]
    fn test_limit_across_chunks() {
        let mut limit = LimitOperator::new(
            mock(&[&[1, 2], &[3, 4], &[5, 6]]),
            5,
            vec![LogicalType::Int64],
        );
        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_limit_reset() {
        let mut limit =
            LimitOperator::new(mock(&[&[1, 2], &[3, 4]]), 3, vec![LogicalType::Int64]);
        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3]);
        limit.reset();
        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip() {
        let mut skip = SkipOperator::new(mock(&[&[1, 2, 3, 4, 5]]), 2, vec![LogicalType::Int64]);
        assert_eq!(collect_int64(&mut skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_zero() {
        let mut skip = SkipOperator::new(mock(&[&[1, 2], &[3]]), 0, vec![LogicalType::Int64]);
        assert_eq!(collect_int64(&mut skip), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip_all() {
        let mut skip = SkipOperator::new(mock(&[&[1, 2, 3]]), 5, vec![LogicalType::Int64]);
        assert!(skip.next().unwrap().is_none());
    }

    #[test]
    fn test_skip_across_chunks() {
        let mut skip = SkipOperator::new(
            mock(&[&[1, 2], &[3, 4], &[5, 6]]),
            3,
            vec![LogicalType::Int64],
        );
        assert_eq!(collect_int64(&mut skip), vec![4, 5, 6]);
    }

    #[test]
    fn test_limit_skip_combined() {
        let mut op = LimitSkipOperator::new(
            mock(&[&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]),
            3, // Skip first 3
            4, // Take next 4
            vec![LogicalType::Int64],
        );
        assert_eq!(collect_int64(&mut op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_skip_across_chunks() {
        let mut op = LimitSkipOperator::new(
            mock(&[&[1, 2], &[3, 4], &[5, 6]]),
            3,
            2,
            vec![LogicalType::Int64],
        );
        assert_eq!(collect_int64(&mut op), vec![4, 5]);
    }

    #[test]
    fn test_limit_skip_whole_chunks_pass_through() {
        let mut op = LimitSkipOperator::new(
            mock(&[&[1, 2], &[3, 4], &[5, 6]]),
            2,
            10,
            vec![LogicalType::Int64],
        );
        assert_eq!(collect_int64(&mut op), vec![3, 4, 5, 6]);
    }

    #[test]
    fn test_limit_skip_reset() {
        let mut op =
            LimitSkipOperator::new(mock(&[&[1, 2, 3]]), 1, 2, vec![LogicalType::Int64]);
        assert_eq!(collect_int64(&mut op), vec![2, 3]);
        op.reset();
        assert_eq!(collect_int64(&mut op), vec![2, 3]);
    }
}