grafeo_core/execution/parallel/pipeline.rs

1//! Parallel pipeline execution with morsel-driven scheduling.
2//!
3//! Executes push-based pipelines in parallel using work-stealing schedulers
4//! and per-worker operator instances.
5
6use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
7use super::scheduler::MorselScheduler;
8use super::source::ParallelSource;
9use crate::execution::chunk::DataChunk;
10use crate::execution::operators::OperatorError;
11use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
12use grafeo_common::memory::buffer::PressureLevel;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::thread;
17
/// Factory for creating per-worker operator chains.
///
/// Each worker needs its own operator instances since operators may have
/// internal state (e.g., aggregation accumulators).
pub trait OperatorChainFactory: Send + Sync {
    /// Creates a new operator chain for a worker.
    ///
    /// Each call should return fresh operator instances; implementations
    /// must not hand out state shared between calls.
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;

    /// Returns whether the chain contains pipeline breakers.
    ///
    /// Pipeline breakers (Sort, Aggregate, Distinct) require a merge phase.
    /// NOTE(review): `ParallelPipeline::execute` in this file does not appear
    /// to consult this flag — confirm the merge phase is handled by callers.
    fn has_pipeline_breakers(&self) -> bool;

    /// Returns the number of operators in the chain.
    fn chain_length(&self) -> usize;
}
36
/// Simple factory that clones a prototype chain.
///
/// Stores one constructor closure per operator; `create_chain` invokes each
/// closure in order to build a fresh chain for a worker.
pub struct CloneableOperatorFactory {
    /// Factory functions for each operator, in chain order.
    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
    /// Whether chain has pipeline breakers (set via `with_pipeline_breakers`).
    has_breakers: bool,
}
44
45impl CloneableOperatorFactory {
46    /// Creates a new factory.
47    pub fn new() -> Self {
48        Self {
49            factories: Vec::new(),
50            has_breakers: false,
51        }
52    }
53
54    /// Adds an operator factory.
55    #[must_use]
56    pub fn with_operator<F>(mut self, factory: F) -> Self
57    where
58        F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
59    {
60        self.factories.push(Box::new(factory));
61        self
62    }
63
64    /// Marks that the chain has pipeline breakers.
65    #[must_use]
66    pub fn with_pipeline_breakers(mut self) -> Self {
67        self.has_breakers = true;
68        self
69    }
70}
71
72impl Default for CloneableOperatorFactory {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl OperatorChainFactory for CloneableOperatorFactory {
79    fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
80        self.factories.iter().map(|f| f()).collect()
81    }
82
83    fn has_pipeline_breakers(&self) -> bool {
84        self.has_breakers
85    }
86
87    fn chain_length(&self) -> usize {
88        self.factories.len()
89    }
90}
91
/// Result of parallel pipeline execution.
pub struct ParallelPipelineResult {
    /// Output chunks from all workers.
    /// Chunk order follows worker completion, not input order.
    pub chunks: Vec<DataChunk>,
    /// Number of workers used.
    pub num_workers: usize,
    /// Total morsels processed.
    pub morsels_processed: usize,
    /// Total rows processed (summed across all workers).
    pub rows_processed: usize,
}
103
/// Configuration for parallel pipeline execution.
#[derive(Debug, Clone)]
pub struct ParallelPipelineConfig {
    /// Number of worker threads.
    pub num_workers: usize,
    /// Base morsel size (adjusted for memory pressure);
    /// see [`ParallelPipelineConfig::effective_morsel_size`].
    pub morsel_size: usize,
    /// Chunk size for processing within morsels.
    pub chunk_size: usize,
    /// Whether to preserve output ordering.
    /// NOTE(review): not consulted by `ParallelPipeline::execute` in this
    /// file — confirm ordering is enforced elsewhere before relying on it.
    pub preserve_order: bool,
    /// Memory pressure level (affects morsel sizing).
    pub pressure_level: PressureLevel,
}
118
119impl Default for ParallelPipelineConfig {
120    fn default() -> Self {
121        Self {
122            num_workers: thread::available_parallelism()
123                .map(|n| n.get())
124                .unwrap_or(4),
125            morsel_size: DEFAULT_MORSEL_SIZE,
126            chunk_size: DEFAULT_CHUNK_SIZE,
127            preserve_order: false,
128            pressure_level: PressureLevel::Normal,
129        }
130    }
131}
132
133impl ParallelPipelineConfig {
134    /// Creates config for testing with limited workers.
135    #[must_use]
136    pub fn for_testing() -> Self {
137        Self {
138            num_workers: 2,
139            ..Default::default()
140        }
141    }
142
143    /// Sets the number of workers.
144    #[must_use]
145    pub fn with_workers(mut self, n: usize) -> Self {
146        self.num_workers = n.max(1);
147        self
148    }
149
150    /// Sets memory pressure level.
151    #[must_use]
152    pub fn with_pressure(mut self, level: PressureLevel) -> Self {
153        self.pressure_level = level;
154        self
155    }
156
157    /// Returns effective morsel size based on pressure.
158    #[must_use]
159    pub fn effective_morsel_size(&self) -> usize {
160        compute_morsel_size(self.pressure_level)
161    }
162}
163
/// Parallel execution pipeline.
///
/// Distributes work across multiple threads using morsel-driven scheduling.
/// The source is partitioned into morsels, each worker pulls morsels from a
/// work-stealing scheduler and runs its own copy of the operator chain.
pub struct ParallelPipeline {
    /// Source of data (must support parallel partitioning).
    source: Arc<dyn ParallelSource>,
    /// Factory for creating operator chains (one fresh chain per worker).
    operator_factory: Arc<dyn OperatorChainFactory>,
    /// Configuration.
    config: ParallelPipelineConfig,
}
175
impl ParallelPipeline {
    /// Creates a new parallel pipeline.
    pub fn new(
        source: Arc<dyn ParallelSource>,
        operator_factory: Arc<dyn OperatorChainFactory>,
        config: ParallelPipelineConfig,
    ) -> Self {
        Self {
            source,
            operator_factory,
            config,
        }
    }

