laminar_core/dag/
recovery.rs1use std::collections::HashMap;
10
11use fxhash::FxHashMap;
12use serde::{Deserialize, Serialize};
13
14use crate::operator::OperatorState;
15
16use super::error::DagError;
17use super::topology::NodeId;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SerializableOperatorState {
24 pub operator_id: String,
26 pub data: Vec<u8>,
28}
29
30impl From<OperatorState> for SerializableOperatorState {
31 fn from(state: OperatorState) -> Self {
32 Self {
33 operator_id: state.operator_id,
34 data: state.data,
35 }
36 }
37}
38
39impl From<SerializableOperatorState> for OperatorState {
40 fn from(state: SerializableOperatorState) -> Self {
41 Self {
42 operator_id: state.operator_id,
43 data: state.data,
44 }
45 }
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct DagCheckpointSnapshot {
54 pub checkpoint_id: u64,
56 pub epoch: u64,
58 pub timestamp: i64,
60 pub node_states: HashMap<u32, SerializableOperatorState>,
62 pub source_offsets: HashMap<String, u64>,
64 pub watermark: Option<i64>,
66}
67
68impl DagCheckpointSnapshot {
69 pub(crate) fn from_operator_states(
74 checkpoint_id: u64,
75 epoch: u64,
76 timestamp: i64,
77 states: &FxHashMap<NodeId, OperatorState>,
78 ) -> Self {
79 let node_states = states
80 .iter()
81 .map(|(node_id, state)| (node_id.0, SerializableOperatorState::from(state.clone())))
82 .collect();
83
84 Self {
85 checkpoint_id,
86 epoch,
87 timestamp,
88 node_states,
89 source_offsets: HashMap::new(),
90 watermark: None,
91 }
92 }
93
94 #[must_use]
96 pub fn to_operator_states(&self) -> FxHashMap<NodeId, OperatorState> {
97 self.node_states
98 .iter()
99 .map(|(&id, state)| (NodeId(id), OperatorState::from(state.clone())))
100 .collect()
101 }
102}
103
104pub struct RecoveredDagState {
106 pub snapshot: DagCheckpointSnapshot,
108 pub operator_states: FxHashMap<NodeId, OperatorState>,
110 pub source_offsets: HashMap<String, u64>,
112 pub watermark: Option<i64>,
114}
115
116impl std::fmt::Debug for RecoveredDagState {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("RecoveredDagState")
119 .field("checkpoint_id", &self.snapshot.checkpoint_id)
120 .field("epoch", &self.snapshot.epoch)
121 .field("operator_count", &self.operator_states.len())
122 .field("source_offsets", &self.source_offsets)
123 .field("watermark", &self.watermark)
124 .finish()
125 }
126}
127
128#[derive(Debug)]
133pub struct DagRecoveryManager {
134 snapshots: Vec<DagCheckpointSnapshot>,
136}
137
138impl DagRecoveryManager {
139 #[must_use]
141 pub fn new() -> Self {
142 Self {
143 snapshots: Vec::new(),
144 }
145 }
146
147 #[must_use]
149 pub fn with_snapshots(snapshots: Vec<DagCheckpointSnapshot>) -> Self {
150 Self { snapshots }
151 }
152
153 pub fn add_snapshot(&mut self, snapshot: DagCheckpointSnapshot) {
155 self.snapshots.push(snapshot);
156 }
157
158 pub fn recover_latest(&self) -> Result<RecoveredDagState, DagError> {
164 let snapshot = self
165 .snapshots
166 .iter()
167 .max_by_key(|s| s.checkpoint_id)
168 .ok_or(DagError::CheckpointNotFound)?
169 .clone();
170
171 Ok(Self::build_recovered_state(snapshot))
172 }
173
174 pub fn recover_by_id(&self, checkpoint_id: u64) -> Result<RecoveredDagState, DagError> {
181 let snapshot = self
182 .snapshots
183 .iter()
184 .find(|s| s.checkpoint_id == checkpoint_id)
185 .ok_or(DagError::CheckpointNotFound)?
186 .clone();
187
188 Ok(Self::build_recovered_state(snapshot))
189 }
190
191 #[must_use]
193 pub fn snapshot_count(&self) -> usize {
194 self.snapshots.len()
195 }
196
197 #[must_use]
199 pub fn has_snapshots(&self) -> bool {
200 !self.snapshots.is_empty()
201 }
202
203 fn build_recovered_state(snapshot: DagCheckpointSnapshot) -> RecoveredDagState {
205 let operator_states = snapshot.to_operator_states();
206 let source_offsets = snapshot.source_offsets.clone();
207 let watermark = snapshot.watermark;
208
209 RecoveredDagState {
210 snapshot,
211 operator_states,
212 source_offsets,
213 watermark,
214 }
215 }
216}
217
218impl Default for DagRecoveryManager {
219 fn default() -> Self {
220 Self::new()
221 }
222}