1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use std::sync::atomic::Ordering;
24
25use laminar_connectors::checkpoint::SourceCheckpoint;
26use laminar_connectors::connector::SourceConnector;
27use laminar_storage::changelog_drainer::ChangelogDrainer;
28use laminar_storage::checkpoint_manifest::{
29 CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
30};
31use laminar_storage::checkpoint_store::CheckpointStore;
32use laminar_storage::per_core_wal::PerCoreWalManager;
33use tracing::{debug, error, info, warn};
34
35use crate::error::DbError;
36use crate::metrics::PipelineCounters;
37
/// Tunables governing checkpoint cadence, phase timeouts, and size limits.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// How often automatic checkpoints fire; `None` disables the timer.
    /// May be rewritten at runtime when `adaptive` tuning is enabled.
    pub interval: Option<Duration>,
    /// Maximum number of completed checkpoints retained in the store.
    pub max_retained: usize,
    /// Barrier alignment timeout. NOTE(review): not referenced in this
    /// file — presumably enforced by the dataflow layer; confirm.
    pub alignment_timeout: Duration,
    /// Timeout for the sink pre-commit phase (phase one of 2PC).
    pub pre_commit_timeout: Duration,
    /// Timeout for persisting the manifest and state sidecar to storage.
    pub persist_timeout: Duration,
    /// Timeout for the sink commit phase (phase two of 2PC).
    pub commit_timeout: Duration,
    /// Whether checkpoints are recorded as incremental in the manifest.
    pub incremental: bool,
    /// Operator states larger than this many bytes are spilled to the
    /// sidecar blob instead of being inlined in the manifest.
    pub state_inline_threshold: usize,
    /// Hard cap on sidecar bytes per checkpoint; exceeding it rejects the
    /// checkpoint (LDB-6014). `None` means unlimited.
    pub max_checkpoint_bytes: Option<usize>,
    /// Pending-entry cap per changelog drainer; exceeded drainers are
    /// cleared after a failed checkpoint to avoid OOM (LDB-6010).
    pub max_pending_changelog_entries: usize,
    /// Optional adaptive-interval tuning; `None` keeps `interval` fixed.
    pub adaptive: Option<AdaptiveCheckpointConfig>,
    /// Optional unaligned-checkpoint settings; `None` disables the feature.
    pub unaligned: Option<UnalignedCheckpointConfig>,
}
101
/// Parameters for adaptive checkpoint-interval tuning: the interval tracks
/// an exponentially smoothed checkpoint duration so checkpoint overhead
/// stays near a target fraction of wall time.
#[derive(Debug, Clone)]
pub struct AdaptiveCheckpointConfig {
    /// Lower clamp for the computed interval.
    pub min_interval: Duration,
    /// Upper clamp for the computed interval.
    pub max_interval: Duration,
    /// Desired ratio of checkpoint duration to interval (e.g. 0.1 = 10%).
    pub target_overhead_ratio: f64,
    /// EWMA weight applied to the newest duration sample (0..=1).
    pub smoothing_alpha: f64,
}
126
127impl Default for AdaptiveCheckpointConfig {
128 fn default() -> Self {
129 Self {
130 min_interval: Duration::from_secs(10),
131 max_interval: Duration::from_secs(300),
132 target_overhead_ratio: 0.1,
133 smoothing_alpha: 0.3,
134 }
135 }
136}
137
/// Settings controlling unaligned (Flink-style) checkpoints, where in-flight
/// buffers are persisted instead of waiting for full barrier alignment.
#[derive(Debug, Clone)]
pub struct UnalignedCheckpointConfig {
    /// Master switch for the unaligned fallback.
    pub enabled: bool,
    /// How long to wait on alignment before falling back to unaligned.
    /// NOTE(review): not referenced in this file — confirm enforcement site.
    pub alignment_timeout_threshold: Duration,
    /// Upper bound on buffered in-flight bytes persisted per checkpoint.
    pub max_inflight_buffer_bytes: usize,
    /// When true, always take unaligned checkpoints (skip alignment).
    pub force_unaligned: bool,
}
162
163impl Default for UnalignedCheckpointConfig {
164 fn default() -> Self {
165 Self {
166 enabled: true,
167 alignment_timeout_threshold: Duration::from_secs(10),
168 max_inflight_buffer_bytes: 256 * 1024 * 1024,
169 force_unaligned: false,
170 }
171 }
172}
173
174impl Default for CheckpointConfig {
175 fn default() -> Self {
176 Self {
177 interval: Some(Duration::from_secs(60)),
178 max_retained: 3,
179 alignment_timeout: Duration::from_secs(30),
180 pre_commit_timeout: Duration::from_secs(30),
181 persist_timeout: Duration::from_secs(120),
182 commit_timeout: Duration::from_secs(60),
183 incremental: false,
184 state_inline_threshold: 1_048_576,
185 max_checkpoint_bytes: None,
186 max_pending_changelog_entries: 10_000_000,
187 adaptive: None,
188 unaligned: None,
189 }
190 }
191}
192
/// States of the checkpoint state machine, in protocol order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum CheckpointPhase {
    /// No checkpoint in progress.
    Idle,
    /// A checkpoint barrier is flowing through the dataflow.
    /// NOTE(review): not set within this file — presumably driven by the
    /// barrier-injection layer; confirm.
    BarrierInFlight,
    /// Source and table offsets are being captured.
    Snapshotting,
    /// Sinks are preparing the epoch (phase one of two-phase commit).
    PreCommitting,
    /// Manifest and operator state are being written to the store.
    Persisting,
    /// Sinks are committing the epoch (phase two of two-phase commit).
    Committing,
}
209
210impl std::fmt::Display for CheckpointPhase {
211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 match self {
213 Self::Idle => write!(f, "Idle"),
214 Self::BarrierInFlight => write!(f, "BarrierInFlight"),
215 Self::Snapshotting => write!(f, "Snapshotting"),
216 Self::PreCommitting => write!(f, "PreCommitting"),
217 Self::Persisting => write!(f, "Persisting"),
218 Self::Committing => write!(f, "Committing"),
219 }
220 }
221}
222
/// Outcome of a single checkpoint attempt. A failed attempt is reported
/// here (with `success: false`) rather than as an `Err`, so callers can
/// treat it as retryable.
#[derive(Debug, serde::Serialize)]
pub struct CheckpointResult {
    /// Whether the checkpoint completed fully (persisted and committed).
    pub success: bool,
    /// Id of this checkpoint attempt.
    pub checkpoint_id: u64,
    /// Epoch the checkpoint covered.
    pub epoch: u64,
    /// Wall-clock duration of the attempt.
    pub duration: Duration,
    /// Failure description; also set on success when beginning the *next*
    /// epoch failed (see `checkpoint_inner`).
    pub error: Option<String>,
}
237
/// A source connector registered for checkpointing.
pub(crate) struct RegisteredSource {
    /// Unique name; used as the key in manifest offset maps.
    pub name: String,
    /// The connector, behind an async mutex so snapshots can lock it
    /// without blocking the runtime.
    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
    /// Whether the source can rewind to a checkpointed offset; when false,
    /// exactly-once degrades to at-most-once (see `register_source`).
    pub supports_replay: bool,
}
250
/// A sink registered for the epoch/commit protocol.
pub(crate) struct RegisteredSink {
    /// Unique name; used as the key in manifest status maps.
    pub name: String,
    /// Handle to the sink's task for begin/pre-commit/commit/rollback calls.
    pub handle: crate::sink_task::SinkTaskHandle,
    /// When true, the sink participates in two-phase commit; otherwise it
    /// is skipped by all epoch operations.
    pub exactly_once: bool,
}
260
/// Result of preparing the WAL for a checkpoint
/// (see [`CheckpointCoordinator::prepare_wal_for_checkpoint`]).
#[derive(Debug, Clone)]
pub struct WalPrepareResult {
    /// Synced WAL position per core; empty when no WAL manager is registered.
    pub per_core_wal_positions: Vec<u64>,
    /// Total changelog entries drained into the WAL during preparation.
    pub entries_drained: u64,
}
272
/// Orchestrates two-phase checkpoints across sources, sinks, the per-core
/// WAL, and the checkpoint store.
pub struct CheckpointCoordinator {
    /// Tunables: cadence, timeouts, size caps.
    config: CheckpointConfig,
    /// Durable backend for manifests and state sidecars.
    store: Arc<dyn CheckpointStore>,
    /// Streaming sources snapshotted on every checkpoint.
    sources: Vec<RegisteredSource>,
    /// Sinks participating in the epoch/commit protocol.
    sinks: Vec<RegisteredSink>,
    /// Table-backed sources, snapshotted alongside `sources`.
    table_sources: Vec<RegisteredSource>,
    /// Id assigned to the next checkpoint attempt.
    next_checkpoint_id: u64,
    /// Current epoch; advanced only after a fully successful checkpoint.
    epoch: u64,
    /// Current position in the checkpoint state machine.
    phase: CheckpointPhase,
    /// Count of successful checkpoints since startup.
    checkpoints_completed: u64,
    /// Count of failed checkpoint attempts since startup.
    checkpoints_failed: u64,
    /// Duration of the most recent successful checkpoint.
    last_checkpoint_duration: Option<Duration>,
    /// Ring buffer of recent durations for percentile reporting.
    duration_histogram: DurationHistogram,
    /// Optional per-core WAL; when present, checkpoints seal WAL epochs.
    wal_manager: Option<PerCoreWalManager>,
    /// Drainers that flush state changelogs into the WAL before snapshot.
    changelog_drainers: Vec<ChangelogDrainer>,
    /// Optional shared metrics sink updated after each attempt.
    counters: Option<Arc<PipelineCounters>>,
    /// EWMA of checkpoint duration (ms), used by adaptive interval tuning.
    smoothed_duration_ms: f64,
    /// Cumulative sidecar bytes written by successful checkpoints.
    total_bytes_written: u64,
    /// How many checkpoints fell back to unaligned mode.
    unaligned_checkpoint_count: u64,
    /// WAL positions recorded by the previous checkpoint; `None` before the
    /// first one. Used to decide truncate vs. reset after a checkpoint.
    previous_wal_positions: Option<Vec<u64>>,
}
308
309impl CheckpointCoordinator {
310 #[must_use]
312 pub fn new(config: CheckpointConfig, store: Box<dyn CheckpointStore>) -> Self {
313 let store: Arc<dyn CheckpointStore> = Arc::from(store);
314 let (next_id, epoch) = match store.load_latest() {
316 Ok(Some(m)) => (m.checkpoint_id + 1, m.epoch + 1),
317 _ => (1, 1),
318 };
319
320 Self {
321 config,
322 store,
323 sources: Vec::new(),
324 sinks: Vec::new(),
325 table_sources: Vec::new(),
326 next_checkpoint_id: next_id,
327 epoch,
328 phase: CheckpointPhase::Idle,
329 checkpoints_completed: 0,
330 checkpoints_failed: 0,
331 last_checkpoint_duration: None,
332 duration_histogram: DurationHistogram::new(),
333 wal_manager: None,
334 changelog_drainers: Vec::new(),
335 counters: None,
336 smoothed_duration_ms: 0.0,
337 total_bytes_written: 0,
338 unaligned_checkpoint_count: 0,
339 previous_wal_positions: None,
340 }
341 }
342
343 pub fn register_source(
350 &mut self,
351 name: impl Into<String>,
352 connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
353 supports_replay: bool,
354 ) {
355 let name = name.into();
356 if !supports_replay {
357 warn!(
358 source = %name,
359 "source does not support replay — exactly-once semantics \
360 are degraded to at-most-once for this source"
361 );
362 }
363 self.sources.push(RegisteredSource {
364 name,
365 connector,
366 supports_replay,
367 });
368 }
369
370 pub(crate) fn register_sink(
372 &mut self,
373 name: impl Into<String>,
374 handle: crate::sink_task::SinkTaskHandle,
375 exactly_once: bool,
376 ) {
377 self.sinks.push(RegisteredSink {
378 name: name.into(),
379 handle,
380 exactly_once,
381 });
382 }
383
    /// Opens the coordinator's current epoch on every exactly-once sink so
    /// that writes made before the first checkpoint are transactional.
    ///
    /// # Errors
    ///
    /// Propagates the first sink's begin-epoch failure (already-started
    /// sinks are rolled back by `begin_epoch_for_sinks`).
    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
        self.begin_epoch_for_sinks(self.epoch).await
    }
