1use super::chunk::DataChunk;
8use super::operators::OperatorError;
9
/// Hint an operator gives about the chunk size it would like to receive.
///
/// A [`Pipeline`] combines the hints of all of its operators into the single chunk size it
/// requests from its [`Source`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChunkSizeHint {
    /// No preference; leaves the pipeline default ([`DEFAULT_CHUNK_SIZE`]) in place.
    Default,
    /// Prefer smaller chunks ([`SMALL_CHUNK_SIZE`]).
    Small,
    /// Prefer larger chunks ([`LARGE_CHUNK_SIZE`]).
    Large,
    /// Request exactly this chunk size.
    Exact(usize),
    /// Request chunks no larger than this size.
    AtMost(usize),
}

impl Default for ChunkSizeHint {
    /// No preference at all: [`ChunkSizeHint::Default`].
    fn default() -> ChunkSizeHint {
        ChunkSizeHint::Default
    }
}
33
/// Chunk size the pipeline starts from when no operator hint changes it.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Chunk size associated with [`ChunkSizeHint::Small`].
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Chunk size associated with [`ChunkSizeHint::Large`].
pub const LARGE_CHUNK_SIZE: usize = 4096;
42
/// Producer of [`DataChunk`]s at the head of a [`Pipeline`]; the pipeline pulls from it.
pub trait Source: Send + Sync {
    /// Produces the next chunk, or `Ok(None)` once the source is exhausted.
    ///
    /// `chunk_size` is the size the pipeline derived from its operators' [`ChunkSizeHint`]s.
    /// NOTE(review): whether implementations must treat it as a hard limit or only as a hint
    /// is not established in this module — confirm the intended contract.
    ///
    /// # Errors
    ///
    /// Any error is propagated unchanged out of [`Pipeline::execute`].
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Presumably rewinds the source so it can be read again from the start.
    /// NOTE(review): not called anywhere in this module — confirm the intended contract.
    fn reset(&mut self);

    /// Static name of this source, presumably for diagnostics — TODO confirm usage.
    fn name(&self) -> &'static str;
}
62
/// Consumer of [`DataChunk`]s: the end of a [`Pipeline`], and also the output interface
/// handed to every [`PushOperator`].
pub trait Sink: Send + Sync {
    /// Accepts one chunk.
    ///
    /// Returns `Ok(true)` to keep receiving data, or `Ok(false)` to request early termination.
    /// When a `false` reaches the pipeline (directly, or through an operator that propagates
    /// it), the pipeline stops pulling from its source; it still finalizes afterwards.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Called by the pipeline as its very last step, after all data, including output
    /// emitted by operators' `finalize`, has been delivered.
    fn finalize(&mut self) -> Result<(), OperatorError>;

    /// Static name of this sink, presumably for diagnostics — TODO confirm usage.
    fn name(&self) -> &'static str;
}
86
/// Streaming operator inside a [`Pipeline`]: receives pushed chunks and writes its output to
/// the downstream [`Sink`] it is given.
pub trait PushOperator: Send + Sync {
    /// Processes `chunk`, writing any output to `sink`.
    ///
    /// Returns `Ok(false)` to request early termination of the pipeline, `Ok(true)` otherwise.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;

    /// Called once by the pipeline after input has stopped (source exhausted or an early
    /// stop). Operators are finalized in chain order, and anything written to `sink` here is
    /// routed to the rest of the pipeline.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;

