grafeo_core/execution/operators/
limit.rs1use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13pub struct LimitOperator {
17 child: Box<dyn Operator>,
19 limit: usize,
21 output_schema: Vec<LogicalType>,
23 returned: usize,
25}
26
27impl LimitOperator {
28 pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
30 Self {
31 child,
32 limit,
33 output_schema,
34 returned: 0,
35 }
36 }
37}
38
39impl Operator for LimitOperator {
40 fn next(&mut self) -> OperatorResult {
41 if self.returned >= self.limit {
42 return Ok(None);
43 }
44
45 let remaining = self.limit - self.returned;
46
47 loop {
48 let chunk = match self.child.next()? {
49 Some(c) => c,
50 None => return Ok(None),
51 };
52
53 let row_count = chunk.row_count();
54 if row_count == 0 {
55 continue;
56 }
57
58 if row_count <= remaining {
59 self.returned += row_count;
61 return Ok(Some(chunk));
62 } else {
63 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, remaining);
65
66 let mut count = 0;
67 for row in chunk.selected_indices() {
68 if count >= remaining {
69 break;
70 }
71
72 for col_idx in 0..chunk.column_count() {
73 if let (Some(src_col), Some(dst_col)) =
74 (chunk.column(col_idx), builder.column_mut(col_idx))
75 {
76 if let Some(value) = src_col.get_value(row) {
77 dst_col.push_value(value);
78 } else {
79 dst_col.push_value(Value::Null);
80 }
81 }
82 }
83 builder.advance_row();
84 count += 1;
85 }
86
87 self.returned += count;
88 return Ok(Some(builder.finish()));
89 }
90 }
91 }
92
93 fn reset(&mut self) {
94 self.child.reset();
95 self.returned = 0;
96 }
97
98 fn name(&self) -> &'static str {
99 "Limit"
100 }
101}
102
103pub struct SkipOperator {
107 child: Box<dyn Operator>,
109 skip: usize,
111 output_schema: Vec<LogicalType>,
113 skipped: usize,
115}
116
117impl SkipOperator {
118 pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
120 Self {
121 child,
122 skip,
123 output_schema,
124 skipped: 0,
125 }
126 }
127}
128
129impl Operator for SkipOperator {
130 fn next(&mut self) -> OperatorResult {
131 while self.skipped < self.skip {
133 let chunk = match self.child.next()? {
134 Some(c) => c,
135 None => return Ok(None),
136 };
137
138 let row_count = chunk.row_count();
139 let to_skip = (self.skip - self.skipped).min(row_count);
140
141 if to_skip >= row_count {
142 self.skipped += row_count;
144 continue;
145 }
146
147 self.skipped = self.skip;
149
150 let mut builder =
151 DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
152
153 let rows: Vec<usize> = chunk.selected_indices().collect();
154 for &row in rows.iter().skip(to_skip) {
155 for col_idx in 0..chunk.column_count() {
156 if let (Some(src_col), Some(dst_col)) =
157 (chunk.column(col_idx), builder.column_mut(col_idx))
158 {
159 if let Some(value) = src_col.get_value(row) {
160 dst_col.push_value(value);
161 } else {
162 dst_col.push_value(Value::Null);
163 }
164 }
165 }
166 builder.advance_row();
167 }
168
169 return Ok(Some(builder.finish()));
170 }
171
172 self.child.next()
174 }
175
176 fn reset(&mut self) {
177 self.child.reset();
178 self.skipped = 0;
179 }
180
181 fn name(&self) -> &'static str {
182 "Skip"
183 }
184}
185
186pub struct LimitSkipOperator {
190 child: Box<dyn Operator>,
192 skip: usize,
194 limit: usize,
196 output_schema: Vec<LogicalType>,
198 skipped: usize,
200 returned: usize,
202}
203
204impl LimitSkipOperator {
205 pub fn new(
207 child: Box<dyn Operator>,
208 skip: usize,
209 limit: usize,
210 output_schema: Vec<LogicalType>,
211 ) -> Self {
212 Self {
213 child,
214 skip,
215 limit,
216 output_schema,
217 skipped: 0,
218 returned: 0,
219 }
220 }
221}
222
223impl Operator for LimitSkipOperator {
224 fn next(&mut self) -> OperatorResult {
225 if self.returned >= self.limit {
227 return Ok(None);
228 }
229
230 loop {
231 let chunk = match self.child.next()? {
232 Some(c) => c,
233 None => return Ok(None),
234 };
235
236 let row_count = chunk.row_count();
237 if row_count == 0 {
238 continue;
239 }
240
241 let rows: Vec<usize> = chunk.selected_indices().collect();
242 let mut start_idx = 0;
243
244 if self.skipped < self.skip {
246 let to_skip = (self.skip - self.skipped).min(row_count);
247 if to_skip >= row_count {
248 self.skipped += row_count;
249 continue;
250 }
251 self.skipped = self.skip;
252 start_idx = to_skip;
253 }
254
255 let remaining_in_chunk = row_count - start_idx;
257 let remaining_to_return = self.limit - self.returned;
258 let to_return = remaining_in_chunk.min(remaining_to_return);
259
260 if to_return == 0 {
261 return Ok(None);
262 }
263
264 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
265
266 for &row in rows.iter().skip(start_idx).take(to_return) {
267 for col_idx in 0..chunk.column_count() {
268 if let (Some(src_col), Some(dst_col)) =
269 (chunk.column(col_idx), builder.column_mut(col_idx))
270 {
271 if let Some(value) = src_col.get_value(row) {
272 dst_col.push_value(value);
273 } else {
274 dst_col.push_value(Value::Null);
275 }
276 }
277 }
278 builder.advance_row();
279 }
280
281 self.returned += to_return;
282 return Ok(Some(builder.finish()));
283 }
284 }
285
286 fn reset(&mut self) {
287 self.child.reset();
288 self.skipped = 0;
289 self.returned = 0;
290 }
291
292 fn name(&self) -> &'static str {
293 "LimitSkip"
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that yields a fixed sequence of chunks, then `None`.
    ///
    /// Chunks are moved out as they are returned (each slot is replaced with
    /// `DataChunk::empty()`), so `reset` rewinds the cursor but a second pass
    /// yields only those empty placeholders. The tests below therefore never
    /// rely on replaying the mock after a reset.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk containing `values` in order.
    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns column 0 of every selected row, in output order.
    fn collect_values(op: &mut dyn Operator) -> Vec<i64> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        results
    }

    #[test]
    fn test_limit() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut limit = LimitOperator::new(Box::new(mock), 3, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut limit = LimitOperator::new(Box::new(mock), 10, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_zero() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut limit = LimitOperator::new(Box::new(mock), 0, vec![LogicalType::Int64]);
        assert!(collect_values(&mut limit).is_empty());
    }

    #[test]
    fn test_limit_exact_chunk_boundary() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
        ]);
        let mut limit = LimitOperator::new(Box::new(mock), 2, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2]);
    }

    #[test]
    fn test_skip() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut skip = SkipOperator::new(Box::new(mock), 2, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_all() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(mock), 5, vec![LogicalType::Int64]);
        let result = skip.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_skip_zero() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(mock), 0, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip_exact_chunk_boundary() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
        ]);
        let mut skip = SkipOperator::new(Box::new(mock), 2, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![3, 4]);
    }

    #[test]
    fn test_limit_skip_combined() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        ])]);
        let mut op = LimitSkipOperator::new(
            Box::new(mock),
            3, // skip
            4, // limit
            vec![LogicalType::Int64],
        );
        assert_eq!(collect_values(&mut op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_skip_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 3, 2, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut op), vec![4, 5]);
    }

    #[test]
    fn test_limit_skip_skip_past_end() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 5, 2, vec![LogicalType::Int64]);
        assert!(collect_values(&mut op).is_empty());
    }

    #[test]
    fn test_limit_skip_zero_limit() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 1, 0, vec![LogicalType::Int64]);
        assert!(collect_values(&mut op).is_empty());
    }

    #[test]
    fn test_limit_skip_no_trimming_needed() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 0, 10, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut op), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut limit = LimitOperator::new(Box::new(mock), 5, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_skip_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut skip = SkipOperator::new(Box::new(mock), 3, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![4, 5, 6]);
    }
}