397
    /// Begins `epoch` on each exactly-once sink, rolling back the sinks
    /// that already started if a later one fails — no sink is left with a
    /// half-opened epoch.
    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
        // Sinks that successfully began the epoch; used for rollback.
        let mut started: Vec<&RegisteredSink> = Vec::new();
        for sink in &self.sinks {
            if sink.exactly_once {
                match sink.handle.begin_epoch(epoch).await {
                    Ok(()) => {
                        started.push(sink);
                        debug!(sink = %sink.name, epoch, "began epoch");
                    }
                    Err(e) => {
                        // Undo every sink that already opened this epoch
                        // before reporting the failure.
                        for s in &started {
                            s.handle.rollback_epoch(epoch).await;
                        }
                        return Err(DbError::Checkpoint(format!(
                            "sink '{}' failed to begin epoch {epoch}: {e}",
                            sink.name
                        )));
                    }
                }
            }
        }
        Ok(())
    }
424
425 pub fn register_table_source(
427 &mut self,
428 name: impl Into<String>,
429 connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
430 ) {
431 self.table_sources.push(RegisteredSource {
432 name: name.into(),
433 connector,
434 supports_replay: true, });
436 }
437
    /// Attaches the shared pipeline metrics; once set, every checkpoint
    /// attempt updates these counters (see `emit_checkpoint_metrics`).
    pub fn set_counters(&mut self, counters: Arc<PipelineCounters>) {
        self.counters = Some(counters);
    }
447
448 fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
450 if let Some(ref counters) = self.counters {
451 if success {
452 counters
453 .checkpoints_completed
454 .fetch_add(1, Ordering::Relaxed);
455 } else {
456 counters.checkpoints_failed.fetch_add(1, Ordering::Relaxed);
457 }
458 #[allow(clippy::cast_possible_truncation)]
459 counters
460 .last_checkpoint_duration_ms
461 .store(duration.as_millis() as u64, Ordering::Relaxed);
462 counters.checkpoint_epoch.store(epoch, Ordering::Relaxed);
463 }
464 }
465
    /// Attaches the per-core WAL manager; once present, checkpoints seal a
    /// WAL epoch and record synced positions (see `prepare_wal_for_checkpoint`).
    pub fn register_wal_manager(&mut self, wal_manager: PerCoreWalManager) {
        self.wal_manager = Some(wal_manager);
    }

    /// Adds a changelog drainer to be flushed into the WAL at the start of
    /// every checkpoint.
    pub fn register_changelog_drainer(&mut self, drainer: ChangelogDrainer) {
        self.changelog_drainers.push(drainer);
    }
486
    /// Flushes pending changelog entries into the WAL and seals the current
    /// WAL epoch so the checkpoint captures a durable, consistent cut.
    ///
    /// Returns the per-core synced WAL positions (empty when no WAL manager
    /// is registered) and the number of changelog entries drained.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Checkpoint`] if writing the epoch barrier or
    /// syncing the WAL fails.
    pub fn prepare_wal_for_checkpoint(&mut self) -> Result<WalPrepareResult, DbError> {
        // Drain all buffered changelog entries first so the epoch barrier
        // written below covers them.
        let mut total_drained: u64 = 0;
        for drainer in &mut self.changelog_drainers {
            let count = drainer.drain();
            total_drained += count as u64;
            debug!(
                drained = count,
                pending = drainer.pending_count(),
                "changelog drainer flushed"
            );
        }

        let per_core_wal_positions = if let Some(ref mut wal) = self.wal_manager {
            // Advance to a fresh epoch and stamp it on every core's WAL.
            let epoch = wal.advance_epoch();
            wal.set_epoch_all(epoch);

            wal.write_epoch_barrier_all()
                .map_err(|e| DbError::Checkpoint(format!("WAL epoch barrier failed: {e}")))?;

            // Sync before reading positions so the reported offsets only
            // cover data the WAL reports as synced.
            wal.sync_all()
                .map_err(|e| DbError::Checkpoint(format!("WAL sync failed: {e}")))?;

            let positions = wal.synced_positions();
            debug!(epoch, positions = ?positions, "WAL prepared for checkpoint");
            positions
        } else {
            Vec::new()
        };

        Ok(WalPrepareResult {
            per_core_wal_positions,
            entries_drained: total_drained,
        })
    }
