Skip to main content

graphos_core/execution/operators/
limit.rs

1//! Limit and Skip operators for result pagination.
2//!
3//! This module provides:
4//! - `LimitOperator`: Limits the number of output rows
5//! - `SkipOperator`: Skips a number of input rows
6//! - `LimitSkipOperator`: Combined LIMIT and OFFSET/SKIP
7
8use graphos_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13/// Limit operator.
14///
15/// Returns at most `limit` rows from the input.
16pub struct LimitOperator {
17    /// Child operator.
18    child: Box<dyn Operator>,
19    /// Maximum number of rows to return.
20    limit: usize,
21    /// Output schema.
22    output_schema: Vec<LogicalType>,
23    /// Number of rows returned so far.
24    returned: usize,
25}
26
27impl LimitOperator {
28    /// Creates a new limit operator.
29    pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
30        Self {
31            child,
32            limit,
33            output_schema,
34            returned: 0,
35        }
36    }
37}
38
39impl Operator for LimitOperator {
40    fn next(&mut self) -> OperatorResult {
41        if self.returned >= self.limit {
42            return Ok(None);
43        }
44
45        let remaining = self.limit - self.returned;
46
47        loop {
48            let chunk = match self.child.next()? {
49                Some(c) => c,
50                None => return Ok(None),
51            };
52
53            let row_count = chunk.row_count();
54            if row_count == 0 {
55                continue;
56            }
57
58            if row_count <= remaining {
59                // Return entire chunk
60                self.returned += row_count;
61                return Ok(Some(chunk));
62            } else {
63                // Return partial chunk
64                let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, remaining);
65
66                let mut count = 0;
67                for row in chunk.selected_indices() {
68                    if count >= remaining {
69                        break;
70                    }
71
72                    for col_idx in 0..chunk.column_count() {
73                        if let (Some(src_col), Some(dst_col)) =
74                            (chunk.column(col_idx), builder.column_mut(col_idx))
75                        {
76                            if let Some(value) = src_col.get_value(row) {
77                                dst_col.push_value(value);
78                            } else {
79                                dst_col.push_value(Value::Null);
80                            }
81                        }
82                    }
83                    builder.advance_row();
84                    count += 1;
85                }
86
87                self.returned += count;
88                return Ok(Some(builder.finish()));
89            }
90        }
91    }
92
93    fn reset(&mut self) {
94        self.child.reset();
95        self.returned = 0;
96    }
97
98    fn name(&self) -> &'static str {
99        "Limit"
100    }
101}
102
103/// Skip operator.
104///
105/// Skips the first `skip` rows from the input.
106pub struct SkipOperator {
107    /// Child operator.
108    child: Box<dyn Operator>,
109    /// Number of rows to skip.
110    skip: usize,
111    /// Output schema.
112    output_schema: Vec<LogicalType>,
113    /// Number of rows skipped so far.
114    skipped: usize,
115}
116
117impl SkipOperator {
118    /// Creates a new skip operator.
119    pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
120        Self {
121            child,
122            skip,
123            output_schema,
124            skipped: 0,
125        }
126    }
127}
128
129impl Operator for SkipOperator {
130    fn next(&mut self) -> OperatorResult {
131        // Skip rows until we've skipped enough
132        while self.skipped < self.skip {
133            let chunk = match self.child.next()? {
134                Some(c) => c,
135                None => return Ok(None),
136            };
137
138            let row_count = chunk.row_count();
139            let to_skip = (self.skip - self.skipped).min(row_count);
140
141            if to_skip >= row_count {
142                // Skip entire chunk
143                self.skipped += row_count;
144                continue;
145            }
146
147            // Skip partial chunk
148            self.skipped = self.skip;
149
150            let mut builder =
151                DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
152
153            let rows: Vec<usize> = chunk.selected_indices().collect();
154            for &row in rows.iter().skip(to_skip) {
155                for col_idx in 0..chunk.column_count() {
156                    if let (Some(src_col), Some(dst_col)) =
157                        (chunk.column(col_idx), builder.column_mut(col_idx))
158                    {
159                        if let Some(value) = src_col.get_value(row) {
160                            dst_col.push_value(value);
161                        } else {
162                            dst_col.push_value(Value::Null);
163                        }
164                    }
165                }
166                builder.advance_row();
167            }
168
169            return Ok(Some(builder.finish()));
170        }
171
172        // After skipping, just pass through
173        self.child.next()
174    }
175
176    fn reset(&mut self) {
177        self.child.reset();
178        self.skipped = 0;
179    }
180
181    fn name(&self) -> &'static str {
182        "Skip"
183    }
184}
185
186/// Combined Limit and Skip operator.
187///
188/// Equivalent to OFFSET skip LIMIT limit.
189pub struct LimitSkipOperator {
190    /// Child operator.
191    child: Box<dyn Operator>,
192    /// Number of rows to skip.
193    skip: usize,
194    /// Maximum number of rows to return.
195    limit: usize,
196    /// Output schema.
197    output_schema: Vec<LogicalType>,
198    /// Number of rows skipped so far.
199    skipped: usize,
200    /// Number of rows returned so far.
201    returned: usize,
202}
203
204impl LimitSkipOperator {
205    /// Creates a new limit/skip operator.
206    pub fn new(
207        child: Box<dyn Operator>,
208        skip: usize,
209        limit: usize,
210        output_schema: Vec<LogicalType>,
211    ) -> Self {
212        Self {
213            child,
214            skip,
215            limit,
216            output_schema,
217            skipped: 0,
218            returned: 0,
219        }
220    }
221}
222
223impl Operator for LimitSkipOperator {
224    fn next(&mut self) -> OperatorResult {
225        // Check if we've returned enough
226        if self.returned >= self.limit {
227            return Ok(None);
228        }
229
230        loop {
231            let chunk = match self.child.next()? {
232                Some(c) => c,
233                None => return Ok(None),
234            };
235
236            let row_count = chunk.row_count();
237            if row_count == 0 {
238                continue;
239            }
240
241            let rows: Vec<usize> = chunk.selected_indices().collect();
242            let mut start_idx = 0;
243
244            // Skip rows if needed
245            if self.skipped < self.skip {
246                let to_skip = (self.skip - self.skipped).min(row_count);
247                if to_skip >= row_count {
248                    self.skipped += row_count;
249                    continue;
250                }
251                self.skipped = self.skip;
252                start_idx = to_skip;
253            }
254
255            // Calculate how many rows to return
256            let remaining_in_chunk = row_count - start_idx;
257            let remaining_to_return = self.limit - self.returned;
258            let to_return = remaining_in_chunk.min(remaining_to_return);
259
260            if to_return == 0 {
261                return Ok(None);
262            }
263
264            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
265
266            for &row in rows.iter().skip(start_idx).take(to_return) {
267                for col_idx in 0..chunk.column_count() {
268                    if let (Some(src_col), Some(dst_col)) =
269                        (chunk.column(col_idx), builder.column_mut(col_idx))
270                    {
271                        if let Some(value) = src_col.get_value(row) {
272                            dst_col.push_value(value);
273                        } else {
274                            dst_col.push_value(Value::Null);
275                        }
276                    }
277                }
278                builder.advance_row();
279            }
280
281            self.returned += to_return;
282            return Ok(Some(builder.finish()));
283        }
284    }
285
286    fn reset(&mut self) {
287        self.child.reset();
288        self.skipped = 0;
289        self.returned = 0;
290    }
291
292    fn name(&self) -> &'static str {
293        "LimitSkip"
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that replays a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Chunks are moved out, leaving an empty placeholder behind, so
                // a replay after `reset` yields only empty chunks.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns every value of column 0 in output order.
    fn collect_int64(op: &mut dyn Operator) -> Vec<i64> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        results
    }

    #[test]
    fn test_limit() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);

        let mut limit = LimitOperator::new(Box::new(mock), 3, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);

        let mut limit = LimitOperator::new(Box::new(mock), 10, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_zero() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);

        let mut limit = LimitOperator::new(Box::new(mock), 0, vec![LogicalType::Int64]);

        assert!(limit.next().unwrap().is_none());
    }

    #[test]
    fn test_skip() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);

        let mut skip = SkipOperator::new(Box::new(mock), 2, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(&mut skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_zero() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);

        let mut skip = SkipOperator::new(Box::new(mock), 0, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(&mut skip), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip_all() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);

        let mut skip = SkipOperator::new(Box::new(mock), 5, vec![LogicalType::Int64]);

        let result = skip.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_limit_skip_combined() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        ])]);

        let mut op = LimitSkipOperator::new(
            Box::new(mock),
            3, // Skip first 3
            4, // Take next 4
            vec![LogicalType::Int64],
        );

        assert_eq!(collect_int64(&mut op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_skip_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);

        let mut op = LimitSkipOperator::new(
            Box::new(mock),
            3, // Skip boundary falls inside the second chunk
            2, // Window ends inside the third chunk
            vec![LogicalType::Int64],
        );

        assert_eq!(collect_int64(&mut op), vec![4, 5]);
    }

    #[test]
    fn test_limit_skip_input_shorter_than_limit() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);

        let mut op = LimitSkipOperator::new(
            Box::new(mock),
            2,  // Skip first 2
            10, // Limit exceeds what is left
            vec![LogicalType::Int64],
        );

        assert_eq!(collect_int64(&mut op), vec![3]);
    }

    #[test]
    fn test_limit_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);

        let mut limit = LimitOperator::new(Box::new(mock), 5, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(&mut limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_skip_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);

        let mut skip = SkipOperator::new(Box::new(mock), 3, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(&mut skip), vec![4, 5, 6]);
    }
}