Skip to main content

checkpointing_demo/
checkpointing_demo.rs

//! Demonstration of automatic checkpointing for fault tolerance.
//!
//! This example shows how to use Ironbeam's checkpointing feature to make
//! long-running batch jobs resilient to failures. Checkpoints are automatically
//! created at configurable intervals, and the pipeline can resume from the last
//! checkpoint on restart.
//!
//! Run with:
//! ```bash
//! cargo run --example checkpointing_demo --features checkpointing
//! ```
12
13use anyhow::Result;
14use ironbeam::{ExecMode, Pipeline, Runner, Sum, from_vec};
15
16#[cfg(feature = "checkpointing")]
17use ironbeam::checkpoint::{CheckpointConfig, CheckpointPolicy};
18
/// Runs the checkpointing demo.
///
/// Builds a keyed sum pipeline over synthetic data and executes it
/// sequentially with checkpointing enabled. It then prints the effective
/// checkpoint configuration and a sample of the per-key results.
///
/// # Errors
///
/// Returns any error produced by `Runner::run_collect` while executing the
/// pipeline.
#[cfg(feature = "checkpointing")]
fn main() -> Result<()> {
    // Number of synthetic input records fed into the pipeline.
    const NUM_RECORDS: i32 = 100_000;
    // How many (key, sum) pairs to print once execution finishes.
    const SAMPLE_SIZE: usize = 10;

    println!("=== Ironbeam Checkpointing Demo ===\n");

    // Build a pipeline over a large in-memory dataset to stand in for a
    // long-running batch job.
    let p = Pipeline::default();

    println!("Building pipeline with {NUM_RECORDS} records...");
    let data = from_vec(&p, (0..NUM_RECORDS).collect::<Vec<i32>>());

    // Transformations that would take real time in a production job:
    // double, keep multiples of 3, bucket by `x % 100`, halve, then sum each
    // bucket. For this input size every value stays well within `i32`.
    let transformed = data
        .map(|x: &i32| x * 2)
        .filter(|x: &i32| x % 3 == 0)
        .key_by(|x: &i32| x % 100) // group by modulo 100
        .map_values(|x: &i32| x / 2)
        .combine_values(Sum::<i32>::default()); // sum values per key

    // Checkpoint after every barrier, with auto-recovery on and
    // `max_checkpoints` capped at 5.
    let checkpoint_config = CheckpointConfig {
        enabled: true,
        directory: "./Ironbeam_checkpoints".into(),
        policy: CheckpointPolicy::AfterEveryBarrier,
        auto_recover: true,
        max_checkpoints: Some(5),
    };

    // Report settings from the config itself, so the printout cannot drift
    // from what is actually handed to the runner below.
    println!("\nCheckpoint Configuration:");
    println!("  Directory: {}", checkpoint_config.directory.display());
    // Printed by name; keep in sync with `policy` above.
    println!("  Policy: AfterEveryBarrier");
    println!(
        "  Auto-recovery: {}",
        if checkpoint_config.auto_recover {
            "enabled"
        } else {
            "disabled"
        }
    );
    match checkpoint_config.max_checkpoints {
        Some(limit) => println!("  Max checkpoints: {limit}"),
        None => println!("  Max checkpoints: unlimited"),
    }

    // Sequential runner with checkpointing; all other settings use defaults.
    let runner = Runner {
        mode: ExecMode::Sequential,
        checkpoint_config: Some(checkpoint_config),
        ..Default::default()
    };

    println!("\nExecuting pipeline with checkpointing...");
    println!("(Watch for checkpoint messages during execution)\n");

    let mut result = runner.run_collect::<(i32, i32)>(&p, transformed.node_id())?;
    // Don't rely on the runner's output order for grouped results; sort by key
    // so the printed sample is stable from run to run.
    result.sort_unstable_by_key(|&(key, _)| key);

    println!("\n=== Execution Complete ===");
    println!("Total results: {}", result.len());
    println!("Sample results (first {SAMPLE_SIZE}):");
    for (k, v) in result.iter().take(SAMPLE_SIZE) {
        println!("  Key {k}: Sum = {v}");
    }

    println!("\n=== Checkpointing Summary ===");
    println!("Checkpoints were automatically created after each barrier node.");
    println!("If this pipeline had failed, it could resume from the last checkpoint.");
    println!("Since execution completed successfully, all checkpoints were cleaned up.");

    Ok(())
}
79
/// Fallback entry point used when the `checkpointing` feature is disabled.
///
/// The checkpoint API is not compiled in, so this only prints how to rerun
/// the example with the feature turned on.
#[cfg(not(feature = "checkpointing"))]
fn main() {
    const HINT_LINES: [&str; 2] = [
        "This example requires the 'checkpointing' feature.",
        "Run with: cargo run --example checkpointing_demo --features checkpointing",
    ];
    for line in HINT_LINES {
        println!("{line}");
    }
}