use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::commit::TenantId;
use crate::index::hnsw::DistanceMetric;
use crate::version::{SchemaVersion, WireVersion};
/// Format version of a [`SnapshotManifest`].
///
/// `#[serde(transparent)]` makes this (de)serialize as the bare inner `u32`
/// instead of as a wrapper around it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SnapshotManifestVersion(pub u32);
impl SnapshotManifestVersion {
/// Newest manifest version this build understands.
///
/// `SnapshotManifest::validate_internal` rejects any manifest whose version
/// is strictly greater than this value.
pub const CURRENT: SnapshotManifestVersion = SnapshotManifestVersion(1);
}
/// Manifest record describing one HNSW index snapshot.
///
/// A [`SnapshotManifest`] carries zero or more of these in `hnsw_snapshots`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HnswSnapshotEntry {
/// Name of the embedding model for this index. `validate_for_restore`
/// compares it (exact string equality) with the destination's model.
pub embedding_model: String,
/// Vector dimensionality recorded for this index.
pub vector_dim: u32,
/// Distance metric recorded for this index.
pub distance_metric: DistanceMetric,
/// Log watermark recorded for this index.
/// NOTE(review): presumably the oplog position the index was built up to;
/// nothing in this file validates it against `oplog_watermark` — confirm.
pub source_log_watermark: u64,
/// Key under which the index payload is stored.
/// NOTE(review): looks like an object-store path — confirm with the writer.
pub content_key: String,
/// Optional checksum of the index payload; not inspected in this file.
pub checksum: Option<String>,
/// Count of pending deletions recorded with the index.
/// NOTE(review): exact semantics are defined by the HNSW layer — confirm.
pub deleted_count_pending: u64,
}
/// Encryption parameters recorded alongside a snapshot.
///
/// All three fields are opaque strings here; this file stores and
/// round-trips them but never interprets them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EncryptionMetadata {
/// Name of the encryption algorithm (the tests use `"aes-256-gcm"`).
pub algorithm: String,
/// Identifier of the key used.
/// NOTE(review): "dek" presumably means data-encryption key — confirm.
pub dek_id: String,
/// Initialisation vector as text.
/// NOTE(review): presumably base64-encoded, per the `_b64` suffix — confirm.
pub iv_b64: String,
}
/// Description of one snapshot, serialized as JSON via `to_json`/`from_json`.
///
/// Self-consistency is checked by `validate_internal`; compatibility with a
/// restore destination is checked by `validate_for_restore`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SnapshotManifest {
/// Manifest format version; must not exceed
/// [`SnapshotManifestVersion::CURRENT`] to pass validation.
pub manifest_version: SnapshotManifestVersion,
/// Tenant identifier recorded for this snapshot.
pub tenant_id: TenantId,
/// Identifier of this snapshot (the tests use a UUID-shaped string).
pub snapshot_id: String,
/// Creation timestamp.
/// NOTE(review): microseconds since the Unix epoch per the name — confirm.
pub created_at_unix_micros: i64,
/// Wire version of the snapshot. On restore, only its `major` component
/// is compared with the destination's.
pub wire_version: WireVersion,
/// Schema version per table, keyed by table name.
pub table_schema_versions: BTreeMap<String, SchemaVersion>,
/// Upper end of the oplog range covered; must be `>= oplog_floor`.
pub oplog_watermark: u64,
/// Lower end of the oplog range covered; must be `<= oplog_watermark`.
pub oplog_floor: u64,
/// Optional forget floor. When present it must be `<= oplog_watermark`,
/// and restore is refused if the destination tombstone floor is above it.
pub forget_floor: Option<u64>,
/// Key of the SQLite checkpoint; validation rejects an empty string.
pub sqlite_checkpoint_key: String,
/// Optional checksum of the SQLite checkpoint; not inspected in this file.
pub sqlite_checkpoint_checksum: Option<String>,
/// HNSW index snapshots included in this snapshot; may be empty.
pub hnsw_snapshots: Vec<HnswSnapshotEntry>,
/// Encryption parameters, if any.
/// NOTE(review): `None` presumably means the snapshot is unencrypted — confirm.
pub encryption: Option<EncryptionMetadata>,
/// Optional free-form label; not validated.
pub label: Option<String>,
}
/// Reasons a [`SnapshotManifest`] fails validation.
///
/// The first four variants are produced by `validate_internal`; the last
/// three are produced only by `validate_for_restore`.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ManifestValidationError {
/// `oplog_floor` is greater than `oplog_watermark`.
#[error("oplog_floor ({floor}) must be <= oplog_watermark ({watermark})")]
OplogRangeInverted { floor: u64, watermark: u64 },
/// `forget_floor` is set and greater than `oplog_watermark`.
#[error(
"forget_floor ({forget_floor}) is above oplog_watermark ({watermark}); \
tombstone reference is outside the snapshot range"
)]
ForgetFloorAboveWatermark { forget_floor: u64, watermark: u64 },
/// `manifest_version` is greater than [`SnapshotManifestVersion::CURRENT`].
#[error(
"manifest_version ({got:?}) is newer than this build's max ({current:?}); \
upgrade the binary before reading this snapshot"
)]
ManifestVersionUnsupported {
got: SnapshotManifestVersion,
current: SnapshotManifestVersion,
},
/// `sqlite_checkpoint_key` is the empty string.
#[error(
"snapshot has no SQLite checkpoint key (empty string); \
restore would have nothing to apply"
)]
EmptySqliteCheckpointKey,
/// The destination's tombstone floor is strictly above the snapshot's
/// `forget_floor`, so restoring is refused.
#[error(
"RESTORE WOULD RESURRECT DELETED DATA: destination tombstone floor \
({dest_tombstone_floor}) is above snapshot forget_floor ({snap_forget_floor}). \
Memories tombstoned in the destination after the snapshot would reappear. \
Refusing per RFC 011 restore-no-resurrect invariant."
)]
WouldResurrectDeletedData {
dest_tombstone_floor: u64,
snap_forget_floor: u64,
},
/// The destination and snapshot wire versions differ in their `major`
/// component.
#[error("destination wire version ({dest:?}) is incompatible with snapshot ({snap:?})")]
WireVersionMismatch {
dest: WireVersion,
snap: WireVersion,
},
/// The snapshot has HNSW entries but none is for the destination's
/// embedding model. `snap_model` is the model of the first entry.
#[error(
"destination configured embedding model `{dest_model}` does not match \
snapshot's HNSW model `{snap_model}` (vector_dim or distance_metric \
would differ)"
)]
HnswModelMismatch {
dest_model: String,
snap_model: String,
},
}
impl SnapshotManifest {
    /// Checks the invariants that can be verified from the manifest alone.
    ///
    /// The format-version gate runs first: a manifest written by a newer
    /// build may give the remaining fields different meanings, so the caller
    /// should be told to upgrade rather than be shown a field-level error
    /// derived from rules that may not apply to that format.
    ///
    /// # Errors
    ///
    /// Returns the first failing check, in this order:
    /// * [`ManifestValidationError::ManifestVersionUnsupported`] if
    ///   `manifest_version` exceeds [`SnapshotManifestVersion::CURRENT`];
    /// * [`ManifestValidationError::OplogRangeInverted`] if
    ///   `oplog_floor > oplog_watermark`;
    /// * [`ManifestValidationError::ForgetFloorAboveWatermark`] if
    ///   `forget_floor` is set and exceeds `oplog_watermark`;
    /// * [`ManifestValidationError::EmptySqliteCheckpointKey`] if
    ///   `sqlite_checkpoint_key` is empty.
    pub fn validate_internal(&self) -> Result<(), ManifestValidationError> {
        // Version gate first: do not interpret fields of a format we do not know.
        if self.manifest_version.0 > SnapshotManifestVersion::CURRENT.0 {
            return Err(ManifestValidationError::ManifestVersionUnsupported {
                got: self.manifest_version,
                current: SnapshotManifestVersion::CURRENT,
            });
        }
        if self.oplog_floor > self.oplog_watermark {
            return Err(ManifestValidationError::OplogRangeInverted {
                floor: self.oplog_floor,
                watermark: self.oplog_watermark,
            });
        }
        // An absent forget floor is allowed and skips this check entirely.
        if let Some(ff) = self.forget_floor {
            if ff > self.oplog_watermark {
                return Err(ManifestValidationError::ForgetFloorAboveWatermark {
                    forget_floor: ff,
                    watermark: self.oplog_watermark,
                });
            }
        }
        if self.sqlite_checkpoint_key.is_empty() {
            return Err(ManifestValidationError::EmptySqliteCheckpointKey);
        }
        Ok(())
    }

