// grafeo_core/execution/parallel/pipeline.rs
1//! Parallel pipeline execution with morsel-driven scheduling.
2//!
3//! Executes push-based pipelines in parallel using work-stealing schedulers
4//! and per-worker operator instances.
5
6use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
7use super::scheduler::MorselScheduler;
8use super::source::ParallelSource;
9use crate::execution::chunk::DataChunk;
10use crate::execution::operators::OperatorError;
11use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
12use grafeo_common::memory::buffer::PressureLevel;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::thread;
17
/// Factory for creating per-worker operator chains.
///
/// Each worker needs its own operator instances since operators may have
/// internal state (e.g., aggregation accumulators). The pipeline calls
/// `create_chain` once per worker thread.
pub trait OperatorChainFactory: Send + Sync {
    /// Creates a new operator chain for a worker.
    ///
    /// Each call should return fresh operator instances — instances must
    /// never be shared between workers.
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;

    /// Returns whether the chain contains pipeline breakers.
    ///
    /// Pipeline breakers (Sort, Aggregate, Distinct) buffer input and only
    /// emit on finalize, so per-worker results require a merge phase.
    fn has_pipeline_breakers(&self) -> bool;

    /// Returns the number of operators in the chain.
    fn chain_length(&self) -> usize;
}
36
/// Simple factory that builds fresh chains from per-operator closures.
///
/// Built via `with_operator`/`with_pipeline_breakers` in builder style;
/// each `create_chain` call invokes every stored closure once.
pub struct CloneableOperatorFactory {
    /// Factory functions, one per operator, in pipeline order.
    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
    /// Whether the produced chain contains pipeline breakers.
    has_breakers: bool,
}
44
45impl CloneableOperatorFactory {
46    /// Creates a new factory.
47    pub fn new() -> Self {
48        Self {
49            factories: Vec::new(),
50            has_breakers: false,
51        }
52    }
53
54    /// Adds an operator factory.
55    #[must_use]
56    pub fn with_operator<F>(mut self, factory: F) -> Self
57    where
58        F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
59    {
60        self.factories.push(Box::new(factory));
61        self
62    }
63
64    /// Marks that the chain has pipeline breakers.
65    #[must_use]
66    pub fn with_pipeline_breakers(mut self) -> Self {
67        self.has_breakers = true;
68        self
69    }
70}
71
72impl Default for CloneableOperatorFactory {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl OperatorChainFactory for CloneableOperatorFactory {
79    fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
80        self.factories.iter().map(|f| f()).collect()
81    }
82
83    fn has_pipeline_breakers(&self) -> bool {
84        self.has_breakers
85    }
86
87    fn chain_length(&self) -> usize {
88        self.factories.len()
89    }
90}
91
/// Result of parallel pipeline execution.
pub struct ParallelPipelineResult {
    /// Output chunks from all workers, merged in arbitrary (worker
    /// completion) order.
    pub chunks: Vec<DataChunk>,
    /// Number of workers used (as configured, even if the source produced
    /// fewer morsels than workers).
    pub num_workers: usize,
    /// Total morsels generated and submitted to the scheduler.
    pub morsels_processed: usize,
    /// Total input rows processed across all workers (pre-filter counts).
    pub rows_processed: usize,
}
103
/// Configuration for parallel pipeline execution.
#[derive(Debug, Clone)]
pub struct ParallelPipelineConfig {
    /// Number of worker threads.
    pub num_workers: usize,
    /// Base morsel size (adjusted for memory pressure).
    ///
    /// NOTE(review): `effective_morsel_size` derives the size from
    /// `pressure_level` alone, so this field appears unused by
    /// `ParallelPipeline::execute` — confirm intended.
    pub morsel_size: usize,
    /// Chunk size for processing within morsels.
    pub chunk_size: usize,
    /// Whether to preserve output ordering.
    ///
    /// NOTE(review): not consulted anywhere in this file; worker output
    /// order is nondeterministic regardless of this flag — confirm intended.
    pub preserve_order: bool,
    /// Memory pressure level (affects morsel sizing).
    pub pressure_level: PressureLevel,
}
118
119impl Default for ParallelPipelineConfig {
120    fn default() -> Self {
121        Self {
122            num_workers: thread::available_parallelism()
123                .map(|n| n.get())
124                .unwrap_or(4),
125            morsel_size: DEFAULT_MORSEL_SIZE,
126            chunk_size: DEFAULT_CHUNK_SIZE,
127            preserve_order: false,
128            pressure_level: PressureLevel::Normal,
129        }
130    }
131}
132
133impl ParallelPipelineConfig {
134    /// Creates config for testing with limited workers.
135    #[must_use]
136    pub fn for_testing() -> Self {
137        Self {
138            num_workers: 2,
139            ..Default::default()
140        }
141    }
142
143    /// Sets the number of workers.
144    #[must_use]
145    pub fn with_workers(mut self, n: usize) -> Self {
146        self.num_workers = n.max(1);
147        self
148    }
149
150    /// Sets memory pressure level.
151    #[must_use]
152    pub fn with_pressure(mut self, level: PressureLevel) -> Self {
153        self.pressure_level = level;
154        self
155    }
156
157    /// Returns effective morsel size based on pressure.
158    #[must_use]
159    pub fn effective_morsel_size(&self) -> usize {
160        compute_morsel_size(self.pressure_level)
161    }
162}
163
/// Parallel execution pipeline.
///
/// Distributes work across multiple threads using morsel-driven scheduling:
/// the source is split into morsels, a work-stealing scheduler hands them
/// to workers, and each worker runs its own operator chain.
pub struct ParallelPipeline {
    /// Source of data (must support parallel partitioning).
    source: Arc<dyn ParallelSource>,
    /// Factory for creating per-worker operator chains.
    operator_factory: Arc<dyn OperatorChainFactory>,
    /// Execution configuration (worker count, sizes, pressure level).
    config: ParallelPipelineConfig,
}
175
impl ParallelPipeline {
    /// Creates a new parallel pipeline.
    pub fn new(
        source: Arc<dyn ParallelSource>,
        operator_factory: Arc<dyn OperatorChainFactory>,
        config: ParallelPipelineConfig,
    ) -> Self {
        Self {
            source,
            operator_factory,
            config,
        }
    }

