1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use rustc_hash::FxHashMap;
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17use super::error::IncrementalCheckpointError;
18
/// Settings for the incremental checkpoint manager.
///
/// Build one with [`CheckpointConfig::new`] and the `with_*` builder methods,
/// then check it with [`CheckpointConfig::validate`].
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Directory under which one `checkpoint_<id>` subdirectory per checkpoint
    /// is created.
    pub checkpoint_dir: PathBuf,
    /// Optional path to the write-ahead log. Defaults to `None`.
    /// NOTE(review): not read by any code in this file; confirm the consumer.
    pub wal_path: Option<PathBuf>,
    /// Time that must pass after the last checkpoint before
    /// `should_checkpoint` reports `true`. Must be non-zero. Defaults to 60 s.
    pub interval: Duration,
    /// Number of most recent checkpoints kept by cleanup. Must be greater than
    /// zero. Defaults to 3.
    pub max_retained: usize,
    /// Defaults to `true`.
    /// NOTE(review): presumably controls WAL truncation after a checkpoint;
    /// not read by any code in this file, so confirm against the caller.
    pub truncate_wal: bool,
    /// Defaults to `64 * 1024 * 1024`.
    /// NOTE(review): presumably a WAL size threshold in bytes; not read by any
    /// code in this file, so confirm against the caller.
    pub min_wal_size_for_checkpoint: u64,
    /// When `true`, every checkpoint created while an earlier checkpoint is
    /// known is flagged as incremental. Defaults to `true`.
    pub incremental: bool,
}
37
38impl CheckpointConfig {
39 #[must_use]
41 pub fn new(checkpoint_dir: &Path) -> Self {
42 Self {
43 checkpoint_dir: checkpoint_dir.to_path_buf(),
44 wal_path: None,
45 interval: Duration::from_secs(60),
46 max_retained: 3,
47 truncate_wal: true,
48 min_wal_size_for_checkpoint: 64 * 1024 * 1024, incremental: true,
50 }
51 }
52
53 #[must_use]
55 pub fn with_wal_path(mut self, path: &Path) -> Self {
56 self.wal_path = Some(path.to_path_buf());
57 self
58 }
59
60 #[must_use]
62 pub fn with_interval(mut self, interval: Duration) -> Self {
63 self.interval = interval;
64 self
65 }
66
67 #[must_use]
69 pub fn with_max_retained(mut self, max: usize) -> Self {
70 self.max_retained = max;
71 self
72 }
73
74 #[must_use]
76 pub fn with_truncate_wal(mut self, enabled: bool) -> Self {
77 self.truncate_wal = enabled;
78 self
79 }
80
81 #[must_use]
83 pub fn with_min_wal_size(mut self, size: u64) -> Self {
84 self.min_wal_size_for_checkpoint = size;
85 self
86 }
87
88 #[must_use]
90 pub fn with_incremental(mut self, enabled: bool) -> Self {
91 self.incremental = enabled;
92 self
93 }
94
95 pub fn validate(&self) -> Result<(), IncrementalCheckpointError> {
101 if self.max_retained == 0 {
102 return Err(IncrementalCheckpointError::InvalidConfig(
103 "max_retained must be > 0".to_string(),
104 ));
105 }
106 if self.interval.is_zero() {
107 return Err(IncrementalCheckpointError::InvalidConfig(
108 "interval must be > 0".to_string(),
109 ));
110 }
111 Ok(())
112 }
113}
114
/// Description of a single checkpoint, stored as JSON in the checkpoint's
/// `metadata.json` file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalCheckpointMetadata {
    /// Checkpoint identifier; rendered as 16 lowercase hex digits in the
    /// checkpoint directory name.
    pub id: u64,
    /// Epoch the manager was at when the checkpoint was created.
    pub epoch: u64,
    /// Creation time in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// WAL position supplied by the caller when the checkpoint was created.
    pub wal_position: u64,
    /// Offsets keyed by source name.
    pub source_offsets: HashMap<String, u64>,
    /// Optional watermark supplied by the caller.
    /// NOTE(review): units are not established anywhere in this file.
    pub watermark: Option<i64>,
    /// Length in bytes of the state payload written with the checkpoint;
    /// stays 0 when no state payload is written.
    pub size_bytes: u64,
    /// Initialised to 0.
    /// NOTE(review): never updated by code in this file.
    pub key_count: u64,
    /// Whether this checkpoint is flagged as incremental.
    pub is_incremental: bool,
    /// Id of the latest known checkpoint at creation time, if any.
    pub parent_id: Option<u64>,
    /// Initialised empty.
    /// NOTE(review): presumably SST file names; never populated in this file.
    pub sst_files: Vec<String>,
}
141
142impl IncrementalCheckpointMetadata {
143 #[must_use]
145 pub fn new(id: u64, epoch: u64) -> Self {
146 let timestamp = SystemTime::now()
147 .duration_since(UNIX_EPOCH)
148 .unwrap_or_default()
149 .as_secs();
150
151 Self {
152 id,
153 epoch,
154 timestamp,
155 wal_position: 0,
156 source_offsets: HashMap::new(),
157 watermark: None,
158 size_bytes: 0,
159 key_count: 0,
160 is_incremental: true,
161 parent_id: None,
162 sst_files: Vec::new(),
163 }
164 }
165
166 #[must_use]
168 pub fn checkpoint_path(&self, base_dir: &Path) -> PathBuf {
169 base_dir.join(format!("checkpoint_{:016x}", self.id))
170 }
171
172 pub fn to_json(&self) -> Result<String, IncrementalCheckpointError> {
178 serde_json::to_string_pretty(self)
179 .map_err(|e| IncrementalCheckpointError::Serialization(e.to_string()))
180 }
181
182 pub fn from_json(json: &str) -> Result<Self, IncrementalCheckpointError> {
188 serde_json::from_str(json)
189 .map_err(|e| IncrementalCheckpointError::Deserialization(e.to_string()))
190 }
191}
192
/// Creates, lists, loads and prunes checkpoints stored as subdirectories of
/// the configured checkpoint directory.
pub struct IncrementalCheckpointManager {
    /// Validated configuration the manager was built with.
    config: CheckpointConfig,
    /// Id handed to the next checkpoint; starts one past the highest id found
    /// on disk at construction.
    next_id: AtomicU64,
    /// Epoch recorded into newly created checkpoints.
    current_epoch: AtomicU64,
    /// Wall-clock time, in seconds since the Unix epoch, of the last
    /// checkpoint created by this manager; 0 until one is created.
    last_checkpoint_time: AtomicU64,
    /// Id of the newest checkpoint known to have a `metadata.json`, if any.
    latest_checkpoint_id: Option<u64>,
    /// In-memory key/value state modified through `put` and `delete`.
    /// NOTE(review): never written to disk by code in this file.
    state: FxHashMap<Vec<u8>, Vec<u8>>,
    /// Offsets keyed by source name, set through `set_source_offset`.
    /// NOTE(review): `create_checkpoint` does not copy these into the
    /// checkpoint metadata; confirm whether that is intended.
    source_offsets: HashMap<String, u64>,
    /// Last watermark set through `set_watermark`, if any.
    /// NOTE(review): `create_checkpoint` does not copy this into the
    /// checkpoint metadata; confirm whether that is intended.
    watermark: Option<i64>,
}
215
216impl IncrementalCheckpointManager {
217 pub fn new(config: CheckpointConfig) -> Result<Self, IncrementalCheckpointError> {
223 config.validate()?;
224
225 fs::create_dir_all(&config.checkpoint_dir)?;
227
228 let (next_id, latest_id) = Self::scan_checkpoints(&config.checkpoint_dir)?;
230
231 Ok(Self {
232 config,
233 next_id: AtomicU64::new(next_id),
234 current_epoch: AtomicU64::new(0),
235 last_checkpoint_time: AtomicU64::new(0),
236 latest_checkpoint_id: latest_id,
237 state: FxHashMap::default(),
238 source_offsets: HashMap::new(),
239 watermark: None,
240 })
241 }
242
243 fn scan_checkpoints(dir: &Path) -> Result<(u64, Option<u64>), IncrementalCheckpointError> {
250 let mut max_dir_id = 0u64;
251 let mut latest_valid_id = None;
252
253 if dir.exists() {
254 for entry in fs::read_dir(dir)? {
255 let entry = entry?;
256 let name = entry.file_name();
257 let name_str = name.to_string_lossy();
258
259 if let Some(id_str) = name_str.strip_prefix("checkpoint_") {
260 if let Ok(id) = u64::from_str_radix(id_str, 16) {
261 if id >= max_dir_id {
263 max_dir_id = id;
264 }
265 let metadata_path = dir.join(name_str.as_ref()).join("metadata.json");
269 if !metadata_path.exists() {
270 debug!(
271 checkpoint_id = id,
272 "skipping partial checkpoint dir (no metadata.json)"
273 );
274 continue;
275 }
276 if latest_valid_id.is_none_or(|prev| id >= prev) {
277 latest_valid_id = Some(id);
278 }
279 }
280 }
281 }
282 }
283
284 Ok((max_dir_id + 1, latest_valid_id))
285 }
286
287 #[must_use]
289 pub fn config(&self) -> &CheckpointConfig {
290 &self.config
291 }
292
293 pub fn set_epoch(&self, epoch: u64) {
295 self.current_epoch.store(epoch, Ordering::Release);
296 }
297
298 #[must_use]
300 pub fn epoch(&self) -> u64 {
301 self.current_epoch.load(Ordering::Acquire)
302 }
303
304 #[allow(clippy::unnecessary_wraps)]
310 pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), IncrementalCheckpointError> {
311 self.state.insert(key.to_vec(), value.to_vec());
312 Ok(())
313 }
314
315 #[allow(clippy::unnecessary_wraps)]
321 pub fn delete(&mut self, key: &[u8]) -> Result<(), IncrementalCheckpointError> {
322 self.state.remove(key);
323 Ok(())
324 }
325
326 pub fn set_source_offset(&mut self, source: String, offset: u64) {
328 self.source_offsets.insert(source, offset);
329 }
330
331 #[must_use]
333 pub fn source_offsets(&self) -> &HashMap<String, u64> {
334 &self.source_offsets
335 }
336
337 pub fn set_watermark(&mut self, watermark: i64) {
339 self.watermark = Some(watermark);
340 }
341
342 #[must_use]
344 pub fn watermark(&self) -> Option<i64> {
345 self.watermark
346 }
347
348 #[must_use]
350 pub fn latest_checkpoint_id(&self) -> Option<u64> {
351 self.latest_checkpoint_id
352 }
353
354 #[must_use]
356 pub fn should_checkpoint(&self) -> bool {
357 let last = self.last_checkpoint_time.load(Ordering::Relaxed);
358 let now = SystemTime::now()
359 .duration_since(UNIX_EPOCH)
360 .unwrap_or_default()
361 .as_secs();
362
363 now.saturating_sub(last) >= self.config.interval.as_secs()
364 }
365
366 pub fn create_checkpoint(
375 &mut self,
376 wal_position: u64,
377 ) -> Result<IncrementalCheckpointMetadata, IncrementalCheckpointError> {
378 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
379 let epoch = self.current_epoch.load(Ordering::Acquire);
380
381 let mut metadata = IncrementalCheckpointMetadata::new(id, epoch);
382 metadata.wal_position = wal_position;
383 metadata.parent_id = self.latest_checkpoint_id;
384 metadata.is_incremental = self.config.incremental && self.latest_checkpoint_id.is_some();
385
386 let checkpoint_path = metadata.checkpoint_path(&self.config.checkpoint_dir);
387
388 fs::create_dir_all(&checkpoint_path)?;
390
391 let metadata_path = checkpoint_path.join("metadata.json");
393 let metadata_json = metadata.to_json()?;
394 fs::write(&metadata_path, &metadata_json)?;
395
396 self.latest_checkpoint_id = Some(id);
398 self.last_checkpoint_time.store(
399 SystemTime::now()
400 .duration_since(UNIX_EPOCH)
401 .unwrap_or_default()
402 .as_secs(),
403 Ordering::Relaxed,
404 );
405
406 self.cleanup_old_checkpoints()?;
408
409 Ok(metadata)
410 }
411
412 pub fn create_checkpoint_with_state(
418 &mut self,
419 wal_position: u64,
420 source_offsets: HashMap<String, u64>,
421 watermark: Option<i64>,
422 state_data: &[u8],
423 ) -> Result<IncrementalCheckpointMetadata, IncrementalCheckpointError> {
424 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
425 let epoch = self.current_epoch.load(Ordering::Acquire);
426
427 let mut metadata = IncrementalCheckpointMetadata::new(id, epoch);
428 metadata.wal_position = wal_position;
429 metadata.source_offsets = source_offsets;
430 metadata.watermark = watermark;
431 metadata.parent_id = self.latest_checkpoint_id;
432 metadata.is_incremental = self.config.incremental && self.latest_checkpoint_id.is_some();
433
434 let checkpoint_path = metadata.checkpoint_path(&self.config.checkpoint_dir);
435 fs::create_dir_all(&checkpoint_path)?;
436
437 let state_path = checkpoint_path.join("state.bin");
439 fs::write(&state_path, state_data)?;
440
441 #[allow(clippy::cast_possible_truncation)]
442 {
444 metadata.size_bytes = state_data.len() as u64;
445 }
446
447 let metadata_path = checkpoint_path.join("metadata.json");
449 let metadata_json = metadata.to_json()?;
450 fs::write(&metadata_path, &metadata_json)?;
451
452 self.latest_checkpoint_id = Some(id);
454 self.last_checkpoint_time.store(
455 SystemTime::now()
456 .duration_since(UNIX_EPOCH)
457 .unwrap_or_default()
458 .as_secs(),
459 Ordering::Relaxed,
460 );
461
462 info!(
463 checkpoint_id = id,
464 epoch = epoch,
465 wal_position = wal_position,
466 size_bytes = metadata.size_bytes,
467 "Created checkpoint with state"
468 );
469
470 self.cleanup_old_checkpoints()?;
471
472 Ok(metadata)
473 }
474
475 pub fn find_latest_checkpoint(
481 &self,
482 ) -> Result<Option<IncrementalCheckpointMetadata>, IncrementalCheckpointError> {
483 let Some(id) = self.latest_checkpoint_id else {
484 return Ok(None);
485 };
486
487 self.load_checkpoint_metadata(id)
488 }
489
490 pub fn load_checkpoint_metadata(
496 &self,
497 id: u64,
498 ) -> Result<Option<IncrementalCheckpointMetadata>, IncrementalCheckpointError> {
499 let checkpoint_dir = self
500 .config
501 .checkpoint_dir
502 .join(format!("checkpoint_{id:016x}"));
503 let metadata_path = checkpoint_dir.join("metadata.json");
504
505 if !metadata_path.exists() {
506 return Ok(None);
507 }
508
509 let metadata_json = fs::read_to_string(&metadata_path)?;
510 let metadata = IncrementalCheckpointMetadata::from_json(&metadata_json)?;
511 Ok(Some(metadata))
512 }
513
514 pub fn load_checkpoint_state(&self, id: u64) -> Result<Vec<u8>, IncrementalCheckpointError> {
520 let checkpoint_dir = self
521 .config
522 .checkpoint_dir
523 .join(format!("checkpoint_{id:016x}"));
524 let state_path = checkpoint_dir.join("state.bin");
525
526 if !state_path.exists() {
527 return Err(IncrementalCheckpointError::NotFound(format!(
528 "State file not found for checkpoint {id}"
529 )));
530 }
531
532 Ok(fs::read(&state_path)?)
533 }
534
535 pub fn list_checkpoints(
541 &self,
542 ) -> Result<Vec<IncrementalCheckpointMetadata>, IncrementalCheckpointError> {
543 let mut checkpoints = Vec::new();
544
545 if !self.config.checkpoint_dir.exists() {
546 return Ok(checkpoints);
547 }
548
549 for entry in fs::read_dir(&self.config.checkpoint_dir)? {
550 let entry = entry?;
551 let name = entry.file_name();
552 let name_str = name.to_string_lossy();
553
554 if let Some(id_str) = name_str.strip_prefix("checkpoint_") {
555 if let Ok(id) = u64::from_str_radix(id_str, 16) {
556 if let Ok(Some(metadata)) = self.load_checkpoint_metadata(id) {
557 checkpoints.push(metadata);
558 }
559 }
560 }
561 }
562
563 checkpoints.sort_by(|a, b| b.id.cmp(&a.id));
565
566 Ok(checkpoints)
567 }
568
569 pub fn cleanup_old_checkpoints(&self) -> Result<(), IncrementalCheckpointError> {
575 self.cleanup_old_checkpoints_keep(self.config.max_retained)
576 }
577
578 pub fn cleanup_old_checkpoints_keep(
584 &self,
585 keep_count: usize,
586 ) -> Result<(), IncrementalCheckpointError> {
587 let checkpoints = self.list_checkpoints()?;
588
589 if checkpoints.len() <= keep_count {
590 return Ok(());
591 }
592
593 for checkpoint in checkpoints.iter().skip(keep_count) {
595 let checkpoint_dir = checkpoint.checkpoint_path(&self.config.checkpoint_dir);
596 if checkpoint_dir.exists() {
597 debug!(checkpoint_id = checkpoint.id, "Removing old checkpoint");
598 fs::remove_dir_all(&checkpoint_dir)?;
599 }
600 }
601
602 Ok(())
603 }
604
605 pub fn delete_checkpoint(&mut self, id: u64) -> Result<(), IncrementalCheckpointError> {
611 let checkpoint_dir = self
612 .config
613 .checkpoint_dir
614 .join(format!("checkpoint_{id:016x}"));
615
616 if !checkpoint_dir.exists() {
617 return Err(IncrementalCheckpointError::NotFound(format!(
618 "Checkpoint {id} not found"
619 )));
620 }
621
622 fs::remove_dir_all(&checkpoint_dir)?;
623
624 if self.latest_checkpoint_id == Some(id) {
626 let checkpoints = self.list_checkpoints()?;
627 self.latest_checkpoint_id = checkpoints.first().map(|c| c.id);
628 }
629
630 info!(checkpoint_id = id, "Deleted checkpoint");
631 Ok(())
632 }
633}
634
// Unit tests for the checkpoint configuration, metadata and manager.
// Every filesystem test works inside its own temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // `validate` accepts sensible values and rejects a zero retention count
    // or a zero interval.
    #[test]
    fn test_checkpoint_config_validation() {
        let temp_dir = TempDir::new().unwrap();

        let config = CheckpointConfig::new(temp_dir.path())
            .with_interval(Duration::from_secs(60))
            .with_max_retained(3);
        assert!(config.validate().is_ok());

        let invalid = CheckpointConfig::new(temp_dir.path()).with_max_retained(0);
        assert!(invalid.validate().is_err());

        let invalid = CheckpointConfig::new(temp_dir.path()).with_interval(Duration::ZERO);
        assert!(invalid.validate().is_err());
    }

    // Fresh metadata carries the given id and epoch and survives a JSON
    // round trip.
    #[test]
    fn test_checkpoint_metadata() {
        let metadata = IncrementalCheckpointMetadata::new(1, 100);
        assert_eq!(metadata.id, 1);
        assert_eq!(metadata.epoch, 100);
        assert!(metadata.is_incremental);
        assert!(metadata.parent_id.is_none());

        let json = metadata.to_json().unwrap();
        let restored = IncrementalCheckpointMetadata::from_json(&json).unwrap();
        assert_eq!(restored.id, metadata.id);
        assert_eq!(restored.epoch, metadata.epoch);
    }

    // The directory name is the id rendered as 16 lowercase hex digits.
    #[test]
    fn test_checkpoint_path() {
        let metadata = IncrementalCheckpointMetadata::new(0x1234_5678_9abc_def0, 1);
        let base = Path::new("/data/checkpoints");
        let path = metadata.checkpoint_path(base);
        assert_eq!(
            path,
            PathBuf::from("/data/checkpoints/checkpoint_123456789abcdef0")
        );
    }

    // A manager over an empty directory knows no checkpoint and starts at
    // epoch 0.
    #[test]
    fn test_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path());

        let manager = IncrementalCheckpointManager::new(config).unwrap();
        assert!(manager.latest_checkpoint_id().is_none());
        assert_eq!(manager.epoch(), 0);
    }

    // The first checkpoint has no parent; the second points at the first.
    #[test]
    fn test_manager_create_checkpoint() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path());

        let mut manager = IncrementalCheckpointManager::new(config).unwrap();
        manager.set_epoch(42);

        let metadata = manager.create_checkpoint(1000).unwrap();
        assert_eq!(metadata.epoch, 42);
        assert_eq!(metadata.wal_position, 1000);
        assert!(metadata.parent_id.is_none());
        let metadata2 = manager.create_checkpoint(2000).unwrap();
        assert_eq!(metadata2.parent_id, Some(metadata.id));
    }

    // Offsets, watermark and the state payload passed in are all recorded,
    // and the payload can be read back unchanged.
    #[test]
    fn test_manager_create_checkpoint_with_state() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path());

        let mut manager = IncrementalCheckpointManager::new(config).unwrap();
        manager.set_epoch(10);

        let mut offsets = HashMap::new();
        offsets.insert("source1".to_string(), 100);
        offsets.insert("source2".to_string(), 200);

        let state_data = b"test state data";
        let metadata = manager
            .create_checkpoint_with_state(500, offsets.clone(), Some(5000), state_data)
            .unwrap();

        assert_eq!(metadata.epoch, 10);
        assert_eq!(metadata.wal_position, 500);
        assert_eq!(metadata.watermark, Some(5000));
        assert_eq!(metadata.source_offsets.len(), 2);
        assert_eq!(metadata.source_offsets.get("source1"), Some(&100));

        let loaded = manager.load_checkpoint_state(metadata.id).unwrap();
        assert_eq!(loaded, state_data);
    }

    // With retention above the number created, all checkpoints are listed,
    // newest id first.
    #[test]
    fn test_manager_list_checkpoints() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path()).with_max_retained(10);

        let mut manager = IncrementalCheckpointManager::new(config).unwrap();

        for i in 0..5 {
            manager.set_epoch(i);
            manager.create_checkpoint(i * 100).unwrap();
        }

        let checkpoints = manager.list_checkpoints().unwrap();
        assert_eq!(checkpoints.len(), 5);

        assert!(checkpoints[0].id > checkpoints[4].id);
    }

    // Creating checkpoints prunes automatically: with `max_retained = 2`,
    // only the two newest (epochs 4 and 3) remain.
    #[test]
    fn test_manager_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path()).with_max_retained(2);

        let mut manager = IncrementalCheckpointManager::new(config).unwrap();

        for i in 0..5 {
            manager.set_epoch(i);
            manager.create_checkpoint(i * 100).unwrap();
        }

        let checkpoints = manager.list_checkpoints().unwrap();
        assert_eq!(checkpoints.len(), 2);

        assert_eq!(checkpoints[0].epoch, 4);
        assert_eq!(checkpoints[1].epoch, 3);
    }

    // `find_latest_checkpoint` is `None` on a fresh manager and returns the
    // checkpoint just created afterwards.
    #[test]
    fn test_manager_find_latest() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path());

        let mut manager = IncrementalCheckpointManager::new(config).unwrap();

        assert!(manager.find_latest_checkpoint().unwrap().is_none());

        manager.set_epoch(1);
        let metadata = manager.create_checkpoint(100).unwrap();

        let latest = manager.find_latest_checkpoint().unwrap().unwrap();
        assert_eq!(latest.id, metadata.id);
    }

    // Deleting the older of two checkpoints leaves only the newer one listed.
    #[test]
    fn test_manager_delete_checkpoint() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path()).with_max_retained(10);

        let mut manager = IncrementalCheckpointManager::new(config).unwrap();

        manager.set_epoch(1);
        let meta1 = manager.create_checkpoint(100).unwrap();
        manager.set_epoch(2);
        let meta2 = manager.create_checkpoint(200).unwrap();

        assert_eq!(manager.list_checkpoints().unwrap().len(), 2);

        manager.delete_checkpoint(meta1.id).unwrap();

        let checkpoints = manager.list_checkpoints().unwrap();
        assert_eq!(checkpoints.len(), 1);
        assert_eq!(checkpoints[0].id, meta2.id);
    }

    // Before any checkpoint exists the last-checkpoint time is 0, so a
    // checkpoint is due immediately.
    #[test]
    fn test_manager_should_checkpoint() {
        let temp_dir = TempDir::new().unwrap();
        let config = CheckpointConfig::new(temp_dir.path()).with_interval(Duration::from_secs(1));

        let manager = IncrementalCheckpointManager::new(config).unwrap();

        assert!(manager.should_checkpoint());
    }

    // Checkpoints already on disk are picked up at construction: the next id
    // follows the highest existing one and the latest id is that highest one.
    #[test]
    fn test_scan_existing_checkpoints() {
        let temp_dir = TempDir::new().unwrap();

        for id in [1u64, 2, 3] {
            let dir = temp_dir.path().join(format!("checkpoint_{id:016x}"));
            fs::create_dir_all(&dir).unwrap();
            let metadata = IncrementalCheckpointMetadata::new(id, id * 10);
            fs::write(dir.join("metadata.json"), metadata.to_json().unwrap()).unwrap();
        }

        let config = CheckpointConfig::new(temp_dir.path());
        let manager = IncrementalCheckpointManager::new(config).unwrap();

        assert_eq!(manager.next_id.load(Ordering::Relaxed), 4);
        assert_eq!(manager.latest_checkpoint_id, Some(3));
    }

    // A directory without `metadata.json` is not a valid latest checkpoint,
    // but its id is still reserved so it is never reused.
    #[test]
    fn test_scan_skips_partial_checkpoint_dirs() {
        let temp_dir = TempDir::new().unwrap();

        let dir1 = temp_dir.path().join("checkpoint_0000000000000001");
        fs::create_dir_all(&dir1).unwrap();
        let metadata = IncrementalCheckpointMetadata::new(1, 10);
        fs::write(dir1.join("metadata.json"), metadata.to_json().unwrap()).unwrap();

        let dir3 = temp_dir.path().join("checkpoint_0000000000000003");
        fs::create_dir_all(&dir3).unwrap();

        let config = CheckpointConfig::new(temp_dir.path());
        let manager = IncrementalCheckpointManager::new(config).unwrap();

        assert_eq!(manager.latest_checkpoint_id, Some(1));
        assert_eq!(manager.next_id.load(Ordering::Relaxed), 4);
    }
}