1use crate::error::{RaftError, RaftResult};
9use crate::types::{LogIndex, NodeId, Term};
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::fs;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::{Path, PathBuf};
15use tracing::{debug, info, warn};
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19pub struct SnapshotMetadata {
20 pub last_included_index: LogIndex,
22 pub last_included_term: Term,
24 pub created_at: DateTime<Utc>,
26 pub size_bytes: u64,
28 pub checksum: u32,
30}
31
32impl SnapshotMetadata {
33 pub fn new(
35 last_included_index: LogIndex,
36 last_included_term: Term,
37 size_bytes: u64,
38 checksum: u32,
39 ) -> Self {
40 Self {
41 last_included_index,
42 last_included_term,
43 created_at: Utc::now(),
44 size_bytes,
45 checksum,
46 }
47 }
48
49 pub(crate) fn metadata_filename(&self) -> String {
51 format!(
52 "snapshot-{:016x}-{:016x}.meta.json",
53 self.last_included_term, self.last_included_index
54 )
55 }
56
57 pub(crate) fn data_filename(&self) -> String {
59 format!(
60 "snapshot-{:016x}-{:016x}.data",
61 self.last_included_term, self.last_included_index
62 )
63 }
64}
65
66#[derive(Debug, Clone)]
68pub struct Snapshot {
69 pub metadata: SnapshotMetadata,
71 pub data: Vec<u8>,
73}
74
75impl Snapshot {
76 pub fn new(last_included_index: LogIndex, last_included_term: Term, data: Vec<u8>) -> Self {
78 let checksum = crc32fast::hash(&data);
79 let size_bytes = data.len() as u64;
80 let metadata = SnapshotMetadata::new(
81 last_included_index,
82 last_included_term,
83 size_bytes,
84 checksum,
85 );
86 Self { metadata, data }
87 }
88
89 pub fn verify_checksum(&self) -> bool {
91 let computed = crc32fast::hash(&self.data);
92 computed == self.metadata.checksum
93 }
94}
95
/// Settings consumed by `SnapshotManager`.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Directory in which snapshot data and metadata files are stored.
    pub snapshot_dir: PathBuf,
    /// Number of snapshots retained by cleanup.
    pub max_snapshots: usize,
    /// Log size at which `SnapshotManager::should_snapshot` starts returning
    /// `true`; `0` disables that trigger.
    pub snapshot_threshold: u64,
}

impl SnapshotConfig {
    /// Creates a configuration from explicit values.
    pub fn new(snapshot_dir: PathBuf, max_snapshots: usize, snapshot_threshold: u64) -> Self {
        SnapshotConfig {
            snapshot_dir,
            max_snapshots,
            snapshot_threshold,
        }
    }

    /// Creates a configuration for `snapshot_dir` that retains 3 snapshots
    /// and uses a threshold of 10000.
    pub fn with_defaults(snapshot_dir: PathBuf) -> Self {
        Self::new(snapshot_dir, 3, 10000)
    }
}
126
/// Rule deciding when a new snapshot should be taken.
#[derive(Debug, Clone)]
pub struct SnapshotPolicy {
    /// Number of entries since the last snapshot that triggers a new one;
    /// `0` disables the policy.
    pub max_log_entries: u64,
    /// Applied index below which no snapshot is ever requested.
    pub min_applied_before_snapshot: u64,
}

impl SnapshotPolicy {
    /// Policy that triggers at `max_log_entries` entries with no minimum
    /// applied index.
    pub fn new(max_log_entries: u64) -> Self {
        SnapshotPolicy {
            max_log_entries,
            min_applied_before_snapshot: 0,
        }
    }

    /// Policy that never requests a snapshot.
    pub fn disabled() -> Self {
        Self::new(0)
    }

    /// Builder-style setter for `min_applied_before_snapshot`.
    pub fn with_min_applied(self, min: u64) -> Self {
        Self {
            min_applied_before_snapshot: min,
            ..self
        }
    }

    /// Returns `true` when the policy is enabled, `applied_index` has reached
    /// the configured minimum, and `entries_since_snapshot` has reached
    /// `max_log_entries`.
    pub fn should_snapshot(&self, entries_since_snapshot: u64, applied_index: u64) -> bool {
        let enabled = self.max_log_entries != 0;
        let applied_enough = applied_index >= self.min_applied_before_snapshot;
        let log_large_enough = entries_since_snapshot >= self.max_log_entries;
        enabled && applied_enough && log_large_enough
    }
}