    /// Checks that this snapshot may be restored into a given destination.
    ///
    /// Runs [`Self::validate_internal`] first, then the destination checks.
    ///
    /// * `dest_wire` - wire version of the destination; only `major` is compared.
    /// * `dest_tombstone_floor` - tombstone floor of the destination.
    /// * `dest_embedding_model` - embedding model configured at the destination.
    ///
    /// # Errors
    ///
    /// Any error from [`Self::validate_internal`], otherwise the first of:
    /// * [`ManifestValidationError::WireVersionMismatch`] if the major wire
    ///   versions differ;
    /// * [`ManifestValidationError::WouldResurrectDeletedData`] if
    ///   `forget_floor` is set and `dest_tombstone_floor` is strictly above it;
    /// * [`ManifestValidationError::HnswModelMismatch`] if the snapshot has
    ///   HNSW entries and none matches `dest_embedding_model`.
    pub fn validate_for_restore(
        &self,
        dest_wire: WireVersion,
        dest_tombstone_floor: u64,
        dest_embedding_model: &str,
    ) -> Result<(), ManifestValidationError> {
        self.validate_internal()?;

        if dest_wire.major != self.wire_version.major {
            return Err(ManifestValidationError::WireVersionMismatch {
                dest: dest_wire,
                snap: self.wire_version,
            });
        }

        // With no forget floor recorded there is nothing to compare against,
        // so the resurrect check is skipped.
        if let Some(ff) = self.forget_floor {
            if dest_tombstone_floor > ff {
                return Err(ManifestValidationError::WouldResurrectDeletedData {
                    dest_tombstone_floor,
                    snap_forget_floor: ff,
                });
            }
        }

        let model_present = self
            .hnsw_snapshots
            .iter()
            .any(|entry| entry.embedding_model == dest_embedding_model);
        if !model_present {
            // `first()` is `None` for a snapshot without HNSW entries, which is
            // accepted; otherwise the first entry's model is reported.
            if let Some(first) = self.hnsw_snapshots.first() {
                return Err(ManifestValidationError::HnswModelMismatch {
                    dest_model: dest_embedding_model.to_string(),
                    snap_model: first.embedding_model.clone(),
                });
            }
        }
        Ok(())
    }

