Skip to main content

Runner

Struct Runner 

Source
pub struct Runner {
    pub mode: ExecMode,
    pub default_partitions: usize,
    pub checkpoint_config: Option<CheckpointConfig>,
}
Expand description

Executes a pipeline produced by the builder API.

Construct a Runner and call Runner::run_collect with a pipeline and terminal node id. See helpers for higher-level collect_* convenience methods that build a Runner for you.

Fields§

§mode: ExecMode

Selected execution mode.

§default_partitions: usize

Default partition count when neither the caller nor the planner suggests one.

§checkpoint_config: Option<CheckpointConfig>
Available on crate feature checkpointing only.

Optional checkpoint configuration for fault tolerance.

Implementations§

Source§

impl Runner

Source

pub fn run_collect<T: 'static + Send + Sync + Clone>( &self, p: &Pipeline, terminal: NodeId, ) -> Result<Vec<T>>

Execute the pipeline ending at terminal, collecting the terminal vector as Vec<T>.

This function:

  1. Builds an optimized plan with the planner.
  2. Chooses sequential or parallel engine based on self.mode.
  3. Honors planner’s suggested partitioning unless overridden.
§Errors

An error is returned if the plan is malformed (e.g., a missing source), if a node encounters an unexpected input type, or if the terminal materialized type does not match T.

§Panics

If the pipeline is in an inconsistent state, such as during concurrent modifications.