impl Default for SnapshotPolicy {
    /// Triggers every 10 000 entries with no minimum applied index.
    fn default() -> Self {
        SnapshotPolicy {
            max_log_entries: 10_000,
            min_applied_before_snapshot: 0,
        }
    }
}
187
/// Creates, loads, installs and prunes snapshots stored as files in a
/// single directory.
pub struct SnapshotManager {
    /// Directory, retention count and threshold used by this manager.
    pub(crate) config: SnapshotConfig,
    /// Metadata of the snapshot currently treated as the latest, if any.
    /// Set by the directory scan on construction and by snapshot
    /// creation/installation.
    latest: Option<SnapshotMetadata>,
}
195
196impl SnapshotManager {
197 pub fn new(config: SnapshotConfig) -> RaftResult<Self> {
201 fs::create_dir_all(&config.snapshot_dir).map_err(|e| RaftError::StorageError {
203 message: format!(
204 "Failed to create snapshot directory '{}': {}",
205 config.snapshot_dir.display(),
206 e
207 ),
208 })?;
209
210 let mut manager = Self {
211 config,
212 latest: None,
213 };
214
215 manager.scan_existing_snapshots()?;
217
218 Ok(manager)
219 }
220
221 fn scan_existing_snapshots(&mut self) -> RaftResult<()> {
223 let entries =
224 fs::read_dir(&self.config.snapshot_dir).map_err(|e| RaftError::StorageError {
225 message: format!(
226 "Failed to read snapshot directory '{}': {}",
227 self.config.snapshot_dir.display(),
228 e
229 ),
230 })?;
231
232 let mut best: Option<SnapshotMetadata> = None;
233
234 for entry in entries {
235 let entry = entry.map_err(|e| RaftError::StorageError {
236 message: format!("Failed to read directory entry: {}", e),
237 })?;
238
239 let path = entry.path();
240 if let Some(ext) = path.extension() {
241 if ext == "json" {
243 if let Some(stem) = path.file_stem() {
244 let stem_str = stem.to_string_lossy();
245 if stem_str.ends_with(".meta") {
246 match self.load_metadata_from_file(&path) {
247 Ok(meta) => {
248 let dominated = best.as_ref().is_some_and(|b| {
249 (b.last_included_term, b.last_included_index)
250 >= (meta.last_included_term, meta.last_included_index)
251 });
252 if !dominated {
253 best = Some(meta);
254 }
255 }
256 Err(e) => {
257 warn!(
258 path = %path.display(),
259 error = %e,
260 "Skipping corrupt snapshot metadata"
261 );
262 }
263 }
264 }
265 }
266 }
267 }
268 }
269
270 self.latest = best;
271 Ok(())
272 }
273
274 fn load_metadata_from_file(&self, path: &Path) -> RaftResult<SnapshotMetadata> {
276 let contents = fs::read_to_string(path).map_err(|e| RaftError::StorageError {
277 message: format!("Failed to read metadata file '{}': {}", path.display(), e),
278 })?;
279
280 serde_json::from_str(&contents).map_err(|e| RaftError::StorageError {
281 message: format!("Failed to parse metadata file '{}': {}", path.display(), e),
282 })
283 }
284
285 fn atomic_write(final_path: &Path, data: &[u8]) -> RaftResult<()> {
290 let ext = final_path
291 .extension()
292 .map(|e| e.to_string_lossy())
293 .unwrap_or_default();
294 let tmp_path = final_path.with_extension(format!("{}.tmp", ext));
295 let mut f = fs::File::create(&tmp_path).map_err(|e| RaftError::StorageError {
296 message: format!("Failed to create temp file '{}': {}", tmp_path.display(), e),
297 })?;
298 f.write_all(data).map_err(|e| RaftError::StorageError {
299 message: format!("Failed to write temp file '{}': {}", tmp_path.display(), e),
300 })?;
301 f.sync_all().map_err(|e| RaftError::StorageError {
302 message: format!("Failed to fsync temp file '{}': {}", tmp_path.display(), e),
303 })?;
304 fs::rename(&tmp_path, final_path).map_err(|e| RaftError::StorageError {
305 message: format!(
306 "Failed to rename '{}' to '{}': {}",
307 tmp_path.display(),
308 final_path.display(),
309 e
310 ),
311 })?;
312 Ok(())
313 }
314
315 pub fn create_snapshot(
321 &mut self,
322 data: Vec<u8>,
323 last_included_index: LogIndex,
324 last_included_term: Term,
325 ) -> RaftResult<SnapshotMetadata> {
326 let snapshot = Snapshot::new(last_included_index, last_included_term, data);
327
328 let data_path = self
330 .config
331 .snapshot_dir
332 .join(snapshot.metadata.data_filename());
333 Self::atomic_write(&data_path, &snapshot.data)?;
334
335 let meta_path = self
337 .config
338 .snapshot_dir
339 .join(snapshot.metadata.metadata_filename());
340 let meta_json = serde_json::to_string_pretty(&snapshot.metadata).map_err(|e| {
341 RaftError::StorageError {
342 message: format!("Failed to serialize snapshot metadata: {}", e),
343 }
344 })?;
345 Self::atomic_write(&meta_path, meta_json.as_bytes())?;
346
347 info!(
348 last_included_index = last_included_index,
349 last_included_term = last_included_term,
350 size_bytes = snapshot.metadata.size_bytes,
351 checksum = snapshot.metadata.checksum,
352 "Created snapshot"
353 );
354
355 let metadata = snapshot.metadata.clone();
356 self.latest = Some(snapshot.metadata);
357
358 self.cleanup_old_snapshots()?;
360
361 Ok(metadata)
362 }
363
364 pub fn load_latest(&self) -> RaftResult<Option<Snapshot>> {
366 let meta = match &self.latest {
367 Some(m) => m,
368 None => return Ok(None),
369 };
370
371 let data_path = self.config.snapshot_dir.join(meta.data_filename());
372 let data = fs::read(&data_path).map_err(|e| RaftError::StorageError {
373 message: format!(
374 "Failed to read snapshot data from '{}': {}",
375 data_path.display(),
376 e
377 ),
378 })?;
379
380 let snapshot = Snapshot {
381 metadata: meta.clone(),
382 data,
383 };
384
385 if !snapshot.verify_checksum() {
387 return Err(RaftError::StorageError {
388 message: format!(
389 "Snapshot checksum mismatch for index {}, term {}",
390 meta.last_included_index, meta.last_included_term
391 ),
392 });
393 }
394
395 debug!(
396 last_included_index = meta.last_included_index,
397 last_included_term = meta.last_included_term,
398 size_bytes = meta.size_bytes,
399 "Loaded latest snapshot"
400 );
401
402 Ok(Some(snapshot))
403 }
404
405 pub fn should_snapshot(&self, log_size: u64) -> bool {
407 if self.config.snapshot_threshold == 0 {
408 return false;
409 }
410 log_size >= self.config.snapshot_threshold
411 }
412
    /// Returns the metadata of the snapshot currently recorded as latest,
    /// or `None` if this manager knows of no snapshot.
    pub fn get_latest_metadata(&self) -> Option<&SnapshotMetadata> {
        self.latest.as_ref()
    }
417
418 pub fn last_included_index(&self) -> LogIndex {
420 self.latest
421 .as_ref()
422 .map(|m| m.last_included_index)
423 .unwrap_or(0)
424 }
425
426 pub fn last_included_term(&self) -> Term {
428 self.latest
429 .as_ref()
430 .map(|m| m.last_included_term)
431 .unwrap_or(0)
432 }
433
434 pub fn cleanup_old_snapshots(&self) -> RaftResult<()> {
436 let mut snapshot_metas = self.list_all_snapshots()?;
437
438 if snapshot_metas.len() <= self.config.max_snapshots {
439 return Ok(());
440 }
441
442 snapshot_metas.sort_by(|a, b| {
444 (b.last_included_term, b.last_included_index)
445 .cmp(&(a.last_included_term, a.last_included_index))
446 });
447
448 let to_remove = &snapshot_metas[self.config.max_snapshots..];
450
451 for meta in to_remove {
452 let data_path = self.config.snapshot_dir.join(meta.data_filename());
453 let meta_path = self.config.snapshot_dir.join(meta.metadata_filename());
454
455 if data_path.exists() {
456 fs::remove_file(&data_path).map_err(|e| RaftError::StorageError {
457 message: format!(
458 "Failed to remove old snapshot data '{}': {}",
459 data_path.display(),
460 e
461 ),
462 })?;
463 }
464
465 if meta_path.exists() {
466 fs::remove_file(&meta_path).map_err(|e| RaftError::StorageError {
467 message: format!(
468 "Failed to remove old snapshot metadata '{}': {}",
469 meta_path.display(),
470 e
471 ),
472 })?;
473 }
474
475 info!(
476 last_included_index = meta.last_included_index,
477 last_included_term = meta.last_included_term,
478 "Removed old snapshot"
479 );
480 }
481
482 Ok(())
483 }
484
485 pub fn list_all_snapshots(&self) -> RaftResult<Vec<SnapshotMetadata>> {
487 let entries =
488 fs::read_dir(&self.config.snapshot_dir).map_err(|e| RaftError::StorageError {
489 message: format!(
490 "Failed to read snapshot directory '{}': {}",
491 self.config.snapshot_dir.display(),
492 e
493 ),
494 })?;
495
496 let mut metas = Vec::new();
497
498 for entry in entries {
499 let entry = entry.map_err(|e| RaftError::StorageError {
500 message: format!("Failed to read directory entry: {}", e),
501 })?;
502
503 let path = entry.path();
504 if let Some(ext) = path.extension() {
505 if ext == "json" {
506 if let Some(stem) = path.file_stem() {
507 let stem_str = stem.to_string_lossy();
508 if stem_str.ends_with(".meta") {
509 match self.load_metadata_from_file(&path) {
510 Ok(meta) => metas.push(meta),
511 Err(e) => {
512 warn!(
513 path = %path.display(),
514 error = %e,
515 "Skipping corrupt snapshot metadata during cleanup"
516 );
517 }
518 }
519 }
520 }
521 }
522 }
523 }
524
525 Ok(metas)
526 }
527
528 pub fn install_snapshot(&mut self, snapshot: Snapshot) -> RaftResult<SnapshotMetadata> {
533 if !snapshot.verify_checksum() {
535 return Err(RaftError::StorageError {
536 message: format!(
537 "Received snapshot with invalid checksum (index={}, term={})",
538 snapshot.metadata.last_included_index, snapshot.metadata.last_included_term
539 ),
540 });
541 }
542
543 if let Some(current) = &self.latest {
545 if (
546 snapshot.metadata.last_included_term,
547 snapshot.metadata.last_included_index,
548 ) <= (current.last_included_term, current.last_included_index)
549 {
550 return Err(RaftError::StorageError {
551 message: format!(
552 "Received snapshot (term={}, index={}) is not newer than current (term={}, index={})",
553 snapshot.metadata.last_included_term,
554 snapshot.metadata.last_included_index,
555 current.last_included_term,
556 current.last_included_index,
557 ),
558 });
559 }
560 }
561
562 let data_path = self
564 .config
565 .snapshot_dir
566 .join(snapshot.metadata.data_filename());
567 Self::atomic_write(&data_path, &snapshot.data)?;
568
569 let meta_path = self
570 .config
571 .snapshot_dir
572 .join(snapshot.metadata.metadata_filename());
573 let meta_json = serde_json::to_string_pretty(&snapshot.metadata).map_err(|e| {
574 RaftError::StorageError {
575 message: format!("Failed to serialize installed snapshot metadata: {}", e),
576 }
577 })?;
578 Self::atomic_write(&meta_path, meta_json.as_bytes())?;
579
580 info!(
581 last_included_index = snapshot.metadata.last_included_index,
582 last_included_term = snapshot.metadata.last_included_term,
583 size_bytes = snapshot.metadata.size_bytes,
584 "Installed snapshot from leader"
585 );
586
587 let metadata = snapshot.metadata.clone();
588 self.latest = Some(snapshot.metadata);
589
590 self.cleanup_old_snapshots()?;
591
592 Ok(metadata)
593 }
594}
595
596#[derive(Debug, Clone, PartialEq, Eq)]
601pub struct InstallSnapshotRequest {
602 pub term: Term,
604 pub leader_id: NodeId,
606 pub last_included_index: LogIndex,
608 pub last_included_term: Term,
610 pub offset: u64,
612 pub data: Vec<u8>,
614 pub done: bool,
616}
617
618impl InstallSnapshotRequest {
619 pub fn new_complete(
621 term: Term,
622 leader_id: NodeId,
623 last_included_index: LogIndex,
624 last_included_term: Term,
625 data: Vec<u8>,
626 ) -> Self {
627 Self {
628 term,
629 leader_id,
630 last_included_index,
631 last_included_term,
632 offset: 0,
633 data,
634 done: true,
635 }
636 }
637
638 pub fn new_chunk(
640 term: Term,
641 leader_id: NodeId,
642 last_included_index: LogIndex,
643 last_included_term: Term,
644 offset: u64,
645 data: Vec<u8>,
646 done: bool,
647 ) -> Self {
648 Self {
649 term,
650 leader_id,
651 last_included_index,
652 last_included_term,
653 offset,
654 data,
655 done,
656 }
657 }
658
659 pub fn is_complete(&self) -> bool {
661 self.offset == 0 && self.done
662 }
663}
664
665#[derive(Debug, Clone, PartialEq, Eq)]
667pub struct InstallSnapshotResponse {
668 pub term: Term,
670}
671
672impl InstallSnapshotResponse {
673 pub fn new(term: Term) -> Self {
675 Self { term }
676 }
677}
678
679pub struct SnapshotReceiver {
681 last_included_index: LogIndex,
683 last_included_term: Term,
685 data: Vec<u8>,
687 next_offset: u64,
689}
690
691impl SnapshotReceiver {
692 pub fn new(last_included_index: LogIndex, last_included_term: Term) -> Self {
694 Self {
695 last_included_index,
696 last_included_term,
697 data: Vec::new(),
698 next_offset: 0,
699 }
700 }
701
702 pub fn receive_chunk(&mut self, req: &InstallSnapshotRequest) -> RaftResult<Option<Snapshot>> {
707 if req.last_included_index != self.last_included_index
709 || req.last_included_term != self.last_included_term
710 {
711 return Err(RaftError::StorageError {
712 message: format!(
713 "Snapshot chunk mismatch: expected (index={}, term={}), got (index={}, term={})",
714 self.last_included_index,
715 self.last_included_term,
716 req.last_included_index,
717 req.last_included_term,
718 ),
719 });
720 }
721
722 if req.offset != self.next_offset {
724 return Err(RaftError::StorageError {
725 message: format!(
726 "Unexpected snapshot chunk offset: expected {}, got {}",
727 self.next_offset, req.offset,
728 ),
729 });
730 }
731
732 self.data.extend_from_slice(&req.data);
734 self.next_offset += req.data.len() as u64;
735
736 if req.done {
737 let snapshot = Snapshot::new(
738 self.last_included_index,
739 self.last_included_term,
740 std::mem::take(&mut self.data),
741 );
742 Ok(Some(snapshot))
743 } else {
744 Ok(None)
745 }
746 }
747
748 pub fn last_included_index(&self) -> LogIndex {
750 self.last_included_index
751 }
752
753 pub fn last_included_term(&self) -> Term {
755 self.last_included_term
756 }
757
758 pub fn bytes_received(&self) -> u64 {
760 self.data.len() as u64
761 }
762}
763
/// Storage abstraction for snapshots. Implementors must be `Send + Sync`.
pub trait SnapshotStore: Send + Sync {
    /// Stores `data` as a snapshot identified by
    /// `(last_included_term, last_included_index)` and returns its metadata.
    fn save(
        &mut self,
        data: Vec<u8>,
        last_included_index: LogIndex,
        last_included_term: Term,
    ) -> RaftResult<SnapshotMetadata>;

