Skip to main content

cfgd_core/state/
mod.rs

1use std::path::{Path, PathBuf};
2
3use rusqlite::{Connection, params};
4use serde::Serialize;
5
6use crate::errors::{Result, StateError};
7
/// Migration 1: Base schema — applies, drift events, managed resources, config
/// sources, pending decisions, module state, and the `schema_version` row.
const MIGRATION_1_BASE_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS applies (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        profile TEXT NOT NULL,
        plan_hash TEXT NOT NULL,
        status TEXT NOT NULL,
        summary TEXT
    );

    CREATE TABLE IF NOT EXISTS drift_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        resource_type TEXT NOT NULL,
        resource_id TEXT NOT NULL,
        expected TEXT,
        actual TEXT,
        source TEXT NOT NULL DEFAULT 'local',
        resolved_by INTEGER,
        FOREIGN KEY (resolved_by) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS managed_resources (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        resource_type TEXT NOT NULL,
        resource_id TEXT NOT NULL,
        source TEXT NOT NULL DEFAULT 'local',
        last_hash TEXT,
        last_applied INTEGER,
        UNIQUE(resource_type, resource_id),
        FOREIGN KEY (last_applied) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS config_sources (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL UNIQUE,
        origin_url TEXT NOT NULL,
        origin_branch TEXT NOT NULL DEFAULT 'main',
        last_fetched TEXT,
        last_commit TEXT,
        source_version TEXT,
        pinned_version TEXT,
        status TEXT NOT NULL DEFAULT 'active'
    );

    CREATE TABLE IF NOT EXISTS source_applies (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source_id INTEGER NOT NULL,
        apply_id INTEGER NOT NULL,
        source_commit TEXT NOT NULL,
        FOREIGN KEY (source_id) REFERENCES config_sources(id),
        FOREIGN KEY (apply_id) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS source_conflicts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        source_name TEXT NOT NULL,
        resource_type TEXT NOT NULL,
        resource_id TEXT NOT NULL,
        resolution TEXT NOT NULL,
        detail TEXT
    );

    CREATE TABLE IF NOT EXISTS pending_decisions (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        source      TEXT NOT NULL,
        resource    TEXT NOT NULL,
        tier        TEXT NOT NULL,
        action      TEXT NOT NULL,
        summary     TEXT NOT NULL,
        created_at  TEXT NOT NULL,
        resolved_at TEXT,
        resolution  TEXT
    );

    CREATE UNIQUE INDEX IF NOT EXISTS idx_pending_decisions_source_resource
        ON pending_decisions (source, resource)
        WHERE resolved_at IS NULL;

    CREATE TABLE IF NOT EXISTS source_config_hashes (
        source      TEXT PRIMARY KEY,
        config_hash TEXT NOT NULL,
        merged_at   TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS module_state (
        id              INTEGER PRIMARY KEY AUTOINCREMENT,
        module_name     TEXT NOT NULL UNIQUE,
        installed_at    TEXT NOT NULL,
        last_applied    INTEGER,
        packages_hash   TEXT NOT NULL,
        files_hash      TEXT NOT NULL,
        git_sources     TEXT,
        status          TEXT NOT NULL DEFAULT 'installed',
        FOREIGN KEY (last_applied) REFERENCES applies(id)
    );

    CREATE TABLE IF NOT EXISTS schema_version (
        version INTEGER NOT NULL
    );

    INSERT INTO schema_version (version) VALUES (0);";

/// Migration 2: File safety — backup store, transaction journal, module file manifest.
const MIGRATION_2_FILE_SAFETY: &str = "CREATE TABLE IF NOT EXISTS file_backups (
        id              INTEGER PRIMARY KEY AUTOINCREMENT,
        apply_id        INTEGER NOT NULL,
        file_path       TEXT NOT NULL,
        content_hash    TEXT NOT NULL,
        content         BLOB NOT NULL,
        permissions     INTEGER,
        was_symlink     INTEGER NOT NULL DEFAULT 0,
        symlink_target  TEXT,
        oversized       INTEGER NOT NULL DEFAULT 0,
        backed_up_at    TEXT NOT NULL,
        FOREIGN KEY (apply_id) REFERENCES applies(id)
    );

    CREATE INDEX IF NOT EXISTS idx_file_backups_apply ON file_backups (apply_id);
    CREATE INDEX IF NOT EXISTS idx_file_backups_path ON file_backups (file_path);

    CREATE TABLE IF NOT EXISTS apply_journal (
        id              INTEGER PRIMARY KEY AUTOINCREMENT,
        apply_id        INTEGER NOT NULL,
        action_index    INTEGER NOT NULL,
        phase           TEXT NOT NULL,
        action_type     TEXT NOT NULL,
        resource_id     TEXT NOT NULL,
        pre_state       TEXT,
        post_state      TEXT,
        status          TEXT NOT NULL DEFAULT 'pending',
        error           TEXT,
        started_at      TEXT NOT NULL,
        completed_at    TEXT,
        FOREIGN KEY (apply_id) REFERENCES applies(id)
    );

    CREATE INDEX IF NOT EXISTS idx_apply_journal_apply ON apply_journal (apply_id);

    CREATE TABLE IF NOT EXISTS module_file_manifest (
        id              INTEGER PRIMARY KEY AUTOINCREMENT,
        module_name     TEXT NOT NULL,
        file_path       TEXT NOT NULL,
        content_hash    TEXT NOT NULL,
        strategy        TEXT NOT NULL,
        last_applied    INTEGER,
        UNIQUE(module_name, file_path),
        FOREIGN KEY (last_applied) REFERENCES applies(id)
    );

    CREATE INDEX IF NOT EXISTS idx_module_file_manifest_module ON module_file_manifest (module_name);";

/// Migration 3: Script output capture — store stdout/stderr from script actions.
const MIGRATION_3_SCRIPT_OUTPUT: &str = "ALTER TABLE apply_journal ADD COLUMN script_output TEXT;";

/// Migration 4: Compliance snapshots — periodic machine state snapshots.
const MIGRATION_4_COMPLIANCE_SNAPSHOTS: &str = "CREATE TABLE IF NOT EXISTS compliance_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        content_hash TEXT NOT NULL,
        snapshot_json TEXT NOT NULL,
        summary_compliant INTEGER NOT NULL,
        summary_warning INTEGER NOT NULL,
        summary_violation INTEGER NOT NULL
    );";

