Skip to main content

graphos_core/execution/parallel/
pipeline.rs

//! Parallel pipeline execution with morsel-driven scheduling.
//!
//! Executes push-based pipelines in parallel using work-stealing schedulers
//! and per-worker operator instances.
5
6use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
7use super::scheduler::MorselScheduler;
8use super::source::ParallelSource;
9use crate::execution::chunk::DataChunk;
10use crate::execution::operators::OperatorError;
11use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
12use graphos_common::memory::buffer::PressureLevel;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::thread;
17
18/// Factory for creating per-worker operator chains.
19///
20/// Each worker needs its own operator instances since operators may have
21/// internal state (e.g., aggregation accumulators).
22pub trait OperatorChainFactory: Send + Sync {
23    /// Creates a new operator chain for a worker.
24    ///
25    /// Each call should return fresh operator instances.
26    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;
27
28    /// Returns whether the chain contains pipeline breakers.
29    ///
30    /// Pipeline breakers (Sort, Aggregate, Distinct) require a merge phase.
31    fn has_pipeline_breakers(&self) -> bool;
32
33    /// Returns the number of operators in the chain.
34    fn chain_length(&self) -> usize;
35}
36
37/// Simple factory that clones a prototype chain.
38pub struct CloneableOperatorFactory {
39    /// Factory functions for each operator.
40    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
41    /// Whether chain has pipeline breakers.
42    has_breakers: bool,
43}
44
45impl CloneableOperatorFactory {
46    /// Creates a new factory.
47    pub fn new() -> Self {
48        Self {
49            factories: Vec::new(),
50            has_breakers: false,
51        }
52    }
53
54    /// Adds an operator factory.
55    #[must_use]
56    pub fn with_operator<F>(mut self, factory: F) -> Self
57    where
58        F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
59    {
60        self.factories.push(Box::new(factory));
61        self
62    }
63
64    /// Marks that the chain has pipeline breakers.
65    #[must_use]
66    pub fn with_pipeline_breakers(mut self) -> Self {
67        self.has_breakers = true;
68        self
69    }
70}
71
72impl Default for CloneableOperatorFactory {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl OperatorChainFactory for CloneableOperatorFactory {
79    fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
80        self.factories.iter().map(|f| f()).collect()
81    }
82
83    fn has_pipeline_breakers(&self) -> bool {
84        self.has_breakers
85    }
86
87    fn chain_length(&self) -> usize {
88        self.factories.len()
89    }
90}
91
92/// Result of parallel pipeline execution.
93pub struct ParallelPipelineResult {
94    /// Output chunks from all workers.
95    pub chunks: Vec<DataChunk>,
96    /// Number of workers used.
97    pub num_workers: usize,
98    /// Total morsels processed.
99    pub morsels_processed: usize,
100    /// Total rows processed.
101    pub rows_processed: usize,
102}
103
104/// Configuration for parallel pipeline execution.
105#[derive(Debug, Clone)]
106pub struct ParallelPipelineConfig {
107    /// Number of worker threads.
108    pub num_workers: usize,
109    /// Base morsel size (adjusted for memory pressure).
110    pub morsel_size: usize,
111    /// Chunk size for processing within morsels.
112    pub chunk_size: usize,
113    /// Whether to preserve output ordering.
114    pub preserve_order: bool,
115    /// Memory pressure level (affects morsel sizing).
116    pub pressure_level: PressureLevel,
117}
118
119impl Default for ParallelPipelineConfig {
120    fn default() -> Self {
121        Self {
122            num_workers: thread::available_parallelism()
123                .map(|n| n.get())
124                .unwrap_or(4),
125            morsel_size: DEFAULT_MORSEL_SIZE,
126            chunk_size: DEFAULT_CHUNK_SIZE,
127            preserve_order: false,
128            pressure_level: PressureLevel::Normal,
129        }
130    }
131}
132
133impl ParallelPipelineConfig {
134    /// Creates config for testing with limited workers.
135    #[must_use]
136    pub fn for_testing() -> Self {
137        Self {
138            num_workers: 2,
139            ..Default::default()
140        }
141    }
142
143    /// Sets the number of workers.
144    #[must_use]
145    pub fn with_workers(mut self, n: usize) -> Self {
146        self.num_workers = n.max(1);
147        self
148    }
149
150    /// Sets memory pressure level.
151    #[must_use]
152    pub fn with_pressure(mut self, level: PressureLevel) -> Self {
153        self.pressure_level = level;
154        self
155    }
156
157    /// Returns effective morsel size based on pressure.
158    #[must_use]
159    pub fn effective_morsel_size(&self) -> usize {
160        compute_morsel_size(self.pressure_level)
161    }
162}
163
164/// Parallel execution pipeline.
165///
166/// Distributes work across multiple threads using morsel-driven scheduling.
167pub struct ParallelPipeline {
168    /// Source of data (must support parallel partitioning).
169    source: Arc<dyn ParallelSource>,
170    /// Factory for creating operator chains.
171    operator_factory: Arc<dyn OperatorChainFactory>,
172    /// Configuration.
173    config: ParallelPipelineConfig,
174}
175
176impl ParallelPipeline {
177    /// Creates a new parallel pipeline.
178    pub fn new(
179        source: Arc<dyn ParallelSource>,
180        operator_factory: Arc<dyn OperatorChainFactory>,
181        config: ParallelPipelineConfig,
182    ) -> Self {
183        Self {
184            source,
185            operator_factory,
186            config,
187        }
188    }
189
190    /// Creates a simple parallel pipeline with default config.
191    pub fn simple(
192        source: Arc<dyn ParallelSource>,
193        operator_factory: Arc<dyn OperatorChainFactory>,
194    ) -> Self {
195        Self::new(source, operator_factory, ParallelPipelineConfig::default())
196    }
197
198    /// Executes the pipeline and returns results.
199    pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
200        let morsel_size = self.config.effective_morsel_size();
201        let morsels = self.source.generate_morsels(morsel_size, 0);
202
203        if morsels.is_empty() {
204            return Ok(ParallelPipelineResult {
205                chunks: Vec::new(),
206                num_workers: self.config.num_workers,
207                morsels_processed: 0,
208                rows_processed: 0,
209            });
210        }
211
212        // Create scheduler and submit morsels
213        let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
214        let total_morsels = morsels.len();
215        scheduler.submit_batch(morsels);
216        scheduler.finish_submission();
217
218        // Shared results collector
219        let results = Arc::new(Mutex::new(Vec::new()));
220        let rows_processed = Arc::new(AtomicUsize::new(0));
221        let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));
222
223        // Spawn workers
224        thread::scope(|s| {
225            for worker_id in 0..self.config.num_workers {
226                let scheduler = Arc::clone(&scheduler);
227                let source = Arc::clone(&self.source);
228                let factory = Arc::clone(&self.operator_factory);
229                let results = Arc::clone(&results);
230                let rows_processed = Arc::clone(&rows_processed);
231                let errors = Arc::clone(&errors);
232                let chunk_size = self.config.chunk_size;
233
234                s.spawn(move || {
235                    if let Err(e) = Self::worker_loop(
236                        worker_id,
237                        scheduler,
238                        source,
239                        factory,
240                        results,
241                        rows_processed,
242                        chunk_size,
243                    ) {
244                        let mut guard = errors.lock();
245                        if guard.is_none() {
246                            *guard = Some(e);
247                        }
248                    }
249                });
250            }
251        });
252
253        // Check for errors
254        if let Some(e) = errors.lock().take() {
255            return Err(e);
256        }
257
258        let chunks = match Arc::try_unwrap(results) {
259            Ok(mutex) => mutex.into_inner(),
260            Err(arc) => arc.lock().clone(),
261        };
262
263        Ok(ParallelPipelineResult {
264            chunks,
265            num_workers: self.config.num_workers,
266            morsels_processed: total_morsels,
267            rows_processed: rows_processed.load(Ordering::Relaxed),
268        })
269    }
270
271    /// Worker loop: process morsels until done.
272    fn worker_loop(
273        _worker_id: usize,
274        scheduler: Arc<MorselScheduler>,
275        source: Arc<dyn ParallelSource>,
276        factory: Arc<dyn OperatorChainFactory>,
277        results: Arc<Mutex<Vec<DataChunk>>>,
278        rows_processed: Arc<AtomicUsize>,
279        chunk_size: usize,
280    ) -> Result<(), OperatorError> {
281        use super::scheduler::WorkerHandle;
282
283        // Create worker handle (registers with scheduler for work-stealing)
284        let handle = WorkerHandle::new(scheduler);
285
286        // Create per-worker operator chain
287        let mut operators = factory.create_chain();
288        let mut local_sink = CollectorSink::new();
289
290        // Process morsels
291        while let Some(morsel) = handle.get_work() {
292            let mut partition = source.create_partition(&morsel);
293            let mut morsel_rows = 0;
294
295            // Process chunks within morsel
296            while let Some(chunk) = partition.next_chunk(chunk_size)? {
297                morsel_rows += chunk.len();
298                Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
299            }
300
301            rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
302            handle.complete_morsel();
303        }
304
305        // Finalize operators (important for pipeline breakers)
306        Self::finalize_chain(&mut operators, &mut local_sink)?;
307
308        // Collect results
309        let chunks = local_sink.into_chunks();
310        if !chunks.is_empty() {
311            results.lock().extend(chunks);
312        }
313
314        Ok(())
315    }
316
317    /// Pushes a chunk through the operator chain.
318    fn push_through_chain(
319        operators: &mut [Box<dyn PushOperator>],
320        chunk: DataChunk,
321        sink: &mut dyn Sink,
322    ) -> Result<bool, OperatorError> {
323        if operators.is_empty() {
324            return sink.consume(chunk);
325        }
326
327        let num_operators = operators.len();
328        let mut current_chunk = chunk;
329
330        for i in 0..num_operators {
331            let is_last = i == num_operators - 1;
332
333            if is_last {
334                return operators[i].push(current_chunk, sink);
335            }
336
337            // Intermediate: collect output
338            let mut collector = ChunkCollector::new();
339            let continue_processing = operators[i].push(current_chunk, &mut collector)?;
340
341            if !continue_processing || collector.is_empty() {
342                return Ok(continue_processing);
343            }
344
345            current_chunk = collector.into_single_chunk();
346        }
347
348        Ok(true)
349    }
350
351    /// Finalizes all operators in the chain.
352    fn finalize_chain(
353        operators: &mut [Box<dyn PushOperator>],
354        sink: &mut dyn Sink,
355    ) -> Result<(), OperatorError> {
356        if operators.is_empty() {
357            return sink.finalize();
358        }
359
360        let num_operators = operators.len();
361
362        for i in 0..num_operators {
363            let is_last = i == num_operators - 1;
364
365            if is_last {
366                operators[i].finalize(sink)?;
367            } else {
368                // Collect finalize output and push through remaining operators
369                let mut collector = ChunkCollector::new();
370                operators[i].finalize(&mut collector)?;
371
372                // Push through remaining operators
373                for chunk in collector.into_chunks() {
374                    Self::push_through_from_index(operators, i + 1, chunk, sink)?;
375                }
376            }
377        }
378
379        sink.finalize()
380    }
381
382    /// Pushes a chunk through operators starting at index.
383    fn push_through_from_index(
384        operators: &mut [Box<dyn PushOperator>],
385        start: usize,
386        chunk: DataChunk,
387        sink: &mut dyn Sink,
388    ) -> Result<bool, OperatorError> {
389        let num_operators = operators.len();
390        let mut current_chunk = chunk;
391
392        for i in start..num_operators {
393            let is_last = i == num_operators - 1;
394
395            if is_last {
396                return operators[i].push(current_chunk, sink);
397            }
398
399            let mut collector = ChunkCollector::new();
400            let continue_processing = operators[i].push(current_chunk, &mut collector)?;
401
402            if !continue_processing || collector.is_empty() {
403                return Ok(continue_processing);
404            }
405
406            current_chunk = collector.into_single_chunk();
407        }
408
409        sink.consume(current_chunk)
410    }
411}
412
413/// Collector sink that accumulates chunks.
414#[derive(Default)]
415pub struct CollectorSink {
416    chunks: Vec<DataChunk>,
417}
418
419impl CollectorSink {
420    /// Creates a new collector sink.
421    pub fn new() -> Self {
422        Self { chunks: Vec::new() }
423    }
424
425    /// Returns collected chunks.
426    pub fn into_chunks(self) -> Vec<DataChunk> {
427        self.chunks
428    }
429
430    /// Returns number of chunks collected.
431    pub fn len(&self) -> usize {
432        self.chunks.len()
433    }
434
435    /// Returns whether no chunks collected.
436    pub fn is_empty(&self) -> bool {
437        self.chunks.is_empty()
438    }
439
440    /// Returns total row count.
441    pub fn row_count(&self) -> usize {
442        self.chunks.iter().map(DataChunk::len).sum()
443    }
444}
445
446impl Sink for CollectorSink {
447    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
448        if !chunk.is_empty() {
449            self.chunks.push(chunk);
450        }
451        Ok(true)
452    }
453
454    fn finalize(&mut self) -> Result<(), OperatorError> {
455        Ok(())
456    }
457
458    fn name(&self) -> &'static str {
459        "ParallelCollectorSink"
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::parallel::source::RangeSource;
    use crate::execution::vector::ValueVector;
    use graphos_common::types::Value;

    /// Adds up the row counts of all chunks in a pipeline result.
    fn output_rows(result: &ParallelPipelineResult) -> usize {
        result.chunks.iter().map(|chunk| chunk.len()).sum()
    }

    /// Operator that forwards every chunk unchanged.
    struct PassThroughOp;

    impl PushOperator for PassThroughOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Operator that keeps only the even `Int64` values of column 0.
    struct EvenFilterOp;

    impl PushOperator for EvenFilterOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            let col = match chunk.column(0) {
                Some(col) => col,
                None => return Err(OperatorError::Execution("Missing column".to_string())),
            };

            let mut evens = ValueVector::new();
            for row in 0..chunk.len() {
                match col.get(row) {
                    Some(Value::Int64(v)) if v % 2 == 0 => {
                        evens.push(Value::Int64(v));
                    }
                    _ => {}
                }
            }

            // Only forward a chunk when at least one value survived.
            if evens.is_empty() {
                return Ok(true);
            }

            sink.consume(DataChunk::new(vec![evens]))?;
            Ok(true)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "EvenFilter"
        }
    }

    #[test]
    fn test_parallel_pipeline_creation() {
        let pipeline = ParallelPipeline::new(
            Arc::new(RangeSource::new(1000)),
            Arc::new(CloneableOperatorFactory::new()),
            ParallelPipelineConfig::for_testing(),
        );

        assert_eq!(pipeline.config.num_workers, 2);
    }

    #[test]
    fn test_parallel_pipeline_empty_source() {
        let pipeline = ParallelPipeline::simple(
            Arc::new(RangeSource::new(0)),
            Arc::new(CloneableOperatorFactory::new()),
        );

        let result = pipeline.execute().unwrap();

        assert!(result.chunks.is_empty());
        assert_eq!(result.morsels_processed, 0);
        assert_eq!(result.rows_processed, 0);
    }

    #[test]
    fn test_parallel_pipeline_passthrough() {
        let factory = CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp));
        let pipeline = ParallelPipeline::new(
            Arc::new(RangeSource::new(100)),
            Arc::new(factory),
            ParallelPipelineConfig::for_testing(),
        );

        let result = pipeline.execute().unwrap();

        // Every one of the 100 source rows must come out again.
        assert_eq!(output_rows(&result), 100);
        assert_eq!(result.rows_processed, 100);
    }

    #[test]
    fn test_parallel_pipeline_filter() {
        let factory = CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp));
        let pipeline = ParallelPipeline::new(
            Arc::new(RangeSource::new(100)),
            Arc::new(factory),
            ParallelPipelineConfig::for_testing(),
        );

        let result = pipeline.execute().unwrap();

        // 0, 2, 4, ..., 98 -> 50 even numbers.
        assert_eq!(output_rows(&result), 50);
    }

    #[test]
    fn test_parallel_pipeline_multiple_workers() {
        let pipeline = ParallelPipeline::new(
            Arc::new(RangeSource::new(10000)),
            Arc::new(CloneableOperatorFactory::new()),
            ParallelPipelineConfig::default().with_workers(4),
        );

        let result = pipeline.execute().unwrap();

        assert_eq!(output_rows(&result), 10000);
        assert_eq!(result.num_workers, 4);
    }

    #[test]
    fn test_parallel_pipeline_under_pressure() {
        let pipeline = ParallelPipeline::new(
            Arc::new(RangeSource::new(10000)),
            Arc::new(CloneableOperatorFactory::new()),
            ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High),
        );

        let result = pipeline.execute().unwrap();

        // Pressure changes the morsel size, but no row may be lost.
        assert_eq!(output_rows(&result), 10000);
    }

    #[test]
    fn test_cloneable_operator_factory() {
        let factory = CloneableOperatorFactory::new()
            .with_operator(|| Box::new(PassThroughOp))
            .with_operator(|| Box::new(EvenFilterOp))
            .with_pipeline_breakers();

        assert_eq!(factory.chain_length(), 2);
        assert!(factory.has_pipeline_breakers());

        assert_eq!(factory.create_chain().len(), 2);
    }

    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());

        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let column = ValueVector::from_values(&values);
        sink.consume(DataChunk::new(vec![column])).unwrap();

        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.row_count(), 3);

        assert_eq!(sink.into_chunks().len(), 1);
    }

    #[test]
    fn test_pipeline_config() {
        let config = ParallelPipelineConfig::default()
            .with_workers(8)
            .with_pressure(PressureLevel::Moderate);

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.pressure_level, PressureLevel::Moderate);
        assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
    }
}