// laminar_db/checkpoint_coordinator.rs
1//! Unified checkpoint coordinator.
2//!
3//! Single orchestrator that replaces `StreamCheckpointManager`,
4//! `PipelineCheckpointManager`, and the persistence side of `DagRecoveryManager`.
5//! Lives in Ring 2 (control plane). Reuses the existing
6//! `DagCheckpointCoordinator` for barrier logic.
7//!
8//! ## Checkpoint Cycle
9//!
10//! 1. Barrier propagation — `dag_coordinator.trigger_checkpoint()`
11//! 2. Operator snapshot — `dag_coordinator.finalize_checkpoint()` → operator states
12//! 3. Source snapshot — `source.checkpoint()` for each source
13//! 4. Sink pre-commit — `sink.pre_commit(epoch)` for each exactly-once sink
14//! 5. Manifest persist — `store.save(&manifest)` (atomic write)
15//! 6. Sink commit — `sink.commit_epoch(epoch)` for each exactly-once sink
16//! 7. On ANY failure at 6 — `sink.rollback_epoch()` on remaining sinks
17#![allow(clippy::disallowed_types)] // cold path
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use std::sync::atomic::Ordering;
24
25use laminar_connectors::checkpoint::SourceCheckpoint;
26use laminar_connectors::connector::SourceConnector;
27use laminar_storage::changelog_drainer::ChangelogDrainer;
28use laminar_storage::checkpoint_manifest::{
29    CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
30};
31use laminar_storage::checkpoint_store::CheckpointStore;
32use laminar_storage::per_core_wal::PerCoreWalManager;
33use tracing::{debug, error, info, warn};
34
35use crate::error::DbError;
36use crate::metrics::PipelineCounters;
37
/// Unified checkpoint configuration.
///
/// Construct via [`CheckpointConfig::default`] and override individual
/// fields; each timeout below bounds one phase of the checkpoint cycle
/// described in the module docs.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Interval between checkpoints. `None` = manual only.
    pub interval: Option<Duration>,
    /// Maximum number of retained checkpoints.
    pub max_retained: usize,
    /// Maximum time to wait for barrier alignment at fan-in nodes.
    pub alignment_timeout: Duration,
    /// Maximum time to wait for all sinks to pre-commit.
    ///
    /// A stuck sink will not block checkpointing indefinitely.
    /// Defaults to 30 seconds.
    pub pre_commit_timeout: Duration,
    /// Maximum time to wait for manifest persist (filesystem I/O).
    ///
    /// A hung or degraded filesystem will not stall the runtime indefinitely.
    /// Defaults to 120 seconds.
    pub persist_timeout: Duration,
    /// Maximum time to wait for all sinks to commit (phase 2).
    ///
    /// Defaults to 60 seconds.
    pub commit_timeout: Duration,
    /// Whether to use incremental checkpoints.
    pub incremental: bool,
    /// Maximum operator state size (bytes) to inline in the JSON manifest.
    ///
    /// States larger than this threshold are written to a `state.bin` sidecar
    /// file and referenced by offset/length in the manifest. This avoids
    /// inflating the manifest with base64-encoded blobs (~33% overhead).
    ///
    /// Default: 1 MB (`1_048_576`). Set to `usize::MAX` to inline everything.
    pub state_inline_threshold: usize,
    /// Maximum total checkpoint size in bytes (manifest + sidecar).
    ///
    /// If the packed operator state exceeds this limit, the checkpoint is
    /// rejected with `[LDB-6014]` and not persisted. This prevents
    /// unbounded state from creating multi-GB sidecar files.
    ///
    /// `None` means no limit (default). A warning is logged at 80% of the cap.
    pub max_checkpoint_bytes: Option<usize>,
    /// Maximum pending changelog entries per drainer before forced clear.
    ///
    /// On checkpoint failure, `clear_pending()` is normally skipped. If
    /// failures repeat, pending entries grow unboundedly → OOM. This cap
    /// is a safety valve: if any drainer exceeds it after a failure, all
    /// drainers are cleared and an `[LDB-6010]` warning is logged.
    ///
    /// Default: 10,000,000 entries.
    pub max_pending_changelog_entries: usize,
    /// Adaptive checkpoint interval configuration.
    ///
    /// When `Some`, the coordinator dynamically adjusts the checkpoint interval
    /// based on observed checkpoint durations using an exponential moving average.
    /// When `None` (default), the static `interval` is used unchanged.
    pub adaptive: Option<AdaptiveCheckpointConfig>,
    /// Unaligned checkpoint configuration.
    ///
    /// When `Some`, the coordinator can fall back to unaligned checkpoints
    /// when barrier alignment takes too long under backpressure.
    /// When `None` (default), only aligned checkpoints are used.
    pub unaligned: Option<UnalignedCheckpointConfig>,
}
101
/// Configuration for adaptive checkpoint intervals.
///
/// Dynamically adjusts the checkpoint interval based on observed checkpoint
/// durations: `interval = clamp(smoothed_duration / target_ratio, min, max)`.
///
/// This avoids checkpointing too frequently under light load (wasting I/O)
/// or too infrequently under heavy load (increasing recovery time).
///
/// Enabled by setting [`CheckpointConfig::adaptive`] to `Some`.
#[derive(Debug, Clone)]
pub struct AdaptiveCheckpointConfig {
    /// Minimum checkpoint interval (floor). Default: 10s.
    pub min_interval: Duration,
    /// Maximum checkpoint interval (ceiling). Default: 300s.
    pub max_interval: Duration,
    /// Target ratio of checkpoint duration to interval.
    ///
    /// For example, 0.1 means checkpoints should take at most 10% of the
    /// time between them. Default: 0.1.
    pub target_overhead_ratio: f64,
    /// EMA smoothing factor for checkpoint durations.
    ///
    /// Higher values give more weight to recent observations.
    /// Range: 0.0–1.0. Default: 0.3.
    pub smoothing_alpha: f64,
}
126
127impl Default for AdaptiveCheckpointConfig {
128    fn default() -> Self {
129        Self {
130            min_interval: Duration::from_secs(10),
131            max_interval: Duration::from_secs(300),
132            target_overhead_ratio: 0.1,
133            smoothing_alpha: 0.3,
134        }
135    }
136}
137
/// Configuration for unaligned checkpoints.
///
/// When barrier alignment takes too long (due to backpressure on slow inputs),
/// the checkpoint can fall back to an unaligned snapshot that captures in-flight
/// data from channels. This trades larger checkpoint size for faster completion.
///
/// Enabled by setting [`CheckpointConfig::unaligned`] to `Some`.
#[derive(Debug, Clone)]
pub struct UnalignedCheckpointConfig {
    /// Whether unaligned checkpoints are enabled.
    pub enabled: bool,
    /// Duration after which aligned checkpoint falls back to unaligned.
    ///
    /// If all barriers don't arrive within this threshold after the first
    /// barrier, the checkpoint switches to unaligned mode. Default: 10s.
    pub alignment_timeout_threshold: Duration,
    /// Maximum bytes of in-flight data to buffer per checkpoint.
    ///
    /// If the total in-flight data exceeds this, the checkpoint fails
    /// rather than consuming unbounded memory. Default: 256 MB.
    pub max_inflight_buffer_bytes: usize,
    /// Force unaligned mode for all checkpoints (skip aligned attempt).
    ///
    /// Useful for testing or when backpressure is known to be persistent.
    pub force_unaligned: bool,
}
162
163impl Default for UnalignedCheckpointConfig {
164    fn default() -> Self {
165        Self {
166            enabled: true,
167            alignment_timeout_threshold: Duration::from_secs(10),
168            max_inflight_buffer_bytes: 256 * 1024 * 1024,
169            force_unaligned: false,
170        }
171    }
172}
173
174impl Default for CheckpointConfig {
175    fn default() -> Self {
176        Self {
177            interval: Some(Duration::from_secs(60)),
178            max_retained: 3,
179            alignment_timeout: Duration::from_secs(30),
180            pre_commit_timeout: Duration::from_secs(30),
181            persist_timeout: Duration::from_secs(120),
182            commit_timeout: Duration::from_secs(60),
183            incremental: false,
184            state_inline_threshold: 1_048_576,
185            max_checkpoint_bytes: None,
186            max_pending_changelog_entries: 10_000_000,
187            adaptive: None,
188            unaligned: None,
189        }
190    }
191}
192
/// Phase of the checkpoint lifecycle.
///
/// Variants are declared in lifecycle order, mirroring the checkpoint
/// cycle described in the module-level docs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum CheckpointPhase {
    /// No checkpoint in progress.
    Idle,
    /// Barrier injected, waiting for operator snapshots.
    BarrierInFlight,
    /// Operators snapshotted, collecting source positions.
    Snapshotting,
    /// Sinks pre-committing (phase 1).
    PreCommitting,
    /// Manifest being persisted.
    Persisting,
    /// Sinks committing (phase 2).
    Committing,
}
209
210impl std::fmt::Display for CheckpointPhase {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        match self {
213            Self::Idle => write!(f, "Idle"),
214            Self::BarrierInFlight => write!(f, "BarrierInFlight"),
215            Self::Snapshotting => write!(f, "Snapshotting"),
216            Self::PreCommitting => write!(f, "PreCommitting"),
217            Self::Persisting => write!(f, "Persisting"),
218            Self::Committing => write!(f, "Committing"),
219        }
220    }
221}
222
/// Result of a checkpoint attempt.
///
/// Serializable so it can be reported through status/metrics endpoints.
#[derive(Debug, serde::Serialize)]
pub struct CheckpointResult {
    /// Whether the checkpoint succeeded.
    pub success: bool,
    /// Checkpoint ID (if created).
    pub checkpoint_id: u64,
    /// Epoch number.
    pub epoch: u64,
    /// Duration of the checkpoint operation.
    pub duration: Duration,
    /// Error message if failed; `None` on success.
    pub error: Option<String>,
}
237
/// Registered source for checkpoint coordination.
pub(crate) struct RegisteredSource {
    /// Source name.
    pub name: String,
    /// Source connector handle, behind an async (tokio) mutex so snapshot
    /// tasks can lock it without blocking the executor.
    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
    /// Whether this source supports replay from a checkpointed position.
    ///
    /// Sources that do not support replay (e.g., WebSocket) degrade
    /// exactly-once semantics to at-most-once.
    pub supports_replay: bool,
}
250
/// Registered sink for checkpoint coordination.
pub(crate) struct RegisteredSink {
    /// Sink name.
    pub name: String,
    /// Sink task handle (channel-based, no mutex contention).
    pub handle: crate::sink_task::SinkTaskHandle,
    /// Whether this sink supports exactly-once / two-phase commit.
    /// Only sinks with this flag participate in pre-commit/commit.
    pub exactly_once: bool,
}
260
/// Result of WAL preparation for a checkpoint.
///
/// Contains the WAL positions recorded after flushing changelog drainers,
/// writing epoch barriers, and syncing all segments. Produced by
/// `prepare_wal_for_checkpoint()`.
#[derive(Debug, Clone)]
pub struct WalPrepareResult {
    /// Per-core WAL positions at the time of the epoch barrier.
    /// Empty when no WAL manager is registered.
    pub per_core_wal_positions: Vec<u64>,
    /// Number of changelog entries drained across all drainers.
    pub entries_drained: u64,
}
272
/// Unified checkpoint coordinator.
///
/// Orchestrates the full checkpoint lifecycle across sources, sinks,
/// and operator state, persisting everything in a single
/// [`CheckpointManifest`].
pub struct CheckpointCoordinator {
    /// Static configuration (intervals, timeouts, size caps).
    config: CheckpointConfig,
    /// Backing store for manifests (and optional state sidecars).
    store: Arc<dyn CheckpointStore>,
    /// Streaming sources registered via `register_source`.
    sources: Vec<RegisteredSource>,
    /// Sinks registered via `register_sink`.
    sinks: Vec<RegisteredSink>,
    /// Reference-table sources registered via `register_table_source`.
    table_sources: Vec<RegisteredSource>,
    /// ID to assign to the next checkpoint (monotonic).
    next_checkpoint_id: u64,
    /// Current epoch number (resumes from the latest stored manifest).
    epoch: u64,
    /// Current lifecycle phase (observability only).
    phase: CheckpointPhase,
    /// Number of checkpoints completed since this coordinator was created.
    checkpoints_completed: u64,
    /// Number of checkpoints failed since this coordinator was created.
    checkpoints_failed: u64,
    /// Duration of the most recent checkpoint attempt, if any.
    last_checkpoint_duration: Option<Duration>,
    /// Rolling histogram of checkpoint durations for percentile tracking.
    duration_histogram: DurationHistogram,
    /// Per-core WAL manager for epoch barriers and truncation.
    wal_manager: Option<PerCoreWalManager>,
    /// Changelog drainers to flush before checkpointing.
    changelog_drainers: Vec<ChangelogDrainer>,
    /// Shared counters for observability.
    counters: Option<Arc<PipelineCounters>>,
    /// Exponential moving average of checkpoint durations (milliseconds).
    smoothed_duration_ms: f64,
    /// Cumulative bytes written across all checkpoints (manifest + sidecar).
    total_bytes_written: u64,
    /// Number of checkpoints completed in unaligned mode.
    unaligned_checkpoint_count: u64,
    /// WAL positions from the previous checkpoint, used as a truncation
    /// safety buffer — we keep one checkpoint's worth of WAL replay data.
    previous_wal_positions: Option<Vec<u64>>,
}
308
309impl CheckpointCoordinator {
310    /// Creates a new checkpoint coordinator.
311    #[must_use]
312    pub fn new(config: CheckpointConfig, store: Box<dyn CheckpointStore>) -> Self {
313        let store: Arc<dyn CheckpointStore> = Arc::from(store);
314        // Determine starting epoch from stored checkpoints.
315        let (next_id, epoch) = match store.load_latest() {
316            Ok(Some(m)) => (m.checkpoint_id + 1, m.epoch + 1),
317            _ => (1, 1),
318        };
319
320        Self {
321            config,
322            store,
323            sources: Vec::new(),
324            sinks: Vec::new(),
325            table_sources: Vec::new(),
326            next_checkpoint_id: next_id,
327            epoch,
328            phase: CheckpointPhase::Idle,
329            checkpoints_completed: 0,
330            checkpoints_failed: 0,
331            last_checkpoint_duration: None,
332            duration_histogram: DurationHistogram::new(),
333            wal_manager: None,
334            changelog_drainers: Vec::new(),
335            counters: None,
336            smoothed_duration_ms: 0.0,
337            total_bytes_written: 0,
338            unaligned_checkpoint_count: 0,
339            previous_wal_positions: None,
340        }
341    }
342
343    /// Registers a source connector for checkpoint coordination.
344    ///
345    /// The `supports_replay` flag indicates whether the source can seek
346    /// back to a checkpointed offset. Sources that cannot replay (e.g.,
347    /// WebSocket) will log a warning since exactly-once semantics are
348    /// degraded to at-most-once for events from that source.
349    pub fn register_source(
350        &mut self,
351        name: impl Into<String>,
352        connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
353        supports_replay: bool,
354    ) {
355        let name = name.into();
356        if !supports_replay {
357            warn!(
358                source = %name,
359                "source does not support replay — exactly-once semantics \
360                 are degraded to at-most-once for this source"
361            );
362        }
363        self.sources.push(RegisteredSource {
364            name,
365            connector,
366            supports_replay,
367        });
368    }
369
370    /// Registers a sink connector for checkpoint coordination.
371    pub(crate) fn register_sink(
372        &mut self,
373        name: impl Into<String>,
374        handle: crate::sink_task::SinkTaskHandle,
375        exactly_once: bool,
376    ) {
377        self.sinks.push(RegisteredSink {
378            name: name.into(),
379            handle,
380            exactly_once,
381        });
382    }
383
    /// Begins the initial epoch on all exactly-once sinks.
    ///
    /// Must be called once after all sinks are registered and before any
    /// writes occur. This starts the first Kafka transaction for exactly-once
    /// sinks. Subsequent epochs are started automatically after each
    /// successful checkpoint commit.
    ///
    /// Uses the coordinator's current epoch, which was derived from the
    /// latest stored manifest at construction time.
    ///
    /// # Errors
    ///
    /// Returns the first error from any sink that fails to begin the epoch.
    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
        self.begin_epoch_for_sinks(self.epoch).await
    }
