Skip to main content

grafeo_core/execution/
pipeline.rs

1//! Push-based execution pipeline.
2//!
3//! This module provides push-based execution primitives where data flows
4//! forward through operators via `push()` calls, enabling better parallelism
5//! and cache utilization compared to pull-based execution.
6
7use super::chunk::DataChunk;
8use super::operators::OperatorError;
9
/// Hint for preferred chunk size.
///
/// Operators can provide hints to optimize chunk sizing for their workload.
/// The pipeline combines the hints of all of its operators into the single
/// chunk size it requests from the source.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ChunkSizeHint {
    /// Use default chunk size ([`DEFAULT_CHUNK_SIZE`] tuples).
    #[default]
    Default,
    /// Use small chunks (at most [`SMALL_CHUNK_SIZE`] tuples) for LIMIT or high selectivity.
    Small,
    /// Use large chunks ([`LARGE_CHUNK_SIZE`] tuples) for full scans.
    Large,
    /// Use exact chunk size.
    Exact(usize),
    /// Use at most this many tuples (for LIMIT).
    AtMost(usize),
}
32
/// Default chunk size in tuples.
///
/// This is the size the pipeline starts from before any operator hint is
/// taken into account.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Small chunk size for high selectivity or LIMIT.
///
/// Used by the pipeline for operators that report [`ChunkSizeHint::Small`].
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Large chunk size for full scans.
///
/// Used by the pipeline for operators that report [`ChunkSizeHint::Large`].
pub const LARGE_CHUNK_SIZE: usize = 4096;
41
/// Source of data chunks for a pipeline.
///
/// Sources produce chunks of data that flow through the pipeline.
/// [`Pipeline::execute`] keeps pulling from the source until it reports
/// exhaustion or a downstream stage requests early termination.
pub trait Source: Send + Sync {
    /// Produce the next chunk of data.
    ///
    /// `chunk_size` is the size the pipeline derived from its operators'
    /// [`ChunkSizeHint`]s.
    /// NOTE(review): nothing in this module enforces it, so whether it is a
    /// hard limit or only a target is up to the implementor; confirm.
    ///
    /// Returns `Ok(Some(chunk))` if data is available, `Ok(None)` if exhausted.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the source fails to produce data.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Reset the source to its initial state.
    fn reset(&mut self);

    /// Name of this source for debugging.
    fn name(&self) -> &'static str;
}
61
/// Sink that receives output from operators.
///
/// Sinks consume data chunks produced by the pipeline. A sink is either the
/// final stage of a [`Pipeline`] or a temporary buffer such as
/// [`ChunkCollector`] placed between two operators.
pub trait Sink: Send + Sync {
    /// Consume a chunk of data.
    ///
    /// Returns `Ok(true)` to continue, `Ok(false)` to signal early termination.
    /// The pipeline stops pulling from its source once `false` reaches it.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the sink fails to process the chunk.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    ///
    /// For the final sink of a [`Pipeline`] this runs after every operator
    /// has been finalized, including when execution stopped early.
    ///
    /// # Errors
    ///
    /// Returns `Err` if finalization fails.
    fn finalize(&mut self) -> Result<(), OperatorError>;

    /// Name of this sink for debugging.
    fn name(&self) -> &'static str;
}
85
/// Push-based operator trait.
///
/// Unlike pull-based operators that return data on `next()` calls,
/// push-based operators receive data via `push()` and forward results
/// to a downstream sink.
///
/// Inside a [`Pipeline`] the `sink` handed to an operator is the real
/// pipeline sink only for the last operator; every earlier operator writes
/// into a temporary [`ChunkCollector`] whose contents are then pushed into
/// the next operator.
pub trait PushOperator: Send + Sync {
    /// Process an incoming chunk and push results to the sink.
    ///
    /// Returns `Ok(true)` to continue processing, `Ok(false)` for early termination.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the operator or sink fails during processing.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    ///
    /// Pipeline breakers (Sort, Aggregate, etc.) emit their results here.
    /// The pipeline calls this after it has stopped pulling from the source,
    /// whether the source was exhausted or early termination was requested.
    ///
    /// # Errors
    ///
    /// Returns `Err` if finalization or downstream sink consumption fails.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;

