use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwap;
use rusqlite::Connection;
use schemars::JsonSchema;
use serde::Deserialize;
use crate::config::Config;
use crate::error::MiniAppError;
use crate::registry::TableRegistry;
/// Takes a snapshot of the SQLite database at `db_path` without blocking the async runtime.
///
/// The arguments are copied into owned values and the real work
/// ([`write_snapshot_db_sync`]) runs inside `tokio::task::spawn_blocking`.
///
/// # Errors
///
/// Returns [`MiniAppError::Snapshot`] when the blocking task cannot be joined, or whatever
/// error the synchronous snapshot routine produced.
pub async fn write_snapshot_db(
    scope_dir: &Path,
    table: &str,
    db_path: &Path,
) -> Result<(), MiniAppError> {
    let owned_scope = scope_dir.to_owned();
    let owned_table = table.to_owned();
    let owned_db = db_path.to_owned();

    let job = tokio::task::spawn_blocking(move || {
        write_snapshot_db_sync(&owned_scope, &owned_table, &owned_db)
    });

    // Outer layer: did the task join? Inner layer: did the snapshot succeed?
    match job.await {
        Ok(outcome) => outcome,
        Err(e) => Err(MiniAppError::Snapshot(format!("blocking task panic: {e}"))),
    }
}
/// Blocking implementation of [`write_snapshot_db`].
///
/// Copies the database at `db_path` into
/// `<scope_dir>/_snapshots/<table>.<unix_secs>.db` using the SQLite backup API, where
/// `unix_secs` is the current wall-clock time in whole seconds since the Unix epoch.
///
/// The source database is opened *without* `SQLITE_OPEN_CREATE`, so a missing source file
/// is reported as an error instead of being silently created as an empty database and then
/// "snapshotted". The source is opened before the snapshot directory is created, so a
/// failed open leaves no `_snapshots` directory behind.
///
/// # Errors
///
/// Returns [`MiniAppError::Snapshot`] when the system clock is before the Unix epoch, the
/// source database cannot be opened, the snapshot directory cannot be created, or the
/// backup itself fails. A failing WAL checkpoint is only logged.
fn write_snapshot_db_sync(
    scope_dir: &Path,
    table: &str,
    db_path: &Path,
) -> Result<(), MiniAppError> {
    let unix_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| MiniAppError::Snapshot(format!("system clock error: {e}")))?
        .as_secs();

    // rusqlite's default flags minus SQLITE_OPEN_CREATE: a snapshot must never bring the
    // source database into existence.
    let open_flags = rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
        | rusqlite::OpenFlags::SQLITE_OPEN_URI
        | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX;
    let src_conn = Connection::open_with_flags(db_path, open_flags)
        .map_err(|e| MiniAppError::Snapshot(format!("cannot open source db: {e}")))?;

    let snapshot_dir = scope_dir.join("_snapshots");
    std::fs::create_dir_all(&snapshot_dir)
        .map_err(|e| MiniAppError::Snapshot(format!("cannot create snapshot dir: {e}")))?;

    // Best effort: a failed checkpoint is logged and the backup proceeds regardless.
    if let Err(e) = src_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)") {
        tracing::warn!(error = %e, "WAL checkpoint before snapshot failed; continuing anyway");
    }

    let db_dst = snapshot_dir.join(format!("{}.{}.db", table, unix_secs));
    src_conn
        .backup(rusqlite::DatabaseName::Main, &db_dst, None)
        .map_err(|e| MiniAppError::Snapshot(format!("rusqlite backup failed: {e}")))?;
    Ok(())
}
/// Deletes all but the `retention` newest snapshots of `table` without blocking the async
/// runtime.
///
/// The arguments are copied into owned values and the real work
/// ([`purge_old_snapshots_sync`]) runs inside `tokio::task::spawn_blocking`.
///
/// # Errors
///
/// Returns [`MiniAppError::Snapshot`] when the blocking task cannot be joined, or whatever
/// error the synchronous purge routine produced.
pub async fn purge_old_snapshots(
    scope_dir: &Path,
    table: &str,
    retention: usize,
) -> Result<(), MiniAppError> {
    let owned_scope = scope_dir.to_owned();
    let owned_table = table.to_owned();

    let job = tokio::task::spawn_blocking(move || {
        purge_old_snapshots_sync(&owned_scope, &owned_table, retention)
    });

    match job.await {
        Ok(outcome) => outcome,
        Err(e) => Err(MiniAppError::Snapshot(format!("blocking task panic: {e}"))),
    }
}
fn purge_old_snapshots_sync(
scope_dir: &Path,
table: &str,
retention: usize,
) -> Result<(), MiniAppError> {
let snapshot_dir = scope_dir.join("_snapshots");
if !snapshot_dir.exists() {
return Ok(());
}
let entries = std::fs::read_dir(&snapshot_dir)
.map_err(|e| MiniAppError::Snapshot(format!("cannot read snapshot dir: {e}")))?;
let mut timestamps: Vec<u64> = entries
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name();
let name = name.to_string_lossy();
parse_snapshot_timestamp(&name, table, "db")
})
.collect();
timestamps.sort_unstable_by(|a, b| b.cmp(a));
for ts in timestamps.iter().skip(retention) {
let db_path = snapshot_dir.join(format!("{}.{}.db", table, ts));
if let Err(e) = std::fs::remove_file(&db_path) {
tracing::warn!(
path = %db_path.display(),
error = %e,
"failed to remove old snapshot db; continuing"
);
}
}
Ok(())
}
/// Extracts the timestamp from a snapshot file name of the form `<table>.<unix_secs>.<ext>`.
///
/// Returns `None` when `filename` does not start with `<table>.`, does not end with
/// `.<ext>`, or when the part in between is not a canonical decimal `u64`.
///
/// "Canonical" means ASCII digits only, with no sign and no leading zeros (a lone `0` is
/// fine). `u64::from_str` alone would also accept spellings such as `+100`; names like that
/// are never produced by formatting a `u64` with `{}` and would not survive a
/// parse-then-format round trip, so they are rejected.
pub(crate) fn parse_snapshot_timestamp(filename: &str, table: &str, ext: &str) -> Option<u64> {
    // Strip `<table>.` from the front first, then `.<ext>` from what is left, so the two
    // affixes can never overlap.
    let digits = filename
        .strip_prefix(table)?
        .strip_prefix('.')?
        .strip_suffix(ext)?
        .strip_suffix('.')?;

    let all_digits = !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit());
    let has_leading_zero = digits.len() > 1 && digits.starts_with('0');
    if !all_digits || has_leading_zero {
        return None;
    }

    // Still fallible: the digits may overflow `u64`.
    digits.parse::<u64>().ok()
}
// Request parameters for `do_data_snapshot`. Every field is optional; `#[serde(default)]`
// lets an empty object deserialize to "all tables, all scopes, real run".
//
// NOTE(review): plain `//` comments are used here on purpose. The `JsonSchema` derive is
// understood to turn `///` doc comments into schema descriptions, which would change the
// generated schema — confirm before converting these to doc comments.
#[derive(Debug, Default, Deserialize, JsonSchema)]
#[serde(default)]
pub struct DataSnapshotParams {
// When set, only this table is snapshotted; otherwise every registered table is a candidate.
pub table: Option<String>,
// Optional scope filter; `resolve_scope_dir` accepts "project" or "user" and rejects
// anything else.
pub scope: Option<String>,
// When `Some(true)`, nothing is written and a report of what would be affected is
// returned instead. `None` is treated as `false`.
pub dry_run: Option<bool>,
}
/// One table selected for snapshotting, as produced by `resolve_targets`.
struct SnapshotTarget {
/// Registry name of the table; also the prefix of its snapshot file names.
table_name: String,
/// Directory under which the `_snapshots` directory for this table lives.
scope_root: PathBuf,
/// Path of the live SQLite database that is copied when a snapshot is taken.
db_path: PathBuf,
/// Handle to the table's store; used for the row count reported by a dry run.
store: Arc<crate::store::Store>,
}
/// Snapshots every targeted table and prunes old snapshot generations, or — with
/// `dry_run` — only reports what would be affected.
///
/// Targets are chosen by [`resolve_targets`] from `params.table` / `params.scope` against
/// the registry currently stored in `tables`.
///
/// * **Dry run** (`params.dry_run == Some(true)`): no snapshot is written and nothing is
///   deleted. Returns
///   `{"dry_run": true, "affects": {"target_tables", "row_counts", "would_purge_generations"}}`.
/// * **Real run**: for each target, in order, a snapshot is written and then generations
///   beyond `config.snapshot_retention()` are purged. Returns
///   `{"snapshotted": [...], "purged": [...]}`; a table appears in `purged` only when at
///   least one generation was removed.
///
/// # Errors
///
/// Returns [`MiniAppError`] when target resolution fails, a row count cannot be obtained
/// (dry run), writing or purging a snapshot fails, or the result cannot be serialized. A
/// failure aborts the run at that target; earlier targets stay snapshotted.
pub async fn do_data_snapshot(
    config: &Config,
    tables: &Arc<ArcSwap<TableRegistry>>,
    params: DataSnapshotParams,
) -> Result<String, MiniAppError> {
    let dry_run = params.dry_run.unwrap_or(false);
    let retention = config.snapshot_retention();

    // Resolve against the registry as loaded right now; the loaded handle is released at
    // the end of this block.
    let targets: Vec<SnapshotTarget> = {
        let registry = tables.load_full();
        resolve_targets(
            &registry,
            config,
            params.table.as_deref(),
            params.scope.as_deref(),
        )?
    };

    if dry_run {
        let mut target_tables: Vec<String> =
            targets.iter().map(|t| t.table_name.clone()).collect();
        target_tables.sort();

        let mut row_counts: HashMap<String, u64> = HashMap::new();
        let mut would_purge: HashMap<String, usize> = HashMap::new();
        for target in &targets {
            let count = target.store.row_count().await.map_err(|e| {
                MiniAppError::Snapshot(format!(
                    "row_count failed for table '{}': {e}",
                    target.table_name
                ))
            })?;
            row_counts.insert(target.table_name.clone(), count);

            let purge_count = count_would_purge(&target.scope_root, &target.table_name, retention);
            would_purge.insert(target.table_name.clone(), purge_count);
        }

        let result = serde_json::json!({
            "dry_run": true,
            "affects": {
                "target_tables": target_tables,
                "row_counts": row_counts,
                "would_purge_generations": would_purge,
            }
        });
        return serde_json::to_string(&result)
            .map_err(|e| MiniAppError::Snapshot(format!("json serialization error: {e}")));
    }

    let mut snapshotted: Vec<serde_json::Value> = Vec::new();
    let mut purged: Vec<serde_json::Value> = Vec::new();
    for target in &targets {
        write_snapshot_db(&target.scope_root, &target.table_name, &target.db_path).await?;

        // The writer does not return the file it created, so the newest snapshot on disk
        // is reported as the one just written.
        let snapshot_path = newest_snapshot_path(&target.scope_root, &target.table_name);
        let unix_secs = snapshot_path.as_ref().and_then(|p| {
            p.file_name()
                .and_then(|n| n.to_str())
                .and_then(|n| parse_snapshot_timestamp(n, &target.table_name, "db"))
        });
        let scope_label = scope_label_for(&target.scope_root, config);
        snapshotted.push(serde_json::json!({
            "table": target.table_name,
            "scope": scope_label,
            "snapshot_path": snapshot_path.as_ref().map(|p| p.display().to_string()).unwrap_or_default(),
            "unix_secs": unix_secs,
        }));

        // The purge routine does not report how many files it removed, so the number is
        // derived from the directory contents before and after.
        let snapshot_dir = target.scope_root.join("_snapshots");
        let before_count = count_snapshots_in_dir(&snapshot_dir, &target.table_name);
        purge_old_snapshots(&target.scope_root, &target.table_name, retention).await?;
        let after_count = count_snapshots_in_dir(&snapshot_dir, &target.table_name);
        let removed = before_count.saturating_sub(after_count);
        if removed > 0 {
            purged.push(serde_json::json!({
                "table": target.table_name,
                "generations_removed": removed,
            }));
        }
    }

    let result = serde_json::json!({
        "snapshotted": snapshotted,
        "purged": purged,
    });
    serde_json::to_string(&result)
        .map_err(|e| MiniAppError::Snapshot(format!("json serialization error: {e}")))
}
/// Turns the optional `table` / `scope` filters into the concrete list of tables to snapshot.
///
/// * With `table`: the table is looked up in `registry`. The result is that single target,
///   or an empty list when a scope filter is active and the table lives outside it.
/// * Without `table`: every registry entry whose scope root and database path can be
///   derived (and that passes the scope filter) becomes a target, sorted by table name.
///
/// The registry is treated as "legacy" when it has a default table; this decides how the
/// scope root is derived from the schema path (see `derive_scope_root`). The database path
/// is always `<schema dir>/<table>.db`.
///
/// NOTE(review): when `scope` names a directory that is not configured
/// (`resolve_scope_dir` returns `Ok(None)`), no filtering happens at all — confirm this is
/// intended.
///
/// # Errors
///
/// Fails when the named table cannot be resolved, its schema path has no usable
/// parent/grandparent, or `scope` is not a recognised scope name.
fn resolve_targets(
    registry: &TableRegistry,
    config: &Config,
    table: Option<&str>,
    scope: Option<&str>,
) -> Result<Vec<SnapshotTarget>, MiniAppError> {
    let is_legacy = registry.default_table().is_some();

    match table {
        Some(table_name) => {
            let entry = registry.resolve(Some(table_name))?;
            let scope_root = derive_scope_root(&entry.schema_path, is_legacy)?;
            let Some(schema_dir) = entry.schema_path.parent() else {
                return Err(MiniAppError::Snapshot(
                    "schema_path has no parent dir".into(),
                ));
            };
            let db_path = schema_dir.join(format!("{}.db", table_name));

            // The scope is validated only after the table itself has resolved.
            let wanted_root = scope
                .map(|s| resolve_scope_dir(config, s))
                .transpose()?
                .flatten();
            if wanted_root.is_some_and(|root| !scope_root.starts_with(&root)) {
                return Ok(Vec::new());
            }

            Ok(vec![SnapshotTarget {
                table_name: table_name.to_string(),
                scope_root,
                db_path,
                store: Arc::clone(&entry.store),
            }])
        }
        None => {
            let scope_filter: Option<PathBuf> = scope
                .map(|s| resolve_scope_dir(config, s))
                .transpose()?
                .flatten();

            let mut targets: Vec<SnapshotTarget> = Vec::new();
            for (name, entry) in registry.entries().iter() {
                // Entries whose layout cannot be derived are skipped, not reported.
                let Ok(scope_root) = derive_scope_root(&entry.schema_path, is_legacy) else {
                    continue;
                };
                let outside_scope = scope_filter
                    .as_ref()
                    .is_some_and(|expected| !scope_root.starts_with(expected));
                if outside_scope {
                    continue;
                }
                let Some(schema_dir) = entry.schema_path.parent() else {
                    continue;
                };
                targets.push(SnapshotTarget {
                    table_name: name.clone(),
                    scope_root,
                    db_path: schema_dir.join(format!("{}.db", name)),
                    store: Arc::clone(&entry.store),
                });
            }

            targets.sort_by(|a, b| a.table_name.cmp(&b.table_name));
            Ok(targets)
        }
    }
}
/// Derives the scope root directory from a table's schema file path.
///
/// In legacy mode the scope root is the directory containing the schema file; otherwise it
/// is that directory's parent.
///
/// # Errors
///
/// Returns [`MiniAppError::Snapshot`] when `schema_path` lacks the required parent
/// (legacy) or grandparent (non-legacy) component.
fn derive_scope_root(schema_path: &Path, is_legacy: bool) -> Result<PathBuf, MiniAppError> {
    let schema_dir = schema_path.parent();
    let (root, missing) = if is_legacy {
        (schema_dir, "schema_path has no parent dir")
    } else {
        (
            schema_dir.and_then(Path::parent),
            "schema_path has no grandparent dir",
        )
    };
    match root {
        Some(dir) => Ok(dir.to_path_buf()),
        None => Err(MiniAppError::Snapshot(missing.into())),
    }
}
fn resolve_scope_dir(config: &Config, scope: &str) -> Result<Option<PathBuf>, MiniAppError> {
match scope {
"project" => Ok(config.project_dir.as_deref().map(|p| p.to_path_buf())),
"user" => Ok(config.user_dir.as_deref().map(|p| p.to_path_buf())),
other => Err(MiniAppError::Snapshot(format!(
"unrecognised scope '{other}': expected 'project' or 'user'"
))),
}
}
/// Names the scope that `scope_root` belongs to, for reporting.
///
/// Returns `"project"` when the path lies under the configured project directory, else
/// `"user"` when it lies under the configured user directory, else `"unknown"`. The project
/// directory is checked first, so it wins when both match.
fn scope_label_for(scope_root: &Path, config: &Config) -> &'static str {
    let lies_under = |dir: Option<&Path>| dir.is_some_and(|d| scope_root.starts_with(d));

    if lies_under(config.project_dir.as_deref()) {
        "project"
    } else if lies_under(config.user_dir.as_deref()) {
        "user"
    } else {
        "unknown"
    }
}
fn count_would_purge(scope_root: &Path, table: &str, retention: usize) -> usize {
let snapshot_dir = scope_root.join("_snapshots");
if !snapshot_dir.exists() {
return 0;
}
let Ok(entries) = std::fs::read_dir(&snapshot_dir) else {
return 0;
};
let count = entries
.filter_map(|e| {
let e = e.ok()?;
let name = e.file_name();
parse_snapshot_timestamp(&name.to_string_lossy(), table, "db").map(|_| ())
})
.count();
count.saturating_sub(retention)
}
/// Counts the entries of `snapshot_dir` whose names are `.db` snapshots of `table`.
///
/// Returns `0` when the directory is missing or cannot be listed; entries that cannot be
/// read are skipped.
fn count_snapshots_in_dir(snapshot_dir: &Path, table: &str) -> usize {
    // A missing directory makes `read_dir` fail, which is reported as zero just like any
    // other listing error.
    match std::fs::read_dir(snapshot_dir) {
        Ok(listing) => listing
            .flatten()
            .filter(|item| {
                parse_snapshot_timestamp(&item.file_name().to_string_lossy(), table, "db")
                    .is_some()
            })
            .count(),
        Err(_) => 0,
    }
}
/// Finds the snapshot of `table` with the highest timestamp in `<scope_root>/_snapshots`.
///
/// Returns `None` when the directory cannot be listed or holds no snapshot of `table`.
/// When several entries share the highest timestamp, the one listed first is returned.
fn newest_snapshot_path(scope_root: &Path, table: &str) -> Option<PathBuf> {
    std::fs::read_dir(scope_root.join("_snapshots"))
        .ok()?
        .flatten()
        .filter_map(|item| {
            let ts = parse_snapshot_timestamp(&item.file_name().to_string_lossy(), table, "db")?;
            Some((ts, item.path()))
        })
        // Strictly-greater comparison keeps the earlier candidate on ties.
        .reduce(|best, candidate| if candidate.0 > best.0 { candidate } else { best })
        .map(|(_, path)| path)
}
/// Test helper: returns the timestamps of all `.db` snapshots of `table` found in
/// `snapshot_dir`, newest first.
///
/// # Errors
///
/// Returns [`MiniAppError::Snapshot`] when the directory cannot be read.
#[cfg(test)]
fn list_snapshot_timestamps(snapshot_dir: &Path, table: &str) -> Result<Vec<u64>, MiniAppError> {
    let listing = std::fs::read_dir(snapshot_dir)
        .map_err(|e| MiniAppError::Snapshot(format!("cannot read snapshot dir: {e}")))?;

    let mut found: Vec<u64> = Vec::new();
    for item in listing.flatten() {
        let file_name = item.file_name();
        if let Some(ts) = parse_snapshot_timestamp(&file_name.to_string_lossy(), table, "db") {
            found.push(ts);
        }
    }

    // Descending order.
    found.sort_unstable_by_key(|&ts| std::cmp::Reverse(ts));
    Ok(found)
}
/// Tests for snapshot creation, retention purging and the dry-run path of
/// `do_data_snapshot`.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    use std::path::PathBuf;
    use tempfile::TempDir;
    use tokio::task;

    /// Creates a WAL-mode SQLite database with a single table `t` at `path`.
    fn create_test_db(path: &Path) {
        let conn = Connection::open(path).expect("open test db");
        conn.execute_batch(
            "PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT);",
        )
        .expect("setup test db");
    }

    /// A snapshot creates `_snapshots` with exactly one `.db` file and no `.yaml` file.
    #[tokio::test]
    async fn write_snapshot_db_creates_db_file_only() {
        let dir = TempDir::new().expect("temp dir");
        let scope_dir = dir.path();
        let db_path = scope_dir.join("items.db");
        create_test_db(&db_path);
        write_snapshot_db(scope_dir, "items", &db_path)
            .await
            .expect("write_snapshot_db must succeed");
        let snapshot_dir = scope_dir.join("_snapshots");
        assert!(snapshot_dir.exists(), "_snapshots dir must be created");
        let entries: Vec<_> = std::fs::read_dir(&snapshot_dir)
            .expect("read snapshot dir")
            .filter_map(|e| e.ok())
            .collect();
        let yaml_count = entries
            .iter()
            .filter(|e| e.file_name().to_string_lossy().ends_with(".yaml"))
            .count();
        let db_count = entries
            .iter()
            .filter(|e| e.file_name().to_string_lossy().ends_with(".db"))
            .count();
        assert_eq!(yaml_count, 0, "snapshot must NOT create any yaml file");
        assert_eq!(db_count, 1, "exactly one db snapshot must exist");
    }

    /// With five generations and a retention of three, the two oldest are removed.
    #[tokio::test]
    async fn purge_old_snapshots_keeps_n_newest() {
        let dir = TempDir::new().expect("temp dir");
        let scope_dir = dir.path();
        let snapshot_dir = scope_dir.join("_snapshots");
        std::fs::create_dir_all(&snapshot_dir).expect("create snapshot dir");
        for ts in [100u64, 200, 300, 400, 500] {
            std::fs::write(snapshot_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
        }
        purge_old_snapshots(scope_dir, "items", 3)
            .await
            .expect("purge must succeed");
        let timestamps = list_snapshot_timestamps(&snapshot_dir, "items").expect("list timestamps");
        assert_eq!(timestamps.len(), 3, "exactly 3 snapshots must remain");
        assert_eq!(timestamps, vec![500, 400, 300], "newest 3 must be kept");
        assert!(!snapshot_dir.join("items.100.db").exists());
        assert!(!snapshot_dir.join("items.200.db").exists());
    }

    /// Fewer generations than the retention limit: nothing is removed.
    #[tokio::test]
    async fn purge_old_snapshots_no_op_when_below_limit() {
        let dir = TempDir::new().expect("temp dir");
        let scope_dir = dir.path();
        let snapshot_dir = scope_dir.join("_snapshots");
        std::fs::create_dir_all(&snapshot_dir).expect("create snapshot dir");
        for ts in [100u64, 200] {
            std::fs::write(snapshot_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
        }
        purge_old_snapshots(scope_dir, "items", 10)
            .await
            .expect("purge must succeed");
        let timestamps = list_snapshot_timestamps(&snapshot_dir, "items").expect("list timestamps");
        assert_eq!(timestamps.len(), 2, "both snapshots must still exist");
    }

    /// Purging a scope without a `_snapshots` directory succeeds and does not create it.
    #[tokio::test]
    async fn purge_old_snapshots_no_op_when_dir_missing() {
        let dir = TempDir::new().expect("temp dir");
        let scope_dir = dir.path();
        let result = purge_old_snapshots(scope_dir, "items", 10).await;
        assert!(result.is_ok(), "purge must succeed when dir is missing");
        assert!(!scope_dir.join("_snapshots").exists());
    }

    /// An unopenable source database surfaces as `MiniAppError::Snapshot`.
    #[tokio::test]
    async fn write_snapshot_db_missing_db_returns_snapshot_variant() {
        let dir = TempDir::new().expect("temp dir");
        let scope_dir = dir.path();
        let result =
            write_snapshot_db(scope_dir, "items", Path::new("/nonexistent/items.db")).await;
        let err = result.expect_err("missing db file must error");
        assert!(
            matches!(err, MiniAppError::Snapshot(_)),
            "expected Snapshot variant, got {:?}",
            err
        );
    }

    /// A snapshot taken while another connection inserts rows succeeds, the writer
    /// succeeds, and the resulting snapshot is a queryable database.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_snapshot_does_not_block_concurrent_writes() {
        let dir = TempDir::new().expect("temp dir");
        let db_path = dir.path().join("concurrent.db");
        {
            let conn = Connection::open(&db_path).expect("open db");
            conn.execute_batch(
                "PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
            )
            .expect("setup db");
        }
        let db_path_writer = db_path.clone();
        let scope_dir = dir.path().to_path_buf();
        let writer = task::spawn(async move {
            task::spawn_blocking(move || {
                let conn = Connection::open(&db_path_writer).expect("open writer db");
                for i in 0i64..100 {
                    conn.execute("INSERT INTO rows (val) VALUES (?1)", [format!("v{}", i)])
                        .expect("insert row");
                }
            })
            .await
            .expect("writer blocking task")
        });
        let snapshot_task = write_snapshot_db(&scope_dir, "concurrent", &db_path);
        let (writer_result, snapshot_result) = tokio::join!(writer, snapshot_task);
        writer_result.expect("writer must succeed");
        snapshot_result.expect("snapshot must succeed");
        let snapshot_dir = scope_dir.join("_snapshots");
        let snapshot_entries: Vec<PathBuf> = std::fs::read_dir(&snapshot_dir)
            .expect("read snapshot dir")
            .filter_map(|e| e.ok())
            .map(|e| e.path())
            .filter(|p| {
                p.extension()
                    .and_then(|x| x.to_str())
                    .map(|x| x == "db")
                    .unwrap_or(false)
            })
            .collect();
        assert!(
            !snapshot_entries.is_empty(),
            "at least one db snapshot must exist"
        );
        let snap_conn = Connection::open(&snapshot_entries[0]).expect("open snapshot db");
        // A failing query must fail the test: falling back to 0 and then asserting
        // `>= 0` could never detect a broken snapshot.
        let snap_row_count: i64 = snap_conn
            .query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
            .expect("snapshot db must be a valid sqlite db");
        assert!(
            (0..=100).contains(&snap_row_count),
            "snapshot row count must be between 0 and 100, got {snap_row_count}"
        );
    }

    /// Dropping the snapshot future after a 1 ms timeout must leave the source database
    /// intact and queryable.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_spawn_blocking_cancel_safety_snapshot_survives() {
        let dir = TempDir::new().expect("temp dir");
        let scope_dir = dir.path().to_path_buf();
        let db_path = scope_dir.join("cancel_test.db");
        {
            let conn = Connection::open(&db_path).expect("open db");
            conn.execute_batch(
                "PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
            )
            .expect("setup db");
        }
        let snapshot_fut = write_snapshot_db(&scope_dir, "cancel_test", &db_path);
        let result = tokio::time::timeout(std::time::Duration::from_millis(1), snapshot_fut).await;
        // Give a still-running blocking task time to finish before inspecting the source.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let src_conn = Connection::open(&db_path).expect("source db must still be openable");
        let _count: i64 = src_conn
            .query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
            .expect("source db must be a valid sqlite db after cancellation");
        if let Ok(Ok(())) = result {
            let snapshot_dir = scope_dir.join("_snapshots");
            assert!(
                snapshot_dir.exists(),
                "snapshot dir must exist on successful write"
            );
        }
    }

    /// `dry_run = true` must not create `_snapshots` and must report the affected tables.
    #[tokio::test]
    async fn test_do_data_snapshot_dry_run_zero_write() {
        use crate::config::Config;
        use crate::registry::{TableEntry, TableRegistry};
        use crate::schema::{FieldDef, FieldType, SchemaConfig};
        use crate::store::Store;
        use arc_swap::ArcSwap;
        use std::collections::HashMap;
        let dir = TempDir::new().expect("temp dir");
        let table_name = "items";
        let table_dir = dir.path().join(table_name);
        std::fs::create_dir_all(&table_dir).expect("create table dir");
        let schema_path = table_dir.join("schema.yaml");
        std::fs::write(
            &schema_path,
            "table: items\nfields:\n - name: title\n type: string\n required: true\n",
        )
        .expect("write schema.yaml");
        let db_path = table_dir.join(format!("{}.db", table_name));
        let conn = Connection::open(&db_path).expect("open test db");
        conn.execute_batch(
            "PRAGMA journal_mode=WAL; \
             CREATE TABLE IF NOT EXISTS rows (id TEXT PRIMARY KEY, data TEXT, created_at TEXT, updated_at TEXT);",
        )
        .expect("setup test db");
        drop(conn);
        let schema = SchemaConfig {
            table: table_name.to_string(),
            title: None,
            description: None,
            fields: vec![FieldDef {
                name: "title".to_string(),
                ty: FieldType::String,
                required: true,
                description: None,
            }],
            dump: None,
        };
        let store = Store::open(&db_path, schema.clone())
            .await
            .expect("open store");
        let entry = TableEntry {
            store: Arc::new(store),
            schema: Arc::new(schema),
            schema_path: Arc::new(schema_path),
        };
        let mut entries = HashMap::new();
        entries.insert(table_name.to_string(), entry);
        let registry = TableRegistry::from_entries(entries, None);
        let tables: Arc<ArcSwap<TableRegistry>> = Arc::new(ArcSwap::from_pointee(registry));
        let config = Config {
            schema_path: None,
            db_path: None,
            user_dir: None,
            project_dir: Some(dir.path().to_path_buf()),
            backup_retention: None,
            snapshot_retention: None,
        };
        let snapshots_dir = dir.path().join("_snapshots");
        assert!(
            !snapshots_dir.exists(),
            "_snapshots must not exist before dry_run call"
        );
        let params = DataSnapshotParams {
            table: None,
            scope: None,
            dry_run: Some(true),
        };
        let result = do_data_snapshot(&config, &tables, params)
            .await
            .expect("do_data_snapshot dry_run must succeed");
        assert!(
            !snapshots_dir.exists(),
            "_snapshots must not be created by dry_run=true (Crux: zero-write guarantee)"
        );
        let json: serde_json::Value =
            serde_json::from_str(&result).expect("result must be valid JSON");
        assert_eq!(
            json["dry_run"],
            serde_json::Value::Bool(true),
            "response must contain dry_run: true"
        );
        let target_tables = json["affects"]["target_tables"]
            .as_array()
            .expect("affects.target_tables must be an array");
        assert_eq!(
            target_tables.len(),
            1,
            "exactly one table should be in target_tables"
        );
        assert_eq!(
            target_tables[0],
            serde_json::Value::String(table_name.to_string()),
            "target table must be 'items'"
        );
        assert!(
            json["affects"]["row_counts"].is_object(),
            "row_counts must be an object"
        );
        assert!(
            json["affects"]["would_purge_generations"].is_object(),
            "would_purge_generations must be an object"
        );
    }
}