Skip to main content

laminar_core/dag/
recovery.rs

//! Snapshot and recovery management for DAG checkpoints.
//!
//! [`DagCheckpointSnapshot`] captures operator state at a point in time.
//! It stores that state in standard-library types (`HashMap<u32, …>`
//! rather than the internal `FxHashMap<NodeId, …>`) so that it can derive
//! `Serialize`/`Deserialize` for persistence by the caller.
//!
//! [`DagRecoveryManager`] holds snapshots and provides recovery APIs.
8
9use std::collections::HashMap;
10
11use fxhash::FxHashMap;
12use serde::{Deserialize, Serialize};
13
14use crate::operator::OperatorState;
15
16use super::error::DagError;
17use super::topology::NodeId;
18
19/// Serializable form of [`OperatorState`].
20///
21/// Uses standard library types so it can derive `Serialize`/`Deserialize`.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SerializableOperatorState {
24    /// Operator identifier.
25    pub operator_id: String,
26    /// Serialized state data.
27    pub data: Vec<u8>,
28}
29
30impl From<OperatorState> for SerializableOperatorState {
31    fn from(state: OperatorState) -> Self {
32        Self {
33            operator_id: state.operator_id,
34            data: state.data,
35        }
36    }
37}
38
39impl From<SerializableOperatorState> for OperatorState {
40    fn from(state: SerializableOperatorState) -> Self {
41        Self {
42            operator_id: state.operator_id,
43            data: state.data,
44        }
45    }
46}
47
48/// A point-in-time snapshot of the entire DAG's operator state.
49///
50/// Produced by [`DagCheckpointCoordinator::finalize_checkpoint()`](super::checkpoint::DagCheckpointCoordinator::finalize_checkpoint).
51/// The snapshot is serializable — persistence is the caller's responsibility.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct DagCheckpointSnapshot {
54    /// Unique checkpoint identifier.
55    pub checkpoint_id: u64,
56    /// Monotonically increasing epoch.
57    pub epoch: u64,
58    /// Timestamp when the checkpoint was triggered (millis since epoch).
59    pub timestamp: i64,
60    /// Per-node operator state, keyed by `NodeId.0`.
61    pub node_states: HashMap<u32, SerializableOperatorState>,
62    /// Per-source offset tracking (source name → offset).
63    pub source_offsets: HashMap<String, u64>,
64    /// Watermark at checkpoint time.
65    pub watermark: Option<i64>,
66}
67
68impl DagCheckpointSnapshot {
69    /// Creates a snapshot from a map of operator states.
70    ///
71    /// Converts from `FxHashMap<NodeId, OperatorState>` (internal) to
72    /// `HashMap<u32, SerializableOperatorState>` (serializable).
73    pub(crate) fn from_operator_states(
74        checkpoint_id: u64,
75        epoch: u64,
76        timestamp: i64,
77        states: &FxHashMap<NodeId, OperatorState>,
78    ) -> Self {
79        let node_states = states
80            .iter()
81            .map(|(node_id, state)| (node_id.0, SerializableOperatorState::from(state.clone())))
82            .collect();
83
84        Self {
85            checkpoint_id,
86            epoch,
87            timestamp,
88            node_states,
89            source_offsets: HashMap::new(),
90            watermark: None,
91        }
92    }
93
94    /// Converts node states back to `FxHashMap<NodeId, OperatorState>`.
95    #[must_use]
96    pub fn to_operator_states(&self) -> FxHashMap<NodeId, OperatorState> {
97        self.node_states
98            .iter()
99            .map(|(&id, state)| (NodeId(id), OperatorState::from(state.clone())))
100            .collect()
101    }
102}
103
104/// Recovered DAG state from a checkpoint snapshot.
105pub struct RecoveredDagState {
106    /// The snapshot that was used for recovery.
107    pub snapshot: DagCheckpointSnapshot,
108    /// Operator states converted back to internal representation.
109    pub operator_states: FxHashMap<NodeId, OperatorState>,
110    /// Source offsets for resuming consumption.
111    pub source_offsets: HashMap<String, u64>,
112    /// Watermark at the time of the checkpoint.
113    pub watermark: Option<i64>,
114}
115
116impl std::fmt::Debug for RecoveredDagState {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        f.debug_struct("RecoveredDagState")
119            .field("checkpoint_id", &self.snapshot.checkpoint_id)
120            .field("epoch", &self.snapshot.epoch)
121            .field("operator_count", &self.operator_states.len())
122            .field("source_offsets", &self.source_offsets)
123            .field("watermark", &self.watermark)
124            .finish()
125    }
126}
127
128/// Manages checkpoint snapshots and provides recovery.
129///
130/// Snapshots are held in memory. Persistence to disk or object storage
131/// is the caller's responsibility.
132#[derive(Debug)]
133pub struct DagRecoveryManager {
134    /// Stored snapshots, ordered by `checkpoint_id`.
135    snapshots: Vec<DagCheckpointSnapshot>,
136}
137
138impl DagRecoveryManager {
139    /// Creates an empty recovery manager.
140    #[must_use]
141    pub fn new() -> Self {
142        Self {
143            snapshots: Vec::new(),
144        }
145    }
146
147    /// Creates a recovery manager pre-loaded with snapshots.
148    #[must_use]
149    pub fn with_snapshots(snapshots: Vec<DagCheckpointSnapshot>) -> Self {
150        Self { snapshots }
151    }
152
153    /// Adds a snapshot to the manager.
154    pub fn add_snapshot(&mut self, snapshot: DagCheckpointSnapshot) {
155        self.snapshots.push(snapshot);
156    }
157
158    /// Recovers from the latest (highest `checkpoint_id`) snapshot.
159    ///
160    /// # Errors
161    ///
162    /// Returns [`DagError::CheckpointNotFound`] if no snapshots exist.
163    pub fn recover_latest(&self) -> Result<RecoveredDagState, DagError> {
164        let snapshot = self
165            .snapshots
166            .iter()
167            .max_by_key(|s| s.checkpoint_id)
168            .ok_or(DagError::CheckpointNotFound)?
169            .clone();
170
171        Ok(Self::build_recovered_state(snapshot))
172    }
173
174    /// Recovers from a specific checkpoint by ID.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`DagError::CheckpointNotFound`] if the checkpoint ID
179    /// does not exist.
180    pub fn recover_by_id(&self, checkpoint_id: u64) -> Result<RecoveredDagState, DagError> {
181        let snapshot = self
182            .snapshots
183            .iter()
184            .find(|s| s.checkpoint_id == checkpoint_id)
185            .ok_or(DagError::CheckpointNotFound)?
186            .clone();
187
188        Ok(Self::build_recovered_state(snapshot))
189    }
190
191    /// Returns the number of stored snapshots.
192    #[must_use]
193    pub fn snapshot_count(&self) -> usize {
194        self.snapshots.len()
195    }
196
197    /// Returns whether any snapshots are available.
198    #[must_use]
199    pub fn has_snapshots(&self) -> bool {
200        !self.snapshots.is_empty()
201    }
202
203    /// Builds a `RecoveredDagState` from a snapshot.
204    fn build_recovered_state(snapshot: DagCheckpointSnapshot) -> RecoveredDagState {
205        let operator_states = snapshot.to_operator_states();
206        let source_offsets = snapshot.source_offsets.clone();
207        let watermark = snapshot.watermark;
208
209        RecoveredDagState {
210            snapshot,
211            operator_states,
212            source_offsets,
213            watermark,
214        }
215    }
216}
217
218impl Default for DagRecoveryManager {
219    fn default() -> Self {
220        Self::new()
221    }
222}