use crate::error::{FusekiError, FusekiResult};
use crate::store::{RdfSerializationFormat, Store};
use crate::store_ext::StoreExt;
use chrono::{DateTime, Utc};
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::io::Read;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::time::{self, Duration};
use tracing::{debug, error, info, warn};
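
/// Schedules and runs backups of the RDF store.
///
/// Tracks the timestamps of the most recent backup of any kind and of the
/// most recent full backup; these are the reference points for incremental
/// and differential backups respectively.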
pub struct BackupManager {
store: Arc<Store>,
config: BackupConfig,
last_backup: Arc<tokio::sync::RwLock<Option<DateTime<Utc>>>>,
last_full_backup: Arc<tokio::sync::RwLock<Option<DateTime<Utc>>>>,
}
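
/// Configuration for the backup scheduler.
///
/// `interval_hours` controls how often [`BackupManager::start`] runs a
/// backup, `max_backups` caps how many backup directories are kept, and
/// `strategy` selects how much data each backup captures.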
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupConfig {
pub enabled: bool,
pub interval_hours: u64,
pub backup_dir: PathBuf,
pub max_backups: usize,
pub compression: bool,
pub include_indexes: bool,
pub strategy: BackupStrategy,
}
impl Default for BackupConfig {
fn default() -> Self {
Self {
enabled: false,
interval_hours: 24,
backup_dir: PathBuf::from("/data/backups"),
max_backups: 7,
compression: true,
include_indexes: true,
strategy: BackupStrategy::Full,
}
}
}
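
/// How much data a backup captures: everything (`Full`), changes since the
/// last backup of any kind (`Incremental`), or changes since the last full
/// backup (`Differential`).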
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupStrategy {
Full,
Incremental,
Differential,
}
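
/// Description of a single backup, persisted next to its data as
/// `metadata.json`.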
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
pub id: String,
pub timestamp: DateTime<Utc>,
pub strategy: BackupStrategy,
pub size_bytes: u64,
pub compressed: bool,
pub triple_count: Option<u64>,
pub checksum: Option<String>,
pub description: Option<String>,
}
impl BackupManager {
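    /// Creates a manager with no recorded backup history; nothing runs
    /// until [`Self::start`] is called.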
pub fn new(store: Arc<Store>, config: BackupConfig) -> Self {
Self {
store,
config,
last_backup: Arc::new(tokio::sync::RwLock::new(None)),
last_full_backup: Arc::new(tokio::sync::RwLock::new(None)),
}
}
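
    /// Runs the backup scheduler.
    ///
    /// Returns immediately when backups are disabled. Otherwise this loops
    /// forever, performing a backup and then sleeping for the configured
    /// interval, so callers should spawn it on its own task.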
pub async fn start(&self) -> FusekiResult<()> {
if !self.config.enabled {
info!("Automatic backups disabled");
return Ok(());
}
info!(
"Starting automatic backup scheduler (interval: {} hours)",
self.config.interval_hours
);
fs::create_dir_all(&self.config.backup_dir)
.await
.map_err(|e| {
FusekiError::internal(format!("Failed to create backup directory: {}", e))
})?;
loop {
if let Err(e) = self.perform_backup().await {
error!("Backup failed: {}", e);
}
let interval = Duration::from_secs(self.config.interval_hours * 3600);
time::sleep(interval).await;
}
}
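
    /// Performs one backup with the configured strategy, compresses it if
    /// enabled, writes `metadata.json`, updates the backup timestamps, and
    /// prunes old backups.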
pub async fn perform_backup(&self) -> FusekiResult<BackupMetadata> {
info!("Starting backup (strategy: {:?})", self.config.strategy);
let backup_id = format!("backup-{}", Utc::now().format("%Y%m%d-%H%M%S"));
let backup_path = self.config.backup_dir.join(&backup_id);
fs::create_dir_all(&backup_path).await.map_err(|e| {
FusekiError::internal(format!("Failed to create backup directory: {}", e))
})?;
let metadata = match self.config.strategy {
BackupStrategy::Full => self.perform_full_backup(&backup_path, &backup_id).await?,
BackupStrategy::Incremental => {
self.perform_incremental_backup(&backup_path, &backup_id)
.await?
}
BackupStrategy::Differential => {
self.perform_differential_backup(&backup_path, &backup_id)
.await?
}
};
let final_metadata = if self.config.compression {
self.compress_backup(&backup_path, metadata).await?
} else {
metadata
};
self.save_metadata(&backup_path, &final_metadata).await?;
let now = Utc::now();
*self.last_backup.write().await = Some(now);
if final_metadata.strategy == BackupStrategy::Full {
*self.last_full_backup.write().await = Some(now);
info!("Updated last full backup timestamp");
}
self.cleanup_old_backups().await?;
info!("Backup completed successfully: {}", backup_id);
Ok(final_metadata)
}
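
    /// Exports the default dataset and every named dataset as N-Quads into
    /// `data.nq`, checksumming the concatenated output.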
async fn perform_full_backup(
&self,
backup_path: &Path,
backup_id: &str,
) -> FusekiResult<BackupMetadata> {
info!("Performing full backup");
let export_path = backup_path.join("data.nq");
let datasets = self
.store
.list_datasets()
.map_err(|e| FusekiError::internal(format!("Failed to list datasets: {}", e)))?;
let mut all_data = String::new();
let mut total_triple_count: usize = 0;
match self.store.export_data(RdfSerializationFormat::NQuads, None) {
Ok(data) => {
all_data.push_str(&data);
total_triple_count += self.store.count_triples("default");
}
Err(e) => {
warn!("Failed to export default dataset: {}", e);
}
}
for dataset in &datasets {
if dataset != "default" {
match self
.store
.export_data(RdfSerializationFormat::NQuads, Some(dataset))
{
Ok(data) => {
all_data.push_str(&data);
total_triple_count += self.store.count_triples(dataset);
}
Err(e) => {
warn!("Failed to export dataset {}: {}", dataset, e);
}
}
}
}
let mut hasher = Sha256::new();
hasher.update(all_data.as_bytes());
let checksum = format!("{:x}", hasher.finalize());
fs::write(&export_path, all_data.as_bytes())
.await
.map_err(|e| FusekiError::internal(format!("Failed to write backup: {}", e)))?;
let size_bytes = fs::metadata(&export_path)
.await
.map_err(|e| FusekiError::internal(format!("Failed to get file size: {}", e)))?
.len();
info!(
"Full backup completed: {} triples, {} bytes",
total_triple_count, size_bytes
);
Ok(BackupMetadata {
id: backup_id.to_string(),
timestamp: Utc::now(),
strategy: BackupStrategy::Full,
size_bytes,
compressed: false,
triple_count: Some(total_triple_count as u64),
checksum: Some(checksum),
description: Some(format!("Full backup of {} datasets", datasets.len())),
})
}
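
    /// Records the changes made since the last backup of any kind; falls
    /// back to a full backup when no previous backup exists.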
async fn perform_incremental_backup(
&self,
backup_path: &Path,
backup_id: &str,
) -> FusekiResult<BackupMetadata> {
info!("Performing incremental backup");
let last_backup_time = self.last_backup.read().await;
let since = match *last_backup_time {
Some(time) => time,
None => {
warn!("No previous backup found, performing full backup instead");
return self.perform_full_backup(backup_path, backup_id).await;
}
};
drop(last_backup_time);
info!("Fetching changes since {}", since);
let changes = self.store.get_changes_since(since).await?;
if changes.is_empty() {
info!("No changes detected since last backup");
}
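        // NOTE: only a textual change log is written (as N-Quads comment
        // lines); the changed quads themselves are not captured here.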
let export_path = backup_path.join("changes.nq");
let mut content = String::new();
content.push_str(&format!("# Incremental backup since {}\n", since));
content.push_str(&format!("# {} changes\n", changes.len()));
for change in &changes {
content.push_str(&format!(
"# Change {} at {}: {} (graphs: {:?})\n",
change.id, change.timestamp, change.operation_type, change.affected_graphs
));
}
fs::write(&export_path, content.as_bytes())
.await
.map_err(|e| FusekiError::internal(format!("Failed to write backup: {}", e)))?;
let size_bytes = fs::metadata(&export_path)
.await
.map_err(|e| FusekiError::internal(format!("Failed to get file size: {}", e)))?
.len();
let checksum = self.calculate_checksum(&export_path).await?;
Ok(BackupMetadata {
id: backup_id.to_string(),
timestamp: Utc::now(),
strategy: BackupStrategy::Incremental,
size_bytes,
compressed: false,
triple_count: Some(changes.len() as u64),
checksum: Some(checksum),
description: Some(format!(
"Incremental backup with {} changes since {}",
changes.len(),
since
)),
})
}
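
    /// Records the changes made since the last *full* backup; falls back
    /// to a full backup when no previous full backup exists.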
async fn perform_differential_backup(
&self,
backup_path: &Path,
backup_id: &str,
) -> FusekiResult<BackupMetadata> {
info!("Performing differential backup");
let last_full_backup_time = self.last_full_backup.read().await;
let since = match *last_full_backup_time {
Some(time) => time,
None => {
warn!("No previous full backup found, performing full backup instead");
return self.perform_full_backup(backup_path, backup_id).await;
}
};
drop(last_full_backup_time);
info!("Fetching changes since last full backup at {}", since);
let changes = self.store.get_changes_since(since).await?;
if changes.is_empty() {
info!("No changes detected since last full backup");
}
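        // NOTE: as with incremental backups, this writes a textual change
        // log (N-Quads comment lines) rather than the changed quads.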
let export_path = backup_path.join("diff.nq");
let mut content = String::new();
content.push_str(&format!(
"# Differential backup since last full backup at {}\n",
since
));
content.push_str(&format!("# {} changes\n", changes.len()));
for change in &changes {
content.push_str(&format!(
"# Change {} at {}: {} (graphs: {:?})\n",
change.id, change.timestamp, change.operation_type, change.affected_graphs
));
}
fs::write(&export_path, content.as_bytes())
.await
.map_err(|e| FusekiError::internal(format!("Failed to write backup: {}", e)))?;
let size_bytes = fs::metadata(&export_path)
.await
.map_err(|e| FusekiError::internal(format!("Failed to get file size: {}", e)))?
.len();
let checksum = self.calculate_checksum(&export_path).await?;
Ok(BackupMetadata {
id: backup_id.to_string(),
timestamp: Utc::now(),
strategy: BackupStrategy::Differential,
size_bytes,
compressed: false,
triple_count: Some(changes.len() as u64),
checksum: Some(checksum),
description: Some(format!(
"Differential backup with {} changes since last full backup at {}",
changes.len(),
since
)),
})
}
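
    /// Gzip-compresses the backup's data file in place (`*.nq` becomes
    /// `*.nq.gz`), deleting the uncompressed original.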
async fn compress_backup(
&self,
backup_path: &Path,
metadata: BackupMetadata,
) -> FusekiResult<BackupMetadata> {
debug!("Compressing backup");
let data_file = match metadata.strategy {
BackupStrategy::Full => backup_path.join("data.nq"),
BackupStrategy::Incremental => backup_path.join("changes.nq"),
BackupStrategy::Differential => backup_path.join("diff.nq"),
};
if !data_file.exists() {
return Ok(metadata);
}
let data = fs::read(&data_file).await.map_err(|e| {
FusekiError::internal(format!("Failed to read data for compression: {}", e))
})?;
let compressed_path = data_file.with_extension("nq.gz");
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder
.write_all(&data)
.map_err(|e| FusekiError::internal(format!("Failed to compress data: {}", e)))?;
let compressed_data = encoder
.finish()
.map_err(|e| FusekiError::internal(format!("Failed to finalize compression: {}", e)))?;
fs::write(&compressed_path, &compressed_data)
.await
.map_err(|e| {
FusekiError::internal(format!("Failed to write compressed file: {}", e))
})?;
fs::remove_file(&data_file).await.map_err(|e| {
FusekiError::internal(format!("Failed to remove uncompressed file: {}", e))
})?;
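        // Keep the original checksum: it describes the uncompressed
        // payload, which is what restore_backup hashes after decompression.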
let mut compressed_metadata = metadata;
compressed_metadata.compressed = true;
compressed_metadata.size_bytes = compressed_data.len() as u64;
info!(
"Compressed backup: {} -> {} bytes ({}% reduction)",
data.len(),
compressed_data.len(),
if data.is_empty() {
0
} else {
                // Saturate so incompressible input reports 0% rather than
                // underflowing when the gzip output exceeds the input size.
                100usize.saturating_sub(compressed_data.len() * 100 / data.len())
}
);
Ok(compressed_metadata)
}
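
    /// Returns the hex-encoded SHA-256 digest of a file's contents.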
async fn calculate_checksum(&self, file_path: &Path) -> FusekiResult<String> {
let data = fs::read(file_path).await.map_err(|e| {
FusekiError::internal(format!("Failed to read file for checksum: {}", e))
})?;
let mut hasher = Sha256::new();
hasher.update(&data);
Ok(format!("{:x}", hasher.finalize()))
}
async fn save_metadata(
&self,
backup_path: &Path,
metadata: &BackupMetadata,
) -> FusekiResult<()> {
let metadata_path = backup_path.join("metadata.json");
let metadata_json = serde_json::to_string_pretty(metadata)
.map_err(|e| FusekiError::internal(format!("Failed to serialize metadata: {}", e)))?;
fs::write(&metadata_path, metadata_json)
.await
.map_err(|e| FusekiError::internal(format!("Failed to write metadata: {}", e)))?;
Ok(())
}
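
    /// Deletes the oldest backup directories (ordered by filesystem
    /// creation time) until at most `max_backups` remain; directories whose
    /// creation time is unavailable are skipped.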
async fn cleanup_old_backups(&self) -> FusekiResult<()> {
debug!("Cleaning up old backups");
let mut entries = fs::read_dir(&self.config.backup_dir).await.map_err(|e| {
FusekiError::internal(format!("Failed to read backup directory: {}", e))
})?;
let mut backups = Vec::new();
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| FusekiError::internal(format!("Failed to read directory entry: {}", e)))?
{
if entry
.file_type()
.await
.map_err(|e| FusekiError::internal(format!("Failed to get file type: {}", e)))?
.is_dir()
{
if let Ok(metadata) = entry.metadata().await {
if let Ok(created) = metadata.created() {
backups.push((entry.path(), created));
}
}
}
}
backups.sort_by_key(|(_, created)| *created);
while backups.len() > self.config.max_backups {
if let Some((path, _)) = backups.first() {
info!("Removing old backup: {:?}", path);
fs::remove_dir_all(path).await.map_err(|e| {
FusekiError::internal(format!("Failed to remove old backup: {}", e))
})?;
backups.remove(0);
} else {
break;
}
}
Ok(())
}
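
    /// Restores the store from the named backup: reads its metadata,
    /// decompresses the data file if needed, verifies the SHA-256 checksum
    /// when one is recorded, clears the store with `DROP ALL`, and imports
    /// the N-Quads payload.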
pub async fn restore_backup(&self, backup_id: &str) -> FusekiResult<()> {
info!("Restoring from backup: {}", backup_id);
let backup_path = self.config.backup_dir.join(backup_id);
if !backup_path.exists() {
return Err(FusekiError::internal(format!(
"Backup not found: {}",
backup_id
)));
}
let metadata_path = backup_path.join("metadata.json");
let metadata_json = fs::read_to_string(&metadata_path)
.await
.map_err(|e| FusekiError::internal(format!("Failed to read metadata: {}", e)))?;
let metadata: BackupMetadata = serde_json::from_str(&metadata_json)
.map_err(|e| FusekiError::internal(format!("Failed to parse metadata: {}", e)))?;
let base_filename = match metadata.strategy {
BackupStrategy::Full => "data.nq",
BackupStrategy::Incremental => "changes.nq",
BackupStrategy::Differential => "diff.nq",
};
let compressed_path = backup_path.join(format!("{}.gz", base_filename));
let uncompressed_path = backup_path.join(base_filename);
let data = if compressed_path.exists() {
debug!("Decompressing backup data from {:?}", compressed_path);
let compressed_data = fs::read(&compressed_path).await.map_err(|e| {
FusekiError::internal(format!("Failed to read compressed backup: {}", e))
})?;
let mut decoder = GzDecoder::new(&compressed_data[..]);
let mut decompressed = String::new();
decoder.read_to_string(&mut decompressed).map_err(|e| {
FusekiError::internal(format!("Failed to decompress backup: {}", e))
})?;
decompressed
} else if uncompressed_path.exists() {
debug!(
"Reading uncompressed backup data from {:?}",
uncompressed_path
);
fs::read_to_string(&uncompressed_path)
.await
.map_err(|e| FusekiError::internal(format!("Failed to read backup: {}", e)))?
} else {
return Err(FusekiError::internal(format!(
"Backup data file not found in {}",
backup_id
)));
};
if let Some(expected_checksum) = &metadata.checksum {
let mut hasher = Sha256::new();
hasher.update(data.as_bytes());
let actual_checksum = format!("{:x}", hasher.finalize());
if &actual_checksum != expected_checksum {
return Err(FusekiError::internal(format!(
"Checksum mismatch: expected {}, got {}",
expected_checksum, actual_checksum
)));
}
debug!("Checksum verified: {}", actual_checksum);
}
info!("Clearing current store data before restore");
if let Err(e) = self.store.update("DROP ALL") {
warn!("Failed to clear store (may be empty): {}", e);
}
info!("Importing backup data ({} bytes)", data.len());
let imported_count = self
.store
.import_data(&data, RdfSerializationFormat::NQuads, None)
.await
.map_err(|e| FusekiError::internal(format!("Failed to import backup data: {}", e)))?;
info!("Imported {} triples from backup", imported_count);
info!(
"Restore completed successfully from backup: {} ({} triples)",
backup_id,
metadata.triple_count.unwrap_or(0)
);
Ok(())
}
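
    /// Lists every backup that has a readable `metadata.json`, newest
    /// first.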
pub async fn list_backups(&self) -> FusekiResult<Vec<BackupMetadata>> {
let mut entries = fs::read_dir(&self.config.backup_dir).await.map_err(|e| {
FusekiError::internal(format!("Failed to read backup directory: {}", e))
})?;
let mut backups = Vec::new();
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| FusekiError::internal(format!("Failed to read directory entry: {}", e)))?
{
if entry
.file_type()
.await
.map_err(|e| FusekiError::internal(format!("Failed to get file type: {}", e)))?
.is_dir()
{
let metadata_path = entry.path().join("metadata.json");
if metadata_path.exists() {
if let Ok(metadata_json) = fs::read_to_string(&metadata_path).await {
if let Ok(metadata) = serde_json::from_str::<BackupMetadata>(&metadata_json)
{
backups.push(metadata);
}
}
}
}
}
backups.sort_by_key(|item| std::cmp::Reverse(item.timestamp));
Ok(backups)
}
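
    /// Returns the timestamp of the most recent backup, if any.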
pub async fn last_backup_time(&self) -> Option<DateTime<Utc>> {
*self.last_backup.read().await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backup_config_default() {
let config = BackupConfig::default();
assert_eq!(config.interval_hours, 24);
assert_eq!(config.max_backups, 7);
assert!(config.compression);
}
#[test]
fn test_backup_metadata_serialization() {
let metadata = BackupMetadata {
id: "test-backup".to_string(),
timestamp: Utc::now(),
strategy: BackupStrategy::Full,
size_bytes: 1024,
compressed: true,
triple_count: Some(1000),
checksum: Some("abc123".to_string()),
description: Some("Test backup".to_string()),
};
let json = serde_json::to_string(&metadata).unwrap();
let deserialized: BackupMetadata = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.id, metadata.id);
assert_eq!(deserialized.strategy, BackupStrategy::Full);
}
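
    // A round-trip sanity check for the gzip path shared by compress_backup
    // and restore_backup; it exercises only flate2 and sha2, so it needs no
    // Store instance.
    #[test]
    fn test_compression_round_trip_preserves_checksum() {
        let original =
            b"<http://example.org/s> <http://example.org/p> <http://example.org/o> .\n";
        let mut hasher = Sha256::new();
        hasher.update(original);
        let checksum_before = format!("{:x}", hasher.finalize());
        // Compress exactly as compress_backup does.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(original).unwrap();
        let compressed = encoder.finish().unwrap();
        // Decompress exactly as restore_backup does.
        let mut decoder = GzDecoder::new(&compressed[..]);
        let mut decompressed = String::new();
        decoder.read_to_string(&mut decompressed).unwrap();
        // Mirror the checksum verification performed during restore.
        let mut hasher = Sha256::new();
        hasher.update(decompressed.as_bytes());
        assert_eq!(format!("{:x}", hasher.finalize()), checksum_before);
    }
    #[test]
    fn test_backup_strategy_serialization() {
        // Unit variants serialize as bare strings under serde's default
        // externally tagged representation, so metadata.json stays readable.
        let json = serde_json::to_string(&BackupStrategy::Incremental).unwrap();
        assert_eq!(json, "\"Incremental\"");
        let parsed: BackupStrategy = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, BackupStrategy::Incremental);
    }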
}