Skip to main content

ironbeam/
runner.rs

1//! Execution engine.
2//!
3//! The `Runner` executes an optimized, linearized plan produced by the planner.
4//! It supports both **sequential** and **parallel** execution modes:
5//!
6//! - **Sequential** walks the node chain in a single thread, materializing one
7//!   partition buffer at a time.
8//! - **Parallel** uses `rayon` to evaluate partition-local work in parallel,
9//!   coalescing at barriers such as `GroupByKey`, `CombineValues`, and `CoGroup`.
10//!
11//! Determinism: within a single partition, stateless transforms preserve element
12//! order. Parallel execution may interleave partitions; callers that require a
13//! stable final order can use the `collect_*_sorted` helpers after the collection
14//! is complete.
15
16use crate::NodeId;
17use crate::node::Node;
18use crate::pipeline::Pipeline;
19use crate::planner::build_plan;
20use crate::type_token::Partition;
21use anyhow::{Result, anyhow, bail};
22use ordered_float::NotNan;
23use rayon::ThreadPoolBuilder;
24use rayon::prelude::*;
25use std::collections::BinaryHeap;
26use std::sync::Arc;
27
28#[cfg(feature = "checkpointing")]
29use crate::checkpoint::CheckpointConfig;
30
31/// Execution mode for a plan.
32///
33/// - `Sequential` runs in a single thread.
34/// - `Parallel` runs with optional thread count and partition count hints.
35///   If `threads` is `Some(n)`, a global rayon thread pool with `n` threads
36///   is installed for this process (first one wins; later calls are no-ops).
37///   If `partitions` is `None`, the planner's suggestion (if any) is used,
38///   otherwise `Runner::default_partitions`.
39#[derive(Clone, Copy, Debug)]
40pub enum ExecMode {
41    /// Single-threaded execution.
42    Sequential,
43    /// Parallel execution using rayon.
44    Parallel {
45        /// Optional rayon worker thread count.
46        threads: Option<usize>,
47        /// Optional number of source partitions.
48        partitions: Option<usize>,
49    },
50}
51
52/// Executes a pipeline produced by the builder API.
53///
54/// Construct a `Runner` and call [`Runner::run_collect`] with a pipeline and
55/// terminal node id. See `helpers` for higher-level `collect_*` convenience
56/// methods that build a `Runner` for you.
57pub struct Runner {
58    /// Selected execution mode.
59    pub mode: ExecMode,
60    /// Default partition count when neither the caller nor the planner suggests one.
61    pub default_partitions: usize,
62    /// Optional checkpoint configuration for fault tolerance.
63    #[cfg(feature = "checkpointing")]
64    pub checkpoint_config: Option<CheckpointConfig>,
65}
66
67impl Default for Runner {
68    fn default() -> Self {
69        Self {
70            mode: ExecMode::Parallel {
71                threads: None,
72                partitions: None,
73            },
74            // Heuristic default: 2× hardware threads (min 2)
75            default_partitions: 2 * num_cpus::get().max(2),
76            #[cfg(feature = "checkpointing")]
77            checkpoint_config: None,
78        }
79    }
80}
81
82impl Runner {
83    /// Execute the pipeline ending at `terminal`, collecting the terminal
84    /// vector as `Vec<T>`.
85    ///
86    /// This function:
87    /// 1. Builds an optimized plan with the planner.
88    /// 2. Chooses sequential or parallel engine based on `self.mode`.
89    /// 3. Honors planner's suggested partitioning unless overridden.
90    ///
91    /// # Errors
92    /// An error is returned if the plan is malformed (e.g., a missing source),
93    /// if a node encounters an unexpected input type, or if the terminal
94    /// materialized type does not match `T`.
95    ///
96    /// # Panics
97    ///
98    /// If the pipeline is in an inconsistent state, such as during concurrent modifications.
99    pub fn run_collect<T: 'static + Send + Sync + Clone>(
100        &self,
101        p: &Pipeline,
102        terminal: NodeId,
103    ) -> Result<Vec<T>> {
104        // Record start time in metrics
105        #[cfg(feature = "metrics")]
106        p.record_metrics_start();
107
108        // Get the optimized plan
109        let plan = build_plan(p, terminal)?;
110        let chain = plan.chain;
111        let suggested_parts = plan.suggested_partitions;
112
113        #[cfg(feature = "checkpointing")]
114        let checkpoint_enabled = self.checkpoint_config.as_ref().is_some_and(|c| c.enabled);
115
116        #[cfg(feature = "checkpointing")]
117        let result = if checkpoint_enabled {
118            // Run with checkpointing
119            let config = self.checkpoint_config.as_ref().unwrap().clone();
120            match self.mode {
121                ExecMode::Sequential => exec_seq_with_checkpointing::<T>(chain, config),
122                ExecMode::Parallel {
123                    threads,
124                    partitions,
125                } => {
126                    if let Some(t) = threads {
127                        ThreadPoolBuilder::new().num_threads(t).build_global().ok();
128                    }
129                    let parts = partitions
130                        .or(suggested_parts)
131                        .unwrap_or(self.default_partitions);
132                    exec_par_with_checkpointing::<T>(&chain, parts, config)
133                }
134            }
135        } else {
136            // Run without checkpointing
137            match self.mode {
138                ExecMode::Sequential => exec_seq::<T>(chain),
139                ExecMode::Parallel {
140                    threads,
141                    partitions,
142                } => {
143                    if let Some(t) = threads {
144                        ThreadPoolBuilder::new().num_threads(t).build_global().ok();
145                    }
146                    let parts = partitions
147                        .or(suggested_parts)
148                        .unwrap_or(self.default_partitions);
149                    exec_par::<T>(&chain, parts)
150                }
151            }
152        };
153
154        #[cfg(not(feature = "checkpointing"))]
155        let result = match self.mode {
156            ExecMode::Sequential => exec_seq::<T>(chain),
157            ExecMode::Parallel {
158                threads,
159                partitions,
160            } => {
161                if let Some(t) = threads {
162                    // Best-effort: first builder to install wins globally.
163                    ThreadPoolBuilder::new().num_threads(t).build_global().ok();
164                }
165                let parts = partitions
166                    .or(suggested_parts)
167                    .unwrap_or(self.default_partitions);
168                exec_par::<T>(chain, parts)
169            }
170        };
171
172        // Record end time in metrics
173        #[cfg(feature = "metrics")]
174        p.record_metrics_end();
175
176        result
177    }
178}
179
180/// Execute a fully linearized chain **sequentially**, collecting `Vec<T>`.
181///
182/// Internal helper used by [`Runner::run_collect`]. Walks the chain left->right,
183/// maintaining a single opaque `Partition` buffer.
184#[allow(clippy::too_many_lines)]
185fn exec_seq<T: 'static + Send + Sync + Clone>(chain: Vec<Node>) -> Result<Vec<T>> {
186    let mut buf: Option<Partition> = None;
187
188    let run_subplan_seq = |chain: Vec<Node>| -> Result<Vec<Partition>> {
189        let mut curr: Option<Partition> = None;
190        for node in chain {
191            curr = Some(match node {
192                Node::Source {
193                    payload, vec_ops, ..
194                } => vec_ops
195                    .clone_any(payload.as_ref())
196                    .ok_or_else(|| anyhow!("unsupported source vec type"))?,
197                Node::Stateless(ops) => ops
198                    .into_iter()
199                    .fold(curr.take().unwrap(), |acc, op| op.apply(acc)),
200                Node::GroupByKey { local, merge } => {
201                    let mid = local(curr.take().unwrap());
202                    merge(vec![mid])
203                }
204                Node::CombineValues {
205                    local_pairs,
206                    local_groups,
207                    merge,
208                } => {
209                    // choose which local to run based on the presence of local_groups
210                    let local = local_groups.map_or(local_pairs, |lg| lg);
211                    let mid = local(curr.take().unwrap());
212                    merge(vec![mid])
213                }
214                Node::Materialized(p) => Box::new(p) as Partition,
215                Node::Flatten { .. } => bail!("nested Flatten not supported in subplan"),
216                Node::CoGroup { .. } => bail!("nested CoGroup not supported in subplan"),
217                Node::CombineGlobal {
218                    local,
219                    merge,
220                    finish,
221                    ..
222                } => {
223                    let mid = local(curr.take().unwrap());
224                    let acc = merge(vec![mid]);
225                    if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
226                        eprintln!("DEBUG: KMV heap len = {}", h.len()); // should be <= k
227                    }
228                    finish(acc)
229                }
230            });
231        }
232        Ok(vec![curr.unwrap()])
233    };
234
235    for node in chain {
236        buf = Some(match node {
237            Node::Flatten {
238                chains,
239                coalesce,
240                merge,
241            } => {
242                // Execute each subplan and coalesce its partitions
243                let mut coalesced_inputs: Vec<Partition> = Vec::new();
244                for chain in chains.iter() {
245                    let mut parts = run_subplan_seq(chain.clone())?;
246                    let single: Partition = if parts.len() == 1 {
247                        parts.pop().unwrap()
248                    } else {
249                        coalesce(parts)
250                    };
251                    coalesced_inputs.push(single);
252                }
253                // Merge all inputs into one
254                merge(coalesced_inputs)
255            }
256            Node::CoGroup {
257                left_chain,
258                right_chain,
259                coalesce_left,
260                coalesce_right,
261                exec,
262            } => {
263                // Execute left/right subplans and coalesce if they produced multiple partitions.
264                let mut left_parts = run_subplan_seq((*left_chain).clone())?;
265                let mut right_parts = run_subplan_seq((*right_chain).clone())?;
266
267                let left_single: Partition = if left_parts.len() == 1 {
268                    left_parts.pop().unwrap()
269                } else {
270                    coalesce_left(left_parts)
271                };
272                let right_single: Partition = if right_parts.len() == 1 {
273                    right_parts.pop().unwrap()
274                } else {
275                    coalesce_right(right_parts)
276                };
277
278                exec(left_single, right_single)
279            }
280            Node::Source {
281                payload, vec_ops, ..
282            } => vec_ops
283                .clone_any(payload.as_ref())
284                .ok_or_else(|| anyhow!("unsupported source vec type"))?,
285            Node::Stateless(ops) => ops
286                .into_iter()
287                .fold(buf.take().unwrap(), |acc, op| op.apply(acc)),
288            Node::GroupByKey { local, merge } => {
289                let mid = local(buf.take().unwrap());
290                merge(vec![mid])
291            }
292            Node::CombineValues {
293                local_pairs,
294                local_groups,
295                merge,
296            } => {
297                let local = local_groups.map_or(local_pairs, |lg| lg);
298                let mid = local(buf.take().unwrap());
299                merge(vec![mid])
300            }
301            // Terminal: type-check and materialize as Vec<T>
302            Node::Materialized(p) => Box::new(
303                p.downcast_ref::<Vec<T>>()
304                    .cloned()
305                    .ok_or_else(|| anyhow!("terminal type mismatch"))?,
306            ) as Partition,
307            Node::CombineGlobal {
308                local,
309                merge,
310                finish,
311                ..
312            } => {
313                let mid_acc = local(buf.take().unwrap());
314                let acc = merge(vec![mid_acc]);
315                if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
316                    eprintln!("DEBUG: KMV heap len = {}", h.len()); // should be <= k
317                }
318                finish(acc)
319            }
320        });
321    }
322
323    let out = buf.unwrap();
324    let v = *out
325        .downcast::<Vec<T>>()
326        .map_err(|_| anyhow!("terminal type mismatch"))?;
327    Ok(v)
328}
329
330/// Execute a fully linearized chain **in parallel**, collecting `Vec<T>`.
331///
332/// Internal helper used by [`Runner::run_collect`]. Partitions the head source
333/// and applies stateless runs with rayon. Barriers (`GroupByKey`, `CombineValues`,
334/// `CoGroup`) perform a parallel local phase followed by a global merge.
335#[allow(clippy::too_many_lines)]
336fn exec_par<T: 'static + Send + Sync + Clone>(chain: &[Node], partitions: usize) -> Result<Vec<T>> {
337    /// Run a nested subplan (used by `CoGroup`) in parallel, returning a vector
338    /// of partitions. The subplan must start with a `Source`. Nested `CoGroup`
339    /// inside a subplan is not supported.
340    fn run_subplan_par(chain: &[Node], partitions: usize) -> Result<Vec<Partition>> {
341        // must start with a source
342        let (payload, vec_ops, rest) = match &chain[0] {
343            Node::Source {
344                payload, vec_ops, ..
345            } => (payload.clone(), vec_ops.clone(), &chain[1..]),
346            _ => bail!("subplan must start with a Source"),
347        };
348        let total_len = vec_ops.len(payload.as_ref()).unwrap_or(0);
349        let parts = partitions.max(1).min(total_len.max(1));
350        let mut curr = vec_ops.split(payload.as_ref(), parts).unwrap_or_else(|| {
351            vec![
352                vec_ops
353                    .clone_any(payload.as_ref())
354                    .expect("cloneable source"),
355            ]
356        });
357
358        let mut i = 0usize;
359        while i < rest.len() {
360            match &rest[i] {
361                Node::Stateless(_) => {
362                    let mut ops = Vec::new();
363                    while i < rest.len() {
364                        if let Node::Stateless(more) = &rest[i] {
365                            ops.extend(more.iter().cloned());
366                            i += 1;
367                        } else {
368                            break;
369                        }
370                    }
371                    curr = curr
372                        .into_par_iter()
373                        .map(|p| ops.iter().fold(p, |acc, op| op.apply(acc)))
374                        .collect();
375                }
376                Node::GroupByKey { local, merge } => {
377                    let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
378                    curr = vec![merge(mids)];
379                    i += 1;
380                }
381                Node::CombineValues {
382                    local_pairs,
383                    local_groups,
384                    merge,
385                } => {
386                    let local = local_groups
387                        .as_ref()
388                        .map_or_else(|| local_pairs.clone(), |lg| lg.clone());
389                    let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
390                    curr = vec![merge(mids)];
391                    i += 1;
392                }
393                Node::Source { .. } | Node::Materialized(_) => {
394                    bail!("unexpected source/materialized in subplan")
395                }
396                Node::Flatten { .. } => bail!("nested Flatten not supported in subplan"),
397                Node::CoGroup { .. } => bail!("nested CoGroup not supported in subplan"),
398                Node::CombineGlobal {
399                    local,
400                    merge,
401                    finish,
402                    fanout,
403                } => {
404                    // local on each partition -> Vec<A> (type-erased)
405                    let mut accs: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
406
407                    // multi-round merge with optional fanout, no cloning
408                    let f = fanout.unwrap_or(usize::MAX).max(1);
409                    while accs.len() > 1 {
410                        if f == usize::MAX {
411                            accs = vec![merge(accs)];
412                            break;
413                        }
414                        let mut next: Vec<Partition> = Vec::with_capacity(accs.len().div_ceil(f));
415                        let mut it = accs.into_iter(); // take ownership to avoid clones
416                        loop {
417                            let mut group: Vec<Partition> = Vec::with_capacity(f);
418                            for _ in 0..f {
419                                if let Some(p) = it.next() {
420                                    group.push(p);
421                                } else {
422                                    break;
423                                }
424                            }
425                            if group.is_empty() {
426                                break;
427                            }
428                            next.push(merge(group));
429                        }
430                        accs = next;
431                    }
432
433                    let acc = accs.pop().unwrap_or_else(|| merge(Vec::new()));
434                    if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
435                        eprintln!("DEBUG: KMV heap len = {}", h.len()); // should be <= k
436                    }
437                    curr = vec![finish(acc)];
438                    i += 1;
439                }
440            }
441        }
442        Ok(curr)
443    }
444
445    // Original head Source
446    let (payload, vec_ops, rest) = match &chain[0] {
447        Node::Source {
448            payload, vec_ops, ..
449        } => (Arc::clone(payload), Arc::clone(vec_ops), &chain[1..]),
450        _ => bail!("execution plan must start with a Source node"),
451    };
452
453    let total_len = vec_ops.len(payload.as_ref()).unwrap_or(0);
454    let parts = partitions.max(1).min(total_len.max(1));
455    let mut curr = vec_ops.split(payload.as_ref(), parts).unwrap_or_else(|| {
456        vec![
457            vec_ops
458                .clone_any(payload.as_ref())
459                .expect("cloneable source"),
460        ]
461    });
462
463    let mut i = 0usize;
464    while i < rest.len() {
465        match &rest[i] {
466            Node::Stateless(_) => {
467                let mut ops = Vec::new();
468                while i < rest.len() {
469                    if let Node::Stateless(more) = &rest[i] {
470                        ops.extend(more.iter().cloned());
471                        i += 1;
472                    } else {
473                        break;
474                    }
475                }
476                curr = curr
477                    .into_par_iter()
478                    .map(|p| ops.iter().fold(p, |acc, op| op.apply(acc)))
479                    .collect();
480            }
481            Node::GroupByKey { local, merge } => {
482                let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
483                curr = vec![merge(mids)];
484                i += 1;
485            }
486            Node::CombineValues {
487                local_pairs,
488                local_groups,
489                merge,
490            } => {
491                let local = local_groups
492                    .as_ref()
493                    .map_or_else(|| local_pairs.clone(), |lg| lg.clone());
494                let mids: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
495                curr = vec![merge(mids)];
496                i += 1;
497            }
498            Node::Flatten {
499                chains,
500                coalesce,
501                merge,
502            } => {
503                // Execute each subplan in parallel and coalesce
504                let coalesced_inputs: Vec<Partition> = chains
505                    .iter()
506                    .map(|chain| {
507                        let parts = run_subplan_par(chain, partitions)?;
508                        Ok(if parts.len() == 1 {
509                            parts.into_iter().next().unwrap()
510                        } else {
511                            coalesce(parts)
512                        })
513                    })
514                    .collect::<Result<Vec<_>>>()?;
515                curr = vec![merge(coalesced_inputs)];
516                i += 1;
517            }
518            Node::CoGroup {
519                left_chain,
520                right_chain,
521                coalesce_left,
522                coalesce_right,
523                exec,
524            } => {
525                // Execute left/right subplans in parallel; coalesce when necessary.
526                let left_parts = run_subplan_par(&(**left_chain).clone(), partitions)?;
527                let right_parts = run_subplan_par(&(**right_chain).clone(), partitions)?;
528
529                let left_single = if left_parts.len() == 1 {
530                    left_parts.into_iter().next().unwrap()
531                } else {
532                    coalesce_left(left_parts)
533                };
534                let right_single = if right_parts.len() == 1 {
535                    right_parts.into_iter().next().unwrap()
536                } else {
537                    coalesce_right(right_parts)
538                };
539
540                curr = vec![exec(left_single, right_single)];
541                i += 1;
542            }
543            Node::Source { .. } | Node::Materialized(_) => {
544                bail!("unexpected additional source/materialized")
545            }
546            Node::CombineGlobal {
547                local,
548                merge,
549                finish,
550                fanout,
551            } => {
552                let mut accs: Vec<Partition> = curr.into_par_iter().map(|p| local(p)).collect();
553
554                let f = fanout.unwrap_or(usize::MAX).max(1);
555                while accs.len() > 1 {
556                    if f == usize::MAX {
557                        accs = vec![merge(accs)];
558                        break;
559                    }
560                    let mut next: Vec<Partition> = Vec::with_capacity(accs.len().div_ceil(f));
561                    let mut it = accs.into_iter();
562                    loop {
563                        let mut group: Vec<Partition> = Vec::with_capacity(f);
564                        for _ in 0..f {
565                            if let Some(p) = it.next() {
566                                group.push(p);
567                            } else {
568                                break;
569                            }
570                        }
571                        if group.is_empty() {
572                            break;
573                        }
574                        next.push(merge(group));
575                    }
576                    accs = next;
577                }
578
579                let acc = accs.pop().unwrap_or_else(|| merge(Vec::new()));
580                if let Some(h) = acc.downcast_ref::<BinaryHeap<NotNan<f64>>>() {
581                    eprintln!("DEBUG: KMV heap len = {}", h.len()); // should be <= k
582                }
583                curr = vec![finish(acc)];
584                i += 1;
585            }
586        }
587    }
588
589    // Terminal collection
590    if curr.len() == 1 {
591        let one = curr.into_iter().next().unwrap();
592        let v = *one
593            .downcast::<Vec<T>>()
594            .map_err(|_| anyhow!("terminal type mismatch"))?;
595        Ok(v)
596    } else {
597        let mut out = Vec::<T>::new();
598        for part in curr {
599            let v = *part
600                .downcast::<Vec<T>>()
601                .map_err(|_| anyhow!("terminal type mismatch"))?;
602            out.extend(v);
603        }
604        Ok(out)
605    }
606}
607
608/// Execute a fully linearized chain **sequentially** with checkpointing support.
609///
610/// This version saves checkpoint state at configurable intervals and can resume
611/// from the last checkpoint on failure. Note: Due to type-erasure, we cannot
612/// serialize intermediate partition state, so checkpoints track progress only.
613/// On recovery, the pipeline re-executes from the start but logs progress.
614#[cfg(feature = "checkpointing")]
615#[allow(clippy::too_many_lines)]
616fn exec_seq_with_checkpointing<T: 'static + Send + Sync + Clone>(
617    chain: Vec<Node>,
618    config: CheckpointConfig,
619) -> Result<Vec<T>> {
620    use crate::checkpoint::{
621        CheckpointManager, CheckpointMetadata, CheckpointState, compute_checksum,
622        current_timestamp_ms, generate_pipeline_id,
623    };
624
625    let total_nodes = chain.len();
626    let mut manager = CheckpointManager::new(config)?;
627
628    // Generate pipeline ID from chain structure
629    let pipeline_id = generate_pipeline_id(&format!("{:?}", chain.len()));
630
631    // Check for existing checkpoint if auto-recovery is enabled
632    if manager.config.auto_recover
633        && let Some(checkpoint_path) = manager.find_latest_checkpoint(&pipeline_id)?
634    {
635        eprintln!("[Checkpoint] Found existing checkpoint, attempting recovery...");
636        match manager.load_checkpoint(&checkpoint_path) {
637            Ok(state) => {
638                eprintln!(
639                    "[Checkpoint] Recovered from node {} ({:.0}% complete)",
640                    state.completed_node_index, state.metadata.progress_percent
641                );
642                // Note: We still re-execute from the start due to type-erasure limitations.
643                // But we log that we're resuming from a previous run
644            }
645            Err(e) => {
646                eprintln!("[Checkpoint] Failed to load checkpoint: {e}");
647            }
648        }
649    }
650
651    // Execute the chain with checkpointing
652    let mut buf: Option<Partition> = None;
653
654    for (idx, node) in chain.into_iter().enumerate() {
655        let is_barrier = matches!(
656            node,
657            Node::GroupByKey { .. }
658                | Node::CombineValues { .. }
659                | Node::Flatten { .. }
660                | Node::CoGroup { .. }
661                | Node::CombineGlobal { .. }
662        );
663
664        let node_type = match &node {
665            Node::Source { .. } => "Source",
666            Node::Stateless(_) => "Stateless",
667            Node::GroupByKey { .. } => "GroupByKey",
668            Node::CombineValues { .. } => "CombineValues",
669            Node::Flatten { .. } => "Flatten",
670            Node::CoGroup { .. } => "CoGroup",
671            Node::Materialized(_) => "Materialized",
672            Node::CombineGlobal { .. } => "CombineGlobal",
673        };
674
675        // Execute the node (same logic as exec_seq)
676        buf = Some(match node {
677            Node::Source {
678                payload, vec_ops, ..
679            } => vec_ops
680                .clone_any(payload.as_ref())
681                .ok_or_else(|| anyhow!("unsupported source vec type"))?,
682            Node::Stateless(ops) => ops
683                .into_iter()
684                .fold(buf.take().unwrap(), |acc, op| op.apply(acc)),
685            Node::GroupByKey { local, merge } => {
686                let mid = local(buf.take().unwrap());
687                merge(vec![mid])
688            }
689            Node::CombineValues {
690                local_pairs,
691                local_groups,
692                merge,
693            } => {
694                let local = local_groups.map_or(local_pairs, |lg| lg);
695                let mid = local(buf.take().unwrap());
696                merge(vec![mid])
697            }
698            Node::Materialized(p) => Box::new(
699                p.downcast_ref::<Vec<T>>()
700                    .cloned()
701                    .ok_or_else(|| anyhow!("terminal type mismatch"))?,
702            ) as Partition,
703            Node::Flatten { .. } => bail!("Flatten requires subplan execution"),
704            Node::CoGroup { .. } => bail!("CoGroup requires subplan execution"),
705            Node::CombineGlobal {
706                local,
707                merge,
708                finish,
709                ..
710            } => {
711                let mid_acc = local(buf.take().unwrap());
712                let acc = merge(vec![mid_acc]);
713                finish(acc)
714            }
715        });
716
717        // Check if we should create a checkpoint
718        if manager.should_checkpoint(idx, is_barrier, total_nodes) {
719            let timestamp = current_timestamp_ms();
720            #[allow(
721                clippy::cast_possible_truncation,
722                clippy::cast_sign_loss,
723                clippy::cast_precision_loss
724            )]
725            let progress_percent = ((idx as f64 / total_nodes as f64) * 100.0) as u8;
726            let metadata_str = format!("{pipeline_id}:{idx}:{timestamp}:1");
727            let checksum = compute_checksum(metadata_str.as_bytes());
728
729            let state = CheckpointState {
730                pipeline_id: pipeline_id.clone(),
731                completed_node_index: idx,
732                timestamp,
733                partition_count: 1,
734                checksum,
735                exec_mode: "sequential".to_string(),
736                metadata: CheckpointMetadata {
737                    total_nodes,
738                    last_node_type: node_type.to_string(),
739                    progress_percent,
740                },
741            };
742
743            match manager.save_checkpoint(&state) {
744                Ok(path) => {
745                    eprintln!(
746                        "[Checkpoint] Saved checkpoint at node {idx} ({:.0}% complete) to {:?}",
747                        progress_percent,
748                        path.display()
749                    );
750                }
751                Err(e) => {
752                    eprintln!("[Checkpoint] Warning: Failed to save checkpoint: {e}");
753                }
754            }
755        }
756    }
757
758    let out = buf.unwrap();
759    let v = *out
760        .downcast::<Vec<T>>()
761        .map_err(|_| anyhow!("terminal type mismatch"))?;
762
763    // Clean up checkpoints on successful completion
764    manager.clear_checkpoints(&pipeline_id).ok();
765    eprintln!("[Checkpoint] Pipeline completed successfully, checkpoints cleared");
766
767    Ok(v)
768}
769
770/// Execute a fully linearized chain **in parallel** with checkpointing support.
771#[cfg(feature = "checkpointing")]
772fn exec_par_with_checkpointing<T: 'static + Send + Sync + Clone>(
773    chain: &[Node],
774    partitions: usize,
775    config: CheckpointConfig,
776) -> Result<Vec<T>> {
777    use crate::checkpoint::{
778        CheckpointManager, CheckpointMetadata, CheckpointState, compute_checksum,
779        current_timestamp_ms, generate_pipeline_id,
780    };
781
782    let total_nodes = chain.len();
783    let mut manager = CheckpointManager::new(config)?;
784
785    // Generate pipeline ID
786    let pipeline_id = generate_pipeline_id(&format!("{:?}:{}", chain.len(), partitions));
787
788    // Check for existing checkpoint
789    if manager.config.auto_recover
790        && let Some(checkpoint_path) = manager.find_latest_checkpoint(&pipeline_id)?
791    {
792        eprintln!("[Checkpoint] Found existing checkpoint, attempting recovery...");
793        match manager.load_checkpoint(&checkpoint_path) {
794            Ok(state) => {
795                eprintln!(
796                    "[Checkpoint] Recovered from node {} ({:.0}% complete)",
797                    state.completed_node_index, state.metadata.progress_percent
798                );
799            }
800            Err(e) => {
801                eprintln!("[Checkpoint] Failed to load checkpoint: {e}");
802            }
803        }
804    }
805
806    // Execute with checkpointing (simplified: checkpoint after major barriers only)
807    // Due to parallel execution complexity, we use the standard exec_par and checkpoint
808    // at coarser granularity
809    let result = exec_par::<T>(chain, partitions);
810
811    // On success, clear checkpoints
812    if result.is_ok() {
813        manager.clear_checkpoints(&pipeline_id).ok();
814        eprintln!("[Checkpoint] Pipeline completed successfully, checkpoints cleared");
815    } else {
816        // On failure, save a checkpoint indicating the failure point
817        let timestamp = current_timestamp_ms();
818        let metadata_str = format!("{pipeline_id}:0:{timestamp}:{partitions}");
819        let checksum = compute_checksum(metadata_str.as_bytes());
820        let state = CheckpointState {
821            pipeline_id,
822            completed_node_index: 0,
823            timestamp,
824            partition_count: partitions,
825            checksum,
826            exec_mode: format!("parallel:{partitions}"),
827            metadata: CheckpointMetadata {
828                total_nodes,
829                last_node_type: "Failed".to_string(),
830                progress_percent: 0,
831            },
832        };
833        manager.save_checkpoint(&state).ok();
834    }
835
836    result
837}