persistent_kv/snapshot_set/
file_snapshot_set.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fs::{self, File},
4    io,
5    path::{Path, PathBuf},
6};
7
8use fs2::FileExt;
9use regex::Regex;
10
11use super::{SnapshotInfo, SnapshotOrdinal, SnapshotSet, SnapshotType};
12
13/// Implementation of SnapshotSet using files on disk that exactly mirror the state
14/// in memory, i.e., each entry in `snapshots` corresponds to a file in the folder, even
15/// if the file is empty.
16///
17/// Snapshot files are always of the format
18///   `snapshot_<ordinal>_<shard>-of-<shard-count>_<type>.bin`
19///
20/// Name components:
21///  1) `<ordinal>` is a monotonically increasing sequence number
22///  2) `<type>` is one of `diff`, `full`, or `pending` where `pending` should be renamed to `full`
23///     once the snapshot is complete and published.
24///  3) `<shard>` is the shard number (0-based) of the snapshot
25///  4) `<shard-count>` is the total number of shards in the snapshot
26///
27/// `<ordinal>` must be processed in sequence, `<shard>` can be processed in parallel.
28#[derive(Debug)]
29pub struct FileSnapshotSet {
30    pub snapshots: Vec<SnapshotInfo>,
31    folder: PathBuf,
32    lockfile: File,
33}
34
35impl FileSnapshotSet {
36    pub fn new(folder: &Path) -> Result<Self, io::Error> {
37        fs::create_dir_all(folder)?;
38        // Scan the folder for all files matching snapshot pattern and map them to SnapshotInfo.
39        let mut snapshots: Vec<SnapshotInfo> = Vec::new();
40        let mut seen_shards: HashSet<(SnapshotOrdinal, u64)> = HashSet::new();
41        let mut info_by_ordinal: HashMap<SnapshotOrdinal, (u64, SnapshotType)> = HashMap::new();
42        for entry in fs::read_dir(folder).unwrap() {
43            let entry = entry.unwrap();
44            let path = entry.path();
45            if let Some((ordinal, shard, shard_count, snapshot_type)) =
46                Self::parse_snapshot_filename_(&path)
47            {
48                // Find existing snapshot and append this shard or create new one
49                let snapshot = snapshots
50                    .iter_mut()
51                    .find(|snapshot| snapshot.ordinal == ordinal);
52                if let Some(snapshot) = snapshot {
53                    if seen_shards.contains(&(ordinal, shard)) {
54                        return Err(io::Error::new(
55                            io::ErrorKind::InvalidInput,
56                            "Duplicate snapshot shard detected",
57                        ));
58                    }
59                    let (prior_shard_count, prior_snapshot_type) =
60                        info_by_ordinal.get(&ordinal).unwrap();
61                    if shard_count != *prior_shard_count || snapshot_type != *prior_snapshot_type {
62                        return Err(io::Error::new(
63                            io::ErrorKind::InvalidInput,
64                            "Inconsistent snapshot shard count or type detected",
65                        ));
66                    }
67                    snapshot.shard_paths.push(path);
68                } else {
69                    info_by_ordinal.insert(ordinal, (shard_count, snapshot_type));
70                    snapshots.push(SnapshotInfo {
71                        snapshot_type,
72                        ordinal,
73                        shard_paths: vec![path],
74                    });
75                }
76                seen_shards.insert((ordinal, shard));
77            }
78        }
79
80        // Validate that all shards are present for each ordinal.
81        for snapshot in snapshots.iter() {
82            let (shard_count, _) = info_by_ordinal.get(&snapshot.ordinal).unwrap();
83            if snapshot.shard_paths.len() as u64 != *shard_count {
84                return Err(io::Error::new(
85                    io::ErrorKind::InvalidInput,
86                    "Missing snapshot shard detected",
87                ));
88            }
89        }
90
91        // Sort data by (ordinal, shard-index)
92        snapshots.sort_by_key(|snapshot| snapshot.ordinal);
93        snapshots
94            .iter_mut()
95            .for_each(|snapshot| snapshot.shard_paths.sort());
96
97        // Ensure no other snapshotset instance is active in the same folder.
98        let lockfile = File::create(folder.join("lockfile"))?;
99        lockfile.try_lock_exclusive()?;
100        Ok(Self {
101            folder: folder.to_path_buf(),
102            snapshots,
103            lockfile,
104        })
105    }
106
107    /// Returns the latest full snapshot that has been published and is considered complete.
108    pub fn get_latest_full_snapshot(&self) -> Option<&SnapshotInfo> {
109        self.snapshots
110            .iter()
111            .filter(|snapshot| snapshot.snapshot_type == SnapshotType::FullCompleted)
112            .max_by_key(|snapshot| snapshot.ordinal)
113    }
114
115    /// Returns all differential snapshots that have been created since the snapshot
116    /// with the given ordinal number. This is useful to determine which differential snapshots
117    /// need to be applied to the full snapshot to get the latest state.
118    pub fn get_all_diff_snapshots_since(
119        &self,
120        last_full_ordinal: SnapshotOrdinal,
121    ) -> Vec<&SnapshotInfo> {
122        self.snapshots
123            .iter()
124            .filter(|snapshot| {
125                snapshot.snapshot_type == SnapshotType::Diff && snapshot.ordinal > last_full_ordinal
126            })
127            .collect()
128    }
129
130    fn create_new_snapshot_file_(
131        &mut self,
132        snapshot_type: SnapshotType,
133        shard_count: u64,
134    ) -> Result<&SnapshotInfo, io::Error> {
135        let ordinal = self.get_next_ordinal_number_();
136        let mut shard_paths = Vec::new();
137        for shard in 0..shard_count {
138            let filename =
139                Self::generate_snapshot_filename_(ordinal, shard, shard_count, snapshot_type);
140            let path = self.folder.join(Path::new(&filename));
141            File::create_new(path.clone())?;
142            shard_paths.push(path);
143        }
144        self.snapshots.push(SnapshotInfo {
145            shard_paths,
146            snapshot_type,
147            ordinal,
148        });
149        Ok(self.snapshots.last().unwrap())
150    }
151
152    fn get_next_ordinal_number_(&self) -> SnapshotOrdinal {
153        SnapshotOrdinal(
154            self.snapshots
155                .iter()
156                .map(|snapshot| snapshot.ordinal.0)
157                .max()
158                .unwrap_or(0)
159                + 1,
160        )
161    }
162
163    fn generate_snapshot_filename_(
164        ordinal: SnapshotOrdinal,
165        shard: u64,
166        shard_count: u64,
167        snapshot_type: SnapshotType,
168    ) -> String {
169        let snapshot_type_str = match snapshot_type {
170            SnapshotType::Diff => "diff",
171            SnapshotType::FullCompleted => "full",
172            SnapshotType::Pending => "pending",
173        };
174        format!(
175            "snapshot_{}_{}-of-{}_{}.bin",
176            ordinal.0, shard, shard_count, snapshot_type_str
177        )
178    }
179
180    fn parse_snapshot_filename_(path: &Path) -> Option<(SnapshotOrdinal, u64, u64, SnapshotType)> {
181        let filename = path.file_name().unwrap().to_str().unwrap();
182        let re = Regex::new(r"^snapshot_([0-9]+)_([0-9]+)-of-([0-9]+)_(diff|full|pending)\.bin$")
183            .unwrap();
184        let captures = re.captures(filename)?;
185        let ordinal = SnapshotOrdinal(captures[1].parse().unwrap());
186        let shard = captures[2].parse().unwrap();
187        let shard_count = captures[3].parse().unwrap();
188        let snapshot_type = match &captures[4] {
189            "diff" => SnapshotType::Diff,
190            "full" => SnapshotType::FullCompleted,
191            "pending" => SnapshotType::Pending,
192            _ => {
193                return None;
194            }
195        };
196        Some((ordinal, shard, shard_count, snapshot_type))
197    }
198}
199
200impl Drop for FileSnapshotSet {
201    fn drop(&mut self) {
202        <File as FileExt>::unlock(&self.lockfile).unwrap();
203    }
204}
205
206impl SnapshotSet for FileSnapshotSet {
207    fn create_or_get_snapshot(
208        &mut self,
209        snapshot_type: SnapshotType,
210        shard_count: u64,
211        may_append_existing: bool,
212    ) -> Result<SnapshotInfo, io::Error> {
213        assert!(shard_count > 0);
214        assert!(
215            snapshot_type != SnapshotType::FullCompleted,
216            "Cannot create completed snapshot directly, use publish_completed_snapshot()"
217        );
218        if may_append_existing {
219            let latest_diff_snapshot = self
220                .snapshots
221                .iter()
222                .filter(|snapshot| snapshot.snapshot_type == snapshot_type)
223                .max_by_key(|snapshot| snapshot.ordinal);
224            let latest_full_snapshot = self.get_latest_full_snapshot();
225            // We cannot continue writing the existing files if the shard count changed,
226            // this happens if the binary is restarted with different configuration.
227            if let Some(latest_diff_snapshot) = latest_diff_snapshot {
228                if latest_diff_snapshot.shard_paths.len() as u64 == shard_count {
229                    if let Some(latest_full_snapshot) = latest_full_snapshot {
230                        if latest_full_snapshot.ordinal < latest_diff_snapshot.ordinal {
231                            return Ok(latest_diff_snapshot.clone());
232                        }
233                    } else {
234                        return Ok(latest_diff_snapshot.clone());
235                    }
236                }
237            }
238        }
239        // Should (and could) return &SnapshotInfo, but borrow checker doesn't follow the branches.
240        self.create_new_snapshot_file_(snapshot_type, shard_count)
241            .cloned()
242    }
243
244    fn publish_completed_snapshot(
245        &mut self,
246        pending_snapshot_ordinal: SnapshotOrdinal,
247        purge_obsolete_diff_snapshots: bool,
248        purge_obsolete_pending_snapshots: bool,
249    ) -> Result<(), io::Error> {
250        let pending_snapshot = self
251            .snapshots
252            .iter_mut()
253            .find(|snapshot| snapshot.ordinal == pending_snapshot_ordinal)
254            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Snapshot not found"))?;
255        if pending_snapshot.snapshot_type != SnapshotType::Pending {
256            return Err(io::Error::new(
257                io::ErrorKind::AlreadyExists,
258                "Snapshot is not pending",
259            ));
260        }
261        let shard_count = pending_snapshot.shard_paths.len() as u64;
262        for shard in 0..shard_count {
263            let new_snapshot_path = Self::generate_snapshot_filename_(
264                pending_snapshot.ordinal,
265                shard,
266                shard_count,
267                SnapshotType::FullCompleted,
268            );
269            let new_snapshot_path = self.folder.join(new_snapshot_path);
270            fs::rename(
271                &pending_snapshot.shard_paths[shard as usize],
272                &new_snapshot_path,
273            )?;
274            pending_snapshot.shard_paths[shard as usize] = new_snapshot_path;
275        }
276        pending_snapshot.snapshot_type = SnapshotType::FullCompleted;
277
278        if purge_obsolete_diff_snapshots || purge_obsolete_pending_snapshots {
279            let obsolete_snapshot: Vec<_> = self
280                .snapshots
281                .iter()
282                .filter(|snapshot| {
283                    ((purge_obsolete_diff_snapshots
284                        && snapshot.snapshot_type == SnapshotType::Diff)
285                        || (purge_obsolete_pending_snapshots
286                            && snapshot.snapshot_type == SnapshotType::Pending))
287                        && snapshot.ordinal < pending_snapshot_ordinal
288                })
289                .cloned()
290                .collect();
291            for obsolete_snapshot in obsolete_snapshot {
292                for path in obsolete_snapshot.shard_paths.iter() {
293                    fs::remove_file(path)?;
294                }
295                self.snapshots
296                    .retain(|s| s.ordinal != obsolete_snapshot.ordinal);
297            }
298        }
299        Ok(())
300    }
301
302    fn get_snapshots_to_restore(&self) -> Vec<&SnapshotInfo> {
303        let mut snapshots_to_restore = Vec::new();
304        let last_snapshot_ordinal = match self.get_latest_full_snapshot() {
305            Some(snapshot) => {
306                snapshots_to_restore.push(snapshot);
307                snapshot.ordinal
308            }
309            None => SnapshotOrdinal(0),
310        };
311        snapshots_to_restore.append(&mut self.get_all_diff_snapshots_since(last_snapshot_ordinal));
312        snapshots_to_restore
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use tempfile::TempDir;

    /// Creates a fresh temporary folder that is deleted when the returned guard is dropped.
    fn create_temp_dir() -> TempDir {
        TempDir::new().unwrap()
    }

    /// Creates an empty file called `name` inside `folder` and returns its path.
    fn create_snapshot_file(folder: &Path, name: &str) -> PathBuf {
        let path = folder.join(name);
        File::create(&path).unwrap();
        path
    }

    /// Creates one empty file inside `folder` for every entry of `names`.
    fn create_snapshot_files(folder: &Path, names: &[&str]) {
        for name in names {
            create_snapshot_file(folder, name);
        }
    }

    #[test]
    fn empty() {
        let tmp_dir = create_temp_dir();

        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 0);
    }

    #[test]
    fn empty_folder_does_not_exist_yet() {
        let tmp_dir = create_temp_dir();

        let snapshot_set = FileSnapshotSet::new(&tmp_dir.path().join("new-folder")).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 0);
    }

    #[test]
    fn snapshots_in_ordinal_order() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_1_0-of-1_diff.bin",
                "snapshot_4_0-of-1_diff.bin",
                "snapshot_3_0-of-1_pending.bin",
                "snapshot_2_0-of-1_full.bin",
                "other_file.txt",
            ],
        );

        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        let expected = [
            ("snapshot_1_0-of-1_diff.bin", 1, SnapshotType::Diff),
            ("snapshot_2_0-of-1_full.bin", 2, SnapshotType::FullCompleted),
            ("snapshot_3_0-of-1_pending.bin", 3, SnapshotType::Pending),
            ("snapshot_4_0-of-1_diff.bin", 4, SnapshotType::Diff),
        ];
        assert_eq!(snapshot_set.snapshots.len(), 4);
        for (actual, (name, ordinal, snapshot_type)) in
            snapshot_set.snapshots.iter().zip(expected)
        {
            assert_eq!(
                *actual,
                SnapshotInfo {
                    shard_paths: vec![tmp_dir.path().join(name)],
                    ordinal: SnapshotOrdinal(ordinal),
                    snapshot_type,
                }
            );
        }
    }

    #[test]
    fn snapshots_in_shard_order() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_1_1-of-3_diff.bin",
                "snapshot_1_2-of-3_diff.bin",
                "snapshot_1_0-of-3_diff.bin",
            ],
        );

        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 1);
        assert_eq!(
            snapshot_set.snapshots[0].shard_paths,
            vec![
                tmp_dir.path().join("snapshot_1_0-of-3_diff.bin"),
                tmp_dir.path().join("snapshot_1_1-of-3_diff.bin"),
                tmp_dir.path().join("snapshot_1_2-of-3_diff.bin"),
            ]
        );
    }

    #[test]
    fn fails_duplicate_shards() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &["snapshot_1_0-of-1_diff.bin", "snapshot_1_0-of-1_full.bin"],
        );

        let error = FileSnapshotSet::new(tmp_dir.path()).unwrap_err();
        assert_eq!(error.to_string(), "Duplicate snapshot shard detected");
    }

    #[test]
    fn fails_missing_shards() {
        let tmp_dir = create_temp_dir();
        create_snapshot_file(tmp_dir.path(), "snapshot_1_1-of-2_diff.bin");

        let error = FileSnapshotSet::new(tmp_dir.path()).unwrap_err();
        assert_eq!(error.to_string(), "Missing snapshot shard detected");
    }

    #[test]
    fn fails_mismatched_shard_counts() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &["snapshot_1_1-of-2_full.bin", "snapshot_1_0-of-1_full.bin"],
        );

        let error = FileSnapshotSet::new(tmp_dir.path()).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Inconsistent snapshot shard count or type detected"
        );
    }

    #[test]
    fn fails_mismatched_shard_types() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &["snapshot_1_1-of-2_full.bin", "snapshot_1_0-of-2_diff.bin"],
        );

        let error = FileSnapshotSet::new(tmp_dir.path()).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Inconsistent snapshot shard count or type detected"
        );
    }

    #[test]
    fn registers_new_snapshot_path_assigns_new_snapshot_ordinals() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_0_0-of-1_diff.bin",
                "snapshot_60_0-of-1_full.bin",
                "snapshot_900000000000_0-of-1_pending.bin",
            ],
        );

        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let new_diff_snapshot_path = snapshot_set
            .create_or_get_snapshot(SnapshotType::Diff, 2, false)
            .unwrap();

        assert_eq!(
            new_diff_snapshot_path.shard_paths,
            vec![
                tmp_dir.path().join("snapshot_900000000001_0-of-2_diff.bin"),
                tmp_dir.path().join("snapshot_900000000001_1-of-2_diff.bin"),
            ]
        );

        let new_pending_snapshot_path = snapshot_set
            .create_or_get_snapshot(SnapshotType::Pending, 2, false)
            .unwrap();

        assert_eq!(
            new_pending_snapshot_path.shard_paths,
            vec![
                tmp_dir
                    .path()
                    .join("snapshot_900000000002_0-of-2_pending.bin"),
                tmp_dir
                    .path()
                    .join("snapshot_900000000002_1-of-2_pending.bin"),
            ]
        );

        // Construct a new SnapshotSet to verify that the files were created on disk
        drop(snapshot_set);
        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 5);
        assert_eq!(
            snapshot_set.snapshots[3].ordinal,
            SnapshotOrdinal(900000000001)
        );
        assert_eq!(
            snapshot_set.snapshots[4].ordinal,
            SnapshotOrdinal(900000000002)
        );
        // Both newly created snapshots must have been reloaded with both of their shards.
        assert_eq!(snapshot_set.snapshots[3].shard_paths.len(), 2);
        assert_eq!(snapshot_set.snapshots[4].shard_paths.len(), 2);
    }

    #[test]
    fn registers_new_snapshot_path_reuses_most_recent_diff_ordinal() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_0_0-of-1_diff.bin",
                "snapshot_1_0-of-1_full.bin",
                "snapshot_2_0-of-1_diff.bin",
            ],
        );

        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let new_diff_snapshot_path = snapshot_set
            .create_or_get_snapshot(SnapshotType::Diff, 1, true)
            .unwrap();

        assert_eq!(
            new_diff_snapshot_path.single_shard_path(),
            tmp_dir.path().join("snapshot_2_0-of-1_diff.bin")
        );

        // Construct a new SnapshotSet to verify that no new files were created on disk
        drop(snapshot_set);
        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 3);
    }

    #[test]
    fn registers_new_snapshot_path_does_not_reuse_mismatching_shard_count() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &["snapshot_0_0-of-2_diff.bin", "snapshot_0_1-of-2_diff.bin"],
        );

        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let new_diff_snapshot_path = snapshot_set
            .create_or_get_snapshot(SnapshotType::Diff, 1, true)
            .unwrap();

        assert_eq!(
            new_diff_snapshot_path.single_shard_path(),
            tmp_dir.path().join("snapshot_1_0-of-1_diff.bin")
        );
    }

    #[test]
    fn registers_new_snapshot_path_from_empty() {
        let tmp_dir = create_temp_dir();

        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let new_diff_snapshot_path = snapshot_set
            .create_or_get_snapshot(SnapshotType::Diff, 1, true)
            .unwrap();

        // First ordinal number assigned is 1.
        assert_eq!(
            new_diff_snapshot_path.single_shard_path(),
            tmp_dir.path().join("snapshot_1_0-of-1_diff.bin")
        );
    }

    #[test]
    fn registers_new_snapshot_path_fails_if_files_exist() {
        let tmp_dir = create_temp_dir();

        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        // Created behind the back of the already opened snapshot set.
        create_snapshot_file(tmp_dir.path(), "snapshot_1_0-of-1_diff.bin");

        let error = snapshot_set
            .create_or_get_snapshot(SnapshotType::Diff, 1, true)
            .map_err(|e| e.kind());
        assert_eq!(error, Err(io::ErrorKind::AlreadyExists));
    }

    #[test]
    #[should_panic(
        expected = "Cannot create completed snapshot directly, use publish_completed_snapshot()"
    )]
    fn registers_new_snapshot_path_rejects_full_completed_type() {
        let tmp_dir = create_temp_dir();

        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        snapshot_set
            .create_or_get_snapshot(SnapshotType::FullCompleted, 1, true)
            .unwrap();
    }

    #[test]
    fn gets_latest_full_snapshot() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_0_0-of-1_diff.bin",
                "snapshot_1_0-of-1_full.bin",
                "snapshot_3_0-of-1_full.bin",
                "snapshot_2_0-of-1_full.bin",
            ],
        );

        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let latest_full_snapshot = snapshot_set.get_latest_full_snapshot().unwrap();

        assert_eq!(latest_full_snapshot.ordinal, SnapshotOrdinal(3));
    }

    #[test]
    fn gets_all_diff_snapshots_since() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_1_0-of-1_diff.bin",
                "snapshot_2_0-of-1_full.bin",
                "snapshot_3_0-of-1_diff.bin",
                "snapshot_4_0-of-1_full.bin",
                "snapshot_5_0-of-1_diff.bin",
                "snapshot_9999_0-of-1_diff.bin",
            ],
        );

        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let latest_full_snapshot = snapshot_set.get_latest_full_snapshot().unwrap();
        let diff_snapshots =
            snapshot_set.get_all_diff_snapshots_since(latest_full_snapshot.ordinal);

        assert_eq!(diff_snapshots.len(), 2);
        assert_eq!(diff_snapshots[0].ordinal, SnapshotOrdinal(5));
        assert_eq!(diff_snapshots[1].ordinal, SnapshotOrdinal(9999));
    }

    #[test]
    fn publishes_completed_snapshot() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_1_0-of-1_diff.bin", // Incorporated into snapshot
                "snapshot_2_0-of-1_diff.bin", // Incorporated into snapshot
                "snapshot_3_0-of-1_pending.bin",
                "snapshot_4_0-of-1_diff.bin", // Created after snapshot cut-off
            ],
        );
        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        snapshot_set
            .publish_completed_snapshot(SnapshotOrdinal(3), true, false)
            .unwrap();

        // Verify that the existing snapshot set reflects the correct change.
        assert_eq!(snapshot_set.snapshots.len(), 2);
        assert_eq!(snapshot_set.snapshots[0].ordinal, SnapshotOrdinal(3));
        assert_eq!(
            snapshot_set.snapshots[0].snapshot_type,
            SnapshotType::FullCompleted
        );
        assert_eq!(snapshot_set.snapshots[1].ordinal, SnapshotOrdinal(4));

        // Construct a new SnapshotSet to verify that the file changes actually hit disk.
        drop(snapshot_set);
        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 2);
        assert_eq!(snapshot_set.snapshots[0].ordinal, SnapshotOrdinal(3));
        assert_eq!(
            snapshot_set.snapshots[0].snapshot_type,
            SnapshotType::FullCompleted
        );
        assert_eq!(snapshot_set.snapshots[1].ordinal, SnapshotOrdinal(4));
    }

    #[test]
    fn publishes_completed_snapshot_purges_prior_pending() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_1_0-of-1_pending.bin", // Prior abandoned
                "snapshot_2_0-of-1_pending.bin",
            ],
        );
        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        snapshot_set
            .publish_completed_snapshot(SnapshotOrdinal(2), false, true)
            .unwrap();

        // Verify that the existing snapshot set reflects the correct change.
        assert_eq!(snapshot_set.snapshots.len(), 1);
        assert_eq!(snapshot_set.snapshots[0].ordinal, SnapshotOrdinal(2));

        // Construct a new SnapshotSet to verify that the file changes actually hit disk.
        drop(snapshot_set);
        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        assert_eq!(snapshot_set.snapshots.len(), 1);
        assert_eq!(snapshot_set.snapshots[0].ordinal, SnapshotOrdinal(2));
    }

    #[test]
    fn publishes_completed_snapshot_already_published() {
        let tmp_dir = create_temp_dir();
        create_snapshot_file(tmp_dir.path(), "snapshot_1_0-of-1_full.bin");
        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        let error = snapshot_set
            .publish_completed_snapshot(SnapshotOrdinal(1), true, true)
            .map_err(|e| e.kind());

        assert_eq!(error, Err(io::ErrorKind::AlreadyExists));
    }

    #[test]
    fn publishes_completed_snapshot_not_found() {
        let tmp_dir = create_temp_dir();
        create_snapshot_file(tmp_dir.path(), "snapshot_1_0-of-1_full.bin");
        let mut snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();

        let error = snapshot_set
            .publish_completed_snapshot(SnapshotOrdinal(2), true, true)
            .map_err(|e| e.kind());

        assert_eq!(error, Err(io::ErrorKind::NotFound));
    }

    #[test]
    fn gets_snapshots_to_restore() {
        let tmp_dir = create_temp_dir();
        create_snapshot_files(
            tmp_dir.path(),
            &[
                "snapshot_1_0-of-1_diff.bin",
                "snapshot_2_0-of-1_full.bin",
                "snapshot_3_0-of-1_diff.bin",
                "snapshot_4_0-of-1_pending.bin",
                "snapshot_5_0-of-1_diff.bin",
            ],
        );

        let snapshot_set = FileSnapshotSet::new(tmp_dir.path()).unwrap();
        let snapshots_to_restore = snapshot_set.get_snapshots_to_restore();

        assert_eq!(snapshots_to_restore.len(), 3);
        assert_eq!(snapshots_to_restore[0].ordinal, SnapshotOrdinal(2));
        assert_eq!(snapshots_to_restore[1].ordinal, SnapshotOrdinal(3));
        assert_eq!(snapshots_to_restore[2].ordinal, SnapshotOrdinal(5));
    }
}