537
538 pub fn truncate_wal_after_checkpoint(
547 &mut self,
548 current_positions: Vec<u64>,
549 ) -> Result<(), DbError> {
550 if let Some(ref mut wal) = self.wal_manager {
551 if let Some(ref prev) = self.previous_wal_positions {
552 wal.truncate_all(prev)
555 .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
556 debug!("WAL segments truncated to previous checkpoint positions");
557 } else {
558 wal.reset_all()
560 .map_err(|e| DbError::Checkpoint(format!("WAL truncation failed: {e}")))?;
561 debug!("WAL segments reset after first checkpoint");
562 }
563 }
564 self.previous_wal_positions = Some(current_positions);
565 Ok(())
566 }
567
    /// Shared access to the registered WAL manager, if any.
    #[must_use]
    pub fn wal_manager(&self) -> Option<&PerCoreWalManager> {
        self.wal_manager.as_ref()
    }

    /// Mutable access to the registered WAL manager, if any.
    pub fn wal_manager_mut(&mut self) -> Option<&mut PerCoreWalManager> {
        self.wal_manager.as_mut()
    }

    /// The registered changelog drainers.
    #[must_use]
    pub fn changelog_drainers(&self) -> &[ChangelogDrainer] {
        &self.changelog_drainers
    }

    /// Mutable access to the registered changelog drainers.
    pub fn changelog_drainers_mut(&mut self) -> &mut [ChangelogDrainer] {
        &mut self.changelog_drainers
    }
589
590 fn maybe_cap_drainers(&mut self) {
596 let cap = self.config.max_pending_changelog_entries;
597 let needs_clear = self
598 .changelog_drainers
599 .iter()
600 .any(|d| d.pending_count() > cap);
601 if needs_clear {
602 let total: usize = self
603 .changelog_drainers
604 .iter()
605 .map(ChangelogDrainer::pending_count)
606 .sum();
607 warn!(
608 total_pending = total,
609 cap,
610 "[LDB-6010] changelog drainer exceeded cap after checkpoint failure — \
611 clearing to prevent OOM"
612 );
613 for drainer in &mut self.changelog_drainers {
614 drainer.clear_pending();
615 }
616 }
617 }
618
    /// Runs one full checkpoint with no extra table offsets and no source
    /// offset overrides. Thin wrapper over [`Self::checkpoint_inner`],
    /// which documents the protocol and failure semantics.
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    pub async fn checkpoint(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            HashMap::new(),
            source_watermarks,
            pipeline_hash,
            HashMap::new(),
        )
        .await
    }
655
656 async fn snapshot_sources(
661 &self,
662 sources: &[RegisteredSource],
663 ) -> Result<HashMap<String, ConnectorCheckpoint>, DbError> {
664 use futures::future::join_all;
665
666 let futs = sources.iter().map(|source| {
667 let connector = Arc::clone(&source.connector);
668 let name = source.name.clone();
669 async move {
670 let guard = connector.lock().await;
671 let cp = guard.checkpoint();
672 let result = source_to_connector_checkpoint(&cp);
673 debug!(source = %name, epoch = cp.epoch(), "source snapshotted");
674 (name, result)
675 }
676 });
677
678 let results = join_all(futs).await;
679 Ok(results.into_iter().collect())
680 }
681
    /// Snapshots each referenced source concurrently: every connector is
    /// locked, asked for its checkpoint, and the result converted to the
    /// manifest's [`ConnectorCheckpoint`] form, keyed by source name.
    async fn snapshot_source_refs(
        &self,
        sources: &[&RegisteredSource],
    ) -> Result<HashMap<String, ConnectorCheckpoint>, DbError> {
        use futures::future::join_all;

        // One future per source; join_all runs them concurrently so a slow
        // connector does not serialize the whole snapshot phase.
        let futs = sources.iter().map(|source| {
            let connector = Arc::clone(&source.connector);
            let name = source.name.clone();
            async move {
                let guard = connector.lock().await;
                let cp = guard.checkpoint();
                let result = source_to_connector_checkpoint(&cp);
                debug!(source = %name, epoch = cp.epoch(), "source snapshotted");
                (name, result)
            }
        });

        let results = join_all(futs).await;
        Ok(results.into_iter().collect())
    }
704
705 async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
710 let timeout_dur = self.config.pre_commit_timeout;
711
712 match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
713 Ok(result) => result,
714 Err(_elapsed) => Err(DbError::Checkpoint(format!(
715 "pre-commit timed out after {}s",
716 timeout_dur.as_secs()
717 ))),
718 }
719 }
720
721 async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
727 for sink in &self.sinks {
728 if sink.exactly_once {
729 sink.handle.pre_commit(epoch).await.map_err(|e| {
730 DbError::Checkpoint(format!("sink '{}' pre-commit failed: {e}", sink.name))
731 })?;
732 debug!(sink = %sink.name, epoch, "sink pre-committed");
733 }
734 }
735 Ok(())
736 }
737
    /// Commits `epoch` on all exactly-once sinks under a single global
    /// timeout, returning a per-sink status map.
    ///
    /// On timeout every exactly-once sink is reported as `Failed`; the
    /// caller then leaves the epoch open so the commit can be retried.
    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
        let timeout_dur = self.config.commit_timeout;

        match tokio::time::timeout(timeout_dur, self.commit_sinks_inner(epoch)).await {
            Ok(statuses) => statuses,
            Err(_elapsed) => {
                error!(
                    epoch,
                    timeout_secs = timeout_dur.as_secs(),
                    "[LDB-6012] sink commit timed out — marking all pending sinks as failed"
                );
                // Pessimistically mark every participating sink failed; we
                // cannot tell which ones committed before the deadline.
                self.sinks
                    .iter()
                    .filter(|s| s.exactly_once)
                    .map(|s| {
                        (
                            s.name.clone(),
                            SinkCommitStatus::Failed(format!(
                                "sink '{}' commit timed out after {}s",
                                s.name,
                                timeout_dur.as_secs()
                            )),
                        )
                    })
                    .collect()
            }
        }
    }
774
775 async fn commit_sinks_inner(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
777 let mut statuses = HashMap::with_capacity(self.sinks.len());
778
779 for sink in &self.sinks {
780 if sink.exactly_once {
781 match sink.handle.commit_epoch(epoch).await {
782 Ok(()) => {
783 statuses.insert(sink.name.clone(), SinkCommitStatus::Committed);
784 debug!(sink = %sink.name, epoch, "sink committed");
785 }
786 Err(e) => {
787 let msg = format!("sink '{}' commit failed: {e}", sink.name);
788 error!(sink = %sink.name, epoch, error = %e, "sink commit failed");
789 statuses.insert(sink.name.clone(), SinkCommitStatus::Failed(msg));
790 }
791 }
792 }
793 }
794
795 statuses
796 }
797
    /// Persists the manifest (plus optional operator-state sidecar) on a
    /// blocking worker thread, bounded by `config.persist_timeout`.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Checkpoint`] when the store write fails, the
    /// blocking task fails to join, or the persist times out (LDB-6011).
    async fn save_manifest(
        &self,
        manifest: CheckpointManifest,
        state_data: Option<Vec<u8>>,
    ) -> Result<(), DbError> {
        let store = Arc::clone(&self.store);
        let timeout_dur = self.config.persist_timeout;

        // Store writes are blocking I/O; keep them off the async runtime.
        let task = tokio::task::spawn_blocking(move || {
            store.save_with_state(&manifest, state_data.as_deref())
        });

        // Nesting: timeout result -> join result -> store result.
        match tokio::time::timeout(timeout_dur, task).await {
            Ok(Ok(Ok(()))) => Ok(()),
            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest persist failed: {e}"))),
            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
                "manifest persist task failed: {join_err}"
            ))),
            Err(_elapsed) => Err(DbError::Checkpoint(format!(
                "[LDB-6011] manifest persist timed out after {}s — \
                 filesystem may be degraded",
                timeout_dur.as_secs()
            ))),
        }
    }
834
    /// Rewrites an already-persisted manifest (no sidecar) — used to record
    /// final sink commit statuses after the commit phase. Runs on a
    /// blocking worker thread, bounded by `config.persist_timeout`.
    async fn update_manifest_only(&self, manifest: &CheckpointManifest) -> Result<(), DbError> {
        let store = Arc::clone(&self.store);
        // Clone so the blocking closure can own the manifest.
        let manifest = manifest.clone();
        let timeout_dur = self.config.persist_timeout;

        let task = tokio::task::spawn_blocking(move || store.update_manifest(&manifest));

        // Nesting: timeout result -> join result -> store result.
        match tokio::time::timeout(timeout_dur, task).await {
            Ok(Ok(Ok(()))) => Ok(()),
            Ok(Ok(Err(e))) => Err(DbError::Checkpoint(format!("manifest update failed: {e}"))),
            Ok(Err(join_err)) => Err(DbError::Checkpoint(format!(
                "manifest update task failed: {join_err}"
            ))),
            Err(_elapsed) => Err(DbError::Checkpoint(format!(
                "manifest update timed out after {}s",
                timeout_dur.as_secs()
            ))),
        }
    }
