use crate::{AzothDb, AzothError, BackupManifest, BackupOptions, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Descriptive record for a single checkpoint.
///
/// Serialized to and from JSON by `LocalStorage`, which stores one such
/// record in a `.json` file next to each checkpoint archive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMetadata {
/// Checkpoint identifier. `CheckpointManager` first fills this with a
/// provisional `checkpoint-<timestamp>` value and, in the metadata it
/// returns, replaces it with the id returned by the storage backend.
pub id: String,
/// UTC time at which the checkpoint was created.
pub timestamp: DateTime<Utc>,
/// `sealed_event_id` copied from the backup manifest of the checkpoint.
pub sealed_event_id: u64,
/// Size in bytes of the checkpoint's tar archive.
pub size_bytes: u64,
/// Optional human-readable label (for example `"shutdown"`).
pub name: Option<String>,
/// Value reported by the backend's `CheckpointStorage::storage_type`.
pub storage_type: String,
}
/// Storage backend that checkpoint archives are uploaded to and fetched from.
///
/// `upload`, `download` and `storage_type` are required. `delete` and `list`
/// are optional: their default implementations return
/// [`AzothError::Config`] with a "not supported" message.
#[async_trait]
pub trait CheckpointStorage: Send + Sync {
    /// Stores the archive located at `path` together with `metadata`.
    ///
    /// The returned string is the identifier `CheckpointManager` uses as the
    /// checkpoint's id from then on.
    async fn upload(&self, path: &Path, metadata: &CheckpointMetadata) -> Result<String>;

    /// Fetches the checkpoint identified by `id` and writes it to `path`.
    async fn download(&self, id: &str, path: &Path) -> Result<()>;

    /// Removes the checkpoint identified by `id`.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with [`AzothError::Config`].
    async fn delete(&self, id: &str) -> Result<()> {
        // The default implementation has no use for the id.
        let _ = id;
        let message = String::from("Delete operation not supported by this storage backend");
        Err(AzothError::Config(message))
    }

    /// Returns the metadata of the checkpoints held by this backend.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with [`AzothError::Config`].
    async fn list(&self) -> Result<Vec<CheckpointMetadata>> {
        let message = String::from("List operation not supported by this storage backend");
        Err(AzothError::Config(message))
    }

    /// Short name of the backend; recorded in `CheckpointMetadata::storage_type`.
    fn storage_type(&self) -> &str;
}
/// Checkpoint storage backed by a directory on the local filesystem.
pub struct LocalStorage {
/// Directory that directly contains the checkpoint archives and their
/// `.json` metadata files.
base_path: PathBuf,
}
impl LocalStorage {
    /// Creates a storage rooted at `base_path`.
    ///
    /// The directory is not touched here.
    pub fn new(base_path: PathBuf) -> Self {
        Self { base_path }
    }

