Skip to main content

rivet/state/
run_aggregate.rs

1//! Aggregate run summary store — one row per `rivet run` invocation.
2//!
3//! Per-export rows stay in `export_metrics`.  This table answers:
4//! - "what happened in the last N scheduled runs as a whole?"
5//! - "which run produced the most data / the most failures?"
6//!
7//! `details_json` carries the per-export breakdown so callers do not have to
8//! join on `run_at` ranges to reconstruct the run.  This is intentional: the
9//! aggregate row is observational, not a source of truth — `export_metrics`
10//! remains the canonical per-export record.
11use crate::error::Result;
12
13use super::{StateConn, StateStore, pg_sql};
14
15/// One aggregated `rivet run`.
16#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct RunAggregate {
18    /// Unique id assigned by the pipeline (`agg_<utc_ts>`).
19    pub run_aggregate_id: String,
20    pub started_at: String,
21    pub finished_at: String,
22    pub duration_ms: i64,
23    pub config_path: Option<String>,
24    /// `sequential` | `parallel-threads` | `parallel-processes`.
25    pub parallel_mode: String,
26    pub total_exports: usize,
27    pub success_count: usize,
28    pub failed_count: usize,
29    pub skipped_count: usize,
30    pub total_rows: i64,
31    pub total_files: i64,
32    pub total_bytes: u64,
33    pub per_export: Vec<RunAggregateEntry>,
34}
35
36/// Per-export row inside an aggregate.
37#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
38pub struct RunAggregateEntry {
39    pub export_name: String,
40    pub status: String,
41    pub run_id: String,
42    pub rows: i64,
43    pub files: i64,
44    pub bytes: u64,
45    pub duration_ms: i64,
46    pub mode: String,
47    pub error_message: Option<String>,
48}
49
50impl StateStore {
51    /// Persist an aggregate.  `per_export` is serialized as a JSON array into
52    /// `details_json`.
53    pub fn record_run_aggregate(&self, agg: &RunAggregate) -> Result<()> {
54        let details = serde_json::to_string(&agg.per_export)
55            .map_err(|e| anyhow::anyhow!("run_aggregate: serialize details_json: {:#}", e))?;
56        let sql = "INSERT INTO run_aggregate (
57                run_aggregate_id, started_at, finished_at, duration_ms,
58                config_path, parallel_mode,
59                total_exports, success_count, failed_count, skipped_count,
60                total_rows, total_files, total_bytes, details_json
61            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)";
62        match &self.conn {
63            StateConn::Sqlite(c) => {
64                c.execute(
65                    sql,
66                    rusqlite::params![
67                        agg.run_aggregate_id,
68                        agg.started_at,
69                        agg.finished_at,
70                        agg.duration_ms,
71                        agg.config_path,
72                        agg.parallel_mode,
73                        agg.total_exports as i64,
74                        agg.success_count as i64,
75                        agg.failed_count as i64,
76                        agg.skipped_count as i64,
77                        agg.total_rows,
78                        agg.total_files,
79                        agg.total_bytes as i64,
80                        details,
81                    ],
82                )?;
83            }
84            StateConn::Postgres(client) => {
85                let mut c = client.borrow_mut();
86                c.execute(
87                    &pg_sql(sql),
88                    &[
89                        &agg.run_aggregate_id,
90                        &agg.started_at,
91                        &agg.finished_at,
92                        &agg.duration_ms,
93                        &agg.config_path,
94                        &agg.parallel_mode,
95                        &(agg.total_exports as i64),
96                        &(agg.success_count as i64),
97                        &(agg.failed_count as i64),
98                        &(agg.skipped_count as i64),
99                        &agg.total_rows,
100                        &agg.total_files,
101                        &(agg.total_bytes as i64),
102                        &details,
103                    ],
104                )?;
105            }
106        }
107        Ok(())
108    }
109
110    /// Most-recent aggregates first.
111    #[allow(dead_code)]
112    pub fn get_recent_run_aggregates(&self, limit: usize) -> Result<Vec<RunAggregate>> {
113        let sql = "SELECT run_aggregate_id, started_at, finished_at, duration_ms,
114                    config_path, parallel_mode,
115                    total_exports, success_count, failed_count, skipped_count,
116                    total_rows, total_files, total_bytes, details_json
117             FROM run_aggregate
118             ORDER BY finished_at DESC
119             LIMIT ?1";
120        match &self.conn {
121            StateConn::Sqlite(c) => {
122                let mut stmt = c.prepare(sql)?;
123                let rows = stmt.query_map([limit as i64], |row| {
124                    let details_json: String = row.get(13)?;
125                    let per_export: Vec<RunAggregateEntry> =
126                        serde_json::from_str(&details_json).unwrap_or_default();
127                    Ok(RunAggregate {
128                        run_aggregate_id: row.get(0)?,
129                        started_at: row.get(1)?,
130                        finished_at: row.get(2)?,
131                        duration_ms: row.get(3)?,
132                        config_path: row.get(4)?,
133                        parallel_mode: row.get(5)?,
134                        total_exports: row.get::<_, i64>(6)? as usize,
135                        success_count: row.get::<_, i64>(7)? as usize,
136                        failed_count: row.get::<_, i64>(8)? as usize,
137                        skipped_count: row.get::<_, i64>(9)? as usize,
138                        total_rows: row.get(10)?,
139                        total_files: row.get(11)?,
140                        total_bytes: row.get::<_, i64>(12)? as u64,
141                        per_export,
142                    })
143                })?;
144                rows.collect::<std::result::Result<Vec<_>, _>>()
145                    .map_err(Into::into)
146            }
147            StateConn::Postgres(client) => {
148                let mut c = client.borrow_mut();
149                let rows = c.query(
150                    &format!(
151                        "SELECT run_aggregate_id, started_at, finished_at, duration_ms,
152                                config_path, parallel_mode,
153                                total_exports, success_count, failed_count, skipped_count,
154                                total_rows, total_files, total_bytes, details_json
155                         FROM run_aggregate
156                         ORDER BY finished_at DESC
157                         LIMIT {}",
158                        limit
159                    ),
160                    &[],
161                )?;
162                Ok(rows
163                    .iter()
164                    .map(|row| {
165                        let details_json: String = row.get(13);
166                        let per_export: Vec<RunAggregateEntry> =
167                            serde_json::from_str(&details_json).unwrap_or_default();
168                        RunAggregate {
169                            run_aggregate_id: row.get(0),
170                            started_at: row.get(1),
171                            finished_at: row.get(2),
172                            duration_ms: row.get(3),
173                            config_path: row.get(4),
174                            parallel_mode: row.get(5),
175                            total_exports: row.get::<_, i64>(6) as usize,
176                            success_count: row.get::<_, i64>(7) as usize,
177                            failed_count: row.get::<_, i64>(8) as usize,
178                            skipped_count: row.get::<_, i64>(9) as usize,
179                            total_rows: row.get(10),
180                            total_files: row.get(11),
181                            total_bytes: row.get::<_, i64>(12) as u64,
182                            per_export,
183                        }
184                    })
185                    .collect())
186            }
187        }
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-export aggregate (one success, one failure) with the given id.
    fn sample(id: &str) -> RunAggregate {
        RunAggregate {
            run_aggregate_id: id.into(),
            started_at: "2026-04-27T10:00:00Z".into(),
            finished_at: "2026-04-27T10:11:30Z".into(),
            duration_ms: 690_000,
            config_path: Some("pilot.yaml".into()),
            parallel_mode: "sequential".into(),
            total_exports: 2,
            success_count: 1,
            failed_count: 1,
            skipped_count: 0,
            total_rows: 1_500_000,
            total_files: 12,
            total_bytes: 750 * 1024 * 1024,
            per_export: vec![
                RunAggregateEntry {
                    export_name: "orders".into(),
                    status: "success".into(),
                    run_id: "orders_20260427T100000".into(),
                    rows: 1_000_000,
                    files: 10,
                    bytes: 600 * 1024 * 1024,
                    duration_ms: 600_000,
                    mode: "chunked".into(),
                    error_message: None,
                },
                RunAggregateEntry {
                    export_name: "users".into(),
                    status: "failed".into(),
                    run_id: "users_20260427T100000".into(),
                    rows: 500_000,
                    files: 2,
                    bytes: 150 * 1024 * 1024,
                    duration_ms: 80_000,
                    mode: "full".into(),
                    error_message: Some("connection reset".into()),
                },
            ],
        }
    }

    #[test]
    fn record_and_query_round_trip() {
        let s = StateStore::open_in_memory().unwrap();
        s.record_run_aggregate(&sample("agg_001")).unwrap();
        s.record_run_aggregate(&sample("agg_002")).unwrap();

        let rows = s.get_recent_run_aggregates(10).unwrap();
        assert_eq!(rows.len(), 2);
        let ids: Vec<_> = rows.iter().map(|r| r.run_aggregate_id.as_str()).collect();
        assert!(ids.contains(&"agg_001"));
        assert!(ids.contains(&"agg_002"));

        let r = rows
            .iter()
            .find(|r| r.run_aggregate_id == "agg_001")
            .unwrap();
        assert_eq!(r.config_path.as_deref(), Some("pilot.yaml"));
        assert_eq!(r.parallel_mode, "sequential");
        assert_eq!(r.total_exports, 2);
        assert_eq!(r.success_count, 1);
        assert_eq!(r.failed_count, 1);
        assert_eq!(r.skipped_count, 0);
        assert_eq!(r.total_rows, 1_500_000);
        assert_eq!(r.total_files, 12);
        assert_eq!(r.total_bytes, 750 * 1024 * 1024);
        assert_eq!(r.per_export.len(), 2);
        assert_eq!(r.per_export[0].export_name, "orders");
        assert_eq!(r.per_export[0].bytes, 600 * 1024 * 1024);
        assert_eq!(
            r.per_export[1].error_message.as_deref(),
            Some("connection reset")
        );
    }

    #[test]
    fn limit_is_respected() {
        let s = StateStore::open_in_memory().unwrap();
        for i in 0..5 {
            let mut a = sample(&format!("agg_{i:03}"));
            a.finished_at = format!("2026-04-27T10:{:02}:00Z", i);
            s.record_run_aggregate(&a).unwrap();
        }
        let rows = s.get_recent_run_aggregates(3).unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].run_aggregate_id, "agg_004");
        assert_eq!(rows[1].run_aggregate_id, "agg_003");
        assert_eq!(rows[2].run_aggregate_id, "agg_002");
    }

    #[test]
    fn zero_limit_returns_no_rows() {
        let s = StateStore::open_in_memory().unwrap();
        s.record_run_aggregate(&sample("agg_001")).unwrap();
        assert!(s.get_recent_run_aggregates(0).unwrap().is_empty());
    }

    #[test]
    fn oversized_limit_returns_every_row() {
        let s = StateStore::open_in_memory().unwrap();
        for i in 0..3 {
            s.record_run_aggregate(&sample(&format!("agg_{i:03}")))
                .unwrap();
        }
        // `usize::MAX` does not fit in an i64 LIMIT; it must neither error
        // nor truncate the result.
        let rows = s.get_recent_run_aggregates(usize::MAX).unwrap();
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn large_total_bytes_round_trips() {
        let s = StateStore::open_in_memory().unwrap();
        let mut a = sample("agg_big");
        // Stored as a negative i64; must come back as the same u64.
        a.total_bytes = u64::MAX;
        s.record_run_aggregate(&a).unwrap();

        let rows = s.get_recent_run_aggregates(1).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].total_bytes, u64::MAX);
    }

    #[test]
    fn empty_per_export_is_allowed() {
        let s = StateStore::open_in_memory().unwrap();
        let mut a = sample("agg_empty");
        a.per_export.clear();
        a.total_exports = 0;
        a.success_count = 0;
        a.failed_count = 0;
        s.record_run_aggregate(&a).unwrap();

        let rows = s.get_recent_run_aggregates(10).unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows[0].per_export.is_empty());
    }
}