397
398    /// Begins an epoch on all exactly-once sinks. If any sink fails,
399    /// rolls back sinks that already started the epoch.
400    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
401        let mut started: Vec<&RegisteredSink> = Vec::new();
402        for sink in &self.sinks {
403            if sink.exactly_once {
404                match sink.handle.begin_epoch(epoch).await {
405                    Ok(()) => {
406                        started.push(sink);
407                        debug!(sink = %sink.name, epoch, "began epoch");
408                    }
409                    Err(e) => {
410                        // Roll back sinks that already started.
411                        for s in &started {
412                            s.handle.rollback_epoch(epoch).await;
413                        }
414                        return Err(DbError::Checkpoint(format!(
415                            "sink '{}' failed to begin epoch {epoch}: {e}",
416                            sink.name
417                        )));
418                    }
419                }
420            }
421        }
422        Ok(())
423    }
424
425    /// Registers a reference table source connector.
426    pub fn register_table_source(
427        &mut self,
428        name: impl Into<String>,
429        connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
430    ) {
431        self.table_sources.push(RegisteredSource {
432            name: name.into(),
433            connector,
434            supports_replay: true, // Table sources are always replayable
435        });
436    }
437
438    // ── Observability ──
439
    /// Sets the shared pipeline counters for checkpoint metrics emission.
    ///
    /// When set, checkpoint completion and failure will update the counters
    /// automatically. Replaces any previously registered counters.
    pub fn set_counters(&mut self, counters: Arc<PipelineCounters>) {
        self.counters = Some(counters);
    }
447
448    /// Emits checkpoint metrics to the shared counters.
449    fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
450        if let Some(ref counters) = self.counters {
451            if success {
452                counters
453                    .checkpoints_completed
454                    .fetch_add(1, Ordering::Relaxed);
455            } else {
456                counters.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
457            }
458            #[allow(clippy::cast_possible_truncation)]
459            counters
460                .last_checkpoint_duration_ms
461                .store(duration.as_millis() as u64, Ordering::Relaxed);
462            counters.checkpoint_epoch.store(epoch, Ordering::Relaxed);
463        }
464    }
465
466    // ── WAL coordination ──
467
    /// Registers a per-core WAL manager for checkpoint coordination.
    ///
    /// When registered, [`prepare_wal_for_checkpoint()`](Self::prepare_wal_for_checkpoint)
    /// will write epoch barriers and sync all segments, and
    /// [`truncate_wal_after_checkpoint()`](Self::truncate_wal_after_checkpoint)
    /// will reset all segments. Replaces any previously registered manager.
    pub fn register_wal_manager(&mut self, wal_manager: PerCoreWalManager) {
        self.wal_manager = Some(wal_manager);
    }
477
    /// Registers a changelog drainer to flush before checkpointing.
    ///
    /// Multiple drainers may be registered (one per core or per state store);
    /// they are flushed in registration order during
    /// [`prepare_wal_for_checkpoint()`](Self::prepare_wal_for_checkpoint).
    pub fn register_changelog_drainer(&mut self, drainer: ChangelogDrainer) {
        self.changelog_drainers.push(drainer);
    }
486
487    /// Prepares the WAL for a checkpoint.
488    ///
489    /// 1. Flushes all registered [`ChangelogDrainer`] instances (Ring 1 → Ring 0 catchup)
490    /// 2. Writes epoch barriers to all per-core WAL segments
491    /// 3. Syncs all WAL segments (`fdatasync`)
492    /// 4. Records and returns per-core WAL positions
493    ///
494    /// Call this **before** [`checkpoint()`](Self::checkpoint) and pass the returned
495    /// positions into that method.
496    ///
497    /// # Errors
498    ///
499    /// Returns `DbError::Checkpoint` if WAL operations fail.
500    pub fn prepare_wal_for_checkpoint(&mut self) -> Result<WalPrepareResult, DbError> {
501        // Step 1: Flush changelog drainers
502        let mut total_drained: u64 = 0;
503        for drainer in &mut self.changelog_drainers {
504            let count = drainer.drain();
505            total_drained += count as u64;
506            debug!(
507                drained = count,
508                pending = drainer.pending_count(),
509                "changelog drainer flushed"
510            );
511        }
512
513        // Step 2-4: WAL epoch barriers + sync + positions
514        let per_core_wal_positions = if let Some(ref mut wal) = self.wal_manager {
515            let epoch = wal.advance_epoch();
516            wal.set_epoch_all(epoch);
517
518            wal.write_epoch_barrier_all()
519                .map_err(|e| DbError::Checkpoint(format!("WAL epoch barrier failed: {e}")))?;
520
521            wal.sync_all()
522                .map_err(|e| DbError::Checkpoint(format!("WAL sync failed: {e}")))?;
523
524            // Use synced positions — only durable data is safe for the manifest.
525            let positions = wal.synced_positions();
526            debug!(epoch, positions = ?positions, "WAL prepared for checkpoint");
527            positions
528        } else {
529            Vec::new()
530        };
531
532        Ok(WalPrepareResult {
533            per_core_wal_positions,
534            entries_drained: total_drained,
535        })
536    }
537
538    /// Truncates (resets) all per-core WAL segments after a successful checkpoint.
539    ///
540    /// Call this **after** [`checkpoint()`](Self::checkpoint) returns success.
541    /// WAL data before the checkpoint position is no longer needed.
542    ///
543    /// # Errors
544    ///
545    /// Returns `DbError::Checkpoint` if truncation fails.
546    pub fn truncate_wal_after_checkpoint(
547        &mut self,
548        current_positions: Vec<u64>,
549    ) -> Result<(), DbError> {
550        if let Some(ref mut wal) = self.wal_manager {
551            if let Some(ref prev) = self.previous_wal_positions {
552                // Truncate to previous checkpoint's positions, keeping one
553                // checkpoint's worth of WAL data as a safety buffer.
554                wal.truncate_all(prev)
555                    .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
556                debug!("WAL segments truncated to previous checkpoint positions");
557            } else {
558                // First checkpoint — no previous positions, truncate to 0.
559                wal.reset_all()
560                    .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
561                debug!("WAL segments reset after first checkpoint");
562            }
563        }
564        self.previous_wal_positions = Some(current_positions);
565        Ok(())
566    }
567
    /// Returns a reference to the registered WAL manager, if any.
    ///
    /// `None` when [`register_wal_manager`](Self::register_wal_manager)
    /// was never called.
    #[must_use]
    pub fn wal_manager(&self) -> Option<&PerCoreWalManager> {
        self.wal_manager.as_ref()
    }
