1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
/// One row of the `export_metrics` table, as produced by `StateStore::get_metrics`.
///
/// Besides `Debug`, the type derives `Clone`, `PartialEq` and `Eq` so callers and
/// tests can copy a record and compare two records as whole values.
#[derive(Debug, Clone, PartialEq, Eq)]
// NOTE(review): presumably kept because not every field is read by current
// callers — confirm before removing.
#[allow(dead_code)]
pub struct ExportMetric {
    /// Name of the export this run belongs to (the `export_name` column).
    pub export_name: String,
    /// Identifier of the run; `None` when the column is NULL.
    pub run_id: Option<String>,
    /// Timestamp text stored for the run; `record_metric` writes the current
    /// UTC time formatted as RFC 3339.
    pub run_at: String,
    /// Value of the `duration_ms` column (milliseconds, going by the name).
    pub duration_ms: i64,
    /// Value of the `total_rows` column.
    pub total_rows: i64,
    /// Value of the `peak_rss_mb` column; `None` when the column is NULL.
    pub peak_rss_mb: Option<i64>,
    /// Status text of the run; the tests in this file use "success" and "failed".
    pub status: String,
    /// Error text for the run; `None` when the column is NULL.
    pub error_message: Option<String>,
    /// Value of the `tuning_profile` column; `None` when the column is NULL.
    pub tuning_profile: Option<String>,
    /// Value of the `format` column; `None` when the column is NULL.
    pub format: Option<String>,
    /// Value of the `mode` column; `None` when the column is NULL.
    pub mode: Option<String>,
    /// Value of the `files_produced` column; `get_metrics` reads NULL as `0`.
    pub files_produced: i64,
    /// Value of the `bytes_written` column; `get_metrics` reads NULL as `0`.
    pub bytes_written: i64,
    /// Value of the `retries` column; `get_metrics` reads NULL as `0`.
    pub retries: i64,
    /// Value of the `validated` column; `None` when the column is NULL.
    pub validated: Option<bool>,
    /// Value of the `schema_changed` column; `None` when the column is NULL.
    pub schema_changed: Option<bool>,
}
26
27impl StateStore {
32 #[allow(clippy::too_many_arguments)]
33 pub fn record_metric(
34 &self,
35 export_name: &str,
36 run_id: &str,
37 duration_ms: i64,
38 total_rows: i64,
39 peak_rss_mb: Option<i64>,
40 status: &str,
41 error_message: Option<&str>,
42 tuning_profile: Option<&str>,
43 format: Option<&str>,
44 mode: Option<&str>,
45 files_produced: i64,
46 bytes_written: i64,
47 retries: i64,
48 validated: Option<bool>,
49 schema_changed: Option<bool>,
50 ) -> Result<()> {
51 let now = chrono::Utc::now().to_rfc3339();
52 let sql = "INSERT INTO export_metrics (export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
53 status, error_message, tuning_profile, format, mode,
54 files_produced, bytes_written, retries, validated, schema_changed)
55 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)";
56 match &self.conn {
57 StateConn::Sqlite(c) => {
58 c.execute(
59 sql,
60 rusqlite::params![
61 export_name,
62 run_id,
63 now,
64 duration_ms,
65 total_rows,
66 peak_rss_mb,
67 status,
68 error_message,
69 tuning_profile,
70 format,
71 mode,
72 files_produced,
73 bytes_written,
74 retries,
75 validated,
76 schema_changed
77 ],
78 )?;
79 }
80 StateConn::Postgres(client) => {
81 let mut c = client.borrow_mut();
82 c.execute(
83 &pg_sql(sql),
84 &[
85 &export_name,
86 &run_id,
87 &now,
88 &duration_ms,
89 &total_rows,
90 &peak_rss_mb,
91 &status,
92 &error_message,
93 &tuning_profile,
94 &format,
95 &mode,
96 &files_produced,
97 &bytes_written,
98 &retries,
99 &validated,
100 &schema_changed,
101 ],
102 )?;
103 }
104 }
105 Ok(())
106 }
107
108 pub fn get_metrics(
109 &self,
110 export_name: Option<&str>,
111 limit: usize,
112 ) -> Result<Vec<ExportMetric>> {
113 let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
114 status, error_message, tuning_profile, format, mode,
115 files_produced, bytes_written, retries, validated, schema_changed";
116
117 let limit_i64 = limit as i64;
118 match &self.conn {
119 StateConn::Sqlite(c) => {
120 let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
121 name,
122 ) = export_name
123 {
124 (
125 "SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
126 status, error_message, tuning_profile, format, mode, \
127 files_produced, bytes_written, retries, validated, schema_changed \
128 FROM export_metrics WHERE export_name = ?1 ORDER BY id DESC LIMIT ?2",
129 vec![Box::new(name.to_string()), Box::new(limit_i64)],
130 )
131 } else {
132 (
133 "SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
134 status, error_message, tuning_profile, format, mode, \
135 files_produced, bytes_written, retries, validated, schema_changed \
136 FROM export_metrics ORDER BY id DESC LIMIT ?1",
137 vec![Box::new(limit_i64)],
138 )
139 };
140 let mut stmt = c.prepare(sql)?;
141 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
142 params.iter().map(|p| p.as_ref()).collect();
143 let rows = stmt.query_map(params_refs.as_slice(), |row| {
144 Ok(ExportMetric {
145 export_name: row.get(0)?,
146 run_id: row.get(1)?,
147 run_at: row.get(2)?,
148 duration_ms: row.get(3)?,
149 total_rows: row.get(4)?,
150 peak_rss_mb: row.get(5)?,
151 status: row.get(6)?,
152 error_message: row.get(7)?,
153 tuning_profile: row.get(8)?,
154 format: row.get(9)?,
155 mode: row.get(10)?,
156 files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
157 bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
158 retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
159 validated: row.get(14)?,
160 schema_changed: row.get(15)?,
161 })
162 })?;
163 rows.collect::<std::result::Result<Vec<_>, _>>()
164 .map_err(Into::into)
165 }
166 StateConn::Postgres(client) => {
167 let mut c = client.borrow_mut();
170 let rows = if let Some(name) = export_name {
171 c.query(
172 &format!("SELECT {} FROM export_metrics WHERE export_name = $1 ORDER BY id DESC LIMIT $2", cols),
173 &[&name, &limit_i64],
174 )?
175 } else {
176 c.query(
177 &format!(
178 "SELECT {} FROM export_metrics ORDER BY id DESC LIMIT $1",
179 cols
180 ),
181 &[&limit_i64],
182 )?
183 };
184 Ok(rows
185 .iter()
186 .map(|row| ExportMetric {
187 export_name: row.get(0),
188 run_id: row.get(1),
189 run_at: row.get(2),
190 duration_ms: row.get(3),
191 total_rows: row.get(4),
192 peak_rss_mb: row.get(5),
193 status: row.get(6),
194 error_message: row.get(7),
195 tuning_profile: row.get(8),
196 format: row.get(9),
197 mode: row.get(10),
198 files_produced: row.get::<_, Option<i64>>(11).unwrap_or(0),
199 bytes_written: row.get::<_, Option<i64>>(12).unwrap_or(0),
200 retries: row.get::<_, Option<i64>>(13).unwrap_or(0),
201 validated: row.get(14),
202 schema_changed: row.get(15),
203 })
204 .collect())
205 }
206 }
207 }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store, panicking if it cannot be created.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Records a "success" run with zero retries and every optional column
    /// left as `None`, panicking if the insert fails.
    fn record_bare_success(
        target: &StateStore,
        export_name: &str,
        run_id: &str,
        duration_ms: i64,
        total_rows: i64,
        files_produced: i64,
        bytes_written: i64,
    ) {
        target
            .record_metric(
                export_name,
                run_id,
                duration_ms,
                total_rows,
                None,
                "success",
                None,
                None,
                None,
                None,
                files_produced,
                bytes_written,
                0,
                None,
                None,
            )
            .unwrap();
    }

    /// Two runs of one export come back highest-id first with every column intact.
    #[test]
    fn record_and_query_metrics() {
        let s = store();

        // First run: a successful export with all optional columns populated.
        let first_insert = s.record_metric(
            "orders",
            "run_001",
            1200,
            50000,
            Some(142),
            "success",
            None,
            Some("safe"),
            Some("parquet"),
            Some("full"),
            1,
            4096,
            0,
            Some(true),
            Some(false),
        );
        first_insert.unwrap();

        // Second run: a failure with an error message and two retries.
        let second_insert = s.record_metric(
            "orders",
            "run_002",
            300,
            0,
            Some(30),
            "failed",
            Some("timeout"),
            Some("safe"),
            Some("parquet"),
            Some("full"),
            0,
            0,
            2,
            None,
            None,
        );
        second_insert.unwrap();

        let metrics = s.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(metrics.len(), 2);

        // Rows are ordered by id descending, so the later insert is at index 0.
        let later = &metrics[0];
        let earlier = &metrics[1];

        assert_eq!(later.status, "failed");
        assert_eq!(later.run_id.as_deref(), Some("run_002"));
        assert_eq!(later.retries, 2);

        assert_eq!(earlier.total_rows, 50000);
        assert_eq!(earlier.run_id.as_deref(), Some("run_001"));
        assert_eq!(earlier.files_produced, 1);
        assert_eq!(earlier.bytes_written, 4096);
        assert_eq!(earlier.validated, Some(true));
        assert_eq!(earlier.schema_changed, Some(false));
    }

    /// Passing no export name returns rows for every export.
    #[test]
    fn query_metrics_all_exports() {
        let s = store();
        record_bare_success(&s, "orders", "r1", 100, 1000, 1, 500);
        record_bare_success(&s, "users", "r2", 200, 2000, 1, 800);

        let metrics = s.get_metrics(None, 10).unwrap();
        assert_eq!(metrics.len(), 2);
    }

    /// The `limit` argument caps the number of rows returned.
    #[test]
    fn metrics_limit_works() {
        let s = store();
        for i in 0..10 {
            let run_id = format!("r{}", i);
            record_bare_success(&s, "t", &run_id, i * 100, i, 0, 0);
        }

        let metrics = s.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(metrics.len(), 3);
    }
}