1use std::path::{Path, PathBuf};
2
3use rusqlite::{Connection, params};
4use serde::Serialize;
5
6use crate::errors::{Result, StateError};
7
/// Ordered, append-only list of schema migrations.
///
/// `run_migrations` executes every entry whose index is >= the stored
/// `schema_version`, then sets the version to index + 1. Existing entries
/// must therefore never be edited in place — only new entries appended.
const MIGRATIONS: &[&str] = &[
    // v1: initial schema — apply history, drift tracking, managed resources,
    // git-backed config sources (plus their apply links, conflicts and
    // pending decisions), module state, and the schema_version bookkeeping
    // table seeded to 0 so run_migrations can bump it.
    "CREATE TABLE IF NOT EXISTS applies (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL,
    profile TEXT NOT NULL,
    plan_hash TEXT NOT NULL,
    status TEXT NOT NULL,
    summary TEXT
    );

    CREATE TABLE IF NOT EXISTS drift_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    expected TEXT,
    actual TEXT,
    source TEXT NOT NULL DEFAULT 'local',
    resolved_by INTEGER,
    FOREIGN KEY (resolved_by) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS managed_resources (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    resource_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    source TEXT NOT NULL DEFAULT 'local',
    last_hash TEXT,
    last_applied INTEGER,
    UNIQUE(resource_type, resource_id),
    FOREIGN KEY (last_applied) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS config_sources (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    origin_url TEXT NOT NULL,
    origin_branch TEXT NOT NULL DEFAULT 'main',
    last_fetched TEXT,
    last_commit TEXT,
    source_version TEXT,
    pinned_version TEXT,
    status TEXT NOT NULL DEFAULT 'active'
    );

    CREATE TABLE IF NOT EXISTS source_applies (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id INTEGER NOT NULL,
    apply_id INTEGER NOT NULL,
    source_commit TEXT NOT NULL,
    FOREIGN KEY (source_id) REFERENCES config_sources(id),
    FOREIGN KEY (apply_id) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS source_conflicts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL,
    source_name TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    resolution TEXT NOT NULL,
    detail TEXT
    );

    CREATE TABLE IF NOT EXISTS pending_decisions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source TEXT NOT NULL,
    resource TEXT NOT NULL,
    tier TEXT NOT NULL,
    action TEXT NOT NULL,
    summary TEXT NOT NULL,
    created_at TEXT NOT NULL,
    resolved_at TEXT,
    resolution TEXT
    );

    CREATE UNIQUE INDEX IF NOT EXISTS idx_pending_decisions_source_resource
    ON pending_decisions (source, resource)
    WHERE resolved_at IS NULL;

    CREATE TABLE IF NOT EXISTS source_config_hashes (
    source TEXT PRIMARY KEY,
    config_hash TEXT NOT NULL,
    merged_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS module_state (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    module_name TEXT NOT NULL UNIQUE,
    installed_at TEXT NOT NULL,
    last_applied INTEGER,
    packages_hash TEXT NOT NULL,
    files_hash TEXT NOT NULL,
    git_sources TEXT,
    status TEXT NOT NULL DEFAULT 'installed',
    FOREIGN KEY (last_applied) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER NOT NULL
    );

    INSERT INTO schema_version (version) VALUES (0);",
    // v2: pre-apply file backups, the per-action apply journal, and the
    // per-module file manifest.
    "CREATE TABLE IF NOT EXISTS file_backups (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    apply_id INTEGER NOT NULL,
    file_path TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    content BLOB NOT NULL,
    permissions INTEGER,
    was_symlink INTEGER NOT NULL DEFAULT 0,
    symlink_target TEXT,
    oversized INTEGER NOT NULL DEFAULT 0,
    backed_up_at TEXT NOT NULL,
    FOREIGN KEY (apply_id) REFERENCES applies(id)
    );

    CREATE INDEX IF NOT EXISTS idx_file_backups_apply ON file_backups (apply_id);
    CREATE INDEX IF NOT EXISTS idx_file_backups_path ON file_backups (file_path);

    CREATE TABLE IF NOT EXISTS apply_journal (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    apply_id INTEGER NOT NULL,
    action_index INTEGER NOT NULL,
    phase TEXT NOT NULL,
    action_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    pre_state TEXT,
    post_state TEXT,
    status TEXT NOT NULL DEFAULT 'pending',
    error TEXT,
    started_at TEXT NOT NULL,
    completed_at TEXT,
    FOREIGN KEY (apply_id) REFERENCES applies(id)
    );

    CREATE INDEX IF NOT EXISTS idx_apply_journal_apply ON apply_journal (apply_id);

    CREATE TABLE IF NOT EXISTS module_file_manifest (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    module_name TEXT NOT NULL,
    file_path TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    strategy TEXT NOT NULL,
    last_applied INTEGER,
    UNIQUE(module_name, file_path),
    FOREIGN KEY (last_applied) REFERENCES applies(id)
    );

    CREATE INDEX IF NOT EXISTS idx_module_file_manifest_module ON module_file_manifest (module_name);",
    // v3: capture script output on journal entries.
    "ALTER TABLE apply_journal ADD COLUMN script_output TEXT;",
    // v4: periodic compliance snapshots with a denormalized summary.
    "CREATE TABLE IF NOT EXISTS compliance_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    snapshot_json TEXT NOT NULL,
    summary_compliant INTEGER NOT NULL,
    summary_warning INTEGER NOT NULL,
    summary_violation INTEGER NOT NULL
    );",
];
172
/// Outcome of an apply run, stored in the `applies.status` column
/// via `as_str`/`from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum ApplyStatus {
    /// Every action completed.
    Success,
    /// Some actions completed, some failed.
    Partial,
    /// The apply failed (also the fallback for unknown DB values).
    Failed,
    /// The apply has started but not yet finished.
    InProgress,
}
185
186impl ApplyStatus {
187 fn as_str(&self) -> &str {
188 match self {
189 ApplyStatus::Success => "success",
190 ApplyStatus::Partial => "partial",
191 ApplyStatus::Failed => "failed",
192 ApplyStatus::InProgress => "in_progress",
193 }
194 }
195
196 fn from_str(s: &str) -> Self {
197 match s {
198 "success" => ApplyStatus::Success,
199 "partial" => ApplyStatus::Partial,
200 "in_progress" => ApplyStatus::InProgress,
201 "failed" => ApplyStatus::Failed,
202 _ => ApplyStatus::Failed,
203 }
204 }
205}
206
/// One row of the `applies` table: a single recorded apply run.
#[derive(Debug, Clone, Serialize)]
pub struct ApplyRecord {
    pub id: i64,
    /// ISO-8601 UTC time the apply was recorded.
    pub timestamp: String,
    pub profile: String,
    /// Hash identifying the plan that was applied.
    pub plan_hash: String,
    pub status: ApplyStatus,
    pub summary: Option<String>,
}
217
/// One row of `drift_events`: an observed divergence between expected and
/// actual state for a resource.
#[derive(Debug, Clone, Serialize)]
pub struct DriftEvent {
    pub id: i64,
    pub timestamp: String,
    pub resource_type: String,
    pub resource_id: String,
    /// Expected state, if known.
    pub expected: Option<String>,
    /// Observed state, if known.
    pub actual: Option<String>,
    /// Id of the apply that resolved this drift; `None` while unresolved.
    pub resolved_by: Option<i64>,
    pub source: String,
}
230
/// One row of `managed_resources`: a resource this tool owns, keyed by
/// (resource_type, resource_id).
#[derive(Debug, Clone, Serialize)]
pub struct ManagedResource {
    pub resource_type: String,
    pub resource_id: String,
    /// Config source the resource came from (DB default: 'local').
    pub source: String,
    /// Hash of the last applied content, if any.
    pub last_hash: Option<String>,
    /// Id of the apply that last touched this resource.
    pub last_applied: Option<i64>,
}
240
/// One row of `config_sources`: a git-backed configuration source.
#[derive(Debug, Clone, Serialize)]
pub struct ConfigSourceRecord {
    pub id: i64,
    /// Unique source name.
    pub name: String,
    pub origin_url: String,
    /// Tracked branch (DB default: 'main').
    pub origin_branch: String,
    pub last_fetched: Option<String>,
    pub last_commit: Option<String>,
    pub source_version: Option<String>,
    pub pinned_version: Option<String>,
    /// Lifecycle state (DB default: 'active').
    pub status: String,
}
254
/// One row of `source_conflicts`: a recorded conflict between a config
/// source and a resource, with how it was resolved.
#[derive(Debug, Clone)]
pub struct SourceConflictRecord {
    pub id: i64,
    pub timestamp: String,
    pub source_name: String,
    pub resource_type: String,
    pub resource_id: String,
    pub resolution: String,
    pub detail: Option<String>,
}
266
/// One row of `pending_decisions`: an action awaiting a user decision.
/// At most one *unresolved* row exists per (source, resource), enforced by
/// the partial unique index in the schema.
#[derive(Debug, Clone, Serialize)]
pub struct PendingDecision {
    pub id: i64,
    pub source: String,
    pub resource: String,
    pub tier: String,
    pub action: String,
    pub summary: String,
    pub created_at: String,
    /// Set when the decision is resolved; `None` while pending.
    pub resolved_at: Option<String>,
    pub resolution: Option<String>,
}
280
/// One row of `source_config_hashes`: the hash of a source's config as of
/// the last merge, used to detect upstream changes.
#[derive(Debug, Clone)]
pub struct SourceConfigHash {
    pub source: String,
    pub config_hash: String,
    pub merged_at: String,
}
288
/// One row of `module_state`: install/apply bookkeeping for a module.
#[derive(Debug, Clone, Serialize)]
pub struct ModuleStateRecord {
    pub module_name: String,
    /// Set on first install and preserved across later upserts.
    pub installed_at: String,
    pub last_applied: Option<i64>,
    pub packages_hash: String,
    pub files_hash: String,
    /// Serialized git source info, if any.
    pub git_sources: Option<String>,
    /// Lifecycle state (DB default: 'installed').
    pub status: String,
}
300
/// One row of `file_backups`: a pre-apply snapshot of a file's content and
/// metadata, used for rollback.
#[derive(Debug, Clone)]
pub struct FileBackupRecord {
    pub id: i64,
    pub apply_id: i64,
    pub file_path: String,
    pub content_hash: String,
    pub content: Vec<u8>,
    /// Unix mode bits, when captured.
    pub permissions: Option<u32>,
    pub was_symlink: bool,
    pub symlink_target: Option<String>,
    /// True when the file was too large to back up in full.
    pub oversized: bool,
    pub backed_up_at: String,
}
315
/// One row of `apply_journal`: a single action within an apply, with its
/// pre/post state for rollback and its execution status.
#[derive(Debug, Clone)]
pub struct JournalEntry {
    pub id: i64,
    pub apply_id: i64,
    /// Position of this action within the apply's plan.
    pub action_index: i64,
    pub phase: String,
    pub action_type: String,
    pub resource_id: String,
    pub pre_state: Option<String>,
    pub post_state: Option<String>,
    /// Execution status (DB default: 'pending').
    pub status: String,
    pub error: Option<String>,
    pub started_at: String,
    pub completed_at: Option<String>,
    /// Captured stdout/stderr for script actions (added in migration v3).
    pub script_output: Option<String>,
}
333
/// Summary counts from one row of `compliance_snapshots`.
#[derive(Debug, Clone, Serialize)]
pub struct ComplianceHistoryRow {
    pub id: i64,
    pub timestamp: String,
    pub compliant: i64,
    pub warning: i64,
    pub violation: i64,
}
343
/// One row of `module_file_manifest`: a file a module placed on disk,
/// keyed by (module_name, file_path).
#[derive(Debug, Clone)]
pub struct ModuleFileRecord {
    pub module_name: String,
    pub file_path: String,
    pub content_hash: String,
    /// How the file was written (strategy name as stored).
    pub strategy: String,
    pub last_applied: Option<i64>,
}
353
/// Handle to the on-disk (or in-memory) SQLite state database.
/// All access goes through a single `rusqlite::Connection`.
pub struct StateStore {
    conn: Connection,
}
358
359impl StateStore {
360 pub fn open_default() -> Result<Self> {
363 let data_dir = default_state_dir()?;
364 std::fs::create_dir_all(&data_dir).map_err(|_| StateError::DirectoryNotWritable {
365 path: data_dir.clone(),
366 })?;
367 let db_path = data_dir.join("state.db");
368 Self::open(&db_path)
369 }
370
371 pub fn open(path: &Path) -> Result<Self> {
373 let conn = Connection::open(path)?;
374 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
375 conn.busy_timeout(std::time::Duration::from_secs(5))?;
376
377 let mut store = Self { conn };
378 store.run_migrations()?;
379 Ok(store)
380 }
381
382 pub fn open_in_memory() -> Result<Self> {
384 let conn = Connection::open_in_memory()?;
385 conn.execute_batch("PRAGMA foreign_keys=ON;")?;
386
387 let mut store = Self { conn };
388 store.run_migrations()?;
389 Ok(store)
390 }
391
    /// Applies any not-yet-run entries from `MIGRATIONS` inside a single
    /// EXCLUSIVE transaction, bumping `schema_version` after each entry.
    ///
    /// BEGIN EXCLUSIVE serializes concurrent processes opening the same
    /// database; together with the busy timeout this makes migration safe
    /// against startup races.
    fn run_migrations(&mut self) -> Result<()> {
        self.conn
            .execute_batch("BEGIN EXCLUSIVE")
            .map_err(|e| StateError::MigrationFailed {
                message: format!("failed to acquire migration lock: {e}"),
            })?;

        // Read the version *inside* the transaction so another process
        // cannot migrate between our read and our writes. On a fresh DB the
        // schema_version table does not exist yet and this reports 0.
        let current_version = self.schema_version();

        for (i, migration) in MIGRATIONS.iter().enumerate() {
            if i >= current_version {
                self.conn.execute_batch(migration).map_err(|e| {
                    // Best-effort rollback; the original error is what we report.
                    let _ = self.conn.execute_batch("ROLLBACK");
                    StateError::MigrationFailed {
                        message: format!("migration {}: {}", i, e),
                    }
                })?;
                // Version is index + 1: after migration 0 we are at version 1.
                let new_version = (i + 1) as i64;
                self.conn
                    .execute(
                        "UPDATE schema_version SET version = ?1",
                        rusqlite::params![new_version],
                    )
                    .map_err(|e| {
                        let _ = self.conn.execute_batch("ROLLBACK");
                        StateError::MigrationFailed {
                            message: format!("migration {}: failed to update version: {}", i, e),
                        }
                    })?;
            }
        }

        self.conn
            .execute_batch("COMMIT")
            .map_err(|e| StateError::MigrationFailed {
                message: format!("failed to commit migrations: {e}"),
            })?;

        Ok(())
    }
435
436 fn schema_version(&self) -> usize {
437 self.conn
438 .query_row("SELECT version FROM schema_version", [], |row| {
439 row.get::<_, i64>(0)
440 })
441 .map(|v| v as usize)
442 .unwrap_or(0)
443 }
444
445 pub fn record_apply(
447 &self,
448 profile: &str,
449 plan_hash: &str,
450 status: ApplyStatus,
451 summary: Option<&str>,
452 ) -> Result<i64> {
453 let timestamp = crate::utc_now_iso8601();
454 self.conn
455 .execute(
456 "INSERT INTO applies (timestamp, profile, plan_hash, status, summary) VALUES (?1, ?2, ?3, ?4, ?5)",
457 params![timestamp, profile, plan_hash, status.as_str(), summary],
458 )
459 ?;
460 Ok(self.conn.last_insert_rowid())
461 }
462
463 pub fn record_drift(
465 &self,
466 resource_type: &str,
467 resource_id: &str,
468 expected: Option<&str>,
469 actual: Option<&str>,
470 source: &str,
471 ) -> Result<i64> {
472 let timestamp = crate::utc_now_iso8601();
473 self.conn
474 .execute(
475 "INSERT INTO drift_events (timestamp, resource_type, resource_id, expected, actual, source) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
476 params![timestamp, resource_type, resource_id, expected, actual, source],
477 )
478 ?;
479 Ok(self.conn.last_insert_rowid())
480 }
481
482 pub fn resolve_drift(
484 &self,
485 apply_id: i64,
486 resource_type: &str,
487 resource_id: &str,
488 ) -> Result<()> {
489 self.conn
490 .execute(
491 "UPDATE drift_events SET resolved_by = ?1 WHERE resource_type = ?2 AND resource_id = ?3 AND resolved_by IS NULL",
492 params![apply_id, resource_type, resource_id],
493 )
494 ?;
495 Ok(())
496 }
497
498 pub fn upsert_managed_resource(
500 &self,
501 resource_type: &str,
502 resource_id: &str,
503 source: &str,
504 hash: Option<&str>,
505 apply_id: Option<i64>,
506 ) -> Result<()> {
507 self.conn
508 .execute(
509 "INSERT INTO managed_resources (resource_type, resource_id, source, last_hash, last_applied)
510 VALUES (?1, ?2, ?3, ?4, ?5)
511 ON CONFLICT(resource_type, resource_id) DO UPDATE SET
512 source = excluded.source,
513 last_hash = excluded.last_hash,
514 last_applied = excluded.last_applied",
515 params![resource_type, resource_id, source, hash, apply_id],
516 )
517 ?;
518 Ok(())
519 }
520
521 pub fn is_resource_managed(&self, resource_type: &str, resource_id: &str) -> Result<bool> {
523 let count: i64 = self.conn.query_row(
524 "SELECT COUNT(*) FROM managed_resources WHERE resource_type = ?1 AND resource_id = ?2",
525 params![resource_type, resource_id],
526 |row| row.get(0),
527 )?;
528 Ok(count > 0)
529 }
530
531 pub fn last_apply(&self) -> Result<Option<ApplyRecord>> {
533 let result = self.conn.query_row(
534 "SELECT id, timestamp, profile, plan_hash, status, summary FROM applies ORDER BY id DESC LIMIT 1",
535 [],
536 |row| {
537 Ok(ApplyRecord {
538 id: row.get(0)?,
539 timestamp: row.get(1)?,
540 profile: row.get(2)?,
541 plan_hash: row.get(3)?,
542 status: ApplyStatus::from_str(&row.get::<_, String>(4)?),
543 summary: row.get(5)?,
544 })
545 },
546 );
547
548 match result {
549 Ok(record) => Ok(Some(record)),
550 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
551 Err(e) => Err(StateError::Database(e.to_string()).into()),
552 }
553 }
554
555 pub fn get_apply(&self, apply_id: i64) -> Result<Option<ApplyRecord>> {
557 let result = self.conn.query_row(
558 "SELECT id, timestamp, profile, plan_hash, status, summary FROM applies WHERE id = ?1",
559 params![apply_id],
560 |row| {
561 Ok(ApplyRecord {
562 id: row.get(0)?,
563 timestamp: row.get(1)?,
564 profile: row.get(2)?,
565 plan_hash: row.get(3)?,
566 status: ApplyStatus::from_str(&row.get::<_, String>(4)?),
567 summary: row.get(5)?,
568 })
569 },
570 );
571
572 match result {
573 Ok(record) => Ok(Some(record)),
574 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
575 Err(e) => Err(StateError::Database(e.to_string()).into()),
576 }
577 }
578
579 pub fn history(&self, limit: u32) -> Result<Vec<ApplyRecord>> {
581 let mut stmt = self
582 .conn
583 .prepare(
584 "SELECT id, timestamp, profile, plan_hash, status, summary FROM applies ORDER BY id DESC LIMIT ?1",
585 )
586 ?;
587
588 let records = stmt
589 .query_map(params![limit], |row| {
590 Ok(ApplyRecord {
591 id: row.get(0)?,
592 timestamp: row.get(1)?,
593 profile: row.get(2)?,
594 plan_hash: row.get(3)?,
595 status: ApplyStatus::from_str(&row.get::<_, String>(4)?),
596 summary: row.get(5)?,
597 })
598 })?
599 .collect::<std::result::Result<Vec<_>, _>>()?;
600
601 Ok(records)
602 }
603
604 pub fn managed_resources(&self) -> Result<Vec<ManagedResource>> {
606 let mut stmt = self
607 .conn
608 .prepare(
609 "SELECT resource_type, resource_id, source, last_hash, last_applied FROM managed_resources ORDER BY resource_type, resource_id",
610 )
611 ?;
612
613 let resources = stmt
614 .query_map([], |row| {
615 Ok(ManagedResource {
616 resource_type: row.get(0)?,
617 resource_id: row.get(1)?,
618 source: row.get(2)?,
619 last_hash: row.get(3)?,
620 last_applied: row.get(4)?,
621 })
622 })?
623 .collect::<std::result::Result<Vec<_>, _>>()?;
624
625 Ok(resources)
626 }
627
628 pub fn unresolved_drift(&self) -> Result<Vec<DriftEvent>> {
630 let mut stmt = self
631 .conn
632 .prepare(
633 "SELECT id, timestamp, resource_type, resource_id, expected, actual, resolved_by, source FROM drift_events WHERE resolved_by IS NULL ORDER BY timestamp DESC",
634 )
635 ?;
636
637 let events = stmt
638 .query_map([], |row| {
639 Ok(DriftEvent {
640 id: row.get(0)?,
641 timestamp: row.get(1)?,
642 resource_type: row.get(2)?,
643 resource_id: row.get(3)?,
644 expected: row.get(4)?,
645 actual: row.get(5)?,
646 resolved_by: row.get(6)?,
647 source: row.get(7)?,
648 })
649 })?
650 .collect::<std::result::Result<Vec<_>, _>>()?;
651
652 Ok(events)
653 }
654
655 pub fn upsert_config_source(
659 &self,
660 name: &str,
661 origin_url: &str,
662 origin_branch: &str,
663 last_commit: Option<&str>,
664 source_version: Option<&str>,
665 pinned_version: Option<&str>,
666 ) -> Result<i64> {
667 let timestamp = crate::utc_now_iso8601();
668 self.conn
669 .execute(
670 "INSERT INTO config_sources (name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version)
671 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
672 ON CONFLICT(name) DO UPDATE SET
673 origin_url = excluded.origin_url,
674 origin_branch = excluded.origin_branch,
675 last_fetched = excluded.last_fetched,
676 last_commit = excluded.last_commit,
677 source_version = excluded.source_version,
678 pinned_version = excluded.pinned_version",
679 params![name, origin_url, origin_branch, timestamp, last_commit, source_version, pinned_version],
680 )
681 ?;
682 Ok(self.conn.last_insert_rowid())
683 }
684
685 pub fn config_sources(&self) -> Result<Vec<ConfigSourceRecord>> {
687 let mut stmt = self
688 .conn
689 .prepare(
690 "SELECT id, name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version, status
691 FROM config_sources ORDER BY name",
692 )
693 ?;
694
695 let sources = stmt
696 .query_map([], |row| {
697 Ok(ConfigSourceRecord {
698 id: row.get(0)?,
699 name: row.get(1)?,
700 origin_url: row.get(2)?,
701 origin_branch: row.get(3)?,
702 last_fetched: row.get(4)?,
703 last_commit: row.get(5)?,
704 source_version: row.get(6)?,
705 pinned_version: row.get(7)?,
706 status: row.get(8)?,
707 })
708 })?
709 .collect::<std::result::Result<Vec<_>, _>>()?;
710
711 Ok(sources)
712 }
713
714 pub fn config_source_by_name(&self, name: &str) -> Result<Option<ConfigSourceRecord>> {
716 let result = self.conn.query_row(
717 "SELECT id, name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version, status
718 FROM config_sources WHERE name = ?1",
719 params![name],
720 |row| {
721 Ok(ConfigSourceRecord {
722 id: row.get(0)?,
723 name: row.get(1)?,
724 origin_url: row.get(2)?,
725 origin_branch: row.get(3)?,
726 last_fetched: row.get(4)?,
727 last_commit: row.get(5)?,
728 source_version: row.get(6)?,
729 pinned_version: row.get(7)?,
730 status: row.get(8)?,
731 })
732 },
733 );
734
735 match result {
736 Ok(record) => Ok(Some(record)),
737 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
738 Err(e) => Err(StateError::Database(e.to_string()).into()),
739 }
740 }
741
742 pub fn remove_config_source(&self, name: &str) -> Result<()> {
744 self.conn
745 .execute("DELETE FROM config_sources WHERE name = ?1", params![name])?;
746 Ok(())
747 }
748
749 pub fn update_config_source_status(&self, name: &str, status: &str) -> Result<()> {
751 self.conn.execute(
752 "UPDATE config_sources SET status = ?1 WHERE name = ?2",
753 params![status, name],
754 )?;
755 Ok(())
756 }
757
758 pub fn record_source_apply(
760 &self,
761 source_name: &str,
762 apply_id: i64,
763 source_commit: &str,
764 ) -> Result<()> {
765 let source = self.config_source_by_name(source_name)?;
766 if let Some(src) = source {
767 self.conn.execute(
768 "INSERT INTO source_applies (source_id, apply_id, source_commit)
769 VALUES (?1, ?2, ?3)",
770 params![src.id, apply_id, source_commit],
771 )?;
772 }
773 Ok(())
774 }
775
776 pub fn record_source_conflict(
778 &self,
779 source_name: &str,
780 resource_type: &str,
781 resource_id: &str,
782 resolution: &str,
783 detail: Option<&str>,
784 ) -> Result<()> {
785 let timestamp = crate::utc_now_iso8601();
786 self.conn
787 .execute(
788 "INSERT INTO source_conflicts (timestamp, source_name, resource_type, resource_id, resolution, detail)
789 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
790 params![timestamp, source_name, resource_type, resource_id, resolution, detail],
791 )
792 ?;
793 Ok(())
794 }
795
    /// Records (or refreshes) the unresolved pending decision for
    /// `(source, resource)`, returning its row id.
    ///
    /// Two-step upsert: first try to UPDATE an existing unresolved row
    /// (refreshing tier/action/summary and created_at); only when nothing
    /// matched do we INSERT. The partial unique index
    /// `idx_pending_decisions_source_resource` guarantees at most one
    /// unresolved row per (source, resource), so the follow-up id lookup
    /// on the update path is unambiguous.
    pub fn upsert_pending_decision(
        &self,
        source: &str,
        resource: &str,
        tier: &str,
        action: &str,
        summary: &str,
    ) -> Result<i64> {
        let timestamp = crate::utc_now_iso8601();
        let updated = self.conn.execute(
            "UPDATE pending_decisions SET tier = ?1, action = ?2, summary = ?3, created_at = ?4
             WHERE source = ?5 AND resource = ?6 AND resolved_at IS NULL",
            params![tier, action, summary, timestamp, source, resource],
        )?;

        if updated > 0 {
            // Row already existed: fetch its id (last_insert_rowid would be
            // stale here because no insert happened).
            let id = self
                .conn
                .query_row(
                    "SELECT id FROM pending_decisions WHERE source = ?1 AND resource = ?2 AND resolved_at IS NULL",
                    params![source, resource],
                    |row| row.get(0),
                )
                ?;
            return Ok(id);
        }

        self.conn.execute(
            "INSERT INTO pending_decisions (source, resource, tier, action, summary, created_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![source, resource, tier, action, summary, timestamp],
        )?;
        Ok(self.conn.last_insert_rowid())
    }
835
836 pub fn pending_decisions(&self) -> Result<Vec<PendingDecision>> {
838 let mut stmt = self
839 .conn
840 .prepare(
841 "SELECT id, source, resource, tier, action, summary, created_at, resolved_at, resolution
842 FROM pending_decisions WHERE resolved_at IS NULL ORDER BY created_at DESC",
843 )
844 ?;
845
846 let rows = stmt
847 .query_map([], |row| {
848 Ok(PendingDecision {
849 id: row.get(0)?,
850 source: row.get(1)?,
851 resource: row.get(2)?,
852 tier: row.get(3)?,
853 action: row.get(4)?,
854 summary: row.get(5)?,
855 created_at: row.get(6)?,
856 resolved_at: row.get(7)?,
857 resolution: row.get(8)?,
858 })
859 })?
860 .collect::<std::result::Result<Vec<_>, _>>()?;
861
862 Ok(rows)
863 }
864
865 pub fn pending_decisions_for_source(&self, source: &str) -> Result<Vec<PendingDecision>> {
867 let mut stmt = self
868 .conn
869 .prepare(
870 "SELECT id, source, resource, tier, action, summary, created_at, resolved_at, resolution
871 FROM pending_decisions WHERE source = ?1 AND resolved_at IS NULL ORDER BY created_at DESC",
872 )
873 ?;
874
875 let rows = stmt
876 .query_map(params![source], |row| {
877 Ok(PendingDecision {
878 id: row.get(0)?,
879 source: row.get(1)?,
880 resource: row.get(2)?,
881 tier: row.get(3)?,
882 action: row.get(4)?,
883 summary: row.get(5)?,
884 created_at: row.get(6)?,
885 resolved_at: row.get(7)?,
886 resolution: row.get(8)?,
887 })
888 })?
889 .collect::<std::result::Result<Vec<_>, _>>()?;
890
891 Ok(rows)
892 }
893
894 pub fn resolve_decision(&self, resource: &str, resolution: &str) -> Result<bool> {
896 let timestamp = crate::utc_now_iso8601();
897 let updated = self.conn.execute(
898 "UPDATE pending_decisions SET resolved_at = ?1, resolution = ?2
899 WHERE resource = ?3 AND resolved_at IS NULL",
900 params![timestamp, resolution, resource],
901 )?;
902 Ok(updated > 0)
903 }
904
905 pub fn resolve_decisions_for_source(&self, source: &str, resolution: &str) -> Result<usize> {
907 let timestamp = crate::utc_now_iso8601();
908 let updated = self.conn.execute(
909 "UPDATE pending_decisions SET resolved_at = ?1, resolution = ?2
910 WHERE source = ?3 AND resolved_at IS NULL",
911 params![timestamp, resolution, source],
912 )?;
913 Ok(updated)
914 }
915
916 pub fn resolve_all_decisions(&self, resolution: &str) -> Result<usize> {
918 let timestamp = crate::utc_now_iso8601();
919 let updated = self.conn.execute(
920 "UPDATE pending_decisions SET resolved_at = ?1, resolution = ?2
921 WHERE resolved_at IS NULL",
922 params![timestamp, resolution],
923 )?;
924 Ok(updated)
925 }
926
927 pub fn source_config_hash(&self, source: &str) -> Result<Option<SourceConfigHash>> {
931 let result = self.conn.query_row(
932 "SELECT source, config_hash, merged_at FROM source_config_hashes WHERE source = ?1",
933 params![source],
934 |row| {
935 Ok(SourceConfigHash {
936 source: row.get(0)?,
937 config_hash: row.get(1)?,
938 merged_at: row.get(2)?,
939 })
940 },
941 );
942
943 match result {
944 Ok(record) => Ok(Some(record)),
945 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
946 Err(e) => Err(StateError::Database(e.to_string()).into()),
947 }
948 }
949
950 pub fn set_source_config_hash(&self, source: &str, config_hash: &str) -> Result<()> {
952 let timestamp = crate::utc_now_iso8601();
953 self.conn.execute(
954 "INSERT INTO source_config_hashes (source, config_hash, merged_at)
955 VALUES (?1, ?2, ?3)
956 ON CONFLICT(source) DO UPDATE SET
957 config_hash = excluded.config_hash,
958 merged_at = excluded.merged_at",
959 params![source, config_hash, timestamp],
960 )?;
961 Ok(())
962 }
963
964 pub fn remove_source_config_hash(&self, source: &str) -> Result<()> {
966 self.conn.execute(
967 "DELETE FROM source_config_hashes WHERE source = ?1",
968 params![source],
969 )?;
970 Ok(())
971 }
972
973 pub fn managed_resources_by_source(&self, source_name: &str) -> Result<Vec<ManagedResource>> {
975 let mut stmt = self.conn.prepare(
976 "SELECT resource_type, resource_id, source, last_hash, last_applied
977 FROM managed_resources WHERE source = ?1 ORDER BY resource_type, resource_id",
978 )?;
979
980 let resources = stmt
981 .query_map(params![source_name], |row| {
982 Ok(ManagedResource {
983 resource_type: row.get(0)?,
984 resource_id: row.get(1)?,
985 source: row.get(2)?,
986 last_hash: row.get(3)?,
987 last_applied: row.get(4)?,
988 })
989 })?
990 .collect::<std::result::Result<Vec<_>, _>>()?;
991
992 Ok(resources)
993 }
994
995 pub fn upsert_module_state(
999 &self,
1000 module_name: &str,
1001 last_applied: Option<i64>,
1002 packages_hash: &str,
1003 files_hash: &str,
1004 git_sources: Option<&str>,
1005 status: &str,
1006 ) -> Result<()> {
1007 let now = crate::utc_now_iso8601();
1008 self.conn
1009 .execute(
1010 "INSERT INTO module_state (module_name, installed_at, last_applied, packages_hash, files_hash, git_sources, status)
1011 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1012 ON CONFLICT(module_name) DO UPDATE SET
1013 last_applied = ?3,
1014 packages_hash = ?4,
1015 files_hash = ?5,
1016 git_sources = ?6,
1017 status = ?7",
1018 params![module_name, now, last_applied, packages_hash, files_hash, git_sources, status],
1019 )
1020 ?;
1021 Ok(())
1022 }
1023
1024 pub fn module_states(&self) -> Result<Vec<ModuleStateRecord>> {
1026 let mut stmt = self
1027 .conn
1028 .prepare(
1029 "SELECT module_name, installed_at, last_applied, packages_hash, files_hash, git_sources, status
1030 FROM module_state ORDER BY module_name",
1031 )
1032 ?;
1033
1034 let records = stmt
1035 .query_map([], |row| {
1036 Ok(ModuleStateRecord {
1037 module_name: row.get(0)?,
1038 installed_at: row.get(1)?,
1039 last_applied: row.get(2)?,
1040 packages_hash: row.get(3)?,
1041 files_hash: row.get(4)?,
1042 git_sources: row.get(5)?,
1043 status: row.get(6)?,
1044 })
1045 })?
1046 .collect::<std::result::Result<Vec<_>, _>>()?;
1047
1048 Ok(records)
1049 }
1050
1051 pub fn module_state_by_name(&self, module_name: &str) -> Result<Option<ModuleStateRecord>> {
1053 let mut stmt = self
1054 .conn
1055 .prepare(
1056 "SELECT module_name, installed_at, last_applied, packages_hash, files_hash, git_sources, status
1057 FROM module_state WHERE module_name = ?1",
1058 )
1059 ?;
1060
1061 let mut rows = stmt.query_map(params![module_name], |row| {
1062 Ok(ModuleStateRecord {
1063 module_name: row.get(0)?,
1064 installed_at: row.get(1)?,
1065 last_applied: row.get(2)?,
1066 packages_hash: row.get(3)?,
1067 files_hash: row.get(4)?,
1068 git_sources: row.get(5)?,
1069 status: row.get(6)?,
1070 })
1071 })?;
1072
1073 match rows.next() {
1074 Some(Ok(record)) => Ok(Some(record)),
1075 Some(Err(e)) => Err(StateError::Database(e.to_string()).into()),
1076 None => Ok(None),
1077 }
1078 }
1079
1080 pub fn remove_module_state(&self, module_name: &str) -> Result<()> {
1082 self.conn.execute(
1083 "DELETE FROM module_state WHERE module_name = ?1",
1084 params![module_name],
1085 )?;
1086 Ok(())
1087 }
1088
1089 pub fn store_file_backup(
1093 &self,
1094 apply_id: i64,
1095 file_path: &str,
1096 state: &crate::FileState,
1097 ) -> Result<()> {
1098 let timestamp = crate::utc_now_iso8601();
1099 self.conn.execute(
1100 "INSERT INTO file_backups (apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at)
1101 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1102 params![
1103 apply_id,
1104 file_path,
1105 state.content_hash,
1106 state.content,
1107 state.permissions.map(|p| p as i64),
1108 state.is_symlink as i64,
1109 state.symlink_target.as_ref().map(|p| p.display().to_string()),
1110 state.oversized as i64,
1111 timestamp,
1112 ],
1113 )?;
1114 Ok(())
1115 }
1116
1117 pub fn get_file_backup(
1119 &self,
1120 apply_id: i64,
1121 file_path: &str,
1122 ) -> Result<Option<FileBackupRecord>> {
1123 let result = self.conn.query_row(
1124 "SELECT id, apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at
1125 FROM file_backups WHERE apply_id = ?1 AND file_path = ?2",
1126 params![apply_id, file_path],
1127 |row| {
1128 Ok(FileBackupRecord {
1129 id: row.get(0)?,
1130 apply_id: row.get(1)?,
1131 file_path: row.get(2)?,
1132 content_hash: row.get(3)?,
1133 content: row.get(4)?,
1134 permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1135 was_symlink: row.get::<_, i64>(6)? != 0,
1136 symlink_target: row.get(7)?,
1137 oversized: row.get::<_, i64>(8)? != 0,
1138 backed_up_at: row.get(9)?,
1139 })
1140 },
1141 );
1142
1143 match result {
1144 Ok(record) => Ok(Some(record)),
1145 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1146 Err(e) => Err(StateError::Database(e.to_string()).into()),
1147 }
1148 }
1149
1150 pub fn get_apply_backups(&self, apply_id: i64) -> Result<Vec<FileBackupRecord>> {
1152 let mut stmt = self.conn.prepare(
1153 "SELECT id, apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at
1154 FROM file_backups WHERE apply_id = ?1 ORDER BY id",
1155 )?;
1156
1157 let records = stmt
1158 .query_map(params![apply_id], |row| {
1159 Ok(FileBackupRecord {
1160 id: row.get(0)?,
1161 apply_id: row.get(1)?,
1162 file_path: row.get(2)?,
1163 content_hash: row.get(3)?,
1164 content: row.get(4)?,
1165 permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1166 was_symlink: row.get::<_, i64>(6)? != 0,
1167 symlink_target: row.get(7)?,
1168 oversized: row.get::<_, i64>(8)? != 0,
1169 backed_up_at: row.get(9)?,
1170 })
1171 })?
1172 .collect::<std::result::Result<Vec<_>, _>>()?;
1173
1174 Ok(records)
1175 }
1176
1177 pub fn latest_backup_for_path(&self, file_path: &str) -> Result<Option<FileBackupRecord>> {
1179 let result = self.conn.query_row(
1180 "SELECT id, apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at
1181 FROM file_backups WHERE file_path = ?1 ORDER BY id DESC LIMIT 1",
1182 params![file_path],
1183 |row| {
1184 Ok(FileBackupRecord {
1185 id: row.get(0)?,
1186 apply_id: row.get(1)?,
1187 file_path: row.get(2)?,
1188 content_hash: row.get(3)?,
1189 content: row.get(4)?,
1190 permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1191 was_symlink: row.get::<_, i64>(6)? != 0,
1192 symlink_target: row.get(7)?,
1193 oversized: row.get::<_, i64>(8)? != 0,
1194 backed_up_at: row.get(9)?,
1195 })
1196 },
1197 );
1198
1199 match result {
1200 Ok(record) => Ok(Some(record)),
1201 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1202 Err(e) => Err(StateError::Database(e.to_string()).into()),
1203 }
1204 }
1205
1206 pub fn file_backups_after_apply(&self, after_apply_id: i64) -> Result<Vec<FileBackupRecord>> {
1210 let mut stmt = self.conn.prepare(
1213 "SELECT b.id, b.apply_id, b.file_path, b.content_hash, b.content, b.permissions,
1214 b.was_symlink, b.symlink_target, b.oversized, b.backed_up_at
1215 FROM file_backups b
1216 INNER JOIN (
1217 SELECT file_path, MIN(apply_id) AS min_apply_id
1218 FROM file_backups
1219 WHERE apply_id > ?1
1220 GROUP BY file_path
1221 ) earliest ON b.file_path = earliest.file_path AND b.apply_id = earliest.min_apply_id
1222 ORDER BY b.id",
1223 )?;
1224
1225 let records = stmt
1226 .query_map(params![after_apply_id], |row| {
1227 Ok(FileBackupRecord {
1228 id: row.get(0)?,
1229 apply_id: row.get(1)?,
1230 file_path: row.get(2)?,
1231 content_hash: row.get(3)?,
1232 content: row.get(4)?,
1233 permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1234 was_symlink: row.get::<_, i64>(6)? != 0,
1235 symlink_target: row.get(7)?,
1236 oversized: row.get::<_, i64>(8)? != 0,
1237 backed_up_at: row.get(9)?,
1238 })
1239 })?
1240 .collect::<std::result::Result<Vec<_>, _>>()?;
1241
1242 Ok(records)
1243 }
1244
1245 pub fn journal_entries_after_apply(&self, after_apply_id: i64) -> Result<Vec<JournalEntry>> {
1247 let mut stmt = self.conn.prepare(
1248 "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
1249 FROM apply_journal WHERE apply_id > ?1 AND status = 'completed' ORDER BY apply_id DESC, action_index DESC",
1250 )?;
1251
1252 let records = stmt
1253 .query_map(params![after_apply_id], |row| {
1254 Ok(JournalEntry {
1255 id: row.get(0)?,
1256 apply_id: row.get(1)?,
1257 action_index: row.get(2)?,
1258 phase: row.get(3)?,
1259 action_type: row.get(4)?,
1260 resource_id: row.get(5)?,
1261 pre_state: row.get(6)?,
1262 post_state: row.get(7)?,
1263 status: row.get(8)?,
1264 error: row.get(9)?,
1265 started_at: row.get(10)?,
1266 completed_at: row.get(11)?,
1267 script_output: row.get(12)?,
1268 })
1269 })?
1270 .collect::<std::result::Result<Vec<_>, _>>()?;
1271
1272 Ok(records)
1273 }
1274
1275 pub fn prune_old_backups(&self, keep_last_n: usize) -> Result<usize> {
1277 let deleted: usize = self.conn.execute(
1278 "DELETE FROM file_backups WHERE apply_id NOT IN (
1279 SELECT id FROM applies ORDER BY id DESC LIMIT ?1
1280 )",
1281 params![keep_last_n as i64],
1282 )?;
1283 Ok(deleted)
1284 }
1285
1286 pub fn journal_begin(
1290 &self,
1291 apply_id: i64,
1292 action_index: usize,
1293 phase: &str,
1294 action_type: &str,
1295 resource_id: &str,
1296 pre_state: Option<&str>,
1297 ) -> Result<i64> {
1298 let timestamp = crate::utc_now_iso8601();
1299 self.conn.execute(
1300 "INSERT INTO apply_journal (apply_id, action_index, phase, action_type, resource_id, pre_state, status, started_at)
1301 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
1302 params![apply_id, action_index as i64, phase, action_type, resource_id, pre_state, timestamp],
1303 )?;
1304 Ok(self.conn.last_insert_rowid())
1305 }
1306
1307 pub fn journal_complete(
1309 &self,
1310 journal_id: i64,
1311 post_state: Option<&str>,
1312 script_output: Option<&str>,
1313 ) -> Result<()> {
1314 let timestamp = crate::utc_now_iso8601();
1315 self.conn.execute(
1316 "UPDATE apply_journal SET status = 'completed', post_state = ?1, completed_at = ?2, script_output = ?3 WHERE id = ?4",
1317 params![post_state, timestamp, script_output, journal_id],
1318 )?;
1319 Ok(())
1320 }
1321
1322 pub fn journal_fail(&self, journal_id: i64, error: &str) -> Result<()> {
1324 let timestamp = crate::utc_now_iso8601();
1325 self.conn.execute(
1326 "UPDATE apply_journal SET status = 'failed', error = ?1, completed_at = ?2 WHERE id = ?3",
1327 params![error, timestamp, journal_id],
1328 )?;
1329 Ok(())
1330 }
1331
    /// Journal entries for `apply_id` with status 'completed', ordered by
    /// action index.
    pub fn journal_completed_actions(&self, apply_id: i64) -> Result<Vec<JournalEntry>> {
        self.query_journal(apply_id, Some("completed"))
    }

    /// All journal entries for `apply_id` regardless of status, ordered by
    /// action index.
    pub fn journal_entries(&self, apply_id: i64) -> Result<Vec<JournalEntry>> {
        self.query_journal(apply_id, None)
    }