573
    /// Returns a mutable reference to the registered WAL manager, if any.
    ///
    /// `None` when [`register_wal_manager`](Self::register_wal_manager)
    /// was never called.
    pub fn wal_manager_mut(&mut self) -> Option<&mut PerCoreWalManager> {
        self.wal_manager.as_mut()
    }
578
    /// Returns a slice of registered changelog drainers, in registration order.
    #[must_use]
    pub fn changelog_drainers(&self) -> &[ChangelogDrainer] {
        &self.changelog_drainers
    }
584
    /// Returns a mutable slice of registered changelog drainers, in
    /// registration order.
    pub fn changelog_drainers_mut(&mut self) -> &mut [ChangelogDrainer] {
        &mut self.changelog_drainers
    }
589
590    /// Safety valve: clears changelog drainer buffers if any exceeds the cap.
591    ///
592    /// Called on checkpoint failure to prevent unbounded memory growth when
593    /// checkpoints fail repeatedly. Under normal operation, `clear_pending()`
594    /// is called on the success path in `checkpoint_inner()`.
595    fn maybe_cap_drainers(&mut self) {
596        let cap = self.config.max_pending_changelog_entries;
597        let needs_clear = self
598            .changelog_drainers
599            .iter()
600            .any(|d| d.pending_count() > cap);
601        if needs_clear {
602            let total: usize = self
603                .changelog_drainers
604                .iter()
605                .map(ChangelogDrainer::pending_count)
606                .sum();
607            warn!(
608                total_pending = total,
609                cap,
610                "[LDB-6010] changelog drainer exceeded cap after checkpoint failure — \
611                 clearing to prevent OOM"
612            );
613            for drainer in &mut self.changelog_drainers {
614                drainer.clear_pending();
615            }
616        }
617    }
618
    /// Performs a full checkpoint cycle (steps 3-7).
    ///
    /// Steps 1-2 (barrier propagation + operator snapshots) are handled
    /// externally by the DAG executor and passed in as `operator_states`.
    ///
    /// # Arguments
    ///
    /// * `operator_states` — serialized operator state from `DagCheckpointCoordinator`
    /// * `watermark` — current global watermark
    /// * `table_store_checkpoint_path` — table store checkpoint path
    /// * `source_watermarks` — per-source watermarks
    /// * `pipeline_hash` — pipeline identity hash
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if any phase fails.
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    pub async fn checkpoint(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
    ) -> Result<CheckpointResult, DbError> {
        // Thin delegator: the two empty maps fill argument slots of
        // `checkpoint_inner` that this public path does not use
        // (presumably optional auxiliary state — confirm against
        // `checkpoint_inner`'s signature, which is outside this view).
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            HashMap::new(),
            source_watermarks,
            pipeline_hash,
            HashMap::new(),
        )
        .await
    }
