1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use std::sync::atomic::Ordering;
24
25use laminar_connectors::checkpoint::SourceCheckpoint;
26use laminar_connectors::connector::SourceConnector;
27use laminar_storage::changelog_drainer::ChangelogDrainer;
28use laminar_storage::checkpoint_manifest::{
29 CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
30};
31use laminar_storage::checkpoint_store::CheckpointStore;
32use laminar_storage::per_core_wal::PerCoreWalManager;
33use tracing::{debug, error, info, warn};
34
35use crate::error::DbError;
36use crate::metrics::PipelineCounters;
37
/// Tuning knobs for checkpoint creation, persistence, and retention.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// How often to trigger automatic checkpoints; `None` disables the timer.
    /// May be re-tuned at runtime when `adaptive` is set.
    pub interval: Option<Duration>,
    /// Number of completed checkpoints to retain.
    pub max_retained: usize,
    /// Maximum wait for barrier alignment.
    /// NOTE(review): not referenced in this file — presumably consumed by the
    /// barrier/alignment machinery elsewhere; confirm before documenting further.
    pub alignment_timeout: Duration,
    /// Maximum time allowed for the sink pre-commit phase.
    pub pre_commit_timeout: Duration,
    /// Maximum time allowed for persisting the manifest (and state sidecar).
    pub persist_timeout: Duration,
    /// Maximum time allowed for the sink commit phase.
    pub commit_timeout: Duration,
    /// Whether checkpoints are marked incremental in the manifest.
    pub incremental: bool,
    /// Operator state blobs at or below this size are inlined in the
    /// manifest; larger ones are spilled to the sidecar file.
    pub state_inline_threshold: usize,
    /// Hard cap on sidecar bytes per checkpoint; exceeding it rejects the
    /// checkpoint. `None` means unlimited.
    pub max_checkpoint_bytes: Option<usize>,
    /// Pending changelog entries above this count are cleared after a failed
    /// checkpoint to bound memory usage (see `maybe_cap_drainers`).
    pub max_pending_changelog_entries: usize,
    /// Optional adaptive-interval tuning; `None` keeps the fixed interval.
    pub adaptive: Option<AdaptiveCheckpointConfig>,
    /// Optional unaligned-checkpoint settings.
    /// NOTE(review): not referenced in this file — verify where it is consumed.
    pub unaligned: Option<UnalignedCheckpointConfig>,
}

/// Parameters for adapting the checkpoint interval to observed durations
/// (see `CheckpointCoordinator::adjust_interval`).
#[derive(Debug, Clone)]
pub struct AdaptiveCheckpointConfig {
    /// Lower clamp for the computed interval.
    pub min_interval: Duration,
    /// Upper clamp for the computed interval.
    pub max_interval: Duration,
    /// Desired fraction of wall-clock time spent checkpointing; the interval
    /// is derived as `smoothed_duration / ratio`.
    pub target_overhead_ratio: f64,
    /// EWMA smoothing factor applied to checkpoint durations.
    pub smoothing_alpha: f64,
}
126
127impl Default for AdaptiveCheckpointConfig {
128 fn default() -> Self {
129 Self {
130 min_interval: Duration::from_secs(10),
131 max_interval: Duration::from_secs(300),
132 target_overhead_ratio: 0.1,
133 smoothing_alpha: 0.3,
134 }
135 }
136}
137
138pub use laminar_core::checkpoint::UnalignedCheckpointConfig;
140
141impl Default for CheckpointConfig {
142 fn default() -> Self {
143 Self {
144 interval: Some(Duration::from_secs(60)),
145 max_retained: 3,
146 alignment_timeout: Duration::from_secs(30),
147 pre_commit_timeout: Duration::from_secs(30),
148 persist_timeout: Duration::from_secs(120),
149 commit_timeout: Duration::from_secs(60),
150 incremental: false,
151 state_inline_threshold: 1_048_576,
152 max_checkpoint_bytes: None,
153 max_pending_changelog_entries: 10_000_000,
154 adaptive: None,
155 unaligned: None,
156 }
157 }
158}
159
/// The coordinator's position in the checkpoint state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum CheckpointPhase {
    /// No checkpoint in progress.
    Idle,
    /// A checkpoint barrier is in flight.
    /// NOTE(review): not entered by `checkpoint_inner` in this file —
    /// presumably set by barrier-injection code elsewhere; confirm.
    BarrierInFlight,
    /// Operator state is being captured.
    Snapshotting,
    /// Sinks are pre-committing (phase one of two-phase commit).
    PreCommitting,
    /// Manifest and state sidecar are being written to the store.
    Persisting,
    /// Sinks are committing (phase two of two-phase commit).
    Committing,
}
176
177impl std::fmt::Display for CheckpointPhase {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 match self {
180 Self::Idle => write!(f, "Idle"),
181 Self::BarrierInFlight => write!(f, "BarrierInFlight"),
182 Self::Snapshotting => write!(f, "Snapshotting"),
183 Self::PreCommitting => write!(f, "PreCommitting"),
184 Self::Persisting => write!(f, "Persisting"),
185 Self::Committing => write!(f, "Committing"),
186 }
187 }
188}
189
/// Outcome of a single checkpoint attempt.
#[derive(Debug, serde::Serialize)]
pub struct CheckpointResult {
    /// Whether the checkpoint completed end-to-end (including sink commits).
    pub success: bool,
    /// Identifier of the attempted checkpoint.
    pub checkpoint_id: u64,
    /// Epoch the checkpoint was taken for.
    pub epoch: u64,
    /// Wall-clock time the attempt took.
    pub duration: Duration,
    /// Failure description; on success this may still carry a non-fatal
    /// "begin next epoch" error (see `checkpoint_inner`).
    pub error: Option<String>,
}

/// A source connector registered with the coordinator.
pub(crate) struct RegisteredSource {
    /// Source name (key in manifest offset maps).
    pub name: String,
    /// The connector, shared with whatever task polls it.
    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
    /// Presumably whether the source can replay from a checkpointed
    /// offset on recovery — not used in this file; TODO confirm.
    pub supports_replay: bool,
}

/// A sink registered with the coordinator.
pub(crate) struct RegisteredSink {
    /// Sink name (key in commit-status maps).
    pub name: String,
    /// Handle for driving the sink task's epoch lifecycle
    /// (begin/pre-commit/commit/rollback).
    pub handle: crate::sink_task::SinkTaskHandle,
    /// Whether the sink participates in two-phase (exactly-once) commit.
    pub exactly_once: bool,
}

/// WAL state captured by [`CheckpointCoordinator::prepare_wal_for_checkpoint`].
#[derive(Debug, Clone)]
pub struct WalPrepareResult {
    /// Synced WAL position per core; empty when no WAL manager is registered.
    pub per_core_wal_positions: Vec<u64>,
    /// Total changelog entries drained before the WAL was fenced.
    pub entries_drained: u64,
}
239
/// Orchestrates two-phase checkpoints across sinks, the WAL, and the
/// checkpoint store, and tracks checkpoint statistics.
pub struct CheckpointCoordinator {
    config: CheckpointConfig,
    // Durable store for manifests and state sidecars.
    store: Arc<dyn CheckpointStore>,
    // Sinks participating in checkpoints (exactly-once ones drive 2PC).
    sinks: Vec<RegisteredSink>,
    // Id the next checkpoint will use; advances only on success.
    next_checkpoint_id: u64,
    // Epoch the next checkpoint covers; advances only on success.
    epoch: u64,
    phase: CheckpointPhase,
    checkpoints_completed: u64,
    checkpoints_failed: u64,
    last_checkpoint_duration: Option<Duration>,
    // Ring buffer of recent durations for p50/p95/p99 reporting.
    duration_histogram: DurationHistogram,
    // Optional per-core WAL that gets fenced and truncated around checkpoints.
    wal_manager: Option<PerCoreWalManager>,
    // Drainers flushed before each checkpoint; pending entries are cleared
    // on success or capped after failures.
    changelog_drainers: Vec<ChangelogDrainer>,
    // Optional shared metrics sink.
    counters: Option<Arc<PipelineCounters>>,
    // EWMA of checkpoint duration, used by adaptive interval tuning.
    smoothed_duration_ms: f64,
    // Cumulative sidecar bytes across successful checkpoints.
    total_bytes_written: u64,
    unaligned_checkpoint_count: u64,
    // WAL positions recorded at the previous checkpoint, used as the
    // truncation point after the next one.
    previous_wal_positions: Option<Vec<u64>>,
}
273
274impl CheckpointCoordinator {
275 #[must_use]
277 pub fn new(config: CheckpointConfig, store: Box<dyn CheckpointStore>) -> Self {
278 let store: Arc<dyn CheckpointStore> = Arc::from(store);
279 let (next_id, epoch) = match store.load_latest() {
281 Ok(Some(m)) => (m.checkpoint_id + 1, m.epoch + 1),
282 _ => (1, 1),
283 };
284
285 Self {
286 config,
287 store,
288 sinks: Vec::new(),
289 next_checkpoint_id: next_id,
290 epoch,
291 phase: CheckpointPhase::Idle,
292 checkpoints_completed: 0,
293 checkpoints_failed: 0,
294 last_checkpoint_duration: None,
295 duration_histogram: DurationHistogram::new(),
296 wal_manager: None,
297 changelog_drainers: Vec::new(),
298 counters: None,
299 smoothed_duration_ms: 0.0,
300 total_bytes_written: 0,
301 unaligned_checkpoint_count: 0,
302 previous_wal_positions: None,
303 }
304 }
305
306 pub(crate) fn register_sink(
308 &mut self,
309 name: impl Into<String>,
310 handle: crate::sink_task::SinkTaskHandle,
311 exactly_once: bool,
312 ) {
313 self.sinks.push(RegisteredSink {
314 name: name.into(),
315 handle,
316 exactly_once,
317 });
318 }
319
    /// Opens the coordinator's current epoch on all exactly-once sinks,
    /// typically called once at pipeline startup.
    ///
    /// # Errors
    /// Propagates the first sink `begin_epoch` failure (already-started
    /// sinks are rolled back by `begin_epoch_for_sinks`).
    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
        self.begin_epoch_for_sinks(self.epoch).await
    }
