//! `rivet/state/metrics.rs` — state-store persistence for `export_metrics`
//! rows and the per-run `export_harm` counters.

1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
/// One row read back from `export_metrics` by `StateStore::get_metrics`.
///
/// Carries only the original core columns; the v9/v10 analysis columns that
/// `MetricRow` writes are not surfaced here.
#[derive(Debug, Clone, PartialEq)]
// NOTE(review): presumably not every field has a non-test reader, hence the
// allow; confirm before pruning fields.
#[allow(dead_code)]
pub struct ExportMetric {
    pub export_name: String,
    /// Nullable in the table.
    pub run_id: Option<String>,
    /// Insert timestamp; `record_metric_full` writes it as an RFC 3339 string.
    pub run_at: String,
    pub duration_ms: i64,
    pub total_rows: i64,
    pub peak_rss_mb: Option<i64>,
    pub status: String,
    pub error_message: Option<String>,
    pub tuning_profile: Option<String>,
    pub format: Option<String>,
    pub mode: Option<String>,
    /// A NULL column reads back as `0`.
    pub files_produced: i64,
    /// A NULL column reads back as `0`.
    pub bytes_written: i64,
    /// A NULL column reads back as `0`.
    pub retries: i64,
    pub validated: Option<bool>,
    pub schema_changed: Option<bool>,
}
26
/// Every column written to one `export_metrics` row, except `run_at`.
///
/// Bundles the original 15 metrics with the v9 additions (source harm,
/// completeness, memory, config dimensions) and the v10 timing column, so a
/// new metric is a struct field + a column, not another positional argument
/// to a 30-arg function. `run_at` is not a field: `record_metric_full` stamps
/// it at insert time.
///
/// `Default` lets a call site fill only the signals it actually has. `Option`
/// fields default to `None` (written as NULL); plain `i64` fields such as
/// `files_committed` and `batch_size` default to `0` and are written as `0`.
/// The run path builds the whole thing via `pipeline::job::build_metric_row`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct MetricRow {
    pub export_name: String,
    pub run_id: String,
    pub duration_ms: i64,
    pub total_rows: i64,
    pub peak_rss_mb: Option<i64>,
    pub status: String,
    pub error_message: Option<String>,
    pub tuning_profile: Option<String>,
    pub format: Option<String>,
    pub mode: Option<String>,
    pub files_produced: i64,
    pub bytes_written: i64,
    pub retries: i64,
    pub validated: Option<bool>,
    pub schema_changed: Option<bool>,
    // ── v9: post-pilot analysis signals ──
    pub files_committed: i64,
    pub reconciled: Option<bool>,
    pub source_count: Option<i64>,
    pub quality_passed: Option<bool>,
    pub pg_temp_bytes_delta: Option<i64>,
    pub batch_size: i64,
    pub batch_size_memory_mb: Option<i64>,
    pub skip_reason: Option<String>,
    pub schema_fingerprint: Option<String>,
    pub chunk_size: Option<i64>,
    pub parallel: Option<i64>,
    pub source_type: Option<String>,
    pub destination_type: Option<String>,
    pub rivet_version: Option<String>,
    // ── v10: timing ──
    pub longest_chunk_ms: Option<i64>,
}
69
70/// Metrics store — reads and writes `export_metrics`.
71///
72/// Invariant I4 (Metric After Verdict) governs when `record_metric` is called:
73/// only after the terminal run outcome is determined.
74impl StateStore {
75    /// Back-compat shim: the original 15-field metric. Fills the v9 columns with
76    /// defaults (NULL) and delegates to [`record_metric_full`]. The production
77    /// run/apply path now builds a full [`MetricRow`]; this shim remains for the
78    /// unit + integration tests that only assert the core signals.
79    ///
80    /// `#[allow(dead_code)]`: the only non-test caller migrated to
81    /// `record_metric_full`, and the bin/lib dead-code pass can't see the uses
82    /// in `tests/*` (same reason `RunSummary::stub_for_testing` carries it).
83    #[allow(clippy::too_many_arguments, dead_code)]
84    pub fn record_metric(
85        &self,
86        export_name: &str,
87        run_id: &str,
88        duration_ms: i64,
89        total_rows: i64,
90        peak_rss_mb: Option<i64>,
91        status: &str,
92        error_message: Option<&str>,
93        tuning_profile: Option<&str>,
94        format: Option<&str>,
95        mode: Option<&str>,
96        files_produced: i64,
97        bytes_written: i64,
98        retries: i64,
99        validated: Option<bool>,
100        schema_changed: Option<bool>,
101    ) -> Result<()> {
102        self.record_metric_full(&MetricRow {
103            export_name: export_name.to_string(),
104            run_id: run_id.to_string(),
105            duration_ms,
106            total_rows,
107            peak_rss_mb,
108            status: status.to_string(),
109            error_message: error_message.map(str::to_string),
110            tuning_profile: tuning_profile.map(str::to_string),
111            format: format.map(str::to_string),
112            mode: mode.map(str::to_string),
113            files_produced,
114            bytes_written,
115            retries,
116            validated,
117            schema_changed,
118            ..Default::default()
119        })
120    }
121
122    /// Insert one fully-populated `export_metrics` row (all v1 + v9 columns).
123    /// The production run/apply path builds the complete [`MetricRow`]; the
124    /// 15-field [`record_metric`] shim covers callers with only the core signals.
125    pub fn record_metric_full(&self, m: &MetricRow) -> Result<()> {
126        let now = chrono::Utc::now().to_rfc3339();
127        let sql = "INSERT INTO export_metrics (
128             export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
129             status, error_message, tuning_profile, format, mode,
130             files_produced, bytes_written, retries, validated, schema_changed,
131             files_committed, reconciled, source_count, quality_passed, pg_temp_bytes_delta,
132             batch_size, batch_size_memory_mb, skip_reason, schema_fingerprint,
133             chunk_size, parallel, source_type, destination_type, rivet_version,
134             longest_chunk_ms)
135             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16,
136             ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31)";
137        match &self.conn {
138            StateConn::Sqlite(c) => {
139                c.execute(
140                    sql,
141                    rusqlite::params![
142                        m.export_name,
143                        m.run_id,
144                        now,
145                        m.duration_ms,
146                        m.total_rows,
147                        m.peak_rss_mb,
148                        m.status,
149                        m.error_message,
150                        m.tuning_profile,
151                        m.format,
152                        m.mode,
153                        m.files_produced,
154                        m.bytes_written,
155                        m.retries,
156                        m.validated,
157                        m.schema_changed,
158                        m.files_committed,
159                        m.reconciled,
160                        m.source_count,
161                        m.quality_passed,
162                        m.pg_temp_bytes_delta,
163                        m.batch_size,
164                        m.batch_size_memory_mb,
165                        m.skip_reason,
166                        m.schema_fingerprint,
167                        m.chunk_size,
168                        m.parallel,
169                        m.source_type,
170                        m.destination_type,
171                        m.rivet_version,
172                        m.longest_chunk_ms
173                    ],
174                )?;
175            }
176            StateConn::Postgres(client) => {
177                let mut c = client.borrow_mut();
178                c.execute(
179                    &pg_sql(sql),
180                    &[
181                        &m.export_name,
182                        &m.run_id,
183                        &now,
184                        &m.duration_ms,
185                        &m.total_rows,
186                        &m.peak_rss_mb,
187                        &m.status,
188                        &m.error_message,
189                        &m.tuning_profile,
190                        &m.format,
191                        &m.mode,
192                        &m.files_produced,
193                        &m.bytes_written,
194                        &m.retries,
195                        &m.validated,
196                        &m.schema_changed,
197                        &m.files_committed,
198                        &m.reconciled,
199                        &m.source_count,
200                        &m.quality_passed,
201                        &m.pg_temp_bytes_delta,
202                        &m.batch_size,
203                        &m.batch_size_memory_mb,
204                        &m.skip_reason,
205                        &m.schema_fingerprint,
206                        &m.chunk_size,
207                        &m.parallel,
208                        &m.source_type,
209                        &m.destination_type,
210                        &m.rivet_version,
211                        &m.longest_chunk_ms,
212                    ],
213                )?;
214            }
215        }
216        Ok(())
217    }
218
219    /// Record per-run source-harm deltas (Tier 2): one row per counter into
220    /// `export_harm`, keyed on `run_id`. Best-effort observability — the caller
221    /// logs and ignores any error; a missing harm row never affects the run
222    /// verdict. No-op for an empty delta set (e.g. a non-Postgres source whose
223    /// probe was skipped, or a counter set that didn't move).
224    pub fn record_harm(
225        &self,
226        run_id: &str,
227        export_name: &str,
228        deltas: &[(String, i64)],
229    ) -> Result<()> {
230        if deltas.is_empty() {
231            return Ok(());
232        }
233        let now = chrono::Utc::now().to_rfc3339();
234        let sql = "INSERT INTO export_harm (run_id, export_name, metric, delta, recorded_at) \
235                   VALUES (?1, ?2, ?3, ?4, ?5)";
236        match &self.conn {
237            StateConn::Sqlite(c) => {
238                for (metric, delta) in deltas {
239                    c.execute(
240                        sql,
241                        rusqlite::params![run_id, export_name, metric, delta, now],
242                    )?;
243                }
244            }
245            StateConn::Postgres(client) => {
246                let mut c = client.borrow_mut();
247                for (metric, delta) in deltas {
248                    c.execute(&pg_sql(sql), &[&run_id, &export_name, metric, delta, &now])?;
249                }
250            }
251        }
252        Ok(())
253    }
254
255    /// Test-only read of the `export_harm` rows for a run, `(metric, delta)`
256    /// sorted by metric — lets tests trace the harm signal through the table.
257    #[cfg(test)]
258    pub(crate) fn harm_rows_for_test(&self, run_id: &str) -> Vec<(String, i64)> {
259        match &self.conn {
260            StateConn::Sqlite(c) => {
261                let mut stmt = c
262                    .prepare(
263                        "SELECT metric, delta FROM export_harm WHERE run_id = ?1 ORDER BY metric",
264                    )
265                    .expect("prepare export_harm read");
266                let rows = stmt
267                    .query_map([run_id], |r| {
268                        Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
269                    })
270                    .expect("query export_harm");
271                rows.filter_map(|r| r.ok()).collect()
272            }
273            _ => Vec::new(),
274        }
275    }
276
277    /// Test-only raw scalar read of a v9 metric column the typed `get_metrics`
278    /// path intentionally doesn't surface — lets tests pin that the wide INSERT
279    /// mapped each field to the right column (catches a positional param swap).
280    #[cfg(test)]
281    pub(crate) fn metric_scalar_i64(&self, run_id: &str, column: &str) -> Option<i64> {
282        match &self.conn {
283            StateConn::Sqlite(c) => c
284                .query_row(
285                    &format!("SELECT {column} FROM export_metrics WHERE run_id = ?1"),
286                    [run_id],
287                    |r| r.get::<_, Option<i64>>(0),
288                )
289                .ok()
290                .flatten(),
291            _ => None,
292        }
293    }
294
295    pub fn get_metrics(
296        &self,
297        export_name: Option<&str>,
298        limit: usize,
299    ) -> Result<Vec<ExportMetric>> {
300        let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
301                    status, error_message, tuning_profile, format, mode,
302                    files_produced, bytes_written, retries, validated, schema_changed";
303
304        let limit_i64 = limit as i64;
305        match &self.conn {
306            StateConn::Sqlite(c) => {
307                let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
308                    name,
309                ) = export_name
310                {
311                    (
312                        "SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
313                             status, error_message, tuning_profile, format, mode, \
314                             files_produced, bytes_written, retries, validated, schema_changed \
315                             FROM export_metrics WHERE export_name = ?1 ORDER BY id DESC LIMIT ?2",
316                        vec![Box::new(name.to_string()), Box::new(limit_i64)],
317                    )
318                } else {
319                    (
320                        "SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
321                             status, error_message, tuning_profile, format, mode, \
322                             files_produced, bytes_written, retries, validated, schema_changed \
323                             FROM export_metrics ORDER BY id DESC LIMIT ?1",
324                        vec![Box::new(limit_i64)],
325                    )
326                };
327                let mut stmt = c.prepare(sql)?;
328                let params_refs: Vec<&dyn rusqlite::types::ToSql> =
329                    params.iter().map(|p| p.as_ref()).collect();
330                let rows = stmt.query_map(params_refs.as_slice(), |row| {
331                    Ok(ExportMetric {
332                        export_name: row.get(0)?,
333                        run_id: row.get(1)?,
334                        run_at: row.get(2)?,
335                        duration_ms: row.get(3)?,
336                        total_rows: row.get(4)?,
337                        peak_rss_mb: row.get(5)?,
338                        status: row.get(6)?,
339                        error_message: row.get(7)?,
340                        tuning_profile: row.get(8)?,
341                        format: row.get(9)?,
342                        mode: row.get(10)?,
343                        files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
344                        bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
345                        retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
346                        validated: row.get(14)?,
347                        schema_changed: row.get(15)?,
348                    })
349                })?;
350                rows.collect::<std::result::Result<Vec<_>, _>>()
351                    .map_err(Into::into)
352            }
353            StateConn::Postgres(client) => {
354                // Single borrow for the duration of this call; safe because all Postgres
355                // operations in StateStore are sequential (no re-entrant borrows).
356                let mut c = client.borrow_mut();
357                let rows = if let Some(name) = export_name {
358                    c.query(
359                        &format!("SELECT {} FROM export_metrics WHERE export_name = $1 ORDER BY id DESC LIMIT $2", cols),
360                        &[&name, &limit_i64],
361                    )?
362                } else {
363                    c.query(
364                        &format!(
365                            "SELECT {} FROM export_metrics ORDER BY id DESC LIMIT $1",
366                            cols
367                        ),
368                        &[&limit_i64],
369                    )?
370                };
371                Ok(rows
372                    .iter()
373                    .map(|row| ExportMetric {
374                        export_name: row.get(0),
375                        run_id: row.get(1),
376                        run_at: row.get(2),
377                        duration_ms: row.get(3),
378                        total_rows: row.get(4),
379                        peak_rss_mb: row.get(5),
380                        status: row.get(6),
381                        error_message: row.get(7),
382                        tuning_profile: row.get(8),
383                        format: row.get(9),
384                        mode: row.get(10),
385                        files_produced: row.get::<_, Option<i64>>(11).unwrap_or(0),
386                        bytes_written: row.get::<_, Option<i64>>(12).unwrap_or(0),
387                        retries: row.get::<_, Option<i64>>(13).unwrap_or(0),
388                        validated: row.get(14),
389                        schema_changed: row.get(15),
390                    })
391                    .collect())
392            }
393        }
394    }
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory SQLite store per test.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    #[test]
    fn record_and_query_metrics() {
        let s = store();
        s.record_metric(
            "orders",
            "run_001",
            1200,
            50000,
            Some(142),
            "success",
            None,
            Some("safe"),
            Some("parquet"),
            Some("full"),
            1,
            4096,
            0,
            Some(true),
            Some(false),
        )
        .unwrap();
        s.record_metric(
            "orders",
            "run_002",
            300,
            0,
            Some(30),
            "failed",
            Some("timeout"),
            Some("safe"),
            Some("parquet"),
            Some("full"),
            0,
            0,
            2,
            None,
            None,
        )
        .unwrap();

        let metrics = s.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(metrics.len(), 2);
        assert_eq!(metrics[0].status, "failed");
        assert_eq!(metrics[0].run_id.as_deref(), Some("run_002"));
        assert_eq!(metrics[0].retries, 2);
        assert_eq!(metrics[1].total_rows, 50000);
        assert_eq!(metrics[1].run_id.as_deref(), Some("run_001"));
        assert_eq!(metrics[1].files_produced, 1);
        assert_eq!(metrics[1].bytes_written, 4096);
        assert_eq!(metrics[1].validated, Some(true));
        assert_eq!(metrics[1].schema_changed, Some(false));
    }

    #[test]
    fn record_metric_full_persists_v9_columns_in_order() {
        let s = store();
        s.record_metric_full(&MetricRow {
            export_name: "orders".into(),
            run_id: "r1".into(),
            duration_ms: 1200,
            total_rows: 50_000,
            status: "success".into(),
            // v9/v10 signals, chosen as values distinct from each other and
            // from the core columns, so a positional param swap in the
            // 31-column INSERT shows up as a wrong-column read below.
            files_committed: 11,
            source_count: Some(50_001),
            pg_temp_bytes_delta: Some(1_048_576),
            batch_size: 32_000,
            chunk_size: Some(100_000),
            parallel: Some(4),
            longest_chunk_ms: Some(1_839),
            ..Default::default()
        })
        .unwrap();

        // Core read path is unchanged.
        let got = s.get_metrics(Some("orders"), 1).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].total_rows, 50_000);
        assert_eq!(got[0].duration_ms, 1200);
        assert_eq!(got[0].run_id.as_deref(), Some("r1"));

        // v9/v10 columns round-trip to the right column (chunk_size=100000 vs
        // parallel=4 pinned separately so a swap of the two Option<i64> params
        // can't pass; source_count differs from total_rows for the same reason).
        assert_eq!(s.metric_scalar_i64("r1", "files_committed"), Some(11));
        assert_eq!(s.metric_scalar_i64("r1", "source_count"), Some(50_001));
        assert_eq!(
            s.metric_scalar_i64("r1", "pg_temp_bytes_delta"),
            Some(1_048_576)
        );
        assert_eq!(s.metric_scalar_i64("r1", "batch_size"), Some(32_000));
        assert_eq!(s.metric_scalar_i64("r1", "chunk_size"), Some(100_000));
        assert_eq!(s.metric_scalar_i64("r1", "parallel"), Some(4));
        assert_eq!(s.metric_scalar_i64("r1", "longest_chunk_ms"), Some(1_839));
    }

    #[test]
    fn record_metric_shim_writes_default_v9_columns() {
        // The 15-field shim fills the rest from `MetricRow::default()`: plain
        // i64 columns land as 0, Option columns as NULL.
        let s = store();
        s.record_metric(
            "orders", "r-shim", 10, 5, None, "success", None, None, None, None, 0, 0, 0, None,
            None,
        )
        .unwrap();
        assert_eq!(s.metric_scalar_i64("r-shim", "files_committed"), Some(0));
        assert_eq!(s.metric_scalar_i64("r-shim", "batch_size"), Some(0));
        assert_eq!(s.metric_scalar_i64("r-shim", "chunk_size"), None);
        assert_eq!(s.metric_scalar_i64("r-shim", "longest_chunk_ms"), None);
    }

    #[test]
    fn record_harm_round_trips_per_counter_rows() {
        // Traces the Tier 2 signal through SQLite: v11 migration creates
        // export_harm, record_harm writes one row per counter, the read returns
        // them. (The live source-probe that produces these deltas needs a real
        // DB; this validates the storage path it lands in.)
        let s = store();
        let deltas = vec![
            ("pg_tup_returned".to_string(), 1_000_000),
            ("pg_blks_read".to_string(), 2_048),
            ("pg_temp_files".to_string(), 3),
        ];
        s.record_harm("run-h", "content_items", &deltas).unwrap();

        // One row per counter, keyed on run_id (read sorted by metric).
        assert_eq!(
            s.harm_rows_for_test("run-h"),
            vec![
                ("pg_blks_read".to_string(), 2_048),
                ("pg_temp_files".to_string(), 3),
                ("pg_tup_returned".to_string(), 1_000_000),
            ]
        );

        // Empty delta set (probe skipped / counters unmoved) → no rows, no error.
        s.record_harm("run-empty", "x", &[]).unwrap();
        assert!(s.harm_rows_for_test("run-empty").is_empty());
    }

    #[test]
    fn query_metrics_all_exports() {
        let s = store();
        s.record_metric(
            "orders", "r1", 100, 1000, None, "success", None, None, None, None, 1, 500, 0, None,
            None,
        )
        .unwrap();
        s.record_metric(
            "users", "r2", 200, 2000, None, "success", None, None, None, None, 1, 800, 0, None,
            None,
        )
        .unwrap();

        let metrics = s.get_metrics(None, 10).unwrap();
        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn metrics_limit_works() {
        let s = store();
        for i in 0..10 {
            s.record_metric(
                "t",
                &format!("r{}", i),
                i * 100,
                i,
                None,
                "success",
                None,
                None,
                None,
                None,
                0,
                0,
                0,
                None,
                None,
            )
            .unwrap();
        }
        let metrics = s.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(metrics.len(), 3);
        // Newest first: the limit keeps the three most recent rows.
        assert_eq!(metrics[0].run_id.as_deref(), Some("r9"));
        assert_eq!(metrics[2].run_id.as_deref(), Some("r7"));
    }

    #[test]
    fn metrics_limit_edges() {
        let s = store();
        for i in 0..3 {
            s.record_metric(
                "t",
                &format!("r{i}"),
                i,
                i,
                None,
                "success",
                None,
                None,
                None,
                None,
                0,
                0,
                0,
                None,
                None,
            )
            .unwrap();
        }
        // A limit larger than i64::MAX behaves as "no limit".
        assert_eq!(s.get_metrics(Some("t"), usize::MAX).unwrap().len(), 3);
        assert_eq!(s.get_metrics(None, usize::MAX).unwrap().len(), 3);
        // Zero limit and unknown export both yield nothing, without error.
        assert!(s.get_metrics(Some("t"), 0).unwrap().is_empty());
        assert!(s.get_metrics(Some("missing"), 10).unwrap().is_empty());
    }
}