Skip to main content

grafeo_core/execution/
pipeline.rs

1//! Push-based execution pipeline.
2//!
3//! This module provides push-based execution primitives where data flows
4//! forward through operators via `push()` calls, enabling better parallelism
5//! and cache utilization compared to pull-based execution.
6
7use super::chunk::DataChunk;
8use super::operators::OperatorError;
9
/// Hint for preferred chunk size.
///
/// Operators report a hint through `PushOperator::preferred_chunk_size` so the
/// pipeline can decide how many tuples to request from its source per chunk.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChunkSizeHint {
    /// No preference: use the default chunk size ([`DEFAULT_CHUNK_SIZE`], 2048 tuples).
    #[default]
    Default,
    /// Use small chunks (up to [`SMALL_CHUNK_SIZE`], 512 tuples) for LIMIT or high selectivity.
    Small,
    /// Use large chunks ([`LARGE_CHUNK_SIZE`], 4096 tuples) for full scans.
    Large,
    /// Use exact chunk size.
    Exact(usize),
    /// Use at most this many tuples (for LIMIT).
    AtMost(usize),
}

/// Default chunk size in tuples.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Small chunk size for high selectivity or LIMIT.
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Large chunk size for full scans.
pub const LARGE_CHUNK_SIZE: usize = 4096;
42
/// Source of data chunks for a pipeline.
///
/// Sources produce chunks of data that flow through the pipeline. A
/// [`Pipeline`] drains its source by calling [`Source::next_chunk`] repeatedly
/// until it returns `Ok(None)` or a downstream stage requests early
/// termination. Implementors must be `Send + Sync`.
pub trait Source: Send + Sync {
    /// Produce the next chunk of data.
    ///
    /// `chunk_size` is the number of tuples the caller would like per chunk;
    /// [`Pipeline`] passes the size it derived from its operators' hints.
    /// NOTE(review): nothing in this trait enforces that the returned chunk
    /// respects `chunk_size` — confirm the expected contract with implementors.
    ///
    /// Returns `Ok(Some(chunk))` if data is available, `Ok(None)` if exhausted.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the source fails to produce data.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Reset the source to its initial state.
    fn reset(&mut self);

    /// Name of this source for debugging.
    fn name(&self) -> &'static str;
}
62
/// Sink that receives output from operators.
///
/// Sinks consume data chunks produced by the pipeline. The same trait is used
/// both for the terminal sink of a [`Pipeline`] and for the temporary
/// [`ChunkCollector`] that buffers output between operators. Implementors must
/// be `Send + Sync`.
pub trait Sink: Send + Sync {
    /// Consume a chunk of data.
    ///
    /// Takes ownership of `chunk`.
    ///
    /// Returns `Ok(true)` to continue, `Ok(false)` to signal early termination.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the sink fails to process the chunk.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    ///
    /// [`Pipeline`] invokes this as the last step of execution, after every
    /// operator has been finalized.
    ///
    /// # Errors
    ///
    /// Returns `Err` if finalization fails.
    fn finalize(&mut self) -> Result<(), OperatorError>;

