use crate::error::Result;
use super::{StateConn, StateStore, pg_sql};
/// One row of the `export_metrics` table, as returned by `StateStore::get_metrics`.
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers and tests can copy and compare
/// whole rows instead of checking fields one by one.
#[derive(Debug, Clone, PartialEq, Eq)]
// NOTE(review): the reason dead code is allowed is not visible in this file —
// presumably not every field is read by current callers; confirm before removing.
#[allow(dead_code)]
pub struct ExportMetric {
    /// Name of the export this run belongs to (the `export_name` column).
    pub export_name: String,
    /// Identifier of the run; `None` when the stored column is NULL.
    pub run_id: Option<String>,
    /// When the metric was recorded, as an RFC 3339 timestamp string.
    pub run_at: String,
    /// Duration of the run in milliseconds (per the field name).
    pub duration_ms: i64,
    /// Total number of rows reported for the run.
    pub total_rows: i64,
    /// Peak resident set size in megabytes (per the field name), when it was recorded.
    pub peak_rss_mb: Option<i64>,
    /// Status string exactly as supplied by the recorder (the tests in this file use
    /// `"success"` and `"failed"`).
    pub status: String,
    /// Error text attached to the run, if any.
    pub error_message: Option<String>,
    /// Tuning profile label attached to the run, if any.
    pub tuning_profile: Option<String>,
    /// Output format label attached to the run, if any.
    pub format: Option<String>,
    /// Export mode label attached to the run, if any.
    pub mode: Option<String>,
    /// Number of files produced; a NULL column is read back as `0`.
    pub files_produced: i64,
    /// Number of bytes written; a NULL column is read back as `0`.
    pub bytes_written: i64,
    /// Number of retries; a NULL column is read back as `0`.
    pub retries: i64,
    /// Validation flag as stored; `None` when the column is NULL.
    pub validated: Option<bool>,
    /// Schema-change flag as stored; `None` when the column is NULL.
    pub schema_changed: Option<bool>,
}
impl StateStore {
#[allow(clippy::too_many_arguments)]
pub fn record_metric(
&self,
export_name: &str,
run_id: &str,
duration_ms: i64,
total_rows: i64,
peak_rss_mb: Option<i64>,
status: &str,
error_message: Option<&str>,
tuning_profile: Option<&str>,
format: Option<&str>,
mode: Option<&str>,
files_produced: i64,
bytes_written: i64,
retries: i64,
validated: Option<bool>,
schema_changed: Option<bool>,
) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let sql = "INSERT INTO export_metrics (export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
status, error_message, tuning_profile, format, mode,
files_produced, bytes_written, retries, validated, schema_changed)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)";
match &self.conn {
StateConn::Sqlite(c) => {
c.execute(
sql,
rusqlite::params![
export_name,
run_id,
now,
duration_ms,
total_rows,
peak_rss_mb,
status,
error_message,
tuning_profile,
format,
mode,
files_produced,
bytes_written,
retries,
validated,
schema_changed
],
)?;
}
StateConn::Postgres(client) => {
let mut c = client.borrow_mut();
c.execute(
&pg_sql(sql),
&[
&export_name,
&run_id,
&now,
&duration_ms,
&total_rows,
&peak_rss_mb,
&status,
&error_message,
&tuning_profile,
&format,
&mode,
&files_produced,
&bytes_written,
&retries,
&validated,
&schema_changed,
],
)?;
}
}
Ok(())
}
pub fn get_metrics(
&self,
export_name: Option<&str>,
limit: usize,
) -> Result<Vec<ExportMetric>> {
let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
status, error_message, tuning_profile, format, mode,
files_produced, bytes_written, retries, validated, schema_changed";
let limit_i64 = limit as i64;
match &self.conn {
StateConn::Sqlite(c) => {
let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
name,
) = export_name
{
(
"SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
status, error_message, tuning_profile, format, mode, \
files_produced, bytes_written, retries, validated, schema_changed \
FROM export_metrics WHERE export_name = ?1 ORDER BY id DESC LIMIT ?2",
vec![Box::new(name.to_string()), Box::new(limit_i64)],
)
} else {
(
"SELECT export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb, \
status, error_message, tuning_profile, format, mode, \
files_produced, bytes_written, retries, validated, schema_changed \
FROM export_metrics ORDER BY id DESC LIMIT ?1",
vec![Box::new(limit_i64)],
)
};
let mut stmt = c.prepare(sql)?;
let params_refs: Vec<&dyn rusqlite::types::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
let rows = stmt.query_map(params_refs.as_slice(), |row| {
Ok(ExportMetric {
export_name: row.get(0)?,
run_id: row.get(1)?,
run_at: row.get(2)?,
duration_ms: row.get(3)?,
total_rows: row.get(4)?,
peak_rss_mb: row.get(5)?,
status: row.get(6)?,
error_message: row.get(7)?,
tuning_profile: row.get(8)?,
format: row.get(9)?,
mode: row.get(10)?,
files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
validated: row.get(14)?,
schema_changed: row.get(15)?,
})
})?;
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
}
StateConn::Postgres(client) => {
let mut c = client.borrow_mut();
let rows = if let Some(name) = export_name {
c.query(
&format!("SELECT {} FROM export_metrics WHERE export_name = $1 ORDER BY id DESC LIMIT $2", cols),
&[&name, &limit_i64],
)?
} else {
c.query(
&format!(
"SELECT {} FROM export_metrics ORDER BY id DESC LIMIT $1",
cols
),
&[&limit_i64],
)?
};
Ok(rows
.iter()
.map(|row| ExportMetric {
export_name: row.get(0),
run_id: row.get(1),
run_at: row.get(2),
duration_ms: row.get(3),
total_rows: row.get(4),
peak_rss_mb: row.get(5),
status: row.get(6),
error_message: row.get(7),
tuning_profile: row.get(8),
format: row.get(9),
mode: row.get(10),
files_produced: row.get::<_, Option<i64>>(11).unwrap_or(0),
bytes_written: row.get::<_, Option<i64>>(12).unwrap_or(0),
retries: row.get::<_, Option<i64>>(13).unwrap_or(0),
validated: row.get(14),
schema_changed: row.get(15),
})
.collect())
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store for a single test.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Records a `"success"` run with every optional column left unset and zero retries.
    fn record_bare_success(
        target: &StateStore,
        export_name: &str,
        run_id: &str,
        duration_ms: i64,
        total_rows: i64,
        files_produced: i64,
        bytes_written: i64,
    ) {
        target
            .record_metric(
                export_name,
                run_id,
                duration_ms,
                total_rows,
                None,
                "success",
                None,
                None,
                None,
                None,
                files_produced,
                bytes_written,
                0,
                None,
                None,
            )
            .unwrap();
    }

    /// Two runs of the same export come back newest first with all fields intact.
    #[test]
    fn record_and_query_metrics() {
        let s = store();

        // First run: fully populated successful export.
        let first = s.record_metric(
            "orders",
            "run_001",
            1200,
            50000,
            Some(142),
            "success",
            None,
            Some("safe"),
            Some("parquet"),
            Some("full"),
            1,
            4096,
            0,
            Some(true),
            Some(false),
        );
        first.unwrap();

        // Second run: failed export with retries and no validation flags.
        let second = s.record_metric(
            "orders",
            "run_002",
            300,
            0,
            Some(30),
            "failed",
            Some("timeout"),
            Some("safe"),
            Some("parquet"),
            Some("full"),
            0,
            0,
            2,
            None,
            None,
        );
        second.unwrap();

        let metrics = s.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(metrics.len(), 2);

        let newest = &metrics[0];
        assert_eq!(newest.status, "failed");
        assert_eq!(newest.run_id.as_deref(), Some("run_002"));
        assert_eq!(newest.retries, 2);

        let oldest = &metrics[1];
        assert_eq!(oldest.total_rows, 50000);
        assert_eq!(oldest.run_id.as_deref(), Some("run_001"));
        assert_eq!(oldest.files_produced, 1);
        assert_eq!(oldest.bytes_written, 4096);
        assert_eq!(oldest.validated, Some(true));
        assert_eq!(oldest.schema_changed, Some(false));
    }

    /// Without a name filter, rows from every export are returned.
    #[test]
    fn query_metrics_all_exports() {
        let s = store();
        record_bare_success(&s, "orders", "r1", 100, 1000, 1, 500);
        record_bare_success(&s, "users", "r2", 200, 2000, 1, 800);

        let metrics = s.get_metrics(None, 10).unwrap();
        assert_eq!(metrics.len(), 2);
    }

    /// The `limit` argument caps the number of returned rows.
    #[test]
    fn metrics_limit_works() {
        let s = store();
        (0..10_i64).for_each(|i| {
            record_bare_success(&s, "t", &format!("r{}", i), i * 100, i, 0, 0);
        });

        let metrics = s.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(metrics.len(), 3);
    }
}