laminar_core/dag/checkpoint.rs
//! Chandy-Lamport barrier checkpointing for DAG pipelines.
//!
//! This module implements barrier-based checkpointing:
//!
//! - [`CheckpointBarrier`] — marker injected at source nodes
//! - [`BarrierAligner`] — buffers events at fan-in (MPSC) nodes until every
//!   upstream input has delivered its barrier
//! - [`DagCheckpointCoordinator`] — Ring 1 orchestrator that triggers
//!   checkpoints, tracks progress, and produces snapshots
//! - [`DagCheckpointConfig`] — tuning knobs (interval, timeout, retention)
//!
//! Barriers do NOT flow through event queues — they travel on a separate
//! orchestration path, so the hot-path [`Event`] type stays unchanged.
15
16use std::collections::VecDeque;
17use std::time::Duration;
18
19use fxhash::FxHashMap;
20
21use crate::operator::{Event, OperatorState};
22
23use super::error::DagError;
24use super::recovery::DagCheckpointSnapshot;
25use super::topology::NodeId;
26
/// Numeric identifier attached to every checkpoint.
///
/// [`DagCheckpointCoordinator`] hands these out in increasing order,
/// starting at 1.
pub type CheckpointId = u64;

/// How a node coordinates barriers that arrive on more than one input.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BarrierType {
    /// The node snapshots only after the barrier has arrived on every input;
    /// events that arrive on an input after its barrier are held back until
    /// alignment completes.
    Aligned,
}
37
38/// A checkpoint barrier injected at source nodes and propagated through the DAG.
39#[derive(Debug, Clone)]
40pub struct CheckpointBarrier {
41 /// Unique identifier for this checkpoint.
42 pub checkpoint_id: CheckpointId,
43 /// Monotonically increasing epoch counter.
44 pub epoch: u64,
45 /// Timestamp when the barrier was created (event-time millis).
46 pub timestamp: i64,
47 /// Barrier alignment strategy.
48 pub barrier_type: BarrierType,
49}
50
51/// Configuration for DAG checkpointing.
52#[derive(Debug, Clone)]
53pub struct DagCheckpointConfig {
54 /// How often to trigger checkpoints.
55 pub interval: Duration,
56 /// Barrier alignment strategy.
57 pub barrier_type: BarrierType,
58 /// Maximum time to wait for barrier alignment at fan-in nodes.
59 pub alignment_timeout: Duration,
60 /// Whether to use incremental checkpoints (future use).
61 pub incremental: bool,
62 /// Maximum number of concurrent in-flight checkpoints.
63 pub max_concurrent: usize,
64 /// Maximum number of completed snapshots to retain.
65 pub max_retained: usize,
66}
67
68impl Default for DagCheckpointConfig {
69 fn default() -> Self {
70 Self {
71 interval: Duration::from_secs(60),
72 barrier_type: BarrierType::Aligned,
73 alignment_timeout: Duration::from_secs(10),
74 incremental: false,
75 max_concurrent: 1,
76 max_retained: 3,
77 }
78 }
79}
80
81// ---------------------------------------------------------------------------
82// BarrierAligner
83// ---------------------------------------------------------------------------
84
85/// Result of presenting a barrier to the aligner.
86#[derive(Debug)]
87pub enum AlignmentResult {
88 /// Not all inputs have delivered their barrier yet.
89 Pending,
90 /// All inputs have delivered their barrier; checkpoint can proceed.
91 Aligned {
92 /// The checkpoint barrier (from the final input).
93 barrier: CheckpointBarrier,
94 /// Events that were buffered from already-aligned inputs.
95 buffered_events: Vec<Event>,
96 },
97}
98
99/// Buffers events at fan-in (MPSC) nodes until all upstream inputs
100/// have delivered their checkpoint barrier.
101///
102/// For nodes with a single input (`expected_inputs == 1`), alignment
103/// is immediate — no buffering occurs.
104#[derive(Debug)]
105pub struct BarrierAligner {
106 /// Number of upstream inputs that must deliver a barrier.
107 expected_inputs: usize,
108 /// Barriers received so far, keyed by source node.
109 barriers_received: FxHashMap<NodeId, CheckpointBarrier>,
110 /// Events buffered from sources that have already delivered their barrier.
111 buffered_events: FxHashMap<NodeId, VecDeque<Event>>,
112 /// The checkpoint currently being aligned (if any).
113 current_checkpoint_id: Option<CheckpointId>,
114}
115
116impl BarrierAligner {
117 /// Creates a new barrier aligner for a node with `expected_inputs` upstream edges.
118 #[must_use]
119 pub fn new(expected_inputs: usize) -> Self {
120 Self {
121 expected_inputs,
122 barriers_received: FxHashMap::default(),
123 buffered_events: FxHashMap::default(),
124 current_checkpoint_id: None,
125 }
126 }
127
128 /// Processes a barrier arriving from `source_node`.
129 ///
130 /// Returns [`AlignmentResult::Pending`] until all inputs have reported,
131 /// then returns [`AlignmentResult::Aligned`] with the barrier and any
132 /// buffered events.
133 ///
134 /// # Panics
135 ///
136 /// Panics if the internal barrier map is empty after insertion (should
137 /// never happen).
138 pub fn on_barrier(
139 &mut self,
140 source_node: NodeId,
141 barrier: CheckpointBarrier,
142 ) -> AlignmentResult {
143 self.current_checkpoint_id = Some(barrier.checkpoint_id);
144 self.barriers_received.insert(source_node, barrier);
145
146 if self.barriers_received.len() >= self.expected_inputs {
147 // All inputs aligned — drain buffered events.
148 let mut all_buffered = Vec::new();
149 for (_node, mut events) in self.buffered_events.drain() {
150 all_buffered.extend(events.drain(..));
151 }
152
153 // Take the last barrier received as the canonical one.
154 let barrier = self
155 .barriers_received
156 .values()
157 .last()
158 .cloned()
159 .expect("at least one barrier");
160
161 AlignmentResult::Aligned {
162 barrier,
163 buffered_events: all_buffered,
164 }
165 } else {
166 AlignmentResult::Pending
167 }
168 }
169
170 /// Buffers an event from `source_node` if that source has already
171 /// delivered its barrier for the current checkpoint.
172 ///
173 /// Returns `true` if the event was buffered (source already aligned),
174 /// `false` if the source has not yet delivered its barrier (event should
175 /// be processed normally).
176 pub fn buffer_if_aligned(&mut self, source_node: NodeId, event: Event) -> bool {
177 if self.barriers_received.contains_key(&source_node) {
178 self.buffered_events
179 .entry(source_node)
180 .or_default()
181 .push_back(event);
182 true
183 } else {
184 false
185 }
186 }
187
188 /// Returns whether `source_node` has already delivered its barrier.
189 #[must_use]
190 pub fn is_source_aligned(&self, source_node: NodeId) -> bool {
191 self.barriers_received.contains_key(&source_node)
192 }
193
194 /// Resets alignment state after a checkpoint completes.
195 pub fn complete_checkpoint(&mut self) {
196 self.barriers_received.clear();
197 self.buffered_events.clear();
198 self.current_checkpoint_id = None;
199 }
200
201 /// Returns how many barriers have been received so far.
202 #[must_use]
203 pub fn barriers_received_count(&self) -> usize {
204 self.barriers_received.len()
205 }
206
207 /// Returns the expected number of upstream inputs.
208 #[must_use]
209 pub fn expected_inputs(&self) -> usize {
210 self.expected_inputs
211 }
212}
213
214// ---------------------------------------------------------------------------
215// DagCheckpointCoordinator
216// ---------------------------------------------------------------------------
217
218/// Tracks progress of an in-flight checkpoint.
219struct CheckpointProgress {
220 /// Checkpoint identifier.
221 checkpoint_id: CheckpointId,
222 /// Epoch number.
223 epoch: u64,
224 /// Operator states reported by completed nodes.
225 completed_nodes: FxHashMap<NodeId, OperatorState>,
226 /// Nodes that have not yet reported.
227 pending_nodes: Vec<NodeId>,
228 /// When the checkpoint was triggered (event-time millis).
229 triggered_at: i64,
230}
231
232/// Ring 1 checkpoint coordinator.
233///
234/// Orchestrates the checkpoint lifecycle:
235/// 1. [`trigger_checkpoint()`](Self::trigger_checkpoint) — creates a barrier
236/// 2. Barrier propagates through the DAG (external to this struct)
237/// 3. [`on_node_snapshot_complete()`](Self::on_node_snapshot_complete) —
238/// each node reports its state
239/// 4. [`finalize_checkpoint()`](Self::finalize_checkpoint) — produces a
240/// [`DagCheckpointSnapshot`]
241pub struct DagCheckpointCoordinator {
242 /// Configuration.
243 config: DagCheckpointConfig,
244 /// Source node IDs (barrier injection points).
245 source_nodes: Vec<NodeId>,
246 /// All node IDs in the DAG.
247 all_nodes: Vec<NodeId>,
248 /// Next epoch counter.
249 next_epoch: u64,
250 /// Next checkpoint ID counter.
251 next_checkpoint_id: CheckpointId,
252 /// Currently in-flight checkpoint progress.
253 in_progress: Option<CheckpointProgress>,
254 /// Completed snapshots (bounded by `max_retained`).
255 completed_snapshots: Vec<DagCheckpointSnapshot>,
256 /// Maximum number of snapshots to retain.
257 max_retained: usize,
258}
259
260impl DagCheckpointCoordinator {
261 /// Creates a new checkpoint coordinator.
262 ///
263 /// # Arguments
264 ///
265 /// * `source_nodes` — DAG source node IDs (barrier injection points)
266 /// * `all_nodes` — all node IDs in the DAG
267 /// * `config` — checkpoint configuration
268 #[must_use]
269 pub fn new(
270 source_nodes: Vec<NodeId>,
271 all_nodes: Vec<NodeId>,
272 config: DagCheckpointConfig,
273 ) -> Self {
274 let max_retained = config.max_retained;
275 Self {
276 config,
277 source_nodes,
278 all_nodes,
279 next_epoch: 1,
280 next_checkpoint_id: 1,
281 in_progress: None,
282 completed_snapshots: Vec::new(),
283 max_retained,
284 }
285 }
286
287 /// Triggers a new checkpoint by creating a barrier.
288 ///
289 /// # Errors
290 ///
291 /// Returns [`DagError::CheckpointInProgress`] if a checkpoint is already
292 /// in flight.
293 pub fn trigger_checkpoint(&mut self) -> Result<CheckpointBarrier, DagError> {
294 if let Some(ref progress) = self.in_progress {
295 return Err(DagError::CheckpointInProgress(progress.epoch));
296 }
297
298 let checkpoint_id = self.next_checkpoint_id;
299 self.next_checkpoint_id += 1;
300 let epoch = self.next_epoch;
301 self.next_epoch += 1;
302
303 #[allow(clippy::cast_possible_truncation)]
304 // Timestamp ms fits i64 for ~292 years from epoch
305 let timestamp = std::time::SystemTime::now()
306 .duration_since(std::time::UNIX_EPOCH)
307 .map_or(0, |d| d.as_millis() as i64);
308
309 let barrier = CheckpointBarrier {
310 checkpoint_id,
311 epoch,
312 timestamp,
313 barrier_type: self.config.barrier_type,
314 };
315
316 self.in_progress = Some(CheckpointProgress {
317 checkpoint_id,
318 epoch,
319 completed_nodes: FxHashMap::default(),
320 pending_nodes: self.all_nodes.clone(),
321 triggered_at: barrier.timestamp,
322 });
323
324 Ok(barrier)
325 }
326
327 /// Records that a node has completed its snapshot.
328 ///
329 /// Returns `true` if all nodes have now reported (checkpoint is ready
330 /// to finalize).
331 pub fn on_node_snapshot_complete(&mut self, node_id: NodeId, state: OperatorState) -> bool {
332 if let Some(ref mut progress) = self.in_progress {
333 progress.pending_nodes.retain(|&n| n != node_id);
334 progress.completed_nodes.insert(node_id, state);
335 progress.pending_nodes.is_empty()
336 } else {
337 false
338 }
339 }
340
341 /// Finalizes the current checkpoint, producing a snapshot.
342 ///
343 /// # Errors
344 ///
345 /// Returns [`DagError::NoCheckpointInProgress`] if no checkpoint is active.
346 /// Returns [`DagError::CheckpointIncomplete`] if not all nodes have reported.
347 pub fn finalize_checkpoint(&mut self) -> Result<DagCheckpointSnapshot, DagError> {
348 let progress = self
349 .in_progress
350 .take()
351 .ok_or(DagError::NoCheckpointInProgress)?;
352
353 if !progress.pending_nodes.is_empty() {
354 let pending = progress.pending_nodes.len();
355 // Put progress back so it can continue.
356 self.in_progress = Some(progress);
357 return Err(DagError::CheckpointIncomplete { pending });
358 }
359
360 let snapshot = DagCheckpointSnapshot::from_operator_states(
361 progress.checkpoint_id,
362 progress.epoch,
363 progress.triggered_at,
364 &progress.completed_nodes,
365 );
366
367 self.completed_snapshots.push(snapshot.clone());
368
369 // Trim old snapshots.
370 while self.completed_snapshots.len() > self.max_retained {
371 self.completed_snapshots.remove(0);
372 }
373
374 Ok(snapshot)
375 }
376
377 /// Returns the in-flight checkpoint progress (if any).
378 #[must_use]
379 pub fn in_progress(&self) -> Option<&CheckpointBarrier> {
380 // We reconstruct a temporary barrier from progress for API consumers.
381 // Since we can't return a reference to a temporary, we expose a simpler API.
382 None // Use is_checkpoint_in_progress() instead.
383 }
384
385 /// Returns whether a checkpoint is currently in progress.
386 #[must_use]
387 pub fn is_checkpoint_in_progress(&self) -> bool {
388 self.in_progress.is_some()
389 }
390
391 /// Returns completed snapshots.
392 #[must_use]
393 pub fn completed_snapshots(&self) -> &[DagCheckpointSnapshot] {
394 &self.completed_snapshots
395 }
396
397 /// Returns the latest completed snapshot (if any).
398 #[must_use]
399 pub fn latest_snapshot(&self) -> Option<&DagCheckpointSnapshot> {
400 self.completed_snapshots.last()
401 }
402
403 /// Returns the current epoch counter (next epoch to be assigned).
404 #[must_use]
405 pub fn current_epoch(&self) -> u64 {
406 self.next_epoch
407 }
408
409 /// Returns the source node IDs.
410 #[must_use]
411 pub fn source_nodes(&self) -> &[NodeId] {
412 &self.source_nodes
413 }
414}
415
416impl std::fmt::Debug for DagCheckpointCoordinator {
417 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418 f.debug_struct("DagCheckpointCoordinator")
419 .field("next_epoch", &self.next_epoch)
420 .field("next_checkpoint_id", &self.next_checkpoint_id)
421 .field("in_progress", &self.in_progress.is_some())
422 .field("completed_count", &self.completed_snapshots.len())
423 .field("source_nodes", &self.source_nodes)
424 .finish_non_exhaustive()
425 }
426}