// grafeo_core/execution/parallel/pipeline.rs

//! Parallel pipeline execution with morsel-driven scheduling.
//!
//! Executes push-based pipelines in parallel using work-stealing schedulers
//! and per-worker operator instances.

use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
use super::scheduler::MorselScheduler;
use super::source::ParallelSource;
use crate::execution::chunk::DataChunk;
use crate::execution::operators::OperatorError;
use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
use grafeo_common::memory::buffer::PressureLevel;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
17
/// Factory for creating per-worker operator chains.
///
/// Each worker needs its own operator instances since operators may have
/// internal state (e.g., aggregation accumulators). The factory itself is
/// shared across worker threads, hence the `Send + Sync` bound.
pub trait OperatorChainFactory: Send + Sync {
    /// Creates a new operator chain for a worker.
    ///
    /// Each call should return fresh operator instances; returned operators
    /// must never be shared between workers.
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;

    /// Returns whether the chain contains pipeline breakers.
    ///
    /// Pipeline breakers (Sort, Aggregate, Distinct) require a merge phase.
    fn has_pipeline_breakers(&self) -> bool;

    /// Returns the number of operators in the chain.
    fn chain_length(&self) -> usize;
}
36
/// Simple factory that clones a prototype chain.
///
/// Built incrementally with [`CloneableOperatorFactory::with_operator`];
/// each registered closure produces one fresh operator per worker.
pub struct CloneableOperatorFactory {
    /// Factory functions for each operator, in chain order.
    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
    /// Whether chain has pipeline breakers (set via `with_pipeline_breakers`).
    has_breakers: bool,
}
44
45impl CloneableOperatorFactory {
46    /// Creates a new factory.
47    pub fn new() -> Self {
48        Self {
49            factories: Vec::new(),
50            has_breakers: false,
51        }
52    }
53
54    /// Adds an operator factory.
55    #[must_use]
56    pub fn with_operator<F>(mut self, factory: F) -> Self
57    where
58        F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
59    {
60        self.factories.push(Box::new(factory));
61        self
62    }
63
64    /// Marks that the chain has pipeline breakers.
65    #[must_use]
66    pub fn with_pipeline_breakers(mut self) -> Self {
67        self.has_breakers = true;
68        self
69    }
70}
71
72impl Default for CloneableOperatorFactory {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl OperatorChainFactory for CloneableOperatorFactory {
79    fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
80        self.factories.iter().map(|f| f()).collect()
81    }
82
83    fn has_pipeline_breakers(&self) -> bool {
84        self.has_breakers
85    }
86
87    fn chain_length(&self) -> usize {
88        self.factories.len()
89    }
90}
91
/// Result of parallel pipeline execution.
pub struct ParallelPipelineResult {
    /// Output chunks from all workers. Workers append under a shared lock
    /// as they finish, so chunk order is not deterministic across runs.
    pub chunks: Vec<DataChunk>,
    /// Number of workers used (taken from the pipeline config).
    pub num_workers: usize,
    /// Total morsels generated from the source for this execution.
    pub morsels_processed: usize,
    /// Total input rows pushed through the operator chains.
    pub rows_processed: usize,
}
103
/// Configuration for parallel pipeline execution.
#[derive(Debug, Clone)]
pub struct ParallelPipelineConfig {
    /// Number of worker threads.
    pub num_workers: usize,
    /// Base morsel size (adjusted for memory pressure).
    /// NOTE(review): not consulted by `effective_morsel_size` in this file —
    /// confirm whether another layer honors it.
    pub morsel_size: usize,
    /// Chunk size for processing within morsels.
    pub chunk_size: usize,
    /// Whether to preserve output ordering.
    /// NOTE(review): not read anywhere in this file — presumably consumed by
    /// a higher-level planner; verify.
    pub preserve_order: bool,
    /// Memory pressure level (affects morsel sizing).
    pub pressure_level: PressureLevel,
}
118
119impl Default for ParallelPipelineConfig {
120    fn default() -> Self {
121        Self {
122            num_workers: thread::available_parallelism().map_or(4, |n| n.get()),
123            morsel_size: DEFAULT_MORSEL_SIZE,
124            chunk_size: DEFAULT_CHUNK_SIZE,
125            preserve_order: false,
126            pressure_level: PressureLevel::Normal,
127        }
128    }
129}
130
131impl ParallelPipelineConfig {
132    /// Creates config for testing with limited workers.
133    #[must_use]
134    pub fn for_testing() -> Self {
135        Self {
136            num_workers: 2,
137            ..Default::default()
138        }
139    }
140
141    /// Sets the number of workers.
142    #[must_use]
143    pub fn with_workers(mut self, n: usize) -> Self {
144        self.num_workers = n.max(1);
145        self
146    }
147
148    /// Sets memory pressure level.
149    #[must_use]
150    pub fn with_pressure(mut self, level: PressureLevel) -> Self {
151        self.pressure_level = level;
152        self
153    }
154
155    /// Returns effective morsel size based on pressure.
156    #[must_use]
157    pub fn effective_morsel_size(&self) -> usize {
158        compute_morsel_size(self.pressure_level)
159    }
160}
161
/// Parallel execution pipeline.
///
/// Distributes work across multiple threads using morsel-driven scheduling.
/// Source and factory are held behind `Arc` so they can be shared with the
/// worker threads spawned in `execute`.
pub struct ParallelPipeline {
    /// Source of data (must support parallel partitioning).
    source: Arc<dyn ParallelSource>,
    /// Factory for creating per-worker operator chains.
    operator_factory: Arc<dyn OperatorChainFactory>,
    /// Configuration (worker count, morsel/chunk sizing, pressure).
    config: ParallelPipelineConfig,
}
173
174impl ParallelPipeline {
175    /// Creates a new parallel pipeline.
176    pub fn new(
177        source: Arc<dyn ParallelSource>,
178        operator_factory: Arc<dyn OperatorChainFactory>,
179        config: ParallelPipelineConfig,
180    ) -> Self {
181        Self {
182            source,
183            operator_factory,
184            config,
185        }
186    }
187
188    /// Creates a simple parallel pipeline with default config.
189    pub fn simple(
190        source: Arc<dyn ParallelSource>,
191        operator_factory: Arc<dyn OperatorChainFactory>,
192    ) -> Self {
193        Self::new(source, operator_factory, ParallelPipelineConfig::default())
194    }
195
196    /// Executes the pipeline and returns results.
197    ///
198    /// # Errors
199    ///
200    /// Returns `Err` if any worker thread encounters an operator error.
201    pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
202        let morsel_size = self.config.effective_morsel_size();
203        let morsels = self.source.generate_morsels(morsel_size, 0);
204
205        if morsels.is_empty() {
206            return Ok(ParallelPipelineResult {
207                chunks: Vec::new(),
208                num_workers: self.config.num_workers,
209                morsels_processed: 0,
210                rows_processed: 0,
211            });
212        }
213
214        // Create scheduler and submit morsels
215        let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
216        let total_morsels = morsels.len();
217        scheduler.submit_batch(morsels);
218        scheduler.finish_submission();
219
220        // Shared results collector
221        let results = Arc::new(Mutex::new(Vec::new()));
222        let rows_processed = Arc::new(AtomicUsize::new(0));
223        let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));
224
225        // Spawn workers
226        thread::scope(|s| {
227            for worker_id in 0..self.config.num_workers {
228                let scheduler = Arc::clone(&scheduler);
229                let source = Arc::clone(&self.source);
230                let factory = Arc::clone(&self.operator_factory);
231                let results = Arc::clone(&results);
232                let rows_processed = Arc::clone(&rows_processed);
233                let errors = Arc::clone(&errors);
234                let chunk_size = self.config.chunk_size;
235
236                s.spawn(move || {
237                    if let Err(e) = Self::worker_loop(
238                        worker_id,
239                        scheduler,
240                        source,
241                        factory,
242                        results,
243                        rows_processed,
244                        chunk_size,
245                    ) {
246                        let mut guard = errors.lock();
247                        if guard.is_none() {
248                            *guard = Some(e);
249                        }
250                    }
251                });
252            }
253        });
254
255        // Check for errors
256        if let Some(e) = errors.lock().take() {
257            return Err(e);
258        }
259
260        let chunks = match Arc::try_unwrap(results) {
261            Ok(mutex) => mutex.into_inner(),
262            Err(arc) => arc.lock().clone(),
263        };
264
265        Ok(ParallelPipelineResult {
266            chunks,
267            num_workers: self.config.num_workers,
268            morsels_processed: total_morsels,
269            rows_processed: rows_processed.load(Ordering::Relaxed),
270        })
271    }
272
273    /// Worker loop: process morsels until done.
274    fn worker_loop(
275        _worker_id: usize,
276        scheduler: Arc<MorselScheduler>,
277        source: Arc<dyn ParallelSource>,
278        factory: Arc<dyn OperatorChainFactory>,
279        results: Arc<Mutex<Vec<DataChunk>>>,
280        rows_processed: Arc<AtomicUsize>,
281        chunk_size: usize,
282    ) -> Result<(), OperatorError> {
283        use super::scheduler::WorkerHandle;
284
285        // Create worker handle (registers with scheduler for work-stealing)
286        let handle = WorkerHandle::new(scheduler);
287
288        // Create per-worker operator chain
289        let mut operators = factory.create_chain();
290        let mut local_sink = CollectorSink::new();
291
292        // Process morsels
293        while let Some(morsel) = handle.get_work() {
294            let mut partition = source.create_partition(&morsel);
295            let mut morsel_rows = 0;
296
297            // Process chunks within morsel
298            while let Some(chunk) = partition.next_chunk(chunk_size)? {
299                morsel_rows += chunk.len();
300                Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
301            }
302
303            rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
304            handle.complete_morsel();
305        }
306
307        // Finalize operators (important for pipeline breakers)
308        Self::finalize_chain(&mut operators, &mut local_sink)?;
309
310        // Collect results
311        let chunks = local_sink.into_chunks();
312        if !chunks.is_empty() {
313            results.lock().extend(chunks);
314        }
315
316        Ok(())
317    }
318
319    /// Pushes a chunk through the operator chain.
320    fn push_through_chain(
321        operators: &mut [Box<dyn PushOperator>],
322        chunk: DataChunk,
323        sink: &mut dyn Sink,
324    ) -> Result<bool, OperatorError> {
325        if operators.is_empty() {
326            return sink.consume(chunk);
327        }
328
329        let num_operators = operators.len();
330        let mut current_chunk = chunk;
331
332        for i in 0..num_operators {
333            let is_last = i == num_operators - 1;
334
335            if is_last {
336                return operators[i].push(current_chunk, sink);
337            }
338
339            // Intermediate: collect output
340            let mut collector = ChunkCollector::new();
341            let continue_processing = operators[i].push(current_chunk, &mut collector)?;
342
343            if !continue_processing || collector.is_empty() {
344                return Ok(continue_processing);
345            }
346
347            current_chunk = collector.into_single_chunk();
348        }
349
350        Ok(true)
351    }
352
353    /// Finalizes all operators in the chain.
354    fn finalize_chain(
355        operators: &mut [Box<dyn PushOperator>],
356        sink: &mut dyn Sink,
357    ) -> Result<(), OperatorError> {
358        if operators.is_empty() {
359            return sink.finalize();
360        }
361
362        let num_operators = operators.len();
363
364        for i in 0..num_operators {
365            let is_last = i == num_operators - 1;
366
367            if is_last {
368                operators[i].finalize(sink)?;
369            } else {
370                // Collect finalize output and push through remaining operators
371                let mut collector = ChunkCollector::new();
372                operators[i].finalize(&mut collector)?;
373
374                // Push through remaining operators
375                for chunk in collector.into_chunks() {
376                    Self::push_through_from_index(operators, i + 1, chunk, sink)?;
377                }
378            }
379        }
380
381        sink.finalize()
382    }
383
384    /// Pushes a chunk through operators starting at index.
385    fn push_through_from_index(
386        operators: &mut [Box<dyn PushOperator>],
387        start: usize,
388        chunk: DataChunk,
389        sink: &mut dyn Sink,
390    ) -> Result<bool, OperatorError> {
391        let num_operators = operators.len();
392        let mut current_chunk = chunk;
393
394        for i in start..num_operators {
395            let is_last = i == num_operators - 1;
396
397            if is_last {
398                return operators[i].push(current_chunk, sink);
399            }
400
401            let mut collector = ChunkCollector::new();
402            let continue_processing = operators[i].push(current_chunk, &mut collector)?;
403
404            if !continue_processing || collector.is_empty() {
405                return Ok(continue_processing);
406            }
407
408            current_chunk = collector.into_single_chunk();
409        }
410
411        sink.consume(current_chunk)
412    }
413}
414
/// Collector sink that accumulates chunks.
///
/// Used as each worker's thread-local sink; chunks are merged into the
/// shared result set only after the worker finishes.
#[derive(Default)]
pub struct CollectorSink {
    // Non-empty chunks received via `Sink::consume`, in arrival order.
    chunks: Vec<DataChunk>,
}
420
421impl CollectorSink {
422    /// Creates a new collector sink.
423    pub fn new() -> Self {
424        Self { chunks: Vec::new() }
425    }
426
427    /// Returns collected chunks.
428    pub fn into_chunks(self) -> Vec<DataChunk> {
429        self.chunks
430    }
431
432    /// Returns number of chunks collected.
433    pub fn len(&self) -> usize {
434        self.chunks.len()
435    }
436
437    /// Returns whether no chunks collected.
438    pub fn is_empty(&self) -> bool {
439        self.chunks.is_empty()
440    }
441
442    /// Returns total row count.
443    pub fn row_count(&self) -> usize {
444        self.chunks.iter().map(DataChunk::len).sum()
445    }
446}
447
448impl Sink for CollectorSink {
449    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
450        if !chunk.is_empty() {
451            self.chunks.push(chunk);
452        }
453        Ok(true)
454    }
455
456    fn finalize(&mut self) -> Result<(), OperatorError> {
457        Ok(())
458    }
459
460    fn name(&self) -> &'static str {
461        "ParallelCollectorSink"
462    }
463
464    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
465        self
466    }
467}
468
#[cfg(test)]
mod tests {
    //! Unit tests for parallel pipeline execution, using `RangeSource`
    //! (a project-provided source producing `Int64` values 0..n).
    use super::*;
    use crate::execution::parallel::source::RangeSource;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Pass-through operator for testing: forwards every chunk unchanged.
    struct PassThroughOp;