1341
1342 fn query_journal(
1343 &self,
1344 apply_id: i64,
1345 status_filter: Option<&str>,
1346 ) -> Result<Vec<JournalEntry>> {
1347 let base_sql = if status_filter.is_some() {
1348 "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
1349 FROM apply_journal WHERE apply_id = ?1 AND status = ?2 ORDER BY action_index"
1350 } else {
1351 "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
1352 FROM apply_journal WHERE apply_id = ?1 ORDER BY action_index"
1353 };
1354
1355 let mut stmt = self.conn.prepare(base_sql)?;
1356
1357 let map_row = |row: &rusqlite::Row| -> rusqlite::Result<JournalEntry> {
1358 Ok(JournalEntry {
1359 id: row.get(0)?,
1360 apply_id: row.get(1)?,
1361 action_index: row.get(2)?,
1362 phase: row.get(3)?,
1363 action_type: row.get(4)?,
1364 resource_id: row.get(5)?,
1365 pre_state: row.get(6)?,
1366 post_state: row.get(7)?,
1367 status: row.get(8)?,
1368 error: row.get(9)?,
1369 started_at: row.get(10)?,
1370 completed_at: row.get(11)?,
1371 script_output: row.get(12)?,
1372 })
1373 };
1374
1375 let entries: Vec<JournalEntry> = if let Some(status) = status_filter {
1376 stmt.query_map(params![apply_id, status], map_row)?
1377 .collect::<std::result::Result<Vec<_>, _>>()?
1378 } else {
1379 stmt.query_map(params![apply_id], map_row)?
1380 .collect::<std::result::Result<Vec<_>, _>>()?
1381 };
1382
1383 Ok(entries)
1384 }
1385
1386 pub fn upsert_module_file(
1390 &self,
1391 module_name: &str,
1392 file_path: &str,
1393 content_hash: &str,
1394 strategy: &str,
1395 apply_id: i64,
1396 ) -> Result<()> {
1397 self.conn.execute(
1398 "INSERT INTO module_file_manifest (module_name, file_path, content_hash, strategy, last_applied)
1399 VALUES (?1, ?2, ?3, ?4, ?5)
1400 ON CONFLICT(module_name, file_path) DO UPDATE SET
1401 content_hash = excluded.content_hash,
1402 strategy = excluded.strategy,
1403 last_applied = excluded.last_applied",
1404 params![module_name, file_path, content_hash, strategy, apply_id],
1405 )?;
1406 Ok(())
1407 }
1408
1409 pub fn module_deployed_files(&self, module_name: &str) -> Result<Vec<ModuleFileRecord>> {
1411 let mut stmt = self.conn.prepare(
1412 "SELECT module_name, file_path, content_hash, strategy, last_applied
1413 FROM module_file_manifest WHERE module_name = ?1 ORDER BY file_path",
1414 )?;
1415
1416 let records = stmt
1417 .query_map(params![module_name], |row| {
1418 Ok(ModuleFileRecord {
1419 module_name: row.get(0)?,
1420 file_path: row.get(1)?,
1421 content_hash: row.get(2)?,
1422 strategy: row.get(3)?,
1423 last_applied: row.get(4)?,
1424 })
1425 })?
1426 .collect::<std::result::Result<Vec<_>, _>>()?;
1427
1428 Ok(records)
1429 }
1430
1431 pub fn delete_module_files(&self, module_name: &str) -> Result<()> {
1433 self.conn.execute(
1434 "DELETE FROM module_file_manifest WHERE module_name = ?1",
1435 params![module_name],
1436 )?;
1437 Ok(())
1438 }
1439
1440 pub fn update_apply_status(
1442 &self,
1443 apply_id: i64,
1444 status: ApplyStatus,
1445 summary: Option<&str>,
1446 ) -> Result<()> {
1447 self.conn.execute(
1448 "UPDATE applies SET status = ?1, summary = ?2 WHERE id = ?3",
1449 params![status.as_str(), summary, apply_id],
1450 )?;
1451 Ok(())
1452 }
1453
1454 pub fn store_compliance_snapshot(
1459 &self,
1460 snapshot: &crate::compliance::ComplianceSnapshot,
1461 hash: &str,
1462 ) -> Result<()> {
1463 let json = serde_json::to_string(snapshot)
1464 .map_err(|e| StateError::Database(format!("failed to serialize snapshot: {}", e)))?;
1465 self.conn.execute(
1466 "INSERT INTO compliance_snapshots (timestamp, content_hash, snapshot_json, summary_compliant, summary_warning, summary_violation)
1467 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1468 params![
1469 snapshot.timestamp,
1470 hash,
1471 json,
1472 snapshot.summary.compliant as i64,
1473 snapshot.summary.warning as i64,
1474 snapshot.summary.violation as i64,
1475 ],
1476 )?;
1477 Ok(())
1478 }
1479
1480 pub fn latest_compliance_hash(&self) -> Result<Option<String>> {
1482 let result = self.conn.query_row(
1483 "SELECT content_hash FROM compliance_snapshots ORDER BY id DESC LIMIT 1",
1484 [],
1485 |row| row.get(0),
1486 );
1487
1488 match result {
1489 Ok(hash) => Ok(Some(hash)),
1490 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1491 Err(e) => Err(StateError::Database(e.to_string()).into()),
1492 }
1493 }
1494
1495 pub fn compliance_history(
1498 &self,
1499 since: Option<&str>,
1500 limit: u32,
1501 ) -> Result<Vec<ComplianceHistoryRow>> {
1502 if let Some(since_ts) = since {
1503 let mut stmt = self.conn.prepare(
1504 "SELECT id, timestamp, summary_compliant, summary_warning, summary_violation
1505 FROM compliance_snapshots WHERE timestamp > ?1 ORDER BY id DESC LIMIT ?2",
1506 )?;
1507
1508 let rows = stmt
1509 .query_map(params![since_ts, limit], |row| {
1510 Ok(ComplianceHistoryRow {
1511 id: row.get(0)?,
1512 timestamp: row.get(1)?,
1513 compliant: row.get(2)?,
1514 warning: row.get(3)?,
1515 violation: row.get(4)?,
1516 })
1517 })?
1518 .collect::<std::result::Result<Vec<_>, _>>()?;
1519 Ok(rows)
1520 } else {
1521 let mut stmt = self.conn.prepare(
1522 "SELECT id, timestamp, summary_compliant, summary_warning, summary_violation
1523 FROM compliance_snapshots ORDER BY id DESC LIMIT ?1",
1524 )?;
1525
1526 let rows = stmt
1527 .query_map(params![limit], |row| {
1528 Ok(ComplianceHistoryRow {
1529 id: row.get(0)?,
1530 timestamp: row.get(1)?,
1531 compliant: row.get(2)?,
1532 warning: row.get(3)?,
1533 violation: row.get(4)?,
1534 })
1535 })?
1536 .collect::<std::result::Result<Vec<_>, _>>()?;
1537 Ok(rows)
1538 }
1539 }
1540
1541 pub fn get_compliance_snapshot(
1543 &self,
1544 id: i64,
1545 ) -> Result<Option<crate::compliance::ComplianceSnapshot>> {
1546 let result = self.conn.query_row(
1547 "SELECT snapshot_json FROM compliance_snapshots WHERE id = ?1",
1548 params![id],
1549 |row| row.get::<_, String>(0),
1550 );
1551
1552 match result {
1553 Ok(json) => {
1554 let snapshot: crate::compliance::ComplianceSnapshot = serde_json::from_str(&json)
1555 .map_err(|e| {
1556 StateError::Database(format!("failed to deserialize snapshot: {}", e))
1557 })?;
1558 Ok(Some(snapshot))
1559 }
1560 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1561 Err(e) => Err(StateError::Database(e.to_string()).into()),
1562 }
1563 }
1564
1565 pub fn prune_compliance_snapshots(&self, before_timestamp: &str) -> Result<usize> {
1568 let deleted = self.conn.execute(
1569 "DELETE FROM compliance_snapshots WHERE timestamp < ?1",
1570 params![before_timestamp],
1571 )?;
1572 Ok(deleted)
1573 }
1574}
1575
1576pub fn plan_hash(data: &str) -> String {
1578 crate::sha256_hex(data.as_bytes())
1579}
1580
1581pub fn default_state_dir() -> Result<PathBuf> {
1582 if let Ok(dir) = std::env::var("CFGD_STATE_DIR") {
1583 return Ok(PathBuf::from(dir));
1584 }
1585 let base = directories::BaseDirs::new().ok_or_else(|| StateError::DirectoryNotWritable {
1586 path: PathBuf::from("~/.local/share/cfgd"),
1587 })?;
1588 Ok(base.data_local_dir().join("cfgd"))
1589}
1590
// File name (inside the state directory) used to stash a server-provided
// config that has not yet been applied.
const PENDING_CONFIG_FILENAME: &str = "pending-server-config.json";
1592
1593pub fn save_pending_server_config(config: &serde_json::Value) -> Result<PathBuf> {
1595 let dir = default_state_dir()?;
1596 std::fs::create_dir_all(&dir)
1597 .map_err(|_| StateError::DirectoryNotWritable { path: dir.clone() })?;
1598 let path = dir.join(PENDING_CONFIG_FILENAME);
1599 let json = serde_json::to_string_pretty(config)
1600 .map_err(|e| StateError::Database(format!("failed to serialize pending config: {}", e)))?;
1601 crate::atomic_write_str(&path, &json)
1602 .map_err(|_| StateError::DirectoryNotWritable { path: path.clone() })?;
1603 Ok(path)
1604}
1605
1606pub fn load_pending_server_config() -> Result<Option<serde_json::Value>> {
1608 let dir = default_state_dir()?;
1609 let path = dir.join(PENDING_CONFIG_FILENAME);
1610 if !path.exists() {
1611 return Ok(None);
1612 }
1613 let contents = std::fs::read_to_string(&path)
1614 .map_err(|_| StateError::DirectoryNotWritable { path: path.clone() })?;
1615 let value: serde_json::Value = serde_json::from_str(&contents)
1616 .map_err(|e| StateError::Database(format!("failed to parse pending config: {}", e)))?;
1617 Ok(Some(value))
1618}
1619
1620pub fn clear_pending_server_config() -> Result<()> {
1622 let dir = default_state_dir()?;
1623 let path = dir.join(PENDING_CONFIG_FILENAME);
1624 match std::fs::remove_file(&path) {
1625 Ok(()) => Ok(()),
1626 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1627 Err(_) => Err(StateError::DirectoryNotWritable { path }.into()),
1628 }
1629}
1630
1631#[cfg(test)]
1632mod tests {
1633 use super::*;
1634
    // A fresh in-memory store runs all migrations and starts with no applies.
    #[test]
    fn open_in_memory() {
        let store = StateStore::open_in_memory().unwrap();
        assert!(store.last_apply().unwrap().is_none());
    }

    // record_apply returns a positive row id, and last_apply round-trips
    // every field including the optional JSON summary.
    #[test]
    fn record_and_retrieve_apply() {
        let store = StateStore::open_in_memory().unwrap();
        let id = store
            .record_apply(
                "default",
                "abc123",
                ApplyStatus::Success,
                Some("{\"files\": 3}"),
            )
            .unwrap();
        assert!(id > 0);

        let last = store.last_apply().unwrap().unwrap();
        assert_eq!(last.id, id);
        assert_eq!(last.profile, "default");
        assert_eq!(last.plan_hash, "abc123");
        assert_eq!(last.status, ApplyStatus::Success);
        assert_eq!(last.summary.as_deref(), Some("{\"files\": 3}"));
    }

    // history() orders rows newest-first.
    #[test]
    fn history_returns_most_recent_first() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .record_apply("p1", "h1", ApplyStatus::Success, None)
            .unwrap();
        store
            .record_apply("p2", "h2", ApplyStatus::Partial, None)
            .unwrap();
        store
            .record_apply("p3", "h3", ApplyStatus::Failed, None)
            .unwrap();

        let history = store.history(10).unwrap();
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].profile, "p3");
        assert_eq!(history[1].profile, "p2");
        assert_eq!(history[2].profile, "p1");
    }

    // history()'s limit argument caps the number of rows returned.
    #[test]
    fn history_respects_limit() {
        let store = StateStore::open_in_memory().unwrap();
        for i in 0..10 {
            store
                .record_apply(&format!("p{}", i), "h", ApplyStatus::Success, None)
                .unwrap();
        }

        let history = store.history(3).unwrap();
        assert_eq!(history.len(), 3);
    }

    // A freshly recorded drift event appears in unresolved_drift with all
    // fields intact and no resolving apply.
    #[test]
    fn record_and_retrieve_drift() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .record_drift(
                "file",
                "/home/user/.zshrc",
                Some("abc"),
                Some("def"),
                "local",
            )
            .unwrap();

        let events = store.unresolved_drift().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].resource_type, "file");
        assert_eq!(events[0].resource_id, "/home/user/.zshrc");
        assert_eq!(events[0].expected.as_deref(), Some("abc"));
        assert_eq!(events[0].actual.as_deref(), Some("def"));
        assert!(events[0].resolved_by.is_none());
    }

    // Resolving drift against an apply removes it from the unresolved set.
    #[test]
    fn resolve_drift_links_to_apply() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .record_drift("file", "/test", Some("a"), Some("b"), "local")
            .unwrap();

        let apply_id = store
            .record_apply("default", "h", ApplyStatus::Success, None)
            .unwrap();
        store.resolve_drift(apply_id, "file", "/test").unwrap();

        let events = store.unresolved_drift().unwrap();
        assert!(events.is_empty());
    }

    // Upserting the same (type, id) pair updates the stored hash in place
    // instead of creating a second row.
    #[test]
    fn upsert_managed_resource() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_managed_resource("file", "/home/.zshrc", "local", Some("hash1"), None)
            .unwrap();

        let resources = store.managed_resources().unwrap();
        assert_eq!(resources.len(), 1);
        assert_eq!(resources[0].resource_type, "file");
        assert_eq!(resources[0].resource_id, "/home/.zshrc");
        assert_eq!(resources[0].last_hash.as_deref(), Some("hash1"));

        store
            .upsert_managed_resource("file", "/home/.zshrc", "local", Some("hash2"), None)
            .unwrap();

        let resources = store.managed_resources().unwrap();
        assert_eq!(resources.len(), 1);
        assert_eq!(resources[0].last_hash.as_deref(), Some("hash2"));
    }

    // is_resource_managed matches on the exact (type, id) pair only.
    #[test]
    fn is_resource_managed() {
        let store = StateStore::open_in_memory().unwrap();

        assert!(!store.is_resource_managed("file", "/home/.zshrc").unwrap());

        store
            .upsert_managed_resource("file", "/home/.zshrc", "local", Some("hash1"), None)
            .unwrap();

        assert!(store.is_resource_managed("file", "/home/.zshrc").unwrap());
        assert!(!store.is_resource_managed("file", "/home/.bashrc").unwrap());
        assert!(
            !store
                .is_resource_managed("package", "/home/.zshrc")
                .unwrap()
        );
    }

    // The UNIQUE constraint is over (resource_type, resource_id) — the same
    // id under two different types yields two rows.
    #[test]
    fn managed_resources_unique_constraint() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_managed_resource("file", "/a", "local", None, None)
            .unwrap();
        store
            .upsert_managed_resource("package", "/a", "local", None, None)
            .unwrap();

        let resources = store.managed_resources().unwrap();
        assert_eq!(resources.len(), 2);
    }

    // Identical input hashes identically; different input does not.
    #[test]
    fn plan_hash_is_deterministic() {
        let h1 = plan_hash("test plan data");
        let h2 = plan_hash("test plan data");
        assert_eq!(h1, h2);
        assert_ne!(h1, plan_hash("different data"));
    }

    // Timestamps look like "YYYY-MM-DDTHH:MM:SSZ" (20 characters).
    #[test]
    fn now_iso8601_format() {
        let ts = crate::utc_now_iso8601();
        assert!(ts.contains('T'));
        assert!(ts.ends_with('Z'));
        assert_eq!(ts.len(), 20);
    }

    // Data written through one connection is visible when the same database
    // file is reopened with a new StateStore.
    #[test]
    fn open_file_based_store() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("state.db");

        let store = StateStore::open(&db_path).unwrap();
        store
            .record_apply("test", "hash", ApplyStatus::Success, None)
            .unwrap();

        let store2 = StateStore::open(&db_path).unwrap();
        let last = store2.last_apply().unwrap().unwrap();
        assert_eq!(last.profile, "test");
    }
