rivet/state/
journal_store.rs1use crate::error::Result;
2use crate::journal::RunJournal;
3
4use super::{StateConn, StateStore};
5
6impl StateStore {
7 pub fn store_journal(&self, journal: &RunJournal) -> Result<()> {
12 let json = serde_json::to_string(journal)?;
13 let now = chrono::Utc::now().to_rfc3339();
14 match &self.conn {
15 StateConn::Sqlite(c) => {
16 c.execute(
17 "INSERT OR REPLACE INTO run_journal (run_id, export_name, finished_at, journal_json)
18 VALUES (?1, ?2, ?3, ?4)",
19 rusqlite::params![journal.run_id, journal.export_name, now, json],
20 )?;
21 }
22 StateConn::Postgres(client) => {
23 let mut c = client.borrow_mut();
24 c.execute(
25 "INSERT INTO run_journal (run_id, export_name, finished_at, journal_json)
26 VALUES ($1, $2, $3, $4)
27 ON CONFLICT (run_id) DO UPDATE SET
28 export_name = excluded.export_name,
29 finished_at = excluded.finished_at,
30 journal_json = excluded.journal_json",
31 &[&journal.run_id, &journal.export_name, &now, &json],
32 )?;
33 }
34 }
35 Ok(())
36 }
37
38 #[allow(dead_code)]
40 pub fn load_journal(&self, run_id: &str) -> Result<Option<RunJournal>> {
41 match &self.conn {
42 StateConn::Sqlite(c) => {
43 let result = c.query_row(
44 "SELECT journal_json FROM run_journal WHERE run_id = ?1",
45 rusqlite::params![run_id],
46 |row| row.get::<_, String>(0),
47 );
48 match result {
49 Ok(json) => Ok(Some(serde_json::from_str(&json)?)),
50 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
51 Err(e) => Err(e.into()),
52 }
53 }
54 StateConn::Postgres(client) => {
55 let mut c = client.borrow_mut();
56 match c.query_opt(
57 "SELECT journal_json FROM run_journal WHERE run_id = $1",
58 &[&run_id],
59 )? {
60 Some(row) => {
61 let json: String = row.get(0);
62 Ok(Some(serde_json::from_str(&json)?))
63 }
64 None => Ok(None),
65 }
66 }
67 }
68 }
69
70 #[allow(dead_code)]
72 pub fn recent_journals(&self, export_name: &str, limit: usize) -> Result<Vec<RunJournal>> {
73 let sql = "SELECT journal_json FROM run_journal
74 WHERE export_name = ?1
75 ORDER BY finished_at DESC
76 LIMIT ?2";
77 match &self.conn {
78 StateConn::Sqlite(c) => {
79 let mut stmt = c.prepare(sql)?;
80 let rows = stmt.query_map(rusqlite::params![export_name, limit as i64], |row| {
81 row.get::<_, String>(0)
82 })?;
83 let mut out = Vec::new();
84 for row in rows {
85 out.push(serde_json::from_str::<RunJournal>(&row?)?);
86 }
87 Ok(out)
88 }
89 StateConn::Postgres(client) => {
90 let mut c = client.borrow_mut();
91 let rows = c.query(
92 &format!(
93 "SELECT journal_json FROM run_journal
94 WHERE export_name = $1
95 ORDER BY finished_at DESC
96 LIMIT {}",
97 limit
98 ),
99 &[&export_name],
100 )?;
101 let mut out = Vec::new();
102 for row in rows {
103 let json: String = row.get(0);
104 out.push(serde_json::from_str::<RunJournal>(&json)?);
105 }
106 Ok(out)
107 }
108 }
109 }
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::{RunEvent, RunJournal};

    /// Builds a two-entry journal: one written file followed by a successful
    /// run completion.
    fn make_journal(run_id: &str, export: &str) -> RunJournal {
        let file_written = RunEvent::FileWritten {
            file_name: "part0.parquet".into(),
            rows: 1_000,
            bytes: 65_536,
            part_index: 0,
        };
        let completed = RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 420,
        };

        let mut journal = RunJournal::new(run_id, export);
        journal.record(file_written);
        journal.record(completed);
        journal
    }

    /// A stored journal comes back with the same identity and entries.
    #[test]
    fn store_and_load_roundtrip() {
        let db = StateStore::open_in_memory().unwrap();
        let original = make_journal("run_abc_001", "orders");
        db.store_journal(&original).unwrap();

        let fetched = db.load_journal("run_abc_001").unwrap().unwrap();
        assert_eq!(fetched.run_id, "run_abc_001");
        assert_eq!(fetched.export_name, "orders");
        assert_eq!(fetched.entries.len(), 2);

        let first_is_file_written = matches!(
            fetched.entries[0].event,
            RunEvent::FileWritten { rows: 1_000, .. }
        );
        assert!(first_is_file_written);

        let second_is_success = matches!(
            fetched.entries[1].event,
            RunEvent::RunCompleted { ref status, .. } if status == "success"
        );
        assert!(second_is_success);
    }

    /// Looking up an unknown run id yields `None` rather than an error.
    #[test]
    fn load_missing_returns_none() {
        let db = StateStore::open_in_memory().unwrap();
        let lookup = db.load_journal("nonexistent").unwrap();
        assert!(lookup.is_none());
    }

    /// Storing the same journal twice leaves a single loadable copy.
    #[test]
    fn store_is_idempotent_on_same_run_id() {
        let db = StateStore::open_in_memory().unwrap();
        let journal = make_journal("run_idem", "payments");
        for _ in 0..2 {
            db.store_journal(&journal).unwrap();
        }

        let fetched = db.load_journal("run_idem").unwrap().unwrap();
        assert_eq!(fetched.entries.len(), 2);
    }

    /// `recent_journals` honours the limit and orders by most recent first.
    #[test]
    fn recent_journals_returns_newest_first() {
        let db = StateStore::open_in_memory().unwrap();
        let pause = std::time::Duration::from_millis(2);
        for i in 1..=3_u32 {
            // Pause before each store so the `finished_at` timestamps differ.
            std::thread::sleep(pause);
            let journal = make_journal(&format!("run_{i:03}"), "events");
            db.store_journal(&journal).unwrap();
        }

        let newest = db.recent_journals("events", 2).unwrap();
        assert_eq!(newest.len(), 2);
        assert_eq!(newest[0].run_id, "run_003");
        assert_eq!(newest[1].run_id, "run_002");
    }
}