Skip to main content

grafeo_core/execution/
pipeline.rs

1//! Push-based execution pipeline.
2//!
3//! This module provides push-based execution primitives where data flows
4//! forward through operators via `push()` calls, enabling better parallelism
5//! and cache utilization compared to pull-based execution.
6
7use std::time::Instant;
8
9use super::chunk::DataChunk;
10use super::operators::OperatorError;
11
/// Hint for preferred chunk size.
///
/// Operators can provide hints to optimize chunk sizing for their workload.
/// The pipeline combines hints from all operators in [`Pipeline`]'s
/// `compute_chunk_size`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ChunkSizeHint {
    /// Use default chunk size (2048 tuples).
    #[default]
    Default,
    /// Use small chunks (256-512 tuples) for LIMIT or high selectivity.
    Small,
    /// Use large chunks (4096 tuples) for full scans.
    Large,
    /// Use exact chunk size.
    Exact(usize),
    /// Use at most this many tuples (for LIMIT).
    AtMost(usize),
}
35
/// Default chunk size in tuples, used when no operator provides a hint.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;

/// Small chunk size in tuples, selected for high selectivity or LIMIT
/// (the [`ChunkSizeHint::Small`] hint).
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Large chunk size in tuples, selected for full scans
/// (the [`ChunkSizeHint::Large`] hint).
pub const LARGE_CHUNK_SIZE: usize = 4096;
44
/// Source of data chunks for a pipeline.
///
/// Sources produce chunks of data that flow through the pipeline.
pub trait Source: Send + Sync {
    /// Produce the next chunk of data.
    ///
    /// `chunk_size` is the preferred number of tuples, computed from operator
    /// hints; implementations may produce chunks of a different size.
    ///
    /// Returns `Ok(Some(chunk))` if data is available, `Ok(None)` if exhausted.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the source fails to produce data.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Reset the source to its initial state so it can be scanned again.
    fn reset(&mut self);

    /// Name of this source for debugging.
    fn name(&self) -> &'static str;
}
64
/// Sink that receives output from operators.
///
/// Sinks consume data chunks produced by the pipeline.
pub trait Sink: Send + Sync {
    /// Consume a chunk of data.
    ///
    /// Returns `Ok(true)` to continue, `Ok(false)` to signal early termination
    /// (e.g. a row limit has been satisfied).
    ///
    /// # Errors
    ///
    /// Returns `Err` if the sink fails to process the chunk.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    ///
    /// # Errors
    ///
    /// Returns `Err` if finalization fails.
    fn finalize(&mut self) -> Result<(), OperatorError>;

    /// Name of this sink for debugging.
    fn name(&self) -> &'static str;

    /// Converts this boxed sink into `Box<dyn Any>` for type-based dispatch.
    ///
    /// Callers use this after [`Pipeline::into_sink`] to downcast back to the
    /// concrete sink type and inspect collected results.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any>;
}
91
/// Push-based operator trait.
///
/// Unlike pull-based operators that return data on `next()` calls,
/// push-based operators receive data via `push()` and forward results
/// to a downstream sink.
pub trait PushOperator: Send + Sync {
    /// Process an incoming chunk and push results to the sink.
    ///
    /// Returns `Ok(true)` to continue processing, `Ok(false)` for early termination.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the operator or sink fails during processing.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;

    /// Called when all input has been processed.
    ///
    /// Pipeline breakers (Sort, Aggregate, etc.) emit their results here.
    ///
    /// # Errors
    ///
    /// Returns `Err` if finalization or downstream sink consumption fails.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;