    /// Creates a simple parallel pipeline with default config.
    pub fn simple(
        source: Arc<dyn ParallelSource>,
        operator_factory: Arc<dyn OperatorChainFactory>,
    ) -> Self {
        Self::new(source, operator_factory, ParallelPipelineConfig::default())
    }

    /// Executes the pipeline and returns results.
    ///
    /// Splits the source into morsels, submits them to a work-stealing
    /// scheduler, and spawns `config.num_workers` scoped threads that each
    /// drain morsels through their own operator chain. Output chunks from
    /// all workers are merged into the result in completion order.
    ///
    /// NOTE(review): `config.preserve_order` is not consulted here, so
    /// result chunk order is nondeterministic — confirm intended.
    ///
    /// # Errors
    ///
    /// Returns `Err` if any worker thread encounters an operator error.
    /// Only the first error observed is reported; other workers keep
    /// running until the scheduler drains.
    pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
        let morsel_size = self.config.effective_morsel_size();
        // NOTE(review): the meaning of the second argument (0) is not
        // visible in this file — confirm against
        // ParallelSource::generate_morsels.
        let morsels = self.source.generate_morsels(morsel_size, 0);

        // Fast path: empty source — skip thread spawning entirely.
        if morsels.is_empty() {
            return Ok(ParallelPipelineResult {
                chunks: Vec::new(),
                num_workers: self.config.num_workers,
                morsels_processed: 0,
                rows_processed: 0,
            });
        }

        // Create scheduler and submit all morsels up front; finish_submission
        // signals workers that no further work will arrive.
        let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
        let total_morsels = morsels.len();
        scheduler.submit_batch(morsels);
        scheduler.finish_submission();