857
858 fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
860 self.sinks
861 .iter()
862 .filter(|s| s.exactly_once)
863 .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
864 .collect()
865 }
866
867 fn pack_operator_states(
872 manifest: &mut CheckpointManifest,
873 operator_states: &HashMap<String, Vec<u8>>,
874 threshold: usize,
875 ) -> Option<Vec<u8>> {
876 let mut sidecar_blobs: Vec<u8> = Vec::new();
877 for (name, data) in operator_states {
878 let (op_ckpt, maybe_blob) =
879 laminar_storage::checkpoint_manifest::OperatorCheckpoint::from_bytes(
880 data,
881 threshold,
882 sidecar_blobs.len() as u64,
883 );
884 if let Some(blob) = maybe_blob {
885 sidecar_blobs.extend_from_slice(&blob);
886 }
887 manifest.operator_states.insert(name.clone(), op_ckpt);
888 }
889
890 if sidecar_blobs.is_empty() {
891 None
892 } else {
893 Some(sidecar_blobs)
894 }
895 }
896
897 async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
899 for sink in &self.sinks {
900 if sink.exactly_once {
901 sink.handle.rollback_epoch(epoch).await;
902 }
903 }
904 Ok(())
905 }
906
907 fn collect_sink_epochs(&self) -> HashMap<String, u64> {
909 let mut epochs = HashMap::with_capacity(self.sinks.len());
910 for sink in &self.sinks {
911 if sink.exactly_once {
913 epochs.insert(sink.name.clone(), self.epoch);
914 }
915 }
916 epochs
917 }
918
919 fn sorted_source_names(&self) -> Vec<String> {
921 let mut names: Vec<String> = self.sources.iter().map(|s| s.name.clone()).collect();
922 names.sort();
923 names
924 }
925
926 fn sorted_sink_names(&self) -> Vec<String> {
928 let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
929 names.sort();
930 names
931 }
932
    /// Current phase of the checkpoint state machine.
    #[must_use]
    pub fn phase(&self) -> CheckpointPhase {
        self.phase
    }

    /// Current epoch; advanced only after a fully successful checkpoint.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// The streaming sources registered with this coordinator.
    #[must_use]
    pub(crate) fn registered_sources(&self) -> &[RegisteredSource] {
        &self.sources
    }

    /// Id that will be assigned to the next checkpoint attempt.
    #[must_use]
    pub fn next_checkpoint_id(&self) -> u64 {
        self.next_checkpoint_id
    }

    /// The coordinator's configuration.
    #[must_use]
    pub fn config(&self) -> &CheckpointConfig {
        &self.config
    }
962
963 fn adjust_interval(&mut self) {
971 let adaptive = match &self.config.adaptive {
972 Some(a) => a.clone(),
973 None => return,
974 };
975
976 #[allow(clippy::cast_precision_loss)] let last_ms = match self.last_checkpoint_duration {
978 Some(d) => d.as_millis() as f64,
979 None => return,
980 };
981
982 if self.smoothed_duration_ms == 0.0 {
984 self.smoothed_duration_ms = last_ms;
985 } else {
986 self.smoothed_duration_ms = adaptive.smoothing_alpha * last_ms
987 + (1.0 - adaptive.smoothing_alpha) * self.smoothed_duration_ms;
988 }
989
990 let new_interval_secs =
992 self.smoothed_duration_ms / (1000.0 * adaptive.target_overhead_ratio);
993 let new_interval = Duration::from_secs_f64(new_interval_secs);
994
995 let clamped = new_interval.clamp(adaptive.min_interval, adaptive.max_interval);
997
998 let old_interval = self.config.interval;
999 self.config.interval = Some(clamped);
1000
1001 if old_interval != Some(clamped) {
1002 debug!(
1003 old_interval_ms = old_interval.map(|d| d.as_millis()),
1004 new_interval_ms = clamped.as_millis(),
1005 smoothed_duration_ms = self.smoothed_duration_ms,
1006 "adaptive checkpoint interval adjusted"
1007 );
1008 }
1009 }
1010
    /// Exponentially smoothed checkpoint duration in milliseconds (0.0
    /// until the first adaptive adjustment runs).
    #[must_use]
    pub fn smoothed_duration_ms(&self) -> f64 {
        self.smoothed_duration_ms
    }

    /// Number of checkpoints that used the unaligned path.
    #[must_use]
    pub fn unaligned_checkpoint_count(&self) -> u64 {
        self.unaligned_checkpoint_count
    }
1025
    /// Snapshot of checkpoint statistics: success/failure counts, duration
    /// percentiles from the recent-sample histogram, bytes written, and the
    /// coordinator's current phase and epoch.
    #[must_use]
    pub fn stats(&self) -> CheckpointStats {
        let (p50, p95, p99) = self.duration_histogram.percentiles();
        CheckpointStats {
            completed: self.checkpoints_completed,
            failed: self.checkpoints_failed,
            last_duration: self.last_checkpoint_duration,
            duration_p50_ms: p50,
            duration_p95_ms: p95,
            duration_p99_ms: p99,
            total_bytes_written: self.total_bytes_written,
            current_phase: self.phase,
            current_epoch: self.epoch,
            unaligned_checkpoint_count: self.unaligned_checkpoint_count,
        }
    }
1043
    /// The underlying checkpoint store.
    #[must_use]
    pub fn store(&self) -> &dyn CheckpointStore {
        &*self.store
    }
1049
    /// Runs a full checkpoint, additionally recording `extra_table_offsets`
    /// in the manifest (merged over the snapshotted table offsets). Thin
    /// wrapper over [`Self::checkpoint_inner`].
    #[allow(clippy::too_many_arguments)]
    pub async fn checkpoint_with_extra_tables(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            extra_table_offsets,
            source_watermarks,
            pipeline_hash,
            HashMap::new(),
        )
        .await
    }
1081
    /// Runs a full checkpoint with both extra table offsets and explicit
    /// per-source offset overrides. Overridden sources are NOT snapshotted;
    /// the provided offsets are recorded instead. Thin wrapper over
    /// [`Self::checkpoint_inner`].
    #[allow(clippy::too_many_arguments)]
    pub async fn checkpoint_with_offsets(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(
            operator_states,
            watermark,
            table_store_checkpoint_path,
            extra_table_offsets,
            source_watermarks,
            pipeline_hash,
            source_offset_overrides,
        )
        .await
    }
