use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use crate::error::{ClusterError, Result};
use crate::raft::OxirsNodeId;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupFormat {
Json,
Binary,
CompressedBinary,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupType {
Full,
Incremental,
Snapshot,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupStatus {
InProgress,
Completed,
Failed,
Verified,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupConfig {
pub backup_dir: PathBuf,
pub format: BackupFormat,
pub enable_compression: bool,
pub compression_level: i32,
pub enable_encryption: bool,
pub verify_after_backup: bool,
pub max_backup_age_days: u32,
pub max_backups: u32,
}
impl Default for BackupConfig {
fn default() -> Self {
Self {
backup_dir: PathBuf::from("./backups"),
format: BackupFormat::CompressedBinary,
enable_compression: true,
compression_level: 6,
enable_encryption: false,
verify_after_backup: true,
max_backup_age_days: 30,
max_backups: 10,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
pub id: String,
pub backup_type: BackupType,
pub format: BackupFormat,
pub created_at: SystemTime,
pub node_id: OxirsNodeId,
pub file_path: PathBuf,
pub size_bytes: u64,
pub triple_count: u64,
pub status: BackupStatus,
pub checksum: Option<String>,
pub parent_backup_id: Option<String>,
pub completed_at: Option<SystemTime>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestoreOptions {
pub backup_id: String,
pub skip_verification: bool,
pub overwrite_existing: bool,
pub point_in_time: Option<SystemTime>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BackupStatistics {
pub total_backups: u64,
pub total_bytes_backed_up: u64,
pub total_restores: u64,
pub failed_backups: u64,
pub failed_restores: u64,
pub last_backup: Option<SystemTime>,
pub last_restore: Option<SystemTime>,
pub avg_backup_time_ms: f64,
pub avg_restore_time_ms: f64,
}
pub struct BackupRestoreManager {
config: BackupConfig,
backups: Arc<RwLock<HashMap<String, BackupMetadata>>>,
stats: Arc<RwLock<BackupStatistics>>,
node_id: OxirsNodeId,
}
impl BackupRestoreManager {
pub async fn new(config: BackupConfig, node_id: OxirsNodeId) -> Result<Self> {
fs::create_dir_all(&config.backup_dir).await.map_err(|e| {
ClusterError::Other(format!("Failed to create backup directory: {}", e))
})?;
let manager = Self {
config,
backups: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(RwLock::new(BackupStatistics::default())),
node_id,
};
manager.load_backup_metadata().await?;
Ok(manager)
}
pub async fn create_full_backup(&self, data: Vec<(String, String, String)>) -> Result<String> {
let backup_id = uuid::Uuid::new_v4().to_string();
let timestamp = SystemTime::now();
info!("Creating full backup: {}", backup_id);
let file_name = format!("backup_full_{}_{}.dat", self.node_id, backup_id);
let file_path = self.config.backup_dir.join(&file_name);
let start_time = std::time::Instant::now();
let serialized_data = self.serialize_data(&data)?;
let final_data = if self.config.enable_compression {
self.compress_data(&serialized_data)?
} else {
serialized_data
};
fs::write(&file_path, &final_data)
.await
.map_err(|e| ClusterError::Other(format!("Failed to write backup: {}", e)))?;
let size_bytes = final_data.len() as u64;
let elapsed = start_time.elapsed().as_millis() as f64;
let checksum = self.calculate_checksum(&final_data);
let metadata = BackupMetadata {
id: backup_id.clone(),
backup_type: BackupType::Full,
format: self.config.format,
created_at: timestamp,
node_id: self.node_id,
file_path: file_path.clone(),
size_bytes,
triple_count: data.len() as u64,
status: BackupStatus::Completed,
checksum: Some(checksum),
parent_backup_id: None,
completed_at: Some(SystemTime::now()),
error: None,
};
{
let mut backups = self.backups.write().await;
backups.insert(backup_id.clone(), metadata.clone());
}
{
let mut stats = self.stats.write().await;
stats.total_backups += 1;
stats.total_bytes_backed_up += size_bytes;
stats.last_backup = Some(timestamp);
if stats.total_backups > 1 {
stats.avg_backup_time_ms =
(stats.avg_backup_time_ms * (stats.total_backups - 1) as f64 + elapsed)
/ stats.total_backups as f64;
} else {
stats.avg_backup_time_ms = elapsed;
}
}
if self.config.verify_after_backup {
self.verify_backup(&backup_id).await?;
}
self.cleanup_old_backups().await?;
info!(
"Full backup completed: {} ({} bytes, {} triples, {:.2}ms)",
backup_id,
size_bytes,
data.len(),
elapsed
);
Ok(backup_id)
}
pub async fn create_incremental_backup(
&self,
parent_backup_id: &str,
changed_data: Vec<(String, String, String)>,
) -> Result<String> {
{
let backups = self.backups.read().await;
if !backups.contains_key(parent_backup_id) {
return Err(ClusterError::Other(format!(
"Parent backup not found: {}",
parent_backup_id
)));
}
}
let backup_id = uuid::Uuid::new_v4().to_string();
let timestamp = SystemTime::now();
info!(
"Creating incremental backup: {} (parent: {})",
backup_id, parent_backup_id
);
let file_name = format!("backup_incr_{}_{}.dat", self.node_id, backup_id);
let file_path = self.config.backup_dir.join(&file_name);
let start_time = std::time::Instant::now();
let serialized_data = self.serialize_data(&changed_data)?;
let final_data = if self.config.enable_compression {
self.compress_data(&serialized_data)?
} else {
serialized_data
};
fs::write(&file_path, &final_data)
.await
.map_err(|e| ClusterError::Other(format!("Failed to write backup: {}", e)))?;
let size_bytes = final_data.len() as u64;
let elapsed = start_time.elapsed().as_millis() as f64;
let checksum = self.calculate_checksum(&final_data);
let metadata = BackupMetadata {
id: backup_id.clone(),
backup_type: BackupType::Incremental,
format: self.config.format,
created_at: timestamp,
node_id: self.node_id,
file_path,
size_bytes,
triple_count: changed_data.len() as u64,
status: BackupStatus::Completed,
checksum: Some(checksum),
parent_backup_id: Some(parent_backup_id.to_string()),
completed_at: Some(SystemTime::now()),
error: None,
};
{
let mut backups = self.backups.write().await;
backups.insert(backup_id.clone(), metadata);
}
{
let mut stats = self.stats.write().await;
stats.total_backups += 1;
stats.total_bytes_backed_up += size_bytes;
stats.last_backup = Some(timestamp);
if stats.total_backups > 1 {
stats.avg_backup_time_ms =
(stats.avg_backup_time_ms * (stats.total_backups - 1) as f64 + elapsed)
/ stats.total_backups as f64;
} else {
stats.avg_backup_time_ms = elapsed;
}
}
info!(
"Incremental backup completed: {} ({} bytes, {} triples, {:.2}ms)",
backup_id,
size_bytes,
changed_data.len(),
elapsed
);
Ok(backup_id)
}
pub async fn restore_backup(
&self,
options: RestoreOptions,
) -> Result<Vec<(String, String, String)>> {
let backup_id = &options.backup_id;
info!("Restoring from backup: {}", backup_id);
let metadata = {
let backups = self.backups.read().await;
backups
.get(backup_id)
.cloned()
.ok_or_else(|| ClusterError::Other(format!("Backup not found: {}", backup_id)))?
};
let start_time = std::time::Instant::now();
if !options.skip_verification {
self.verify_backup(backup_id).await?;
}
let file_data = fs::read(&metadata.file_path)
.await
.map_err(|e| ClusterError::Other(format!("Failed to read backup file: {}", e)))?;
let decompressed_data = if self.config.enable_compression {
self.decompress_data(&file_data)?
} else {
file_data
};
let mut restored_data = self.deserialize_data(&decompressed_data)?;
if metadata.backup_type == BackupType::Incremental {
if let Some(parent_id) = &metadata.parent_backup_id {
let parent_options = RestoreOptions {
backup_id: parent_id.clone(),
skip_verification: options.skip_verification,
overwrite_existing: false,
point_in_time: options.point_in_time,
};
let mut parent_data = Box::pin(self.restore_backup(parent_options)).await?;
parent_data.extend(restored_data);
restored_data = parent_data;
}
}
let elapsed = start_time.elapsed().as_millis() as f64;
{
let mut stats = self.stats.write().await;
stats.total_restores += 1;
stats.last_restore = Some(SystemTime::now());
if stats.total_restores > 1 {
stats.avg_restore_time_ms =
(stats.avg_restore_time_ms * (stats.total_restores - 1) as f64 + elapsed)
/ stats.total_restores as f64;
} else {
stats.avg_restore_time_ms = elapsed;
}
}
info!(
"Restore completed: {} ({} triples, {:.2}ms)",
backup_id,
restored_data.len(),
elapsed
);
Ok(restored_data)
}
pub async fn verify_backup(&self, backup_id: &str) -> Result<()> {
debug!("Verifying backup: {}", backup_id);
let metadata = {
let backups = self.backups.read().await;
backups
.get(backup_id)
.cloned()
.ok_or_else(|| ClusterError::Other(format!("Backup not found: {}", backup_id)))?
};
let file_data = fs::read(&metadata.file_path)
.await
.map_err(|e| ClusterError::Other(format!("Failed to read backup file: {}", e)))?;
if let Some(expected_checksum) = &metadata.checksum {
let actual_checksum = self.calculate_checksum(&file_data);
if &actual_checksum != expected_checksum {
return Err(ClusterError::Other(format!(
"Backup verification failed: checksum mismatch (expected: {}, actual: {})",
expected_checksum, actual_checksum
)));
}
}
{
let mut backups = self.backups.write().await;
if let Some(backup) = backups.get_mut(backup_id) {
backup.status = BackupStatus::Verified;
}
}
debug!("Backup verified successfully: {}", backup_id);
Ok(())
}
pub async fn list_backups(&self) -> Vec<BackupMetadata> {
let backups = self.backups.read().await;
backups.values().cloned().collect()
}
pub async fn get_backup_metadata(&self, backup_id: &str) -> Option<BackupMetadata> {
let backups = self.backups.read().await;
backups.get(backup_id).cloned()
}
pub async fn delete_backup(&self, backup_id: &str) -> Result<()> {
info!("Deleting backup: {}", backup_id);
let metadata = {
let mut backups = self.backups.write().await;
backups
.remove(backup_id)
.ok_or_else(|| ClusterError::Other(format!("Backup not found: {}", backup_id)))?
};
if metadata.file_path.exists() {
fs::remove_file(&metadata.file_path)
.await
.map_err(|e| ClusterError::Other(format!("Failed to delete backup file: {}", e)))?;
}
info!("Backup deleted: {}", backup_id);
Ok(())
}
pub async fn get_statistics(&self) -> BackupStatistics {
self.stats.read().await.clone()
}
fn serialize_data(&self, data: &[(String, String, String)]) -> Result<Vec<u8>> {
match self.config.format {
BackupFormat::Json => serde_json::to_vec(data)
.map_err(|e| ClusterError::Serialize(format!("JSON serialization failed: {}", e))),
BackupFormat::Binary | BackupFormat::CompressedBinary => {
oxicode::serde::encode_to_vec(&data, oxicode::config::standard()).map_err(|e| {
ClusterError::Serialize(format!("Binary serialization failed: {}", e))
})
}
}
}
fn deserialize_data(&self, data: &[u8]) -> Result<Vec<(String, String, String)>> {
match self.config.format {
BackupFormat::Json => serde_json::from_slice(data).map_err(|e| {
ClusterError::Serialize(format!("JSON deserialization failed: {}", e))
}),
BackupFormat::Binary | BackupFormat::CompressedBinary => {
oxicode::serde::decode_from_slice(data, oxicode::config::standard())
.map(|(v, _)| v)
.map_err(|e| {
ClusterError::Serialize(format!("Binary deserialization failed: {}", e))
})
}
}
}
fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
oxiarc_zstd::encode_all(data, self.config.compression_level)
.map_err(|e| ClusterError::Other(format!("Compression failed: {}", e)))
}
fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
oxiarc_zstd::decode_all(data)
.map_err(|e| ClusterError::Other(format!("Decompression failed: {}", e)))
}
fn calculate_checksum(&self, data: &[u8]) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(data);
format!("{:x}", hasher.finalize())
}
async fn load_backup_metadata(&self) -> Result<()> {
debug!(
"Loading backup metadata from {}",
self.config.backup_dir.display()
);
Ok(())
}
async fn cleanup_old_backups(&self) -> Result<()> {
let mut backups = self.backups.write().await;
let mut backup_list: Vec<_> = backups.values().cloned().collect();
backup_list.sort_by_key(|b| std::cmp::Reverse(b.created_at));
if backup_list.len() > self.config.max_backups as usize {
for backup in backup_list.iter().skip(self.config.max_backups as usize) {
info!("Removing old backup (max count exceeded): {}", backup.id);
backups.remove(&backup.id);
if backup.file_path.exists() {
if let Err(e) = fs::remove_file(&backup.file_path).await {
warn!("Failed to delete backup file: {}", e);
}
}
}
}
let now = SystemTime::now();
let max_age =
std::time::Duration::from_secs(self.config.max_backup_age_days as u64 * 24 * 60 * 60);
for backup in backup_list.iter() {
if let Ok(age) = now.duration_since(backup.created_at) {
if age > max_age {
info!("Removing old backup (max age exceeded): {}", backup.id);
backups.remove(&backup.id);
if backup.file_path.exists() {
if let Err(e) = fs::remove_file(&backup.file_path).await {
warn!("Failed to delete backup file: {}", e);
}
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::env;
#[tokio::test]
async fn test_backup_restore_manager_creation() {
let temp_dir = env::temp_dir().join("oxirs_backup_test_1");
let config = BackupConfig {
backup_dir: temp_dir.clone(),
..Default::default()
};
let manager = BackupRestoreManager::new(config, 1).await;
assert!(manager.is_ok());
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
}
#[tokio::test]
async fn test_full_backup_creation() {
let temp_dir = env::temp_dir().join("oxirs_backup_test_2");
let config = BackupConfig {
backup_dir: temp_dir.clone(),
verify_after_backup: false,
..Default::default()
};
let manager = BackupRestoreManager::new(config, 1).await.unwrap();
let data = vec![
("s1".to_string(), "p1".to_string(), "o1".to_string()),
("s2".to_string(), "p2".to_string(), "o2".to_string()),
];
let backup_id = manager.create_full_backup(data.clone()).await;
assert!(backup_id.is_ok());
let backup_id = backup_id.unwrap();
let metadata = manager.get_backup_metadata(&backup_id).await;
assert!(metadata.is_some());
let metadata = metadata.unwrap();
assert_eq!(metadata.backup_type, BackupType::Full);
assert_eq!(metadata.triple_count, 2);
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
}
#[tokio::test]
async fn test_backup_and_restore() {
let temp_dir = env::temp_dir().join("oxirs_backup_test_3");
let config = BackupConfig {
backup_dir: temp_dir.clone(),
verify_after_backup: false,
..Default::default()
};
let manager = BackupRestoreManager::new(config, 1).await.unwrap();
let original_data = vec![
("s1".to_string(), "p1".to_string(), "o1".to_string()),
("s2".to_string(), "p2".to_string(), "o2".to_string()),
];
let backup_id = manager
.create_full_backup(original_data.clone())
.await
.unwrap();
let options = RestoreOptions {
backup_id,
skip_verification: true,
overwrite_existing: true,
point_in_time: None,
};
let restored_data = manager.restore_backup(options).await;
assert!(restored_data.is_ok());
let restored_data = restored_data.unwrap();
assert_eq!(restored_data.len(), original_data.len());
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
}
#[tokio::test]
async fn test_backup_statistics() {
let temp_dir = env::temp_dir().join("oxirs_backup_test_4");
let config = BackupConfig {
backup_dir: temp_dir.clone(),
verify_after_backup: false,
..Default::default()
};
let manager = BackupRestoreManager::new(config, 1).await.unwrap();
let data = vec![("s1".to_string(), "p1".to_string(), "o1".to_string())];
let _ = manager.create_full_backup(data).await.unwrap();
let stats = manager.get_statistics().await;
assert_eq!(stats.total_backups, 1);
assert!(stats.total_bytes_backed_up > 0);
let _ = tokio::fs::remove_dir_all(&temp_dir).await;
}
}