//! grafeo_core/execution/parallel/pipeline.rs
use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
7use super::scheduler::MorselScheduler;
8use super::source::ParallelSource;
9use crate::execution::chunk::DataChunk;
10use crate::execution::operators::OperatorError;
11use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
12use grafeo_common::memory::buffer::PressureLevel;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::thread;
17
/// Factory that builds a fresh operator chain for each worker thread.
///
/// Every worker in a parallel pipeline runs its own instances of the
/// operators (they are stateful and are not shared across threads), so
/// the factory must be able to produce the chain repeatedly.
pub trait OperatorChainFactory: Send + Sync {
    /// Creates a new chain of push-based operators, in pipeline order.
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;

    /// Reports whether the chain contains pipeline breakers.
    /// NOTE(review): semantics beyond the flag itself are not visible
    /// here — confirm how callers act on this.
    fn has_pipeline_breakers(&self) -> bool;

    /// Number of operators that [`create_chain`](Self::create_chain)
    /// will produce.
    fn chain_length(&self) -> usize;
}
36
/// An [`OperatorChainFactory`] backed by a list of closures, each of
/// which produces one operator instance per call.
pub struct CloneableOperatorFactory {
    // One factory closure per operator in the chain, in pipeline order.
    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
    // Value reported verbatim by `has_pipeline_breakers`.
    has_breakers: bool,
}
44
impl CloneableOperatorFactory {
    /// Creates an empty factory: no operators, no pipeline breakers.
    pub fn new() -> Self {
        Self {
            factories: Vec::new(),
            has_breakers: false,
        }
    }

    /// Appends an operator-producing closure to the chain (builder style).
    ///
    /// The closure is invoked once per worker to create that worker's
    /// private operator instance.
    #[must_use]
    pub fn with_operator<F>(mut self, factory: F) -> Self
    where
        F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
    {
        self.factories.push(Box::new(factory));
        self
    }

