pub struct Runner {
pub mode: ExecMode,
pub default_partitions: usize,
pub checkpoint_config: Option<CheckpointConfig>,
}
Expand description
Executes a pipeline produced by the builder API.
Construct a Runner and call Runner::run_collect with a pipeline and
terminal node id. See helpers for higher-level collect_* convenience
methods that build a Runner for you.
Fields§
§mode: ExecMode
Selected execution mode.
default_partitions: usize
Default partition count when neither the caller nor the planner suggests one.
checkpoint_config: Option<CheckpointConfig>
Available on crate feature checkpointing only.
Optional checkpoint configuration for fault tolerance.
Implementations§
Source§impl Runner
impl Runner
Source
pub fn run_collect<T: 'static + Send + Sync + Clone>(
&self,
p: &Pipeline,
terminal: NodeId,
) -> Result<Vec<T>>
pub fn run_collect<T: 'static + Send + Sync + Clone>( &self, p: &Pipeline, terminal: NodeId, ) -> Result<Vec<T>>
Execute the pipeline ending at terminal, collecting the terminal
vector as Vec<T>.
This function:
- Builds an optimized plan with the planner.
- Chooses sequential or parallel engine based on self.mode.
- Honors planner’s suggested partitioning unless overridden.
§Errors
An error is returned if the plan is malformed (e.g., a missing source),
if a node encounters an unexpected input type, or if the terminal
materialized type does not match T.
§Panics
Panics if the pipeline is in an inconsistent state, such as during concurrent modifications.
Examples found in repository?
examples/checkpoint_recovery_demo.rs (line 50)
21fn run_with_time_based_checkpoints() -> Result<()> {
22 println!("=== Time-Based Checkpointing ===\n");
23
24 let p = Pipeline::default();
25 let data = from_vec(&p, (0..50_000).collect::<Vec<i32>>());
26
27 let result_collection = data
28 .map(|x: &i32| x * 2)
29 .filter(|x: &i32| x % 5 == 0)
30 .key_by(|x: &i32| x % 50)
31 .combine_values(Count);
32
33 let checkpoint_config = CheckpointConfig {
34 enabled: true,
35 directory: "./checkpoints_time_based".into(),
36 policy: CheckpointPolicy::TimeInterval(2), // Every 2 seconds
37 auto_recover: true,
38 max_checkpoints: Some(3),
39 };
40
41 println!("Policy: Checkpoint every 2 seconds");
42 println!("Max checkpoints: 3 (oldest will be deleted)\n");
43
44 let runner = Runner {
45 mode: ExecMode::Sequential,
46 checkpoint_config: Some(checkpoint_config),
47 ..Default::default()
48 };
49
50 let result = runner.run_collect::<(i32, u64)>(&p, result_collection.node_id())?;
51
52 println!("\nCompleted! Results: {} groups", result.len());
53 Ok(())
54}
55
56#[cfg(feature = "checkpointing")]
57fn run_with_node_based_checkpoints() -> Result<()> {
58 println!("=== Node-Based Checkpointing ===\n");
59
60 let p = Pipeline::default();
61 let data = from_vec(&p, (0..30_000).collect::<Vec<i32>>());
62
63 let result_collection = data
64 .map(|x: &i32| x + 1)
65 .map(|x: &i32| x * 3)
66 .filter(|x: &i32| *x > 100)
67 .key_by(|x: &i32| x % 20)
68 .map_values(|x: &i32| f64::from(*x))
69 .combine_values(AverageF64);
70
71 let checkpoint_config = CheckpointConfig {
72 enabled: true,
73 directory: "./checkpoints_node_based".into(),
74 policy: CheckpointPolicy::EveryNNodes(2), // Every 2 nodes
75 auto_recover: true,
76 max_checkpoints: Some(5),
77 };
78
79 println!("Policy: Checkpoint every 2 nodes");
80 println!("This creates more frequent checkpoints for fine-grained recovery\n");
81
82 let runner = Runner {
83 mode: ExecMode::Sequential,
84 checkpoint_config: Some(checkpoint_config),
85 ..Default::default()
86 };
87
88 let result = runner.run_collect::<(i32, f64)>(&p, result_collection.node_id())?;
89
90 println!("\nCompleted! Results: {} groups", result.len());
91 println!("Sample averages:");
92 for (k, v) in result.iter().take(5) {
93 println!(" Key {k}: Average = {v:.2}");
94 }
95 Ok(())
96}
97
98#[cfg(feature = "checkpointing")]
99fn run_with_hybrid_checkpoints() -> Result<()> {
100 println!("=== Hybrid Checkpointing ===\n");
101
102 let p = Pipeline::default();
103 let data = from_vec(&p, (0..40_000).collect::<Vec<i32>>());
104
105 let result_collection = data
106 .map(|x: &i32| x % 1000)
107 .key_by(|x: &i32| *x)
108 .map_values(|_: &i32| 1u64)
109 .combine_values(Sum::<u64>::default())
110 .map(|(k, v): &(i32, u64)| (k / 10, *v))
111 .key_by(|(k, _): &(i32, u64)| *k)
112 .map_values(|(_, v): &(i32, u64)| *v)
113 .combine_values(Sum::<u64>::default());
114
115 let checkpoint_config = CheckpointConfig {
116 enabled: true,
117 directory: "./checkpoints_hybrid".into(),
118 policy: CheckpointPolicy::Hybrid {
119 barriers: true,
120 interval_secs: 3,
121 },
122 auto_recover: true,
123 max_checkpoints: Some(10),
124 };
125
126 println!("Policy: Checkpoint after barriers OR every 3 seconds");
127 println!("This provides the most aggressive checkpointing\n");
128
129 let runner = Runner {
130 mode: ExecMode::Sequential,
131 checkpoint_config: Some(checkpoint_config),
132 ..Default::default()
133 };
134
135 let result = runner.run_collect::<(i32, u64)>(&p, result_collection.node_id())?;
136
137 println!("\nCompleted! Results: {} groups", result.len());
138 Ok(())
139}
More examples
examples/checkpointing_demo.rs (line 63)
20fn main() -> Result<()> {
21 println!("=== Ironbeam Checkpointing Demo ===\n");
22
23 // Create a pipeline with a large dataset to simulate long-running jobs
24 let p = Pipeline::default();
25
26 println!("Building pipeline with 100,000 records...");
27 let data = from_vec(&p, (0..100_000).collect::<Vec<i32>>());
28
29 // Perform some transformations that would take time in a real scenario
30 let transformed = data
31 .map(|x: &i32| x * 2)
32 .filter(|x: &i32| x % 3 == 0)
33 .key_by(|x: &i32| x % 100) // Group by modulo 100
34 .map_values(|x: &i32| x / 2)
35 .combine_values(Sum::<i32>::default()); // Sum values per key
36
37 // Configure checkpointing
38 let checkpoint_config = CheckpointConfig {
39 enabled: true,
40 directory: "./Ironbeam_checkpoints".into(),
41 policy: CheckpointPolicy::AfterEveryBarrier,
42 auto_recover: true,
43 max_checkpoints: Some(5),
44 };
45
46 println!("\nCheckpoint Configuration:");
47 println!(" Directory: {:?}", checkpoint_config.directory.display());
48 println!(" Policy: AfterEveryBarrier");
49 println!(" Auto-recovery: enabled");
50 println!(" Max checkpoints: 5");
51
52 // Create a runner with checkpointing
53 let runner = Runner {
54 mode: ExecMode::Sequential,
55 checkpoint_config: Some(checkpoint_config),
56 ..Default::default()
57 };
58
59 println!("\nExecuting pipeline with checkpointing...");
60 println!("(Watch for checkpoint messages during execution)\n");
61
62 // Execute the pipeline
63 let result = runner.run_collect::<(i32, i32)>(&p, transformed.node_id())?;
64
65 println!("\n=== Execution Complete ===");
66 println!("Total results: {}", result.len());
67 println!("Sample results (first 10):");
68 for (k, v) in result.iter().take(10) {
69 println!(" Key {k}: Sum = {v}");
70 }
71
72 println!("\n=== Checkpointing Summary ===");
73 println!("Checkpoints were automatically created after each barrier node.");
74 println!("If this pipeline had failed, it could resume from the last checkpoint.");
75 println!("Since execution completed successfully, all checkpoints were cleaned up.");
76
77 Ok(())
78}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Runner
impl RefUnwindSafe for Runner
impl Send for Runner
impl Sync for Runner
impl Unpin for Runner
impl UnsafeUnpin for Runner
impl UnwindSafe for Runner
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more