1820
    // upsert_config_source stores every column; a new source defaults to
    // status 'active'.
    #[test]
    fn upsert_and_list_config_sources() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_config_source(
                "acme",
                "git@github.com:acme/config.git",
                "master",
                Some("abc123"),
                Some("2.1.0"),
                Some("~2"),
            )
            .unwrap();

        let sources = store.config_sources().unwrap();
        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].name, "acme");
        assert_eq!(sources[0].origin_url, "git@github.com:acme/config.git");
        assert_eq!(sources[0].last_commit.as_deref(), Some("abc123"));
        assert_eq!(sources[0].source_version.as_deref(), Some("2.1.0"));
        assert_eq!(sources[0].status, "active");
    }

    // Lookup by name returns Some for a stored source, None otherwise.
    #[test]
    fn config_source_by_name() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_config_source("acme", "url", "main", None, None, None)
            .unwrap();

        let found = store.config_source_by_name("acme").unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "acme");

        let not_found = store.config_source_by_name("nonexistent").unwrap();
        assert!(not_found.is_none());
    }

    // Removing a source deletes its row entirely.
    #[test]
    fn remove_config_source() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_config_source("acme", "url", "main", None, None, None)
            .unwrap();

        store.remove_config_source("acme").unwrap();
        let sources = store.config_sources().unwrap();
        assert!(sources.is_empty());
    }

    // The status column can be updated independently of the other columns.
    #[test]
    fn update_config_source_status() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_config_source("acme", "url", "main", None, None, None)
            .unwrap();

        store
            .update_config_source_status("acme", "inactive")
            .unwrap();
        let source = store.config_source_by_name("acme").unwrap().unwrap();
        assert_eq!(source.status, "inactive");
    }

    // Conflicts persist all descriptive columns; checked with raw SQL since
    // this test exercises no public read API for conflicts.
    #[test]
    fn record_source_conflict() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .record_source_conflict(
                "acme",
                "package",
                "git-secrets (brew)",
                "REQUIRED",
                Some("team requirement"),
            )
            .unwrap();

        let count: i64 = store
            .conn
            .query_row(
                "SELECT COUNT(*) FROM source_conflicts WHERE source_name = ?1",
                params!["acme"],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1, "one conflict should be recorded");

        let (resource_type, resource_id, resolution, detail): (String, String, String, Option<String>) = store
            .conn
            .query_row(
                "SELECT resource_type, resource_id, resolution, detail FROM source_conflicts WHERE source_name = ?1",
                params!["acme"],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .unwrap();
        assert_eq!(resource_type, "package");
        assert_eq!(resource_id, "git-secrets (brew)");
        assert_eq!(resolution, "REQUIRED");
        assert_eq!(detail.as_deref(), Some("team requirement"));
    }

    // Managed resources can be filtered by their owning source.
    #[test]
    fn managed_resources_by_source() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_managed_resource("file", "/a", "local", None, None)
            .unwrap();
        store
            .upsert_managed_resource("file", "/b", "acme", None, None)
            .unwrap();
        store
            .upsert_managed_resource("package", "git-secrets", "acme", None, None)
            .unwrap();

        let acme_resources = store.managed_resources_by_source("acme").unwrap();
        assert_eq!(acme_resources.len(), 2);

        let local_resources = store.managed_resources_by_source("local").unwrap();
        assert_eq!(local_resources.len(), 1);
    }

    // Re-upserting the same source name overwrites the mutable columns rather
    // than inserting a second row.
    #[test]
    fn upsert_config_source_updates_on_conflict() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_config_source("acme", "url1", "main", Some("commit1"), Some("1.0.0"), None)
            .unwrap();
        store
            .upsert_config_source(
                "acme",
                "url2",
                "dev",
                Some("commit2"),
                Some("2.0.0"),
                Some("~2"),
            )
            .unwrap();

        let sources = store.config_sources().unwrap();
        assert_eq!(sources.len(), 1);
        assert_eq!(sources[0].origin_url, "url2");
        assert_eq!(sources[0].origin_branch, "dev");
        assert_eq!(sources[0].last_commit.as_deref(), Some("commit2"));
        assert_eq!(sources[0].source_version.as_deref(), Some("2.0.0"));
    }
