graphos_core/execution/parallel/pipeline.rs

//! Morsel-driven parallel pipeline execution.
//!
//! A `MorselScheduler` hands morsels of source data to a pool of worker
//! threads. Each worker builds its own operator chain from an
//! `OperatorChainFactory`, pushes chunks through it into a thread-local
//! `CollectorSink`, and the per-worker results are merged once all workers
//! finish.

use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
use super::scheduler::MorselScheduler;
use super::source::ParallelSource;
use crate::execution::chunk::DataChunk;
use crate::execution::operators::OperatorError;
use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
use graphos_common::memory::buffer::PressureLevel;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Builds a fresh operator chain for each worker thread.
///
/// Operators carry per-worker state, so every worker needs its own instances
/// rather than sharing one chain.
pub trait OperatorChainFactory: Send + Sync {
    /// Creates a new instance of the operator chain.
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;

    /// Returns true if the chain contains pipeline breakers (operators that
    /// buffer input and only emit output during finalize).
    fn has_pipeline_breakers(&self) -> bool;

    /// Number of operators in the chain.
    fn chain_length(&self) -> usize;
}

/// An `OperatorChainFactory` built from a list of closures, each of which
/// constructs one operator.
pub struct CloneableOperatorFactory {
    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
    has_breakers: bool,
}

impl CloneableOperatorFactory {
    /// Creates an empty factory.
    pub fn new() -> Self {
        Self {
            factories: Vec::new(),
            has_breakers: false,
        }
    }

    /// Appends an operator constructor to the chain.
    #[must_use]
    pub fn with_operator<F>(mut self, factory: F) -> Self
    where
        F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
    {
        self.factories.push(Box::new(factory));
        self
    }

    /// Marks the chain as containing pipeline breakers.
    #[must_use]
    pub fn with_pipeline_breakers(mut self) -> Self {
        self.has_breakers = true;
        self
    }
}

impl Default for CloneableOperatorFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl OperatorChainFactory for CloneableOperatorFactory {
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
        self.factories.iter().map(|f| f()).collect()
    }

    fn has_pipeline_breakers(&self) -> bool {
        self.has_breakers
    }

    fn chain_length(&self) -> usize {
        self.factories.len()
    }
}

/// Output of a parallel pipeline run.
pub struct ParallelPipelineResult {
    /// Chunks produced by all workers.
    pub chunks: Vec<DataChunk>,
    /// Number of worker threads used.
    pub num_workers: usize,
    /// Number of morsels processed.
    pub morsels_processed: usize,
    /// Total number of source rows processed.
    pub rows_processed: usize,
}

/// Configuration for a parallel pipeline.
#[derive(Debug, Clone)]
pub struct ParallelPipelineConfig {
    /// Number of worker threads.
    pub num_workers: usize,
    /// Morsel size in rows.
    pub morsel_size: usize,
    /// Chunk size used when reading from a morsel's partition.
    pub chunk_size: usize,
    /// Whether output ordering should be preserved.
    pub preserve_order: bool,
    /// Memory pressure level used to size morsels.
    pub pressure_level: PressureLevel,
}

impl Default for ParallelPipelineConfig {
    fn default() -> Self {
        Self {
            num_workers: thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(4),
            morsel_size: DEFAULT_MORSEL_SIZE,
            chunk_size: DEFAULT_CHUNK_SIZE,
            preserve_order: false,
            pressure_level: PressureLevel::Normal,
        }
    }
}

impl ParallelPipelineConfig {
    /// Configuration suitable for tests: two workers, defaults otherwise.
    #[must_use]
    pub fn for_testing() -> Self {
        Self {
            num_workers: 2,
            ..Default::default()
        }
    }

    /// Sets the number of workers (at least one).
    #[must_use]
    pub fn with_workers(mut self, n: usize) -> Self {
        self.num_workers = n.max(1);
        self
    }

    /// Sets the memory pressure level.
    #[must_use]
    pub fn with_pressure(mut self, level: PressureLevel) -> Self {
        self.pressure_level = level;
        self
    }

    /// Morsel size adjusted for the configured pressure level.
    #[must_use]
    pub fn effective_morsel_size(&self) -> usize {
        compute_morsel_size(self.pressure_level)
    }
}
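
// Illustrative only: a typical construction of the config, mirroring the tests
// at the bottom of this file. Under memory pressure the effective morsel size
// is expected to shrink (the exact value is decided by `compute_morsel_size`
// in the sibling `morsel` module), which bounds how many rows each worker
// pulls from the source at a time.
//
//     let config = ParallelPipelineConfig::default()
//         .with_workers(8)
//         .with_pressure(PressureLevel::High);
//     assert!(config.effective_morsel_size() <= DEFAULT_MORSEL_SIZE);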

/// Morsel-driven parallel pipeline.
///
/// Splits the source into morsels, schedules them across worker threads, and
/// runs an independent copy of the operator chain on each worker.
pub struct ParallelPipeline {
    /// Source that can be split into morsels.
    source: Arc<dyn ParallelSource>,
    /// Factory that builds one operator chain per worker.
    operator_factory: Arc<dyn OperatorChainFactory>,
    /// Execution configuration.
    config: ParallelPipelineConfig,
}

impl ParallelPipeline {
    pub fn new(
        source: Arc<dyn ParallelSource>,
        operator_factory: Arc<dyn OperatorChainFactory>,
        config: ParallelPipelineConfig,
    ) -> Self {
        Self {
            source,
            operator_factory,
            config,
        }
    }

    /// Creates a pipeline with the default configuration.
    pub fn simple(
        source: Arc<dyn ParallelSource>,
        operator_factory: Arc<dyn OperatorChainFactory>,
    ) -> Self {
        Self::new(source, operator_factory, ParallelPipelineConfig::default())
    }

    /// Executes the pipeline and collects all output chunks.
    pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
        let morsel_size = self.config.effective_morsel_size();
        let morsels = self.source.generate_morsels(morsel_size, 0);

        if morsels.is_empty() {
            return Ok(ParallelPipelineResult {
                chunks: Vec::new(),
                num_workers: self.config.num_workers,
                morsels_processed: 0,
                rows_processed: 0,
            });
        }

        // Submit every morsel up front so workers can start draining immediately.
        let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
        let total_morsels = morsels.len();
        scheduler.submit_batch(morsels);
        scheduler.finish_submission();

        let results = Arc::new(Mutex::new(Vec::new()));
        let rows_processed = Arc::new(AtomicUsize::new(0));
        let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));

        thread::scope(|s| {
            for worker_id in 0..self.config.num_workers {
                let scheduler = Arc::clone(&scheduler);
                let source = Arc::clone(&self.source);
                let factory = Arc::clone(&self.operator_factory);
                let results = Arc::clone(&results);
                let rows_processed = Arc::clone(&rows_processed);
                let errors = Arc::clone(&errors);
                let chunk_size = self.config.chunk_size;

                s.spawn(move || {
                    if let Err(e) = Self::worker_loop(
                        worker_id,
                        scheduler,
                        source,
                        factory,
                        results,
                        rows_processed,
                        chunk_size,
                    ) {
                        // Keep only the first error; later ones are dropped.
                        let mut guard = errors.lock();
                        if guard.is_none() {
                            *guard = Some(e);
                        }
                    }
                });
            }
        });

        // Propagate the first error reported by any worker.
        if let Some(e) = errors.lock().take() {
            return Err(e);
        }

        let chunks = match Arc::try_unwrap(results) {
            Ok(mutex) => mutex.into_inner(),
            Err(arc) => arc.lock().clone(),
        };

        Ok(ParallelPipelineResult {
            chunks,
            num_workers: self.config.num_workers,
            morsels_processed: total_morsels,
            rows_processed: rows_processed.load(Ordering::Relaxed),
        })
    }

    /// Per-worker loop: repeatedly takes a morsel from the scheduler, reads it
    /// chunk by chunk, and pushes each chunk through this worker's operator
    /// chain into a thread-local sink.
    fn worker_loop(
        _worker_id: usize,
        scheduler: Arc<MorselScheduler>,
        source: Arc<dyn ParallelSource>,
        factory: Arc<dyn OperatorChainFactory>,
        results: Arc<Mutex<Vec<DataChunk>>>,
        rows_processed: Arc<AtomicUsize>,
        chunk_size: usize,
    ) -> Result<(), OperatorError> {
        use super::scheduler::WorkerHandle;

        let handle = WorkerHandle::new(scheduler);

        let mut operators = factory.create_chain();
        let mut local_sink = CollectorSink::new();

        while let Some(morsel) = handle.get_work() {
            let mut partition = source.create_partition(&morsel);
            let mut morsel_rows = 0;

            while let Some(chunk) = partition.next_chunk(chunk_size)? {
                morsel_rows += chunk.len();
                Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
            }

            rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
            handle.complete_morsel();
        }

        // Flush any buffered state (pipeline breakers) once all morsels are done.
        Self::finalize_chain(&mut operators, &mut local_sink)?;

        let chunks = local_sink.into_chunks();
        if !chunks.is_empty() {
            results.lock().extend(chunks);
        }

        Ok(())
    }

    /// Pushes a chunk through `operators`, feeding the final operator's output
    /// into `sink`. Returns `Ok(false)` if the chain signalled that it needs
    /// no more input.
    fn push_through_chain(
        operators: &mut [Box<dyn PushOperator>],
        chunk: DataChunk,
        sink: &mut dyn Sink,
    ) -> Result<bool, OperatorError> {
        if operators.is_empty() {
            return sink.consume(chunk);
        }

        let num_operators = operators.len();
        let mut current_chunk = chunk;

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                return operators[i].push(current_chunk, sink);
            }

            // Intermediate operators write into a collector, whose output
            // becomes the next operator's input.
            let mut collector = ChunkCollector::new();
            let continue_processing = operators[i].push(current_chunk, &mut collector)?;

            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        Ok(true)
    }

    /// Finalizes each operator in order, pushing anything it emits through the
    /// remaining operators and into `sink`, then finalizes the sink itself.
    fn finalize_chain(
        operators: &mut [Box<dyn PushOperator>],
        sink: &mut dyn Sink,
    ) -> Result<(), OperatorError> {
        if operators.is_empty() {
            return sink.finalize();
        }

        let num_operators = operators.len();

        for i in 0..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                operators[i].finalize(sink)?;
            } else {
                // A pipeline breaker may emit buffered chunks here; route them
                // through the operators downstream of it.
                let mut collector = ChunkCollector::new();
                operators[i].finalize(&mut collector)?;

                for chunk in collector.into_chunks() {
                    Self::push_through_from_index(operators, i + 1, chunk, sink)?;
                }
            }
        }

        sink.finalize()
    }

    /// Like `push_through_chain`, but starts at operator `start` instead of 0.
    fn push_through_from_index(
        operators: &mut [Box<dyn PushOperator>],
        start: usize,
        chunk: DataChunk,
        sink: &mut dyn Sink,
    ) -> Result<bool, OperatorError> {
        let num_operators = operators.len();
        let mut current_chunk = chunk;

        for i in start..num_operators {
            let is_last = i == num_operators - 1;

            if is_last {
                return operators[i].push(current_chunk, sink);
            }

            let mut collector = ChunkCollector::new();
            let continue_processing = operators[i].push(current_chunk, &mut collector)?;

            if !continue_processing || collector.is_empty() {
                return Ok(continue_processing);
            }

            current_chunk = collector.into_single_chunk();
        }

        sink.consume(current_chunk)
    }
}

/// Thread-local sink that buffers the output chunks produced by one worker.
#[derive(Default)]
pub struct CollectorSink {
    chunks: Vec<DataChunk>,
}

impl CollectorSink {
    pub fn new() -> Self {
        Self { chunks: Vec::new() }
    }

    /// Consumes the sink and returns the collected chunks.
    pub fn into_chunks(self) -> Vec<DataChunk> {
        self.chunks
    }

    /// Number of collected chunks.
    pub fn len(&self) -> usize {
        self.chunks.len()
    }

    pub fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Total number of rows across all collected chunks.
    pub fn row_count(&self) -> usize {
        self.chunks.iter().map(DataChunk::len).sum()
    }
}

impl Sink for CollectorSink {
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        if !chunk.is_empty() {
            self.chunks.push(chunk);
        }
        Ok(true)
    }

    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }

    fn name(&self) -> &'static str {
        "ParallelCollectorSink"
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::parallel::source::RangeSource;
    use crate::execution::vector::ValueVector;
    use graphos_common::types::Value;

    /// Operator that forwards every chunk unchanged.
    struct PassThroughOp;

    impl PushOperator for PassThroughOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Operator that keeps only even Int64 values from column 0.
    struct EvenFilterOp;

    impl PushOperator for EvenFilterOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            let col = chunk
                .column(0)
                .ok_or_else(|| OperatorError::Execution("Missing column".to_string()))?;

            let mut filtered = ValueVector::new();
            for i in 0..chunk.len() {
                if let Some(Value::Int64(v)) = col.get(i) {
                    if v % 2 == 0 {
                        filtered.push(Value::Int64(v));
                    }
                }
            }

            if !filtered.is_empty() {
                sink.consume(DataChunk::new(vec![filtered]))?;
            }
            Ok(true)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "EvenFilter"
        }
    }

    #[test]
    fn test_parallel_pipeline_creation() {
        let source = Arc::new(RangeSource::new(1000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        assert_eq!(pipeline.config.num_workers, 2);
    }

    #[test]
    fn test_parallel_pipeline_empty_source() {
        let source = Arc::new(RangeSource::new(0));
        let factory = Arc::new(CloneableOperatorFactory::new());

        let pipeline = ParallelPipeline::simple(source, factory);
        let result = pipeline.execute().unwrap();

        assert!(result.chunks.is_empty());
        assert_eq!(result.morsels_processed, 0);
        assert_eq!(result.rows_processed, 0);
    }

    #[test]
    fn test_parallel_pipeline_passthrough() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 100);
        assert_eq!(result.rows_processed, 100);
    }

    #[test]
    fn test_parallel_pipeline_filter() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 50);
    }
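
    // Illustrative two-operator chain, exercising the intermediate-collector
    // path in `push_through_chain`. Assumes, as the filter test above does,
    // that `RangeSource` yields Int64 values 0..n.
    #[test]
    fn test_parallel_pipeline_operator_chain() {
        let source = Arc::new(RangeSource::new(100));
        let factory = Arc::new(
            CloneableOperatorFactory::new()
                .with_operator(|| Box::new(PassThroughOp))
                .with_operator(|| Box::new(EvenFilterOp)),
        );
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        // PassThrough forwards all 100 rows; EvenFilter keeps the 50 even ones.
        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 50);
        assert_eq!(result.rows_processed, 100);
    }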

    #[test]
    fn test_parallel_pipeline_multiple_workers() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::default().with_workers(4);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
        assert_eq!(result.num_workers, 4);
    }

    #[test]
    fn test_parallel_pipeline_under_pressure() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
    }

    #[test]
    fn test_cloneable_operator_factory() {
        let factory = CloneableOperatorFactory::new()
            .with_operator(|| Box::new(PassThroughOp))
            .with_operator(|| Box::new(EvenFilterOp))
            .with_pipeline_breakers();

        assert_eq!(factory.chain_length(), 2);
        assert!(factory.has_pipeline_breakers());

        let chain = factory.create_chain();
        assert_eq!(chain.len(), 2);
    }
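
    /// Minimal pipeline-breaker used only by the sketch test below (a
    /// hypothetical operator, not part of the module's public API): it counts
    /// the rows it sees during `push` and emits a single Int64 count on
    /// `finalize`.
    struct CountOp {
        rows: usize,
    }

    impl PushOperator for CountOp {
        fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            self.rows += chunk.len();
            Ok(true)
        }

        fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
            let mut out = ValueVector::new();
            out.push(Value::Int64(self.rows as i64));
            sink.consume(DataChunk::new(vec![out]))?;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "Count"
        }
    }

    // Illustrative sketch of the finalize path: each worker owns its own
    // CountOp, so the output is a set of per-worker partial counts whose sum
    // should equal the number of source rows.
    #[test]
    fn test_parallel_pipeline_breaker_finalize() {
        let source = Arc::new(RangeSource::new(1000));
        let factory = Arc::new(
            CloneableOperatorFactory::new()
                .with_operator(|| Box::new(CountOp { rows: 0 }))
                .with_pipeline_breakers(),
        );
        let config = ParallelPipelineConfig::for_testing();

        let result = ParallelPipeline::new(source, factory, config)
            .execute()
            .unwrap();

        let mut total: i64 = 0;
        for chunk in &result.chunks {
            let col = chunk.column(0).expect("count column");
            for i in 0..chunk.len() {
                if let Some(Value::Int64(v)) = col.get(i) {
                    total += v;
                }
            }
        }
        assert_eq!(total, 1000);
        assert_eq!(result.rows_processed, 1000);
    }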

    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());

        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let chunk = DataChunk::new(vec![ValueVector::from_values(&values)]);

        sink.consume(chunk).unwrap();
        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.row_count(), 3);

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
    }

    #[test]
    fn test_pipeline_config() {
        let config = ParallelPipelineConfig::default()
            .with_workers(8)
            .with_pressure(PressureLevel::Moderate);

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.pressure_level, PressureLevel::Moderate);
        assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
    }
}
643}