1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
/// One row of `export_metrics` as returned by [`StateStore::get_metrics`].
///
/// This is the read model. It carries only the original sixteen columns, not the newer
/// ones that are written through [`MetricRow`].
#[derive(Debug, Clone, PartialEq, Eq)]
// NOTE(review): presumably not every field is read by callers yet. The lint is silenced
// rather than dropping columns from the read model; confirm before removing.
#[allow(dead_code)]
pub struct ExportMetric {
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Run identifier. It is decoded as nullable, so it is `None` when the column is NULL.
    pub run_id: Option<String>,
    /// Insert timestamp exactly as stored. `record_metric_full` writes RFC 3339 UTC strings.
    pub run_at: String,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: i64,
    /// Number of rows exported.
    pub total_rows: i64,
    /// Peak RSS in megabytes, when it was recorded.
    pub peak_rss_mb: Option<i64>,
    /// Run status string as recorded (e.g. `"success"` or `"failed"` in the tests).
    pub status: String,
    /// Error text for failed runs, when recorded.
    pub error_message: Option<String>,
    /// Tuning profile name, when recorded.
    pub tuning_profile: Option<String>,
    /// Output format, when recorded.
    pub format: Option<String>,
    /// Export mode, when recorded.
    pub mode: Option<String>,
    /// Files produced. A NULL column is reported as `0`.
    pub files_produced: i64,
    /// Bytes written. A NULL column is reported as `0`.
    pub bytes_written: i64,
    /// Retry count. A NULL column is reported as `0`.
    pub retries: i64,
    /// Validation outcome, when recorded.
    pub validated: Option<bool>,
    /// Whether a schema change was detected, when recorded.
    pub schema_changed: Option<bool>,
}
26
/// Full set of values written to one `export_metrics` row by
/// [`StateStore::record_metric_full`].
///
/// Every field has a `Default`, so callers can set only the columns they know and fill the
/// rest with `..Default::default()`. `run_at` is deliberately absent: it is stamped with the
/// current UTC time at insert.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct MetricRow {
    /// Name of the export this run belongs to.
    pub export_name: String,
    /// Run identifier.
    pub run_id: String,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: i64,
    /// Number of rows exported.
    pub total_rows: i64,
    /// Peak RSS in megabytes, when measured.
    pub peak_rss_mb: Option<i64>,
    /// Run status string (e.g. `"success"` or `"failed"` in the tests).
    pub status: String,
    /// Error text for failed runs.
    pub error_message: Option<String>,
    /// Tuning profile name.
    pub tuning_profile: Option<String>,
    /// Output format.
    pub format: Option<String>,
    /// Export mode.
    pub mode: Option<String>,
    /// Files produced.
    pub files_produced: i64,
    /// Bytes written.
    pub bytes_written: i64,
    /// Retry count.
    pub retries: i64,
    /// Validation outcome.
    pub validated: Option<bool>,
    /// Whether a schema change was detected.
    pub schema_changed: Option<bool>,
    /// Files committed. `record_metric` leaves this at `0`.
    pub files_committed: i64,
    /// Reconciliation outcome, when performed.
    pub reconciled: Option<bool>,
    /// Row count reported by the source, when captured.
    pub source_count: Option<i64>,
    /// Quality-check outcome, when performed.
    pub quality_passed: Option<bool>,
    /// Change in Postgres temp bytes over the run, when captured.
    pub pg_temp_bytes_delta: Option<i64>,
    /// Batch size used for the run. `record_metric` leaves this at `0`.
    pub batch_size: i64,
    /// Memory budget for batch sizing in MB, when configured.
    pub batch_size_memory_mb: Option<i64>,
    /// Reason the run was skipped, if it was.
    pub skip_reason: Option<String>,
    /// Schema fingerprint, when computed.
    pub schema_fingerprint: Option<String>,
    /// Chunk size, when chunking was used.
    pub chunk_size: Option<i64>,
    /// Degree of parallelism, when recorded.
    pub parallel: Option<i64>,
    /// Source type label.
    pub source_type: Option<String>,
    /// Destination type label.
    pub destination_type: Option<String>,
    /// Tool version that produced the row.
    pub rivet_version: Option<String>,
    /// Duration of the slowest chunk in milliseconds, when chunking was used.
    pub longest_chunk_ms: Option<i64>,
}
69
impl StateStore {
    /// Records a finished export run using the original positional-argument API.
    ///
    /// This is a thin adapter over [`StateStore::record_metric_full`]. The arguments are
    /// copied into a [`MetricRow`], and every column not listed here (`files_committed`,
    /// `reconciled`, `batch_size`, `chunk_size`, ...) takes its `Default` value.
    ///
    /// # Errors
    /// Propagates any error from [`StateStore::record_metric_full`].
    // `too_many_arguments`: one parameter per column is the legacy calling convention kept
    // for existing callers. `dead_code`: kept as in the original; presumably some builds have
    // no non-test caller (confirm before removing).
    #[allow(clippy::too_many_arguments, dead_code)]
    pub fn record_metric(
        &self,
        export_name: &str,
        run_id: &str,
        duration_ms: i64,
        total_rows: i64,
        peak_rss_mb: Option<i64>,
        status: &str,
        error_message: Option<&str>,
        tuning_profile: Option<&str>,
        format: Option<&str>,
        mode: Option<&str>,
        files_produced: i64,
        bytes_written: i64,
        retries: i64,
        validated: Option<bool>,
        schema_changed: Option<bool>,
    ) -> Result<()> {
        self.record_metric_full(&MetricRow {
            export_name: export_name.to_string(),
            run_id: run_id.to_string(),
            duration_ms,
            total_rows,
            peak_rss_mb,
            status: status.to_string(),
            error_message: error_message.map(str::to_string),
            tuning_profile: tuning_profile.map(str::to_string),
            format: format.map(str::to_string),
            mode: mode.map(str::to_string),
            files_produced,
            bytes_written,
            retries,
            validated,
            schema_changed,
            ..Default::default()
        })
    }

