
grafeo_core/execution/pipeline.rs

//! Push-based execution pipeline.
//!
//! This module provides push-based execution primitives where data flows
//! forward through operators via `push()` calls, enabling better parallelism
//! and cache utilization compared to pull-based execution.
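//!
//! A minimal wiring sketch (illustrative only; `MySource`, `MyOperator`, and
//! `MySink` stand in for concrete implementations of the traits below):
//!
//! ```ignore
//! let mut pipeline = Pipeline::simple(Box::new(MySource::new()), Box::new(MySink::new()))
//!     .with_operator(Box::new(MyOperator::new()));
//! // Drives all chunks from the source through the operator chain into the
//! // sink, then finalizes the operators and the sink.
//! pipeline.execute()?;
//! ```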

use super::chunk::DataChunk;
use super::operators::OperatorError;

/// Hint for preferred chunk size.
///
/// Operators can provide hints to optimize chunk sizing for their workload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkSizeHint {
    /// Use default chunk size (2048 tuples).
    Default,
    /// Use small chunks (256-512 tuples) for LIMIT or high selectivity.
    Small,
    /// Use large chunks (4096 tuples) for full scans.
    Large,
    /// Use exact chunk size.
    Exact(usize),
    /// Use at most this many tuples (for LIMIT).
    AtMost(usize),
}

impl Default for ChunkSizeHint {
    fn default() -> Self {
        Self::Default
    }
}

/// Default chunk size in tuples.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Small chunk size for high selectivity or LIMIT.
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Large chunk size for full scans.
pub const LARGE_CHUNK_SIZE: usize = 4096;

/// Source of data chunks for a pipeline.
///
/// Sources produce chunks of data that flow through the pipeline.
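///
/// A minimal sketch of an implementation over pre-built chunks (illustrative
/// only; `VecSource` is hypothetical):
///
/// ```ignore
/// struct VecSource {
///     chunks: std::vec::IntoIter<DataChunk>,
/// }
///
/// impl Source for VecSource {
///     fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
///         // Ignores the size hint and hands out pre-built chunks one by one.
///         Ok(self.chunks.next())
///     }
///
///     fn reset(&mut self) {
///         // A real source would rewind to its first chunk here.
///     }
///
///     fn name(&self) -> &'static str {
///         "VecSource"
///     }
/// }
/// ```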
pub trait Source: Send + Sync {
    /// Produce the next chunk of data.
    ///
    /// Returns `Ok(Some(chunk))` if data is available, `Ok(None)` if exhausted.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Reset the source to its initial state.
    fn reset(&mut self);

    /// Name of this source for debugging.
    fn name(&self) -> &'static str;
}

/// Sink that receives output from operators.
///
/// Sinks consume data chunks produced by the pipeline.
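///
/// A minimal sketch of a row-counting sink that stops early once a cap is
/// reached (illustrative only; `CountingSink` and its `cap` are hypothetical):
///
/// ```ignore
/// struct CountingSink {
///     rows: usize,
///     cap: usize,
/// }
///
/// impl Sink for CountingSink {
///     fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
///         self.rows += chunk.len();
///         // Returning `false` signals the pipeline to terminate early.
///         Ok(self.rows < self.cap)
///     }
///
///     fn finalize(&mut self) -> Result<(), OperatorError> {
///         Ok(())
///     }
///
///     fn name(&self) -> &'static str {
///         "CountingSink"
///     }
/// }
/// ```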
pub trait Sink: Send + Sync {
    /// Consume a chunk of data.
    ///
    /// Returns `Ok(true)` to continue, `Ok(false)` to signal early termination.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    fn finalize(&mut self) -> Result<(), OperatorError>;

    /// Name of this sink for debugging.
    fn name(&self) -> &'static str;
}

/// Push-based operator trait.
///
/// Unlike pull-based operators that return data on `next()` calls,
/// push-based operators receive data via `push()` and forward results
/// to a downstream sink.
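///
/// A minimal sketch of a pipeline breaker (illustrative only; a real Sort or
/// Aggregate would reorder or fold the buffered rows before emitting them):
///
/// ```ignore
/// struct BufferingOperator {
///     buffered: Vec<DataChunk>,
/// }
///
/// impl PushOperator for BufferingOperator {
///     fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
///         // A pipeline breaker holds on to its input instead of forwarding it.
///         self.buffered.push(chunk);
///         Ok(true)
///     }
///
///     fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
///         // All input has been seen; emit the buffered result downstream.
///         for chunk in self.buffered.drain(..) {
///             sink.consume(chunk)?;
///         }
///         Ok(())
///     }
///
///     fn name(&self) -> &'static str {
///         "BufferingOperator"
///     }
/// }
/// ```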
pub trait PushOperator: Send + Sync {
    /// Process an incoming chunk and push results to the sink.
    ///
    /// Returns `Ok(true)` to continue processing, `Ok(false)` for early termination.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    ///
    /// Pipeline breakers (Sort, Aggregate, etc.) emit their results here.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;

    /// Hint for preferred chunk size.
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    /// Name of this operator for debugging.
    fn name(&self) -> &'static str;
}

/// Execution pipeline connecting source → operators → sink.
pub struct Pipeline {
    source: Box<dyn Source>,
    operators: Vec<Box<dyn PushOperator>>,
    sink: Box<dyn Sink>,
}

impl Pipeline {
    /// Create a new pipeline.
    pub fn new(
        source: Box<dyn Source>,
        operators: Vec<Box<dyn PushOperator>>,
        sink: Box<dyn Sink>,
    ) -> Self {
        Self {
            source,
            operators,
            sink,
        }
    }

    /// Create a simple pipeline with just source and sink.
    pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
        Self {
            source,
            operators: Vec::new(),
            sink,
        }
    }

    /// Add an operator to the pipeline.
    #[must_use]
    pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
        self.operators.push(op);
        self
    }

    /// Execute the pipeline.
    pub fn execute(&mut self) -> Result<(), OperatorError> {
        let chunk_size = self.compute_chunk_size();

        // Process all chunks from source
        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            if !self.push_through(chunk)? {
                // Early termination requested
                break;
            }
        }

        // Finalize all operators (important for pipeline breakers)
        self.finalize_all()
    }

    /// Compute optimal chunk size from operator hints.
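    ///
    /// Hints are folded in operator order starting from `DEFAULT_CHUNK_SIZE`:
    /// `Small` and `AtMost` can only lower the size, `Large` can only raise
    /// it, and the first `Exact` hint is returned as-is. For example,
    /// `[Large, AtMost(100)]` resolves to 100 and `[Exact(64), Small]`
    /// resolves to 64.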
    fn compute_chunk_size(&self) -> usize {
        let mut size = DEFAULT_CHUNK_SIZE;

        for op in &self.operators {
            match op.preferred_chunk_size() {
                ChunkSizeHint::Default => {}
                ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
                ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
                ChunkSizeHint::Exact(s) => return s,
                ChunkSizeHint::AtMost(s) => size = size.min(s),
            }
        }

        size
    }

    /// Push a chunk through the operator chain.
    fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        if self.operators.is_empty() {
            // No operators, push directly to sink
            return self.sink.consume(chunk);
        }

        // Build a chain: operators push to each other, final one pushes to sink
        let mut current_chunk = chunk;
        let num_operators = self.operators.len();

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                // Last operator pushes to the real sink
                return self.operators[i].push(current_chunk, &mut *self.sink);
            }

            // Intermediate operators collect output
            let mut collector = ChunkCollector::new();
            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;

            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            // Merge collected chunks for next operator
            current_chunk = collector.into_single_chunk();
        }

        Ok(true)
    }

    /// Finalize all operators in pipeline order, pushing any finalize output through the remaining operators.
    fn finalize_all(&mut self) -> Result<(), OperatorError> {
        // For pipeline breakers, finalize produces output
        // We need to chain finalize calls through the operators

        if self.operators.is_empty() {
            return self.sink.finalize();
        }

        // Finalize operators in order, each pushing to the next
        for i in 0..self.operators.len() {
            let is_last = i == self.operators.len() - 1;

            if is_last {
                self.operators[i].finalize(&mut *self.sink)?;
            } else {
                // Collect finalize output and push through remaining operators
                let mut collector = ChunkCollector::new();
                self.operators[i].finalize(&mut collector)?;

                for chunk in collector.into_chunks() {
                    // Push through remaining operators
                    self.push_through_from(chunk, i + 1)?;
                }
            }
        }

        self.sink.finalize()
    }

    /// Push a chunk through operators starting at index.
    fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
        let mut current_chunk = chunk;

        for i in start..self.operators.len() {
            let is_last = i == self.operators.len() - 1;

            if is_last {
                return self.operators[i].push(current_chunk, &mut *self.sink);
            }

            let mut collector = ChunkCollector::new();
            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;

            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        self.sink.consume(current_chunk)
    }
}

/// Collects chunks from operators for intermediate processing.
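///
/// Implements [`Sink`], so an operator can push into it and the caller can
/// then merge or iterate the collected output. A brief sketch (illustrative
/// only; `operator` and `chunk` are assumed to be in scope):
///
/// ```ignore
/// let mut collector = ChunkCollector::new();
/// operator.push(chunk, &mut collector)?;
/// let merged = collector.into_single_chunk();
/// ```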
pub struct ChunkCollector {
    chunks: Vec<DataChunk>,
}

impl ChunkCollector {
    /// Create a new chunk collector.
    pub fn new() -> Self {
        Self { chunks: Vec::new() }
    }

    /// Check if collector has any chunks.
    pub fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Get total row count across all chunks.
    pub fn row_count(&self) -> usize {
        self.chunks.iter().map(DataChunk::len).sum()
    }

    /// Convert to a vector of chunks.
    pub fn into_chunks(self) -> Vec<DataChunk> {
        self.chunks
    }

    /// Merge all chunks into a single chunk.
    pub fn into_single_chunk(self) -> DataChunk {
        if self.chunks.is_empty() {
            return DataChunk::empty();
        }
        if self.chunks.len() == 1 {
            // Invariant: self.chunks.len() == 1 guarantees exactly one element
            return self
                .chunks
                .into_iter()
                .next()
                .expect("chunks has exactly one element: checked on previous line");
        }

        // Concatenate all chunks
        DataChunk::concat(&self.chunks)
    }
}

impl Default for ChunkCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl Sink for ChunkCollector {
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        if !chunk.is_empty() {
            self.chunks.push(chunk);
        }
        Ok(true)
    }

    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }

    fn name(&self) -> &'static str {
        "ChunkCollector"
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Test source that produces a fixed number of chunks.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;

            // Create a chunk with integer values
            let values: Vec<Value> = (0..self.values_per_chunk)
                .map(|i| Value::Int64(i as i64))
                .collect();
            let vector = ValueVector::from_values(&values);
            let chunk = DataChunk::new(vec![vector]);
            Ok(Some(chunk))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// Test sink that collects all chunks.
    struct TestSink {
        chunks: Vec<DataChunk>,
        finalized: bool,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                chunks: Vec::new(),
                finalized: false,
            }
        }

        #[allow(dead_code)]
        fn total_rows(&self) -> usize {
            self.chunks.iter().map(DataChunk::len).sum()
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.chunks.push(chunk);
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.finalized = true;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    /// Pass-through operator for testing.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    #[test]
    fn test_simple_pipeline() {
        let source = Box::new(TestSource::new(3, 10));
        let sink = Box::new(TestSink::new());

        let mut pipeline = Pipeline::simple(source, sink);
        pipeline.execute().unwrap();

        // The pipeline owns the sink, so its contents can't be inspected here;
        // this test only verifies that execution completes without error.
    }

    #[test]
    fn test_pipeline_with_operator() {
        let source = Box::new(TestSource::new(2, 5));
        let sink = Box::new(TestSink::new());

        let mut pipeline =
            Pipeline::simple(source, sink).with_operator(Box::new(PassThroughOperator));

        pipeline.execute().unwrap();
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());

        let values: Vec<Value> = vec![Value::Int64(1), Value::Int64(2)];
        let vector = ValueVector::from_values(&values);
        let chunk = DataChunk::new(vec![vector]);

        collector.consume(chunk).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        let source = Box::new(TestSource::new(1, 10));
        let sink = Box::new(TestSink::new());

        // Test with small hint operator
        struct SmallHintOp;
        impl PushOperator for SmallHintOp {
            fn push(
                &mut self,
                chunk: DataChunk,
                sink: &mut dyn Sink,
            ) -> Result<bool, OperatorError> {
                sink.consume(chunk)
            }
            fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
                Ok(())
            }
            fn preferred_chunk_size(&self) -> ChunkSizeHint {
                ChunkSizeHint::Small
            }
            fn name(&self) -> &'static str {
                "SmallHint"
            }
        }

        let pipeline = Pipeline::simple(source, sink).with_operator(Box::new(SmallHintOp));

        let computed_size = pipeline.compute_chunk_size();
        assert!(computed_size <= SMALL_CHUNK_SIZE);
    }
}