// mini_app_core/snapshot.rs
//! Snapshot utilities for mini-app-mcp data tools.
//!
//! This module provides two public async building blocks, plus the
//! `data_snapshot` MCP tool built on top of them ([`do_data_snapshot`]):
//!
//! - [`write_snapshot_db`] — creates an online SQLite snapshot of a table's
//!   database in `{scope_dir}/_snapshots/`.  No YAML is copied (snapshots are
//!   DB-only).
//! - [`purge_old_snapshots`] — removes the oldest snapshot files beyond the
//!   configured retention limit.
//!
//! Both helpers run their filesystem and SQLite work inside
//! `tokio::task::spawn_blocking` (K-110) to avoid blocking the async
//! executor.  The SQLite snapshot uses `rusqlite::Connection::backup` with a
//! fresh source connection, so the existing `Store`'s `Mutex<Connection>` is
//! never borrowed (K-103).
//!
//! # Snapshot placement
//!
//! ```text
//! {scope_dir}/
//!   _snapshots/
//!     {table}.{unix_secs}.db
//! ```
//!
//! # Retention isolation
//!
//! Snapshot retention is controlled exclusively by `MINI_APP_SNAPSHOT_RETENTION`
//! (default `10`).  The `_backup/` directory and `MINI_APP_BACKUP_RETENTION` are
//! never read, written, or purged by this module (Crux: snapshot retention
//! isolation).
30use std::collections::HashMap;
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35use arc_swap::ArcSwap;
36use rusqlite::Connection;
37use schemars::JsonSchema;
38use serde::Deserialize;
39
40use crate::config::Config;
41use crate::error::MiniAppError;
42use crate::registry::TableRegistry;
43
44/// Creates a SQLite snapshot for a table using the hot backup API.
45///
46/// The snapshot is written to
47/// `{scope_dir}/_snapshots/{table}.{unix_secs}.db`.  The `_snapshots/`
48/// directory is created if it does not exist.
49///
50/// The snapshot file is created via
51/// `rusqlite::Connection::backup(DatabaseName::Main, …, None)` using a fresh
52/// source connection opened from `db_path` — the existing Store connection is
53/// never borrowed (K-103).  This satisfies the SQLite Online Backup API
54/// contract that "the source can be used while the backup is running".
55///
56/// A `PRAGMA wal_checkpoint(TRUNCATE)` is attempted before the backup to
57/// ensure the WAL is flushed into the main DB file so the snapshot captures
58/// the most recent committed state.  If the checkpoint fails it is logged as a
59/// warning and the backup continues regardless (rusqlite's backup API handles
60/// WAL-mode databases internally).
61///
62/// **Crux (rusqlite hot backup API)**: only `rusqlite::Connection::backup` is
63/// used to create the snapshot.  `std::fs::copy` of the `.db` file is never
64/// used because it would produce a corrupted or stale snapshot when the source
65/// database has an open WAL file.
66///
67/// # Arguments
68/// - `scope_dir`: the `.mini-app/<scope>/` root directory for this table.
69/// - `table`: the logical table name (used as filename prefix).
70/// - `db_path`: path to the SQLite database file to snapshot.
71///
72/// # Returns
73/// `Ok(())` on success.
74///
75/// # Errors
76/// - [`MiniAppError::Snapshot`] if the timestamp cannot be determined, the
77///   snapshot directory cannot be created, or the SQLite backup fails.
78/// - [`MiniAppError::Snapshot`] if the `spawn_blocking` task panics.
79pub async fn write_snapshot_db(
80    scope_dir: &Path,
81    table: &str,
82    db_path: &Path,
83) -> Result<(), MiniAppError> {
84    let scope_dir = scope_dir.to_path_buf();
85    let table = table.to_string();
86    let db_path = db_path.to_path_buf();
87
88    tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
89        write_snapshot_db_sync(&scope_dir, &table, &db_path)
90    })
91    .await
92    .map_err(|e| MiniAppError::Snapshot(format!("blocking task panic: {e}")))?
93}
94
95/// Synchronous implementation of [`write_snapshot_db`], executed inside
96/// `spawn_blocking`.
97fn write_snapshot_db_sync(
98    scope_dir: &Path,
99    table: &str,
100    db_path: &Path,
101) -> Result<(), MiniAppError> {
102    // Obtain current Unix timestamp (seconds since UNIX_EPOCH).
103    let unix_secs = SystemTime::now()
104        .duration_since(UNIX_EPOCH)
105        .map_err(|e| MiniAppError::Snapshot(format!("system clock error: {e}")))?
106        .as_secs();
107
108    let snapshot_dir = scope_dir.join("_snapshots");
109    std::fs::create_dir_all(&snapshot_dir)
110        .map_err(|e| MiniAppError::Snapshot(format!("cannot create snapshot dir: {e}")))?;
111
112    // Open a fresh source connection for the backup so we don't borrow the
113    // Store's Mutex<Connection> (K-103).
114    let src_conn = Connection::open(db_path)
115        .map_err(|e| MiniAppError::Snapshot(format!("cannot open source db: {e}")))?;
116
117    // Attempt WAL checkpoint before snapshot.  Failure is non-fatal.
118    if let Err(e) = src_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)") {
119        tracing::warn!(error = %e, "WAL checkpoint before snapshot failed; continuing anyway");
120    }
121
122    let db_dst = snapshot_dir.join(format!("{}.{}.db", table, unix_secs));
123    // Crux: use rusqlite::Connection::backup (hot backup API), never std::fs::copy.
124    src_conn
125        .backup(rusqlite::DatabaseName::Main, &db_dst, None)
126        .map_err(|e| MiniAppError::Snapshot(format!("rusqlite backup failed: {e}")))?;
127
128    Ok(())
129}
130
131/// Removes the oldest snapshot files beyond the retention limit.
132///
133/// Scans `{scope_dir}/_snapshots/` for files matching `{table}.*.db`.  Files
134/// are sorted by the numeric timestamp embedded in their name (descending —
135/// newest first).  Files beyond the `retention` limit are deleted.
136///
137/// If a file cannot be removed (e.g. already deleted), the error is logged as
138/// a warning and purge continues for the remaining files.
139///
140/// **Crux (snapshot retention isolation)**: this function only touches
141/// `{scope_dir}/_snapshots/`.  It never reads, writes, or removes files from
142/// `{scope_dir}/_backup/`, and it never consults `MINI_APP_BACKUP_RETENTION`.
143///
144/// # Arguments
145/// - `scope_dir`: the `.mini-app/<scope>/` root for this table.
146/// - `table`: the logical table name used as filename prefix.
147/// - `retention`: number of snapshot files to keep (files beyond this count
148///   are deleted).
149///
150/// # Returns
151/// `Ok(())` on success (including the no-op case where fewer than
152/// `retention + 1` snapshot files exist).
153///
154/// # Errors
155/// - [`MiniAppError::Snapshot`] if the `_snapshots` directory cannot be read,
156///   or if the `spawn_blocking` task panics.
157pub async fn purge_old_snapshots(
158    scope_dir: &Path,
159    table: &str,
160    retention: usize,
161) -> Result<(), MiniAppError> {
162    let scope_dir = scope_dir.to_path_buf();
163    let table = table.to_string();
164
165    tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
166        purge_old_snapshots_sync(&scope_dir, &table, retention)
167    })
168    .await
169    .map_err(|e| MiniAppError::Snapshot(format!("blocking task panic: {e}")))?
170}
171
172/// Synchronous implementation of [`purge_old_snapshots`], executed inside
173/// `spawn_blocking`.
174fn purge_old_snapshots_sync(
175    scope_dir: &Path,
176    table: &str,
177    retention: usize,
178) -> Result<(), MiniAppError> {
179    let snapshot_dir = scope_dir.join("_snapshots");
180
181    // If the snapshot directory does not exist yet, nothing to purge.
182    if !snapshot_dir.exists() {
183        return Ok(());
184    }
185
186    // Collect timestamps from .db files that belong to this table.
187    let entries = std::fs::read_dir(&snapshot_dir)
188        .map_err(|e| MiniAppError::Snapshot(format!("cannot read snapshot dir: {e}")))?;
189
190    let mut timestamps: Vec<u64> = entries
191        .filter_map(|entry| {
192            let entry = entry.ok()?;
193            let name = entry.file_name();
194            let name = name.to_string_lossy();
195            parse_snapshot_timestamp(&name, table, "db")
196        })
197        .collect();
198
199    // Sort descending — newest first.
200    timestamps.sort_unstable_by(|a, b| b.cmp(a));
201
202    // Delete snapshot files beyond `retention`.
203    for ts in timestamps.iter().skip(retention) {
204        let db_path = snapshot_dir.join(format!("{}.{}.db", table, ts));
205
206        if let Err(e) = std::fs::remove_file(&db_path) {
207            tracing::warn!(
208                path = %db_path.display(),
209                error = %e,
210                "failed to remove old snapshot db; continuing"
211            );
212        }
213    }
214
215    Ok(())
216}
217
/// Parses the numeric timestamp from a snapshot filename of the form
/// `{table}.{ts}.{ext}`.
///
/// The `{ts}` segment must be a non-empty run of ASCII digits that fits in a
/// `u64`. `u64::from_str` alone would also accept a leading `+`
/// (`"items.+5.db"`), which the snapshot writer never produces.
///
/// Returns `None` in any of these cases:
/// - the name lacks the `{table}.` prefix or the `.{ext}` suffix;
/// - the middle segment is empty or not purely digits;
/// - the middle segment overflows `u64`.
///
/// # Arguments
/// - `filename`: the bare filename string to parse.
/// - `table`: the expected table name prefix.
/// - `ext`: the expected extension (without leading dot), e.g. `"db"`.
pub(crate) fn parse_snapshot_timestamp(filename: &str, table: &str, ext: &str) -> Option<u64> {
    // Equivalent to stripping "{table}." and ".{ext}", without allocating.
    let ts_str = filename
        .strip_prefix(table)?
        .strip_prefix('.')?
        .strip_suffix(ext)?
        .strip_suffix('.')?;

    if ts_str.is_empty() || !ts_str.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    ts_str.parse::<u64>().ok()
}
237
// =============================================================================
// MCP tool: data_snapshot
// =============================================================================
241
242/// Parameters for the `data_snapshot` MCP tool.
243///
244/// All fields are optional; `None` means "all" (tables / scopes).  The
245/// `dry_run` flag, when `true`, returns inspection metadata without touching
246/// any file or database state (Crux: dry_run zero-write guarantee).
247#[derive(Debug, Default, Deserialize, JsonSchema)]
248#[serde(default)]
249pub struct DataSnapshotParams {
250    /// Target a single table by name.  When `None`, all mounted tables in the
251    /// given `scope` (or all scopes) are snapshotted.
252    pub table: Option<String>,
253    /// Restrict operation to `"project"` or `"user"` scope.  When `None`,
254    /// all scopes are considered.
255    pub scope: Option<String>,
256    /// When `true`, returns `affects` metadata (target tables, row counts,
257    /// would-purge counts) **without** creating, modifying, or deleting any
258    /// file or database state (Crux: dry_run zero-write guarantee).
259    pub dry_run: Option<bool>,
260}
261
262/// A single entry resolved for snapshotting.
263///
264/// Holds clones of the Arc pointers extracted from the registry so the
265/// ArcSwap Guard can be dropped before any `.await` (K-110 / no
266/// await-holding-lock).
267struct SnapshotTarget {
268    table_name: String,
269    scope_root: PathBuf,
270    db_path: PathBuf,
271    store: Arc<crate::store::Store>,
272}
273
274/// Executes the `data_snapshot` MCP tool.
275///
276/// Fan-out logic:
277/// - `table=Some + scope=Some` → 1 matching entry (scope-filtered).
278/// - `table=Some + scope=None` → 1 entry via `registry.resolve`.
279/// - `table=None + scope=Some("project")` → all entries whose `schema_path`
280///   starts with `config.project_dir`.
281/// - `table=None + scope=Some("user")` → same with `config.user_dir`.
282/// - `table=None + scope=None` → all mounted entries.
283///
284/// When `dry_run=true` (Crux: dry_run zero-write guarantee):
285/// - Returns `{ "dry_run": true, "affects": { "target_tables": [...],
286///   "row_counts": {...}, "would_purge_generations": {...} } }`.
287/// - **No file or database state is created, modified, or deleted.**
288///
289/// When `dry_run=false` (or omitted):
290/// - Calls [`write_snapshot_db`] then [`purge_old_snapshots`] per entry.
291/// - Returns `{ "snapshotted": [...], "purged": [...] }`.
292///
293/// # Arguments
294/// - `config`: server mount configuration (dirs + retention).
295/// - `tables`: the live `ArcSwap`-wrapped table registry.
296/// - `params`: tool parameters.
297///
298/// # Returns
299/// JSON string with operation results.
300///
301/// # Errors
302/// - [`MiniAppError::Snapshot`] if any snapshot or purge operation fails, or
303///   if the scope argument is unrecognised.
304pub async fn do_data_snapshot(
305    config: &Config,
306    tables: &Arc<ArcSwap<TableRegistry>>,
307    params: DataSnapshotParams,
308) -> Result<String, MiniAppError> {
309    let dry_run = params.dry_run.unwrap_or(false);
310
311    // Resolve the target entries from the registry.  The ArcSwap Guard is
312    // dropped immediately after the clone loop (no Guard across .await).
313    let targets: Vec<SnapshotTarget> = {
314        let registry = tables.load_full();
315        resolve_targets(
316            &registry,
317            config,
318            params.table.as_deref(),
319            params.scope.as_deref(),
320        )?
321    };
322
323    if dry_run {
324        // Crux: dry_run zero-write guarantee — read-only path only.
325        let mut target_tables: Vec<String> = targets.iter().map(|t| t.table_name.clone()).collect();
326        target_tables.sort();
327
328        let mut row_counts: HashMap<String, u64> = HashMap::new();
329        let mut would_purge: HashMap<String, usize> = HashMap::new();
330
331        for target in &targets {
332            // row_count uses Store::row_count() which is read-only (SELECT COUNT(*)).
333            let count = target.store.row_count().await.map_err(|e| {
334                MiniAppError::Snapshot(format!(
335                    "row_count failed for table '{}': {e}",
336                    target.table_name
337                ))
338            })?;
339            row_counts.insert(target.table_name.clone(), count);
340
341            // Compute would-purge count by scanning _snapshots/ read-only.
342            // No write occurs here (Crux: dry_run zero-write guarantee).
343            let purge_count = count_would_purge(
344                &target.scope_root,
345                &target.table_name,
346                config.snapshot_retention(),
347            );
348            would_purge.insert(target.table_name.clone(), purge_count);
349        }
350
351        let result = serde_json::json!({
352            "dry_run": true,
353            "affects": {
354                "target_tables": target_tables,
355                "row_counts": row_counts,
356                "would_purge_generations": would_purge,
357            }
358        });
359        return serde_json::to_string(&result)
360            .map_err(|e| MiniAppError::Snapshot(format!("json serialization error: {e}")));
361    }
362
363    // Real path: write snapshots and purge old generations.
364    let mut snapshotted: Vec<serde_json::Value> = Vec::new();
365    let mut purged: Vec<serde_json::Value> = Vec::new();
366
367    let retention = config.snapshot_retention();
368
369    for target in &targets {
370        // Write the snapshot using the hot backup API (Crux: rusqlite hot backup API).
371        write_snapshot_db(&target.scope_root, &target.table_name, &target.db_path).await?;
372
373        // Determine the timestamp of the snapshot just written (newest file).
374        let snapshot_path = newest_snapshot_path(&target.scope_root, &target.table_name);
375        let unix_secs = snapshot_path.as_ref().and_then(|p| {
376            p.file_name()
377                .and_then(|n| n.to_str())
378                .and_then(|n| parse_snapshot_timestamp(n, &target.table_name, "db"))
379        });
380
381        let scope_label = scope_label_for(&target.scope_root, config);
382        snapshotted.push(serde_json::json!({
383            "table": target.table_name,
384            "scope": scope_label,
385            "snapshot_path": snapshot_path.as_ref().map(|p| p.display().to_string()).unwrap_or_default(),
386            "unix_secs": unix_secs,
387        }));
388
389        // Purge old generations (Crux: snapshot retention isolation — only
390        // calls config.snapshot_retention(), never backup_retention()).
391        let snapshot_dir = target.scope_root.join("_snapshots");
392        let before_count = count_snapshots_in_dir(&snapshot_dir, &target.table_name);
393        purge_old_snapshots(&target.scope_root, &target.table_name, retention).await?;
394        let after_count = count_snapshots_in_dir(&snapshot_dir, &target.table_name);
395        let removed = before_count.saturating_sub(after_count);
396
397        if removed > 0 {
398            purged.push(serde_json::json!({
399                "table": target.table_name,
400                "generations_removed": removed,
401            }));
402        }
403    }
404
405    let result = serde_json::json!({
406        "snapshotted": snapshotted,
407        "purged": purged,
408    });
409    serde_json::to_string(&result)
410        .map_err(|e| MiniAppError::Snapshot(format!("json serialization error: {e}")))
411}
412
413/// Resolves the list of snapshot targets from the registry according to
414/// `table` and `scope` filter parameters.
415///
416/// # Arguments
417/// - `registry`: the current table registry snapshot.
418/// - `config`: mount config (for scope dir resolution).
419/// - `table`: optional table name filter.
420/// - `scope`: optional scope string (`"project"` or `"user"`).
421///
422/// # Returns
423/// A `Vec<SnapshotTarget>` sorted by table name for deterministic output.
424///
425/// # Errors
426/// - [`MiniAppError::Snapshot`] if the scope is unrecognised or if a
427///   `schema_path` has no parent directory.
428/// - [`MiniAppError::TableNotFound`] / [`MiniAppError::TableRequired`] from
429///   `registry.resolve` when `table=Some`.
430fn resolve_targets(
431    registry: &TableRegistry,
432    config: &Config,
433    table: Option<&str>,
434    scope: Option<&str>,
435) -> Result<Vec<SnapshotTarget>, MiniAppError> {
436    let is_legacy = registry.default_table().is_some();
437
438    if let Some(table_name) = table {
439        // Single-table path: resolve via registry.
440        let entry = registry.resolve(Some(table_name))?;
441        let scope_root = derive_scope_root(&entry.schema_path, is_legacy)?;
442        let db_path = entry
443            .schema_path
444            .parent()
445            .ok_or_else(|| MiniAppError::Snapshot("schema_path has no parent dir".into()))?
446            .join(format!("{}.db", table_name));
447
448        // Verify scope filter if provided.
449        if let Some(scope_str) = scope {
450            let expected_dir = resolve_scope_dir(config, scope_str)?;
451            if let Some(expected) = expected_dir {
452                if !scope_root.starts_with(&expected) {
453                    return Ok(Vec::new()); // No match.
454                }
455            }
456        }
457
458        return Ok(vec![SnapshotTarget {
459            table_name: table_name.to_string(),
460            scope_root,
461            db_path,
462            store: Arc::clone(&entry.store),
463        }]);
464    }
465
466    // Multi-table path: iterate entries with optional scope filter.
467    let scope_filter: Option<PathBuf> = match scope {
468        Some(s) => resolve_scope_dir(config, s)?,
469        None => None,
470    };
471
472    let mut targets: Vec<SnapshotTarget> = registry
473        .entries()
474        .iter()
475        .filter_map(|(name, entry)| {
476            let scope_root = derive_scope_root(&entry.schema_path, is_legacy).ok()?;
477            // Apply scope filter if present.
478            if let Some(ref expected) = scope_filter {
479                if !scope_root.starts_with(expected) {
480                    return None;
481                }
482            }
483            let db_path = entry.schema_path.parent()?.join(format!("{}.db", name));
484            Some(SnapshotTarget {
485                table_name: name.clone(),
486                scope_root,
487                db_path,
488                store: Arc::clone(&entry.store),
489            })
490        })
491        .collect();
492
493    // Sort by table name for deterministic output (HashMap is unordered).
494    targets.sort_by(|a, b| a.table_name.cmp(&b.table_name));
495    Ok(targets)
496}
497
498/// Derives the `scope_root` path from a `schema_path`.
499///
500/// In **multi-table mode** (`is_legacy = false`), `schema_path` follows
501/// `{scope_root}/{table}/schema.yaml`, so the scope root is 2 levels up.
502///
503/// In **legacy mode** (`is_legacy = true`), `schema_path` is an arbitrary
504/// path provided via `MINI_APP_SCHEMA`, so the scope root is 1 level up
505/// (same directory as the schema file).
506///
507/// # Errors
508/// - [`MiniAppError::Snapshot`] if a required parent directory cannot be
509///   determined.
510fn derive_scope_root(schema_path: &Path, is_legacy: bool) -> Result<PathBuf, MiniAppError> {
511    if is_legacy {
512        schema_path
513            .parent()
514            .map(|p| p.to_path_buf())
515            .ok_or_else(|| MiniAppError::Snapshot("schema_path has no parent dir".into()))
516    } else {
517        schema_path
518            .parent()
519            .and_then(|p| p.parent())
520            .map(|p| p.to_path_buf())
521            .ok_or_else(|| MiniAppError::Snapshot("schema_path has no grandparent dir".into()))
522    }
523}
524
525/// Resolves the filesystem path for a scope string (`"project"` or `"user"`).
526///
527/// Returns `Ok(None)` if the corresponding directory is not configured.
528///
529/// # Errors
530/// - [`MiniAppError::Snapshot`] if `scope` is not `"project"` or `"user"`.
531fn resolve_scope_dir(config: &Config, scope: &str) -> Result<Option<PathBuf>, MiniAppError> {
532    match scope {
533        "project" => Ok(config.project_dir.as_deref().map(|p| p.to_path_buf())),
534        "user" => Ok(config.user_dir.as_deref().map(|p| p.to_path_buf())),
535        other => Err(MiniAppError::Snapshot(format!(
536            "unrecognised scope '{other}': expected 'project' or 'user'"
537        ))),
538    }
539}
540
541/// Returns a human-readable scope label (`"project"`, `"user"`, or `"unknown"`)
542/// by comparing `scope_root` against the configured dirs.
543fn scope_label_for(scope_root: &Path, config: &Config) -> &'static str {
544    if let Some(pd) = config.project_dir.as_deref() {
545        if scope_root.starts_with(pd) {
546            return "project";
547        }
548    }
549    if let Some(ud) = config.user_dir.as_deref() {
550        if scope_root.starts_with(ud) {
551            return "user";
552        }
553    }
554    "unknown"
555}
556
557/// Counts how many snapshot generations would be purged for a given table
558/// given the current retention setting.
559///
560/// Reads the `_snapshots/` directory but never writes, modifies, or deletes
561/// anything (Crux: dry_run zero-write guarantee).
562///
563/// Returns `0` if the `_snapshots/` directory does not exist.
564fn count_would_purge(scope_root: &Path, table: &str, retention: usize) -> usize {
565    let snapshot_dir = scope_root.join("_snapshots");
566    if !snapshot_dir.exists() {
567        return 0;
568    }
569    let Ok(entries) = std::fs::read_dir(&snapshot_dir) else {
570        return 0;
571    };
572    let count = entries
573        .filter_map(|e| {
574            let e = e.ok()?;
575            let name = e.file_name();
576            parse_snapshot_timestamp(&name.to_string_lossy(), table, "db").map(|_| ())
577        })
578        .count();
579    count.saturating_sub(retention)
580}
581
582/// Counts the number of `.db` snapshot files for `table` in `snapshot_dir`.
583fn count_snapshots_in_dir(snapshot_dir: &Path, table: &str) -> usize {
584    if !snapshot_dir.exists() {
585        return 0;
586    }
587    let Ok(entries) = std::fs::read_dir(snapshot_dir) else {
588        return 0;
589    };
590    entries
591        .filter_map(|e| {
592            let e = e.ok()?;
593            let name = e.file_name();
594            parse_snapshot_timestamp(&name.to_string_lossy(), table, "db").map(|_| ())
595        })
596        .count()
597}
598
599/// Returns the path of the newest snapshot file for `table` in `scope_root/_snapshots/`,
600/// or `None` if none exist.
601fn newest_snapshot_path(scope_root: &Path, table: &str) -> Option<PathBuf> {
602    let snapshot_dir = scope_root.join("_snapshots");
603    let entries = std::fs::read_dir(&snapshot_dir).ok()?;
604    let mut best: Option<(u64, PathBuf)> = None;
605    for entry in entries.flatten() {
606        let name = entry.file_name();
607        let name_str = name.to_string_lossy();
608        if let Some(ts) = parse_snapshot_timestamp(&name_str, table, "db") {
609            if best.as_ref().is_none_or(|(best_ts, _)| ts > *best_ts) {
610                best = Some((ts, entry.path()));
611            }
612        }
613    }
614    best.map(|(_, path)| path)
615}
616
/// Test helper: lists the timestamps of every `{table}.{ts}.db` snapshot in
/// `snapshot_dir`, newest first.
///
/// Entries that cannot be read are skipped, as are names that do not parse.
///
/// # Arguments
/// - `snapshot_dir`: the `_snapshots/` directory to scan.
/// - `table`: the logical table name.
///
/// # Errors
/// [`MiniAppError::Snapshot`] if the directory itself cannot be read.
#[cfg(test)]
fn list_snapshot_timestamps(snapshot_dir: &Path, table: &str) -> Result<Vec<u64>, MiniAppError> {
    let listing = std::fs::read_dir(snapshot_dir)
        .map_err(|e| MiniAppError::Snapshot(format!("cannot read snapshot dir: {e}")))?;

    let mut found = Vec::new();
    for entry in listing.flatten() {
        if let Some(ts) = parse_snapshot_timestamp(&entry.file_name().to_string_lossy(), table, "db") {
            found.push(ts);
        }
    }

    // Descending order: newest snapshot first.
    found.sort_unstable_by_key(|&ts| std::cmp::Reverse(ts));
    Ok(found)
}
646
647#[cfg(test)]
648mod tests {
649    use super::*;
650    use rusqlite::Connection;
651    use std::path::PathBuf;
652    use tempfile::TempDir;
653    use tokio::task;
654
655    /// Helper: create a minimal SQLite database with WAL mode enabled at `path`.
656    fn create_test_db(path: &Path) {
657        // SAFETY: Connection::open and execute_batch are safe in test context;
658        // panicking here would fail the test with a clear message.
659        let conn = Connection::open(path).expect("open test db");
660        conn.execute_batch(
661            "PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT);",
662        )
663        .expect("setup test db");
664    }
665
666    // ── T1: happy-path ────────────────────────────────────────────────────
667
668    /// T1: write_snapshot_db creates exactly one .db file in `_snapshots/`
669    /// and does NOT create any .yaml file (snapshots are DB-only).
670    #[tokio::test]
671    async fn write_snapshot_db_creates_db_file_only() {
672        let dir = TempDir::new().expect("temp dir");
673        let scope_dir = dir.path();
674        let db_path = scope_dir.join("items.db");
675
676        create_test_db(&db_path);
677
678        write_snapshot_db(scope_dir, "items", &db_path)
679            .await
680            .expect("write_snapshot_db must succeed");
681
682        let snapshot_dir = scope_dir.join("_snapshots");
683        assert!(snapshot_dir.exists(), "_snapshots dir must be created");
684
685        let entries: Vec<_> = std::fs::read_dir(&snapshot_dir)
686            .expect("read snapshot dir")
687            .filter_map(|e| e.ok())
688            .collect();
689
690        let yaml_count = entries
691            .iter()
692            .filter(|e| e.file_name().to_string_lossy().ends_with(".yaml"))
693            .count();
694        let db_count = entries
695            .iter()
696            .filter(|e| e.file_name().to_string_lossy().ends_with(".db"))
697            .count();
698
699        assert_eq!(yaml_count, 0, "snapshot must NOT create any yaml file");
700        assert_eq!(db_count, 1, "exactly one db snapshot must exist");
701    }
702
703    /// T1: purge_old_snapshots keeps only the N newest .db files.
704    #[tokio::test]
705    async fn purge_old_snapshots_keeps_n_newest() {
706        let dir = TempDir::new().expect("temp dir");
707        let scope_dir = dir.path();
708        let snapshot_dir = scope_dir.join("_snapshots");
709        std::fs::create_dir_all(&snapshot_dir).expect("create snapshot dir");
710
711        // Create 5 fake snapshot .db files with distinct timestamps.
712        for ts in [100u64, 200, 300, 400, 500] {
713            std::fs::write(snapshot_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
714        }
715
716        purge_old_snapshots(scope_dir, "items", 3)
717            .await
718            .expect("purge must succeed");
719
720        // Newest 3 timestamps: 500, 400, 300.  Oldest 2 (100, 200) must be gone.
721        let timestamps = list_snapshot_timestamps(&snapshot_dir, "items").expect("list timestamps");
722        assert_eq!(timestamps.len(), 3, "exactly 3 snapshots must remain");
723        assert_eq!(timestamps, vec![500, 400, 300], "newest 3 must be kept");
724
725        // Verify the deleted snapshots are truly gone.
726        assert!(!snapshot_dir.join("items.100.db").exists());
727        assert!(!snapshot_dir.join("items.200.db").exists());
728    }
729
730    // ── T2: boundary / edge-case ──────────────────────────────────────────
731
732    /// T2: purge_old_snapshots is a no-op when snapshot count is below retention.
733    #[tokio::test]
734    async fn purge_old_snapshots_no_op_when_below_limit() {
735        let dir = TempDir::new().expect("temp dir");
736        let scope_dir = dir.path();
737        let snapshot_dir = scope_dir.join("_snapshots");
738        std::fs::create_dir_all(&snapshot_dir).expect("create snapshot dir");
739
740        // Only 2 snapshots, retention = 10.
741        for ts in [100u64, 200] {
742            std::fs::write(snapshot_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
743        }
744
745        purge_old_snapshots(scope_dir, "items", 10)
746            .await
747            .expect("purge must succeed");
748
749        let timestamps = list_snapshot_timestamps(&snapshot_dir, "items").expect("list timestamps");
750        assert_eq!(timestamps.len(), 2, "both snapshots must still exist");
751    }
752
753    /// T2: purge_old_snapshots is a no-op when _snapshots/ directory does not
754    /// exist yet (first call before any snapshot has been written).
755    #[tokio::test]
756    async fn purge_old_snapshots_no_op_when_dir_missing() {
757        let dir = TempDir::new().expect("temp dir");
758        let scope_dir = dir.path();
759        // _snapshots/ directory is never created.
760
761        let result = purge_old_snapshots(scope_dir, "items", 10).await;
762        assert!(result.is_ok(), "purge must succeed when dir is missing");
763
764        // Directory must still not exist after no-op purge.
765        assert!(!scope_dir.join("_snapshots").exists());
766    }
767
768    // ── T3: error-path ────────────────────────────────────────────────────
769
770    /// T3: write_snapshot_db returns Snapshot error when db_path does not exist.
771    #[tokio::test]
772    async fn write_snapshot_db_missing_db_returns_snapshot_variant() {
773        let dir = TempDir::new().expect("temp dir");
774        let scope_dir = dir.path();
775
776        // Point to a non-existent database file.
777        let result =
778            write_snapshot_db(scope_dir, "items", Path::new("/nonexistent/items.db")).await;
779
780        let err = result.expect_err("missing db file must error");
781        assert!(
782            matches!(err, MiniAppError::Snapshot(_)),
783            "expected Snapshot variant, got {:?}",
784            err
785        );
786    }
787
788    // ── Concurrency: snapshot does not block concurrent writes ────────────
789
790    /// Concurrency test: snapshot runs concurrently with INSERT operations and
791    /// both complete successfully.
792    ///
793    /// This verifies `rusqlite::Connection::backup` is safe to call on a
794    /// WAL-mode database while another connection is writing.  rusqlite docs
795    /// state "source can be used while the backup is running".
796    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
797    async fn test_snapshot_does_not_block_concurrent_writes() {
798        let dir = TempDir::new().expect("temp dir");
799        let db_path = dir.path().join("concurrent.db");
800
801        // Prepare DB with WAL mode and a table.
802        {
803            let conn = Connection::open(&db_path).expect("open db");
804            conn.execute_batch(
805                "PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
806            )
807            .expect("setup db");
808        }
809
810        let db_path_writer = db_path.clone();
811        let scope_dir = dir.path().to_path_buf();
812
813        // Launch writer task: inserts 100 rows using a separate connection.
814        let writer = task::spawn(async move {
815            task::spawn_blocking(move || {
816                let conn = Connection::open(&db_path_writer).expect("open writer db");
817                for i in 0i64..100 {
818                    conn.execute("INSERT INTO rows (val) VALUES (?1)", [format!("v{}", i)])
819                        .expect("insert row");
820                }
821            })
822            .await
823            .expect("writer blocking task")
824        });
825
826        // Launch snapshot task: runs the snapshot while writer is active.
827        let snapshot_task = write_snapshot_db(&scope_dir, "concurrent", &db_path);
828
829        let (writer_result, snapshot_result) = tokio::join!(writer, snapshot_task);
830
831        writer_result.expect("writer must succeed");
832        snapshot_result.expect("snapshot must succeed");
833
834        // The snapshot file must exist and be a valid SQLite database.
835        let snapshot_dir = scope_dir.join("_snapshots");
836        let snapshot_entries: Vec<PathBuf> = std::fs::read_dir(&snapshot_dir)
837            .expect("read snapshot dir")
838            .filter_map(|e| e.ok())
839            .map(|e| e.path())
840            .filter(|p| {
841                p.extension()
842                    .and_then(|x| x.to_str())
843                    .map(|x| x == "db")
844                    .unwrap_or(false)
845            })
846            .collect();
847        assert!(
848            !snapshot_entries.is_empty(),
849            "at least one db snapshot must exist"
850        );
851
852        // Verify snapshot db is a valid SQLite database (can be opened).
853        let snap_conn = Connection::open(&snapshot_entries[0]).expect("open snapshot db");
854        let snap_row_count: i64 = snap_conn
855            .query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
856            .unwrap_or(0);
857        // Snapshot may have captured 0..100 rows (concurrent; exact count not deterministic).
858        assert!(snap_row_count >= 0, "snapshot db must be a valid sqlite db");
859    }
860
861    // ── Concurrency: spawn_blocking cancel safety ─────────────────────────
862
863    /// Cancel-safety test: dropping a `write_snapshot_db` Future immediately
864    /// after spawn_blocking starts does not leave the source DB in a corrupt state.
865    ///
866    /// `tokio::task::spawn_blocking` is abort-unsafe: once the blocking
867    /// closure starts running it runs to completion even if the outer Future
868    /// is dropped.  This test verifies that the source DB remains valid after
869    /// the Future has been dropped.
870    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
871    async fn test_spawn_blocking_cancel_safety_snapshot_survives() {
872        let dir = TempDir::new().expect("temp dir");
873        let scope_dir = dir.path().to_path_buf();
874        let db_path = scope_dir.join("cancel_test.db");
875
876        {
877            let conn = Connection::open(&db_path).expect("open db");
878            conn.execute_batch(
879                "PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
880            )
881            .expect("setup db");
882        }
883
884        // Issue snapshot with a very short timeout to trigger "cancel" of the Future.
885        // spawn_blocking closure continues running even after the outer Future is dropped.
886        let snapshot_fut = write_snapshot_db(&scope_dir, "cancel_test", &db_path);
887        let result = tokio::time::timeout(std::time::Duration::from_millis(1), snapshot_fut).await;
888
889        // Give the spawn_blocking closure time to complete (it runs to completion
890        // regardless of the timeout because spawn_blocking is abort-unsafe).
891        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
892
893        // The source DB must not be corrupted regardless of whether the Future
894        // was cancelled or completed.
895        let src_conn = Connection::open(&db_path).expect("source db must still be openable");
896        let _count: i64 = src_conn
897            .query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
898            .expect("source db must be a valid sqlite db after cancellation");
899
900        // If the future completed successfully, verify the snapshot directory exists.
901        if let Ok(Ok(())) = result {
902            let snapshot_dir = scope_dir.join("_snapshots");
903            assert!(
904                snapshot_dir.exists(),
905                "snapshot dir must exist on successful write"
906            );
907        }
908        // Whether timed out or not, no panic occurred — the test passes.
909    }
910
911    // ── Integration: do_data_snapshot dry_run zero-write guarantee ────────
912
    /// T1/Crux2: dry_run=true returns affects metadata without creating any
    /// file or database state.
    ///
    /// Verifies the Crux "dry_run zero-write guarantee": after calling
    /// `do_data_snapshot` with `dry_run=true`, the `_snapshots/` directory
    /// must not exist (it was not present before the call).
    ///
    /// The JSON response is also checked for:
    /// - `dry_run: true`;
    /// - an `affects.target_tables` array with exactly one entry, the table name;
    /// - `affects.row_counts` and `affects.would_purge_generations` being JSON
    ///   objects.
    #[tokio::test]
    async fn test_do_data_snapshot_dry_run_zero_write() {
        use crate::config::Config;
        use crate::registry::{TableEntry, TableRegistry};
        use crate::schema::{FieldDef, FieldType, SchemaConfig};
        use crate::store::Store;
        use arc_swap::ArcSwap;
        use std::collections::HashMap;

        let dir = TempDir::new().expect("temp dir");
        let table_name = "items";

        // Create multi-table layout: scope_root/{table}/schema.yaml
        // scope_root = dir.path()
        // table_dir  = dir.path()/{table}/
        // schema_path = dir.path()/{table}/schema.yaml
        // db_path     = dir.path()/{table}/{table}.db
        let table_dir = dir.path().join(table_name);
        std::fs::create_dir_all(&table_dir).expect("create table dir");

        let schema_path = table_dir.join("schema.yaml");
        std::fs::write(
            &schema_path,
            "table: items\nfields:\n  - name: title\n    type: string\n    required: true\n",
        )
        .expect("write schema.yaml");

        let db_path = table_dir.join(format!("{}.db", table_name));
        // Pre-create the table's database file (WAL mode, with a `rows` table).
        // The setup connection is dropped before `Store::open` below opens the
        // same path.
        let conn = Connection::open(&db_path).expect("open test db");
        conn.execute_batch(
            "PRAGMA journal_mode=WAL; \
             CREATE TABLE IF NOT EXISTS rows (id TEXT PRIMARY KEY, data TEXT, created_at TEXT, updated_at TEXT);",
        )
        .expect("setup test db");
        drop(conn);

        // Build Store and TableRegistry in multi-table mode (default_table = None).
        // This in-memory schema mirrors the schema.yaml written above.
        let schema = SchemaConfig {
            table: table_name.to_string(),
            title: None,
            description: None,
            fields: vec![FieldDef {
                name: "title".to_string(),
                ty: FieldType::String,
                required: true,
                description: None,
            }],
            dump: None,
        };
        let store = Store::open(&db_path, schema.clone())
            .await
            .expect("open store");

        let entry = TableEntry {
            store: Arc::new(store),
            schema: Arc::new(schema),
            schema_path: Arc::new(schema_path),
        };
        let mut entries = HashMap::new();
        entries.insert(table_name.to_string(), entry);
        // Multi-table mode: default_table = None — scope_root is 2 levels up from schema.yaml
        let registry = TableRegistry::from_entries(entries, None);
        let tables: Arc<ArcSwap<TableRegistry>> = Arc::new(ArcSwap::from_pointee(registry));

        // Config: project_dir points to scope_root (dir.path()).
        let config = Config {
            schema_path: None,
            db_path: None,
            user_dir: None,
            project_dir: Some(dir.path().to_path_buf()),
            backup_retention: None,
            snapshot_retention: None,
        };

        // _snapshots/ must NOT exist before the dry_run call.
        // In multi-table mode scope_root = dir.path(), so _snapshots is at dir.path()/_snapshots/.
        let snapshots_dir = dir.path().join("_snapshots");
        assert!(
            !snapshots_dir.exists(),
            "_snapshots must not exist before dry_run call"
        );

        // No explicit table or scope: the single registered table is the only candidate.
        let params = DataSnapshotParams {
            table: None,
            scope: None,
            dry_run: Some(true),
        };

        let result = do_data_snapshot(&config, &tables, params)
            .await
            .expect("do_data_snapshot dry_run must succeed");

        // _snapshots/ must STILL not exist — zero-write guarantee (Crux 2).
        assert!(
            !snapshots_dir.exists(),
            "_snapshots must not be created by dry_run=true (Crux: zero-write guarantee)"
        );

        // Response must carry dry_run: true and affects.target_tables.
        // `do_data_snapshot` returns a JSON string; parse it to inspect its fields.
        let json: serde_json::Value =
            serde_json::from_str(&result).expect("result must be valid JSON");
        assert_eq!(
            json["dry_run"],
            serde_json::Value::Bool(true),
            "response must contain dry_run: true"
        );
        let target_tables = json["affects"]["target_tables"]
            .as_array()
            .expect("affects.target_tables must be an array");
        assert_eq!(
            target_tables.len(),
            1,
            "exactly one table should be in target_tables"
        );
        assert_eq!(
            target_tables[0],
            serde_json::Value::String(table_name.to_string()),
            "target table must be 'items'"
        );

        // row_counts and would_purge_generations must be present.
        assert!(
            json["affects"]["row_counts"].is_object(),
            "row_counts must be an object"
        );
        assert!(
            json["affects"]["would_purge_generations"].is_object(),
            "would_purge_generations must be an object"
        );
    }
1051}