    /// Serializes the manifest as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Propagates any `serde_json` serialization error.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string_pretty(self)
    }

    /// Parses a manifest from JSON text.
    ///
    /// This only deserializes; it does not run any validation. Call
    /// [`Self::validate_internal`] or [`Self::validate_for_restore`] afterwards.
    ///
    /// # Errors
    ///
    /// Propagates any `serde_json` deserialization error.
    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
        serde_json::from_str(s)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::hnsw::DistanceMetric;

    /// Embedding model of the single HNSW entry built by `sample_manifest`.
    const SAMPLE_MODEL: &str = "all-MiniLM-L6-v2";

    /// Wire version the sample manifest is stamped with.
    fn wire_v1() -> WireVersion {
        WireVersion::new(1, 0)
    }

    /// A fully-populated manifest that passes every validation check.
    fn sample_manifest() -> SnapshotManifest {
        let table_schema_versions = BTreeMap::from([
            ("memory_commit_log".to_string(), SchemaVersion::new(1)),
            ("hnsw_manifests".to_string(), SchemaVersion::new(1)),
        ]);
        let minilm_index = HnswSnapshotEntry {
            embedding_model: SAMPLE_MODEL.into(),
            vector_dim: 384,
            distance_metric: DistanceMetric::Cosine,
            source_log_watermark: 5_000,
            content_key: "tenants/1/snap-abc/hnsw-MiniLM.bin".into(),
            checksum: Some("ddeeff".into()),
            deleted_count_pending: 12,
        };
        let encryption = EncryptionMetadata {
            algorithm: "aes-256-gcm".into(),
            dek_id: "tenant-1-dek-v3".into(),
            iv_b64: "AAECAwQFBgcICQoLDA0ODw==".into(),
        };
        SnapshotManifest {
            manifest_version: SnapshotManifestVersion::CURRENT,
            tenant_id: TenantId::new(1),
            snapshot_id: "01934567-89ab-7cde-8000-000000000001".into(),
            created_at_unix_micros: 1_700_000_000_000_000,
            wire_version: wire_v1(),
            table_schema_versions,
            oplog_watermark: 5_000,
            oplog_floor: 1,
            forget_floor: Some(2_500),
            sqlite_checkpoint_key: "tenants/1/snap-abc/sqlite.db".into(),
            sqlite_checkpoint_checksum: Some("aabbcc".into()),
            hnsw_snapshots: vec![minilm_index],
            encryption: Some(encryption),
            label: Some("pre-migration".into()),
        }
    }

    #[test]
    fn current_version_is_one() {
        assert_eq!(SnapshotManifestVersion::CURRENT.0, 1);
    }

    #[test]
    fn json_round_trip_is_lossless() {
        let original = sample_manifest();
        let encoded = original.to_json().unwrap();
        let decoded = SnapshotManifest::from_json(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn validate_internal_passes_on_sample() {
        sample_manifest().validate_internal().unwrap();
    }

    #[test]
    fn validate_internal_rejects_inverted_oplog_range() {
        let manifest = SnapshotManifest {
            oplog_floor: 9_000,
            oplog_watermark: 1_000,
            ..sample_manifest()
        };
        let outcome = manifest.validate_internal();
        if let Err(ManifestValidationError::OplogRangeInverted { floor, watermark }) = outcome {
            assert_eq!(floor, 9_000);
            assert_eq!(watermark, 1_000);
        } else {
            panic!("expected OplogRangeInverted, got {outcome:?}");
        }
    }

    #[test]
    fn validate_internal_rejects_forget_floor_above_watermark() {
        let manifest = SnapshotManifest {
            forget_floor: Some(99_999),
            ..sample_manifest()
        };
        let outcome = manifest.validate_internal();
        if !matches!(
            outcome,
            Err(ManifestValidationError::ForgetFloorAboveWatermark { .. })
        ) {
            panic!("expected ForgetFloorAboveWatermark, got {outcome:?}");
        }
    }

    #[test]
    fn validate_internal_rejects_future_manifest_version() {
        let manifest = SnapshotManifest {
            manifest_version: SnapshotManifestVersion(99),
            ..sample_manifest()
        };
        let outcome = manifest.validate_internal();
        assert!(matches!(
            outcome,
            Err(ManifestValidationError::ManifestVersionUnsupported { .. })
        ));
    }

    #[test]
    fn validate_internal_rejects_empty_sqlite_key() {
        let manifest = SnapshotManifest {
            sqlite_checkpoint_key: String::new(),
            ..sample_manifest()
        };
        let outcome = manifest.validate_internal();
        assert!(matches!(
            outcome,
            Err(ManifestValidationError::EmptySqliteCheckpointKey)
        ));
    }

    #[test]
    fn forget_floor_optional_is_allowed() {
        let manifest = SnapshotManifest {
            forget_floor: None,
            ..sample_manifest()
        };
        manifest.validate_internal().unwrap();
    }

    #[test]
    fn validate_for_restore_passes_on_compatible_destination() {
        sample_manifest()
            .validate_for_restore(wire_v1(), 100, SAMPLE_MODEL)
            .unwrap();
    }

    #[test]
    fn validate_for_restore_refuses_resurrect_scenario() {
        // Destination floor (5_000) sits above the snapshot's forget floor (2_500).
        let outcome = sample_manifest().validate_for_restore(wire_v1(), 5_000, SAMPLE_MODEL);
        if let Err(ManifestValidationError::WouldResurrectDeletedData {
            dest_tombstone_floor,
            snap_forget_floor,
        }) = outcome
        {
            assert_eq!(dest_tombstone_floor, 5_000);
            assert_eq!(snap_forget_floor, 2_500);
        } else {
            panic!("expected WouldResurrectDeletedData, got {outcome:?}");
        }
    }

    #[test]
    fn validate_for_restore_no_forget_floor_means_safe() {
        let manifest = SnapshotManifest {
            forget_floor: None,
            ..sample_manifest()
        };
        manifest
            .validate_for_restore(wire_v1(), 999_999, SAMPLE_MODEL)
            .unwrap();
    }

    #[test]
    fn validate_for_restore_refuses_wire_major_mismatch() {
        let outcome =
            sample_manifest().validate_for_restore(WireVersion::new(2, 0), 100, SAMPLE_MODEL);
        assert!(matches!(
            outcome,
            Err(ManifestValidationError::WireVersionMismatch { .. })
        ));
    }

    #[test]
    fn validate_for_restore_refuses_hnsw_model_mismatch() {
        let outcome = sample_manifest().validate_for_restore(wire_v1(), 100, "bge-base");
        assert!(matches!(
            outcome,
            Err(ManifestValidationError::HnswModelMismatch { .. })
        ));
    }

    #[test]
    fn validate_for_restore_allows_no_hnsw_snapshots() {
        let manifest = SnapshotManifest {
            hnsw_snapshots: Vec::new(),
            ..sample_manifest()
        };
        manifest
            .validate_for_restore(wire_v1(), 100, "anything")
            .unwrap();
    }

    #[test]
    fn validate_for_restore_matches_among_multiple_hnsw_snapshots() {
        let mut manifest = sample_manifest();
        manifest.hnsw_snapshots.push(HnswSnapshotEntry {
            embedding_model: "bge-base".into(),
            vector_dim: 768,
            distance_metric: DistanceMetric::Cosine,
            source_log_watermark: 5_000,
            content_key: "tenants/1/snap-abc/hnsw-bge.bin".into(),
            checksum: None,
            deleted_count_pending: 0,
        });
        // Either model present in the snapshot must be accepted.
        for model in ["bge-base", SAMPLE_MODEL] {
            manifest
                .validate_for_restore(wire_v1(), 100, model)
                .unwrap();
        }
    }

    #[test]
    fn encryption_optional_means_unencrypted() {
        let manifest = SnapshotManifest {
            encryption: None,
            ..sample_manifest()
        };
        manifest.validate_internal().unwrap();
        let encoded = manifest.to_json().unwrap();
        let decoded = SnapshotManifest::from_json(&encoded).unwrap();
        assert!(decoded.encryption.is_none());
    }

    #[test]
    fn label_is_optional() {
        let manifest = SnapshotManifest {
            label: None,
            ..sample_manifest()
        };
        manifest.validate_internal().unwrap();
    }
}