1969
    // A new pending decision gets a positive id and lists as unresolved with
    // all fields round-tripped.
    #[test]
    fn upsert_and_list_pending_decisions() {
        let store = StateStore::open_in_memory().unwrap();
        let id = store
            .upsert_pending_decision(
                "acme",
                "packages.brew.k9s",
                "recommended",
                "install",
                "install k9s (recommended by acme)",
            )
            .unwrap();
        assert!(id > 0);

        let decisions = store.pending_decisions().unwrap();
        assert_eq!(decisions.len(), 1);
        assert_eq!(decisions[0].source, "acme");
        assert_eq!(decisions[0].resource, "packages.brew.k9s");
        assert_eq!(decisions[0].tier, "recommended");
        assert_eq!(decisions[0].action, "install");
        assert!(decisions[0].resolved_at.is_none());
    }

    // Upserting the same resource again replaces action and summary in place.
    #[test]
    fn upsert_pending_decision_updates_existing() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_pending_decision(
                "acme",
                "packages.brew.k9s",
                "recommended",
                "install",
                "original summary",
            )
            .unwrap();
        store
            .upsert_pending_decision(
                "acme",
                "packages.brew.k9s",
                "recommended",
                "update",
                "updated summary",
            )
            .unwrap();

        let decisions = store.pending_decisions().unwrap();
        assert_eq!(decisions.len(), 1);
        assert_eq!(decisions[0].action, "update");
        assert_eq!(decisions[0].summary, "updated summary");
    }

    // Resolving a decision by resource removes it from the pending list and
    // reports success.
    #[test]
    fn resolve_decision_by_resource() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_pending_decision("acme", "packages.brew.k9s", "recommended", "install", "k9s")
            .unwrap();

        let resolved = store
            .resolve_decision("packages.brew.k9s", "accepted")
            .unwrap();
        assert!(resolved);

        let pending = store.pending_decisions().unwrap();
        assert!(pending.is_empty());
    }

    // Resolving an unknown resource is not an error — it just returns false.
    #[test]
    fn resolve_decision_nonexistent_returns_false() {
        let store = StateStore::open_in_memory().unwrap();
        let resolved = store
            .resolve_decision("nonexistent.resource", "accepted")
            .unwrap();
        assert!(!resolved);
    }

    // Bulk-resolving by source touches only that source's decisions and
    // reports how many were resolved.
    #[test]
    fn resolve_decisions_for_source() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_pending_decision("acme", "packages.brew.k9s", "recommended", "install", "k9s")
            .unwrap();
        store
            .upsert_pending_decision(
                "acme",
                "packages.brew.stern",
                "recommended",
                "install",
                "stern",
            )
            .unwrap();
        store
            .upsert_pending_decision("other", "packages.brew.bat", "optional", "install", "bat")
            .unwrap();

        let count = store
            .resolve_decisions_for_source("acme", "accepted")
            .unwrap();
        assert_eq!(count, 2);

        let pending = store.pending_decisions().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].source, "other");
    }

    // resolve_all_decisions clears every pending decision regardless of source.
    #[test]
    fn resolve_all_decisions() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_pending_decision("a", "r1", "recommended", "install", "s1")
            .unwrap();
        store
            .upsert_pending_decision("b", "r2", "optional", "install", "s2")
            .unwrap();

        let count = store.resolve_all_decisions("accepted").unwrap();
        assert_eq!(count, 2);

        let pending = store.pending_decisions().unwrap();
        assert!(pending.is_empty());
    }

    // Pending decisions can be listed for a single source.
    #[test]
    fn pending_decisions_for_source() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .upsert_pending_decision("acme", "r1", "recommended", "install", "s1")
            .unwrap();
        store
            .upsert_pending_decision("other", "r2", "optional", "install", "s2")
            .unwrap();

        let acme = store.pending_decisions_for_source("acme").unwrap();
        assert_eq!(acme.len(), 1);
        assert_eq!(acme[0].resource, "r1");
    }

    // A stored source config hash round-trips by source name.
    #[test]
    fn set_and_get_source_config_hash() {
        let store = StateStore::open_in_memory().unwrap();
        store.set_source_config_hash("acme", "hash123").unwrap();

        let hash = store.source_config_hash("acme").unwrap().unwrap();
        assert_eq!(hash.config_hash, "hash123");
    }

    // Setting the hash twice keeps only the latest value.
    #[test]
    fn source_config_hash_upsert() {
        let store = StateStore::open_in_memory().unwrap();
        store.set_source_config_hash("acme", "hash1").unwrap();
        store.set_source_config_hash("acme", "hash2").unwrap();

        let hash = store.source_config_hash("acme").unwrap().unwrap();
        assert_eq!(hash.config_hash, "hash2");
    }

    // Looking up a hash for an unknown source yields None, not an error.
    #[test]
    fn source_config_hash_not_found() {
        let store = StateStore::open_in_memory().unwrap();
        let hash = store.source_config_hash("nonexistent").unwrap();
        assert!(hash.is_none());
    }

    // Removing the hash makes subsequent lookups return None.
    #[test]
    fn remove_source_config_hash() {
        let store = StateStore::open_in_memory().unwrap();
        store.set_source_config_hash("acme", "hash1").unwrap();
        store.remove_source_config_hash("acme").unwrap();

        let hash = store.source_config_hash("acme").unwrap();
        assert!(hash.is_none());
    }