    /// Loads the latest snapshot, or `Ok(None)` if the store has none.
    fn load_latest(&self) -> RaftResult<Option<Snapshot>>;

    /// Lists the metadata of the snapshots held by the store.
    fn list(&self) -> RaftResult<Vec<SnapshotMetadata>>;

    /// Prunes stored snapshots, keeping `keep_n` of them.
    fn prune(&self, keep_n: usize) -> RaftResult<()>;
}
786
787pub struct DiskSnapshotStore {
792 manager: SnapshotManager,
793}
794
795impl DiskSnapshotStore {
796 pub fn new(config: SnapshotConfig) -> RaftResult<Self> {
798 let manager = SnapshotManager::new(config)?;
799 Ok(Self { manager })
800 }
801
802 pub fn manager(&self) -> &SnapshotManager {
804 &self.manager
805 }
806
807 pub fn manager_mut(&mut self) -> &mut SnapshotManager {
809 &mut self.manager
810 }
811}
812
813impl SnapshotStore for DiskSnapshotStore {
814 fn save(
815 &mut self,
816 data: Vec<u8>,
817 last_included_index: LogIndex,
818 last_included_term: Term,
819 ) -> RaftResult<SnapshotMetadata> {
820 self.manager
821 .create_snapshot(data, last_included_index, last_included_term)
822 }
823
824 fn load_latest(&self) -> RaftResult<Option<Snapshot>> {
825 self.manager.load_latest()
826 }
827
828 fn list(&self) -> RaftResult<Vec<SnapshotMetadata>> {
829 self.manager.list_all_snapshots()
830 }
831
832 fn prune(&self, keep_n: usize) -> RaftResult<()> {
833 let mut snapshot_metas = self.manager.list_all_snapshots()?;
834
835 if snapshot_metas.len() <= keep_n {
836 return Ok(());
837 }
838
839 snapshot_metas.sort_by(|a, b| {
841 (b.last_included_term, b.last_included_index)
842 .cmp(&(a.last_included_term, a.last_included_index))
843 });
844
845 let to_remove = &snapshot_metas[keep_n..];
846
847 for meta in to_remove {
848 let data_path = self.manager.config.snapshot_dir.join(meta.data_filename());
849 let meta_path = self
850 .manager
851 .config
852 .snapshot_dir
853 .join(meta.metadata_filename());
854
855 if data_path.exists() {
856 fs::remove_file(&data_path).map_err(|e| RaftError::StorageError {
857 message: format!(
858 "Failed to remove old snapshot data '{}': {}",
859 data_path.display(),
860 e
861 ),
862 })?;
863 }
864
865 if meta_path.exists() {
866 fs::remove_file(&meta_path).map_err(|e| RaftError::StorageError {
867 message: format!(
868 "Failed to remove old snapshot metadata '{}': {}",
869 meta_path.display(),
870 e
871 ),
872 })?;
873 }
874
875 info!(
876 last_included_index = meta.last_included_index,
877 last_included_term = meta.last_included_term,
878 "Pruned old snapshot"
879 );
880 }
881
882 Ok(())
883 }
884}
885
886pub struct SnapshotStreamer {
889 path: PathBuf,
890 metadata: SnapshotMetadata,
891 chunk_size: usize,
892 offset: u64,
893 total_size: u64,
894 file: fs::File,
895}
896
897impl SnapshotStreamer {
898 pub fn new(path: PathBuf, metadata: SnapshotMetadata, chunk_size: usize) -> RaftResult<Self> {
903 let file = fs::File::open(&path).map_err(|e| RaftError::StorageError {
904 message: format!("Failed to open snapshot file '{}': {}", path.display(), e),
905 })?;
906 let total_size = file
907 .metadata()
908 .map_err(|e| RaftError::StorageError {
909 message: format!("Failed to stat snapshot file '{}': {}", path.display(), e),
910 })?
911 .len();
912 Ok(Self {
913 path,
914 metadata,
915 chunk_size,
916 offset: 0,
917 total_size,
918 file,
919 })
920 }
921
922 pub fn metadata(&self) -> &SnapshotMetadata {
924 &self.metadata
925 }
926
927 pub fn total_size(&self) -> u64 {
929 self.total_size
930 }
931
932 pub fn next_chunk_for_rpc(
936 &mut self,
937 term: Term,
938 leader_id: NodeId,
939 ) -> RaftResult<Option<InstallSnapshotRequest>> {
940 if self.offset >= self.total_size {
941 return Ok(None);
942 }
943
944 self.file
945 .seek(SeekFrom::Start(self.offset))
946 .map_err(|e| RaftError::StorageError {
947 message: format!(
948 "Failed to seek to offset {} in '{}': {}",
949 self.offset,
950 self.path.display(),
951 e
952 ),
953 })?;
954
955 let remaining = self.total_size - self.offset;
956 let to_read = remaining.min(self.chunk_size as u64) as usize;
957 let mut buf = vec![0u8; to_read];
958
959 self.file
960 .read_exact(&mut buf)
961 .map_err(|e| RaftError::StorageError {
962 message: format!(
963 "Failed to read {} bytes at offset {} from '{}': {}",
964 to_read,
965 self.offset,
966 self.path.display(),
967 e
968 ),
969 })?;
970
971 let chunk_offset = self.offset;
972 self.offset += to_read as u64;
973 let done = self.offset >= self.total_size;
974
975 Ok(Some(InstallSnapshotRequest {
976 term,
977 leader_id,
978 last_included_index: self.metadata.last_included_index,
979 last_included_term: self.metadata.last_included_term,
980 offset: chunk_offset,
981 data: buf,
982 done,
983 }))
984 }
985}
986
987pub struct SnapshotStreamReceiver {
991 temp_path: PathBuf,
992 final_path: PathBuf,
993 file: fs::File,
994 next_offset: u64,
995 last_included_index: LogIndex,
996 last_included_term: Term,
997 expected_checksum: Option<u32>,
998 bytes_written: u64,
999}
1000
1001impl SnapshotStreamReceiver {
1002 pub fn new(
1007 dir: &Path,
1008 last_included_index: LogIndex,
1009 last_included_term: Term,
1010 ) -> RaftResult<Self> {
1011 let temp_name = format!(
1012 "snapshot-{:016x}-{:016x}.data.tmp",
1013 last_included_term, last_included_index
1014 );
1015 let final_name = format!(
1016 "snapshot-{:016x}-{:016x}.data",
1017 last_included_term, last_included_index
1018 );
1019 let temp_path = dir.join(&temp_name);
1020 let final_path = dir.join(&final_name);
1021
1022 let file = fs::File::create(&temp_path).map_err(|e| RaftError::StorageError {
1023 message: format!(
1024 "Failed to create temp snapshot file '{}': {}",
1025 temp_path.display(),
1026 e
1027 ),
1028 })?;
1029
1030 Ok(Self {
1031 temp_path,
1032 final_path,
1033 file,
1034 next_offset: 0,
1035 last_included_index,
1036 last_included_term,
1037 expected_checksum: None,
1038 bytes_written: 0,
1039 })
1040 }
1041
1042 pub fn receive_chunk(&mut self, req: &InstallSnapshotRequest) -> RaftResult<Option<PathBuf>> {
1048 if req.last_included_index != self.last_included_index
1050 || req.last_included_term != self.last_included_term
1051 {
1052 return Err(RaftError::StorageError {
1053 message: format!(
1054 "Snapshot identity mismatch: expected (index={}, term={}), got (index={}, term={})",
1055 self.last_included_index,
1056 self.last_included_term,
1057 req.last_included_index,
1058 req.last_included_term,
1059 ),
1060 });
1061 }
1062
1063 if req.offset != self.next_offset {
1065 return Err(RaftError::StorageError {
1066 message: format!(
1067 "Snapshot chunk offset mismatch: expected {}, got {}",
1068 self.next_offset, req.offset,
1069 ),
1070 });
1071 }
1072
1073 self.file
1075 .write_all(&req.data)
1076 .map_err(|e| RaftError::StorageError {
1077 message: format!(
1078 "Failed to write snapshot chunk at offset {}: {}",
1079 self.next_offset, e
1080 ),
1081 })?;
1082
1083 self.next_offset += req.data.len() as u64;
1084 self.bytes_written += req.data.len() as u64;
1085
1086 if !req.done {
1087 return Ok(None);
1088 }
1089
1090 self.file.flush().map_err(|e| RaftError::StorageError {
1092 message: format!("Failed to flush snapshot temp file: {}", e),
1093 })?;
1094
1095 let mut verify_file =
1097 fs::File::open(&self.temp_path).map_err(|e| RaftError::StorageError {
1098 message: format!(
1099 "Failed to open temp file '{}' for checksum verification: {}",
1100 self.temp_path.display(),
1101 e
1102 ),
1103 })?;
1104
1105 let mut hasher = crc32fast::Hasher::new();
1106 let mut read_buf = vec![0u8; 65536]; loop {
1108 let n = verify_file
1109 .read(&mut read_buf)
1110 .map_err(|e| RaftError::StorageError {
1111 message: format!("Failed to read temp file for checksum verification: {}", e),
1112 })?;
1113 if n == 0 {
1114 break;
1115 }
1116 hasher.update(&read_buf[..n]);
1117 }
1118 let computed_checksum = hasher.finalize();
1119
1120 if let Some(expected) = self.expected_checksum {
1122 if computed_checksum != expected {
1123 return Err(RaftError::StorageError {
1124 message: format!(
1125 "Snapshot CRC32 mismatch: expected {:#010x}, computed {:#010x}",
1126 expected, computed_checksum
1127 ),
1128 });
1129 }
1130 }
1131
1132 fs::rename(&self.temp_path, &self.final_path).map_err(|e| RaftError::StorageError {
1134 message: format!(
1135 "Failed to rename '{}' to '{}': {}",
1136 self.temp_path.display(),
1137 self.final_path.display(),
1138 e
1139 ),
1140 })?;
1141
1142 info!(
1143 last_included_index = self.last_included_index,
1144 last_included_term = self.last_included_term,
1145 bytes_written = self.bytes_written,
1146 checksum = computed_checksum,
1147 "Snapshot stream received and finalized"
1148 );
1149
1150 Ok(Some(self.final_path.clone()))
1151 }
1152
1153 pub fn bytes_written(&self) -> u64 {
1155 self.bytes_written
1156 }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161 use super::*;
1162
1163 fn test_snapshot_dir() -> tempfile::TempDir {
1164 tempfile::TempDir::new().expect("Failed to create temp dir for snapshot tests")
1165 }
1166
1167 fn make_config(dir: &Path) -> SnapshotConfig {
1168 SnapshotConfig::new(dir.to_path_buf(), 3, 100)
1169 }
1170
1171 #[test]
1172 fn test_snapshot_creation() {
1173 let dir = test_snapshot_dir();
1174 let config = make_config(dir.path());
1175 let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1176
1177 let data = b"state machine data v1".to_vec();
1178 let meta = manager
1179 .create_snapshot(data.clone(), 50, 3)
1180 .expect("Failed to create snapshot");
1181
1182 assert_eq!(meta.last_included_index, 50);
1183 assert_eq!(meta.last_included_term, 3);
1184 assert_eq!(meta.size_bytes, data.len() as u64);
1185 assert_eq!(meta.checksum, crc32fast::hash(&data));
1186 }
1187
1188 #[test]
1189 fn test_snapshot_load_latest() {
1190 let dir = test_snapshot_dir();
1191 let config = make_config(dir.path());
1192 let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1193
1194 let data = b"state machine snapshot data".to_vec();
1195 manager
1196 .create_snapshot(data.clone(), 100, 5)
1197 .expect("Failed to create snapshot");
1198
1199 let loaded = manager
1200 .load_latest()
1201 .expect("Failed to load latest snapshot");
1202 let loaded = loaded.expect("Expected a snapshot to exist");
1203
1204 assert_eq!(loaded.metadata.last_included_index, 100);
1205 assert_eq!(loaded.metadata.last_included_term, 5);
1206 assert_eq!(loaded.data, data);
1207 assert!(loaded.verify_checksum());
1208 }
1209
1210 #[test]
1211 fn test_snapshot_load_latest_empty() {
1212 let dir = test_snapshot_dir();
1213 let config = make_config(dir.path());
1214 let manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1215
1216 let loaded = manager
1217 .load_latest()
1218 .expect("Failed to load latest snapshot");
1219 assert!(loaded.is_none());
1220 }
1221
1222 #[test]
1223 fn test_snapshot_cleanup_old() {
1224 let dir = test_snapshot_dir();
1225 let config = SnapshotConfig::new(dir.path().to_path_buf(), 2, 100);
1227 let mut manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1228
1229 manager
1231 .create_snapshot(b"snap1".to_vec(), 10, 1)
1232 .expect("Failed to create snapshot 1");
1233 manager
1234 .create_snapshot(b"snap2".to_vec(), 20, 2)
1235 .expect("Failed to create snapshot 2");
1236 manager
1237 .create_snapshot(b"snap3".to_vec(), 30, 3)
1238 .expect("Failed to create snapshot 3");
1239 manager
1240 .create_snapshot(b"snap4".to_vec(), 40, 4)
1241 .expect("Failed to create snapshot 4");
1242
1243 let all = manager
1245 .list_all_snapshots()
1246 .expect("Failed to list snapshots");
1247 assert_eq!(all.len(), 2);
1248
1249 let mut indices: Vec<u64> = all.iter().map(|m| m.last_included_index).collect();
1251 indices.sort();
1252 assert_eq!(indices, vec![30, 40]);
1253 }
1254
1255 #[test]
1256 fn test_snapshot_threshold_trigger() {
1257 let dir = test_snapshot_dir();
1258 let config = SnapshotConfig::new(dir.path().to_path_buf(), 3, 500);
1259 let manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1260
1261 assert!(!manager.should_snapshot(100));
1262 assert!(!manager.should_snapshot(499));
1263 assert!(manager.should_snapshot(500));
1264 assert!(manager.should_snapshot(1000));
1265 }
1266
1267 #[test]
1268 fn test_snapshot_threshold_zero_disabled() {
1269 let dir = test_snapshot_dir();
1270 let config = SnapshotConfig::new(dir.path().to_path_buf(), 3, 0);
1271 let manager = SnapshotManager::new(config).expect("Failed to create snapshot manager");
1272
1273 assert!(!manager.should_snapshot(0));
1274 assert!(!manager.should_snapshot(999_999));
1275 }
1276
1277 #[test]
1278 fn test_snapshot_metadata_serialization() {
1279 let meta = SnapshotMetadata::new(42, 7, 1024, 0xDEAD_BEEF);
1280
1281 let json = serde_json::to_string(&meta).expect("Failed to serialize metadata");
1282 let deserialized: SnapshotMetadata =
1283 serde_json::from_str(&json).expect("Failed to deserialize metadata");
1284
1285 assert_eq!(deserialized.last_included_index, 42);
1286 assert_eq!(deserialized.last_included_term, 7);
1287 assert_eq!(deserialized.size_bytes, 1024);
1288 assert_eq!(deserialized.checksum, 0xDEAD_BEEF);
1289 assert_eq!(deserialized.created_at, meta.created_at);
1290 }
1291
1292 #[test]
1293 fn test_snapshot_checksum_verification() {
1294 let data = b"important state data".to_vec();
1295 let snapshot = Snapshot::new(10, 2, data);
1296 assert!(snapshot.verify_checksum());
1297
1298 let mut tampered = snapshot.clone();
1300 if let Some(byte) = tampered.data.first_mut() {
1301 *byte ^= 0xFF;
1302 }
1303 assert!(!tampered.verify_checksum());
1304 }
1305
1306 #[test]
1307 fn test_install_snapshot_request_complete() {
1308 let req = InstallSnapshotRequest::new_complete(5, 1, 100, 3, b"data".to_vec());
1309 assert_eq!(req.term, 5);
1310 assert_eq!(req.leader_id, 1);
1311 assert_eq!(req.last_included_index, 100);
1312 assert_eq!(req.last_included_term, 3);
1313 assert_eq!(req.offset, 0);
1314 assert!(req.done);
1315 assert!(req.is_complete());
1316 }
1317
1318 #[test]
1319 fn test_install_snapshot_request_chunk() {
1320 let req = InstallSnapshotRequest::new_chunk(5, 1, 100, 3, 512, b"chunk2".to_vec(), false);
1321 assert_eq!(req.offset, 512);
1322 assert!(!req.done);
1323 assert!(!req.is_complete());
1324 }
1325
1326 #[test]
1327 fn test_install_snapshot_response() {
1328 let resp = InstallSnapshotResponse::new(7);
1329 assert_eq!(resp.term, 7);
1330 }
1331
1332 #[test]
1333 fn test_snapshot_receiver_single_chunk() {
1334 let mut receiver = SnapshotReceiver::new(50, 3);
1335
1336 let req = InstallSnapshotRequest::new_complete(5, 1, 50, 3, b"full data".to_vec());
1337
1338 let result = receiver
1339 .receive_chunk(&req)
1340 .expect("Failed to receive chunk");
1341 let snapshot = result.expect("Expected completed snapshot");
1342
1343 assert_eq!(snapshot.metadata.last_included_index, 50);
1344 assert_eq!(snapshot.metadata.last_included_term, 3);
1345 assert_eq!(snapshot.data, b"full data");
1346 assert!(snapshot.verify_checksum());
1347 }
1348
1349 #[test]
1350 fn test_snapshot_receiver_multi_chunk() {
1351 let mut receiver = SnapshotReceiver::new(100, 5);
1352
1353 let req1 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, 0, b"hello".to_vec(), false);
1355 let result1 = receiver
1356 .receive_chunk(&req1)
1357 .expect("Failed to receive chunk 1");
1358 assert!(result1.is_none());
1359 assert_eq!(receiver.bytes_received(), 5);
1360
1361 let req2 = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, 5, b" world".to_vec(), true);
1363 let result2 = receiver
1364 .receive_chunk(&req2)
1365 .expect("Failed to receive chunk 2");
1366 let snapshot = result2.expect("Expected completed snapshot");
1367
1368 assert_eq!(snapshot.data, b"hello world");
1369 assert!(snapshot.verify_checksum());
1370 }
1371
#[test]
fn test_snapshot_receiver_wrong_offset() {
    // The very first chunk claims offset 999; the receiver must refuse it.
    let misplaced = InstallSnapshotRequest::new_chunk(5, 1, 50, 3, 999, b"bad".to_vec(), false);
    let mut rx = SnapshotReceiver::new(50, 3);

    assert!(rx.receive_chunk(&misplaced).is_err());
}
1381
#[test]
fn test_snapshot_receiver_mismatched_snapshot() {
    // The receiver is set up for index 50, but the request describes index 99.
    let foreign = InstallSnapshotRequest::new_complete(5, 1, 99, 3, b"wrong snapshot".to_vec());
    let mut rx = SnapshotReceiver::new(50, 3);

    assert!(rx.receive_chunk(&foreign).is_err());
}
1392
#[test]
fn test_install_snapshot_to_manager() {
    // Installing a ready-made snapshot must report its metadata and make the
    // payload loadable again through the same manager.
    let dir = test_snapshot_dir();
    let mut manager = SnapshotManager::new(make_config(dir.path()))
        .expect("Failed to create snapshot manager");

    let payload = b"installed snapshot data".to_vec();
    let incoming = Snapshot::new(200, 10, payload.clone());

    let installed = manager
        .install_snapshot(incoming)
        .expect("Failed to install snapshot");
    assert_eq!(installed.last_included_index, 200);
    assert_eq!(installed.last_included_term, 10);

    let reloaded = manager
        .load_latest()
        .expect("Failed to load")
        .expect("Expected snapshot");
    assert_eq!(reloaded.data, payload);
}
1415
#[test]
fn test_install_older_snapshot_rejected() {
    let dir = test_snapshot_dir();
    let mut manager = SnapshotManager::new(make_config(dir.path()))
        .expect("Failed to create snapshot manager");

    // Seed the manager with a snapshot at index 100, term 5.
    manager
        .create_snapshot(b"newer".to_vec(), 100, 5)
        .expect("Failed to create snapshot");

    // A snapshot at index 50, term 3 arriving afterwards must be refused.
    let stale = Snapshot::new(50, 3, b"older".to_vec());
    assert!(manager.install_snapshot(stale).is_err());
}
1432
#[test]
fn test_snapshot_persistence_across_managers() {
    let dir = test_snapshot_dir();
    let config = make_config(dir.path());

    // First manager: write one snapshot, then drop the manager at the end of the scope.
    {
        let mut writer =
            SnapshotManager::new(config.clone()).expect("Failed to create manager 1");
        writer
            .create_snapshot(b"persisted data".to_vec(), 75, 4)
            .expect("Failed to create snapshot");
    }

    // Second manager built from the same config: the snapshot must be visible again.
    let reader = SnapshotManager::new(config).expect("Failed to create manager 2");

    let latest = reader.get_latest_metadata();
    assert!(latest.is_some());
    let meta = latest.expect("Expected metadata");
    assert_eq!(meta.last_included_index, 75);
    assert_eq!(meta.last_included_term, 4);

    let restored = reader
        .load_latest()
        .expect("Failed to load")
        .expect("Expected snapshot");
    assert_eq!(restored.data, b"persisted data");
}
1463
#[test]
fn test_snapshot_config_with_defaults() {
    // `with_defaults` must fill in 3 retained snapshots and a threshold of 10000.
    let defaults = SnapshotConfig::with_defaults(PathBuf::from("/tmp/test"));

    assert_eq!(defaults.max_snapshots, 3);
    assert_eq!(defaults.snapshot_threshold, 10000);
}
1470
#[test]
fn test_snapshot_policy_should_trigger() {
    let policy = SnapshotPolicy::new(100);

    // Inputs below 100 must not trigger a snapshot.
    assert!(!policy.should_snapshot(50, 50));
    assert!(!policy.should_snapshot(99, 99));

    // Inputs of 100 and above must trigger one.
    assert!(policy.should_snapshot(100, 100));
    assert!(policy.should_snapshot(200, 200));
}
1479
#[test]
fn test_snapshot_policy_disabled() {
    // A disabled policy must not trigger even for inputs of 10000.
    assert!(!SnapshotPolicy::disabled().should_snapshot(10000, 10000));
}
1485
#[test]
fn test_snapshot_policy_min_applied() {
    let base = SnapshotPolicy::new(10);
    let policy = base.with_min_applied(50);

    // Second argument 30 is under the configured minimum of 50: no snapshot.
    assert!(!policy.should_snapshot(20, 30));

    // Second argument 50 reaches the minimum: snapshot.
    assert!(policy.should_snapshot(20, 50));
}
1492
#[test]
fn test_snapshot_policy_default() {
    // The default policy uses a 10_000-entry limit and triggers exactly from that value on.
    let defaults = SnapshotPolicy::default();

    assert_eq!(defaults.max_log_entries, 10_000);
    assert!(!defaults.should_snapshot(9_999, 9_999));
    assert!(defaults.should_snapshot(10_000, 10_000));
}
1500
#[test]
fn test_atomic_write_creates_file() {
    // After `atomic_write` succeeds, the target must exist with exactly the bytes given.
    let dir = test_snapshot_dir();
    let target = dir.path().join("atomic_test.data");
    let payload = b"atomic write content";

    SnapshotManager::atomic_write(&target, payload).expect("atomic_write should succeed");

    let on_disk = fs::read(&target).expect("File should exist");
    assert_eq!(on_disk, payload);
}
1514
#[test]
fn test_atomic_write_no_tmp_left() {
    // Verifies that `atomic_write` cleans up after itself: once it returns, the
    // directory must contain the target file and nothing else.
    let dir = test_snapshot_dir();
    let file_path = dir.path().join("no_tmp_left.data");

    SnapshotManager::atomic_write(&file_path, b"data").expect("atomic_write should succeed");

    // Naming-specific check for one candidate temp name (`<target>.tmp`).
    let tmp_path = file_path.with_extension("data.tmp");
    assert!(
        !tmp_path.exists(),
        "Temp file should not remain after atomic write"
    );

    // Naming-agnostic check: whatever scratch name `atomic_write` uses internally,
    // no entry other than the target may be left behind. Without this the test would
    // pass vacuously if the temp file were named differently from the guess above.
    // NOTE(review): assumes `test_snapshot_dir()` hands out a fresh, empty directory.
    let leftovers: Vec<PathBuf> = fs::read_dir(dir.path())
        .expect("Failed to list snapshot dir")
        .map(|entry| entry.expect("Failed to read dir entry").path())
        .filter(|path| *path != file_path)
        .collect();
    assert!(
        leftovers.is_empty(),
        "Unexpected files left behind by atomic write: {:?}",
        leftovers
    );
}
1528
#[test]
fn test_snapshot_store_save_and_load() {
    // Saving through the store must report matching metadata, and the latest
    // snapshot loaded back must carry the same bytes with a valid checksum.
    let dir = test_snapshot_dir();
    let mut store = DiskSnapshotStore::new(make_config(dir.path()))
        .expect("Failed to create DiskSnapshotStore");

    let payload = b"disk snapshot store data".to_vec();
    let expected_size = payload.len() as u64;

    let saved = store
        .save(payload.clone(), 100, 5)
        .expect("Failed to save snapshot");
    assert_eq!(saved.last_included_index, 100);
    assert_eq!(saved.last_included_term, 5);
    assert_eq!(saved.size_bytes, expected_size);

    let reloaded = store
        .load_latest()
        .expect("Failed to load latest")
        .expect("Expected a snapshot");
    assert_eq!(reloaded.data, payload);
    assert!(reloaded.verify_checksum());
}
1551
#[test]
fn test_snapshot_store_list() {
    let dir = test_snapshot_dir();
    let config = SnapshotConfig::new(dir.path().to_path_buf(), 10, 100);
    let mut store = DiskSnapshotStore::new(config).expect("Failed to create DiskSnapshotStore");

    // Three snapshots at indices 10, 20 and 30.
    store.save(b"snap1".to_vec(), 10, 1).expect("save 1 failed");
    store.save(b"snap2".to_vec(), 20, 2).expect("save 2 failed");
    store.save(b"snap3".to_vec(), 30, 3).expect("save 3 failed");

    let listed = store.list().expect("list failed");
    assert_eq!(listed.len(), 3);

    // The listing order is not asserted, so compare the sorted indices.
    let mut indices: Vec<u64> = Vec::with_capacity(listed.len());
    for meta in listed.iter() {
        indices.push(meta.last_included_index);
    }
    indices.sort_unstable();
    assert_eq!(indices, vec![10, 20, 30]);
}
1569
#[test]
fn test_snapshot_store_prune() {
    let dir = test_snapshot_dir();
    let config = SnapshotConfig::new(dir.path().to_path_buf(), 10, 100);
    let mut store = DiskSnapshotStore::new(config).expect("Failed to create DiskSnapshotStore");

    // Five snapshots at indices 10, 20, ..., 50 with terms 1 through 5.
    for i in 1..=5 {
        let payload = format!("snap{}", i).into_bytes();
        store.save(payload, i * 10, i).expect("save failed");
    }
    let before = store.list().expect("list failed");
    assert_eq!(before.len(), 5);

    // Pruning down to two must leave the snapshots with the highest indices.
    store.prune(2).expect("prune failed");

    let survivors = store.list().expect("list failed");
    assert_eq!(survivors.len(), 2);

    let mut kept: Vec<u64> = survivors.iter().map(|m| m.last_included_index).collect();
    kept.sort_unstable();
    assert_eq!(kept, vec![40, 50]);
}
1597
#[test]
fn test_streamer_chunks_correctly() {
    // Streams a 2 MiB snapshot file in 512 KiB chunks and checks that the emitted
    // requests carry the right snapshot identity, contiguous byte offsets, a `done`
    // flag on the final chunk only, and that the payloads rebuild the original data.
    let dir = test_snapshot_dir();

    let total_size: usize = 2 * 1024 * 1024;
    // Repeating 0..=255 byte pattern so misplaced chunks are detectable.
    let original_data: Vec<u8> = (0..total_size).map(|i| (i % 256) as u8).collect();
    let checksum = crc32fast::hash(&original_data);
    let metadata = SnapshotMetadata::new(42, 7, total_size as u64, checksum);

    let snap_path = dir.path().join(metadata.data_filename());
    fs::write(&snap_path, &original_data).expect("Failed to write snapshot file");

    let chunk_size = 512 * 1024;
    let mut streamer = SnapshotStreamer::new(snap_path, metadata, chunk_size)
        .expect("Failed to create SnapshotStreamer");

    assert_eq!(streamer.total_size(), total_size as u64);

    let mut reconstructed = Vec::with_capacity(total_size);
    let mut chunk_count = 0usize;
    let mut last_done = false;

    while let Some(req) = streamer
        .next_chunk_for_rpc(5, 1)
        .expect("next_chunk_for_rpc failed")
    {
        // `done` may only appear on the last chunk, so nothing may follow it.
        assert!(!last_done, "No chunk may follow the one flagged done=true");
        assert_eq!(req.last_included_index, 42);
        assert_eq!(req.last_included_term, 7);
        // Each chunk must start exactly where the previous one ended.
        assert_eq!(
            req.offset,
            reconstructed.len() as u64,
            "Chunk offsets must be contiguous"
        );
        reconstructed.extend_from_slice(&req.data);
        chunk_count += 1;
        last_done = req.done;
    }

    assert!(last_done, "Final chunk must have done=true");
    assert_eq!(chunk_count, 4, "2 MiB / 512 KiB = 4 chunks");
    assert_eq!(
        reconstructed, original_data,
        "Reconstructed data must match original"
    );
}
1641
#[test]
fn test_stream_receiver_writes_to_disk() {
    // Feeds three 16-byte chunks to the disk-backed receiver and checks that the
    // file returned on the final chunk holds their concatenation.
    let dir = test_snapshot_dir();

    let last_included_index: LogIndex = 100;
    let last_included_term: Term = 5;

    let mut rx = SnapshotStreamReceiver::new(dir.path(), last_included_index, last_included_term)
        .expect("Failed to create SnapshotStreamReceiver");

    let first = b"chunk one data--".to_vec();
    let second = b"chunk two data--".to_vec();
    let third = b"chunk three data".to_vec();

    // Each chunk starts where the previous one ended.
    let second_offset = first.len() as u64;
    let third_offset = second_offset + second.len() as u64;

    let first_req = InstallSnapshotRequest::new_chunk(5, 1, 100, 5, 0, first.clone(), false);
    let after_first = rx
        .receive_chunk(&first_req)
        .expect("receive chunk 1 failed");
    assert!(after_first.is_none());
    assert_eq!(rx.bytes_written(), first.len() as u64);

    let second_req =
        InstallSnapshotRequest::new_chunk(5, 1, 100, 5, second_offset, second.clone(), false);
    let after_second = rx
        .receive_chunk(&second_req)
        .expect("receive chunk 2 failed");
    assert!(after_second.is_none());

    // Only the chunk flagged done=true yields the final path.
    let third_req =
        InstallSnapshotRequest::new_chunk(5, 1, 100, 5, third_offset, third.clone(), true);
    let final_path = rx
        .receive_chunk(&third_req)
        .expect("receive chunk 3 failed")
        .expect("Expected final path on done=true");

    assert!(final_path.exists(), "Final snapshot file must exist");

    let expected: Vec<u8> = [first, second, third].concat();
    let written = fs::read(&final_path).expect("Failed to read final snapshot file");
    assert_eq!(written, expected, "Written data must match sent chunks");
    assert_eq!(rx.bytes_written(), expected.len() as u64);
}
1685
#[test]
fn test_streamer_and_receiver_roundtrip() {
    // End-to-end: stream a 1.5 MiB snapshot from a source directory into a
    // receiver writing to a destination directory, then compare the bytes.
    let dir = test_snapshot_dir();

    let src_dir = dir.path().join("src");
    let dst_dir = dir.path().join("dst");
    fs::create_dir_all(&src_dir).expect("Failed to create src dir");
    fs::create_dir_all(&dst_dir).expect("Failed to create dst dir");

    let total_size: usize = 3 * 512 * 1024;
    let mut original_data: Vec<u8> = Vec::with_capacity(total_size);
    for i in 0..total_size {
        let mixed = i.wrapping_mul(7).wrapping_add(i / 256);
        original_data.push((mixed % 256) as u8);
    }

    let last_included_index: LogIndex = 250;
    let last_included_term: Term = 12;
    let metadata = SnapshotMetadata::new(
        last_included_index,
        last_included_term,
        total_size as u64,
        crc32fast::hash(&original_data),
    );

    let snap_path = src_dir.join(metadata.data_filename());
    fs::write(&snap_path, &original_data).expect("Failed to write source snapshot");

    let chunk_size = 512 * 1024;
    let mut streamer = SnapshotStreamer::new(snap_path, metadata, chunk_size)
        .expect("Failed to create SnapshotStreamer");
    let mut receiver =
        SnapshotStreamReceiver::new(&dst_dir, last_included_index, last_included_term)
            .expect("Failed to create SnapshotStreamReceiver");

    // Pump chunks until the receiver hands back the finished file's path.
    let mut final_path: Option<PathBuf> = None;
    while let Some(req) = streamer.next_chunk_for_rpc(15, 1).expect("Streamer error") {
        final_path = receiver.receive_chunk(&req).expect("Receiver error");
        if final_path.is_some() {
            break;
        }
    }

    let final_path = final_path.expect("Round-trip must complete");
    assert!(final_path.exists(), "Final snapshot file must exist");

    let received_data = fs::read(&final_path).expect("Failed to read final snapshot");
    assert_eq!(
        received_data, original_data,
        "Round-trip data must match original"
    );
    assert_eq!(receiver.bytes_written(), total_size as u64);
}
1739}