checkpointing_demo/
checkpointing_demo.rs1use anyhow::Result;
14use ironbeam::{ExecMode, Pipeline, Runner, Sum, from_vec};
15
16#[cfg(feature = "checkpointing")]
17use ironbeam::checkpoint::{CheckpointConfig, CheckpointPolicy};
18
/// Entry point of the checkpointing demo (compiled only when the
/// `checkpointing` feature is enabled).
///
/// Builds a pipeline over the integers `0..100_000`, attaches a
/// `CheckpointConfig` to a sequential `Runner`, executes the pipeline, and
/// prints the number of collected results followed by the first ten
/// `(key, sum)` pairs.
///
/// # Errors
///
/// Propagates any error returned by `Runner::run_collect`.
#[cfg(feature = "checkpointing")]
fn main() -> Result<()> {
    println!("=== Ironbeam Checkpointing Demo ===\n");

    let pipeline = Pipeline::default();

    println!("Building pipeline with 100,000 records...");
    let records: Vec<i32> = (0..100_000).collect();
    let source = from_vec(&pipeline, records);

    // Each stage is named so the shape of the pipeline is easy to follow:
    // double -> keep multiples of three -> key by (value mod 100) -> halve -> sum per key.
    let doubled = source.map(|value: &i32| *value * 2);
    let multiples_of_three = doubled.filter(|value: &i32| *value % 3 == 0);
    let keyed = multiples_of_three.key_by(|value: &i32| *value % 100);
    let halved = keyed.map_values(|value: &i32| *value / 2);
    let summed = halved.combine_values(Sum::<i32>::default());

    let config = CheckpointConfig {
        enabled: true,
        directory: "./Ironbeam_checkpoints".into(),
        policy: CheckpointPolicy::AfterEveryBarrier,
        auto_recover: true,
        max_checkpoints: Some(5),
    };

    // The directory is printed before `config` is moved into the runner below.
    // NOTE(review): the policy / auto-recovery / max-checkpoints lines are static
    // text, not read from `config`; keep them in sync with the values above.
    println!("\nCheckpoint Configuration:");
    println!(" Directory: {:?}", config.directory.display());
    println!(" Policy: AfterEveryBarrier");
    println!(" Auto-recovery: enabled");
    println!(" Max checkpoints: 5");

    let checkpointed_runner = Runner {
        mode: ExecMode::Sequential,
        checkpoint_config: Some(config),
        ..Default::default()
    };

    println!("\nExecuting pipeline with checkpointing...");
    println!("(Watch for checkpoint messages during execution)\n");

    let output = checkpointed_runner.run_collect::<(i32, i32)>(&pipeline, summed.node_id())?;

    println!("\n=== Execution Complete ===");
    println!("Total results: {}", output.len());
    println!("Sample results (first 10):");
    output
        .iter()
        .take(10)
        .for_each(|(k, v)| println!(" Key {k}: Sum = {v}"));

    // The summary below is fixed explanatory text; nothing in this function
    // inspects the checkpoint directory to confirm it.
    println!("\n=== Checkpointing Summary ===");
    println!("Checkpoints were automatically created after each barrier node.");
    println!("If this pipeline had failed, it could resume from the last checkpoint.");
    println!("Since execution completed successfully, all checkpoints were cleaned up.");

    Ok(())
}
79
/// Fallback entry point compiled when the `checkpointing` feature is disabled.
///
/// Prints a notice that the example needs the feature, followed by the
/// `cargo` invocation that enables it.
#[cfg(not(feature = "checkpointing"))]
fn main() {
    let notice = [
        "This example requires the 'checkpointing' feature.",
        "Run with: cargo run --example checkpointing_demo --features checkpointing",
    ];
    for line in notice {
        println!("{line}");
    }
}