// rivet/state/metrics.rs

1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
/// One row from `export_metrics`.
///
/// Field order mirrors the column order used by the `SELECT` in
/// `StateStore::get_metrics`. The type is plain data, so it derives `Clone`
/// and equality to make rows easy to copy and compare (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
// NOTE(review): presumably some fields are not read by any caller yet, hence
// the lint suppression — confirm before removing.
#[allow(dead_code)]
pub struct ExportMetric {
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Identifier of the run; nullable in the table.
    pub run_id: Option<String>,
    /// Timestamp of the run as text (`record_metric` writes an RFC 3339 UTC string).
    pub run_at: String,
    /// Run duration, in milliseconds as the name indicates.
    pub duration_ms: i64,
    /// Total row count recorded for the run.
    pub total_rows: i64,
    /// Peak resident set size (megabytes, going by the name), if it was recorded.
    pub peak_rss_mb: Option<i64>,
    /// Outcome label of the run (the tests use `"success"` and `"failed"`).
    pub status: String,
    /// Error text, if any was recorded.
    pub error_message: Option<String>,
    /// Tuning profile label, if recorded.
    pub tuning_profile: Option<String>,
    /// Output format label, if recorded.
    pub format: Option<String>,
    /// Export mode label, if recorded.
    pub mode: Option<String>,
    /// Number of files produced; a NULL column value is read back as `0`.
    pub files_produced: i64,
    /// Number of bytes written; a NULL column value is read back as `0`.
    pub bytes_written: i64,
    /// Number of retries; a NULL column value is read back as `0`.
    pub retries: i64,
    /// Validation flag; `None` when the column is NULL.
    pub validated: Option<bool>,
    /// Schema-change flag; `None` when the column is NULL.
    pub schema_changed: Option<bool>,
}
26
27/// Metrics store — reads and writes `export_metrics`.
28///
29/// Invariant I4 (Metric After Verdict) governs when `record_metric` is called:
30/// only after the terminal run outcome is determined.
31impl StateStore {
32    #[allow(clippy::too_many_arguments)]
33    pub fn record_metric(
34        &self,
35        export_name: &str,
36        run_id: &str,
37        duration_ms: i64,
38        total_rows: i64,
39        peak_rss_mb: Option<i64>,
40        status: &str,
41        error_message: Option<&str>,
42        tuning_profile: Option<&str>,
43        format: Option<&str>,
44        mode: Option<&str>,
45        files_produced: i64,
46        bytes_written: i64,
47        retries: i64,
48        validated: Option<bool>,
49        schema_changed: Option<bool>,
50    ) -> Result<()> {
51        let now = chrono::Utc::now().to_rfc3339();
52        let sql = "INSERT INTO export_metrics (export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
53             status, error_message, tuning_profile, format, mode,
54             files_produced, bytes_written, retries, validated, schema_changed)
55             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)";
56        match &self.conn {
57            StateConn::Sqlite(c) => {
58                c.execute(
59                    sql,
60                    rusqlite::params![
61                        export_name,
62                        run_id,
63                        now,
64                        duration_ms,
65                        total_rows,
66                        peak_rss_mb,
67                        status,
68                        error_message,
69                        tuning_profile,
70                        format,
71                        mode,
72                        files_produced,
73                        bytes_written,
74                        retries,
75                        validated,
76                        schema_changed
77                    ],
78                )?;
79            }
80            StateConn::Postgres(client) => {
81                let mut c = client.borrow_mut();
82                c.execute(
83                    &pg_sql(sql),
84                    &[
85                        &export_name,
86                        &run_id,
87                        &now,
88                        &duration_ms,
89                        &total_rows,
90                        &peak_rss_mb,
91                        &status,
92                        &error_message,
93                        &tuning_profile,
94                        &format,
95                        &mode,
96                        &files_produced,
97                        &bytes_written,
98                        &retries,
99                        &validated,
100                        &schema_changed,
101                    ],
102                )?;
103            }
104        }
105        Ok(())
106    }
107
108    pub fn get_metrics(
109        &self,
110        export_name: Option<&str>,
111        limit: usize,
112    ) -> Result<Vec<ExportMetric>> {
113        let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
114                    status, error_message, tuning_profile, format, mode,
115                    files_produced, bytes_written, retries, validated, schema_changed";
116
117        let limit_i64 = limit as i64;
118        match &self.conn {
119            StateConn::Sqlite(c) => {
120                let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
121                    name,
122                ) = export_name
123                {
124                    (
125                        "SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
126                             status, error_message, tuning_profile, format, mode, \
127                             files_produced, bytes_written, retries, validated, schema_changed \
128                             FROM export_metrics WHERE export_name = ?1 ORDER BY id DESC LIMIT ?2",
129                        vec![Box::new(name.to_string()), Box::new(limit_i64)],
130                    )
131                } else {
132                    (
133                        "SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
134                             status, error_message, tuning_profile, format, mode, \
135                             files_produced, bytes_written, retries, validated, schema_changed \
136                             FROM export_metrics ORDER BY id DESC LIMIT ?1",
137                        vec![Box::new(limit_i64)],
138                    )
139                };
140                let mut stmt = c.prepare(sql)?;
141                let params_refs: Vec<&dyn rusqlite::types::ToSql> =
142                    params.iter().map(|p| p.as_ref()).collect();
143                let rows = stmt.query_map(params_refs.as_slice(), |row| {
144                    Ok(ExportMetric {
145                        export_name: row.get(0)?,
146                        run_id: row.get(1)?,
147                        run_at: row.get(2)?,
148                        duration_ms: row.get(3)?,
149                        total_rows: row.get(4)?,
150                        peak_rss_mb: row.get(5)?,
151                        status: row.get(6)?,
152                        error_message: row.get(7)?,
153                        tuning_profile: row.get(8)?,
154                        format: row.get(9)?,
155                        mode: row.get(10)?,
156                        files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
157                        bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
158                        retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
159                        validated: row.get(14)?,
160                        schema_changed: row.get(15)?,
161                    })
162                })?;
163                rows.collect::<std::result::Result<Vec<_>, _>>()
164                    .map_err(Into::into)
165            }
166            StateConn::Postgres(client) => {
167                // Single borrow for the duration of this call; safe because all Postgres
168                // operations in StateStore are sequential (no re-entrant borrows).
169                let mut c = client.borrow_mut();
170                let rows = if let Some(name) = export_name {
171                    c.query(
172                        &format!("SELECT {} FROM export_metrics WHERE export_name = $1 ORDER BY id DESC LIMIT $2", cols),
173                        &[&name, &limit_i64],
174                    )?
175                } else {
176                    c.query(
177                        &format!(
178                            "SELECT {} FROM export_metrics ORDER BY id DESC LIMIT $1",
179                            cols
180                        ),
181                        &[&limit_i64],
182                    )?
183                };
184                Ok(rows
185                    .iter()
186                    .map(|row| ExportMetric {
187                        export_name: row.get(0),
188                        run_id: row.get(1),
189                        run_at: row.get(2),
190                        duration_ms: row.get(3),
191                        total_rows: row.get(4),
192                        peak_rss_mb: row.get(5),
193                        status: row.get(6),
194                        error_message: row.get(7),
195                        tuning_profile: row.get(8),
196                        format: row.get(9),
197                        mode: row.get(10),
198                        files_produced: row.get::<_, Option<i64>>(11).unwrap_or(0),
199                        bytes_written: row.get::<_, Option<i64>>(12).unwrap_or(0),
200                        retries: row.get::<_, Option<i64>>(13).unwrap_or(0),
201                        validated: row.get(14),
202                        schema_changed: row.get(15),
203                    })
204                    .collect())
205            }
206        }
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store for a single test.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Records a `"success"` run with every optional column left as `None`
    /// and zero retries.
    fn record_plain_success(
        s: &StateStore,
        export_name: &str,
        run_id: &str,
        duration_ms: i64,
        total_rows: i64,
        files_produced: i64,
        bytes_written: i64,
    ) {
        s.record_metric(
            export_name,
            run_id,
            duration_ms,
            total_rows,
            None,
            "success",
            None,
            None,
            None,
            None,
            files_produced,
            bytes_written,
            0,
            None,
            None,
        )
        .unwrap();
    }

    /// Two runs of one export come back in descending insertion order with
    /// every recorded column intact.
    #[test]
    fn record_and_query_metrics() {
        let s = store();

        // First run: fully populated success.
        s.record_metric(
            "orders",
            "run_001",
            1200,
            50000,
            Some(142),
            "success",
            None,
            Some("safe"),
            Some("parquet"),
            Some("full"),
            1,
            4096,
            0,
            Some(true),
            Some(false),
        )
        .unwrap();

        // Second run: failure with retries and no validation verdicts.
        s.record_metric(
            "orders",
            "run_002",
            300,
            0,
            Some(30),
            "failed",
            Some("timeout"),
            Some("safe"),
            Some("parquet"),
            Some("full"),
            0,
            0,
            2,
            None,
            None,
        )
        .unwrap();

        let metrics = s.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(metrics.len(), 2);

        let (latest, earliest) = (&metrics[0], &metrics[1]);

        assert_eq!(latest.status, "failed");
        assert_eq!(latest.run_id.as_deref(), Some("run_002"));
        assert_eq!(latest.retries, 2);

        assert_eq!(earliest.total_rows, 50000);
        assert_eq!(earliest.run_id.as_deref(), Some("run_001"));
        assert_eq!(earliest.files_produced, 1);
        assert_eq!(earliest.bytes_written, 4096);
        assert_eq!(earliest.validated, Some(true));
        assert_eq!(earliest.schema_changed, Some(false));
    }

    /// Without a name filter, rows from every export are returned.
    #[test]
    fn query_metrics_all_exports() {
        let s = store();
        record_plain_success(&s, "orders", "r1", 100, 1000, 1, 500);
        record_plain_success(&s, "users", "r2", 200, 2000, 1, 800);

        let metrics = s.get_metrics(None, 10).unwrap();
        assert_eq!(metrics.len(), 2);
    }

    /// `limit` caps the number of rows returned.
    #[test]
    fn metrics_limit_works() {
        let s = store();
        (0..10).for_each(|i| {
            record_plain_success(&s, "t", &format!("r{}", i), i * 100, i, 0, 0);
        });

        let metrics = s.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(metrics.len(), 3);
    }
}