655
656    /// Snapshots all registered source connectors concurrently.
657    ///
658    /// Uses `join_all` to lock and checkpoint each source in parallel rather
659    /// than sequentially, reducing snapshot latency proportional to source count.
660    async fn snapshot_sources(
661        &self,
662        sources: &[RegisteredSource],
663    ) -> Result<HashMap<String, ConnectorCheckpoint>, DbError> {
664        use futures::future::join_all;
665
666        let futs = sources.iter().map(|source| {
667            let connector = Arc::clone(&source.connector);
668            let name = source.name.clone();
669            async move {
670                let guard = connector.lock().await;
671                let cp = guard.checkpoint();
672                let result = source_to_connector_checkpoint(&cp);
673                debug!(source = %name, epoch = cp.epoch(), "source snapshotted");
674                (name, result)
675            }
676        });
677
678        let results = join_all(futs).await;
679        Ok(results.into_iter().collect())
680    }
681
682    /// Like `snapshot_sources` but takes a slice of references (for filtering).
683    async fn snapshot_source_refs(
684        &self,
685        sources: &[&RegisteredSource],
686    ) -> Result<HashMap<String, ConnectorCheckpoint>, DbError> {
687        use futures::future::join_all;
688
689        let futs = sources.iter().map(|source| {
690            let connector = Arc::clone(&source.connector);
691            let name = source.name.clone();
692            async move {
693                let guard = connector.lock().await;
694                let cp = guard.checkpoint();
695                let result = source_to_connector_checkpoint(&cp);
696                debug!(source = %name, epoch = cp.epoch(), "source snapshotted");
697                (name, result)
698            }
699        });
700
701        let results = join_all(futs).await;
702        Ok(results.into_iter().collect())
703    }
704
705    /// Pre-commits all exactly-once sinks (phase 1) with a timeout.
706    ///
707    /// A stuck sink will not block checkpointing indefinitely. The timeout
708    /// is configured via [`CheckpointConfig::pre_commit_timeout`].
709    async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
710        let timeout_dur = self.config.pre_commit_timeout;
711
712        match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
713            Ok(result) => result,
714            Err(_elapsed) => Err(DbError::Checkpoint(format!(
715                "pre-commit timed out after {}s",
716                timeout_dur.as_secs()
717            ))),
718        }
719    }
720
721    /// Inner pre-commit loop (no timeout).
722    ///
723    /// Only sinks with `exactly_once = true` participate in two-phase commit.
724    /// At-most-once sinks are skipped — they receive no `pre_commit`/`commit`
725    /// calls and provide no transactional guarantees.
726    async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
727        for sink in &self.sinks {
728            if sink.exactly_once {
729                sink.handle.pre_commit(epoch).await.map_err(|e| {
730                    DbError::Checkpoint(format!("sink '{}' pre-commit failed: {e}", sink.name))
731                })?;
732                debug!(sink = %sink.name, epoch, "sink pre-committed");
733            }
734        }
735        Ok(())
736    }
737
738    /// Commits all exactly-once sinks with per-sink status tracking.
739    ///
740    /// Returns a map of sink name → commit status. Sinks that committed
741    /// successfully are `Committed`; failures are `Failed(message)`.
742    /// All sinks are attempted even if some fail.
743    ///
744    /// Bounded by [`CheckpointConfig::commit_timeout`] to prevent a stuck
745    /// sink from blocking checkpoint completion indefinitely.
746    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
747        let timeout_dur = self.config.commit_timeout;
748
749        match tokio::time::timeout(timeout_dur, self.commit_sinks_inner(epoch)).await {
750            Ok(statuses) => statuses,
751            Err(_elapsed) => {
752                error!(
753                    epoch,
754                    timeout_secs = timeout_dur.as_secs(),
755                    "[LDB-6012] sink commit timed out — marking all pending sinks as failed"
756                );
757                self.sinks
758                    .iter()
759                    .filter(|s| s.exactly_once)
760                    .map(|s| {
761                        (
762                            s.name.clone(),
763                            SinkCommitStatus::Failed(format!(
764                                "sink '{}' commit timed out after {}s",
765                                s.name,
766                                timeout_dur.as_secs()
767                            )),
768                        )
769                    })
770                    .collect()
771            }
772        }
773    }
774
775    /// Inner commit loop (no timeout).
776    async fn commit_sinks_inner(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
777        let mut statuses = HashMap::with_capacity(self.sinks.len());
778
779        for sink in &self.sinks {
780            if sink.exactly_once {
781                match sink.handle.commit_epoch(epoch).await {
782                    Ok(()) => {
783                        statuses.insert(sink.name.clone(), SinkCommitStatus::Committed);
784                        debug!(sink = %sink.name, epoch, "sink committed");
785                    }
786                    Err(e) => {
787                        let msg = format!("sink '{}' commit failed: {e}", sink.name);
788                        error!(sink = %sink.name, epoch, error = %e, "sink commit failed");
789                        statuses.insert(sink.name.clone(), SinkCommitStatus::Failed(msg));
790                    }
791                }
792            }
793        }
794
795        statuses
796    }
797
798    /// Saves a manifest to the checkpoint store (blocking I/O on a task).
799    ///
800    /// Uses [`CheckpointStore::save_with_state`] to write optional sidecar
801    /// data **before** the manifest, ensuring atomicity: if the sidecar write
802    /// fails, the manifest is never persisted.
803    ///
804    /// Takes `manifest` by value to avoid cloning on the common path.
805    /// Callers that need the manifest after save should clone before calling.
806    ///
807    /// Bounded by [`CheckpointConfig::persist_timeout`] to prevent a hung
808    /// filesystem from stalling the runtime indefinitely.
809    async fn save_manifest(
810        &self,
811        manifest: CheckpointManifest,
812        state_data: Option<Vec<u8>>,
813    ) -> Result<(), DbError> {
814        let store = Arc::clone(&self.store);
815        let timeout_dur = self.config.persist_timeout;
816
817        let task = tokio::task::spawn_blocking(move || {
818            store.save_with_state(&manifest, state_data.as_deref())
819        });
820
821        match tokio::time::timeout(timeout_dur, task).await {
822            Ok(Ok(Ok(()))) => Ok(()),
823            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest persist failed: {e}"))),
824            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
825                "manifest persist task failed: {join_err}"
826            ))),
827            Err(_elapsed) => Err(DbError::Checkpoint(format!(
828                "[LDB-6011] manifest persist timed out after {}s — \
829                 filesystem may be degraded",
830                timeout_dur.as_secs()
831            ))),
832        }
833    }
834
835    /// Overwrites an existing manifest with updated fields (e.g., sink commit
836    /// statuses after Step 6). Uses [`CheckpointStore::update_manifest`] which
837    /// does NOT use conditional PUT, so the overwrite always succeeds.
838    async fn update_manifest_only(&self, manifest: &CheckpointManifest) -> Result<(), DbError> {
839        let store = Arc::clone(&self.store);
840        let manifest = manifest.clone();
841        let timeout_dur = self.config.persist_timeout;
842
843        let task = tokio::task::spawn_blocking(move || store.update_manifest(&manifest));
844
845        match tokio::time::timeout(timeout_dur, task).await {
846            Ok(Ok(Ok(()))) => Ok(()),
847            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest update failed: {e}"))),
848            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
849                "manifest update task failed: {join_err}"
850            ))),
851            Err(_elapsed) => Err(DbError::Checkpoint(format!(
852                "manifest update timed out after {}s",
853                timeout_dur.as_secs()
854            ))),
855        }
856    }
857
858    /// Returns initial sink commit statuses (all `Pending`) for the manifest.
859    fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
860        self.sinks
861            .iter()
862            .filter(|s| s.exactly_once)
863            .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
864            .collect()
865    }
866
867    /// Packs operator states into a manifest with optional sidecar blob.
868    ///
869    /// States larger than `threshold` are stored in a sidecar blob rather
870    /// than base64-inlined in the JSON manifest.
871    fn pack_operator_states(
872        manifest: &mut CheckpointManifest,
873        operator_states: &HashMap<String, Vec<u8>>,
874        threshold: usize,
875    ) -> Option<Vec<u8>> {
876        let mut sidecar_blobs: Vec<u8> = Vec::new();
877        for (name, data) in operator_states {
878            let (op_ckpt, maybe_blob) =
879                laminar_storage::checkpoint_manifest::OperatorCheckpoint::from_bytes(
880                    data,
881                    threshold,
882                    sidecar_blobs.len() as u64,
883                );
884            if let Some(blob) = maybe_blob {
885                sidecar_blobs.extend_from_slice(&blob);
886            }
887            manifest.operator_states.insert(name.clone(), op_ckpt);
888        }
889
890        if sidecar_blobs.is_empty() {
891            None
892        } else {
893            Some(sidecar_blobs)
894        }
895    }
896
897    /// Rolls back all exactly-once sinks.
898    async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
899        for sink in &self.sinks {
900            if sink.exactly_once {
901                sink.handle.rollback_epoch(epoch).await;
902            }
903        }
904        Ok(())
905    }
906
907    /// Collects the last committed epoch from each sink.
908    fn collect_sink_epochs(&self) -> HashMap<String, u64> {
909        let mut epochs = HashMap::with_capacity(self.sinks.len());
910        for sink in &self.sinks {
911            // The epoch being committed is the current one
912            if sink.exactly_once {
913                epochs.insert(sink.name.clone(), self.epoch);
914            }
915        }
916        epochs
917    }
918
919    /// Returns sorted source names for topology tracking in the manifest.
920    fn sorted_source_names(&self) -> Vec<String> {
921        let mut names: Vec<String> = self.sources.iter().map(|s| s.name.clone()).collect();
922        names.sort();
923        names
924    }
925
926    /// Returns sorted sink names for topology tracking in the manifest.
927    fn sorted_sink_names(&self) -> Vec<String> {
928        let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
929        names.sort();
930        names
931    }
932
933    /// Returns the current phase.
934    #[must_use]
935    pub fn phase(&self) -> CheckpointPhase {
936        self.phase
937    }
938
939    /// Returns the current epoch.
940    #[must_use]
941    pub fn epoch(&self) -> u64 {
942        self.epoch
943    }
944
945    /// Returns the registered sources (for pre-capture by the callback).
946    #[must_use]
947    pub(crate) fn registered_sources(&self) -> &[RegisteredSource] {
948        &self.sources
949    }
950
951    /// Returns the next checkpoint ID.
952    #[must_use]
953    pub fn next_checkpoint_id(&self) -> u64 {
954        self.next_checkpoint_id
955    }
956
957    /// Returns the checkpoint config.
958    #[must_use]
959    pub fn config(&self) -> &CheckpointConfig {
960        &self.config
961    }
962
963    /// Adjusts the checkpoint interval based on observed durations.
964    ///
965    /// Uses an exponential moving average (EMA) of checkpoint durations to
966    /// compute a new interval: `interval = smoothed_duration / target_ratio`,
967    /// clamped to `[min_interval, max_interval]`.
968    ///
969    /// No-op if adaptive checkpointing is not configured.
970    fn adjust_interval(&mut self) {
971        let adaptive = match &self.config.adaptive {
972            Some(a) => a.clone(),
973            None => return,
974        };
975
976        #[allow(clippy::cast_precision_loss)] // Checkpoint durations are << 2^52 ms
977        let last_ms = match self.last_checkpoint_duration {
978            Some(d) => d.as_millis() as f64,
979            None => return,
980        };
981
982        // Update EMA
983        if self.smoothed_duration_ms == 0.0 {
984            self.smoothed_duration_ms = last_ms;
985        } else {
986            self.smoothed_duration_ms = adaptive.smoothing_alpha * last_ms
987                + (1.0 - adaptive.smoothing_alpha) * self.smoothed_duration_ms;
988        }
989
990        // Compute target interval: smoothed_ms / (1000 * ratio)
991        let new_interval_secs =
992            self.smoothed_duration_ms / (1000.0 * adaptive.target_overhead_ratio);
993        let new_interval = Duration::from_secs_f64(new_interval_secs);
994
995        // Clamp to bounds
996        let clamped = new_interval.clamp(adaptive.min_interval, adaptive.max_interval);
997
998        let old_interval = self.config.interval;
999        self.config.interval = Some(clamped);
1000
1001        if old_interval != Some(clamped) {
1002            debug!(
1003                old_interval_ms = old_interval.map(|d| d.as_millis()),
1004                new_interval_ms = clamped.as_millis(),
1005                smoothed_duration_ms = self.smoothed_duration_ms,
1006                "adaptive checkpoint interval adjusted"
1007            );
1008        }
1009    }
1010
1011    /// Returns the current smoothed checkpoint duration (milliseconds).
1012    ///
1013    /// Returns 0.0 if no checkpoints have been completed or adaptive mode
1014    /// is not enabled.
1015    #[must_use]
1016    pub fn smoothed_duration_ms(&self) -> f64 {
1017        self.smoothed_duration_ms
1018    }
1019
1020    /// Returns the number of unaligned checkpoints completed.
1021    #[must_use]
1022    pub fn unaligned_checkpoint_count(&self) -> u64 {
1023        self.unaligned_checkpoint_count
1024    }
1025
1026    /// Returns checkpoint statistics.
1027    #[must_use]
1028    pub fn stats(&self) -> CheckpointStats {
1029        let (p50, p95, p99) = self.duration_histogram.percentiles();
1030        CheckpointStats {
1031            completed: self.checkpoints_completed,
1032            failed: self.checkpoints_failed,
1033            last_duration: self.last_checkpoint_duration,
1034            duration_p50_ms: p50,
1035            duration_p95_ms: p95,
1036            duration_p99_ms: p99,
1037            total_bytes_written: self.total_bytes_written,
1038            current_phase: self.phase,
1039            current_epoch: self.epoch,
1040            unaligned_checkpoint_count: self.unaligned_checkpoint_count,
1041        }
1042    }
1043
1044    /// Returns a reference to the underlying store.
1045    #[must_use]
1046    pub fn store(&self) -> &dyn CheckpointStore {
1047        &*self.store
1048    }
1049
1050    /// Performs a full checkpoint cycle with additional table offsets.
1051    ///
1052    /// Identical to [`checkpoint()`](Self::checkpoint) but merges
1053    /// `extra_table_offsets` into the manifest's `table_offsets` field.
1054    /// This is useful for `ReferenceTableSource` instances that are not
1055    /// registered as `SourceConnector` but still need their offsets persisted.
1056    ///
1057    /// # Errors
1058    ///
1059    /// Returns `DbError::Checkpoint` if any phase fails.
1060    #[allow(clippy::too_many_arguments)]
1061    pub async fn checkpoint_with_extra_tables(
1062        &mut self,
1063        operator_states: HashMap<String, Vec<u8>>,
1064        watermark: Option<i64>,
1065        table_store_checkpoint_path: Option<String>,
1066        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
1067        source_watermarks: HashMap<String, i64>,
1068        pipeline_hash: Option<u64>,
1069    ) -> Result<CheckpointResult, DbError> {
1070        self.checkpoint_inner(
1071            operator_states,
1072            watermark,
1073            table_store_checkpoint_path,
1074            extra_table_offsets,
1075            source_watermarks,
1076            pipeline_hash,
1077            HashMap::new(),
1078        )
1079        .await
1080    }
1081
1082    /// Performs a full checkpoint with pre-captured source offsets.
1083    ///
1084    /// When `source_offset_overrides` is non-empty, those sources skip the
1085    /// live `snapshot_sources()` call and use the provided offsets instead.
1086    /// This is essential for barrier-aligned checkpoints where source
1087    /// positions must match the operator state at the barrier point.
1088    ///
1089    /// # Errors
1090    ///
1091    /// Returns `DbError::Checkpoint` if any phase fails.
1092    #[allow(clippy::too_many_arguments)]
1093    pub async fn checkpoint_with_offsets(
1094        &mut self,
1095        operator_states: HashMap<String, Vec<u8>>,
1096        watermark: Option<i64>,
1097        table_store_checkpoint_path: Option<String>,
1098        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
1099        source_watermarks: HashMap<String, i64>,
1100        pipeline_hash: Option<u64>,
1101        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
1102    ) -> Result<CheckpointResult, DbError> {
1103        self.checkpoint_inner(
1104            operator_states,
1105            watermark,
1106            table_store_checkpoint_path,
1107            extra_table_offsets,
1108            source_watermarks,
1109            pipeline_hash,
1110            source_offset_overrides,
1111        )
1112        .await
1113    }
1114
    /// Shared checkpoint implementation for all checkpoint entry points.
    ///
    /// WAL preparation is performed internally to guarantee atomicity:
    /// changelog drain + epoch barrier + WAL sync happen after operator
    /// states are received, so the WAL positions in the manifest always
    /// reflect the state after the operator snapshot.
    ///
    /// When `source_offset_overrides` is non-empty, those sources use the
    /// provided offsets instead of calling `snapshot_sources()`. This ensures
    /// barrier-aligned and pre-captured offsets are used atomically.
    ///
    /// Most failures (sink pre-commit, size cap, manifest persist, sink
    /// commit) are reported in-band as
    /// `Ok(CheckpointResult { success: false, .. })`; only WAL preparation
    /// errors propagate as `Err`. `self.epoch` and `self.next_checkpoint_id`
    /// advance only on the success path.
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    async fn checkpoint_inner(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
    ) -> Result<CheckpointResult, DbError> {
        let start = Instant::now();
        // Captured up front; these counters are only incremented after every
        // step succeeds (see the "Success" section below).
        let checkpoint_id = self.next_checkpoint_id;
        let epoch = self.epoch;

        info!(checkpoint_id, epoch, "starting checkpoint");

        // ── Step 2b: WAL preparation (internal) ──
        // Drain changelog, write epoch barriers, sync WAL, capture positions.
        // This MUST happen after operator states are received (passed as args)
        // to guarantee WAL positions reflect post-snapshot state.
        let wal_result = self.prepare_wal_for_checkpoint()?;
        let per_core_wal_positions = wal_result.per_core_wal_positions;

        // ── Step 3: Source snapshot (parallel) ──
        self.phase = CheckpointPhase::Snapshotting;

        // Only live-snapshot sources that don't have pre-captured overrides.
        // Sources with overrides were captured at a consistent point (barrier
        // alignment or pre-spawn) and must not be re-queried.
        let sources_to_snapshot: Vec<&RegisteredSource> = self
            .sources
            .iter()
            .filter(|s| !source_offset_overrides.contains_key(&s.name))
            .collect();
        let (mut source_offsets, mut table_offsets) = tokio::try_join!(
            self.snapshot_source_refs(&sources_to_snapshot),
            self.snapshot_sources(&self.table_sources),
        )?;

        // Merge pre-captured source offset overrides into source_offsets.
        for (name, cp) in source_offset_overrides {
            source_offsets.insert(name, cp);
        }

        // Merge extra table offsets (from ReferenceTableSource instances).
        for (name, cp) in extra_table_offsets {
            table_offsets.insert(name, cp);
        }

        // ── Step 4: Sink pre-commit ──
        self.phase = CheckpointPhase::PreCommitting;
        if let Err(e) = self.pre_commit_sinks(epoch).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            self.maybe_cap_drainers();
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("pre-commit failed: {e}")),
            });
        }

        // ── Build manifest ──
        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
        manifest.source_offsets = source_offsets;
        manifest.table_offsets = table_offsets;
        manifest.sink_epochs = self.collect_sink_epochs();
        // Mark all exactly-once sinks as Pending before commit phase.
        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
        manifest.watermark = watermark;
        // Use caller-provided per-source watermarks if available. When empty,
        // leave source_watermarks empty — recovery falls back to the global
        // manifest.watermark. Do NOT fabricate per-source values from the
        // global watermark, as that loses granularity on recovery.
        manifest.source_watermarks = source_watermarks;
        // wal_position is legacy (single-writer mode); per-core positions are authoritative.
        manifest.per_core_wal_positions = per_core_wal_positions;
        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
        manifest.is_incremental = self.config.incremental;
        manifest.source_names = self.sorted_source_names();
        manifest.sink_names = self.sorted_sink_names();
        manifest.pipeline_hash = pipeline_hash;

        let state_data = Self::pack_operator_states(
            &mut manifest,
            &operator_states,
            self.config.state_inline_threshold,
        );
        let sidecar_bytes = state_data.as_ref().map_or(0, Vec::len);
        if sidecar_bytes > 0 {
            debug!(
                checkpoint_id,
                sidecar_bytes, "writing operator state sidecar"
            );
        }

        // ── Step 4b: Checkpoint size check ──
        if let Some(cap) = self.config.max_checkpoint_bytes {
            if sidecar_bytes > cap {
                self.phase = CheckpointPhase::Idle;
                self.checkpoints_failed += 1;
                self.maybe_cap_drainers();
                let duration = start.elapsed();
                self.emit_checkpoint_metrics(false, epoch, duration);
                let msg = format!(
                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
                     cap {cap} bytes — checkpoint rejected"
                );
                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
                return Ok(CheckpointResult {
                    success: false,
                    checkpoint_id,
                    epoch,
                    duration,
                    error: Some(msg),
                });
            }
            let warn_threshold = cap * 4 / 5; // 80%
            if sidecar_bytes > warn_threshold {
                warn!(
                    checkpoint_id,
                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
                );
            }
        }
        // Track cumulative bytes written (updated after successful persist below).
        let checkpoint_bytes = sidecar_bytes as u64;

        // ── Step 5: Persist manifest (decision record — sinks are Pending) ──
        self.phase = CheckpointPhase::Persisting;
        // Clone: `manifest` is re-persisted in Step 6b with final sink statuses.
        if let Err(e) = self.save_manifest(manifest.clone(), state_data).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            self.maybe_cap_drainers();
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                error!(
                    checkpoint_id,
                    epoch,
                    error = %rollback_err,
                    "[LDB-6004] sink rollback failed after manifest persist failure — \
                     sinks may be in an inconsistent state"
                );
            }
            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("manifest persist failed: {e}")),
            });
        }

        // ── Step 6: Sink commit (per-sink tracking) ──
        self.phase = CheckpointPhase::Committing;
        let sink_statuses = self.commit_sinks_tracked(epoch).await;
        let has_failures = sink_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));

        // ── Step 6b: Overwrite manifest with final sink commit statuses ──
        // Best-effort: the decision record from Step 5 is already durable,
        // so a failed status update here only degrades observability.
        if !sink_statuses.is_empty() {
            manifest.sink_commit_statuses = sink_statuses;
            if let Err(e) = self.update_manifest_only(&manifest).await {
                warn!(
                    checkpoint_id,
                    epoch,
                    error = %e,
                    "post-commit manifest update failed"
                );
            }
        }

        if has_failures {
            self.checkpoints_failed += 1;
            error!(
                checkpoint_id,
                epoch, "sink commit partially failed — epoch NOT advanced, will retry"
            );
            self.phase = CheckpointPhase::Idle;
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some("partial sink commit failure".into()),
            });
        }

        // ── Step 7: Clear changelog drainer pending buffers ──
        // Entries are metadata-only (key_hash + mmap_offset) used for SPSC
        // backpressure relief. The checkpoint captured full state, so the
        // metadata is no longer needed. Clearing prevents unbounded growth.
        for drainer in &mut self.changelog_drainers {
            drainer.clear_pending();
        }

        // ── Success ──
        self.phase = CheckpointPhase::Idle;
        self.next_checkpoint_id += 1;
        self.epoch += 1;
        self.checkpoints_completed += 1;
        self.total_bytes_written += checkpoint_bytes;
        let duration = start.elapsed();
        self.last_checkpoint_duration = Some(duration);
        self.duration_histogram.record(duration);
        self.emit_checkpoint_metrics(true, epoch, duration);
        self.adjust_interval();

        // ── Step 8: Begin next epoch on exactly-once sinks ──
        let next_epoch = self.epoch;
        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
            Ok(()) => None,
            Err(e) => {
                error!(
                    next_epoch,
                    error = %e,
                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
                );
                Some(e.to_string())
            }
        };

        info!(
            checkpoint_id,
            epoch,
            duration_ms = duration.as_millis(),
            "checkpoint completed"
        );

        // The checkpoint itself succeeded (state persisted, sinks committed).
        // begin_epoch failure for the *next* epoch is reported as a warning
        // but does not retroactively fail the completed checkpoint.
        Ok(CheckpointResult {
            success: true,
            checkpoint_id,
            epoch,
            duration,
            error: begin_epoch_error,
        })
    }