1114
    /// Core checkpoint protocol: WAL prepare → source snapshot → sink
    /// pre-commit → manifest persist → sink commit → epoch advance.
    ///
    /// Phase failures are reported as `Ok(CheckpointResult { success:
    /// false, .. })` so callers treat them as retryable; an `Err` is only
    /// produced by WAL preparation. The epoch is advanced ONLY when every
    /// phase succeeded, so a failed attempt reuses the same epoch.
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    async fn checkpoint_inner(
        &mut self,
        operator_states: HashMap<String, Vec<u8>>,
        watermark: Option<i64>,
        table_store_checkpoint_path: Option<String>,
        extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
        source_watermarks: HashMap<String, i64>,
        pipeline_hash: Option<u64>,
        source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
    ) -> Result<CheckpointResult, DbError> {
        let start = Instant::now();
        let checkpoint_id = self.next_checkpoint_id;
        let epoch = self.epoch;

        info!(checkpoint_id, epoch, "starting checkpoint");

        // Drain changelogs and seal the WAL epoch before snapshotting, so
        // the recorded positions cover all pre-checkpoint writes.
        let wal_result = self.prepare_wal_for_checkpoint()?;
        let per_core_wal_positions = wal_result.per_core_wal_positions;

        self.phase = CheckpointPhase::Snapshotting;

        // Sources with an explicit offset override are not snapshotted;
        // their supplied offsets are merged in below.
        let sources_to_snapshot: Vec<&RegisteredSource> = self
            .sources
            .iter()
            .filter(|s| !source_offset_overrides.contains_key(&s.name))
            .collect();
        // Snapshot streaming sources and table sources concurrently.
        let (mut source_offsets, mut table_offsets) = tokio::try_join!(
            self.snapshot_source_refs(&sources_to_snapshot),
            self.snapshot_sources(&self.table_sources),
        )?;

        for (name, cp) in source_offset_overrides {
            source_offsets.insert(name, cp);
        }

        for (name, cp) in extra_table_offsets {
            table_offsets.insert(name, cp);
        }

        // Phase one of two-phase commit: all sinks must prepare before we
        // persist anything.
        self.phase = CheckpointPhase::PreCommitting;
        if let Err(e) = self.pre_commit_sinks(epoch).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            self.maybe_cap_drainers();
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("pre-commit failed: {e}")),
            });
        }

        // Assemble the manifest describing this consistent cut.
        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
        manifest.source_offsets = source_offsets;
        manifest.table_offsets = table_offsets;
        manifest.sink_epochs = self.collect_sink_epochs();
        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
        manifest.watermark = watermark;
        manifest.source_watermarks = source_watermarks;
        manifest.per_core_wal_positions = per_core_wal_positions;
        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
        manifest.is_incremental = self.config.incremental;
        manifest.source_names = self.sorted_source_names();
        manifest.sink_names = self.sorted_sink_names();
        manifest.pipeline_hash = pipeline_hash;

        // Inline small operator states; spill large ones to a sidecar blob.
        let state_data = Self::pack_operator_states(
            &mut manifest,
            &operator_states,
            self.config.state_inline_threshold,
        );
        let sidecar_bytes = state_data.as_ref().map_or(0, Vec::len);
        if sidecar_bytes > 0 {
            debug!(
                checkpoint_id,
                sidecar_bytes, "writing operator state sidecar"
            );
        }

        // Enforce the optional size cap, warning when within 80% of it.
        if let Some(cap) = self.config.max_checkpoint_bytes {
            if sidecar_bytes > cap {
                self.phase = CheckpointPhase::Idle;
                self.checkpoints_failed += 1;
                self.maybe_cap_drainers();
                let duration = start.elapsed();
                self.emit_checkpoint_metrics(false, epoch, duration);
                let msg = format!(
                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
                     cap {cap} bytes — checkpoint rejected"
                );
                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
                return Ok(CheckpointResult {
                    success: false,
                    checkpoint_id,
                    epoch,
                    duration,
                    error: Some(msg),
                });
            }
            let warn_threshold = cap * 4 / 5;
            if sidecar_bytes > warn_threshold {
                warn!(
                    checkpoint_id,
                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
                );
            }
        }
        let checkpoint_bytes = sidecar_bytes as u64;

        // Persist the manifest. On failure, roll the sinks back so the
        // prepared epoch is not left dangling.
        self.phase = CheckpointPhase::Persisting;
        if let Err(e) = self.save_manifest(manifest.clone(), state_data).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            self.maybe_cap_drainers();
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                error!(
                    checkpoint_id,
                    epoch,
                    error = %rollback_err,
                    "[LDB-6004] sink rollback failed after manifest persist failure — \
                     sinks may be in an inconsistent state"
                );
            }
            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("manifest persist failed: {e}")),
            });
        }

        // Phase two: commit the epoch on every sink, tracking per-sink
        // status (commit runs after persist so recovery can see the cut).
        self.phase = CheckpointPhase::Committing;
        let sink_statuses = self.commit_sinks_tracked(epoch).await;
        let has_failures = sink_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));

        // Record final commit statuses back into the persisted manifest;
        // a failure here is logged but does not fail the checkpoint.
        if !sink_statuses.is_empty() {
            manifest.sink_commit_statuses = sink_statuses;
            if let Err(e) = self.update_manifest_only(&manifest).await {
                warn!(
                    checkpoint_id,
                    epoch,
                    error = %e,
                    "post-commit manifest update failed"
                );
            }
        }

        if has_failures {
            self.checkpoints_failed += 1;
            error!(
                checkpoint_id,
                epoch, "sink commit partially failed — epoch NOT advanced, will retry"
            );
            self.phase = CheckpointPhase::Idle;
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some("partial sink commit failure".into()),
            });
        }

        // Success: drained changelog entries are now durably captured.
        for drainer in &mut self.changelog_drainers {
            drainer.clear_pending();
        }

        self.phase = CheckpointPhase::Idle;
        self.next_checkpoint_id += 1;
        self.epoch += 1;
        self.checkpoints_completed += 1;
        self.total_bytes_written += checkpoint_bytes;
        let duration = start.elapsed();
        self.last_checkpoint_duration = Some(duration);
        self.duration_histogram.record(duration);
        self.emit_checkpoint_metrics(true, epoch, duration);
        self.adjust_interval();

        // Open the next epoch on the sinks. A failure here does not undo
        // the completed checkpoint; it is surfaced via `error` so callers
        // know subsequent writes are not transactional.
        let next_epoch = self.epoch;
        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
            Ok(()) => None,
            Err(e) => {
                error!(
                    next_epoch,
                    error = %e,
                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
                );
                Some(e.to_string())
            }
        };

        info!(
            checkpoint_id,
            epoch,
            duration_ms = duration.as_millis(),
            "checkpoint completed"
        );

        Ok(CheckpointResult {
            success: true,
            checkpoint_id,
            epoch,
            duration,
            error: begin_epoch_error,
        })
    }
1376
    /// Restores state from the latest usable checkpoint via
    /// [`RecoveryManager`], passing it the registered sources, sinks, and
    /// table sources.
    ///
    /// Returns `Ok(None)` when there is nothing to recover. On success the
    /// coordinator's epoch and next checkpoint id are advanced past the
    /// recovered manifest so the next checkpoint does not collide with it.
    pub async fn recover(
        &mut self,
    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
        use crate::recovery_manager::RecoveryManager;

        let mgr = RecoveryManager::new(&*self.store);
        let result = mgr
            .recover(&self.sources, &self.sinks, &self.table_sources)
            .await?;

        if let Some(ref recovered) = result {
            // Resume numbering strictly after the recovered checkpoint.
            self.epoch = recovered.epoch() + 1;
            self.next_checkpoint_id = recovered.manifest.checkpoint_id + 1;
            info!(
                epoch = self.epoch,
                checkpoint_id = self.next_checkpoint_id,
                "coordinator epoch set after recovery"
            );
        }

        Ok(result)
    }