333
    /// Starts `epoch` on every exactly-once sink. If any sink fails, the
    /// sinks that already started the epoch are rolled back before the
    /// error is returned, so no sink is left in a half-open epoch.
    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
        // Track successfully started sinks so we can undo them on failure.
        let mut started: Vec<&RegisteredSink> = Vec::new();
        for sink in &self.sinks {
            if sink.exactly_once {
                match sink.handle.begin_epoch(epoch).await {
                    Ok(()) => {
                        started.push(sink);
                        debug!(sink = %sink.name, epoch, "began epoch");
                    }
                    Err(e) => {
                        // Best-effort rollback of everything begun so far.
                        for s in &started {
                            s.handle.rollback_epoch(epoch).await;
                        }
                        return Err(DbError::Checkpoint(format!(
                            "sink '{}' failed to begin epoch {epoch}: {e}",
                            sink.name
                        )));
                    }
                }
            }
        }
        Ok(())
    }
360
    /// Attaches shared pipeline counters; checkpoint outcomes are mirrored
    /// into them by `emit_checkpoint_metrics`.
    pub fn set_counters(&mut self, counters: Arc<PipelineCounters>) {
        self.counters = Some(counters);
    }
370
371 fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
373 if let Some(ref counters) = self.counters {
374 if success {
375 counters
376 .checkpoints_completed
377 .fetch_add(1, Ordering::Relaxed);
378 } else {
379 counters.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
380 }
381 #[allow(clippy::cast_possible_truncation)]
382 counters
383 .last_checkpoint_duration_ms
384 .store(duration.as_millis() as u64, Ordering::Relaxed);
385 counters.checkpoint_epoch.store(epoch, Ordering::Relaxed);
386 }
387 }
388
    /// Attaches the per-core WAL manager; once set, checkpoints fence, sync,
    /// and later truncate the WAL (see `prepare_wal_for_checkpoint` and
    /// `truncate_wal_after_checkpoint`).
    pub fn register_wal_manager(&mut self, wal_manager: PerCoreWalManager) {
        self.wal_manager = Some(wal_manager);
    }

    /// Adds a changelog drainer that will be flushed before every checkpoint
    /// and cleared once a checkpoint becomes durable.
    pub fn register_changelog_drainer(&mut self, drainer: ChangelogDrainer) {
        self.changelog_drainers.push(drainer);
    }
409
    /// Drains all changelog drainers, then fences the WAL for the upcoming
    /// checkpoint: advance the WAL epoch, write an epoch barrier on every
    /// core, and sync. Returns the synced per-core positions (empty when no
    /// WAL manager is registered) plus the number of entries drained.
    ///
    /// # Errors
    /// Returns `DbError::Checkpoint` if the barrier write or sync fails.
    pub fn prepare_wal_for_checkpoint(&mut self) -> Result<WalPrepareResult, DbError> {
        // Flush pending changelog entries first so the WAL positions we
        // record below cover them.
        let mut total_drained: u64 = 0;
        for drainer in &mut self.changelog_drainers {
            let count = drainer.drain();
            total_drained += count as u64;
            debug!(
                drained = count,
                pending = drainer.pending_count(),
                "changelog drainer flushed"
            );
        }

        let per_core_wal_positions = if let Some(ref mut wal) = self.wal_manager {
            // Order matters: advance + stamp the epoch, write barriers,
            // then sync so `synced_positions` reflects the barriers.
            let epoch = wal.advance_epoch();
            wal.set_epoch_all(epoch);

            wal.write_epoch_barrier_all()
                .map_err(|e| DbError::Checkpoint(format!("WAL epoch barrier failed: {e}")))?;

            wal.sync_all()
                .map_err(|e| DbError::Checkpoint(format!("WAL sync failed: {e}")))?;

            let positions = wal.synced_positions();
            debug!(epoch, positions = ?positions, "WAL prepared for checkpoint");
            positions
        } else {
            Vec::new()
        };

        Ok(WalPrepareResult {
            per_core_wal_positions,
            entries_drained: total_drained,
        })
    }
460
    /// Reclaims WAL space after a successful checkpoint: truncates segments
    /// up to the positions recorded at the *previous* checkpoint (keeping
    /// one checkpoint's worth of WAL for safety), or resets entirely after
    /// the first checkpoint. `current_positions` is remembered as the next
    /// truncation point.
    ///
    /// # Errors
    /// Returns `DbError::Checkpoint` if truncation/reset fails.
    pub fn truncate_wal_after_checkpoint(
        &mut self,
        current_positions: Vec<u64>,
    ) -> Result<(), DbError> {
        if let Some(ref mut wal) = self.wal_manager {
            if let Some(ref prev) = self.previous_wal_positions {
                wal.truncate_all(prev)
                    .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
                debug!("WAL segments truncated to previous checkpoint positions");
            } else {
                // First checkpoint: nothing older to keep, drop everything.
                wal.reset_all()
                    .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
                debug!("WAL segments reset after first checkpoint");
            }
        }
        // Recorded even without a WAL manager, matching the original behavior.
        self.previous_wal_positions = Some(current_positions);
        Ok(())
    }
490
    /// Shared access to the registered WAL manager, if any.
    #[must_use]
    pub fn wal_manager(&self) -> Option<&PerCoreWalManager> {
        self.wal_manager.as_ref()
    }

    /// Mutable access to the registered WAL manager, if any.
    pub fn wal_manager_mut(&mut self) -> Option<&mut PerCoreWalManager> {
        self.wal_manager.as_mut()
    }

    /// Shared access to the registered changelog drainers.
    #[must_use]
    pub fn changelog_drainers(&self) -> &[ChangelogDrainer] {
        &self.changelog_drainers
    }

    /// Mutable access to the registered changelog drainers.
    pub fn changelog_drainers_mut(&mut self) -> &mut [ChangelogDrainer] {
        &mut self.changelog_drainers
    }
512
513 fn maybe_cap_drainers(&mut self) {
519 let cap = self.config.max_pending_changelog_entries;
520 let needs_clear = self
521 .changelog_drainers
522 .iter()
523 .any(|d| d.pending_count() > cap);
524 if needs_clear {
525 let total: usize = self
526 .changelog_drainers
527 .iter()
528 .map(ChangelogDrainer::pending_count)
529 .sum();
530 warn!(
531 total_pending = total,
532 cap,
533 "[LDB-6010] changelog drainer exceeded cap after checkpoint failure — \
534 clearing to prevent OOM"
535 );
536 for drainer in &mut self.changelog_drainers {
537 drainer.clear_pending();
538 }
539 }
540 }
541
    /// Runs a checkpoint with no extra table offsets and no source-offset
    /// overrides; thin wrapper over `checkpoint_inner`.
    ///
    /// # Errors
    /// See `checkpoint_inner` — most failures surface as
    /// `Ok(CheckpointResult { success: false, .. })` rather than `Err`.
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    pub async fn checkpoint(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            HashMap::new(),
            source_watermarks,
            pipeline_hash,
            HashMap::new(),
        )
        .await
    }
