graphos_core/execution/operators/push/
limit.rs1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7
/// Push-based operator that lets at most `limit` rows through to its sink.
pub struct LimitPushOperator {
    /// Upper bound on the number of rows that may be forwarded.
    limit: usize,
    /// Running count of rows forwarded so far.
    passed: usize,
}

impl LimitPushOperator {
    /// Creates a limit operator that has not yet forwarded any rows.
    pub fn new(limit: usize) -> Self {
        let passed = 0;
        Self { limit, passed }
    }

    /// Returns the configured row limit.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// Returns how many rows have been forwarded so far.
    pub fn passed(&self) -> usize {
        self.passed
    }

    /// Returns `true` once the forwarded row count has reached the limit.
    ///
    /// A limit of zero is exhausted from the start.
    pub fn is_exhausted(&self) -> bool {
        self.limit <= self.passed
    }
}
39
40impl PushOperator for LimitPushOperator {
41 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
42 if self.passed >= self.limit {
43 return Ok(false);
45 }
46
47 let chunk_len = chunk.len();
48 let remaining = self.limit - self.passed;
49
50 if chunk_len <= remaining {
51 self.passed += chunk_len;
53 let should_continue = sink.consume(chunk)?;
54 Ok(should_continue && self.passed < self.limit)
55 } else {
56 self.passed += remaining;
58
59 let selection = SelectionVector::new_all(remaining);
61 let truncated = chunk.filter(&selection);
62
63 sink.consume(truncated)?;
64 Ok(false) }
66 }
67
68 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
69 Ok(())
71 }
72
73 fn preferred_chunk_size(&self) -> ChunkSizeHint {
74 if self.limit < 256 {
76 ChunkSizeHint::AtMost(self.limit)
77 } else if self.limit < 1000 {
78 ChunkSizeHint::Small
79 } else {
80 ChunkSizeHint::Default
81 }
82 }
83
84 fn name(&self) -> &'static str {
85 "LimitPush"
86 }
87}
88
/// Push-based operator that drops the first `skip` rows and forwards the rest.
pub struct SkipPushOperator {
    /// Total number of leading rows to drop.
    skip: usize,
    /// Number of rows dropped so far.
    skipped: usize,
}

impl SkipPushOperator {
    /// Creates a skip operator that has not yet dropped any rows.
    pub fn new(skip: usize) -> Self {
        Self { skip, skipped: 0 }
    }

    /// Returns the configured number of rows to skip.
    pub fn skip(&self) -> usize {
        self.skip
    }

    /// Returns how many rows have been dropped so far.
    ///
    /// Mirrors `LimitPushOperator::passed` so both operators expose their
    /// progress counter in the same way.
    pub fn skipped(&self) -> usize {
        self.skipped
    }

