use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::io::{Read, Write};
use crate::error::TitoError;
use crate::types::{TitoEngine, TitoTransaction};
use crate::utils::next_string_lexicographically;
/// A single record stored in a backup chunk.
///
/// Full-backup chunks contain only `"put"` records; changelog chunks contain
/// whatever records were read from the `changelog:` key range.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChangelogEntry {
/// Operation name. Replay acts on `"put"` and `"delete"` and ignores any other value.
pub op: String,
/// Storage key the operation applies to (written to the engine as its UTF-8 bytes).
pub key: String,
/// JSON payload for a `"put"`. A `"put"` without data is skipped during replay.
pub data: Option<Value>,
}
/// Object-store abstraction used to persist and retrieve backup artifacts
/// (compressed chunks and timestamp markers).
#[async_trait]
pub trait BackupStorage: Send + Sync {
/// Backend error type; it is formatted with `Display` into `TitoError` messages.
type Error: std::fmt::Display + Send + Sync;
/// Stores `data` under `key`.
// NOTE(review): callers re-upload marker keys, so they appear to assume an
// existing object is overwritten — confirm for each backend.
async fn upload(&self, key: &str, data: &[u8]) -> Result<(), Self::Error>;
/// Returns the bytes stored under `key`.
async fn download(&self, key: &str) -> Result<Vec<u8>, Self::Error>;
/// Lists stored keys for `prefix`.
// NOTE(review): callers pass the returned strings straight to `download` and
// `delete_keys`, so they are presumably full keys — confirm for each backend.
async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, Self::Error>;
/// Deletes every key in `keys`.
async fn delete_keys(&self, keys: Vec<String>) -> Result<(), Self::Error>;
}
/// Tunables for [`TitoBackupService`].
#[derive(Clone, Debug)]
pub struct BackupConfig {
/// Limit passed to each engine scan; also the batch size used when deleting
/// exported changelog entries from the engine.
pub chunk_size: u32,
/// Minimum age, in seconds, of the `LATEST_FULL` marker before `run_backup`
/// takes another full backup.
pub full_backup_interval_secs: i64,
/// Full-backup objects whose timestamp is more than this many seconds older
/// than a newly taken full backup are deleted.
pub retention_secs: i64,
/// Prefix prepended to every storage key (`{backup_root}/full/...`,
/// `{backup_root}/changelog/...`, `{backup_root}/LATEST`, `{backup_root}/LATEST_FULL`).
pub backup_root: String,
}
/// Outcome of one `run_backup` cycle.
#[derive(Debug)]
pub struct BackupResult {
/// Number of changelog entries exported during this cycle.
pub changelog_entries: usize,
/// Entry count reported by the full backup, or `None` when no full backup ran.
pub full_backup_entries: Option<usize>,
}
/// Outcome of a `restore` call.
#[derive(Debug)]
pub struct RestoreResult {
/// Total number of entries contained in the replayed full and changelog chunks.
pub entries_restored: usize,
}
/// Backup and restore service that copies engine data to a [`BackupStorage`] backend.
///
/// It holds the engine handle, a shared reference to the storage backend, and
/// the backup configuration.
pub struct TitoBackupService<E: TitoEngine, S: BackupStorage> {
    engine: E,
    storage: Arc<S>,
    config: BackupConfig,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add an
// `S: Clone` bound even though the storage is only ever cloned through its `Arc`.
impl<E: TitoEngine + Clone, S: BackupStorage> Clone for TitoBackupService<E, S> {
    fn clone(&self) -> Self {
        Self {
            engine: self.engine.clone(),
            storage: Arc::clone(&self.storage),
            config: self.config.clone(),
        }
    }
}
impl<E: TitoEngine, S: BackupStorage + 'static> TitoBackupService<E, S> {
pub fn new(engine: E, storage: Arc<S>, config: BackupConfig) -> Self {
Self {
engine,
storage,
config,
}
}
/// Runs one backup cycle: exports pending changelog entries first, then takes
/// a full backup if one is due.
///
/// # Errors
/// Propagates the first error returned by either step.
pub async fn run_backup(&self) -> Result<BackupResult, TitoError> {
    let exported = self.run_changelog_backup().await?;
    let full = self.check_and_run_full_backup().await?;
    let summary = BackupResult {
        changelog_entries: exported,
        full_backup_entries: full,
    };
    Ok(summary)
}
/// Takes a complete snapshot of the `table:` and `event:` key ranges.
///
/// Each range is scanned in batches of `chunk_size`; every batch is uploaded as
/// a gzip-compressed JSON chunk under `{backup_root}/full/{timestamp}/`. After
/// all chunks are uploaded the `LATEST_FULL` marker is written, every stored
/// changelog chunk is deleted, and full-backup objects older than
/// `retention_secs` (relative to this snapshot) are deleted.
///
/// Rows whose key is not valid UTF-8 or whose value is not valid JSON cannot be
/// represented in a chunk and are skipped.
///
/// # Returns
/// The number of entries actually written to backup chunks (skipped rows are
/// not counted).
///
/// # Errors
/// Returns an error if a scan, upload, listing, or deletion fails.
pub async fn run_full_backup(&self) -> Result<usize, TitoError> {
    let timestamp = Utc::now().timestamp();
    let root = &self.config.backup_root;
    let chunk_size = self.config.chunk_size;
    let scan_ranges: Vec<(Vec<u8>, Vec<u8>)> = vec![
        (b"table:".to_vec(), b"table:\xff".to_vec()),
        (b"event:".to_vec(), b"event:\xff".to_vec()),
    ];
    // `seq` keeps counting across both ranges so chunk names stay unique
    // within this snapshot.
    let mut seq = 0;
    let mut total_entries = 0;
    for (range_start, range_end) in scan_ranges {
        let mut cursor = range_start;
        loop {
            let batch_cursor = cursor.clone();
            let batch_end = range_end.clone();
            let results: Vec<(Vec<u8>, Vec<u8>)> = self
                .engine
                .clone()
                .transaction(|tx| async move {
                    tx.scan(batch_cursor..batch_end, chunk_size)
                        .await
                        .map_err(|e| TitoError::BackupFailed(format!("Scan failed: {:?}", e)))
                })
                .await?;
            // An empty batch means the range is exhausted. Otherwise resume
            // just past the last key: appending a zero byte yields its
            // immediate successor in byte order.
            let next_cursor = match results.last() {
                Some((last_key, _)) => {
                    let mut next = last_key.clone();
                    next.push(0);
                    next
                }
                None => break,
            };
            let entries: Vec<ChangelogEntry> = results
                .into_iter()
                .filter_map(|(key_bytes, value_bytes)| {
                    let key = String::from_utf8(key_bytes).ok()?;
                    let data: Value = serde_json::from_slice(&value_bytes).ok()?;
                    Some(ChangelogEntry {
                        op: "put".to_string(),
                        key,
                        data: Some(data),
                    })
                })
                .collect();
            let chunk_key = format!(
                "{}/full/{}/chunk-{:04}.json.gz",
                root, timestamp, seq
            );
            self.compress_and_upload(&entries, &chunk_key).await?;
            // Count what was written, not what was scanned, so the reported
            // total never overstates the contents of the backup.
            total_entries += entries.len();
            seq += 1;
            cursor = next_cursor;
        }
    }
    self.upload_marker(&format!("{}/LATEST_FULL", root), timestamp)
        .await?;
    // The snapshot supersedes every changelog chunk uploaded so far.
    let all_changelog_keys = self
        .storage
        .list_keys(&format!("{}/changelog/", root))
        .await
        .map_err(|e| TitoError::BackupFailed(format!("List changelog keys: {}", e)))?;
    if !all_changelog_keys.is_empty() {
        self.storage
            .delete_keys(all_changelog_keys)
            .await
            .map_err(|e| TitoError::BackupFailed(format!("Delete changelog chunks: {}", e)))?;
    }
    // Drop full-backup objects that have aged out of the retention window.
    let retention_cutoff = timestamp - self.config.retention_secs;
    let all_full_keys = self
        .storage
        .list_keys(&format!("{}/full/", root))
        .await
        .map_err(|e| TitoError::BackupFailed(format!("List full backup keys: {}", e)))?;
    let old_full_keys: Vec<String> = all_full_keys
        .into_iter()
        .filter(|k| extract_backup_timestamp(k).map_or(false, |ts| ts < retention_cutoff))
        .collect();
    if !old_full_keys.is_empty() {
        self.storage
            .delete_keys(old_full_keys)
            .await
            .map_err(|e| TitoError::BackupFailed(format!("Delete old backups: {}", e)))?;
    }
    Ok(total_entries)
}
/// Replaces the engine's contents with the state captured in backup storage.
///
/// The newest full backup is selected (when `timestamp` is given, the newest
/// one whose timestamp is not after it). Its chunks are replayed first,
/// followed by changelog chunks newer than that full backup and, when
/// `timestamp` is given, not newer than `timestamp`.
///
/// The existing data is wiped only after a usable full backup and the list of
/// changelog chunks have been resolved, so a missing backup or a failed
/// listing leaves the current data untouched.
///
/// # Errors
/// Returns `TitoError::BackupFailed` if no full backup matches, if the
/// selected backup has no chunk files, or if the wipe fails; errors from
/// listing and replaying chunks are propagated.
pub async fn restore(&self, timestamp: Option<i64>) -> Result<RestoreResult, TitoError> {
    let root = &self.config.backup_root;

    // Resolve everything that will be replayed before touching existing data.
    let full_backup_ts = self
        .find_full_backup_timestamp(timestamp)
        .await?
        .ok_or_else(|| TitoError::BackupFailed("No full backup found".to_string()))?;
    let full_keys = self
        .list_chunk_keys(&format!("{}/full/{}/", root, full_backup_ts))
        .await?;
    if full_keys.is_empty() {
        return Err(TitoError::BackupFailed(format!(
            "Full backup at {} has no chunk files",
            full_backup_ts
        )));
    }
    let changelog_keys = self
        .find_changelog_keys(full_backup_ts, timestamp)
        .await?;

    // Destructive step: clear the key range before replaying the backup.
    self.engine
        .delete_range(&[], &[0xFFu8; 32])
        .await
        .map_err(|e| TitoError::BackupFailed(format!("Wipe failed: {:?}", e)))?;

    let mut total_restored = self.replay_chunks(&full_keys).await?;
    if !changelog_keys.is_empty() {
        total_restored += self.replay_chunks(&changelog_keys).await?;
    }
    Ok(RestoreResult {
        entries_restored: total_restored,
    })
}
/// Takes a full backup when the `LATEST_FULL` marker is at least
/// `full_backup_interval_secs` old.
///
/// A marker that cannot be downloaded or parsed is treated as timestamp `0`.
///
/// # Returns
/// `Some(count)` with the full backup's entry count when one ran, otherwise `None`.
async fn check_and_run_full_backup(&self) -> Result<Option<usize>, TitoError> {
    let now = Utc::now().timestamp();
    let marker_key = format!("{}/LATEST_FULL", self.config.backup_root);
    let elapsed = match self.download_marker_timestamp(&marker_key).await {
        Some(last_full_ts) => now - last_full_ts,
        None => now,
    };
    if elapsed >= self.config.full_backup_interval_secs {
        self.run_full_backup().await.map(Some)
    } else {
        Ok(None)
    }
}
async fn run_changelog_backup(&self) -> Result<usize, TitoError> {
let root = &self.config.backup_root;
let timestamp = Utc::now().timestamp();
let timestamp_ms = Utc::now().timestamp_millis();
let chunk_size = self.config.chunk_size;
let latest_key = format!("{}/LATEST", root);
let start_key = match self.download_marker_timestamp(&latest_key).await {
Some(ts) => {
let ts_ms = if ts < 1_000_000_000_000 { ts * 1000 } else { ts };
format!("changelog:{}:", ts_ms + 1)
}
None => "changelog:".to_string(),
};
let mut current_key = start_key;
let mut seq = 0;
let mut total_entries = 0;
let mut all_keys_to_delete: Vec<String> = Vec::new();
loop {
let scan_key = current_key.clone();
let (entries, last_key) = self
.engine
.clone()
.transaction(|tx| {
let scan_key = scan_key.clone();
async move { scan_changelog_range(&scan_key, chunk_size, &tx).await }
})
.await?;
if entries.is_empty() {
break;
}
let entry_count = entries.len();
let changelog_keys: Vec<String> = entries.iter().map(|(k, _)| k.clone()).collect();
let changelog_entries: Vec<_> = entries.into_iter().map(|(_, entry)| entry).collect();
let chunk_key = format!(
"{}/changelog/{}/chunk-{:04}.json.gz",
root, timestamp, seq
);
self.compress_and_upload(&changelog_entries, &chunk_key)
.await?;
all_keys_to_delete.extend(changelog_keys);
total_entries += entry_count;
seq += 1;
if let Some(lk) = last_key {
current_key = next_string_lexicographically(lk);
} else {
break;
}
}
if total_entries > 0 {
self.upload_marker(&format!("{}/LATEST", root), timestamp_ms)
.await?;
for batch in all_keys_to_delete.chunks(chunk_size as usize) {
let keys = batch.to_vec();
self.engine
.clone()
.transaction(|tx| {
let keys = keys.clone();
async move {
for key in keys {
tx.delete(key.as_bytes())
.await
.map_err(|e| {
TitoError::BackupFailed(format!(
"Delete changelog entry: {}",
e
))
})?;
}
Ok::<_, TitoError>(())
}
})
.await?;
}
}
Ok(total_entries)
}
/// Serializes `entries` to JSON, gzip-compresses the result, and uploads it
/// to storage under `chunk_key`.
///
/// # Errors
/// Returns `TitoError::BackupFailed` if serialization, compression, or the
/// upload fails.
async fn compress_and_upload(
    &self,
    entries: &[ChangelogEntry],
    chunk_key: &str,
) -> Result<(), TitoError> {
    let payload = serde_json::to_vec(entries)
        .map_err(|e| TitoError::BackupFailed(format!("Serialize: {}", e)))?;

    let compressed = {
        let mut gz = GzEncoder::new(Vec::new(), Compression::default());
        gz.write_all(&payload)
            .map_err(|e| TitoError::BackupFailed(format!("Compress: {}", e)))?;
        gz.finish()
            .map_err(|e| TitoError::BackupFailed(format!("Finish compress: {}", e)))?
    };

    self.storage
        .upload(chunk_key, &compressed)
        .await
        .map_err(|e| TitoError::BackupFailed(format!("Upload: {}", e)))
}
/// Writes `value` as decimal text to the marker object at `key`.
///
/// # Errors
/// Returns `TitoError::BackupFailed` if the upload fails.
async fn upload_marker(&self, key: &str, value: i64) -> Result<(), TitoError> {
    let body = value.to_string();
    match self.storage.upload(key, body.as_bytes()).await {
        Ok(()) => Ok(()),
        Err(e) => Err(TitoError::BackupFailed(format!("Upload marker: {}", e))),
    }
}
async fn download_marker_timestamp(&self, key: &str) -> Option<i64> {
let bytes = self.storage.download(key).await.ok()?;
let ts_str = String::from_utf8_lossy(&bytes);
ts_str.trim().parse::<i64>().ok()
}
/// Picks the full backup to restore from.
///
/// Lists everything under `{backup_root}/full/` and returns the largest
/// backup timestamp found; when `restore_ts` is given, only timestamps that
/// are not after it are considered. Returns `Ok(None)` when nothing matches.
///
/// # Errors
/// Returns `TitoError::BackupFailed` if listing the storage keys fails.
async fn find_full_backup_timestamp(
    &self,
    restore_ts: Option<i64>,
) -> Result<Option<i64>, TitoError> {
    let prefix = format!("{}/full/", self.config.backup_root);
    let keys = self
        .storage
        .list_keys(&prefix)
        .await
        .map_err(|e| TitoError::BackupFailed(format!("List full backups: {}", e)))?;

    let newest_eligible = keys
        .iter()
        .filter_map(|k| extract_backup_timestamp(k))
        .filter(|&candidate| restore_ts.map_or(true, |limit| candidate <= limit))
        .max();
    Ok(newest_eligible)
}
async fn find_changelog_keys(
&self,
full_ts: i64,
restore_ts: Option<i64>,
) -> Result<Vec<String>, TitoError> {
let root = &self.config.backup_root;
let all_keys = self
.storage
.list_keys(&format!("{}/changelog/", root))
.await
.map_err(|e| TitoError::BackupFailed(format!("List changelog keys: {}", e)))?;
let mut chunk_keys: Vec<String> = all_keys
.into_iter()
.filter(|k| k.ends_with(".json.gz"))
.filter(|k| {
extract_backup_timestamp(k)
.map_or(false, |ts| ts > full_ts && restore_ts.map_or(true, |rts| ts <= rts))
})
.collect();
chunk_keys.sort();
Ok(chunk_keys)
}
/// Lists the storage keys under `prefix` that end in `.json.gz`, sorted
/// lexicographically.
///
/// # Errors
/// Returns `TitoError::BackupFailed` if listing the storage keys fails.
async fn list_chunk_keys(&self, prefix: &str) -> Result<Vec<String>, TitoError> {
    let mut chunk_keys = self
        .storage
        .list_keys(prefix)
        .await
        .map_err(|e| TitoError::BackupFailed(format!("List chunk keys: {}", e)))?;
    chunk_keys.retain(|k| k.ends_with(".json.gz"));
    chunk_keys.sort();
    Ok(chunk_keys)
}
async fn replay_chunks(&self, chunk_keys: &[String]) -> Result<usize, TitoError> {
let mut total_restored = 0;
for chunk_key in chunk_keys {
let compressed_bytes = self
.storage
.download(chunk_key)
.await
.map_err(|e| TitoError::BackupFailed(format!("Download {}: {}", chunk_key, e)))?;
let mut decoder = GzDecoder::new(&compressed_bytes[..]);
let mut json_bytes = Vec::new();
decoder
.read_to_end(&mut json_bytes)
.map_err(|e| TitoError::BackupFailed(format!("Decompress {}: {}", chunk_key, e)))?;
let entries: Vec<ChangelogEntry> = serde_json::from_slice(&json_bytes)
.map_err(|e| TitoError::BackupFailed(format!("Parse {}: {}", chunk_key, e)))?;
let chunk_count = entries.len();
for batch in entries.chunks(100) {
let batch: Vec<ChangelogEntry> = batch.to_vec();
self.engine
.clone()
.transaction(|tx| {
let batch = batch.clone();
async move {
for entry in batch {
match entry.op.as_str() {
"put" => {
if let Some(data) = entry.data {
let bytes = serde_json::to_vec(&data).map_err(|e| {
TitoError::BackupFailed(format!(
"Serialize {}: {}",
entry.key, e
))
})?;
tx.put(entry.key.as_bytes(), bytes).await.map_err(
|e| {
TitoError::BackupFailed(format!(
"Put {}: {}",
entry.key, e
))
},
)?;
}
}
"delete" => {
tx.delete(entry.key.as_bytes()).await.map_err(|e| {
TitoError::BackupFailed(format!(
"Delete {}: {}",
entry.key, e
))
})?;
}
_ => {}
}
}
Ok::<_, TitoError>(())
}
})
.await?;
}
total_restored += chunk_count;
}
Ok(total_restored)
}
}
/// Scans up to `limit` changelog records starting at `start_key`.
///
/// The scan's upper bound is the lexicographic successor of `"changelog:"`.
///
/// # Returns
/// The decoded `(key, entry)` pairs in scan order, plus the last key returned
/// (`None` when the scan was empty).
///
/// # Errors
/// Returns `TitoError::QueryFailed` if the scan fails and
/// `TitoError::DeserializationFailed` on the first record whose key is not
/// valid UTF-8 or whose value is not a valid changelog entry.
async fn scan_changelog_range<T: TitoTransaction>(
    start_key: &str,
    limit: u32,
    tx: &T,
) -> Result<(Vec<(String, ChangelogEntry)>, Option<String>), TitoError> {
    let upper_bound = next_string_lexicographically("changelog:".to_string());
    let rows = tx
        .scan(start_key.as_bytes()..upper_bound.as_bytes(), limit)
        .await
        .map_err(|e| TitoError::QueryFailed(format!("Scan failed: {}", e)))?;

    let entries = rows
        .into_iter()
        .map(
            |(key_bytes, value_bytes)| -> Result<(String, ChangelogEntry), TitoError> {
                let key = String::from_utf8(key_bytes)
                    .map_err(|_| TitoError::DeserializationFailed("Invalid key".to_string()))?;
                let entry: ChangelogEntry = serde_json::from_slice(&value_bytes).map_err(|_| {
                    TitoError::DeserializationFailed("Invalid changelog entry".to_string())
                })?;
                Ok((key, entry))
            },
        )
        .collect::<Result<Vec<_>, TitoError>>()?;

    let last_key = entries.last().map(|(key, _)| key.clone());
    Ok((entries, last_key))
}
/// Extracts the backup timestamp from a chunk key of the form
/// `{root}/{kind}/{timestamp}/{file}`.
///
/// The timestamp is read from the second-to-last `/`-separated segment, so a
/// `root` that itself contains `/` (for example `backups/prod`) is handled.
/// Returns `None` when the key has fewer than four segments or the segment is
/// not a valid `i64`.
fn extract_backup_timestamp(key: &str) -> Option<i64> {
    let segments: Vec<&str> = key.split('/').collect();
    if segments.len() < 4 {
        return None;
    }
    segments[segments.len() - 2].parse::<i64>().ok()
}