1376
1377    /// Attempts recovery from the latest checkpoint.
1378    ///
1379    /// Creates a [`RecoveryManager`](crate::recovery_manager::RecoveryManager)
1380    /// using the coordinator's store and delegates recovery to it.
1381    /// On success, advances `self.epoch` past the recovered epoch so the
1382    /// next checkpoint gets a fresh epoch number.
1383    ///
1384    /// Returns `Ok(None)` for a fresh start (no checkpoint found).
1385    ///
1386    /// # Errors
1387    ///
1388    /// Returns `DbError::Checkpoint` if the store itself fails.
1389    pub async fn recover(
1390        &mut self,
1391    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
1392        use crate::recovery_manager::RecoveryManager;
1393
1394        let mgr = RecoveryManager::new(&*self.store);
1395        let result = mgr
1396            .recover(&self.sources, &self.sinks, &self.table_sources)
1397            .await?;
1398
1399        if let Some(ref recovered) = result {
1400            // Advance epoch past the recovered one
1401            self.epoch = recovered.epoch() + 1;
1402            self.next_checkpoint_id = recovered.manifest.checkpoint_id + 1;
1403            info!(
1404                epoch = self.epoch,
1405                checkpoint_id = self.next_checkpoint_id,
1406                "coordinator epoch set after recovery"
1407            );
1408        }
1409
1410        Ok(result)
1411    }
1412
1413    /// Loads the latest manifest from the store.
1414    ///
1415    /// # Errors
1416    ///
1417    /// Returns `DbError::Checkpoint` on store errors.
1418    pub fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
1419        self.store
1420            .load_latest()
1421            .map_err(|e| DbError::Checkpoint(format!("failed to load latest manifest: {e}")))
1422    }
1423}
1424
1425impl std::fmt::Debug for CheckpointCoordinator {
1426    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1427        f.debug_struct("CheckpointCoordinator")
1428            .field("epoch", &self.epoch)
1429            .field("next_checkpoint_id", &self.next_checkpoint_id)
1430            .field("phase", &self.phase)
1431            .field("sources", &self.sources.len())
1432            .field("sinks", &self.sinks.len())
1433            .field("has_wal_manager", &self.wal_manager.is_some())
1434            .field("changelog_drainers", &self.changelog_drainers.len())
1435            .field("completed", &self.checkpoints_completed)
1436            .field("failed", &self.checkpoints_failed)
1437            .finish_non_exhaustive()
1438    }
1439}
1440
/// Fixed-size ring buffer for checkpoint duration percentile tracking.
///
/// Stores the last `CAPACITY` checkpoint durations and computes p50/p95/p99
/// via sorted extraction. No heap allocation after construction.
#[derive(Clone)]
pub struct DurationHistogram {
    /// Ring buffer of durations in milliseconds.
    samples: Box<[u64; Self::CAPACITY]>,
    /// Write cursor (wraps at `CAPACITY`).
    cursor: usize,
    /// Total samples written (may exceed `CAPACITY`).
    count: u64,
}

impl DurationHistogram {
    /// Number of samples retained; older samples are overwritten.
    const CAPACITY: usize = 100;

    /// Creates an empty histogram.
    #[must_use]
    fn new() -> Self {
        Self {
            samples: Box::new([0; Self::CAPACITY]),
            cursor: 0,
            count: 0,
        }
    }

    /// Records a checkpoint duration, overwriting the oldest sample when full.
    fn record(&mut self, duration: Duration) {
        #[allow(clippy::cast_possible_truncation)]
        let millis = duration.as_millis() as u64;
        self.samples[self.cursor] = millis;
        // Advance the write cursor, wrapping at the ring capacity.
        self.cursor += 1;
        if self.cursor == Self::CAPACITY {
            self.cursor = 0;
        }
        self.count += 1;
    }

    /// Returns the number of valid samples (saturates at `CAPACITY`).
    #[must_use]
    fn len(&self) -> usize {
        // The min() bounds the value by CAPACITY (100), so the cast is safe.
        #[allow(clippy::cast_possible_truncation)]
        {
            self.count.min(Self::CAPACITY as u64) as usize
        }
    }