2146
    // A regular-file backup round-trips content, hash, permissions, and the
    // symlink/oversized flags.
    #[test]
    fn file_backup_store_and_retrieve() {
        let store = StateStore::open_in_memory().unwrap();
        let apply_id = store
            .record_apply("test", "hash", ApplyStatus::Success, None)
            .unwrap();

        let state = crate::FileState {
            content: b"original content".to_vec(),
            content_hash: "abc123".to_string(),
            permissions: Some(0o644),
            is_symlink: false,
            symlink_target: None,
            oversized: false,
        };

        store
            .store_file_backup(apply_id, "/home/user/.bashrc", &state)
            .unwrap();

        let backup = store
            .get_file_backup(apply_id, "/home/user/.bashrc")
            .unwrap()
            .unwrap();
        assert_eq!(backup.content, b"original content");
        assert_eq!(backup.content_hash, "abc123");
        assert_eq!(backup.permissions, Some(0o644));
        assert!(!backup.was_symlink);
        assert!(!backup.oversized);
    }

    // A symlink backup stores the flag and target path rather than content.
    #[test]
    fn file_backup_symlink() {
        let store = StateStore::open_in_memory().unwrap();
        let apply_id = store
            .record_apply("test", "hash", ApplyStatus::Success, None)
            .unwrap();

        let state = crate::FileState {
            content: Vec::new(),
            content_hash: String::new(),
            permissions: None,
            is_symlink: true,
            symlink_target: Some(PathBuf::from("/etc/original")),
            oversized: false,
        };

        store
            .store_file_backup(apply_id, "/home/user/link", &state)
            .unwrap();

        let backup = store
            .get_file_backup(apply_id, "/home/user/link")
            .unwrap()
            .unwrap();
        assert!(backup.was_symlink);
        assert_eq!(backup.symlink_target.unwrap(), "/etc/original");
    }

    // get_apply_backups returns every backup stored under one apply.
    #[test]
    fn get_apply_backups_returns_all() {
        let store = StateStore::open_in_memory().unwrap();
        let apply_id = store
            .record_apply("test", "hash", ApplyStatus::Success, None)
            .unwrap();

        for i in 0..3 {
            let state = crate::FileState {
                content: format!("content {}", i).into_bytes(),
                content_hash: format!("hash{}", i),
                permissions: Some(0o644),
                is_symlink: false,
                symlink_target: None,
                oversized: false,
            };
            store
                .store_file_backup(apply_id, &format!("/file{}", i), &state)
                .unwrap();
        }

        let backups = store.get_apply_backups(apply_id).unwrap();
        assert_eq!(backups.len(), 3);
    }

    // With three successive backups of the same path, latest_backup_for_path
    // returns the one from the newest apply.
    #[test]
    fn latest_backup_for_path_returns_most_recent() {
        let store = StateStore::open_in_memory().unwrap();

        for i in 0..3 {
            let apply_id = store
                .record_apply("test", &format!("hash{}", i), ApplyStatus::Success, None)
                .unwrap();
            let state = crate::FileState {
                content: format!("content v{}", i).into_bytes(),
                content_hash: format!("hash{}", i),
                permissions: Some(0o644),
                is_symlink: false,
                symlink_target: None,
                oversized: false,
            };
            store
                .store_file_backup(apply_id, "/home/user/.bashrc", &state)
                .unwrap();
        }

        let backup = store
            .latest_backup_for_path("/home/user/.bashrc")
            .unwrap()
            .unwrap();
        assert_eq!(backup.content_hash, "hash2");
    }

    // Full begin/complete/fail cycle: completed entries are filtered and
    // ordered, script output is preserved, and failures keep their status.
    #[test]
    fn journal_lifecycle() {
        let store = StateStore::open_in_memory().unwrap();
        let apply_id = store
            .record_apply("test", "hash", ApplyStatus::Success, None)
            .unwrap();

        let j1 = store
            .journal_begin(apply_id, 0, "files", "create", "/home/user/.bashrc", None)
            .unwrap();
        store.journal_complete(j1, Some("hash123"), None).unwrap();

        let j2 = store
            .journal_begin(apply_id, 1, "files", "update", "/home/user/.zshrc", None)
            .unwrap();
        store.journal_fail(j2, "permission denied").unwrap();

        let j3 = store
            .journal_begin(apply_id, 2, "scripts", "run", "setup.sh", None)
            .unwrap();
        store
            .journal_complete(j3, None, Some("installed deps\nall good"))
            .unwrap();

        let completed = store.journal_completed_actions(apply_id).unwrap();
        assert_eq!(completed.len(), 2);
        assert_eq!(completed[0].resource_id, "/home/user/.bashrc");
        assert_eq!(completed[0].status, "completed");
        assert!(completed[0].script_output.is_none());
        assert_eq!(completed[1].resource_id, "setup.sh");
        assert_eq!(
            completed[1].script_output.as_deref(),
            Some("installed deps\nall good")
        );

        let all = store.journal_entries(apply_id).unwrap();
        assert_eq!(all.len(), 3);
        assert_eq!(all[1].status, "failed");
    }

    // Manifest upsert/list/overwrite/delete cycle for one module.
    #[test]
    fn module_file_manifest_crud() {
        let store = StateStore::open_in_memory().unwrap();
        let apply_id = store
            .record_apply("test", "hash", ApplyStatus::Success, None)
            .unwrap();

        store
            .upsert_module_file(
                "nvim",
                "/home/user/.config/nvim/init.lua",
                "hash1",
                "Copy",
                apply_id,
            )
            .unwrap();
        store
            .upsert_module_file(
                "nvim",
                "/home/user/.config/nvim/lazy.lua",
                "hash2",
                "Copy",
                apply_id,
            )
            .unwrap();

        let files = store.module_deployed_files("nvim").unwrap();
        assert_eq!(files.len(), 2);
        assert_eq!(files[0].file_path, "/home/user/.config/nvim/init.lua");

        store
            .upsert_module_file(
                "nvim",
                "/home/user/.config/nvim/init.lua",
                "newhash",
                "Symlink",
                apply_id,
            )
            .unwrap();
        let files = store.module_deployed_files("nvim").unwrap();
        assert_eq!(files.len(), 2);
        assert_eq!(files[0].content_hash, "newhash");
        assert_eq!(files[0].strategy, "Symlink");

        store.delete_module_files("nvim").unwrap();
        let files = store.module_deployed_files("nvim").unwrap();
        assert!(files.is_empty());
    }
