grafeo_core/execution/operators/
limit.rs1use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13pub struct LimitOperator {
17 child: Box<dyn Operator>,
19 limit: usize,
21 output_schema: Vec<LogicalType>,
23 returned: usize,
25}
26
27impl LimitOperator {
28 pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
30 Self {
31 child,
32 limit,
33 output_schema,
34 returned: 0,
35 }
36 }
37}
38
39impl Operator for LimitOperator {
40 fn next(&mut self) -> OperatorResult {
41 if self.returned >= self.limit {
42 return Ok(None);
43 }
44
45 let remaining = self.limit - self.returned;
46
47 loop {
48 let Some(chunk) = self.child.next()? else {
49 return Ok(None);
50 };
51
52 let row_count = chunk.row_count();
53 if row_count == 0 {
54 continue;
55 }
56
57 if row_count <= remaining {
58 self.returned += row_count;
60 return Ok(Some(chunk));
61 }
62
63 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, remaining);
65
66 let mut count = 0;
67 for row in chunk.selected_indices() {
68 if count >= remaining {
69 break;
70 }
71
72 for col_idx in 0..chunk.column_count() {
73 if let (Some(src_col), Some(dst_col)) =
74 (chunk.column(col_idx), builder.column_mut(col_idx))
75 {
76 if let Some(value) = src_col.get_value(row) {
77 dst_col.push_value(value);
78 } else {
79 dst_col.push_value(Value::Null);
80 }
81 }
82 }
83 builder.advance_row();
84 count += 1;
85 }
86
87 self.returned += count;
88 return Ok(Some(builder.finish()));
89 }
90 }
91
92 fn reset(&mut self) {
93 self.child.reset();
94 self.returned = 0;
95 }
96
97 fn name(&self) -> &'static str {
98 "Limit"
99 }
100}
101
102pub struct SkipOperator {
106 child: Box<dyn Operator>,
108 skip: usize,
110 output_schema: Vec<LogicalType>,
112 skipped: usize,
114}
115
116impl SkipOperator {
117 pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
119 Self {
120 child,
121 skip,
122 output_schema,
123 skipped: 0,
124 }
125 }
126}
127
128impl Operator for SkipOperator {
129 fn next(&mut self) -> OperatorResult {
130 while self.skipped < self.skip {
132 let Some(chunk) = self.child.next()? else {
133 return Ok(None);
134 };
135
136 let row_count = chunk.row_count();
137 let to_skip = (self.skip - self.skipped).min(row_count);
138
139 if to_skip >= row_count {
140 self.skipped += row_count;
142 continue;
143 }
144
145 self.skipped = self.skip;
147
148 let mut builder =
149 DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
150
151 let rows: Vec<usize> = chunk.selected_indices().collect();
152 for &row in rows.iter().skip(to_skip) {
153 for col_idx in 0..chunk.column_count() {
154 if let (Some(src_col), Some(dst_col)) =
155 (chunk.column(col_idx), builder.column_mut(col_idx))
156 {
157 if let Some(value) = src_col.get_value(row) {
158 dst_col.push_value(value);
159 } else {
160 dst_col.push_value(Value::Null);
161 }
162 }
163 }
164 builder.advance_row();
165 }
166
167 return Ok(Some(builder.finish()));
168 }
169
170 self.child.next()
172 }
173
174 fn reset(&mut self) {
175 self.child.reset();
176 self.skipped = 0;
177 }
178
179 fn name(&self) -> &'static str {
180 "Skip"
181 }
182}
183
184pub struct LimitSkipOperator {
188 child: Box<dyn Operator>,
190 skip: usize,
192 limit: usize,
194 output_schema: Vec<LogicalType>,
196 skipped: usize,
198 returned: usize,
200}
201
202impl LimitSkipOperator {
203 pub fn new(
205 child: Box<dyn Operator>,
206 skip: usize,
207 limit: usize,
208 output_schema: Vec<LogicalType>,
209 ) -> Self {
210 Self {
211 child,
212 skip,
213 limit,
214 output_schema,
215 skipped: 0,
216 returned: 0,
217 }
218 }
219}
220
221impl Operator for LimitSkipOperator {
222 fn next(&mut self) -> OperatorResult {
223 if self.returned >= self.limit {
225 return Ok(None);
226 }
227
228 loop {
229 let Some(chunk) = self.child.next()? else {
230 return Ok(None);
231 };
232
233 let row_count = chunk.row_count();
234 if row_count == 0 {
235 continue;
236 }
237
238 let rows: Vec<usize> = chunk.selected_indices().collect();
239 let mut start_idx = 0;
240
241 if self.skipped < self.skip {
243 let to_skip = (self.skip - self.skipped).min(row_count);
244 if to_skip >= row_count {
245 self.skipped += row_count;
246 continue;
247 }
248 self.skipped = self.skip;
249 start_idx = to_skip;
250 }
251
252 let remaining_in_chunk = row_count - start_idx;
254 let remaining_to_return = self.limit - self.returned;
255 let to_return = remaining_in_chunk.min(remaining_to_return);
256
257 if to_return == 0 {
258 return Ok(None);
259 }
260
261 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
262
263 for &row in rows.iter().skip(start_idx).take(to_return) {
264 for col_idx in 0..chunk.column_count() {
265 if let (Some(src_col), Some(dst_col)) =
266 (chunk.column(col_idx), builder.column_mut(col_idx))
267 {
268 if let Some(value) = src_col.get_value(row) {
269 dst_col.push_value(value);
270 } else {
271 dst_col.push_value(Value::Null);
272 }
273 }
274 }
275 builder.advance_row();
276 }
277
278 self.returned += to_return;
279 return Ok(Some(builder.finish()));
280 }
281 }
282
283 fn reset(&mut self) {
284 self.child.reset();
285 self.skipped = 0;
286 self.returned = 0;
287 }
288
289 fn name(&self) -> &'static str {
290 "LimitSkip"
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            MockOperator {
                position: 0,
                chunks,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the stored chunk out, leaving an empty chunk in its place.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for value in values.iter().copied() {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Schema shared by every operator under test: one Int64 column.
    fn int64_schema() -> Vec<LogicalType> {
        vec![LogicalType::Int64]
    }

    /// Pulls chunks from `op` until it is exhausted and gathers column 0 of
    /// every selected row.
    fn drain_int64(op: &mut dyn Operator) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                values.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        values
    }

    #[test]
    fn test_limit() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut limit = LimitOperator::new(Box::new(source), 3, int64_schema());

        assert_eq!(drain_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut limit = LimitOperator::new(Box::new(source), 10, int64_schema());

        assert_eq!(drain_int64(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut skip = SkipOperator::new(Box::new(source), 2, int64_schema());

        assert_eq!(drain_int64(&mut skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_all() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(source), 5, int64_schema());

        assert!(skip.next().unwrap().is_none());
    }

    #[test]
    fn test_limit_skip_combined() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        ])]);
        // Skip 3 rows, then return at most 4.
        let mut op = LimitSkipOperator::new(Box::new(source), 3, 4, int64_schema());

        assert_eq!(drain_int64(&mut op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_across_chunks() {
        let source = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut limit = LimitOperator::new(Box::new(source), 5, int64_schema());

        assert_eq!(drain_int64(&mut limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_skip_across_chunks() {
        let source = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut skip = SkipOperator::new(Box::new(source), 3, int64_schema());

        assert_eq!(drain_int64(&mut skip), vec![4, 5, 6]);
    }
}