    /// Chunk size this operator would like to receive; the pipeline combines the hints of all
    /// operators. Defaults to [`ChunkSizeHint::Default`] (no preference).
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    /// Static name of this operator, presumably for diagnostics — TODO confirm usage.
    fn name(&self) -> &'static str;
}
119
/// Push-based execution pipeline: `source -> operators[0] -> ... -> operators[n - 1] -> sink`.
///
/// Built with [`Pipeline::new`], or with [`Pipeline::simple`] plus [`Pipeline::with_operator`],
/// and run with [`Pipeline::execute`].
pub struct Pipeline {
    /// Produces the input chunks.
    source: Box<dyn Source>,
    /// Operators applied in order; `operators[0]` receives chunks straight from `source`.
    operators: Vec<Box<dyn PushOperator>>,
    /// Receives the last operator's output (or the source's output when there are no operators).
    sink: Box<dyn Sink>,
}
126
127impl Pipeline {
128 pub fn new(
130 source: Box<dyn Source>,
131 operators: Vec<Box<dyn PushOperator>>,
132 sink: Box<dyn Sink>,
133 ) -> Self {
134 Self {
135 source,
136 operators,
137 sink,
138 }
139 }
140
141 pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
143 Self {
144 source,
145 operators: Vec::new(),
146 sink,
147 }
148 }
149
150 #[must_use]
152 pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
153 self.operators.push(op);
154 self
155 }
156
157 pub fn execute(&mut self) -> Result<(), OperatorError> {
163 let chunk_size = self.compute_chunk_size();
164
165 while let Some(chunk) = self.source.next_chunk(chunk_size)? {
167 if !self.push_through(chunk)? {
168 break;
170 }
171 }
172
173 self.finalize_all()
175 }
176
177 fn compute_chunk_size(&self) -> usize {
179 let mut size = DEFAULT_CHUNK_SIZE;
180
181 for op in &self.operators {
182 match op.preferred_chunk_size() {
183 ChunkSizeHint::Default => {}
184 ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
185 ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
186 ChunkSizeHint::Exact(s) => return s,
187 ChunkSizeHint::AtMost(s) => size = size.min(s),
188 }
189 }
190
191 size
192 }
193
194 fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
196 if self.operators.is_empty() {
197 return self.sink.consume(chunk);
199 }
200
201 let mut current_chunk = chunk;
203 let num_operators = self.operators.len();
204
205 for i in 0..num_operators {
206 let is_last = i == num_operators - 1;
207
208 if is_last {
209 return self.operators[i].push(current_chunk, &mut *self.sink);
211 }
212
213 let mut collector = ChunkCollector::new();
215 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
216
217 if !continue_processing || collector.is_empty() {
218 return Ok(continue_processing);
219 }
220
221 current_chunk = collector.into_single_chunk();
223 }
224
225 Ok(true)
226 }
227
228 fn finalize_all(&mut self) -> Result<(), OperatorError> {
230 if self.operators.is_empty() {
234 return self.sink.finalize();
235 }
236
237 for i in 0..self.operators.len() {
239 let is_last = i == self.operators.len() - 1;
240
241 if is_last {
242 self.operators[i].finalize(&mut *self.sink)?;
243 } else {
244 let mut collector = ChunkCollector::new();
246 self.operators[i].finalize(&mut collector)?;
247
248 for chunk in collector.into_chunks() {
249 self.push_through_from(chunk, i + 1)?;
251 }
252 }
253 }
254
255 self.sink.finalize()
256 }
257
258 fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
260 let mut current_chunk = chunk;
261
262 for i in start..self.operators.len() {
263 let is_last = i == self.operators.len() - 1;
264
265 if is_last {
266 return self.operators[i].push(current_chunk, &mut *self.sink);
267 }
268
269 let mut collector = ChunkCollector::new();
270 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
271
272 if !continue_processing || collector.is_empty() {
273 return Ok(continue_processing);
274 }
275
276 current_chunk = collector.into_single_chunk();
277 }
278
279 self.sink.consume(current_chunk)
280 }
281}
282
/// A [`Sink`] that buffers the non-empty chunks it receives.
///
/// The pipeline uses it to capture an intermediate operator's output before handing it to
/// the next operator.
pub struct ChunkCollector {
    /// Collected chunks, in the order they were consumed.
    chunks: Vec<DataChunk>,
}
287
288impl ChunkCollector {
289 pub fn new() -> Self {
291 Self { chunks: Vec::new() }
292 }
293
294 pub fn is_empty(&self) -> bool {
296 self.chunks.is_empty()
297 }
298
299 pub fn row_count(&self) -> usize {
301 self.chunks.iter().map(DataChunk::len).sum()
302 }
303
304 pub fn into_chunks(self) -> Vec<DataChunk> {
306 self.chunks
307 }
308
309 pub fn into_single_chunk(self) -> DataChunk {
315 if self.chunks.is_empty() {
316 return DataChunk::empty();
317 }
318 if self.chunks.len() == 1 {
319 return self
321 .chunks
322 .into_iter()
323 .next()
324 .expect("chunks has exactly one element: checked on previous line");
325 }
326
327 DataChunk::concat(&self.chunks)
329 }
330}
331
332impl Default for ChunkCollector {
333 fn default() -> Self {
334 Self::new()
335 }
336}
337
338impl Sink for ChunkCollector {
339 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
340 if !chunk.is_empty() {
341 self.chunks.push(chunk);
342 }
343 Ok(true)
344 }
345
346 fn finalize(&mut self) -> Result<(), OperatorError> {
347 Ok(())
348 }
349
350 fn name(&self) -> &'static str {
351 "ChunkCollector"
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;
    use std::sync::{Arc, Mutex};

    /// Builds a single-column chunk holding the `Int64` values `0..rows`.
    fn int_chunk(rows: usize) -> DataChunk {
        let values: Vec<Value> = (0..rows).map(|i| Value::Int64(i as i64)).collect();
        DataChunk::new(vec![ValueVector::from_values(&values)])
    }

    /// Emits `num_chunks` chunks of `values_per_chunk` rows each, ignoring the requested size.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;
            Ok(Some(int_chunk(self.values_per_chunk)))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// What a [`TestSink`] observed. It is shared via `Arc` so the test can still inspect it
    /// after the sink has been boxed into a pipeline.
    #[derive(Default)]
    struct SinkLog {
        /// Row count of every consumed chunk, in arrival order.
        chunk_rows: Vec<usize>,
        /// Whether `finalize` was called.
        finalized: bool,
    }

    /// Sink that records everything it receives into a shared [`SinkLog`].
    struct TestSink {
        log: Arc<Mutex<SinkLog>>,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                log: Arc::default(),
            }
        }

        /// Handle for inspecting this sink after it has been moved into a pipeline.
        fn log(&self) -> Arc<Mutex<SinkLog>> {
            Arc::clone(&self.log)
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.log.lock().unwrap().chunk_rows.push(chunk.len());
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.log.lock().unwrap().finalized = true;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    /// Forwards every chunk unchanged.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Holds back all input and only emits it from `finalize`, like a blocking operator.
    #[derive(Default)]
    struct BufferingOperator {
        buffered: Vec<DataChunk>,
    }

    impl PushOperator for BufferingOperator {
        fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            self.buffered.push(chunk);
            Ok(true)
        }

        fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
            for chunk in self.buffered.drain(..) {
                if !sink.consume(chunk)? {
                    break;
                }
            }
            Ok(())
        }

        fn name(&self) -> &'static str {
            "Buffering"
        }
    }

    /// Pass-through operator that reports a fixed chunk-size hint.
    struct HintOp(ChunkSizeHint);

    impl PushOperator for HintOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn preferred_chunk_size(&self) -> ChunkSizeHint {
            self.0
        }

        fn name(&self) -> &'static str {
            "HintOp"
        }
    }

    /// Builds a pipeline whose operators report `hints`, in order.
    fn pipeline_with_hints(hints: &[ChunkSizeHint]) -> Pipeline {
        let base = Pipeline::simple(
            Box::new(TestSource::new(1, 10)),
            Box::new(TestSink::new()),
        );
        hints.iter().fold(base, |pipeline, &hint| {
            pipeline.with_operator(Box::new(HintOp(hint)))
        })
    }

    #[test]
    fn test_simple_pipeline() {
        let sink = TestSink::new();
        let log = sink.log();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(3, 10)), Box::new(sink));
        pipeline.execute().unwrap();

        let recorded = log.lock().unwrap();
        assert_eq!(recorded.chunk_rows, vec![10, 10, 10]);
        assert!(recorded.finalized);
    }

    #[test]
    fn test_pipeline_with_operator() {
        let sink = TestSink::new();
        let log = sink.log();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        let recorded = log.lock().unwrap();
        assert_eq!(recorded.chunk_rows, vec![5, 5]);
        assert!(recorded.finalized);
    }

    #[test]
    fn test_finalize_output_flows_downstream() {
        let sink = TestSink::new();
        let log = sink.log();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 4)), Box::new(sink))
            .with_operator(Box::new(BufferingOperator::default()))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        let recorded = log.lock().unwrap();
        assert_eq!(recorded.chunk_rows, vec![4, 4]);
        assert!(recorded.finalized);
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());

        collector.consume(int_chunk(2)).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_collector_merges_multiple_chunks() {
        let mut collector = ChunkCollector::default();
        collector.consume(int_chunk(2)).unwrap();
        collector.consume(int_chunk(3)).unwrap();
        assert_eq!(collector.row_count(), 5);
        assert_eq!(collector.into_single_chunk().len(), 5);
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        assert_eq!(pipeline_with_hints(&[]).compute_chunk_size(), DEFAULT_CHUNK_SIZE);
        assert!(
            pipeline_with_hints(&[ChunkSizeHint::Small]).compute_chunk_size() <= SMALL_CHUNK_SIZE
        );
        assert_eq!(
            pipeline_with_hints(&[ChunkSizeHint::Large]).compute_chunk_size(),
            LARGE_CHUNK_SIZE
        );
        assert_eq!(
            pipeline_with_hints(&[ChunkSizeHint::AtMost(100)]).compute_chunk_size(),
            100
        );
        assert_eq!(
            pipeline_with_hints(&[ChunkSizeHint::Exact(777)]).compute_chunk_size(),
            777
        );
    }
}