578
579 async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
584 let timeout_dur = self.config.pre_commit_timeout;
585
586 match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
587 Ok(result) => result,
588 Err(_elapsed) => Err(DbError::Checkpoint(format!(
589 "pre-commit timed out after {}s",
590 timeout_dur.as_secs()
591 ))),
592 }
593 }
594
595 async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
601 for sink in &self.sinks {
602 if sink.exactly_once {
603 sink.handle.pre_commit(epoch).await.map_err(|e| {
604 DbError::Checkpoint(format!("sink '{}' pre-commit failed: {e}", sink.name))
605 })?;
606 debug!(sink = %sink.name, epoch, "sink pre-committed");
607 }
608 }
609 Ok(())
610 }
611
    /// Commits all exactly-once sinks under `config.commit_timeout`,
    /// returning a per-sink status map. On timeout, every exactly-once sink
    /// is marked `Failed` — individual sinks may in fact have committed, so
    /// the caller must treat the epoch as not advanced.
    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
        let timeout_dur = self.config.commit_timeout;

        match tokio::time::timeout(timeout_dur, self.commit_sinks_inner(epoch)).await {
            Ok(statuses) => statuses,
            Err(_elapsed) => {
                error!(
                    epoch,
                    timeout_secs = timeout_dur.as_secs(),
                    "[LDB-6012] sink commit timed out — marking all pending sinks as failed"
                );
                // Synthesize a Failed status for every participating sink.
                self.sinks
                    .iter()
                    .filter(|s| s.exactly_once)
                    .map(|s| {
                        (
                            s.name.clone(),
                            SinkCommitStatus::Failed(format!(
                                "sink '{}' commit timed out after {}s",
                                s.name,
                                timeout_dur.as_secs()
                            )),
                        )
                    })
                    .collect()
            }
        }
    }
648
649 async fn commit_sinks_inner(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
651 let mut statuses = HashMap::with_capacity(self.sinks.len());
652
653 for sink in &self.sinks {
654 if sink.exactly_once {
655 match sink.handle.commit_epoch(epoch).await {
656 Ok(()) => {
657 statuses.insert(sink.name.clone(), SinkCommitStatus::Committed);
658 debug!(sink = %sink.name, epoch, "sink committed");
659 }
660 Err(e) => {
661 let msg = format!("sink '{}' commit failed: {e}", sink.name);
662 error!(sink = %sink.name, epoch, error = %e, "sink commit failed");
663 statuses.insert(sink.name.clone(), SinkCommitStatus::Failed(msg));
664 }
665 }
666 }
667 }
668
669 statuses
670 }
671
    /// Persists the manifest (and optional operator-state sidecar) on a
    /// blocking thread, bounded by `config.persist_timeout`.
    ///
    /// # Errors
    /// Returns `DbError::Checkpoint` on store failure, task panic/join
    /// failure, or timeout.
    async fn save_manifest(
        &self,
        manifest: CheckpointManifest,
        state_data: Option<Vec<u8>>,
    ) -> Result<(), DbError> {
        let store = Arc::clone(&self.store);
        let timeout_dur = self.config.persist_timeout;

        // Store I/O may block; keep it off the async runtime.
        let task = tokio::task::spawn_blocking(move || {
            store.save_with_state(&manifest, state_data.as_deref())
        });

        // Three nested results: timeout -> join -> store operation.
        match tokio::time::timeout(timeout_dur, task).await {
            Ok(Ok(Ok(()))) => Ok(()),
            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest persist failed: {e}"))),
            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
                "manifest persist task failed: {join_err}"
            ))),
            Err(_elapsed) => Err(DbError::Checkpoint(format!(
                "[LDB-6011] manifest persist timed out after {}s — \
                 filesystem may be degraded",
                timeout_dur.as_secs()
            ))),
        }
    }
708
    /// Rewrites an already-persisted manifest (no sidecar) on a blocking
    /// thread, bounded by `config.persist_timeout`; used to record final
    /// sink commit statuses after phase two.
    ///
    /// # Errors
    /// Returns `DbError::Checkpoint` on store failure, join failure, or
    /// timeout.
    async fn update_manifest_only(&self, manifest: &CheckpointManifest) -> Result<(), DbError> {
        let store = Arc::clone(&self.store);
        let manifest = manifest.clone();
        let timeout_dur = self.config.persist_timeout;

        let task = tokio::task::spawn_blocking(move || store.update_manifest(&manifest));

        // Three nested results: timeout -> join -> store operation.
        match tokio::time::timeout(timeout_dur, task).await {
            Ok(Ok(Ok(()))) => Ok(()),
            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest update failed: {e}"))),
            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
                "manifest update task failed: {join_err}"
            ))),
            Err(_elapsed) => Err(DbError::Checkpoint(format!(
                "manifest update timed out after {}s",
                timeout_dur.as_secs()
            ))),
        }
    }
731
732 fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
734 self.sinks
735 .iter()
736 .filter(|s| s.exactly_once)
737 .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
738 .collect()
739 }
740
741 fn pack_operator_states(
746 manifest: &mut CheckpointManifest,
747 operator_states: &HashMap<String, Vec<u8>>,
748 threshold: usize,
749 ) -> Option<Vec<u8>> {
750 let mut sidecar_blobs: Vec<u8> = Vec::new();
751 for (name, data) in operator_states {
752 let (op_ckpt, maybe_blob) =
753 laminar_storage::checkpoint_manifest::OperatorCheckpoint::from_bytes(
754 data,
755 threshold,
756 sidecar_blobs.len() as u64,
757 );
758 if let Some(blob) = maybe_blob {
759 sidecar_blobs.extend_from_slice(&blob);
760 }
761 manifest.operator_states.insert(name.clone(), op_ckpt);
762 }
763
764 if sidecar_blobs.is_empty() {
765 None
766 } else {
767 Some(sidecar_blobs)
768 }
769 }
770
771 async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
773 for sink in &self.sinks {
774 if sink.exactly_once {
775 sink.handle.rollback_epoch(epoch).await;
776 }
777 }
778 Ok(())
779 }
780
781 fn collect_sink_epochs(&self) -> HashMap<String, u64> {
783 let mut epochs = HashMap::with_capacity(self.sinks.len());
784 for sink in &self.sinks {
785 if sink.exactly_once {
787 epochs.insert(sink.name.clone(), self.epoch);
788 }
789 }
790 epochs
791 }
792
793 fn sorted_sink_names(&self) -> Vec<String> {
795 let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
796 names.sort();
797 names
798 }
799
    /// Current phase of the checkpoint state machine.
    #[must_use]
    pub fn phase(&self) -> CheckpointPhase {
        self.phase
    }

    /// Epoch the next checkpoint will cover.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Identifier the next checkpoint will use.
    #[must_use]
    pub fn next_checkpoint_id(&self) -> u64 {
        self.next_checkpoint_id
    }

    /// Current configuration (the interval may drift under adaptive tuning).
    #[must_use]
    pub fn config(&self) -> &CheckpointConfig {
        &self.config
    }
823
    /// Adaptive interval tuning: maintains an EWMA of checkpoint durations
    /// and sets `config.interval` so that checkpointing consumes roughly
    /// `target_overhead_ratio` of wall-clock time, clamped to
    /// `[min_interval, max_interval]`. No-op unless adaptive mode is
    /// configured and at least one checkpoint has completed.
    fn adjust_interval(&mut self) {
        let adaptive = match &self.config.adaptive {
            Some(a) => a.clone(),
            None => return,
        };

        #[allow(clippy::cast_precision_loss)]
        let last_ms = match self.last_checkpoint_duration {
            Some(d) => d.as_millis() as f64,
            None => return,
        };

        // Seed the EWMA with the first observation; blend thereafter.
        if self.smoothed_duration_ms == 0.0 {
            self.smoothed_duration_ms = last_ms;
        } else {
            self.smoothed_duration_ms = adaptive.smoothing_alpha * last_ms
                + (1.0 - adaptive.smoothing_alpha) * self.smoothed_duration_ms;
        }

        // interval = duration / overhead_ratio (ms -> s conversion folded in).
        let new_interval_secs =
            self.smoothed_duration_ms / (1000.0 * adaptive.target_overhead_ratio);
        let new_interval = Duration::from_secs_f64(new_interval_secs);

        let clamped = new_interval.clamp(adaptive.min_interval, adaptive.max_interval);

        let old_interval = self.config.interval;
        self.config.interval = Some(clamped);

        if old_interval != Some(clamped) {
            debug!(
                old_interval_ms = old_interval.map(|d| d.as_millis()),
                new_interval_ms = clamped.as_millis(),
                smoothed_duration_ms = self.smoothed_duration_ms,
                "adaptive checkpoint interval adjusted"
            );
        }
    }