    /// Hint for preferred chunk size.
    ///
    /// The provided implementation expresses no preference and returns
    /// [`ChunkSizeHint::Default`].
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    /// Name of this operator for debugging.
    fn name(&self) -> &'static str;
}
118
/// Execution pipeline connecting source → operators → sink.
///
/// The pipeline owns all of its stages. Chunks pulled from `source` are
/// pushed through `operators` in vector order and end up in `sink`.
pub struct Pipeline {
    /// Producer of the input chunks.
    source: Box<dyn Source>,
    /// Operators applied to each chunk, first to last.
    operators: Vec<Box<dyn PushOperator>>,
    /// Final consumer of the pipeline output.
    sink: Box<dyn Sink>,
}
125
126impl Pipeline {
127    /// Create a new pipeline.
128    pub fn new(
129        source: Box<dyn Source>,
130        operators: Vec<Box<dyn PushOperator>>,
131        sink: Box<dyn Sink>,
132    ) -> Self {
133        Self {
134            source,
135            operators,
136            sink,
137        }
138    }
139
140    /// Create a simple pipeline with just source and sink.
141    pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
142        Self {
143            source,
144            operators: Vec::new(),
145            sink,
146        }
147    }
148
149    /// Add an operator to the pipeline.
150    #[must_use]
151    pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
152        self.operators.push(op);
153        self
154    }
155
156    /// Execute the pipeline.
157    ///
158    /// # Errors
159    ///
160    /// Returns `Err` if any source, operator, or sink fails during execution.
161    pub fn execute(&mut self) -> Result<(), OperatorError> {
162        let chunk_size = self.compute_chunk_size();
163
164        // Process all chunks from source
165        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
166            if !self.push_through(chunk)? {
167                // Early termination requested
168                break;
169            }
170        }
171
172        // Finalize all operators (important for pipeline breakers)
173        self.finalize_all()
174    }
175
176    /// Compute optimal chunk size from operator hints.
177    fn compute_chunk_size(&self) -> usize {
178        let mut size = DEFAULT_CHUNK_SIZE;
179
180        for op in &self.operators {
181            match op.preferred_chunk_size() {
182                ChunkSizeHint::Default => {}
183                ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
184                ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
185                ChunkSizeHint::Exact(s) => return s,
186                ChunkSizeHint::AtMost(s) => size = size.min(s),
187            }
188        }
189
190        size
191    }
192
193    /// Push a chunk through the operator chain.
194    fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
195        if self.operators.is_empty() {
196            // No operators, push directly to sink
197            return self.sink.consume(chunk);
198        }
199
200        // Build a chain: operators push to each other, final one pushes to sink
201        let mut current_chunk = chunk;
202        let num_operators = self.operators.len();
203
204        for i in 0..num_operators {
205            let is_last = i == num_operators - 1;
206
207            if is_last {
208                // Last operator pushes to the real sink
209                return self.operators[i].push(current_chunk, &mut *self.sink);
210            }
211
212            // Intermediate operators collect output
213            let mut collector = ChunkCollector::new();
214            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
215
216            if !continue_processing || collector.is_empty() {
217                return Ok(continue_processing);
218            }
219
220            // Merge collected chunks for next operator
221            current_chunk = collector.into_single_chunk();
222        }
223
224        Ok(true)
225    }
226
227    /// Finalize all operators in reverse order.
228    fn finalize_all(&mut self) -> Result<(), OperatorError> {
229        // For pipeline breakers, finalize produces output
230        // We need to chain finalize calls through the operators
231
232        if self.operators.is_empty() {
233            return self.sink.finalize();
234        }
235
236        // Finalize operators in order, each pushing to the next
237        for i in 0..self.operators.len() {
238            let is_last = i == self.operators.len() - 1;
239
240            if is_last {
241                self.operators[i].finalize(&mut *self.sink)?;
242            } else {
243                // Collect finalize output and push through remaining operators
244                let mut collector = ChunkCollector::new();
245                self.operators[i].finalize(&mut collector)?;
246
247                for chunk in collector.into_chunks() {
248                    // Push through remaining operators
249                    self.push_through_from(chunk, i + 1)?;
250                }
251            }
252        }
253
254        self.sink.finalize()
255    }
256
257    /// Push a chunk through operators starting at index.
258    fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
259        let mut current_chunk = chunk;
260
261        for i in start..self.operators.len() {
262            let is_last = i == self.operators.len() - 1;
263
264            if is_last {
265                return self.operators[i].push(current_chunk, &mut *self.sink);
266            }
267
268            let mut collector = ChunkCollector::new();
269            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
270
271            if !continue_processing || collector.is_empty() {
272                return Ok(continue_processing);
273            }
274
275            current_chunk = collector.into_single_chunk();
276        }
277
278        self.sink.consume(current_chunk)
279    }
280}
281
282/// Collects chunks from operators for intermediate processing.
283pub struct ChunkCollector {
284    chunks: Vec<DataChunk>,
285}
286
287impl ChunkCollector {
288    /// Create a new chunk collector.
289    pub fn new() -> Self {
290        Self { chunks: Vec::new() }
291    }
292
293    /// Check if collector has any chunks.
294    pub fn is_empty(&self) -> bool {
295        self.chunks.is_empty()
296    }
297
298    /// Get total row count across all chunks.
299    pub fn row_count(&self) -> usize {
300        self.chunks.iter().map(DataChunk::len).sum()
301    }
302
303    /// Convert to a vector of chunks.
304    pub fn into_chunks(self) -> Vec<DataChunk> {
305        self.chunks
306    }
307
308    /// Merge all chunks into a single chunk.
309    ///
310    /// # Panics
311    ///
312    /// Panics if internal invariants are violated (single-element vec is unexpectedly empty).
313    pub fn into_single_chunk(self) -> DataChunk {
314        if self.chunks.is_empty() {
315            return DataChunk::empty();
316        }
317        if self.chunks.len() == 1 {
318            // Invariant: self.chunks.len() == 1 guarantees exactly one element
319            return self
320                .chunks
321                .into_iter()
322                .next()
323                .expect("chunks has exactly one element: checked on previous line");
324        }
325
326        // Concatenate all chunks
327        DataChunk::concat(&self.chunks)
328    }
329}
330
331impl Default for ChunkCollector {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337impl Sink for ChunkCollector {
338    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
339        if !chunk.is_empty() {
340            self.chunks.push(chunk);
341        }
342        Ok(true)
343    }
344
345    fn finalize(&mut self) -> Result<(), OperatorError> {
346        Ok(())
347    }
348
349    fn name(&self) -> &'static str {
350        "ChunkCollector"
351    }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Build a single-column chunk holding the integers `0..rows`.
    fn int_chunk(rows: usize) -> DataChunk {
        let values: Vec<Value> = (0..rows).map(|i| Value::Int64(i as i64)).collect();
        DataChunk::new(vec![ValueVector::from_values(&values)])
    }

    /// Test source that produces a fixed number of chunks.
    struct TestSource {
        total: usize,
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                total: num_chunks,
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;
            Ok(Some(int_chunk(self.values_per_chunk)))
        }

        fn reset(&mut self) {
            self.remaining = self.total;
        }

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// Counters shared between a boxed [`TestSink`] and the test body.
    ///
    /// The pipeline takes ownership of its sink, so the test keeps an `Arc`
    /// to these counters to observe what the sink received.
    #[derive(Default)]
    struct SinkStats {
        chunks: AtomicUsize,
        rows: AtomicUsize,
        finalized: AtomicBool,
    }

    /// Test sink that records how much data it received.
    struct TestSink {
        stats: Arc<SinkStats>,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                stats: Arc::new(SinkStats::default()),
            }
        }

        /// Handle to the counters that stays valid after the sink is boxed.
        fn stats(&self) -> Arc<SinkStats> {
            Arc::clone(&self.stats)
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.stats.chunks.fetch_add(1, Ordering::SeqCst);
            self.stats.rows.fetch_add(chunk.len(), Ordering::SeqCst);
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.stats.finalized.store(true, Ordering::SeqCst);
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    /// Pass-through operator for testing.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    #[test]
    fn test_simple_pipeline() {
        let sink = TestSink::new();
        let stats = sink.stats();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(3, 10)), Box::new(sink));
        pipeline.execute().unwrap();

        assert_eq!(stats.chunks.load(Ordering::SeqCst), 3);
        assert_eq!(stats.rows.load(Ordering::SeqCst), 30);
        assert!(stats.finalized.load(Ordering::SeqCst));
    }

    #[test]
    fn test_pipeline_with_operator() {
        let sink = TestSink::new();
        let stats = sink.stats();

        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        assert_eq!(stats.chunks.load(Ordering::SeqCst), 2);
        assert_eq!(stats.rows.load(Ordering::SeqCst), 10);
        assert!(stats.finalized.load(Ordering::SeqCst));
    }

    #[test]
    fn test_pipeline_with_chained_operators() {
        let sink = TestSink::new();
        let stats = sink.stats();

        let mut pipeline = Pipeline::new(
            Box::new(TestSource::new(2, 5)),
            vec![
                Box::new(PassThroughOperator),
                Box::new(PassThroughOperator),
            ],
            Box::new(sink),
        );
        pipeline.execute().unwrap();

        assert_eq!(stats.chunks.load(Ordering::SeqCst), 2);
        assert_eq!(stats.rows.load(Ordering::SeqCst), 10);
        assert!(stats.finalized.load(Ordering::SeqCst));
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);

        let values: Vec<Value> = vec![Value::Int64(1), Value::Int64(2)];
        let vector = ValueVector::from_values(&values);
        let chunk = DataChunk::new(vec![vector]);

        assert!(collector.consume(chunk).unwrap());
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        // Operator that only contributes a `Small` chunk-size hint.
        struct SmallHintOp;
        impl PushOperator for SmallHintOp {
            fn push(
                &mut self,
                chunk: DataChunk,
                sink: &mut dyn Sink,
            ) -> Result<bool, OperatorError> {
                sink.consume(chunk)
            }
            fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
                Ok(())
            }
            fn preferred_chunk_size(&self) -> ChunkSizeHint {
                ChunkSizeHint::Small
            }
            fn name(&self) -> &'static str {
                "SmallHint"
            }
        }

        let without_hints = Pipeline::simple(
            Box::new(TestSource::new(1, 10)),
            Box::new(TestSink::new()),
        );
        assert_eq!(without_hints.compute_chunk_size(), DEFAULT_CHUNK_SIZE);

        let with_small_hint = Pipeline::simple(
            Box::new(TestSource::new(1, 10)),
            Box::new(TestSink::new()),
        )
        .with_operator(Box::new(SmallHintOp));
        assert_eq!(with_small_hint.compute_chunk_size(), SMALL_CHUNK_SIZE);
    }
}
519}