// amaters_cluster/snapshot.rs

//! Snapshot management and log compaction for Raft consensus.
//!
//! This module implements the snapshot mechanism described in Section 7 of the
//! Raft paper. Snapshots allow the system to compact the log by capturing a
//! point-in-time state of the state machine, enabling the removal of all log
//! entries up to that point.

use crate::error::{RaftError, RaftResult};
use crate::types::{LogIndex, NodeId, Term};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use tracing::{debug, info, warn};

17/// Snapshot metadata describing the state captured in a snapshot
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19pub struct SnapshotMetadata {
20    /// Index of the last log entry included in the snapshot
21    pub last_included_index: LogIndex,
22    /// Term of the last log entry included in the snapshot
23    pub last_included_term: Term,
24    /// Timestamp when the snapshot was created
25    pub created_at: DateTime<Utc>,
26    /// Size of the snapshot data in bytes
27    pub size_bytes: u64,
28    /// CRC32 checksum of the snapshot data for integrity verification
29    pub checksum: u32,
30}
31
32impl SnapshotMetadata {
33    /// Create new snapshot metadata
34    pub fn new(
35        last_included_index: LogIndex,
36        last_included_term: Term,
37        size_bytes: u64,
38        checksum: u32,
39    ) -> Self {
40        Self {
41            last_included_index,
42            last_included_term,
43            created_at: Utc::now(),
44            size_bytes,
45            checksum,
46        }
47    }
48
49    /// Filename for this snapshot's metadata file
50    pub(crate) fn metadata_filename(&self) -> String {
51        format!(
52            "snapshot-{:016x}-{:016x}.meta.json",
53            self.last_included_term, self.last_included_index
54        )
55    }
56
57    /// Filename for this snapshot's data file
58    pub(crate) fn data_filename(&self) -> String {
59        format!(
60            "snapshot-{:016x}-{:016x}.data",
61            self.last_included_term, self.last_included_index
62        )
63    }
64}
65
66/// A complete snapshot containing metadata and serialized state machine data
67#[derive(Debug, Clone)]
68pub struct Snapshot {
69    /// Metadata describing this snapshot
70    pub metadata: SnapshotMetadata,
71    /// Serialized state machine data
72    pub data: Vec<u8>,
73}
74
75impl Snapshot {
76    /// Create a new snapshot from raw data
77    pub fn new(last_included_index: LogIndex, last_included_term: Term, data: Vec<u8>) -> Self {
78        let checksum = crc32fast::hash(&data);
79        let size_bytes = data.len() as u64;
80        let metadata = SnapshotMetadata::new(
81            last_included_index,
82            last_included_term,
83            size_bytes,
84            checksum,
85        );
86        Self { metadata, data }
87    }
88
89    /// Verify the integrity of the snapshot data against its checksum
90    pub fn verify_checksum(&self) -> bool {
91        let computed = crc32fast::hash(&self.data);
92        computed == self.metadata.checksum
93    }
94}
95
/// On-disk snapshot storage and retention settings.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Directory holding snapshot data and metadata files
    pub snapshot_dir: PathBuf,
    /// Maximum number of snapshots to retain on disk
    pub max_snapshots: usize,
    /// Log size (number of entries) at which a new snapshot should be taken.
    /// `SnapshotManager::should_snapshot` treats `0` as "never".
    pub snapshot_threshold: u64,
}

impl SnapshotConfig {
    /// Create a configuration with an explicit value for every setting.
    pub fn new(snapshot_dir: PathBuf, max_snapshots: usize, snapshot_threshold: u64) -> Self {
        Self {
            snapshot_dir,
            max_snapshots,
            snapshot_threshold,
        }
    }

    /// Create a configuration for `snapshot_dir` with the default retention
    /// and threshold: keep at most 3 snapshots, and snapshot once the log
    /// reaches 10,000 entries.
    ///
    /// `snapshot_dir` is used exactly as given; no temporary directory is
    /// created or substituted.
    pub fn with_defaults(snapshot_dir: PathBuf) -> Self {
        Self {
            snapshot_dir,
            max_snapshots: 3,
            snapshot_threshold: 10000,
        }
    }
}

/// Decides when a snapshot should be taken automatically.
///
/// The policy is meant to be consulted after each batch of entries has been
/// applied to the state machine. It compares how much log has accumulated
/// since the previous snapshot against a configured limit.
#[derive(Debug, Clone)]
pub struct SnapshotPolicy {
    /// Number of log entries since the previous snapshot that triggers a new
    /// one. `0` turns automatic snapshots off.
    pub max_log_entries: u64,
    /// Applied index that must be reached before any snapshot is taken, so a
    /// node that is still bootstrapping does not snapshot too early.
    pub min_applied_before_snapshot: u64,
}

impl SnapshotPolicy {
    /// Builds a policy that triggers every `max_log_entries` entries and has
    /// no minimum applied index.
    pub fn new(max_log_entries: u64) -> Self {
        Self {
            max_log_entries,
            min_applied_before_snapshot: 0,
        }
    }

    /// Builds a policy that never asks for a snapshot.
    pub fn disabled() -> Self {
        Self::new(0)
    }

    /// Returns this policy with `min_applied_before_snapshot` set to `min`.
    pub fn with_min_applied(self, min: u64) -> Self {
        Self {
            min_applied_before_snapshot: min,
            ..self
        }
    }

    /// Reports whether a snapshot is due.
    ///
    /// All three conditions must hold:
    /// - the policy is enabled (`max_log_entries != 0`);
    /// - `applied_index` has reached `min_applied_before_snapshot`;
    /// - `entries_since_snapshot` has reached `max_log_entries`.
    pub fn should_snapshot(&self, entries_since_snapshot: u64, applied_index: u64) -> bool {
        let enabled = self.max_log_entries != 0;
        let warmed_up = applied_index >= self.min_applied_before_snapshot;
        enabled && warmed_up && entries_since_snapshot >= self.max_log_entries
    }
}

impl Default for SnapshotPolicy {
    /// A policy that snapshots every 10,000 entries, with no minimum applied
    /// index.
    fn default() -> Self {
        Self::new(10_000)
    }
}