    /// Inserts one `export_metrics` row that covers every column in [`MetricRow`].
    ///
    /// `run_at` is not taken from `m`. It is stamped here with the current UTC time in
    /// RFC 3339 form. Both backends use the same `?N` SQL text; for Postgres it is first
    /// passed through `pg_sql`.
    ///
    /// # Errors
    /// Returns an error if the backend rejects the insert.
    pub fn record_metric_full(&self, m: &MetricRow) -> Result<()> {
        let now = chrono::Utc::now().to_rfc3339();
        // Placeholder N binds the N-th column in this list; keep both bind lists below in
        // the same order.
        let sql = "INSERT INTO export_metrics (
            export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
            status, error_message, tuning_profile, format, mode,
            files_produced, bytes_written, retries, validated, schema_changed,
            files_committed, reconciled, source_count, quality_passed, pg_temp_bytes_delta,
            batch_size, batch_size_memory_mb, skip_reason, schema_fingerprint,
            chunk_size, parallel, source_type, destination_type, rivet_version,
            longest_chunk_ms)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16,
                 ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31)";
        match &self.conn {
            StateConn::Sqlite(c) => {
                c.execute(
                    sql,
                    rusqlite::params![
                        m.export_name,
                        m.run_id,
                        now,
                        m.duration_ms,
                        m.total_rows,
                        m.peak_rss_mb,
                        m.status,
                        m.error_message,
                        m.tuning_profile,
                        m.format,
                        m.mode,
                        m.files_produced,
                        m.bytes_written,
                        m.retries,
                        m.validated,
                        m.schema_changed,
                        m.files_committed,
                        m.reconciled,
                        m.source_count,
                        m.quality_passed,
                        m.pg_temp_bytes_delta,
                        m.batch_size,
                        m.batch_size_memory_mb,
                        m.skip_reason,
                        m.schema_fingerprint,
                        m.chunk_size,
                        m.parallel,
                        m.source_type,
                        m.destination_type,
                        m.rivet_version,
                        m.longest_chunk_ms
                    ],
                )?;
            }
            StateConn::Postgres(client) => {
                let mut c = client.borrow_mut();
                c.execute(
                    &pg_sql(sql),
                    &[
                        &m.export_name,
                        &m.run_id,
                        &now,
                        &m.duration_ms,
                        &m.total_rows,
                        &m.peak_rss_mb,
                        &m.status,
                        &m.error_message,
                        &m.tuning_profile,
                        &m.format,
                        &m.mode,
                        &m.files_produced,
                        &m.bytes_written,
                        &m.retries,
                        &m.validated,
                        &m.schema_changed,
                        &m.files_committed,
                        &m.reconciled,
                        &m.source_count,
                        &m.quality_passed,
                        &m.pg_temp_bytes_delta,
                        &m.batch_size,
                        &m.batch_size_memory_mb,
                        &m.skip_reason,
                        &m.schema_fingerprint,
                        &m.chunk_size,
                        &m.parallel,
                        &m.source_type,
                        &m.destination_type,
                        &m.rivet_version,
                        &m.longest_chunk_ms,
                    ],
                )?;
            }
        }
        Ok(())
    }

    /// Records per-counter "harm" deltas for a run.
    ///
    /// Writes one `export_harm` row per `(metric, delta)` pair. All rows share a single
    /// `recorded_at` timestamp. An empty `deltas` slice is a no-op and touches no table.
    ///
    /// Rows are inserted one at a time without an explicit transaction, so an error part-way
    /// through leaves the rows already inserted in place.
    ///
    /// # Errors
    /// Returns the first backend error; the remaining deltas are not inserted.
    pub fn record_harm(
        &self,
        run_id: &str,
        export_name: &str,
        deltas: &[(String, i64)],
    ) -> Result<()> {
        if deltas.is_empty() {
            return Ok(());
        }
        let now = chrono::Utc::now().to_rfc3339();
        let sql = "INSERT INTO export_harm (run_id, export_name, metric, delta, recorded_at) \
                   VALUES (?1, ?2, ?3, ?4, ?5)";
        match &self.conn {
            StateConn::Sqlite(c) => {
                // Prepare once and reuse it, instead of re-parsing the SQL for every delta.
                let mut stmt = c.prepare(sql)?;
                for (metric, delta) in deltas {
                    stmt.execute(rusqlite::params![run_id, export_name, metric, delta, now])?;
                }
            }
            StateConn::Postgres(client) => {
                let mut c = client.borrow_mut();
                // Translate and prepare once, rather than rebuilding the SQL per delta.
                let stmt = c.prepare(&pg_sql(sql))?;
                for (metric, delta) in deltas {
                    c.execute(&stmt, &[&run_id, &export_name, metric, delta, &now])?;
                }
            }
        }
        Ok(())
    }

    /// Test-only: returns the `(metric, delta)` rows recorded by
    /// [`StateStore::record_harm`] for `run_id`, ordered by metric name.
    ///
    /// Always returns an empty vector on the Postgres backend.
    ///
    /// # Panics
    /// Panics if the query fails or any row cannot be decoded. A broken read then surfaces
    /// as a test failure instead of silently shrinking the result.
    #[cfg(test)]
    pub(crate) fn harm_rows_for_test(&self, run_id: &str) -> Vec<(String, i64)> {
        match &self.conn {
            StateConn::Sqlite(c) => {
                let mut stmt = c
                    .prepare(
                        "SELECT metric, delta FROM export_harm WHERE run_id = ?1 ORDER BY metric",
                    )
                    .expect("prepare export_harm read");
                let rows = stmt
                    .query_map([run_id], |r| {
                        Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
                    })
                    .expect("query export_harm");
                rows.collect::<rusqlite::Result<Vec<_>>>()
                    .expect("decode export_harm row")
            }
            StateConn::Postgres(_) => Vec::new(),
        }
    }

    /// Test-only: reads one integer column of the `export_metrics` row for `run_id`.
    ///
    /// Returns `None` if the column is NULL, no row matches, the query fails, or the
    /// backend is Postgres. `column` is interpolated directly into the SQL, so it must only
    /// ever be a trusted, literal column name from test code.
    #[cfg(test)]
    pub(crate) fn metric_scalar_i64(&self, run_id: &str, column: &str) -> Option<i64> {
        match &self.conn {
            StateConn::Sqlite(c) => c
                .query_row(
                    &format!("SELECT {column} FROM export_metrics WHERE run_id = ?1"),
                    [run_id],
                    |r| r.get::<_, Option<i64>>(0),
                )
                .ok()
                .flatten(),
            StateConn::Postgres(_) => None,
        }
    }

    /// Returns the most recent `export_metrics` rows, newest first (`ORDER BY id DESC`).
    ///
    /// * `export_name`: when `Some`, only rows for that export are returned.
    /// * `limit`: maximum number of rows. Values above `i64::MAX` are clamped to
    ///   `i64::MAX` instead of wrapping.
    ///
    /// Only the original sixteen columns are read. NULL `files_produced`, `bytes_written`
    /// and `retries` are reported as `0`.
    ///
    /// # Errors
    /// Returns an error if the query fails or a column cannot be decoded into its field type.
    pub fn get_metrics(
        &self,
        export_name: Option<&str>,
        limit: usize,
    ) -> Result<Vec<ExportMetric>> {
        // This column order defines the indices used by both row decoders below.
        let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
                    status, error_message, tuning_profile, format, mode, \
                    files_produced, bytes_written, retries, validated, schema_changed";
        // `limit as i64` would wrap a huge `usize` to a negative LIMIT. SQLite reads that as
        // "unlimited" and Postgres rejects it, so clamp to give both backends one meaning.
        let max_rows = i64::try_from(limit).unwrap_or(i64::MAX);

        match &self.conn {
            StateConn::Sqlite(c) => {
                let sql = match export_name {
                    Some(_) => format!(
                        "SELECT {cols} FROM export_metrics WHERE export_name = ?1 \
                         ORDER BY id DESC LIMIT ?2"
                    ),
                    None => {
                        format!("SELECT {cols} FROM export_metrics ORDER BY id DESC LIMIT ?1")
                    }
                };
                let mut stmt = c.prepare(&sql)?;
                let rows = match export_name {
                    Some(name) => stmt.query_map(
                        rusqlite::params![name, max_rows],
                        Self::metric_from_sqlite_row,
                    )?,
                    None => stmt.query_map(
                        rusqlite::params![max_rows],
                        Self::metric_from_sqlite_row,
                    )?,
                };
                rows.collect::<rusqlite::Result<Vec<_>>>()
                    .map_err(Into::into)
            }
            StateConn::Postgres(client) => {
                let mut c = client.borrow_mut();
                let rows = match export_name {
                    Some(name) => c.query(
                        &format!(
                            "SELECT {cols} FROM export_metrics WHERE export_name = $1 \
                             ORDER BY id DESC LIMIT $2"
                        ),
                        &[&name, &max_rows],
                    )?,
                    None => c.query(
                        &format!("SELECT {cols} FROM export_metrics ORDER BY id DESC LIMIT $1"),
                        &[&max_rows],
                    )?,
                };
                // `try_get` rather than `get`: an unexpected column type becomes an error
                // instead of a panic.
                rows.iter()
                    .map(|row| -> Result<ExportMetric> {
                        Ok(ExportMetric {
                            export_name: row.try_get(0)?,
                            run_id: row.try_get(1)?,
                            run_at: row.try_get(2)?,
                            duration_ms: row.try_get(3)?,
                            total_rows: row.try_get(4)?,
                            peak_rss_mb: row.try_get(5)?,
                            status: row.try_get(6)?,
                            error_message: row.try_get(7)?,
                            tuning_profile: row.try_get(8)?,
                            format: row.try_get(9)?,
                            mode: row.try_get(10)?,
                            files_produced: row.try_get::<_, Option<i64>>(11)?.unwrap_or(0),
                            bytes_written: row.try_get::<_, Option<i64>>(12)?.unwrap_or(0),
                            retries: row.try_get::<_, Option<i64>>(13)?.unwrap_or(0),
                            validated: row.try_get(14)?,
                            schema_changed: row.try_get(15)?,
                        })
                    })
                    .collect()
            }
        }
    }

    /// Decodes one SQLite row selected by [`StateStore::get_metrics`] into an
    /// [`ExportMetric`].
    ///
    /// Column indices follow the `get_metrics` column list. NULL `files_produced`,
    /// `bytes_written` and `retries` are reported as `0`.
    fn metric_from_sqlite_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ExportMetric> {
        Ok(ExportMetric {
            export_name: row.get(0)?,
            run_id: row.get(1)?,
            run_at: row.get(2)?,
            duration_ms: row.get(3)?,
            total_rows: row.get(4)?,
            peak_rss_mb: row.get(5)?,
            status: row.get(6)?,
            error_message: row.get(7)?,
            tuning_profile: row.get(8)?,
            format: row.get(9)?,
            mode: row.get(10)?,
            files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
            bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
            retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
            validated: row.get(14)?,
            schema_changed: row.get(15)?,
        })
    }
}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a brand-new in-memory store, so every test starts from an empty database.
    fn fresh_store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Records a successful run through the legacy positional API. Every optional column is
    /// left unset and `retries` is zero.
    fn record_plain_success(
        store: &StateStore,
        export_name: &str,
        run_id: &str,
        duration_ms: i64,
        total_rows: i64,
        files_produced: i64,
        bytes_written: i64,
    ) {
        store
            .record_metric(
                export_name,
                run_id,
                duration_ms,
                total_rows,
                None,
                "success",
                None,
                None,
                None,
                None,
                files_produced,
                bytes_written,
                0,
                None,
                None,
            )
            .unwrap();
    }

    #[test]
    fn record_and_query_metrics() {
        let store = fresh_store();

        // Older run: succeeded, validated, no schema change.
        store
            .record_metric(
                "orders",
                "run_001",
                1200,
                50000,
                Some(142),
                "success",
                None,
                Some("safe"),
                Some("parquet"),
                Some("full"),
                1,
                4096,
                0,
                Some(true),
                Some(false),
            )
            .unwrap();
        // Newer run: failed after two retries.
        store
            .record_metric(
                "orders",
                "run_002",
                300,
                0,
                Some(30),
                "failed",
                Some("timeout"),
                Some("safe"),
                Some("parquet"),
                Some("full"),
                0,
                0,
                2,
                None,
                None,
            )
            .unwrap();

        let fetched = store.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(fetched.len(), 2);

        // Rows come back newest first.
        let (newest, oldest) = (&fetched[0], &fetched[1]);
        assert_eq!(newest.status, "failed");
        assert_eq!(newest.run_id.as_deref(), Some("run_002"));
        assert_eq!(newest.retries, 2);
        assert_eq!(oldest.total_rows, 50000);
        assert_eq!(oldest.run_id.as_deref(), Some("run_001"));
        assert_eq!(oldest.files_produced, 1);
        assert_eq!(oldest.bytes_written, 4096);
        assert_eq!(oldest.validated, Some(true));
        assert_eq!(oldest.schema_changed, Some(false));
    }

    #[test]
    fn record_metric_full_persists_v9_columns_in_order() {
        let store = fresh_store();
        let row = MetricRow {
            export_name: "orders".into(),
            run_id: "r1".into(),
            duration_ms: 1200,
            total_rows: 50_000,
            status: "success".into(),
            files_committed: 11,
            source_count: Some(50_000),
            pg_temp_bytes_delta: Some(1_048_576),
            batch_size: 32_000,
            chunk_size: Some(100_000),
            parallel: Some(4),
            longest_chunk_ms: Some(1_839),
            ..Default::default()
        };
        store.record_metric_full(&row).unwrap();

        let latest = store.get_metrics(Some("orders"), 1).unwrap();
        assert_eq!(latest.len(), 1);
        assert_eq!(latest[0].total_rows, 50_000);
        assert_eq!(latest[0].run_id.as_deref(), Some("r1"));

        // A misordered bind list would put these values in the wrong columns.
        let expected_scalars = [
            ("files_committed", 11),
            ("source_count", 50_000),
            ("pg_temp_bytes_delta", 1_048_576),
            ("batch_size", 32_000),
            ("chunk_size", 100_000),
            ("parallel", 4),
            ("longest_chunk_ms", 1_839),
        ];
        for &(column, value) in expected_scalars.iter() {
            assert_eq!(store.metric_scalar_i64("r1", column), Some(value));
        }
    }

    #[test]
    fn record_harm_round_trips_per_counter_rows() {
        let store = fresh_store();
        let deltas: Vec<(String, i64)> = [
            ("pg_tup_returned", 1_000_000),
            ("pg_blks_read", 2_048),
            ("pg_temp_files", 3),
        ]
        .iter()
        .map(|&(name, delta)| (name.to_string(), delta))
        .collect();
        store.record_harm("run-h", "content_items", &deltas).unwrap();

        // The read-back helper orders rows by metric name.
        let mut expected = deltas.clone();
        expected.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(store.harm_rows_for_test("run-h"), expected);

        // An empty batch is a no-op and must not create any rows.
        store.record_harm("run-empty", "x", &[]).unwrap();
        assert!(store.harm_rows_for_test("run-empty").is_empty());
    }

    #[test]
    fn query_metrics_all_exports() {
        let store = fresh_store();
        record_plain_success(&store, "orders", "r1", 100, 1000, 1, 500);
        record_plain_success(&store, "users", "r2", 200, 2000, 1, 800);

        let everything = store.get_metrics(None, 10).unwrap();
        assert_eq!(everything.len(), 2);
    }

    #[test]
    fn metrics_limit_works() {
        let store = fresh_store();
        for i in 0..10_i64 {
            let run_id = format!("r{}", i);
            record_plain_success(&store, "t", &run_id, i * 100, i, 0, 0);
        }
        let newest_three = store.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(newest_three.len(), 3);
    }
}