/// Ordered schema migrations, applied by position.
///
/// The entry at index `i` is run when the stored schema version is `<= i`,
/// after which the stored version becomes `i + 1`. New migrations must be
/// appended at the end; existing entries must not be reordered or edited.
const MIGRATIONS: &[&str] = &[
    MIGRATION_1_BASE_SCHEMA,
    MIGRATION_2_FILE_SAFETY,
    MIGRATION_3_SCRIPT_OUTPUT,
    MIGRATION_4_COMPLIANCE_SNAPSHOTS,
];
172
/// Apply status for a reconciliation run.
///
/// Stored as text in the `status` column of the `applies` table; the text
/// forms are `"success"`, `"partial"`, `"failed"` and `"in_progress"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum ApplyStatus {
    /// Apply completed with all actions successful.
    Success,
    /// Apply completed but some actions failed.
    Partial,
    /// Apply failed entirely.
    Failed,
    /// Apply is currently in progress (not yet finished).
    InProgress,
}
185
186impl ApplyStatus {
187    fn as_str(&self) -> &str {
188        match self {
189            ApplyStatus::Success => "success",
190            ApplyStatus::Partial => "partial",
191            ApplyStatus::Failed => "failed",
192            ApplyStatus::InProgress => "in_progress",
193        }
194    }
195
196    fn from_str(s: &str) -> Self {
197        match s {
198            "success" => ApplyStatus::Success,
199            "partial" => ApplyStatus::Partial,
200            "in_progress" => ApplyStatus::InProgress,
201            "failed" => ApplyStatus::Failed,
202            _ => ApplyStatus::Failed,
203        }
204    }
205}
206
/// A recorded apply operation (one row of the `applies` table).
#[derive(Debug, Clone, Serialize)]
pub struct ApplyRecord {
    /// Row id of the apply; other tables reference it as a foreign key.
    pub id: i64,
    /// Time the apply was recorded, as produced by `crate::utc_now_iso8601()`.
    pub timestamp: String,
    /// Profile name passed to `record_apply`.
    pub profile: String,
    /// Plan hash passed to `record_apply`.
    pub plan_hash: String,
    /// Outcome of the apply, parsed from the stored status text.
    pub status: ApplyStatus,
    /// Optional free-form summary; `None` when the column is NULL.
    pub summary: Option<String>,
}
217
/// A recorded drift event (one row of the `drift_events` table).
#[derive(Debug, Clone, Serialize)]
pub struct DriftEvent {
    /// Row id of the drift event.
    pub id: i64,
    /// Time the event was recorded, as produced by `crate::utc_now_iso8601()`.
    pub timestamp: String,
    /// Type of the drifted resource.
    pub resource_type: String,
    /// Identifier of the drifted resource.
    pub resource_id: String,
    /// Expected value, if one was recorded.
    pub expected: Option<String>,
    /// Actual value observed, if one was recorded.
    pub actual: Option<String>,
    /// Id of the apply that resolved this event; `None` while unresolved.
    pub resolved_by: Option<i64>,
    /// Source the resource belongs to (the column defaults to `'local'`).
    pub source: String,
}
230
/// A managed resource tracked in the state store
/// (one row of the `managed_resources` table).
///
/// The pair (`resource_type`, `resource_id`) is unique in the table.
#[derive(Debug, Clone, Serialize)]
pub struct ManagedResource {
    /// Type of the resource.
    pub resource_type: String,
    /// Identifier of the resource within its type.
    pub resource_id: String,
    /// Source the resource belongs to (the column defaults to `'local'`).
    pub source: String,
    /// Hash stored by the most recent upsert, if any.
    pub last_hash: Option<String>,
    /// Id of the apply stored by the most recent upsert, if any.
    pub last_applied: Option<i64>,
}
240
/// A tracked config source (one row of the `config_sources` table).
#[derive(Debug, Clone, Serialize)]
pub struct ConfigSourceRecord {
    /// Row id of the source; referenced by `source_applies.source_id`.
    pub id: i64,
    /// Unique name of the source.
    pub name: String,
    /// URL the source originates from.
    pub origin_url: String,
    /// Branch of the origin (the column defaults to `'main'`).
    pub origin_branch: String,
    /// Time of the last upsert of this source, if any.
    pub last_fetched: Option<String>,
    /// Last known commit of the source, if any.
    pub last_commit: Option<String>,
    /// Version reported for the source, if any.
    pub source_version: Option<String>,
    /// Version the source is pinned to, if any.
    pub pinned_version: Option<String>,
    /// Free-form status text (the column defaults to `'active'`).
    pub status: String,
}
254
/// A conflict record from composition (one row of the `source_conflicts` table).
#[derive(Debug, Clone)]
pub struct SourceConflictRecord {
    /// Row id of the conflict record.
    pub id: i64,
    /// Time the conflict was recorded, as produced by `crate::utc_now_iso8601()`.
    pub timestamp: String,
    /// Name of the source involved in the conflict.
    pub source_name: String,
    /// Type of the conflicting resource.
    pub resource_type: String,
    /// Identifier of the conflicting resource.
    pub resource_id: String,
    /// How the conflict was resolved (free-form text).
    pub resolution: String,
    /// Optional extra detail; `None` when the column is NULL.
    pub detail: Option<String>,
}
266
/// A pending decision for a source item needing user review
/// (one row of the `pending_decisions` table).
///
/// At most one unresolved row exists per (`source`, `resource`) pair; this is
/// enforced by a partial unique index on rows where `resolved_at` is NULL.
#[derive(Debug, Clone, Serialize)]
pub struct PendingDecision {
    /// Row id of the decision.
    pub id: i64,
    /// Name of the source the decision belongs to.
    pub source: String,
    /// Resource the decision is about.
    pub resource: String,
    /// Tier text supplied by the caller of `upsert_pending_decision`.
    pub tier: String,
    /// Action text supplied by the caller of `upsert_pending_decision`.
    pub action: String,
    /// Human-readable summary of the decision.
    pub summary: String,
    /// Time the decision was created or last refreshed by an upsert.
    pub created_at: String,
    /// Time the decision was resolved; `None` while still pending.
    pub resolved_at: Option<String>,
    /// Resolution text; `None` while still pending.
    pub resolution: Option<String>,
}
280
/// A stored config hash for detecting source changes
/// (one row of the `source_config_hashes` table).
#[derive(Debug, Clone)]
pub struct SourceConfigHash {
    /// Source name; primary key of the table.
    pub source: String,
    /// Hash of the source's config.
    pub config_hash: String,
    /// Timestamp stored in the `merged_at` column.
    pub merged_at: String,
}
288
/// A module's state in the state store (one row of the `module_state` table).
#[derive(Debug, Clone, Serialize)]
pub struct ModuleStateRecord {
    /// Unique name of the module.
    pub module_name: String,
    /// Timestamp stored in the `installed_at` column.
    pub installed_at: String,
    /// Id of the apply referenced by `last_applied`, if any.
    pub last_applied: Option<i64>,
    /// Hash stored in the `packages_hash` column.
    pub packages_hash: String,
    /// Hash stored in the `files_hash` column.
    pub files_hash: String,
    /// Contents of the nullable `git_sources` column.
    // NOTE(review): the encoding of this text is not visible here — confirm.
    pub git_sources: Option<String>,
    /// Free-form status text (the column defaults to `'installed'`).
    pub status: String,
}
300
/// A file backup record from the safety store
/// (one row of the `file_backups` table).
#[derive(Debug, Clone)]
pub struct FileBackupRecord {
    /// Row id of the backup.
    pub id: i64,
    /// Id of the apply this backup belongs to.
    pub apply_id: i64,
    /// Path of the backed-up file.
    pub file_path: String,
    /// Hash of the backed-up content.
    pub content_hash: String,
    /// Raw backed-up bytes (stored as a BLOB).
    pub content: Vec<u8>,
    /// Value of the nullable `permissions` column.
    // NOTE(review): presumably Unix mode bits — confirm against the writer.
    pub permissions: Option<u32>,
    /// Value of the `was_symlink` flag column (defaults to 0).
    pub was_symlink: bool,
    /// Value of the nullable `symlink_target` column.
    pub symlink_target: Option<String>,
    /// Value of the `oversized` flag column (defaults to 0).
    // NOTE(review): what "oversized" implies for `content` is not visible here.
    pub oversized: bool,
    /// Timestamp stored in the `backed_up_at` column.
    pub backed_up_at: String,
}
315
/// A journal entry for a single action within an apply
/// (one row of the `apply_journal` table).
#[derive(Debug, Clone)]
pub struct JournalEntry {
    /// Row id of the journal entry.
    pub id: i64,
    /// Id of the apply this action belongs to.
    pub apply_id: i64,
    /// Value of the `action_index` column.
    pub action_index: i64,
    /// Phase text for the action.
    pub phase: String,
    /// Action type text.
    pub action_type: String,
    /// Identifier of the resource the action touches.
    pub resource_id: String,
    /// Value of the nullable `pre_state` column.
    pub pre_state: Option<String>,
    /// Value of the nullable `post_state` column.
    pub post_state: Option<String>,
    /// Status text of the action (the column defaults to `'pending'`).
    pub status: String,
    /// Error text, if any was recorded.
    pub error: Option<String>,
    /// Timestamp stored in the `started_at` column.
    pub started_at: String,
    /// Timestamp stored in the nullable `completed_at` column.
    pub completed_at: Option<String>,
    /// Captured script output; the column was added by the third migration.
    pub script_output: Option<String>,
}
333
/// A compliance snapshot summary row from the state store.
// NOTE(review): the query that fills this struct is outside this view; the
// three counters presumably map to the `summary_compliant`, `summary_warning`
// and `summary_violation` columns of `compliance_snapshots` — confirm.
#[derive(Debug, Clone, Serialize)]
pub struct ComplianceHistoryRow {
    /// Row id of the snapshot.
    pub id: i64,
    /// Timestamp of the snapshot.
    pub timestamp: String,
    /// Compliant counter of the snapshot summary.
    pub compliant: i64,
    /// Warning counter of the snapshot summary.
    pub warning: i64,
    /// Violation counter of the snapshot summary.
    pub violation: i64,
}
343
/// A module file manifest entry — tracks which files a module deployed
/// (one row of the `module_file_manifest` table).
///
/// The pair (`module_name`, `file_path`) is unique in the table.
#[derive(Debug, Clone)]
pub struct ModuleFileRecord {
    /// Name of the module the file belongs to.
    pub module_name: String,
    /// Path of the file.
    pub file_path: String,
    /// Hash stored in the `content_hash` column.
    pub content_hash: String,
    /// Strategy text stored in the `strategy` column.
    pub strategy: String,
    /// Id of the apply referenced by `last_applied`, if any.
    pub last_applied: Option<i64>,
}
353
/// SQLite-backed state store for cfgd.
///
/// Owns a single database connection; every constructor runs the pending
/// schema migrations before returning the store.
pub struct StateStore {
    /// The underlying SQLite connection.
    conn: Connection,
}
358
359impl StateStore {
360    /// Open or create a state store at the default location.
361    /// Uses `~/.local/share/cfgd/state.db`.
362    pub fn open_default() -> Result<Self> {
363        let data_dir = default_state_dir()?;
364        std::fs::create_dir_all(&data_dir).map_err(|_| StateError::DirectoryNotWritable {
365            path: data_dir.clone(),
366        })?;
367        let db_path = data_dir.join("state.db");
368        Self::open(&db_path)
369    }
370
371    /// Open or create a state store at the given path.
372    pub fn open(path: &Path) -> Result<Self> {
373        let conn = Connection::open(path)?;
374        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
375        conn.busy_timeout(std::time::Duration::from_secs(5))?;
376
377        let mut store = Self { conn };
378        store.run_migrations()?;
379        Ok(store)
380    }
381
382    /// Create an in-memory state store (for testing).
383    pub fn open_in_memory() -> Result<Self> {
384        let conn = Connection::open_in_memory()?;
385        conn.execute_batch("PRAGMA foreign_keys=ON;")?;
386
387        let mut store = Self { conn };
388        store.run_migrations()?;
389        Ok(store)
390    }
391
392    fn run_migrations(&mut self) -> Result<()> {
393        // Use EXCLUSIVE transaction to serialize concurrent migration attempts
394        // (e.g. parallel cargo test processes sharing the same state DB).
395        self.conn
396            .execute_batch("BEGIN EXCLUSIVE")
397            .map_err(|e| StateError::MigrationFailed {
398                message: format!("failed to acquire migration lock: {e}"),
399            })?;
400
401        let current_version = self.schema_version();
402
403        for (i, migration) in MIGRATIONS.iter().enumerate() {
404            if i >= current_version {
405                self.conn.execute_batch(migration).map_err(|e| {
406                    let _ = self.conn.execute_batch("ROLLBACK");
407                    StateError::MigrationFailed {
408                        message: format!("migration {}: {}", i, e),
409                    }
410                })?;
411                // Set version automatically — no hardcoded UPDATE in migration SQL
412                let new_version = (i + 1) as i64;
413                self.conn
414                    .execute(
415                        "UPDATE schema_version SET version = ?1",
416                        rusqlite::params![new_version],
417                    )
418                    .map_err(|e| {
419                        let _ = self.conn.execute_batch("ROLLBACK");
420                        StateError::MigrationFailed {
421                            message: format!("migration {}: failed to update version: {}", i, e),
422                        }
423                    })?;
424            }
425        }
426
427        self.conn
428            .execute_batch("COMMIT")
429            .map_err(|e| StateError::MigrationFailed {
430                message: format!("failed to commit migrations: {e}"),
431            })?;
432
433        Ok(())
434    }
435
436    fn schema_version(&self) -> usize {
437        self.conn
438            .query_row("SELECT version FROM schema_version", [], |row| {
439                row.get::<_, i64>(0)
440            })
441            .map(|v| v as usize)
442            .unwrap_or(0)
443    }
444
445    /// Record a completed apply operation.
446    pub fn record_apply(
447        &self,
448        profile: &str,
449        plan_hash: &str,
450        status: ApplyStatus,
451        summary: Option<&str>,
452    ) -> Result<i64> {
453        let timestamp = crate::utc_now_iso8601();
454        self.conn
455            .execute(
456                "INSERT INTO applies (timestamp, profile, plan_hash, status, summary) VALUES (?1, ?2, ?3, ?4, ?5)",
457                params![timestamp, profile, plan_hash, status.as_str(), summary],
458            )
459            ?;
460        Ok(self.conn.last_insert_rowid())
461    }
462
463    /// Record a drift event.
464    pub fn record_drift(
465        &self,
466        resource_type: &str,
467        resource_id: &str,
468        expected: Option<&str>,
469        actual: Option<&str>,
470        source: &str,
471    ) -> Result<i64> {
472        let timestamp = crate::utc_now_iso8601();
473        self.conn
474            .execute(
475                "INSERT INTO drift_events (timestamp, resource_type, resource_id, expected, actual, source) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
476                params![timestamp, resource_type, resource_id, expected, actual, source],
477            )
478            ?;
479        Ok(self.conn.last_insert_rowid())
480    }
481
482    /// Resolve drift events by linking them to an apply.
483    pub fn resolve_drift(
484        &self,
485        apply_id: i64,
486        resource_type: &str,
487        resource_id: &str,
488    ) -> Result<()> {
489        self.conn
490            .execute(
491                "UPDATE drift_events SET resolved_by = ?1 WHERE resource_type = ?2 AND resource_id = ?3 AND resolved_by IS NULL",
492                params![apply_id, resource_type, resource_id],
493            )
494            ?;
495        Ok(())
496    }
497
498    /// Upsert a managed resource record.
499    pub fn upsert_managed_resource(
500        &self,
501        resource_type: &str,
502        resource_id: &str,
503        source: &str,
504        hash: Option<&str>,
505        apply_id: Option<i64>,
506    ) -> Result<()> {
507        self.conn
508            .execute(
509                "INSERT INTO managed_resources (resource_type, resource_id, source, last_hash, last_applied)
510                 VALUES (?1, ?2, ?3, ?4, ?5)
511                 ON CONFLICT(resource_type, resource_id) DO UPDATE SET
512                    source = excluded.source,
513                    last_hash = excluded.last_hash,
514                    last_applied = excluded.last_applied",
515                params![resource_type, resource_id, source, hash, apply_id],
516            )
517            ?;
518        Ok(())
519    }
520
521    /// Check if a resource is tracked in managed_resources.
522    pub fn is_resource_managed(&self, resource_type: &str, resource_id: &str) -> Result<bool> {
523        let count: i64 = self.conn.query_row(
524            "SELECT COUNT(*) FROM managed_resources WHERE resource_type = ?1 AND resource_id = ?2",
525            params![resource_type, resource_id],
526            |row| row.get(0),
527        )?;
528        Ok(count > 0)
529    }
530
531    /// Get the most recent apply record.
532    pub fn last_apply(&self) -> Result<Option<ApplyRecord>> {
533        let result = self.conn.query_row(
534            "SELECT id, timestamp, profile, plan_hash, status, summary FROM applies ORDER BY id DESC LIMIT 1",
535            [],
536            |row| {
537                Ok(ApplyRecord {
538                    id: row.get(0)?,
539                    timestamp: row.get(1)?,
540                    profile: row.get(2)?,
541                    plan_hash: row.get(3)?,
542                    status: ApplyStatus::from_str(&row.get::<_, String>(4)?),
543                    summary: row.get(5)?,
544                })
545            },
546        );
547
548        match result {
549            Ok(record) => Ok(Some(record)),
550            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
551            Err(e) => Err(StateError::Database(e.to_string()).into()),
552        }
553    }
554
555    /// Get a specific apply record by ID.
556    pub fn get_apply(&self, apply_id: i64) -> Result<Option<ApplyRecord>> {
557        let result = self.conn.query_row(
558            "SELECT id, timestamp, profile, plan_hash, status, summary FROM applies WHERE id = ?1",
559            params![apply_id],
560            |row| {
561                Ok(ApplyRecord {
562                    id: row.get(0)?,
563                    timestamp: row.get(1)?,
564                    profile: row.get(2)?,
565                    plan_hash: row.get(3)?,
566                    status: ApplyStatus::from_str(&row.get::<_, String>(4)?),
567                    summary: row.get(5)?,
568                })
569            },
570        );
571
572        match result {
573            Ok(record) => Ok(Some(record)),
574            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
575            Err(e) => Err(StateError::Database(e.to_string()).into()),
576        }
577    }
578
579    /// Get apply history (most recent first), limited to `limit` entries.
580    pub fn history(&self, limit: u32) -> Result<Vec<ApplyRecord>> {
581        let mut stmt = self
582            .conn
583            .prepare(
584                "SELECT id, timestamp, profile, plan_hash, status, summary FROM applies ORDER BY id DESC LIMIT ?1",
585            )
586            ?;
587
588        let records = stmt
589            .query_map(params![limit], |row| {
590                Ok(ApplyRecord {
591                    id: row.get(0)?,
592                    timestamp: row.get(1)?,
593                    profile: row.get(2)?,
594                    plan_hash: row.get(3)?,
595                    status: ApplyStatus::from_str(&row.get::<_, String>(4)?),
596                    summary: row.get(5)?,
597                })
598            })?
599            .collect::<std::result::Result<Vec<_>, _>>()?;
600
601        Ok(records)
602    }
603
604    /// Get all managed resources.
605    pub fn managed_resources(&self) -> Result<Vec<ManagedResource>> {
606        let mut stmt = self
607            .conn
608            .prepare(
609                "SELECT resource_type, resource_id, source, last_hash, last_applied FROM managed_resources ORDER BY resource_type, resource_id",
610            )
611            ?;
612
613        let resources = stmt
614            .query_map([], |row| {
615                Ok(ManagedResource {
616                    resource_type: row.get(0)?,
617                    resource_id: row.get(1)?,
618                    source: row.get(2)?,
619                    last_hash: row.get(3)?,
620                    last_applied: row.get(4)?,
621                })
622            })?
623            .collect::<std::result::Result<Vec<_>, _>>()?;
624
625        Ok(resources)
626    }
627
628    /// Get unresolved drift events.
629    pub fn unresolved_drift(&self) -> Result<Vec<DriftEvent>> {
630        let mut stmt = self
631            .conn
632            .prepare(
633                "SELECT id, timestamp, resource_type, resource_id, expected, actual, resolved_by, source FROM drift_events WHERE resolved_by IS NULL ORDER BY timestamp DESC",
634            )
635            ?;
636
637        let events = stmt
638            .query_map([], |row| {
639                Ok(DriftEvent {
640                    id: row.get(0)?,
641                    timestamp: row.get(1)?,
642                    resource_type: row.get(2)?,
643                    resource_id: row.get(3)?,
644                    expected: row.get(4)?,
645                    actual: row.get(5)?,
646                    resolved_by: row.get(6)?,
647                    source: row.get(7)?,
648                })
649            })?
650            .collect::<std::result::Result<Vec<_>, _>>()?;
651
652        Ok(events)
653    }
654
655    // --- Config source state methods (Phase 9) ---
656
657    /// Upsert a config source record.
658    pub fn upsert_config_source(
659        &self,
660        name: &str,
661        origin_url: &str,
662        origin_branch: &str,
663        last_commit: Option<&str>,
664        source_version: Option<&str>,
665        pinned_version: Option<&str>,
666    ) -> Result<i64> {
667        let timestamp = crate::utc_now_iso8601();
668        self.conn
669            .execute(
670                "INSERT INTO config_sources (name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version)
671                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
672                 ON CONFLICT(name) DO UPDATE SET
673                    origin_url = excluded.origin_url,
674                    origin_branch = excluded.origin_branch,
675                    last_fetched = excluded.last_fetched,
676                    last_commit = excluded.last_commit,
677                    source_version = excluded.source_version,
678                    pinned_version = excluded.pinned_version",
679                params![name, origin_url, origin_branch, timestamp, last_commit, source_version, pinned_version],
680            )
681            ?;
682        Ok(self.conn.last_insert_rowid())
683    }
684
685    /// Get all config sources.
686    pub fn config_sources(&self) -> Result<Vec<ConfigSourceRecord>> {
687        let mut stmt = self
688            .conn
689            .prepare(
690                "SELECT id, name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version, status
691                 FROM config_sources ORDER BY name",
692            )
693            ?;
694
695        let sources = stmt
696            .query_map([], |row| {
697                Ok(ConfigSourceRecord {
698                    id: row.get(0)?,
699                    name: row.get(1)?,
700                    origin_url: row.get(2)?,
701                    origin_branch: row.get(3)?,
702                    last_fetched: row.get(4)?,
703                    last_commit: row.get(5)?,
704                    source_version: row.get(6)?,
705                    pinned_version: row.get(7)?,
706                    status: row.get(8)?,
707                })
708            })?
709            .collect::<std::result::Result<Vec<_>, _>>()?;
710
711        Ok(sources)
712    }
713
714    /// Get a config source by name.
715    pub fn config_source_by_name(&self, name: &str) -> Result<Option<ConfigSourceRecord>> {
716        let result = self.conn.query_row(
717            "SELECT id, name, origin_url, origin_branch, last_fetched, last_commit, source_version, pinned_version, status
718             FROM config_sources WHERE name = ?1",
719            params![name],
720            |row| {
721                Ok(ConfigSourceRecord {
722                    id: row.get(0)?,
723                    name: row.get(1)?,
724                    origin_url: row.get(2)?,
725                    origin_branch: row.get(3)?,
726                    last_fetched: row.get(4)?,
727                    last_commit: row.get(5)?,
728                    source_version: row.get(6)?,
729                    pinned_version: row.get(7)?,
730                    status: row.get(8)?,
731                })
732            },
733        );
734
735        match result {
736            Ok(record) => Ok(Some(record)),
737            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
738            Err(e) => Err(StateError::Database(e.to_string()).into()),
739        }
740    }
741
742    /// Remove a config source from state.
743    pub fn remove_config_source(&self, name: &str) -> Result<()> {
744        self.conn
745            .execute("DELETE FROM config_sources WHERE name = ?1", params![name])?;
746        Ok(())
747    }
748
749    /// Update the status of a config source.
750    pub fn update_config_source_status(&self, name: &str, status: &str) -> Result<()> {
751        self.conn.execute(
752            "UPDATE config_sources SET status = ?1 WHERE name = ?2",
753            params![status, name],
754        )?;
755        Ok(())
756    }
757
758    /// Record a source apply (links a source's commit to an apply).
759    pub fn record_source_apply(
760        &self,
761        source_name: &str,
762        apply_id: i64,
763        source_commit: &str,
764    ) -> Result<()> {
765        let source = self.config_source_by_name(source_name)?;
766        if let Some(src) = source {
767            self.conn.execute(
768                "INSERT INTO source_applies (source_id, apply_id, source_commit)
769                     VALUES (?1, ?2, ?3)",
770                params![src.id, apply_id, source_commit],
771            )?;
772        }
773        Ok(())
774    }
775
776    /// Record a composition conflict.
777    pub fn record_source_conflict(
778        &self,
779        source_name: &str,
780        resource_type: &str,
781        resource_id: &str,
782        resolution: &str,
783        detail: Option<&str>,
784    ) -> Result<()> {
785        let timestamp = crate::utc_now_iso8601();
786        self.conn
787            .execute(
788                "INSERT INTO source_conflicts (timestamp, source_name, resource_type, resource_id, resolution, detail)
789                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
790                params![timestamp, source_name, resource_type, resource_id, resolution, detail],
791            )
792            ?;
793        Ok(())
794    }
795
796    // --- Pending decisions ---
797
798    /// Upsert a pending decision. If an unresolved decision already exists for this
799    /// (source, resource) pair, updates the summary and resets the timestamp.
800    pub fn upsert_pending_decision(
801        &self,
802        source: &str,
803        resource: &str,
804        tier: &str,
805        action: &str,
806        summary: &str,
807    ) -> Result<i64> {
808        let timestamp = crate::utc_now_iso8601();
809        // Try to update an existing unresolved row first
810        let updated = self.conn.execute(
811            "UPDATE pending_decisions SET tier = ?1, action = ?2, summary = ?3, created_at = ?4
812                 WHERE source = ?5 AND resource = ?6 AND resolved_at IS NULL",
813            params![tier, action, summary, timestamp, source, resource],
814        )?;
815
816        if updated > 0 {
817            let id = self
818                .conn
819                .query_row(
820                    "SELECT id FROM pending_decisions WHERE source = ?1 AND resource = ?2 AND resolved_at IS NULL",
821                    params![source, resource],
822                    |row| row.get(0),
823                )
824                ?;
825            return Ok(id);
826        }
827
828        self.conn.execute(
829            "INSERT INTO pending_decisions (source, resource, tier, action, summary, created_at)
830                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
831            params![source, resource, tier, action, summary, timestamp],
832        )?;
833        Ok(self.conn.last_insert_rowid())
834    }
835
836    /// Get all unresolved pending decisions.
837    pub fn pending_decisions(&self) -> Result<Vec<PendingDecision>> {
838        let mut stmt = self
839            .conn
840            .prepare(
841                "SELECT id, source, resource, tier, action, summary, created_at, resolved_at, resolution
842                 FROM pending_decisions WHERE resolved_at IS NULL ORDER BY created_at DESC",
843            )
844            ?;
845
846        let rows = stmt
847            .query_map([], |row| {
848                Ok(PendingDecision {
849                    id: row.get(0)?,
850                    source: row.get(1)?,
851                    resource: row.get(2)?,
852                    tier: row.get(3)?,
853                    action: row.get(4)?,
854                    summary: row.get(5)?,
855                    created_at: row.get(6)?,
856                    resolved_at: row.get(7)?,
857                    resolution: row.get(8)?,
858                })
859            })?
860            .collect::<std::result::Result<Vec<_>, _>>()?;
861
862        Ok(rows)
863    }
864
865    /// Get pending decisions for a specific source.
866    pub fn pending_decisions_for_source(&self, source: &str) -> Result<Vec<PendingDecision>> {
867        let mut stmt = self
868            .conn
869            .prepare(
870                "SELECT id, source, resource, tier, action, summary, created_at, resolved_at, resolution
871                 FROM pending_decisions WHERE source = ?1 AND resolved_at IS NULL ORDER BY created_at DESC",
872            )
873            ?;
874
875        let rows = stmt
876            .query_map(params![source], |row| {
877                Ok(PendingDecision {
878                    id: row.get(0)?,
879                    source: row.get(1)?,
880                    resource: row.get(2)?,
881                    tier: row.get(3)?,
882                    action: row.get(4)?,
883                    summary: row.get(5)?,
884                    created_at: row.get(6)?,
885                    resolved_at: row.get(7)?,
886                    resolution: row.get(8)?,
887                })
888            })?
889            .collect::<std::result::Result<Vec<_>, _>>()?;
890
891        Ok(rows)
892    }
893
894    /// Resolve a pending decision by resource path.
895    pub fn resolve_decision(&self, resource: &str, resolution: &str) -> Result<bool> {
896        let timestamp = crate::utc_now_iso8601();
897        let updated = self.conn.execute(
898            "UPDATE pending_decisions SET resolved_at = ?1, resolution = ?2
899                 WHERE resource = ?3 AND resolved_at IS NULL",
900            params![timestamp, resolution, resource],
901        )?;
902        Ok(updated > 0)
903    }
904
905    /// Resolve all pending decisions for a source.
906    pub fn resolve_decisions_for_source(&self, source: &str, resolution: &str) -> Result<usize> {
907        let timestamp = crate::utc_now_iso8601();
908        let updated = self.conn.execute(
909            "UPDATE pending_decisions SET resolved_at = ?1, resolution = ?2
910                 WHERE source = ?3 AND resolved_at IS NULL",
911            params![timestamp, resolution, source],
912        )?;
913        Ok(updated)
914    }
915
916    /// Resolve all pending decisions.
917    pub fn resolve_all_decisions(&self, resolution: &str) -> Result<usize> {
918        let timestamp = crate::utc_now_iso8601();
919        let updated = self.conn.execute(
920            "UPDATE pending_decisions SET resolved_at = ?1, resolution = ?2
921                 WHERE resolved_at IS NULL",
922            params![timestamp, resolution],
923        )?;
924        Ok(updated)
925    }
926
927    // --- Source config hashes ---
928
929    /// Get the stored config hash for a source.
930    pub fn source_config_hash(&self, source: &str) -> Result<Option<SourceConfigHash>> {
931        let result = self.conn.query_row(
932            "SELECT source, config_hash, merged_at FROM source_config_hashes WHERE source = ?1",
933            params![source],
934            |row| {
935                Ok(SourceConfigHash {
936                    source: row.get(0)?,
937                    config_hash: row.get(1)?,
938                    merged_at: row.get(2)?,
939                })
940            },
941        );
942
943        match result {
944            Ok(record) => Ok(Some(record)),
945            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
946            Err(e) => Err(StateError::Database(e.to_string()).into()),
947        }
948    }
949
950    /// Upsert a source config hash.
951    pub fn set_source_config_hash(&self, source: &str, config_hash: &str) -> Result<()> {
952        let timestamp = crate::utc_now_iso8601();
953        self.conn.execute(
954            "INSERT INTO source_config_hashes (source, config_hash, merged_at)
955                 VALUES (?1, ?2, ?3)
956                 ON CONFLICT(source) DO UPDATE SET
957                    config_hash = excluded.config_hash,
958                    merged_at = excluded.merged_at",
959            params![source, config_hash, timestamp],
960        )?;
961        Ok(())
962    }
963
964    /// Remove the config hash for a source.
965    pub fn remove_source_config_hash(&self, source: &str) -> Result<()> {
966        self.conn.execute(
967            "DELETE FROM source_config_hashes WHERE source = ?1",
968            params![source],
969        )?;
970        Ok(())
971    }
972
973    /// Get managed resources from a specific source.
974    pub fn managed_resources_by_source(&self, source_name: &str) -> Result<Vec<ManagedResource>> {
975        let mut stmt = self.conn.prepare(
976            "SELECT resource_type, resource_id, source, last_hash, last_applied
977                 FROM managed_resources WHERE source = ?1 ORDER BY resource_type, resource_id",
978        )?;
979
980        let resources = stmt
981            .query_map(params![source_name], |row| {
982                Ok(ManagedResource {
983                    resource_type: row.get(0)?,
984                    resource_id: row.get(1)?,
985                    source: row.get(2)?,
986                    last_hash: row.get(3)?,
987                    last_applied: row.get(4)?,
988                })
989            })?
990            .collect::<std::result::Result<Vec<_>, _>>()?;
991
992        Ok(resources)
993    }
994
995    // --- Module state ---
996
997    /// Insert or update module state.
998    pub fn upsert_module_state(
999        &self,
1000        module_name: &str,
1001        last_applied: Option<i64>,
1002        packages_hash: &str,
1003        files_hash: &str,
1004        git_sources: Option<&str>,
1005        status: &str,
1006    ) -> Result<()> {
1007        let now = crate::utc_now_iso8601();
1008        self.conn
1009            .execute(
1010                "INSERT INTO module_state (module_name, installed_at, last_applied, packages_hash, files_hash, git_sources, status)
1011                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1012                 ON CONFLICT(module_name) DO UPDATE SET
1013                    last_applied = ?3,
1014                    packages_hash = ?4,
1015                    files_hash = ?5,
1016                    git_sources = ?6,
1017                    status = ?7",
1018                params![module_name, now, last_applied, packages_hash, files_hash, git_sources, status],
1019            )
1020            ?;
1021        Ok(())
1022    }
1023
1024    /// Get all module states.
1025    pub fn module_states(&self) -> Result<Vec<ModuleStateRecord>> {
1026        let mut stmt = self
1027            .conn
1028            .prepare(
1029                "SELECT module_name, installed_at, last_applied, packages_hash, files_hash, git_sources, status
1030                 FROM module_state ORDER BY module_name",
1031            )
1032            ?;
1033
1034        let records = stmt
1035            .query_map([], |row| {
1036                Ok(ModuleStateRecord {
1037                    module_name: row.get(0)?,
1038                    installed_at: row.get(1)?,
1039                    last_applied: row.get(2)?,
1040                    packages_hash: row.get(3)?,
1041                    files_hash: row.get(4)?,
1042                    git_sources: row.get(5)?,
1043                    status: row.get(6)?,
1044                })
1045            })?
1046            .collect::<std::result::Result<Vec<_>, _>>()?;
1047
1048        Ok(records)
1049    }
1050
1051    /// Get module state by name.
1052    pub fn module_state_by_name(&self, module_name: &str) -> Result<Option<ModuleStateRecord>> {
1053        let mut stmt = self
1054            .conn
1055            .prepare(
1056                "SELECT module_name, installed_at, last_applied, packages_hash, files_hash, git_sources, status
1057                 FROM module_state WHERE module_name = ?1",
1058            )
1059            ?;
1060
1061        let mut rows = stmt.query_map(params![module_name], |row| {
1062            Ok(ModuleStateRecord {
1063                module_name: row.get(0)?,
1064                installed_at: row.get(1)?,
1065                last_applied: row.get(2)?,
1066                packages_hash: row.get(3)?,
1067                files_hash: row.get(4)?,
1068                git_sources: row.get(5)?,
1069                status: row.get(6)?,
1070            })
1071        })?;
1072
1073        match rows.next() {
1074            Some(Ok(record)) => Ok(Some(record)),
1075            Some(Err(e)) => Err(StateError::Database(e.to_string()).into()),
1076            None => Ok(None),
1077        }
1078    }
1079
1080    /// Remove module state by name.
1081    pub fn remove_module_state(&self, module_name: &str) -> Result<()> {
1082        self.conn.execute(
1083            "DELETE FROM module_state WHERE module_name = ?1",
1084            params![module_name],
1085        )?;
1086        Ok(())
1087    }
1088
1089    // --- File backup methods ---
1090
1091    /// Store a file backup before overwriting.
1092    pub fn store_file_backup(
1093        &self,
1094        apply_id: i64,
1095        file_path: &str,
1096        state: &crate::FileState,
1097    ) -> Result<()> {
1098        let timestamp = crate::utc_now_iso8601();
1099        self.conn.execute(
1100            "INSERT INTO file_backups (apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at)
1101             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1102            params![
1103                apply_id,
1104                file_path,
1105                state.content_hash,
1106                state.content,
1107                state.permissions.map(|p| p as i64),
1108                state.is_symlink as i64,
1109                state.symlink_target.as_ref().map(|p| p.display().to_string()),
1110                state.oversized as i64,
1111                timestamp,
1112            ],
1113        )?;
1114        Ok(())
1115    }
1116
1117    /// Get a file backup by apply_id and path.
1118    pub fn get_file_backup(
1119        &self,
1120        apply_id: i64,
1121        file_path: &str,
1122    ) -> Result<Option<FileBackupRecord>> {
1123        let result = self.conn.query_row(
1124            "SELECT id, apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at
1125             FROM file_backups WHERE apply_id = ?1 AND file_path = ?2",
1126            params![apply_id, file_path],
1127            |row| {
1128                Ok(FileBackupRecord {
1129                    id: row.get(0)?,
1130                    apply_id: row.get(1)?,
1131                    file_path: row.get(2)?,
1132                    content_hash: row.get(3)?,
1133                    content: row.get(4)?,
1134                    permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1135                    was_symlink: row.get::<_, i64>(6)? != 0,
1136                    symlink_target: row.get(7)?,
1137                    oversized: row.get::<_, i64>(8)? != 0,
1138                    backed_up_at: row.get(9)?,
1139                })
1140            },
1141        );
1142
1143        match result {
1144            Ok(record) => Ok(Some(record)),
1145            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1146            Err(e) => Err(StateError::Database(e.to_string()).into()),
1147        }
1148    }
1149
1150    /// Get all file backups for a specific apply (for full rollback).
1151    pub fn get_apply_backups(&self, apply_id: i64) -> Result<Vec<FileBackupRecord>> {
1152        let mut stmt = self.conn.prepare(
1153            "SELECT id, apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at
1154             FROM file_backups WHERE apply_id = ?1 ORDER BY id",
1155        )?;
1156
1157        let records = stmt
1158            .query_map(params![apply_id], |row| {
1159                Ok(FileBackupRecord {
1160                    id: row.get(0)?,
1161                    apply_id: row.get(1)?,
1162                    file_path: row.get(2)?,
1163                    content_hash: row.get(3)?,
1164                    content: row.get(4)?,
1165                    permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1166                    was_symlink: row.get::<_, i64>(6)? != 0,
1167                    symlink_target: row.get(7)?,
1168                    oversized: row.get::<_, i64>(8)? != 0,
1169                    backed_up_at: row.get(9)?,
1170                })
1171            })?
1172            .collect::<std::result::Result<Vec<_>, _>>()?;
1173
1174        Ok(records)
1175    }
1176
1177    /// Get the most recent backup for a file path (for restore after removal).
1178    pub fn latest_backup_for_path(&self, file_path: &str) -> Result<Option<FileBackupRecord>> {
1179        let result = self.conn.query_row(
1180            "SELECT id, apply_id, file_path, content_hash, content, permissions, was_symlink, symlink_target, oversized, backed_up_at
1181             FROM file_backups WHERE file_path = ?1 ORDER BY id DESC LIMIT 1",
1182            params![file_path],
1183            |row| {
1184                Ok(FileBackupRecord {
1185                    id: row.get(0)?,
1186                    apply_id: row.get(1)?,
1187                    file_path: row.get(2)?,
1188                    content_hash: row.get(3)?,
1189                    content: row.get(4)?,
1190                    permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1191                    was_symlink: row.get::<_, i64>(6)? != 0,
1192                    symlink_target: row.get(7)?,
1193                    oversized: row.get::<_, i64>(8)? != 0,
1194                    backed_up_at: row.get(9)?,
1195                })
1196            },
1197        );
1198
1199        match result {
1200            Ok(record) => Ok(Some(record)),
1201            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1202            Err(e) => Err(StateError::Database(e.to_string()).into()),
1203        }
1204    }
1205
1206    /// Get the earliest file backup for each unique file path from applies after the given ID.
1207    /// This captures the state that existed right after the target apply completed, for each
1208    /// file that was subsequently modified. Used by rollback to restore to a prior apply's state.
1209    pub fn file_backups_after_apply(&self, after_apply_id: i64) -> Result<Vec<FileBackupRecord>> {
1210        // For each distinct file_path in backups from applies after `after_apply_id`,
1211        // pick the backup with the smallest apply_id (earliest apply after target).
1212        let mut stmt = self.conn.prepare(
1213            "SELECT b.id, b.apply_id, b.file_path, b.content_hash, b.content, b.permissions,
1214                    b.was_symlink, b.symlink_target, b.oversized, b.backed_up_at
1215             FROM file_backups b
1216             INNER JOIN (
1217                 SELECT file_path, MIN(apply_id) AS min_apply_id
1218                 FROM file_backups
1219                 WHERE apply_id > ?1
1220                 GROUP BY file_path
1221             ) earliest ON b.file_path = earliest.file_path AND b.apply_id = earliest.min_apply_id
1222             ORDER BY b.id",
1223        )?;
1224
1225        let records = stmt
1226            .query_map(params![after_apply_id], |row| {
1227                Ok(FileBackupRecord {
1228                    id: row.get(0)?,
1229                    apply_id: row.get(1)?,
1230                    file_path: row.get(2)?,
1231                    content_hash: row.get(3)?,
1232                    content: row.get(4)?,
1233                    permissions: row.get::<_, Option<i64>>(5)?.map(|p| p as u32),
1234                    was_symlink: row.get::<_, i64>(6)? != 0,
1235                    symlink_target: row.get(7)?,
1236                    oversized: row.get::<_, i64>(8)? != 0,
1237                    backed_up_at: row.get(9)?,
1238                })
1239            })?
1240            .collect::<std::result::Result<Vec<_>, _>>()?;
1241
1242        Ok(records)
1243    }
1244
1245    /// Get all journal entries from applies after the given ID, for rollback tracking.
1246    pub fn journal_entries_after_apply(&self, after_apply_id: i64) -> Result<Vec<JournalEntry>> {
1247        let mut stmt = self.conn.prepare(
1248            "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
1249             FROM apply_journal WHERE apply_id > ?1 AND status = 'completed' ORDER BY apply_id DESC, action_index DESC",
1250        )?;
1251
1252        let records = stmt
1253            .query_map(params![after_apply_id], |row| {
1254                Ok(JournalEntry {
1255                    id: row.get(0)?,
1256                    apply_id: row.get(1)?,
1257                    action_index: row.get(2)?,
1258                    phase: row.get(3)?,
1259                    action_type: row.get(4)?,
1260                    resource_id: row.get(5)?,
1261                    pre_state: row.get(6)?,
1262                    post_state: row.get(7)?,
1263                    status: row.get(8)?,
1264                    error: row.get(9)?,
1265                    started_at: row.get(10)?,
1266                    completed_at: row.get(11)?,
1267                    script_output: row.get(12)?,
1268                })
1269            })?
1270            .collect::<std::result::Result<Vec<_>, _>>()?;
1271
1272        Ok(records)
1273    }
1274
1275    /// Prune old backups, keeping only the last N applies' worth.
1276    pub fn prune_old_backups(&self, keep_last_n: usize) -> Result<usize> {
1277        let deleted: usize = self.conn.execute(
1278            "DELETE FROM file_backups WHERE apply_id NOT IN (
1279                SELECT id FROM applies ORDER BY id DESC LIMIT ?1
1280            )",
1281            params![keep_last_n as i64],
1282        )?;
1283        Ok(deleted)
1284    }
1285
1286    // --- Apply journal methods ---
1287
1288    /// Record the start of a journal action.
1289    pub fn journal_begin(
1290        &self,
1291        apply_id: i64,
1292        action_index: usize,
1293        phase: &str,
1294        action_type: &str,
1295        resource_id: &str,
1296        pre_state: Option<&str>,
1297    ) -> Result<i64> {
1298        let timestamp = crate::utc_now_iso8601();
1299        self.conn.execute(
1300            "INSERT INTO apply_journal (apply_id, action_index, phase, action_type, resource_id, pre_state, status, started_at)
1301             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
1302            params![apply_id, action_index as i64, phase, action_type, resource_id, pre_state, timestamp],
1303        )?;
1304        Ok(self.conn.last_insert_rowid())
1305    }
1306
1307    /// Mark a journal action as completed, optionally storing script output.
1308    pub fn journal_complete(
1309        &self,
1310        journal_id: i64,
1311        post_state: Option<&str>,
1312        script_output: Option<&str>,
1313    ) -> Result<()> {
1314        let timestamp = crate::utc_now_iso8601();
1315        self.conn.execute(
1316            "UPDATE apply_journal SET status = 'completed', post_state = ?1, completed_at = ?2, script_output = ?3 WHERE id = ?4",
1317            params![post_state, timestamp, script_output, journal_id],
1318        )?;
1319        Ok(())
1320    }
1321
1322    /// Mark a journal action as failed.
1323    pub fn journal_fail(&self, journal_id: i64, error: &str) -> Result<()> {
1324        let timestamp = crate::utc_now_iso8601();
1325        self.conn.execute(
1326            "UPDATE apply_journal SET status = 'failed', error = ?1, completed_at = ?2 WHERE id = ?3",
1327            params![error, timestamp, journal_id],
1328        )?;
1329        Ok(())
1330    }
1331
1332    /// Get completed actions for an apply (for rollback).
1333    pub fn journal_completed_actions(&self, apply_id: i64) -> Result<Vec<JournalEntry>> {
1334        self.query_journal(apply_id, Some("completed"))
1335    }
1336
1337    /// Get all journal entries for an apply (all statuses).
1338    pub fn journal_entries(&self, apply_id: i64) -> Result<Vec<JournalEntry>> {
1339        self.query_journal(apply_id, None)
1340    }
1341
1342    fn query_journal(
1343        &self,
1344        apply_id: i64,
1345        status_filter: Option<&str>,
1346    ) -> Result<Vec<JournalEntry>> {
1347        let base_sql = if status_filter.is_some() {
1348            "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
1349             FROM apply_journal WHERE apply_id = ?1 AND status = ?2 ORDER BY action_index"
1350        } else {
1351            "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
1352             FROM apply_journal WHERE apply_id = ?1 ORDER BY action_index"
1353        };
1354
1355        let mut stmt = self.conn.prepare(base_sql)?;
1356
1357        let map_row = |row: &rusqlite::Row| -> rusqlite::Result<JournalEntry> {
1358            Ok(JournalEntry {
1359                id: row.get(0)?,
1360                apply_id: row.get(1)?,
1361                action_index: row.get(2)?,
1362                phase: row.get(3)?,
1363                action_type: row.get(4)?,
1364                resource_id: row.get(5)?,
1365                pre_state: row.get(6)?,
1366                post_state: row.get(7)?,
1367                status: row.get(8)?,
1368                error: row.get(9)?,
1369                started_at: row.get(10)?,
1370                completed_at: row.get(11)?,
1371                script_output: row.get(12)?,
1372            })
1373        };
1374
1375        let entries: Vec<JournalEntry> = if let Some(status) = status_filter {
1376            stmt.query_map(params![apply_id, status], map_row)?
1377                .collect::<std::result::Result<Vec<_>, _>>()?
1378        } else {
1379            stmt.query_map(params![apply_id], map_row)?
1380                .collect::<std::result::Result<Vec<_>, _>>()?
1381        };
1382
1383        Ok(entries)
1384    }
1385
1386    // --- Module file manifest methods ---
1387
1388    /// Record a file deployed by a module.
1389    pub fn upsert_module_file(
1390        &self,
1391        module_name: &str,
1392        file_path: &str,
1393        content_hash: &str,
1394        strategy: &str,
1395        apply_id: i64,
1396    ) -> Result<()> {
1397        self.conn.execute(
1398            "INSERT INTO module_file_manifest (module_name, file_path, content_hash, strategy, last_applied)
1399             VALUES (?1, ?2, ?3, ?4, ?5)
1400             ON CONFLICT(module_name, file_path) DO UPDATE SET
1401                content_hash = excluded.content_hash,
1402                strategy = excluded.strategy,
1403                last_applied = excluded.last_applied",
1404            params![module_name, file_path, content_hash, strategy, apply_id],
1405        )?;
1406        Ok(())
1407    }
1408
1409    /// Get all files deployed by a module.
1410    pub fn module_deployed_files(&self, module_name: &str) -> Result<Vec<ModuleFileRecord>> {
1411        let mut stmt = self.conn.prepare(
1412            "SELECT module_name, file_path, content_hash, strategy, last_applied
1413             FROM module_file_manifest WHERE module_name = ?1 ORDER BY file_path",
1414        )?;
1415
1416        let records = stmt
1417            .query_map(params![module_name], |row| {
1418                Ok(ModuleFileRecord {
1419                    module_name: row.get(0)?,
1420                    file_path: row.get(1)?,
1421                    content_hash: row.get(2)?,
1422                    strategy: row.get(3)?,
1423                    last_applied: row.get(4)?,
1424                })
1425            })?
1426            .collect::<std::result::Result<Vec<_>, _>>()?;
1427
1428        Ok(records)
1429    }
1430
1431    /// Delete all manifest entries for a module.
1432    pub fn delete_module_files(&self, module_name: &str) -> Result<()> {
1433        self.conn.execute(
1434            "DELETE FROM module_file_manifest WHERE module_name = ?1",
1435            params![module_name],
1436        )?;
1437        Ok(())
1438    }
1439
1440    /// Update apply status (for changing "in-progress" to final status).
1441    pub fn update_apply_status(
1442        &self,
1443        apply_id: i64,
1444        status: ApplyStatus,
1445        summary: Option<&str>,
1446    ) -> Result<()> {
1447        self.conn.execute(
1448            "UPDATE applies SET status = ?1, summary = ?2 WHERE id = ?3",
1449            params![status.as_str(), summary, apply_id],
1450        )?;
1451        Ok(())
1452    }
1453
1454    // --- Compliance snapshot methods ---
1455
1456    /// Store a compliance snapshot. The caller provides the content hash
1457    /// (typically `sha256_hex` of the serialized JSON).
1458    pub fn store_compliance_snapshot(
1459        &self,
1460        snapshot: &crate::compliance::ComplianceSnapshot,
1461        hash: &str,
1462    ) -> Result<()> {
1463        let json = serde_json::to_string(snapshot)
1464            .map_err(|e| StateError::Database(format!("failed to serialize snapshot: {}", e)))?;
1465        self.conn.execute(
1466            "INSERT INTO compliance_snapshots (timestamp, content_hash, snapshot_json, summary_compliant, summary_warning, summary_violation)
1467             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1468            params![
1469                snapshot.timestamp,
1470                hash,
1471                json,
1472                snapshot.summary.compliant as i64,
1473                snapshot.summary.warning as i64,
1474                snapshot.summary.violation as i64,
1475            ],
1476        )?;
1477        Ok(())
1478    }
1479
1480    /// Get the content hash of the most recently stored compliance snapshot.
1481    pub fn latest_compliance_hash(&self) -> Result<Option<String>> {
1482        let result = self.conn.query_row(
1483            "SELECT content_hash FROM compliance_snapshots ORDER BY id DESC LIMIT 1",
1484            [],
1485            |row| row.get(0),
1486        );
1487
1488        match result {
1489            Ok(hash) => Ok(Some(hash)),
1490            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1491            Err(e) => Err(StateError::Database(e.to_string()).into()),
1492        }
1493    }
1494
1495    /// Get compliance snapshot history as summary rows.
1496    /// If `since` is provided, only return snapshots after that ISO 8601 timestamp.
1497    pub fn compliance_history(
1498        &self,
1499        since: Option<&str>,
1500        limit: u32,
1501    ) -> Result<Vec<ComplianceHistoryRow>> {
1502        if let Some(since_ts) = since {
1503            let mut stmt = self.conn.prepare(
1504                "SELECT id, timestamp, summary_compliant, summary_warning, summary_violation
1505                 FROM compliance_snapshots WHERE timestamp > ?1 ORDER BY id DESC LIMIT ?2",
1506            )?;
1507
1508            let rows = stmt
1509                .query_map(params![since_ts, limit], |row| {
1510                    Ok(ComplianceHistoryRow {
1511                        id: row.get(0)?,
1512                        timestamp: row.get(1)?,
1513                        compliant: row.get(2)?,
1514                        warning: row.get(3)?,
1515                        violation: row.get(4)?,
1516                    })
1517                })?
1518                .collect::<std::result::Result<Vec<_>, _>>()?;
1519            Ok(rows)
1520        } else {
1521            let mut stmt = self.conn.prepare(
1522                "SELECT id, timestamp, summary_compliant, summary_warning, summary_violation
1523                 FROM compliance_snapshots ORDER BY id DESC LIMIT ?1",
1524            )?;
1525
1526            let rows = stmt
1527                .query_map(params![limit], |row| {
1528                    Ok(ComplianceHistoryRow {
1529                        id: row.get(0)?,
1530                        timestamp: row.get(1)?,
1531                        compliant: row.get(2)?,
1532                        warning: row.get(3)?,
1533                        violation: row.get(4)?,
1534                    })
1535                })?
1536                .collect::<std::result::Result<Vec<_>, _>>()?;
1537            Ok(rows)
1538        }
1539    }
1540
1541    /// Retrieve a full compliance snapshot by ID.
1542    pub fn get_compliance_snapshot(
1543        &self,
1544        id: i64,
1545    ) -> Result<Option<crate::compliance::ComplianceSnapshot>> {
1546        let result = self.conn.query_row(
1547            "SELECT snapshot_json FROM compliance_snapshots WHERE id = ?1",
1548            params![id],
1549            |row| row.get::<_, String>(0),
1550        );
1551
1552        match result {
1553            Ok(json) => {
1554                let snapshot: crate::compliance::ComplianceSnapshot = serde_json::from_str(&json)
1555                    .map_err(|e| {
1556                    StateError::Database(format!("failed to deserialize snapshot: {}", e))
1557                })?;
1558                Ok(Some(snapshot))
1559            }
1560            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1561            Err(e) => Err(StateError::Database(e.to_string()).into()),
1562        }
1563    }
1564
1565    /// Remove compliance snapshots older than the given ISO 8601 timestamp.
1566    /// Returns the number of rows deleted.
1567    pub fn prune_compliance_snapshots(&self, before_timestamp: &str) -> Result<usize> {
1568        let deleted = self.conn.execute(
1569            "DELETE FROM compliance_snapshots WHERE timestamp < ?1",
1570            params![before_timestamp],
1571        )?;
1572        Ok(deleted)
1573    }
1574}
1575
1576/// Compute SHA256 hash of a serializable plan for deduplication.
1577pub fn plan_hash(data: &str) -> String {
1578    crate::sha256_hex(data.as_bytes())
1579}
1580
1581pub fn default_state_dir() -> Result<PathBuf> {
1582    if let Ok(dir) = std::env::var("CFGD_STATE_DIR") {
1583        return Ok(PathBuf::from(dir));
1584    }
1585    let base = directories::BaseDirs::new().ok_or_else(|| StateError::DirectoryNotWritable {
1586        path: PathBuf::from("~/.local/share/cfgd"),
1587    })?;
1588    Ok(base.data_local_dir().join("cfgd"))
1589}
1590
/// File name, inside the state directory, under which the pending server config is stored.
const PENDING_CONFIG_FILENAME: &str = "pending-server-config.json";
1592
1593/// Save a desired config received from the device gateway for later reconciliation.
1594pub fn save_pending_server_config(config: &serde_json::Value) -> Result<PathBuf> {
1595    let dir = default_state_dir()?;
1596    std::fs::create_dir_all(&dir)
1597        .map_err(|_| StateError::DirectoryNotWritable { path: dir.clone() })?;
1598    let path = dir.join(PENDING_CONFIG_FILENAME);
1599    let json = serde_json::to_string_pretty(config)
1600        .map_err(|e| StateError::Database(format!("failed to serialize pending config: {}", e)))?;
1601    crate::atomic_write_str(&path, &json)
1602        .map_err(|_| StateError::DirectoryNotWritable { path: path.clone() })?;
1603    Ok(path)
1604}
1605
1606/// Load a pending server config, if one exists.
1607pub fn load_pending_server_config() -> Result<Option<serde_json::Value>> {
1608    let dir = default_state_dir()?;
1609    let path = dir.join(PENDING_CONFIG_FILENAME);
1610    if !path.exists() {
1611        return Ok(None);
1612    }
1613    let contents = std::fs::read_to_string(&path)
1614        .map_err(|_| StateError::DirectoryNotWritable { path: path.clone() })?;
1615    let value: serde_json::Value = serde_json::from_str(&contents)
1616        .map_err(|e| StateError::Database(format!("failed to parse pending config: {}", e)))?;
1617    Ok(Some(value))
1618}
1619
1620/// Remove the pending server config file after it has been consumed.
1621pub fn clear_pending_server_config() -> Result<()> {
1622    let dir = default_state_dir()?;
1623    let path = dir.join(PENDING_CONFIG_FILENAME);
1624    match std::fs::remove_file(&path) {
1625        Ok(()) => Ok(()),
1626        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1627        Err(_) => Err(StateError::DirectoryNotWritable { path }.into()),
1628    }
1629}
1630
1631#[cfg(test)]
1632mod tests {
1633    use super::*;
1634
1635    #[test]
1636    fn open_in_memory() {
1637        let store = StateStore::open_in_memory().unwrap();
1638        assert!(store.last_apply().unwrap().is_none());
1639    }
1640
1641    #[test]
1642    fn record_and_retrieve_apply() {
1643        let store = StateStore::open_in_memory().unwrap();
1644        let id = store
1645            .record_apply(
1646                "default",
1647                "abc123",
1648                ApplyStatus::Success,
1649                Some("{\"files\": 3}"),
1650            )
1651            .unwrap();
1652        assert!(id > 0);
1653
1654        let last = store.last_apply().unwrap().unwrap();
1655        assert_eq!(last.id, id);
1656        assert_eq!(last.profile, "default");
1657        assert_eq!(last.plan_hash, "abc123");
1658        assert_eq!(last.status, ApplyStatus::Success);
1659        assert_eq!(last.summary.as_deref(), Some("{\"files\": 3}"));
1660    }
1661
1662    #[test]
1663    fn history_returns_most_recent_first() {
1664        let store = StateStore::open_in_memory().unwrap();
1665        store
1666            .record_apply("p1", "h1", ApplyStatus::Success, None)
1667            .unwrap();
1668        store
1669            .record_apply("p2", "h2", ApplyStatus::Partial, None)
1670            .unwrap();
1671        store
1672            .record_apply("p3", "h3", ApplyStatus::Failed, None)
1673            .unwrap();
1674
1675        let history = store.history(10).unwrap();
1676        assert_eq!(history.len(), 3);
1677        assert_eq!(history[0].profile, "p3");
1678        assert_eq!(history[1].profile, "p2");
1679        assert_eq!(history[2].profile, "p1");
1680    }
1681
1682    #[test]
1683    fn history_respects_limit() {
1684        let store = StateStore::open_in_memory().unwrap();
1685        for i in 0..10 {
1686            store
1687                .record_apply(&format!("p{}", i), "h", ApplyStatus::Success, None)
1688                .unwrap();
1689        }
1690
1691        let history = store.history(3).unwrap();
1692        assert_eq!(history.len(), 3);
1693    }
1694
1695    #[test]
1696    fn record_and_retrieve_drift() {
1697        let store = StateStore::open_in_memory().unwrap();
1698        store
1699            .record_drift(
1700                "file",
1701                "/home/user/.zshrc",
1702                Some("abc"),
1703                Some("def"),
1704                "local",
1705            )
1706            .unwrap();
1707
1708        let events = store.unresolved_drift().unwrap();
1709        assert_eq!(events.len(), 1);
1710        assert_eq!(events[0].resource_type, "file");
1711        assert_eq!(events[0].resource_id, "/home/user/.zshrc");
1712        assert_eq!(events[0].expected.as_deref(), Some("abc"));
1713        assert_eq!(events[0].actual.as_deref(), Some("def"));
1714        assert!(events[0].resolved_by.is_none());
1715    }
1716
1717    #[test]
1718    fn resolve_drift_links_to_apply() {
1719        let store = StateStore::open_in_memory().unwrap();
1720        store
1721            .record_drift("file", "/test", Some("a"), Some("b"), "local")
1722            .unwrap();
1723
1724        let apply_id = store
1725            .record_apply("default", "h", ApplyStatus::Success, None)
1726            .unwrap();
1727        store.resolve_drift(apply_id, "file", "/test").unwrap();
1728
1729        let events = store.unresolved_drift().unwrap();
1730        assert!(events.is_empty());
1731    }
1732
1733    #[test]
1734    fn upsert_managed_resource() {
1735        let store = StateStore::open_in_memory().unwrap();
1736        store
1737            .upsert_managed_resource("file", "/home/.zshrc", "local", Some("hash1"), None)
1738            .unwrap();
1739
1740        let resources = store.managed_resources().unwrap();
1741        assert_eq!(resources.len(), 1);
1742        assert_eq!(resources[0].resource_type, "file");
1743        assert_eq!(resources[0].resource_id, "/home/.zshrc");
1744        assert_eq!(resources[0].last_hash.as_deref(), Some("hash1"));
1745
1746        // Update with new hash
1747        store
1748            .upsert_managed_resource("file", "/home/.zshrc", "local", Some("hash2"), None)
1749            .unwrap();
1750
1751        let resources = store.managed_resources().unwrap();
1752        assert_eq!(resources.len(), 1);
1753        assert_eq!(resources[0].last_hash.as_deref(), Some("hash2"));
1754    }
1755
1756    #[test]
1757    fn is_resource_managed() {
1758        let store = StateStore::open_in_memory().unwrap();
1759
1760        assert!(!store.is_resource_managed("file", "/home/.zshrc").unwrap());
1761
1762        store
1763            .upsert_managed_resource("file", "/home/.zshrc", "local", Some("hash1"), None)
1764            .unwrap();
1765
1766        assert!(store.is_resource_managed("file", "/home/.zshrc").unwrap());
1767        assert!(!store.is_resource_managed("file", "/home/.bashrc").unwrap());
1768        assert!(
1769            !store
1770                .is_resource_managed("package", "/home/.zshrc")
1771                .unwrap()
1772        );
1773    }
1774
1775    #[test]
1776    fn managed_resources_unique_constraint() {
1777        let store = StateStore::open_in_memory().unwrap();
1778        store
1779            .upsert_managed_resource("file", "/a", "local", None, None)
1780            .unwrap();
1781        store
1782            .upsert_managed_resource("package", "/a", "local", None, None)
1783            .unwrap();
1784
1785        let resources = store.managed_resources().unwrap();
1786        assert_eq!(resources.len(), 2);
1787    }
1788
1789    #[test]
1790    fn plan_hash_is_deterministic() {
1791        let h1 = plan_hash("test plan data");
1792        let h2 = plan_hash("test plan data");
1793        assert_eq!(h1, h2);
1794        assert_ne!(h1, plan_hash("different data"));
1795    }
1796
1797    #[test]
1798    fn now_iso8601_format() {
1799        let ts = crate::utc_now_iso8601();
1800        assert!(ts.contains('T'));
1801        assert!(ts.ends_with('Z'));
1802        assert_eq!(ts.len(), 20);
1803    }
1804
1805    #[test]
1806    fn open_file_based_store() {
1807        let dir = tempfile::tempdir().unwrap();
1808        let db_path = dir.path().join("state.db");
1809
1810        let store = StateStore::open(&db_path).unwrap();
1811        store
1812            .record_apply("test", "hash", ApplyStatus::Success, None)
1813            .unwrap();
1814
1815        // Reopen and verify persistence
1816        let store2 = StateStore::open(&db_path).unwrap();
1817        let last = store2.last_apply().unwrap().unwrap();
1818        assert_eq!(last.profile, "test");
1819    }
1820
1821    // --- Config source state tests ---
1822
1823    #[test]
1824    fn upsert_and_list_config_sources() {
1825        let store = StateStore::open_in_memory().unwrap();
1826        store
1827            .upsert_config_source(
1828                "acme",
1829                "git@github.com:acme/config.git",
1830                "master",
1831                Some("abc123"),
1832                Some("2.1.0"),
1833                Some("~2"),
1834            )
1835            .unwrap();
1836
1837        let sources = store.config_sources().unwrap();
1838        assert_eq!(sources.len(), 1);
1839        assert_eq!(sources[0].name, "acme");
1840        assert_eq!(sources[0].origin_url, "git@github.com:acme/config.git");
1841        assert_eq!(sources[0].last_commit.as_deref(), Some("abc123"));
1842        assert_eq!(sources[0].source_version.as_deref(), Some("2.1.0"));
1843        assert_eq!(sources[0].status, "active");
1844    }
1845
1846    #[test]
1847    fn config_source_by_name() {
1848        let store = StateStore::open_in_memory().unwrap();
1849        store
1850            .upsert_config_source("acme", "url", "main", None, None, None)
1851            .unwrap();
1852
1853        let found = store.config_source_by_name("acme").unwrap();
1854        assert!(found.is_some());
1855        assert_eq!(found.unwrap().name, "acme");
1856
1857        let not_found = store.config_source_by_name("nonexistent").unwrap();
1858        assert!(not_found.is_none());
1859    }
1860
1861    #[test]
1862    fn remove_config_source() {
1863        let store = StateStore::open_in_memory().unwrap();
1864        store
1865            .upsert_config_source("acme", "url", "main", None, None, None)
1866            .unwrap();
1867
1868        store.remove_config_source("acme").unwrap();
1869        let sources = store.config_sources().unwrap();
1870        assert!(sources.is_empty());
1871    }
1872
1873    #[test]
1874    fn update_config_source_status() {
1875        let store = StateStore::open_in_memory().unwrap();
1876        store
1877            .upsert_config_source("acme", "url", "main", None, None, None)
1878            .unwrap();
1879
1880        store
1881            .update_config_source_status("acme", "inactive")
1882            .unwrap();
1883        let source = store.config_source_by_name("acme").unwrap().unwrap();
1884        assert_eq!(source.status, "inactive");
1885    }
1886
1887    #[test]
1888    fn record_source_conflict() {
1889        let store = StateStore::open_in_memory().unwrap();
1890        store
1891            .record_source_conflict(
1892                "acme",
1893                "package",
1894                "git-secrets (brew)",
1895                "REQUIRED",
1896                Some("team requirement"),
1897            )
1898            .unwrap();
1899
1900        // Verify the conflict was actually persisted
1901        let count: i64 = store
1902            .conn
1903            .query_row(
1904                "SELECT COUNT(*) FROM source_conflicts WHERE source_name = ?1",
1905                params!["acme"],
1906                |row| row.get(0),
1907            )
1908            .unwrap();
1909        assert_eq!(count, 1, "one conflict should be recorded");
1910
1911        let (resource_type, resource_id, resolution, detail): (String, String, String, Option<String>) = store
1912            .conn
1913            .query_row(
1914                "SELECT resource_type, resource_id, resolution, detail FROM source_conflicts WHERE source_name = ?1",
1915                params!["acme"],
1916                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
1917            )
1918            .unwrap();
1919        assert_eq!(resource_type, "package");
1920        assert_eq!(resource_id, "git-secrets (brew)");
1921        assert_eq!(resolution, "REQUIRED");
1922        assert_eq!(detail.as_deref(), Some("team requirement"));
1923    }
1924
1925    #[test]
1926    fn managed_resources_by_source() {
1927        let store = StateStore::open_in_memory().unwrap();
1928        store
1929            .upsert_managed_resource("file", "/a", "local", None, None)
1930            .unwrap();
1931        store
1932            .upsert_managed_resource("file", "/b", "acme", None, None)
1933            .unwrap();
1934        store
1935            .upsert_managed_resource("package", "git-secrets", "acme", None, None)
1936            .unwrap();
1937
1938        let acme_resources = store.managed_resources_by_source("acme").unwrap();
1939        assert_eq!(acme_resources.len(), 2);
1940
1941        let local_resources = store.managed_resources_by_source("local").unwrap();
1942        assert_eq!(local_resources.len(), 1);
1943    }
1944
1945    #[test]
1946    fn upsert_config_source_updates_on_conflict() {
1947        let store = StateStore::open_in_memory().unwrap();
1948        store
1949            .upsert_config_source("acme", "url1", "main", Some("commit1"), Some("1.0.0"), None)
1950            .unwrap();
1951        store
1952            .upsert_config_source(
1953                "acme",
1954                "url2",
1955                "dev",
1956                Some("commit2"),
1957                Some("2.0.0"),
1958                Some("~2"),
1959            )
1960            .unwrap();
1961
1962        let sources = store.config_sources().unwrap();
1963        assert_eq!(sources.len(), 1);
1964        assert_eq!(sources[0].origin_url, "url2");
1965        assert_eq!(sources[0].origin_branch, "dev");
1966        assert_eq!(sources[0].last_commit.as_deref(), Some("commit2"));
1967        assert_eq!(sources[0].source_version.as_deref(), Some("2.0.0"));
1968    }
1969
1970    // --- Pending decision tests ---
1971
1972    #[test]
1973    fn upsert_and_list_pending_decisions() {
1974        let store = StateStore::open_in_memory().unwrap();
1975        let id = store
1976            .upsert_pending_decision(
1977                "acme",
1978                "packages.brew.k9s",
1979                "recommended",
1980                "install",
1981                "install k9s (recommended by acme)",
1982            )
1983            .unwrap();
1984        assert!(id > 0);
1985
1986        let decisions = store.pending_decisions().unwrap();
1987        assert_eq!(decisions.len(), 1);
1988        assert_eq!(decisions[0].source, "acme");
1989        assert_eq!(decisions[0].resource, "packages.brew.k9s");
1990        assert_eq!(decisions[0].tier, "recommended");
1991        assert_eq!(decisions[0].action, "install");
1992        assert!(decisions[0].resolved_at.is_none());
1993    }
1994
1995    #[test]
1996    fn upsert_pending_decision_updates_existing() {
1997        let store = StateStore::open_in_memory().unwrap();
1998        store
1999            .upsert_pending_decision(
2000                "acme",
2001                "packages.brew.k9s",
2002                "recommended",
2003                "install",
2004                "original summary",
2005            )
2006            .unwrap();
2007        store
2008            .upsert_pending_decision(
2009                "acme",
2010                "packages.brew.k9s",
2011                "recommended",
2012                "update",
2013                "updated summary",
2014            )
2015            .unwrap();
2016
2017        let decisions = store.pending_decisions().unwrap();
2018        assert_eq!(decisions.len(), 1);
2019        assert_eq!(decisions[0].action, "update");
2020        assert_eq!(decisions[0].summary, "updated summary");
2021    }
2022
2023    #[test]
2024    fn resolve_decision_by_resource() {
2025        let store = StateStore::open_in_memory().unwrap();
2026        store
2027            .upsert_pending_decision("acme", "packages.brew.k9s", "recommended", "install", "k9s")
2028            .unwrap();
2029
2030        let resolved = store
2031            .resolve_decision("packages.brew.k9s", "accepted")
2032            .unwrap();
2033        assert!(resolved);
2034
2035        let pending = store.pending_decisions().unwrap();
2036        assert!(pending.is_empty());
2037    }
2038
2039    #[test]
2040    fn resolve_decision_nonexistent_returns_false() {
2041        let store = StateStore::open_in_memory().unwrap();
2042        let resolved = store
2043            .resolve_decision("nonexistent.resource", "accepted")
2044            .unwrap();
2045        assert!(!resolved);
2046    }
2047
2048    #[test]
2049    fn resolve_decisions_for_source() {
2050        let store = StateStore::open_in_memory().unwrap();
2051        store
2052            .upsert_pending_decision("acme", "packages.brew.k9s", "recommended", "install", "k9s")
2053            .unwrap();
2054        store
2055            .upsert_pending_decision(
2056                "acme",
2057                "packages.brew.stern",
2058                "recommended",
2059                "install",
2060                "stern",
2061            )
2062            .unwrap();
2063        store
2064            .upsert_pending_decision("other", "packages.brew.bat", "optional", "install", "bat")
2065            .unwrap();
2066
2067        let count = store
2068            .resolve_decisions_for_source("acme", "accepted")
2069            .unwrap();
2070        assert_eq!(count, 2);
2071
2072        let pending = store.pending_decisions().unwrap();
2073        assert_eq!(pending.len(), 1);
2074        assert_eq!(pending[0].source, "other");
2075    }
2076
2077    #[test]
2078    fn resolve_all_decisions() {
2079        let store = StateStore::open_in_memory().unwrap();
2080        store
2081            .upsert_pending_decision("a", "r1", "recommended", "install", "s1")
2082            .unwrap();
2083        store
2084            .upsert_pending_decision("b", "r2", "optional", "install", "s2")
2085            .unwrap();
2086
2087        let count = store.resolve_all_decisions("accepted").unwrap();
2088        assert_eq!(count, 2);
2089
2090        let pending = store.pending_decisions().unwrap();
2091        assert!(pending.is_empty());
2092    }
2093
2094    #[test]
2095    fn pending_decisions_for_source() {
2096        let store = StateStore::open_in_memory().unwrap();
2097        store
2098            .upsert_pending_decision("acme", "r1", "recommended", "install", "s1")
2099            .unwrap();
2100        store
2101            .upsert_pending_decision("other", "r2", "optional", "install", "s2")
2102            .unwrap();
2103
2104        let acme = store.pending_decisions_for_source("acme").unwrap();
2105        assert_eq!(acme.len(), 1);
2106        assert_eq!(acme[0].resource, "r1");
2107    }
2108
2109    // --- Source config hash tests ---
2110
2111    #[test]
2112    fn set_and_get_source_config_hash() {
2113        let store = StateStore::open_in_memory().unwrap();
2114        store.set_source_config_hash("acme", "hash123").unwrap();
2115
2116        let hash = store.source_config_hash("acme").unwrap().unwrap();
2117        assert_eq!(hash.config_hash, "hash123");
2118    }
2119
2120    #[test]
2121    fn source_config_hash_upsert() {
2122        let store = StateStore::open_in_memory().unwrap();
2123        store.set_source_config_hash("acme", "hash1").unwrap();
2124        store.set_source_config_hash("acme", "hash2").unwrap();
2125
2126        let hash = store.source_config_hash("acme").unwrap().unwrap();
2127        assert_eq!(hash.config_hash, "hash2");
2128    }
2129
2130    #[test]
2131    fn source_config_hash_not_found() {
2132        let store = StateStore::open_in_memory().unwrap();
2133        let hash = store.source_config_hash("nonexistent").unwrap();
2134        assert!(hash.is_none());
2135    }
2136
2137    #[test]
2138    fn remove_source_config_hash() {
2139        let store = StateStore::open_in_memory().unwrap();
2140        store.set_source_config_hash("acme", "hash1").unwrap();
2141        store.remove_source_config_hash("acme").unwrap();
2142
2143        let hash = store.source_config_hash("acme").unwrap();
2144        assert!(hash.is_none());
2145    }
2146
2147    #[test]
2148    fn file_backup_store_and_retrieve() {
2149        let store = StateStore::open_in_memory().unwrap();
2150        let apply_id = store
2151            .record_apply("test", "hash", ApplyStatus::Success, None)
2152            .unwrap();
2153
2154        let state = crate::FileState {
2155            content: b"original content".to_vec(),
2156            content_hash: "abc123".to_string(),
2157            permissions: Some(0o644),
2158            is_symlink: false,
2159            symlink_target: None,
2160            oversized: false,
2161        };
2162
2163        store
2164            .store_file_backup(apply_id, "/home/user/.bashrc", &state)
2165            .unwrap();
2166
2167        let backup = store
2168            .get_file_backup(apply_id, "/home/user/.bashrc")
2169            .unwrap()
2170            .unwrap();
2171        assert_eq!(backup.content, b"original content");
2172        assert_eq!(backup.content_hash, "abc123");
2173        assert_eq!(backup.permissions, Some(0o644));
2174        assert!(!backup.was_symlink);
2175        assert!(!backup.oversized);
2176    }
2177
2178    #[test]
2179    fn file_backup_symlink() {
2180        let store = StateStore::open_in_memory().unwrap();
2181        let apply_id = store
2182            .record_apply("test", "hash", ApplyStatus::Success, None)
2183            .unwrap();
2184
2185        let state = crate::FileState {
2186            content: Vec::new(),
2187            content_hash: String::new(),
2188            permissions: None,
2189            is_symlink: true,
2190            symlink_target: Some(PathBuf::from("/etc/original")),
2191            oversized: false,
2192        };
2193
2194        store
2195            .store_file_backup(apply_id, "/home/user/link", &state)
2196            .unwrap();
2197
2198        let backup = store
2199            .get_file_backup(apply_id, "/home/user/link")
2200            .unwrap()
2201            .unwrap();
2202        assert!(backup.was_symlink);
2203        assert_eq!(backup.symlink_target.unwrap(), "/etc/original");
2204    }
2205
2206    #[test]
2207    fn get_apply_backups_returns_all() {
2208        let store = StateStore::open_in_memory().unwrap();
2209        let apply_id = store
2210            .record_apply("test", "hash", ApplyStatus::Success, None)
2211            .unwrap();
2212
2213        for i in 0..3 {
2214            let state = crate::FileState {
2215                content: format!("content {}", i).into_bytes(),
2216                content_hash: format!("hash{}", i),
2217                permissions: Some(0o644),
2218                is_symlink: false,
2219                symlink_target: None,
2220                oversized: false,
2221            };
2222            store
2223                .store_file_backup(apply_id, &format!("/file{}", i), &state)
2224                .unwrap();
2225        }
2226
2227        let backups = store.get_apply_backups(apply_id).unwrap();
2228        assert_eq!(backups.len(), 3);
2229    }
2230
2231    #[test]
2232    fn latest_backup_for_path_returns_most_recent() {
2233        let store = StateStore::open_in_memory().unwrap();
2234
2235        for i in 0..3 {
2236            let apply_id = store
2237                .record_apply("test", &format!("hash{}", i), ApplyStatus::Success, None)
2238                .unwrap();
2239            let state = crate::FileState {
2240                content: format!("content v{}", i).into_bytes(),
2241                content_hash: format!("hash{}", i),
2242                permissions: Some(0o644),
2243                is_symlink: false,
2244                symlink_target: None,
2245                oversized: false,
2246            };
2247            store
2248                .store_file_backup(apply_id, "/home/user/.bashrc", &state)
2249                .unwrap();
2250        }
2251
2252        let backup = store
2253            .latest_backup_for_path("/home/user/.bashrc")
2254            .unwrap()
2255            .unwrap();
2256        assert_eq!(backup.content_hash, "hash2");
2257    }
2258
2259    #[test]
2260    fn journal_lifecycle() {
2261        let store = StateStore::open_in_memory().unwrap();
2262        let apply_id = store
2263            .record_apply("test", "hash", ApplyStatus::Success, None)
2264            .unwrap();
2265
2266        let j1 = store
2267            .journal_begin(apply_id, 0, "files", "create", "/home/user/.bashrc", None)
2268            .unwrap();
2269        store.journal_complete(j1, Some("hash123"), None).unwrap();
2270
2271        let j2 = store
2272            .journal_begin(apply_id, 1, "files", "update", "/home/user/.zshrc", None)
2273            .unwrap();
2274        store.journal_fail(j2, "permission denied").unwrap();
2275
2276        // Script action with captured output
2277        let j3 = store
2278            .journal_begin(apply_id, 2, "scripts", "run", "setup.sh", None)
2279            .unwrap();
2280        store
2281            .journal_complete(j3, None, Some("installed deps\nall good"))
2282            .unwrap();
2283
2284        let completed = store.journal_completed_actions(apply_id).unwrap();
2285        assert_eq!(completed.len(), 2);
2286        assert_eq!(completed[0].resource_id, "/home/user/.bashrc");
2287        assert_eq!(completed[0].status, "completed");
2288        assert!(completed[0].script_output.is_none());
2289        assert_eq!(completed[1].resource_id, "setup.sh");
2290        assert_eq!(
2291            completed[1].script_output.as_deref(),
2292            Some("installed deps\nall good")
2293        );
2294
2295        // journal_entries returns all entries including failed ones
2296        let all = store.journal_entries(apply_id).unwrap();
2297        assert_eq!(all.len(), 3);
2298        assert_eq!(all[1].status, "failed");
2299    }
2300
2301    #[test]
2302    fn module_file_manifest_crud() {
2303        let store = StateStore::open_in_memory().unwrap();
2304        let apply_id = store
2305            .record_apply("test", "hash", ApplyStatus::Success, None)
2306            .unwrap();
2307
2308        store
2309            .upsert_module_file(
2310                "nvim",
2311                "/home/user/.config/nvim/init.lua",
2312                "hash1",
2313                "Copy",
2314                apply_id,
2315            )
2316            .unwrap();
2317        store
2318            .upsert_module_file(
2319                "nvim",
2320                "/home/user/.config/nvim/lazy.lua",
2321                "hash2",
2322                "Copy",
2323                apply_id,
2324            )
2325            .unwrap();
2326
2327        let files = store.module_deployed_files("nvim").unwrap();
2328        assert_eq!(files.len(), 2);
2329        assert_eq!(files[0].file_path, "/home/user/.config/nvim/init.lua");
2330
2331        // Upsert updates existing
2332        store
2333            .upsert_module_file(
2334                "nvim",
2335                "/home/user/.config/nvim/init.lua",
2336                "newhash",
2337                "Symlink",
2338                apply_id,
2339            )
2340            .unwrap();
2341        let files = store.module_deployed_files("nvim").unwrap();
2342        assert_eq!(files.len(), 2);
2343        assert_eq!(files[0].content_hash, "newhash");
2344        assert_eq!(files[0].strategy, "Symlink");
2345
2346        // Delete all
2347        store.delete_module_files("nvim").unwrap();
2348        let files = store.module_deployed_files("nvim").unwrap();
2349        assert!(files.is_empty());
2350    }
2351
2352    #[test]
2353    fn prune_old_backups_keeps_recent() {
2354        let store = StateStore::open_in_memory().unwrap();
2355
2356        // Create 5 applies with backups
2357        for i in 0..5 {
2358            let apply_id = store
2359                .record_apply("test", &format!("hash{}", i), ApplyStatus::Success, None)
2360                .unwrap();
2361            let state = crate::FileState {
2362                content: format!("content {}", i).into_bytes(),
2363                content_hash: format!("hash{}", i),
2364                permissions: Some(0o644),
2365                is_symlink: false,
2366                symlink_target: None,
2367                oversized: false,
2368            };
2369            store.store_file_backup(apply_id, "/file", &state).unwrap();
2370        }
2371
2372        // Prune keeping last 2
2373        let pruned = store.prune_old_backups(2).unwrap();
2374        assert_eq!(pruned, 3);
2375
2376        // Only 2 backups remain
2377        let all: i64 = store
2378            .conn
2379            .query_row("SELECT COUNT(*) FROM file_backups", [], |row| row.get(0))
2380            .unwrap();
2381        assert_eq!(all, 2);
2382    }
2383
2384    #[test]
2385    fn update_apply_status_works() {
2386        let store = StateStore::open_in_memory().unwrap();
2387        let apply_id = store
2388            .record_apply("test", "hash", ApplyStatus::Success, None)
2389            .unwrap();
2390
2391        store
2392            .update_apply_status(apply_id, ApplyStatus::Partial, Some("{\"failed\":1}"))
2393            .unwrap();
2394
2395        let record = store.last_apply().unwrap().unwrap();
2396        assert_eq!(record.status, ApplyStatus::Partial);
2397        assert_eq!(record.summary.unwrap(), "{\"failed\":1}");
2398    }
2399
2400    #[test]
2401    fn schema_version_is_4_after_migration() {
2402        let store = StateStore::open_in_memory().unwrap();
2403        let version = store.schema_version();
2404        assert_eq!(version, 4);
2405    }
2406
2407    // --- Compliance snapshot tests ---
2408
2409    fn make_test_snapshot() -> crate::compliance::ComplianceSnapshot {
2410        crate::compliance::ComplianceSnapshot {
2411            timestamp: crate::utc_now_iso8601(),
2412            machine: crate::compliance::MachineInfo {
2413                hostname: "test-host".into(),
2414                os: "linux".into(),
2415                arch: "x86_64".into(),
2416            },
2417            profile: "default".into(),
2418            sources: vec!["local".into()],
2419            checks: vec![
2420                crate::compliance::ComplianceCheck {
2421                    category: "file".into(),
2422                    target: Some("/home/user/.zshrc".into()),
2423                    status: crate::compliance::ComplianceStatus::Compliant,
2424                    detail: Some("present".into()),
2425                    ..Default::default()
2426                },
2427                crate::compliance::ComplianceCheck {
2428                    category: "package".into(),
2429                    name: Some("ripgrep".into()),
2430                    status: crate::compliance::ComplianceStatus::Violation,
2431                    detail: Some("not installed".into()),
2432                    ..Default::default()
2433                },
2434                crate::compliance::ComplianceCheck {
2435                    category: "system".into(),
2436                    key: Some("shell".into()),
2437                    status: crate::compliance::ComplianceStatus::Warning,
2438                    detail: Some("no configurator".into()),
2439                    ..Default::default()
2440                },
2441            ],
2442            summary: crate::compliance::ComplianceSummary {
2443                compliant: 1,
2444                warning: 1,
2445                violation: 1,
2446            },
2447        }
2448    }
2449
2450    #[test]
2451    fn compliance_snapshot_roundtrip() {
2452        let store = StateStore::open_in_memory().unwrap();
2453        let snapshot = make_test_snapshot();
2454
2455        let json = serde_json::to_string(&snapshot).unwrap();
2456        let hash = crate::sha256_hex(json.as_bytes());
2457
2458        store.store_compliance_snapshot(&snapshot, &hash).unwrap();
2459
2460        // Retrieve by latest hash
2461        let latest = store.latest_compliance_hash().unwrap().unwrap();
2462        assert_eq!(latest, hash);
2463
2464        // Retrieve full snapshot by history
2465        let history = store.compliance_history(None, 10).unwrap();
2466        assert_eq!(history.len(), 1);
2467        let row = &history[0];
2468        assert_eq!(row.compliant, 1);
2469        assert_eq!(row.warning, 1);
2470        assert_eq!(row.violation, 1);
2471
2472        // Retrieve by ID
2473        let retrieved = store.get_compliance_snapshot(row.id).unwrap().unwrap();
2474        assert_eq!(retrieved.profile, "default");
2475        assert_eq!(retrieved.checks.len(), 3);
2476        assert_eq!(retrieved.summary.compliant, 1);
2477    }
2478
2479    #[test]
2480    fn compliance_latest_hash_empty() {
2481        let store = StateStore::open_in_memory().unwrap();
2482        assert!(store.latest_compliance_hash().unwrap().is_none());
2483    }
2484
2485    #[test]
2486    fn compliance_latest_hash_returns_most_recent() {
2487        let store = StateStore::open_in_memory().unwrap();
2488
2489        let mut s1 = make_test_snapshot();
2490        s1.timestamp = "2026-01-01T00:00:00Z".into();
2491        store.store_compliance_snapshot(&s1, "hash1").unwrap();
2492
2493        let mut s2 = make_test_snapshot();
2494        s2.timestamp = "2026-01-02T00:00:00Z".into();
2495        store.store_compliance_snapshot(&s2, "hash2").unwrap();
2496
2497        let latest = store.latest_compliance_hash().unwrap().unwrap();
2498        assert_eq!(latest, "hash2");
2499    }
2500
2501    #[test]
2502    fn compliance_prune_removes_old_snapshots() {
2503        let store = StateStore::open_in_memory().unwrap();
2504
2505        let mut s1 = make_test_snapshot();
2506        s1.timestamp = "2026-01-01T00:00:00Z".into();
2507        store.store_compliance_snapshot(&s1, "hash1").unwrap();
2508
2509        let mut s2 = make_test_snapshot();
2510        s2.timestamp = "2026-01-15T00:00:00Z".into();
2511        store.store_compliance_snapshot(&s2, "hash2").unwrap();
2512
2513        let mut s3 = make_test_snapshot();
2514        s3.timestamp = "2026-02-01T00:00:00Z".into();
2515        store.store_compliance_snapshot(&s3, "hash3").unwrap();
2516
2517        // Prune everything before Feb
2518        let deleted = store
2519            .prune_compliance_snapshots("2026-02-01T00:00:00Z")
2520            .unwrap();
2521        assert_eq!(deleted, 2);
2522
2523        let history = store.compliance_history(None, 10).unwrap();
2524        assert_eq!(history.len(), 1);
2525    }
2526
2527    #[test]
2528    fn compliance_history_with_since() {
2529        let store = StateStore::open_in_memory().unwrap();
2530
2531        let mut s1 = make_test_snapshot();
2532        s1.timestamp = "2026-01-01T00:00:00Z".into();
2533        store.store_compliance_snapshot(&s1, "h1").unwrap();
2534
2535        let mut s2 = make_test_snapshot();
2536        s2.timestamp = "2026-01-10T00:00:00Z".into();
2537        store.store_compliance_snapshot(&s2, "h2").unwrap();
2538
2539        let mut s3 = make_test_snapshot();
2540        s3.timestamp = "2026-01-20T00:00:00Z".into();
2541        store.store_compliance_snapshot(&s3, "h3").unwrap();
2542
2543        let history = store
2544            .compliance_history(Some("2026-01-05T00:00:00Z"), 10)
2545            .unwrap();
2546        assert_eq!(history.len(), 2);
2547    }
2548
2549    #[test]
2550    fn compliance_get_nonexistent() {
2551        let store = StateStore::open_in_memory().unwrap();
2552        assert!(store.get_compliance_snapshot(999).unwrap().is_none());
2553    }
2554
2555    // --- Module state CRUD ---
2556
2557    #[test]
2558    fn module_state_upsert_and_retrieve() {
2559        let store = StateStore::open_in_memory().unwrap();
2560
2561        // Create apply records first (foreign key constraint)
2562        let apply1 = store
2563            .record_apply("default", "h1", ApplyStatus::Success, None)
2564            .unwrap();
2565
2566        store
2567            .upsert_module_state(
2568                "nvim",
2569                Some(apply1),
2570                "pkg-hash-1",
2571                "file-hash-1",
2572                None,
2573                "installed",
2574            )
2575            .unwrap();
2576        store
2577            .upsert_module_state(
2578                "tmux",
2579                None,
2580                "pkg-hash-2",
2581                "file-hash-2",
2582                Some("https://github.com/example/tmux.git@abc123"),
2583                "installed",
2584            )
2585            .unwrap();
2586
2587        let states = store.module_states().unwrap();
2588        assert_eq!(states.len(), 2);
2589        // Ordered by module_name
2590        assert_eq!(states[0].module_name, "nvim");
2591        assert_eq!(states[0].packages_hash, "pkg-hash-1");
2592        assert_eq!(states[0].files_hash, "file-hash-1");
2593        assert_eq!(states[0].status, "installed");
2594        assert_eq!(states[0].last_applied, Some(apply1));
2595        assert!(states[0].git_sources.is_none());
2596
2597        assert_eq!(states[1].module_name, "tmux");
2598        assert!(states[1].last_applied.is_none());
2599        assert_eq!(
2600            states[1].git_sources.as_deref(),
2601            Some("https://github.com/example/tmux.git@abc123")
2602        );
2603    }
2604
2605    #[test]
2606    fn module_state_by_name_found_and_not_found() {
2607        let store = StateStore::open_in_memory().unwrap();
2608
2609        let apply_id = store
2610            .record_apply("default", "h", ApplyStatus::Success, None)
2611            .unwrap();
2612
2613        store
2614            .upsert_module_state("shell", Some(apply_id), "h1", "h2", None, "installed")
2615            .unwrap();
2616
2617        let found = store.module_state_by_name("shell").unwrap();
2618        assert!(found.is_some());
2619        let rec = found.unwrap();
2620        assert_eq!(rec.module_name, "shell");
2621        assert_eq!(rec.last_applied, Some(apply_id));
2622
2623        let not_found = store.module_state_by_name("nonexistent").unwrap();
2624        assert!(not_found.is_none());
2625    }
2626
2627    #[test]
2628    fn module_state_upsert_updates_on_conflict() {
2629        let store = StateStore::open_in_memory().unwrap();
2630
2631        let apply1 = store
2632            .record_apply("default", "h1", ApplyStatus::Success, None)
2633            .unwrap();
2634        let apply2 = store
2635            .record_apply("default", "h2", ApplyStatus::Success, None)
2636            .unwrap();
2637
2638        store
2639            .upsert_module_state(
2640                "nvim",
2641                Some(apply1),
2642                "old-pkg",
2643                "old-file",
2644                None,
2645                "installed",
2646            )
2647            .unwrap();
2648        store
2649            .upsert_module_state("nvim", Some(apply2), "new-pkg", "new-file", None, "updated")
2650            .unwrap();
2651
2652        let states = store.module_states().unwrap();
2653        assert_eq!(
2654            states.len(),
2655            1,
2656            "upsert should update, not insert duplicate"
2657        );
2658        assert_eq!(states[0].packages_hash, "new-pkg");
2659        assert_eq!(states[0].files_hash, "new-file");
2660        assert_eq!(states[0].status, "updated");
2661        assert_eq!(states[0].last_applied, Some(apply2));
2662    }
2663
2664    #[test]
2665    fn module_state_remove() {
2666        let store = StateStore::open_in_memory().unwrap();
2667
2668        store
2669            .upsert_module_state("nvim", None, "h1", "h2", None, "installed")
2670            .unwrap();
2671        store
2672            .upsert_module_state("tmux", None, "h3", "h4", None, "installed")
2673            .unwrap();
2674
2675        assert_eq!(store.module_states().unwrap().len(), 2);
2676
2677        store.remove_module_state("nvim").unwrap();
2678        let states = store.module_states().unwrap();
2679        assert_eq!(states.len(), 1);
2680        assert_eq!(states[0].module_name, "tmux");
2681
2682        // Removing nonexistent module should not error
2683        store.remove_module_state("nonexistent").unwrap();
2684        assert_eq!(store.module_states().unwrap().len(), 1);
2685    }
2686
2687    // --- record_source_apply ---
2688
2689    #[test]
2690    fn record_source_apply_links_to_source() {
2691        let store = StateStore::open_in_memory().unwrap();
2692
2693        // Create a source first
2694        store
2695            .upsert_config_source(
2696                "acme",
2697                "https://github.com/acme/config.git",
2698                "main",
2699                None,
2700                None,
2701                None,
2702            )
2703            .unwrap();
2704
2705        // Record an apply
2706        let apply_id = store
2707            .record_apply("default", "plan-hash-1", ApplyStatus::Success, None)
2708            .unwrap();
2709        store
2710            .record_source_apply("acme", apply_id, "abc123def")
2711            .unwrap();
2712
2713        // Verify the source exists and was linked
2714        let source = store.config_source_by_name("acme").unwrap();
2715        assert!(source.is_some());
2716    }
2717
2718    #[test]
2719    fn record_source_apply_nonexistent_source_is_noop() {
2720        let store = StateStore::open_in_memory().unwrap();
2721
2722        let apply_id = store
2723            .record_apply("default", "plan-hash-1", ApplyStatus::Success, None)
2724            .unwrap();
2725
2726        // Recording for a nonexistent source should be a no-op (not an error)
2727        store
2728            .record_source_apply("nonexistent", apply_id, "abc123")
2729            .unwrap();
2730
2731        // Verify no rows were inserted into source_applies
2732        let count: i64 = store
2733            .conn
2734            .query_row("SELECT COUNT(*) FROM source_applies", [], |row| row.get(0))
2735            .unwrap();
2736        assert_eq!(
2737            count, 0,
2738            "no source_applies row should exist for nonexistent source"
2739        );
2740
2741        // Verify the source still doesn't exist
2742        let source = store.config_source_by_name("nonexistent").unwrap();
2743        assert!(source.is_none(), "nonexistent source should not be created");
2744    }
2745
2746    // --- file_backups_after_apply ---
2747
2748    #[test]
2749    fn file_backups_after_apply_returns_earliest_per_path() {
2750        let store = StateStore::open_in_memory().unwrap();
2751
2752        let apply1 = store
2753            .record_apply("default", "hash1", ApplyStatus::Success, None)
2754            .unwrap();
2755        let apply2 = store
2756            .record_apply("default", "hash2", ApplyStatus::Success, None)
2757            .unwrap();
2758        let apply3 = store
2759            .record_apply("default", "hash3", ApplyStatus::Success, None)
2760            .unwrap();
2761
2762        // Backup same file at apply2 and apply3
2763        let state_v1 = crate::FileState {
2764            content: b"version1".to_vec(),
2765            content_hash: "hash-v1".into(),
2766            permissions: None,
2767            is_symlink: false,
2768            symlink_target: None,
2769            oversized: false,
2770        };
2771        let state_v2 = crate::FileState {
2772            content: b"version2".to_vec(),
2773            content_hash: "hash-v2".into(),
2774            permissions: None,
2775            is_symlink: false,
2776            symlink_target: None,
2777            oversized: false,
2778        };
2779
2780        store
2781            .store_file_backup(apply2, "/etc/config", &state_v1)
2782            .unwrap();
2783        store
2784            .store_file_backup(apply3, "/etc/config", &state_v2)
2785            .unwrap();
2786
2787        // Backups after apply1 should return the EARLIEST backup per path (apply2's version)
2788        let backups = store.file_backups_after_apply(apply1).unwrap();
2789        assert_eq!(backups.len(), 1);
2790        assert_eq!(backups[0].file_path, "/etc/config");
2791        assert_eq!(backups[0].apply_id, apply2);
2792        assert_eq!(backups[0].content_hash, "hash-v1");
2793
2794        // Backups after apply2 should return apply3's version
2795        let backups_after_2 = store.file_backups_after_apply(apply2).unwrap();
2796        assert_eq!(backups_after_2.len(), 1);
2797        assert_eq!(backups_after_2[0].apply_id, apply3);
2798        assert_eq!(backups_after_2[0].content_hash, "hash-v2");
2799
2800        // Backups after apply3 should be empty
2801        let backups_after_3 = store.file_backups_after_apply(apply3).unwrap();
2802        assert!(backups_after_3.is_empty());
2803    }
2804
2805    // --- journal_entries_after_apply ---
2806
2807    #[test]
2808    fn journal_entries_after_apply_returns_completed_desc() {
2809        let store = StateStore::open_in_memory().unwrap();
2810
2811        let apply1 = store
2812            .record_apply("default", "hash1", ApplyStatus::Success, None)
2813            .unwrap();
2814        let apply2 = store
2815            .record_apply("default", "hash2", ApplyStatus::Success, None)
2816            .unwrap();
2817
2818        // Journal entries for apply2
2819        let j1 = store
2820            .journal_begin(apply2, 0, "Packages", "install", "brew:curl", None)
2821            .unwrap();
2822        store.journal_complete(j1, None, None).unwrap();
2823        let j2 = store
2824            .journal_begin(apply2, 1, "Packages", "install", "brew:wget", None)
2825            .unwrap();
2826        store.journal_complete(j2, None, None).unwrap();
2827        // A failed entry should NOT be returned
2828        let j3 = store
2829            .journal_begin(apply2, 2, "Packages", "install", "brew:vim", None)
2830            .unwrap();
2831        store.journal_fail(j3, "package not found").unwrap();
2832
2833        let entries = store.journal_entries_after_apply(apply1).unwrap();
2834        assert_eq!(
2835            entries.len(),
2836            2,
2837            "should return only completed entries, not failed"
2838        );
2839        // Results are ordered by apply_id DESC, action_index DESC
2840        assert_eq!(entries[0].resource_id, "brew:wget");
2841        assert_eq!(entries[1].resource_id, "brew:curl");
2842        assert_eq!(entries[0].status, "completed");
2843        assert_eq!(entries[1].status, "completed");
2844    }
2845
2846    // --- concurrent in-memory stores ---
2847
2848    #[test]
2849    fn concurrent_in_memory_stores_are_independent() {
2850        let store_a = StateStore::open_in_memory().unwrap();
2851        let store_b = StateStore::open_in_memory().unwrap();
2852
2853        store_a
2854            .record_apply("default", "hash-a", ApplyStatus::Success, None)
2855            .unwrap();
2856
2857        // store_b should be empty — separate database
2858        assert!(store_b.last_apply().unwrap().is_none());
2859        assert_eq!(store_a.history(10).unwrap().len(), 1);
2860        assert_eq!(store_b.history(10).unwrap().len(), 0);
2861    }
2862
2863    // --- schema migration ---
2864
2865    #[test]
2866    fn schema_version_after_open() {
2867        let store = StateStore::open_in_memory().unwrap();
2868        let version = store.schema_version();
2869        assert!(
2870            version >= 4,
2871            "schema version should be at least 4 after migrations: got {version}"
2872        );
2873    }
2874
2875    // --- get_apply by id ---
2876
2877    #[test]
2878    fn get_apply_existing_and_nonexistent() {
2879        let store = StateStore::open_in_memory().unwrap();
2880
2881        let apply_id = store
2882            .record_apply(
2883                "default",
2884                "plan-hash",
2885                ApplyStatus::Success,
2886                Some("{\"summary\": true}"),
2887            )
2888            .unwrap();
2889
2890        let found = store.get_apply(apply_id).unwrap();
2891        assert!(found.is_some());
2892        let rec = found.unwrap();
2893        assert_eq!(rec.id, apply_id);
2894        assert_eq!(rec.plan_hash, "plan-hash");
2895        assert_eq!(rec.status, ApplyStatus::Success);
2896        assert_eq!(rec.summary.as_deref(), Some("{\"summary\": true}"));
2897
2898        let not_found = store.get_apply(99999).unwrap();
2899        assert!(not_found.is_none());
2900    }
2901
2902    // --- update_apply_status ---
2903
2904    #[test]
2905    fn update_apply_status_changes_status() {
2906        let store = StateStore::open_in_memory().unwrap();
2907
2908        let apply_id = store
2909            .record_apply("default", "hash", ApplyStatus::InProgress, None)
2910            .unwrap();
2911
2912        store
2913            .update_apply_status(apply_id, ApplyStatus::Success, Some("{\"total\": 5}"))
2914            .unwrap();
2915
2916        let rec = store.get_apply(apply_id).unwrap().unwrap();
2917        assert_eq!(rec.status, ApplyStatus::Success);
2918        assert_eq!(rec.summary.as_deref(), Some("{\"total\": 5}"));
2919    }
2920}