1use std::collections::HashMap;
84use std::path::{Path, PathBuf};
85
86use async_trait::async_trait;
87use chrono::{DateTime, NaiveDate, Utc};
88use rusqlite::{params, params_from_iter, Connection, OptionalExtension};
89use thiserror::Error;
90
91use crate::metrics::types::{
92 DailyMetrics, ForwardEndpointMetrics, ForwardMetricsFilter, ForwardMetricsSummary,
93 ForwardRequestMetrics, ForwardStatus, MetricsDateFilter, MetricsSummary, ModelMetrics,
94 RoundMetrics, RoundStatus, SessionDetail, SessionMetrics, SessionMetricsFilter, SessionStatus,
95 TokenUsage, ToolCallMetrics,
96};
97
/// Result alias used by the metrics storage layer; the error side is always
/// [`MetricsError`].
pub type MetricsResult<T> = Result<T, MetricsError>;
103
/// Errors surfaced by metrics storage operations.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// A `rusqlite` call failed (converted automatically via `?`).
    #[error("sqlite error: {0}")]
    Sqlite(#[from] rusqlite::Error),

    /// A date or timestamp string could not be parsed by `chrono`
    /// (converted automatically via `?`).
    #[error("time parse error: {0}")]
    Chrono(#[from] chrono::ParseError),

    /// An I/O operation failed (converted automatically via `?`).
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),

    /// The blocking storage task could not be joined; the payload is the
    /// stringified join error.
    #[error("storage task join error: {0}")]
    Task(String),

    /// Stored data could not be interpreted, e.g. an unknown session status
    /// string read back from the database.
    #[error("invalid metrics data: {0}")]
    InvalidData(String),
}
145
/// Outcome of a finished tool call, passed to
/// `MetricsStorage::complete_tool_call`.
#[derive(Debug, Clone)]
pub struct ToolCallCompletion {
    /// When the tool call finished.
    pub completed_at: DateTime<Utc>,
    /// Whether the tool call succeeded.
    pub success: bool,
    /// Error text for the call, if any.
    pub error: Option<String>,
}
178
/// Asynchronous persistence interface for session, round, tool-call and
/// forward-request metrics.
///
/// Behavioural notes on individual methods describe the SQLite implementation
/// in this file; other implementations may differ.
#[async_trait]
pub trait MetricsStorage: Send + Sync {
    /// Prepares the backing store (for SQLite: creates tables and indexes if
    /// they do not exist yet).
    async fn init(&self) -> MetricsResult<()>;