1412
1413 pub fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
1419 self.store
1420 .load_latest()
1421 .map_err(|e| DbError::Checkpoint(format!("failed to load latest manifest: {e}")))
1422 }
1423}
1424
/// Manual `Debug`: summarizes collection sizes and counters instead of
/// dumping contents (the registered connectors and store are trait objects).
impl std::fmt::Debug for CheckpointCoordinator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CheckpointCoordinator")
            .field("epoch", &self.epoch)
            .field("next_checkpoint_id", &self.next_checkpoint_id)
            .field("phase", &self.phase)
            .field("sources", &self.sources.len())
            .field("sinks", &self.sinks.len())
            .field("has_wal_manager", &self.wal_manager.is_some())
            .field("changelog_drainers", &self.changelog_drainers.len())
            .field("completed", &self.checkpoints_completed)
            .field("failed", &self.checkpoints_failed)
            .finish_non_exhaustive()
    }
}
1440
/// Fixed-capacity ring buffer of checkpoint durations (in milliseconds)
/// used to report p50/p95/p99.
#[derive(Clone)]
pub struct DurationHistogram {
    /// Ring storage; boxed to keep the containing struct small.
    samples: Box<[u64; Self::CAPACITY]>,
    /// Next write position in the ring.
    cursor: usize,
    /// Total samples ever recorded (may exceed `CAPACITY`).
    count: u64,
}
1454
1455impl DurationHistogram {
1456 const CAPACITY: usize = 100;
1457
1458 #[must_use]
1460 fn new() -> Self {
1461 Self {
1462 samples: Box::new([0; Self::CAPACITY]),
1463 cursor: 0,
1464 count: 0,
1465 }
1466 }
1467
1468 fn record(&mut self, duration: Duration) {
1470 #[allow(clippy::cast_possible_truncation)]
1471 let ms = duration.as_millis() as u64;
1472 self.samples[self.cursor] = ms;
1473 self.cursor = (self.cursor + 1) % Self::CAPACITY;
1474 self.count += 1;
1475 }
1476
1477 #[must_use]
1479 fn len(&self) -> usize {
1480 if self.count >= Self::CAPACITY as u64 {
1481 Self::CAPACITY
1482 } else {
1483 #[allow(clippy::cast_possible_truncation)]
1485 {
1486 self.count as usize
1487 }
1488 }
1489 }
1490
1491 #[must_use]
1495 fn percentile(&self, p: f64) -> u64 {
1496 let n = self.len();
1497 if n == 0 {
1498 return 0;
1499 }
1500 let mut sorted: Vec<u64> = self.samples[..n].to_vec();
1501 sorted.sort_unstable();
1502 #[allow(
1503 clippy::cast_possible_truncation,
1504 clippy::cast_sign_loss,
1505 clippy::cast_precision_loss
1506 )]
1507 let idx = ((p * (n as f64 - 1.0)).ceil() as usize).min(n - 1);
1508 sorted[idx]
1509 }
1510
1511 #[must_use]
1513 fn percentiles(&self) -> (u64, u64, u64) {
1514 (
1515 self.percentile(0.50),
1516 self.percentile(0.95),
1517 self.percentile(0.99),
1518 )
1519 }
1520}
1521
1522impl std::fmt::Debug for DurationHistogram {
1523 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1524 let (p50, p95, p99) = self.percentiles();
1525 f.debug_struct("DurationHistogram")
1526 .field("samples_len", &self.samples.len())
1527 .field("cursor", &self.cursor)
1528 .field("count", &self.count)
1529 .field("p50_ms", &p50)
1530 .field("p95_ms", &p95)
1531 .field("p99_ms", &p99)
1532 .finish()
1533 }
1534}
1535
/// Point-in-time snapshot of checkpoint activity, serializable for
/// metrics/observability endpoints.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Checkpoints that completed successfully.
    pub completed: u64,
    /// Checkpoints that failed.
    pub failed: u64,
    /// Wall-clock duration of the most recent checkpoint, if any has run.
    pub last_duration: Option<Duration>,
    /// 50th-percentile checkpoint duration in milliseconds (0 when no
    /// checkpoints have run).
    pub duration_p50_ms: u64,
    /// 95th-percentile checkpoint duration in milliseconds (0 when empty).
    pub duration_p95_ms: u64,
    /// 99th-percentile checkpoint duration in milliseconds (0 when empty).
    pub duration_p99_ms: u64,
    /// Total bytes written by checkpoint persistence.
    pub total_bytes_written: u64,
    /// Phase the coordinator is currently in (`Idle` when not checkpointing).
    pub current_phase: CheckpointPhase,
    /// Current epoch number.
    pub current_epoch: u64,
    /// Number of unaligned checkpoints taken.
    pub unaligned_checkpoint_count: u64,
}
1560
1561#[must_use]
1565pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
1566 ConnectorCheckpoint {
1567 offsets: cp.offsets().clone(),
1568 epoch: cp.epoch(),
1569 metadata: cp.metadata().clone(),
1570 }
1571}
1572
1573#[must_use]
1575pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
1576 let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
1577 for (k, v) in &cp.metadata {
1578 source_cp.set_metadata(k.clone(), v.clone());
1579 }
1580 source_cp
1581}
1582
1583#[must_use]
1589pub fn dag_snapshot_to_manifest_operators<S: std::hash::BuildHasher>(
1590 node_states: &std::collections::HashMap<
1591 u32,
1592 laminar_core::dag::recovery::SerializableOperatorState,
1593 S,
1594 >,
1595) -> HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
1596 node_states
1597 .iter()
1598 .map(|(id, state)| {
1599 (
1600 id.to_string(),
1601 laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&state.data),
1602 )
1603 })
1604 .collect()
1605}
1606
1607#[must_use]
1611pub fn manifest_operators_to_dag_states<S: std::hash::BuildHasher>(
1612 operators: &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint, S>,
1613) -> rustc_hash::FxHashMap<laminar_core::dag::topology::NodeId, laminar_core::operator::OperatorState>
1614{
1615 let mut states =
1616 rustc_hash::FxHashMap::with_capacity_and_hasher(operators.len(), rustc_hash::FxBuildHasher);
1617 for (key, op_ckpt) in operators {
1618 if let Ok(node_id) = key.parse::<u32>() {
1619 if let Some(data) = op_ckpt.decode_inline() {
1620 states.insert(
1621 laminar_core::dag::topology::NodeId(node_id),
1622 laminar_core::operator::OperatorState {
1623 operator_id: key.clone(),
1624 data,
1625 },
1626 );
1627 }
1628 }
1629 }
1630 states
1631}
1632
1633#[cfg(test)]
1634mod tests {
1635 use super::*;
1636 use laminar_storage::checkpoint_store::FileSystemCheckpointStore;
1637
1638 fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
1639 let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
1640 CheckpointCoordinator::new(CheckpointConfig::default(), store)
1641 }
1642
    // A fresh coordinator starts at epoch 1, checkpoint id 1, and Idle.
    #[test]
    fn test_coordinator_new() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path());

        assert_eq!(coord.epoch(), 1);
        assert_eq!(coord.next_checkpoint_id(), 1);
        assert_eq!(coord.phase(), CheckpointPhase::Idle);
    }

    // Construction picks up the latest stored manifest: epoch and checkpoint
    // id resume one past the persisted values.
    #[test]
    fn test_coordinator_resumes_from_stored_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        // Persist a manifest (checkpoint_id = 5, epoch = 10) before building.
        let store = FileSystemCheckpointStore::new(dir.path(), 3);
        let m = CheckpointManifest::new(5, 10);
        store.save(&m).unwrap();

        let coord = make_coordinator(dir.path());
        assert_eq!(coord.epoch(), 11);
        assert_eq!(coord.next_checkpoint_id(), 6);
    }

    // Display renders each checkpoint phase by its variant name.
    #[test]
    fn test_checkpoint_phase_display() {
        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
        assert_eq!(
            CheckpointPhase::BarrierInFlight.to_string(),
            "BarrierInFlight"
        );
        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
    }

    // Offsets, epoch, and metadata survive SourceCheckpoint -> ConnectorCheckpoint.
    #[test]
    fn test_source_to_connector_checkpoint() {
        let mut cp = SourceCheckpoint::new(5);
        cp.set_offset("partition-0", "1234");
        cp.set_metadata("topic", "events");

        let cc = source_to_connector_checkpoint(&cp);
        assert_eq!(cc.epoch, 5);
        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
    }

    // ...and the reverse direction restores the same fields.
    #[test]
    fn test_connector_to_source_checkpoint() {
        let cc = ConnectorCheckpoint {
            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
            epoch: 3,
            metadata: HashMap::from([("type".into(), "postgres".into())]),
        };

        let cp = connector_to_source_checkpoint(&cc);
        assert_eq!(cp.epoch(), 3);
        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
        assert_eq!(cp.get_metadata("type"), Some("postgres"));
    }

    // Stats on a fresh coordinator are all zero / None / Idle.
    #[test]
    fn test_stats_initial() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path());
        let stats = coord.stats();

        assert_eq!(stats.completed, 0);
        assert_eq!(stats.failed, 0);
        assert!(stats.last_duration.is_none());
        assert_eq!(stats.duration_p50_ms, 0);
        assert_eq!(stats.duration_p95_ms, 0);
        assert_eq!(stats.duration_p99_ms, 0);
        assert_eq!(stats.current_phase, CheckpointPhase::Idle);
    }

    // A checkpoint with no registered sources/sinks still succeeds, persists
    // a manifest with the watermark, and advances checkpoint id and epoch.
    #[tokio::test]
    async fn test_checkpoint_no_sources_no_sinks() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let result = coord
            .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(result.checkpoint_id, 1);
        assert_eq!(result.epoch, 1);

        let loaded = coord.store().load_latest().unwrap().unwrap();
        assert_eq!(loaded.checkpoint_id, 1);
        assert_eq!(loaded.epoch, 1);
        assert_eq!(loaded.watermark, Some(1000));

        // A second checkpoint increments both id and epoch.
        let result2 = coord
            .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result2.success);
        assert_eq!(result2.checkpoint_id, 2);
        assert_eq!(result2.epoch, 2);

        let stats = coord.stats();
        assert_eq!(stats.completed, 2);
        assert_eq!(stats.failed, 0);
    }

    // Operator states passed to checkpoint() end up inline in the manifest.
    #[tokio::test]
    async fn test_checkpoint_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let mut ops = HashMap::new();
        ops.insert("window-agg".into(), b"state-data".to_vec());
        ops.insert("filter".into(), b"filter-state".to_vec());

        let result = coord
            .checkpoint(ops, None, None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result.success);

        let loaded = coord.store().load_latest().unwrap().unwrap();
        assert_eq!(loaded.operator_states.len(), 2);
        // No WAL manager registered, so no per-core positions are recorded.
        assert!(loaded.per_core_wal_positions.is_empty());

        let window_op = loaded.operator_states.get("window-agg").unwrap();
        assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
    }

    // The table-store checkpoint path is persisted verbatim in the manifest.
    #[tokio::test]
    async fn test_checkpoint_with_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let result = coord
            .checkpoint(
                HashMap::new(),
                None,
                Some("/tmp/rocksdb_cp".into()),
                HashMap::new(),
                None,
            )
            .await
            .unwrap();

        assert!(result.success);

        let loaded = coord.store().load_latest().unwrap().unwrap();
        assert_eq!(
            loaded.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    // No manifest on disk yields Ok(None), not an error.
    #[test]
    fn test_load_latest_manifest_empty() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path());
        assert!(coord.load_latest_manifest().unwrap().is_none());
    }

    // Debug output includes the struct name and key scalar fields.
    #[test]
    fn test_coordinator_debug() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path());
        let debug = format!("{coord:?}");
        assert!(debug.contains("CheckpointCoordinator"));
        assert!(debug.contains("epoch: 1"));
    }