        // Shared across workers: output collector, row counter, and a
        // first-error-wins slot.
        let results = Arc::new(Mutex::new(Vec::new()));
        let rows_processed = Arc::new(AtomicUsize::new(0));
        let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));

        // Spawn workers. thread::scope joins all threads before returning,
        // so everything below the scope sees the workers' final state.
        thread::scope(|s| {
            for worker_id in 0..self.config.num_workers {
                let scheduler = Arc::clone(&scheduler);
                let source = Arc::clone(&self.source);
                let factory = Arc::clone(&self.operator_factory);
                let results = Arc::clone(&results);
                let rows_processed = Arc::clone(&rows_processed);
                let errors = Arc::clone(&errors);
                let chunk_size = self.config.chunk_size;

                s.spawn(move || {
                    if let Err(e) = Self::worker_loop(
                        worker_id,
                        scheduler,
                        source,
                        factory,
                        results,
                        rows_processed,
                        chunk_size,
                    ) {
                        // Keep only the first error; later ones are dropped.
                        let mut guard = errors.lock();
                        if guard.is_none() {
                            *guard = Some(e);
                        }
                    }
                });
            }
        });

        // Check for errors (all workers have joined at this point).
        if let Some(e) = errors.lock().take() {
            return Err(e);
        }

        // All workers have joined, so this Arc is normally unique and
        // try_unwrap succeeds; the clone fallback is purely defensive.
        let chunks = match Arc::try_unwrap(results) {
            Ok(mutex) => mutex.into_inner(),
            Err(arc) => arc.lock().clone(),
        };

        Ok(ParallelPipelineResult {
            chunks,
            num_workers: self.config.num_workers,
            morsels_processed: total_morsels,
            rows_processed: rows_processed.load(Ordering::Relaxed),
        })
    }

    /// Worker loop: pull morsels from the scheduler until it is drained,
    /// push each morsel's chunks through a private operator chain, then
    /// finalize the chain and publish the collected output.
    fn worker_loop(
        _worker_id: usize,
        scheduler: Arc<MorselScheduler>,
        source: Arc<dyn ParallelSource>,
        factory: Arc<dyn OperatorChainFactory>,
        results: Arc<Mutex<Vec<DataChunk>>>,
        rows_processed: Arc<AtomicUsize>,
        chunk_size: usize,
    ) -> Result<(), OperatorError> {
        use super::scheduler::WorkerHandle;

        // Create worker handle (registers with scheduler for work-stealing)
        let handle = WorkerHandle::new(scheduler);

        // Create per-worker operator chain; operators may hold state, so
        // they must not be shared with other workers.
        let mut operators = factory.create_chain();
        let mut local_sink = CollectorSink::new();

        // Process morsels until the scheduler has no more work.
        while let Some(morsel) = handle.get_work() {
            let mut partition = source.create_partition(&morsel);
            let mut morsel_rows = 0;

            // Process chunks within morsel.
            // NOTE(review): the early-termination flag returned by
            // push_through_chain is discarded, so a chain signaling "stop"
            // (e.g. a LIMIT-style operator) does not halt morsel
            // processing — confirm intended.
            while let Some(chunk) = partition.next_chunk(chunk_size)? {
                morsel_rows += chunk.len();
                Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
            }

            rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
            handle.complete_morsel();
        }

        // Finalize operators (important for pipeline breakers, which only
        // emit their buffered output at finalize time).
        Self::finalize_chain(&mut operators, &mut local_sink)?;

        // Publish results; take the shared lock only once, at the end.
        let chunks = local_sink.into_chunks();
        if !chunks.is_empty() {
            results.lock().extend(chunks);
        }

        Ok(())
    }

    /// Pushes a chunk through the operator chain into `sink`.
    ///
    /// Intermediate operators emit into a temporary collector whose output
    /// is merged into a single chunk for the next operator; only the last
    /// operator pushes directly into `sink`. Returns the last operator's
    /// continue flag, or early-returns an operator's flag when it signals
    /// stop or produces no output.
    fn push_through_chain(
        operators: &mut [Box<dyn PushOperator>],
        chunk: DataChunk,
        sink: &mut dyn Sink,
    ) -> Result<bool, OperatorError> {
        if operators.is_empty() {
            return sink.consume(chunk);
        }

        let num_operators = operators.len();
        let mut current_chunk = chunk;

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                return operators[i].push(current_chunk, sink);
            }

            // Intermediate: collect output
            let mut collector = ChunkCollector::new();
            let continue_processing = operators[i].push(current_chunk, &mut collector)?;

            // Stop early if the operator asked to, or if it filtered
            // everything out (nothing left to push downstream).
            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        // Unreachable when operators is non-empty (the last iteration
        // returns above); kept for completeness.
        Ok(true)
    }

    /// Finalizes all operators in the chain, front to back.
    ///
    /// Each intermediate operator's finalize output is pushed through the
    /// remaining (not yet finalized) operators, so pipeline breakers'
    /// buffered results flow through the rest of the chain before those
    /// operators are themselves finalized. Ends by finalizing `sink`.
    fn finalize_chain(
        operators: &mut [Box<dyn PushOperator>],
        sink: &mut dyn Sink,
    ) -> Result<(), OperatorError> {
        if operators.is_empty() {
            return sink.finalize();
        }

        let num_operators = operators.len();

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                operators[i].finalize(sink)?;
            } else {
                // Collect finalize output and push through remaining operators
                let mut collector = ChunkCollector::new();
                operators[i].finalize(&mut collector)?;

                // Push through remaining operators
                // NOTE(review): the continue flag from
                // push_through_from_index is discarded here — confirm
                // intended.
                for chunk in collector.into_chunks() {
                    Self::push_through_from_index(operators, i + 1, chunk, sink)?;
                }
            }
        }

        sink.finalize()
    }

    /// Pushes a chunk through operators starting at index `start`.
    ///
    /// Same contract as `push_through_chain`, but beginning partway down
    /// the chain; when `start >= operators.len()` the chunk goes straight
    /// to `sink`.
    fn push_through_from_index(
        operators: &mut [Box<dyn PushOperator>],
        start: usize,
        chunk: DataChunk,
        sink: &mut dyn Sink,
    ) -> Result<bool, OperatorError> {
        let num_operators = operators.len();
        let mut current_chunk = chunk;

        for i in start..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                return operators[i].push(current_chunk, sink);
            }

            let mut collector = ChunkCollector::new();
            let continue_processing = operators[i].push(current_chunk, &mut collector)?;

            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        // Reached only when start >= num_operators: no operators to apply.
        sink.consume(current_chunk)
    }
}
416
/// Collector sink that accumulates chunks in memory.
///
/// Used as each worker's private sink; its chunks are merged into the
/// shared result set once the worker finishes.
#[derive(Default)]
pub struct CollectorSink {
    // Accumulated non-empty output chunks, in arrival order.
    chunks: Vec<DataChunk>,
}
422
423impl CollectorSink {
424    /// Creates a new collector sink.
425    pub fn new() -> Self {
426        Self { chunks: Vec::new() }
427    }
428
429    /// Returns collected chunks.
430    pub fn into_chunks(self) -> Vec<DataChunk> {
431        self.chunks
432    }
433
434    /// Returns number of chunks collected.
435    pub fn len(&self) -> usize {
436        self.chunks.len()
437    }
438
439    /// Returns whether no chunks collected.
440    pub fn is_empty(&self) -> bool {
441        self.chunks.is_empty()
442    }
443
444    /// Returns total row count.
445    pub fn row_count(&self) -> usize {
446        self.chunks.iter().map(DataChunk::len).sum()
447    }
448}
449
450impl Sink for CollectorSink {
451    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
452        if !chunk.is_empty() {
453            self.chunks.push(chunk);
454        }
455        Ok(true)
456    }
457
458    fn finalize(&mut self) -> Result<(), OperatorError> {
459        Ok(())
460    }
461
462    fn name(&self) -> &'static str {
463        "ParallelCollectorSink"
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::parallel::source::RangeSource;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Pass-through operator for testing: forwards every chunk unchanged.
    struct PassThroughOp;

    impl PushOperator for PassThroughOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Filter operator that keeps only even numbers from column 0.
    struct EvenFilterOp;

    impl PushOperator for EvenFilterOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            let col = chunk
                .column(0)
                .ok_or_else(|| OperatorError::Execution("Missing column".to_string()))?;

            // Rebuild the column with only even Int64 values; non-Int64
            // values are dropped as a side effect of the pattern match.
            let mut filtered = ValueVector::new();
            for i in 0..chunk.len() {
                if let Some(Value::Int64(v)) = col.get(i)
                    && v % 2 == 0
                {
                    filtered.push(Value::Int64(v));
                }
            }

            // Emit nothing when the whole chunk was filtered out.
            if !filtered.is_empty() {
                sink.consume(DataChunk::new(vec![filtered]))?;
            }
            Ok(true)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "EvenFilter"
        }
    }

    // Construction stores the config verbatim (no worker clamping in new()).
    #[test]
    fn test_parallel_pipeline_creation() {
        let source = Arc::new(RangeSource::new(1000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        assert_eq!(pipeline.config.num_workers, 2);
    }

    // Empty source takes the fast path: no morsels, no rows, no chunks.
    #[test]
    fn test_parallel_pipeline_empty_source() {
        let source = Arc::new(RangeSource::new(0));
        let factory = Arc::new(CloneableOperatorFactory::new());

        let pipeline = ParallelPipeline::simple(source, factory);
        let result = pipeline.execute().unwrap();

        assert!(result.chunks.is_empty());
        assert_eq!(result.morsels_processed, 0);
        assert_eq!(result.rows_processed, 0);
    }

    // A pass-through chain must preserve every input row.
    #[test]
    fn test_parallel_pipeline_passthrough() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // Should process all 100 rows
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 100);
        assert_eq!(result.rows_processed, 100);
    }

    // Filtering works across worker boundaries: half the range survives.
    #[test]
    fn test_parallel_pipeline_filter() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // Should have 50 even numbers (0, 2, 4, ..., 98)
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 50);
    }

    // More workers than the test default: row totals must still add up.
    #[test]
    fn test_parallel_pipeline_multiple_workers() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::default().with_workers(4);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
        assert_eq!(result.num_workers, 4);
    }

    // High memory pressure shrinks morsels but must not lose rows.
    #[test]
    fn test_parallel_pipeline_under_pressure() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // More morsels under pressure due to smaller size
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
    }

    // Builder API: operators accumulate and the breaker flag sticks.
    #[test]
    fn test_cloneable_operator_factory() {
        let factory = CloneableOperatorFactory::new()
            .with_operator(|| Box::new(PassThroughOp))
            .with_operator(|| Box::new(EvenFilterOp))
            .with_pipeline_breakers();

        assert_eq!(factory.chain_length(), 2);
        assert!(factory.has_pipeline_breakers());

        let chain = factory.create_chain();
        assert_eq!(chain.len(), 2);
    }

    // CollectorSink accounting: len counts chunks, row_count counts rows.
    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());

        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let chunk = DataChunk::new(vec![ValueVector::from_values(&values)]);

        sink.consume(chunk).unwrap();
        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.row_count(), 3);

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
    }

    // Config builders chain, and pressure shrinks the effective morsel size.
    #[test]
    fn test_pipeline_config() {
        let config = ParallelPipelineConfig::default()
            .with_workers(8)
            .with_pressure(PressureLevel::Moderate);

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.pressure_level, PressureLevel::Moderate);
        assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
    }
}