    /// Records that a session started.
    ///
    /// If the session already exists it is put back into the `running` state,
    /// its completion time is cleared, its model is overwritten and the
    /// earlier of the two start timestamps is kept.
    async fn upsert_session_start(
        &self,
        session_id: &str,
        model: &str,
        started_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Overwrites the stored message count of a session and bumps its
    /// `updated_at` timestamp.
    async fn update_session_message_count(
        &self,
        session_id: &str,
        message_count: u32,
        updated_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Refreshes the session's aggregates, then stores its final `status` and
    /// completion time.
    async fn complete_session(
        &self,
        session_id: &str,
        status: SessionStatus,
        completed_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Records a new `running` round for a session and refreshes the
    /// session's aggregates. An already-known `round_id` is left untouched.
    async fn insert_round_start(
        &self,
        round_id: &str,
        session_id: &str,
        model: &str,
        started_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Stores the result of a round: completion time, status, token usage,
    /// prompt-cache counters and optional error text.
    ///
    /// `prompt_cached_tool_tokens_saved` is added to the round's existing
    /// cached-tool savings and also to its overall `tokens_saved`; the other
    /// values overwrite what was stored. The round must already exist because
    /// its session is looked up first.
    async fn complete_round(
        &self,
        round_id: &str,
        completed_at: DateTime<Utc>,
        status: RoundStatus,
        usage: TokenUsage,
        prompt_cached_tool_outputs: u32,
        prompt_cached_tool_tokens_saved: u32,
        error: Option<String>,
    ) -> MetricsResult<()>;

    /// Counts one more compression event on a round and adds `tokens_saved`
    /// to its running total, then refreshes the owning session's aggregates.
    async fn record_round_compression(
        &self,
        round_id: &str,
        compressed_at: DateTime<Utc>,
        tokens_saved: u32,
    ) -> MetricsResult<()>;

    /// Records the start of a tool call. A repeated `tool_call_id` has its
    /// round, session, tool name and start time overwritten; completion
    /// columns are not touched.
    async fn insert_tool_start(
        &self,
        tool_call_id: &str,
        round_id: &str,
        session_id: &str,
        tool_name: &str,
        started_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Stores the outcome of a previously started tool call and refreshes the
    /// owning session's aggregates. The tool call must already exist because
    /// its session is looked up first.
    async fn complete_tool_call(
        &self,
        tool_call_id: &str,
        completion: ToolCallCompletion,
    ) -> MetricsResult<()>;

    /// Records a forwarded request as `pending`. A repeated `forward_id`
    /// resets the existing row: completion time, status code, token counts
    /// and error are cleared.
    async fn insert_forward_start(
        &self,
        forward_id: &str,
        endpoint: &str,
        model: &str,
        is_stream: bool,
        started_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Stores the outcome of a forwarded request. Token columns are stored as
    /// NULL when `usage` is `None`.
    async fn complete_forward(
        &self,
        forward_id: &str,
        completed_at: DateTime<Utc>,
        status_code: Option<u16>,
        status: ForwardStatus,
        usage: Option<TokenUsage>,
        error: Option<String>,
    ) -> MetricsResult<()>;

    /// Returns request counts, success/error counts, token sums and the
    /// average duration (milliseconds, completed requests only) for the
    /// forwarded requests matching `filter`.
    async fn forward_summary(
        &self,
        filter: ForwardMetricsFilter,
    ) -> MetricsResult<ForwardMetricsSummary>;

    /// Returns the same aggregates as [`MetricsStorage::forward_summary`],
    /// grouped by endpoint and ordered by request count (highest first).
    ///
    /// NOTE(review): the SQLite implementation does not apply
    /// `filter.endpoint` here — confirm that is intentional.
    async fn forward_by_endpoint(
        &self,
        filter: ForwardMetricsFilter,
    ) -> MetricsResult<Vec<ForwardEndpointMetrics>>;

    /// Lists individual forwarded requests matching `filter`, most recently
    /// started first. `filter.limit` defaults to 100 and is capped at 1000.
    async fn forward_requests(
        &self,
        filter: ForwardMetricsFilter,
    ) -> MetricsResult<Vec<ForwardRequestMetrics>>;

    /// Returns per-day request counts and token sums for forwarded requests
    /// over a window of `days` days ending at `end_date` (today in UTC when
    /// `None`). A `days` value of 0 is treated as 1.
    async fn forward_daily_metrics(
        &self,
        days: u32,
        end_date: Option<NaiveDate>,
    ) -> MetricsResult<Vec<DailyMetrics>>;

    /// Returns aggregated session metrics for the date range in `filter`,
    /// including per-status session counts, sync-mismatch totals and the
    /// number of sessions currently `running`.
    async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary>;

    /// Returns session aggregates grouped by model, ordered by total tokens
    /// (highest first).
    async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>>;

    /// Lists sessions matching `filter`, most recently started first.
    /// `filter.limit` defaults to 100 and is capped at 1000.
    async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>>;

    /// Returns the detail record for one session, or `None`.
    /// NOTE(review): implementation not visible here; presumably `None` means
    /// the session is unknown — confirm.
    async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>>;

    /// Records one execute/sync mismatch for `reason`.
    /// NOTE(review): implementation not visible here; presumably increments
    /// the per-reason, per-day counter for the date of `occurred_at` — confirm.
    async fn increment_execute_sync_mismatch(
        &self,
        reason: &str,
        occurred_at: DateTime<Utc>,
    ) -> MetricsResult<()>;

    /// Returns per-day session metrics.
    /// NOTE(review): implementation not visible here; presumably uses the same
    /// `days` / `end_date` window as `forward_daily_metrics` — confirm.
    async fn daily_metrics(
        &self,
        days: u32,
        end_date: Option<NaiveDate>,
    ) -> MetricsResult<Vec<DailyMetrics>>;

    /// Removes round data older than `cutoff`.
    /// NOTE(review): implementation not visible here; the returned `u64` is
    /// presumably the number of rows removed — confirm.
    async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64>;

    /// Reconciles stored execution state against the sessions the caller
    /// reports as active or awaiting a response.
    /// NOTE(review): implementation not visible here — confirm exact effect.
    async fn reconcile_stale_executions(
        &self,
        active_session_ids: &[String],
        awaiting_response_session_ids: &[String],
    ) -> MetricsResult<()>;
}
726
/// SQLite-backed metrics storage.
///
/// Holds only the database path; a connection is opened for each operation.
#[derive(Debug, Clone)]
pub struct SqliteMetricsStorage {
    /// Location of the SQLite database file.
    db_path: PathBuf,
}
800
801impl SqliteMetricsStorage {
802 pub fn new(db_path: impl AsRef<Path>) -> Self {
820 Self {
821 db_path: db_path.as_ref().to_path_buf(),
822 }
823 }
824
825 async fn with_connection<T, F>(&self, func: F) -> MetricsResult<T>
848 where
849 T: Send + 'static,
850 F: FnOnce(&Connection) -> MetricsResult<T> + Send + 'static,
851 {
852 let db_path = self.db_path.clone();
853 tokio::task::spawn_blocking(move || {
854 let connection = open_connection(&db_path)?;
855 func(&connection)
856 })
857 .await
858 .map_err(|error| MetricsError::Task(error.to_string()))?
859 }
860}
861
862#[async_trait]
863impl MetricsStorage for SqliteMetricsStorage {
864 async fn init(&self) -> MetricsResult<()> {
865 self.with_connection(|connection| {
866 connection.execute_batch(
867 r#"
868 CREATE TABLE IF NOT EXISTS session_metrics (
869 session_id TEXT PRIMARY KEY,
870 model TEXT NOT NULL,
871 started_at TEXT NOT NULL,
872 completed_at TEXT,
873 status TEXT NOT NULL DEFAULT 'running',
874 total_rounds INTEGER NOT NULL DEFAULT 0,
875 prompt_tokens INTEGER NOT NULL DEFAULT 0,
876 completion_tokens INTEGER NOT NULL DEFAULT 0,
877 total_tokens INTEGER NOT NULL DEFAULT 0,
878 prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
879 prompt_cached_tool_tokens_saved INTEGER NOT NULL DEFAULT 0,
880 total_compression_events INTEGER NOT NULL DEFAULT 0,
881 total_tokens_saved INTEGER NOT NULL DEFAULT 0,
882 tool_call_count INTEGER NOT NULL DEFAULT 0,
883 message_count INTEGER NOT NULL DEFAULT 0,
884 updated_at TEXT NOT NULL
885 );
886
887 CREATE TABLE IF NOT EXISTS round_metrics (
888 round_id TEXT PRIMARY KEY,
889 session_id TEXT NOT NULL,
890 model TEXT NOT NULL,
891 started_at TEXT NOT NULL,
892 completed_at TEXT,
893 status TEXT NOT NULL DEFAULT 'running',
894 prompt_tokens INTEGER NOT NULL DEFAULT 0,
895 completion_tokens INTEGER NOT NULL DEFAULT 0,
896 total_tokens INTEGER NOT NULL DEFAULT 0,
897 prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
898 prompt_cached_tool_tokens_saved INTEGER NOT NULL DEFAULT 0,
899 compression_count INTEGER NOT NULL DEFAULT 0,
900 tokens_saved INTEGER NOT NULL DEFAULT 0,
901 error TEXT,
902 FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
903 );
904
905 CREATE TABLE IF NOT EXISTS tool_call_metrics (
906 tool_call_id TEXT PRIMARY KEY,
907 round_id TEXT NOT NULL,
908 session_id TEXT NOT NULL,
909 tool_name TEXT NOT NULL,
910 started_at TEXT NOT NULL,
911 completed_at TEXT,
912 success INTEGER,
913 error TEXT,
914 FOREIGN KEY(round_id) REFERENCES round_metrics(round_id) ON DELETE CASCADE,
915 FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
916 );
917
918 CREATE INDEX IF NOT EXISTS idx_session_started_at ON session_metrics(started_at);
919 CREATE INDEX IF NOT EXISTS idx_session_model ON session_metrics(model);
920 CREATE INDEX IF NOT EXISTS idx_round_session ON round_metrics(session_id);
921 CREATE INDEX IF NOT EXISTS idx_tool_session ON tool_call_metrics(session_id);
922 CREATE INDEX IF NOT EXISTS idx_tool_started_at ON tool_call_metrics(started_at);
923 CREATE INDEX IF NOT EXISTS idx_tool_name ON tool_call_metrics(tool_name);
924
925 CREATE TABLE IF NOT EXISTS forward_request_metrics (
926 forward_id TEXT PRIMARY KEY,
927 endpoint TEXT NOT NULL,
928 model TEXT NOT NULL,
929 is_stream INTEGER NOT NULL,
930 started_at TEXT NOT NULL,
931 completed_at TEXT,
932 status_code INTEGER,
933 status TEXT NOT NULL DEFAULT 'pending',
934 prompt_tokens INTEGER,
935 completion_tokens INTEGER,
936 total_tokens INTEGER,
937 error TEXT,
938 updated_at TEXT NOT NULL
939 );
940
941 CREATE TABLE IF NOT EXISTS execute_sync_mismatch_metrics (
942 reason TEXT NOT NULL,
943 mismatch_date TEXT NOT NULL,
944 count INTEGER NOT NULL DEFAULT 0,
945 updated_at TEXT NOT NULL,
946 PRIMARY KEY (reason, mismatch_date)
947 );
948
949 CREATE INDEX IF NOT EXISTS idx_forward_started_at ON forward_request_metrics(started_at);
950 CREATE INDEX IF NOT EXISTS idx_forward_endpoint ON forward_request_metrics(endpoint);
951 CREATE INDEX IF NOT EXISTS idx_forward_model ON forward_request_metrics(model);
952 CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_date ON execute_sync_mismatch_metrics(mismatch_date);
953 CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_reason ON execute_sync_mismatch_metrics(reason);
954 "#,
955 )?;
956 ensure_integer_column(
957 connection,
958 "session_metrics",
959 "prompt_cached_tool_outputs",
960 0,
961 )?;
962 ensure_integer_column(connection, "session_metrics", "prompt_cached_tool_tokens_saved", 0)?;
963 ensure_integer_column(connection, "session_metrics", "total_compression_events", 0)?;
964 ensure_integer_column(connection, "session_metrics", "total_tokens_saved", 0)?;
965 ensure_integer_column(connection, "round_metrics", "prompt_cached_tool_outputs", 0)?;
966 ensure_integer_column(connection, "round_metrics", "prompt_cached_tool_tokens_saved", 0)?;
967 ensure_integer_column(connection, "round_metrics", "compression_count", 0)?;
968 ensure_integer_column(connection, "round_metrics", "tokens_saved", 0)?;
969 connection.execute(
970 "UPDATE forward_request_metrics SET status = 'pending' WHERE status IS NULL OR trim(status) = ''",
971 [],
972 )?;
973 Ok(())
974 })
975 .await
976 }
977
978 async fn upsert_session_start(
979 &self,
980 session_id: &str,
981 model: &str,
982 started_at: DateTime<Utc>,
983 ) -> MetricsResult<()> {
984 let session_id = session_id.to_string();
985 let model = model.to_string();
986 let started_at = format_timestamp(started_at);
987
988 self.with_connection(move |connection| {
989 connection.execute(
990 r#"
991 INSERT INTO session_metrics (
992 session_id, model, started_at, status, updated_at
993 ) VALUES (?1, ?2, ?3, 'running', ?3)
994 ON CONFLICT(session_id) DO UPDATE SET
995 model = excluded.model,
996 started_at = CASE
997 WHEN session_metrics.started_at <= excluded.started_at THEN session_metrics.started_at
998 ELSE excluded.started_at
999 END,
1000 completed_at = NULL,
1001 status = 'running',
1002 updated_at = excluded.updated_at
1003 "#,
1004 params![session_id, model, started_at],
1005 )?;
1006 Ok(())
1007 })
1008 .await
1009 }
1010
1011 async fn update_session_message_count(
1012 &self,
1013 session_id: &str,
1014 message_count: u32,
1015 updated_at: DateTime<Utc>,
1016 ) -> MetricsResult<()> {
1017 let session_id = session_id.to_string();
1018 let updated_at = format_timestamp(updated_at);
1019
1020 self.with_connection(move |connection| {
1021 connection.execute(
1022 "UPDATE session_metrics SET message_count = ?1, updated_at = ?2 WHERE session_id = ?3",
1023 params![i64::from(message_count), updated_at, session_id],
1024 )?;
1025 Ok(())
1026 })
1027 .await
1028 }
1029
1030 async fn complete_session(
1031 &self,
1032 session_id: &str,
1033 status: SessionStatus,
1034 completed_at: DateTime<Utc>,
1035 ) -> MetricsResult<()> {
1036 let session_id = session_id.to_string();
1037 let completed_at_str = format_timestamp(completed_at);
1038
1039 self.with_connection(move |connection| {
1040 refresh_session_aggregates(connection, &session_id, completed_at)?;
1041 connection.execute(
1042 "UPDATE session_metrics SET status = ?1, completed_at = ?2, updated_at = ?2 WHERE session_id = ?3",
1043 params![status.as_str(), completed_at_str, session_id],
1044 )?;
1045 Ok(())
1046 })
1047 .await
1048 }
1049
1050 async fn insert_round_start(
1051 &self,
1052 round_id: &str,
1053 session_id: &str,
1054 model: &str,
1055 started_at: DateTime<Utc>,
1056 ) -> MetricsResult<()> {
1057 let round_id = round_id.to_string();
1058 let session_id = session_id.to_string();
1059 let model = model.to_string();
1060 let started_at_str = format_timestamp(started_at);
1061
1062 self.with_connection(move |connection| {
1063 connection.execute(
1064 r#"
1065 INSERT INTO round_metrics (
1066 round_id, session_id, model, started_at, status
1067 ) VALUES (?1, ?2, ?3, ?4, 'running')
1068 ON CONFLICT(round_id) DO NOTHING
1069 "#,
1070 params![round_id, session_id, model, started_at_str],
1071 )?;
1072 refresh_session_aggregates(connection, &session_id, started_at)?;
1073 Ok(())
1074 })
1075 .await
1076 }
1077
1078 async fn complete_round(
1079 &self,
1080 round_id: &str,
1081 completed_at: DateTime<Utc>,
1082 status: RoundStatus,
1083 usage: TokenUsage,
1084 prompt_cached_tool_outputs: u32,
1085 prompt_cached_tool_tokens_saved: u32,
1086 error: Option<String>,
1087 ) -> MetricsResult<()> {
1088 let round_id = round_id.to_string();
1089 let completed_at_str = format_timestamp(completed_at);
1090
1091 self.with_connection(move |connection| {
1092 let session_id: String = connection.query_row(
1093 "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1094 params![round_id],
1095 |row| row.get(0),
1096 )?;
1097
1098 connection.execute(
1099 r#"
1100 UPDATE round_metrics
1101 SET completed_at = ?1,
1102 status = ?2,
1103 prompt_tokens = ?3,
1104 completion_tokens = ?4,
1105 total_tokens = ?5,
1106 prompt_cached_tool_outputs = ?6,
1107 prompt_cached_tool_tokens_saved = COALESCE(prompt_cached_tool_tokens_saved, 0) + ?7,
1108 tokens_saved = COALESCE(tokens_saved, 0) + ?8,
1109 error = ?9
1110 WHERE round_id = ?10
1111 "#,
1112 params![
1113 completed_at_str,
1114 status.as_str(),
1115 usage.prompt_tokens as i64,
1116 usage.completion_tokens as i64,
1117 usage.total_tokens as i64,
1118 i64::from(prompt_cached_tool_outputs),
1119 i64::from(prompt_cached_tool_tokens_saved),
1120 i64::from(prompt_cached_tool_tokens_saved),
1121 error,
1122 round_id,
1123 ],
1124 )?;
1125
1126 refresh_session_aggregates(connection, &session_id, completed_at)?;
1127 Ok(())
1128 })
1129 .await
1130 }
1131
1132 async fn record_round_compression(
1133 &self,
1134 round_id: &str,
1135 compressed_at: DateTime<Utc>,
1136 tokens_saved: u32,
1137 ) -> MetricsResult<()> {
1138 let round_id = round_id.to_string();
1139
1140 self.with_connection(move |connection| {
1141 let session_id: String = connection.query_row(
1142 "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1143 params![round_id],
1144 |row| row.get(0),
1145 )?;
1146
1147 connection.execute(
1148 r#"
1149 UPDATE round_metrics
1150 SET compression_count = COALESCE(compression_count, 0) + 1,
1151 tokens_saved = COALESCE(tokens_saved, 0) + ?1
1152 WHERE round_id = ?2
1153 "#,
1154 params![i64::from(tokens_saved), round_id],
1155 )?;
1156
1157 refresh_session_aggregates(connection, &session_id, compressed_at)?;
1158 Ok(())
1159 })
1160 .await
1161 }
1162
1163 async fn insert_tool_start(
1164 &self,
1165 tool_call_id: &str,
1166 round_id: &str,
1167 session_id: &str,
1168 tool_name: &str,
1169 started_at: DateTime<Utc>,
1170 ) -> MetricsResult<()> {
1171 let tool_call_id = tool_call_id.to_string();
1172 let round_id = round_id.to_string();
1173 let session_id = session_id.to_string();
1174 let tool_name = tool_name.to_string();
1175 let started_at_str = format_timestamp(started_at);
1176
1177 self.with_connection(move |connection| {
1178 connection.execute(
1179 r#"
1180 INSERT INTO tool_call_metrics (
1181 tool_call_id, round_id, session_id, tool_name, started_at
1182 ) VALUES (?1, ?2, ?3, ?4, ?5)
1183 ON CONFLICT(tool_call_id) DO UPDATE SET
1184 round_id = excluded.round_id,
1185 session_id = excluded.session_id,
1186 tool_name = excluded.tool_name,
1187 started_at = excluded.started_at
1188 "#,
1189 params![
1190 tool_call_id,
1191 round_id,
1192 session_id,
1193 tool_name,
1194 started_at_str
1195 ],
1196 )?;
1197 Ok(())
1198 })
1199 .await
1200 }
1201
1202 async fn complete_tool_call(
1203 &self,
1204 tool_call_id: &str,
1205 completion: ToolCallCompletion,
1206 ) -> MetricsResult<()> {
1207 let tool_call_id = tool_call_id.to_string();
1208 let completed_at = format_timestamp(completion.completed_at);
1209 let success = if completion.success { 1_i64 } else { 0_i64 };
1210 let error = completion.error;
1211
1212 self.with_connection(move |connection| {
1213 let session_id: String = connection.query_row(
1214 "SELECT session_id FROM tool_call_metrics WHERE tool_call_id = ?1",
1215 params![tool_call_id],
1216 |row| row.get(0),
1217 )?;
1218
1219 connection.execute(
1220 "UPDATE tool_call_metrics SET completed_at = ?1, success = ?2, error = ?3 WHERE tool_call_id = ?4",
1221 params![completed_at, success, error, tool_call_id],
1222 )?;
1223
1224 refresh_session_aggregates(connection, &session_id, completion.completed_at)?;
1225 Ok(())
1226 })
1227 .await
1228 }
1229
1230 async fn insert_forward_start(
1231 &self,
1232 forward_id: &str,
1233 endpoint: &str,
1234 model: &str,
1235 is_stream: bool,
1236 started_at: DateTime<Utc>,
1237 ) -> MetricsResult<()> {
1238 let forward_id = forward_id.to_string();
1239 let endpoint = endpoint.to_string();
1240 let model = model.to_string();
1241 let is_stream_int = if is_stream { 1_i64 } else { 0_i64 };
1242 let started_at_str = format_timestamp(started_at);
1243
1244 self.with_connection(move |connection| {
1245 connection.execute(
1246 r#"
1247 INSERT INTO forward_request_metrics (
1248 forward_id, endpoint, model, is_stream, started_at, status, updated_at
1249 ) VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?5)
1250 ON CONFLICT(forward_id) DO UPDATE SET
1251 endpoint = excluded.endpoint,
1252 model = excluded.model,
1253 is_stream = excluded.is_stream,
1254 started_at = excluded.started_at,
1255 completed_at = NULL,
1256 status_code = NULL,
1257 status = 'pending',
1258 prompt_tokens = NULL,
1259 completion_tokens = NULL,
1260 total_tokens = NULL,
1261 error = NULL,
1262 updated_at = excluded.updated_at
1263 "#,
1264 params![forward_id, endpoint, model, is_stream_int, started_at_str],
1265 )?;
1266 Ok(())
1267 })
1268 .await
1269 }
1270
1271 async fn complete_forward(
1272 &self,
1273 forward_id: &str,
1274 completed_at: DateTime<Utc>,
1275 status_code: Option<u16>,
1276 status: ForwardStatus,
1277 usage: Option<TokenUsage>,
1278 error: Option<String>,
1279 ) -> MetricsResult<()> {
1280 let forward_id = forward_id.to_string();
1281 let completed_at_str = format_timestamp(completed_at);
1282 let status_code_int = status_code.map(|s| s as i64);
1283 let (prompt, completion, total) = match usage {
1284 Some(u) => (
1285 Some(u.prompt_tokens as i64),
1286 Some(u.completion_tokens as i64),
1287 Some(u.total_tokens as i64),
1288 ),
1289 None => (None, None, None),
1290 };
1291
1292 self.with_connection(move |connection| {
1293 connection.execute(
1294 r#"
1295 UPDATE forward_request_metrics
1296 SET completed_at = ?1,
1297 status_code = ?2,
1298 status = ?3,
1299 prompt_tokens = ?4,
1300 completion_tokens = ?5,
1301 total_tokens = ?6,
1302 error = ?7,
1303 updated_at = ?1
1304 WHERE forward_id = ?8
1305 "#,
1306 params![
1307 completed_at_str,
1308 status_code_int,
1309 status.as_str(),
1310 prompt,
1311 completion,
1312 total,
1313 error,
1314 forward_id,
1315 ],
1316 )?;
1317 Ok(())
1318 })
1319 .await
1320 }
1321
1322 async fn forward_summary(
1323 &self,
1324 filter: ForwardMetricsFilter,
1325 ) -> MetricsResult<ForwardMetricsSummary> {
1326 self.with_connection(move |connection| {
1327 let mut params_vec = Vec::new();
1328 let where_clause = build_forward_where_clause(
1329 filter.start_date,
1330 filter.end_date,
1331 filter.endpoint.as_deref(),
1332 filter.model.as_deref(),
1333 &mut params_vec,
1334 );
1335
1336 let sql = format!(
1337 "SELECT COUNT(*), \
1338 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1339 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1340 COALESCE(SUM(prompt_tokens), 0), \
1341 COALESCE(SUM(completion_tokens), 0), \
1342 COALESCE(SUM(total_tokens), 0), \
1343 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1344 (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1345 FROM forward_request_metrics {}",
1346 where_clause
1347 );
1348
1349 let mut stmt = connection.prepare(&sql)?;
1350 let summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1351 let avg_duration: Option<f64> = row.get(6)?;
1352 Ok(ForwardMetricsSummary {
1353 total_requests: row.get::<_, i64>(0)? as u64,
1354 successful_requests: row.get::<_, i64>(1)? as u64,
1355 failed_requests: row.get::<_, i64>(2)? as u64,
1356 total_tokens: TokenUsage {
1357 prompt_tokens: row.get::<_, i64>(3)? as u64,
1358 completion_tokens: row.get::<_, i64>(4)? as u64,
1359 total_tokens: row.get::<_, i64>(5)? as u64,
1360 },
1361 avg_duration_ms: avg_duration.map(|d| d as u64),
1362 })
1363 })?;
1364
1365 Ok(summary)
1366 })
1367 .await
1368 }
1369
1370 async fn forward_by_endpoint(
1371 &self,
1372 filter: ForwardMetricsFilter,
1373 ) -> MetricsResult<Vec<ForwardEndpointMetrics>> {
1374 self.with_connection(move |connection| {
1375 let mut params_vec = Vec::new();
1376 let where_clause = build_forward_where_clause(
1377 filter.start_date,
1378 filter.end_date,
1379 None,
1380 filter.model.as_deref(),
1381 &mut params_vec,
1382 );
1383
1384 let sql = format!(
1385 "SELECT endpoint, COUNT(*), \
1386 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1387 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1388 COALESCE(SUM(prompt_tokens), 0), \
1389 COALESCE(SUM(completion_tokens), 0), \
1390 COALESCE(SUM(total_tokens), 0), \
1391 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1392 (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1393 FROM forward_request_metrics {} \
1394 GROUP BY endpoint ORDER BY COUNT(*) DESC",
1395 where_clause
1396 );
1397
1398 let mut stmt = connection.prepare(&sql)?;
1399 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1400 let mut endpoints = Vec::new();
1401
1402 while let Some(row) = rows.next()? {
1403 let avg_duration: Option<f64> = row.get(7)?;
1404 endpoints.push(ForwardEndpointMetrics {
1405 endpoint: row.get(0)?,
1406 requests: row.get::<_, i64>(1)? as u64,
1407 successful: row.get::<_, i64>(2)? as u64,
1408 failed: row.get::<_, i64>(3)? as u64,
1409 tokens: TokenUsage {
1410 prompt_tokens: row.get::<_, i64>(4)? as u64,
1411 completion_tokens: row.get::<_, i64>(5)? as u64,
1412 total_tokens: row.get::<_, i64>(6)? as u64,
1413 },
1414 avg_duration_ms: avg_duration.map(|d| d as u64),
1415 });
1416 }
1417
1418 Ok(endpoints)
1419 })
1420 .await
1421 }
1422
1423 async fn forward_requests(
1424 &self,
1425 filter: ForwardMetricsFilter,
1426 ) -> MetricsResult<Vec<ForwardRequestMetrics>> {
1427 self.with_connection(move |connection| {
1428 let mut params_vec = Vec::new();
1429 let where_clause = build_forward_where_clause(
1430 filter.start_date,
1431 filter.end_date,
1432 filter.endpoint.as_deref(),
1433 filter.model.as_deref(),
1434 &mut params_vec,
1435 );
1436
1437 let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1438 let sql = format!(
1439 "SELECT forward_id, endpoint, model, is_stream, started_at, completed_at, \
1440 status_code, status, prompt_tokens, completion_tokens, total_tokens, error \
1441 FROM forward_request_metrics {} \
1442 ORDER BY started_at DESC LIMIT {}",
1443 where_clause, limit
1444 );
1445
1446 let mut stmt = connection.prepare(&sql)?;
1447 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1448 let mut requests = Vec::new();
1449
1450 while let Some(row) = rows.next()? {
1451 let started_at = parse_timestamp(row.get::<_, String>(4)?)?;
1452 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(5)?)?;
1453 let status_raw: Option<String> = row.get(7)?;
1454 let status = status_raw.and_then(|s| ForwardStatus::from_db(&s));
1455
1456 let prompt: Option<i64> = row.get(8)?;
1457 let completion: Option<i64> = row.get(9)?;
1458 let total: Option<i64> = row.get(10)?;
1459 let token_usage = match (prompt, completion, total) {
1460 (Some(p), Some(c), Some(t)) => Some(TokenUsage {
1461 prompt_tokens: p as u64,
1462 completion_tokens: c as u64,
1463 total_tokens: t as u64,
1464 }),
1465 _ => None,
1466 };
1467
1468 requests.push(ForwardRequestMetrics {
1469 forward_id: row.get(0)?,
1470 endpoint: row.get(1)?,
1471 model: row.get(2)?,
1472 is_stream: row.get::<_, i64>(3)? > 0,
1473 started_at,
1474 completed_at,
1475 status_code: row.get::<_, Option<i64>>(6)?.map(|s| s as u16),
1476 status,
1477 token_usage,
1478 error: row.get(11)?,
1479 duration_ms: compute_duration_ms(started_at, completed_at),
1480 });
1481 }
1482
1483 Ok(requests)
1484 })
1485 .await
1486 }
1487
1488 async fn forward_daily_metrics(
1489 &self,
1490 days: u32,
1491 end_date: Option<NaiveDate>,
1492 ) -> MetricsResult<Vec<DailyMetrics>> {
1493 let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1494 let span = days.max(1) - 1;
1495 let start_date = end_date - chrono::Duration::days(i64::from(span));
1496
1497 self.with_connection(move |connection| {
1498 let mut stmt = connection.prepare(
1499 r#"
1500 SELECT
1501 date(started_at) AS date_key,
1502 COUNT(*) AS total_sessions,
1503 COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1504 COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1505 COALESCE(SUM(total_tokens), 0) AS total_tokens
1506 FROM forward_request_metrics
1507 WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1508 GROUP BY date_key
1509 ORDER BY date_key ASC
1510 "#,
1511 )?;
1512
1513 let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1514 let mut result = Vec::new();
1515
1516 while let Some(row) = rows.next()? {
1517 let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1518
1519 result.push(DailyMetrics {
1520 date,
1521 total_sessions: row.get::<_, i64>(1)? as u32,
1522 total_rounds: 0,
1523 total_token_usage: TokenUsage {
1524 prompt_tokens: row.get::<_, i64>(2)? as u64,
1525 completion_tokens: row.get::<_, i64>(3)? as u64,
1526 total_tokens: row.get::<_, i64>(4)? as u64,
1527 },
1528 total_tool_calls: 0,
1529 prompt_cached_tool_outputs: 0,
1530 model_breakdown: HashMap::new(),
1531 tool_breakdown: HashMap::new(),
1532 });
1533 }
1534
1535 Ok(result)
1536 })
1537 .await
1538 }
1539
1540 async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary> {
1541 self.with_connection(move |connection| {
1542 let mut params_vec = Vec::new();
1543 let where_clause = build_session_where_clause(
1544 filter.start_date,
1545 filter.end_date,
1546 None,
1547 &mut params_vec,
1548 );
1549
1550 let summary_sql = format!(
1551 "SELECT COUNT(*), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0), COALESCE(SUM(prompt_cached_tool_tokens_saved), 0), COALESCE(SUM(total_compression_events), 0), COALESCE(SUM(total_tokens_saved), 0), COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'awaiting_response' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END), 0) FROM session_metrics {}",
1552 where_clause
1553 );
1554
1555 let mut stmt = connection.prepare(&summary_sql)?;
1556 let mut summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1557 Ok(MetricsSummary {
1558 total_sessions: row.get::<_, i64>(0)? as u64,
1559 total_tokens: TokenUsage {
1560 prompt_tokens: row.get::<_, i64>(1)? as u64,
1561 completion_tokens: row.get::<_, i64>(2)? as u64,
1562 total_tokens: row.get::<_, i64>(3)? as u64,
1563 },
1564 total_tool_calls: row.get::<_, i64>(4)? as u64,
1565 prompt_cached_tool_outputs: row.get::<_, i64>(5)? as u64,
1566 tool_context_tokens_saved: row.get::<_, i64>(6)? as u64,
1567 total_compression_events: row.get::<_, i64>(7)? as u64,
1568 total_tokens_saved: row.get::<_, i64>(8)? as u64,
1569 non_tool_compression_tokens_saved: (row.get::<_, i64>(8)? - row.get::<_, i64>(6)?).max(0) as u64,
1570 completed_sessions: row.get::<_, i64>(9)? as u64,
1571 awaiting_response_sessions: row.get::<_, i64>(10)? as u64,
1572 error_sessions: row.get::<_, i64>(11)? as u64,
1573 cancelled_sessions: row.get::<_, i64>(12)? as u64,
1574 total_sync_mismatches: 0,
1575 sync_mismatch_breakdown: HashMap::new(),
1576 active_sessions: 0,
1577 })
1578 })?;
1579
1580 let mut mismatch_params = Vec::new();
1581 let mismatch_clause = build_execute_sync_mismatch_where_clause(
1582 filter.start_date,
1583 filter.end_date,
1584 None,
1585 &mut mismatch_params,
1586 );
1587 let mismatch_sql = format!(
1588 "SELECT COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {}",
1589 mismatch_clause
1590 );
1591 let mut mismatch_stmt = connection.prepare(&mismatch_sql)?;
1592 summary.total_sync_mismatches = mismatch_stmt
1593 .query_row(params_from_iter(mismatch_params.iter()), |row| row.get::<_, i64>(0))?
1594 as u64;
1595 summary.sync_mismatch_breakdown = load_execute_sync_mismatch_breakdown(
1596 connection,
1597 filter.start_date,
1598 filter.end_date,
1599 )?;
1600 let mut active_params = Vec::new();
1601 let active_clause = build_session_where_clause(
1602 filter.start_date,
1603 filter.end_date,
1604 Some("running"),
1605 &mut active_params,
1606 );
1607 let active_sql = format!(
1608 "SELECT COUNT(*) FROM session_metrics {}",
1609 active_clause
1610 );
1611 let mut active_stmt = connection.prepare(&active_sql)?;
1612 let active_sessions = active_stmt.query_row(params_from_iter(active_params.iter()), |row| {
1613 row.get::<_, i64>(0)
1614 })? as u64;
1615
1616 Ok(MetricsSummary {
1617 active_sessions,
1618 ..summary
1619 })
1620 })
1621 .await
1622 }
1623
1624 async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>> {
1625 self.with_connection(move |connection| {
1626 let mut params_vec = Vec::new();
1627 let where_clause = build_session_where_clause(
1628 filter.start_date,
1629 filter.end_date,
1630 None,
1631 &mut params_vec,
1632 );
1633
1634 let sql = format!(
1635 "SELECT model, COUNT(*), COALESCE(SUM(total_rounds), 0), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0) FROM session_metrics {} GROUP BY model ORDER BY SUM(total_tokens) DESC",
1636 where_clause
1637 );
1638
1639 let mut stmt = connection.prepare(&sql)?;
1640 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1641 let mut models = Vec::new();
1642
1643 while let Some(row) = rows.next()? {
1644 models.push(ModelMetrics {
1645 model: row.get(0)?,
1646 sessions: row.get::<_, i64>(1)? as u64,
1647 rounds: row.get::<_, i64>(2)? as u64,
1648 tokens: TokenUsage {
1649 prompt_tokens: row.get::<_, i64>(3)? as u64,
1650 completion_tokens: row.get::<_, i64>(4)? as u64,
1651 total_tokens: row.get::<_, i64>(5)? as u64,
1652 },
1653 tool_calls: row.get::<_, i64>(6)? as u64,
1654 prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1655 });
1656 }
1657
1658 Ok(models)
1659 })
1660 .await
1661 }
1662
1663 async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>> {
1664 self.with_connection(move |connection| {
1665 let mut params_vec = Vec::new();
1666 let where_clause = build_session_where_clause(
1667 filter.start_date,
1668 filter.end_date,
1669 None,
1670 &mut params_vec,
1671 );
1672 let mut conditions = if where_clause.is_empty() {
1673 Vec::new()
1674 } else {
1675 vec![where_clause.replacen("WHERE ", "", 1)]
1676 };
1677 if let Some(model) = filter.model {
1678 conditions.push("model = ?".to_string());
1679 params_vec.push(model);
1680 }
1681
1682 let where_sql = if conditions.is_empty() {
1683 String::new()
1684 } else {
1685 format!("WHERE {}", conditions.join(" AND "))
1686 };
1687
1688 let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1689 let sql = format!(
1690 "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, prompt_cached_tool_tokens_saved, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics {} ORDER BY started_at DESC LIMIT {}",
1691 where_sql, limit
1692 );
1693
1694 let mut stmt = connection.prepare(&sql)?;
1695 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1696 let mut sessions = Vec::new();
1697
1698 while let Some(row) = rows.next()? {
1699 let session_id: String = row.get(0)?;
1700 let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
1701 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
1702 let status_raw: String = row.get(13)?;
1703 let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1704 MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1705 })?;
1706 let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1707
1708 sessions.push(SessionMetrics {
1709 session_id,
1710 model: row.get(1)?,
1711 started_at,
1712 completed_at,
1713 total_rounds: row.get::<_, i64>(4)? as u32,
1714 total_token_usage: TokenUsage {
1715 prompt_tokens: row.get::<_, i64>(5)? as u64,
1716 completion_tokens: row.get::<_, i64>(6)? as u64,
1717 total_tokens: row.get::<_, i64>(7)? as u64,
1718 },
1719 tool_call_count: row.get::<_, i64>(8)? as u32,
1720 prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u64,
1721 prompt_cached_tool_tokens_saved: row.get::<_, i64>(10)? as u64,
1722 total_compression_events: row.get::<_, i64>(11)? as u64,
1723 total_tokens_saved: row.get::<_, i64>(12)? as u64,
1724 tool_breakdown,
1725 status,
1726 message_count: row.get::<_, i64>(14)? as u32,
1727 duration_ms: compute_duration_ms(started_at, completed_at),
1728 });
1729 }
1730
1731 Ok(sessions)
1732 })
1733 .await
1734 }
1735
1736 async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>> {
1737 let session_id = session_id.to_string();
1738 self.with_connection(move |connection| {
1739 let session_sql = "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, prompt_cached_tool_tokens_saved, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics WHERE session_id = ?1";
1740 let session_row = connection
1741 .query_row(session_sql, params![session_id], |row| {
1742 Ok((
1743 row.get::<_, String>(0)?,
1744 row.get::<_, String>(1)?,
1745 row.get::<_, String>(2)?,
1746 row.get::<_, Option<String>>(3)?,
1747 row.get::<_, i64>(4)?,
1748 row.get::<_, i64>(5)?,
1749 row.get::<_, i64>(6)?,
1750 row.get::<_, i64>(7)?,
1751 row.get::<_, i64>(8)?,
1752 row.get::<_, i64>(9)?,
1753 row.get::<_, i64>(10)?,
1754 row.get::<_, i64>(11)?,
1755 row.get::<_, i64>(12)?,
1756 row.get::<_, String>(13)?,
1757 row.get::<_, i64>(14)?,
1758 ))
1759 })
1760 .optional()?;
1761
1762 let Some((
1763 session_id,
1764 model,
1765 started_at_raw,
1766 completed_at_raw,
1767 total_rounds,
1768 prompt_tokens,
1769 completion_tokens,
1770 total_tokens,
1771 tool_call_count,
1772 prompt_cached_tool_outputs,
1773 prompt_cached_tool_tokens_saved,
1774 total_compression_events,
1775 total_tokens_saved,
1776 status_raw,
1777 message_count,
1778 )) = session_row
1779 else {
1780 return Ok(None);
1781 };
1782
1783 let started_at = parse_timestamp(started_at_raw)?;
1784 let completed_at = parse_optional_timestamp(completed_at_raw)?;
1785 let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1786 MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1787 })?;
1788 let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1789
1790 let session = SessionMetrics {
1791 session_id: session_id.clone(),
1792 model,
1793 started_at,
1794 completed_at,
1795 total_rounds: total_rounds as u32,
1796 total_token_usage: TokenUsage {
1797 prompt_tokens: prompt_tokens as u64,
1798 completion_tokens: completion_tokens as u64,
1799 total_tokens: total_tokens as u64,
1800 },
1801 tool_call_count: tool_call_count as u32,
1802 prompt_cached_tool_outputs: prompt_cached_tool_outputs as u64,
1803 prompt_cached_tool_tokens_saved: prompt_cached_tool_tokens_saved as u64,
1804 total_compression_events: total_compression_events as u64,
1805 total_tokens_saved: total_tokens_saved as u64,
1806 tool_breakdown,
1807 status,
1808 message_count: message_count as u32,
1809 duration_ms: compute_duration_ms(started_at, completed_at),
1810 };
1811
1812 let rounds = load_rounds(connection, &session_id)?;
1813 Ok(Some(SessionDetail { session, rounds }))
1814 })
1815 .await
1816 }
1817
1818 async fn increment_execute_sync_mismatch(
1819 &self,
1820 reason: &str,
1821 occurred_at: DateTime<Utc>,
1822 ) -> MetricsResult<()> {
1823 let reason = reason.to_string();
1824 let mismatch_date = occurred_at.date_naive().to_string();
1825 let updated_at = format_timestamp(occurred_at);
1826
1827 self.with_connection(move |connection| {
1828 connection.execute(
1829 r#"
1830 INSERT INTO execute_sync_mismatch_metrics (reason, mismatch_date, count, updated_at)
1831 VALUES (?1, ?2, 1, ?3)
1832 ON CONFLICT(reason, mismatch_date) DO UPDATE SET
1833 count = count + 1,
1834 updated_at = excluded.updated_at
1835 "#,
1836 params![reason, mismatch_date, updated_at],
1837 )?;
1838 Ok(())
1839 })
1840 .await
1841 }
1842
1843 async fn daily_metrics(
1844 &self,
1845 days: u32,
1846 end_date: Option<NaiveDate>,
1847 ) -> MetricsResult<Vec<DailyMetrics>> {
1848 let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1849 let span = days.max(1) - 1;
1850 let start_date = end_date - chrono::Duration::days(i64::from(span));
1851
1852 self.with_connection(move |connection| {
1853 let mut stmt = connection.prepare(
1854 r#"
1855 SELECT
1856 date(started_at) AS date_key,
1857 COUNT(*) AS total_sessions,
1858 COALESCE(SUM(total_rounds), 0) AS total_rounds,
1859 COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1860 COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1861 COALESCE(SUM(total_tokens), 0) AS total_tokens,
1862 COALESCE(SUM(tool_call_count), 0) AS total_tool_calls,
1863 COALESCE(SUM(prompt_cached_tool_outputs), 0) AS prompt_cached_tool_outputs
1864 FROM session_metrics
1865 WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1866 GROUP BY date_key
1867 ORDER BY date_key ASC
1868 "#,
1869 )?;
1870
1871 let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1872 let mut result = Vec::new();
1873
1874 while let Some(row) = rows.next()? {
1875 let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1876 let model_breakdown = load_daily_model_breakdown(connection, date)?;
1877 let tool_breakdown = load_daily_tool_breakdown(connection, date)?;
1878
1879 result.push(DailyMetrics {
1880 date,
1881 total_sessions: row.get::<_, i64>(1)? as u32,
1882 total_rounds: row.get::<_, i64>(2)? as u32,
1883 total_token_usage: TokenUsage {
1884 prompt_tokens: row.get::<_, i64>(3)? as u64,
1885 completion_tokens: row.get::<_, i64>(4)? as u64,
1886 total_tokens: row.get::<_, i64>(5)? as u64,
1887 },
1888 total_tool_calls: row.get::<_, i64>(6)? as u32,
1889 prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1890 model_breakdown,
1891 tool_breakdown,
1892 });
1893 }
1894
1895 Ok(result)
1896 })
1897 .await
1898 }
1899
1900 async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64> {
1901 self.with_connection(move |connection| {
1902 let cutoff_str = format_timestamp(cutoff);
1903 let deleted = connection.execute(
1904 "DELETE FROM round_metrics WHERE started_at < ?1",
1905 params![cutoff_str],
1906 )?;
1907
1908 let mut stmt = connection.prepare("SELECT session_id FROM session_metrics")?;
1909 let session_ids: Vec<String> = stmt
1910 .query_map([], |row| row.get(0))?
1911 .collect::<Result<Vec<String>, _>>()?;
1912 for session_id in session_ids {
1913 refresh_session_aggregates(connection, &session_id, Utc::now())?;
1914 }
1915
1916 Ok(deleted as u64)
1917 })
1918 .await
1919 }
1920
1921 async fn reconcile_stale_executions(
1922 &self,
1923 active_session_ids: &[String],
1924 awaiting_response_session_ids: &[String],
1925 ) -> MetricsResult<()> {
1926 let active_session_ids = active_session_ids.to_vec();
1927 let awaiting_response_session_ids = awaiting_response_session_ids.to_vec();
1928
1929 self.with_connection(move |connection| {
1930 let reconciled_at = Utc::now();
1931 let reconciled_at_str = format_timestamp(reconciled_at);
1932
1933 let mut stmt = connection.prepare(
1934 "SELECT session_id FROM session_metrics WHERE status = 'running'",
1935 )?;
1936 let running_session_ids: Vec<String> = stmt
1937 .query_map([], |row| row.get(0))?
1938 .collect::<Result<Vec<String>, _>>()?;
1939
1940 for session_id in running_session_ids {
1941 if active_session_ids.iter().any(|id| id == &session_id) {
1942 continue;
1943 }
1944
1945 let status = if awaiting_response_session_ids
1946 .iter()
1947 .any(|id| id == &session_id)
1948 {
1949 SessionStatus::AwaitingResponse
1950 } else {
1951 SessionStatus::Completed
1952 };
1953
1954 connection.execute(
1955 "UPDATE session_metrics SET status = ?1, completed_at = COALESCE(completed_at, ?2), updated_at = ?2 WHERE session_id = ?3",
1956 params![status.as_str(), reconciled_at_str, session_id],
1957 )?;
1958 refresh_session_aggregates(connection, &session_id, reconciled_at)?;
1959 }
1960
1961 connection.execute(
1962 "UPDATE round_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_round') WHERE status = 'running'",
1963 params![reconciled_at_str],
1964 )?;
1965 connection.execute(
1966 "UPDATE forward_request_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_forward'), updated_at = ?1 WHERE status = 'pending' AND completed_at IS NULL",
1967 params![reconciled_at_str],
1968 )?;
1969
1970 Ok(())
1971 })
1972 .await
1973 }
1974}
1975
1976fn open_connection(path: &Path) -> MetricsResult<Connection> {
1997 if let Some(parent) = path.parent() {
1998 std::fs::create_dir_all(parent)?;
1999 }
2000 let connection = Connection::open(path)?;
2001 connection.execute_batch(
2002 r#"
2003 PRAGMA journal_mode = WAL;
2004 PRAGMA foreign_keys = ON;
2005 PRAGMA synchronous = NORMAL;
2006 "#,
2007 )?;
2008 Ok(connection)
2009}
2010
2011fn format_timestamp(timestamp: DateTime<Utc>) -> String {
2021 timestamp.to_rfc3339()
2022}
2023
2024fn parse_timestamp(raw: String) -> MetricsResult<DateTime<Utc>> {
2034 Ok(DateTime::parse_from_rfc3339(&raw)?.with_timezone(&Utc))
2035}
2036
2037fn parse_optional_timestamp(raw: Option<String>) -> MetricsResult<Option<DateTime<Utc>>> {
2047 raw.map(parse_timestamp).transpose()
2048}
2049
2050fn compute_duration_ms(
2062 started_at: DateTime<Utc>,
2063 completed_at: Option<DateTime<Utc>>,
2064) -> Option<u64> {
2065 completed_at.and_then(|end| {
2066 end.signed_duration_since(started_at)
2067 .num_milliseconds()
2068 .try_into()
2069 .ok()
2070 })
2071}
2072
2073fn build_session_where_clause(
2090 start_date: Option<NaiveDate>,
2091 end_date: Option<NaiveDate>,
2092 required_status: Option<&str>,
2093 params_vec: &mut Vec<String>,
2094) -> String {
2095 let mut conditions = Vec::new();
2096
2097 if let Some(start) = start_date {
2098 conditions.push("date(started_at) >= date(?)".to_string());
2099 params_vec.push(start.to_string());
2100 }
2101
2102 if let Some(end) = end_date {
2103 conditions.push("date(started_at) <= date(?)".to_string());
2104 params_vec.push(end.to_string());
2105 }
2106
2107 if let Some(status) = required_status {
2108 conditions.push("status = ?".to_string());
2109 params_vec.push(status.to_string());
2110 }
2111
2112 if conditions.is_empty() {
2113 String::new()
2114 } else {
2115 format!("WHERE {}", conditions.join(" AND "))
2116 }
2117}
2118
2119fn build_forward_where_clause(
2137 start_date: Option<NaiveDate>,
2138 end_date: Option<NaiveDate>,
2139 endpoint: Option<&str>,
2140 model: Option<&str>,
2141 params_vec: &mut Vec<String>,
2142) -> String {
2143 let mut conditions = Vec::new();
2144
2145 if let Some(start) = start_date {
2146 conditions.push("date(started_at) >= date(?)".to_string());
2147 params_vec.push(start.to_string());
2148 }
2149
2150 if let Some(end) = end_date {
2151 conditions.push("date(started_at) <= date(?)".to_string());
2152 params_vec.push(end.to_string());
2153 }
2154
2155 if let Some(ep) = endpoint {
2156 conditions.push("endpoint = ?".to_string());
2157 params_vec.push(ep.to_string());
2158 }
2159
2160 if let Some(m) = model {
2161 conditions.push("model = ?".to_string());
2162 params_vec.push(m.to_string());
2163 }
2164
2165 if conditions.is_empty() {
2166 String::new()
2167 } else {
2168 format!("WHERE {}", conditions.join(" AND "))
2169 }
2170}
2171
2172fn build_execute_sync_mismatch_where_clause(
2173 start_date: Option<NaiveDate>,
2174 end_date: Option<NaiveDate>,
2175 reason: Option<&str>,
2176 params_vec: &mut Vec<String>,
2177) -> String {
2178 let mut conditions = Vec::new();
2179
2180 if let Some(start) = start_date {
2181 conditions.push("date(mismatch_date) >= date(?)".to_string());
2182 params_vec.push(start.to_string());
2183 }
2184
2185 if let Some(end) = end_date {
2186 conditions.push("date(mismatch_date) <= date(?)".to_string());
2187 params_vec.push(end.to_string());
2188 }
2189
2190 if let Some(reason) = reason {
2191 conditions.push("reason = ?".to_string());
2192 params_vec.push(reason.to_string());
2193 }
2194
2195 if conditions.is_empty() {
2196 String::new()
2197 } else {
2198 format!("WHERE {}", conditions.join(" AND "))
2199 }
2200}
2201
2202fn ensure_integer_column(
2203 connection: &Connection,
2204 table: &str,
2205 column: &str,
2206 default_value: i64,
2207) -> MetricsResult<()> {
2208 let pragma = format!("PRAGMA table_info({table})");
2209 let mut stmt = connection.prepare(&pragma)?;
2210 let mut rows = stmt.query([])?;
2211 while let Some(row) = rows.next()? {
2212 let name: String = row.get(1)?;
2213 if name == column {
2214 return Ok(());
2215 }
2216 }
2217
2218 let alter =
2219 format!("ALTER TABLE {table} ADD COLUMN {column} INTEGER NOT NULL DEFAULT {default_value}");
2220 connection.execute(&alter, [])?;
2221 Ok(())
2222}
2223
2224fn refresh_session_aggregates(
2252 connection: &Connection,
2253 session_id: &str,
2254 updated_at: DateTime<Utc>,
2255) -> MetricsResult<()> {
2256 let updated_at = format_timestamp(updated_at);
2257 connection.execute(
2258 r#"
2259 UPDATE session_metrics
2260 SET
2261 total_rounds = COALESCE((SELECT COUNT(*) FROM round_metrics WHERE session_id = ?1), 0),
2262 prompt_tokens = COALESCE((SELECT SUM(prompt_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2263 completion_tokens = COALESCE((SELECT SUM(completion_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2264 total_tokens = COALESCE((SELECT SUM(total_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2265 prompt_cached_tool_outputs = COALESCE((SELECT SUM(prompt_cached_tool_outputs) FROM round_metrics WHERE session_id = ?1), 0),
2266 prompt_cached_tool_tokens_saved = COALESCE((SELECT SUM(prompt_cached_tool_tokens_saved) FROM round_metrics WHERE session_id = ?1), 0),
2267 total_compression_events = COALESCE((SELECT SUM(compression_count) FROM round_metrics WHERE session_id = ?1), 0),
2268 total_tokens_saved = COALESCE((SELECT SUM(tokens_saved) FROM round_metrics WHERE session_id = ?1), 0),
2269 tool_call_count = COALESCE((SELECT COUNT(*) FROM tool_call_metrics WHERE session_id = ?1), 0),
2270 updated_at = ?2
2271 WHERE session_id = ?1
2272 "#,
2273 params![session_id, updated_at],
2274 )?;
2275 Ok(())
2276}
2277
2278fn load_tool_breakdown(
2299 connection: &Connection,
2300 session_id: &str,
2301) -> MetricsResult<HashMap<String, u32>> {
2302 let mut stmt = connection.prepare(
2303 "SELECT tool_name, COUNT(*) FROM tool_call_metrics WHERE session_id = ?1 GROUP BY tool_name",
2304 )?;
2305 let mut rows = stmt.query(params![session_id])?;
2306 let mut breakdown = HashMap::new();
2307
2308 while let Some(row) = rows.next()? {
2309 let tool: String = row.get(0)?;
2310 let count: i64 = row.get(1)?;
2311 breakdown.insert(tool, count as u32);
2312 }
2313
2314 Ok(breakdown)
2315}
2316
2317fn load_rounds(connection: &Connection, session_id: &str) -> MetricsResult<Vec<RoundMetrics>> {
2338 let mut stmt = connection.prepare(
2339 "SELECT round_id, session_id, model, started_at, completed_at, status, prompt_tokens, completion_tokens, total_tokens, prompt_cached_tool_outputs, prompt_cached_tool_tokens_saved, compression_count, tokens_saved, error FROM round_metrics WHERE session_id = ?1 ORDER BY started_at ASC",
2340 )?;
2341 let mut rows = stmt.query(params![session_id])?;
2342 let mut rounds = Vec::new();
2343
2344 while let Some(row) = rows.next()? {
2345 let round_id: String = row.get(0)?;
2346 let started_at = parse_timestamp(row.get::<_, String>(3)?)?;
2347 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(4)?)?;
2348 let status_raw: String = row.get(5)?;
2349 let status = RoundStatus::from_db(&status_raw).ok_or_else(|| {
2350 MetricsError::InvalidData(format!("unknown round status: {}", status_raw))
2351 })?;
2352
2353 rounds.push(RoundMetrics {
2354 round_id: round_id.clone(),
2355 session_id: row.get(1)?,
2356 model: row.get(2)?,
2357 started_at,
2358 completed_at,
2359 token_usage: TokenUsage {
2360 prompt_tokens: row.get::<_, i64>(6)? as u64,
2361 completion_tokens: row.get::<_, i64>(7)? as u64,
2362 total_tokens: row.get::<_, i64>(8)? as u64,
2363 },
2364 tool_calls: load_tool_calls(connection, &round_id)?,
2365 status,
2366 prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u32,
2367 prompt_cached_tool_tokens_saved: row.get::<_, i64>(10)? as u32,
2368 compression_count: row.get::<_, i64>(11)? as u32,
2369 tokens_saved: row.get::<_, i64>(12)? as u32,
2370 error: row.get(13)?,
2371 duration_ms: compute_duration_ms(started_at, completed_at),
2372 });
2373 }
2374
2375 Ok(rounds)
2376}
2377
2378fn load_tool_calls(connection: &Connection, round_id: &str) -> MetricsResult<Vec<ToolCallMetrics>> {
2392 let mut stmt = connection.prepare(
2393 "SELECT tool_call_id, tool_name, started_at, completed_at, success, error FROM tool_call_metrics WHERE round_id = ?1 ORDER BY started_at ASC",
2394 )?;
2395 let mut rows = stmt.query(params![round_id])?;
2396 let mut tools = Vec::new();
2397
2398 while let Some(row) = rows.next()? {
2399 let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
2400 let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
2401 let success = row.get::<_, Option<i64>>(4)?.map(|value| value > 0);
2402
2403 tools.push(ToolCallMetrics {
2404 tool_call_id: row.get(0)?,
2405 tool_name: row.get(1)?,
2406 started_at,
2407 completed_at,
2408 success,
2409 error: row.get(5)?,
2410 duration_ms: compute_duration_ms(started_at, completed_at),
2411 });
2412 }
2413
2414 Ok(tools)
2415}
2416
2417fn load_daily_model_breakdown(
2438 connection: &Connection,
2439 date: NaiveDate,
2440) -> MetricsResult<HashMap<String, TokenUsage>> {
2441 let mut stmt = connection.prepare(
2442 r#"
2443 SELECT model,
2444 COALESCE(SUM(prompt_tokens), 0),
2445 COALESCE(SUM(completion_tokens), 0),
2446 COALESCE(SUM(total_tokens), 0)
2447 FROM session_metrics
2448 WHERE date(started_at) = date(?1)
2449 GROUP BY model
2450 "#,
2451 )?;
2452
2453 let mut rows = stmt.query(params![date.to_string()])?;
2454 let mut breakdown = HashMap::new();
2455
2456 while let Some(row) = rows.next()? {
2457 breakdown.insert(
2458 row.get::<_, String>(0)?,
2459 TokenUsage {
2460 prompt_tokens: row.get::<_, i64>(1)? as u64,
2461 completion_tokens: row.get::<_, i64>(2)? as u64,
2462 total_tokens: row.get::<_, i64>(3)? as u64,
2463 },
2464 );
2465 }
2466
2467 Ok(breakdown)
2468}
2469
2470fn load_daily_tool_breakdown(
2491 connection: &Connection,
2492 date: NaiveDate,
2493) -> MetricsResult<HashMap<String, u32>> {
2494 let mut stmt = connection.prepare(
2495 r#"
2496 SELECT tool_name, COUNT(*)
2497 FROM tool_call_metrics
2498 WHERE date(started_at) = date(?1)
2499 GROUP BY tool_name
2500 "#,
2501 )?;
2502
2503 let mut rows = stmt.query(params![date.to_string()])?;
2504 let mut breakdown = HashMap::new();
2505
2506 while let Some(row) = rows.next()? {
2507 breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u32);
2508 }
2509
2510 Ok(breakdown)
2511}
2512
2513fn load_execute_sync_mismatch_breakdown(
2514 connection: &Connection,
2515 start_date: Option<NaiveDate>,
2516 end_date: Option<NaiveDate>,
2517) -> MetricsResult<HashMap<String, u64>> {
2518 let mut params_vec = Vec::new();
2519 let where_clause =
2520 build_execute_sync_mismatch_where_clause(start_date, end_date, None, &mut params_vec);
2521 let sql = format!(
2522 "SELECT reason, COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {} GROUP BY reason ORDER BY reason ASC",
2523 where_clause
2524 );
2525 let mut stmt = connection.prepare(&sql)?;
2526 let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
2527 let mut breakdown = HashMap::new();
2528
2529 while let Some(row) = rows.next()? {
2530 breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u64);
2531 }
2532
2533 Ok(breakdown)
2534}
2535
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{NaiveDate, TimeZone, Utc};
    use tempfile::tempdir;

    use super::{MetricsStorage, SqliteMetricsStorage, ToolCallCompletion};
    use crate::metrics::types::{
        ForwardMetricsFilter, ForwardStatus, MetricsDateFilter, RoundStatus, SessionMetricsFilter,
        SessionStatus, TokenUsage,
    };

    /// Creates an initialised storage inside a fresh temporary directory.
    ///
    /// The directory guard is returned so the caller keeps the database file
    /// alive for the duration of the test.
    async fn fresh_storage() -> (tempfile::TempDir, SqliteMetricsStorage) {
        let dir = tempdir().expect("temp dir");
        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
        storage.init().await.expect("init storage");
        (dir, storage)
    }

    /// Builds a UTC timestamp at the top of the given hour.
    fn utc(year: i32, month: u32, day: u32, hour: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, 0, 0)
            .single()
            .expect("valid datetime")
    }

    /// Builds a calendar date.
    fn ymd(year: i32, month: u32, day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(year, month, day).expect("valid date")
    }

    /// A successful tool completion at the given instant.
    fn succeeded_at(completed_at: chrono::DateTime<Utc>) -> ToolCallCompletion {
        ToolCallCompletion {
            completed_at,
            success: true,
            error: None,
        }
    }

    /// Shorthand for a token usage triple.
    fn tokens(prompt_tokens: u64, completion_tokens: u64, total_tokens: u64) -> TokenUsage {
        TokenUsage {
            prompt_tokens,
            completion_tokens,
            total_tokens,
        }
    }

    #[tokio::test]
    async fn storage_records_session_and_round_data_for_summary_queries() {
        let (_dir, storage) = fresh_storage().await;
        let started_at = utc(2026, 2, 10, 10);

        storage
            .upsert_session_start("session-a", "gpt-4", started_at)
            .await
            .expect("session started");
        storage
            .update_session_message_count("session-a", 7, started_at)
            .await
            .expect("message count update");

        storage
            .insert_round_start("round-a", "session-a", "gpt-4", started_at)
            .await
            .expect("round start");
        storage
            .insert_tool_start("tool-1", "round-a", "session-a", "read_file", started_at)
            .await
            .expect("tool start");
        storage
            .complete_tool_call("tool-1", succeeded_at(started_at))
            .await
            .expect("tool completion");
        storage
            .complete_round(
                "round-a",
                started_at,
                RoundStatus::Success,
                tokens(10, 15, 25),
                3,
                0,
                None,
            )
            .await
            .expect("round completion");
        storage
            .complete_session("session-a", SessionStatus::Completed, started_at)
            .await
            .expect("session completion");

        let summary = storage
            .summary(MetricsDateFilter::default())
            .await
            .expect("summary query");

        assert_eq!(summary.total_sessions, 1);
        assert_eq!(summary.total_tokens.total_tokens, 25);
        assert_eq!(summary.total_tool_calls, 1);
        assert_eq!(summary.prompt_cached_tool_outputs, 3);

        let detail = storage
            .session_detail("session-a")
            .await
            .expect("session detail query")
            .expect("session detail should exist");
        assert_eq!(detail.session.prompt_cached_tool_outputs, 3);
        assert_eq!(detail.rounds.len(), 1);
        assert_eq!(detail.rounds[0].prompt_cached_tool_outputs, 3);
    }

    #[tokio::test]
    async fn storage_filters_sessions_and_returns_tool_breakdown() {
        let (_dir, storage) = fresh_storage().await;
        let day_a = utc(2026, 2, 1, 9);
        let day_b = utc(2026, 2, 5, 9);

        // Session inside the filter window, with one completed tool call.
        storage
            .upsert_session_start("s1", "gpt-4", day_a)
            .await
            .expect("session start");
        storage
            .insert_round_start("r1", "s1", "gpt-4", day_a)
            .await
            .expect("round start");
        storage
            .insert_tool_start("t1", "r1", "s1", "read_file", day_a)
            .await
            .expect("tool start");
        storage
            .complete_tool_call("t1", succeeded_at(day_a))
            .await
            .expect("tool complete");
        storage
            .complete_round(
                "r1",
                day_a,
                RoundStatus::Success,
                tokens(1, 1, 2),
                0,
                0,
                None,
            )
            .await
            .expect("round complete");

        // Session outside the window and on a different model.
        storage
            .upsert_session_start("s2", "claude-3", day_b)
            .await
            .expect("session start");

        let filter = SessionMetricsFilter {
            start_date: Some(ymd(2026, 2, 1)),
            end_date: Some(ymd(2026, 2, 3)),
            model: Some("gpt-4".to_string()),
            limit: Some(100),
        };
        let sessions = storage.sessions(filter).await.expect("sessions query");

        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].session_id, "s1");
        assert_eq!(sessions[0].tool_breakdown.get("read_file"), Some(&1));
    }

    #[tokio::test]
    async fn storage_produces_daily_rollups_with_model_and_tool_breakdowns() {
        let (_dir, storage) = fresh_storage().await;
        let now = utc(2026, 2, 10, 12);

        storage
            .upsert_session_start("daily-1", "gpt-4", now)
            .await
            .expect("session start");
        storage
            .insert_round_start("daily-r1", "daily-1", "gpt-4", now)
            .await
            .expect("round start");
        storage
            .insert_tool_start("daily-t1", "daily-r1", "daily-1", "write_file", now)
            .await
            .expect("tool start");
        storage
            .complete_tool_call("daily-t1", succeeded_at(now))
            .await
            .expect("tool complete");
        storage
            .complete_round(
                "daily-r1",
                now,
                RoundStatus::Success,
                tokens(3, 7, 10),
                0,
                0,
                None,
            )
            .await
            .expect("round completion");

        let daily = storage
            .daily_metrics(7, Some(ymd(2026, 2, 10)))
            .await
            .expect("daily metrics");

        assert_eq!(daily.len(), 1);
        let row = &daily[0];
        assert_eq!(row.total_sessions, 1);
        assert_eq!(row.total_rounds, 1);
        assert_eq!(row.total_tool_calls, 1);
        assert_eq!(
            row.model_breakdown
                .get("gpt-4")
                .map(|per_model| per_model.total_tokens),
            Some(10)
        );
        assert_eq!(
            row.tool_breakdown,
            HashMap::from([(String::from("write_file"), 1)])
        );
    }

    #[tokio::test]
    async fn storage_reconciles_stale_running_sessions_rounds_and_forwards() {
        let (_dir, storage) = fresh_storage().await;
        let now = utc(2026, 2, 12, 9);

        storage
            .upsert_session_start("stale-await", "gpt-4", now)
            .await
            .expect("await session start");
        storage
            .insert_round_start("round-await", "stale-await", "gpt-4", now)
            .await
            .expect("await round start");

        storage
            .upsert_session_start("stale-complete", "gpt-4", now)
            .await
            .expect("complete session start");
        storage
            .insert_round_start("round-complete", "stale-complete", "gpt-4", now)
            .await
            .expect("complete round start");

        storage
            .insert_forward_start(
                "forward-pending",
                "/v1/chat/completions",
                "gpt-4",
                false,
                now,
            )
            .await
            .expect("forward start");

        storage
            .reconcile_stale_executions(&[], &[String::from("stale-await")])
            .await
            .expect("reconcile stale executions");

        // Session statuses after reconciliation.
        let sessions = storage
            .sessions(SessionMetricsFilter::default())
            .await
            .expect("sessions query");
        let stale_await = sessions
            .iter()
            .find(|session| session.session_id == "stale-await")
            .expect("stale-await should exist");
        let stale_complete = sessions
            .iter()
            .find(|session| session.session_id == "stale-complete")
            .expect("stale-complete should exist");
        assert_eq!(stale_await.status, SessionStatus::AwaitingResponse);
        assert_eq!(stale_complete.status, SessionStatus::Completed);
        assert!(stale_await.completed_at.is_some());
        assert!(stale_complete.completed_at.is_some());

        // Rounds left running are closed out as errors.
        let await_detail = storage
            .session_detail("stale-await")
            .await
            .expect("await detail query")
            .expect("await detail exists");
        let complete_detail = storage
            .session_detail("stale-complete")
            .await
            .expect("complete detail query")
            .expect("complete detail exists");
        assert_eq!(await_detail.rounds[0].status, RoundStatus::Error);
        assert_eq!(complete_detail.rounds[0].status, RoundStatus::Error);
        assert_eq!(
            await_detail.rounds[0].error.as_deref(),
            Some("reconciled_stale_round")
        );
        assert_eq!(
            complete_detail.rounds[0].error.as_deref(),
            Some("reconciled_stale_round")
        );

        // Pending forward requests are closed out as errors too.
        let forward_requests = storage
            .forward_requests(ForwardMetricsFilter::default())
            .await
            .expect("forward requests query");
        assert_eq!(forward_requests.len(), 1);
        assert_eq!(forward_requests[0].status, Some(ForwardStatus::Error));
        assert_eq!(
            forward_requests[0].error.as_deref(),
            Some("reconciled_stale_forward")
        );

        let forward_summary = storage
            .forward_summary(ForwardMetricsFilter::default())
            .await
            .expect("forward summary query");
        assert_eq!(forward_summary.total_requests, 1);
        assert_eq!(forward_summary.successful_requests, 0);
        assert_eq!(forward_summary.failed_requests, 1);

        let summary = storage
            .summary(MetricsDateFilter::default())
            .await
            .expect("summary query");
        assert_eq!(summary.total_sessions, 2);
        assert_eq!(summary.active_sessions, 0);
        assert_eq!(summary.awaiting_response_sessions, 1);
        assert_eq!(summary.completed_sessions, 1);
        assert_eq!(summary.error_sessions, 0);
        assert_eq!(summary.cancelled_sessions, 0);
    }

    #[tokio::test]
    async fn storage_summarizes_execute_sync_mismatches_by_reason() {
        let (_dir, storage) = fresh_storage().await;
        let day_a = utc(2026, 2, 10, 10);
        let day_b = utc(2026, 2, 11, 10);

        storage
            .increment_execute_sync_mismatch("message_count", day_a)
            .await
            .expect("message_count mismatch one");
        storage
            .increment_execute_sync_mismatch("message_count", day_a)
            .await
            .expect("message_count mismatch two");
        storage
            .increment_execute_sync_mismatch("pending_question", day_a)
            .await
            .expect("pending question mismatch");
        storage
            .increment_execute_sync_mismatch("last_message_id", day_b)
            .await
            .expect("last_message_id mismatch");

        let day_a_summary = storage
            .summary(MetricsDateFilter {
                start_date: Some(ymd(2026, 2, 10)),
                end_date: Some(ymd(2026, 2, 10)),
            })
            .await
            .expect("day a summary");

        assert_eq!(day_a_summary.total_sync_mismatches, 3);
        assert_eq!(
            day_a_summary.sync_mismatch_breakdown,
            HashMap::from([
                (String::from("message_count"), 2_u64),
                (String::from("pending_question"), 1_u64),
            ])
        );

        let full_summary = storage
            .summary(MetricsDateFilter::default())
            .await
            .expect("full summary");
        assert_eq!(full_summary.total_sync_mismatches, 4);
        assert_eq!(
            full_summary.sync_mismatch_breakdown.get("last_message_id"),
            Some(&1_u64)
        );
    }
}