1822
    // Node ids become string keys; state bytes are stored inline.
    #[test]
    fn test_dag_snapshot_to_manifest_operators() {
        use laminar_core::dag::recovery::SerializableOperatorState;

        let mut node_states = HashMap::new();
        node_states.insert(
            0,
            SerializableOperatorState {
                operator_id: "window-agg".into(),
                data: b"window-state".to_vec(),
            },
        );
        node_states.insert(
            3,
            SerializableOperatorState {
                operator_id: "filter".into(),
                data: b"filter-state".to_vec(),
            },
        );

        let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);
        assert_eq!(manifest_ops.len(), 2);

        let w = manifest_ops.get("0").unwrap();
        assert_eq!(w.decode_inline().unwrap(), b"window-state");
        let f = manifest_ops.get("3").unwrap();
        assert_eq!(f.decode_inline().unwrap(), b"filter-state");
    }

    // String keys parse back into NodeId keys with the same payloads.
    #[test]
    fn test_manifest_operators_to_dag_states() {
        use laminar_storage::checkpoint_manifest::OperatorCheckpoint;

        let mut operators = HashMap::new();
        operators.insert("0".into(), OperatorCheckpoint::inline(b"state-0"));
        operators.insert("5".into(), OperatorCheckpoint::inline(b"state-5"));

        let dag_states = manifest_operators_to_dag_states(&operators);
        assert_eq!(dag_states.len(), 2);

        let s0 = dag_states
            .get(&laminar_core::dag::topology::NodeId(0))
            .unwrap();
        assert_eq!(s0.data, b"state-0");

        let s5 = dag_states
            .get(&laminar_core::dag::topology::NodeId(5))
            .unwrap();
        assert_eq!(s5.data, b"state-5");
    }

    // DAG state -> manifest -> JSON -> manifest -> DAG state is lossless.
    #[test]
    fn test_operator_state_round_trip_through_manifest() {
        use laminar_core::dag::recovery::SerializableOperatorState;

        let mut node_states = HashMap::new();
        node_states.insert(
            7,
            SerializableOperatorState {
                operator_id: "join".into(),
                data: vec![1, 2, 3, 4, 5],
            },
        );

        let manifest_ops = dag_snapshot_to_manifest_operators(&node_states);

        // Serialize through JSON, as the manifest store would.
        let json = serde_json::to_string(&manifest_ops).unwrap();
        let reloaded: HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> =
            serde_json::from_str(&json).unwrap();

        let recovered = manifest_operators_to_dag_states(&reloaded);
        let state = recovered
            .get(&laminar_core::dag::topology::NodeId(7))
            .unwrap();
        assert_eq!(state.data, vec![1, 2, 3, 4, 5]);
    }

    // Keys that do not parse as u32 node ids are silently dropped.
    #[test]
    fn test_manifest_operators_skips_invalid_keys() {
        use laminar_storage::checkpoint_manifest::OperatorCheckpoint;

        let mut operators = HashMap::new();
        operators.insert("not-a-number".into(), OperatorCheckpoint::inline(b"data"));
        operators.insert("42".into(), OperatorCheckpoint::inline(b"good"));

        let dag_states = manifest_operators_to_dag_states(&operators);
        assert_eq!(dag_states.len(), 1);
        assert!(dag_states.contains_key(&laminar_core::dag::topology::NodeId(42)));
    }
1919
    // prepare_wal_for_checkpoint without a WAL manager is a no-op.
    #[test]
    fn test_prepare_wal_no_wal_manager() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let result = coord.prepare_wal_for_checkpoint().unwrap();
        assert!(result.per_core_wal_positions.is_empty());
        assert_eq!(result.entries_drained, 0);
    }

    // With a 2-core WAL registered, preparation reports one position per core.
    #[test]
    fn test_prepare_wal_with_manager() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
        coord.register_wal_manager(wal);

        coord
            .wal_manager_mut()
            .unwrap()
            .writer(0)
            .append_put(b"key", b"value")
            .unwrap();

        let result = coord.prepare_wal_for_checkpoint().unwrap();
        assert_eq!(result.per_core_wal_positions.len(), 2);
        // Core 0 received a write, so at least one position must be non-zero.
        assert!(result.per_core_wal_positions.iter().any(|p| *p > 0));
    }

    // Truncation without a WAL manager must not error.
    #[test]
    fn test_truncate_wal_no_manager() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
    }

    // Truncation with an empty position list clears all WAL data.
    #[test]
    fn test_truncate_wal_with_manager() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
        coord.register_wal_manager(wal);

        coord
            .wal_manager_mut()
            .unwrap()
            .writer(0)
            .append_put(b"key", b"value")
            .unwrap();

        assert!(coord.wal_manager().unwrap().total_size() > 0);

        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
    }

    // NOTE(review): with explicit checkpoint positions, truncation appears to
    // keep a safety margin — data written after the first truncation survives
    // the second call. Exact retention semantics live in the WAL manager.
    #[test]
    fn test_truncate_wal_safety_buffer() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
        coord.register_wal_manager(wal);

        coord
            .wal_manager_mut()
            .unwrap()
            .writer(0)
            .append_put(b"k1", b"v1")
            .unwrap();

        coord.truncate_wal_after_checkpoint(vec![100, 200]).unwrap();
        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);

        coord
            .wal_manager_mut()
            .unwrap()
            .writer(0)
            .append_put(b"k2", b"v2")
            .unwrap();
        let size_after_write = coord.wal_manager().unwrap().total_size();
        assert!(size_after_write > 0);

        coord.truncate_wal_after_checkpoint(vec![300, 400]).unwrap();
        assert!(coord.wal_manager().unwrap().total_size() > 0);
    }

    // Draining moves changelog entries into the pending set; they remain
    // pending until a checkpoint completes.
    #[test]
    fn test_prepare_wal_with_changelog_drainer() {
        use laminar_storage::incremental::StateChangelogBuffer;
        use laminar_storage::incremental::StateChangelogEntry;
        use std::sync::Arc;

        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));

        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        buf.push(StateChangelogEntry::put(1, 200, 10, 20));
        buf.push(StateChangelogEntry::delete(1, 300));

        let drainer = ChangelogDrainer::new(buf, 100);
        coord.register_changelog_drainer(drainer);

        let result = coord.prepare_wal_for_checkpoint().unwrap();
        assert_eq!(result.entries_drained, 3);
        // No WAL manager registered, so no per-core positions.
        assert!(result.per_core_wal_positions.is_empty());
        assert_eq!(coord.changelog_drainers()[0].pending_count(), 3);
    }

    // End-to-end: a checkpoint drains the changelog, records per-core WAL
    // positions in the manifest, and allows post-commit truncation.
    #[tokio::test]
    async fn test_full_checkpoint_with_wal_coordination() {
        use laminar_storage::incremental::StateChangelogBuffer;
        use laminar_storage::incremental::StateChangelogEntry;
        use std::sync::Arc;

        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
        coord.register_wal_manager(wal);

        let buf = Arc::new(StateChangelogBuffer::with_capacity(64));
        buf.push(StateChangelogEntry::put(1, 100, 0, 10));
        let drainer = ChangelogDrainer::new(buf, 100);
        coord.register_changelog_drainer(drainer);

        let result = coord
            .checkpoint(HashMap::new(), Some(5000), None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result.success);

        assert_eq!(
            coord.changelog_drainers()[0].pending_count(),
            0,
            "pending entries should be cleared after checkpoint"
        );

        let loaded = coord.store().load_latest().unwrap().unwrap();
        assert_eq!(loaded.per_core_wal_positions.len(), 2);

        coord.truncate_wal_after_checkpoint(vec![]).unwrap();
        assert_eq!(coord.wal_manager().unwrap().total_size(), 0);
    }

    // Accessors reflect WAL/drainer registration state.
    #[test]
    fn test_wal_manager_accessors() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        assert!(coord.wal_manager().is_none());
        assert!(coord.wal_manager_mut().is_none());
        assert!(coord.changelog_drainers().is_empty());

        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
        coord.register_wal_manager(wal);

        assert!(coord.wal_manager().is_some());
        assert!(coord.wal_manager_mut().is_some());
    }

    // Debug output reflects a registered WAL manager.
    #[test]
    fn test_coordinator_debug_with_wal() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let wal_config = laminar_storage::per_core_wal::PerCoreWalConfig::new(&wal_dir, 2);
        let wal = laminar_storage::per_core_wal::PerCoreWalManager::new(wal_config).unwrap();
        coord.register_wal_manager(wal);

        let debug = format!("{coord:?}");
        assert!(debug.contains("has_wal_manager: true"));
        assert!(debug.contains("changelog_drainers: 0"));
    }