871
    /// EWMA of checkpoint durations in milliseconds (0.0 until the first
    /// adaptive adjustment).
    #[must_use]
    pub fn smoothed_duration_ms(&self) -> f64 {
        self.smoothed_duration_ms
    }

    /// Number of unaligned checkpoints taken.
    /// NOTE(review): never incremented in this file — presumably updated by
    /// the unaligned-checkpoint path elsewhere; confirm.
    #[must_use]
    pub fn unaligned_checkpoint_count(&self) -> u64 {
        self.unaligned_checkpoint_count
    }

    /// Snapshot of checkpoint statistics, including duration percentiles
    /// computed over the recent-duration histogram.
    #[must_use]
    pub fn stats(&self) -> CheckpointStats {
        let (p50, p95, p99) = self.duration_histogram.percentiles();
        CheckpointStats {
            completed: self.checkpoints_completed,
            failed: self.checkpoints_failed,
            last_duration: self.last_checkpoint_duration,
            duration_p50_ms: p50,
            duration_p95_ms: p95,
            duration_p99_ms: p99,
            total_bytes_written: self.total_bytes_written,
            current_phase: self.phase,
            current_epoch: self.epoch,
            unaligned_checkpoint_count: self.unaligned_checkpoint_count,
        }
    }

    /// The underlying checkpoint store.
    #[must_use]
    pub fn store(&self) -> &dyn CheckpointStore {
        &*self.store
    }
910
    /// Like [`Self::checkpoint`], but additionally records
    /// `extra_table_offsets` in the manifest's table-offset map.
    ///
    /// # Errors
    /// See `checkpoint_inner`.
    #[allow(clippy::too_many_arguments)]
    pub async fn checkpoint_with_extra_tables(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            extra_table_offsets,
            source_watermarks,
            pipeline_hash,
            HashMap::new(),
        )
        .await
    }

    /// Full-form checkpoint entry point: records extra table offsets AND
    /// explicit source-offset overrides in the manifest.
    ///
    /// # Errors
    /// See `checkpoint_inner`.
    #[allow(clippy::too_many_arguments)]
    pub async fn checkpoint_with_offsets(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            extra_table_offsets,
            source_watermarks,
            pipeline_hash,
            source_offset_overrides,
        )
        .await
    }
975
    /// Runs one end-to-end checkpoint: WAL fencing, sink pre-commit,
    /// manifest persistence, then sink commit (two-phase commit).
    ///
    /// On success the coordinator advances `epoch`/`next_checkpoint_id` and
    /// begins the next epoch on all exactly-once sinks. Most failures are
    /// reported as `Ok(CheckpointResult { success: false, .. })` with the
    /// epoch left unchanged so the caller can retry.
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    async fn checkpoint_inner(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
    ) -> Result<CheckpointResult, DbError> {
        let start = Instant::now();
        let checkpoint_id = self.next_checkpoint_id;
        let epoch = self.epoch;

        info!(checkpoint_id, epoch, "starting checkpoint");

        // Drain changelogs and fence/sync the WAL so the recorded positions
        // are consistent with the state being checkpointed.
        // NOTE(review): this `?` propagates an Err without resetting
        // `self.phase`, bumping `checkpoints_failed`, or emitting metrics,
        // unlike every later failure path — confirm callers treat this as
        // fatal rather than retryable.
        let wal_result = self.prepare_wal_for_checkpoint()?;
        let per_core_wal_positions = wal_result.per_core_wal_positions;

        // State and offsets arrive pre-captured from the caller, so the
        // Snapshotting phase is just a bookkeeping transition here.
        self.phase = CheckpointPhase::Snapshotting;
        let source_offsets = source_offset_overrides;
        let table_offsets = extra_table_offsets;

        // Phase one of 2PC: all exactly-once sinks must pre-commit.
        self.phase = CheckpointPhase::PreCommitting;
        if let Err(e) = self.pre_commit_sinks(epoch).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            self.maybe_cap_drainers();
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("pre-commit failed: {e}")),
            });
        }

        // Assemble the manifest describing this checkpoint.
        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
        manifest.source_offsets = source_offsets;
        manifest.table_offsets = table_offsets;
        manifest.sink_epochs = self.collect_sink_epochs();
        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
        manifest.watermark = watermark;
        manifest.source_watermarks = source_watermarks;
        manifest.per_core_wal_positions = per_core_wal_positions;
        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
        manifest.is_incremental = self.config.incremental;
        // Sorted name lists keep the manifest stable across runs.
        manifest.source_names = {
            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
            names.sort();
            names
        };
        manifest.sink_names = self.sorted_sink_names();
        manifest.pipeline_hash = pipeline_hash;

        // Inline small operator states into the manifest; spill large ones
        // into a sidecar blob.
        let state_data = Self::pack_operator_states(
            &mut manifest,
            &operator_states,
            self.config.state_inline_threshold,
        );
        let sidecar_bytes = state_data.as_ref().map_or(0, Vec::len);
        if sidecar_bytes > 0 {
            debug!(
                checkpoint_id,
                sidecar_bytes, "writing operator state sidecar"
            );
        }

        // Enforce the optional size cap before paying the persist cost.
        if let Some(cap) = self.config.max_checkpoint_bytes {
            if sidecar_bytes > cap {
                self.phase = CheckpointPhase::Idle;
                self.checkpoints_failed += 1;
                self.maybe_cap_drainers();
                let duration = start.elapsed();
                self.emit_checkpoint_metrics(false, epoch, duration);
                let msg = format!(
                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
                     cap {cap} bytes — checkpoint rejected"
                );
                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
                return Ok(CheckpointResult {
                    success: false,
                    checkpoint_id,
                    epoch,
                    duration,
                    error: Some(msg),
                });
            }
            // Early warning once the sidecar crosses 80% of the cap.
            let warn_threshold = cap * 4 / 5;
            if sidecar_bytes > warn_threshold {
                warn!(
                    checkpoint_id,
                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
                );
            }
        }
        let checkpoint_bytes = sidecar_bytes as u64;

        // Persist the manifest (and sidecar) before committing sinks, so a
        // crash between the two phases is recoverable from the manifest.
        self.phase = CheckpointPhase::Persisting;
        if let Err(e) = self.save_manifest(manifest.clone(), state_data).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            self.maybe_cap_drainers();
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            // Best-effort release of the sinks' pre-committed epoch.
            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                error!(
                    checkpoint_id,
                    epoch,
                    error = %rollback_err,
                    "[LDB-6004] sink rollback failed after manifest persist failure — \
                     sinks may be in an inconsistent state"
                );
            }
            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("manifest persist failed: {e}")),
            });
        }

        // Phase two of 2PC: commit every exactly-once sink, tracking
        // per-sink outcomes.
        self.phase = CheckpointPhase::Committing;
        let sink_statuses = self.commit_sinks_tracked(epoch).await;
        let has_failures = sink_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));

        // Record final commit statuses in the stored manifest; non-fatal if
        // this update fails, since the commits themselves already happened.
        if !sink_statuses.is_empty() {
            manifest.sink_commit_statuses = sink_statuses;
            if let Err(e) = self.update_manifest_only(&manifest).await {
                warn!(
                    checkpoint_id,
                    epoch,
                    error = %e,
                    "post-commit manifest update failed"
                );
            }
        }

        if has_failures {
            self.checkpoints_failed += 1;
            error!(
                checkpoint_id,
                epoch, "sink commit partially failed — epoch NOT advanced, will retry"
            );
            self.phase = CheckpointPhase::Idle;
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some("partial sink commit failure".into()),
            });
        }

        // Checkpoint is durable; drained changelog entries are no longer
        // needed.
        for drainer in &mut self.changelog_drainers {
            drainer.clear_pending();
        }

        // Success: advance bookkeeping and record metrics.
        self.phase = CheckpointPhase::Idle;
        self.next_checkpoint_id += 1;
        self.epoch += 1;
        self.checkpoints_completed += 1;
        self.total_bytes_written += checkpoint_bytes;
        let duration = start.elapsed();
        self.last_checkpoint_duration = Some(duration);
        self.duration_histogram.record(duration);
        self.emit_checkpoint_metrics(true, epoch, duration);
        self.adjust_interval();

        // Open the next transactional epoch on all exactly-once sinks. A
        // failure here does not fail the checkpoint, but is surfaced via the
        // result's `error` field since subsequent writes lose
        // transactionality.
        let next_epoch = self.epoch;
        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
            Ok(()) => None,
            Err(e) => {
                error!(
                    next_epoch,
                    error = %e,
                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
                );
                Some(e.to_string())
            }
        };

        info!(
            checkpoint_id,
            epoch,
            duration_ms = duration.as_millis(),
            "checkpoint completed"
        );

        Ok(CheckpointResult {
            success: true,
            checkpoint_id,
            epoch,
            duration,
            error: begin_epoch_error,
        })
    }
