1use std::collections::HashMap;
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35use arc_swap::ArcSwap;
36use rusqlite::Connection;
37use schemars::JsonSchema;
38use serde::Deserialize;
39
40use crate::config::Config;
41use crate::error::MiniAppError;
42use crate::registry::TableRegistry;
43
44pub async fn write_snapshot_db(
80 scope_dir: &Path,
81 table: &str,
82 db_path: &Path,
83) -> Result<(), MiniAppError> {
84 let scope_dir = scope_dir.to_path_buf();
85 let table = table.to_string();
86 let db_path = db_path.to_path_buf();
87
88 tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
89 write_snapshot_db_sync(&scope_dir, &table, &db_path)
90 })
91 .await
92 .map_err(|e| MiniAppError::Snapshot(format!("blocking task panic: {e}")))?
93}
94
95fn write_snapshot_db_sync(
98 scope_dir: &Path,
99 table: &str,
100 db_path: &Path,
101) -> Result<(), MiniAppError> {
102 let unix_secs = SystemTime::now()
104 .duration_since(UNIX_EPOCH)
105 .map_err(|e| MiniAppError::Snapshot(format!("system clock error: {e}")))?
106 .as_secs();
107
108 let snapshot_dir = scope_dir.join("_snapshots");
109 std::fs::create_dir_all(&snapshot_dir)
110 .map_err(|e| MiniAppError::Snapshot(format!("cannot create snapshot dir: {e}")))?;
111
112 let src_conn = Connection::open(db_path)
115 .map_err(|e| MiniAppError::Snapshot(format!("cannot open source db: {e}")))?;
116
117 if let Err(e) = src_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)") {
119 tracing::warn!(error = %e, "WAL checkpoint before snapshot failed; continuing anyway");
120 }
121
122 let db_dst = snapshot_dir.join(format!("{}.{}.db", table, unix_secs));
123 src_conn
125 .backup(rusqlite::DatabaseName::Main, &db_dst, None)
126 .map_err(|e| MiniAppError::Snapshot(format!("rusqlite backup failed: {e}")))?;
127
128 Ok(())
129}
130
131pub async fn purge_old_snapshots(
158 scope_dir: &Path,
159 table: &str,
160 retention: usize,
161) -> Result<(), MiniAppError> {
162 let scope_dir = scope_dir.to_path_buf();
163 let table = table.to_string();
164
165 tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
166 purge_old_snapshots_sync(&scope_dir, &table, retention)
167 })
168 .await
169 .map_err(|e| MiniAppError::Snapshot(format!("blocking task panic: {e}")))?
170}
171
172fn purge_old_snapshots_sync(
175 scope_dir: &Path,
176 table: &str,
177 retention: usize,
178) -> Result<(), MiniAppError> {
179 let snapshot_dir = scope_dir.join("_snapshots");
180
181 if !snapshot_dir.exists() {
183 return Ok(());
184 }
185
186 let entries = std::fs::read_dir(&snapshot_dir)
188 .map_err(|e| MiniAppError::Snapshot(format!("cannot read snapshot dir: {e}")))?;
189
190 let mut timestamps: Vec<u64> = entries
191 .filter_map(|entry| {
192 let entry = entry.ok()?;
193 let name = entry.file_name();
194 let name = name.to_string_lossy();
195 parse_snapshot_timestamp(&name, table, "db")
196 })
197 .collect();
198
199 timestamps.sort_unstable_by(|a, b| b.cmp(a));
201
202 for ts in timestamps.iter().skip(retention) {
204 let db_path = snapshot_dir.join(format!("{}.{}.db", table, ts));
205
206 if let Err(e) = std::fs::remove_file(&db_path) {
207 tracing::warn!(
208 path = %db_path.display(),
209 error = %e,
210 "failed to remove old snapshot db; continuing"
211 );
212 }
213 }
214
215 Ok(())
216}
217
218pub(crate) fn parse_snapshot_timestamp(filename: &str, table: &str, ext: &str) -> Option<u64> {
229 let prefix = format!("{}.", table);
231 let suffix = format!(".{}", ext);
232
233 let without_prefix = filename.strip_prefix(&prefix)?;
234 let ts_str = without_prefix.strip_suffix(&suffix)?;
235 ts_str.parse::<u64>().ok()
236}
237
238#[derive(Debug, Default, Deserialize, JsonSchema)]
248#[serde(default)]
249pub struct DataSnapshotParams {
250 pub table: Option<String>,
253 pub scope: Option<String>,
256 pub dry_run: Option<bool>,
260}
261
262struct SnapshotTarget {
268 table_name: String,
269 scope_root: PathBuf,
270 db_path: PathBuf,
271 store: Arc<crate::store::Store>,
272}
273
274pub async fn do_data_snapshot(
305 config: &Config,
306 tables: &Arc<ArcSwap<TableRegistry>>,
307 params: DataSnapshotParams,
308) -> Result<String, MiniAppError> {
309 let dry_run = params.dry_run.unwrap_or(false);
310
311 let targets: Vec<SnapshotTarget> = {
314 let registry = tables.load_full();
315 resolve_targets(
316 ®istry,
317 config,
318 params.table.as_deref(),
319 params.scope.as_deref(),
320 )?
321 };
322
323 if dry_run {
324 let mut target_tables: Vec<String> = targets.iter().map(|t| t.table_name.clone()).collect();
326 target_tables.sort();
327
328 let mut row_counts: HashMap<String, u64> = HashMap::new();
329 let mut would_purge: HashMap<String, usize> = HashMap::new();
330
331 for target in &targets {
332 let count = target.store.row_count().await.map_err(|e| {
334 MiniAppError::Snapshot(format!(
335 "row_count failed for table '{}': {e}",
336 target.table_name
337 ))
338 })?;
339 row_counts.insert(target.table_name.clone(), count);
340
341 let purge_count = count_would_purge(
344 &target.scope_root,
345 &target.table_name,
346 config.snapshot_retention(),
347 );
348 would_purge.insert(target.table_name.clone(), purge_count);
349 }
350
351 let result = serde_json::json!({
352 "dry_run": true,
353 "affects": {
354 "target_tables": target_tables,
355 "row_counts": row_counts,
356 "would_purge_generations": would_purge,
357 }
358 });
359 return serde_json::to_string(&result)
360 .map_err(|e| MiniAppError::Snapshot(format!("json serialization error: {e}")));
361 }
362
363 let mut snapshotted: Vec<serde_json::Value> = Vec::new();
365 let mut purged: Vec<serde_json::Value> = Vec::new();
366
367 let retention = config.snapshot_retention();
368
369 for target in &targets {
370 write_snapshot_db(&target.scope_root, &target.table_name, &target.db_path).await?;
372
373 let snapshot_path = newest_snapshot_path(&target.scope_root, &target.table_name);
375 let unix_secs = snapshot_path.as_ref().and_then(|p| {
376 p.file_name()
377 .and_then(|n| n.to_str())
378 .and_then(|n| parse_snapshot_timestamp(n, &target.table_name, "db"))
379 });
380
381 let scope_label = scope_label_for(&target.scope_root, config);
382 snapshotted.push(serde_json::json!({
383 "table": target.table_name,
384 "scope": scope_label,
385 "snapshot_path": snapshot_path.as_ref().map(|p| p.display().to_string()).unwrap_or_default(),
386 "unix_secs": unix_secs,
387 }));
388
389 let snapshot_dir = target.scope_root.join("_snapshots");
392 let before_count = count_snapshots_in_dir(&snapshot_dir, &target.table_name);
393 purge_old_snapshots(&target.scope_root, &target.table_name, retention).await?;
394 let after_count = count_snapshots_in_dir(&snapshot_dir, &target.table_name);
395 let removed = before_count.saturating_sub(after_count);
396
397 if removed > 0 {
398 purged.push(serde_json::json!({
399 "table": target.table_name,
400 "generations_removed": removed,
401 }));
402 }
403 }
404
405 let result = serde_json::json!({
406 "snapshotted": snapshotted,
407 "purged": purged,
408 });
409 serde_json::to_string(&result)
410 .map_err(|e| MiniAppError::Snapshot(format!("json serialization error: {e}")))
411}
412
413fn resolve_targets(
431 registry: &TableRegistry,
432 config: &Config,
433 table: Option<&str>,
434 scope: Option<&str>,
435) -> Result<Vec<SnapshotTarget>, MiniAppError> {
436 let is_legacy = registry.default_table().is_some();
437
438 if let Some(table_name) = table {
439 let entry = registry.resolve(Some(table_name))?;
441 let scope_root = derive_scope_root(&entry.schema_path, is_legacy)?;
442 let db_path = entry
443 .schema_path
444 .parent()
445 .ok_or_else(|| MiniAppError::Snapshot("schema_path has no parent dir".into()))?
446 .join(format!("{}.db", table_name));
447
448 if let Some(scope_str) = scope {
450 let expected_dir = resolve_scope_dir(config, scope_str)?;
451 if let Some(expected) = expected_dir {
452 if !scope_root.starts_with(&expected) {
453 return Ok(Vec::new()); }
455 }
456 }
457
458 return Ok(vec![SnapshotTarget {
459 table_name: table_name.to_string(),
460 scope_root,
461 db_path,
462 store: Arc::clone(&entry.store),
463 }]);
464 }
465
466 let scope_filter: Option<PathBuf> = match scope {
468 Some(s) => resolve_scope_dir(config, s)?,
469 None => None,
470 };
471
472 let mut targets: Vec<SnapshotTarget> = registry
473 .entries()
474 .iter()
475 .filter_map(|(name, entry)| {
476 let scope_root = derive_scope_root(&entry.schema_path, is_legacy).ok()?;
477 if let Some(ref expected) = scope_filter {
479 if !scope_root.starts_with(expected) {
480 return None;
481 }
482 }
483 let db_path = entry.schema_path.parent()?.join(format!("{}.db", name));
484 Some(SnapshotTarget {
485 table_name: name.clone(),
486 scope_root,
487 db_path,
488 store: Arc::clone(&entry.store),
489 })
490 })
491 .collect();
492
493 targets.sort_by(|a, b| a.table_name.cmp(&b.table_name));
495 Ok(targets)
496}
497
498fn derive_scope_root(schema_path: &Path, is_legacy: bool) -> Result<PathBuf, MiniAppError> {
511 if is_legacy {
512 schema_path
513 .parent()
514 .map(|p| p.to_path_buf())
515 .ok_or_else(|| MiniAppError::Snapshot("schema_path has no parent dir".into()))
516 } else {
517 schema_path
518 .parent()
519 .and_then(|p| p.parent())
520 .map(|p| p.to_path_buf())
521 .ok_or_else(|| MiniAppError::Snapshot("schema_path has no grandparent dir".into()))
522 }
523}
524
525fn resolve_scope_dir(config: &Config, scope: &str) -> Result<Option<PathBuf>, MiniAppError> {
532 match scope {
533 "project" => Ok(config.project_dir.as_deref().map(|p| p.to_path_buf())),
534 "user" => Ok(config.user_dir.as_deref().map(|p| p.to_path_buf())),
535 other => Err(MiniAppError::Snapshot(format!(
536 "unrecognised scope '{other}': expected 'project' or 'user'"
537 ))),
538 }
539}
540
541fn scope_label_for(scope_root: &Path, config: &Config) -> &'static str {
544 if let Some(pd) = config.project_dir.as_deref() {
545 if scope_root.starts_with(pd) {
546 return "project";
547 }
548 }
549 if let Some(ud) = config.user_dir.as_deref() {
550 if scope_root.starts_with(ud) {
551 return "user";
552 }
553 }
554 "unknown"
555}
556
557fn count_would_purge(scope_root: &Path, table: &str, retention: usize) -> usize {
565 let snapshot_dir = scope_root.join("_snapshots");
566 if !snapshot_dir.exists() {
567 return 0;
568 }
569 let Ok(entries) = std::fs::read_dir(&snapshot_dir) else {
570 return 0;
571 };
572 let count = entries
573 .filter_map(|e| {
574 let e = e.ok()?;
575 let name = e.file_name();
576 parse_snapshot_timestamp(&name.to_string_lossy(), table, "db").map(|_| ())
577 })
578 .count();
579 count.saturating_sub(retention)
580}
581
582fn count_snapshots_in_dir(snapshot_dir: &Path, table: &str) -> usize {
584 if !snapshot_dir.exists() {
585 return 0;
586 }
587 let Ok(entries) = std::fs::read_dir(snapshot_dir) else {
588 return 0;
589 };
590 entries
591 .filter_map(|e| {
592 let e = e.ok()?;
593 let name = e.file_name();
594 parse_snapshot_timestamp(&name.to_string_lossy(), table, "db").map(|_| ())
595 })
596 .count()
597}
598
599fn newest_snapshot_path(scope_root: &Path, table: &str) -> Option<PathBuf> {
602 let snapshot_dir = scope_root.join("_snapshots");
603 let entries = std::fs::read_dir(&snapshot_dir).ok()?;
604 let mut best: Option<(u64, PathBuf)> = None;
605 for entry in entries.flatten() {
606 let name = entry.file_name();
607 let name_str = name.to_string_lossy();
608 if let Some(ts) = parse_snapshot_timestamp(&name_str, table, "db") {
609 if best.as_ref().is_none_or(|(best_ts, _)| ts > *best_ts) {
610 best = Some((ts, entry.path()));
611 }
612 }
613 }
614 best.map(|(_, path)| path)
615}
616
617#[cfg(test)]
630fn list_snapshot_timestamps(snapshot_dir: &Path, table: &str) -> Result<Vec<u64>, MiniAppError> {
631 let entries = std::fs::read_dir(snapshot_dir)
632 .map_err(|e| MiniAppError::Snapshot(format!("cannot read snapshot dir: {e}")))?;
633
634 let mut timestamps: Vec<u64> = entries
635 .filter_map(|entry| {
636 let entry = entry.ok()?;
637 let name = entry.file_name();
638 let name = name.to_string_lossy().to_string();
639 parse_snapshot_timestamp(&name, table, "db")
640 })
641 .collect();
642
643 timestamps.sort_unstable_by(|a, b| b.cmp(a));
644 Ok(timestamps)
645}
646
647#[cfg(test)]
648mod tests {
649 use super::*;
650 use rusqlite::Connection;
651 use std::path::PathBuf;
652 use tempfile::TempDir;
653 use tokio::task;
654
655 fn create_test_db(path: &Path) {
657 let conn = Connection::open(path).expect("open test db");
660 conn.execute_batch(
661 "PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT);",
662 )
663 .expect("setup test db");
664 }
665
666 #[tokio::test]
671 async fn write_snapshot_db_creates_db_file_only() {
672 let dir = TempDir::new().expect("temp dir");
673 let scope_dir = dir.path();
674 let db_path = scope_dir.join("items.db");
675
676 create_test_db(&db_path);
677
678 write_snapshot_db(scope_dir, "items", &db_path)
679 .await
680 .expect("write_snapshot_db must succeed");
681
682 let snapshot_dir = scope_dir.join("_snapshots");
683 assert!(snapshot_dir.exists(), "_snapshots dir must be created");
684
685 let entries: Vec<_> = std::fs::read_dir(&snapshot_dir)
686 .expect("read snapshot dir")
687 .filter_map(|e| e.ok())
688 .collect();
689
690 let yaml_count = entries
691 .iter()
692 .filter(|e| e.file_name().to_string_lossy().ends_with(".yaml"))
693 .count();
694 let db_count = entries
695 .iter()
696 .filter(|e| e.file_name().to_string_lossy().ends_with(".db"))
697 .count();
698
699 assert_eq!(yaml_count, 0, "snapshot must NOT create any yaml file");
700 assert_eq!(db_count, 1, "exactly one db snapshot must exist");
701 }
702
703 #[tokio::test]
705 async fn purge_old_snapshots_keeps_n_newest() {
706 let dir = TempDir::new().expect("temp dir");
707 let scope_dir = dir.path();
708 let snapshot_dir = scope_dir.join("_snapshots");
709 std::fs::create_dir_all(&snapshot_dir).expect("create snapshot dir");
710
711 for ts in [100u64, 200, 300, 400, 500] {
713 std::fs::write(snapshot_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
714 }
715
716 purge_old_snapshots(scope_dir, "items", 3)
717 .await
718 .expect("purge must succeed");
719
720 let timestamps = list_snapshot_timestamps(&snapshot_dir, "items").expect("list timestamps");
722 assert_eq!(timestamps.len(), 3, "exactly 3 snapshots must remain");
723 assert_eq!(timestamps, vec![500, 400, 300], "newest 3 must be kept");
724
725 assert!(!snapshot_dir.join("items.100.db").exists());
727 assert!(!snapshot_dir.join("items.200.db").exists());
728 }
729
730 #[tokio::test]
734 async fn purge_old_snapshots_no_op_when_below_limit() {
735 let dir = TempDir::new().expect("temp dir");
736 let scope_dir = dir.path();
737 let snapshot_dir = scope_dir.join("_snapshots");
738 std::fs::create_dir_all(&snapshot_dir).expect("create snapshot dir");
739
740 for ts in [100u64, 200] {
742 std::fs::write(snapshot_dir.join(format!("items.{}.db", ts)), b"db").expect("write db");
743 }
744
745 purge_old_snapshots(scope_dir, "items", 10)
746 .await
747 .expect("purge must succeed");
748
749 let timestamps = list_snapshot_timestamps(&snapshot_dir, "items").expect("list timestamps");
750 assert_eq!(timestamps.len(), 2, "both snapshots must still exist");
751 }
752
753 #[tokio::test]
756 async fn purge_old_snapshots_no_op_when_dir_missing() {
757 let dir = TempDir::new().expect("temp dir");
758 let scope_dir = dir.path();
759 let result = purge_old_snapshots(scope_dir, "items", 10).await;
762 assert!(result.is_ok(), "purge must succeed when dir is missing");
763
764 assert!(!scope_dir.join("_snapshots").exists());
766 }
767
768 #[tokio::test]
772 async fn write_snapshot_db_missing_db_returns_snapshot_variant() {
773 let dir = TempDir::new().expect("temp dir");
774 let scope_dir = dir.path();
775
776 let result =
778 write_snapshot_db(scope_dir, "items", Path::new("/nonexistent/items.db")).await;
779
780 let err = result.expect_err("missing db file must error");
781 assert!(
782 matches!(err, MiniAppError::Snapshot(_)),
783 "expected Snapshot variant, got {:?}",
784 err
785 );
786 }
787
788 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
797 async fn test_snapshot_does_not_block_concurrent_writes() {
798 let dir = TempDir::new().expect("temp dir");
799 let db_path = dir.path().join("concurrent.db");
800
801 {
803 let conn = Connection::open(&db_path).expect("open db");
804 conn.execute_batch(
805 "PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
806 )
807 .expect("setup db");
808 }
809
810 let db_path_writer = db_path.clone();
811 let scope_dir = dir.path().to_path_buf();
812
813 let writer = task::spawn(async move {
815 task::spawn_blocking(move || {
816 let conn = Connection::open(&db_path_writer).expect("open writer db");
817 for i in 0i64..100 {
818 conn.execute("INSERT INTO rows (val) VALUES (?1)", [format!("v{}", i)])
819 .expect("insert row");
820 }
821 })
822 .await
823 .expect("writer blocking task")
824 });
825
826 let snapshot_task = write_snapshot_db(&scope_dir, "concurrent", &db_path);
828
829 let (writer_result, snapshot_result) = tokio::join!(writer, snapshot_task);
830
831 writer_result.expect("writer must succeed");
832 snapshot_result.expect("snapshot must succeed");
833
834 let snapshot_dir = scope_dir.join("_snapshots");
836 let snapshot_entries: Vec<PathBuf> = std::fs::read_dir(&snapshot_dir)
837 .expect("read snapshot dir")
838 .filter_map(|e| e.ok())
839 .map(|e| e.path())
840 .filter(|p| {
841 p.extension()
842 .and_then(|x| x.to_str())
843 .map(|x| x == "db")
844 .unwrap_or(false)
845 })
846 .collect();
847 assert!(
848 !snapshot_entries.is_empty(),
849 "at least one db snapshot must exist"
850 );
851
852 let snap_conn = Connection::open(&snapshot_entries[0]).expect("open snapshot db");
854 let snap_row_count: i64 = snap_conn
855 .query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
856 .unwrap_or(0);
857 assert!(snap_row_count >= 0, "snapshot db must be a valid sqlite db");
859 }
860
861 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
871 async fn test_spawn_blocking_cancel_safety_snapshot_survives() {
872 let dir = TempDir::new().expect("temp dir");
873 let scope_dir = dir.path().to_path_buf();
874 let db_path = scope_dir.join("cancel_test.db");
875
876 {
877 let conn = Connection::open(&db_path).expect("open db");
878 conn.execute_batch(
879 "PRAGMA journal_mode=WAL; CREATE TABLE rows (id INTEGER PRIMARY KEY, val TEXT);",
880 )
881 .expect("setup db");
882 }
883
884 let snapshot_fut = write_snapshot_db(&scope_dir, "cancel_test", &db_path);
887 let result = tokio::time::timeout(std::time::Duration::from_millis(1), snapshot_fut).await;
888
889 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
892
893 let src_conn = Connection::open(&db_path).expect("source db must still be openable");
896 let _count: i64 = src_conn
897 .query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))
898 .expect("source db must be a valid sqlite db after cancellation");
899
900 if let Ok(Ok(())) = result {
902 let snapshot_dir = scope_dir.join("_snapshots");
903 assert!(
904 snapshot_dir.exists(),
905 "snapshot dir must exist on successful write"
906 );
907 }
908 }
910
911 #[tokio::test]
920 async fn test_do_data_snapshot_dry_run_zero_write() {
921 use crate::config::Config;
922 use crate::registry::{TableEntry, TableRegistry};
923 use crate::schema::{FieldDef, FieldType, SchemaConfig};
924 use crate::store::Store;
925 use arc_swap::ArcSwap;
926 use std::collections::HashMap;
927
928 let dir = TempDir::new().expect("temp dir");
929 let table_name = "items";
930
931 let table_dir = dir.path().join(table_name);
937 std::fs::create_dir_all(&table_dir).expect("create table dir");
938
939 let schema_path = table_dir.join("schema.yaml");
940 std::fs::write(
941 &schema_path,
942 "table: items\nfields:\n - name: title\n type: string\n required: true\n",
943 )
944 .expect("write schema.yaml");
945
946 let db_path = table_dir.join(format!("{}.db", table_name));
947 let conn = Connection::open(&db_path).expect("open test db");
949 conn.execute_batch(
950 "PRAGMA journal_mode=WAL; \
951 CREATE TABLE IF NOT EXISTS rows (id TEXT PRIMARY KEY, data TEXT, created_at TEXT, updated_at TEXT);",
952 )
953 .expect("setup test db");
954 drop(conn);
955
956 let schema = SchemaConfig {
958 table: table_name.to_string(),
959 title: None,
960 description: None,
961 fields: vec![FieldDef {
962 name: "title".to_string(),
963 ty: FieldType::String,
964 required: true,
965 description: None,
966 }],
967 dump: None,
968 };
969 let store = Store::open(&db_path, schema.clone())
970 .await
971 .expect("open store");
972
973 let entry = TableEntry {
974 store: Arc::new(store),
975 schema: Arc::new(schema),
976 schema_path: Arc::new(schema_path),
977 };
978 let mut entries = HashMap::new();
979 entries.insert(table_name.to_string(), entry);
980 let registry = TableRegistry::from_entries(entries, None);
982 let tables: Arc<ArcSwap<TableRegistry>> = Arc::new(ArcSwap::from_pointee(registry));
983
984 let config = Config {
986 schema_path: None,
987 db_path: None,
988 user_dir: None,
989 project_dir: Some(dir.path().to_path_buf()),
990 backup_retention: None,
991 snapshot_retention: None,
992 };
993
994 let snapshots_dir = dir.path().join("_snapshots");
997 assert!(
998 !snapshots_dir.exists(),
999 "_snapshots must not exist before dry_run call"
1000 );
1001
1002 let params = DataSnapshotParams {
1003 table: None,
1004 scope: None,
1005 dry_run: Some(true),
1006 };
1007
1008 let result = do_data_snapshot(&config, &tables, params)
1009 .await
1010 .expect("do_data_snapshot dry_run must succeed");
1011
1012 assert!(
1014 !snapshots_dir.exists(),
1015 "_snapshots must not be created by dry_run=true (Crux: zero-write guarantee)"
1016 );
1017
1018 let json: serde_json::Value =
1021 serde_json::from_str(&result).expect("result must be valid JSON");
1022 assert_eq!(
1023 json["dry_run"],
1024 serde_json::Value::Bool(true),
1025 "response must contain dry_run: true"
1026 );
1027 let target_tables = json["affects"]["target_tables"]
1028 .as_array()
1029 .expect("affects.target_tables must be an array");
1030 assert_eq!(
1031 target_tables.len(),
1032 1,
1033 "exactly one table should be in target_tables"
1034 );
1035 assert_eq!(
1036 target_tables[0],
1037 serde_json::Value::String(table_name.to_string()),
1038 "target table must be 'items'"
1039 );
1040
1041 assert!(
1043 json["affects"]["row_counts"].is_object(),
1044 "row_counts must be an object"
1045 );
1046 assert!(
1047 json["affects"]["would_purge_generations"].is_object(),
1048 "would_purge_generations must be an object"
1049 );
1050 }
1051}