    /// Returns `true` once at least `skip` rows have been dropped.
    ///
    /// A skip of zero is complete from the start.
    pub fn skip_complete(&self) -> bool {
        self.skipped >= self.skip
    }
}
115
116impl PushOperator for SkipPushOperator {
117 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
118 if self.skipped >= self.skip {
119 return sink.consume(chunk);
121 }
122
123 let chunk_len = chunk.len();
124 let remaining_to_skip = self.skip - self.skipped;
125
126 if chunk_len <= remaining_to_skip {
127 self.skipped += chunk_len;
129 Ok(true)
130 } else {
131 self.skipped = self.skip;
133
134 let start = remaining_to_skip;
135 let selection = SelectionVector::from_predicate(chunk_len, |i| i >= start);
136 let passed = chunk.filter(&selection);
137
138 sink.consume(passed)
139 }
140 }
141
142 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
143 Ok(())
144 }
145
146 fn name(&self) -> &'static str {
147 "SkipPush"
148 }
149}
150
151pub struct SkipLimitPushOperator {
155 skip: SkipPushOperator,
156 limit: LimitPushOperator,
157}
158
159impl SkipLimitPushOperator {
160 pub fn new(skip: usize, limit: usize) -> Self {
162 Self {
163 skip: SkipPushOperator::new(skip),
164 limit: LimitPushOperator::new(limit),
165 }
166 }
167}
168
169impl PushOperator for SkipLimitPushOperator {
170 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
171 if self.limit.is_exhausted() {
172 return Ok(false);
173 }
174
175 if !self.skip.skip_complete() {
176 let chunk_len = chunk.len();
179 let remaining_to_skip = self.skip.skip - self.skip.skipped;
180
181 if chunk_len <= remaining_to_skip {
182 self.skip.skipped += chunk_len;
184 return Ok(true);
185 }
186
187 self.skip.skipped = self.skip.skip;
189 let start = remaining_to_skip;
190 let selection = SelectionVector::from_predicate(chunk_len, |i| i >= start);
191 let passed = chunk.filter(&selection);
192
193 return self.limit.push(passed, sink);
194 }
195
196 self.limit.push(chunk, sink)
198 }
199
200 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
201 self.skip.finalize(sink)?;
202 self.limit.finalize(sink)
203 }
204
205 fn preferred_chunk_size(&self) -> ChunkSizeHint {
206 self.limit.preferred_chunk_size()
207 }
208
209 fn name(&self) -> &'static str {
210 "SkipLimitPush"
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;
    use crate::execution::vector::ValueVector;
    use graphos_common::types::Value;

    /// Builds a single-column chunk holding `values` as `Int64` cells.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let cells: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&cells)])
    }

    /// Fewer rows than the limit: everything passes and the limit stays open.
    #[test]
    fn test_limit_under_limit() {
        let mut op = LimitPushOperator::new(10);
        let mut collected = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 3);
        assert!(!op.is_exhausted());
    }

    /// Exactly `limit` rows: all pass and the operator asks upstream to stop.
    #[test]
    fn test_limit_exact_limit() {
        let mut op = LimitPushOperator::new(5);
        let mut collected = CollectorSink::new();

        let keep_going = op
            .push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 5);
        assert!(!keep_going);
        assert!(op.is_exhausted());
    }

    /// More rows than the limit: the chunk is truncated to `limit` rows.
    #[test]
    fn test_limit_over_limit() {
        let mut op = LimitPushOperator::new(3);
        let mut collected = CollectorSink::new();

        let keep_going = op
            .push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 3);
        assert!(!keep_going);
    }

    /// The limit is tracked across chunks; the last chunk is cut short.
    #[test]
    fn test_limit_multiple_chunks() {
        let mut op = LimitPushOperator::new(5);
        let mut collected = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2]), &mut collected).unwrap();
        op.push(create_test_chunk(&[3, 4]), &mut collected).unwrap();
        let keep_going = op
            .push(create_test_chunk(&[5, 6, 7]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 5);
        assert!(!keep_going);
    }

    /// Fewer rows than the skip count: nothing reaches the sink.
    #[test]
    fn test_skip_under_skip() {
        let mut op = SkipPushOperator::new(10);
        let mut collected = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 0);
        assert!(!op.skip_complete());
    }

    /// First chunk is skipped entirely; the second passes through untouched.
    #[test]
    fn test_skip_exact_skip() {
        let mut op = SkipPushOperator::new(3);
        let mut collected = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3]), &mut collected)
            .unwrap();
        op.push(create_test_chunk(&[4, 5, 6]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 3);
        assert!(op.skip_complete());
    }

    /// Skip boundary falls inside a chunk: only the tail is forwarded.
    #[test]
    fn test_skip_partial() {
        let mut op = SkipPushOperator::new(2);
        let mut collected = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 3);
    }

    /// Skip 2 then limit 3 over a 7-row chunk yields exactly 3 rows.
    #[test]
    fn test_skip_limit_combined() {
        let mut op = SkipLimitPushOperator::new(2, 3);
        let mut collected = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3, 4, 5, 6, 7]), &mut collected)
            .unwrap();
        op.finalize(&mut collected).unwrap();

        assert_eq!(collected.row_count(), 3);
    }
}