use super::sync;
use crate::store::cold_start::latest_segment_watermark;
use crate::store::file_classification::StoreFileKind;
use crate::store::snapshot_report::{
destination_path_digest, snapshot_evidence_report, SnapshotEvidenceReport, SnapshotFileKind,
SnapshotFinding, SnapshotOptions, SnapshotReportInput,
};
use crate::store::{Open, Store, StoreError};
/// Running tally of the store artifacts recorded during a snapshot copy pass.
///
/// A `Default` value starts with an empty id list and every flag `false`;
/// `record` is the only place in this file that updates it.
#[derive(Default)]
struct SnapshotCopyAcc {
/// Ids of the recorded segment files, in the order they were recorded.
/// This type appends without sorting; any ordering implied by the `_sorted`
/// suffix has to be established by the caller.
copied_segment_ids_sorted: Vec<u64>,
/// `true` once a visibility-ranges file has been recorded.
copied_visibility_ranges_present: bool,
/// `true` once a pending-compaction marker has been recorded.
copied_pending_compaction_marker_present: bool,
/// `true` once an idempotency store file has been recorded.
copied_idempotency_store_present: bool,
}
impl SnapshotCopyAcc {
    /// Registers one copied file of kind `file_kind`.
    ///
    /// A segment contributes its id to the id list when `source_kind` exposes
    /// one; every other kind raises the matching presence flag.
    fn record(&mut self, file_kind: SnapshotFileKind, source_kind: &StoreFileKind) {
        let presence_flag = match file_kind {
            SnapshotFileKind::Segment => {
                // A segment kind that carries no id leaves the list unchanged.
                let maybe_id = source_kind.segment_id().map(|id| id.as_u64());
                self.copied_segment_ids_sorted.extend(maybe_id);
                return;
            }
            SnapshotFileKind::VisibilityRanges => &mut self.copied_visibility_ranges_present,
            SnapshotFileKind::PendingCompactionMarker => {
                &mut self.copied_pending_compaction_marker_present
            }
            SnapshotFileKind::IdempotencyStore => &mut self.copied_idempotency_store_present,
        };
        *presence_flag = true;
    }
}
/// Copies the store's snapshot-eligible files into `dest` and returns an evidence report.
///
/// The steps run in this order:
/// 1. Resolve whether keys are excluded under `options.keyset_policy`.
/// 2. Lock the lifecycle gate (the returned value stays bound until this function
///    returns) and begin a visibility fence.
/// 3. `sync` the store and flush the idempotency index into the data directory.
/// 4. Read the latest segment watermark of the data directory.
/// 5. Prepare `dest`: reject a symlink leaf, create the directory, clear store
///    artifacts already present there.
/// 6. Copy every data-directory entry that `snapshot_source_file_kind` maps to a
///    snapshot kind, rejecting a symlink leaf at each destination path first.
/// 7. Cancel the fence, sort the copied segment ids and assemble the report.
///
/// # Errors
///
/// Returns the first `StoreError` produced by any step. Failures from
/// `create_dir_all`, `read_dir`, entry iteration and `copy` are wrapped in
/// `StoreError::Io`.
pub(crate) fn snapshot(
    store: &Store<Open>,
    dest: &std::path::Path,
    options: SnapshotOptions,
) -> Result<SnapshotEvidenceReport, StoreError> {
    tracing::debug!(
        target: "batpak::flow",
        flow = "snapshot",
        destination = %dest.display()
    );
    let keys_excluded = super::resolve_keyset_exclusion(store, options.keyset_policy, "snapshot")?;
    let fs = store.config.fs();
    let _lifecycle = store.lifecycle_gate.lock();
    // NOTE(review): an early `?` return below drops the fence without calling
    // `cancel()`; confirm that dropping it releases the fence.
    let snapshot_fence = store.begin_visibility_fence()?;
    let fence_token = snapshot_fence.token();
    sync(store)?;
    store
        .index
        .idemp
        .flush(&store.config.data_dir, fs.as_ref())?;
    let (source_watermark_segment_id, source_watermark_offset) =
        latest_segment_watermark(&store.config.data_dir)?;
    fs.reject_symlink_leaf(dest, "snapshot destination")?;
    fs.create_dir_all(dest).map_err(StoreError::Io)?;
    let cleared_artifact_count = clear_snapshot_store_artifacts(fs.as_ref(), dest)?;
    let entries = fs
        .read_dir(&store.config.data_dir)
        .map_err(StoreError::Io)?;
    let mut acc = SnapshotCopyAcc::default();
    let mut findings = Vec::new();
    // Only report a cleared destination when something was actually removed.
    if cleared_artifact_count > 0 {
        findings.push(SnapshotFinding::DestinationCleared {
            artifact_count: cleared_artifact_count,
        });
    }
    for entry in entries {
        let entry = entry.map_err(StoreError::Io)?;
        let path = entry.path();
        let source_kind = StoreFileKind::from_path(&path);
        if let Some(file_kind) = snapshot_source_file_kind(&source_kind) {
            let dest_path = dest.join(entry.file_name());
            fs.reject_symlink_leaf(&dest_path, "snapshot entry")?;
            fs.copy(&path, &dest_path).map_err(StoreError::Io)?;
            acc.record(file_kind, &source_kind);
        }
    }
    snapshot_fence.cancel()?;
    // Ids were recorded in directory-enumeration order, which is not guaranteed to be
    // sorted; establish the order the `copied_segment_ids_sorted` field promises.
    acc.copied_segment_ids_sorted.sort_unstable();
    findings.push(SnapshotFinding::FenceTokenCancelled);
    findings.push(SnapshotFinding::CopyByteHashUnavailable {
        reason:
            "snapshot v1 records structural file identity; per-file byte hash table is out of scope"
                .to_string(),
        file_kind: SnapshotFileKind::Segment,
    });
    // Adds `KeysExcluded` only when the keyset policy resolved to an exclusion.
    findings.extend(keys_excluded.then_some(SnapshotFinding::KeysExcluded));
    Ok(snapshot_evidence_report(SnapshotReportInput {
        fence_token,
        source_watermark_segment_id,
        source_watermark_offset,
        copied_segment_ids_sorted: acc.copied_segment_ids_sorted,
        copied_visibility_ranges_present: acc.copied_visibility_ranges_present,
        copied_pending_compaction_marker_present: acc.copied_pending_compaction_marker_present,
        copied_idempotency_store_present: acc.copied_idempotency_store_present,
        destination_path_digest: destination_path_digest(dest),
        findings,
    })?)
}
/// Translates a store file classification into the snapshot kind it is copied as.
///
/// Yields `None` when `should_copy_into_snapshot` rejects the kind, and also for
/// every kind that has no snapshot counterpart.
fn snapshot_source_file_kind(file_kind: &StoreFileKind) -> Option<SnapshotFileKind> {
    let copyable = file_kind.should_copy_into_snapshot();
    // The arms are listed exhaustively so that a new `StoreFileKind` variant
    // fails to compile here until it is classified.
    let mapped = match file_kind {
        StoreFileKind::Segment(_) => SnapshotFileKind::Segment,
        StoreFileKind::VisibilityRanges => SnapshotFileKind::VisibilityRanges,
        StoreFileKind::IdempotencyStore => SnapshotFileKind::IdempotencyStore,
        StoreFileKind::PendingCompactionMarker => SnapshotFileKind::PendingCompactionMarker,
        StoreFileKind::MalformedSegment(_)
        | StoreFileKind::Checkpoint
        | StoreFileKind::MmapIndex
        | StoreFileKind::CompactSource
        | StoreFileKind::CursorDirectory
        | StoreFileKind::Keyset
        | StoreFileKind::Other => return None,
    };
    copyable.then_some(mapped)
}
/// Tells whether the entry at `path` in a snapshot destination is to be cleared,
/// according to the classification `StoreFileKind::from_path` assigns to it.
pub(super) fn snapshot_destination_should_clear(path: &std::path::Path) -> bool {
    let classified = StoreFileKind::from_path(path);
    classified.should_clear_from_snapshot_destination()
}
/// Removes store artifacts found directly inside `dest` and returns how many
/// removals were counted.
///
/// An entry that `snapshot_destination_should_clear` accepts is removed as a file.
/// Otherwise, an entry that is a directory and is classified as
/// `StoreFileKind::CursorDirectory` is removed recursively. All other entries are
/// left alone.
///
/// # Errors
///
/// Listing failures are wrapped in `StoreError::Io`; errors from the removal
/// helpers are propagated unchanged.
pub(super) fn clear_snapshot_store_artifacts(
    fs: &dyn crate::store::platform::fs::StoreFs,
    dest: &std::path::Path,
) -> Result<usize, StoreError> {
    use super::lifecycle_fs::{remove_dir_all_if_present, remove_file_if_present};

    let mut cleared_count = 0;
    for dir_entry in fs.read_dir(dest).map_err(StoreError::Io)? {
        let dir_entry = dir_entry.map_err(StoreError::Io)?;
        let candidate = dir_entry.path();
        let newly_cleared = if snapshot_destination_should_clear(&candidate) {
            usize::from(remove_file_if_present(&candidate)?)
        } else if candidate.is_dir()
            && StoreFileKind::from_path(&candidate) == StoreFileKind::CursorDirectory
        {
            // Both conditions are required: the cursor name on a directory.
            usize::from(remove_dir_all_if_present(&candidate)?)
        } else {
            0
        };
        cleared_count += newly_cleared;
    }
    Ok(cleared_count)
}
#[cfg(test)]
mod tests {
    use super::clear_snapshot_store_artifacts;
    use crate::store::file_classification::CURSOR_DIRECTORY;
    use crate::store::platform::fs::{write_file_atomically, RealFs};
    use crate::store::StoreError;