1222
    /// Runs recovery against the checkpoint store and, when a checkpoint was
    /// found, fast-forwards this coordinator's `epoch`/`next_checkpoint_id`
    /// to one past the recovered values.
    ///
    /// Returns `Ok(None)` when there is nothing to recover from.
    ///
    /// # Errors
    /// Propagates errors from the recovery manager.
    pub async fn recover(
        &mut self,
    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
        use crate::recovery_manager::RecoveryManager;

        // Sources and extra tables are not wired in here (empty slices).
        let mgr = RecoveryManager::new(&*self.store);
        let result = mgr.recover(&[], &self.sinks, &[]).await?;

        if let Some(ref recovered) = result {
            // Resume numbering after the recovered checkpoint.
            self.epoch = recovered.epoch() + 1;
            self.next_checkpoint_id = recovered.manifest.checkpoint_id + 1;
            info!(
                epoch = self.epoch,
                checkpoint_id = self.next_checkpoint_id,
                "coordinator epoch set after recovery"
            );
        }

        Ok(result)
    }
1259
1260 pub fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
1266 self.store
1267 .load_latest()
1268 .map_err(|e| DbError::Checkpoint(format!("failed to load latest manifest: {e}")))
1269 }
1270}
1271
// Manual Debug: the store/handles are not Debug, so only summary fields
// (counts and flags) are shown.
impl std::fmt::Debug for CheckpointCoordinator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CheckpointCoordinator")
            .field("epoch", &self.epoch)
            .field("next_checkpoint_id", &self.next_checkpoint_id)
            .field("phase", &self.phase)
            .field("sinks", &self.sinks.len())
            .field("has_wal_manager", &self.wal_manager.is_some())
            .field("changelog_drainers", &self.changelog_drainers.len())
            .field("completed", &self.checkpoints_completed)
            .field("failed", &self.checkpoints_failed)
            .finish_non_exhaustive()
    }
}
1286
/// Fixed-size ring buffer of recent checkpoint durations (in milliseconds)
/// supporting approximate percentile queries over the last `CAPACITY`
/// samples.
#[derive(Clone)]
pub struct DurationHistogram {
    // Ring storage; only the first `len()` slots are meaningful until the
    // buffer wraps, after which all slots hold live samples.
    samples: Box<[u64; Self::CAPACITY]>,
    // Next slot to overwrite.
    cursor: usize,
    // Total samples ever recorded (monotonic, may exceed CAPACITY).
    count: u64,
}

impl DurationHistogram {
    const CAPACITY: usize = 100;

    /// An empty histogram (all percentiles report 0).
    #[must_use]
    fn new() -> Self {
        Self {
            samples: Box::new([0; Self::CAPACITY]),
            cursor: 0,
            count: 0,
        }
    }

    /// Records one duration, overwriting the oldest sample once full.
    fn record(&mut self, duration: Duration) {
        #[allow(clippy::cast_possible_truncation)]
        let millis = duration.as_millis() as u64;
        self.samples[self.cursor] = millis;
        self.cursor += 1;
        if self.cursor == Self::CAPACITY {
            self.cursor = 0;
        }
        self.count += 1;
    }

    /// Number of live samples: `count`, saturating at `CAPACITY`.
    #[must_use]
    fn len(&self) -> usize {
        #[allow(clippy::cast_possible_truncation)]
        let live = self.count.min(Self::CAPACITY as u64) as usize;
        live
    }

    /// Nearest-rank percentile (p in 0.0..=1.0) over the live samples;
    /// 0 when empty. Sorts a copy, so O(n log n) per query.
    #[must_use]
    fn percentile(&self, p: f64) -> u64 {
        let live = self.len();
        if live == 0 {
            return 0;
        }
        let mut window: Vec<u64> = self.samples[..live].to_vec();
        window.sort_unstable();
        #[allow(
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss,
            clippy::cast_precision_loss
        )]
        let rank = (p * (live as f64 - 1.0)).ceil() as usize;
        window[rank.min(live - 1)]
    }

    /// Convenience tuple of (p50, p95, p99) in milliseconds.
    #[must_use]
    fn percentiles(&self) -> (u64, u64, u64) {
        (
            self.percentile(0.50),
            self.percentile(0.95),
            self.percentile(0.99),
        )
    }
}
1367
// Manual Debug: dumping 100 raw samples is noisy, so show the computed
// percentiles alongside the bookkeeping fields instead.
impl std::fmt::Debug for DurationHistogram {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (p50, p95, p99) = self.percentiles();
        f.debug_struct("DurationHistogram")
            .field("samples_len", &self.samples.len())
            .field("cursor", &self.cursor)
            .field("count", &self.count)
            .field("p50_ms", &p50)
            .field("p95_ms", &p95)
            .field("p99_ms", &p99)
            .finish()
    }
}
1381
/// Point-in-time snapshot of checkpoint statistics, produced by
/// [`CheckpointCoordinator::stats`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Checkpoints completed successfully.
    pub completed: u64,
    /// Checkpoint attempts that failed.
    pub failed: u64,
    /// Duration of the most recent successful checkpoint.
    pub last_duration: Option<Duration>,
    /// Median checkpoint duration over recent samples (ms).
    pub duration_p50_ms: u64,
    /// 95th-percentile checkpoint duration over recent samples (ms).
    pub duration_p95_ms: u64,
    /// 99th-percentile checkpoint duration over recent samples (ms).
    pub duration_p99_ms: u64,
    /// Cumulative sidecar bytes written by successful checkpoints.
    pub total_bytes_written: u64,
    /// Phase at the moment the snapshot was taken.
    pub current_phase: CheckpointPhase,
    /// Epoch the next checkpoint will cover.
    pub current_epoch: u64,
    /// Count of unaligned checkpoints taken.
    pub unaligned_checkpoint_count: u64,
}
1406
1407#[must_use]
1411pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
1412 ConnectorCheckpoint {
1413 offsets: cp.offsets().clone(),
1414 epoch: cp.epoch(),
1415 metadata: cp.metadata().clone(),
1416 }
1417}
1418
1419#[must_use]
1421pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
1422 let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
1423 for (k, v) in &cp.metadata {
1424 source_cp.set_metadata(k.clone(), v.clone());
1425 }
1426 source_cp
1427}
1428
1429#[must_use]
1435pub fn dag_snapshot_to_manifest_operators<S: std::hash::BuildHasher>(
1436 node_states: &std::collections::HashMap<
1437 u32,
1438 laminar_core::dag::recovery::SerializableOperatorState,
1439 S,
1440 >,
1441) -> HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
1442 node_states
1443 .iter()
1444 .map(|(id, state)| {
1445 (
1446 id.to_string(),
1447 laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&state.data),
1448 )
1449 })
1450 .collect()
1451}
1452
1453#[must_use]
1457pub fn manifest_operators_to_dag_states<S: std::hash::BuildHasher>(
1458 operators: &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint, S>,
1459) -> rustc_hash::FxHashMap<laminar_core::dag::topology::NodeId, laminar_core::operator::OperatorState>
1460{
1461 let mut states =
1462 rustc_hash::FxHashMap::with_capacity_and_hasher(operators.len(), rustc_hash::FxBuildHasher);
1463 for (key, op_ckpt) in operators {
1464 if let Ok(node_id) = key.parse::<u32>() {
1465 if let Some(data) = op_ckpt.decode_inline() {
1466 states.insert(
1467 laminar_core::dag::topology::NodeId(node_id),
1468 laminar_core::operator::OperatorState {
1469 operator_id: key.clone(),
1470 data,
1471 },
1472 );
1473 }
1474 }
1475 }
1476 states
1477}
1478
1479#[cfg(test)]
1480mod tests {
1481 use super::*;
1482 use laminar_storage::checkpoint_store::FileSystemCheckpointStore;
1483
    /// Builds a coordinator with default config over a filesystem store
    /// rooted at `dir`, retaining up to 3 checkpoints.
    fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
        CheckpointCoordinator::new(CheckpointConfig::default(), store)
    }

    #[test]
    fn test_coordinator_new() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path());

        // A fresh (empty) store starts at epoch 1 / checkpoint id 1, idle.
        assert_eq!(coord.epoch(), 1);
        assert_eq!(coord.next_checkpoint_id(), 1);
        assert_eq!(coord.phase(), CheckpointPhase::Idle);
    }

    #[test]
    fn test_coordinator_resumes_from_stored_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        // Seed the store with checkpoint id 5 at epoch 10.
        let store = FileSystemCheckpointStore::new(dir.path(), 3);
        let m = CheckpointManifest::new(5, 10);
        store.save(&m).unwrap();

        // A new coordinator resumes one past the stored manifest.
        let coord = make_coordinator(dir.path());
        assert_eq!(coord.epoch(), 11);
        assert_eq!(coord.next_checkpoint_id(), 6);
    }

    #[test]
    fn test_checkpoint_phase_display() {
        // Display renders each phase as its variant name.
        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
        assert_eq!(
            CheckpointPhase::BarrierInFlight.to_string(),
            "BarrierInFlight"
        );
        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
    }

    #[test]
    fn test_source_to_connector_checkpoint() {
        let mut cp = SourceCheckpoint::new(5);
        cp.set_offset("partition-0", "1234");
        cp.set_metadata("topic", "events");

        // Offsets, epoch, and metadata must all round-trip into the
        // manifest representation.
        let cc = source_to_connector_checkpoint(&cp);
        assert_eq!(cc.epoch, 5);
        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
    }

    #[test]
    fn test_connector_to_source_checkpoint() {
        let cc = ConnectorCheckpoint {
            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
            epoch: 3,
            metadata: HashMap::from([("type".into(), "postgres".into())]),
        };

        // Inverse conversion restores epoch, offsets, and metadata.
        let cp = connector_to_source_checkpoint(&cc);
        assert_eq!(cp.epoch(), 3);
        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
        assert_eq!(cp.get_metadata("type"), Some("postgres"));
    }