    impl PushOperator for PassThroughOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Filter operator that keeps only even numbers (column 0, `Int64`).
    struct EvenFilterOp;

    impl PushOperator for EvenFilterOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            let col = chunk
                .column(0)
                .ok_or_else(|| OperatorError::Execution("Missing column".to_string()))?;

            // Rebuild a vector containing only the even Int64 values;
            // non-Int64 values are silently skipped.
            let mut filtered = ValueVector::new();
            for i in 0..chunk.len() {
                if let Some(Value::Int64(v)) = col.get(i)
                    && v % 2 == 0
                {
                    filtered.push(Value::Int64(v));
                }
            }

            // Emit only non-empty output (mirrors CollectorSink behavior).
            if !filtered.is_empty() {
                sink.consume(DataChunk::new(vec![filtered]))?;
            }
            Ok(true)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "EvenFilter"
        }
    }

    #[test]
    fn test_parallel_pipeline_creation() {
        let source = Arc::new(RangeSource::new(1000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        assert_eq!(pipeline.config.num_workers, 2);
    }

    #[test]
    fn test_parallel_pipeline_empty_source() {
        // An empty source must short-circuit: no chunks, no morsels, no rows.
        let source = Arc::new(RangeSource::new(0));
        let factory = Arc::new(CloneableOperatorFactory::new());

        let pipeline = ParallelPipeline::simple(source, factory);
        let result = pipeline.execute().unwrap();

        assert!(result.chunks.is_empty());
        assert_eq!(result.morsels_processed, 0);
        assert_eq!(result.rows_processed, 0);
    }

    #[test]
    fn test_parallel_pipeline_passthrough() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // Should process all 100 rows
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 100);
        assert_eq!(result.rows_processed, 100);
    }

    #[test]
    fn test_parallel_pipeline_filter() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // Should have 50 even numbers (0, 2, 4, ..., 98)
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 50);
    }

    #[test]
    fn test_parallel_pipeline_multiple_workers() {
        // Larger input with 4 workers: output must still cover every row.
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::default().with_workers(4);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
        assert_eq!(result.num_workers, 4);
    }

    #[test]
    fn test_parallel_pipeline_under_pressure() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // More morsels under pressure due to smaller size
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
    }

    #[test]
    fn test_cloneable_operator_factory() {
        let factory = CloneableOperatorFactory::new()
            .with_operator(|| Box::new(PassThroughOp))
            .with_operator(|| Box::new(EvenFilterOp))
            .with_pipeline_breakers();

        assert_eq!(factory.chain_length(), 2);
        assert!(factory.has_pipeline_breakers());

        let chain = factory.create_chain();
        assert_eq!(chain.len(), 2);
    }

    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());

        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let chunk = DataChunk::new(vec![ValueVector::from_values(&values)]);

        sink.consume(chunk).unwrap();
        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.row_count(), 3);

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
    }

    #[test]
    fn test_pipeline_config() {
        let config = ParallelPipelineConfig::default()
            .with_workers(8)
            .with_pressure(PressureLevel::Moderate);

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.pressure_level, PressureLevel::Moderate);
        // Moderate pressure should shrink the morsel size below default.
        assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
    }
}