Skip to main content

rivet/state/
journal_store.rs

1use crate::error::Result;
2use crate::journal::RunJournal;
3
4use super::{StateConn, StateStore};
5
6impl StateStore {
7    /// Persist a completed `RunJournal` to the state DB.
8    ///
9    /// Called once per export run, after `RunCompleted` has been recorded.
10    /// Overwrites any existing row for the same `run_id` (idempotent on retry).
11    pub fn store_journal(&self, journal: &RunJournal) -> Result<()> {
12        let json = serde_json::to_string(journal)?;
13        let now = chrono::Utc::now().to_rfc3339();
14        match &self.conn {
15            StateConn::Sqlite(c) => {
16                c.execute(
17                    "INSERT OR REPLACE INTO run_journal (run_id, export_name, finished_at, journal_json)
18                     VALUES (?1, ?2, ?3, ?4)",
19                    rusqlite::params![journal.run_id, journal.export_name, now, json],
20                )?;
21            }
22            StateConn::Postgres(client) => {
23                let mut c = client.borrow_mut();
24                c.execute(
25                    "INSERT INTO run_journal (run_id, export_name, finished_at, journal_json)
26                     VALUES ($1, $2, $3, $4)
27                     ON CONFLICT (run_id) DO UPDATE SET
28                         export_name  = excluded.export_name,
29                         finished_at  = excluded.finished_at,
30                         journal_json = excluded.journal_json",
31                    &[&journal.run_id, &journal.export_name, &now, &json],
32                )?;
33            }
34        }
35        Ok(())
36    }
37
38    /// Load a journal by `run_id`.  Returns `None` if the run is not found.
39    #[allow(dead_code)]
40    pub fn load_journal(&self, run_id: &str) -> Result<Option<RunJournal>> {
41        match &self.conn {
42            StateConn::Sqlite(c) => {
43                let result = c.query_row(
44                    "SELECT journal_json FROM run_journal WHERE run_id = ?1",
45                    rusqlite::params![run_id],
46                    |row| row.get::<_, String>(0),
47                );
48                match result {
49                    Ok(json) => Ok(Some(serde_json::from_str(&json)?)),
50                    Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
51                    Err(e) => Err(e.into()),
52                }
53            }
54            StateConn::Postgres(client) => {
55                let mut c = client.borrow_mut();
56                match c.query_opt(
57                    "SELECT journal_json FROM run_journal WHERE run_id = $1",
58                    &[&run_id],
59                )? {
60                    Some(row) => {
61                        let json: String = row.get(0);
62                        Ok(Some(serde_json::from_str(&json)?))
63                    }
64                    None => Ok(None),
65                }
66            }
67        }
68    }
69
70    /// Return the most recent `limit` journal entries for an export, newest first.
71    #[allow(dead_code)]
72    pub fn recent_journals(&self, export_name: &str, limit: usize) -> Result<Vec<RunJournal>> {
73        let sql = "SELECT journal_json FROM run_journal
74             WHERE export_name = ?1
75             ORDER BY finished_at DESC
76             LIMIT ?2";
77        match &self.conn {
78            StateConn::Sqlite(c) => {
79                let mut stmt = c.prepare(sql)?;
80                let rows = stmt.query_map(rusqlite::params![export_name, limit as i64], |row| {
81                    row.get::<_, String>(0)
82                })?;
83                let mut out = Vec::new();
84                for row in rows {
85                    out.push(serde_json::from_str::<RunJournal>(&row?)?);
86                }
87                Ok(out)
88            }
89            StateConn::Postgres(client) => {
90                let mut c = client.borrow_mut();
91                let rows = c.query(
92                    &format!(
93                        "SELECT journal_json FROM run_journal
94                         WHERE export_name = $1
95                         ORDER BY finished_at DESC
96                         LIMIT {}",
97                        limit
98                    ),
99                    &[&export_name],
100                )?;
101                let mut out = Vec::new();
102                for row in rows {
103                    let json: String = row.get(0);
104                    out.push(serde_json::from_str::<RunJournal>(&json)?);
105                }
106                Ok(out)
107            }
108        }
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::{RunEvent, RunJournal};

    /// Build a two-entry journal: one written file followed by a successful
    /// `RunCompleted`.
    fn make_journal(run_id: &str, export: &str) -> RunJournal {
        let mut journal = RunJournal::new(run_id, export);

        let file_written = RunEvent::FileWritten {
            file_name: "part0.parquet".into(),
            rows: 1_000,
            bytes: 65_536,
            part_index: 0,
        };
        journal.record(file_written);

        let completed = RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 420,
        };
        journal.record(completed);

        journal
    }

    /// A stored journal comes back with the same identity and events.
    #[test]
    fn store_and_load_roundtrip() {
        let store = StateStore::open_in_memory().unwrap();
        store
            .store_journal(&make_journal("run_abc_001", "orders"))
            .unwrap();

        let loaded = store.load_journal("run_abc_001").unwrap().unwrap();

        assert_eq!(loaded.run_id, "run_abc_001");
        assert_eq!(loaded.export_name, "orders");
        assert_eq!(loaded.entries.len(), 2);

        let first_is_file_written = matches!(
            loaded.entries[0].event,
            RunEvent::FileWritten { rows: 1_000, .. }
        );
        assert!(first_is_file_written);

        let second_is_success = matches!(
            loaded.entries[1].event,
            RunEvent::RunCompleted { ref status, .. } if status == "success"
        );
        assert!(second_is_success);
    }

    /// Looking up an unknown `run_id` is `Ok(None)`, not an error.
    #[test]
    fn load_missing_returns_none() {
        let store = StateStore::open_in_memory().unwrap();
        let lookup = store.load_journal("nonexistent").unwrap();
        assert!(lookup.is_none());
    }

    /// Storing the same journal twice succeeds and leaves one loadable copy.
    #[test]
    fn store_is_idempotent_on_same_run_id() {
        let store = StateStore::open_in_memory().unwrap();
        let journal = make_journal("run_idem", "payments");
        for _ in 0..2 {
            store.store_journal(&journal).unwrap();
        }

        let loaded = store.load_journal("run_idem").unwrap().unwrap();
        assert_eq!(loaded.entries.len(), 2);
    }

    /// With three runs stored in sequence, asking for two returns the last
    /// two stored, most recent first.
    #[test]
    fn recent_journals_returns_newest_first() {
        let store = StateStore::open_in_memory().unwrap();
        let pause = std::time::Duration::from_millis(2);
        for run_id in ["run_001", "run_002", "run_003"] {
            // Space the writes out so each row gets a distinct `finished_at`.
            std::thread::sleep(pause);
            store.store_journal(&make_journal(run_id, "events")).unwrap();
        }

        let recent = store.recent_journals("events", 2).unwrap();

        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].run_id, "run_003");
        assert_eq!(recent[1].run_id, "run_002");
    }
}
185}