Skip to main content

grafeo_core/execution/operators/
limit.rs

//! Limit and Skip operators for result pagination.
//!
//! This module provides:
//! - `LimitOperator`: returns at most a fixed number of rows from its input
//! - `SkipOperator`: discards a fixed number of leading input rows
//! - `LimitSkipOperator`: combined OFFSET/SKIP followed by LIMIT
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13/// Limit operator.
14///
15/// Returns at most `limit` rows from the input.
16pub struct LimitOperator {
17    /// Child operator.
18    child: Box<dyn Operator>,
19    /// Maximum number of rows to return.
20    limit: usize,
21    /// Output schema.
22    output_schema: Vec<LogicalType>,
23    /// Number of rows returned so far.
24    returned: usize,
25}
26
27impl LimitOperator {
28    /// Creates a new limit operator.
29    pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
30        Self {
31            child,
32            limit,
33            output_schema,
34            returned: 0,
35        }
36    }
37
38    /// Decomposes this operator for push-based conversion.
39    pub fn into_parts(self) -> (Box<dyn Operator>, usize) {
40        (self.child, self.limit)
41    }
42}
43
44impl Operator for LimitOperator {
45    fn next(&mut self) -> OperatorResult {
46        if self.returned >= self.limit {
47            return Ok(None);
48        }
49
50        let remaining = self.limit - self.returned;
51
52        loop {
53            let Some(chunk) = self.child.next()? else {
54                return Ok(None);
55            };
56
57            let row_count = chunk.row_count();
58            if row_count == 0 {
59                continue;
60            }
61
62            if row_count <= remaining {
63                // Return entire chunk
64                self.returned += row_count;
65                return Ok(Some(chunk));
66            }
67
68            // Return partial chunk
69            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, remaining);
70
71            let mut count = 0;
72            for row in chunk.selected_indices() {
73                if count >= remaining {
74                    break;
75                }
76
77                for col_idx in 0..chunk.column_count() {
78                    if let (Some(src_col), Some(dst_col)) =
79                        (chunk.column(col_idx), builder.column_mut(col_idx))
80                    {
81                        if let Some(value) = src_col.get_value(row) {
82                            dst_col.push_value(value);
83                        } else {
84                            dst_col.push_value(Value::Null);
85                        }
86                    }
87                }
88                builder.advance_row();
89                count += 1;
90            }
91
92            self.returned += count;
93            return Ok(Some(builder.finish()));
94        }
95    }
96
97    fn reset(&mut self) {
98        self.child.reset();
99        self.returned = 0;
100    }
101
102    fn name(&self) -> &'static str {
103        "Limit"
104    }
105
106    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
107        self
108    }
109}
110
111/// Skip operator.
112///
113/// Skips the first `skip` rows from the input.
114pub struct SkipOperator {
115    /// Child operator.
116    child: Box<dyn Operator>,
117    /// Number of rows to skip.
118    skip: usize,
119    /// Output schema.
120    output_schema: Vec<LogicalType>,
121    /// Number of rows skipped so far.
122    skipped: usize,
123}
124
125impl SkipOperator {
126    /// Creates a new skip operator.
127    pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
128        Self {
129            child,
130            skip,
131            output_schema,
132            skipped: 0,
133        }
134    }
135}
136
137impl Operator for SkipOperator {
138    fn next(&mut self) -> OperatorResult {
139        // Skip rows until we've skipped enough
140        while self.skipped < self.skip {
141            let Some(chunk) = self.child.next()? else {
142                return Ok(None);
143            };
144
145            let row_count = chunk.row_count();
146            let to_skip = (self.skip - self.skipped).min(row_count);
147
148            if to_skip >= row_count {
149                // Skip entire chunk
150                self.skipped += row_count;
151                continue;
152            }
153
154            // Skip partial chunk
155            self.skipped = self.skip;
156
157            let mut builder =
158                DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
159
160            let rows: Vec<usize> = chunk.selected_indices().collect();
161            for &row in rows.iter().skip(to_skip) {
162                for col_idx in 0..chunk.column_count() {
163                    if let (Some(src_col), Some(dst_col)) =
164                        (chunk.column(col_idx), builder.column_mut(col_idx))
165                    {
166                        if let Some(value) = src_col.get_value(row) {
167                            dst_col.push_value(value);
168                        } else {
169                            dst_col.push_value(Value::Null);
170                        }
171                    }
172                }
173                builder.advance_row();
174            }
175
176            return Ok(Some(builder.finish()));
177        }
178
179        // After skipping, just pass through
180        self.child.next()
181    }
182
183    fn reset(&mut self) {
184        self.child.reset();
185        self.skipped = 0;
186    }
187
188    fn name(&self) -> &'static str {
189        "Skip"
190    }
191
192    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
193        self
194    }
195}
196
197/// Combined Limit and Skip operator.
198///
199/// Equivalent to OFFSET skip LIMIT limit.
200pub struct LimitSkipOperator {
201    /// Child operator.
202    child: Box<dyn Operator>,
203    /// Number of rows to skip.
204    skip: usize,
205    /// Maximum number of rows to return.
206    limit: usize,
207    /// Output schema.
208    output_schema: Vec<LogicalType>,
209    /// Number of rows skipped so far.
210    skipped: usize,
211    /// Number of rows returned so far.
212    returned: usize,
213}
214
215impl LimitSkipOperator {
216    /// Creates a new limit/skip operator.
217    pub fn new(
218        child: Box<dyn Operator>,
219        skip: usize,
220        limit: usize,
221        output_schema: Vec<LogicalType>,
222    ) -> Self {
223        Self {
224            child,
225            skip,
226            limit,
227            output_schema,
228            skipped: 0,
229            returned: 0,
230        }
231    }
232}
233
234impl Operator for LimitSkipOperator {
235    fn next(&mut self) -> OperatorResult {
236        // Check if we've returned enough
237        if self.returned >= self.limit {
238            return Ok(None);
239        }
240
241        loop {
242            let Some(chunk) = self.child.next()? else {
243                return Ok(None);
244            };
245
246            let row_count = chunk.row_count();
247            if row_count == 0 {
248                continue;
249            }
250
251            let rows: Vec<usize> = chunk.selected_indices().collect();
252            let mut start_idx = 0;
253
254            // Skip rows if needed
255            if self.skipped < self.skip {
256                let to_skip = (self.skip - self.skipped).min(row_count);
257                if to_skip >= row_count {
258                    self.skipped += row_count;
259                    continue;
260                }
261                self.skipped = self.skip;
262                start_idx = to_skip;
263            }
264
265            // Calculate how many rows to return
266            let remaining_in_chunk = row_count - start_idx;
267            let remaining_to_return = self.limit - self.returned;
268            let to_return = remaining_in_chunk.min(remaining_to_return);
269
270            if to_return == 0 {
271                return Ok(None);
272            }
273
274            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
275
276            for &row in rows.iter().skip(start_idx).take(to_return) {
277                for col_idx in 0..chunk.column_count() {
278                    if let (Some(src_col), Some(dst_col)) =
279                        (chunk.column(col_idx), builder.column_mut(col_idx))
280                    {
281                        if let Some(value) = src_col.get_value(row) {
282                            dst_col.push_value(value);
283                        } else {
284                            dst_col.push_value(Value::Null);
285                        }
286                    }
287                }
288                builder.advance_row();
289            }
290
291            self.returned += to_return;
292            return Ok(Some(builder.finish()));
293        }
294    }
295
296    fn reset(&mut self) {
297        self.child.reset();
298        self.skipped = 0;
299        self.returned = 0;
300    }
301
302    fn name(&self) -> &'static str {
303        "LimitSkip"
304    }
305
306    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
307        self
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that replays a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            MockOperator {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder in its slot.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for value in values.iter().copied() {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Runs `op` to exhaustion and gathers column 0 of every selected row.
    fn drain_int64(op: &mut impl Operator) -> Vec<i64> {
        let mut collected = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            collected.extend(
                chunk
                    .selected_indices()
                    .map(|row| chunk.column(0).unwrap().get_int64(row).unwrap()),
            );
        }
        collected
    }

    #[test]
    fn test_limit() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut limit = LimitOperator::new(Box::new(source), 3, vec![LogicalType::Int64]);

        assert_eq!(drain_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut limit = LimitOperator::new(Box::new(source), 10, vec![LogicalType::Int64]);

        assert_eq!(drain_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut skip = SkipOperator::new(Box::new(source), 2, vec![LogicalType::Int64]);

        assert_eq!(drain_int64(&mut skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_all() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(source), 5, vec![LogicalType::Int64]);

        // Skipping more rows than exist yields no chunk at all.
        assert!(skip.next().unwrap().is_none());
    }

    #[test]
    fn test_limit_skip_combined() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        ])]);

        // Skip the first 3 rows, then take the next 4.
        let mut op = LimitSkipOperator::new(Box::new(source), 3, 4, vec![LogicalType::Int64]);

        assert_eq!(drain_int64(&mut op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_across_chunks() {
        let source = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut limit = LimitOperator::new(Box::new(source), 5, vec![LogicalType::Int64]);

        assert_eq!(drain_int64(&mut limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_skip_across_chunks() {
        let source = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut skip = SkipOperator::new(Box::new(source), 3, vec![LogicalType::Int64]);

        assert_eq!(drain_int64(&mut skip), vec![4, 5, 6]);
    }

    #[test]
    fn test_limit_into_parts() {
        let source = Box::new(MockOperator::new(vec![]));
        let limit = LimitOperator::new(source, 42, vec![LogicalType::Int64]);

        let (_, limit_value) = limit.into_parts();
        assert_eq!(limit_value, 42);
    }

    #[test]
    fn test_limit_into_any() {
        let source = Box::new(MockOperator::new(vec![]));
        let boxed: Box<dyn Operator> = Box::new(LimitOperator::new(source, 10, vec![]));

        assert!(boxed.into_any().downcast::<LimitOperator>().is_ok());
    }
}