grafeo_core/execution/operators/
limit.rs1use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13pub struct LimitOperator {
17 child: Box<dyn Operator>,
19 limit: usize,
21 output_schema: Vec<LogicalType>,
23 returned: usize,
25}
26
27impl LimitOperator {
28 pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
30 Self {
31 child,
32 limit,
33 output_schema,
34 returned: 0,
35 }
36 }
37
38 pub fn into_parts(self) -> (Box<dyn Operator>, usize) {
40 (self.child, self.limit)
41 }
42}
43
44impl Operator for LimitOperator {
45 fn next(&mut self) -> OperatorResult {
46 if self.returned >= self.limit {
47 return Ok(None);
48 }
49
50 let remaining = self.limit - self.returned;
51
52 loop {
53 let Some(chunk) = self.child.next()? else {
54 return Ok(None);
55 };
56
57 let row_count = chunk.row_count();
58 if row_count == 0 {
59 continue;
60 }
61
62 if row_count <= remaining {
63 self.returned += row_count;
65 return Ok(Some(chunk));
66 }
67
68 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, remaining);
70
71 let mut count = 0;
72 for row in chunk.selected_indices() {
73 if count >= remaining {
74 break;
75 }
76
77 for col_idx in 0..chunk.column_count() {
78 if let (Some(src_col), Some(dst_col)) =
79 (chunk.column(col_idx), builder.column_mut(col_idx))
80 {
81 if let Some(value) = src_col.get_value(row) {
82 dst_col.push_value(value);
83 } else {
84 dst_col.push_value(Value::Null);
85 }
86 }
87 }
88 builder.advance_row();
89 count += 1;
90 }
91
92 self.returned += count;
93 return Ok(Some(builder.finish()));
94 }
95 }
96
97 fn reset(&mut self) {
98 self.child.reset();
99 self.returned = 0;
100 }
101
102 fn name(&self) -> &'static str {
103 "Limit"
104 }
105
106 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
107 self
108 }
109}
110
111pub struct SkipOperator {
115 child: Box<dyn Operator>,
117 skip: usize,
119 output_schema: Vec<LogicalType>,
121 skipped: usize,
123}
124
125impl SkipOperator {
126 pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
128 Self {
129 child,
130 skip,
131 output_schema,
132 skipped: 0,
133 }
134 }
135}
136
137impl Operator for SkipOperator {
138 fn next(&mut self) -> OperatorResult {
139 while self.skipped < self.skip {
141 let Some(chunk) = self.child.next()? else {
142 return Ok(None);
143 };
144
145 let row_count = chunk.row_count();
146 let to_skip = (self.skip - self.skipped).min(row_count);
147
148 if to_skip >= row_count {
149 self.skipped += row_count;
151 continue;
152 }
153
154 self.skipped = self.skip;
156
157 let mut builder =
158 DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
159
160 let rows: Vec<usize> = chunk.selected_indices().collect();
161 for &row in rows.iter().skip(to_skip) {
162 for col_idx in 0..chunk.column_count() {
163 if let (Some(src_col), Some(dst_col)) =
164 (chunk.column(col_idx), builder.column_mut(col_idx))
165 {
166 if let Some(value) = src_col.get_value(row) {
167 dst_col.push_value(value);
168 } else {
169 dst_col.push_value(Value::Null);
170 }
171 }
172 }
173 builder.advance_row();
174 }
175
176 return Ok(Some(builder.finish()));
177 }
178
179 self.child.next()
181 }
182
183 fn reset(&mut self) {
184 self.child.reset();
185 self.skipped = 0;
186 }
187
188 fn name(&self) -> &'static str {
189 "Skip"
190 }
191
192 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
193 self
194 }
195}
196
197pub struct LimitSkipOperator {
201 child: Box<dyn Operator>,
203 skip: usize,
205 limit: usize,
207 output_schema: Vec<LogicalType>,
209 skipped: usize,
211 returned: usize,
213}
214
215impl LimitSkipOperator {
216 pub fn new(
218 child: Box<dyn Operator>,
219 skip: usize,
220 limit: usize,
221 output_schema: Vec<LogicalType>,
222 ) -> Self {
223 Self {
224 child,
225 skip,
226 limit,
227 output_schema,
228 skipped: 0,
229 returned: 0,
230 }
231 }
232}
233
234impl Operator for LimitSkipOperator {
235 fn next(&mut self) -> OperatorResult {
236 if self.returned >= self.limit {
238 return Ok(None);
239 }
240
241 loop {
242 let Some(chunk) = self.child.next()? else {
243 return Ok(None);
244 };
245
246 let row_count = chunk.row_count();
247 if row_count == 0 {
248 continue;
249 }
250
251 let rows: Vec<usize> = chunk.selected_indices().collect();
252 let mut start_idx = 0;
253
254 if self.skipped < self.skip {
256 let to_skip = (self.skip - self.skipped).min(row_count);
257 if to_skip >= row_count {
258 self.skipped += row_count;
259 continue;
260 }
261 self.skipped = self.skip;
262 start_idx = to_skip;
263 }
264
265 let remaining_in_chunk = row_count - start_idx;
267 let remaining_to_return = self.limit - self.returned;
268 let to_return = remaining_in_chunk.min(remaining_to_return);
269
270 if to_return == 0 {
271 return Ok(None);
272 }
273
274 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
275
276 for &row in rows.iter().skip(start_idx).take(to_return) {
277 for col_idx in 0..chunk.column_count() {
278 if let (Some(src_col), Some(dst_col)) =
279 (chunk.column(col_idx), builder.column_mut(col_idx))
280 {
281 if let Some(value) = src_col.get_value(row) {
282 dst_col.push_value(value);
283 } else {
284 dst_col.push_value(Value::Null);
285 }
286 }
287 }
288 builder.advance_row();
289 }
290
291 self.returned += to_return;
292 return Ok(Some(builder.finish()));
293 }
294 }
295
296 fn reset(&mut self) {
297 self.child.reset();
298 self.skipped = 0;
299 self.returned = 0;
300 }
301
302 fn name(&self) -> &'static str {
303 "LimitSkip"
304 }
305
306 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
307 self
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that replays a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Hand the chunk out by value, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &number in values {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and gathers column 0 of every selected row, in emission order.
    fn collect_int64(mut op: impl Operator) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            values.extend(
                chunk
                    .selected_indices()
                    .map(|row| chunk.column(0).unwrap().get_int64(row).unwrap()),
            );
        }
        values
    }

    #[test]
    fn test_limit() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let limit = LimitOperator::new(Box::new(source), 3, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let limit = LimitOperator::new(Box::new(source), 10, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let skip = SkipOperator::new(Box::new(source), 2, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_all() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(source), 5, vec![LogicalType::Int64]);

        // Skipping more rows than exist leaves nothing to emit.
        assert!(skip.next().unwrap().is_none());
    }

    #[test]
    fn test_limit_skip_combined() {
        let source = MockOperator::new(vec![create_numbered_chunk(&[
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        ])]);

        let op = LimitSkipOperator::new(
            Box::new(source),
            3, // skip
            4, // limit
            vec![LogicalType::Int64],
        );

        assert_eq!(collect_int64(op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_across_chunks() {
        let source = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let limit = LimitOperator::new(Box::new(source), 5, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_skip_across_chunks() {
        let source = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let skip = SkipOperator::new(Box::new(source), 3, vec![LogicalType::Int64]);

        assert_eq!(collect_int64(skip), vec![4, 5, 6]);
    }

    #[test]
    fn test_limit_into_parts() {
        let child = Box::new(MockOperator::new(vec![]));
        let limit = LimitOperator::new(child, 42, vec![LogicalType::Int64]);

        let (_, limit_value) = limit.into_parts();
        assert_eq!(limit_value, 42);
    }

    #[test]
    fn test_limit_into_any() {
        let child = Box::new(MockOperator::new(vec![]));
        let limit: Box<dyn Operator> = Box::new(LimitOperator::new(child, 10, vec![]));

        assert!(limit.into_any().downcast::<LimitOperator>().is_ok());
    }
}