Skip to main content

oris_execution_runtime/
recovery.rs

//! Crash recovery pipeline: lease expiry → checkpoint reload → replay → re-dispatch.
//!
//! When a worker crashes, its lease expires (or is detected as stale). This module
//! defines the pipeline steps and a minimal context type so the kernel can drive
//! recovery: first expire the lease and requeue the attempt (via
//! [`crate::LeaseManager::tick`] / [`crate::RuntimeRepository::expire_leases_and_requeue`]),
//! then reload the checkpoint and replay the affected attempt, and finally allow
//! re-dispatch to a new worker.
9
10use oris_kernel::identity::RunId;
11
/// Stages of the zero-data-loss failure recovery pipeline, in execution order.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecoveryStep {
    /// The lease has expired or the worker is gone; the attempt is requeued.
    LeaseExpired,
    /// The run's checkpoint is being (or is about to be) reloaded.
    CheckpointReload,
    /// Execution is being (or is about to be) replayed from the checkpoint.
    Replay,
    /// The attempt may now be dispatched to a new worker.
    ReadyForDispatch,
}

impl RecoveryStep {
    /// Canonical stage ordering; `pipeline` hands out a `'static` view of it.
    const ORDERED: [RecoveryStep; 4] = [
        Self::LeaseExpired,
        Self::CheckpointReload,
        Self::Replay,
        Self::ReadyForDispatch,
    ];

    /// Returns the recovery stages in the order they must run.
    pub fn pipeline() -> &'static [RecoveryStep] {
        // `&CONST` of a `Copy`, drop-free value is promoted to a `'static` borrow.
        &Self::ORDERED
    }
}
36
37/// Context produced after checkpoint reload for replay. Pass `run_id` to replay logic.
38#[derive(Clone, Debug, Default)]
39pub struct RecoveryContext {
40    pub attempt_id: String,
41    pub run_id: RunId,
42}
43
44impl RecoveryContext {
45    pub fn new(attempt_id: impl Into<String>, run_id: impl Into<RunId>) -> Self {
46        Self {
47            attempt_id: attempt_id.into(),
48            run_id: run_id.into(),
49        }
50    }
51}
52
53/// Crash recovery pipeline: documents and drives the steps from lease expiry to re-dispatch.
54///
55/// Use with [RecoveryStep::pipeline]. After [expire_leases_and_requeue](crate::RuntimeRepository::expire_leases_and_requeue),
56/// the caller loads checkpoint for each requeued attempt, builds a [RecoveryContext] with
57/// attempt_id and run_id, then replays; the attempt becomes dispatchable again.
58#[derive(Clone, Debug)]
59pub struct CrashRecoveryPipeline {
60    pub attempt_id: String,
61    pub run_id: RunId,
62}
63
64impl CrashRecoveryPipeline {
65    pub fn new(attempt_id: impl Into<String>, run_id: impl Into<RunId>) -> Self {
66        Self {
67            attempt_id: attempt_id.into(),
68            run_id: run_id.into(),
69        }
70    }
71
72    /// Returns the ordered recovery steps.
73    pub fn steps() -> &'static [RecoveryStep] {
74        RecoveryStep::pipeline()
75    }
76
77    /// Builds a context for the replay phase (checkpoint reload done; run_id available for replay).
78    pub fn replay_context(&self) -> RecoveryContext {
79        RecoveryContext::new(&self.attempt_id, &self.run_id)
80    }
81}
82
#[cfg(test)]
mod tests {
    use super::*;

    /// The pipeline must list every step exactly once, in execution order.
    #[test]
    fn recovery_step_pipeline_order() {
        assert_eq!(
            RecoveryStep::pipeline(),
            &[
                RecoveryStep::LeaseExpired,
                RecoveryStep::CheckpointReload,
                RecoveryStep::Replay,
                RecoveryStep::ReadyForDispatch,
            ]
        );
    }

    /// `CrashRecoveryPipeline::steps` is a pass-through to `RecoveryStep::pipeline`.
    #[test]
    fn crash_recovery_pipeline_steps_match_recovery_step_pipeline() {
        assert_eq!(CrashRecoveryPipeline::steps(), RecoveryStep::pipeline());
    }

    #[test]
    fn recovery_context_new_sets_ids() {
        let ctx = RecoveryContext::new("attempt-7", "run-7");
        assert_eq!(ctx.attempt_id, "attempt-7");
        assert_eq!(ctx.run_id, "run-7");
    }

    #[test]
    fn crash_recovery_pipeline_replay_context() {
        let p = CrashRecoveryPipeline::new("attempt-1", "run-1");
        let ctx = p.replay_context();
        assert_eq!(ctx.attempt_id, "attempt-1");
        assert_eq!(ctx.run_id, "run-1");
        // Building the replay context must leave the pipeline intact.
        assert_eq!(p.attempt_id, "attempt-1");
        assert_eq!(p.run_id, "run-1");
    }
}