// grafeo_core/execution/parallel/pipeline.rs

use super::morsel::{DEFAULT_MORSEL_SIZE, compute_morsel_size};
use super::scheduler::MorselScheduler;
use super::source::ParallelSource;
use crate::execution::chunk::DataChunk;
use crate::execution::operators::OperatorError;
use crate::execution::pipeline::{ChunkCollector, DEFAULT_CHUNK_SIZE, PushOperator, Sink};
use grafeo_common::memory::buffer::PressureLevel;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Factory for creating per-worker operator chains.
///
/// Each worker thread builds its own chain of [`PushOperator`]s, so no
/// synchronization is needed while chunks flow through the operators.
pub trait OperatorChainFactory: Send + Sync {
    /// Creates a fresh operator chain for one worker.
    fn create_chain(&self) -> Vec<Box<dyn PushOperator>>;

    /// Returns `true` if the chain is flagged as containing pipeline
    /// breakers (operators that buffer input before emitting output).
    fn has_pipeline_breakers(&self) -> bool;

    /// Number of operators in a chain produced by [`Self::create_chain`].
    fn chain_length(&self) -> usize;
}
36
/// An [`OperatorChainFactory`] built from boxed constructor closures,
/// one per operator in the chain.
pub struct CloneableOperatorFactory {
    // One constructor closure per operator; each is invoked once per
    // worker when a chain is created.
    factories: Vec<Box<dyn Fn() -> Box<dyn PushOperator> + Send + Sync>>,
    // Whether the chain is flagged as containing pipeline breakers.
    has_breakers: bool,
}
44
45impl CloneableOperatorFactory {
46 pub fn new() -> Self {
48 Self {
49 factories: Vec::new(),
50 has_breakers: false,
51 }
52 }
53
54 #[must_use]
56 pub fn with_operator<F>(mut self, factory: F) -> Self
57 where
58 F: Fn() -> Box<dyn PushOperator> + Send + Sync + 'static,
59 {
60 self.factories.push(Box::new(factory));
61 self
62 }
63
64 #[must_use]
66 pub fn with_pipeline_breakers(mut self) -> Self {
67 self.has_breakers = true;
68 self
69 }
70}
71
72impl Default for CloneableOperatorFactory {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl OperatorChainFactory for CloneableOperatorFactory {
79 fn create_chain(&self) -> Vec<Box<dyn PushOperator>> {
80 self.factories.iter().map(|f| f()).collect()
81 }
82
83 fn has_pipeline_breakers(&self) -> bool {
84 self.has_breakers
85 }
86
87 fn chain_length(&self) -> usize {
88 self.factories.len()
89 }
90}
91
/// Output of [`ParallelPipeline::execute`].
pub struct ParallelPipelineResult {
    /// Output chunks gathered from all workers; ordering across workers
    /// is nondeterministic.
    pub chunks: Vec<DataChunk>,
    /// Number of worker threads the pipeline was configured with.
    pub num_workers: usize,
    /// Number of morsels generated from the source and scheduled.
    pub morsels_processed: usize,
    /// Total number of source rows read across all workers.
    pub rows_processed: usize,
}
103
/// Tuning knobs for [`ParallelPipeline`] execution.
#[derive(Debug, Clone)]
pub struct ParallelPipelineConfig {
    /// Number of worker threads to spawn (minimum 1).
    pub num_workers: usize,
    /// Requested morsel size in rows.
    /// NOTE(review): `execute` derives its morsel size from
    /// `pressure_level` via `effective_morsel_size`, so this field
    /// appears unused in this file — confirm intent.
    pub morsel_size: usize,
    /// Maximum rows per chunk produced by source partitions.
    pub chunk_size: usize,
    /// Whether output order should match input order.
    /// NOTE(review): not consulted anywhere in this file — presumably
    /// honored by other components; verify.
    pub preserve_order: bool,
    /// Current memory-pressure level; drives the effective morsel size.
    pub pressure_level: PressureLevel,
}
118
119impl Default for ParallelPipelineConfig {
120 fn default() -> Self {
121 Self {
122 num_workers: thread::available_parallelism().map_or(4, |n| n.get()),
123 morsel_size: DEFAULT_MORSEL_SIZE,
124 chunk_size: DEFAULT_CHUNK_SIZE,
125 preserve_order: false,
126 pressure_level: PressureLevel::Normal,
127 }
128 }
129}
130
131impl ParallelPipelineConfig {
132 #[must_use]
134 pub fn for_testing() -> Self {
135 Self {
136 num_workers: 2,
137 ..Default::default()
138 }
139 }
140
141 #[must_use]
143 pub fn with_workers(mut self, n: usize) -> Self {
144 self.num_workers = n.max(1);
145 self
146 }
147
148 #[must_use]
150 pub fn with_pressure(mut self, level: PressureLevel) -> Self {
151 self.pressure_level = level;
152 self
153 }
154
155 #[must_use]
157 pub fn effective_morsel_size(&self) -> usize {
158 compute_morsel_size(self.pressure_level)
159 }
160}
161
/// Morsel-driven parallel pipeline executor.
///
/// Splits a [`ParallelSource`] into morsels, runs them through
/// per-worker operator chains, and collects the resulting chunks.
pub struct ParallelPipeline {
    /// Source that generates morsels and per-morsel partitions.
    source: Arc<dyn ParallelSource>,
    /// Factory producing one private operator chain per worker.
    operator_factory: Arc<dyn OperatorChainFactory>,
    /// Execution tuning parameters.
    config: ParallelPipelineConfig,
}
173
174impl ParallelPipeline {
175 pub fn new(
177 source: Arc<dyn ParallelSource>,
178 operator_factory: Arc<dyn OperatorChainFactory>,
179 config: ParallelPipelineConfig,
180 ) -> Self {
181 Self {
182 source,
183 operator_factory,
184 config,
185 }
186 }
187
188 pub fn simple(
190 source: Arc<dyn ParallelSource>,
191 operator_factory: Arc<dyn OperatorChainFactory>,
192 ) -> Self {
193 Self::new(source, operator_factory, ParallelPipelineConfig::default())
194 }
195
196 pub fn execute(&self) -> Result<ParallelPipelineResult, OperatorError> {
202 let morsel_size = self.config.effective_morsel_size();
203 let morsels = self.source.generate_morsels(morsel_size, 0);
204
205 if morsels.is_empty() {
206 return Ok(ParallelPipelineResult {
207 chunks: Vec::new(),
208 num_workers: self.config.num_workers,
209 morsels_processed: 0,
210 rows_processed: 0,
211 });
212 }
213
214 let scheduler = Arc::new(MorselScheduler::new(self.config.num_workers));
216 let total_morsels = morsels.len();
217 scheduler.submit_batch(morsels);
218 scheduler.finish_submission();
219
220 let results = Arc::new(Mutex::new(Vec::new()));
222 let rows_processed = Arc::new(AtomicUsize::new(0));
223 let errors: Arc<Mutex<Option<OperatorError>>> = Arc::new(Mutex::new(None));
224
225 thread::scope(|s| {
227 for worker_id in 0..self.config.num_workers {
228 let scheduler = Arc::clone(&scheduler);
229 let source = Arc::clone(&self.source);
230 let factory = Arc::clone(&self.operator_factory);
231 let results = Arc::clone(&results);
232 let rows_processed = Arc::clone(&rows_processed);
233 let errors = Arc::clone(&errors);
234 let chunk_size = self.config.chunk_size;
235
236 s.spawn(move || {
237 if let Err(e) = Self::worker_loop(
238 worker_id,
239 scheduler,
240 source,
241 factory,
242 results,
243 rows_processed,
244 chunk_size,
245 ) {
246 let mut guard = errors.lock();
247 if guard.is_none() {
248 *guard = Some(e);
249 }
250 }
251 });
252 }
253 });
254
255 if let Some(e) = errors.lock().take() {
257 return Err(e);
258 }
259
260 let chunks = match Arc::try_unwrap(results) {
261 Ok(mutex) => mutex.into_inner(),
262 Err(arc) => arc.lock().clone(),
263 };
264
265 Ok(ParallelPipelineResult {
266 chunks,
267 num_workers: self.config.num_workers,
268 morsels_processed: total_morsels,
269 rows_processed: rows_processed.load(Ordering::Relaxed),
270 })
271 }
272
273 fn worker_loop(
275 _worker_id: usize,
276 scheduler: Arc<MorselScheduler>,
277 source: Arc<dyn ParallelSource>,
278 factory: Arc<dyn OperatorChainFactory>,
279 results: Arc<Mutex<Vec<DataChunk>>>,
280 rows_processed: Arc<AtomicUsize>,
281 chunk_size: usize,
282 ) -> Result<(), OperatorError> {
283 use super::scheduler::WorkerHandle;
284
285 let handle = WorkerHandle::new(scheduler);
287
288 let mut operators = factory.create_chain();
290 let mut local_sink = CollectorSink::new();
291
292 while let Some(morsel) = handle.get_work() {
294 let mut partition = source.create_partition(&morsel);
295 let mut morsel_rows = 0;
296
297 while let Some(chunk) = partition.next_chunk(chunk_size)? {
299 morsel_rows += chunk.len();
300 Self::push_through_chain(&mut operators, chunk, &mut local_sink)?;
301 }
302
303 rows_processed.fetch_add(morsel_rows, Ordering::Relaxed);
304 handle.complete_morsel();
305 }
306
307 Self::finalize_chain(&mut operators, &mut local_sink)?;
309
310 let chunks = local_sink.into_chunks();
312 if !chunks.is_empty() {
313 results.lock().extend(chunks);
314 }
315
316 Ok(())
317 }
318
319 fn push_through_chain(
321 operators: &mut [Box<dyn PushOperator>],
322 chunk: DataChunk,
323 sink: &mut dyn Sink,
324 ) -> Result<bool, OperatorError> {
325 if operators.is_empty() {
326 return sink.consume(chunk);
327 }
328
329 let num_operators = operators.len();
330 let mut current_chunk = chunk;
331
332 for i in 0..num_operators {
333 let is_last = i == num_operators - 1;
334
335 if is_last {
336 return operators[i].push(current_chunk, sink);
337 }
338
339 let mut collector = ChunkCollector::new();
341 let continue_processing = operators[i].push(current_chunk, &mut collector)?;
342
343 if !continue_processing || collector.is_empty() {
344 return Ok(continue_processing);
345 }
346
347 current_chunk = collector.into_single_chunk();
348 }
349
350 Ok(true)
351 }
352
353 fn finalize_chain(
355 operators: &mut [Box<dyn PushOperator>],
356 sink: &mut dyn Sink,
357 ) -> Result<(), OperatorError> {
358 if operators.is_empty() {
359 return sink.finalize();
360 }
361
362 let num_operators = operators.len();
363
364 for i in 0..num_operators {
365 let is_last = i == num_operators - 1;
366
367 if is_last {
368 operators[i].finalize(sink)?;
369 } else {
370 let mut collector = ChunkCollector::new();
372 operators[i].finalize(&mut collector)?;
373
374 for chunk in collector.into_chunks() {
376 Self::push_through_from_index(operators, i + 1, chunk, sink)?;
377 }
378 }
379 }
380
381 sink.finalize()
382 }
383
384 fn push_through_from_index(
386 operators: &mut [Box<dyn PushOperator>],
387 start: usize,
388 chunk: DataChunk,
389 sink: &mut dyn Sink,
390 ) -> Result<bool, OperatorError> {
391 let num_operators = operators.len();
392 let mut current_chunk = chunk;
393
394 for i in start..num_operators {
395 let is_last = i == num_operators - 1;
396
397 if is_last {
398 return operators[i].push(current_chunk, sink);
399 }
400
401 let mut collector = ChunkCollector::new();
402 let continue_processing = operators[i].push(current_chunk, &mut collector)?;
403
404 if !continue_processing || collector.is_empty() {
405 return Ok(continue_processing);
406 }
407
408 current_chunk = collector.into_single_chunk();
409 }
410
411 sink.consume(current_chunk)
412 }
413}
414
/// A thread-local sink that buffers non-empty output chunks in memory.
#[derive(Default)]
pub struct CollectorSink {
    // Buffered chunks; empty chunks are dropped on consume.
    chunks: Vec<DataChunk>,
}
420
421impl CollectorSink {
422 pub fn new() -> Self {
424 Self { chunks: Vec::new() }
425 }
426
427 pub fn into_chunks(self) -> Vec<DataChunk> {
429 self.chunks
430 }
431
432 pub fn len(&self) -> usize {
434 self.chunks.len()
435 }
436
437 pub fn is_empty(&self) -> bool {
439 self.chunks.is_empty()
440 }
441
442 pub fn row_count(&self) -> usize {
444 self.chunks.iter().map(DataChunk::len).sum()
445 }
446}
447
448impl Sink for CollectorSink {
449 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
450 if !chunk.is_empty() {
451 self.chunks.push(chunk);
452 }
453 Ok(true)
454 }
455
456 fn finalize(&mut self) -> Result<(), OperatorError> {
457 Ok(())
458 }
459
460 fn name(&self) -> &'static str {
461 "ParallelCollectorSink"
462 }
463
464 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
465 self
466 }
467}
468
469#[cfg(test)]
470mod tests {
471 use super::*;
472 use crate::execution::parallel::source::RangeSource;
473 use crate::execution::vector::ValueVector;
474 use grafeo_common::types::Value;
475
476 struct PassThroughOp;
478
479 impl PushOperator for PassThroughOp {
480 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
481 sink.consume(chunk)
482 }
483
484 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
485 Ok(())
486 }
487
488 fn name(&self) -> &'static str {
489 "PassThrough"
490 }
491 }
492
493 struct EvenFilterOp;
495
496 impl PushOperator for EvenFilterOp {
497 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
498 let col = chunk
499 .column(0)
500 .ok_or_else(|| OperatorError::Execution("Missing column".to_string()))?;
501
502 let mut filtered = ValueVector::new();
503 for i in 0..chunk.len() {
504 if let Some(Value::Int64(v)) = col.get(i)
505 && v % 2 == 0
506 {
507 filtered.push(Value::Int64(v));
508 }
509 }
510
511 if !filtered.is_empty() {
512 sink.consume(DataChunk::new(vec![filtered]))?;
513 }
514 Ok(true)
515 }
516
517 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
518 Ok(())
519 }
520
521 fn name(&self) -> &'static str {
522 "EvenFilter"
523 }
524 }
525
526 #[test]
527 fn test_parallel_pipeline_creation() {
528 let source = Arc::new(RangeSource::new(1000));
529 let factory = Arc::new(CloneableOperatorFactory::new());
530 let config = ParallelPipelineConfig::for_testing();
531
532 let pipeline = ParallelPipeline::new(source, factory, config);
533 assert_eq!(pipeline.config.num_workers, 2);
534 }
535
536 #[test]
537 fn test_parallel_pipeline_empty_source() {
538 let source = Arc::new(RangeSource::new(0));
539 let factory = Arc::new(CloneableOperatorFactory::new());
540
541 let pipeline = ParallelPipeline::simple(source, factory);
542 let result = pipeline.execute().unwrap();
543
544 assert!(result.chunks.is_empty());
545 assert_eq!(result.morsels_processed, 0);
546 assert_eq!(result.rows_processed, 0);
547 }
548
549 #[test]
550 fn test_parallel_pipeline_passthrough() {
551 let source = Arc::new(RangeSource::new(100));
552 let factory =
553 Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(PassThroughOp)));
554 let config = ParallelPipelineConfig::for_testing();
555
556 let pipeline = ParallelPipeline::new(source, factory, config);
557 let result = pipeline.execute().unwrap();
558
559 let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
561 assert_eq!(total_rows, 100);
562 assert_eq!(result.rows_processed, 100);
563 }
564
565 #[test]
566 fn test_parallel_pipeline_filter() {
567 let source = Arc::new(RangeSource::new(100));
568 let factory =
569 Arc::new(CloneableOperatorFactory::new().with_operator(|| Box::new(EvenFilterOp)));
570 let config = ParallelPipelineConfig::for_testing();
571
572 let pipeline = ParallelPipeline::new(source, factory, config);
573 let result = pipeline.execute().unwrap();
574
575 let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
577 assert_eq!(total_rows, 50);
578 }
579
580 #[test]
581 fn test_parallel_pipeline_multiple_workers() {
582 let source = Arc::new(RangeSource::new(10000));
583 let factory = Arc::new(CloneableOperatorFactory::new());
584 let config = ParallelPipelineConfig::default().with_workers(4);
585
586 let pipeline = ParallelPipeline::new(source, factory, config);
587 let result = pipeline.execute().unwrap();
588
589 let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
590 assert_eq!(total_rows, 10000);
591 assert_eq!(result.num_workers, 4);
592 }
593
594 #[test]
595 fn test_parallel_pipeline_under_pressure() {
596 let source = Arc::new(RangeSource::new(10000));
597 let factory = Arc::new(CloneableOperatorFactory::new());
598 let config = ParallelPipelineConfig::for_testing().with_pressure(PressureLevel::High);
599
600 let pipeline = ParallelPipeline::new(source, factory, config);
601 let result = pipeline.execute().unwrap();
602
603 let total_rows: usize = result.chunks.iter().map(DataChunk::len).sum();
605 assert_eq!(total_rows, 10000);
606 }
607
608 #[test]
609 fn test_cloneable_operator_factory() {
610 let factory = CloneableOperatorFactory::new()
611 .with_operator(|| Box::new(PassThroughOp))
612 .with_operator(|| Box::new(EvenFilterOp))
613 .with_pipeline_breakers();
614
615 assert_eq!(factory.chain_length(), 2);
616 assert!(factory.has_pipeline_breakers());
617
618 let chain = factory.create_chain();
619 assert_eq!(chain.len(), 2);
620 }
621
622 #[test]
623 fn test_collector_sink() {
624 let mut sink = CollectorSink::new();
625 assert!(sink.is_empty());
626
627 let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
628 let chunk = DataChunk::new(vec![ValueVector::from_values(&values)]);
629
630 sink.consume(chunk).unwrap();
631 assert!(!sink.is_empty());
632 assert_eq!(sink.len(), 1);
633 assert_eq!(sink.row_count(), 3);
634
635 let chunks = sink.into_chunks();
636 assert_eq!(chunks.len(), 1);
637 }
638
639 #[test]
640 fn test_pipeline_config() {
641 let config = ParallelPipelineConfig::default()
642 .with_workers(8)
643 .with_pressure(PressureLevel::Moderate);
644
645 assert_eq!(config.num_workers, 8);
646 assert_eq!(config.pressure_level, PressureLevel::Moderate);
647 assert!(config.effective_morsel_size() < DEFAULT_MORSEL_SIZE);
648 }
649}