    /// Marks the chain as containing pipeline breakers (builder style).
    #[must_use]
    pub fn with_pipeline_breakers(mut self) -> Self {
        self.has_breakers = true;
        self
    }
}
71
impl Default for CloneableOperatorFactory {
    /// Equivalent to [`CloneableOperatorFactory::new`].
    fn default() -> Self {
        Self::new()
    }
}
77
78impl OperatorChainFactory for CloneableOperatorFactory {
79 fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
80 self.factories.iter().map(|f| f()).collect()
81 }
82
83 fn has_pipeline_breakers(&self) -> bool {
84 self.has_breakers
85 }
86
87 fn chain_length(&self) -> usize {
88 self.factories.len()
89 }
90}
91
/// Outcome of a parallel pipeline execution.
pub struct ParallelPipelineResult {
    // Output chunks from all workers; ordering across workers is
    // nondeterministic (workers append as they finish).
    pub chunks: Vec<DataChunk>,
    // Number of workers the pipeline was configured with.
    pub num_workers: usize,
    // Total number of morsels generated from the source.
    pub morsels_processed: usize,
    // Total number of source rows pushed through the operator chains.
    pub rows_processed: usize,
}
103
/// Tuning knobs for [`ParallelPipeline::execute`].
#[derive(Debug, Clone)]
pub struct ParallelPipelineConfig {
    // Number of worker threads to spawn.
    pub num_workers: usize,
    // Requested morsel size.
    // NOTE(review): `effective_morsel_size` currently derives the size
    // from `pressure_level` alone and never reads this field — confirm
    // intent.
    pub morsel_size: usize,
    // Maximum rows per chunk produced by source partitions.
    pub chunk_size: usize,
    // Whether output order must match input order.
    // NOTE(review): not consulted anywhere in this file — verify usage.
    pub preserve_order: bool,
    // Memory pressure level used to shrink morsels under pressure.
    pub pressure_level: PressureLevel,
}
118
119impl Default for ParallelPipelineConfig {
120 fn default() -> Self {
121 Self {
122 num_workers: thread::available_parallelism()
123 .map(|n| n.get())
124 .unwrap_or(4),
125 morsel_size: DEFAULT_MORSEL_SIZE,
126 chunk_size: DEFAULT_CHUNK_SIZE,
127 preserve_order: false,
128 pressure_level: PressureLevel::Normal,
129 }
130 }
131}
132
133impl ParallelPipelineConfig {
134 #[must_use]
136 pub fn for_testing() -> Self {
137 Self {
138 num_workers: 2,
139 ..Default::default()
140 }
141 }
142
143 #[must_use]
145 pub fn with_workers(mut self, n: usize) -> Self {
146 self.num_workers = n.max(1);
147 self
148 }
149
150 #[must_use]
152 pub fn with_pressure(mut self, level: PressureLevel) -> Self {
153 self.pressure_level = level;
154 self
155 }
156
157 #[must_use]
159 pub fn effective_morsel_size(&self) -> usize {
160 compute_morsel_size(self.pressure_level)
161 }
162}
163
/// Morsel-driven parallel pipeline: splits a source into morsels,
/// schedules them across worker threads, and runs each worker's private
/// operator chain over its morsels.
pub struct ParallelPipeline {
    // Shared source from which morsels and per-morsel partitions are created.
    source: Arc<dyn ParallelSource>,
    // Factory producing one operator chain per worker.
    operator_factory: Arc<dyn OperatorChainFactory>,
    // Execution tuning parameters.
    config: ParallelPipelineConfig,
}
175
176impl ParallelPipeline {
177 pub fn new(
179 source: Arc<dyn ParallelSource>,
180 operator_factory: Arc<dyn OperatorChainFactory>,
181 config: ParallelPipelineConfig,
182 ) -> Self {
183 Self {
184 source,
185 operator_factory,
186 config,
187 }
188 }
189
190 pub fn simple(
192 source: Arc<dyn ParallelSource>,
193 operator_factory: Arc<dyn OperatorChainFactory>,
194 ) -> Self {
195 Self::new(source, operator_factory, ParallelPipelineConfig::default())
196 }
197
198 pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
204 let morsel_size = self.config.effective_morsel_size();
205 let morsels = self.source.generate_morsels(morsel_size, 0);
206
207 if morsels.is_empty() {
208 return Ok(ParallelPipelineResult {
209 chunks: Vec::new(),
210 num_workers: self.config.num_workers,
211 morsels_processed: 0,
212 rows_processed: 0,
213 });
214 }
215
216 let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
218 let total_morsels = morsels.len();
219 scheduler.submit_batch(morsels);
220 scheduler.finish_submission();
221
222 let results = Arc::new(Mutex::new(Vec::new()));
224 let rows_processed = Arc::new(AtomicUsize::new(0));
225 let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));
226
227 thread::scope(|s| {
229 for worker_id in 0..self.config.num_workers {
230 let scheduler = Arc::clone(&scheduler);
231 let source = Arc::clone(&self.source);
232 let factory = Arc::clone(&self.operator_factory);
233 let results = Arc::clone(&results);
234 let rows_processed = Arc::clone(&rows_processed);
235 let errors = Arc::clone(&errors);
236 let chunk_size = self.config.chunk_size;
237
238 s.spawn(move || {
239 if let Err(e) = Self::worker_loop(
240 worker_id,
241 scheduler,
242 source,
243 factory,
244 results,
245 rows_processed,
246 chunk_size,
247 ) {
248 let mut guard = errors.lock();
249 if guard.is_none() {
250 *guard = Some(e);
251 }
252 }
253 });
254 }
255 });
256
257 if let Some(e) = errors.lock().take() {
259 return Err(e);
260 }
261
262 let chunks = match Arc::try_unwrap(results) {
263 Ok(mutex) => mutex.into_inner(),
264 Err(arc) => arc.lock().clone(),
265 };
266
267 Ok(ParallelPipelineResult {
268 chunks,
269 num_workers: self.config.num_workers,
270 morsels_processed: total_morsels,
271 rows_processed: rows_processed.load(Ordering::Relaxed),
272 })
273 }
274
275 fn worker_loop(
277 _worker_id: usize,
278 scheduler: Arc<MorselScheduler>,
279 source: Arc<dyn ParallelSource>,
280 factory: Arc<dyn OperatorChainFactory>,
281 results: Arc<Mutex<Vec<DataChunk>>>,
282 rows_processed: Arc<AtomicUsize>,
283 chunk_size: usize,
284 ) -> Result<(), OperatorError> {
285 use super::scheduler::WorkerHandle;
286
287 let handle = WorkerHandle::new(scheduler);
289
290 let mut operators = factory.create_chain();
292 let mut local_sink = CollectorSink::new();
293
294 while let Some(morsel) = handle.get_work() {
296 let mut partition = source.create_partition(&morsel);
297 let mut morsel_rows = 0;
298
299 while let Some(chunk) = partition.next_chunk(chunk_size)? {
301 morsel_rows += chunk.len();
302 Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
303 }
304
305 rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
306 handle.complete_morsel();
307 }
308
309 Self::finalize_chain(&mut operators, &mut local_sink)?;
311
312 let chunks = local_sink.into_chunks();
314 if !chunks.is_empty() {
315 results.lock().extend(chunks);
316 }
317
318 Ok(())
319 }
320
321 fn push_through_chain(
323 operators: &mut [Box<dyn PushOperator>],
324 chunk: DataChunk,
325 sink: &mut dyn Sink,
326 ) -> Result<bool, OperatorError> {
327 if operators.is_empty() {
328 return sink.consume(chunk);
329 }
330
331 let num_operators = operators.len();
332 let mut current_chunk = chunk;
333
334 for i in 0..num_operators {
335 let is_last = i == num_operators - 1;
336
337 if is_last {
338 return operators[i].push(current_chunk, sink);
339 }
340
341 let mut collector = ChunkCollector::new();
343 let continue_processing = operators[i].push(current_chunk, &mut collector)?;
344
345 if !continue_processing || collector.is_empty() {
346 return Ok(continue_processing);
347 }
348
349 current_chunk = collector.into_single_chunk();
350 }
351
352 Ok(true)
353 }
354
355 fn finalize_chain(
357 operators: &mut [Box<dyn PushOperator>],
358 sink: &mut dyn Sink,
359 ) -> Result<(), OperatorError> {
360 if operators.is_empty() {
361 return sink.finalize();
362 }
363
364 let num_operators = operators.len();
365
366 for i in 0..num_operators {
367 let is_last = i == num_operators - 1;
368
369 if is_last {
370 operators[i].finalize(sink)?;
371 } else {
372 let mut collector = ChunkCollector::new();
374 operators[i].finalize(&mut collector)?;
375
376 for chunk in collector.into_chunks() {
378 Self::push_through_from_index(operators, i + 1, chunk, sink)?;
379 }
380 }
381 }
382
383 sink.finalize()
384 }
385
386 fn push_through_from_index(
388 operators: &mut [Box<dyn PushOperator>],
389 start: usize,
390 chunk: DataChunk,
391 sink: &mut dyn Sink,
392 ) -> Result<bool, OperatorError> {
393 let num_operators = operators.len();
394 let mut current_chunk = chunk;
395
396 for i in start..num_operators {
397 let is_last = i == num_operators - 1;
398
399 if is_last {
400 return operators[i].push(current_chunk, sink);
401 }
402
403 let mut collector = ChunkCollector::new();
404 let continue_processing = operators[i].push(current_chunk, &mut collector)?;
405
406 if !continue_processing || collector.is_empty() {
407 return Ok(continue_processing);
408 }
409
410 current_chunk = collector.into_single_chunk();
411 }
412
413 sink.consume(current_chunk)
414 }
415}
416
/// Per-worker sink that buffers output chunks in memory.
///
/// Each worker fills its own `CollectorSink` without any locking; the
/// chunks are merged into the shared result vector once at the end.
#[derive(Default)]
pub struct CollectorSink {
    // Non-empty chunks, in the order they were consumed.
    chunks: Vec<DataChunk>,
}
422
423impl CollectorSink {
424 pub fn new() -> Self {
426 Self { chunks: Vec::new() }
427 }
428
429 pub fn into_chunks(self) -> Vec<DataChunk> {
431 self.chunks
432 }
433
434 pub fn len(&self) -> usize {
436 self.chunks.len()
437 }
438
439 pub fn is_empty(&self) -> bool {
441 self.chunks.is_empty()
442 }
443
444 pub fn row_count(&self) -> usize {
446 self.chunks.iter().map(DataChunk::len).sum()
447 }
448}
449
450impl Sink for CollectorSink {
451 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
452 if !chunk.is_empty() {
453 self.chunks.push(chunk);
454 }
455 Ok(true)
456 }
457
458 fn finalize(&mut self) -> Result<(), OperatorError> {
459 Ok(())
460 }
461
462 fn name(&self) -> &'static str {
463 "ParallelCollectorSink"
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::parallel::source::RangeSource;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    // Test operator that forwards every chunk unchanged.
    struct PassThroughOp;

    impl PushOperator for PassThroughOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    // Test operator that keeps only even Int64 values from column 0.
    struct EvenFilterOp;

    impl PushOperator for EvenFilterOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            let col = chunk
                .column(0)
                .ok_or_else(|| OperatorError::Execution("Missing column".to_string()))?;

            let mut filtered = ValueVector::new();
            for i in 0..chunk.len() {
                // Let-chain: keep the value only when it is an even Int64.
                if let Some(Value::Int64(v)) = col.get(i)
                    && v % 2 == 0
                {
                    filtered.push(Value::Int64(v));
                }
            }

            // Emit nothing rather than an empty chunk.
            if !filtered.is_empty() {
                sink.consume(DataChunk::new(vec![filtered]))?;
            }
            Ok(true)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "EvenFilter"
        }
    }

    // Construction stores the given config verbatim.
    #[test]
    fn test_parallel_pipeline_creation() {
        let source = Arc::new(RangeSource::new(1000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        assert_eq!(pipeline.config.num_workers, 2);
    }

    // An empty source short-circuits: no chunks, no morsels, no rows.
    #[test]
    fn test_parallel_pipeline_empty_source() {
        let source = Arc::new(RangeSource::new(0));
        let factory = Arc::new(CloneableOperatorFactory::new());

        let pipeline = ParallelPipeline::simple(source, factory);
        let result = pipeline.execute().unwrap();

        assert!(result.chunks.is_empty());
        assert_eq!(result.morsels_processed, 0);
        assert_eq!(result.rows_processed, 0);
    }

    // A pass-through chain preserves every row.
    #[test]
    fn test_parallel_pipeline_passthrough() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 100);
        assert_eq!(result.rows_processed, 100);
    }

    // Filtering 0..100 down to even values yields exactly half the rows.
    #[test]
    fn test_parallel_pipeline_filter() {
        let source = Arc::new(RangeSource::new(100));
        let factory =
            Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp)));
        let config = ParallelPipelineConfig::for_testing();

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 50);
    }

    // With 4 workers, no rows are lost or duplicated.
    #[test]
    fn test_parallel_pipeline_multiple_workers() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::default().with_workers(4);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
        assert_eq!(result.num_workers, 4);
    }

    // High memory pressure shrinks morsels but must not drop rows.
    #[test]
    fn test_parallel_pipeline_under_pressure() {
        let source = Arc::new(RangeSource::new(10000));
        let factory = Arc::new(CloneableOperatorFactory::new());
        let config = ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High);

        let pipeline = ParallelPipeline::new(source, factory, config);
        let result = pipeline.execute().unwrap();

        let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
        assert_eq!(total_rows, 10000);
    }

    // Builder accumulates operators and the breaker flag.
    #[test]
    fn test_cloneable_operator_factory() {
        let factory = CloneableOperatorFactory::new()
            .with_operator(|| Box::new(PassThroughOp))
            .with_operator(|| Box::new(EvenFilterOp))
            .with_pipeline_breakers();

        assert_eq!(factory.chain_length(), 2);
        assert!(factory.has_pipeline_breakers());

        let chain = factory.create_chain();
        assert_eq!(chain.len(), 2);
    }

    // CollectorSink counts chunks and rows, and yields chunks on demand.
    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());

        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let chunk = DataChunk::new(vec![ValueVector::from_values(&values)]);

        sink.consume(chunk).unwrap();
        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.row_count(), 3);

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
    }

    // Builder setters take effect; pressure shrinks the effective morsel.
    #[test]
    fn test_pipeline_config() {
        let config = ParallelPipelineConfig::default()
            .with_workers(8)
            .with_pressure(PressureLevel::Moderate);

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.pressure_level, PressureLevel::Moderate);
        assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
    }
}