Skip to main content

laminar_core/dag/
checkpoint.rs

1//! Chandy-Lamport barrier checkpointing for DAG pipelines.
2//!
3//! This module implements barrier-based checkpointing:
4//!
5//! - [`CheckpointBarrier`] — marker injected at source nodes
6//! - [`BarrierAligner`] — buffers events at fan-in (MPSC) nodes until all
7//!   upstream inputs have delivered their barrier
8//! - [`DagCheckpointCoordinator`] — Ring 1 orchestrator that triggers
9//!   checkpoints, tracks progress, and produces snapshots
10//! - [`DagCheckpointConfig`] — tuning knobs (interval, timeout, retention)
11//!
12//! Barriers do NOT flow through event queues — they are handled by a separate
13//! orchestration path, keeping the hot-path [`Event`]
14//! type unchanged.
15
16use std::collections::VecDeque;
17use std::time::Duration;
18
19use fxhash::FxHashMap;
20
21use crate::operator::{Event, OperatorState};
22
23use super::error::DagError;
24use super::recovery::DagCheckpointSnapshot;
25use super::topology::NodeId;
26
/// Numeric identifier assigned to each checkpoint.
///
/// The coordinator hands these out sequentially, starting at 1.
pub type CheckpointId = u64;
29
/// Alignment strategy carried by a checkpoint barrier.
///
/// Only one strategy exists at present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BarrierType {
    /// A node snapshots only after every input has delivered the barrier.
    /// While waiting, events arriving from inputs that already delivered
    /// their barrier are held back (buffered).
    Aligned,
}
37
38/// A checkpoint barrier injected at source nodes and propagated through the DAG.
39#[derive(Debug, Clone)]
40pub struct CheckpointBarrier {
41    /// Unique identifier for this checkpoint.
42    pub checkpoint_id: CheckpointId,
43    /// Monotonically increasing epoch counter.
44    pub epoch: u64,
45    /// Timestamp when the barrier was created (event-time millis).
46    pub timestamp: i64,
47    /// Barrier alignment strategy.
48    pub barrier_type: BarrierType,
49}
50
51/// Configuration for DAG checkpointing.
52#[derive(Debug, Clone)]
53pub struct DagCheckpointConfig {
54    /// How often to trigger checkpoints.
55    pub interval: Duration,
56    /// Barrier alignment strategy.
57    pub barrier_type: BarrierType,
58    /// Maximum time to wait for barrier alignment at fan-in nodes.
59    pub alignment_timeout: Duration,
60    /// Whether to use incremental checkpoints (future use).
61    pub incremental: bool,
62    /// Maximum number of concurrent in-flight checkpoints.
63    pub max_concurrent: usize,
64    /// Maximum number of completed snapshots to retain.
65    pub max_retained: usize,
66}
67
68impl Default for DagCheckpointConfig {
69    fn default() -> Self {
70        Self {
71            interval: Duration::from_secs(60),
72            barrier_type: BarrierType::Aligned,
73            alignment_timeout: Duration::from_secs(10),
74            incremental: false,
75            max_concurrent: 1,
76            max_retained: 3,
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// BarrierAligner
83// ---------------------------------------------------------------------------
84
85/// Result of presenting a barrier to the aligner.
86#[derive(Debug)]
87pub enum AlignmentResult {
88    /// Not all inputs have delivered their barrier yet.
89    Pending,
90    /// All inputs have delivered their barrier; checkpoint can proceed.
91    Aligned {
92        /// The checkpoint barrier (from the final input).
93        barrier: CheckpointBarrier,
94        /// Events that were buffered from already-aligned inputs.
95        buffered_events: Vec<Event>,
96    },
97}
98
99/// Buffers events at fan-in (MPSC) nodes until all upstream inputs
100/// have delivered their checkpoint barrier.
101///
102/// For nodes with a single input (`expected_inputs == 1`), alignment
103/// is immediate — no buffering occurs.
104#[derive(Debug)]
105pub struct BarrierAligner {
106    /// Number of upstream inputs that must deliver a barrier.
107    expected_inputs: usize,
108    /// Barriers received so far, keyed by source node.
109    barriers_received: FxHashMap<NodeId, CheckpointBarrier>,
110    /// Events buffered from sources that have already delivered their barrier.
111    buffered_events: FxHashMap<NodeId, VecDeque<Event>>,
112    /// The checkpoint currently being aligned (if any).
113    current_checkpoint_id: Option<CheckpointId>,
114}
115
116impl BarrierAligner {
117    /// Creates a new barrier aligner for a node with `expected_inputs` upstream edges.
118    #[must_use]
119    pub fn new(expected_inputs: usize) -> Self {
120        Self {
121            expected_inputs,
122            barriers_received: FxHashMap::default(),
123            buffered_events: FxHashMap::default(),
124            current_checkpoint_id: None,
125        }
126    }
127
128    /// Processes a barrier arriving from `source_node`.
129    ///
130    /// Returns [`AlignmentResult::Pending`] until all inputs have reported,
131    /// then returns [`AlignmentResult::Aligned`] with the barrier and any
132    /// buffered events.
133    ///
134    /// # Panics
135    ///
136    /// Panics if the internal barrier map is empty after insertion (should
137    /// never happen).
138    pub fn on_barrier(
139        &mut self,
140        source_node: NodeId,
141        barrier: CheckpointBarrier,
142    ) -> AlignmentResult {
143        self.current_checkpoint_id = Some(barrier.checkpoint_id);
144        self.barriers_received.insert(source_node, barrier);
145
146        if self.barriers_received.len() >= self.expected_inputs {
147            // All inputs aligned — drain buffered events.
148            let mut all_buffered = Vec::new();
149            for (_node, mut events) in self.buffered_events.drain() {
150                all_buffered.extend(events.drain(..));
151            }
152
153            // Take the last barrier received as the canonical one.
154            let barrier = self
155                .barriers_received
156                .values()
157                .last()
158                .cloned()
159                .expect("at least one barrier");
160
161            AlignmentResult::Aligned {
162                barrier,
163                buffered_events: all_buffered,
164            }
165        } else {
166            AlignmentResult::Pending
167        }
168    }
169
170    /// Buffers an event from `source_node` if that source has already
171    /// delivered its barrier for the current checkpoint.
172    ///
173    /// Returns `true` if the event was buffered (source already aligned),
174    /// `false` if the source has not yet delivered its barrier (event should
175    /// be processed normally).
176    pub fn buffer_if_aligned(&mut self, source_node: NodeId, event: Event) -> bool {
177        if self.barriers_received.contains_key(&source_node) {
178            self.buffered_events
179                .entry(source_node)
180                .or_default()
181                .push_back(event);
182            true
183        } else {
184            false
185        }
186    }
187
188    /// Returns whether `source_node` has already delivered its barrier.
189    #[must_use]
190    pub fn is_source_aligned(&self, source_node: NodeId) -> bool {
191        self.barriers_received.contains_key(&source_node)
192    }
193
194    /// Resets alignment state after a checkpoint completes.
195    pub fn complete_checkpoint(&mut self) {
196        self.barriers_received.clear();
197        self.buffered_events.clear();
198        self.current_checkpoint_id = None;
199    }
200
201    /// Returns how many barriers have been received so far.
202    #[must_use]
203    pub fn barriers_received_count(&self) -> usize {
204        self.barriers_received.len()
205    }
206
207    /// Returns the expected number of upstream inputs.
208    #[must_use]
209    pub fn expected_inputs(&self) -> usize {
210        self.expected_inputs
211    }
212}
213
214// ---------------------------------------------------------------------------
215// DagCheckpointCoordinator
216// ---------------------------------------------------------------------------
217
218/// Tracks progress of an in-flight checkpoint.
219struct CheckpointProgress {
220    /// Checkpoint identifier.
221    checkpoint_id: CheckpointId,
222    /// Epoch number.
223    epoch: u64,
224    /// Operator states reported by completed nodes.
225    completed_nodes: FxHashMap<NodeId, OperatorState>,
226    /// Nodes that have not yet reported.
227    pending_nodes: Vec<NodeId>,
228    /// When the checkpoint was triggered (event-time millis).
229    triggered_at: i64,
230}
231
232/// Ring 1 checkpoint coordinator.
233///
234/// Orchestrates the checkpoint lifecycle:
235/// 1. [`trigger_checkpoint()`](Self::trigger_checkpoint) — creates a barrier
236/// 2. Barrier propagates through the DAG (external to this struct)
237/// 3. [`on_node_snapshot_complete()`](Self::on_node_snapshot_complete) —
238///    each node reports its state
239/// 4. [`finalize_checkpoint()`](Self::finalize_checkpoint) — produces a
240///    [`DagCheckpointSnapshot`]
241pub struct DagCheckpointCoordinator {
242    /// Configuration.
243    config: DagCheckpointConfig,
244    /// Source node IDs (barrier injection points).
245    source_nodes: Vec<NodeId>,
246    /// All node IDs in the DAG.
247    all_nodes: Vec<NodeId>,
248    /// Next epoch counter.
249    next_epoch: u64,
250    /// Next checkpoint ID counter.
251    next_checkpoint_id: CheckpointId,
252    /// Currently in-flight checkpoint progress.
253    in_progress: Option<CheckpointProgress>,
254    /// Completed snapshots (bounded by `max_retained`).
255    completed_snapshots: Vec<DagCheckpointSnapshot>,
256    /// Maximum number of snapshots to retain.
257    max_retained: usize,
258}
259
260impl DagCheckpointCoordinator {
261    /// Creates a new checkpoint coordinator.
262    ///
263    /// # Arguments
264    ///
265    /// * `source_nodes` — DAG source node IDs (barrier injection points)
266    /// * `all_nodes` — all node IDs in the DAG
267    /// * `config` — checkpoint configuration
268    #[must_use]
269    pub fn new(
270        source_nodes: Vec<NodeId>,
271        all_nodes: Vec<NodeId>,
272        config: DagCheckpointConfig,
273    ) -> Self {
274        let max_retained = config.max_retained;
275        Self {
276            config,
277            source_nodes,
278            all_nodes,
279            next_epoch: 1,
280            next_checkpoint_id: 1,
281            in_progress: None,
282            completed_snapshots: Vec::new(),
283            max_retained,
284        }
285    }
286
287    /// Triggers a new checkpoint by creating a barrier.
288    ///
289    /// # Errors
290    ///
291    /// Returns [`DagError::CheckpointInProgress`] if a checkpoint is already
292    /// in flight.
293    pub fn trigger_checkpoint(&mut self) -> Result<CheckpointBarrier, DagError> {
294        if let Some(ref progress) = self.in_progress {
295            return Err(DagError::CheckpointInProgress(progress.epoch));
296        }
297
298        let checkpoint_id = self.next_checkpoint_id;
299        self.next_checkpoint_id += 1;
300        let epoch = self.next_epoch;
301        self.next_epoch += 1;
302
303        #[allow(clippy::cast_possible_truncation)]
304        // Timestamp ms fits i64 for ~292 years from epoch
305        let timestamp = std::time::SystemTime::now()
306            .duration_since(std::time::UNIX_EPOCH)
307            .map_or(0, |d| d.as_millis() as i64);
308
309        let barrier = CheckpointBarrier {
310            checkpoint_id,
311            epoch,
312            timestamp,
313            barrier_type: self.config.barrier_type,
314        };
315
316        self.in_progress = Some(CheckpointProgress {
317            checkpoint_id,
318            epoch,
319            completed_nodes: FxHashMap::default(),
320            pending_nodes: self.all_nodes.clone(),
321            triggered_at: barrier.timestamp,
322        });
323
324        Ok(barrier)
325    }
326
327    /// Records that a node has completed its snapshot.
328    ///
329    /// Returns `true` if all nodes have now reported (checkpoint is ready
330    /// to finalize).
331    pub fn on_node_snapshot_complete(&mut self, node_id: NodeId, state: OperatorState) -> bool {
332        if let Some(ref mut progress) = self.in_progress {
333            progress.pending_nodes.retain(|&n| n != node_id);
334            progress.completed_nodes.insert(node_id, state);
335            progress.pending_nodes.is_empty()
336        } else {
337            false
338        }
339    }
340
341    /// Finalizes the current checkpoint, producing a snapshot.
342    ///
343    /// # Errors
344    ///
345    /// Returns [`DagError::NoCheckpointInProgress`] if no checkpoint is active.
346    /// Returns [`DagError::CheckpointIncomplete`] if not all nodes have reported.
347    pub fn finalize_checkpoint(&mut self) -> Result<DagCheckpointSnapshot, DagError> {
348        let progress = self
349            .in_progress
350            .take()
351            .ok_or(DagError::NoCheckpointInProgress)?;
352
353        if !progress.pending_nodes.is_empty() {
354            let pending = progress.pending_nodes.len();
355            // Put progress back so it can continue.
356            self.in_progress = Some(progress);
357            return Err(DagError::CheckpointIncomplete { pending });
358        }
359
360        let snapshot = DagCheckpointSnapshot::from_operator_states(
361            progress.checkpoint_id,
362            progress.epoch,
363            progress.triggered_at,
364            &progress.completed_nodes,
365        );
366
367        self.completed_snapshots.push(snapshot.clone());
368
369        // Trim old snapshots.
370        while self.completed_snapshots.len() > self.max_retained {
371            self.completed_snapshots.remove(0);
372        }
373
374        Ok(snapshot)
375    }
376
377    /// Returns the in-flight checkpoint progress (if any).
378    #[must_use]
379    pub fn in_progress(&self) -> Option<&CheckpointBarrier> {
380        // We reconstruct a temporary barrier from progress for API consumers.
381        // Since we can't return a reference to a temporary, we expose a simpler API.
382        None // Use is_checkpoint_in_progress() instead.
383    }
384
385    /// Returns whether a checkpoint is currently in progress.
386    #[must_use]
387    pub fn is_checkpoint_in_progress(&self) -> bool {
388        self.in_progress.is_some()
389    }
390
391    /// Returns completed snapshots.
392    #[must_use]
393    pub fn completed_snapshots(&self) -> &[DagCheckpointSnapshot] {
394        &self.completed_snapshots
395    }
396
397    /// Returns the latest completed snapshot (if any).
398    #[must_use]
399    pub fn latest_snapshot(&self) -> Option<&DagCheckpointSnapshot> {
400        self.completed_snapshots.last()
401    }
402
403    /// Returns the current epoch counter (next epoch to be assigned).
404    #[must_use]
405    pub fn current_epoch(&self) -> u64 {
406        self.next_epoch
407    }
408
409    /// Returns the source node IDs.
410    #[must_use]
411    pub fn source_nodes(&self) -> &[NodeId] {
412        &self.source_nodes
413    }
414}
415
416impl std::fmt::Debug for DagCheckpointCoordinator {
417    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418        f.debug_struct("DagCheckpointCoordinator")
419            .field("next_epoch", &self.next_epoch)
420            .field("next_checkpoint_id", &self.next_checkpoint_id)
421            .field("in_progress", &self.in_progress.is_some())
422            .field("completed_count", &self.completed_snapshots.len())
423            .field("source_nodes", &self.source_nodes)
424            .finish_non_exhaustive()
425    }
426}