    /// Maps a checkpoint id to a path directly inside the storage directory.
    ///
    /// # Errors
    ///
    /// Returns [`AzothError::Config`] when `id` is empty, contains a
    /// character other than ASCII letters, digits, `-`, `_` or `.`, contains
    /// `..`, or when the joined path's parent is not the storage directory.
    fn safe_path(&self, id: &str) -> Result<PathBuf> {
        if id.is_empty() {
            return Err(AzothError::Config(String::from(
                "Checkpoint id must not be empty",
            )));
        }

        // Whitelist check: path separators and every other special character
        // are rejected here.
        let has_forbidden_char = id
            .chars()
            .any(|ch| !matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.'));
        if has_forbidden_char || id.contains("..") {
            return Err(AzothError::Config(format!(
                "Checkpoint id '{}' contains unsafe characters. \
                 Only alphanumeric, hyphen, underscore, and period are allowed.",
                id
            )));
        }

        // Second line of defence: the joined path must be a direct child of
        // the storage directory.
        let candidate = self.base_path.join(id);
        let stays_inside = candidate.parent() == Some(self.base_path.as_path());
        if stays_inside {
            Ok(candidate)
        } else {
            Err(AzothError::Config(format!(
                "Checkpoint id '{}' resolves outside the storage directory",
                id
            )))
        }
    }
}
#[async_trait]
impl CheckpointStorage for LocalStorage {
async fn upload(&self, path: &Path, metadata: &CheckpointMetadata) -> Result<String> {
std::fs::create_dir_all(&self.base_path)?;
let filename = format!(
"{}-{}.tar",
metadata.timestamp.format("%Y%m%d-%H%M%S"),
&metadata.id
);
let dest_path = self.safe_path(&filename)?;
std::fs::copy(path, &dest_path)?;
let metadata_filename = format!("{}.json", &filename);
let metadata_path = self.safe_path(&metadata_filename)?;
let metadata_json = serde_json::to_string_pretty(metadata)
.map_err(|e| AzothError::Serialization(e.to_string()))?;
std::fs::write(metadata_path, metadata_json)?;
Ok(filename)
}
async fn download(&self, id: &str, path: &Path) -> Result<()> {
let src_path = self.safe_path(id)?;
if !src_path.exists() {
return Err(AzothError::NotFound(format!(
"Checkpoint not found: {}",
id
)));
}
std::fs::copy(&src_path, path)?;
Ok(())
}
async fn delete(&self, id: &str) -> Result<()> {
let checkpoint_path = self.safe_path(id)?;
let metadata_filename = format!("{}.json", id);
let metadata_path = self.safe_path(&metadata_filename)?;
if checkpoint_path.exists() {
std::fs::remove_file(&checkpoint_path)?;
}
if metadata_path.exists() {
std::fs::remove_file(&metadata_path)?;
}
Ok(())
}
async fn list(&self) -> Result<Vec<CheckpointMetadata>> {
let mut checkpoints = Vec::new();
if !self.base_path.exists() {
return Ok(checkpoints);
}
for entry in std::fs::read_dir(&self.base_path)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
let metadata_json = std::fs::read_to_string(&path)?;
let metadata: CheckpointMetadata = serde_json::from_str(&metadata_json)
.map_err(|e| AzothError::Serialization(e.to_string()))?;
checkpoints.push(metadata);
}
}
checkpoints.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
Ok(checkpoints)
}
fn storage_type(&self) -> &str {
"local"
}
}
/// Settings used by `CheckpointManager`.
#[derive(Clone)]
pub struct CheckpointConfig {
/// Options handed to the database's backup and restore calls.
pub backup_options: BackupOptions,
/// When set, checkpoint creation places its temporary directories inside
/// this directory (creating it if needed); when `None`, `tempfile::tempdir()`
/// chooses the location.
pub temp_dir: Option<PathBuf>,
}
impl CheckpointConfig {
    /// Creates a configuration with `BackupOptions::new()` and no explicit
    /// temporary directory.
    pub fn new() -> Self {
        let backup_options = BackupOptions::new();
        Self {
            backup_options,
            temp_dir: None,
        }
    }

    /// Returns the configuration with `key` applied to the backup options via
    /// `BackupOptions::with_encryption`.
    pub fn with_encryption(self, key: crate::EncryptionKey) -> Self {
        let Self {
            backup_options,
            temp_dir,
        } = self;
        Self {
            backup_options: backup_options.with_encryption(key),
            temp_dir,
        }
    }

    /// Returns the configuration with `enabled` forwarded to
    /// `BackupOptions::with_compression`.
    pub fn with_compression(self, enabled: bool) -> Self {
        let Self {
            backup_options,
            temp_dir,
        } = self;
        Self {
            backup_options: backup_options.with_compression(enabled),
            temp_dir,
        }
    }

    /// Returns the configuration with `level` forwarded to
    /// `BackupOptions::with_compression_level`.
    pub fn with_compression_level(self, level: u32) -> Self {
        let Self {
            backup_options,
            temp_dir,
        } = self;
        Self {
            backup_options: backup_options.with_compression_level(level),
            temp_dir,
        }
    }