188/// Manages snapshot creation, storage, loading, and cleanup
189pub struct SnapshotManager {
190    /// Configuration for snapshot behavior
191    pub(crate) config: SnapshotConfig,
192    /// Metadata of the latest known snapshot (cached)
193    latest: Option<SnapshotMetadata>,
194}
195
196impl SnapshotManager {
197    /// Create a new snapshot manager
198    ///
199    /// Initializes the snapshot directory and scans for existing snapshots.
200    pub fn new(config: SnapshotConfig) -> RaftResult<Self> {
201        // Ensure snapshot directory exists
202        fs::create_dir_all(&config.snapshot_dir).map_err(|e| RaftError::StorageError {
203            message: format!(
204                "Failed to create snapshot directory '{}': {}",
205                config.snapshot_dir.display(),
206                e
207            ),
208        })?;
209
210        let mut manager = Self {
211            config,
212            latest: None,
213        };
214
215        // Scan for existing snapshots and set latest
216        manager.scan_existing_snapshots()?;
217
218        Ok(manager)
219    }
220
221    /// Scan the snapshot directory for existing snapshot metadata files
222    fn scan_existing_snapshots(&mut self) -> RaftResult<()> {
223        let entries =
224            fs::read_dir(&self.config.snapshot_dir).map_err(|e| RaftError::StorageError {
225                message: format!(
226                    "Failed to read snapshot directory '{}': {}",
227                    self.config.snapshot_dir.display(),
228                    e
229                ),
230            })?;
231
232        let mut best: Option<SnapshotMetadata> = None;
233
234        for entry in entries {
235            let entry = entry.map_err(|e| RaftError::StorageError {
236                message: format!("Failed to read directory entry: {}", e),
237            })?;
238
239            let path = entry.path();
240            if let Some(ext) = path.extension() {
241                // We look for .json files that end in .meta.json
242                if ext == "json" {
243                    if let Some(stem) = path.file_stem() {
244                        let stem_str = stem.to_string_lossy();
245                        if stem_str.ends_with(".meta") {
246                            match self.load_metadata_from_file(&path) {
247                                Ok(meta) => {
248                                    let dominated = best.as_ref().is_some_and(|b| {
249                                        (b.last_included_term, b.last_included_index)
250                                            >= (meta.last_included_term, meta.last_included_index)
251                                    });
252                                    if !dominated {
253                                        best = Some(meta);
254                                    }
255                                }
256                                Err(e) => {
257                                    warn!(
258                                        path = %path.display(),
259                                        error = %e,
260                                        "Skipping corrupt snapshot metadata"
261                                    );
262                                }
263                            }
264                        }
265                    }
266                }
267            }
268        }
269
270        self.latest = best;
271        Ok(())
272    }
273
274    /// Load snapshot metadata from a specific file
275    fn load_metadata_from_file(&self, path: &Path) -> RaftResult<SnapshotMetadata> {
276        let contents = fs::read_to_string(path).map_err(|e| RaftError::StorageError {
277            message: format!("Failed to read metadata file '{}': {}", path.display(), e),
278        })?;
279
280        serde_json::from_str(&contents).map_err(|e| RaftError::StorageError {
281            message: format!("Failed to parse metadata file '{}': {}", path.display(), e),
282        })
283    }
284
285    /// Atomically write data to a file: write to temp, fsync, rename.
286    ///
287    /// This ensures that on crash, either the old file or the new file exists
288    /// in its entirety — never a partially written file.
289    fn atomic_write(final_path: &Path, data: &[u8]) -> RaftResult<()> {
290        let ext = final_path
291            .extension()
292            .map(|e| e.to_string_lossy())
293            .unwrap_or_default();
294        let tmp_path = final_path.with_extension(format!("{}.tmp", ext));
295        let mut f = fs::File::create(&tmp_path).map_err(|e| RaftError::StorageError {
296            message: format!("Failed to create temp file '{}': {}", tmp_path.display(), e),
297        })?;
298        f.write_all(data).map_err(|e| RaftError::StorageError {
299            message: format!("Failed to write temp file '{}': {}", tmp_path.display(), e),
300        })?;
301        f.sync_all().map_err(|e| RaftError::StorageError {
302            message: format!("Failed to fsync temp file '{}': {}", tmp_path.display(), e),
303        })?;
304        fs::rename(&tmp_path, final_path).map_err(|e| RaftError::StorageError {
305            message: format!(
306                "Failed to rename '{}' to '{}': {}",
307                tmp_path.display(),
308                final_path.display(),
309                e
310            ),
311        })?;
312        Ok(())
313    }
314
315    /// Create and persist a new snapshot
316    ///
317    /// Writes the snapshot data and metadata to disk atomically (write-to-temp
318    /// + fsync + rename), updates the latest metadata cache, and cleans up old
319    ///   snapshots.
320    pub fn create_snapshot(
321        &mut self,
322        data: Vec<u8>,
323        last_included_index: LogIndex,
324        last_included_term: Term,
325    ) -> RaftResult<SnapshotMetadata> {
326        let snapshot = Snapshot::new(last_included_index, last_included_term, data);
327
328        // Atomically write data file
329        let data_path = self
330            .config
331            .snapshot_dir
332            .join(snapshot.metadata.data_filename());
333        Self::atomic_write(&data_path, &snapshot.data)?;
334
335        // Atomically write metadata file
336        let meta_path = self
337            .config
338            .snapshot_dir
339            .join(snapshot.metadata.metadata_filename());
340        let meta_json = serde_json::to_string_pretty(&snapshot.metadata).map_err(|e| {
341            RaftError::StorageError {
342                message: format!("Failed to serialize snapshot metadata: {}", e),
343            }
344        })?;
345        Self::atomic_write(&meta_path, meta_json.as_bytes())?;
346
347        info!(
348            last_included_index = last_included_index,
349            last_included_term = last_included_term,
350            size_bytes = snapshot.metadata.size_bytes,
351            checksum = snapshot.metadata.checksum,
352            "Created snapshot"
353        );
354
355        let metadata = snapshot.metadata.clone();
356        self.latest = Some(snapshot.metadata);
357
358        // Clean up old snapshots
359        self.cleanup_old_snapshots()?;
360
361        Ok(metadata)
362    }
363
364    /// Load the most recent snapshot from disk
365    pub fn load_latest(&self) -> RaftResult<Option<Snapshot>> {
366        let meta = match &self.latest {
367            Some(m) => m,
368            None => return Ok(None),
369        };
370
371        let data_path = self.config.snapshot_dir.join(meta.data_filename());
372        let data = fs::read(&data_path).map_err(|e| RaftError::StorageError {
373            message: format!(
374                "Failed to read snapshot data from '{}': {}",
375                data_path.display(),
376                e
377            ),
378        })?;
379
380        let snapshot = Snapshot {
381            metadata: meta.clone(),
382            data,
383        };
384
385        // Verify integrity
386        if !snapshot.verify_checksum() {
387            return Err(RaftError::StorageError {
388                message: format!(
389                    "Snapshot checksum mismatch for index {}, term {}",
390                    meta.last_included_index, meta.last_included_term
391                ),
392            });
393        }
394
395        debug!(
396            last_included_index = meta.last_included_index,
397            last_included_term = meta.last_included_term,
398            size_bytes = meta.size_bytes,
399            "Loaded latest snapshot"
400        );
401
402        Ok(Some(snapshot))
403    }
404
405    /// Check whether the log has grown enough to warrant a new snapshot
406    pub fn should_snapshot(&self, log_size: u64) -> bool {
407        if self.config.snapshot_threshold == 0 {
408            return false;
409        }
410        log_size >= self.config.snapshot_threshold
411    }
412
413    /// Get the metadata of the latest snapshot
414    pub fn get_latest_metadata(&self) -> Option<&SnapshotMetadata> {
415        self.latest.as_ref()
416    }
417
418    /// Get the last included index of the latest snapshot
419    pub fn last_included_index(&self) -> LogIndex {
420        self.latest
421            .as_ref()
422            .map(|m| m.last_included_index)
423            .unwrap_or(0)
424    }
425
426    /// Get the last included term of the latest snapshot
427    pub fn last_included_term(&self) -> Term {
428        self.latest
429            .as_ref()
430            .map(|m| m.last_included_term)
431            .unwrap_or(0)
432    }
433
434    /// Remove old snapshots, keeping only the most recent `max_snapshots`
435    pub fn cleanup_old_snapshots(&self) -> RaftResult<()> {
436        let mut snapshot_metas = self.list_all_snapshots()?;
437
438        if snapshot_metas.len() <= self.config.max_snapshots {
439            return Ok(());
440        }
441
442        // Sort by (term, index) descending so we keep the newest
443        snapshot_metas.sort_by(|a, b| {
444            (b.last_included_term, b.last_included_index)
445                .cmp(&(a.last_included_term, a.last_included_index))
446        });
447
448        // Remove excess snapshots (those beyond max_snapshots)
449        let to_remove = &snapshot_metas[self.config.max_snapshots..];
450
451        for meta in to_remove {
452            let data_path = self.config.snapshot_dir.join(meta.data_filename());
453            let meta_path = self.config.snapshot_dir.join(meta.metadata_filename());
454
455            if data_path.exists() {
456                fs::remove_file(&data_path).map_err(|e| RaftError::StorageError {
457                    message: format!(
458                        "Failed to remove old snapshot data '{}': {}",
459                        data_path.display(),
460                        e
461                    ),
462                })?;
463            }
464
465            if meta_path.exists() {
466                fs::remove_file(&meta_path).map_err(|e| RaftError::StorageError {
467                    message: format!(
468                        "Failed to remove old snapshot metadata '{}': {}",
469                        meta_path.display(),
470                        e
471                    ),
472                })?;
473            }
474
475            info!(
476                last_included_index = meta.last_included_index,
477                last_included_term = meta.last_included_term,
478                "Removed old snapshot"
479            );
480        }
481
482        Ok(())
483    }
484
485    /// List all snapshot metadata files in the snapshot directory
486    pub fn list_all_snapshots(&self) -> RaftResult<Vec<SnapshotMetadata>> {
487        let entries =
488            fs::read_dir(&self.config.snapshot_dir).map_err(|e| RaftError::StorageError {
489                message: format!(
490                    "Failed to read snapshot directory '{}': {}",
491                    self.config.snapshot_dir.display(),
492                    e
493                ),
494            })?;
495
496        let mut metas = Vec::new();
497
498        for entry in entries {
499            let entry = entry.map_err(|e| RaftError::StorageError {
500                message: format!("Failed to read directory entry: {}", e),
501            })?;
502
503            let path = entry.path();
504            if let Some(ext) = path.extension() {
505                if ext == "json" {
506                    if let Some(stem) = path.file_stem() {
507                        let stem_str = stem.to_string_lossy();
508                        if stem_str.ends_with(".meta") {
509                            match self.load_metadata_from_file(&path) {
510                                Ok(meta) => metas.push(meta),
511                                Err(e) => {
512                                    warn!(
513                                        path = %path.display(),
514                                        error = %e,
515                                        "Skipping corrupt snapshot metadata during cleanup"
516                                    );
517                                }
518                            }
519                        }
520                    }
521                }
522            }
523        }
524
525        Ok(metas)
526    }
527
528    /// Install a snapshot received from the leader via InstallSnapshot RPC
529    ///
530    /// This handles the case where a follower is too far behind and the leader
531    /// sends its snapshot directly. The follower must replace its log and state.
532    pub fn install_snapshot(&mut self, snapshot: Snapshot) -> RaftResult<SnapshotMetadata> {
533        // Verify checksum
534        if !snapshot.verify_checksum() {
535            return Err(RaftError::StorageError {
536                message: format!(
537                    "Received snapshot with invalid checksum (index={}, term={})",
538                    snapshot.metadata.last_included_index, snapshot.metadata.last_included_term
539                ),
540            });
541        }
542
543        // Check that this snapshot is newer than what we have
544        if let Some(current) = &self.latest {
545            if (
546                snapshot.metadata.last_included_term,
547                snapshot.metadata.last_included_index,
548            ) <= (current.last_included_term, current.last_included_index)
549            {
550                return Err(RaftError::StorageError {
551                    message: format!(
552                        "Received snapshot (term={}, index={}) is not newer than current (term={}, index={})",
553                        snapshot.metadata.last_included_term,
554                        snapshot.metadata.last_included_index,
555                        current.last_included_term,
556                        current.last_included_index,
557                    ),
558                });
559            }
560        }
561
562        // Atomically write to disk
563        let data_path = self
564            .config
565            .snapshot_dir
566            .join(snapshot.metadata.data_filename());
567        Self::atomic_write(&data_path, &snapshot.data)?;
568
569        let meta_path = self
570            .config
571            .snapshot_dir
572            .join(snapshot.metadata.metadata_filename());
573        let meta_json = serde_json::to_string_pretty(&snapshot.metadata).map_err(|e| {
574            RaftError::StorageError {
575                message: format!("Failed to serialize installed snapshot metadata: {}", e),
576            }
577        })?;
578        Self::atomic_write(&meta_path, meta_json.as_bytes())?;
579
580        info!(
581            last_included_index = snapshot.metadata.last_included_index,
582            last_included_term = snapshot.metadata.last_included_term,
583            size_bytes = snapshot.metadata.size_bytes,
584            "Installed snapshot from leader"
585        );
586
587        let metadata = snapshot.metadata.clone();
588        self.latest = Some(snapshot.metadata);
589
590        self.cleanup_old_snapshots()?;
591
592        Ok(metadata)
593    }
594}
595
596/// InstallSnapshot RPC request (Raft paper Section 7)
597///
598/// Sent by the leader to followers that are too far behind in the log.
599/// The snapshot is sent in chunks identified by offset and done flag.
600#[derive(Debug, Clone, PartialEq, Eq)]
601pub struct InstallSnapshotRequest {
602    /// Leader's current term
603    pub term: Term,
604    /// Leader's ID so follower can redirect clients
605    pub leader_id: NodeId,
606    /// The last included index in the snapshot
607    pub last_included_index: LogIndex,
608    /// The term of the last included index
609    pub last_included_term: Term,
610    /// Byte offset into the snapshot data for chunked transfer
611    pub offset: u64,
612    /// Raw snapshot data chunk
613    pub data: Vec<u8>,
614    /// True if this is the final chunk of the snapshot
615    pub done: bool,
616}
617
618impl InstallSnapshotRequest {
619    /// Create a new InstallSnapshot request for a complete snapshot (single chunk)
620    pub fn new_complete(
621        term: Term,
622        leader_id: NodeId,
623        last_included_index: LogIndex,
624        last_included_term: Term,
625        data: Vec<u8>,
626    ) -> Self {
627        Self {
628            term,
629            leader_id,
630            last_included_index,
631            last_included_term,
632            offset: 0,
633            data,
634            done: true,
635        }
636    }
637
638    /// Create a new InstallSnapshot request for a chunk of a snapshot
639    pub fn new_chunk(
640        term: Term,
641        leader_id: NodeId,
642        last_included_index: LogIndex,
643        last_included_term: Term,
644        offset: u64,
645        data: Vec<u8>,
646        done: bool,
647    ) -> Self {
648        Self {
649            term,
650            leader_id,
651            last_included_index,
652            last_included_term,
653            offset,
654            data,
655            done,
656        }
657    }
658
659    /// Check if this is a complete (non-chunked) snapshot transfer
660    pub fn is_complete(&self) -> bool {
661        self.offset == 0 && self.done
662    }
663}
664
665/// InstallSnapshot RPC response
666#[derive(Debug, Clone, PartialEq, Eq)]
667pub struct InstallSnapshotResponse {
668    /// Current term of the responding node, for leader to update itself
669    pub term: Term,
670}
671
672impl InstallSnapshotResponse {
673    /// Create a new InstallSnapshot response
674    pub fn new(term: Term) -> Self {
675        Self { term }
676    }
677}
678
679/// Accumulator for receiving chunked snapshot data from the leader
680pub struct SnapshotReceiver {
681    /// Expected last included index
682    last_included_index: LogIndex,
683    /// Expected last included term
684    last_included_term: Term,
685    /// Accumulated data chunks
686    data: Vec<u8>,
687    /// Next expected offset
688    next_offset: u64,
689}
690
691impl SnapshotReceiver {
692    /// Create a new snapshot receiver for an incoming snapshot
693    pub fn new(last_included_index: LogIndex, last_included_term: Term) -> Self {
694        Self {
695            last_included_index,
696            last_included_term,
697            data: Vec::new(),
698            next_offset: 0,
699        }
700    }
701
702    /// Receive a chunk of snapshot data
703    ///
704    /// Returns `Ok(Some(Snapshot))` when the final chunk is received,
705    /// `Ok(None)` while still accumulating, or `Err` on protocol violation.
706    pub fn receive_chunk(&mut self, req: &InstallSnapshotRequest) -> RaftResult<Option<Snapshot>> {
707        // Validate this chunk matches our expected snapshot
708        if req.last_included_index != self.last_included_index
709            || req.last_included_term != self.last_included_term
710        {
711            return Err(RaftError::StorageError {
712                message: format!(
713                    "Snapshot chunk mismatch: expected (index={}, term={}), got (index={}, term={})",
714                    self.last_included_index,
715                    self.last_included_term,
716                    req.last_included_index,
717                    req.last_included_term,
718                ),
719            });
720        }
721
722        // Validate offset
723        if req.offset != self.next_offset {
724            return Err(RaftError::StorageError {
725                message: format!(
726                    "Unexpected snapshot chunk offset: expected {}, got {}",
727                    self.next_offset, req.offset,
728                ),
729            });
730        }
731
732        // Append chunk data
733        self.data.extend_from_slice(&req.data);
734        self.next_offset += req.data.len() as u64;
735
736        if req.done {
737            let snapshot = Snapshot::new(
738                self.last_included_index,
739                self.last_included_term,
740                std::mem::take(&mut self.data),
741            );
742            Ok(Some(snapshot))
743        } else {
744            Ok(None)
745        }
746    }
747
748    /// Get the expected last included index
749    pub fn last_included_index(&self) -> LogIndex {
750        self.last_included_index
751    }
752
753    /// Get the expected last included term
754    pub fn last_included_term(&self) -> Term {
755        self.last_included_term
756    }
757
758    /// Get how much data has been accumulated so far
759    pub fn bytes_received(&self) -> u64 {
760        self.data.len() as u64
761    }
762}
763
764/// Trait for snapshot storage operations.
765///
766/// Provides a clean, trait-based interface for saving, loading, listing,
767/// and pruning snapshots. Implementations must be `Send + Sync`.
768pub trait SnapshotStore: Send + Sync {
769    /// Save a snapshot to storage and return the generated metadata.
770    fn save(
771        &mut self,
772        data: Vec<u8>,
773        last_included_index: LogIndex,
774        last_included_term: Term,
775    ) -> RaftResult<SnapshotMetadata>;
776
777    /// Load the most recent snapshot from storage.
778    fn load_latest(&self) -> RaftResult<Option<Snapshot>>;
779
780    /// List metadata for all stored snapshots.
781    fn list(&self) -> RaftResult<Vec<SnapshotMetadata>>;
782
783    /// Prune old snapshots, keeping only the `keep_n` most recent.
784    fn prune(&self, keep_n: usize) -> RaftResult<()>;
785}
786
787/// Disk-backed implementation of [`SnapshotStore`].
788///
789/// Wraps a [`SnapshotManager`] to provide a trait-based interface with
790/// atomic writes (write-to-temp + fsync + rename) for crash safety.
791pub struct DiskSnapshotStore {
792    manager: SnapshotManager,
793}
794
795impl DiskSnapshotStore {
796    /// Create a new disk-backed snapshot store.
797    pub fn new(config: SnapshotConfig) -> RaftResult<Self> {
798        let manager = SnapshotManager::new(config)?;
799        Ok(Self { manager })
800    }
801
802    /// Access the underlying snapshot manager.
803    pub fn manager(&self) -> &SnapshotManager {
804        &self.manager
805    }
806
807    /// Mutably access the underlying snapshot manager.
808    pub fn manager_mut(&mut self) -> &mut SnapshotManager {
809        &mut self.manager
810    }
811}
812
813impl SnapshotStore for DiskSnapshotStore {
814    fn save(
815        &mut self,
816        data: Vec<u8>,
817        last_included_index: LogIndex,
818        last_included_term: Term,
819    ) -> RaftResult<SnapshotMetadata> {
820        self.manager
821            .create_snapshot(data, last_included_index, last_included_term)
822    }
823
824    fn load_latest(&self) -> RaftResult<Option<Snapshot>> {
825        self.manager.load_latest()
826    }
827
828    fn list(&self) -> RaftResult<Vec<SnapshotMetadata>> {
829        self.manager.list_all_snapshots()
830    }
831
832    fn prune(&self, keep_n: usize) -> RaftResult<()> {
833        let mut snapshot_metas = self.manager.list_all_snapshots()?;
834
835        if snapshot_metas.len() <= keep_n {
836            return Ok(());
837        }
838
839        // Sort by (term, index) descending so we keep the newest
840        snapshot_metas.sort_by(|a, b| {
841            (b.last_included_term, b.last_included_index)
842                .cmp(&(a.last_included_term, a.last_included_index))
843        });
844
845        let to_remove = &snapshot_metas[keep_n..];
846
847        for meta in to_remove {
848            let data_path = self.manager.config.snapshot_dir.join(meta.data_filename());
849            let meta_path = self
850                .manager
851                .config
852                .snapshot_dir
853                .join(meta.metadata_filename());
854
855            if data_path.exists() {
856                fs::remove_file(&data_path).map_err(|e| RaftError::StorageError {
857                    message: format!(
858                        "Failed to remove old snapshot data '{}': {}",
859                        data_path.display(),
860                        e
861                    ),
862                })?;
863            }
864
865            if meta_path.exists() {
866                fs::remove_file(&meta_path).map_err(|e| RaftError::StorageError {
867                    message: format!(
868                        "Failed to remove old snapshot metadata '{}': {}",
869                        meta_path.display(),
870                        e
871                    ),
872                })?;
873            }
874
875            info!(
876                last_included_index = meta.last_included_index,
877                last_included_term = meta.last_included_term,
878                "Pruned old snapshot"
879            );
880        }
881
882        Ok(())
883    }
884}
885
886/// Streams a snapshot file from disk in fixed-size chunks without buffering
887/// the entire snapshot in memory.
888pub struct SnapshotStreamer {
889    path: PathBuf,
890    metadata: SnapshotMetadata,
891    chunk_size: usize,
892    offset: u64,
893    total_size: u64,
894    file: fs::File,
895}
896
897impl SnapshotStreamer {
898    /// Create a new streamer for the snapshot at `path`.
899    ///
900    /// Opens the file and records its total size. Returns an error if the
901    /// file cannot be opened or its size cannot be determined.
902    pub fn new(path: PathBuf, metadata: SnapshotMetadata, chunk_size: usize) -> RaftResult<Self> {
903        let file = fs::File::open(&path).map_err(|e| RaftError::StorageError {
904            message: format!("Failed to open snapshot file '{}': {}", path.display(), e),
905        })?;
906        let total_size = file
907            .metadata()
908            .map_err(|e| RaftError::StorageError {
909                message: format!("Failed to stat snapshot file '{}': {}", path.display(), e),
910            })?
911            .len();
912        Ok(Self {
913            path,
914            metadata,
915            chunk_size,
916            offset: 0,
917            total_size,
918            file,
919        })
920    }
921
922    /// Return the snapshot metadata.
923    pub fn metadata(&self) -> &SnapshotMetadata {
924        &self.metadata
925    }
926
927    /// Return the total size of the snapshot file in bytes.
928    pub fn total_size(&self) -> u64 {
929        self.total_size
930    }
931
932    /// Read the next chunk from the file and build an [`InstallSnapshotRequest`].
933    ///
934    /// Returns `Ok(None)` once all bytes have been streamed.
935    pub fn next_chunk_for_rpc(
936        &mut self,
937        term: Term,
938        leader_id: NodeId,
939    ) -> RaftResult<Option<InstallSnapshotRequest>> {
940        if self.offset >= self.total_size {
941            return Ok(None);
942        }
943
944        self.file
945            .seek(SeekFrom::Start(self.offset))
946            .map_err(|e| RaftError::StorageError {
947                message: format!(
948                    "Failed to seek to offset {} in '{}': {}",
949                    self.offset,
950                    self.path.display(),
951                    e
952                ),
953            })?;
954
955        let remaining = self.total_size - self.offset;
956        let to_read = remaining.min(self.chunk_size as u64) as usize;
957        let mut buf = vec![0u8; to_read];
958
959        self.file
960            .read_exact(&mut buf)
961            .map_err(|e| RaftError::StorageError {
962                message: format!(
963                    "Failed to read {} bytes at offset {} from '{}': {}",
964                    to_read,
965                    self.offset,
966                    self.path.display(),
967                    e
968                ),
969            })?;
970
971        let chunk_offset = self.offset;
972        self.offset += to_read as u64;
973        let done = self.offset >= self.total_size;
974
975        Ok(Some(InstallSnapshotRequest {
976            term,
977            leader_id,
978            last_included_index: self.metadata.last_included_index,
979            last_included_term: self.metadata.last_included_term,
980            offset: chunk_offset,
981            data: buf,
982            done,
983        }))
984    }
985}
986
987/// Receives snapshot chunks, writing directly to a temp file.
988/// On completion, verifies the CRC32 checksum and atomically renames
989/// the temp file to the final destination path.
990pub struct SnapshotStreamReceiver {
991    temp_path: PathBuf,
992    final_path: PathBuf,
993    file: fs::File,
994    next_offset: u64,
995    last_included_index: LogIndex,
996    last_included_term: Term,
997    expected_checksum: Option<u32>,
998    bytes_written: u64,
999}
1000
1001impl SnapshotStreamReceiver {
1002    /// Create a new receiver that writes chunks to a temp file in `dir`.
1003    ///
1004    /// The temp file is `snapshot-{term}-{index}.data.tmp`; on completion it
1005    /// is atomically renamed to `snapshot-{term}-{index}.data`.
1006    pub fn new(
1007        dir: &Path,
1008        last_included_index: LogIndex,
1009        last_included_term: Term,
1010    ) -> RaftResult<Self> {
1011        let temp_name = format!(
1012            "snapshot-{:016x}-{:016x}.data.tmp",
1013            last_included_term, last_included_index
1014        );
1015        let final_name = format!(
1016            "snapshot-{:016x}-{:016x}.data",
1017            last_included_term, last_included_index
1018        );
1019        let temp_path = dir.join(&temp_name);
1020        let final_path = dir.join(&final_name);
1021
1022        let file = fs::File::create(&temp_path).map_err(|e| RaftError::StorageError {
1023            message: format!(
1024                "Failed to create temp snapshot file '{}': {}",
1025                temp_path.display(),
1026                e
1027            ),
1028        })?;
1029
1030        Ok(Self {
1031            temp_path,
1032            final_path,
1033            file,
1034            next_offset: 0,
1035            last_included_index,
1036            last_included_term,
1037            expected_checksum: None,
1038            bytes_written: 0,
1039        })
1040    }
1041
1042    /// Receive a snapshot data chunk.
1043    ///
1044    /// Returns `Ok(Some(final_path))` when the last chunk (`done == true`) is
1045    /// received and the file has been verified and atomically renamed to its
1046    /// final location. Returns `Ok(None)` while still accumulating chunks.
1047    pub fn receive_chunk(&mut self, req: &InstallSnapshotRequest) -> RaftResult<Option<PathBuf>> {
1048        // Validate snapshot identity
1049        if req.last_included_index != self.last_included_index
1050            || req.last_included_term != self.last_included_term
1051        {
1052            return Err(RaftError::StorageError {
1053                message: format!(
1054                    "Snapshot identity mismatch: expected (index={}, term={}), got (index={}, term={})",
1055                    self.last_included_index,
1056                    self.last_included_term,
1057                    req.last_included_index,
1058                    req.last_included_term,
1059                ),
1060            });
1061        }
1062
1063        // Validate sequential offset
1064        if req.offset != self.next_offset {
1065            return Err(RaftError::StorageError {
1066                message: format!(
1067                    "Snapshot chunk offset mismatch: expected {}, got {}",
1068                    self.next_offset, req.offset,
1069                ),
1070            });
1071        }
1072
1073        // Write chunk to the temp file
1074        self.file
1075            .write_all(&req.data)
1076            .map_err(|e| RaftError::StorageError {
1077                message: format!(
1078                    "Failed to write snapshot chunk at offset {}: {}",
1079                    self.next_offset, e
1080                ),
1081            })?;
1082
1083        self.next_offset += req.data.len() as u64;
1084        self.bytes_written += req.data.len() as u64;
1085
1086        if !req.done {
1087            return Ok(None);
1088        }
1089
1090        // Flush so all bytes are on disk before we read back for checksum
1091        self.file.flush().map_err(|e| RaftError::StorageError {
1092            message: format!("Failed to flush snapshot temp file: {}", e),
1093        })?;
1094
1095        // Re-read the temp file in streaming fashion to compute CRC32
1096        let mut verify_file =
1097            fs::File::open(&self.temp_path).map_err(|e| RaftError::StorageError {
1098                message: format!(
1099                    "Failed to open temp file '{}' for checksum verification: {}",
1100                    self.temp_path.display(),
1101                    e
1102                ),
1103            })?;
1104
1105        let mut hasher = crc32fast::Hasher::new();
1106        let mut read_buf = vec![0u8; 65536]; // 64 KiB read buffer
1107        loop {
1108            let n = verify_file
1109                .read(&mut read_buf)
1110                .map_err(|e| RaftError::StorageError {
1111                    message: format!("Failed to read temp file for checksum verification: {}", e),
1112                })?;
1113            if n == 0 {
1114                break;
1115            }
1116            hasher.update(&read_buf[..n]);
1117        }
1118        let computed_checksum = hasher.finalize();
1119
1120        // Verify against expected checksum when known
1121        if let Some(expected) = self.expected_checksum {
1122            if computed_checksum != expected {
1123                return Err(RaftError::StorageError {
1124                    message: format!(
1125                        "Snapshot CRC32 mismatch: expected {:#010x}, computed {:#010x}",
1126                        expected, computed_checksum
1127                    ),
1128                });
1129            }
1130        }
1131
1132        // Atomically rename temp → final
1133        fs::rename(&self.temp_path, &self.final_path).map_err(|e| RaftError::StorageError {
1134            message: format!(
1135                "Failed to rename '{}' to '{}': {}",
1136                self.temp_path.display(),
1137                self.final_path.display(),
1138                e
1139            ),
1140        })?;
1141
1142        info!(
1143            last_included_index = self.last_included_index,
1144            last_included_term = self.last_included_term,
1145            bytes_written = self.bytes_written,
1146            checksum = computed_checksum,
1147            "Snapshot stream received and finalized"
1148        );
1149
1150        Ok(Some(self.final_path.clone()))
1151    }
1152
1153    /// Return the number of bytes written to the temp file so far.
1154    pub fn bytes_written(&self) -> u64 {
1155        self.bytes_written
1156    }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161    use super::*;
1162
1163    fn test_snapshot_dir() -> tempfile::TempDir {
1164        tempfile::TempDir::new().expect("Failed to create temp dir for snapshot tests")
1165    }
1166
1167    fn make_config(dir: &Path) -> SnapshotConfig {
1168        SnapshotConfig::new(dir.to_path_buf(), 3, 100)
1169    }
1170
1171    #[test]
1172    fn test_snapshot_creation() {
1173        let dir = test_snapshot_dir();
1174        let config = make_config(dir.path());
1175        let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1176
1177        let data = b"state machine data v1".to_vec();
1178        let meta = manager
1179            .create_snapshot(data.clone(), 50, 3)
1180            .expect("Failed to create snapshot");
1181
1182        assert_eq!(meta.last_included_index, 50);
1183        assert_eq!(meta.last_included_term, 3);
1184        assert_eq!(meta.size_bytes, data.len() as u64);
1185        assert_eq!(meta.checksum, crc32fast::hash(&data));
1186    }
1187
1188    #[test]
1189    fn test_snapshot_load_latest() {
1190        let dir = test_snapshot_dir();
1191        let config = make_config(dir.path());
1192        let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1193
1194        let data = b"state machine snapshot data".to_vec();
1195        manager
1196            .create_snapshot(data.clone(), 100, 5)
1197            .expect("Failed to create snapshot");
1198
1199        let loaded = manager
1200            .load_latest()
1201            .expect("Failed to load latest snapshot");
1202        let loaded = loaded.expect("Expected a snapshot to exist");
1203
1204        assert_eq!(loaded.metadata.last_included_index, 100);
1205        assert_eq!(loaded.metadata.last_included_term, 5);
1206        assert_eq!(loaded.data, data);
1207        assert!(loaded.verify_checksum());
1208    }
1209
1210    #[test]
1211    fn test_snapshot_load_latest_empty() {
1212        let dir = test_snapshot_dir();
1213        let config = make_config(dir.path());
1214        let manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1215
1216        let loaded = manager
1217            .load_latest()
1218            .expect("Failed to load latest snapshot");
1219        assert!(loaded.is_none());
1220    }
1221
1222    #[test]
1223    fn test_snapshot_cleanup_old() {
1224        let dir = test_snapshot_dir();
1225        // max_snapshots = 2 for this test
1226        let config = SnapshotConfig::new(dir.path().to_path_buf(), 2, 100);
1227        let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1228
1229        // Create 4 snapshots
1230        manager
1231            .create_snapshot(b"snap1".to_vec(), 10, 1)
1232            .expect("Failed to create snapshot 1");
1233        manager
1234            .create_snapshot(b"snap2".to_vec(), 20, 2)
1235            .expect("Failed to create snapshot 2");
1236        manager
1237            .create_snapshot(b"snap3".to_vec(), 30, 3)
1238            .expect("Failed to create snapshot 3");
1239        manager
1240            .create_snapshot(b"snap4".to_vec(), 40, 4)
1241            .expect("Failed to create snapshot 4");
1242
1243        // Should only have 2 snapshots remaining
1244        let all = manager
1245            .list_all_snapshots()
1246            .expect("Failed to list snapshots");
1247        assert_eq!(all.len(), 2);
1248
1249        // The two newest should remain
1250        let mut indices: Vec<u64> = all.iter().map(|m| m.last_included_index).collect();
1251        indices.sort();
1252        assert_eq!(indices, vec![30, 40]);
1253    }
1254
1255    #[test]
1256    fn test_snapshot_threshold_trigger() {
1257        let dir = test_snapshot_dir();
1258        let config = SnapshotConfig::new(dir.path().to_path_buf(), 3, 500);
1259        let manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1260
1261        assert!(!manager.should_snapshot(100));
1262        assert!(!manager.should_snapshot(499));
1263        assert!(manager.should_snapshot(500));
1264        assert!(manager.should_snapshot(1000));
1265    }
1266
1267    #[test]
1268    fn test_snapshot_threshold_zero_disabled() {
1269        let dir = test_snapshot_dir();
1270        let config = SnapshotConfig::new(dir.path().to_path_buf(), 3, 0);
1271        let manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1272
1273        assert!(!manager.should_snapshot(0));
1274        assert!(!manager.should_snapshot(999_999));
1275    }
1276
1277    #[test]
1278    fn test_snapshot_metadata_serialization() {
1279        let meta = SnapshotMetadata::new(42, 7, 1024, 0xDEAD_BEEF);
1280
1281        let json = serde_json::to_string(&meta).expect("Failed to serialize metadata");
1282        let deserialized: SnapshotMetadata =
1283            serde_json::from_str(&json).expect("Failed to deserialize metadata");
1284
1285        assert_eq!(deserialized.last_included_index, 42);
1286        assert_eq!(deserialized.last_included_term, 7);
1287        assert_eq!(deserialized.size_bytes, 1024);
1288        assert_eq!(deserialized.checksum, 0xDEAD_BEEF);
1289        assert_eq!(deserialized.created_at, meta.created_at);
1290    }
1291
1292    #[test]
1293    fn test_snapshot_checksum_verification() {
1294        let data = b"important state data".to_vec();
1295        let snapshot = Snapshot::new(10, 2, data);
1296        assert!(snapshot.verify_checksum());
1297
1298        // Tamper with data
1299        let mut tampered = snapshot.clone();
1300        if let Some(byte) = tampered.data.first_mut() {
1301            *byte ^= 0xFF;
1302        }
1303        assert!(!tampered.verify_checksum());
1304    }
1305
1306    #[test]
1307    fn test_install_snapshot_request_complete() {
1308        let req = InstallSnapshotRequest::new_complete(5, 1, 100, 3, b"data".to_vec());
1309        assert_eq!(req.term, 5);
1310        assert_eq!(req.leader_id, 1);
1311        assert_eq!(req.last_included_index, 100);
1312        assert_eq!(req.last_included_term, 3);
1313        assert_eq!(req.offset, 0);
1314        assert!(req.done);
1315        assert!(req.is_complete());
1316    }
1317
1318    #[test]
1319    fn test_install_snapshot_request_chunk() {
1320        let req = InstallSnapshotRequest::new_chunk(5, 1, 100, 3, 512, b"chunk2".to_vec(), false);
1321        assert_eq!(req.offset, 512);
1322        assert!(!req.done);
1323        assert!(!req.is_complete());
1324    }
1325
1326    #[test]
1327    fn test_install_snapshot_response() {
1328        let resp = InstallSnapshotResponse::new(7);
1329        assert_eq!(resp.term, 7);
1330    }
1331
1332    #[test]
1333    fn test_snapshot_receiver_single_chunk() {
1334        let mut receiver = SnapshotReceiver::new(50, 3);
1335
1336        let req = InstallSnapshotRequest::new_complete(5, 1, 50, 3, b"full data".to_vec());
1337
1338        let result = receiver
1339            .receive_chunk(&req)
1340            .expect("Failed to receive chunk");
1341        let snapshot = result.expect("Expected completed snapshot");
1342
1343        assert_eq!(snapshot.metadata.last_included_index, 50);
1344        assert_eq!(snapshot.metadata.last_included_term, 3);
1345        assert_eq!(snapshot.data, b"full data");
1346        assert!(snapshot.verify_checksum());
1347    }
1348
1349    #[test]
1350    fn test_snapshot_receiver_multi_chunk() {
1351        let mut receiver = SnapshotReceiver::new(100, 5);
1352
1353        // First chunk
1354        let req1 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, 0, b"hello".to_vec(), false);
1355        let result1 = receiver
1356            .receive_chunk(&req1)
1357            .expect("Failed to receive chunk 1");
1358        assert!(result1.is_none());
1359        assert_eq!(receiver.bytes_received(), 5);
1360
1361        // Second chunk
1362        let req2 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, 5, b" world".to_vec(), true);
1363        let result2 = receiver
1364            .receive_chunk(&req2)
1365            .expect("Failed to receive chunk 2");
1366        let snapshot = result2.expect("Expected completed snapshot");
1367
1368        assert_eq!(snapshot.data, b"hello world");
1369        assert!(snapshot.verify_checksum());
1370    }
1371
1372    #[test]
1373    fn test_snapshot_receiver_wrong_offset() {
1374        let mut receiver = SnapshotReceiver::new(50, 3);
1375
1376        let req = InstallSnapshotRequest::new_chunk(5, 1, 50, 3, 999, b"bad".to_vec(), false);
1377
1378        let result = receiver.receive_chunk(&req);
1379        assert!(result.is_err());
1380    }
1381
1382    #[test]
1383    fn test_snapshot_receiver_mismatched_snapshot() {
1384        let mut receiver = SnapshotReceiver::new(50, 3);
1385
1386        // Different index than expected
1387        let req = InstallSnapshotRequest::new_complete(5, 1, 99, 3, b"wrong snapshot".to_vec());
1388
1389        let result = receiver.receive_chunk(&req);
1390        assert!(result.is_err());
1391    }
1392
1393    #[test]
1394    fn test_install_snapshot_to_manager() {
1395        let dir = test_snapshot_dir();
1396        let config = make_config(dir.path());
1397        let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1398
1399        let data = b"installed snapshot data".to_vec();
1400        let snapshot = Snapshot::new(200, 10, data.clone());
1401
1402        let meta = manager
1403            .install_snapshot(snapshot)
1404            .expect("Failed to install snapshot");
1405        assert_eq!(meta.last_included_index, 200);
1406        assert_eq!(meta.last_included_term, 10);
1407
1408        // Verify we can load it
1409        let loaded = manager
1410            .load_latest()
1411            .expect("Failed to load")
1412            .expect("Expected snapshot");
1413        assert_eq!(loaded.data, data);
1414    }
1415
1416    #[test]
1417    fn test_install_older_snapshot_rejected() {
1418        let dir = test_snapshot_dir();
1419        let config = make_config(dir.path());
1420        let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1421
1422        // Create a newer snapshot first
1423        manager
1424            .create_snapshot(b"newer".to_vec(), 100, 5)
1425            .expect("Failed to create snapshot");
1426
1427        // Try to install an older one
1428        let old_snapshot = Snapshot::new(50, 3, b"older".to_vec());
1429        let result = manager.install_snapshot(old_snapshot);
1430        assert!(result.is_err());
1431    }
1432
1433    #[test]
1434    fn test_snapshot_persistence_across_managers() {
1435        let dir = test_snapshot_dir();
1436        let config = make_config(dir.path());
1437
1438        // Create snapshot with first manager
1439        {
1440            let mut manager =
1441                SnapshotManager::new(config.clone()).expect("Failed to create manager 1");
1442            manager
1443                .create_snapshot(b"persisted data".to_vec(), 75, 4)
1444                .expect("Failed to create snapshot");
1445        }
1446
1447        // Load with second manager (simulating restart)
1448        {
1449            let manager = SnapshotManager::new(config).expect("Failed to create manager 2");
1450            let latest = manager.get_latest_metadata();
1451            assert!(latest.is_some());
1452            let meta = latest.expect("Expected metadata");
1453            assert_eq!(meta.last_included_index, 75);
1454            assert_eq!(meta.last_included_term, 4);
1455
1456            let snapshot = manager
1457                .load_latest()
1458                .expect("Failed to load")
1459                .expect("Expected snapshot");
1460            assert_eq!(snapshot.data, b"persisted data");
1461        }
1462    }
1463
1464    #[test]
1465    fn test_snapshot_config_with_defaults() {
1466        let config = SnapshotConfig::with_defaults(PathBuf::from("/tmp/test"));
1467        assert_eq!(config.max_snapshots, 3);
1468        assert_eq!(config.snapshot_threshold, 10000);
1469    }
1470
1471    #[test]
1472    fn test_snapshot_policy_should_trigger() {
1473        let policy = SnapshotPolicy::new(100);
1474        assert!(!policy.should_snapshot(50, 50)); // below threshold
1475        assert!(!policy.should_snapshot(99, 99)); // just below
1476        assert!(policy.should_snapshot(100, 100)); // at threshold
1477        assert!(policy.should_snapshot(200, 200)); // above threshold
1478    }
1479
1480    #[test]
1481    fn test_snapshot_policy_disabled() {
1482        let policy = SnapshotPolicy::disabled();
1483        assert!(!policy.should_snapshot(10000, 10000));
1484    }
1485
1486    #[test]
1487    fn test_snapshot_policy_min_applied() {
1488        let policy = SnapshotPolicy::new(10).with_min_applied(50);
1489        assert!(!policy.should_snapshot(20, 30)); // enough entries but not enough applied
1490        assert!(policy.should_snapshot(20, 50)); // enough of both
1491    }
1492
1493    #[test]
1494    fn test_snapshot_policy_default() {
1495        let policy = SnapshotPolicy::default();
1496        assert_eq!(policy.max_log_entries, 10_000);
1497        assert!(!policy.should_snapshot(9_999, 9_999));
1498        assert!(policy.should_snapshot(10_000, 10_000));
1499    }
1500
1501    // --- Atomic write and DiskSnapshotStore tests ---
1502
1503    #[test]
1504    fn test_atomic_write_creates_file() {
1505        let dir = test_snapshot_dir();
1506        let file_path = dir.path().join("atomic_test.data");
1507        let content = b"atomic write content";
1508
1509        SnapshotManager::atomic_write(&file_path, content).expect("atomic_write should succeed");
1510
1511        let read_back = fs::read(&file_path).expect("File should exist");
1512        assert_eq!(read_back, content);
1513    }
1514
1515    #[test]
1516    fn test_atomic_write_no_tmp_left() {
1517        let dir = test_snapshot_dir();
1518        let file_path = dir.path().join("no_tmp_left.data");
1519
1520        SnapshotManager::atomic_write(&file_path, b"data").expect("atomic_write should succeed");
1521
1522        let tmp_path = file_path.with_extension("data.tmp");
1523        assert!(
1524            !tmp_path.exists(),
1525            "Temp file should not remain after atomic write"
1526        );
1527    }
1528
1529    #[test]
1530    fn test_snapshot_store_save_and_load() {
1531        let dir = test_snapshot_dir();
1532        let config = make_config(dir.path());
1533        let mut store = DiskSnapshotStore::new(config).expect("Failed to create DiskSnapshotStore");
1534
1535        let data = b"disk snapshot store data".to_vec();
1536        let meta = store
1537            .save(data.clone(), 100, 5)
1538            .expect("Failed to save snapshot");
1539
1540        assert_eq!(meta.last_included_index, 100);
1541        assert_eq!(meta.last_included_term, 5);
1542        assert_eq!(meta.size_bytes, data.len() as u64);
1543
1544        let loaded = store
1545            .load_latest()
1546            .expect("Failed to load latest")
1547            .expect("Expected a snapshot");
1548        assert_eq!(loaded.data, data);
1549        assert!(loaded.verify_checksum());
1550    }
1551
1552    #[test]
1553    fn test_snapshot_store_list() {
1554        let dir = test_snapshot_dir();
1555        let config = SnapshotConfig::new(dir.path().to_path_buf(), 10, 100);
1556        let mut store = DiskSnapshotStore::new(config).expect("Failed to create DiskSnapshotStore");
1557
1558        store.save(b"snap1".to_vec(), 10, 1).expect("save 1 failed");
1559        store.save(b"snap2".to_vec(), 20, 2).expect("save 2 failed");
1560        store.save(b"snap3".to_vec(), 30, 3).expect("save 3 failed");
1561
1562        let list = store.list().expect("list failed");
1563        assert_eq!(list.len(), 3);
1564
1565        let mut indices: Vec<u64> = list.iter().map(|m| m.last_included_index).collect();
1566        indices.sort();
1567        assert_eq!(indices, vec![10, 20, 30]);
1568    }
1569
1570    #[test]
1571    fn test_snapshot_store_prune() {
1572        let dir = test_snapshot_dir();
1573        // max_snapshots high so auto-cleanup doesn't kick in
1574        let config = SnapshotConfig::new(dir.path().to_path_buf(), 10, 100);
1575        let mut store = DiskSnapshotStore::new(config).expect("Failed to create DiskSnapshotStore");
1576
1577        // Create 5 snapshots
1578        for i in 1..=5 {
1579            store
1580                .save(format!("snap{}", i).into_bytes(), i * 10, i)
1581                .expect("save failed");
1582        }
1583
1584        assert_eq!(store.list().expect("list failed").len(), 5);
1585
1586        // Prune to keep only 2
1587        store.prune(2).expect("prune failed");
1588
1589        let remaining = store.list().expect("list failed");
1590        assert_eq!(remaining.len(), 2);
1591
1592        // The two newest (term=5/index=50, term=4/index=40) should remain
1593        let mut indices: Vec<u64> = remaining.iter().map(|m| m.last_included_index).collect();
1594        indices.sort();
1595        assert_eq!(indices, vec![40, 50]);
1596    }
1597
1598    // --- SnapshotStreamer / SnapshotStreamReceiver tests ---
1599
1600    #[test]
1601    fn test_streamer_chunks_correctly() {
1602        let dir = test_snapshot_dir();
1603
1604        // ~2 MiB of deterministic data → exactly 4 × 512 KiB chunks
1605        let total_size: usize = 2 * 1024 * 1024;
1606        let original_data: Vec<u8> = (0..total_size).map(|i| (i % 256) as u8).collect();
1607        let checksum = crc32fast::hash(&original_data);
1608        let metadata = SnapshotMetadata::new(42, 7, total_size as u64, checksum);
1609
1610        let snap_path = dir.path().join(metadata.data_filename());
1611        fs::write(&snap_path, &original_data).expect("Failed to write snapshot file");
1612
1613        let chunk_size = 512 * 1024; // 512 KiB
1614        let mut streamer = SnapshotStreamer::new(snap_path, metadata, chunk_size)
1615            .expect("Failed to create SnapshotStreamer");
1616
1617        assert_eq!(streamer.total_size(), total_size as u64);
1618
1619        let mut reconstructed = Vec::new();
1620        let mut chunk_count = 0usize;
1621        let mut last_done = false;
1622
1623        while let Some(req) = streamer
1624            .next_chunk_for_rpc(5, 1)
1625            .expect("next_chunk_for_rpc failed")
1626        {
1627            assert_eq!(req.last_included_index, 42);
1628            assert_eq!(req.last_included_term, 7);
1629            reconstructed.extend_from_slice(&req.data);
1630            chunk_count += 1;
1631            last_done = req.done;
1632        }
1633
1634        assert!(last_done, "Final chunk must have done=true");
1635        assert_eq!(chunk_count, 4, "2 MiB / 512 KiB = 4 chunks");
1636        assert_eq!(
1637            reconstructed, original_data,
1638            "Reconstructed data must match original"
1639        );
1640    }
1641
1642    #[test]
1643    fn test_stream_receiver_writes_to_disk() {
1644        let dir = test_snapshot_dir();
1645
1646        let last_included_index: LogIndex = 100;
1647        let last_included_term: Term = 5;
1648
1649        let mut receiver =
1650            SnapshotStreamReceiver::new(dir.path(), last_included_index, last_included_term)
1651                .expect("Failed to create SnapshotStreamReceiver");
1652
1653        let chunk1 = b"chunk one data--".to_vec();
1654        let chunk2 = b"chunk two data--".to_vec();
1655        let chunk3 = b"chunk three data".to_vec();
1656
1657        let req1 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, 0, chunk1.clone(), false);
1658        let result1 = receiver
1659            .receive_chunk(&req1)
1660            .expect("receive chunk 1 failed");
1661        assert!(result1.is_none());
1662        assert_eq!(receiver.bytes_written(), chunk1.len() as u64);
1663
1664        let offset2 = chunk1.len() as u64;
1665        let req2 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, offset2, chunk2.clone(), false);
1666        let result2 = receiver
1667            .receive_chunk(&req2)
1668            .expect("receive chunk 2 failed");
1669        assert!(result2.is_none());
1670
1671        let offset3 = offset2 + chunk2.len() as u64;
1672        let req3 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, offset3, chunk3.clone(), true);
1673        let result3 = receiver
1674            .receive_chunk(&req3)
1675            .expect("receive chunk 3 failed");
1676        let final_path = result3.expect("Expected final path on done=true");
1677
1678        assert!(final_path.exists(), "Final snapshot file must exist");
1679
1680        let written = fs::read(&final_path).expect("Failed to read final snapshot file");
1681        let expected: Vec<u8> = [chunk1, chunk2, chunk3].concat();
1682        assert_eq!(written, expected, "Written data must match sent chunks");
1683        assert_eq!(receiver.bytes_written(), expected.len() as u64);
1684    }
1685
1686    #[test]
1687    fn test_streamer_and_receiver_roundtrip() {
1688        let dir = test_snapshot_dir();
1689
1690        let src_dir = dir.path().join("src");
1691        let dst_dir = dir.path().join("dst");
1692        fs::create_dir_all(&src_dir).expect("Failed to create src dir");
1693        fs::create_dir_all(&dst_dir).expect("Failed to create dst dir");
1694
1695        // 1.5 MiB = 3 × 512 KiB chunks of pseudo-random bytes
1696        let total_size: usize = 3 * 512 * 1024;
1697        let original_data: Vec<u8> = (0..total_size)
1698            .map(|i| ((i.wrapping_mul(7).wrapping_add(i / 256)) % 256) as u8)
1699            .collect();
1700
1701        let last_included_index: LogIndex = 250;
1702        let last_included_term: Term = 12;
1703        let checksum = crc32fast::hash(&original_data);
1704        let metadata = SnapshotMetadata::new(
1705            last_included_index,
1706            last_included_term,
1707            total_size as u64,
1708            checksum,
1709        );
1710
1711        let snap_path = src_dir.join(metadata.data_filename());
1712        fs::write(&snap_path, &original_data).expect("Failed to write source snapshot");
1713
1714        let chunk_size = 512 * 1024; // exactly 3 chunks
1715        let mut streamer = SnapshotStreamer::new(snap_path, metadata, chunk_size)
1716            .expect("Failed to create SnapshotStreamer");
1717        let mut receiver =
1718            SnapshotStreamReceiver::new(&dst_dir, last_included_index, last_included_term)
1719                .expect("Failed to create SnapshotStreamReceiver");
1720
1721        let mut final_path: Option<PathBuf> = None;
1722        while let Some(req) = streamer.next_chunk_for_rpc(15, 1).expect("Streamer error") {
1723            if let Some(path) = receiver.receive_chunk(&req).expect("Receiver error") {
1724                final_path = Some(path);
1725                break;
1726            }
1727        }
1728
1729        let final_path = final_path.expect("Round-trip must complete");
1730        assert!(final_path.exists(), "Final snapshot file must exist");
1731
1732        let received_data = fs::read(&final_path).expect("Failed to read final snapshot");
1733        assert_eq!(
1734            received_data, original_data,
1735            "Round-trip data must match original"
1736        );
1737        assert_eq!(receiver.bytes_written(), total_size as u64);
1738    }
1739}