1552
1553 #[test]
1554 fn test_stats_initial() {
1555 let dir = tempfile::tempdir().unwrap();
1556 let coord = make_coordinator(dir.path());
1557 let stats = coord.stats();
1558
1559 assert_eq!(stats.completed, 0);
1560 assert_eq!(stats.failed, 0);
1561 assert!(stats.last_duration.is_none());
1562 assert_eq!(stats.duration_p50_ms, 0);
1563 assert_eq!(stats.duration_p95_ms, 0);
1564 assert_eq!(stats.duration_p99_ms, 0);
1565 assert_eq!(stats.current_phase, CheckpointPhase::Idle);
1566 }
1567
1568 #[tokio::test]
1569 async fn test_checkpoint_no_sources_no_sinks() {
1570 let dir = tempfile::tempdir().unwrap();
1571 let mut coord = make_coordinator(dir.path());
1572
1573 let result = coord
1574 .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
1575 .await
1576 .unwrap();
1577
1578 assert!(result.success);
1579 assert_eq!(result.checkpoint_id, 1);
1580 assert_eq!(result.epoch, 1);
1581
1582 let loaded = coord.store().load_latest().unwrap().unwrap();
1584 assert_eq!(loaded.checkpoint_id, 1);
1585 assert_eq!(loaded.epoch, 1);
1586 assert_eq!(loaded.watermark, Some(1000));
1587
1588 let result2 = coord
1590 .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
1591 .await
1592 .unwrap();
1593
1594 assert!(result2.success);
1595 assert_eq!(result2.checkpoint_id, 2);
1596 assert_eq!(result2.epoch, 2);
1597
1598 let stats = coord.stats();
1599 assert_eq!(stats.completed, 2);
1600 assert_eq!(stats.failed, 0);
1601 }
1602
1603 #[tokio::test]
1604 async fn test_checkpoint_with_operator_states() {
1605 let dir = tempfile::tempdir().unwrap();
1606 let mut coord = make_coordinator(dir.path());
1607
1608 let mut ops = HashMap::new();
1609 ops.insert("window-agg".into(), b"state-data".to_vec());
1610 ops.insert("filter".into(), b"filter-state".to_vec());
1611
1612 let result = coord
1613 .checkpoint(ops, None, None, HashMap::new(), None)
1614 .await
1615 .unwrap();
1616
1617 assert!(result.success);
1618
1619 let loaded = coord.store().load_latest().unwrap().unwrap();
1620 assert_eq!(loaded.operator_states.len(), 2);
1621 assert!(loaded.per_core_wal_positions.is_empty());
1623
1624 let window_op = loaded.operator_states.get("window-agg").unwrap();
1625 assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
1626 }
1627
1628 #[tokio::test]
1629 async fn test_checkpoint_with_table_store_path() {
1630 let dir = tempfile::tempdir().unwrap();
1631 let mut coord = make_coordinator(dir.path());
1632
1633 let result = coord
1634 .checkpoint(
1635 HashMap::new(),
1636 None,
1637 Some("/tmp/rocksdb_cp".into()),
1638 HashMap::new(),
1639 None,
1640 )
1641 .await
1642 .unwrap();
1643
1644 assert!(result.success);
1645
1646 let loaded = coord.store().load_latest().unwrap().unwrap();
1647 assert_eq!(
1648 loaded.table_store_checkpoint_path.as_deref(),
1649 Some("/tmp/rocksdb_cp")
1650 );
1651 }
1652
1653 #[test]
1654 fn test_load_latest_manifest_empty() {
1655 let dir = tempfile::tempdir().unwrap();
1656 let coord = make_coordinator(dir.path());
1657 assert!(coord.load_latest_manifest().unwrap().is_none());
1658 }
1659
1660 #[test]
1661 fn test_coordinator_debug() {
1662 let dir = tempfile::tempdir().unwrap();
1663 let coord = make_coordinator(dir.path());
1664 let debug = format!("{coord:?}");
1665 assert!(debug.contains("CheckpointCoordinator"));
1666 assert!(debug.contains("epoch: 1"));
1667 }
1668
1669 #[test]
1672 fn test_dag_snapshot_to_manifest_operators() {
1673 use laminar_core::dag::recovery::SerializableOperatorState;
1674
1675 let mut node_states = HashMap::new();
1676 node_states.insert(
1677 0,
1678 SerializableOperatorState {
1679 operator_id: "window-agg".into(),
1680 data: b"window-state".to_vec(),
1681 },
1682 );
1683 node_states.insert(
1684 3,
1685 SerializableOperatorState {
1686 operator_id: "filter".into(),
1687 data: b"filter-state".to_vec(),
1688 },
1689 );
1690
1691 let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
1692 assert_eq!(manifest_ops.len(), 2);
1693
1694 let w = manifest_ops.get("0").unwrap();
1695 assert_eq!(w.decode_inline().unwrap(), b"window-state");
1696 let f = manifest_ops.get("3").unwrap();
1697 assert_eq!(f.decode_inline().unwrap(), b"filter-state");
1698 }
1699
1700 #[test]
1701 fn test_manifest_operators_to_dag_states() {
1702 use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
1703
1704 let mut operators = HashMap::new();
1705 operators.insert("0".into(), OperatorCheckpoint::inline(b"state-0"));
1706 operators.insert("5".into(), OperatorCheckpoint::inline(b"state-5"));
1707
1708 let dag_states = manifest_operators_to_dag_states(&operators);
1709 assert_eq!(dag_states.len(), 2);
1710
1711 let s0 = dag_states
1712 .get(&laminar_core::dag::topology::NodeId(0))
1713 .unwrap();
1714 assert_eq!(s0.data, b"state-0");
1715
1716 let s5 = dag_states
1717 .get(&laminar_core::dag::topology::NodeId(5))
1718 .unwrap();
1719 assert_eq!(s5.data, b"state-5");
1720 }
1721
1722 #[test]
1723 fn test_operator_state_round_trip_through_manifest() {
1724 use laminar_core::dag::recovery::SerializableOperatorState;
1725
1726 let mut node_states = HashMap::new();
1728 node_states.insert(
1729 7,
1730 SerializableOperatorState {
1731 operator_id: "join".into(),
1732 data: vec![1, 2, 3, 4, 5],
1733 },
1734 );
1735
1736 let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
1738
1739 let json = serde_json::to_string(&manifest_ops).unwrap();
1741 let reloaded: HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> =
1742 serde_json::from_str(&json).unwrap();
1743
1744 let recovered = manifest_operators_to_dag_states(&reloaded);
1746 let state = recovered
1747 .get(&laminar_core::dag::topology::NodeId(7))
1748 .unwrap();
1749 assert_eq!(state.data, vec![1, 2, 3, 4, 5]);
1750 }
1751
1752 #[test]
1753 fn test_manifest_operators_skips_invalid_keys() {
1754 use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
1755
1756 let mut operators = HashMap::new();
1757 operators.insert("not-a-number".into(), OperatorCheckpoint::inline(b"data"));
1758 operators.insert("42".into(), OperatorCheckpoint::inline(b"good"));
1759
1760 let dag_states = manifest_operators_to_dag_states(&operators);
1761 assert_eq!(dag_states.len(), 1);
1763 assert!(dag_states.contains_key(&laminar_core::dag::topology::NodeId(42)));
1764 }
1765
1766 #[test]
1769 fn test_prepare_wal_no_wal_manager() {
1770 let dir = tempfile::tempdir().unwrap();
1771 let mut coord = make_coordinator(dir.path());
1772
1773 let result = coord.prepare_wal_for_checkpoint().unwrap();
1775 assert!(result.per_core_wal_positions.is_empty());
1776 assert_eq!(result.entries_drained, 0);
1777 }
1778
1779 #[test]
1780 fn test_prepare_wal_with_manager() {
1781 let dir = tempfile::tempdir().unwrap();
1782 let mut coord = make_coordinator(dir.path());
1783
1784 let wal_dir = dir.path().join("wal");
1786 std::fs::create_dir_all(&wal_dir).unwrap();
1787 let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1788 let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1789 coord.register_wal_manager(wal);
1790
1791 coord
1793 .wal_manager_mut()
1794 .unwrap()
1795 .writer(0)
1796 .append_put(b"key", b"value")
1797 .unwrap();
1798
1799 let result = coord.prepare_wal_for_checkpoint().unwrap();
1800 assert_eq!(result.per_core_wal_positions.len(), 2);
1801 assert!(result.per_core_wal_positions.iter().any(|p| *p > 0));
1803 }
1804
1805 #[test]
1806 fn test_truncate_wal_no_manager() {
1807 let dir = tempfile::tempdir().unwrap();
1808 let mut coord = make_coordinator(dir.path());
1809
1810 coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1812 }
1813
1814 #[test]
1815 fn test_truncate_wal_with_manager() {
1816 let dir = tempfile::tempdir().unwrap();
1817 let mut coord = make_coordinator(dir.path());
1818
1819 let wal_dir = dir.path().join("wal");
1820 std::fs::create_dir_all(&wal_dir).unwrap();
1821 let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1822 let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1823 coord.register_wal_manager(wal);
1824
1825 coord
1827 .wal_manager_mut()
1828 .unwrap()
1829 .writer(0)
1830 .append_put(b"key", b"value")
1831 .unwrap();
1832
1833 assert!(coord.wal_manager().unwrap().total_size() > 0);
1834
1835 coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1837 assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1838 }
1839
1840 #[test]
1841 fn test_truncate_wal_safety_buffer() {
1842 let dir = tempfile::tempdir().unwrap();
1843 let mut coord = make_coordinator(dir.path());
1844
1845 let wal_dir = dir.path().join("wal");
1846 std::fs::create_dir_all(&wal_dir).unwrap();
1847 let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1848 let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1849 coord.register_wal_manager(wal);
1850
1851 coord
1853 .wal_manager_mut()
1854 .unwrap()
1855 .writer(0)
1856 .append_put(b"k1", b"v1")
1857 .unwrap();
1858
1859 coord.truncate_wal_after_checkpoint(vec![100, 200]).unwrap();
1861 assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1862
1863 coord
1865 .wal_manager_mut()
1866 .unwrap()
1867 .writer(0)
1868 .append_put(b"k2", b"v2")
1869 .unwrap();
1870 let size_after_write = coord.wal_manager().unwrap().total_size();
1871 assert!(size_after_write > 0);
1872
1873 coord.truncate_wal_after_checkpoint(vec![300, 400]).unwrap();
1877 assert!(coord.wal_manager().unwrap().total_size() > 0);
1880 }
1881
1882 #[test]
1883 fn test_prepare_wal_with_changelog_drainer() {
1884 use laminar_storage::incremental::StateChangelogBuffer;
1885 use laminar_storage::incremental::StateChangelogEntry;
1886 use std::sync::Arc;
1887
1888 let dir = tempfile::tempdir().unwrap();
1889 let mut coord = make_coordinator(dir.path());
1890
1891 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
1893
1894 buf.push(StateChangelogEntry::put(1, 100, 0, 10));
1896 buf.push(StateChangelogEntry::put(1, 200, 10, 20));
1897 buf.push(StateChangelogEntry::delete(1, 300));
1898
1899 let drainer = ChangelogDrainer::new(buf, 100);
1900 coord.register_changelog_drainer(drainer);
1901
1902 let result = coord.prepare_wal_for_checkpoint().unwrap();
1903 assert_eq!(result.entries_drained, 3);
1904 assert!(result.per_core_wal_positions.is_empty()); assert_eq!(coord.changelog_drainers()[0].pending_count(), 3);
1908 }
1909
1910 #[tokio::test]
1911 async fn test_full_checkpoint_with_wal_coordination() {
1912 use laminar_storage::incremental::StateChangelogBuffer;
1913 use laminar_storage::incremental::StateChangelogEntry;
1914 use std::sync::Arc;
1915
1916 let dir = tempfile::tempdir().unwrap();
1917 let mut coord = make_coordinator(dir.path());
1918
1919 let wal_dir = dir.path().join("wal");
1921 std::fs::create_dir_all(&wal_dir).unwrap();
1922 let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1923 let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1924 coord.register_wal_manager(wal);
1925
1926 let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
1928 buf.push(StateChangelogEntry::put(1, 100, 0, 10));
1929 let drainer = ChangelogDrainer::new(buf, 100);
1930 coord.register_changelog_drainer(drainer);
1931
1932 let result = coord
1934 .checkpoint(HashMap::new(), Some(5000), None, HashMap::new(), None)
1935 .await
1936 .unwrap();
1937
1938 assert!(result.success);
1939
1940 assert_eq!(
1942 coord.changelog_drainers()[0].pending_count(),
1943 0,
1944 "pending entries should be cleared after checkpoint"
1945 );
1946
1947 let loaded = coord.store().load_latest().unwrap().unwrap();
1949 assert_eq!(loaded.per_core_wal_positions.len(), 2);
1950
1951 coord.truncate_wal_after_checkpoint(vec![]).unwrap();
1953 assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
1954 }
1955
1956 #[test]
1957 fn test_wal_manager_accessors() {
1958 let dir = tempfile::tempdir().unwrap();
1959 let mut coord = make_coordinator(dir.path());
1960
1961 assert!(coord.wal_manager().is_none());
1962 assert!(coord.wal_manager_mut().is_none());
1963 assert!(coord.changelog_drainers().is_empty());
1964
1965 let wal_dir = dir.path().join("wal");
1966 std::fs::create_dir_all(&wal_dir).unwrap();
1967 let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1968 let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1969 coord.register_wal_manager(wal);
1970
1971 assert!(coord.wal_manager().is_some());
1972 assert!(coord.wal_manager_mut().is_some());
1973 }
1974
1975 #[test]
1976 fn test_coordinator_debug_with_wal() {
1977 let dir = tempfile::tempdir().unwrap();
1978 let mut coord = make_coordinator(dir.path());
1979
1980 let wal_dir = dir.path().join("wal");
1981 std::fs::create_dir_all(&wal_dir).unwrap();
1982 let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
1983 let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
1984 coord.register_wal_manager(wal);
1985
1986 let debug = format!("{coord:?}");
1987 assert!(debug.contains("has_wal_manager: true"));
1988 assert!(debug.contains("changelog_drainers: 0"));
1989 }
1990
1991 #[tokio::test]
1994 async fn test_checkpoint_emits_metrics_on_success() {
1995 use crate::metrics::PipelineCounters;
1996 use std::sync::atomic::Ordering;
1997
1998 let dir = tempfile::tempdir().unwrap();
1999 let mut coord = make_coordinator(dir.path());
2000
2001 let counters = Arc::new(PipelineCounters::new());
2002 coord.set_counters(Arc::clone(&counters));
2003
2004 let result = coord
2005 .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
2006 .await
2007 .unwrap();
2008
2009 assert!(result.success);
2010 assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 1);
2011 assert_eq!(counters.checkpoints_failed.load(Ordering::Relaxed), 0);
2012 assert!(counters.last_checkpoint_duration_ms.load(Ordering::Relaxed) < 5000);
2013 assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 1);
2014
2015 let result2 = coord
2017 .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
2018 .await
2019 .unwrap();
2020
2021 assert!(result2.success);
2022 assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 2);
2023 assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 2);
2024 }
2025
2026 #[tokio::test]
2027 async fn test_checkpoint_without_counters() {
2028 let dir = tempfile::tempdir().unwrap();
2030 let mut coord = make_coordinator(dir.path());
2031
2032 let result = coord
2033 .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
2034 .await
2035 .unwrap();
2036
2037 assert!(result.success);
2038 }
2040
2041 #[test]
2044 fn test_histogram_empty() {
2045 let h = DurationHistogram::new();
2046 assert_eq!(h.len(), 0);
2047 assert_eq!(h.percentile(0.50), 0);
2048 assert_eq!(h.percentile(0.99), 0);
2049 let (p50, p95, p99) = h.percentiles();
2050 assert_eq!((p50, p95, p99), (0, 0, 0));
2051 }
2052
2053 #[test]
2054 fn test_histogram_single_sample() {
2055 let mut h = DurationHistogram::new();
2056 h.record(Duration::from_millis(42));
2057 assert_eq!(h.len(), 1);
2058 assert_eq!(h.percentile(0.50), 42);
2059 assert_eq!(h.percentile(0.99), 42);
2060 }
2061
2062 #[test]
2063 fn test_histogram_percentiles() {
2064 let mut h = DurationHistogram::new();
2065 for i in 1..=100 {
2067 h.record(Duration::from_millis(i));
2068 }
2069 assert_eq!(h.len(), 100);
2070
2071 let p50 = h.percentile(0.50);
2072 let p95 = h.percentile(0.95);
2073 let p99 = h.percentile(0.99);
2074
2075 assert!((49..=51).contains(&p50), "p50={p50}");
2078 assert!((94..=96).contains(&p95), "p95={p95}");
2079 assert!((98..=100).contains(&p99), "p99={p99}");
2080 }
2081
2082 #[test]
2083 fn test_histogram_wraps_ring_buffer() {
2084 let mut h = DurationHistogram::new();
2085 for i in 1..=150 {
2087 h.record(Duration::from_millis(i));
2088 }
2089 assert_eq!(h.len(), 100);
2090 assert_eq!(h.count, 150);
2091
2092 let p50 = h.percentile(0.50);
2094 assert!((99..=101).contains(&p50), "p50={p50}");
2095 }
2096
2097 #[tokio::test]
2100 async fn test_sidecar_round_trip() {
2101 let dir = tempfile::tempdir().unwrap();
2102 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2103 let config = CheckpointConfig {
2104 state_inline_threshold: 100, ..CheckpointConfig::default()
2106 };
2107 let mut coord = CheckpointCoordinator::new(config, store);
2108
2109 let mut ops = HashMap::new();
2111 ops.insert("small".into(), vec![0xAAu8; 50]);
2112 ops.insert("large".into(), vec![0xBBu8; 200]);
2113
2114 let result = coord
2115 .checkpoint(ops, None, None, HashMap::new(), None)
2116 .await
2117 .unwrap();
2118 assert!(result.success);
2119
2120 let loaded = coord.store().load_latest().unwrap().unwrap();
2122 let small_op = loaded.operator_states.get("small").unwrap();
2123 assert!(!small_op.external, "small state should be inline");
2124 assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);
2125
2126 let large_op = loaded.operator_states.get("large").unwrap();
2127 assert!(large_op.external, "large state should be external");
2128 assert_eq!(large_op.external_length, 200);
2129
2130 let state_data = coord.store().load_state_data(1).unwrap().unwrap();
2132 assert_eq!(state_data.len(), 200);
2133 assert!(state_data.iter().all(|&b| b == 0xBB));
2134 }
2135
2136 #[tokio::test]
2137 async fn test_all_inline_no_sidecar() {
2138 let dir = tempfile::tempdir().unwrap();
2139 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2140 let config = CheckpointConfig::default(); let mut coord = CheckpointCoordinator::new(config, store);
2142
2143 let mut ops = HashMap::new();
2144 ops.insert("op1".into(), b"small-state".to_vec());
2145
2146 let result = coord
2147 .checkpoint(ops, None, None, HashMap::new(), None)
2148 .await
2149 .unwrap();
2150 assert!(result.success);
2151
2152 assert!(coord.store().load_state_data(1).unwrap().is_none());
2154 }
2155
2156 #[test]
2159 fn test_adaptive_disabled_by_default() {
2160 let dir = tempfile::tempdir().unwrap();
2161 let coord = make_coordinator(dir.path());
2162 assert!(coord.config().adaptive.is_none());
2163 assert_eq!(coord.config().interval, Some(Duration::from_secs(60)));
2164 }
2165
2166 #[test]
2167 fn test_adaptive_increases_interval_for_slow_checkpoints() {
2168 let dir = tempfile::tempdir().unwrap();
2169 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2170 let config = CheckpointConfig {
2171 adaptive: Some(AdaptiveCheckpointConfig::default()),
2172 ..CheckpointConfig::default()
2173 };
2174 let mut coord = CheckpointCoordinator::new(config, store);
2175
2176 coord.last_checkpoint_duration = Some(Duration::from_secs(5));
2178 coord.adjust_interval();
2179
2180 let interval = coord.config().interval.unwrap();
2182 assert!(
2183 interval >= Duration::from_secs(49) && interval <= Duration::from_secs(51),
2184 "expected ~50s, got {interval:?}",
2185 );
2186 }
2187
2188 #[test]
2189 fn test_adaptive_decreases_interval_for_fast_checkpoints() {
2190 let dir = tempfile::tempdir().unwrap();
2191 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2192 let config = CheckpointConfig {
2193 adaptive: Some(AdaptiveCheckpointConfig::default()),
2194 ..CheckpointConfig::default()
2195 };
2196 let mut coord = CheckpointCoordinator::new(config, store);
2197
2198 coord.last_checkpoint_duration = Some(Duration::from_millis(100));
2200 coord.adjust_interval();
2201
2202 let interval = coord.config().interval.unwrap();
2203 assert_eq!(
2204 interval,
2205 Duration::from_secs(10),
2206 "should clamp to min_interval"
2207 );
2208 }
2209
2210 #[test]
2211 fn test_adaptive_clamps_to_min_max() {
2212 let dir = tempfile::tempdir().unwrap();
2213 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2214 let config = CheckpointConfig {
2215 adaptive: Some(AdaptiveCheckpointConfig {
2216 min_interval: Duration::from_secs(20),
2217 max_interval: Duration::from_secs(120),
2218 target_overhead_ratio: 0.1,
2219 smoothing_alpha: 1.0, }),
2221 ..CheckpointConfig::default()
2222 };
2223 let mut coord = CheckpointCoordinator::new(config, store);
2224
2225 coord.last_checkpoint_duration = Some(Duration::from_secs(60));
2227 coord.adjust_interval();
2228 let interval = coord.config().interval.unwrap();
2229 assert_eq!(interval, Duration::from_secs(120), "should clamp to max");
2230
2231 coord.last_checkpoint_duration = Some(Duration::from_millis(10));
2233 coord.smoothed_duration_ms = 0.0; coord.adjust_interval();
2235 let interval = coord.config().interval.unwrap();
2236 assert_eq!(interval, Duration::from_secs(20), "should clamp to min");
2237 }
2238
2239 #[test]
2240 fn test_adaptive_ema_smoothing() {
2241 let dir = tempfile::tempdir().unwrap();
2242 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2243 let config = CheckpointConfig {
2244 adaptive: Some(AdaptiveCheckpointConfig {
2245 min_interval: Duration::from_secs(1),
2246 max_interval: Duration::from_secs(600),
2247 target_overhead_ratio: 0.1,
2248 smoothing_alpha: 0.5,
2249 }),
2250 ..CheckpointConfig::default()
2251 };
2252 let mut coord = CheckpointCoordinator::new(config, store);
2253
2254 coord.last_checkpoint_duration = Some(Duration::from_millis(1000));
2256 coord.adjust_interval();
2257 assert!((coord.smoothed_duration_ms() - 1000.0).abs() < 1.0);
2258
2259 coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
2261 coord.adjust_interval();
2262 assert!((coord.smoothed_duration_ms() - 1500.0).abs() < 1.0);
2263
2264 coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
2266 coord.adjust_interval();
2267 assert!((coord.smoothed_duration_ms() - 1750.0).abs() < 1.0);
2268 }
2269
2270 #[tokio::test]
2271 async fn test_stats_include_percentiles_after_checkpoints() {
2272 let dir = tempfile::tempdir().unwrap();
2273 let mut coord = make_coordinator(dir.path());
2274
2275 for _ in 0..3 {
2277 let result = coord
2278 .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
2279 .await
2280 .unwrap();
2281 assert!(result.success);
2282 }
2283
2284 let stats = coord.stats();
2285 assert_eq!(stats.completed, 3);
2286 assert!(stats.last_duration.is_some());
2289 }
2290}