1use super::chunk::DataChunk;
8use super::operators::OperatorError;
9
/// Chunk-size preference an operator can report to the pipeline.
///
/// The pipeline gathers one hint per operator (see
/// `PushOperator::preferred_chunk_size`) and folds them into the single
/// chunk size it requests from the source. How conflicting hints are
/// reconciled is decided by `Pipeline::compute_chunk_size`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChunkSizeHint {
    /// No preference; the pipeline keeps whatever size it would use anyway.
    Default,
    /// Prefers small chunks (bounded by `SMALL_CHUNK_SIZE`).
    Small,
    /// Prefers large chunks (`LARGE_CHUNK_SIZE`).
    Large,
    /// Requires exactly this chunk size.
    Exact(usize),
    /// Accepts any chunk size up to and including this bound.
    AtMost(usize),
}
26
27impl Default for ChunkSizeHint {
28 fn default() -> Self {
29 Self::Default
30 }
31}
32
/// Chunk size the pipeline starts from when no operator asks for anything else.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;

/// Chunk size associated with [`ChunkSizeHint::Small`].
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Chunk size associated with [`ChunkSizeHint::Large`].
pub const LARGE_CHUNK_SIZE: usize = 4_096;
41
42pub trait Source: Send + Sync {
46 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;
54
55 fn reset(&mut self);
57
58 fn name(&self) -> &'static str;
60}
61
62pub trait Sink: Send + Sync {
66 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;
74
75 fn finalize(&mut self) -> Result<(), OperatorError>;
81
82 fn name(&self) -> &'static str;
84}
85
86pub trait PushOperator: Send + Sync {
92 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;
100
101 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;
109
110 fn preferred_chunk_size(&self) -> ChunkSizeHint {
112 ChunkSizeHint::Default
113 }
114
115 fn name(&self) -> &'static str;
117}
118
119pub struct Pipeline {
121 source: Box<dyn Source>,
122 operators: Vec<Box<dyn PushOperator>>,
123 sink: Box<dyn Sink>,
124}
125
126impl Pipeline {
127 pub fn new(
129 source: Box<dyn Source>,
130 operators: Vec<Box<dyn PushOperator>>,
131 sink: Box<dyn Sink>,
132 ) -> Self {
133 Self {
134 source,
135 operators,
136 sink,
137 }
138 }
139
140 pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
142 Self {
143 source,
144 operators: Vec::new(),
145 sink,
146 }
147 }
148
149 #[must_use]
151 pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
152 self.operators.push(op);
153 self
154 }
155
156 pub fn execute(&mut self) -> Result<(), OperatorError> {
162 let chunk_size = self.compute_chunk_size();
163
164 while let Some(chunk) = self.source.next_chunk(chunk_size)? {
166 if !self.push_through(chunk)? {
167 break;
169 }
170 }
171
172 self.finalize_all()
174 }
175
176 fn compute_chunk_size(&self) -> usize {
178 let mut size = DEFAULT_CHUNK_SIZE;
179
180 for op in &self.operators {
181 match op.preferred_chunk_size() {
182 ChunkSizeHint::Default => {}
183 ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
184 ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
185 ChunkSizeHint::Exact(s) => return s,
186 ChunkSizeHint::AtMost(s) => size = size.min(s),
187 }
188 }
189
190 size
191 }
192
193 fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
195 if self.operators.is_empty() {
196 return self.sink.consume(chunk);
198 }
199
200 let mut current_chunk = chunk;
202 let num_operators = self.operators.len();
203
204 for i in 0..num_operators {
205 let is_last = i == num_operators - 1;
206
207 if is_last {
208 return self.operators[i].push(current_chunk, &mut *self.sink);
210 }
211
212 let mut collector = ChunkCollector::new();
214 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
215
216 if !continue_processing || collector.is_empty() {
217 return Ok(continue_processing);
218 }
219
220 current_chunk = collector.into_single_chunk();
222 }
223
224 Ok(true)
225 }
226
227 fn finalize_all(&mut self) -> Result<(), OperatorError> {
229 if self.operators.is_empty() {
233 return self.sink.finalize();
234 }
235
236 for i in 0..self.operators.len() {
238 let is_last = i == self.operators.len() - 1;
239
240 if is_last {
241 self.operators[i].finalize(&mut *self.sink)?;
242 } else {
243 let mut collector = ChunkCollector::new();
245 self.operators[i].finalize(&mut collector)?;
246
247 for chunk in collector.into_chunks() {
248 self.push_through_from(chunk, i + 1)?;
250 }
251 }
252 }
253
254 self.sink.finalize()
255 }
256
257 fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
259 let mut current_chunk = chunk;
260
261 for i in start..self.operators.len() {
262 let is_last = i == self.operators.len() - 1;
263
264 if is_last {
265 return self.operators[i].push(current_chunk, &mut *self.sink);
266 }
267
268 let mut collector = ChunkCollector::new();
269 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
270
271 if !continue_processing || collector.is_empty() {
272 return Ok(continue_processing);
273 }
274
275 current_chunk = collector.into_single_chunk();
276 }
277
278 self.sink.consume(current_chunk)
279 }
280}
281
282pub struct ChunkCollector {
284 chunks: Vec<DataChunk>,
285}
286
287impl ChunkCollector {
288 pub fn new() -> Self {
290 Self { chunks: Vec::new() }
291 }
292
293 pub fn is_empty(&self) -> bool {
295 self.chunks.is_empty()
296 }
297
298 pub fn row_count(&self) -> usize {
300 self.chunks.iter().map(DataChunk::len).sum()
301 }
302
303 pub fn into_chunks(self) -> Vec<DataChunk> {
305 self.chunks
306 }
307
308 pub fn into_single_chunk(self) -> DataChunk {
314 if self.chunks.is_empty() {
315 return DataChunk::empty();
316 }
317 if self.chunks.len() == 1 {
318 return self
320 .chunks
321 .into_iter()
322 .next()
323 .expect("chunks has exactly one element: checked on previous line");
324 }
325
326 DataChunk::concat(&self.chunks)
328 }
329}
330
331impl Default for ChunkCollector {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337impl Sink for ChunkCollector {
338 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
339 if !chunk.is_empty() {
340 self.chunks.push(chunk);
341 }
342 Ok(true)
343 }
344
345 fn finalize(&mut self) -> Result<(), OperatorError> {
346 Ok(())
347 }
348
349 fn name(&self) -> &'static str {
350 "ChunkCollector"
351 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;
    use std::sync::{Arc, Mutex};

    /// Builds a single-column chunk holding the integers `0..rows`.
    fn int_chunk(rows: usize) -> DataChunk {
        let values: Vec<Value> = (0..rows)
            .map(|i| Value::Int64(i64::try_from(i).expect("test row index fits in i64")))
            .collect();
        let vector = ValueVector::from_values(&values);
        DataChunk::new(vec![vector])
    }

    /// Source that yields a fixed number of equally sized chunks.
    struct TestSource {
        /// Chunks still to be produced.
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;

            Ok(Some(int_chunk(self.values_per_chunk)))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// What a [`TestSink`] observed, shared with the test body.
    #[derive(Debug, Default)]
    struct SinkLog {
        /// Row count of every chunk received, in arrival order.
        chunk_rows: Vec<usize>,
        /// Number of times `finalize` was called.
        finalize_calls: usize,
    }

    /// Sink that records what it receives into a shared log.
    ///
    /// The pipeline takes ownership of the boxed sink, so the log lives
    /// behind an `Arc<Mutex<_>>` that the test keeps a handle to.
    struct TestSink {
        log: Arc<Mutex<SinkLog>>,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                log: Arc::new(Mutex::new(SinkLog::default())),
            }
        }

        /// Returns a handle to the log that stays valid after the sink is moved.
        fn log(&self) -> Arc<Mutex<SinkLog>> {
            Arc::clone(&self.log)
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.log
                .lock()
                .expect("sink log mutex poisoned")
                .chunk_rows
                .push(chunk.len());
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.log
                .lock()
                .expect("sink log mutex poisoned")
                .finalize_calls += 1;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    /// Operator that forwards every chunk unchanged.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    #[test]
    fn test_simple_pipeline() {
        let sink = TestSink::new();
        let log = sink.log();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(3, 10)), Box::new(sink));
        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert_eq!(log.chunk_rows, vec![10, 10, 10]);
        assert_eq!(log.finalize_calls, 1);
    }

    #[test]
    fn test_empty_source_still_finalizes_sink() {
        let sink = TestSink::new();
        let log = sink.log();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(0, 10)), Box::new(sink));
        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert!(log.chunk_rows.is_empty());
        assert_eq!(log.finalize_calls, 1);
    }

    #[test]
    fn test_pipeline_with_operator() {
        let sink = TestSink::new();
        let log = sink.log();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator));

        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert_eq!(log.chunk_rows, vec![5, 5]);
        assert_eq!(log.finalize_calls, 1);
    }

    #[test]
    fn test_pipeline_with_two_operators() {
        let sink = TestSink::new();
        let log = sink.log();

        // The first operator's output goes through a ChunkCollector before
        // reaching the second one, exercising the intermediate path.
        let mut pipeline = Pipeline::new(
            Box::new(TestSource::new(2, 5)),
            vec![
                Box::new(PassThroughOperator),
                Box::new(PassThroughOperator),
            ],
            Box::new(sink),
        );

        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert_eq!(log.chunk_rows, vec![5, 5]);
        assert_eq!(log.finalize_calls, 1);
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);

        collector.consume(int_chunk(2)).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_collector_keeps_chunks_separate() {
        let mut collector = ChunkCollector::default();
        assert!(collector.consume(int_chunk(2)).unwrap());
        assert!(collector.consume(int_chunk(3)).unwrap());

        assert_eq!(collector.row_count(), 5);

        let rows: Vec<usize> = collector
            .into_chunks()
            .iter()
            .map(|chunk| chunk.len())
            .collect();
        assert_eq!(rows, vec![2, 3]);
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        /// Pass-through operator that asks for small chunks.
        struct SmallHintOp;

        impl PushOperator for SmallHintOp {
            fn push(
                &mut self,
                chunk: DataChunk,
                sink: &mut dyn Sink,
            ) -> Result<bool, OperatorError> {
                sink.consume(chunk)
            }
            fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
                Ok(())
            }
            fn preferred_chunk_size(&self) -> ChunkSizeHint {
                ChunkSizeHint::Small
            }
            fn name(&self) -> &'static str {
                "SmallHint"
            }
        }

        let no_hint = Pipeline::simple(
            Box::new(TestSource::new(1, 10)),
            Box::new(TestSink::new()),
        );
        assert_eq!(no_hint.compute_chunk_size(), DEFAULT_CHUNK_SIZE);

        let small = Pipeline::simple(
            Box::new(TestSource::new(1, 10)),
            Box::new(TestSink::new()),
        )
        .with_operator(Box::new(SmallHintOp));
        assert_eq!(small.compute_chunk_size(), SMALL_CHUNK_SIZE);
    }
}