1use std::collections::HashMap;
84use std::path::{Path, PathBuf};
85
86use async_trait::async_trait;
87use chrono::{DateTime, NaiveDate, Utc};
88use rusqlite::{params, params_from_iter, Connection, OptionalExtension};
89use thiserror::Error;
90
91use crate::metrics::types::{
92 DailyMetrics, ForwardEndpointMetrics, ForwardMetricsFilter, ForwardMetricsSummary,
93 ForwardRequestMetrics, ForwardStatus, MetricsDateFilter, MetricsSummary, ModelMetrics,
94 RoundMetrics, RoundStatus, SessionDetail, SessionMetrics, SessionMetricsFilter, SessionStatus,
95 TokenUsage, ToolCallMetrics,
96};
97
98pub type MetricsResult<T> = Result<T, MetricsError>;
103
104#[derive(Debug, Error)]
109pub enum MetricsError {
110 #[error("sqlite error: {0}")]
115 Sqlite(#[from] rusqlite::Error),
116
117 #[error("time parse error: {0}")]
122 Chrono(#[from] chrono::ParseError),
123
124 #[error("io error: {0}")]
129 Io(#[from] std::io::Error),
130
131 #[error("storage task join error: {0}")]
136 Task(String),
137
138 #[error("invalid metrics data: {0}")]
143 InvalidData(String),
144}
145
146#[derive(Debug, Clone)]
170pub struct ToolCallCompletion {
171 pub completed_at: DateTime<Utc>,
173 pub success: bool,
175 pub error: Option<String>,
177}
178
179#[async_trait]
232pub trait MetricsStorage: Send + Sync {
233 async fn init(&self) -> MetricsResult<()>;
242
243 async fn upsert_session_start(
260 &self,
261 session_id: &str,
262 model: &str,
263 started_at: DateTime<Utc>,
264 ) -> MetricsResult<()>;
265
266 async fn update_session_message_count(
276 &self,
277 session_id: &str,
278 message_count: u32,
279 updated_at: DateTime<Utc>,
280 ) -> MetricsResult<()>;
281
282 async fn complete_session(
292 &self,
293 session_id: &str,
294 status: SessionStatus,
295 completed_at: DateTime<Utc>,
296 ) -> MetricsResult<()>;
297
298 async fn insert_round_start(
310 &self,
311 round_id: &str,
312 session_id: &str,
313 model: &str,
314 started_at: DateTime<Utc>,
315 ) -> MetricsResult<()>;
316
317 async fn complete_round(
330 &self,
331 round_id: &str,
332 completed_at: DateTime<Utc>,
333 status: RoundStatus,
334 usage: TokenUsage,
335 prompt_cached_tool_outputs: u32,
336 error: Option<String>,
337 ) -> MetricsResult<()>;
338
339 async fn record_round_compression(
341 &self,
342 round_id: &str,
343 compressed_at: DateTime<Utc>,
344 tokens_saved: u32,
345 ) -> MetricsResult<()>;
346
347 async fn insert_tool_start(
360 &self,
361 tool_call_id: &str,
362 round_id: &str,
363 session_id: &str,
364 tool_name: &str,
365 started_at: DateTime<Utc>,
366 ) -> MetricsResult<()>;
367
368 async fn complete_tool_call(
378 &self,
379 tool_call_id: &str,
380 completion: ToolCallCompletion,
381 ) -> MetricsResult<()>;
382
383 async fn insert_forward_start(
397 &self,
398 forward_id: &str,
399 endpoint: &str,
400 model: &str,
401 is_stream: bool,
402 started_at: DateTime<Utc>,
403 ) -> MetricsResult<()>;
404
405 async fn complete_forward(
419 &self,
420 forward_id: &str,
421 completed_at: DateTime<Utc>,
422 status_code: Option<u16>,
423 status: ForwardStatus,
424 usage: Option<TokenUsage>,
425 error: Option<String>,
426 ) -> MetricsResult<()>;
427
428 async fn forward_summary(
456 &self,
457 filter: ForwardMetricsFilter,
458 ) -> MetricsResult<ForwardMetricsSummary>;
459
460 async fn forward_by_endpoint(
482 &self,
483 filter: ForwardMetricsFilter,
484 ) -> MetricsResult<Vec<ForwardEndpointMetrics>>;
485
486 async fn forward_requests(
509 &self,
510 filter: ForwardMetricsFilter,
511 ) -> MetricsResult<Vec<ForwardRequestMetrics>>;
512
513 async fn forward_daily_metrics(
536 &self,
537 days: u32,
538 end_date: Option<NaiveDate>,
539 ) -> MetricsResult<Vec<DailyMetrics>>;
540
541 async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary>;
565
566 async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>>;
588
589 async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>>;
619
620 async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>>;
648
649 async fn increment_execute_sync_mismatch(
651 &self,
652 reason: &str,
653 occurred_at: DateTime<Utc>,
654 ) -> MetricsResult<()>;
655
656 async fn daily_metrics(
684 &self,
685 days: u32,
686 end_date: Option<NaiveDate>,
687 ) -> MetricsResult<Vec<DailyMetrics>>;
688
689 async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64>;
717
718 async fn reconcile_stale_executions(
720 &self,
721 active_session_ids: &[String],
722 awaiting_response_session_ids: &[String],
723 ) -> MetricsResult<()>;
724}
725
726#[derive(Debug, Clone)]
795pub struct SqliteMetricsStorage {
796 db_path: PathBuf,
798}
799
800impl SqliteMetricsStorage {
801 pub fn new(db_path: impl AsRef<Path>) -> Self {
819 Self {
820 db_path: db_path.as_ref().to_path_buf(),
821 }
822 }
823
824 async fn with_connection<T, F>(&self, func: F) -> MetricsResult<T>
847 where
848 T: Send + 'static,
849 F: FnOnce(&Connection) -> MetricsResult<T> + Send + 'static,
850 {
851 let db_path = self.db_path.clone();
852 tokio::task::spawn_blocking(move || {
853 let connection = open_connection(&db_path)?;
854 func(&connection)
855 })
856 .await
857 .map_err(|error| MetricsError::Task(error.to_string()))?
858 }
859}
860
861#[async_trait]
862impl MetricsStorage for SqliteMetricsStorage {
863 async fn init(&self) -> MetricsResult<()> {
864 self.with_connection(|connection| {
865 connection.execute_batch(
866 r#"
867 CREATE TABLE IF NOT EXISTS session_metrics (
868 session_id TEXT PRIMARY KEY,
869 model TEXT NOT NULL,
870 started_at TEXT NOT NULL,
871 completed_at TEXT,
872 status TEXT NOT NULL DEFAULT 'running',
873 total_rounds INTEGER NOT NULL DEFAULT 0,
874 prompt_tokens INTEGER NOT NULL DEFAULT 0,
875 completion_tokens INTEGER NOT NULL DEFAULT 0,
876 total_tokens INTEGER NOT NULL DEFAULT 0,
877 prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
878 total_compression_events INTEGER NOT NULL DEFAULT 0,
879 total_tokens_saved INTEGER NOT NULL DEFAULT 0,
880 tool_call_count INTEGER NOT NULL DEFAULT 0,
881 message_count INTEGER NOT NULL DEFAULT 0,
882 updated_at TEXT NOT NULL
883 );
884
885 CREATE TABLE IF NOT EXISTS round_metrics (
886 round_id TEXT PRIMARY KEY,
887 session_id TEXT NOT NULL,
888 model TEXT NOT NULL,
889 started_at TEXT NOT NULL,
890 completed_at TEXT,
891 status TEXT NOT NULL DEFAULT 'running',
892 prompt_tokens INTEGER NOT NULL DEFAULT 0,
893 completion_tokens INTEGER NOT NULL DEFAULT 0,
894 total_tokens INTEGER NOT NULL DEFAULT 0,
895 prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
896 compression_count INTEGER NOT NULL DEFAULT 0,
897 tokens_saved INTEGER NOT NULL DEFAULT 0,
898 error TEXT,
899 FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
900 );
901
902 CREATE TABLE IF NOT EXISTS tool_call_metrics (
903 tool_call_id TEXT PRIMARY KEY,
904 round_id TEXT NOT NULL,
905 session_id TEXT NOT NULL,
906 tool_name TEXT NOT NULL,
907 started_at TEXT NOT NULL,
908 completed_at TEXT,
909 success INTEGER,
910 error TEXT,
911 FOREIGN KEY(round_id) REFERENCES round_metrics(round_id) ON DELETE CASCADE,
912 FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
913 );
914
915 CREATE INDEX IF NOT EXISTS idx_session_started_at ON session_metrics(started_at);
916 CREATE INDEX IF NOT EXISTS idx_session_model ON session_metrics(model);
917 CREATE INDEX IF NOT EXISTS idx_round_session ON round_metrics(session_id);
918 CREATE INDEX IF NOT EXISTS idx_tool_session ON tool_call_metrics(session_id);
919 CREATE INDEX IF NOT EXISTS idx_tool_started_at ON tool_call_metrics(started_at);
920 CREATE INDEX IF NOT EXISTS idx_tool_name ON tool_call_metrics(tool_name);
921
922 CREATE TABLE IF NOT EXISTS forward_request_metrics (
923 forward_id TEXT PRIMARY KEY,
924 endpoint TEXT NOT NULL,
925 model TEXT NOT NULL,
926 is_stream INTEGER NOT NULL,
927 started_at TEXT NOT NULL,
928 completed_at TEXT,
929 status_code INTEGER,
930 status TEXT NOT NULL DEFAULT 'pending',
931 prompt_tokens INTEGER,
932 completion_tokens INTEGER,
933 total_tokens INTEGER,
934 error TEXT,
935 updated_at TEXT NOT NULL
936 );
937
938 CREATE TABLE IF NOT EXISTS execute_sync_mismatch_metrics (
939 reason TEXT NOT NULL,
940 mismatch_date TEXT NOT NULL,
941 count INTEGER NOT NULL DEFAULT 0,
942 updated_at TEXT NOT NULL,
943 PRIMARY KEY (reason, mismatch_date)
944 );
945
946 CREATE INDEX IF NOT EXISTS idx_forward_started_at ON forward_request_metrics(started_at);
947 CREATE INDEX IF NOT EXISTS idx_forward_endpoint ON forward_request_metrics(endpoint);
948 CREATE INDEX IF NOT EXISTS idx_forward_model ON forward_request_metrics(model);
949 CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_date ON execute_sync_mismatch_metrics(mismatch_date);
950 CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_reason ON execute_sync_mismatch_metrics(reason);
951 "#,
952 )?;
953 ensure_integer_column(
954 connection,
955 "session_metrics",
956 "prompt_cached_tool_outputs",
957 0,
958 )?;
959 ensure_integer_column(connection, "session_metrics", "total_compression_events", 0)?;
960 ensure_integer_column(connection, "session_metrics", "total_tokens_saved", 0)?;
961 ensure_integer_column(connection, "round_metrics", "prompt_cached_tool_outputs", 0)?;
962 ensure_integer_column(connection, "round_metrics", "compression_count", 0)?;
963 ensure_integer_column(connection, "round_metrics", "tokens_saved", 0)?;
964 connection.execute(
965 "UPDATE forward_request_metrics SET status = 'pending' WHERE status IS NULL OR trim(status) = ''",
966 [],
967 )?;
968 Ok(())
969 })
970 .await
971 }
972
973 async fn upsert_session_start(
974 &self,
975 session_id: &str,
976 model: &str,
977 started_at: DateTime<Utc>,
978 ) -> MetricsResult<()> {
979 let session_id = session_id.to_string();
980 let model = model.to_string();
981 let started_at = format_timestamp(started_at);
982
983 self.with_connection(move |connection| {
984 connection.execute(
985 r#"
986 INSERT INTO session_metrics (
987 session_id, model, started_at, status, updated_at
988 ) VALUES (?1, ?2, ?3, 'running', ?3)
989 ON CONFLICT(session_id) DO UPDATE SET
990 model = excluded.model,
991 started_at = CASE
992 WHEN session_metrics.started_at <= excluded.started_at THEN session_metrics.started_at
993 ELSE excluded.started_at
994 END,
995 completed_at = NULL,
996 status = 'running',
997 updated_at = excluded.updated_at
998 "#,
999 params![session_id, model, started_at],
1000 )?;
1001 Ok(())
1002 })
1003 .await
1004 }
1005
1006 async fn update_session_message_count(
1007 &self,
1008 session_id: &str,
1009 message_count: u32,
1010 updated_at: DateTime<Utc>,
1011 ) -> MetricsResult<()> {
1012 let session_id = session_id.to_string();
1013 let updated_at = format_timestamp(updated_at);
1014
1015 self.with_connection(move |connection| {
1016 connection.execute(
1017 "UPDATE session_metrics SET message_count = ?1, updated_at = ?2 WHERE session_id = ?3",
1018 params![i64::from(message_count), updated_at, session_id],
1019 )?;
1020 Ok(())
1021 })
1022 .await
1023 }
1024
1025 async fn complete_session(
1026 &self,
1027 session_id: &str,
1028 status: SessionStatus,
1029 completed_at: DateTime<Utc>,
1030 ) -> MetricsResult<()> {
1031 let session_id = session_id.to_string();
1032 let completed_at_str = format_timestamp(completed_at);
1033
1034 self.with_connection(move |connection| {
1035 refresh_session_aggregates(connection, &session_id, completed_at)?;
1036 connection.execute(
1037 "UPDATE session_metrics SET status = ?1, completed_at = ?2, updated_at = ?2 WHERE session_id = ?3",
1038 params![status.as_str(), completed_at_str, session_id],
1039 )?;
1040 Ok(())
1041 })
1042 .await
1043 }
1044
1045 async fn insert_round_start(
1046 &self,
1047 round_id: &str,
1048 session_id: &str,
1049 model: &str,
1050 started_at: DateTime<Utc>,
1051 ) -> MetricsResult<()> {
1052 let round_id = round_id.to_string();
1053 let session_id = session_id.to_string();
1054 let model = model.to_string();
1055 let started_at_str = format_timestamp(started_at);
1056
1057 self.with_connection(move |connection| {
1058 connection.execute(
1059 r#"
1060 INSERT INTO round_metrics (
1061 round_id, session_id, model, started_at, status
1062 ) VALUES (?1, ?2, ?3, ?4, 'running')
1063 ON CONFLICT(round_id) DO NOTHING
1064 "#,
1065 params![round_id, session_id, model, started_at_str],
1066 )?;
1067 refresh_session_aggregates(connection, &session_id, started_at)?;
1068 Ok(())
1069 })
1070 .await
1071 }
1072
1073 async fn complete_round(
1074 &self,
1075 round_id: &str,
1076 completed_at: DateTime<Utc>,
1077 status: RoundStatus,
1078 usage: TokenUsage,
1079 prompt_cached_tool_outputs: u32,
1080 error: Option<String>,
1081 ) -> MetricsResult<()> {
1082 let round_id = round_id.to_string();
1083 let completed_at_str = format_timestamp(completed_at);
1084
1085 self.with_connection(move |connection| {
1086 let session_id: String = connection.query_row(
1087 "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1088 params![round_id],
1089 |row| row.get(0),
1090 )?;
1091
1092 connection.execute(
1093 r#"
1094 UPDATE round_metrics
1095 SET completed_at = ?1,
1096 status = ?2,
1097 prompt_tokens = ?3,
1098 completion_tokens = ?4,
1099 total_tokens = ?5,
1100 prompt_cached_tool_outputs = ?6,
1101 error = ?7
1102 WHERE round_id = ?8
1103 "#,
1104 params![
1105 completed_at_str,
1106 status.as_str(),
1107 usage.prompt_tokens as i64,
1108 usage.completion_tokens as i64,
1109 usage.total_tokens as i64,
1110 i64::from(prompt_cached_tool_outputs),
1111 error,
1112 round_id,
1113 ],
1114 )?;
1115
1116 refresh_session_aggregates(connection, &session_id, completed_at)?;
1117 Ok(())
1118 })
1119 .await
1120 }
1121
1122 async fn record_round_compression(
1123 &self,
1124 round_id: &str,
1125 compressed_at: DateTime<Utc>,
1126 tokens_saved: u32,
1127 ) -> MetricsResult<()> {
1128 let round_id = round_id.to_string();
1129
1130 self.with_connection(move |connection| {
1131 let session_id: String = connection.query_row(
1132 "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1133 params![round_id],
1134 |row| row.get(0),
1135 )?;
1136
1137 connection.execute(
1138 r#"
1139 UPDATE round_metrics
1140 SET compression_count = COALESCE(compression_count, 0) + 1,
1141 tokens_saved = COALESCE(tokens_saved, 0) + ?1
1142 WHERE round_id = ?2
1143 "#,
1144 params![i64::from(tokens_saved), round_id],
1145 )?;
1146
1147 refresh_session_aggregates(connection, &session_id, compressed_at)?;
1148 Ok(())
1149 })
1150 .await
1151 }
1152
1153 async fn insert_tool_start(
1154 &self,
1155 tool_call_id: &str,
1156 round_id: &str,
1157 session_id: &str,
1158 tool_name: &str,
1159 started_at: DateTime<Utc>,
1160 ) -> MetricsResult<()> {
1161 let tool_call_id = tool_call_id.to_string();
1162 let round_id = round_id.to_string();
1163 let session_id = session_id.to_string();
1164 let tool_name = tool_name.to_string();
1165 let started_at_str = format_timestamp(started_at);
1166
1167 self.with_connection(move |connection| {
1168 connection.execute(
1169 r#"
1170 INSERT INTO tool_call_metrics (
1171 tool_call_id, round_id, session_id, tool_name, started_at
1172 ) VALUES (?1, ?2, ?3, ?4, ?5)
1173 ON CONFLICT(tool_call_id) DO UPDATE SET
1174 round_id = excluded.round_id,
1175 session_id = excluded.session_id,
1176 tool_name = excluded.tool_name,
1177 started_at = excluded.started_at
1178 "#,
1179 params![
1180 tool_call_id,
1181 round_id,
1182 session_id,
1183 tool_name,
1184 started_at_str
1185 ],
1186 )?;
1187 Ok(())
1188 })
1189 .await
1190 }
1191
1192 async fn complete_tool_call(
1193 &self,
1194 tool_call_id: &str,
1195 completion: ToolCallCompletion,
1196 ) -> MetricsResult<()> {
1197 let tool_call_id = tool_call_id.to_string();
1198 let completed_at = format_timestamp(completion.completed_at);
1199 let success = if completion.success { 1_i64 } else { 0_i64 };
1200 let error = completion.error;
1201
1202 self.with_connection(move |connection| {
1203 let session_id: String = connection.query_row(
1204 "SELECT session_id FROM tool_call_metrics WHERE tool_call_id = ?1",
1205 params![tool_call_id],
1206 |row| row.get(0),
1207 )?;
1208
1209 connection.execute(
1210 "UPDATE tool_call_metrics SET completed_at = ?1, success = ?2, error = ?3 WHERE tool_call_id = ?4",
1211 params![completed_at, success, error, tool_call_id],
1212 )?;
1213
1214 refresh_session_aggregates(connection, &session_id, completion.completed_at)?;
1215 Ok(())
1216 })
1217 .await
1218 }
1219
1220 async fn insert_forward_start(
1221 &self,
1222 forward_id: &str,
1223 endpoint: &str,
1224 model: &str,
1225 is_stream: bool,
1226 started_at: DateTime<Utc>,
1227 ) -> MetricsResult<()> {
1228 let forward_id = forward_id.to_string();
1229 let endpoint = endpoint.to_string();
1230 let model = model.to_string();
1231 let is_stream_int = if is_stream { 1_i64 } else { 0_i64 };
1232 let started_at_str = format_timestamp(started_at);
1233
1234 self.with_connection(move |connection| {
1235 connection.execute(
1236 r#"
1237 INSERT INTO forward_request_metrics (
1238 forward_id, endpoint, model, is_stream, started_at, status, updated_at
1239 ) VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?5)
1240 ON CONFLICT(forward_id) DO UPDATE SET
1241 endpoint = excluded.endpoint,
1242 model = excluded.model,
1243 is_stream = excluded.is_stream,
1244 started_at = excluded.started_at,
1245 completed_at = NULL,
1246 status_code = NULL,
1247 status = 'pending',
1248 prompt_tokens = NULL,
1249 completion_tokens = NULL,
1250 total_tokens = NULL,
1251 error = NULL,
1252 updated_at = excluded.updated_at
1253 "#,
1254 params![forward_id, endpoint, model, is_stream_int, started_at_str],
1255 )?;
1256 Ok(())
1257 })
1258 .await
1259 }
1260
1261 async fn complete_forward(
1262 &self,
1263 forward_id: &str,
1264 completed_at: DateTime<Utc>,
1265 status_code: Option<u16>,
1266 status: ForwardStatus,
1267 usage: Option<TokenUsage>,
1268 error: Option<String>,
1269 ) -> MetricsResult<()> {
1270 let forward_id = forward_id.to_string();
1271 let completed_at_str = format_timestamp(completed_at);
1272 let status_code_int = status_code.map(|s| s as i64);
1273 let (prompt, completion, total) = match usage {
1274 Some(u) => (
1275 Some(u.prompt_tokens as i64),
1276 Some(u.completion_tokens as i64),
1277 Some(u.total_tokens as i64),
1278 ),
1279 None => (None, None, None),
1280 };
1281
1282 self.with_connection(move |connection| {
1283 connection.execute(
1284 r#"
1285 UPDATE forward_request_metrics
1286 SET completed_at = ?1,
1287 status_code = ?2,
1288 status = ?3,
1289 prompt_tokens = ?4,
1290 completion_tokens = ?5,
1291 total_tokens = ?6,
1292 error = ?7,
1293 updated_at = ?1
1294 WHERE forward_id = ?8
1295 "#,
1296 params![
1297 completed_at_str,
1298 status_code_int,
1299 status.as_str(),
1300 prompt,
1301 completion,
1302 total,
1303 error,
1304 forward_id,
1305 ],
1306 )?;
1307 Ok(())
1308 })
1309 .await
1310 }
1311
1312 async fn forward_summary(
1313 &self,
1314 filter: ForwardMetricsFilter,
1315 ) -> MetricsResult<ForwardMetricsSummary> {
1316 self.with_connection(move |connection| {
1317 let mut params_vec = Vec::new();
1318 let where_clause = build_forward_where_clause(
1319 filter.start_date,
1320 filter.end_date,
1321 filter.endpoint.as_deref(),
1322 filter.model.as_deref(),
1323 &mut params_vec,
1324 );
1325
1326 let sql = format!(
1327 "SELECT COUNT(*), \
1328 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1329 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1330 COALESCE(SUM(prompt_tokens), 0), \
1331 COALESCE(SUM(completion_tokens), 0), \
1332 COALESCE(SUM(total_tokens), 0), \
1333 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1334 (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1335 FROM forward_request_metrics {}",
1336 where_clause
1337 );
1338
1339 let mut stmt = connection.prepare(&sql)?;
1340 let summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1341 let avg_duration: Option<f64> = row.get(6)?;
1342 Ok(ForwardMetricsSummary {
1343 total_requests: row.get::<_, i64>(0)? as u64,
1344 successful_requests: row.get::<_, i64>(1)? as u64,
1345 failed_requests: row.get::<_, i64>(2)? as u64,
1346 total_tokens: TokenUsage {
1347 prompt_tokens: row.get::<_, i64>(3)? as u64,
1348 completion_tokens: row.get::<_, i64>(4)? as u64,
1349 total_tokens: row.get::<_, i64>(5)? as u64,
1350 },
1351 avg_duration_ms: avg_duration.map(|d| d as u64),
1352 })
1353 })?;
1354
1355 Ok(summary)
1356 })
1357 .await
1358 }
1359
1360 async fn forward_by_endpoint(
1361 &self,
1362 filter: ForwardMetricsFilter,
1363 ) -> MetricsResult<Vec<ForwardEndpointMetrics>> {
1364 self.with_connection(move |connection| {
1365 let mut params_vec = Vec::new();
1366 let where_clause = build_forward_where_clause(
1367 filter.start_date,
1368 filter.end_date,
1369 None,
1370 filter.model.as_deref(),
1371 &mut params_vec,
1372 );
1373
1374 let sql = format!(
1375 "SELECT endpoint, COUNT(*), \
1376 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1377 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1378 COALESCE(SUM(prompt_tokens), 0), \
1379 COALESCE(SUM(completion_tokens), 0), \
1380 COALESCE(SUM(total_tokens), 0), \
1381 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1382 (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1383 FROM forward_request_metrics {} \
1384 GROUP BY endpoint ORDER BY COUNT(*) DESC",
1385 where_clause
1386 );
1387
1388 let mut stmt = connection.prepare(&sql)?;
1389 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1390 let mut endpoints = Vec::new();
1391
1392 while let Some(row) = rows.next()? {
1393 let avg_duration: Option<f64> = row.get(7)?;
1394 endpoints.push(ForwardEndpointMetrics {
1395 endpoint: row.get(0)?,
1396 requests: row.get::<_, i64>(1)? as u64,
1397 successful: row.get::<_, i64>(2)? as u64,
1398 failed: row.get::<_, i64>(3)? as u64,
1399 tokens: TokenUsage {
1400 prompt_tokens: row.get::<_, i64>(4)? as u64,
1401 completion_tokens: row.get::<_, i64>(5)? as u64,
1402 total_tokens: row.get::<_, i64>(6)? as u64,
1403 },
1404 avg_duration_ms: avg_duration.map(|d| d as u64),
1405 });
1406 }
1407
1408 Ok(endpoints)
1409 })
1410 .await
1411 }
1412
1413 async fn forward_requests(
1414 &self,
1415 filter: ForwardMetricsFilter,
1416 ) -> MetricsResult<Vec<ForwardRequestMetrics>> {
1417 self.with_connection(move |connection| {
1418 let mut params_vec = Vec::new();
1419 let where_clause = build_forward_where_clause(
1420 filter.start_date,
1421 filter.end_date,
1422 filter.endpoint.as_deref(),
1423 filter.model.as_deref(),
1424 &mut params_vec,
1425 );
1426
1427 let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1428 let sql = format!(
1429 "SELECT forward_id, endpoint, model, is_stream, started_at, completed_at, \
1430 status_code, status, prompt_tokens, completion_tokens, total_tokens, error \
1431 FROM forward_request_metrics {} \
1432 ORDER BY started_at DESC LIMIT {}",
1433 where_clause, limit
1434 );
1435
1436 let mut stmt = connection.prepare(&sql)?;
1437 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1438 let mut requests = Vec::new();
1439
1440 while let Some(row) = rows.next()? {
1441 let started_at = parse_timestamp(row.get::<_, String>(4)?)?;
1442 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(5)?)?;
1443 let status_raw: Option<String> = row.get(7)?;
1444 let status = status_raw.and_then(|s| ForwardStatus::from_db(&s));
1445
1446 let prompt: Option<i64> = row.get(8)?;
1447 let completion: Option<i64> = row.get(9)?;
1448 let total: Option<i64> = row.get(10)?;
1449 let token_usage = match (prompt, completion, total) {
1450 (Some(p), Some(c), Some(t)) => Some(TokenUsage {
1451 prompt_tokens: p as u64,
1452 completion_tokens: c as u64,
1453 total_tokens: t as u64,
1454 }),
1455 _ => None,
1456 };
1457
1458 requests.push(ForwardRequestMetrics {
1459 forward_id: row.get(0)?,
1460 endpoint: row.get(1)?,
1461 model: row.get(2)?,
1462 is_stream: row.get::<_, i64>(3)? > 0,
1463 started_at,
1464 completed_at,
1465 status_code: row.get::<_, Option<i64>>(6)?.map(|s| s as u16),
1466 status,
1467 token_usage,
1468 error: row.get(11)?,
1469 duration_ms: compute_duration_ms(started_at, completed_at),
1470 });
1471 }
1472
1473 Ok(requests)
1474 })
1475 .await
1476 }
1477
1478 async fn forward_daily_metrics(
1479 &self,
1480 days: u32,
1481 end_date: Option<NaiveDate>,
1482 ) -> MetricsResult<Vec<DailyMetrics>> {
1483 let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1484 let span = days.max(1) - 1;
1485 let start_date = end_date - chrono::Duration::days(i64::from(span));
1486
1487 self.with_connection(move |connection| {
1488 let mut stmt = connection.prepare(
1489 r#"
1490 SELECT
1491 date(started_at) AS date_key,
1492 COUNT(*) AS total_sessions,
1493 COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1494 COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1495 COALESCE(SUM(total_tokens), 0) AS total_tokens
1496 FROM forward_request_metrics
1497 WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1498 GROUP BY date_key
1499 ORDER BY date_key ASC
1500 "#,
1501 )?;
1502
1503 let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1504 let mut result = Vec::new();
1505
1506 while let Some(row) = rows.next()? {
1507 let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1508
1509 result.push(DailyMetrics {
1510 date,
1511 total_sessions: row.get::<_, i64>(1)? as u32,
1512 total_rounds: 0,
1513 total_token_usage: TokenUsage {
1514 prompt_tokens: row.get::<_, i64>(2)? as u64,
1515 completion_tokens: row.get::<_, i64>(3)? as u64,
1516 total_tokens: row.get::<_, i64>(4)? as u64,
1517 },
1518 total_tool_calls: 0,
1519 prompt_cached_tool_outputs: 0,
1520 model_breakdown: HashMap::new(),
1521 tool_breakdown: HashMap::new(),
1522 });
1523 }
1524
1525 Ok(result)
1526 })
1527 .await
1528 }
1529
1530 async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary> {
1531 self.with_connection(move |connection| {
1532 let mut params_vec = Vec::new();
1533 let where_clause = build_session_where_clause(
1534 filter.start_date,
1535 filter.end_date,
1536 None,
1537 &mut params_vec,
1538 );
1539
1540 let summary_sql = format!(
1541 "SELECT COUNT(*), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0), COALESCE(SUM(total_compression_events), 0), COALESCE(SUM(total_tokens_saved), 0), COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'awaiting_response' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END), 0) FROM session_metrics {}",
1542 where_clause
1543 );
1544
1545 let mut stmt = connection.prepare(&summary_sql)?;
1546 let mut summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1547 Ok(MetricsSummary {
1548 total_sessions: row.get::<_, i64>(0)? as u64,
1549 total_tokens: TokenUsage {
1550 prompt_tokens: row.get::<_, i64>(1)? as u64,
1551 completion_tokens: row.get::<_, i64>(2)? as u64,
1552 total_tokens: row.get::<_, i64>(3)? as u64,
1553 },
1554 total_tool_calls: row.get::<_, i64>(4)? as u64,
1555 prompt_cached_tool_outputs: row.get::<_, i64>(5)? as u64,
1556 total_compression_events: row.get::<_, i64>(6)? as u64,
1557 total_tokens_saved: row.get::<_, i64>(7)? as u64,
1558 completed_sessions: row.get::<_, i64>(8)? as u64,
1559 awaiting_response_sessions: row.get::<_, i64>(9)? as u64,
1560 error_sessions: row.get::<_, i64>(10)? as u64,
1561 cancelled_sessions: row.get::<_, i64>(11)? as u64,
1562 total_sync_mismatches: 0,
1563 sync_mismatch_breakdown: HashMap::new(),
1564 active_sessions: 0,
1565 })
1566 })?;
1567
1568 let mut mismatch_params = Vec::new();
1569 let mismatch_clause = build_execute_sync_mismatch_where_clause(
1570 filter.start_date,
1571 filter.end_date,
1572 None,
1573 &mut mismatch_params,
1574 );
1575 let mismatch_sql = format!(
1576 "SELECT COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {}",
1577 mismatch_clause
1578 );
1579 let mut mismatch_stmt = connection.prepare(&mismatch_sql)?;
1580 summary.total_sync_mismatches = mismatch_stmt
1581 .query_row(params_from_iter(mismatch_params.iter()), |row| row.get::<_, i64>(0))?
1582 as u64;
1583 summary.sync_mismatch_breakdown = load_execute_sync_mismatch_breakdown(
1584 connection,
1585 filter.start_date,
1586 filter.end_date,
1587 )?;
1588 let mut active_params = Vec::new();
1589 let active_clause = build_session_where_clause(
1590 filter.start_date,
1591 filter.end_date,
1592 Some("running"),
1593 &mut active_params,
1594 );
1595 let active_sql = format!(
1596 "SELECT COUNT(*) FROM session_metrics {}",
1597 active_clause
1598 );
1599 let mut active_stmt = connection.prepare(&active_sql)?;
1600 let active_sessions = active_stmt.query_row(params_from_iter(active_params.iter()), |row| {
1601 row.get::<_, i64>(0)
1602 })? as u64;
1603
1604 Ok(MetricsSummary {
1605 active_sessions,
1606 ..summary
1607 })
1608 })
1609 .await
1610 }
1611
1612 async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>> {
1613 self.with_connection(move |connection| {
1614 let mut params_vec = Vec::new();
1615 let where_clause = build_session_where_clause(
1616 filter.start_date,
1617 filter.end_date,
1618 None,
1619 &mut params_vec,
1620 );
1621
1622 let sql = format!(
1623 "SELECT model, COUNT(*), COALESCE(SUM(total_rounds), 0), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0) FROM session_metrics {} GROUP BY model ORDER BY SUM(total_tokens) DESC",
1624 where_clause
1625 );
1626
1627 let mut stmt = connection.prepare(&sql)?;
1628 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1629 let mut models = Vec::new();
1630
1631 while let Some(row) = rows.next()? {
1632 models.push(ModelMetrics {
1633 model: row.get(0)?,
1634 sessions: row.get::<_, i64>(1)? as u64,
1635 rounds: row.get::<_, i64>(2)? as u64,
1636 tokens: TokenUsage {
1637 prompt_tokens: row.get::<_, i64>(3)? as u64,
1638 completion_tokens: row.get::<_, i64>(4)? as u64,
1639 total_tokens: row.get::<_, i64>(5)? as u64,
1640 },
1641 tool_calls: row.get::<_, i64>(6)? as u64,
1642 prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1643 });
1644 }
1645
1646 Ok(models)
1647 })
1648 .await
1649 }
1650
1651 async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>> {
1652 self.with_connection(move |connection| {
1653 let mut params_vec = Vec::new();
1654 let where_clause = build_session_where_clause(
1655 filter.start_date,
1656 filter.end_date,
1657 None,
1658 &mut params_vec,
1659 );
1660 let mut conditions = if where_clause.is_empty() {
1661 Vec::new()
1662 } else {
1663 vec![where_clause.replacen("WHERE ", "", 1)]
1664 };
1665 if let Some(model) = filter.model {
1666 conditions.push("model = ?".to_string());
1667 params_vec.push(model);
1668 }
1669
1670 let where_sql = if conditions.is_empty() {
1671 String::new()
1672 } else {
1673 format!("WHERE {}", conditions.join(" AND "))
1674 };
1675
1676 let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1677 let sql = format!(
1678 "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics {} ORDER BY started_at DESC LIMIT {}",
1679 where_sql, limit
1680 );
1681
1682 let mut stmt = connection.prepare(&sql)?;
1683 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1684 let mut sessions = Vec::new();
1685
1686 while let Some(row) = rows.next()? {
1687 let session_id: String = row.get(0)?;
1688 let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
1689 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
1690 let status_raw: String = row.get(12)?;
1691 let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1692 MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1693 })?;
1694 let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1695
1696 sessions.push(SessionMetrics {
1697 session_id,
1698 model: row.get(1)?,
1699 started_at,
1700 completed_at,
1701 total_rounds: row.get::<_, i64>(4)? as u32,
1702 total_token_usage: TokenUsage {
1703 prompt_tokens: row.get::<_, i64>(5)? as u64,
1704 completion_tokens: row.get::<_, i64>(6)? as u64,
1705 total_tokens: row.get::<_, i64>(7)? as u64,
1706 },
1707 tool_call_count: row.get::<_, i64>(8)? as u32,
1708 prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u64,
1709 total_compression_events: row.get::<_, i64>(10)? as u64,
1710 total_tokens_saved: row.get::<_, i64>(11)? as u64,
1711 tool_breakdown,
1712 status,
1713 message_count: row.get::<_, i64>(13)? as u32,
1714 duration_ms: compute_duration_ms(started_at, completed_at),
1715 });
1716 }
1717
1718 Ok(sessions)
1719 })
1720 .await
1721 }
1722
1723 async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>> {
1724 let session_id = session_id.to_string();
1725 self.with_connection(move |connection| {
1726 let session_sql = "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics WHERE session_id = ?1";
1727 let session_row = connection
1728 .query_row(session_sql, params![session_id], |row| {
1729 Ok((
1730 row.get::<_, String>(0)?,
1731 row.get::<_, String>(1)?,
1732 row.get::<_, String>(2)?,
1733 row.get::<_, Option<String>>(3)?,
1734 row.get::<_, i64>(4)?,
1735 row.get::<_, i64>(5)?,
1736 row.get::<_, i64>(6)?,
1737 row.get::<_, i64>(7)?,
1738 row.get::<_, i64>(8)?,
1739 row.get::<_, i64>(9)?,
1740 row.get::<_, i64>(10)?,
1741 row.get::<_, i64>(11)?,
1742 row.get::<_, String>(12)?,
1743 row.get::<_, i64>(13)?,
1744 ))
1745 })
1746 .optional()?;
1747
1748 let Some((
1749 session_id,
1750 model,
1751 started_at_raw,
1752 completed_at_raw,
1753 total_rounds,
1754 prompt_tokens,
1755 completion_tokens,
1756 total_tokens,
1757 tool_call_count,
1758 prompt_cached_tool_outputs,
1759 total_compression_events,
1760 total_tokens_saved,
1761 status_raw,
1762 message_count,
1763 )) = session_row
1764 else {
1765 return Ok(None);
1766 };
1767
1768 let started_at = parse_timestamp(started_at_raw)?;
1769 let completed_at = parse_optional_timestamp(completed_at_raw)?;
1770 let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1771 MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1772 })?;
1773 let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1774
1775 let session = SessionMetrics {
1776 session_id: session_id.clone(),
1777 model,
1778 started_at,
1779 completed_at,
1780 total_rounds: total_rounds as u32,
1781 total_token_usage: TokenUsage {
1782 prompt_tokens: prompt_tokens as u64,
1783 completion_tokens: completion_tokens as u64,
1784 total_tokens: total_tokens as u64,
1785 },
1786 tool_call_count: tool_call_count as u32,
1787 prompt_cached_tool_outputs: prompt_cached_tool_outputs as u64,
1788 total_compression_events: total_compression_events as u64,
1789 total_tokens_saved: total_tokens_saved as u64,
1790 tool_breakdown,
1791 status,
1792 message_count: message_count as u32,
1793 duration_ms: compute_duration_ms(started_at, completed_at),
1794 };
1795
1796 let rounds = load_rounds(connection, &session_id)?;
1797 Ok(Some(SessionDetail { session, rounds }))
1798 })
1799 .await
1800 }
1801
1802 async fn increment_execute_sync_mismatch(
1803 &self,
1804 reason: &str,
1805 occurred_at: DateTime<Utc>,
1806 ) -> MetricsResult<()> {
1807 let reason = reason.to_string();
1808 let mismatch_date = occurred_at.date_naive().to_string();
1809 let updated_at = format_timestamp(occurred_at);
1810
1811 self.with_connection(move |connection| {
1812 connection.execute(
1813 r#"
1814 INSERT INTO execute_sync_mismatch_metrics (reason, mismatch_date, count, updated_at)
1815 VALUES (?1, ?2, 1, ?3)
1816 ON CONFLICT(reason, mismatch_date) DO UPDATE SET
1817 count = count + 1,
1818 updated_at = excluded.updated_at
1819 "#,
1820 params![reason, mismatch_date, updated_at],
1821 )?;
1822 Ok(())
1823 })
1824 .await
1825 }
1826
1827 async fn daily_metrics(
1828 &self,
1829 days: u32,
1830 end_date: Option<NaiveDate>,
1831 ) -> MetricsResult<Vec<DailyMetrics>> {
1832 let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1833 let span = days.max(1) - 1;
1834 let start_date = end_date - chrono::Duration::days(i64::from(span));
1835
1836 self.with_connection(move |connection| {
1837 let mut stmt = connection.prepare(
1838 r#"
1839 SELECT
1840 date(started_at) AS date_key,
1841 COUNT(*) AS total_sessions,
1842 COALESCE(SUM(total_rounds), 0) AS total_rounds,
1843 COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1844 COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1845 COALESCE(SUM(total_tokens), 0) AS total_tokens,
1846 COALESCE(SUM(tool_call_count), 0) AS total_tool_calls,
1847 COALESCE(SUM(prompt_cached_tool_outputs), 0) AS prompt_cached_tool_outputs
1848 FROM session_metrics
1849 WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1850 GROUP BY date_key
1851 ORDER BY date_key ASC
1852 "#,
1853 )?;
1854
1855 let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1856 let mut result = Vec::new();
1857
1858 while let Some(row) = rows.next()? {
1859 let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1860 let model_breakdown = load_daily_model_breakdown(connection, date)?;
1861 let tool_breakdown = load_daily_tool_breakdown(connection, date)?;
1862
1863 result.push(DailyMetrics {
1864 date,
1865 total_sessions: row.get::<_, i64>(1)? as u32,
1866 total_rounds: row.get::<_, i64>(2)? as u32,
1867 total_token_usage: TokenUsage {
1868 prompt_tokens: row.get::<_, i64>(3)? as u64,
1869 completion_tokens: row.get::<_, i64>(4)? as u64,
1870 total_tokens: row.get::<_, i64>(5)? as u64,
1871 },
1872 total_tool_calls: row.get::<_, i64>(6)? as u32,
1873 prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1874 model_breakdown,
1875 tool_breakdown,
1876 });
1877 }
1878
1879 Ok(result)
1880 })
1881 .await
1882 }
1883
1884 async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64> {
1885 self.with_connection(move |connection| {
1886 let cutoff_str = format_timestamp(cutoff);
1887 let deleted = connection.execute(
1888 "DELETE FROM round_metrics WHERE started_at < ?1",
1889 params![cutoff_str],
1890 )?;
1891
1892 let mut stmt = connection.prepare("SELECT session_id FROM session_metrics")?;
1893 let session_ids: Vec<String> = stmt
1894 .query_map([], |row| row.get(0))?
1895 .collect::<Result<Vec<String>, _>>()?;
1896 for session_id in session_ids {
1897 refresh_session_aggregates(connection, &session_id, Utc::now())?;
1898 }
1899
1900 Ok(deleted as u64)
1901 })
1902 .await
1903 }
1904
1905 async fn reconcile_stale_executions(
1906 &self,
1907 active_session_ids: &[String],
1908 awaiting_response_session_ids: &[String],
1909 ) -> MetricsResult<()> {
1910 let active_session_ids = active_session_ids.to_vec();
1911 let awaiting_response_session_ids = awaiting_response_session_ids.to_vec();
1912
1913 self.with_connection(move |connection| {
1914 let reconciled_at = Utc::now();
1915 let reconciled_at_str = format_timestamp(reconciled_at);
1916
1917 let mut stmt = connection.prepare(
1918 "SELECT session_id FROM session_metrics WHERE status = 'running'",
1919 )?;
1920 let running_session_ids: Vec<String> = stmt
1921 .query_map([], |row| row.get(0))?
1922 .collect::<Result<Vec<String>, _>>()?;
1923
1924 for session_id in running_session_ids {
1925 if active_session_ids.iter().any(|id| id == &session_id) {
1926 continue;
1927 }
1928
1929 let status = if awaiting_response_session_ids
1930 .iter()
1931 .any(|id| id == &session_id)
1932 {
1933 SessionStatus::AwaitingResponse
1934 } else {
1935 SessionStatus::Completed
1936 };
1937
1938 connection.execute(
1939 "UPDATE session_metrics SET status = ?1, completed_at = COALESCE(completed_at, ?2), updated_at = ?2 WHERE session_id = ?3",
1940 params![status.as_str(), reconciled_at_str, session_id],
1941 )?;
1942 refresh_session_aggregates(connection, &session_id, reconciled_at)?;
1943 }
1944
1945 connection.execute(
1946 "UPDATE round_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_round') WHERE status = 'running'",
1947 params![reconciled_at_str],
1948 )?;
1949 connection.execute(
1950 "UPDATE forward_request_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_forward'), updated_at = ?1 WHERE status = 'pending' AND completed_at IS NULL",
1951 params![reconciled_at_str],
1952 )?;
1953
1954 Ok(())
1955 })
1956 .await
1957 }
1958}
1959
1960fn open_connection(path: &Path) -> MetricsResult<Connection> {
1981 if let Some(parent) = path.parent() {
1982 std::fs::create_dir_all(parent)?;
1983 }
1984 let connection = Connection::open(path)?;
1985 connection.execute_batch(
1986 r#"
1987 PRAGMA journal_mode = WAL;
1988 PRAGMA foreign_keys = ON;
1989 PRAGMA synchronous = NORMAL;
1990 "#,
1991 )?;
1992 Ok(connection)
1993}
1994
1995fn format_timestamp(timestamp: DateTime<Utc>) -> String {
2005 timestamp.to_rfc3339()
2006}
2007
2008fn parse_timestamp(raw: String) -> MetricsResult<DateTime<Utc>> {
2018 Ok(DateTime::parse_from_rfc3339(&raw)?.with_timezone(&Utc))
2019}
2020
2021fn parse_optional_timestamp(raw: Option<String>) -> MetricsResult<Option<DateTime<Utc>>> {
2031 raw.map(parse_timestamp).transpose()
2032}
2033
2034fn compute_duration_ms(
2046 started_at: DateTime<Utc>,
2047 completed_at: Option<DateTime<Utc>>,
2048) -> Option<u64> {
2049 completed_at.and_then(|end| {
2050 end.signed_duration_since(started_at)
2051 .num_milliseconds()
2052 .try_into()
2053 .ok()
2054 })
2055}
2056
2057fn build_session_where_clause(
2074 start_date: Option<NaiveDate>,
2075 end_date: Option<NaiveDate>,
2076 required_status: Option<&str>,
2077 params_vec: &mut Vec<String>,
2078) -> String {
2079 let mut conditions = Vec::new();
2080
2081 if let Some(start) = start_date {
2082 conditions.push("date(started_at) >= date(?)".to_string());
2083 params_vec.push(start.to_string());
2084 }
2085
2086 if let Some(end) = end_date {
2087 conditions.push("date(started_at) <= date(?)".to_string());
2088 params_vec.push(end.to_string());
2089 }
2090
2091 if let Some(status) = required_status {
2092 conditions.push("status = ?".to_string());
2093 params_vec.push(status.to_string());
2094 }
2095
2096 if conditions.is_empty() {
2097 String::new()
2098 } else {
2099 format!("WHERE {}", conditions.join(" AND "))
2100 }
2101}
2102
2103fn build_forward_where_clause(
2121 start_date: Option<NaiveDate>,
2122 end_date: Option<NaiveDate>,
2123 endpoint: Option<&str>,
2124 model: Option<&str>,
2125 params_vec: &mut Vec<String>,
2126) -> String {
2127 let mut conditions = Vec::new();
2128
2129 if let Some(start) = start_date {
2130 conditions.push("date(started_at) >= date(?)".to_string());
2131 params_vec.push(start.to_string());
2132 }
2133
2134 if let Some(end) = end_date {
2135 conditions.push("date(started_at) <= date(?)".to_string());
2136 params_vec.push(end.to_string());
2137 }
2138
2139 if let Some(ep) = endpoint {
2140 conditions.push("endpoint = ?".to_string());
2141 params_vec.push(ep.to_string());
2142 }
2143
2144 if let Some(m) = model {
2145 conditions.push("model = ?".to_string());
2146 params_vec.push(m.to_string());
2147 }
2148
2149 if conditions.is_empty() {
2150 String::new()
2151 } else {
2152 format!("WHERE {}", conditions.join(" AND "))
2153 }
2154}
2155
2156fn build_execute_sync_mismatch_where_clause(
2157 start_date: Option<NaiveDate>,
2158 end_date: Option<NaiveDate>,
2159 reason: Option<&str>,
2160 params_vec: &mut Vec<String>,
2161) -> String {
2162 let mut conditions = Vec::new();
2163
2164 if let Some(start) = start_date {
2165 conditions.push("date(mismatch_date) >= date(?)".to_string());
2166 params_vec.push(start.to_string());
2167 }
2168
2169 if let Some(end) = end_date {
2170 conditions.push("date(mismatch_date) <= date(?)".to_string());
2171 params_vec.push(end.to_string());
2172 }
2173
2174 if let Some(reason) = reason {
2175 conditions.push("reason = ?".to_string());
2176 params_vec.push(reason.to_string());
2177 }
2178
2179 if conditions.is_empty() {
2180 String::new()
2181 } else {
2182 format!("WHERE {}", conditions.join(" AND "))
2183 }
2184}
2185
2186fn ensure_integer_column(
2187 connection: &Connection,
2188 table: &str,
2189 column: &str,
2190 default_value: i64,
2191) -> MetricsResult<()> {
2192 let pragma = format!("PRAGMA table_info({table})");
2193 let mut stmt = connection.prepare(&pragma)?;
2194 let mut rows = stmt.query([])?;
2195 while let Some(row) = rows.next()? {
2196 let name: String = row.get(1)?;
2197 if name == column {
2198 return Ok(());
2199 }
2200 }
2201
2202 let alter =
2203 format!("ALTER TABLE {table} ADD COLUMN {column} INTEGER NOT NULL DEFAULT {default_value}");
2204 connection.execute(&alter, [])?;
2205 Ok(())
2206}
2207
2208fn refresh_session_aggregates(
2236 connection: &Connection,
2237 session_id: &str,
2238 updated_at: DateTime<Utc>,
2239) -> MetricsResult<()> {
2240 let updated_at = format_timestamp(updated_at);
2241 connection.execute(
2242 r#"
2243 UPDATE session_metrics
2244 SET
2245 total_rounds = COALESCE((SELECT COUNT(*) FROM round_metrics WHERE session_id = ?1), 0),
2246 prompt_tokens = COALESCE((SELECT SUM(prompt_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2247 completion_tokens = COALESCE((SELECT SUM(completion_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2248 total_tokens = COALESCE((SELECT SUM(total_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2249 prompt_cached_tool_outputs = COALESCE((SELECT SUM(prompt_cached_tool_outputs) FROM round_metrics WHERE session_id = ?1), 0),
2250 total_compression_events = COALESCE((SELECT SUM(compression_count) FROM round_metrics WHERE session_id = ?1), 0),
2251 total_tokens_saved = COALESCE((SELECT SUM(tokens_saved) FROM round_metrics WHERE session_id = ?1), 0),
2252 tool_call_count = COALESCE((SELECT COUNT(*) FROM tool_call_metrics WHERE session_id = ?1), 0),
2253 updated_at = ?2
2254 WHERE session_id = ?1
2255 "#,
2256 params![session_id, updated_at],
2257 )?;
2258 Ok(())
2259}
2260
2261fn load_tool_breakdown(
2282 connection: &Connection,
2283 session_id: &str,
2284) -> MetricsResult<HashMap<String, u32>> {
2285 let mut stmt = connection.prepare(
2286 "SELECT tool_name, COUNT(*) FROM tool_call_metrics WHERE session_id = ?1 GROUP BY tool_name",
2287 )?;
2288 let mut rows = stmt.query(params![session_id])?;
2289 let mut breakdown = HashMap::new();
2290
2291 while let Some(row) = rows.next()? {
2292 let tool: String = row.get(0)?;
2293 let count: i64 = row.get(1)?;
2294 breakdown.insert(tool, count as u32);
2295 }
2296
2297 Ok(breakdown)
2298}
2299
2300fn load_rounds(connection: &Connection, session_id: &str) -> MetricsResult<Vec<RoundMetrics>> {
2321 let mut stmt = connection.prepare(
2322 "SELECT round_id, session_id, model, started_at, completed_at, status, prompt_tokens, completion_tokens, total_tokens, prompt_cached_tool_outputs, compression_count, tokens_saved, error FROM round_metrics WHERE session_id = ?1 ORDER BY started_at ASC",
2323 )?;
2324 let mut rows = stmt.query(params![session_id])?;
2325 let mut rounds = Vec::new();
2326
2327 while let Some(row) = rows.next()? {
2328 let round_id: String = row.get(0)?;
2329 let started_at = parse_timestamp(row.get::<_, String>(3)?)?;
2330 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(4)?)?;
2331 let status_raw: String = row.get(5)?;
2332 let status = RoundStatus::from_db(&status_raw).ok_or_else(|| {
2333 MetricsError::InvalidData(format!("unknown round status: {}", status_raw))
2334 })?;
2335
2336 rounds.push(RoundMetrics {
2337 round_id: round_id.clone(),
2338 session_id: row.get(1)?,
2339 model: row.get(2)?,
2340 started_at,
2341 completed_at,
2342 token_usage: TokenUsage {
2343 prompt_tokens: row.get::<_, i64>(6)? as u64,
2344 completion_tokens: row.get::<_, i64>(7)? as u64,
2345 total_tokens: row.get::<_, i64>(8)? as u64,
2346 },
2347 tool_calls: load_tool_calls(connection, &round_id)?,
2348 status,
2349 prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u32,
2350 compression_count: row.get::<_, i64>(10)? as u32,
2351 tokens_saved: row.get::<_, i64>(11)? as u32,
2352 error: row.get(12)?,
2353 duration_ms: compute_duration_ms(started_at, completed_at),
2354 });
2355 }
2356
2357 Ok(rounds)
2358}
2359
2360fn load_tool_calls(connection: &Connection, round_id: &str) -> MetricsResult<Vec<ToolCallMetrics>> {
2374 let mut stmt = connection.prepare(
2375 "SELECT tool_call_id, tool_name, started_at, completed_at, success, error FROM tool_call_metrics WHERE round_id = ?1 ORDER BY started_at ASC",
2376 )?;
2377 let mut rows = stmt.query(params![round_id])?;
2378 let mut tools = Vec::new();
2379
2380 while let Some(row) = rows.next()? {
2381 let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
2382 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
2383 let success = row.get::<_, Option<i64>>(4)?.map(|value| value > 0);
2384
2385 tools.push(ToolCallMetrics {
2386 tool_call_id: row.get(0)?,
2387 tool_name: row.get(1)?,
2388 started_at,
2389 completed_at,
2390 success,
2391 error: row.get(5)?,
2392 duration_ms: compute_duration_ms(started_at, completed_at),
2393 });
2394 }
2395
2396 Ok(tools)
2397}
2398
2399fn load_daily_model_breakdown(
2420 connection: &Connection,
2421 date: NaiveDate,
2422) -> MetricsResult<HashMap<String, TokenUsage>> {
2423 let mut stmt = connection.prepare(
2424 r#"
2425 SELECT model,
2426 COALESCE(SUM(prompt_tokens), 0),
2427 COALESCE(SUM(completion_tokens), 0),
2428 COALESCE(SUM(total_tokens), 0)
2429 FROM session_metrics
2430 WHERE date(started_at) = date(?1)
2431 GROUP BY model
2432 "#,
2433 )?;
2434
2435 let mut rows = stmt.query(params![date.to_string()])?;
2436 let mut breakdown = HashMap::new();
2437
2438 while let Some(row) = rows.next()? {
2439 breakdown.insert(
2440 row.get::<_, String>(0)?,
2441 TokenUsage {
2442 prompt_tokens: row.get::<_, i64>(1)? as u64,
2443 completion_tokens: row.get::<_, i64>(2)? as u64,
2444 total_tokens: row.get::<_, i64>(3)? as u64,
2445 },
2446 );
2447 }
2448
2449 Ok(breakdown)
2450}
2451
2452fn load_daily_tool_breakdown(
2473 connection: &Connection,
2474 date: NaiveDate,
2475) -> MetricsResult<HashMap<String, u32>> {
2476 let mut stmt = connection.prepare(
2477 r#"
2478 SELECT tool_name, COUNT(*)
2479 FROM tool_call_metrics
2480 WHERE date(started_at) = date(?1)
2481 GROUP BY tool_name
2482 "#,
2483 )?;
2484
2485 let mut rows = stmt.query(params![date.to_string()])?;
2486 let mut breakdown = HashMap::new();
2487
2488 while let Some(row) = rows.next()? {
2489 breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u32);
2490 }
2491
2492 Ok(breakdown)
2493}
2494
2495fn load_execute_sync_mismatch_breakdown(
2496 connection: &Connection,
2497 start_date: Option<NaiveDate>,
2498 end_date: Option<NaiveDate>,
2499) -> MetricsResult<HashMap<String, u64>> {
2500 let mut params_vec = Vec::new();
2501 let where_clause =
2502 build_execute_sync_mismatch_where_clause(start_date, end_date, None, &mut params_vec);
2503 let sql = format!(
2504 "SELECT reason, COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {} GROUP BY reason ORDER BY reason ASC",
2505 where_clause
2506 );
2507 let mut stmt = connection.prepare(&sql)?;
2508 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
2509 let mut breakdown = HashMap::new();
2510
2511 while let Some(row) = rows.next()? {
2512 breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u64);
2513 }
2514
2515 Ok(breakdown)
2516}
2517
2518#[cfg(test)]
2519mod tests {
2520 use std::collections::HashMap;
2521
2522 use chrono::{NaiveDate, TimeZone, Utc};
2523 use tempfile::tempdir;
2524
2525 use super::{MetricsStorage, SqliteMetricsStorage, ToolCallCompletion};
2526 use crate::metrics::types::{
2527 ForwardMetricsFilter, ForwardStatus, MetricsDateFilter, RoundStatus, SessionMetricsFilter,
2528 SessionStatus, TokenUsage,
2529 };
2530
2531 #[tokio::test]
2532 async fn storage_records_session_and_round_data_for_summary_queries() {
2533 let dir = tempdir().expect("temp dir");
2534 let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2535
2536 storage.init().await.expect("init storage");
2537
2538 let started_at = Utc
2539 .with_ymd_and_hms(2026, 2, 10, 10, 0, 0)
2540 .single()
2541 .expect("valid datetime");
2542 storage
2543 .upsert_session_start("session-a", "gpt-4", started_at)
2544 .await
2545 .expect("session started");
2546 storage
2547 .update_session_message_count("session-a", 7, started_at)
2548 .await
2549 .expect("message count update");
2550
2551 storage
2552 .insert_round_start("round-a", "session-a", "gpt-4", started_at)
2553 .await
2554 .expect("round start");
2555 storage
2556 .insert_tool_start("tool-1", "round-a", "session-a", "read_file", started_at)
2557 .await
2558 .expect("tool start");
2559 storage
2560 .complete_tool_call(
2561 "tool-1",
2562 ToolCallCompletion {
2563 completed_at: started_at,
2564 success: true,
2565 error: None,
2566 },
2567 )
2568 .await
2569 .expect("tool completion");
2570 storage
2571 .complete_round(
2572 "round-a",
2573 started_at,
2574 RoundStatus::Success,
2575 TokenUsage {
2576 prompt_tokens: 10,
2577 completion_tokens: 15,
2578 total_tokens: 25,
2579 },
2580 3,
2581 None,
2582 )
2583 .await
2584 .expect("round completion");
2585 storage
2586 .complete_session("session-a", SessionStatus::Completed, started_at)
2587 .await
2588 .expect("session completion");
2589
2590 let summary = storage
2591 .summary(MetricsDateFilter::default())
2592 .await
2593 .expect("summary query");
2594
2595 assert_eq!(summary.total_sessions, 1);
2596 assert_eq!(summary.total_tokens.total_tokens, 25);
2597 assert_eq!(summary.total_tool_calls, 1);
2598 assert_eq!(summary.prompt_cached_tool_outputs, 3);
2599
2600 let detail = storage
2601 .session_detail("session-a")
2602 .await
2603 .expect("session detail query")
2604 .expect("session detail should exist");
2605 assert_eq!(detail.session.prompt_cached_tool_outputs, 3);
2606 assert_eq!(detail.rounds.len(), 1);
2607 assert_eq!(detail.rounds[0].prompt_cached_tool_outputs, 3);
2608 }
2609
2610 #[tokio::test]
2611 async fn storage_filters_sessions_and_returns_tool_breakdown() {
2612 let dir = tempdir().expect("temp dir");
2613 let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2614
2615 storage.init().await.expect("init storage");
2616
2617 let day_a = Utc
2618 .with_ymd_and_hms(2026, 2, 1, 9, 0, 0)
2619 .single()
2620 .expect("valid datetime");
2621 let day_b = Utc
2622 .with_ymd_and_hms(2026, 2, 5, 9, 0, 0)
2623 .single()
2624 .expect("valid datetime");
2625
2626 storage
2627 .upsert_session_start("s1", "gpt-4", day_a)
2628 .await
2629 .expect("session start");
2630 storage
2631 .insert_round_start("r1", "s1", "gpt-4", day_a)
2632 .await
2633 .expect("round start");
2634 storage
2635 .insert_tool_start("t1", "r1", "s1", "read_file", day_a)
2636 .await
2637 .expect("tool start");
2638 storage
2639 .complete_tool_call(
2640 "t1",
2641 ToolCallCompletion {
2642 completed_at: day_a,
2643 success: true,
2644 error: None,
2645 },
2646 )
2647 .await
2648 .expect("tool complete");
2649 storage
2650 .complete_round(
2651 "r1",
2652 day_a,
2653 RoundStatus::Success,
2654 TokenUsage {
2655 prompt_tokens: 1,
2656 completion_tokens: 1,
2657 total_tokens: 2,
2658 },
2659 0,
2660 None,
2661 )
2662 .await
2663 .expect("round complete");
2664
2665 storage
2666 .upsert_session_start("s2", "claude-3", day_b)
2667 .await
2668 .expect("session start");
2669
2670 let sessions = storage
2671 .sessions(SessionMetricsFilter {
2672 start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).expect("valid date")),
2673 end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 3).expect("valid date")),
2674 model: Some("gpt-4".to_string()),
2675 limit: Some(100),
2676 })
2677 .await
2678 .expect("sessions query");
2679
2680 assert_eq!(sessions.len(), 1);
2681 assert_eq!(sessions[0].session_id, "s1");
2682 assert_eq!(sessions[0].tool_breakdown.get("read_file"), Some(&1));
2683 }
2684
2685 #[tokio::test]
2686 async fn storage_produces_daily_rollups_with_model_and_tool_breakdowns() {
2687 let dir = tempdir().expect("temp dir");
2688 let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2689
2690 storage.init().await.expect("init storage");
2691
2692 let now = Utc
2693 .with_ymd_and_hms(2026, 2, 10, 12, 0, 0)
2694 .single()
2695 .expect("valid datetime");
2696 storage
2697 .upsert_session_start("daily-1", "gpt-4", now)
2698 .await
2699 .expect("session start");
2700 storage
2701 .insert_round_start("daily-r1", "daily-1", "gpt-4", now)
2702 .await
2703 .expect("round start");
2704 storage
2705 .insert_tool_start("daily-t1", "daily-r1", "daily-1", "write_file", now)
2706 .await
2707 .expect("tool start");
2708 storage
2709 .complete_tool_call(
2710 "daily-t1",
2711 ToolCallCompletion {
2712 completed_at: now,
2713 success: true,
2714 error: None,
2715 },
2716 )
2717 .await
2718 .expect("tool complete");
2719 storage
2720 .complete_round(
2721 "daily-r1",
2722 now,
2723 RoundStatus::Success,
2724 TokenUsage {
2725 prompt_tokens: 3,
2726 completion_tokens: 7,
2727 total_tokens: 10,
2728 },
2729 0,
2730 None,
2731 )
2732 .await
2733 .expect("round completion");
2734
2735 let daily = storage
2736 .daily_metrics(
2737 7,
2738 Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
2739 )
2740 .await
2741 .expect("daily metrics");
2742
2743 assert_eq!(daily.len(), 1);
2744 let row = &daily[0];
2745 assert_eq!(row.total_sessions, 1);
2746 assert_eq!(row.total_rounds, 1);
2747 assert_eq!(row.total_tool_calls, 1);
2748 assert_eq!(
2749 row.model_breakdown
2750 .get("gpt-4")
2751 .map(|usage| usage.total_tokens),
2752 Some(10)
2753 );
2754 assert_eq!(
2755 row.tool_breakdown,
2756 HashMap::from([(String::from("write_file"), 1)])
2757 );
2758 }
2759
2760 #[tokio::test]
2761 async fn storage_reconciles_stale_running_sessions_rounds_and_forwards() {
2762 let dir = tempdir().expect("temp dir");
2763 let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2764
2765 storage.init().await.expect("init storage");
2766
2767 let now = Utc
2768 .with_ymd_and_hms(2026, 2, 12, 9, 0, 0)
2769 .single()
2770 .expect("valid datetime");
2771
2772 storage
2773 .upsert_session_start("stale-await", "gpt-4", now)
2774 .await
2775 .expect("await session start");
2776 storage
2777 .insert_round_start("round-await", "stale-await", "gpt-4", now)
2778 .await
2779 .expect("await round start");
2780
2781 storage
2782 .upsert_session_start("stale-complete", "gpt-4", now)
2783 .await
2784 .expect("complete session start");
2785 storage
2786 .insert_round_start("round-complete", "stale-complete", "gpt-4", now)
2787 .await
2788 .expect("complete round start");
2789
2790 storage
2791 .insert_forward_start(
2792 "forward-pending",
2793 "/v1/chat/completions",
2794 "gpt-4",
2795 false,
2796 now,
2797 )
2798 .await
2799 .expect("forward start");
2800
2801 storage
2802 .reconcile_stale_executions(&[], &[String::from("stale-await")])
2803 .await
2804 .expect("reconcile stale executions");
2805
2806 let sessions = storage
2807 .sessions(SessionMetricsFilter::default())
2808 .await
2809 .expect("sessions query");
2810 let stale_await = sessions
2811 .iter()
2812 .find(|session| session.session_id == "stale-await")
2813 .expect("stale-await should exist");
2814 let stale_complete = sessions
2815 .iter()
2816 .find(|session| session.session_id == "stale-complete")
2817 .expect("stale-complete should exist");
2818 assert_eq!(stale_await.status, SessionStatus::AwaitingResponse);
2819 assert_eq!(stale_complete.status, SessionStatus::Completed);
2820 assert!(stale_await.completed_at.is_some());
2821 assert!(stale_complete.completed_at.is_some());
2822
2823 let await_detail = storage
2824 .session_detail("stale-await")
2825 .await
2826 .expect("await detail query")
2827 .expect("await detail exists");
2828 let complete_detail = storage
2829 .session_detail("stale-complete")
2830 .await
2831 .expect("complete detail query")
2832 .expect("complete detail exists");
2833 assert_eq!(await_detail.rounds[0].status, RoundStatus::Error);
2834 assert_eq!(complete_detail.rounds[0].status, RoundStatus::Error);
2835 assert_eq!(
2836 await_detail.rounds[0].error.as_deref(),
2837 Some("reconciled_stale_round")
2838 );
2839 assert_eq!(
2840 complete_detail.rounds[0].error.as_deref(),
2841 Some("reconciled_stale_round")
2842 );
2843
2844 let forward_requests = storage
2845 .forward_requests(ForwardMetricsFilter::default())
2846 .await
2847 .expect("forward requests query");
2848 assert_eq!(forward_requests.len(), 1);
2849 assert_eq!(forward_requests[0].status, Some(ForwardStatus::Error));
2850 assert_eq!(
2851 forward_requests[0].error.as_deref(),
2852 Some("reconciled_stale_forward")
2853 );
2854
2855 let forward_summary = storage
2856 .forward_summary(ForwardMetricsFilter::default())
2857 .await
2858 .expect("forward summary query");
2859 assert_eq!(forward_summary.total_requests, 1);
2860 assert_eq!(forward_summary.successful_requests, 0);
2861 assert_eq!(forward_summary.failed_requests, 1);
2862
2863 let summary = storage
2864 .summary(MetricsDateFilter::default())
2865 .await
2866 .expect("summary query");
2867 assert_eq!(summary.total_sessions, 2);
2868 assert_eq!(summary.active_sessions, 0);
2869 assert_eq!(summary.awaiting_response_sessions, 1);
2870 assert_eq!(summary.completed_sessions, 1);
2871 assert_eq!(summary.error_sessions, 0);
2872 assert_eq!(summary.cancelled_sessions, 0);
2873 }
2874
2875 #[tokio::test]
2876 async fn storage_summarizes_execute_sync_mismatches_by_reason() {
2877 let dir = tempdir().expect("temp dir");
2878 let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2879
2880 storage.init().await.expect("init storage");
2881
2882 let day_a = Utc
2883 .with_ymd_and_hms(2026, 2, 10, 10, 0, 0)
2884 .single()
2885 .expect("valid datetime");
2886 let day_b = Utc
2887 .with_ymd_and_hms(2026, 2, 11, 10, 0, 0)
2888 .single()
2889 .expect("valid datetime");
2890
2891 storage
2892 .increment_execute_sync_mismatch("message_count", day_a)
2893 .await
2894 .expect("message_count mismatch one");
2895 storage
2896 .increment_execute_sync_mismatch("message_count", day_a)
2897 .await
2898 .expect("message_count mismatch two");
2899 storage
2900 .increment_execute_sync_mismatch("pending_question", day_a)
2901 .await
2902 .expect("pending question mismatch");
2903 storage
2904 .increment_execute_sync_mismatch("last_message_id", day_b)
2905 .await
2906 .expect("last_message_id mismatch");
2907
2908 let day_a_summary = storage
2909 .summary(MetricsDateFilter {
2910 start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
2911 end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
2912 })
2913 .await
2914 .expect("day a summary");
2915
2916 assert_eq!(day_a_summary.total_sync_mismatches, 3);
2917 assert_eq!(
2918 day_a_summary.sync_mismatch_breakdown,
2919 HashMap::from([
2920 (String::from("message_count"), 2_u64),
2921 (String::from("pending_question"), 1_u64),
2922 ])
2923 );
2924
2925 let full_summary = storage
2926 .summary(MetricsDateFilter::default())
2927 .await
2928 .expect("full summary");
2929 assert_eq!(full_summary.total_sync_mismatches, 4);
2930 assert_eq!(
2931 full_summary.sync_mismatch_breakdown.get("last_message_id"),
2932 Some(&1_u64)
2933 );
2934 }
2935}