    /// Creates a simple parallel pipeline with default config.
    pub fn simple(
        source: Arc<dyn ParallelSource>,
        operator_factory: Arc<dyn OperatorChainFactory>,
    ) -> Self {
        Self::new(source, operator_factory, ParallelPipelineConfig::default())
    }

    /// Executes the pipeline and returns results.
    ///
    /// Generates morsels from the source, distributes them to
    /// `config.num_workers` scoped threads through a work-stealing
    /// [`MorselScheduler`], and gathers every worker's output chunks.
    ///
    /// NOTE(review): `config.preserve_order` and
    /// `operator_factory.has_pipeline_breakers()` are not consulted here;
    /// each worker finalizes its own chain independently and output order
    /// follows worker completion — confirm any ordered/merge semantics are
    /// handled by callers.
    ///
    /// # Errors
    ///
    /// Returns `Err` if any worker thread encounters an operator error.
    /// Only the first error observed is reported; later ones are dropped.
    pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
        let morsel_size = self.config.effective_morsel_size();
        let morsels = self.source.generate_morsels(morsel_size, 0);

        // Fast path: empty source produces an empty result without spawning.
        if morsels.is_empty() {
            return Ok(ParallelPipelineResult {
                chunks: Vec::new(),
                num_workers: self.config.num_workers,
                morsels_processed: 0,
                rows_processed: 0,
            });
        }

        // Create scheduler and submit morsels. All work is queued up front;
        // finish_submission lets workers terminate once the queue drains.
        let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
        let total_morsels = morsels.len();
        scheduler.submit_batch(morsels);
        scheduler.finish_submission();

