1use crate::NodeId;
17use crate::node::Node;
18use crate::pipeline::Pipeline;
19use crate::planner::build_plan;
20use crate::type_token::Partition;
21use anyhow::{Result, anyhow, bail};
22use ordered_float::NotNan;
23use rayon::ThreadPoolBuilder;
24use rayon::prelude::*;
25use std::collections::BinaryHeap;
26use std::sync::Arc;
27
28#[cfg(feature = "checkpointing")]
29use crate::checkpoint::CheckpointConfig;
30
/// Execution strategy used by [`Runner`].
#[derive(Clone, Copy, Debug)]
pub enum ExecMode {
    /// Run every node on the current thread, one partition at a time.
    Sequential,
    /// Run partitions on a rayon thread pool.
    Parallel {
        /// Global rayon thread-pool size; `None` keeps rayon's default.
        threads: Option<usize>,
        /// Number of data partitions; `None` defers to the planner's
        /// suggestion, then to the runner's `default_partitions`.
        partitions: Option<usize>,
    },
}
51
/// Drives execution of a planned [`Pipeline`] chain.
pub struct Runner {
    /// Sequential or parallel execution (see [`ExecMode`]).
    pub mode: ExecMode,
    /// Partition count used when neither the caller nor the planner
    /// specifies one.
    pub default_partitions: usize,
    /// Optional checkpointing configuration; checkpointing is active only
    /// when this is `Some` and the config's `enabled` flag is set.
    #[cfg(feature = "checkpointing")]
    pub checkpoint_config: Option<CheckpointConfig>,
}
66
67impl Default for Runner {
68 fn default() -> Self {
69 Self {
70 mode: ExecMode::Parallel {
71 threads: None,
72 partitions: None,
73 },
74 default_partitions: 2 * num_cpus::get().max(2),
76 #[cfg(feature = "checkpointing")]
77 checkpoint_config: None,
78 }
79 }
80}
81
82impl Runner {
83 pub fn run_collect<T: 'static + Send + Sync + Clone>(
100 &self,
101 p: &Pipeline,
102 terminal: NodeId,
103 ) -> Result<Vec<T>> {
104 #[cfg(feature = "metrics")]
106 p.record_metrics_start();
107
108 let plan = build_plan(p, terminal)?;
110 let chain = plan.chain;
111 let suggested_parts = plan.suggested_partitions;
112
113 #[cfg(feature = "checkpointing")]
114 let checkpoint_enabled = self.checkpoint_config.as_ref().is_some_and(|c| c.enabled);
115
116 #[cfg(feature = "checkpointing")]
117 let result = if checkpoint_enabled {
118 let config = self.checkpoint_config.as_ref().unwrap().clone();
120 match self.mode {
121 ExecMode::Sequential => exec_seq_with_checkpointing::<T>(chain, config),
122 ExecMode::Parallel {
123 threads,
124 partitions,
125 } => {
126 if let Some(t) = threads {
127 ThreadPoolBuilder::new().num_threads(t).build_global().ok();
128 }
129 let parts = partitions
130 .or(suggested_parts)
131 .unwrap_or(self.default_partitions);
132 exec_par_with_checkpointing::<T>(&chain, parts, config)
133 }
134 }
135 } else {
136 match self.mode {
138 ExecMode::Sequential => exec_seq::<T>(chain),
139 ExecMode::Parallel {
140 threads,
141 partitions,
142 } => {
143 if let Some(t) = threads {
144 ThreadPoolBuilder::new().num_threads(t).build_global().ok();
145 }
146 let parts = partitions
147 .or(suggested_parts)
148 .unwrap_or(self.default_partitions);
149 exec_par::<T>(&chain, parts)
150 }
151 }
152 };
153
154 #[cfg(not(feature = "checkpointing"))]
155 let result = match self.mode {
156 ExecMode::Sequential => exec_seq::<T>(chain),
157 ExecMode::Parallel {
158 threads,
159 partitions,
160 } => {
161 if let Some(t) = threads {
162 ThreadPoolBuilder::new().num_threads(t).build_global().ok();
164 }
165 let parts = partitions
166 .or(suggested_parts)
167 .unwrap_or(self.default_partitions);
168 exec_par::<T>(chain, parts)
169 }
170 };
171
172 #[cfg(feature = "metrics")]
174 p.record_metrics_end();
175
176 result
177 }
178}
179
180#[allow(clippy::too_many_lines)]
185fn exec_seq<T: 'static + Send + Sync + Clone>(chain: Vec<Node>) -> Result<Vec<T>> {
186 let mut buf: Option<Partition> = None;
187
188 let run_subplan_seq = |chain: Vec<Node>| -> Result<Vec<Partition>> {
189 let mut curr: Option<Partition> = None;
190 for node in chain {
191 curr = Some(match node {
192 Node::Source {
193 payload, vec_ops, ..
194 } => vec_ops
195 .clone_any(payload.as_ref())
196 .ok_or_else(|| anyhow!("unsupported source vec type"))?,
197 Node::Stateless(ops) => ops
198 .into_iter()
199 .fold(curr.take().unwrap(), |acc, op| op.apply(acc)),
200 Node::GroupByKey { local, merge } => {
201 let mid = local(curr.take().unwrap());
202 merge(vec![mid])
203 }
204 Node::CombineValues {
205 local_pairs,
206 local_groups,
207 merge,
208 } => {
209 let local = local_groups.map_or(local_pairs, |lg| lg);
211 let mid = local(curr.take().unwrap());
212 merge(vec![mid])
213 }
214 Node::Materialized(p) => Box::new(p) as Partition,
215 Node::Flatten { .. } => bail!("nested Flatten not supported in subplan"),
216 Node::CoGroup { .. } => bail!("nested CoGroup not supported in subplan"),
217 Node::CombineGlobal {
218 local,
219 merge,
220 finish,
221 ..
222 } => {
223 let mid = local(curr.take().unwrap());
224 let acc = merge(vec![mid]);
225 if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
226 eprintln!("DEBUG: KMV heap len = {}", h.len()); }
228 finish(acc)
229 }
230 });
231 }
232 Ok(vec![curr.unwrap()])
233 };
234
235 for node in chain {
236 buf = Some(match node {
237 Node::Flatten {
238 chains,
239 coalesce,
240 merge,
241 } => {
242 let mut coalesced_inputs: Vec<Partition> = Vec::new();
244 for chain in chains.iter() {
245 let mut parts = run_subplan_seq(chain.clone())?;
246 let single: Partition = if parts.len() == 1 {
247 parts.pop().unwrap()
248 } else {
249 coalesce(parts)
250 };
251 coalesced_inputs.push(single);
252 }
253 merge(coalesced_inputs)
255 }
256 Node::CoGroup {
257 left_chain,
258 right_chain,
259 coalesce_left,
260 coalesce_right,
261 exec,
262 } => {
263 let mut left_parts = run_subplan_seq((*left_chain).clone())?;
265 let mut right_parts = run_subplan_seq((*right_chain).clone())?;
266
267 let left_single: Partition = if left_parts.len() == 1 {
268 left_parts.pop().unwrap()
269 } else {
270 coalesce_left(left_parts)
271 };
272 let right_single: Partition = if right_parts.len() == 1 {
273 right_parts.pop().unwrap()
274 } else {
275 coalesce_right(right_parts)
276 };
277
278 exec(left_single, right_single)
279 }
280 Node::Source {
281 payload, vec_ops, ..
282 } => vec_ops
283 .clone_any(payload.as_ref())
284 .ok_or_else(|| anyhow!("unsupported source vec type"))?,
285 Node::Stateless(ops) => ops
286 .into_iter()
287 .fold(buf.take().unwrap(), |acc, op| op.apply(acc)),
288 Node::GroupByKey { local, merge } => {
289 let mid = local(buf.take().unwrap());
290 merge(vec![mid])
291 }
292 Node::CombineValues {
293 local_pairs,
294 local_groups,
295 merge,
296 } => {
297 let local = local_groups.map_or(local_pairs, |lg| lg);
298 let mid = local(buf.take().unwrap());
299 merge(vec![mid])
300 }
301 Node::Materialized(p) => Box::new(
303 p.downcast_ref::<Vec<T>>()
304 .cloned()
305 .ok_or_else(|| anyhow!("terminal type mismatch"))?,
306 ) as Partition,
307 Node::CombineGlobal {
308 local,
309 merge,
310 finish,
311 ..
312 } => {
313 let mid_acc = local(buf.take().unwrap());
314 let acc = merge(vec![mid_acc]);
315 if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
316 eprintln!("DEBUG: KMV heap len = {}", h.len()); }
318 finish(acc)
319 }
320 });
321 }
322
323 let out = buf.unwrap();
324 let v = *out
325 .downcast::<Vec<T>>()
326 .map_err(|_| anyhow!("terminal type mismatch"))?;
327 Ok(v)
328}
329
330#[allow(clippy::too_many_lines)]
336fn exec_par<T: 'static + Send + Sync + Clone>(chain: &[Node], partitions: usize) -> Result<Vec<T>> {
337 fn run_subplan_par(chain: &[Node], partitions: usize) -> Result<Vec<Partition>> {
341 let (payload, vec_ops, rest) = match &chain[0] {
343 Node::Source {
344 payload, vec_ops, ..
345 } => (payload.clone(), vec_ops.clone(), &chain[1..]),
346 _ => bail!("subplan must start with a Source"),
347 };
348 let total_len = vec_ops.len(payload.as_ref()).unwrap_or(0);
349 let parts = partitions.max(1).min(total_len.max(1));
350 let mut curr = vec_ops.split(payload.as_ref(), parts).unwrap_or_else(|| {
351 vec![
352 vec_ops
353 .clone_any(payload.as_ref())
354 .expect("cloneable source"),
355 ]
356 });
357
358 let mut i = 0usize;
359 while i < rest.len() {
360 match &rest[i] {
361 Node::Stateless(_) => {
362 let mut ops = Vec::new();
363 while i < rest.len() {
364 if let Node::Stateless(more) = &rest[i] {
365 ops.extend(more.iter().cloned());
366 i += 1;
367 } else {
368 break;
369 }
370 }
371 curr = curr
372 .into_par_iter()
373 .map(|p| ops.iter().fold(p, |acc, op| op.apply(acc)))
374 .collect();
375 }
376 Node::GroupByKey { local, merge } => {
377 let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
378 curr = vec![merge(mids)];
379 i += 1;
380 }
381 Node::CombineValues {
382 local_pairs,
383 local_groups,
384 merge,
385 } => {
386 let local = local_groups
387 .as_ref()
388 .map_or_else(|| local_pairs.clone(), |lg| lg.clone());
389 let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
390 curr = vec![merge(mids)];
391 i += 1;
392 }
393 Node::Source { .. } | Node::Materialized(_) => {
394 bail!("unexpected source/materialized in subplan")
395 }
396 Node::Flatten { .. } => bail!("nested Flatten not supported in subplan"),
397 Node::CoGroup { .. } => bail!("nested CoGroup not supported in subplan"),
398 Node::CombineGlobal {
399 local,
400 merge,
401 finish,
402 fanout,
403 } => {
404 let mut accs: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
406
407 let f = fanout.unwrap_or(usize::MAX).max(1);
409 while accs.len() > 1 {
410 if f == usize::MAX {
411 accs = vec![merge(accs)];
412 break;
413 }
414 let mut next: Vec<Partition> = Vec::with_capacity(accs.len().div_ceil(f));
415 let mut it = accs.into_iter(); loop {
417 let mut group: Vec<Partition> = Vec::with_capacity(f);
418 for _ in 0..f {
419 if let Some(p) = it.next() {
420 group.push(p);
421 } else {
422 break;
423 }
424 }
425 if group.is_empty() {
426 break;
427 }
428 next.push(merge(group));
429 }
430 accs = next;
431 }
432
433 let acc = accs.pop().unwrap_or_else(|| merge(Vec::new()));
434 if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
435 eprintln!("DEBUG: KMV heap len = {}", h.len()); }
437 curr = vec![finish(acc)];
438 i += 1;
439 }
440 }
441 }
442 Ok(curr)
443 }
444
445 let (payload, vec_ops, rest) = match &chain[0] {
447 Node::Source {
448 payload, vec_ops, ..
449 } => (Arc::clone(payload), Arc::clone(vec_ops), &chain[1..]),
450 _ => bail!("execution plan must start with a Source node"),
451 };
452
453 let total_len = vec_ops.len(payload.as_ref()).unwrap_or(0);
454 let parts = partitions.max(1).min(total_len.max(1));
455 let mut curr = vec_ops.split(payload.as_ref(), parts).unwrap_or_else(|| {
456 vec![
457 vec_ops
458 .clone_any(payload.as_ref())
459 .expect("cloneable source"),
460 ]
461 });
462
463 let mut i = 0usize;
464 while i < rest.len() {
465 match &rest[i] {
466 Node::Stateless(_) => {
467 let mut ops = Vec::new();
468 while i < rest.len() {
469 if let Node::Stateless(more) = &rest[i] {
470 ops.extend(more.iter().cloned());
471 i += 1;
472 } else {
473 break;
474 }
475 }
476 curr = curr
477 .into_par_iter()
478 .map(|p| ops.iter().fold(p, |acc, op| op.apply(acc)))
479 .collect();
480 }
481 Node::GroupByKey { local, merge } => {
482 let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
483 curr = vec![merge(mids)];
484 i += 1;
485 }
486 Node::CombineValues {
487 local_pairs,
488 local_groups,
489 merge,
490 } => {
491 let local = local_groups
492 .as_ref()
493 .map_or_else(|| local_pairs.clone(), |lg| lg.clone());
494 let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
495 curr = vec![merge(mids)];
496 i += 1;
497 }
498 Node::Flatten {
499 chains,
500 coalesce,
501 merge,
502 } => {
503 let coalesced_inputs: Vec<Partition> = chains
505 .iter()
506 .map(|chain| {
507 let parts = run_subplan_par(chain, partitions)?;
508 Ok(if parts.len() == 1 {
509 parts.into_iter().next().unwrap()
510 } else {
511 coalesce(parts)
512 })
513 })
514 .collect::<Result<Vec<_>>>()?;
515 curr = vec![merge(coalesced_inputs)];
516 i += 1;
517 }
518 Node::CoGroup {
519 left_chain,
520 right_chain,
521 coalesce_left,
522 coalesce_right,
523 exec,
524 } => {
525 let left_parts = run_subplan_par(&(**left_chain).clone(), partitions)?;
527 let right_parts = run_subplan_par(&(**right_chain).clone(), partitions)?;
528
529 let left_single = if left_parts.len() == 1 {
530 left_parts.into_iter().next().unwrap()
531 } else {
532 coalesce_left(left_parts)
533 };
534 let right_single = if right_parts.len() == 1 {
535 right_parts.into_iter().next().unwrap()
536 } else {
537 coalesce_right(right_parts)
538 };
539
540 curr = vec![exec(left_single, right_single)];
541 i += 1;
542 }
543 Node::Source { .. } | Node::Materialized(_) => {
544 bail!("unexpected additional source/materialized")
545 }
546 Node::CombineGlobal {
547 local,
548 merge,
549 finish,
550 fanout,
551 } => {
552 let mut accs: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
553
554 let f = fanout.unwrap_or(usize::MAX).max(1);
555 while accs.len() > 1 {
556 if f == usize::MAX {
557 accs = vec![merge(accs)];
558 break;
559 }
560 let mut next: Vec<Partition> = Vec::with_capacity(accs.len().div_ceil(f));
561 let mut it = accs.into_iter();
562 loop {
563 let mut group: Vec<Partition> = Vec::with_capacity(f);
564 for _ in 0..f {
565 if let Some(p) = it.next() {
566 group.push(p);
567 } else {
568 break;
569 }
570 }
571 if group.is_empty() {
572 break;
573 }
574 next.push(merge(group));
575 }
576 accs = next;
577 }
578
579 let acc = accs.pop().unwrap_or_else(|| merge(Vec::new()));
580 if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
581 eprintln!("DEBUG: KMV heap len = {}", h.len()); }
583 curr = vec![finish(acc)];
584 i += 1;
585 }
586 }
587 }
588
589 if curr.len() == 1 {
591 let one = curr.into_iter().next().unwrap();
592 let v = *one
593 .downcast::<Vec<T>>()
594 .map_err(|_| anyhow!("terminal type mismatch"))?;
595 Ok(v)
596 } else {
597 let mut out = Vec::<T>::new();
598 for part in curr {
599 let v = *part
600 .downcast::<Vec<T>>()
601 .map_err(|_| anyhow!("terminal type mismatch"))?;
602 out.extend(v);
603 }
604 Ok(out)
605 }
606}
607
/// Sequential execution with per-node checkpointing.
///
/// Behaves like `exec_seq`, but after each node that the manager deems
/// checkpoint-worthy it persists a `CheckpointState` describing progress.
/// On entry, if `auto_recover` is set and a checkpoint exists for this
/// pipeline id, the checkpoint is loaded and logged.
///
/// NOTE(review): recovery is currently log-only — execution always restarts
/// from the first node regardless of the recovered state; confirm this is
/// intended. Also, the pipeline id is derived solely from `chain.len()`, so
/// distinct pipelines of equal length share an id — verify against the
/// checkpoint-manager's expectations.
///
/// # Errors
/// Fails on checkpoint-manager construction/lookup errors, unsupported
/// source types, `Flatten`/`CoGroup` nodes (which need subplan execution),
/// or a terminal partition that does not downcast to `Vec<T>`.
#[cfg(feature = "checkpointing")]
#[allow(clippy::too_many_lines)]
fn exec_seq_with_checkpointing<T: 'static + Send + Sync + Clone>(
    chain: Vec<Node>,
    config: CheckpointConfig,
) -> Result<Vec<T>> {
    use crate::checkpoint::{
        CheckpointManager, CheckpointMetadata, CheckpointState, compute_checksum,
        current_timestamp_ms, generate_pipeline_id,
    };

    let total_nodes = chain.len();
    let mut manager = CheckpointManager::new(config)?;

    let pipeline_id = generate_pipeline_id(&format!("{:?}", chain.len()));

    if manager.config.auto_recover
        && let Some(checkpoint_path) = manager.find_latest_checkpoint(&pipeline_id)?
    {
        eprintln!("[Checkpoint] Found existing checkpoint, attempting recovery...");
        match manager.load_checkpoint(&checkpoint_path) {
            Ok(state) => {
                eprintln!(
                    "[Checkpoint] Recovered from node {} ({:.0}% complete)",
                    state.completed_node_index, state.metadata.progress_percent
                );
            }
            Err(e) => {
                // A corrupt/unreadable checkpoint is not fatal; run from scratch.
                eprintln!("[Checkpoint] Failed to load checkpoint: {e}");
            }
        }
    }

    // Single partition threaded through the chain, as in exec_seq.
    let mut buf: Option<Partition> = None;

    for (idx, node) in chain.into_iter().enumerate() {
        // Barrier nodes are natural checkpoint boundaries: all upstream work
        // is complete once they finish.
        let is_barrier = matches!(
            node,
            Node::GroupByKey { .. }
                | Node::CombineValues { .. }
                | Node::Flatten { .. }
                | Node::CoGroup { .. }
                | Node::CombineGlobal { .. }
        );

        // Captured before `node` is consumed below; recorded in the
        // checkpoint metadata for diagnostics.
        let node_type = match &node {
            Node::Source { .. } => "Source",
            Node::Stateless(_) => "Stateless",
            Node::GroupByKey { .. } => "GroupByKey",
            Node::CombineValues { .. } => "CombineValues",
            Node::Flatten { .. } => "Flatten",
            Node::CoGroup { .. } => "CoGroup",
            Node::Materialized(_) => "Materialized",
            Node::CombineGlobal { .. } => "CombineGlobal",
        };

        buf = Some(match node {
            Node::Source {
                payload, vec_ops, ..
            } => vec_ops
                .clone_any(payload.as_ref())
                .ok_or_else(|| anyhow!("unsupported source vec type"))?,
            Node::Stateless(ops) => ops
                .into_iter()
                .fold(buf.take().unwrap(), |acc, op| op.apply(acc)),
            Node::GroupByKey { local, merge } => {
                let mid = local(buf.take().unwrap());
                merge(vec![mid])
            }
            Node::CombineValues {
                local_pairs,
                local_groups,
                merge,
            } => {
                // Prefer the grouped-input combiner when one is provided.
                let local = local_groups.map_or(local_pairs, |lg| lg);
                let mid = local(buf.take().unwrap());
                merge(vec![mid])
            }
            // A materialized terminal must already hold `Vec<T>`.
            Node::Materialized(p) => Box::new(
                p.downcast_ref::<Vec<T>>()
                    .cloned()
                    .ok_or_else(|| anyhow!("terminal type mismatch"))?,
            ) as Partition,
            Node::Flatten { .. } => bail!("Flatten requires subplan execution"),
            Node::CoGroup { .. } => bail!("CoGroup requires subplan execution"),
            Node::CombineGlobal {
                local,
                merge,
                finish,
                ..
            } => {
                let mid_acc = local(buf.take().unwrap());
                let acc = merge(vec![mid_acc]);
                finish(acc)
            }
        });

        if manager.should_checkpoint(idx, is_barrier, total_nodes) {
            let timestamp = current_timestamp_ms();
            #[allow(
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss,
                clippy::cast_precision_loss
            )]
            let progress_percent = ((idx as f64 / total_nodes as f64) * 100.0) as u8;
            // Checksum covers id, node index, timestamp and partition count
            // (always 1 in sequential mode).
            let metadata_str = format!("{pipeline_id}:{idx}:{timestamp}:1");
            let checksum = compute_checksum(metadata_str.as_bytes());

            let state = CheckpointState {
                pipeline_id: pipeline_id.clone(),
                completed_node_index: idx,
                timestamp,
                partition_count: 1,
                checksum,
                exec_mode: "sequential".to_string(),
                metadata: CheckpointMetadata {
                    total_nodes,
                    last_node_type: node_type.to_string(),
                    progress_percent,
                },
            };

            match manager.save_checkpoint(&state) {
                Ok(path) => {
                    eprintln!(
                        "[Checkpoint] Saved checkpoint at node {idx} ({:.0}% complete) to {:?}",
                        progress_percent,
                        path.display()
                    );
                }
                Err(e) => {
                    // Checkpoint failures never abort the pipeline itself.
                    eprintln!("[Checkpoint] Warning: Failed to save checkpoint: {e}");
                }
            }
        }
    }

    let out = buf.unwrap();
    let v = *out
        .downcast::<Vec<T>>()
        .map_err(|_| anyhow!("terminal type mismatch"))?;

    // Success: stale checkpoints are best-effort cleaned up.
    manager.clear_checkpoints(&pipeline_id).ok();
    eprintln!("[Checkpoint] Pipeline completed successfully, checkpoints cleared");

    Ok(v)
}
769
/// Parallel execution wrapped with coarse-grained checkpointing.
///
/// Unlike the sequential variant, no per-node checkpoints are taken: the
/// whole plan is delegated to [`exec_par`]. On success all checkpoints for
/// this pipeline id are cleared; on failure a single marker checkpoint
/// (node index 0, node type "Failed", 0% progress) is saved.
///
/// NOTE(review): the pipeline id is derived only from chain length and
/// partition count, so differently-shaped pipelines of equal length/parts
/// collide — confirm this is acceptable for recovery.
#[cfg(feature = "checkpointing")]
fn exec_par_with_checkpointing<T: 'static + Send + Sync + Clone>(
    chain: &[Node],
    partitions: usize,
    config: CheckpointConfig,
) -> Result<Vec<T>> {
    use crate::checkpoint::{
        CheckpointManager, CheckpointMetadata, CheckpointState, compute_checksum,
        current_timestamp_ms, generate_pipeline_id,
    };

    let total_nodes = chain.len();
    let mut manager = CheckpointManager::new(config)?;

    let pipeline_id = generate_pipeline_id(&format!("{:?}:{}", chain.len(), partitions));

    if manager.config.auto_recover
        && let Some(checkpoint_path) = manager.find_latest_checkpoint(&pipeline_id)?
    {
        eprintln!("[Checkpoint] Found existing checkpoint, attempting recovery...");
        match manager.load_checkpoint(&checkpoint_path) {
            Ok(state) => {
                // Recovery is log-only: execution below always restarts from
                // the beginning of the chain.
                eprintln!(
                    "[Checkpoint] Recovered from node {} ({:.0}% complete)",
                    state.completed_node_index, state.metadata.progress_percent
                );
            }
            Err(e) => {
                eprintln!("[Checkpoint] Failed to load checkpoint: {e}");
            }
        }
    }

    let result = exec_par::<T>(chain, partitions);

    if result.is_ok() {
        // Success: stale checkpoints are best-effort cleaned up.
        manager.clear_checkpoints(&pipeline_id).ok();
        eprintln!("[Checkpoint] Pipeline completed successfully, checkpoints cleared");
    } else {
        // Record a failure marker so a subsequent auto_recover run can see
        // that this pipeline did not complete.
        let timestamp = current_timestamp_ms();
        let metadata_str = format!("{pipeline_id}:0:{timestamp}:{partitions}");
        let checksum = compute_checksum(metadata_str.as_bytes());
        let state = CheckpointState {
            pipeline_id,
            completed_node_index: 0,
            timestamp,
            partition_count: partitions,
            checksum,
            exec_mode: format!("parallel:{partitions}"),
            metadata: CheckpointMetadata {
                total_nodes,
                last_node_type: "Failed".to_string(),
                progress_percent: 0,
            },
        };
        manager.save_checkpoint(&state).ok();
    }

    result
}