Skip to main content

laminar_db/
checkpoint_coordinator.rs

1//! Unified checkpoint coordinator.
2//!
3//! Single orchestrator that replaces `StreamCheckpointManager`,
4//! `PipelineCheckpointManager`, and the persistence side of `DagRecoveryManager`.
5//! Lives in Ring 2 (control plane). Reuses the existing
6//! `DagCheckpointCoordinator` for barrier logic.
7//!
8//! ## Checkpoint Cycle
9//!
10//! 1. Barrier propagation — `dag_coordinator.trigger_checkpoint()`
11//! 2. Operator snapshot — `dag_coordinator.finalize_checkpoint()` → operator states
12//! 3. Source snapshot — `source.checkpoint()` for each source
13//! 4. Sink pre-commit — `sink.pre_commit(epoch)` for each exactly-once sink
14//! 5. Manifest persist — `store.save(&manifest)` (atomic write)
15//! 6. Sink commit — `sink.commit_epoch(epoch)` for each exactly-once sink
16//! 7. On ANY failure at 6 — `sink.rollback_epoch()` on remaining sinks
17#![allow(clippy::disallowed_types)] // cold path
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use std::sync::atomic::Ordering;
24
25use laminar_connectors::checkpoint::SourceCheckpoint;
26use laminar_connectors::connector::SourceConnector;
27use laminar_storage::changelog_drainer::ChangelogDrainer;
28use laminar_storage::checkpoint_manifest::{
29    CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
30};
31use laminar_storage::checkpoint_store::CheckpointStore;
32use laminar_storage::per_core_wal::PerCoreWalManager;
33use tracing::{debug, error, info, warn};
34
35use crate::error::DbError;
36use crate::metrics::PipelineCounters;
37
38/// Unified checkpoint configuration.
39#[derive(Debug, Clone)]
40pub struct CheckpointConfig {
41    /// Interval between checkpoints. `None` = manual only.
42    pub interval: Option<Duration>,
43    /// Maximum number of retained checkpoints.
44    pub max_retained: usize,
45    /// Maximum time to wait for barrier alignment at fan-in nodes.
46    pub alignment_timeout: Duration,
47    /// Maximum time to wait for all sinks to pre-commit.
48    ///
49    /// A stuck sink will not block checkpointing indefinitely.
50    /// Defaults to 30 seconds.
51    pub pre_commit_timeout: Duration,
52    /// Maximum time to wait for manifest persist (filesystem I/O).
53    ///
54    /// A hung or degraded filesystem will not stall the runtime indefinitely.
55    /// Defaults to 120 seconds.
56    pub persist_timeout: Duration,
57    /// Maximum time to wait for all sinks to commit (phase 2).
58    ///
59    /// Defaults to 60 seconds.
60    pub commit_timeout: Duration,
61    /// Whether to use incremental checkpoints.
62    pub incremental: bool,
63    /// Maximum operator state size (bytes) to inline in the JSON manifest.
64    ///
65    /// States larger than this threshold are written to a `state.bin` sidecar
66    /// file and referenced by offset/length in the manifest. This avoids
67    /// inflating the manifest with base64-encoded blobs (~33% overhead).
68    ///
69    /// Default: 1 MB (`1_048_576`). Set to `usize::MAX` to inline everything.
70    pub state_inline_threshold: usize,
71    /// Maximum total checkpoint size in bytes (manifest + sidecar).
72    ///
73    /// If the packed operator state exceeds this limit, the checkpoint is
74    /// rejected with `[LDB-6014]` and not persisted. This prevents
75    /// unbounded state from creating multi-GB sidecar files.
76    ///
77    /// `None` means no limit (default). A warning is logged at 80% of the cap.
78    pub max_checkpoint_bytes: Option<usize>,
79    /// Maximum pending changelog entries per drainer before forced clear.
80    ///
81    /// On checkpoint failure, `clear_pending()` is normally skipped. If
82    /// failures repeat, pending entries grow unboundedly → OOM. This cap
83    /// is a safety valve: if any drainer exceeds it after a failure, all
84    /// drainers are cleared and an `[LDB-6010]` warning is logged.
85    ///
86    /// Default: 10,000,000 entries.
87    pub max_pending_changelog_entries: usize,
88    /// Adaptive checkpoint interval configuration.
89    ///
90    /// When `Some`, the coordinator dynamically adjusts the checkpoint interval
91    /// based on observed checkpoint durations using an exponential moving average.
92    /// When `None` (default), the static `interval` is used unchanged.
93    pub adaptive: Option<AdaptiveCheckpointConfig>,
94    /// Unaligned checkpoint configuration.
95    ///
96    /// When `Some`, the coordinator can fall back to unaligned checkpoints
97    /// when barrier alignment takes too long under backpressure.
98    /// When `None` (default), only aligned checkpoints are used.
99    pub unaligned: Option<UnalignedCheckpointConfig>,
100}
101
102/// Configuration for adaptive checkpoint intervals.
103///
104/// Dynamically adjusts the checkpoint interval based on observed checkpoint
105/// durations: `interval = clamp(smoothed_duration / target_ratio, min, max)`.
106///
107/// This avoids checkpointing too frequently under light load (wasting I/O)
108/// or too infrequently under heavy load (increasing recovery time).
109#[derive(Debug, Clone)]
110pub struct AdaptiveCheckpointConfig {
111    /// Minimum checkpoint interval (floor). Default: 10s.
112    pub min_interval: Duration,
113    /// Maximum checkpoint interval (ceiling). Default: 300s.
114    pub max_interval: Duration,
115    /// Target ratio of checkpoint duration to interval.
116    ///
117    /// For example, 0.1 means checkpoints should take at most 10% of the
118    /// time between them. Default: 0.1.
119    pub target_overhead_ratio: f64,
120    /// EMA smoothing factor for checkpoint durations.
121    ///
122    /// Higher values give more weight to recent observations.
123    /// Range: 0.0–1.0. Default: 0.3.
124    pub smoothing_alpha: f64,
125}
126
127impl Default for AdaptiveCheckpointConfig {
128    fn default() -> Self {
129        Self {
130            min_interval: Duration::from_secs(10),
131            max_interval: Duration::from_secs(300),
132            target_overhead_ratio: 0.1,
133            smoothing_alpha: 0.3,
134        }
135    }
136}
137
138/// Re-export from `laminar_core::checkpoint::unaligned`.
139pub use laminar_core::checkpoint::UnalignedCheckpointConfig;
140
141impl Default for CheckpointConfig {
142    fn default() -> Self {
143        Self {
144            interval: Some(Duration::from_secs(60)),
145            max_retained: 3,
146            alignment_timeout: Duration::from_secs(30),
147            pre_commit_timeout: Duration::from_secs(30),
148            persist_timeout: Duration::from_secs(120),
149            commit_timeout: Duration::from_secs(60),
150            incremental: false,
151            state_inline_threshold: 1_048_576,
152            max_checkpoint_bytes: None,
153            max_pending_changelog_entries: 10_000_000,
154            adaptive: None,
155            unaligned: None,
156        }
157    }
158}
159
160/// Phase of the checkpoint lifecycle.
161#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
162pub enum CheckpointPhase {
163    /// No checkpoint in progress.
164    Idle,
165    /// Barrier injected, waiting for operator snapshots.
166    BarrierInFlight,
167    /// Operators snapshotted, collecting source positions.
168    Snapshotting,
169    /// Sinks pre-committing (phase 1).
170    PreCommitting,
171    /// Manifest being persisted.
172    Persisting,
173    /// Sinks committing (phase 2).
174    Committing,
175}
176
177impl std::fmt::Display for CheckpointPhase {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        match self {
180            Self::Idle => write!(f, "Idle"),
181            Self::BarrierInFlight => write!(f, "BarrierInFlight"),
182            Self::Snapshotting => write!(f, "Snapshotting"),
183            Self::PreCommitting => write!(f, "PreCommitting"),
184            Self::Persisting => write!(f, "Persisting"),
185            Self::Committing => write!(f, "Committing"),
186        }
187    }
188}
189
190/// Result of a checkpoint attempt.
191#[derive(Debug, serde::Serialize)]
192pub struct CheckpointResult {
193    /// Whether the checkpoint succeeded.
194    pub success: bool,
195    /// Checkpoint ID (if created).
196    pub checkpoint_id: u64,
197    /// Epoch number.
198    pub epoch: u64,
199    /// Duration of the checkpoint operation.
200    pub duration: Duration,
201    /// Error message if failed.
202    pub error: Option<String>,
203}
204
205/// Registered source for checkpoint coordination.
206pub(crate) struct RegisteredSource {
207    /// Source name.
208    pub name: String,
209    /// Source connector handle.
210    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
211    /// Whether this source supports replay from a checkpointed position.
212    ///
213    /// Sources that do not support replay (e.g., WebSocket) degrade
214    /// exactly-once semantics to at-most-once.
215    pub supports_replay: bool,
216}
217
218/// Registered sink for checkpoint coordination.
219pub(crate) struct RegisteredSink {
220    /// Sink name.
221    pub name: String,
222    /// Sink task handle (channel-based, no mutex contention).
223    pub handle: crate::sink_task::SinkTaskHandle,
224    /// Whether this sink supports exactly-once / two-phase commit.
225    pub exactly_once: bool,
226}
227
228/// Result of WAL preparation for a checkpoint.
229///
230/// Contains the WAL positions recorded after flushing changelog drainers,
231/// writing epoch barriers, and syncing all segments.
232#[derive(Debug, Clone)]
233pub struct WalPrepareResult {
234    /// Per-core WAL positions at the time of the epoch barrier.
235    pub per_core_wal_positions: Vec<u64>,
236    /// Number of changelog entries drained across all drainers.
237    pub entries_drained: u64,
238}
239
240/// Unified checkpoint coordinator.
241///
242/// Orchestrates the full checkpoint lifecycle across sources, sinks,
243/// and operator state, persisting everything in a single
244/// [`CheckpointManifest`].
245pub struct CheckpointCoordinator {
246    config: CheckpointConfig,
247    store: Arc<dyn CheckpointStore>,
248    sinks: Vec<RegisteredSink>,
249    next_checkpoint_id: u64,
250    epoch: u64,
251    phase: CheckpointPhase,
252    checkpoints_completed: u64,
253    checkpoints_failed: u64,
254    last_checkpoint_duration: Option<Duration>,
255    /// Rolling histogram of checkpoint durations for percentile tracking.
256    duration_histogram: DurationHistogram,
257    /// Per-core WAL manager for epoch barriers and truncation.
258    wal_manager: Option<PerCoreWalManager>,
259    /// Changelog drainers to flush before checkpointing.
260    changelog_drainers: Vec<ChangelogDrainer>,
261    /// Shared counters for observability.
262    counters: Option<Arc<PipelineCounters>>,
263    /// Exponential moving average of checkpoint durations (milliseconds).
264    smoothed_duration_ms: f64,
265    /// Cumulative bytes written across all checkpoints (manifest + sidecar).
266    total_bytes_written: u64,
267    /// Number of checkpoints completed in unaligned mode.
268    unaligned_checkpoint_count: u64,
269    /// WAL positions from the previous checkpoint, used as a truncation
270    /// safety buffer — we keep one checkpoint's worth of WAL replay data.
271    previous_wal_positions: Option<Vec<u64>>,
272}
273
274impl CheckpointCoordinator {
275    /// Creates a new checkpoint coordinator.
276    #[must_use]
277    pub fn new(config: CheckpointConfig, store: Box<dyn CheckpointStore>) -> Self {
278        let store: Arc<dyn CheckpointStore> = Arc::from(store);
279        // Determine starting epoch from stored checkpoints.
280        let (next_id, epoch) = match store.load_latest() {
281            Ok(Some(m)) => (m.checkpoint_id + 1, m.epoch + 1),
282            _ => (1, 1),
283        };
284
285        Self {
286            config,
287            store,
288            sinks: Vec::new(),
289            next_checkpoint_id: next_id,
290            epoch,
291            phase: CheckpointPhase::Idle,
292            checkpoints_completed: 0,
293            checkpoints_failed: 0,
294            last_checkpoint_duration: None,
295            duration_histogram: DurationHistogram::new(),
296            wal_manager: None,
297            changelog_drainers: Vec::new(),
298            counters: None,
299            smoothed_duration_ms: 0.0,
300            total_bytes_written: 0,
301            unaligned_checkpoint_count: 0,
302            previous_wal_positions: None,
303        }
304    }
305
306    /// Registers a sink connector for checkpoint coordination.
307    pub(crate) fn register_sink(
308        &mut self,
309        name: impl Into<String>,
310        handle: crate::sink_task::SinkTaskHandle,
311        exactly_once: bool,
312    ) {
313        self.sinks.push(RegisteredSink {
314            name: name.into(),
315            handle,
316            exactly_once,
317        });
318    }
319
320    /// Begins the initial epoch on all exactly-once sinks.
321    ///
322    /// Must be called once after all sinks are registered and before any
323    /// writes occur. This starts the first Kafka transaction for exactly-once
324    /// sinks. Subsequent epochs are started automatically after each
325    /// successful checkpoint commit.
326    ///
327    /// # Errors
328    ///
329    /// Returns the first error from any sink that fails to begin the epoch.
330    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
331        self.begin_epoch_for_sinks(self.epoch).await
332    }
333
334    /// Begins an epoch on all exactly-once sinks. If any sink fails,
335    /// rolls back sinks that already started the epoch.
336    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
337        let mut started: Vec<&RegisteredSink> = Vec::new();
338        for sink in &self.sinks {
339            if sink.exactly_once {
340                match sink.handle.begin_epoch(epoch).await {
341                    Ok(()) => {
342                        started.push(sink);
343                        debug!(sink = %sink.name, epoch, "began epoch");
344                    }
345                    Err(e) => {
346                        // Roll back sinks that already started.
347                        for s in &started {
348                            s.handle.rollback_epoch(epoch).await;
349                        }
350                        return Err(DbError::Checkpoint(format!(
351                            "sink '{}' failed to begin epoch {epoch}: {e}",
352                            sink.name
353                        )));
354                    }
355                }
356            }
357        }
358        Ok(())
359    }
360
361    // ── Observability ──
362
363    /// Sets the shared pipeline counters for checkpoint metrics emission.
364    ///
365    /// When set, checkpoint completion and failure will update the counters
366    /// automatically.
367    pub fn set_counters(&mut self, counters: Arc<PipelineCounters>) {
368        self.counters = Some(counters);
369    }
370
371    /// Emits checkpoint metrics to the shared counters.
372    fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
373        if let Some(ref counters) = self.counters {
374            if success {
375                counters
376                    .checkpoints_completed
377                    .fetch_add(1, Ordering::Relaxed);
378            } else {
379                counters.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
380            }
381            #[allow(clippy::cast_possible_truncation)]
382            counters
383                .last_checkpoint_duration_ms
384                .store(duration.as_millis() as u64, Ordering::Relaxed);
385            counters.checkpoint_epoch.store(epoch, Ordering::Relaxed);
386        }
387    }
388
389    // ── WAL coordination ──
390
391    /// Registers a per-core WAL manager for checkpoint coordination.
392    ///
393    /// When registered, [`prepare_wal_for_checkpoint()`](Self::prepare_wal_for_checkpoint)
394    /// will write epoch barriers and sync all segments, and
395    /// [`truncate_wal_after_checkpoint()`](Self::truncate_wal_after_checkpoint)
396    /// will reset all segments.
397    pub fn register_wal_manager(&mut self, wal_manager: PerCoreWalManager) {
398        self.wal_manager = Some(wal_manager);
399    }
400
401    /// Registers a changelog drainer to flush before checkpointing.
402    ///
403    /// Multiple drainers may be registered (one per core or per state store).
404    /// All are flushed during
405    /// [`prepare_wal_for_checkpoint()`](Self::prepare_wal_for_checkpoint).
406    pub fn register_changelog_drainer(&mut self, drainer: ChangelogDrainer) {
407        self.changelog_drainers.push(drainer);
408    }
409
410    /// Prepares the WAL for a checkpoint.
411    ///
412    /// 1. Flushes all registered [`ChangelogDrainer`] instances (Ring 1 → Ring 0 catchup)
413    /// 2. Writes epoch barriers to all per-core WAL segments
414    /// 3. Syncs all WAL segments (`fdatasync`)
415    /// 4. Records and returns per-core WAL positions
416    ///
417    /// Call this **before** [`checkpoint()`](Self::checkpoint) and pass the returned
418    /// positions into that method.
419    ///
420    /// # Errors
421    ///
422    /// Returns `DbError::Checkpoint` if WAL operations fail.
423    pub fn prepare_wal_for_checkpoint(&mut self) -> Result<WalPrepareResult, DbError> {
424        // Step 1: Flush changelog drainers
425        let mut total_drained: u64 = 0;
426        for drainer in &mut self.changelog_drainers {
427            let count = drainer.drain();
428            total_drained += count as u64;
429            debug!(
430                drained = count,
431                pending = drainer.pending_count(),
432                "changelog drainer flushed"
433            );
434        }
435
436        // Step 2-4: WAL epoch barriers + sync + positions
437        let per_core_wal_positions = if let Some(ref mut wal) = self.wal_manager {
438            let epoch = wal.advance_epoch();
439            wal.set_epoch_all(epoch);
440
441            wal.write_epoch_barrier_all()
442                .map_err(|e| DbError::Checkpoint(format!("WAL epoch barrier failed: {e}")))?;
443
444            wal.sync_all()
445                .map_err(|e| DbError::Checkpoint(format!("WAL sync failed: {e}")))?;
446
447            // Use synced positions — only durable data is safe for the manifest.
448            let positions = wal.synced_positions();
449            debug!(epoch, positions = ?positions, "WAL prepared for checkpoint");
450            positions
451        } else {
452            Vec::new()
453        };
454
455        Ok(WalPrepareResult {
456            per_core_wal_positions,
457            entries_drained: total_drained,
458        })
459    }
460
461    /// Truncates (resets) all per-core WAL segments after a successful checkpoint.
462    ///
463    /// Call this **after** [`checkpoint()`](Self::checkpoint) returns success.
464    /// WAL data before the checkpoint position is no longer needed.
465    ///
466    /// # Errors
467    ///
468    /// Returns `DbError::Checkpoint` if truncation fails.
469    pub fn truncate_wal_after_checkpoint(
470        &mut self,
471        current_positions: Vec<u64>,
472    ) -> Result<(), DbError> {
473        if let Some(ref mut wal) = self.wal_manager {
474            if let Some(ref prev) = self.previous_wal_positions {
475                // Truncate to previous checkpoint's positions, keeping one
476                // checkpoint's worth of WAL data as a safety buffer.
477                wal.truncate_all(prev)
478                    .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
479                debug!("WAL segments truncated to previous checkpoint positions");
480            } else {
481                // First checkpoint — no previous positions, truncate to 0.
482                wal.reset_all()
483                    .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
484                debug!("WAL segments reset after first checkpoint");
485            }
486        }
487        self.previous_wal_positions = Some(current_positions);
488        Ok(())
489    }
490
491    /// Returns a reference to the registered WAL manager, if any.
492    #[must_use]
493    pub fn wal_manager(&self) -> Option<&PerCoreWalManager> {
494        self.wal_manager.as_ref()
495    }
496
497    /// Returns a mutable reference to the registered WAL manager, if any.
498    pub fn wal_manager_mut(&mut self) -> Option<&mut PerCoreWalManager> {
499        self.wal_manager.as_mut()
500    }
501
502    /// Returns a slice of registered changelog drainers.
503    #[must_use]
504    pub fn changelog_drainers(&self) -> &[ChangelogDrainer] {
505        &self.changelog_drainers
506    }
507
508    /// Returns a mutable slice of registered changelog drainers.
509    pub fn changelog_drainers_mut(&mut self) -> &mut [ChangelogDrainer] {
510        &mut self.changelog_drainers
511    }
512
513    /// Safety valve: clears changelog drainer buffers if any exceeds the cap.
514    ///
515    /// Called on checkpoint failure to prevent unbounded memory growth when
516    /// checkpoints fail repeatedly. Under normal operation, `clear_pending()`
517    /// is called on the success path in `checkpoint_inner()`.
518    fn maybe_cap_drainers(&mut self) {
519        let cap = self.config.max_pending_changelog_entries;
520        let needs_clear = self
521            .changelog_drainers
522            .iter()
523            .any(|d| d.pending_count() > cap);
524        if needs_clear {
525            let total: usize = self
526                .changelog_drainers
527                .iter()
528                .map(ChangelogDrainer::pending_count)
529                .sum();
530            warn!(
531                total_pending = total,
532                cap,
533                "[LDB-6010] changelog drainer exceeded cap after checkpoint failure — \
534                 clearing to prevent OOM"
535            );
536            for drainer in &mut self.changelog_drainers {
537                drainer.clear_pending();
538            }
539        }
540    }
541
542    /// Performs a full checkpoint cycle (steps 3-7).
543    ///
544    /// Steps 1-2 (barrier propagation + operator snapshots) are handled
545    /// externally by the DAG executor and passed in as `operator_states`.
546    ///
547    /// # Arguments
548    ///
549    /// * `operator_states` — serialized operator state from `DagCheckpointCoordinator`
550    /// * `watermark` — current global watermark
551    /// * `table_store_checkpoint_path` — table store checkpoint path
552    /// * `source_watermarks` — per-source watermarks
553    /// * `pipeline_hash` — pipeline identity hash
554    ///
555    /// # Errors
556    ///
557    /// Returns `DbError::Checkpoint` if any phase fails.
558    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
559    pub async fn checkpoint(
560        &mut self,
561        operator_states: HashMap<String, Vec<u8>>,
562        watermark: Option<i64>,
563        table_store_checkpoint_path: Option<String>,
564        source_watermarks: HashMap<String, i64>,
565        pipeline_hash: Option<u64>,
566    ) -> Result<CheckpointResult, DbError> {
567        self.checkpoint_inner(
568            operator_states,
569            watermark,
570            table_store_checkpoint_path,
571            HashMap::new(),
572            source_watermarks,
573            pipeline_hash,
574            HashMap::new(),
575        )
576        .await
577    }
578
579    /// Pre-commits all exactly-once sinks (phase 1) with a timeout.
580    ///
581    /// A stuck sink will not block checkpointing indefinitely. The timeout
582    /// is configured via [`CheckpointConfig::pre_commit_timeout`].
583    async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
584        let timeout_dur = self.config.pre_commit_timeout;
585
586        match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
587            Ok(result) => result,
588            Err(_elapsed) => Err(DbError::Checkpoint(format!(
589                "pre-commit timed out after {}s",
590                timeout_dur.as_secs()
591            ))),
592        }
593    }
594
595    /// Inner pre-commit loop (no timeout).
596    ///
597    /// Only sinks with `exactly_once = true` participate in two-phase commit.
598    /// At-most-once sinks are skipped — they receive no `pre_commit`/`commit`
599    /// calls and provide no transactional guarantees.
600    async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
601        for sink in &self.sinks {
602            if sink.exactly_once {
603                sink.handle.pre_commit(epoch).await.map_err(|e| {
604                    DbError::Checkpoint(format!("sink '{}' pre-commit failed: {e}", sink.name))
605                })?;
606                debug!(sink = %sink.name, epoch, "sink pre-committed");
607            }
608        }
609        Ok(())
610    }
611
612    /// Commits all exactly-once sinks with per-sink status tracking.
613    ///
614    /// Returns a map of sink name → commit status. Sinks that committed
615    /// successfully are `Committed`; failures are `Failed(message)`.
616    /// All sinks are attempted even if some fail.
617    ///
618    /// Bounded by [`CheckpointConfig::commit_timeout`] to prevent a stuck
619    /// sink from blocking checkpoint completion indefinitely.
620    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
621        let timeout_dur = self.config.commit_timeout;
622
623        match tokio::time::timeout(timeout_dur, self.commit_sinks_inner(epoch)).await {
624            Ok(statuses) => statuses,
625            Err(_elapsed) => {
626                error!(
627                    epoch,
628                    timeout_secs = timeout_dur.as_secs(),
629                    "[LDB-6012] sink commit timed out — marking all pending sinks as failed"
630                );
631                self.sinks
632                    .iter()
633                    .filter(|s| s.exactly_once)
634                    .map(|s| {
635                        (
636                            s.name.clone(),
637                            SinkCommitStatus::Failed(format!(
638                                "sink '{}' commit timed out after {}s",
639                                s.name,
640                                timeout_dur.as_secs()
641                            )),
642                        )
643                    })
644                    .collect()
645            }
646        }
647    }
648
649    /// Inner commit loop (no timeout).
650    async fn commit_sinks_inner(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
651        let mut statuses = HashMap::with_capacity(self.sinks.len());
652
653        for sink in &self.sinks {
654            if sink.exactly_once {
655                match sink.handle.commit_epoch(epoch).await {
656                    Ok(()) => {
657                        statuses.insert(sink.name.clone(), SinkCommitStatus::Committed);
658                        debug!(sink = %sink.name, epoch, "sink committed");
659                    }
660                    Err(e) => {
661                        let msg = format!("sink '{}' commit failed: {e}", sink.name);
662                        error!(sink = %sink.name, epoch, error = %e, "sink commit failed");
663                        statuses.insert(sink.name.clone(), SinkCommitStatus::Failed(msg));
664                    }
665                }
666            }
667        }
668
669        statuses
670    }
671
672    /// Saves a manifest to the checkpoint store (blocking I/O on a task).
673    ///
674    /// Uses [`CheckpointStore::save_with_state`] to write optional sidecar
675    /// data **before** the manifest, ensuring atomicity: if the sidecar write
676    /// fails, the manifest is never persisted.
677    ///
678    /// Takes `manifest` by value to avoid cloning on the common path.
679    /// Callers that need the manifest after save should clone before calling.
680    ///
681    /// Bounded by [`CheckpointConfig::persist_timeout`] to prevent a hung
682    /// filesystem from stalling the runtime indefinitely.
683    async fn save_manifest(
684        &self,
685        manifest: CheckpointManifest,
686        state_data: Option<Vec<u8>>,
687    ) -> Result<(), DbError> {
688        let store = Arc::clone(&self.store);
689        let timeout_dur = self.config.persist_timeout;
690
691        let task = tokio::task::spawn_blocking(move || {
692            store.save_with_state(&manifest, state_data.as_deref())
693        });
694
695        match tokio::time::timeout(timeout_dur, task).await {
696            Ok(Ok(Ok(()))) => Ok(()),
697            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest persist failed: {e}"))),
698            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
699                "manifest persist task failed: {join_err}"
700            ))),
701            Err(_elapsed) => Err(DbError::Checkpoint(format!(
702                "[LDB-6011] manifest persist timed out after {}s — \
703                 filesystem may be degraded",
704                timeout_dur.as_secs()
705            ))),
706        }
707    }
708
709    /// Overwrites an existing manifest with updated fields (e.g., sink commit
710    /// statuses after Step 6). Uses [`CheckpointStore::update_manifest`] which
711    /// does NOT use conditional PUT, so the overwrite always succeeds.
712    async fn update_manifest_only(&self, manifest: &CheckpointManifest) -> Result<(), DbError> {
713        let store = Arc::clone(&self.store);
714        let manifest = manifest.clone();
715        let timeout_dur = self.config.persist_timeout;
716
717        let task = tokio::task::spawn_blocking(move || store.update_manifest(&manifest));
718
719        match tokio::time::timeout(timeout_dur, task).await {
720            Ok(Ok(Ok(()))) => Ok(()),
721            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest update failed: {e}"))),
722            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
723                "manifest update task failed: {join_err}"
724            ))),
725            Err(_elapsed) => Err(DbError::Checkpoint(format!(
726                "manifest update timed out after {}s",
727                timeout_dur.as_secs()
728            ))),
729        }
730    }
731
732    /// Returns initial sink commit statuses (all `Pending`) for the manifest.
733    fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
734        self.sinks
735            .iter()
736            .filter(|s| s.exactly_once)
737            .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
738            .collect()
739    }
740
741    /// Packs operator states into a manifest with optional sidecar blob.
742    ///
743    /// States larger than `threshold` are stored in a sidecar blob rather
744    /// than base64-inlined in the JSON manifest.
745    fn pack_operator_states(
746        manifest: &mut CheckpointManifest,
747        operator_states: &HashMap<String, Vec<u8>>,
748        threshold: usize,
749    ) -> Option<Vec<u8>> {
750        let mut sidecar_blobs: Vec<u8> = Vec::new();
751        for (name, data) in operator_states {
752            let (op_ckpt, maybe_blob) =
753                laminar_storage::checkpoint_manifest::OperatorCheckpoint::from_bytes(
754                    data,
755                    threshold,
756                    sidecar_blobs.len() as u64,
757                );
758            if let Some(blob) = maybe_blob {
759                sidecar_blobs.extend_from_slice(&blob);
760            }
761            manifest.operator_states.insert(name.clone(), op_ckpt);
762        }
763
764        if sidecar_blobs.is_empty() {
765            None
766        } else {
767            Some(sidecar_blobs)
768        }
769    }
770
771    /// Rolls back all exactly-once sinks.
772    async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
773        for sink in &self.sinks {
774            if sink.exactly_once {
775                sink.handle.rollback_epoch(epoch).await;
776            }
777        }
778        Ok(())
779    }
780
781    /// Collects the last committed epoch from each sink.
782    fn collect_sink_epochs(&self) -> HashMap<String, u64> {
783        let mut epochs = HashMap::with_capacity(self.sinks.len());
784        for sink in &self.sinks {
785            // The epoch being committed is the current one
786            if sink.exactly_once {
787                epochs.insert(sink.name.clone(), self.epoch);
788            }
789        }
790        epochs
791    }
792
793    /// Returns sorted sink names for topology tracking in the manifest.
794    fn sorted_sink_names(&self) -> Vec<String> {
795        let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
796        names.sort();
797        names
798    }
799
800    /// Returns the current phase.
801    #[must_use]
802    pub fn phase(&self) -> CheckpointPhase {
803        self.phase
804    }
805
806    /// Returns the current epoch.
807    #[must_use]
808    pub fn epoch(&self) -> u64 {
809        self.epoch
810    }
811
812    /// Returns the next checkpoint ID.
813    #[must_use]
814    pub fn next_checkpoint_id(&self) -> u64 {
815        self.next_checkpoint_id
816    }
817
818    /// Returns the checkpoint config.
819    #[must_use]
820    pub fn config(&self) -> &CheckpointConfig {
821        &self.config
822    }
823
824    /// Adjusts the checkpoint interval based on observed durations.
825    ///
826    /// Uses an exponential moving average (EMA) of checkpoint durations to
827    /// compute a new interval: `interval = smoothed_duration / target_ratio`,
828    /// clamped to `[min_interval, max_interval]`.
829    ///
830    /// No-op if adaptive checkpointing is not configured.
831    fn adjust_interval(&mut self) {
832        let adaptive = match &self.config.adaptive {
833            Some(a) => a.clone(),
834            None => return,
835        };
836
837        #[allow(clippy::cast_precision_loss)] // Checkpoint durations are << 2^52 ms
838        let last_ms = match self.last_checkpoint_duration {
839            Some(d) => d.as_millis() as f64,
840            None => return,
841        };
842
843        // Update EMA
844        if self.smoothed_duration_ms == 0.0 {
845            self.smoothed_duration_ms = last_ms;
846        } else {
847            self.smoothed_duration_ms = adaptive.smoothing_alpha * last_ms
848                + (1.0 - adaptive.smoothing_alpha) * self.smoothed_duration_ms;
849        }
850
851        // Compute target interval: smoothed_ms / (1000 * ratio)
852        let new_interval_secs =
853            self.smoothed_duration_ms / (1000.0 * adaptive.target_overhead_ratio);
854        let new_interval = Duration::from_secs_f64(new_interval_secs);
855
856        // Clamp to bounds
857        let clamped = new_interval.clamp(adaptive.min_interval, adaptive.max_interval);
858
859        let old_interval = self.config.interval;
860        self.config.interval = Some(clamped);
861
862        if old_interval != Some(clamped) {
863            debug!(
864                old_interval_ms = old_interval.map(|d| d.as_millis()),
865                new_interval_ms = clamped.as_millis(),
866                smoothed_duration_ms = self.smoothed_duration_ms,
867                "adaptive checkpoint interval adjusted"
868            );
869        }
870    }
871
872    /// Returns the current smoothed checkpoint duration (milliseconds).
873    ///
874    /// Returns 0.0 if no checkpoints have been completed or adaptive mode
875    /// is not enabled.
876    #[must_use]
877    pub fn smoothed_duration_ms(&self) -> f64 {
878        self.smoothed_duration_ms
879    }
880
881    /// Returns the number of unaligned checkpoints completed.
882    #[must_use]
883    pub fn unaligned_checkpoint_count(&self) -> u64 {
884        self.unaligned_checkpoint_count
885    }
886
887    /// Returns checkpoint statistics.
888    #[must_use]
889    pub fn stats(&self) -> CheckpointStats {
890        let (p50, p95, p99) = self.duration_histogram.percentiles();
891        CheckpointStats {
892            completed: self.checkpoints_completed,
893            failed: self.checkpoints_failed,
894            last_duration: self.last_checkpoint_duration,
895            duration_p50_ms: p50,
896            duration_p95_ms: p95,
897            duration_p99_ms: p99,
898            total_bytes_written: self.total_bytes_written,
899            current_phase: self.phase,
900            current_epoch: self.epoch,
901            unaligned_checkpoint_count: self.unaligned_checkpoint_count,
902        }
903    }
904
905    /// Returns a reference to the underlying store.
906    #[must_use]
907    pub fn store(&self) -> &dyn CheckpointStore {
908        &*self.store
909    }
910
911    /// Performs a full checkpoint cycle with additional table offsets.
912    ///
913    /// Identical to [`checkpoint()`](Self::checkpoint) but merges
914    /// `extra_table_offsets` into the manifest's `table_offsets` field.
915    /// This is useful for `ReferenceTableSource` instances that are not
916    /// registered as `SourceConnector` but still need their offsets persisted.
917    ///
918    /// # Errors
919    ///
920    /// Returns `DbError::Checkpoint` if any phase fails.
921    #[allow(clippy::too_many_arguments)]
922    pub async fn checkpoint_with_extra_tables(
923        &mut self,
924        operator_states: HashMap<String, Vec<u8>>,
925        watermark: Option<i64>,
926        table_store_checkpoint_path: Option<String>,
927        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
928        source_watermarks: HashMap<String, i64>,
929        pipeline_hash: Option<u64>,
930    ) -> Result<CheckpointResult, DbError> {
931        self.checkpoint_inner(
932            operator_states,
933            watermark,
934            table_store_checkpoint_path,
935            extra_table_offsets,
936            source_watermarks,
937            pipeline_hash,
938            HashMap::new(),
939        )
940        .await
941    }
942
943    /// Performs a full checkpoint with pre-captured source offsets.
944    ///
945    /// When `source_offset_overrides` is non-empty, those sources skip the
946    /// live `snapshot_sources()` call and use the provided offsets instead.
947    /// This is essential for barrier-aligned checkpoints where source
948    /// positions must match the operator state at the barrier point.
949    ///
950    /// # Errors
951    ///
952    /// Returns `DbError::Checkpoint` if any phase fails.
953    #[allow(clippy::too_many_arguments)]
954    pub async fn checkpoint_with_offsets(
955        &mut self,
956        operator_states: HashMap<String, Vec<u8>>,
957        watermark: Option<i64>,
958        table_store_checkpoint_path: Option<String>,
959        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
960        source_watermarks: HashMap<String, i64>,
961        pipeline_hash: Option<u64>,
962        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
963    ) -> Result<CheckpointResult, DbError> {
964        self.checkpoint_inner(
965            operator_states,
966            watermark,
967            table_store_checkpoint_path,
968            extra_table_offsets,
969            source_watermarks,
970            pipeline_hash,
971            source_offset_overrides,
972        )
973        .await
974    }
975
976    /// Shared checkpoint implementation for all checkpoint entry points.
977    ///
978    /// WAL preparation is performed internally to guarantee atomicity:
979    /// changelog drain + epoch barrier + WAL sync happen after operator
980    /// states are received, so the WAL positions in the manifest always
981    /// reflect the state after the operator snapshot.
982    ///
983    /// When `source_offset_overrides` is non-empty, those sources use the
984    /// provided offsets instead of calling `snapshot_sources()`. This ensures
985    /// barrier-aligned and pre-captured offsets are used atomically.
986    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
987    async fn checkpoint_inner(
988        &mut self,
989        operator_states: HashMap<String, Vec<u8>>,
990        watermark: Option<i64>,
991        table_store_checkpoint_path: Option<String>,
992        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
993        source_watermarks: HashMap<String, i64>,
994        pipeline_hash: Option<u64>,
995        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
996    ) -> Result<CheckpointResult, DbError> {
997        let start = Instant::now();
998        let checkpoint_id = self.next_checkpoint_id;
999        let epoch = self.epoch;
1000
1001        info!(checkpoint_id, epoch, "starting checkpoint");
1002
1003        // ── Step 2b: WAL preparation (internal) ──
1004        // Drain changelog, write epoch barriers, sync WAL, capture positions.
1005        // This MUST happen after operator states are received (passed as args)
1006        // to guarantee WAL positions reflect post-snapshot state.
1007        let wal_result = self.prepare_wal_for_checkpoint()?;
1008        let per_core_wal_positions = wal_result.per_core_wal_positions;
1009
1010        // ── Step 3: Source offsets ──
1011        // Source offsets are provided by the caller (pre-captured at barrier
1012        // alignment or pre-spawn). Table offsets come from extra_table_offsets.
1013        self.phase = CheckpointPhase::Snapshotting;
1014        let source_offsets = source_offset_overrides;
1015        let table_offsets = extra_table_offsets;
1016
1017        // ── Step 4: Sink pre-commit ──
1018        self.phase = CheckpointPhase::PreCommitting;
1019        if let Err(e) = self.pre_commit_sinks(epoch).await {
1020            self.phase = CheckpointPhase::Idle;
1021            self.checkpoints_failed += 1;
1022            self.maybe_cap_drainers();
1023            let duration = start.elapsed();
1024            self.emit_checkpoint_metrics(false, epoch, duration);
1025            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
1026            return Ok(CheckpointResult {
1027                success: false,
1028                checkpoint_id,
1029                epoch,
1030                duration,
1031                error: Some(format!("pre-commit failed: {e}")),
1032            });
1033        }
1034
1035        // ── Build manifest ──
1036        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
1037        manifest.source_offsets = source_offsets;
1038        manifest.table_offsets = table_offsets;
1039        manifest.sink_epochs = self.collect_sink_epochs();
1040        // Mark all exactly-once sinks as Pending before commit phase.
1041        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
1042        manifest.watermark = watermark;
1043        // Use caller-provided per-source watermarks if available. When empty,
1044        // leave source_watermarks empty — recovery falls back to the global
1045        // manifest.watermark. Do NOT fabricate per-source values from the
1046        // global watermark, as that loses granularity on recovery.
1047        manifest.source_watermarks = source_watermarks;
1048        // wal_position is legacy (single-writer mode); per-core positions are authoritative.
1049        manifest.per_core_wal_positions = per_core_wal_positions;
1050        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
1051        manifest.is_incremental = self.config.incremental;
1052        manifest.source_names = {
1053            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
1054            names.sort();
1055            names
1056        };
1057        manifest.sink_names = self.sorted_sink_names();
1058        manifest.pipeline_hash = pipeline_hash;
1059
1060        let state_data = Self::pack_operator_states(
1061            &mut manifest,
1062            &operator_states,
1063            self.config.state_inline_threshold,
1064        );
1065        let sidecar_bytes = state_data.as_ref().map_or(0, Vec::len);
1066        if sidecar_bytes > 0 {
1067            debug!(
1068                checkpoint_id,
1069                sidecar_bytes, "writing operator state sidecar"
1070            );
1071        }
1072
1073        // ── Step 4b: Checkpoint size check ──
1074        if let Some(cap) = self.config.max_checkpoint_bytes {
1075            if sidecar_bytes > cap {
1076                self.phase = CheckpointPhase::Idle;
1077                self.checkpoints_failed += 1;
1078                self.maybe_cap_drainers();
1079                let duration = start.elapsed();
1080                self.emit_checkpoint_metrics(false, epoch, duration);
1081                let msg = format!(
1082                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
1083                     cap {cap} bytes — checkpoint rejected"
1084                );
1085                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
1086                return Ok(CheckpointResult {
1087                    success: false,
1088                    checkpoint_id,
1089                    epoch,
1090                    duration,
1091                    error: Some(msg),
1092                });
1093            }
1094            let warn_threshold = cap * 4 / 5; // 80%
1095            if sidecar_bytes > warn_threshold {
1096                warn!(
1097                    checkpoint_id,
1098                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
1099                );
1100            }
1101        }
1102        // Track cumulative bytes written (updated after successful persist below).
1103        let checkpoint_bytes = sidecar_bytes as u64;
1104
1105        // ── Step 5: Persist manifest (decision record — sinks are Pending) ──
1106        self.phase = CheckpointPhase::Persisting;
1107        if let Err(e) = self.save_manifest(manifest.clone(), state_data).await {
1108            self.phase = CheckpointPhase::Idle;
1109            self.checkpoints_failed += 1;
1110            self.maybe_cap_drainers();
1111            let duration = start.elapsed();
1112            self.emit_checkpoint_metrics(false, epoch, duration);
1113            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1114                error!(
1115                    checkpoint_id,
1116                    epoch,
1117                    error = %rollback_err,
1118                    "[LDB-6004] sink rollback failed after manifest persist failure — \
1119                     sinks may be in an inconsistent state"
1120                );
1121            }
1122            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
1123            return Ok(CheckpointResult {
1124                success: false,
1125                checkpoint_id,
1126                epoch,
1127                duration,
1128                error: Some(format!("manifest persist failed: {e}")),
1129            });
1130        }
1131
1132        // ── Step 6: Sink commit (per-sink tracking) ──
1133        self.phase = CheckpointPhase::Committing;
1134        let sink_statuses = self.commit_sinks_tracked(epoch).await;
1135        let has_failures = sink_statuses
1136            .values()
1137            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
1138
1139        // ── Step 6b: Overwrite manifest with final sink commit statuses ──
1140        if !sink_statuses.is_empty() {
1141            manifest.sink_commit_statuses = sink_statuses;
1142            if let Err(e) = self.update_manifest_only(&manifest).await {
1143                warn!(
1144                    checkpoint_id,
1145                    epoch,
1146                    error = %e,
1147                    "post-commit manifest update failed"
1148                );
1149            }
1150        }
1151
1152        if has_failures {
1153            self.checkpoints_failed += 1;
1154            error!(
1155                checkpoint_id,
1156                epoch, "sink commit partially failed — epoch NOT advanced, will retry"
1157            );
1158            self.phase = CheckpointPhase::Idle;
1159            let duration = start.elapsed();
1160            self.emit_checkpoint_metrics(false, epoch, duration);
1161            return Ok(CheckpointResult {
1162                success: false,
1163                checkpoint_id,
1164                epoch,
1165                duration,
1166                error: Some("partial sink commit failure".into()),
1167            });
1168        }
1169
1170        // ── Step 7: Clear changelog drainer pending buffers ──
1171        // Entries are metadata-only (key_hash + mmap_offset) used for SPSC
1172        // backpressure relief. The checkpoint captured full state, so the
1173        // metadata is no longer needed. Clearing prevents unbounded growth.
1174        for drainer in &mut self.changelog_drainers {
1175            drainer.clear_pending();
1176        }
1177
1178        // ── Success ──
1179        self.phase = CheckpointPhase::Idle;
1180        self.next_checkpoint_id += 1;
1181        self.epoch += 1;
1182        self.checkpoints_completed += 1;
1183        self.total_bytes_written += checkpoint_bytes;
1184        let duration = start.elapsed();
1185        self.last_checkpoint_duration = Some(duration);
1186        self.duration_histogram.record(duration);
1187        self.emit_checkpoint_metrics(true, epoch, duration);
1188        self.adjust_interval();
1189
1190        // ── Step 8: Begin next epoch on exactly-once sinks ──
1191        let next_epoch = self.epoch;
1192        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
1193            Ok(()) => None,
1194            Err(e) => {
1195                error!(
1196                    next_epoch,
1197                    error = %e,
1198                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
1199                );
1200                Some(e.to_string())
1201            }
1202        };
1203
1204        info!(
1205            checkpoint_id,
1206            epoch,
1207            duration_ms = duration.as_millis(),
1208            "checkpoint completed"
1209        );
1210
1211        // The checkpoint itself succeeded (state persisted, sinks committed).
1212        // begin_epoch failure for the *next* epoch is reported as a warning
1213        // but does not retroactively fail the completed checkpoint.
1214        Ok(CheckpointResult {
1215            success: true,
1216            checkpoint_id,
1217            epoch,
1218            duration,
1219            error: begin_epoch_error,
1220        })
1221    }
1222
1223    /// Attempts recovery from the latest checkpoint.
1224    ///
1225    /// Creates a [`RecoveryManager`](crate::recovery_manager::RecoveryManager)
1226    /// using the coordinator's store and delegates recovery to it.
1227    /// On success, advances `self.epoch` past the recovered epoch so the
1228    /// next checkpoint gets a fresh epoch number.
1229    ///
1230    /// Returns `Ok(None)` for a fresh start (no checkpoint found).
1231    ///
1232    /// # Errors
1233    ///
1234    /// Returns `DbError::Checkpoint` if the store itself fails.
1235    pub async fn recover(
1236        &mut self,
1237    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
1238        use crate::recovery_manager::RecoveryManager;
1239
1240        let mgr = RecoveryManager::new(&*self.store);
1241        // Sources and table sources are restored by the pipeline lifecycle
1242        // (via SourceRegistration.restore_checkpoint), not by the coordinator.
1243        // Pass empty slices — the coordinator only manages sink recovery here.
1244        let result = mgr.recover(&[], &self.sinks, &[]).await?;
1245
1246        if let Some(ref recovered) = result {
1247            // Advance epoch past the recovered one
1248            self.epoch = recovered.epoch() + 1;
1249            self.next_checkpoint_id = recovered.manifest.checkpoint_id + 1;
1250            info!(
1251                epoch = self.epoch,
1252                checkpoint_id = self.next_checkpoint_id,
1253                "coordinator epoch set after recovery"
1254            );
1255        }
1256
1257        Ok(result)
1258    }
1259
1260    /// Loads the latest manifest from the store.
1261    ///
1262    /// # Errors
1263    ///
1264    /// Returns `DbError::Checkpoint` on store errors.
1265    pub fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
1266        self.store
1267            .load_latest()
1268            .map_err(|e| DbError::Checkpoint(format!("failed to load latest manifest: {e}")))
1269    }
1270}
1271
1272impl std::fmt::Debug for CheckpointCoordinator {
1273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1274        f.debug_struct("CheckpointCoordinator")
1275            .field("epoch", &self.epoch)
1276            .field("next_checkpoint_id", &self.next_checkpoint_id)
1277            .field("phase", &self.phase)
1278            .field("sinks", &self.sinks.len())
1279            .field("has_wal_manager", &self.wal_manager.is_some())
1280            .field("changelog_drainers", &self.changelog_drainers.len())
1281            .field("completed", &self.checkpoints_completed)
1282            .field("failed", &self.checkpoints_failed)
1283            .finish_non_exhaustive()
1284    }
1285}
1286
1287/// Fixed-size ring buffer for checkpoint duration percentile tracking.
1288///
1289/// Stores the last `CAPACITY` checkpoint durations and computes p50/p95/p99
1290/// via sorted extraction. No heap allocation after construction.
1291#[derive(Clone)]
1292pub struct DurationHistogram {
1293    /// Ring buffer of durations in milliseconds.
1294    samples: Box<[u64; Self::CAPACITY]>,
1295    /// Write cursor (wraps at `CAPACITY`).
1296    cursor: usize,
1297    /// Total samples written (may exceed `CAPACITY`).
1298    count: u64,
1299}
1300
1301impl DurationHistogram {
1302    const CAPACITY: usize = 100;
1303
1304    /// Creates an empty histogram.
1305    #[must_use]
1306    fn new() -> Self {
1307        Self {
1308            samples: Box::new([0; Self::CAPACITY]),
1309            cursor: 0,
1310            count: 0,
1311        }
1312    }
1313
1314    /// Records a checkpoint duration.
1315    fn record(&mut self, duration: Duration) {
1316        #[allow(clippy::cast_possible_truncation)]
1317        let ms = duration.as_millis() as u64;
1318        self.samples[self.cursor] = ms;
1319        self.cursor = (self.cursor + 1) % Self::CAPACITY;
1320        self.count += 1;
1321    }
1322
1323    /// Returns the number of recorded samples (up to `CAPACITY`).
1324    #[must_use]
1325    fn len(&self) -> usize {
1326        if self.count >= Self::CAPACITY as u64 {
1327            Self::CAPACITY
1328        } else {
1329            // SAFETY: count < CAPACITY (100), which always fits in usize.
1330            #[allow(clippy::cast_possible_truncation)]
1331            {
1332                self.count as usize
1333            }
1334        }
1335    }
1336
1337    /// Computes a percentile (0.0–1.0) from recorded samples.
1338    ///
1339    /// Returns 0 if no samples have been recorded.
1340    #[must_use]
1341    fn percentile(&self, p: f64) -> u64 {
1342        let n = self.len();
1343        if n == 0 {
1344            return 0;
1345        }
1346        let mut sorted: Vec<u64> = self.samples[..n].to_vec();
1347        sorted.sort_unstable();
1348        #[allow(
1349            clippy::cast_possible_truncation,
1350            clippy::cast_sign_loss,
1351            clippy::cast_precision_loss
1352        )]
1353        let idx = ((p * (n as f64 - 1.0)).ceil() as usize).min(n - 1);
1354        sorted[idx]
1355    }
1356
1357    /// Returns (p50, p95, p99) in milliseconds.
1358    #[must_use]
1359    fn percentiles(&self) -> (u64, u64, u64) {
1360        (
1361            self.percentile(0.50),
1362            self.percentile(0.95),
1363            self.percentile(0.99),
1364        )
1365    }
1366}
1367
1368impl std::fmt::Debug for DurationHistogram {
1369    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1370        let (p50, p95, p99) = self.percentiles();
1371        f.debug_struct("DurationHistogram")
1372            .field("samples_len", &self.samples.len())
1373            .field("cursor", &self.cursor)
1374            .field("count", &self.count)
1375            .field("p50_ms", &p50)
1376            .field("p95_ms", &p95)
1377            .field("p99_ms", &p99)
1378            .finish()
1379    }
1380}
1381
1382/// Checkpoint performance statistics.
1383#[derive(Debug, Clone, serde::Serialize)]
1384pub struct CheckpointStats {
1385    /// Total completed checkpoints.
1386    pub completed: u64,
1387    /// Total failed checkpoints.
1388    pub failed: u64,
1389    /// Duration of the last checkpoint.
1390    pub last_duration: Option<Duration>,
1391    /// p50 checkpoint duration in milliseconds.
1392    pub duration_p50_ms: u64,
1393    /// p95 checkpoint duration in milliseconds.
1394    pub duration_p95_ms: u64,
1395    /// p99 checkpoint duration in milliseconds.
1396    pub duration_p99_ms: u64,
1397    /// Total bytes written across all checkpoints.
1398    pub total_bytes_written: u64,
1399    /// Current checkpoint phase.
1400    pub current_phase: CheckpointPhase,
1401    /// Current epoch number.
1402    pub current_epoch: u64,
1403    /// Number of checkpoints completed in unaligned mode.
1404    pub unaligned_checkpoint_count: u64,
1405}
1406
1407// ── Conversion helpers ──
1408
1409/// Converts a `SourceCheckpoint` to a `ConnectorCheckpoint`.
1410#[must_use]
1411pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
1412    ConnectorCheckpoint {
1413        offsets: cp.offsets().clone(),
1414        epoch: cp.epoch(),
1415        metadata: cp.metadata().clone(),
1416    }
1417}
1418
1419/// Converts a `ConnectorCheckpoint` back to a `SourceCheckpoint`.
1420#[must_use]
1421pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
1422    let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
1423    for (k, v) in &cp.metadata {
1424        source_cp.set_metadata(k.clone(), v.clone());
1425    }
1426    source_cp
1427}
1428
1429// ── DAG operator state conversion helpers ──
1430
1431/// Converts DAG operator states (from `DagCheckpointSnapshot`) to manifest format.
1432///
1433/// Uses `"{node_id}"` as the key and base64-encodes the state data.
1434#[must_use]
1435pub fn dag_snapshot_to_manifest_operators<S: std::hash::BuildHasher>(
1436    node_states: &std::collections::HashMap<
1437        u32,
1438        laminar_core::dag::recovery::SerializableOperatorState,
1439        S,
1440    >,
1441) -> HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
1442    node_states
1443        .iter()
1444        .map(|(id, state)| {
1445            (
1446                id.to_string(),
1447                laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&state.data),
1448            )
1449        })
1450        .collect()
1451}
1452
1453/// Converts manifest operator states back to DAG format for recovery.
1454///
1455/// Parses string keys as node IDs and decodes base64 state data.
1456#[must_use]
1457pub fn manifest_operators_to_dag_states<S: std::hash::BuildHasher>(
1458    operators: &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint, S>,
1459) -> rustc_hash::FxHashMap<laminar_core::dag::topology::NodeId, laminar_core::operator::OperatorState>
1460{
1461    let mut states =
1462        rustc_hash::FxHashMap::with_capacity_and_hasher(operators.len(), rustc_hash::FxBuildHasher);
1463    for (key, op_ckpt) in operators {
1464        if let Ok(node_id) = key.parse::<u32>() {
1465            if let Some(data) = op_ckpt.decode_inline() {
1466                states.insert(
1467                    laminar_core::dag::topology::NodeId(node_id),
1468                    laminar_core::operator::OperatorState {
1469                        operator_id: key.clone(),
1470                        data,
1471                    },
1472                );
1473            }
1474        }
1475    }
1476    states
1477}
1478
1479#[cfg(test)]
1480mod tests {
1481    use super::*;
1482    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;
1483
1484    fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
1485        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
1486        CheckpointCoordinator::new(CheckpointConfig::default(), store)
1487    }
1488
1489    #[test]
1490    fn test_coordinator_new() {
1491        let dir = tempfile::tempdir().unwrap();
1492        let coord = make_coordinator(dir.path());
1493
1494        assert_eq!(coord.epoch(), 1);
1495        assert_eq!(coord.next_checkpoint_id(), 1);
1496        assert_eq!(coord.phase(), CheckpointPhase::Idle);
1497    }
1498
1499    #[test]
1500    fn test_coordinator_resumes_from_stored_checkpoint() {
1501        let dir = tempfile::tempdir().unwrap();
1502
1503        // Save a checkpoint manually
1504        let store = FileSystemCheckpointStore::new(dir.path(), 3);
1505        let m = CheckpointManifest::new(5, 10);
1506        store.save(&m).unwrap();
1507
1508        // Coordinator should resume from epoch 11, checkpoint_id 6
1509        let coord = make_coordinator(dir.path());
1510        assert_eq!(coord.epoch(), 11);
1511        assert_eq!(coord.next_checkpoint_id(), 6);
1512    }
1513
1514    #[test]
1515    fn test_checkpoint_phase_display() {
1516        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
1517        assert_eq!(
1518            CheckpointPhase::BarrierInFlight.to_string(),
1519            "BarrierInFlight"
1520        );
1521        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
1522        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
1523        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
1524        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
1525    }
1526
1527    #[test]
1528    fn test_source_to_connector_checkpoint() {
1529        let mut cp = SourceCheckpoint::new(5);
1530        cp.set_offset("partition-0", "1234");
1531        cp.set_metadata("topic", "events");
1532
1533        let cc = source_to_connector_checkpoint(&cp);
1534        assert_eq!(cc.epoch, 5);
1535        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
1536        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
1537    }
1538
1539    #[test]
1540    fn test_connector_to_source_checkpoint() {
1541        let cc = ConnectorCheckpoint {
1542            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
1543            epoch: 3,
1544            metadata: HashMap::from([("type".into(), "postgres".into())]),
1545        };
1546
1547        let cp = connector_to_source_checkpoint(&cc);
1548        assert_eq!(cp.epoch(), 3);
1549        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
1550        assert_eq!(cp.get_metadata("type"), Some("postgres"));
1551    }
1552
1553    #[test]
1554    fn test_stats_initial() {
1555        let dir = tempfile::tempdir().unwrap();
1556        let coord = make_coordinator(dir.path());
1557        let stats = coord.stats();
1558
1559        assert_eq!(stats.completed, 0);
1560        assert_eq!(stats.failed, 0);
1561        assert!(stats.last_duration.is_none());
1562        assert_eq!(stats.duration_p50_ms, 0);
1563        assert_eq!(stats.duration_p95_ms, 0);
1564        assert_eq!(stats.duration_p99_ms, 0);
1565        assert_eq!(stats.current_phase, CheckpointPhase::Idle);
1566    }
1567
1568    #[tokio::test]
1569    async fn test_checkpoint_no_sources_no_sinks() {
1570        let dir = tempfile::tempdir().unwrap();
1571        let mut coord = make_coordinator(dir.path());
1572
1573        let result = coord
1574            .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
1575            .await
1576            .unwrap();
1577
1578        assert!(result.success);
1579        assert_eq!(result.checkpoint_id, 1);
1580        assert_eq!(result.epoch, 1);
1581
1582        // Verify manifest was persisted
1583        let loaded = coord.store().load_latest().unwrap().unwrap();
1584        assert_eq!(loaded.checkpoint_id, 1);
1585        assert_eq!(loaded.epoch, 1);
1586        assert_eq!(loaded.watermark, Some(1000));
1587
1588        // Second checkpoint should increment
1589        let result2 = coord
1590            .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
1591            .await
1592            .unwrap();
1593
1594        assert!(result2.success);
1595        assert_eq!(result2.checkpoint_id, 2);
1596        assert_eq!(result2.epoch, 2);
1597
1598        let stats = coord.stats();
1599        assert_eq!(stats.completed, 2);
1600        assert_eq!(stats.failed, 0);
1601    }
1602
1603    #[tokio::test]
1604    async fn test_checkpoint_with_operator_states() {
1605        let dir = tempfile::tempdir().unwrap();
1606        let mut coord = make_coordinator(dir.path());
1607
1608        let mut ops = HashMap::new();
1609        ops.insert("window-agg".into(), b"state-data".to_vec());
1610        ops.insert("filter".into(), b"filter-state".to_vec());
1611
1612        let result = coord
1613            .checkpoint(ops, None, None, HashMap::new(), None)
1614            .await
1615            .unwrap();
1616
1617        assert!(result.success);
1618
1619        let loaded = coord.store().load_latest().unwrap().unwrap();
1620        assert_eq!(loaded.operator_states.len(), 2);
1621        // No WAL manager registered → positions are empty
1622        assert!(loaded.per_core_wal_positions.is_empty());
1623
1624        let window_op = loaded.operator_states.get("window-agg").unwrap();
1625        assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
1626    }
1627
1628    #[tokio::test]
1629    async fn test_checkpoint_with_table_store_path() {
1630        let dir = tempfile::tempdir().unwrap();
1631        let mut coord = make_coordinator(dir.path());
1632
1633        let result = coord
1634            .checkpoint(
1635                HashMap::new(),
1636                None,
1637                Some("/tmp/rocksdb_cp".into()),
1638                HashMap::new(),
1639                None,
1640            )
1641            .await
1642            .unwrap();
1643
1644        assert!(result.success);
1645
1646        let loaded = coord.store().load_latest().unwrap().unwrap();
1647        assert_eq!(
1648            loaded.table_store_checkpoint_path.as_deref(),
1649            Some("/tmp/rocksdb_cp")
1650        );
1651    }
1652
1653    #[test]
1654    fn test_load_latest_manifest_empty() {
1655        let dir = tempfile::tempdir().unwrap();
1656        let coord = make_coordinator(dir.path());
1657        assert!(coord.load_latest_manifest().unwrap().is_none());
1658    }
1659
1660    #[test]
1661    fn test_coordinator_debug() {
1662        let dir = tempfile::tempdir().unwrap();
1663        let coord = make_coordinator(dir.path());
1664        let debug = format!("{coord:?}");
1665        assert!(debug.contains("CheckpointCoordinator"));
1666        assert!(debug.contains("epoch: 1"));
1667    }
1668
1669    // ── Operator state persistence tests ──
1670
1671    #[test]
1672    fn test_dag_snapshot_to_manifest_operators() {
1673        use laminar_core::dag::recovery::SerializableOperatorState;
1674
1675        let mut node_states = HashMap::new();
1676        node_states.insert(
1677            0,
1678            SerializableOperatorState {
1679                operator_id: "window-agg".into(),
1680                data: b"window-state".to_vec(),
1681            },
1682        );
1683        node_states.insert(
1684            3,
1685            SerializableOperatorState {
1686                operator_id: "filter".into(),
1687                data: b"filter-state".to_vec(),
1688            },
1689        );
1690
1691        let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
1692        assert_eq!(manifest_ops.len(), 2);
1693
1694        let w = manifest_ops.get("0").unwrap();
1695        assert_eq!(w.decode_inline().unwrap(), b"window-state");
1696        let f = manifest_ops.get("3").unwrap();
1697        assert_eq!(f.decode_inline().unwrap(), b"filter-state");
1698    }
1699
1700    #[test]
1701    fn test_manifest_operators_to_dag_states() {
1702        use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
1703
1704        let mut operators = HashMap::new();
1705        operators.insert("0".into(), OperatorCheckpoint::inline(b"state-0"));
1706        operators.insert("5".into(), OperatorCheckpoint::inline(b"state-5"));
1707
1708        let dag_states = manifest_operators_to_dag_states(&operators);
1709        assert_eq!(dag_states.len(), 2);
1710
1711        let s0 = dag_states
1712            .get(&laminar_core::dag::topology::NodeId(0))
1713            .unwrap();
1714        assert_eq!(s0.data, b"state-0");
1715
1716        let s5 = dag_states
1717            .get(&laminar_core::dag::topology::NodeId(5))
1718            .unwrap();
1719        assert_eq!(s5.data, b"state-5");
1720    }
1721
1722    #[test]
1723    fn test_operator_state_round_trip_through_manifest() {
1724        use laminar_core::dag::recovery::SerializableOperatorState;
1725
1726        // Original DAG states
1727        let mut node_states = HashMap::new();
1728        node_states.insert(
1729            7,
1730            SerializableOperatorState {
1731                operator_id: "join".into(),
1732                data: vec![1, 2, 3, 4, 5],
1733            },
1734        );
1735
1736        // DAG → manifest
1737        let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
1738
1739        // Persist and reload (simulate)
1740        let json = serde_json::to_string(&manifest_ops).unwrap();
1741        let reloaded: HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> =
1742            serde_json::from_str(&json).unwrap();
1743
1744        // Manifest → DAG
1745        let recovered = manifest_operators_to_dag_states(&reloaded);
1746        let state = recovered
1747            .get(&laminar_core::dag::topology::NodeId(7))
1748            .unwrap();
1749        assert_eq!(state.data, vec![1, 2, 3, 4, 5]);
1750    }
1751
1752    #[test]
1753    fn test_manifest_operators_skips_invalid_keys() {
1754        use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
1755
1756        let mut operators = HashMap::new();
1757        operators.insert("not-a-number".into(), OperatorCheckpoint::inline(b"data"));
1758        operators.insert("42".into(), OperatorCheckpoint::inline(b"good"));
1759
1760        let dag_states = manifest_operators_to_dag_states(&operators);
1761        // Only the numeric key should survive
1762        assert_eq!(dag_states.len(), 1);
1763        assert!(dag_states.contains_key(&laminar_core::dag::topology::NodeId(42)));
1764    }
1765
1766    // ── WAL checkpoint coordination tests ──
1767
1768    #[test]
1769    fn test_prepare_wal_no_wal_manager() {
1770        let dir = tempfile::tempdir().unwrap();
1771        let mut coord = make_coordinator(dir.path());
1772
1773        // Without a WAL manager, prepare should succeed with empty positions.
1774        let result = coord.prepare_wal_for_checkpoint().unwrap();
1775        assert!(result.per_core_wal_positions.is_empty());
1776        assert_eq!(result.entries_drained, 0);
1777    }
1778
1779    #[test]
1780    fn test_prepare_wal_with_manager() {
1781        let dir = tempfile::tempdir().unwrap();
1782        let mut coord = make_coordinator(dir.path());
1783
1784        // Set up a per-core WAL manager
1785        let wal_dir = dir.path().join("wal");
1786        std::fs::create_dir_all(&wal_dir).unwrap();
1787        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1788        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1789        coord.register_wal_manager(wal);
1790
1791        // Write some data to WAL
1792        coord
1793            .wal_manager_mut()
1794            .unwrap()
1795            .writer(0)
1796            .append_put(b"key", b"value")
1797            .unwrap();
1798
1799        let result = coord.prepare_wal_for_checkpoint().unwrap();
1800        assert_eq!(result.per_core_wal_positions.len(), 2);
1801        // Positions should be > 0 (epoch barrier + data)
1802        assert!(result.per_core_wal_positions.iter().any(|p| *p > 0));
1803    }
1804
1805    #[test]
1806    fn test_truncate_wal_no_manager() {
1807        let dir = tempfile::tempdir().unwrap();
1808        let mut coord = make_coordinator(dir.path());
1809
1810        // Without a WAL manager, truncation is a no-op.
1811        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1812    }
1813
1814    #[test]
1815    fn test_truncate_wal_with_manager() {
1816        let dir = tempfile::tempdir().unwrap();
1817        let mut coord = make_coordinator(dir.path());
1818
1819        let wal_dir = dir.path().join("wal");
1820        std::fs::create_dir_all(&wal_dir).unwrap();
1821        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1822        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1823        coord.register_wal_manager(wal);
1824
1825        // Write data
1826        coord
1827            .wal_manager_mut()
1828            .unwrap()
1829            .writer(0)
1830            .append_put(b"key", b"value")
1831            .unwrap();
1832
1833        assert!(coord.wal_manager().unwrap().total_size() > 0);
1834
1835        // Truncate
1836        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1837        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1838    }
1839
1840    #[test]
1841    fn test_truncate_wal_safety_buffer() {
1842        let dir = tempfile::tempdir().unwrap();
1843        let mut coord = make_coordinator(dir.path());
1844
1845        let wal_dir = dir.path().join("wal");
1846        std::fs::create_dir_all(&wal_dir).unwrap();
1847        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1848        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1849        coord.register_wal_manager(wal);
1850
1851        // Write some data before first checkpoint
1852        coord
1853            .wal_manager_mut()
1854            .unwrap()
1855            .writer(0)
1856            .append_put(b"k1", b"v1")
1857            .unwrap();
1858
1859        // First truncation (no previous positions) — resets to 0
1860        coord.truncate_wal_after_checkpoint(vec![100, 200]).unwrap();
1861        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1862
1863        // Write more data
1864        coord
1865            .wal_manager_mut()
1866            .unwrap()
1867            .writer(0)
1868            .append_put(b"k2", b"v2")
1869            .unwrap();
1870        let size_after_write = coord.wal_manager().unwrap().total_size();
1871        assert!(size_after_write > 0);
1872
1873        // Second truncation — truncates to previous positions (100, 200),
1874        // not to 0. Since actual WAL positions are smaller than 100, this
1875        // effectively keeps all current data as safety buffer.
1876        coord.truncate_wal_after_checkpoint(vec![300, 400]).unwrap();
1877        // Data should still be present (positions 100,200 are beyond what
1878        // was written, so truncation is a no-op for the data range)
1879        assert!(coord.wal_manager().unwrap().total_size() > 0);
1880    }
1881
1882    #[test]
1883    fn test_prepare_wal_with_changelog_drainer() {
1884        use laminar_storage::incremental::StateChangelogBuffer;
1885        use laminar_storage::incremental::StateChangelogEntry;
1886        use std::sync::Arc;
1887
1888        let dir = tempfile::tempdir().unwrap();
1889        let mut coord = make_coordinator(dir.path());
1890
1891        // Set up a changelog buffer and drainer
1892        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
1893
1894        // Push some entries
1895        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
1896        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
1897        buf.push(StateChangelogEntry::delete(1, 300));
1898
1899        let drainer = ChangelogDrainer::new(buf, 100);
1900        coord.register_changelog_drainer(drainer);
1901
1902        let result = coord.prepare_wal_for_checkpoint().unwrap();
1903        assert_eq!(result.entries_drained, 3);
1904        assert!(result.per_core_wal_positions.is_empty()); // No WAL manager
1905
1906        // Drainer should have pending entries
1907        assert_eq!(coord.changelog_drainers()[0].pending_count(), 3);
1908    }
1909
1910    #[tokio::test]
1911    async fn test_full_checkpoint_with_wal_coordination() {
1912        use laminar_storage::incremental::StateChangelogBuffer;
1913        use laminar_storage::incremental::StateChangelogEntry;
1914        use std::sync::Arc;
1915
1916        let dir = tempfile::tempdir().unwrap();
1917        let mut coord = make_coordinator(dir.path());
1918
1919        // Set up WAL manager
1920        let wal_dir = dir.path().join("wal");
1921        std::fs::create_dir_all(&wal_dir).unwrap();
1922        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1923        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1924        coord.register_wal_manager(wal);
1925
1926        // Set up changelog drainer
1927        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
1928        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
1929        let drainer = ChangelogDrainer::new(buf, 100);
1930        coord.register_changelog_drainer(drainer);
1931
1932        // Full cycle: checkpoint (WAL preparation is internal) → truncate
1933        let result = coord
1934            .checkpoint(HashMap::new(), Some(5000), None, HashMap::new(), None)
1935            .await
1936            .unwrap();
1937
1938        assert!(result.success);
1939
1940        // Changelog drainer pending should be cleared after successful checkpoint
1941        assert_eq!(
1942            coord.changelog_drainers()[0].pending_count(),
1943            0,
1944            "pending entries should be cleared after checkpoint"
1945        );
1946
1947        // Manifest should have per-core WAL positions (captured internally)
1948        let loaded = coord.store().load_latest().unwrap().unwrap();
1949        assert_eq!(loaded.per_core_wal_positions.len(), 2);
1950
1951        // Truncate WAL
1952        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1953        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1954    }
1955
1956    #[test]
1957    fn test_wal_manager_accessors() {
1958        let dir = tempfile::tempdir().unwrap();
1959        let mut coord = make_coordinator(dir.path());
1960
1961        assert!(coord.wal_manager().is_none());
1962        assert!(coord.wal_manager_mut().is_none());
1963        assert!(coord.changelog_drainers().is_empty());
1964
1965        let wal_dir = dir.path().join("wal");
1966        std::fs::create_dir_all(&wal_dir).unwrap();
1967        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1968        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1969        coord.register_wal_manager(wal);
1970
1971        assert!(coord.wal_manager().is_some());
1972        assert!(coord.wal_manager_mut().is_some());
1973    }
1974
1975    #[test]
1976    fn test_coordinator_debug_with_wal() {
1977        let dir = tempfile::tempdir().unwrap();
1978        let mut coord = make_coordinator(dir.path());
1979
1980        let wal_dir = dir.path().join("wal");
1981        std::fs::create_dir_all(&wal_dir).unwrap();
1982        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1983        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1984        coord.register_wal_manager(wal);
1985
1986        let debug = format!("{coord:?}");
1987        assert!(debug.contains("has_wal_manager: true"));
1988        assert!(debug.contains("changelog_drainers: 0"));
1989    }
1990
1991    // ── Checkpoint observability tests ──
1992
1993    #[tokio::test]
1994    async fn test_checkpoint_emits_metrics_on_success() {
1995        use crate::metrics::PipelineCounters;
1996        use std::sync::atomic::Ordering;
1997
1998        let dir = tempfile::tempdir().unwrap();
1999        let mut coord = make_coordinator(dir.path());
2000
2001        let counters = Arc::new(PipelineCounters::new());
2002        coord.set_counters(Arc::clone(&counters));
2003
2004        let result = coord
2005            .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
2006            .await
2007            .unwrap();
2008
2009        assert!(result.success);
2010        assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 1);
2011        assert_eq!(counters.checkpoints_failed.load(Ordering::Relaxed), 0);
2012        assert!(counters.last_checkpoint_duration_ms.load(Ordering::Relaxed) < 5000);
2013        assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 1);
2014
2015        // Second checkpoint
2016        let result2 = coord
2017            .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
2018            .await
2019            .unwrap();
2020
2021        assert!(result2.success);
2022        assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 2);
2023        assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 2);
2024    }
2025
2026    #[tokio::test]
2027    async fn test_checkpoint_without_counters() {
2028        // Verify checkpoint works fine without counters set
2029        let dir = tempfile::tempdir().unwrap();
2030        let mut coord = make_coordinator(dir.path());
2031
2032        let result = coord
2033            .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
2034            .await
2035            .unwrap();
2036
2037        assert!(result.success);
2038        // No panics — metrics emission is a no-op
2039    }
2040
2041    // ── DurationHistogram tests ──
2042
2043    #[test]
2044    fn test_histogram_empty() {
2045        let h = DurationHistogram::new();
2046        assert_eq!(h.len(), 0);
2047        assert_eq!(h.percentile(0.50), 0);
2048        assert_eq!(h.percentile(0.99), 0);
2049        let (p50, p95, p99) = h.percentiles();
2050        assert_eq!((p50, p95, p99), (0, 0, 0));
2051    }
2052
2053    #[test]
2054    fn test_histogram_single_sample() {
2055        let mut h = DurationHistogram::new();
2056        h.record(Duration::from_millis(42));
2057        assert_eq!(h.len(), 1);
2058        assert_eq!(h.percentile(0.50), 42);
2059        assert_eq!(h.percentile(0.99), 42);
2060    }
2061
2062    #[test]
2063    fn test_histogram_percentiles() {
2064        let mut h = DurationHistogram::new();
2065        // Record 1..=100ms in order.
2066        for i in 1..=100 {
2067            h.record(Duration::from_millis(i));
2068        }
2069        assert_eq!(h.len(), 100);
2070
2071        let p50 = h.percentile(0.50);
2072        let p95 = h.percentile(0.95);
2073        let p99 = h.percentile(0.99);
2074
2075        // With values 1..=100:
2076        //   p50 ≈ 50, p95 ≈ 95, p99 ≈ 99
2077        assert!((49..=51).contains(&p50), "p50={p50}");
2078        assert!((94..=96).contains(&p95), "p95={p95}");
2079        assert!((98..=100).contains(&p99), "p99={p99}");
2080    }
2081
2082    #[test]
2083    fn test_histogram_wraps_ring_buffer() {
2084        let mut h = DurationHistogram::new();
2085        // Write 150 samples — first 50 are overwritten.
2086        for i in 1..=150 {
2087            h.record(Duration::from_millis(i));
2088        }
2089        assert_eq!(h.len(), 100);
2090        assert_eq!(h.count, 150);
2091
2092        // Only samples 51..=150 remain in the buffer.
2093        let p50 = h.percentile(0.50);
2094        assert!((99..=101).contains(&p50), "p50={p50}");
2095    }
2096
2097    // ── Sidecar threshold tests ──
2098
2099    #[tokio::test]
2100    async fn test_sidecar_round_trip() {
2101        let dir = tempfile::tempdir().unwrap();
2102        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2103        let config = CheckpointConfig {
2104            state_inline_threshold: 100, // 100 bytes threshold
2105            ..CheckpointConfig::default()
2106        };
2107        let mut coord = CheckpointCoordinator::new(config, store);
2108
2109        // Small state stays inline, large state goes to sidecar
2110        let mut ops = HashMap::new();
2111        ops.insert("small".into(), vec![0xAAu8; 50]);
2112        ops.insert("large".into(), vec![0xBBu8; 200]);
2113
2114        let result = coord
2115            .checkpoint(ops, None, None, HashMap::new(), None)
2116            .await
2117            .unwrap();
2118        assert!(result.success);
2119
2120        // Verify manifest
2121        let loaded = coord.store().load_latest().unwrap().unwrap();
2122        let small_op = loaded.operator_states.get("small").unwrap();
2123        assert!(!small_op.external, "small state should be inline");
2124        assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);
2125
2126        let large_op = loaded.operator_states.get("large").unwrap();
2127        assert!(large_op.external, "large state should be external");
2128        assert_eq!(large_op.external_length, 200);
2129
2130        // Verify sidecar file exists and has correct data
2131        let state_data = coord.store().load_state_data(1).unwrap().unwrap();
2132        assert_eq!(state_data.len(), 200);
2133        assert!(state_data.iter().all(|&b| b == 0xBB));
2134    }
2135
2136    #[tokio::test]
2137    async fn test_all_inline_no_sidecar() {
2138        let dir = tempfile::tempdir().unwrap();
2139        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2140        let config = CheckpointConfig::default(); // 1MB threshold
2141        let mut coord = CheckpointCoordinator::new(config, store);
2142
2143        let mut ops = HashMap::new();
2144        ops.insert("op1".into(), b"small-state".to_vec());
2145
2146        let result = coord
2147            .checkpoint(ops, None, None, HashMap::new(), None)
2148            .await
2149            .unwrap();
2150        assert!(result.success);
2151
2152        // No sidecar file
2153        assert!(coord.store().load_state_data(1).unwrap().is_none());
2154    }
2155
2156    // ── Adaptive interval tests ──
2157
2158    #[test]
2159    fn test_adaptive_disabled_by_default() {
2160        let dir = tempfile::tempdir().unwrap();
2161        let coord = make_coordinator(dir.path());
2162        assert!(coord.config().adaptive.is_none());
2163        assert_eq!(coord.config().interval, Some(Duration::from_secs(60)));
2164    }
2165
2166    #[test]
2167    fn test_adaptive_increases_interval_for_slow_checkpoints() {
2168        let dir = tempfile::tempdir().unwrap();
2169        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2170        let config = CheckpointConfig {
2171            adaptive: Some(AdaptiveCheckpointConfig::default()),
2172            ..CheckpointConfig::default()
2173        };
2174        let mut coord = CheckpointCoordinator::new(config, store);
2175
2176        // Simulate a 5-second checkpoint
2177        coord.last_checkpoint_duration = Some(Duration::from_secs(5));
2178        coord.adjust_interval();
2179
2180        // Expected: 5000ms / (1000 * 0.1) = 50s
2181        let interval = coord.config().interval.unwrap();
2182        assert!(
2183            interval >= Duration::from_secs(49) && interval <= Duration::from_secs(51),
2184            "expected ~50s, got {interval:?}",
2185        );
2186    }
2187
2188    #[test]
2189    fn test_adaptive_decreases_interval_for_fast_checkpoints() {
2190        let dir = tempfile::tempdir().unwrap();
2191        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2192        let config = CheckpointConfig {
2193            adaptive: Some(AdaptiveCheckpointConfig::default()),
2194            ..CheckpointConfig::default()
2195        };
2196        let mut coord = CheckpointCoordinator::new(config, store);
2197
2198        // Simulate a 100ms checkpoint → 100 / (1000 * 0.1) = 1s → clamped to 10s min
2199        coord.last_checkpoint_duration = Some(Duration::from_millis(100));
2200        coord.adjust_interval();
2201
2202        let interval = coord.config().interval.unwrap();
2203        assert_eq!(
2204            interval,
2205            Duration::from_secs(10),
2206            "should clamp to min_interval"
2207        );
2208    }
2209
2210    #[test]
2211    fn test_adaptive_clamps_to_min_max() {
2212        let dir = tempfile::tempdir().unwrap();
2213        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2214        let config = CheckpointConfig {
2215            adaptive: Some(AdaptiveCheckpointConfig {
2216                min_interval: Duration::from_secs(20),
2217                max_interval: Duration::from_secs(120),
2218                target_overhead_ratio: 0.1,
2219                smoothing_alpha: 1.0, // Full weight on latest
2220            }),
2221            ..CheckpointConfig::default()
2222        };
2223        let mut coord = CheckpointCoordinator::new(config, store);
2224
2225        // Very slow → clamp to max
2226        coord.last_checkpoint_duration = Some(Duration::from_secs(60));
2227        coord.adjust_interval();
2228        let interval = coord.config().interval.unwrap();
2229        assert_eq!(interval, Duration::from_secs(120), "should clamp to max");
2230
2231        // Very fast → clamp to min
2232        coord.last_checkpoint_duration = Some(Duration::from_millis(10));
2233        coord.smoothed_duration_ms = 0.0; // Reset EMA
2234        coord.adjust_interval();
2235        let interval = coord.config().interval.unwrap();
2236        assert_eq!(interval, Duration::from_secs(20), "should clamp to min");
2237    }
2238
2239    #[test]
2240    fn test_adaptive_ema_smoothing() {
2241        let dir = tempfile::tempdir().unwrap();
2242        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2243        let config = CheckpointConfig {
2244            adaptive: Some(AdaptiveCheckpointConfig {
2245                min_interval: Duration::from_secs(1),
2246                max_interval: Duration::from_secs(600),
2247                target_overhead_ratio: 0.1,
2248                smoothing_alpha: 0.5,
2249            }),
2250            ..CheckpointConfig::default()
2251        };
2252        let mut coord = CheckpointCoordinator::new(config, store);
2253
2254        // First observation: 1000ms → EMA = 1000 (cold start)
2255        coord.last_checkpoint_duration = Some(Duration::from_millis(1000));
2256        coord.adjust_interval();
2257        assert!((coord.smoothed_duration_ms() - 1000.0).abs() < 1.0);
2258
2259        // Second observation: 2000ms → EMA = 0.5*2000 + 0.5*1000 = 1500
2260        coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
2261        coord.adjust_interval();
2262        assert!((coord.smoothed_duration_ms() - 1500.0).abs() < 1.0);
2263
2264        // Third observation: 2000ms → EMA = 0.5*2000 + 0.5*1500 = 1750
2265        coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
2266        coord.adjust_interval();
2267        assert!((coord.smoothed_duration_ms() - 1750.0).abs() < 1.0);
2268    }
2269
2270    #[tokio::test]
2271    async fn test_stats_include_percentiles_after_checkpoints() {
2272        let dir = tempfile::tempdir().unwrap();
2273        let mut coord = make_coordinator(dir.path());
2274
2275        // Run 3 checkpoints.
2276        for _ in 0..3 {
2277            let result = coord
2278                .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
2279                .await
2280                .unwrap();
2281            assert!(result.success);
2282        }
2283
2284        let stats = coord.stats();
2285        assert_eq!(stats.completed, 3);
2286        // After 3 fast checkpoints, percentiles should be > 0
2287        // (they're real durations, not zero).
2288        assert!(stats.last_duration.is_some());
2289    }
2290}