        // Shared results collector
        let results = Arc::new(Mutex::new(Vec::new()));
        let rows_processed = Arc::new(AtomicUsize::new(0));
        // First worker error wins; subsequent errors are discarded.
        let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));

        // Spawn workers; thread::scope joins all of them before returning.
        thread::scope(|s| {
            for worker_id in 0..self.config.num_workers {
                let scheduler = Arc::clone(&scheduler);
                let source = Arc::clone(&self.source);
                let factory = Arc::clone(&self.operator_factory);
                let results = Arc::clone(&results);
                let rows_processed = Arc::clone(&rows_processed);
                let errors = Arc::clone(&errors);
                let chunk_size = self.config.chunk_size;

                s.spawn(move || {
                    if let Err(e) = Self::worker_loop(
                        worker_id,
                        scheduler,
                        source,
                        factory,
                        results,
                        rows_processed,
                        chunk_size,
                    ) {
                        let mut guard = errors.lock();
                        if guard.is_none() {
                            *guard = Some(e);
                        }
                    }
                });
            }
        });

        // Check for errors
        if let Some(e) = errors.lock().take() {
            return Err(e);
        }

        // All workers have joined, so this Arc normally has one owner;
        // fall back to a defensive clone if a reference somehow remains.
        let chunks = match Arc::try_unwrap(results) {
            Ok(mutex) => mutex.into_inner(),
            Err(arc) => arc.lock().clone(),
        };

        Ok(ParallelPipelineResult {
            chunks,
            num_workers: self.config.num_workers,
            morsels_processed: total_morsels,
            rows_processed: rows_processed.load(Ordering::Relaxed),
        })
    }

    /// Worker loop: process morsels until done.
    ///
    /// Each worker builds its own operator chain (operators may be stateful)
    /// and a private sink; collected output is merged into the shared vector
    /// once, at the end, to keep lock contention low.
    fn worker_loop(
        _worker_id: usize,
        scheduler: Arc<MorselScheduler>,
        source: Arc<dyn ParallelSource>,
        factory: Arc<dyn OperatorChainFactory>,
        results: Arc<Mutex<Vec<DataChunk>>>,
        rows_processed: Arc<AtomicUsize>,
        chunk_size: usize,
    ) -> Result<(), OperatorError> {
        use super::scheduler::WorkerHandle;

        // Create worker handle (registers with scheduler for work-stealing)
        let handle = WorkerHandle::new(scheduler);

        // Create per-worker operator chain
        let mut operators = factory.create_chain();
        let mut local_sink = CollectorSink::new();

        // Process morsels until the scheduler has no more work to hand out.
        while let Some(morsel) = handle.get_work() {
            let mut partition = source.create_partition(&morsel);
            let mut morsel_rows = 0;

            // Process chunks within morsel
            while let Some(chunk) = partition.next_chunk(chunk_size)? {
                morsel_rows += chunk.len();
                Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
            }

            rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
            handle.complete_morsel();
        }

        // Finalize operators (important for pipeline breakers)
        Self::finalize_chain(&mut operators, &mut local_sink)?;

        // Collect results; take the shared lock at most once per worker.
        let chunks = local_sink.into_chunks();
        if !chunks.is_empty() {
            results.lock().extend(chunks);
        }

        Ok(())
    }

    /// Pushes a chunk through the operator chain.
    ///
    /// Intermediate operators emit into a local [`ChunkCollector`]; only the
    /// last operator writes directly to `sink`. The returned bool is the
    /// `continue_processing` flag propagated from the operators/sink
    /// (`false` signals upstream to stop — confirm exact contract on
    /// `PushOperator`). NOTE(review): `into_single_chunk` presumably merges
    /// multiple collected chunks into one — verify against its definition.
    fn push_through_chain(
        operators: &mut [Box<dyn PushOperator>],
        chunk: DataChunk,
        sink: &mut dyn Sink,
    ) -> Result<bool, OperatorError> {
        // No operators: the chunk flows straight to the sink.
        if operators.is_empty() {
            return sink.consume(chunk);
        }

        let num_operators = operators.len();
        let mut current_chunk = chunk;

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                return operators[i].push(current_chunk, sink);
            }

            // Intermediate: collect output
            let mut collector = ChunkCollector::new();
            let continue_processing = operators[i].push(current_chunk, &mut collector)?;

            // Stop early if the operator halted, or if it filtered every
            // row out (nothing left to feed downstream).
            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        Ok(true)
    }

    /// Finalizes all operators in the chain.
    ///
    /// Finalize output of operator `i` is re-pushed through operators
    /// `i+1..`, so downstream operators see data buffered by upstream
    /// pipeline breakers before they themselves finalize. The sink itself
    /// is finalized exactly once, at the end.
    fn finalize_chain(
        operators: &mut [Box<dyn PushOperator>],
        sink: &mut dyn Sink,
    ) -> Result<(), OperatorError> {
        if operators.is_empty() {
            return sink.finalize();
        }

        let num_operators = operators.len();

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                operators[i].finalize(sink)?;
            } else {
                // Collect finalize output and push through remaining operators
                let mut collector = ChunkCollector::new();
                operators[i].finalize(&mut collector)?;

                // Push through remaining operators
                for chunk in collector.into_chunks() {
                    Self::push_through_from_index(operators, i + 1, chunk, sink)?;
                }
            }
        }

        sink.finalize()
    }

    /// Pushes a chunk through operators starting at index.
    ///
    /// Same flow as `push_through_chain` but beginning at `start`; used to
    /// route finalize output of operator `start - 1` through the rest of
    /// the chain. The trailing `sink.consume` is reached only when
    /// `start >= operators.len()` (the loop returns on the last operator).
    fn push_through_from_index(
        operators: &mut [Box<dyn PushOperator>],
        start: usize,
        chunk: DataChunk,
        sink: &mut dyn Sink,
    ) -> Result<bool, OperatorError> {
        let num_operators = operators.len();
        let mut current_chunk = chunk;

        for i in start..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                return operators[i].push(current_chunk, sink);
            }

            let mut collector = ChunkCollector::new();
            let continue_processing = operators[i].push(current_chunk, &mut collector)?;

            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        sink.consume(current_chunk)
    }
}
416
/// Collector sink that accumulates chunks.
///
/// Used as each worker's terminal sink; collected chunks are later merged
/// into the pipeline's shared result set.
#[derive(Default)]
pub struct CollectorSink {
    // Non-empty chunks received via `Sink::consume`, in arrival order.
    chunks: Vec<DataChunk>,
}
422
423impl CollectorSink {
424    /// Creates a new collector sink.
425    pub fn new() -> Self {
426        Self { chunks: Vec::new() }
427    }
428
429    /// Returns collected chunks.
430    pub fn into_chunks(self) -> Vec<DataChunk> {
431        self.chunks
432    }
433
434    /// Returns number of chunks collected.
435    pub fn len(&self) -> usize {
436        self.chunks.len()
437    }
438
439    /// Returns whether no chunks collected.
440    pub fn is_empty(&self) -> bool {
441        self.chunks.is_empty()
442    }
443
444    /// Returns total row count.
445    pub fn row_count(&self) -> usize {
446        self.chunks.iter().map(DataChunk::len).sum()
447    }
448}
449
450impl Sink for CollectorSink {
451    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
452        if !chunk.is_empty() {
453            self.chunks.push(chunk);
454        }
455        Ok(true)
456    }
457
458    fn finalize(&mut self) -> Result<(), OperatorError> {
459        Ok(())
460    }
461
462    fn name(&self) -> &'static str {
463        "ParallelCollectorSink"
464    }
465
466    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
467        self
468    }
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::parallel::source::RangeSource;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Pass-through operator for testing.
    struct PassThroughOp;

    impl PushOperator for PassThroughOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Filter operator that keeps only even numbers.
    struct EvenFilterOp;

    impl PushOperator for EvenFilterOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            let col = chunk
                .column(0)
                .ok_or_else(|| OperatorError::Execution("Missing column".to_string()))?;

            // Keep only Int64 values divisible by 2; other value kinds are
            // silently dropped (fine for tests driven by RangeSource).
            let mut filtered = ValueVector::new();
            for i in 0..chunk.len() {
                if let Some(Value::Int64(v)) = col.get(i)
                    && v % 2 == 0
                {
                    filtered.push(Value::Int64(v));
                }
            }

            // Emit only non-empty results to avoid zero-row chunks downstream.
            if !filtered.is_empty() {
                sink.consume(DataChunk::new(vec![filtered]))?;
            }
            Ok(true)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "EvenFilter"
        }
    }

    // Construction should carry the testing config (2 workers) through.
    #[test]
    fn test_parallel_pipeline_creation() {
        let source = Arc::new(RangeSource::new(1000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        assert_eq!(pipeline.config.num_workers, 2);
    }

    // An empty source must short-circuit: no chunks, no morsels, no rows.
    #[test]
    fn test_parallel_pipeline_empty_source() {
        let source = Arc::new(RangeSource::new(0));
        let factory = Arc::new(CloneableOperatorFactory::new());

        let pipeline = ParallelPipeline::simple(source, factory);
        let result = pipeline.execute().unwrap();

        assert!(result.chunks.is_empty());
        assert_eq!(result.morsels_processed, 0);
        assert_eq!(result.rows_processed, 0);
    }

    // A pass-through chain must deliver every source row unchanged.
    #[test]
    fn test_parallel_pipeline_passthrough() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // Should process all 100 rows
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 100);
        assert_eq!(result.rows_processed, 100);
    }

    // Filtering must work identically regardless of morsel/worker split.
    #[test]
    fn test_parallel_pipeline_filter() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // Should have 50 even numbers (0, 2, 4, ..., 98)
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 50);
    }

    // No rows may be lost or duplicated when spreading across 4 workers.
    #[test]
    fn test_parallel_pipeline_multiple_workers() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::default().with_workers(4);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
        assert_eq!(result.num_workers, 4);
    }

    // High memory pressure shrinks morsels but must not change totals.
    #[test]
    fn test_parallel_pipeline_under_pressure() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // More morsels under pressure due to smaller size
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
    }

    // Builder API: chain length, breaker flag, and fresh chain creation.
    #[test]
    fn test_cloneable_operator_factory() {
        let factory = CloneableOperatorFactory::new()
            .with_operator(|| Box::new(PassThroughOp))
            .with_operator(|| Box::new(EvenFilterOp))
            .with_pipeline_breakers();

        assert_eq!(factory.chain_length(), 2);
        assert!(factory.has_pipeline_breakers());

        let chain = factory.create_chain();
        assert_eq!(chain.len(), 2);
    }

    // CollectorSink bookkeeping: len / is_empty / row_count / into_chunks.
    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());

        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let chunk = DataChunk::new(vec![ValueVector::from_values(&values)]);

        sink.consume(chunk).unwrap();
        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.row_count(), 3);

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
    }

    // Builder setters; moderate pressure must yield a smaller morsel size.
    #[test]
    fn test_pipeline_config() {
        let config = ParallelPipelineConfig::default()
            .with_workers(8)
            .with_pressure(PressureLevel::Moderate);

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.pressure_level, PressureLevel::Moderate);
        assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
    }
}