Skip to main content

laminar_storage/incremental/
manager.rs

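//! Incremental checkpoint manager.
//!
//! This module provides checkpointing using directory-based snapshots: each
//! checkpoint is a `checkpoint_<id>` directory whose `metadata.json` records
//! the epoch, WAL position, source offsets, watermark, and parent checkpoint.
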
6#[allow(clippy::disallowed_types)] // cold path: incremental checkpoint
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use rustc_hash::FxHashMap;
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17use super::error::IncrementalCheckpointError;
18
/// Settings that control how and when incremental checkpoints are taken.
///
/// Start from [`CheckpointConfig::new`], adjust with the `with_*` setters,
/// and check the result with [`CheckpointConfig::validate`].
#[derive(Clone, Debug)]
pub struct CheckpointConfig {
    /// Root directory under which `checkpoint_<id>` directories are created.
    pub checkpoint_dir: PathBuf,
    /// Location of the write-ahead log, if one is configured.
    pub wal_path: Option<PathBuf>,
    /// Minimum time between two checkpoints.
    pub interval: Duration,
    /// How many of the most recent checkpoints are kept on disk.
    pub max_retained: usize,
    /// Whether WAL truncation after a checkpoint is enabled.
    pub truncate_wal: bool,
    /// Minimum WAL size, in bytes, before a checkpoint is warranted.
    pub min_wal_size_for_checkpoint: u64,
    /// Whether checkpoints that have a predecessor are marked incremental.
    pub incremental: bool,
}
37
38impl CheckpointConfig {
39    /// Creates a new checkpoint configuration with the given directory.
40    #[must_use]
41    pub fn new(checkpoint_dir: &Path) -> Self {
42        Self {
43            checkpoint_dir: checkpoint_dir.to_path_buf(),
44            wal_path: None,
45            interval: Duration::from_secs(60),
46            max_retained: 3,
47            truncate_wal: true,
48            min_wal_size_for_checkpoint: 64 * 1024 * 1024, // 64MB
49            incremental: true,
50        }
51    }
52
53    /// Sets the WAL path.
54    #[must_use]
55    pub fn with_wal_path(mut self, path: &Path) -> Self {
56        self.wal_path = Some(path.to_path_buf());
57        self
58    }
59
60    /// Sets the checkpoint interval.
61    #[must_use]
62    pub fn with_interval(mut self, interval: Duration) -> Self {
63        self.interval = interval;
64        self
65    }
66
67    /// Sets the maximum number of retained checkpoints.
68    #[must_use]
69    pub fn with_max_retained(mut self, max: usize) -> Self {
70        self.max_retained = max;
71        self
72    }
73
74    /// Enables or disables WAL truncation.
75    #[must_use]
76    pub fn with_truncate_wal(mut self, enabled: bool) -> Self {
77        self.truncate_wal = enabled;
78        self
79    }
80
81    /// Sets the minimum WAL size for triggering checkpoint.
82    #[must_use]
83    pub fn with_min_wal_size(mut self, size: u64) -> Self {
84        self.min_wal_size_for_checkpoint = size;
85        self
86    }
87
88    /// Enables or disables incremental checkpoints.
89    #[must_use]
90    pub fn with_incremental(mut self, enabled: bool) -> Self {
91        self.incremental = enabled;
92        self
93    }
94
95    /// Validates the configuration.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the configuration is invalid.
100    pub fn validate(&self) -> Result<(), IncrementalCheckpointError> {
101        if self.max_retained == 0 {
102            return Err(IncrementalCheckpointError::InvalidConfig(
103                "max_retained must be > 0".to_string(),
104            ));
105        }
106        if self.interval.is_zero() {
107            return Err(IncrementalCheckpointError::InvalidConfig(
108                "interval must be > 0".to_string(),
109            ));
110        }
111        Ok(())
112    }
113}
114
115/// Metadata for an incremental checkpoint.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct IncrementalCheckpointMetadata {
118    /// Unique checkpoint ID.
119    pub id: u64,
120    /// Epoch at which the checkpoint was taken.
121    pub epoch: u64,
122    /// Unix timestamp when checkpoint was created.
123    pub timestamp: u64,
124    /// WAL position at checkpoint time.
125    pub wal_position: u64,
126    /// Source offsets for exactly-once semantics.
127    pub source_offsets: HashMap<String, u64>,
128    /// Watermark at checkpoint time.
129    pub watermark: Option<i64>,
130    /// Size of the checkpoint in bytes.
131    pub size_bytes: u64,
132    /// Number of keys in the checkpoint.
133    pub key_count: u64,
134    /// Whether this is an incremental checkpoint.
135    pub is_incremental: bool,
136    /// Parent checkpoint ID (for incremental).
137    pub parent_id: Option<u64>,
138    /// `SSTable` files included (for incremental, relative paths).
139    pub sst_files: Vec<String>,
140}
141
142impl IncrementalCheckpointMetadata {
143    /// Creates a new checkpoint metadata instance.
144    #[must_use]
145    pub fn new(id: u64, epoch: u64) -> Self {
146        let timestamp = SystemTime::now()
147            .duration_since(UNIX_EPOCH)
148            .unwrap_or_default()
149            .as_secs();
150
151        Self {
152            id,
153            epoch,
154            timestamp,
155            wal_position: 0,
156            source_offsets: HashMap::new(),
157            watermark: None,
158            size_bytes: 0,
159            key_count: 0,
160            is_incremental: true,
161            parent_id: None,
162            sst_files: Vec::new(),
163        }
164    }
165
166    /// Returns the path to this checkpoint's directory.
167    #[must_use]
168    pub fn checkpoint_path(&self, base_dir: &Path) -> PathBuf {
169        base_dir.join(format!("checkpoint_{:016x}", self.id))
170    }
171
172    /// Serializes metadata to JSON.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if serialization fails.
177    pub fn to_json(&self) -> Result<String, IncrementalCheckpointError> {
178        serde_json::to_string_pretty(self)
179            .map_err(|e| IncrementalCheckpointError::Serialization(e.to_string()))
180    }
181
182    /// Deserializes metadata from JSON.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if deserialization fails.
187    pub fn from_json(json: &str) -> Result<Self, IncrementalCheckpointError> {
188        serde_json::from_str(json)
189            .map_err(|e| IncrementalCheckpointError::Deserialization(e.to_string()))
190    }
191}
192
193/// Incremental checkpoint manager.
194///
195/// This manager creates and manages incremental checkpoints using
196/// directory-based snapshots with metadata tracking.
197pub struct IncrementalCheckpointManager {
198    /// Configuration.
199    config: CheckpointConfig,
200    /// Next checkpoint ID.
201    next_id: AtomicU64,
202    /// Current epoch.
203    current_epoch: AtomicU64,
204    /// Last checkpoint time (Unix timestamp).
205    last_checkpoint_time: AtomicU64,
206    /// Latest checkpoint ID.
207    latest_checkpoint_id: Option<u64>,
208    /// In-memory state store.
209    state: FxHashMap<Vec<u8>, Vec<u8>>,
210    /// Source offsets for exactly-once semantics.
211    source_offsets: HashMap<String, u64>,
212    /// Current watermark.
213    watermark: Option<i64>,
214}
215
216impl IncrementalCheckpointManager {
217    /// Creates a new incremental checkpoint manager.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the configuration is invalid or directory creation fails.
222    pub fn new(config: CheckpointConfig) -> Result<Self, IncrementalCheckpointError> {
223        config.validate()?;
224
225        // Create checkpoint directory
226        fs::create_dir_all(&config.checkpoint_dir)?;
227
228        // Find existing checkpoints
229        let (next_id, latest_id) = Self::scan_checkpoints(&config.checkpoint_dir)?;
230
231        Ok(Self {
232            config,
233            next_id: AtomicU64::new(next_id),
234            current_epoch: AtomicU64::new(0),
235            last_checkpoint_time: AtomicU64::new(0),
236            latest_checkpoint_id: latest_id,
237            state: FxHashMap::default(),
238            source_offsets: HashMap::new(),
239            watermark: None,
240        })
241    }
242
243    /// Scans the checkpoint directory to find existing checkpoints.
244    ///
245    /// `next_id` is based on the highest directory ID (including partial
246    /// dirs) to avoid ID collisions. `latest_id` only considers dirs with
247    /// a valid `metadata.json` to prevent partial checkpoint directories
248    /// from poisoning the incremental `parent_id` chain.
249    fn scan_checkpoints(dir: &Path) -> Result<(u64, Option<u64>), IncrementalCheckpointError> {
250        let mut max_dir_id = 0u64;
251        let mut latest_valid_id = None;
252
253        if dir.exists() {
254            for entry in fs::read_dir(dir)? {
255                let entry = entry?;
256                let name = entry.file_name();
257                let name_str = name.to_string_lossy();
258
259                if let Some(id_str) = name_str.strip_prefix("checkpoint_") {
260                    if let Ok(id) = u64::from_str_radix(id_str, 16) {
261                        // Track highest dir ID for next_id (avoids collisions).
262                        if id >= max_dir_id {
263                            max_dir_id = id;
264                        }
265                        // Only set latest_id if metadata.json exists.
266                        // Partial dirs (crash before metadata write) must not
267                        // set latest_id or they poison parent_id chains.
268                        let metadata_path = dir.join(name_str.as_ref()).join("metadata.json");
269                        if !metadata_path.exists() {
270                            debug!(
271                                checkpoint_id = id,
272                                "skipping partial checkpoint dir (no metadata.json)"
273                            );
274                            continue;
275                        }
276                        if latest_valid_id.is_none_or(|prev| id >= prev) {
277                            latest_valid_id = Some(id);
278                        }
279                    }
280                }
281            }
282        }
283
284        Ok((max_dir_id + 1, latest_valid_id))
285    }
286
287    /// Returns the configuration.
288    #[must_use]
289    pub fn config(&self) -> &CheckpointConfig {
290        &self.config
291    }
292
293    /// Sets the current epoch.
294    pub fn set_epoch(&self, epoch: u64) {
295        self.current_epoch.store(epoch, Ordering::Release);
296    }
297
298    /// Returns the current epoch.
299    #[must_use]
300    pub fn epoch(&self) -> u64 {
301        self.current_epoch.load(Ordering::Acquire)
302    }
303
304    /// Puts a key-value pair into the state.
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if the write fails.
309    #[allow(clippy::unnecessary_wraps)]
310    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), IncrementalCheckpointError> {
311        self.state.insert(key.to_vec(), value.to_vec());
312        Ok(())
313    }
314
315    /// Deletes a key from the state.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if the delete fails.
320    #[allow(clippy::unnecessary_wraps)]
321    pub fn delete(&mut self, key: &[u8]) -> Result<(), IncrementalCheckpointError> {
322        self.state.remove(key);
323        Ok(())
324    }
325
326    /// Sets a source offset for exactly-once semantics.
327    pub fn set_source_offset(&mut self, source: String, offset: u64) {
328        self.source_offsets.insert(source, offset);
329    }
330
331    /// Returns the source offsets.
332    #[must_use]
333    pub fn source_offsets(&self) -> &HashMap<String, u64> {
334        &self.source_offsets
335    }
336
337    /// Sets the current watermark.
338    pub fn set_watermark(&mut self, watermark: i64) {
339        self.watermark = Some(watermark);
340    }
341
342    /// Returns the current watermark.
343    #[must_use]
344    pub fn watermark(&self) -> Option<i64> {
345        self.watermark
346    }
347
348    /// Returns the latest checkpoint ID.
349    #[must_use]
350    pub fn latest_checkpoint_id(&self) -> Option<u64> {
351        self.latest_checkpoint_id
352    }
353
354    /// Checks if it's time to create a checkpoint.
355    #[must_use]
356    pub fn should_checkpoint(&self) -> bool {
357        let last = self.last_checkpoint_time.load(Ordering::Relaxed);
358        let now = SystemTime::now()
359            .duration_since(UNIX_EPOCH)
360            .unwrap_or_default()
361            .as_secs();
362
363        now.saturating_sub(last) >= self.config.interval.as_secs()
364    }
365
366    /// Creates a new incremental checkpoint.
367    ///
368    /// This creates a checkpoint of the current state using directory-based
369    /// snapshots.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if checkpoint creation fails.
374    pub fn create_checkpoint(
375        &mut self,
376        wal_position: u64,
377    ) -> Result<IncrementalCheckpointMetadata, IncrementalCheckpointError> {
378        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
379        let epoch = self.current_epoch.load(Ordering::Acquire);
380
381        let mut metadata = IncrementalCheckpointMetadata::new(id, epoch);
382        metadata.wal_position = wal_position;
383        metadata.parent_id = self.latest_checkpoint_id;
384        metadata.is_incremental = self.config.incremental && self.latest_checkpoint_id.is_some();
385
386        let checkpoint_path = metadata.checkpoint_path(&self.config.checkpoint_dir);
387
388        // Create checkpoint directory
389        fs::create_dir_all(&checkpoint_path)?;
390
391        // Write metadata
392        let metadata_path = checkpoint_path.join("metadata.json");
393        let metadata_json = metadata.to_json()?;
394        fs::write(&metadata_path, &metadata_json)?;
395
396        // Update tracking state
397        self.latest_checkpoint_id = Some(id);
398        self.last_checkpoint_time.store(
399            SystemTime::now()
400                .duration_since(UNIX_EPOCH)
401                .unwrap_or_default()
402                .as_secs(),
403            Ordering::Relaxed,
404        );
405
406        // Cleanup old checkpoints
407        self.cleanup_old_checkpoints()?;
408
409        Ok(metadata)
410    }
411
412    /// Creates a checkpoint with additional state data.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if checkpoint creation fails.
417    pub fn create_checkpoint_with_state(
418        &mut self,
419        wal_position: u64,
420        source_offsets: HashMap<String, u64>,
421        watermark: Option<i64>,
422        state_data: &[u8],
423    ) -> Result<IncrementalCheckpointMetadata, IncrementalCheckpointError> {
424        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
425        let epoch = self.current_epoch.load(Ordering::Acquire);
426
427        let mut metadata = IncrementalCheckpointMetadata::new(id, epoch);
428        metadata.wal_position = wal_position;
429        metadata.source_offsets = source_offsets;
430        metadata.watermark = watermark;
431        metadata.parent_id = self.latest_checkpoint_id;
432        metadata.is_incremental = self.config.incremental && self.latest_checkpoint_id.is_some();
433
434        let checkpoint_path = metadata.checkpoint_path(&self.config.checkpoint_dir);
435        fs::create_dir_all(&checkpoint_path)?;
436
437        // Write state data
438        let state_path = checkpoint_path.join("state.bin");
439        fs::write(&state_path, state_data)?;
440
441        #[allow(clippy::cast_possible_truncation)]
442        // usize → u64: lossless on 64-bit, acceptable on 32-bit
443        {
444            metadata.size_bytes = state_data.len() as u64;
445        }
446
447        // Write metadata
448        let metadata_path = checkpoint_path.join("metadata.json");
449        let metadata_json = metadata.to_json()?;
450        fs::write(&metadata_path, &metadata_json)?;
451
452        // Update tracking state
453        self.latest_checkpoint_id = Some(id);
454        self.last_checkpoint_time.store(
455            SystemTime::now()
456                .duration_since(UNIX_EPOCH)
457                .unwrap_or_default()
458                .as_secs(),
459            Ordering::Relaxed,
460        );
461
462        info!(
463            checkpoint_id = id,
464            epoch = epoch,
465            wal_position = wal_position,
466            size_bytes = metadata.size_bytes,
467            "Created checkpoint with state"
468        );
469
470        self.cleanup_old_checkpoints()?;
471
472        Ok(metadata)
473    }
474
475    /// Finds the latest checkpoint.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if reading checkpoint metadata fails.
480    pub fn find_latest_checkpoint(
481        &self,
482    ) -> Result<Option<IncrementalCheckpointMetadata>, IncrementalCheckpointError> {
483        let Some(id) = self.latest_checkpoint_id else {
484            return Ok(None);
485        };
486
487        self.load_checkpoint_metadata(id)
488    }
489
490    /// Loads checkpoint metadata by ID.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if reading metadata fails.
495    pub fn load_checkpoint_metadata(
496        &self,
497        id: u64,
498    ) -> Result<Option<IncrementalCheckpointMetadata>, IncrementalCheckpointError> {
499        let checkpoint_dir = self
500            .config
501            .checkpoint_dir
502            .join(format!("checkpoint_{id:016x}"));
503        let metadata_path = checkpoint_dir.join("metadata.json");
504
505        if !metadata_path.exists() {
506            return Ok(None);
507        }
508
509        let metadata_json = fs::read_to_string(&metadata_path)?;
510        let metadata = IncrementalCheckpointMetadata::from_json(&metadata_json)?;
511        Ok(Some(metadata))
512    }
513
514    /// Loads state data from a checkpoint.
515    ///
516    /// # Errors
517    ///
518    /// Returns an error if reading state data fails.
519    pub fn load_checkpoint_state(&self, id: u64) -> Result<Vec<u8>, IncrementalCheckpointError> {
520        let checkpoint_dir = self
521            .config
522            .checkpoint_dir
523            .join(format!("checkpoint_{id:016x}"));
524        let state_path = checkpoint_dir.join("state.bin");
525
526        if !state_path.exists() {
527            return Err(IncrementalCheckpointError::NotFound(format!(
528                "State file not found for checkpoint {id}"
529            )));
530        }
531
532        Ok(fs::read(&state_path)?)
533    }
534
535    /// Lists all checkpoints sorted by ID (newest first).
536    ///
537    /// # Errors
538    ///
539    /// Returns an error if reading checkpoints fails.
540    pub fn list_checkpoints(
541        &self,
542    ) -> Result<Vec<IncrementalCheckpointMetadata>, IncrementalCheckpointError> {
543        let mut checkpoints = Vec::new();
544
545        if !self.config.checkpoint_dir.exists() {
546            return Ok(checkpoints);
547        }
548
549        for entry in fs::read_dir(&self.config.checkpoint_dir)? {
550            let entry = entry?;
551            let name = entry.file_name();
552            let name_str = name.to_string_lossy();
553
554            if let Some(id_str) = name_str.strip_prefix("checkpoint_") {
555                if let Ok(id) = u64::from_str_radix(id_str, 16) {
556                    if let Ok(Some(metadata)) = self.load_checkpoint_metadata(id) {
557                        checkpoints.push(metadata);
558                    }
559                }
560            }
561        }
562
563        // Sort by ID descending (newest first)
564        checkpoints.sort_by(|a, b| b.id.cmp(&a.id));
565
566        Ok(checkpoints)
567    }
568
569    /// Cleans up old checkpoints beyond the retention limit.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if cleanup fails.
574    pub fn cleanup_old_checkpoints(&self) -> Result<(), IncrementalCheckpointError> {
575        self.cleanup_old_checkpoints_keep(self.config.max_retained)
576    }
577
578    /// Cleans up old checkpoints, keeping only `keep_count` most recent.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if cleanup fails.
583    pub fn cleanup_old_checkpoints_keep(
584        &self,
585        keep_count: usize,
586    ) -> Result<(), IncrementalCheckpointError> {
587        let checkpoints = self.list_checkpoints()?;
588
589        if checkpoints.len() <= keep_count {
590            return Ok(());
591        }
592
593        // Remove checkpoints beyond retention limit
594        for checkpoint in checkpoints.iter().skip(keep_count) {
595            let checkpoint_dir = checkpoint.checkpoint_path(&self.config.checkpoint_dir);
596            if checkpoint_dir.exists() {
597                debug!(checkpoint_id = checkpoint.id, "Removing old checkpoint");
598                fs::remove_dir_all(&checkpoint_dir)?;
599            }
600        }
601
602        Ok(())
603    }
604
605    /// Deletes a specific checkpoint.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if deletion fails.
610    pub fn delete_checkpoint(&mut self, id: u64) -> Result<(), IncrementalCheckpointError> {
611        let checkpoint_dir = self
612            .config
613            .checkpoint_dir
614            .join(format!("checkpoint_{id:016x}"));
615
616        if !checkpoint_dir.exists() {
617            return Err(IncrementalCheckpointError::NotFound(format!(
618                "Checkpoint {id} not found"
619            )));
620        }
621
622        fs::remove_dir_all(&checkpoint_dir)?;
623
624        // Update latest checkpoint if we deleted it
625        if self.latest_checkpoint_id == Some(id) {
626            let checkpoints = self.list_checkpoints()?;
627            self.latest_checkpoint_id = checkpoints.first().map(|c| c.id);
628        }
629
630        info!(checkpoint_id = id, "Deleted checkpoint");
631        Ok(())
632    }
633}
634
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a manager over `config`, failing the test on any setup error.
    fn open(config: CheckpointConfig) -> IncrementalCheckpointManager {
        IncrementalCheckpointManager::new(config).expect("manager should open")
    }

    /// Creates `checkpoint_<id>` under `root` with a valid `metadata.json`.
    fn write_complete_checkpoint(root: &Path, id: u64, epoch: u64) {
        let dir = root.join(format!("checkpoint_{id:016x}"));
        fs::create_dir_all(&dir).unwrap();
        let json = IncrementalCheckpointMetadata::new(id, epoch)
            .to_json()
            .unwrap();
        fs::write(dir.join("metadata.json"), json).unwrap();
    }

    #[test]
    fn test_checkpoint_config_validation() {
        let tmp = TempDir::new().unwrap();
        let fresh = || CheckpointConfig::new(tmp.path());

        let valid = fresh()
            .with_interval(Duration::from_secs(60))
            .with_max_retained(3);
        assert!(valid.validate().is_ok());

        assert!(
            fresh().with_max_retained(0).validate().is_err(),
            "zero retention must be rejected"
        );
        assert!(
            fresh().with_interval(Duration::ZERO).validate().is_err(),
            "zero interval must be rejected"
        );
    }

    #[test]
    fn test_checkpoint_metadata() {
        let original = IncrementalCheckpointMetadata::new(1, 100);
        assert_eq!((original.id, original.epoch), (1, 100));
        assert!(original.is_incremental);
        assert_eq!(original.parent_id, None);

        let round_tripped =
            IncrementalCheckpointMetadata::from_json(&original.to_json().unwrap()).unwrap();
        assert_eq!(round_tripped.id, original.id);
        assert_eq!(round_tripped.epoch, original.epoch);
    }

    #[test]
    fn test_checkpoint_path() {
        let path = IncrementalCheckpointMetadata::new(0x1234_5678_9abc_def0, 1)
            .checkpoint_path(Path::new("/data/checkpoints"));
        assert_eq!(
            path,
            PathBuf::from("/data/checkpoints/checkpoint_123456789abcdef0")
        );
    }

    #[test]
    fn test_manager_creation() {
        let tmp = TempDir::new().unwrap();
        let manager = open(CheckpointConfig::new(tmp.path()));
        assert_eq!(manager.latest_checkpoint_id(), None);
        assert_eq!(manager.epoch(), 0);
    }

    #[test]
    fn test_manager_create_checkpoint() {
        let tmp = TempDir::new().unwrap();
        let mut manager = open(CheckpointConfig::new(tmp.path()));
        manager.set_epoch(42);

        let first = manager.create_checkpoint(1000).unwrap();
        assert_eq!(first.epoch, 42);
        assert_eq!(first.wal_position, 1000);
        assert_eq!(first.parent_id, None, "first checkpoint has no parent");

        let second = manager.create_checkpoint(2000).unwrap();
        assert_eq!(second.parent_id, Some(first.id));
    }

    #[test]
    fn test_manager_create_checkpoint_with_state() {
        let tmp = TempDir::new().unwrap();
        let mut manager = open(CheckpointConfig::new(tmp.path()));
        manager.set_epoch(10);

        let offsets: HashMap<String, u64> =
            [("source1".to_string(), 100), ("source2".to_string(), 200)]
                .into_iter()
                .collect();
        let payload: &[u8] = b"test state data";

        let metadata = manager
            .create_checkpoint_with_state(500, offsets, Some(5000), payload)
            .unwrap();

        assert_eq!(metadata.epoch, 10);
        assert_eq!(metadata.wal_position, 500);
        assert_eq!(metadata.watermark, Some(5000));
        assert_eq!(metadata.source_offsets.len(), 2);
        assert_eq!(metadata.source_offsets.get("source1"), Some(&100));

        // The payload must come back byte-for-byte.
        assert_eq!(manager.load_checkpoint_state(metadata.id).unwrap(), payload);
    }

    #[test]
    fn test_manager_list_checkpoints() {
        let tmp = TempDir::new().unwrap();
        let mut manager = open(CheckpointConfig::new(tmp.path()).with_max_retained(10));

        for epoch in 0..5 {
            manager.set_epoch(epoch);
            manager.create_checkpoint(epoch * 100).unwrap();
        }

        let listed = manager.list_checkpoints().unwrap();
        assert_eq!(listed.len(), 5);
        // Newest first.
        assert!(listed.first().unwrap().id > listed.last().unwrap().id);
    }

    #[test]
    fn test_manager_cleanup() {
        let tmp = TempDir::new().unwrap();
        let mut manager = open(CheckpointConfig::new(tmp.path()).with_max_retained(2));

        for epoch in 0..5 {
            manager.set_epoch(epoch);
            manager.create_checkpoint(epoch * 100).unwrap();
        }

        // Only the two newest checkpoints survive retention.
        let surviving_epochs: Vec<u64> = manager
            .list_checkpoints()
            .unwrap()
            .iter()
            .map(|c| c.epoch)
            .collect();
        assert_eq!(surviving_epochs, vec![4, 3]);
    }

    #[test]
    fn test_manager_find_latest() {
        let tmp = TempDir::new().unwrap();
        let mut manager = open(CheckpointConfig::new(tmp.path()));

        assert!(
            manager.find_latest_checkpoint().unwrap().is_none(),
            "no checkpoints yet"
        );

        manager.set_epoch(1);
        let created = manager.create_checkpoint(100).unwrap();

        let latest = manager
            .find_latest_checkpoint()
            .unwrap()
            .expect("a checkpoint was just created");
        assert_eq!(latest.id, created.id);
    }

    #[test]
    fn test_manager_delete_checkpoint() {
        let tmp = TempDir::new().unwrap();
        let mut manager = open(CheckpointConfig::new(tmp.path()).with_max_retained(10));

        manager.set_epoch(1);
        let older = manager.create_checkpoint(100).unwrap();
        manager.set_epoch(2);
        let newer = manager.create_checkpoint(200).unwrap();
        assert_eq!(manager.list_checkpoints().unwrap().len(), 2);

        manager.delete_checkpoint(older.id).unwrap();

        let remaining: Vec<u64> = manager
            .list_checkpoints()
            .unwrap()
            .iter()
            .map(|c| c.id)
            .collect();
        assert_eq!(remaining, vec![newer.id]);
    }

    #[test]
    fn test_manager_should_checkpoint() {
        let tmp = TempDir::new().unwrap();
        let manager =
            open(CheckpointConfig::new(tmp.path()).with_interval(Duration::from_secs(1)));

        // Nothing has been checkpointed yet, so the interval has trivially elapsed.
        assert!(manager.should_checkpoint());
    }

    #[test]
    fn test_scan_existing_checkpoints() {
        let tmp = TempDir::new().unwrap();
        for id in 1..=3u64 {
            write_complete_checkpoint(tmp.path(), id, id * 10);
        }

        let manager = open(CheckpointConfig::new(tmp.path()));

        assert_eq!(manager.next_id.load(Ordering::Relaxed), 4, "max id 3 + 1");
        assert_eq!(manager.latest_checkpoint_id, Some(3));
    }

    #[test]
    fn test_scan_skips_partial_checkpoint_dirs() {
        let tmp = TempDir::new().unwrap();
        write_complete_checkpoint(tmp.path(), 1, 10);
        // Partial checkpoint: the directory exists but metadata.json was never written.
        fs::create_dir_all(tmp.path().join("checkpoint_0000000000000003")).unwrap();

        let manager = open(CheckpointConfig::new(tmp.path()));

        assert_eq!(
            manager.latest_checkpoint_id,
            Some(1),
            "partial dir must not become latest"
        );
        assert_eq!(
            manager.next_id.load(Ordering::Relaxed),
            4,
            "partial dir still reserves its id"
        );
    }
}
875}