use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use rusqlite::Connection;
use crate::error::MiniAppError;
pub async fn write_backup_pair(
scope_dir: &Path,
table: &str,
schema_yaml_path: &Path,
db_path: &Path,
) -> Result<(), MiniAppError> {
let scope_dir = scope_dir.to_path_buf();
let table = table.to_string();
let schema_yaml_path = schema_yaml_path.to_path_buf();
let db_path = db_path.to_path_buf();
tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
write_backup_pair_sync(&scope_dir, &table, &schema_yaml_path, &db_path)
})
.await
.map_err(|e| MiniAppError::Backup(format!("blocking task panic: {e}")))?
}
fn write_backup_pair_sync(
scope_dir: &Path,
table: &str,
schema_yaml_path: &Path,
db_path: &Path,
) -> Result<(), MiniAppError> {
let unix_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| MiniAppError::Backup(format!("system clock error: {e}")))?
.as_secs();
let backup_dir = scope_dir.join("_backup");
std::fs::create_dir_all(&backup_dir)
.map_err(|e| MiniAppError::Backup(format!("cannot create backup dir: {e}")))?;
let yaml_dst = backup_dir.join(format!("{}.{}.yaml", table, unix_secs));
std::fs::copy(schema_yaml_path, &yaml_dst)
.map_err(|e| MiniAppError::Backup(format!("cannot copy schema yaml: {e}")))?;
let src_conn = Connection::open(db_path)
.map_err(|e| MiniAppError::Backup(format!("cannot open source db: {e}")))?;
if let Err(e) = src_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)") {
tracing::warn!(error = %e, "WAL checkpoint before backup failed; continuing anyway");
}
let db_dst = backup_dir.join(format!("{}.{}.db", table, unix_secs));
src_conn
.backup(rusqlite::DatabaseName::Main, &db_dst, None)
.map_err(|e| MiniAppError::Backup(format!("rusqlite backup failed: {e}")))?;
Ok(())
}
pub async fn purge_old_backups(
scope_dir: &Path,
table: &str,
retention: usize,
) -> Result<(), MiniAppError> {
let scope_dir = scope_dir.to_path_buf();
let table = table.to_string();
tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
purge_old_backups_sync(&scope_dir, &table, retention)
})
.await
.map_err(|e| MiniAppError::Backup(format!("blocking task panic: {e}")))?
}
fn purge_old_backups_sync(
scope_dir: &Path,
table: &str,
retention: usize,
) -> Result<(), MiniAppError> {
let backup_dir = scope_dir.join("_backup");
if !backup_dir.exists() {
return Ok(());
}
let entries = std::fs::read_dir(&backup_dir)
.map_err(|e| MiniAppError::Backup(format!("cannot read backup dir: {e}")))?;
let mut timestamps: Vec<u64> = entries
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name();
let name = name.to_string_lossy();
parse_backup_timestamp(&name, table, "yaml")
})
.collect();
timestamps.sort_unstable_by(|a, b| b.cmp(a));
for ts in timestamps.iter().skip(retention) {
let yaml_path = backup_dir.join(format!("{}.{}.yaml", table, ts));
let db_path = backup_dir.join(format!("{}.{}.db", table, ts));
if let Err(e) = std::fs::remove_file(&yaml_path) {
tracing::warn!(
path = %yaml_path.display(),
error = %e,
"failed to remove old backup yaml; continuing"
);
}
if let Err(e) = std::fs::remove_file(&db_path) {
tracing::warn!(
path = %db_path.display(),
error = %e,
"failed to remove old backup db; continuing"
);
}
}
Ok(())
}
fn parse_backup_timestamp(filename: &str, table: &str, ext: &str) -> Option<u64> {
let prefix = format!("{}.", table);
let suffix = format!(".{}", ext);
let without_prefix = filename.strip_prefix(&prefix)?;
let ts_str = without_prefix.strip_suffix(&suffix)?;
ts_str.parse::<u64>().ok()
}
#[cfg(test)]
fn list_backup_timestamps(backup_dir: &Path, table: &str) -> Result<Vec<u64>, MiniAppError> {
let entries = std::fs::read_dir(backup_dir)
.map_err(|e| MiniAppError::Backup(format!("cannot read backup dir: {e}")))?;
let mut timestamps: Vec<u64> = entries
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name();
let name = name.to_string_lossy().to_string();
parse_backup_timestamp(&name, table, "yaml")
})
.collect();
timestamps.sort_unstable_by(|a, b| b.cmp(a));
Ok(timestamps)
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
use std::io::Write;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::task;
fn create_test_db(path: &Path) {
let conn = Connection::open(path).expect("open test db");
conn.execute_batch(
"PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT);",
)
.expect("setup test db");
}
fn create_test_schema_yaml(path: &Path) {
let mut f = std::fs::File::create(path).expect("create schema yaml");
f.write_all(b"table: items\nfields:\n - name: v\n type: string\n required: false\n")
.expect("write schema yaml");
}
#[tokio::test]
async fn write_backup_pair_creates_yaml_and_db() {
let dir = TempDir::new().expect("temp dir");
let scope_dir = dir.path();
let db_path = scope_dir.join("items.db");
let schema_path = scope_dir.join("schema.yaml");
create_test_db(&db_path);
create_test_schema_yaml(&schema_path);
write_backup_pair(scope_dir, "items", &schema_path, &db_path)
.await
.expect("write_backup_pair must succeed");
let backup_dir = scope_dir.join("_backup");
assert!(backup_dir.exists(), "_backup dir must be created");
let entries: Vec<_> = std::fs::read_dir(&backup_dir)
.expect("read backup dir")
.filter_map(|e| e.ok())
.collect();
let yaml_count = entries
.iter()
.filter(|e| e.file_name().to_string_lossy().ends_with(".yaml"))
.count();
let db_count = entries
.iter()
.filter(|e| e.file_name().to_string_lossy().ends_with(".db"))
.count();
assert_eq!(yaml_count, 1, "exactly one yaml backup must exist");
assert_eq!(db_count, 1, "exactly one db backup must exist");
}
#[tokio::test]
async fn purge_old_backups_keeps_n_newest() {
let dir = TempDir::new().expect("temp dir");
let scope_dir = dir.path();
let backup_dir = scope_dir.join("_backup");
std::fs::create_dir_all(&backup_dir).expect("create backup dir");
for ts in [100u64, 200, 300, 400, 500] {
std::fs::write(backup_dir.join(format!("items.{}.yaml", ts)), b"yaml")
.expect("write yaml");
std::fs::write(backup_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
}
purge_old_backups(scope_dir, "items", 3)
.await
.expect("purge must succeed");
let timestamps = list_backup_timestamps(&backup_dir, "items").expect("list timestamps");
assert_eq!(timestamps.len(), 3, "exactly 3 pairs must remain");
assert_eq!(timestamps, vec![500, 400, 300], "newest 3 must be kept");
assert!(!backup_dir.join("items.100.yaml").exists());
assert!(!backup_dir.join("items.100.db").exists());
assert!(!backup_dir.join("items.200.yaml").exists());
assert!(!backup_dir.join("items.200.db").exists());
}
#[tokio::test]
async fn purge_old_backups_no_op_when_below_limit() {
let dir = TempDir::new().expect("temp dir");
let scope_dir = dir.path();
let backup_dir = scope_dir.join("_backup");
std::fs::create_dir_all(&backup_dir).expect("create backup dir");
for ts in [100u64, 200] {
std::fs::write(backup_dir.join(format!("items.{}.yaml", ts)), b"yaml")
.expect("write yaml");
std::fs::write(backup_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
}
purge_old_backups(scope_dir, "items", 10)
.await
.expect("purge must succeed");
let timestamps = list_backup_timestamps(&backup_dir, "items").expect("list timestamps");
assert_eq!(timestamps.len(), 2, "both pairs must still exist");
}
#[tokio::test]
async fn write_backup_pair_io_error_returns_backup_variant() {
let dir = TempDir::new().expect("temp dir");
let scope_dir = dir.path();
let db_path = scope_dir.join("items.db");
create_test_db(&db_path);
let result = write_backup_pair(
scope_dir,
"items",
Path::new("/nonexistent/schema.yaml"),
&db_path,
)
.await;
let err = result.expect_err("missing schema file must error");
assert!(
matches!(err, MiniAppError::Backup(_)),
"expected Backup variant, got {:?}",
err
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_backup_does_not_block_concurrent_reads() {
let dir = TempDir::new().expect("temp dir");
let db_path = dir.path().join("concurrent.db");
let dst_path = dir.path().join("backup.db");
let schema_path = dir.path().join("schema.yaml");
{
let conn = Connection::open(&db_path).expect("open db");
conn.execute_batch(
"PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
)
.expect("setup db");
}
create_test_schema_yaml(&schema_path);
let db_path_writer = db_path.clone();
let dst_path_backup = dst_path.clone();
let schema_path_backup = schema_path.clone();
let scope_dir = dir.path().to_path_buf();
let writer = task::spawn(async move {
task::spawn_blocking(move || {
let conn = Connection::open(&db_path_writer).expect("open writer db");
for i in 0i64..100 {
conn.execute("INSERT INTO rows (val) VALUES (?1)", [format!("v{}", i)])
.expect("insert row");
}
})
.await
.expect("writer blocking task")
});
let backup_task =
write_backup_pair(&scope_dir, "concurrent", &schema_path_backup, &db_path);
let (writer_result, backup_result) = tokio::join!(writer, backup_task);
writer_result.expect("writer must succeed");
backup_result.expect("backup must succeed");
let backup_dir = scope_dir.join("_backup");
let backup_entries: Vec<PathBuf> = std::fs::read_dir(&backup_dir)
.expect("read backup dir")
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| {
p.extension()
.and_then(|x| x.to_str())
.map(|x| x == "db")
.unwrap_or(false)
})
.collect();
assert!(
!backup_entries.is_empty(),
"at least one db backup must exist"
);
let backup_conn = Connection::open(&backup_entries[0]).expect("open backup db");
let backup_row_count: i64 = backup_conn
.query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
.unwrap_or(0);
assert!(backup_row_count >= 0, "backup db must be a valid sqlite db");
let _ = dst_path_backup; }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_spawn_blocking_cancel_safety_insert_survives() {
let dir = TempDir::new().expect("temp dir");
let scope_dir = dir.path().to_path_buf();
let db_path = scope_dir.join("cancel_test.db");
let schema_path = scope_dir.join("schema.yaml");
{
let conn = Connection::open(&db_path).expect("open db");
conn.execute_batch(
"PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
)
.expect("setup db");
}
create_test_schema_yaml(&schema_path);
let backup_fut = write_backup_pair(&scope_dir, "cancel_test", &schema_path, &db_path);
let result = tokio::time::timeout(std::time::Duration::from_millis(1), backup_fut).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let src_conn = Connection::open(&db_path).expect("source db must still be openable");
let _count: i64 = src_conn
.query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
.expect("source db must be a valid sqlite db after cancellation");
if let Ok(Ok(())) = result {
let backup_dir = scope_dir.join("_backup");
assert!(
backup_dir.exists(),
"backup dir must exist on successful write"
);
}
}
}