Skip to main content

roboticus_db/
metrics.rs

1use crate::{Database, DbResultExt};
2use roboticus_core::Result;
3
/// One row read back from the `transactions` table.
///
/// Produced by [`query_transactions`]; every field maps to the column of the
/// same name selected by that query.
#[derive(Debug, Clone, PartialEq)]
pub struct TransactionRecord {
    /// Row identifier. Rows written through this module get a v4 UUID string.
    pub id: String,
    /// Caller-supplied transaction category (free-form text).
    pub tx_type: String,
    /// Transaction amount, expressed in `currency`.
    pub amount: f64,
    /// Caller-supplied currency label.
    pub currency: String,
    /// Other party to the transaction, when one was recorded.
    pub counterparty: Option<String>,
    /// Transaction hash, when one was recorded.
    pub tx_hash: Option<String>,
    /// Optional metadata payload. NOTE(review): presumably JSON text, but this
    /// module stores and returns it without validating — confirm with callers.
    pub metadata_json: Option<String>,
    /// Creation timestamp as stored in the database. The INSERT statements in
    /// this module do not supply it — presumably a column default; confirm
    /// against the schema.
    pub created_at: String,
}
15
16#[allow(clippy::too_many_arguments)]
17pub fn record_inference_cost(
18    db: &Database,
19    model: &str,
20    provider: &str,
21    tokens_in: i64,
22    tokens_out: i64,
23    cost: f64,
24    tier: Option<&str>,
25    cached: bool,
26    latency_ms: Option<i64>,
27    quality_score: Option<f64>,
28    escalation: bool,
29    turn_id: Option<&str>,
30) -> Result<String> {
31    let conn = db.conn();
32    let id = uuid::Uuid::new_v4().to_string();
33    conn.execute(
34        "INSERT INTO inference_costs \
35         (id, model, provider, tokens_in, tokens_out, cost, tier, cached, latency_ms, quality_score, escalation, turn_id) \
36         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
37        rusqlite::params![
38            id,
39            model,
40            provider,
41            tokens_in,
42            tokens_out,
43            cost,
44            tier,
45            cached as i32,
46            latency_ms,
47            quality_score,
48            escalation as i32,
49            turn_id
50        ],
51    )
52    .db_err()?;
53    Ok(id)
54}
55
56pub fn record_transaction(
57    db: &Database,
58    tx_type: &str,
59    amount: f64,
60    currency: &str,
61    counterparty: Option<&str>,
62    tx_hash: Option<&str>,
63) -> Result<String> {
64    record_transaction_with_metadata(db, tx_type, amount, currency, counterparty, tx_hash, None)
65}
66
67pub fn record_transaction_with_metadata(
68    db: &Database,
69    tx_type: &str,
70    amount: f64,
71    currency: &str,
72    counterparty: Option<&str>,
73    tx_hash: Option<&str>,
74    metadata_json: Option<&str>,
75) -> Result<String> {
76    let conn = db.conn();
77    let id = uuid::Uuid::new_v4().to_string();
78    conn.execute(
79        "INSERT INTO transactions (id, tx_type, amount, currency, counterparty, tx_hash, metadata_json) \
80         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
81        rusqlite::params![id, tx_type, amount, currency, counterparty, tx_hash, metadata_json],
82    )
83    .db_err()?;
84    Ok(id)
85}
86
87pub fn query_transactions(db: &Database, hours: i64) -> Result<Vec<TransactionRecord>> {
88    // Ensure hours is positive to prevent a negative value from producing
89    // a malformed datetime modifier (e.g., "--5 hours" becomes a SQL comment).
90    let hours = hours.unsigned_abs().max(1);
91    let conn = db.conn();
92    let mut stmt = conn
93        .prepare(
94            "SELECT id, tx_type, amount, currency, counterparty, tx_hash, metadata_json, created_at \
95             FROM transactions \
96             WHERE created_at >= datetime('now', ?1) \
97             ORDER BY created_at DESC",
98        )
99        .db_err()?;
100
101    let offset = format!("-{hours} hours");
102    let rows = stmt
103        .query_map([&offset], |row| {
104            Ok(TransactionRecord {
105                id: row.get(0)?,
106                tx_type: row.get(1)?,
107                amount: row.get(2)?,
108                currency: row.get(3)?,
109                counterparty: row.get(4)?,
110                tx_hash: row.get(5)?,
111                metadata_json: row.get(6)?,
112                created_at: row.get(7)?,
113            })
114        })
115        .db_err()?;
116
117    rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
118}
119
120/// Sum transaction amounts within the given time window (in hours).
121/// Used by treasury policy enforcement to check hourly/daily rate limits.
122pub fn sum_transaction_amounts(db: &Database, hours: i64) -> Result<f64> {
123    let hours = hours.unsigned_abs().max(1);
124    let conn = db.conn();
125    let offset = format!("-{hours} hours");
126    let total: f64 = conn
127        .query_row(
128            "SELECT COALESCE(SUM(amount), 0.0) FROM transactions \
129             WHERE created_at >= datetime('now', ?1)",
130            [&offset],
131            |row| row.get(0),
132        )
133        .db_err()?;
134    Ok(total)
135}
136
137/// Return the most recent quality observations from `inference_costs`, ordered
138/// oldest-first so that the caller can feed them into a ring buffer in chronological
139/// order. Each row is `(model, quality_score)`.
140pub fn recent_quality_scores(db: &Database, limit: i64) -> Result<Vec<(String, f64)>> {
141    let limit = limit.max(1);
142    let conn = db.conn();
143    let mut stmt = conn
144        .prepare(
145            "SELECT model, quality_score FROM inference_costs \
146             WHERE quality_score IS NOT NULL \
147             ORDER BY created_at DESC, rowid DESC LIMIT ?1",
148        )
149        .db_err()?;
150    let rows: Vec<(String, f64)> = stmt
151        .query_map(rusqlite::params![limit], |row| {
152            Ok((row.get::<_, String>(0)?, row.get::<_, f64>(1)?))
153        })
154        .db_err()?
155        .filter_map(|r| {
156            r.inspect_err(|e| tracing::warn!(error = %e, "metrics: skipping malformed cost row"))
157                .ok()
158        })
159        .collect();
160    // Reverse so oldest comes first (ring buffer insertion order).
161    let mut rows = rows;
162    rows.reverse();
163    Ok(rows)
164}
165
166pub fn record_metric_snapshot(db: &Database, metrics_json: &str) -> Result<String> {
167    let conn = db.conn();
168    let id = uuid::Uuid::new_v4().to_string();
169    conn.execute(
170        "INSERT INTO metric_snapshots (id, metrics_json) VALUES (?1, ?2)",
171        rusqlite::params![id, metrics_json],
172    )
173    .db_err()?;
174    Ok(id)
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a fresh in-memory database so each test starts from an empty store.
    fn memory_db() -> Database {
        Database::new(":memory:").unwrap()
    }

    #[test]
    fn record_and_query_inference_cost() {
        let db = memory_db();
        let inserted = record_inference_cost(
            &db,
            "claude-4",
            "anthropic",
            1000,
            500,
            0.015,
            Some("T1"),
            false,
            Some(150),
            Some(0.92),
            false,
            None,
        );
        let row_id = inserted.unwrap();
        assert!(!row_id.is_empty());
    }

    #[test]
    fn record_and_query_transactions() {
        let db = memory_db();
        record_transaction(&db, "inference", 0.01, "USD", Some("anthropic"), None).unwrap();
        record_transaction(&db, "earning", 1.00, "USDC", Some("user-42"), Some("0xabc")).unwrap();

        let last_day = query_transactions(&db, 24).unwrap();
        assert_eq!(last_day.len(), 2);
    }

    #[test]
    fn record_metric_snapshot_works() {
        let db = memory_db();
        let payload = r#"{"cpu":0.5,"mem_mb":128}"#;
        let snapshot_id = record_metric_snapshot(&db, payload).unwrap();
        assert!(!snapshot_id.is_empty());
    }

    #[test]
    fn query_transactions_empty() {
        let db = memory_db();
        let last_hour = query_transactions(&db, 1).unwrap();
        assert!(last_hour.is_empty());
    }

    #[test]
    fn record_transaction_all_optional_none() {
        let db = memory_db();
        let row_id = record_transaction(&db, "yield", 0.5, "USDC", None, None).unwrap();
        assert!(!row_id.is_empty());

        let last_day = query_transactions(&db, 24).unwrap();
        assert_eq!(last_day.len(), 1);
        let only = &last_day[0];
        assert!(only.counterparty.is_none());
        assert!(only.tx_hash.is_none());
    }

    #[test]
    fn record_inference_cost_cached() {
        let db = memory_db();
        let inserted = record_inference_cost(
            &db, "gpt-4", "openai", 100, 50, 0.005, None, true, None, None, false, None,
        );
        let row_id = inserted.unwrap();
        assert!(!row_id.is_empty());
    }

    #[test]
    fn transaction_record_fields_populated() {
        let db = memory_db();
        record_transaction(&db, "payment", 10.0, "USDC", Some("vendor"), Some("0xhash")).unwrap();

        let last_day = query_transactions(&db, 24).unwrap();
        let first = &last_day[0];
        assert_eq!(first.tx_type, "payment");
        assert!((first.amount - 10.0).abs() < f64::EPSILON);
        assert_eq!(first.currency, "USDC");
        assert_eq!(first.counterparty.as_deref(), Some("vendor"));
        assert_eq!(first.tx_hash.as_deref(), Some("0xhash"));
        assert!(!first.created_at.is_empty());
    }

    #[test]
    fn multiple_metric_snapshots() {
        let db = memory_db();
        let first_id = record_metric_snapshot(&db, r#"{"cpu":0.1}"#).unwrap();
        let second_id = record_metric_snapshot(&db, r#"{"cpu":0.9}"#).unwrap();
        assert_ne!(first_id, second_id);
    }

    #[test]
    fn recent_quality_scores_empty() {
        let db = memory_db();
        let observed = recent_quality_scores(&db, 10).unwrap();
        assert!(observed.is_empty());
    }

    #[test]
    fn recent_quality_scores_returns_oldest_first() {
        let db = memory_db();

        // Three scored rows, inserted in this order:
        // (model, tokens_in, tokens_out, cost, latency_ms, quality_score)
        let fixtures: [(&str, i64, i64, f64, i64, f64); 3] = [
            ("model-a", 100, 50, 0.01, 100, 0.7),
            ("model-b", 200, 100, 0.02, 200, 0.9),
            ("model-a", 150, 75, 0.015, 150, 0.85),
        ];
        for (model, tokens_in, tokens_out, cost, latency_ms, quality) in fixtures {
            record_inference_cost(
                &db,
                model,
                "prov",
                tokens_in,
                tokens_out,
                cost,
                None,
                false,
                Some(latency_ms),
                Some(quality),
                false,
                None,
            )
            .unwrap();
        }

        let observed = recent_quality_scores(&db, 10).unwrap();
        assert_eq!(observed.len(), 3);

        // Oldest first means the first inserted row leads and the last one trails.
        let (first_model, first_score) = &observed[0];
        assert_eq!(first_model, "model-a");
        assert!((*first_score - 0.7).abs() < f64::EPSILON);

        let (last_model, last_score) = &observed[2];
        assert_eq!(last_model, "model-a");
        assert!((*last_score - 0.85).abs() < f64::EPSILON);
    }

    #[test]
    fn recent_quality_scores_skips_null() {
        let db = memory_db();

        // One row with a score...
        record_inference_cost(
            &db,
            "m",
            "p",
            100,
            50,
            0.01,
            None,
            false,
            None,
            Some(0.8),
            false,
            None,
        )
        .unwrap();
        // ...and one whose quality_score is NULL.
        record_inference_cost(
            &db, "m", "p", 100, 50, 0.01, None, true, None, None, false, None,
        )
        .unwrap();

        let observed = recent_quality_scores(&db, 10).unwrap();
        assert_eq!(observed.len(), 1);
        assert!((observed[0].1 - 0.8).abs() < f64::EPSILON);
    }

    #[test]
    fn recent_quality_scores_respects_limit() {
        let db = memory_db();
        for step in 0..5 {
            let quality = step as f64 * 0.2;
            record_inference_cost(
                &db,
                "m",
                "p",
                100,
                50,
                0.01,
                None,
                false,
                None,
                Some(quality),
                false,
                None,
            )
            .unwrap();
        }

        let observed = recent_quality_scores(&db, 3).unwrap();
        assert_eq!(observed.len(), 3);
    }
}
389}