2351
2352 #[test]
2353 fn prune_old_backups_keeps_recent() {
2354 let store = StateStore::open_in_memory().unwrap();
2355
2356 for i in 0..5 {
2358 let apply_id = store
2359 .record_apply("test", &format!("hash{}", i), ApplyStatus::Success, None)
2360 .unwrap();
2361 let state = crate::FileState {
2362 content: format!("content {}", i).into_bytes(),
2363 content_hash: format!("hash{}", i),
2364 permissions: Some(0o644),
2365 is_symlink: false,
2366 symlink_target: None,
2367 oversized: false,
2368 };
2369 store.store_file_backup(apply_id, "/file", &state).unwrap();
2370 }
2371
2372 let pruned = store.prune_old_backups(2).unwrap();
2374 assert_eq!(pruned, 3);
2375
2376 let all: i64 = store
2378 .conn
2379 .query_row("SELECT COUNT(*) FROM file_backups", [], |row| row.get(0))
2380 .unwrap();
2381 assert_eq!(all, 2);
2382 }
2383
2384 #[test]
2385 fn update_apply_status_works() {
2386 let store = StateStore::open_in_memory().unwrap();
2387 let apply_id = store
2388 .record_apply("test", "hash", ApplyStatus::Success, None)
2389 .unwrap();
2390
2391 store
2392 .update_apply_status(apply_id, ApplyStatus::Partial, Some("{\"failed\":1}"))
2393 .unwrap();
2394
2395 let record = store.last_apply().unwrap().unwrap();
2396 assert_eq!(record.status, ApplyStatus::Partial);
2397 assert_eq!(record.summary.unwrap(), "{\"failed\":1}");
2398 }
2399
2400 #[test]
2401 fn schema_version_is_4_after_migration() {
2402 let store = StateStore::open_in_memory().unwrap();
2403 let version = store.schema_version();
2404 assert_eq!(version, 4);
2405 }
2406
2407 fn make_test_snapshot() -> crate::compliance::ComplianceSnapshot {
2410 crate::compliance::ComplianceSnapshot {
2411 timestamp: crate::utc_now_iso8601(),
2412 machine: crate::compliance::MachineInfo {
2413 hostname: "test-host".into(),
2414 os: "linux".into(),
2415 arch: "x86_64".into(),
2416 },
2417 profile: "default".into(),
2418 sources: vec!["local".into()],
2419 checks: vec![
2420 crate::compliance::ComplianceCheck {
2421 category: "file".into(),
2422 target: Some("/home/user/.zshrc".into()),
2423 status: crate::compliance::ComplianceStatus::Compliant,
2424 detail: Some("present".into()),
2425 ..Default::default()
2426 },
2427 crate::compliance::ComplianceCheck {
2428 category: "package".into(),
2429 name: Some("ripgrep".into()),
2430 status: crate::compliance::ComplianceStatus::Violation,
2431 detail: Some("not installed".into()),
2432 ..Default::default()
2433 },
2434 crate::compliance::ComplianceCheck {
2435 category: "system".into(),
2436 key: Some("shell".into()),
2437 status: crate::compliance::ComplianceStatus::Warning,
2438 detail: Some("no configurator".into()),
2439 ..Default::default()
2440 },
2441 ],
2442 summary: crate::compliance::ComplianceSummary {
2443 compliant: 1,
2444 warning: 1,
2445 violation: 1,
2446 },
2447 }
2448 }
2449
2450 #[test]
2451 fn compliance_snapshot_roundtrip() {
2452 let store = StateStore::open_in_memory().unwrap();
2453 let snapshot = make_test_snapshot();
2454
2455 let json = serde_json::to_string(&snapshot).unwrap();
2456 let hash = crate::sha256_hex(json.as_bytes());
2457
2458 store.store_compliance_snapshot(&snapshot, &hash).unwrap();
2459
2460 let latest = store.latest_compliance_hash().unwrap().unwrap();
2462 assert_eq!(latest, hash);
2463
2464 let history = store.compliance_history(None, 10).unwrap();
2466 assert_eq!(history.len(), 1);
2467 let row = &history[0];
2468 assert_eq!(row.compliant, 1);
2469 assert_eq!(row.warning, 1);
2470 assert_eq!(row.violation, 1);
2471
2472 let retrieved = store.get_compliance_snapshot(row.id).unwrap().unwrap();
2474 assert_eq!(retrieved.profile, "default");
2475 assert_eq!(retrieved.checks.len(), 3);
2476 assert_eq!(retrieved.summary.compliant, 1);
2477 }
2478
2479 #[test]
2480 fn compliance_latest_hash_empty() {
2481 let store = StateStore::open_in_memory().unwrap();
2482 assert!(store.latest_compliance_hash().unwrap().is_none());
2483 }
2484
2485 #[test]
2486 fn compliance_latest_hash_returns_most_recent() {
2487 let store = StateStore::open_in_memory().unwrap();
2488
2489 let mut s1 = make_test_snapshot();
2490 s1.timestamp = "2026-01-01T00:00:00Z".into();
2491 store.store_compliance_snapshot(&s1, "hash1").unwrap();
2492
2493 let mut s2 = make_test_snapshot();
2494 s2.timestamp = "2026-01-02T00:00:00Z".into();
2495 store.store_compliance_snapshot(&s2, "hash2").unwrap();
2496
2497 let latest = store.latest_compliance_hash().unwrap().unwrap();
2498 assert_eq!(latest, "hash2");
2499 }
2500
2501 #[test]
2502 fn compliance_prune_removes_old_snapshots() {
2503 let store = StateStore::open_in_memory().unwrap();
2504
2505 let mut s1 = make_test_snapshot();
2506 s1.timestamp = "2026-01-01T00:00:00Z".into();
2507 store.store_compliance_snapshot(&s1, "hash1").unwrap();
2508
2509 let mut s2 = make_test_snapshot();
2510 s2.timestamp = "2026-01-15T00:00:00Z".into();
2511 store.store_compliance_snapshot(&s2, "hash2").unwrap();
2512
2513 let mut s3 = make_test_snapshot();
2514 s3.timestamp = "2026-02-01T00:00:00Z".into();
2515 store.store_compliance_snapshot(&s3, "hash3").unwrap();
2516
2517 let deleted = store
2519 .prune_compliance_snapshots("2026-02-01T00:00:00Z")
2520 .unwrap();
2521 assert_eq!(deleted, 2);
2522
2523 let history = store.compliance_history(None, 10).unwrap();
2524 assert_eq!(history.len(), 1);
2525 }
2526
2527 #[test]
2528 fn compliance_history_with_since() {
2529 let store = StateStore::open_in_memory().unwrap();
2530
2531 let mut s1 = make_test_snapshot();
2532 s1.timestamp = "2026-01-01T00:00:00Z".into();
2533 store.store_compliance_snapshot(&s1, "h1").unwrap();
2534
2535 let mut s2 = make_test_snapshot();
2536 s2.timestamp = "2026-01-10T00:00:00Z".into();
2537 store.store_compliance_snapshot(&s2, "h2").unwrap();
2538
2539 let mut s3 = make_test_snapshot();
2540 s3.timestamp = "2026-01-20T00:00:00Z".into();
2541 store.store_compliance_snapshot(&s3, "h3").unwrap();
2542
2543 let history = store
2544 .compliance_history(Some("2026-01-05T00:00:00Z"), 10)
2545 .unwrap();
2546 assert_eq!(history.len(), 2);
2547 }
2548
2549 #[test]
2550 fn compliance_get_nonexistent() {
2551 let store = StateStore::open_in_memory().unwrap();
2552 assert!(store.get_compliance_snapshot(999).unwrap().is_none());
2553 }
2554
2555 #[test]
2558 fn module_state_upsert_and_retrieve() {
2559 let store = StateStore::open_in_memory().unwrap();
2560
2561 let apply1 = store
2563 .record_apply("default", "h1", ApplyStatus::Success, None)
2564 .unwrap();
2565
2566 store
2567 .upsert_module_state(
2568 "nvim",
2569 Some(apply1),
2570 "pkg-hash-1",
2571 "file-hash-1",
2572 None,
2573 "installed",
2574 )
2575 .unwrap();
2576 store
2577 .upsert_module_state(
2578 "tmux",
2579 None,
2580 "pkg-hash-2",
2581 "file-hash-2",
2582 Some("https://github.com/example/tmux.git@abc123"),
2583 "installed",
2584 )
2585 .unwrap();
2586
2587 let states = store.module_states().unwrap();
2588 assert_eq!(states.len(), 2);
2589 assert_eq!(states[0].module_name, "nvim");
2591 assert_eq!(states[0].packages_hash, "pkg-hash-1");
2592 assert_eq!(states[0].files_hash, "file-hash-1");
2593 assert_eq!(states[0].status, "installed");
2594 assert_eq!(states[0].last_applied, Some(apply1));
2595 assert!(states[0].git_sources.is_none());
2596
2597 assert_eq!(states[1].module_name, "tmux");
2598 assert!(states[1].last_applied.is_none());
2599 assert_eq!(
2600 states[1].git_sources.as_deref(),
2601 Some("https://github.com/example/tmux.git@abc123")
2602 );
2603 }
2604
2605 #[test]
2606 fn module_state_by_name_found_and_not_found() {
2607 let store = StateStore::open_in_memory().unwrap();
2608
2609 let apply_id = store
2610 .record_apply("default", "h", ApplyStatus::Success, None)
2611 .unwrap();
2612
2613 store
2614 .upsert_module_state("shell", Some(apply_id), "h1", "h2", None, "installed")
2615 .unwrap();
2616
2617 let found = store.module_state_by_name("shell").unwrap();
2618 assert!(found.is_some());
2619 let rec = found.unwrap();
2620 assert_eq!(rec.module_name, "shell");
2621 assert_eq!(rec.last_applied, Some(apply_id));
2622
2623 let not_found = store.module_state_by_name("nonexistent").unwrap();
2624 assert!(not_found.is_none());
2625 }
2626
2627 #[test]
2628 fn module_state_upsert_updates_on_conflict() {
2629 let store = StateStore::open_in_memory().unwrap();
2630
2631 let apply1 = store
2632 .record_apply("default", "h1", ApplyStatus::Success, None)
2633 .unwrap();
2634 let apply2 = store
2635 .record_apply("default", "h2", ApplyStatus::Success, None)
2636 .unwrap();
2637
2638 store
2639 .upsert_module_state(
2640 "nvim",
2641 Some(apply1),
2642 "old-pkg",
2643 "old-file",
2644 None,
2645 "installed",
2646 )
2647 .unwrap();
2648 store
2649 .upsert_module_state("nvim", Some(apply2), "new-pkg", "new-file", None, "updated")
2650 .unwrap();
2651
2652 let states = store.module_states().unwrap();
2653 assert_eq!(
2654 states.len(),
2655 1,
2656 "upsert should update, not insert duplicate"
2657 );
2658 assert_eq!(states[0].packages_hash, "new-pkg");
2659 assert_eq!(states[0].files_hash, "new-file");
2660 assert_eq!(states[0].status, "updated");
2661 assert_eq!(states[0].last_applied, Some(apply2));
2662 }
2663
2664 #[test]
2665 fn module_state_remove() {
2666 let store = StateStore::open_in_memory().unwrap();
2667
2668 store
2669 .upsert_module_state("nvim", None, "h1", "h2", None, "installed")
2670 .unwrap();
2671 store
2672 .upsert_module_state("tmux", None, "h3", "h4", None, "installed")
2673 .unwrap();
2674
2675 assert_eq!(store.module_states().unwrap().len(), 2);
2676
2677 store.remove_module_state("nvim").unwrap();
2678 let states = store.module_states().unwrap();
2679 assert_eq!(states.len(), 1);
2680 assert_eq!(states[0].module_name, "tmux");
2681
2682 store.remove_module_state("nonexistent").unwrap();
2684 assert_eq!(store.module_states().unwrap().len(), 1);
2685 }
2686
2687 #[test]
2690 fn record_source_apply_links_to_source() {
2691 let store = StateStore::open_in_memory().unwrap();
2692
2693 store
2695 .upsert_config_source(
2696 "acme",
2697 "https://github.com/acme/config.git",
2698 "main",
2699 None,
2700 None,
2701 None,
2702 )
2703 .unwrap();
2704
2705 let apply_id = store
2707 .record_apply("default", "plan-hash-1", ApplyStatus::Success, None)
2708 .unwrap();
2709 store
2710 .record_source_apply("acme", apply_id, "abc123def")
2711 .unwrap();
2712
2713 let source = store.config_source_by_name("acme").unwrap();
2715 assert!(source.is_some());
2716 }
2717
2718 #[test]
2719 fn record_source_apply_nonexistent_source_is_noop() {
2720 let store = StateStore::open_in_memory().unwrap();
2721
2722 let apply_id = store
2723 .record_apply("default", "plan-hash-1", ApplyStatus::Success, None)
2724 .unwrap();
2725
2726 store
2728 .record_source_apply("nonexistent", apply_id, "abc123")
2729 .unwrap();
2730
2731 let count: i64 = store
2733 .conn
2734 .query_row("SELECT COUNT(*) FROM source_applies", [], |row| row.get(0))
2735 .unwrap();
2736 assert_eq!(
2737 count, 0,
2738 "no source_applies row should exist for nonexistent source"
2739 );
2740
2741 let source = store.config_source_by_name("nonexistent").unwrap();
2743 assert!(source.is_none(), "nonexistent source should not be created");
2744 }
2745
2746 #[test]
2749 fn file_backups_after_apply_returns_earliest_per_path() {
2750 let store = StateStore::open_in_memory().unwrap();
2751
2752 let apply1 = store
2753 .record_apply("default", "hash1", ApplyStatus::Success, None)
2754 .unwrap();
2755 let apply2 = store
2756 .record_apply("default", "hash2", ApplyStatus::Success, None)
2757 .unwrap();
2758 let apply3 = store
2759 .record_apply("default", "hash3", ApplyStatus::Success, None)
2760 .unwrap();
2761
2762 let state_v1 = crate::FileState {
2764 content: b"version1".to_vec(),
2765 content_hash: "hash-v1".into(),
2766 permissions: None,
2767 is_symlink: false,
2768 symlink_target: None,
2769 oversized: false,
2770 };
2771 let state_v2 = crate::FileState {
2772 content: b"version2".to_vec(),
2773 content_hash: "hash-v2".into(),
2774 permissions: None,
2775 is_symlink: false,
2776 symlink_target: None,
2777 oversized: false,
2778 };
2779
2780 store
2781 .store_file_backup(apply2, "/etc/config", &state_v1)
2782 .unwrap();
2783 store
2784 .store_file_backup(apply3, "/etc/config", &state_v2)
2785 .unwrap();
2786
2787 let backups = store.file_backups_after_apply(apply1).unwrap();
2789 assert_eq!(backups.len(), 1);
2790 assert_eq!(backups[0].file_path, "/etc/config");
2791 assert_eq!(backups[0].apply_id, apply2);
2792 assert_eq!(backups[0].content_hash, "hash-v1");
2793
2794 let backups_after_2 = store.file_backups_after_apply(apply2).unwrap();
2796 assert_eq!(backups_after_2.len(), 1);
2797 assert_eq!(backups_after_2[0].apply_id, apply3);
2798 assert_eq!(backups_after_2[0].content_hash, "hash-v2");
2799
2800 let backups_after_3 = store.file_backups_after_apply(apply3).unwrap();
2802 assert!(backups_after_3.is_empty());
2803 }
2804
2805 #[test]
2808 fn journal_entries_after_apply_returns_completed_desc() {
2809 let store = StateStore::open_in_memory().unwrap();
2810
2811 let apply1 = store
2812 .record_apply("default", "hash1", ApplyStatus::Success, None)
2813 .unwrap();
2814 let apply2 = store
2815 .record_apply("default", "hash2", ApplyStatus::Success, None)
2816 .unwrap();
2817
2818 let j1 = store
2820 .journal_begin(apply2, 0, "Packages", "install", "brew:curl", None)
2821 .unwrap();
2822 store.journal_complete(j1, None, None).unwrap();
2823 let j2 = store
2824 .journal_begin(apply2, 1, "Packages", "install", "brew:wget", None)
2825 .unwrap();
2826 store.journal_complete(j2, None, None).unwrap();
2827 let j3 = store
2829 .journal_begin(apply2, 2, "Packages", "install", "brew:vim", None)
2830 .unwrap();
2831 store.journal_fail(j3, "package not found").unwrap();
2832
2833 let entries = store.journal_entries_after_apply(apply1).unwrap();
2834 assert_eq!(
2835 entries.len(),
2836 2,
2837 "should return only completed entries, not failed"
2838 );
2839 assert_eq!(entries[0].resource_id, "brew:wget");
2841 assert_eq!(entries[1].resource_id, "brew:curl");
2842 assert_eq!(entries[0].status, "completed");
2843 assert_eq!(entries[1].status, "completed");
2844 }
2845
2846 #[test]
2849 fn concurrent_in_memory_stores_are_independent() {
2850 let store_a = StateStore::open_in_memory().unwrap();
2851 let store_b = StateStore::open_in_memory().unwrap();
2852
2853 store_a
2854 .record_apply("default", "hash-a", ApplyStatus::Success, None)
2855 .unwrap();
2856
2857 assert!(store_b.last_apply().unwrap().is_none());
2859 assert_eq!(store_a.history(10).unwrap().len(), 1);
2860 assert_eq!(store_b.history(10).unwrap().len(), 0);
2861 }
2862
2863 #[test]
2866 fn schema_version_after_open() {
2867 let store = StateStore::open_in_memory().unwrap();
2868 let version = store.schema_version();
2869 assert!(
2870 version >= 4,
2871 "schema version should be at least 4 after migrations: got {version}"
2872 );
2873 }
2874
2875 #[test]
2878 fn get_apply_existing_and_nonexistent() {
2879 let store = StateStore::open_in_memory().unwrap();
2880
2881 let apply_id = store
2882 .record_apply(
2883 "default",
2884 "plan-hash",
2885 ApplyStatus::Success,
2886 Some("{\"summary\": true}"),
2887 )
2888 .unwrap();
2889
2890 let found = store.get_apply(apply_id).unwrap();
2891 assert!(found.is_some());
2892 let rec = found.unwrap();
2893 assert_eq!(rec.id, apply_id);
2894 assert_eq!(rec.plan_hash, "plan-hash");
2895 assert_eq!(rec.status, ApplyStatus::Success);
2896 assert_eq!(rec.summary.as_deref(), Some("{\"summary\": true}"));
2897
2898 let not_found = store.get_apply(99999).unwrap();
2899 assert!(not_found.is_none());
2900 }
2901
2902 #[test]
2905 fn update_apply_status_changes_status() {
2906 let store = StateStore::open_in_memory().unwrap();
2907
2908 let apply_id = store
2909 .record_apply("default", "hash", ApplyStatus::InProgress, None)
2910 .unwrap();
2911
2912 store
2913 .update_apply_status(apply_id, ApplyStatus::Success, Some("{\"total\": 5}"))
2914 .unwrap();
2915
2916 let rec = store.get_apply(apply_id).unwrap().unwrap();
2917 assert_eq!(rec.status, ApplyStatus::Success);
2918 assert_eq!(rec.summary.as_deref(), Some("{\"total\": 5}"));
2919 }
2920}