1use crate::error::Result;
12
13use super::{StateConn, StateStore, pg_sql};
14
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct RunAggregate {
18 pub run_aggregate_id: String,
20 pub started_at: String,
21 pub finished_at: String,
22 pub duration_ms: i64,
23 pub config_path: Option<String>,
24 pub parallel_mode: String,
26 pub total_exports: usize,
27 pub success_count: usize,
28 pub failed_count: usize,
29 pub skipped_count: usize,
30 pub total_rows: i64,
31 pub total_files: i64,
32 pub total_bytes: u64,
33 pub per_export: Vec<RunAggregateEntry>,
34}
35
36#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
38pub struct RunAggregateEntry {
39 pub export_name: String,
40 pub status: String,
41 pub run_id: String,
42 pub rows: i64,
43 pub files: i64,
44 pub bytes: u64,
45 pub duration_ms: i64,
46 pub mode: String,
47 pub error_message: Option<String>,
48}
49
50impl StateStore {
51 pub fn record_run_aggregate(&self, agg: &RunAggregate) -> Result<()> {
54 let details = serde_json::to_string(&agg.per_export)
55 .map_err(|e| anyhow::anyhow!("run_aggregate: serialize details_json: {:#}", e))?;
56 let sql = "INSERT INTO run_aggregate (
57 run_aggregate_id, started_at, finished_at, duration_ms,
58 config_path, parallel_mode,
59 total_exports, success_count, failed_count, skipped_count,
60 total_rows, total_files, total_bytes, details_json
61 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)";
62 match &self.conn {
63 StateConn::Sqlite(c) => {
64 c.execute(
65 sql,
66 rusqlite::params![
67 agg.run_aggregate_id,
68 agg.started_at,
69 agg.finished_at,
70 agg.duration_ms,
71 agg.config_path,
72 agg.parallel_mode,
73 agg.total_exports as i64,
74 agg.success_count as i64,
75 agg.failed_count as i64,
76 agg.skipped_count as i64,
77 agg.total_rows,
78 agg.total_files,
79 agg.total_bytes as i64,
80 details,
81 ],
82 )?;
83 }
84 StateConn::Postgres(client) => {
85 let mut c = client.borrow_mut();
86 c.execute(
87 &pg_sql(sql),
88 &[
89 &agg.run_aggregate_id,
90 &agg.started_at,
91 &agg.finished_at,
92 &agg.duration_ms,
93 &agg.config_path,
94 &agg.parallel_mode,
95 &(agg.total_exports as i64),
96 &(agg.success_count as i64),
97 &(agg.failed_count as i64),
98 &(agg.skipped_count as i64),
99 &agg.total_rows,
100 &agg.total_files,
101 &(agg.total_bytes as i64),
102 &details,
103 ],
104 )?;
105 }
106 }
107 Ok(())
108 }
109
110 #[allow(dead_code)]
112 pub fn get_recent_run_aggregates(&self, limit: usize) -> Result<Vec<RunAggregate>> {
113 let sql = "SELECT run_aggregate_id, started_at, finished_at, duration_ms,
114 config_path, parallel_mode,
115 total_exports, success_count, failed_count, skipped_count,
116 total_rows, total_files, total_bytes, details_json
117 FROM run_aggregate
118 ORDER BY finished_at DESC
119 LIMIT ?1";
120 match &self.conn {
121 StateConn::Sqlite(c) => {
122 let mut stmt = c.prepare(sql)?;
123 let rows = stmt.query_map([limit as i64], |row| {
124 let details_json: String = row.get(13)?;
125 let per_export: Vec<RunAggregateEntry> =
126 serde_json::from_str(&details_json).unwrap_or_default();
127 Ok(RunAggregate {
128 run_aggregate_id: row.get(0)?,
129 started_at: row.get(1)?,
130 finished_at: row.get(2)?,
131 duration_ms: row.get(3)?,
132 config_path: row.get(4)?,
133 parallel_mode: row.get(5)?,
134 total_exports: row.get::<_, i64>(6)? as usize,
135 success_count: row.get::<_, i64>(7)? as usize,
136 failed_count: row.get::<_, i64>(8)? as usize,
137 skipped_count: row.get::<_, i64>(9)? as usize,
138 total_rows: row.get(10)?,
139 total_files: row.get(11)?,
140 total_bytes: row.get::<_, i64>(12)? as u64,
141 per_export,
142 })
143 })?;
144 rows.collect::<std::result::Result<Vec<_>, _>>()
145 .map_err(Into::into)
146 }
147 StateConn::Postgres(client) => {
148 let mut c = client.borrow_mut();
149 let rows = c.query(
150 &format!(
151 "SELECT run_aggregate_id, started_at, finished_at, duration_ms,
152 config_path, parallel_mode,
153 total_exports, success_count, failed_count, skipped_count,
154 total_rows, total_files, total_bytes, details_json
155 FROM run_aggregate
156 ORDER BY finished_at DESC
157 LIMIT {}",
158 limit
159 ),
160 &[],
161 )?;
162 Ok(rows
163 .iter()
164 .map(|row| {
165 let details_json: String = row.get(13);
166 let per_export: Vec<RunAggregateEntry> =
167 serde_json::from_str(&details_json).unwrap_or_default();
168 RunAggregate {
169 run_aggregate_id: row.get(0),
170 started_at: row.get(1),
171 finished_at: row.get(2),
172 duration_ms: row.get(3),
173 config_path: row.get(4),
174 parallel_mode: row.get(5),
175 total_exports: row.get::<_, i64>(6) as usize,
176 success_count: row.get::<_, i64>(7) as usize,
177 failed_count: row.get::<_, i64>(8) as usize,
178 skipped_count: row.get::<_, i64>(9) as usize,
179 total_rows: row.get(10),
180 total_files: row.get(11),
181 total_bytes: row.get::<_, i64>(12) as u64,
182 per_export,
183 }
184 })
185 .collect())
186 }
187 }
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194
195 fn sample(id: &str) -> RunAggregate {
196 RunAggregate {
197 run_aggregate_id: id.into(),
198 started_at: "2026-04-27T10:00:00Z".into(),
199 finished_at: "2026-04-27T10:11:30Z".into(),
200 duration_ms: 690_000,
201 config_path: Some("pilot.yaml".into()),
202 parallel_mode: "sequential".into(),
203 total_exports: 2,
204 success_count: 1,
205 failed_count: 1,
206 skipped_count: 0,
207 total_rows: 1_500_000,
208 total_files: 12,
209 total_bytes: 750 * 1024 * 1024,
210 per_export: vec![
211 RunAggregateEntry {
212 export_name: "orders".into(),
213 status: "success".into(),
214 run_id: "orders_20260427T100000".into(),
215 rows: 1_000_000,
216 files: 10,
217 bytes: 600 * 1024 * 1024,
218 duration_ms: 600_000,
219 mode: "chunked".into(),
220 error_message: None,
221 },
222 RunAggregateEntry {
223 export_name: "users".into(),
224 status: "failed".into(),
225 run_id: "users_20260427T100000".into(),
226 rows: 500_000,
227 files: 2,
228 bytes: 150 * 1024 * 1024,
229 duration_ms: 80_000,
230 mode: "full".into(),
231 error_message: Some("connection reset".into()),
232 },
233 ],
234 }
235 }
236
237 #[test]
238 fn record_and_query_round_trip() {
239 let s = StateStore::open_in_memory().unwrap();
240 s.record_run_aggregate(&sample("agg_001")).unwrap();
241 s.record_run_aggregate(&sample("agg_002")).unwrap();
242
243 let rows = s.get_recent_run_aggregates(10).unwrap();
244 assert_eq!(rows.len(), 2);
245 let ids: Vec<_> = rows.iter().map(|r| r.run_aggregate_id.as_str()).collect();
246 assert!(ids.contains(&"agg_001"));
247 assert!(ids.contains(&"agg_002"));
248
249 let r = rows
250 .iter()
251 .find(|r| r.run_aggregate_id == "agg_001")
252 .unwrap();
253 assert_eq!(r.total_exports, 2);
254 assert_eq!(r.success_count, 1);
255 assert_eq!(r.failed_count, 1);
256 assert_eq!(r.total_rows, 1_500_000);
257 assert_eq!(r.per_export.len(), 2);
258 assert_eq!(r.per_export[0].export_name, "orders");
259 assert_eq!(
260 r.per_export[1].error_message.as_deref(),
261 Some("connection reset")
262 );
263 }
264
265 #[test]
266 fn limit_is_respected() {
267 let s = StateStore::open_in_memory().unwrap();
268 for i in 0..5 {
269 let mut a = sample(&format!("agg_{i:03}"));
270 a.finished_at = format!("2026-04-27T10:{:02}:00Z", i);
271 s.record_run_aggregate(&a).unwrap();
272 }
273 let rows = s.get_recent_run_aggregates(3).unwrap();
274 assert_eq!(rows.len(), 3);
275 assert_eq!(rows[0].run_aggregate_id, "agg_004");
276 assert_eq!(rows[1].run_aggregate_id, "agg_003");
277 assert_eq!(rows[2].run_aggregate_id, "agg_002");
278 }
279
280 #[test]
281 fn empty_per_export_is_allowed() {
282 let s = StateStore::open_in_memory().unwrap();
283 let mut a = sample("agg_empty");
284 a.per_export.clear();
285 a.total_exports = 0;
286 a.success_count = 0;
287 a.failed_count = 0;
288 s.record_run_aggregate(&a).unwrap();
289
290 let rows = s.get_recent_run_aggregates(10).unwrap();
291 assert_eq!(rows.len(), 1);
292 assert!(rows[0].per_export.is_empty());
293 }
294}