use crate::backend::native::types::{NativeBackendError, NativeResult};
use crate::backend::native::v2::export::ExportManifest;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SnapshotLifecycleState {
Creating,
Stable,
Verifying,
Importable,
Applied,
Obsolete,
}
impl SnapshotLifecycleState {
pub fn is_terminal(self) -> bool {
matches!(
self,
SnapshotLifecycleState::Applied | SnapshotLifecycleState::Obsolete
)
}
pub fn allows_export(self) -> bool {
matches!(
self,
SnapshotLifecycleState::Stable | SnapshotLifecycleState::Importable
)
}
pub fn allows_import(self) -> bool {
matches!(self, SnapshotLifecycleState::Importable)
}
}
pub struct SnapshotLifecycleInspector {
export_dir: PathBuf,
}
impl SnapshotLifecycleInspector {
pub fn new(export_dir: &Path) -> Self {
Self {
export_dir: export_dir.to_path_buf(),
}
}
pub fn determine_state(&self) -> NativeResult<SnapshotLifecycleState> {
let manifest_path = self.export_dir.join("export.manifest");
let snapshot_files = self.list_snapshot_files()?;
if self.is_export_in_progress(&snapshot_files)? {
return Ok(SnapshotLifecycleState::Creating);
}
if self.is_snapshot_stable(&manifest_path, &snapshot_files)? {
if self.is_snapshot_importable(&manifest_path, &snapshot_files)? {
return Ok(SnapshotLifecycleState::Importable);
}
return Ok(SnapshotLifecycleState::Stable);
}
if self.is_validation_in_progress(&snapshot_files)? {
return Ok(SnapshotLifecycleState::Verifying);
}
if self.is_snapshot_importable(&manifest_path, &snapshot_files)? {
return Ok(SnapshotLifecycleState::Importable);
}
if self.is_snapshot_applied(&snapshot_files)? {
return Ok(SnapshotLifecycleState::Applied);
}
if snapshot_files.is_empty() {
return Ok(SnapshotLifecycleState::Obsolete);
}
Ok(SnapshotLifecycleState::Obsolete)
}
fn is_export_in_progress(&self, snapshot_files: &[PathBuf]) -> NativeResult<bool> {
if snapshot_files.iter().any(|p| {
p.extension().map_or(false, |ext| ext == "tmp")
&& !p.file_name().map_or(false, |name| {
name.to_str().map_or(false, |s| s.ends_with(".complete"))
})
}) {
return Ok(true);
}
let manifest_path = self.export_dir.join("export.manifest");
let has_snapshot_files = snapshot_files.iter().any(|p| {
p.extension().map_or(false, |ext| ext == "v2")
|| p.file_name().map_or(false, |name| {
name.to_str().map_or(false, |s| s == "export.manifest")
})
});
if !manifest_path.exists() && has_snapshot_files {
return Ok(true);
}
Ok(false)
}
fn is_snapshot_stable(
&self,
manifest_path: &Path,
snapshot_files: &[PathBuf],
) -> NativeResult<bool> {
if !manifest_path.exists() {
return Ok(false);
}
let manifest = match self.read_manifest(manifest_path) {
Ok(m) => m,
Err(_) => return Ok(false),
};
if !self.all_required_files_present(&manifest, snapshot_files)? {
return Ok(false);
}
if snapshot_files
.iter()
.any(|p| p.extension().map_or(false, |ext| ext == "tmp"))
{
return Ok(false);
}
Ok(true)
}
fn is_validation_in_progress(&self, _snapshot_files: &[PathBuf]) -> NativeResult<bool> {
let validation_lock = self.export_dir.join("validation.lock");
if validation_lock.exists() {
return Ok(true);
}
Ok(false)
}
fn is_snapshot_importable(
&self,
manifest_path: &Path,
snapshot_files: &[PathBuf],
) -> NativeResult<bool> {
if !self.is_snapshot_stable(manifest_path, snapshot_files)? {
return Ok(false);
}
let validation_marker = self.export_dir.join("validation.complete");
if !validation_marker.exists() {
return Ok(false);
}
let validation_error = self.export_dir.join("validation.error");
if validation_error.exists() {
return Ok(false);
}
Ok(true)
}
fn is_snapshot_applied(&self, _snapshot_files: &[PathBuf]) -> NativeResult<bool> {
let import_marker = self.export_dir.join("import.complete");
if import_marker.exists() {
return Ok(true);
}
Ok(false)
}
fn list_snapshot_files(&self) -> NativeResult<Vec<PathBuf>> {
if !self.export_dir.exists() {
return Ok(vec![]);
}
let mut files = Vec::new();
let entries = match std::fs::read_dir(&self.export_dir) {
Ok(entries) => entries,
Err(_) => return Ok(vec![]),
};
for entry in entries {
let entry = match entry {
Ok(entry) => entry,
Err(_) => continue,
};
let path = entry.path();
if let Some(name) = path.file_name() {
if let Some(name_str) = name.to_str() {
if name_str.ends_with(".v2")
|| name_str.ends_with(".tmp")
|| name_str == "export.manifest"
|| name_str.starts_with("validation.")
|| name_str.starts_with("import.")
{
files.push(path);
}
}
}
}
Ok(files)
}
fn read_manifest(&self, manifest_path: &Path) -> NativeResult<ExportManifest> {
use crate::backend::native::v2::export::ManifestSerializer;
ManifestSerializer::read_from_file(manifest_path)
}
fn all_required_files_present(
&self,
_manifest: &ExportManifest,
snapshot_files: &[PathBuf],
) -> NativeResult<bool> {
let v2_files: Vec<&PathBuf> = snapshot_files
.iter()
.filter(|p| p.extension().map_or(false, |ext| ext == "v2"))
.collect();
if v2_files.len() != 1 {
return Ok(false);
}
let snapshot_file = &v2_files[0];
if !snapshot_file.exists() {
return Ok(false);
}
let metadata = match std::fs::metadata(snapshot_file) {
Ok(meta) => meta,
Err(_) => return Ok(false),
};
if metadata.len() < 80 {
return Ok(false);
}
Ok(true)
}
pub fn validate_transition(
&self,
from: SnapshotLifecycleState,
to: SnapshotLifecycleState,
) -> NativeResult<()> {
match (from, to) {
(SnapshotLifecycleState::Creating, SnapshotLifecycleState::Stable) => Ok(()),
(SnapshotLifecycleState::Creating, SnapshotLifecycleState::Obsolete) => Ok(()),
(SnapshotLifecycleState::Stable, SnapshotLifecycleState::Verifying) => Ok(()),
(SnapshotLifecycleState::Stable, SnapshotLifecycleState::Obsolete) => Ok(()),
(SnapshotLifecycleState::Verifying, SnapshotLifecycleState::Stable) => Ok(()), (SnapshotLifecycleState::Verifying, SnapshotLifecycleState::Importable) => Ok(()),
(SnapshotLifecycleState::Verifying, SnapshotLifecycleState::Obsolete) => Ok(()),
(SnapshotLifecycleState::Importable, SnapshotLifecycleState::Applied) => Ok(()),
(SnapshotLifecycleState::Importable, SnapshotLifecycleState::Obsolete) => Ok(()),
(SnapshotLifecycleState::Applied, SnapshotLifecycleState::Obsolete) => Ok(()),
(from, to) if from == to => Ok(()),
(from, to) => Err(NativeBackendError::InvalidState {
context: format!(
"Invalid snapshot lifecycle state transition from {:?} to {:?}",
from, to
),
source: None,
}),
}
}
pub fn get_metadata(&self) -> NativeResult<SnapshotMetadata> {
let state = self.determine_state()?;
let manifest_path = self.export_dir.join("export.manifest");
let (manifest_exists, export_timestamp) = if manifest_path.exists() {
let metadata =
std::fs::metadata(&manifest_path).map_err(|e| NativeBackendError::Io(e))?;
let timestamp = metadata
.created()
.or_else(|_| metadata.modified())
.or_else(|_| Ok(SystemTime::now()))
.map_err(
|_: std::time::SystemTimeError| NativeBackendError::CorruptionDetected {
context: "Cannot determine manifest timestamp".to_string(),
source: None,
},
)?
.duration_since(UNIX_EPOCH)
.map_err(
|_: std::time::SystemTimeError| NativeBackendError::CorruptionDetected {
context: "Invalid manifest timestamp".to_string(),
source: None,
},
)?
.as_secs();
(true, timestamp)
} else {
(false, 0)
};
Ok(SnapshotMetadata {
state,
export_dir: self.export_dir.clone(),
manifest_exists,
export_timestamp,
inspection_timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(
|_: std::time::SystemTimeError| NativeBackendError::CorruptionDetected {
context: "System clock error".to_string(),
source: None,
},
)?
.as_secs(),
})
}
}
#[derive(Debug, Clone)]
pub struct SnapshotMetadata {
pub state: SnapshotLifecycleState,
pub export_dir: PathBuf,
pub manifest_exists: bool,
pub export_timestamp: u64,
pub inspection_timestamp: u64,
}
impl SnapshotMetadata {
pub fn age_seconds(&self) -> u64 {
self.inspection_timestamp
.saturating_sub(self.export_timestamp)
}
pub fn is_stale(&self, max_age_seconds: u64) -> bool {
self.age_seconds() > max_age_seconds
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::constants::MAGIC_BYTES;
use crate::backend::native::v2::export::ExportManifest;
use crate::backend::native::v2::wal::recovery::states::{
Authority, RecoveryState as ExplicitRecoveryState,
};
use std::fs;
use tempfile::{NamedTempFile, TempDir};
#[test]
fn test_snapshot_lifecycle_clean_export() {
let temp_dir = TempDir::new().unwrap();
let inspector = SnapshotLifecycleInspector::new(temp_dir.path());
let state = inspector.determine_state().unwrap();
assert_eq!(state, SnapshotLifecycleState::Obsolete);
let snapshot_file = temp_dir.path().join("test_snapshot.v2");
create_minimal_v2_file(&snapshot_file);
let manifest = create_test_manifest();
write_manifest(temp_dir.path(), &manifest);
let state = inspector.determine_state().unwrap();
assert_eq!(state, SnapshotLifecycleState::Stable);
assert!(state.allows_export());
assert!(!state.allows_import());
inspector
.validate_transition(
SnapshotLifecycleState::Stable,
SnapshotLifecycleState::Verifying,
)
.unwrap();
}
#[test]
fn test_snapshot_lifecycle_incomplete_export() {
let temp_dir = TempDir::new().unwrap();
let inspector = SnapshotLifecycleInspector::new(temp_dir.path());
let temp_file = temp_dir.path().join("test_snapshot.tmp");
fs::write(&temp_file, b"temporary data").unwrap();
let state = inspector.determine_state().unwrap();
assert_eq!(state, SnapshotLifecycleState::Creating);
}
#[test]
fn test_snapshot_lifecycle_importable() {
let temp_dir = TempDir::new().unwrap();
let inspector = SnapshotLifecycleInspector::new(temp_dir.path());
let snapshot_file = temp_dir.path().join("test_snapshot.v2");
create_minimal_v2_file(&snapshot_file);
let manifest = create_test_manifest();
write_manifest(temp_dir.path(), &manifest);
let validation_marker = temp_dir.path().join("validation.complete");
fs::write(&validation_marker, b"validation complete").unwrap();
let state = inspector.determine_state().unwrap();
assert_eq!(state, SnapshotLifecycleState::Importable);
}
#[test]
fn test_snapshot_lifecycle_obsolete_after_import() {
let temp_dir = TempDir::new().unwrap();
let inspector = SnapshotLifecycleInspector::new(temp_dir.path());
let import_marker = temp_dir.path().join("import.complete");
fs::write(&import_marker, b"import complete").unwrap();
let state = inspector.determine_state().unwrap();
assert_eq!(state, SnapshotLifecycleState::Applied);
}
#[test]
fn test_lifecycle_state_properties() {
assert!(!SnapshotLifecycleState::Creating.is_terminal());
assert!(!SnapshotLifecycleState::Stable.is_terminal());
assert!(!SnapshotLifecycleState::Verifying.is_terminal());
assert!(!SnapshotLifecycleState::Importable.is_terminal());
assert!(SnapshotLifecycleState::Applied.is_terminal());
assert!(SnapshotLifecycleState::Obsolete.is_terminal());
assert!(!SnapshotLifecycleState::Creating.allows_export());
assert!(SnapshotLifecycleState::Stable.allows_export());
assert!(SnapshotLifecycleState::Importable.allows_export());
assert!(!SnapshotLifecycleState::Stable.allows_import());
assert!(SnapshotLifecycleState::Importable.allows_import());
}
#[test]
fn test_snapshot_metadata() {
let temp_dir = TempDir::new().unwrap();
let inspector = SnapshotLifecycleInspector::new(temp_dir.path());
let metadata = inspector.get_metadata().unwrap();
assert_eq!(metadata.state, SnapshotLifecycleState::Obsolete);
assert!(!metadata.manifest_exists);
assert_eq!(metadata.export_timestamp, 0);
assert!(metadata.inspection_timestamp > 0);
}
fn create_minimal_v2_file(path: &Path) {
use std::io::Write;
let mut file = fs::File::create(path).unwrap();
file.write_all(&MAGIC_BYTES).unwrap();
let padding = vec![0u8; 80 - MAGIC_BYTES.len()];
file.write_all(&padding).unwrap();
file.sync_all().unwrap();
}
fn create_test_manifest() -> ExportManifest {
use crate::backend::native::v2::export::ExportMode;
ExportManifest {
magic: crate::backend::native::v2::export::ExportManifest::MAGIC,
version: crate::backend::native::v2::export::ExportManifest::VERSION,
recovery_state: ExplicitRecoveryState::CleanShutdown,
authority: Authority::GraphFile,
export_mode: ExportMode::Snapshot,
graph_checkpoint_lsn: 0,
wal_start_lsn: None,
wal_end_lsn: None,
graph_format_version: 2,
wal_format_version: 1,
v2_clustered_edges: true,
export_timestamp: 1704067200,
export_duration_ms: 150,
graph_checksum: 1234567890,
wal_checksum: None,
total_records: 42,
total_bytes: 1048576,
reserved: [0; 8],
}
}
fn write_manifest(export_dir: &Path, manifest: &ExportManifest) {
use crate::backend::native::v2::export::ManifestSerializer;
let manifest_path = export_dir.join("export.manifest");
ManifestSerializer::write_to_file(manifest, manifest_path).unwrap();
}
}