    /// Computes a percentile (0.0–1.0) from recorded samples.
    ///
    /// Returns 0 if no samples have been recorded.
    #[must_use]
    fn percentile(&self, p: f64) -> u64 {
        let valid = self.len();
        if valid == 0 {
            return 0;
        }
        // Copy out only the populated prefix and sort it.
        let mut ordered: Vec<u64> = self.samples[..valid].to_vec();
        ordered.sort_unstable();
        #[allow(
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss,
            clippy::cast_precision_loss
        )]
        let rank = ((p * (valid as f64 - 1.0)).ceil() as usize).min(valid - 1);
        ordered[rank]
    }

    /// Returns (p50, p95, p99) in milliseconds.
    #[must_use]
    fn percentiles(&self) -> (u64, u64, u64) {
        let p50 = self.percentile(0.50);
        let p95 = self.percentile(0.95);
        let p99 = self.percentile(0.99);
        (p50, p95, p99)
    }
}
1521
1522impl std::fmt::Debug for DurationHistogram {
1523    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1524        let (p50, p95, p99) = self.percentiles();
1525        f.debug_struct("DurationHistogram")
1526            .field("samples_len", &self.samples.len())
1527            .field("cursor", &self.cursor)
1528            .field("count", &self.count)
1529            .field("p50_ms", &p50)
1530            .field("p95_ms", &p95)
1531            .field("p99_ms", &p99)
1532            .finish()
1533    }
1534}
1535
/// Checkpoint performance statistics.
///
/// Point-in-time snapshot produced by the coordinator's `stats()` method;
/// duration percentiles come from the [`DurationHistogram`] ring buffer.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Total completed checkpoints.
    pub completed: u64,
    /// Total failed checkpoints.
    pub failed: u64,
    /// Duration of the last checkpoint (`None` if none has completed yet).
    pub last_duration: Option<Duration>,
    /// p50 checkpoint duration in milliseconds.
    pub duration_p50_ms: u64,
    /// p95 checkpoint duration in milliseconds.
    pub duration_p95_ms: u64,
    /// p99 checkpoint duration in milliseconds.
    pub duration_p99_ms: u64,
    /// Total bytes written across all checkpoints.
    pub total_bytes_written: u64,
    /// Current checkpoint phase.
    pub current_phase: CheckpointPhase,
    /// Current epoch number.
    pub current_epoch: u64,
    /// Number of checkpoints completed in unaligned mode.
    pub unaligned_checkpoint_count: u64,
}
1560
1561// ── Conversion helpers ──
1562
1563/// Converts a `SourceCheckpoint` to a `ConnectorCheckpoint`.
1564#[must_use]
1565pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
1566    ConnectorCheckpoint {
1567        offsets: cp.offsets().clone(),
1568        epoch: cp.epoch(),
1569        metadata: cp.metadata().clone(),
1570    }
1571}
1572
1573/// Converts a `ConnectorCheckpoint` back to a `SourceCheckpoint`.
1574#[must_use]
1575pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
1576    let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
1577    for (k, v) in &cp.metadata {
1578        source_cp.set_metadata(k.clone(), v.clone());
1579    }
1580    source_cp
1581}
1582
1583// ── DAG operator state conversion helpers ──
1584
1585/// Converts DAG operator states (from `DagCheckpointSnapshot`) to manifest format.
1586///
1587/// Uses `"{node_id}"` as the key and base64-encodes the state data.
1588#[must_use]
1589pub fn dag_snapshot_to_manifest_operators<S: std::hash::BuildHasher>(
1590    node_states: &std::collections::HashMap<
1591        u32,
1592        laminar_core::dag::recovery::SerializableOperatorState,
1593        S,
1594    >,
1595) -> HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
1596    node_states
1597        .iter()
1598        .map(|(id, state)| {
1599            (
1600                id.to_string(),
1601                laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&state.data),
1602            )
1603        })
1604        .collect()
1605}
1606
1607/// Converts manifest operator states back to DAG format for recovery.
1608///
1609/// Parses string keys as node IDs and decodes base64 state data.
1610#[must_use]
1611pub fn manifest_operators_to_dag_states<S: std::hash::BuildHasher>(
1612    operators: &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint, S>,
1613) -> rustc_hash::FxHashMap<laminar_core::dag::topology::NodeId, laminar_core::operator::OperatorState>
1614{
1615    let mut states =
1616        rustc_hash::FxHashMap::with_capacity_and_hasher(operators.len(), rustc_hash::FxBuildHasher);
1617    for (key, op_ckpt) in operators {
1618        if let Ok(node_id) = key.parse::<u32>() {
1619            if let Some(data) = op_ckpt.decode_inline() {
1620                states.insert(
1621                    laminar_core::dag::topology::NodeId(node_id),
1622                    laminar_core::operator::OperatorState {
1623                        operator_id: key.clone(),
1624                        data,
1625                    },
1626                );
1627            }
1628        }
1629    }
1630    states
1631}
1632
1633#[cfg(test)]
1634mod tests {
1635    use super::*;
1636    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;
1637
1638    fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
1639        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
1640        CheckpointCoordinator::new(CheckpointConfig::default(), store)
1641    }
1642
1643    #[test]
1644    fn test_coordinator_new() {
1645        let dir = tempfile::tempdir().unwrap();
1646        let coord = make_coordinator(dir.path());
1647
1648        assert_eq!(coord.epoch(), 1);
1649        assert_eq!(coord.next_checkpoint_id(), 1);
1650        assert_eq!(coord.phase(), CheckpointPhase::Idle);
1651    }
1652
1653    #[test]
1654    fn test_coordinator_resumes_from_stored_checkpoint() {
1655        let dir = tempfile::tempdir().unwrap();
1656
1657        // Save a checkpoint manually
1658        let store = FileSystemCheckpointStore::new(dir.path(), 3);
1659        let m = CheckpointManifest::new(5, 10);
1660        store.save(&m).unwrap();
1661
1662        // Coordinator should resume from epoch 11, checkpoint_id 6
1663        let coord = make_coordinator(dir.path());
1664        assert_eq!(coord.epoch(), 11);
1665        assert_eq!(coord.next_checkpoint_id(), 6);
1666    }
1667
1668    #[test]
1669    fn test_checkpoint_phase_display() {
1670        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
1671        assert_eq!(
1672            CheckpointPhase::BarrierInFlight.to_string(),
1673            "BarrierInFlight"
1674        );
1675        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
1676        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
1677        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
1678        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
1679    }
1680
1681    #[test]
1682    fn test_source_to_connector_checkpoint() {
1683        let mut cp = SourceCheckpoint::new(5);
1684        cp.set_offset("partition-0", "1234");
1685        cp.set_metadata("topic", "events");
1686
1687        let cc = source_to_connector_checkpoint(&cp);
1688        assert_eq!(cc.epoch, 5);
1689        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
1690        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
1691    }
1692
1693    #[test]
1694    fn test_connector_to_source_checkpoint() {
1695        let cc = ConnectorCheckpoint {
1696            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
1697            epoch: 3,
1698            metadata: HashMap::from([("type".into(), "postgres".into())]),
1699        };
1700
1701        let cp = connector_to_source_checkpoint(&cc);
1702        assert_eq!(cp.epoch(), 3);
1703        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
1704        assert_eq!(cp.get_metadata("type"), Some("postgres"));
1705    }
1706
1707    #[test]
1708    fn test_stats_initial() {
1709        let dir = tempfile::tempdir().unwrap();
1710        let coord = make_coordinator(dir.path());
1711        let stats = coord.stats();
1712
1713        assert_eq!(stats.completed, 0);
1714        assert_eq!(stats.failed, 0);
1715        assert!(stats.last_duration.is_none());
1716        assert_eq!(stats.duration_p50_ms, 0);
1717        assert_eq!(stats.duration_p95_ms, 0);
1718        assert_eq!(stats.duration_p99_ms, 0);
1719        assert_eq!(stats.current_phase, CheckpointPhase::Idle);
1720    }
1721
1722    #[tokio::test]
1723    async fn test_checkpoint_no_sources_no_sinks() {
1724        let dir = tempfile::tempdir().unwrap();
1725        let mut coord = make_coordinator(dir.path());
1726
1727        let result = coord
1728            .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
1729            .await
1730            .unwrap();
1731
1732        assert!(result.success);
1733        assert_eq!(result.checkpoint_id, 1);
1734        assert_eq!(result.epoch, 1);
1735
1736        // Verify manifest was persisted
1737        let loaded = coord.store().load_latest().unwrap().unwrap();
1738        assert_eq!(loaded.checkpoint_id, 1);
1739        assert_eq!(loaded.epoch, 1);
1740        assert_eq!(loaded.watermark, Some(1000));
1741
1742        // Second checkpoint should increment
1743        let result2 = coord
1744            .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
1745            .await
1746            .unwrap();
1747
1748        assert!(result2.success);
1749        assert_eq!(result2.checkpoint_id, 2);
1750        assert_eq!(result2.epoch, 2);
1751
1752        let stats = coord.stats();
1753        assert_eq!(stats.completed, 2);
1754        assert_eq!(stats.failed, 0);
1755    }
1756
1757    #[tokio::test]
1758    async fn test_checkpoint_with_operator_states() {
1759        let dir = tempfile::tempdir().unwrap();
1760        let mut coord = make_coordinator(dir.path());
1761
1762        let mut ops = HashMap::new();
1763        ops.insert("window-agg".into(), b"state-data".to_vec());
1764        ops.insert("filter".into(), b"filter-state".to_vec());
1765
1766        let result = coord
1767            .checkpoint(ops, None, None, HashMap::new(), None)
1768            .await
1769            .unwrap();
1770
1771        assert!(result.success);
1772
1773        let loaded = coord.store().load_latest().unwrap().unwrap();
1774        assert_eq!(loaded.operator_states.len(), 2);
1775        // No WAL manager registered → positions are empty
1776        assert!(loaded.per_core_wal_positions.is_empty());
1777
1778        let window_op = loaded.operator_states.get("window-agg").unwrap();
1779        assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
1780    }
1781
1782    #[tokio::test]
1783    async fn test_checkpoint_with_table_store_path() {
1784        let dir = tempfile::tempdir().unwrap();
1785        let mut coord = make_coordinator(dir.path());
1786
1787        let result = coord
1788            .checkpoint(
1789                HashMap::new(),
1790                None,
1791                Some("/tmp/rocksdb_cp".into()),
1792                HashMap::new(),
1793                None,
1794            )
1795            .await
1796            .unwrap();
1797
1798        assert!(result.success);
1799
1800        let loaded = coord.store().load_latest().unwrap().unwrap();
1801        assert_eq!(
1802            loaded.table_store_checkpoint_path.as_deref(),
1803            Some("/tmp/rocksdb_cp")
1804        );
1805    }
1806
1807    #[test]
1808    fn test_load_latest_manifest_empty() {
1809        let dir = tempfile::tempdir().unwrap();
1810        let coord = make_coordinator(dir.path());
1811        assert!(coord.load_latest_manifest().unwrap().is_none());
1812    }
1813
1814    #[test]
1815    fn test_coordinator_debug() {
1816        let dir = tempfile::tempdir().unwrap();
1817        let coord = make_coordinator(dir.path());
1818        let debug = format!("{coord:?}");
1819        assert!(debug.contains("CheckpointCoordinator"));
1820        assert!(debug.contains("epoch: 1"));
1821    }
1822
1823    // ── Operator state persistence tests ──
1824
1825    #[test]
1826    fn test_dag_snapshot_to_manifest_operators() {
1827        use laminar_core::dag::recovery::SerializableOperatorState;
1828
1829        let mut node_states = HashMap::new();
1830        node_states.insert(
1831            0,
1832            SerializableOperatorState {
1833                operator_id: "window-agg".into(),
1834                data: b"window-state".to_vec(),
1835            },
1836        );
1837        node_states.insert(
1838            3,
1839            SerializableOperatorState {
1840                operator_id: "filter".into(),
1841                data: b"filter-state".to_vec(),
1842            },
1843        );
1844
1845        let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
1846        assert_eq!(manifest_ops.len(), 2);
1847
1848        let w = manifest_ops.get("0").unwrap();
1849        assert_eq!(w.decode_inline().unwrap(), b"window-state");
1850        let f = manifest_ops.get("3").unwrap();
1851        assert_eq!(f.decode_inline().unwrap(), b"filter-state");
1852    }
1853
1854    #[test]
1855    fn test_manifest_operators_to_dag_states() {
1856        use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
1857
1858        let mut operators = HashMap::new();
1859        operators.insert("0".into(), OperatorCheckpoint::inline(b"state-0"));
1860        operators.insert("5".into(), OperatorCheckpoint::inline(b"state-5"));
1861
1862        let dag_states = manifest_operators_to_dag_states(&operators);
1863        assert_eq!(dag_states.len(), 2);
1864
1865        let s0 = dag_states
1866            .get(&laminar_core::dag::topology::NodeId(0))
1867            .unwrap();
1868        assert_eq!(s0.data, b"state-0");
1869
1870        let s5 = dag_states
1871            .get(&laminar_core::dag::topology::NodeId(5))
1872            .unwrap();
1873        assert_eq!(s5.data, b"state-5");
1874    }
1875
1876    #[test]
1877    fn test_operator_state_round_trip_through_manifest() {
1878        use laminar_core::dag::recovery::SerializableOperatorState;
1879
1880        // Original DAG states
1881        let mut node_states = HashMap::new();
1882        node_states.insert(
1883            7,
1884            SerializableOperatorState {
1885                operator_id: "join".into(),
1886                data: vec![1, 2, 3, 4, 5],
1887            },
1888        );
1889
1890        // DAG → manifest
1891        let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
1892
1893        // Persist and reload (simulate)
1894        let json = serde_json::to_string(&manifest_ops).unwrap();
1895        let reloaded: HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> =
1896            serde_json::from_str(&json).unwrap();
1897
1898        // Manifest → DAG
1899        let recovered = manifest_operators_to_dag_states(&reloaded);
1900        let state = recovered
1901            .get(&laminar_core::dag::topology::NodeId(7))
1902            .unwrap();
1903        assert_eq!(state.data, vec![1, 2, 3, 4, 5]);
1904    }
1905
1906    #[test]
1907    fn test_manifest_operators_skips_invalid_keys() {
1908        use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
1909
1910        let mut operators = HashMap::new();
1911        operators.insert("not-a-number".into(), OperatorCheckpoint::inline(b"data"));
1912        operators.insert("42".into(), OperatorCheckpoint::inline(b"good"));
1913
1914        let dag_states = manifest_operators_to_dag_states(&operators);
1915        // Only the numeric key should survive
1916        assert_eq!(dag_states.len(), 1);
1917        assert!(dag_states.contains_key(&laminar_core::dag::topology::NodeId(42)));
1918    }
1919
1920    // ── WAL checkpoint coordination tests ──
1921
1922    #[test]
1923    fn test_prepare_wal_no_wal_manager() {
1924        let dir = tempfile::tempdir().unwrap();
1925        let mut coord = make_coordinator(dir.path());
1926
1927        // Without a WAL manager, prepare should succeed with empty positions.
1928        let result = coord.prepare_wal_for_checkpoint().unwrap();
1929        assert!(result.per_core_wal_positions.is_empty());
1930        assert_eq!(result.entries_drained, 0);
1931    }
1932
1933    #[test]
1934    fn test_prepare_wal_with_manager() {
1935        let dir = tempfile::tempdir().unwrap();
1936        let mut coord = make_coordinator(dir.path());
1937
1938        // Set up a per-core WAL manager
1939        let wal_dir = dir.path().join("wal");
1940        std::fs::create_dir_all(&wal_dir).unwrap();
1941        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1942        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1943        coord.register_wal_manager(wal);
1944
1945        // Write some data to WAL
1946        coord
1947            .wal_manager_mut()
1948            .unwrap()
1949            .writer(0)
1950            .append_put(b"key", b"value")
1951            .unwrap();
1952
1953        let result = coord.prepare_wal_for_checkpoint().unwrap();
1954        assert_eq!(result.per_core_wal_positions.len(), 2);
1955        // Positions should be > 0 (epoch barrier + data)
1956        assert!(result.per_core_wal_positions.iter().any(|p| *p > 0));
1957    }
1958
1959    #[test]
1960    fn test_truncate_wal_no_manager() {
1961        let dir = tempfile::tempdir().unwrap();
1962        let mut coord = make_coordinator(dir.path());
1963
1964        // Without a WAL manager, truncation is a no-op.
1965        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1966    }
1967
1968    #[test]
1969    fn test_truncate_wal_with_manager() {
1970        let dir = tempfile::tempdir().unwrap();
1971        let mut coord = make_coordinator(dir.path());
1972
1973        let wal_dir = dir.path().join("wal");
1974        std::fs::create_dir_all(&wal_dir).unwrap();
1975        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1976        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1977        coord.register_wal_manager(wal);
1978
1979        // Write data
1980        coord
1981            .wal_manager_mut()
1982            .unwrap()
1983            .writer(0)
1984            .append_put(b"key", b"value")
1985            .unwrap();
1986
1987        assert!(coord.wal_manager().unwrap().total_size() > 0);
1988
1989        // Truncate
1990        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1991        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1992    }
1993
1994    #[test]
1995    fn test_truncate_wal_safety_buffer() {
1996        let dir = tempfile::tempdir().unwrap();
1997        let mut coord = make_coordinator(dir.path());
1998
1999        let wal_dir = dir.path().join("wal");
2000        std::fs::create_dir_all(&wal_dir).unwrap();
2001        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
2002        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
2003        coord.register_wal_manager(wal);
2004
2005        // Write some data before first checkpoint
2006        coord
2007            .wal_manager_mut()
2008            .unwrap()
2009            .writer(0)
2010            .append_put(b"k1", b"v1")
2011            .unwrap();
2012
2013        // First truncation (no previous positions) — resets to 0
2014        coord.truncate_wal_after_checkpoint(vec![100, 200]).unwrap();
2015        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
2016
2017        // Write more data
2018        coord
2019            .wal_manager_mut()
2020            .unwrap()
2021            .writer(0)
2022            .append_put(b"k2", b"v2")
2023            .unwrap();
2024        let size_after_write = coord.wal_manager().unwrap().total_size();
2025        assert!(size_after_write > 0);
2026
2027        // Second truncation — truncates to previous positions (100, 200),
2028        // not to 0. Since actual WAL positions are smaller than 100, this
2029        // effectively keeps all current data as safety buffer.
2030        coord.truncate_wal_after_checkpoint(vec![300, 400]).unwrap();
2031        // Data should still be present (positions 100,200 are beyond what
2032        // was written, so truncation is a no-op for the data range)
2033        assert!(coord.wal_manager().unwrap().total_size() > 0);
2034    }
2035
2036    #[test]
2037    fn test_prepare_wal_with_changelog_drainer() {
2038        use laminar_storage::incremental::StateChangelogBuffer;
2039        use laminar_storage::incremental::StateChangelogEntry;
2040        use std::sync::Arc;
2041
2042        let dir = tempfile::tempdir().unwrap();
2043        let mut coord = make_coordinator(dir.path());
2044
2045        // Set up a changelog buffer and drainer
2046        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
2047
2048        // Push some entries
2049        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
2050        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
2051        buf.push(StateChangelogEntry::delete(1, 300));
2052
2053        let drainer = ChangelogDrainer::new(buf, 100);
2054        coord.register_changelog_drainer(drainer);
2055
2056        let result = coord.prepare_wal_for_checkpoint().unwrap();
2057        assert_eq!(result.entries_drained, 3);
2058        assert!(result.per_core_wal_positions.is_empty()); // No WAL manager
2059
2060        // Drainer should have pending entries
2061        assert_eq!(coord.changelog_drainers()[0].pending_count(), 3);
2062    }
2063
2064    #[tokio::test]
2065    async fn test_full_checkpoint_with_wal_coordination() {
2066        use laminar_storage::incremental::StateChangelogBuffer;
2067        use laminar_storage::incremental::StateChangelogEntry;
2068        use std::sync::Arc;
2069
2070        let dir = tempfile::tempdir().unwrap();
2071        let mut coord = make_coordinator(dir.path());
2072
2073        // Set up WAL manager
2074        let wal_dir = dir.path().join("wal");
2075        std::fs::create_dir_all(&wal_dir).unwrap();
2076        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
2077        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
2078        coord.register_wal_manager(wal);
2079
2080        // Set up changelog drainer
2081        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
2082        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
2083        let drainer = ChangelogDrainer::new(buf, 100);
2084        coord.register_changelog_drainer(drainer);
2085
2086        // Full cycle: checkpoint (WAL preparation is internal) → truncate
2087        let result = coord
2088            .checkpoint(HashMap::new(), Some(5000), None, HashMap::new(), None)
2089            .await
2090            .unwrap();
2091
2092        assert!(result.success);
2093
2094        // Changelog drainer pending should be cleared after successful checkpoint
2095        assert_eq!(
2096            coord.changelog_drainers()[0].pending_count(),
2097            0,
2098            "pending entries should be cleared after checkpoint"
2099        );
2100
2101        // Manifest should have per-core WAL positions (captured internally)
2102        let loaded = coord.store().load_latest().unwrap().unwrap();
2103        assert_eq!(loaded.per_core_wal_positions.len(), 2);
2104
2105        // Truncate WAL
2106        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
2107        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
2108    }
2109
2110    #[test]
2111    fn test_wal_manager_accessors() {
2112        let dir = tempfile::tempdir().unwrap();
2113        let mut coord = make_coordinator(dir.path());
2114
2115        assert!(coord.wal_manager().is_none());
2116        assert!(coord.wal_manager_mut().is_none());
2117        assert!(coord.changelog_drainers().is_empty());
2118
2119        let wal_dir = dir.path().join("wal");
2120        std::fs::create_dir_all(&wal_dir).unwrap();
2121        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
2122        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
2123        coord.register_wal_manager(wal);
2124
2125        assert!(coord.wal_manager().is_some());
2126        assert!(coord.wal_manager_mut().is_some());
2127    }
2128
2129    #[test]
2130    fn test_coordinator_debug_with_wal() {
2131        let dir = tempfile::tempdir().unwrap();
2132        let mut coord = make_coordinator(dir.path());
2133
2134        let wal_dir = dir.path().join("wal");
2135        std::fs::create_dir_all(&wal_dir).unwrap();
2136        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
2137        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
2138        coord.register_wal_manager(wal);
2139
2140        let debug = format!("{coord:?}");
2141        assert!(debug.contains("has_wal_manager: true"));
2142        assert!(debug.contains("changelog_drainers: 0"));
2143    }
2144
2145    // ── Checkpoint observability tests ──
2146
2147    #[tokio::test]
2148    async fn test_checkpoint_emits_metrics_on_success() {
2149        use crate::metrics::PipelineCounters;
2150        use std::sync::atomic::Ordering;
2151
2152        let dir = tempfile::tempdir().unwrap();
2153        let mut coord = make_coordinator(dir.path());
2154
2155        let counters = Arc::new(PipelineCounters::new());
2156        coord.set_counters(Arc::clone(&counters));
2157
2158        let result = coord
2159            .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
2160            .await
2161            .unwrap();
2162
2163        assert!(result.success);
2164        assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 1);
2165        assert_eq!(counters.checkpoints_failed.load(Ordering::Relaxed), 0);
2166        assert!(counters.last_checkpoint_duration_ms.load(Ordering::Relaxed) < 5000);
2167        assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 1);
2168
2169        // Second checkpoint
2170        let result2 = coord
2171            .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
2172            .await
2173            .unwrap();
2174
2175        assert!(result2.success);
2176        assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 2);
2177        assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 2);
2178    }
2179
2180    #[tokio::test]
2181    async fn test_checkpoint_without_counters() {
2182        // Verify checkpoint works fine without counters set
2183        let dir = tempfile::tempdir().unwrap();
2184        let mut coord = make_coordinator(dir.path());
2185
2186        let result = coord
2187            .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
2188            .await
2189            .unwrap();
2190
2191        assert!(result.success);
2192        // No panics — metrics emission is a no-op
2193    }
2194
2195    // ── DurationHistogram tests ──
2196
2197    #[test]
2198    fn test_histogram_empty() {
2199        let h = DurationHistogram::new();
2200        assert_eq!(h.len(), 0);
2201        assert_eq!(h.percentile(0.50), 0);
2202        assert_eq!(h.percentile(0.99), 0);
2203        let (p50, p95, p99) = h.percentiles();
2204        assert_eq!((p50, p95, p99), (0, 0, 0));
2205    }
2206
2207    #[test]
2208    fn test_histogram_single_sample() {
2209        let mut h = DurationHistogram::new();
2210        h.record(Duration::from_millis(42));
2211        assert_eq!(h.len(), 1);
2212        assert_eq!(h.percentile(0.50), 42);
2213        assert_eq!(h.percentile(0.99), 42);
2214    }
2215
2216    #[test]
2217    fn test_histogram_percentiles() {
2218        let mut h = DurationHistogram::new();
2219        // Record 1..=100ms in order.
2220        for i in 1..=100 {
2221            h.record(Duration::from_millis(i));
2222        }
2223        assert_eq!(h.len(), 100);
2224
2225        let p50 = h.percentile(0.50);
2226        let p95 = h.percentile(0.95);
2227        let p99 = h.percentile(0.99);
2228
2229        // With values 1..=100:
2230        //   p50 ≈ 50, p95 ≈ 95, p99 ≈ 99
2231        assert!((49..=51).contains(&p50), "p50={p50}");
2232        assert!((94..=96).contains(&p95), "p95={p95}");
2233        assert!((98..=100).contains(&p99), "p99={p99}");
2234    }
2235
2236    #[test]
2237    fn test_histogram_wraps_ring_buffer() {
2238        let mut h = DurationHistogram::new();
2239        // Write 150 samples — first 50 are overwritten.
2240        for i in 1..=150 {
2241            h.record(Duration::from_millis(i));
2242        }
2243        assert_eq!(h.len(), 100);
2244        assert_eq!(h.count, 150);
2245
2246        // Only samples 51..=150 remain in the buffer.
2247        let p50 = h.percentile(0.50);
2248        assert!((99..=101).contains(&p50), "p50={p50}");
2249    }
2250
2251    // ── Sidecar threshold tests ──
2252
2253    #[tokio::test]
2254    async fn test_sidecar_round_trip() {
2255        let dir = tempfile::tempdir().unwrap();
2256        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2257        let config = CheckpointConfig {
2258            state_inline_threshold: 100, // 100 bytes threshold
2259            ..CheckpointConfig::default()
2260        };
2261        let mut coord = CheckpointCoordinator::new(config, store);
2262
2263        // Small state stays inline, large state goes to sidecar
2264        let mut ops = HashMap::new();
2265        ops.insert("small".into(), vec![0xAAu8; 50]);
2266        ops.insert("large".into(), vec![0xBBu8; 200]);
2267
2268        let result = coord
2269            .checkpoint(ops, None, None, HashMap::new(), None)
2270            .await
2271            .unwrap();
2272        assert!(result.success);
2273
2274        // Verify manifest
2275        let loaded = coord.store().load_latest().unwrap().unwrap();
2276        let small_op = loaded.operator_states.get("small").unwrap();
2277        assert!(!small_op.external, "small state should be inline");
2278        assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);
2279
2280        let large_op = loaded.operator_states.get("large").unwrap();
2281        assert!(large_op.external, "large state should be external");
2282        assert_eq!(large_op.external_length, 200);
2283
2284        // Verify sidecar file exists and has correct data
2285        let state_data = coord.store().load_state_data(1).unwrap().unwrap();
2286        assert_eq!(state_data.len(), 200);
2287        assert!(state_data.iter().all(|&b| b == 0xBB));
2288    }
2289
2290    #[tokio::test]
2291    async fn test_all_inline_no_sidecar() {
2292        let dir = tempfile::tempdir().unwrap();
2293        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2294        let config = CheckpointConfig::default(); // 1MB threshold
2295        let mut coord = CheckpointCoordinator::new(config, store);
2296
2297        let mut ops = HashMap::new();
2298        ops.insert("op1".into(), b"small-state".to_vec());
2299
2300        let result = coord
2301            .checkpoint(ops, None, None, HashMap::new(), None)
2302            .await
2303            .unwrap();
2304        assert!(result.success);
2305
2306        // No sidecar file
2307        assert!(coord.store().load_state_data(1).unwrap().is_none());
2308    }
2309
2310    // ── Adaptive interval tests ──
2311
2312    #[test]
2313    fn test_adaptive_disabled_by_default() {
2314        let dir = tempfile::tempdir().unwrap();
2315        let coord = make_coordinator(dir.path());
2316        assert!(coord.config().adaptive.is_none());
2317        assert_eq!(coord.config().interval, Some(Duration::from_secs(60)));
2318    }
2319
2320    #[test]
2321    fn test_adaptive_increases_interval_for_slow_checkpoints() {
2322        let dir = tempfile::tempdir().unwrap();
2323        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2324        let config = CheckpointConfig {
2325            adaptive: Some(AdaptiveCheckpointConfig::default()),
2326            ..CheckpointConfig::default()
2327        };
2328        let mut coord = CheckpointCoordinator::new(config, store);
2329
2330        // Simulate a 5-second checkpoint
2331        coord.last_checkpoint_duration = Some(Duration::from_secs(5));
2332        coord.adjust_interval();
2333
2334        // Expected: 5000ms / (1000 * 0.1) = 50s
2335        let interval = coord.config().interval.unwrap();
2336        assert!(
2337            interval >= Duration::from_secs(49) && interval <= Duration::from_secs(51),
2338            "expected ~50s, got {interval:?}",
2339        );
2340    }
2341
2342    #[test]
2343    fn test_adaptive_decreases_interval_for_fast_checkpoints() {
2344        let dir = tempfile::tempdir().unwrap();
2345        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2346        let config = CheckpointConfig {
2347            adaptive: Some(AdaptiveCheckpointConfig::default()),
2348            ..CheckpointConfig::default()
2349        };
2350        let mut coord = CheckpointCoordinator::new(config, store);
2351
2352        // Simulate a 100ms checkpoint → 100 / (1000 * 0.1) = 1s → clamped to 10s min
2353        coord.last_checkpoint_duration = Some(Duration::from_millis(100));
2354        coord.adjust_interval();
2355
2356        let interval = coord.config().interval.unwrap();
2357        assert_eq!(
2358            interval,
2359            Duration::from_secs(10),
2360            "should clamp to min_interval"
2361        );
2362    }
2363
2364    #[test]
2365    fn test_adaptive_clamps_to_min_max() {
2366        let dir = tempfile::tempdir().unwrap();
2367        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2368        let config = CheckpointConfig {
2369            adaptive: Some(AdaptiveCheckpointConfig {
2370                min_interval: Duration::from_secs(20),
2371                max_interval: Duration::from_secs(120),
2372                target_overhead_ratio: 0.1,
2373                smoothing_alpha: 1.0, // Full weight on latest
2374            }),
2375            ..CheckpointConfig::default()
2376        };
2377        let mut coord = CheckpointCoordinator::new(config, store);
2378
2379        // Very slow → clamp to max
2380        coord.last_checkpoint_duration = Some(Duration::from_secs(60));
2381        coord.adjust_interval();
2382        let interval = coord.config().interval.unwrap();
2383        assert_eq!(interval, Duration::from_secs(120), "should clamp to max");
2384
2385        // Very fast → clamp to min
2386        coord.last_checkpoint_duration = Some(Duration::from_millis(10));
2387        coord.smoothed_duration_ms = 0.0; // Reset EMA
2388        coord.adjust_interval();
2389        let interval = coord.config().interval.unwrap();
2390        assert_eq!(interval, Duration::from_secs(20), "should clamp to min");
2391    }
2392
2393    #[test]
2394    fn test_adaptive_ema_smoothing() {
2395        let dir = tempfile::tempdir().unwrap();
2396        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2397        let config = CheckpointConfig {
2398            adaptive: Some(AdaptiveCheckpointConfig {
2399                min_interval: Duration::from_secs(1),
2400                max_interval: Duration::from_secs(600),
2401                target_overhead_ratio: 0.1,
2402                smoothing_alpha: 0.5,
2403            }),
2404            ..CheckpointConfig::default()
2405        };
2406        let mut coord = CheckpointCoordinator::new(config, store);
2407
2408        // First observation: 1000ms → EMA = 1000 (cold start)
2409        coord.last_checkpoint_duration = Some(Duration::from_millis(1000));
2410        coord.adjust_interval();
2411        assert!((coord.smoothed_duration_ms() - 1000.0).abs() < 1.0);
2412
2413        // Second observation: 2000ms → EMA = 0.5*2000 + 0.5*1000 = 1500
2414        coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
2415        coord.adjust_interval();
2416        assert!((coord.smoothed_duration_ms() - 1500.0).abs() < 1.0);
2417
2418        // Third observation: 2000ms → EMA = 0.5*2000 + 0.5*1500 = 1750
2419        coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
2420        coord.adjust_interval();
2421        assert!((coord.smoothed_duration_ms() - 1750.0).abs() < 1.0);
2422    }
2423
2424    #[tokio::test]
2425    async fn test_stats_include_percentiles_after_checkpoints() {
2426        let dir = tempfile::tempdir().unwrap();
2427        let mut coord = make_coordinator(dir.path());
2428
2429        // Run 3 checkpoints.
2430        for _ in 0..3 {
2431            let result = coord
2432                .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
2433                .await
2434                .unwrap();
2435            assert!(result.success);
2436        }
2437
2438        let stats = coord.stats();
2439        assert_eq!(stats.completed, 3);
2440        // After 3 fast checkpoints, percentiles should be > 0
2441        // (they're real durations, not zero).
2442        assert!(stats.last_duration.is_some());
2443    }
2444}