    /// Name of this sink for debugging.
    fn name(&self) -> &'static str;
}
86
87/// Push-based operator trait.
88///
89/// Unlike pull-based operators that return data on `next()` calls,
90/// push-based operators receive data via `push()` and forward results
91/// to a downstream sink.
92pub trait PushOperator: Send + Sync {
93    /// Process an incoming chunk and push results to the sink.
94    ///
95    /// Returns `Ok(true)` to continue processing, `Ok(false)` for early termination.
96    ///
97    /// # Errors
98    ///
99    /// Returns `Err` if the operator or sink fails during processing.
100    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;
101
102    /// Called when all input has been processed.
103    ///
104    /// Pipeline breakers (Sort, Aggregate, etc.) emit their results here.
105    ///
106    /// # Errors
107    ///
108    /// Returns `Err` if finalization or downstream sink consumption fails.
109    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;
110
111    /// Hint for preferred chunk size.
112    fn preferred_chunk_size(&self) -> ChunkSizeHint {
113        ChunkSizeHint::Default
114    }
115
116    /// Name of this operator for debugging.
117    fn name(&self) -> &'static str;
118}
119
120/// Execution pipeline connecting source → operators → sink.
121pub struct Pipeline {
122    source: Box<dyn Source>,
123    operators: Vec<Box<dyn PushOperator>>,
124    sink: Box<dyn Sink>,
125}
126
127impl Pipeline {
128    /// Create a new pipeline.
129    pub fn new(
130        source: Box<dyn Source>,
131        operators: Vec<Box<dyn PushOperator>>,
132        sink: Box<dyn Sink>,
133    ) -> Self {
134        Self {
135            source,
136            operators,
137            sink,
138        }
139    }
140
141    /// Create a simple pipeline with just source and sink.
142    pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
143        Self {
144            source,
145            operators: Vec::new(),
146            sink,
147        }
148    }
149
150    /// Add an operator to the pipeline.
151    #[must_use]
152    pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
153        self.operators.push(op);
154        self
155    }
156
157    /// Execute the pipeline.
158    ///
159    /// # Errors
160    ///
161    /// Returns `Err` if any source, operator, or sink fails during execution.
162    pub fn execute(&mut self) -> Result<(), OperatorError> {
163        let chunk_size = self.compute_chunk_size();
164
165        // Process all chunks from source
166        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
167            if !self.push_through(chunk)? {
168                // Early termination requested
169                break;
170            }
171        }
172
173        // Finalize all operators (important for pipeline breakers)
174        self.finalize_all()
175    }
176
177    /// Compute optimal chunk size from operator hints.
178    fn compute_chunk_size(&self) -> usize {
179        let mut size = DEFAULT_CHUNK_SIZE;
180
181        for op in &self.operators {
182            match op.preferred_chunk_size() {
183                ChunkSizeHint::Default => {}
184                ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
185                ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
186                ChunkSizeHint::Exact(s) => return s,
187                ChunkSizeHint::AtMost(s) => size = size.min(s),
188            }
189        }
190
191        size
192    }
193
194    /// Push a chunk through the operator chain.
195    fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
196        if self.operators.is_empty() {
197            // No operators, push directly to sink
198            return self.sink.consume(chunk);
199        }
200
201        // Build a chain: operators push to each other, final one pushes to sink
202        let mut current_chunk = chunk;
203        let num_operators = self.operators.len();
204
205        for i in 0..num_operators {
206            let is_last = i == num_operators - 1;
207
208            if is_last {
209                // Last operator pushes to the real sink
210                return self.operators[i].push(current_chunk, &mut *self.sink);
211            }
212
213            // Intermediate operators collect output
214            let mut collector = ChunkCollector::new();
215            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
216
217            if !continue_processing || collector.is_empty() {
218                return Ok(continue_processing);
219            }
220
221            // Merge collected chunks for next operator
222            current_chunk = collector.into_single_chunk();
223        }
224
225        Ok(true)
226    }
227
228    /// Finalize all operators in reverse order.
229    fn finalize_all(&mut self) -> Result<(), OperatorError> {
230        // For pipeline breakers, finalize produces output
231        // We need to chain finalize calls through the operators
232
233        if self.operators.is_empty() {
234            return self.sink.finalize();
235        }
236
237        // Finalize operators in order, each pushing to the next
238        for i in 0..self.operators.len() {
239            let is_last = i == self.operators.len() - 1;
240
241            if is_last {
242                self.operators[i].finalize(&mut *self.sink)?;
243            } else {
244                // Collect finalize output and push through remaining operators
245                let mut collector = ChunkCollector::new();
246                self.operators[i].finalize(&mut collector)?;
247
248                for chunk in collector.into_chunks() {
249                    // Push through remaining operators
250                    self.push_through_from(chunk, i + 1)?;
251                }
252            }
253        }
254
255        self.sink.finalize()
256    }
257
258    /// Push a chunk through operators starting at index.
259    fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
260        let mut current_chunk = chunk;
261
262        for i in start..self.operators.len() {
263            let is_last = i == self.operators.len() - 1;
264
265            if is_last {
266                return self.operators[i].push(current_chunk, &mut *self.sink);
267            }
268
269            let mut collector = ChunkCollector::new();
270            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
271
272            if !continue_processing || collector.is_empty() {
273                return Ok(continue_processing);
274            }
275
276            current_chunk = collector.into_single_chunk();
277        }
278
279        self.sink.consume(current_chunk)
280    }
281}
282
283/// Collects chunks from operators for intermediate processing.
284pub struct ChunkCollector {
285    chunks: Vec<DataChunk>,
286}
287
288impl ChunkCollector {
289    /// Create a new chunk collector.
290    pub fn new() -> Self {
291        Self { chunks: Vec::new() }
292    }
293
294    /// Check if collector has any chunks.
295    pub fn is_empty(&self) -> bool {
296        self.chunks.is_empty()
297    }
298
299    /// Get total row count across all chunks.
300    pub fn row_count(&self) -> usize {
301        self.chunks.iter().map(DataChunk::len).sum()
302    }
303
304    /// Convert to a vector of chunks.
305    pub fn into_chunks(self) -> Vec<DataChunk> {
306        self.chunks
307    }
308
309    /// Merge all chunks into a single chunk.
310    ///
311    /// # Panics
312    ///
313    /// Panics if internal invariants are violated (single-element vec is unexpectedly empty).
314    pub fn into_single_chunk(self) -> DataChunk {
315        if self.chunks.is_empty() {
316            return DataChunk::empty();
317        }
318        if self.chunks.len() == 1 {
319            // Invariant: self.chunks.len() == 1 guarantees exactly one element
320            return self
321                .chunks
322                .into_iter()
323                .next()
324                .expect("chunks has exactly one element: checked on previous line");
325        }
326
327        // Concatenate all chunks
328        DataChunk::concat(&self.chunks)
329    }
330}
331
332impl Default for ChunkCollector {
333    fn default() -> Self {
334        Self::new()
335    }
336}
337
338impl Sink for ChunkCollector {
339    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
340        if !chunk.is_empty() {
341            self.chunks.push(chunk);
342        }
343        Ok(true)
344    }
345
346    fn finalize(&mut self) -> Result<(), OperatorError> {
347        Ok(())
348    }
349
350    fn name(&self) -> &'static str {
351        "ChunkCollector"
352    }
353}
354
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;

    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Build a single-column chunk holding the integers `0..rows`.
    fn int_chunk(rows: usize) -> DataChunk {
        let values: Vec<Value> = (0_i64..).take(rows).map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&values)])
    }

    /// Test source that produces a fixed number of chunks.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;
            Ok(Some(int_chunk(self.values_per_chunk)))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// Counters shared between a `TestSink` and the test that owns the pipeline.
    ///
    /// The sink itself is moved into the pipeline, so the test keeps an `Arc`
    /// to these counters to observe what the sink received.
    #[derive(Default)]
    struct SinkStats {
        chunks: AtomicUsize,
        rows: AtomicUsize,
        finalized: AtomicBool,
    }

    impl SinkStats {
        fn chunks(&self) -> usize {
            self.chunks.load(Ordering::SeqCst)
        }

        fn rows(&self) -> usize {
            self.rows.load(Ordering::SeqCst)
        }

        fn finalized(&self) -> bool {
            self.finalized.load(Ordering::SeqCst)
        }
    }

    /// Test sink that records what it receives in shared counters.
    struct TestSink {
        stats: Arc<SinkStats>,
        /// When set, `consume` returns `false` once this many chunks were seen.
        stop_after: Option<usize>,
    }

    impl TestSink {
        fn new() -> (Self, Arc<SinkStats>) {
            Self::build(None)
        }

        fn stopping_after(chunks: usize) -> (Self, Arc<SinkStats>) {
            Self::build(Some(chunks))
        }

        fn build(stop_after: Option<usize>) -> (Self, Arc<SinkStats>) {
            let stats = Arc::new(SinkStats::default());
            let sink = Self {
                stats: Arc::clone(&stats),
                stop_after,
            };
            (sink, stats)
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.stats.rows.fetch_add(chunk.len(), Ordering::SeqCst);
            let seen = self.stats.chunks.fetch_add(1, Ordering::SeqCst) + 1;
            Ok(self.stop_after.map_or(true, |limit| seen < limit))
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.stats.finalized.store(true, Ordering::SeqCst);
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    /// Pass-through operator for testing.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Pass-through operator that reports a configurable chunk size hint.
    struct HintOp(ChunkSizeHint);

    impl PushOperator for HintOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn preferred_chunk_size(&self) -> ChunkSizeHint {
            self.0
        }

        fn name(&self) -> &'static str {
            "HintOp"
        }
    }

    /// Chunk size computed by a pipeline whose only operator reports `hint`.
    fn chunk_size_for(hint: ChunkSizeHint) -> usize {
        let (sink, _stats) = TestSink::new();
        Pipeline::simple(Box::new(TestSource::new(1, 10)), Box::new(sink))
            .with_operator(Box::new(HintOp(hint)))
            .compute_chunk_size()
    }

    #[test]
    fn test_simple_pipeline() {
        let (sink, stats) = TestSink::new();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(3, 10)), Box::new(sink));

        pipeline.execute().unwrap();

        assert_eq!(stats.chunks(), 3);
        assert_eq!(stats.rows(), 30);
        assert!(stats.finalized());
    }

    #[test]
    fn test_pipeline_with_operator() {
        let (sink, stats) = TestSink::new();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator));

        pipeline.execute().unwrap();

        assert_eq!(stats.chunks(), 2);
        assert_eq!(stats.rows(), 10);
        assert!(stats.finalized());
    }

    #[test]
    fn test_pipeline_with_chained_operators() {
        let (sink, stats) = TestSink::new();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator))
            .with_operator(Box::new(PassThroughOperator));

        pipeline.execute().unwrap();

        assert_eq!(stats.chunks(), 2);
        assert_eq!(stats.rows(), 10);
        assert!(stats.finalized());
    }

    #[test]
    fn test_sink_can_stop_pipeline_early() {
        let (sink, stats) = TestSink::stopping_after(2);
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(5, 4)), Box::new(sink));

        pipeline.execute().unwrap();

        // The source had five chunks, but the sink asked to stop after two.
        assert_eq!(stats.chunks(), 2);
        assert_eq!(stats.rows(), 8);
        // Finalization still runs after early termination.
        assert!(stats.finalized());
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());

        collector.consume(int_chunk(2)).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_collector_merges_multiple_chunks() {
        let mut collector = ChunkCollector::new();
        collector.consume(int_chunk(2)).unwrap();
        collector.consume(int_chunk(3)).unwrap();
        assert_eq!(collector.row_count(), 5);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 5);
    }

    #[test]
    fn test_empty_chunk_collector() {
        let collector = ChunkCollector::default();
        assert_eq!(collector.row_count(), 0);
        assert!(collector.into_single_chunk().is_empty());
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        assert_eq!(chunk_size_for(ChunkSizeHint::Default), DEFAULT_CHUNK_SIZE);
        assert_eq!(chunk_size_for(ChunkSizeHint::Small), SMALL_CHUNK_SIZE);
        assert_eq!(chunk_size_for(ChunkSizeHint::Large), LARGE_CHUNK_SIZE);
        assert_eq!(chunk_size_for(ChunkSizeHint::Exact(100)), 100);
        assert_eq!(chunk_size_for(ChunkSizeHint::AtMost(10)), 10);
    }

    #[test]
    fn test_chunk_size_without_operators() {
        let (sink, _stats) = TestSink::new();
        let pipeline = Pipeline::simple(Box::new(TestSource::new(1, 10)), Box::new(sink));

        assert_eq!(pipeline.compute_chunk_size(), DEFAULT_CHUNK_SIZE);
    }
}
520}