Examples found in repository?
examples/checkpoint_recovery_demo.rs (line 50)
21fn run_with_time_based_checkpoints() -> Result<()> {
22    println!("=== Time-Based Checkpointing ===\n");
23
24    let p = Pipeline::default();
25    let data = from_vec(&p, (0..50_000).collect::<Vec<i32>>());
26
27    let result_collection = data
28        .map(|x: &i32| x * 2)
29        .filter(|x: &i32| x % 5 == 0)
30        .key_by(|x: &i32| x % 50)
31        .combine_values(Count);
32
33    let checkpoint_config = CheckpointConfig {
34        enabled: true,
35        directory: "./checkpoints_time_based".into(),
36        policy: CheckpointPolicy::TimeInterval(2), // Every 2 seconds
37        auto_recover: true,
38        max_checkpoints: Some(3),
39    };
40
41    println!("Policy: Checkpoint every 2 seconds");
42    println!("Max checkpoints: 3 (oldest will be deleted)\n");
43
44    let runner = Runner {
45        mode: ExecMode::Sequential,
46        checkpoint_config: Some(checkpoint_config),
47        ..Default::default()
48    };
49
50    let result = runner.run_collect::<(i32, u64)>(&p, result_collection.node_id())?;
51
52    println!("\nCompleted! Results: {} groups", result.len());
53    Ok(())
54}
55
56#[cfg(feature = "checkpointing")]
57fn run_with_node_based_checkpoints() -> Result<()> {
58    println!("=== Node-Based Checkpointing ===\n");
59
60    let p = Pipeline::default();
61    let data = from_vec(&p, (0..30_000).collect::<Vec<i32>>());
62
63    let result_collection = data
64        .map(|x: &i32| x + 1)
65        .map(|x: &i32| x * 3)
66        .filter(|x: &i32| *x > 100)
67        .key_by(|x: &i32| x % 20)
68        .map_values(|x: &i32| f64::from(*x))
69        .combine_values(AverageF64);
70
71    let checkpoint_config = CheckpointConfig {
72        enabled: true,
73        directory: "./checkpoints_node_based".into(),
74        policy: CheckpointPolicy::EveryNNodes(2), // Every 2 nodes
75        auto_recover: true,
76        max_checkpoints: Some(5),
77    };
78
79    println!("Policy: Checkpoint every 2 nodes");
80    println!("This creates more frequent checkpoints for fine-grained recovery\n");
81
82    let runner = Runner {
83        mode: ExecMode::Sequential,
84        checkpoint_config: Some(checkpoint_config),
85        ..Default::default()
86    };
87
88    let result = runner.run_collect::<(i32, f64)>(&p, result_collection.node_id())?;
89
90    println!("\nCompleted! Results: {} groups", result.len());
91    println!("Sample averages:");
92    for (k, v) in result.iter().take(5) {
93        println!("  Key {k}: Average = {v:.2}");
94    }
95    Ok(())
96}
97
98#[cfg(feature = "checkpointing")]
99fn run_with_hybrid_checkpoints() -> Result<()> {
100    println!("=== Hybrid Checkpointing ===\n");
101
102    let p = Pipeline::default();
103    let data = from_vec(&p, (0..40_000).collect::<Vec<i32>>());
104
105    let result_collection = data
106        .map(|x: &i32| x % 1000)
107        .key_by(|x: &i32| *x)
108        .map_values(|_: &i32| 1u64)
109        .combine_values(Sum::<u64>::default())
110        .map(|(k, v): &(i32, u64)| (k / 10, *v))
111        .key_by(|(k, _): &(i32, u64)| *k)
112        .map_values(|(_, v): &(i32, u64)| *v)
113        .combine_values(Sum::<u64>::default());
114
115    let checkpoint_config = CheckpointConfig {
116        enabled: true,
117        directory: "./checkpoints_hybrid".into(),
118        policy: CheckpointPolicy::Hybrid {
119            barriers: true,
120            interval_secs: 3,
121        },
122        auto_recover: true,
123        max_checkpoints: Some(10),
124    };
125
126    println!("Policy: Checkpoint after barriers OR every 3 seconds");
127    println!("This provides the most aggressive checkpointing\n");
128
129    let runner = Runner {
130        mode: ExecMode::Sequential,
131        checkpoint_config: Some(checkpoint_config),
132        ..Default::default()
133    };
134
135    let result = runner.run_collect::<(i32, u64)>(&p, result_collection.node_id())?;
136
137    println!("\nCompleted! Results: {} groups", result.len());
138    Ok(())
139}
More examples
Hide additional examples
examples/checkpointing_demo.rs (line 63)
20fn main() -> Result<()> {
21    println!("=== Ironbeam Checkpointing Demo ===\n");
22
23    // Create a pipeline with a large dataset to simulate long-running jobs
24    let p = Pipeline::default();
25
26    println!("Building pipeline with 100,000 records...");
27    let data = from_vec(&p, (0..100_000).collect::<Vec<i32>>());
28
29    // Perform some transformations that would take time in a real scenario
30    let transformed = data
31        .map(|x: &i32| x * 2)
32        .filter(|x: &i32| x % 3 == 0)
33        .key_by(|x: &i32| x % 100) // Group by modulo 100
34        .map_values(|x: &i32| x / 2)
35        .combine_values(Sum::<i32>::default()); // Sum values per key
36
37    // Configure checkpointing
38    let checkpoint_config = CheckpointConfig {
39        enabled: true,
40        directory: "./Ironbeam_checkpoints".into(),
41        policy: CheckpointPolicy::AfterEveryBarrier,
42        auto_recover: true,
43        max_checkpoints: Some(5),
44    };
45
46    println!("\nCheckpoint Configuration:");
47    println!("  Directory: {:?}", checkpoint_config.directory.display());
48    println!("  Policy: AfterEveryBarrier");
49    println!("  Auto-recovery: enabled");
50    println!("  Max checkpoints: 5");
51
52    // Create a runner with checkpointing
53    let runner = Runner {
54        mode: ExecMode::Sequential,
55        checkpoint_config: Some(checkpoint_config),
56        ..Default::default()
57    };
58
59    println!("\nExecuting pipeline with checkpointing...");
60    println!("(Watch for checkpoint messages during execution)\n");
61
62    // Execute the pipeline
63    let result = runner.run_collect::<(i32, i32)>(&p, transformed.node_id())?;
64
65    println!("\n=== Execution Complete ===");
66    println!("Total results: {}", result.len());
67    println!("Sample results (first 10):");
68    for (k, v) in result.iter().take(10) {
69        println!("  Key {k}: Sum = {v}");
70    }
71
72    println!("\n=== Checkpointing Summary ===");
73    println!("Checkpoints were automatically created after each barrier node.");
74    println!("If this pipeline had failed, it could resume from the last checkpoint.");
75    println!("Since execution completed successfully, all checkpoints were cleaned up.");
76
77    Ok(())
78}

Trait Implementations§

Source§

impl Default for Runner

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,