Skip to main content

cfgd_core/state/
journal.rs

1use rusqlite::params;
2
3use super::StateStore;
4use super::types::JournalEntry;
5use crate::errors::Result;
6
7impl StateStore {
8    /// Record the start of a journal action.
9    pub fn journal_begin(
10        &self,
11        apply_id: i64,
12        action_index: usize,
13        phase: &str,
14        action_type: &str,
15        resource_id: &str,
16        pre_state: Option<&str>,
17    ) -> Result<i64> {
18        let timestamp = crate::utc_now_iso8601();
19        self.conn.execute(
20            "INSERT INTO apply_journal (apply_id, action_index, phase, action_type, resource_id, pre_state, status, started_at)
21             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
22            params![apply_id, action_index as i64, phase, action_type, resource_id, pre_state, timestamp],
23        )?;
24        Ok(self.conn.last_insert_rowid())
25    }
26
27    /// Mark a journal action as completed, optionally storing script output.
28    pub fn journal_complete(
29        &self,
30        journal_id: i64,
31        post_state: Option<&str>,
32        script_output: Option<&str>,
33    ) -> Result<()> {
34        let timestamp = crate::utc_now_iso8601();
35        self.conn.execute(
36            "UPDATE apply_journal SET status = 'completed', post_state = ?1, completed_at = ?2, script_output = ?3 WHERE id = ?4",
37            params![post_state, timestamp, script_output, journal_id],
38        )?;
39        Ok(())
40    }
41
42    /// Mark a journal action as failed.
43    pub fn journal_fail(&self, journal_id: i64, error: &str) -> Result<()> {
44        let timestamp = crate::utc_now_iso8601();
45        self.conn.execute(
46            "UPDATE apply_journal SET status = 'failed', error = ?1, completed_at = ?2 WHERE id = ?3",
47            params![error, timestamp, journal_id],
48        )?;
49        Ok(())
50    }
51
52    /// Get completed actions for an apply (for rollback).
53    pub fn journal_completed_actions(&self, apply_id: i64) -> Result<Vec<JournalEntry>> {
54        self.query_journal(apply_id, Some("completed"))
55    }
56
57    /// Get all journal entries for an apply (all statuses).
58    pub fn journal_entries(&self, apply_id: i64) -> Result<Vec<JournalEntry>> {
59        self.query_journal(apply_id, None)
60    }
61
62    /// Get all journal entries from applies after the given ID, for rollback tracking.
63    pub fn journal_entries_after_apply(&self, after_apply_id: i64) -> Result<Vec<JournalEntry>> {
64        let mut stmt = self.conn.prepare(
65            "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
66             FROM apply_journal WHERE apply_id > ?1 AND status = 'completed' ORDER BY apply_id DESC, action_index DESC",
67        )?;
68
69        let records = stmt
70            .query_map(params![after_apply_id], |row| {
71                Ok(JournalEntry {
72                    id: row.get(0)?,
73                    apply_id: row.get(1)?,
74                    action_index: row.get(2)?,
75                    phase: row.get(3)?,
76                    action_type: row.get(4)?,
77                    resource_id: row.get(5)?,
78                    pre_state: row.get(6)?,
79                    post_state: row.get(7)?,
80                    status: row.get(8)?,
81                    error: row.get(9)?,
82                    started_at: row.get(10)?,
83                    completed_at: row.get(11)?,
84                    script_output: row.get(12)?,
85                })
86            })?
87            .collect::<std::result::Result<Vec<_>, _>>()?;
88
89        Ok(records)
90    }
91
92    fn query_journal(
93        &self,
94        apply_id: i64,
95        status_filter: Option<&str>,
96    ) -> Result<Vec<JournalEntry>> {
97        let base_sql = if status_filter.is_some() {
98            "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
99             FROM apply_journal WHERE apply_id = ?1 AND status = ?2 ORDER BY action_index"
100        } else {
101            "SELECT id, apply_id, action_index, phase, action_type, resource_id, pre_state, post_state, status, error, started_at, completed_at, script_output
102             FROM apply_journal WHERE apply_id = ?1 ORDER BY action_index"
103        };
104
105        let mut stmt = self.conn.prepare(base_sql)?;
106
107        let map_row = |row: &rusqlite::Row| -> rusqlite::Result<JournalEntry> {
108            Ok(JournalEntry {
109                id: row.get(0)?,
110                apply_id: row.get(1)?,
111                action_index: row.get(2)?,
112                phase: row.get(3)?,
113                action_type: row.get(4)?,
114                resource_id: row.get(5)?,
115                pre_state: row.get(6)?,
116                post_state: row.get(7)?,
117                status: row.get(8)?,
118                error: row.get(9)?,
119                started_at: row.get(10)?,
120                completed_at: row.get(11)?,
121                script_output: row.get(12)?,
122            })
123        };
124
125        let entries: Vec<JournalEntry> = if let Some(status) = status_filter {
126            stmt.query_map(params![apply_id, status], map_row)?
127                .collect::<std::result::Result<Vec<_>, _>>()?
128        } else {
129            stmt.query_map(params![apply_id], map_row)?
130                .collect::<std::result::Result<Vec<_>, _>>()?
131        };
132
133        Ok(entries)
134    }
135}