    /// Returns the configuration with `dir` as the parent directory for
    /// temporary files.
    pub fn with_temp_dir(self, dir: PathBuf) -> Self {
        Self {
            temp_dir: Some(dir),
            ..self
        }
    }
}
impl Default for CheckpointConfig {
fn default() -> Self {
Self::new()
}
}
/// Creates, restores, lists and deletes database checkpoints using a
/// `CheckpointStorage` backend `S`.
pub struct CheckpointManager<S: CheckpointStorage> {
/// Database that checkpoints are taken from.
db: Arc<AzothDb>,
/// Backend that checkpoint archives are uploaded to and downloaded from.
storage: Arc<S>,
/// Backup options and temporary-directory settings.
config: CheckpointConfig,
}
impl<S: CheckpointStorage> CheckpointManager<S> {
pub fn new(db: Arc<AzothDb>, storage: S, config: CheckpointConfig) -> Self {
Self {
db,
storage: Arc::new(storage),
config,
}
}
pub async fn create_checkpoint(&self) -> Result<CheckpointMetadata> {
let temp_dir = if let Some(ref dir) = self.config.temp_dir {
std::fs::create_dir_all(dir)?;
tempfile::tempdir_in(dir)?
} else {
tempfile::tempdir()?
};
tracing::info!("Creating checkpoint in {}", temp_dir.path().display());
self.db
.backup_with_options(temp_dir.path(), &self.config.backup_options)?;
let archive_dir = if let Some(ref dir) = self.config.temp_dir {
std::fs::create_dir_all(dir)?;
tempfile::tempdir_in(dir)?
} else {
tempfile::tempdir()?
};
let timestamp = Utc::now();
let temp_id = format!("checkpoint-{}", timestamp.format("%Y%m%d-%H%M%S"));
let archive_path = archive_dir.path().join(format!("{}.tar", &temp_id));
self.create_archive(temp_dir.path(), &archive_path)?;
let manifest_path = temp_dir.path().join("manifest.json");
let manifest_json = std::fs::read_to_string(&manifest_path)?;
let manifest: BackupManifest = serde_json::from_str(&manifest_json)
.map_err(|e| AzothError::Serialization(e.to_string()))?;
let sealed_event_id = manifest.sealed_event_id;
let size_bytes = std::fs::metadata(&archive_path)?.len();
let metadata = CheckpointMetadata {
id: temp_id.clone(),
timestamp,
sealed_event_id,
size_bytes,
name: None,
storage_type: self.storage.storage_type().to_string(),
};
tracing::info!(
"Uploading checkpoint to {} storage",
self.storage.storage_type()
);
let storage_id = self.storage.upload(&archive_path, &metadata).await?;
let final_metadata = CheckpointMetadata {
id: storage_id.clone(),
..metadata
};
tracing::info!(
"Checkpoint uploaded successfully: id={}, size={} bytes",
storage_id,
size_bytes
);
Ok(final_metadata)
}
pub async fn restore_checkpoint(&self, checkpoint_id: &str, target_path: &Path) -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let archive_path = temp_dir.path().join("checkpoint.tar");
tracing::info!("Downloading checkpoint: {}", checkpoint_id);
self.storage.download(checkpoint_id, &archive_path).await?;
let extract_dir = temp_dir.path().join("extracted");
std::fs::create_dir_all(&extract_dir)?;
let archive_file = File::open(&archive_path)?;
let mut archive = tar::Archive::new(archive_file);
archive.unpack(&extract_dir).map_err(AzothError::Io)?;
tracing::info!("Restoring database to {}", target_path.display());
AzothDb::restore_with_options(
extract_dir.as_path(),
target_path,
&self.config.backup_options,
)?;
tracing::info!("Checkpoint restored successfully");
Ok(())
}
pub async fn list_checkpoints(&self) -> Result<Vec<CheckpointMetadata>> {
self.storage.list().await
}
pub async fn delete_checkpoint(&self, checkpoint_id: &str) -> Result<()> {
self.storage.delete(checkpoint_id).await
}
pub async fn shutdown_checkpoint(&self) -> Result<CheckpointMetadata> {
tracing::info!("Creating shutdown checkpoint...");
let mut metadata = self.create_checkpoint().await?;
metadata.name = Some("shutdown".to_string());
tracing::info!("Shutdown checkpoint created successfully: {}", metadata.id);
Ok(metadata)
}
fn create_archive(&self, backup_dir: &Path, output_path: &Path) -> Result<()> {
let output_file = File::create(output_path)?;
let mut tar_builder = tar::Builder::new(output_file);
tar_builder
.append_dir_all(".", backup_dir)
.map_err(AzothError::Io)?;
tar_builder.finish().map_err(AzothError::Io)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Exercises `LocalStorage` end to end: upload, list, download, delete.
    #[tokio::test]
    async fn test_local_storage() -> Result<()> {
        let workspace = TempDir::new()?;
        let root = workspace.path();
        let storage = LocalStorage::new(root.to_path_buf());

        // A small stand-in for a checkpoint archive.
        let source = root.join("test.tar");
        std::fs::write(&source, b"test data")?;

        let metadata = CheckpointMetadata {
            id: "test-checkpoint".to_string(),
            timestamp: Utc::now(),
            sealed_event_id: 42,
            size_bytes: 9,
            name: Some("Test Checkpoint".to_string()),
            storage_type: "local".to_string(),
        };

        // Upload hands back the storage id.
        let id = storage.upload(&source, &metadata).await?;
        assert!(id.ends_with(".tar"));

        // The uploaded checkpoint is the only one listed.
        let listed = storage.list().await?;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].sealed_event_id, 42);

        // Downloading returns the original bytes.
        let restored = root.join("downloaded.tar");
        storage.download(&id, &restored).await?;
        let restored_bytes = std::fs::read(&restored)?;
        assert_eq!(restored_bytes, b"test data");

        // After deletion nothing is listed any more.
        storage.delete(&id).await?;
        let remaining = storage.list().await?;
        assert_eq!(remaining.len(), 0);

        Ok(())
    }
}