2144
    // Successful checkpoints update the shared pipeline counters.
    #[tokio::test]
    async fn test_checkpoint_emits_metrics_on_success() {
        use crate::metrics::PipelineCounters;
        use std::sync::atomic::Ordering;

        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let counters = Arc::new(PipelineCounters::new());
        coord.set_counters(Arc::clone(&counters));

        let result = coord
            .checkpoint(HashMap::new(), Some(1000), None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 1);
        assert_eq!(counters.checkpoints_failed.load(Ordering::Relaxed), 0);
        assert!(counters.last_checkpoint_duration_ms.load(Ordering::Relaxed) < 5000);
        assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 1);

        // A second checkpoint advances the counters again.
        let result2 = coord
            .checkpoint(HashMap::new(), Some(2000), None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result2.success);
        assert_eq!(counters.checkpoints_completed.load(Ordering::Relaxed), 2);
        assert_eq!(counters.checkpoint_epoch.load(Ordering::Relaxed), 2);
    }

    // Checkpointing works when no counters have been registered.
    #[tokio::test]
    async fn test_checkpoint_without_counters() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        let result = coord
            .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
            .await
            .unwrap();

        assert!(result.success);
    }

    // An empty histogram reports zero everywhere.
    #[test]
    fn test_histogram_empty() {
        let h = DurationHistogram::new();
        assert_eq!(h.len(), 0);
        assert_eq!(h.percentile(0.50), 0);
        assert_eq!(h.percentile(0.99), 0);
        let (p50, p95, p99) = h.percentiles();
        assert_eq!((p50, p95, p99), (0, 0, 0));
    }

    // With a single sample every percentile is that sample.
    #[test]
    fn test_histogram_single_sample() {
        let mut h = DurationHistogram::new();
        h.record(Duration::from_millis(42));
        assert_eq!(h.len(), 1);
        assert_eq!(h.percentile(0.50), 42);
        assert_eq!(h.percentile(0.99), 42);
    }

    // Percentiles over 1..=100 ms land near the expected ranks.
    #[test]
    fn test_histogram_percentiles() {
        let mut h = DurationHistogram::new();
        for i in 1..=100 {
            h.record(Duration::from_millis(i));
        }
        assert_eq!(h.len(), 100);

        let p50 = h.percentile(0.50);
        let p95 = h.percentile(0.95);
        let p99 = h.percentile(0.99);

        assert!((49..=51).contains(&p50), "p50={p50}");
        assert!((94..=96).contains(&p95), "p95={p95}");
        assert!((98..=100).contains(&p99), "p99={p99}");
    }

    // After 150 records only the most recent 100 samples (51..=150) remain;
    // count still reflects the lifetime total.
    #[test]
    fn test_histogram_wraps_ring_buffer() {
        let mut h = DurationHistogram::new();
        for i in 1..=150 {
            h.record(Duration::from_millis(i));
        }
        assert_eq!(h.len(), 100);
        assert_eq!(h.count, 150);

        let p50 = h.percentile(0.50);
        assert!((99..=101).contains(&p50), "p50={p50}");
    }
2250
    // States above the inline threshold are written to a sidecar file and
    // referenced as external; smaller states stay inline in the manifest.
    #[tokio::test]
    async fn test_sidecar_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig {
            state_inline_threshold: 100,
            ..CheckpointConfig::default()
        };
        let mut coord = CheckpointCoordinator::new(config, store);

        let mut ops = HashMap::new();
        ops.insert("small".into(), vec![0xAAu8; 50]);
        ops.insert("large".into(), vec![0xBBu8; 200]);

        let result = coord
            .checkpoint(ops, None, None, HashMap::new(), None)
            .await
            .unwrap();
        assert!(result.success);

        let loaded = coord.store().load_latest().unwrap().unwrap();
        let small_op = loaded.operator_states.get("small").unwrap();
        assert!(!small_op.external, "small state should be inline");
        assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);

        let large_op = loaded.operator_states.get("large").unwrap();
        assert!(large_op.external, "large state should be external");
        assert_eq!(large_op.external_length, 200);

        // The sidecar for checkpoint 1 holds exactly the large payload.
        let state_data = coord.store().load_state_data(1).unwrap().unwrap();
        assert_eq!(state_data.len(), 200);
        assert!(state_data.iter().all(|&b| b == 0xBB));
    }

    // When everything fits inline, no sidecar file is produced.
    #[tokio::test]
    async fn test_all_inline_no_sidecar() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig::default();
        let mut coord = CheckpointCoordinator::new(config, store);

        let mut ops = HashMap::new();
        ops.insert("op1".into(), b"small-state".to_vec());

        let result = coord
            .checkpoint(ops, None, None, HashMap::new(), None)
            .await
            .unwrap();
        assert!(result.success);

        assert!(coord.store().load_state_data(1).unwrap().is_none());
    }

    // Adaptive intervals are opt-in; the default config keeps a fixed 60s.
    #[test]
    fn test_adaptive_disabled_by_default() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path());
        assert!(coord.config().adaptive.is_none());
        assert_eq!(coord.config().interval, Some(Duration::from_secs(60)));
    }

    // A 5s checkpoint at a 10% target overhead ratio implies a ~50s interval.
    #[test]
    fn test_adaptive_increases_interval_for_slow_checkpoints() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig {
            adaptive: Some(AdaptiveCheckpointConfig::default()),
            ..CheckpointConfig::default()
        };
        let mut coord = CheckpointCoordinator::new(config, store);

        coord.last_checkpoint_duration = Some(Duration::from_secs(5));
        coord.adjust_interval();

        let interval = coord.config().interval.unwrap();
        assert!(
            interval >= Duration::from_secs(49) && interval <= Duration::from_secs(51),
            "expected ~50s, got {interval:?}",
        );
    }

    // Fast checkpoints shrink the interval down to the configured minimum.
    #[test]
    fn test_adaptive_decreases_interval_for_fast_checkpoints() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig {
            adaptive: Some(AdaptiveCheckpointConfig::default()),
            ..CheckpointConfig::default()
        };
        let mut coord = CheckpointCoordinator::new(config, store);

        coord.last_checkpoint_duration = Some(Duration::from_millis(100));
        coord.adjust_interval();

        let interval = coord.config().interval.unwrap();
        assert_eq!(
            interval,
            Duration::from_secs(10),
            "should clamp to min_interval"
        );
    }

    // With smoothing effectively disabled (alpha = 1.0), the computed
    // interval clamps to the configured min/max bounds.
    #[test]
    fn test_adaptive_clamps_to_min_max() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig {
            adaptive: Some(AdaptiveCheckpointConfig {
                min_interval: Duration::from_secs(20),
                max_interval: Duration::from_secs(120),
                target_overhead_ratio: 0.1,
                smoothing_alpha: 1.0,
            }),
            ..CheckpointConfig::default()
        };
        let mut coord = CheckpointCoordinator::new(config, store);

        coord.last_checkpoint_duration = Some(Duration::from_secs(60));
        coord.adjust_interval();
        let interval = coord.config().interval.unwrap();
        assert_eq!(interval, Duration::from_secs(120), "should clamp to max");

        coord.last_checkpoint_duration = Some(Duration::from_millis(10));
        // Reset the EMA so the tiny duration takes effect immediately.
        coord.smoothed_duration_ms = 0.0;
        coord.adjust_interval();
        let interval = coord.config().interval.unwrap();
        assert_eq!(interval, Duration::from_secs(20), "should clamp to min");
    }

    // EMA with alpha = 0.5: smoothed duration goes 1000 -> 1500 -> 1750 ms
    // as 1s/2s/2s samples arrive.
    #[test]
    fn test_adaptive_ema_smoothing() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig {
            adaptive: Some(AdaptiveCheckpointConfig {
                min_interval: Duration::from_secs(1),
                max_interval: Duration::from_secs(600),
                target_overhead_ratio: 0.1,
                smoothing_alpha: 0.5,
            }),
            ..CheckpointConfig::default()
        };
        let mut coord = CheckpointCoordinator::new(config, store);

        coord.last_checkpoint_duration = Some(Duration::from_millis(1000));
        coord.adjust_interval();
        assert!((coord.smoothed_duration_ms() - 1000.0).abs() < 1.0);

        coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
        coord.adjust_interval();
        assert!((coord.smoothed_duration_ms() - 1500.0).abs() < 1.0);

        coord.last_checkpoint_duration = Some(Duration::from_millis(2000));
        coord.adjust_interval();
        assert!((coord.smoothed_duration_ms() - 1750.0).abs() < 1.0);
    }

    // Stats carry duration data once checkpoints have run.
    #[tokio::test]
    async fn test_stats_include_percentiles_after_checkpoints() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path());

        for _ in 0..3 {
            let result = coord
                .checkpoint(HashMap::new(), None, None, HashMap::new(), None)
                .await
                .unwrap();
            assert!(result.success);
        }

        let stats = coord.stats();
        assert_eq!(stats.completed, 3);
        assert!(stats.last_duration.is_some());
    }
2444}