    /// Hint for preferred chunk size.
    ///
    /// Defaults to [`ChunkSizeHint::Default`]; override for operators that
    /// benefit from smaller or larger chunks.
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    /// Name of this operator for debugging.
    fn name(&self) -> &'static str;
}
124
/// Execution pipeline connecting source, operators, and sink.
///
/// Data flows source → operators (in `Vec` order) → sink, one chunk at a time.
pub struct Pipeline {
    /// Produces input chunks.
    source: Box<dyn Source>,
    /// Operator chain applied in order; may be empty.
    operators: Vec<Box<dyn PushOperator>>,
    /// Receives final output chunks.
    sink: Box<dyn Sink>,
    /// Optional wall-clock deadline after which execution is aborted.
    deadline: Option<Instant>,
}
133
134impl Pipeline {
135    /// Create a new pipeline.
136    pub fn new(
137        source: Box<dyn Source>,
138        operators: Vec<Box<dyn PushOperator>>,
139        sink: Box<dyn Sink>,
140    ) -> Self {
141        Self {
142            source,
143            operators,
144            sink,
145            deadline: None,
146        }
147    }
148
149    /// Create a simple pipeline with just source and sink.
150    pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
151        Self {
152            source,
153            operators: Vec::new(),
154            sink,
155            deadline: None,
156        }
157    }
158
159    /// Add an operator to the pipeline.
160    #[must_use]
161    pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
162        self.operators.push(op);
163        self
164    }
165
166    /// Sets a wall-clock deadline for pipeline execution.
167    ///
168    /// When set, the pipeline checks between chunks whether the deadline has
169    /// been exceeded and aborts with a timeout error if so.
170    #[must_use]
171    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
172        self.deadline = deadline;
173        self
174    }
175
176    /// Sets the deadline on an already-constructed pipeline.
177    pub fn set_deadline(&mut self, deadline: Option<Instant>) {
178        self.deadline = deadline;
179    }
180
181    /// Checks whether the deadline has been exceeded.
182    ///
183    /// On WASM targets this is a no-op because `Instant` is not available.
184    fn check_deadline(&self) -> Result<(), OperatorError> {
185        #[cfg(not(target_arch = "wasm32"))]
186        if let Some(deadline) = self.deadline
187            && Instant::now() >= deadline
188        {
189            return Err(OperatorError::Execution(
190                "Query exceeded timeout".to_string(),
191            ));
192        }
193        Ok(())
194    }
195
196    /// Consumes the pipeline and returns the sink.
197    ///
198    /// Call this after [`execute()`](Self::execute) to retrieve collected results
199    /// from the sink. Useful for extracting chunks from a [`CollectorSink`](super::sink::CollectorSink)
200    /// or [`ChunkCollector`].
201    pub fn into_sink(self) -> Box<dyn Sink> {
202        self.sink
203    }
204
205    /// Execute the pipeline.
206    ///
207    /// # Deadline behavior
208    ///
209    /// The deadline is checked **between** chunk iterations, not during source reads.
210    /// A slow source read (e.g., a complex scan or join) can exceed the deadline
211    /// before cancellation is evaluated. This matches the pull-based executor's
212    /// behavior and is an accepted trade-off: cooperative cancellation between
213    /// chunks avoids the complexity of interrupt-based cancellation within operators.
214    ///
215    /// # Errors
216    ///
217    /// Returns `Err` if any source, operator, or sink fails during execution,
218    /// or if the configured deadline is exceeded between chunks.
219    pub fn execute(&mut self) -> Result<(), OperatorError> {
220        let chunk_size = self.compute_chunk_size();
221
222        // Process all chunks from source
223        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
224            self.check_deadline()?;
225
226            if !self.push_through(chunk)? {
227                // Early termination requested
228                break;
229            }
230        }
231
232        // Finalize all operators (important for pipeline breakers)
233        self.finalize_all()
234    }
235
236    /// Compute optimal chunk size from operator hints.
237    fn compute_chunk_size(&self) -> usize {
238        let mut size = DEFAULT_CHUNK_SIZE;
239
240        for op in &self.operators {
241            match op.preferred_chunk_size() {
242                ChunkSizeHint::Default => {}
243                ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
244                ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
245                ChunkSizeHint::Exact(s) => return s,
246                ChunkSizeHint::AtMost(s) => size = size.min(s),
247            }
248        }
249
250        size
251    }
252
253    /// Push a chunk through the operator chain.
254    fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
255        if self.operators.is_empty() {
256            // No operators, push directly to sink
257            return self.sink.consume(chunk);
258        }
259
260        // Build a chain: operators push to each other, final one pushes to sink
261        let mut current_chunk = chunk;
262        let num_operators = self.operators.len();
263
264        for i in 0..num_operators {
265            let is_last = i == num_operators - 1;
266
267            if is_last {
268                // Last operator pushes to the real sink
269                return self.operators[i].push(current_chunk, &mut *self.sink);
270            }
271
272            // Intermediate operators collect output
273            let mut collector = ChunkCollector::new();
274            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
275
276            if !continue_processing || collector.is_empty() {
277                return Ok(continue_processing);
278            }
279
280            // Merge collected chunks for next operator
281            current_chunk = collector.into_single_chunk();
282        }
283
284        Ok(true)
285    }
286
287    /// Finalize all operators in reverse order.
288    fn finalize_all(&mut self) -> Result<(), OperatorError> {
289        // For pipeline breakers, finalize produces output
290        // We need to chain finalize calls through the operators
291
292        if self.operators.is_empty() {
293            return self.sink.finalize();
294        }
295
296        // Finalize operators in order, each pushing to the next
297        for i in 0..self.operators.len() {
298            let is_last = i == self.operators.len() - 1;
299
300            if is_last {
301                self.operators[i].finalize(&mut *self.sink)?;
302            } else {
303                // Collect finalize output and push through remaining operators
304                let mut collector = ChunkCollector::new();
305                self.operators[i].finalize(&mut collector)?;
306
307                for chunk in collector.into_chunks() {
308                    // Push through remaining operators
309                    self.push_through_from(chunk, i + 1)?;
310                }
311            }
312        }
313
314        self.sink.finalize()
315    }
316
317    /// Push a chunk through operators starting at index.
318    fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
319        let mut current_chunk = chunk;
320
321        for i in start..self.operators.len() {
322            let is_last = i == self.operators.len() - 1;
323
324            if is_last {
325                return self.operators[i].push(current_chunk, &mut *self.sink);
326            }
327
328            let mut collector = ChunkCollector::new();
329            let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
330
331            if !continue_processing || collector.is_empty() {
332                return Ok(continue_processing);
333            }
334
335            current_chunk = collector.into_single_chunk();
336        }
337
338        self.sink.consume(current_chunk)
339    }
340}
341
/// Collects chunks from operators for intermediate processing.
///
/// Implements [`Sink`] so an operator can push into it; buffered chunks are
/// then drained via `into_chunks` or merged via `into_single_chunk`.
pub struct ChunkCollector {
    // Buffered chunks in arrival order; empty chunks are skipped on consume.
    chunks: Vec<DataChunk>,
}
346
347impl ChunkCollector {
348    /// Create a new chunk collector.
349    pub fn new() -> Self {
350        Self { chunks: Vec::new() }
351    }
352
353    /// Check if collector has any chunks.
354    pub fn is_empty(&self) -> bool {
355        self.chunks.is_empty()
356    }
357
358    /// Get total row count across all chunks.
359    pub fn row_count(&self) -> usize {
360        self.chunks.iter().map(DataChunk::len).sum()
361    }
362
363    /// Convert to a vector of chunks.
364    pub fn into_chunks(self) -> Vec<DataChunk> {
365        self.chunks
366    }
367
368    /// Merge all chunks into a single chunk.
369    ///
370    /// # Panics
371    ///
372    /// Panics if internal invariants are violated (single-element vec is unexpectedly empty).
373    pub fn into_single_chunk(self) -> DataChunk {
374        if self.chunks.is_empty() {
375            return DataChunk::empty();
376        }
377        if self.chunks.len() == 1 {
378            // Invariant: self.chunks.len() == 1 guarantees exactly one element
379            return self
380                .chunks
381                .into_iter()
382                .next()
383                .expect("chunks has exactly one element: checked on previous line");
384        }
385
386        // Concatenate all chunks
387        DataChunk::concat(&self.chunks)
388    }
389}
390
impl Default for ChunkCollector {
    /// Equivalent to [`ChunkCollector::new`]: an empty collector.
    fn default() -> Self {
        Self::new()
    }
}
396
397impl Sink for ChunkCollector {
398    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
399        if !chunk.is_empty() {
400            self.chunks.push(chunk);
401        }
402        Ok(true)
403    }
404
405    fn finalize(&mut self) -> Result<(), OperatorError> {
406        Ok(())
407    }
408
409    fn name(&self) -> &'static str {
410        "ChunkCollector"
411    }
412
413    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
414        self
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Test source that produces a fixed number of chunks.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;

            // Create a chunk with integer values
            // reason: test chunk size is small, fits i64
            #[allow(clippy::cast_possible_wrap)]
            let values: Vec<Value> = (0..self.values_per_chunk)
                .map(|i| Value::Int64(i as i64))
                .collect();
            let vector = ValueVector::from_values(&values);
            let chunk = DataChunk::new(vec![vector]);
            Ok(Some(chunk))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// Test sink that collects all chunks.
    struct TestSink {
        chunks: Vec<DataChunk>,
        finalized: bool,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                chunks: Vec::new(),
                finalized: false,
            }
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.chunks.push(chunk);
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.finalized = true;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
            self
        }
    }

    /// Pass-through operator for testing.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Recover the concrete `TestSink` from a finished pipeline so tests can
    /// assert on what it actually received.
    fn take_test_sink(pipeline: Pipeline) -> TestSink {
        *pipeline
            .into_sink()
            .into_any()
            .downcast::<TestSink>()
            .expect("sink should be a TestSink")
    }

    #[test]
    fn test_simple_pipeline() {
        let source = Box::new(TestSource::new(3, 10));
        let sink = Box::new(TestSink::new());

        let mut pipeline = Pipeline::simple(source, sink);
        pipeline.execute().unwrap();

        // Verify the sink received every chunk and was finalized, using
        // `into_sink` + `into_any` to downcast back to the concrete sink.
        let sink = take_test_sink(pipeline);
        assert!(sink.finalized);
        assert_eq!(sink.chunks.len(), 3);
        assert!(sink.chunks.iter().all(|chunk| chunk.len() == 10));
    }

    #[test]
    fn test_pipeline_with_operator() {
        let source = Box::new(TestSource::new(2, 5));
        let sink = Box::new(TestSink::new());

        let mut pipeline =
            Pipeline::simple(source, sink).with_operator(Box::new(PassThroughOperator));

        pipeline.execute().unwrap();

        // A pass-through operator must deliver every source chunk unchanged.
        let sink = take_test_sink(pipeline);
        assert!(sink.finalized);
        assert_eq!(sink.chunks.len(), 2);
        assert!(sink.chunks.iter().all(|chunk| chunk.len() == 5));
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());

        let values: Vec<Value> = vec![Value::Int64(1), Value::Int64(2)];
        let vector = ValueVector::from_values(&values);
        let chunk = DataChunk::new(vec![vector]);

        collector.consume(chunk).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_pipeline_deadline_expired() {
        use std::time::{Duration, Instant};

        let source = Box::new(TestSource::new(10, 5));
        let sink = Box::new(TestSink::new());

        // Set a deadline that has already passed
        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let mut pipeline = Pipeline::simple(source, sink).with_deadline(Some(expired));

        let result = pipeline.execute();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_pipeline_no_deadline() {
        let source = Box::new(TestSource::new(3, 5));
        let sink = Box::new(TestSink::new());

        // No deadline should execute normally
        let mut pipeline = Pipeline::simple(source, sink).with_deadline(None);
        pipeline.execute().unwrap();
    }

    #[test]
    fn test_pipeline_set_deadline() {
        use std::time::{Duration, Instant};

        let source = Box::new(TestSource::new(10, 5));
        let sink = Box::new(TestSink::new());

        let mut pipeline = Pipeline::simple(source, sink);

        // Set deadline after construction
        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        pipeline.set_deadline(Some(expired));

        let result = pipeline.execute();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        let source = Box::new(TestSource::new(1, 10));
        let sink = Box::new(TestSink::new());

        // Test with small hint operator
        struct SmallHintOp;
        impl PushOperator for SmallHintOp {
            fn push(
                &mut self,
                chunk: DataChunk,
                sink: &mut dyn Sink,
            ) -> Result<bool, OperatorError> {
                sink.consume(chunk)
            }
            fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
                Ok(())
            }
            fn preferred_chunk_size(&self) -> ChunkSizeHint {
                ChunkSizeHint::Small
            }
            fn name(&self) -> &'static str {
                "SmallHint"
            }
        }

        let pipeline = Pipeline::simple(source, sink).with_operator(Box::new(SmallHintOp));

        let computed_size = pipeline.compute_chunk_size();
        assert!(computed_size <= SMALL_CHUNK_SIZE);
    }
}