    /// Clearing removes the segment file but keeps a foreign directory and a
    /// regular file that merely carries the cursor directory's name.
    #[test]
    fn clear_removes_cursor_state_only_when_both_directory_and_name_match() {
        use std::io::Write;

        let scratch = tempfile::TempDir::new().expect("create snapshot destination");
        let root = scratch.path();
        let put_file = |name: &str, bytes: &'static [u8]| {
            let target = root.join(name);
            write_file_atomically(root, &target, name, |file| {
                file.write_all(bytes).map_err(StoreError::Io)
            })
            .expect("write fixture through the platform seam");
        };

        // Fixtures: one store artifact, one unrelated directory, one cursor-named file.
        put_file("000001.fbat", b"seg");
        crate::store::platform::fs::create_dir_all(&root.join("caller-owned"))
            .expect("create foreign directory");
        put_file(CURSOR_DIRECTORY, b"not a dir");

        let removed = clear_snapshot_store_artifacts(&RealFs, root)
            .expect("clearing touches only store artifacts and cannot error here");

        assert_eq!(removed, 1, "exactly the segment artifact is cleared");
        let segment_path = root.join("000001.fbat");
        assert!(!segment_path.exists(), "the segment artifact is gone");
        let foreign_dir = root.join("caller-owned");
        assert!(
            foreign_dir.is_dir(),
            "a foreign directory survives: being a directory alone must not qualify"
        );
        let cursor_named_file = root.join(CURSOR_DIRECTORY);
        assert!(
            cursor_named_file.is_file(),
            "a cursor-NAMED regular file survives: the name alone must not qualify"
        );
    }

    /// The four copyable kinds map to their snapshot counterparts; the sampled
    /// non-copyable kinds map to `None`.
    #[test]
    fn snapshot_source_file_kind_maps_copyable_kinds_and_rejects_the_rest() {
        use super::snapshot_source_file_kind;
        use crate::store::file_classification::StoreFileKind;
        use crate::store::segment::SegmentId;
        use crate::store::snapshot_report::SnapshotFileKind;

        let segment = StoreFileKind::Segment(SegmentId::from_stem("4").expect("segment id"));
        let copyable = [
            (segment, SnapshotFileKind::Segment),
            (
                StoreFileKind::VisibilityRanges,
                SnapshotFileKind::VisibilityRanges,
            ),
            (
                StoreFileKind::IdempotencyStore,
                SnapshotFileKind::IdempotencyStore,
            ),
            (
                StoreFileKind::PendingCompactionMarker,
                SnapshotFileKind::PendingCompactionMarker,
            ),
        ];
        for (source, expected) in copyable {
            assert_eq!(snapshot_source_file_kind(&source), Some(expected));
        }

        let rejected = [
            StoreFileKind::Checkpoint,
            StoreFileKind::Keyset,
            StoreFileKind::Other,
        ];
        for source in rejected {
            assert_eq!(snapshot_source_file_kind(&source), None);
        }
    }
}