1use std::collections::BTreeMap;
7
8use frankensqlite::Connection;
9use frankensqlite::Row;
10use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
11
12use super::bucketing;
13use super::types::*;
14
15pub fn table_exists(conn: &Connection, name: &str) -> bool {
21 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
23 return false;
24 }
25 let rows =
26 match conn.query_map_collect(&format!("PRAGMA table_info({})", name), &[], |row: &Row| {
27 row.get_typed::<String>(1)
28 }) {
29 Ok(rows) => rows,
30 Err(_) => return false,
31 };
32 !rows.is_empty()
33}
34
35fn table_has_column(conn: &Connection, table: &str, column: &str) -> bool {
36 if !table.chars().all(|c| c.is_alphanumeric() || c == '_')
38 || !column.chars().all(|c| c.is_alphanumeric() || c == '_')
39 {
40 return false;
41 }
42 let rows =
43 match conn.query_map_collect(&format!("PRAGMA table_info({table})"), &[], |row: &Row| {
44 row.get_typed::<String>(1)
45 }) {
46 Ok(rows) => rows,
47 Err(_) => return false,
48 };
49
50 rows.iter().any(|name| name == column)
51}
52
53fn table_has_plan_token_rollups(conn: &Connection, table: &str) -> bool {
54 table_has_column(conn, table, "plan_content_tokens_est_total")
55 && table_has_column(conn, table, "plan_api_tokens_total")
56}
57
58fn normalize_epoch_millis(ts: i64) -> i64 {
59 if (0..100_000_000_000).contains(&ts) {
61 ts.saturating_mul(1000)
62 } else {
63 ts
64 }
65}
66fn normalized_epoch_millis_sql(expr: &str) -> String {
67 format!(
68 "CASE WHEN ({expr}) >= 0 AND ({expr}) < 100000000000 THEN ({expr}) * 1000 ELSE ({expr}) END"
69 )
70}
71
72fn is_recently_updated(last_updated: Option<i64>, now_ms: i64, threshold_ms: i64) -> bool {
73 last_updated.is_some_and(|ts| (now_ms - normalize_epoch_millis(ts)).abs() < threshold_ms)
74}
75
76fn normalized_analytics_source_id_value(source_id: &str) -> String {
77 let trimmed = source_id.trim();
78 if trimmed.is_empty()
79 || trimmed.eq_ignore_ascii_case(crate::sources::provenance::LOCAL_SOURCE_ID)
80 {
81 crate::sources::provenance::LOCAL_SOURCE_ID.to_string()
82 } else {
83 trimmed.to_string()
84 }
85}
86
87fn normalized_analytics_source_identity_value(source_id: &str, origin_host: &str) -> String {
88 let trimmed_source_id = source_id.trim();
89 if trimmed_source_id.is_empty() {
90 let trimmed_origin_host = origin_host.trim();
91 if trimmed_origin_host.is_empty() {
92 crate::sources::provenance::LOCAL_SOURCE_ID.to_string()
93 } else {
94 trimmed_origin_host.to_string()
95 }
96 } else if trimmed_source_id.eq_ignore_ascii_case(crate::sources::provenance::LOCAL_SOURCE_ID) {
97 crate::sources::provenance::LOCAL_SOURCE_ID.to_string()
98 } else {
99 trimmed_source_id.to_string()
100 }
101}
102
103fn breakdown_row_with_value(key: String, bucket: UsageBucket, value: i64) -> BreakdownRow {
104 BreakdownRow {
105 message_count: bucket.message_count,
106 key,
107 value,
108 bucket,
109 }
110}
111
112fn analytics_query_error(context: &str, err: impl std::fmt::Display) -> AnalyticsError {
113 AnalyticsError::Db(format!("{context}: {err}"))
114}
115
116fn normalized_analytics_source_id_sql_expr(column: &str) -> String {
117 format!(
118 "CASE WHEN TRIM(COALESCE({column}, '')) = '' THEN '{local}' WHEN LOWER(TRIM(COALESCE({column}, ''))) = '{local}' THEN '{local}' ELSE TRIM(COALESCE({column}, '')) END",
119 local = crate::sources::provenance::LOCAL_SOURCE_ID,
120 )
121}
122
123fn normalized_analytics_source_identity_sql_expr(
124 source_id_column: &str,
125 origin_host_column: &str,
126) -> String {
127 format!(
128 "CASE WHEN TRIM(COALESCE({source_id_column}, '')) = '' THEN CASE WHEN TRIM(COALESCE({origin_host_column}, '')) = '' THEN '{local}' ELSE TRIM(COALESCE({origin_host_column}, '')) END WHEN LOWER(TRIM(COALESCE({source_id_column}, ''))) = '{local}' THEN '{local}' ELSE TRIM(COALESCE({source_id_column}, '')) END",
129 local = crate::sources::provenance::LOCAL_SOURCE_ID,
130 )
131}
132
133fn normalized_analytics_source_id_with_fallback_sql_expr(
134 primary_source_id_column: &str,
135 fallback_source_id_column: &str,
136) -> String {
137 let fallback_sql = normalized_analytics_source_id_sql_expr(fallback_source_id_column);
138 format!(
139 "CASE WHEN TRIM(COALESCE({primary_source_id_column}, '')) = '' THEN {fallback_sql} WHEN LOWER(TRIM(COALESCE({primary_source_id_column}, ''))) = '{local}' THEN '{local}' ELSE TRIM(COALESCE({primary_source_id_column}, '')) END",
140 local = crate::sources::provenance::LOCAL_SOURCE_ID,
141 )
142}
143
144fn normalized_analytics_source_identity_with_fallback_sql_expr(
145 primary_source_id_column: &str,
146 fallback_source_id_column: &str,
147 origin_host_column: &str,
148) -> String {
149 let fallback_sql = normalized_analytics_source_identity_sql_expr(
150 fallback_source_id_column,
151 origin_host_column,
152 );
153 format!(
154 "CASE WHEN TRIM(COALESCE({primary_source_id_column}, '')) = '' THEN {fallback_sql} WHEN LOWER(TRIM(COALESCE({primary_source_id_column}, ''))) = '{local}' THEN '{local}' ELSE TRIM(COALESCE({primary_source_id_column}, '')) END",
155 local = crate::sources::provenance::LOCAL_SOURCE_ID,
156 )
157}
158
159fn normalized_analytics_agent_value(agent_slug: &str) -> String {
160 let trimmed = agent_slug.trim();
161 if trimmed.is_empty() {
162 "unknown".to_string()
163 } else {
164 trimmed.to_string()
165 }
166}
167
168fn normalized_analytics_agent_sql_expr(column: &str) -> String {
169 format!(
170 "CASE WHEN TRIM(COALESCE({column}, '')) = '' THEN 'unknown' ELSE TRIM(COALESCE({column}, '')) END"
171 )
172}
173
174fn sql_string_literal(value: &str) -> String {
175 format!("'{}'", value.replace('\'', "''"))
176}
177
178fn canonical_message_metrics_from_sql(conn: &Connection) -> Option<String> {
179 if !table_exists(conn, "message_metrics")
180 || !table_has_column(conn, "message_metrics", "message_id")
181 {
182 return None;
183 }
184
185 let mut select_parts = vec!["message_id".to_string()];
186 for column in [
187 "created_at_ms",
188 "hour_id",
189 "day_id",
190 "agent_slug",
191 "workspace_id",
192 "source_id",
193 "role",
194 "content_chars",
195 "content_tokens_est",
196 "api_input_tokens",
197 "api_output_tokens",
198 "api_cache_read_tokens",
199 "api_cache_creation_tokens",
200 "api_thinking_tokens",
201 "tool_call_count",
202 "has_plan",
203 ] {
204 if table_has_column(conn, "message_metrics", column) {
205 select_parts.push(format!("MAX({column}) AS {column}"));
206 }
207 }
208 if table_has_column(conn, "message_metrics", "api_data_source") {
209 select_parts.push(
210 "CASE
211 WHEN MAX(CASE WHEN LOWER(TRIM(COALESCE(api_data_source, ''))) = 'api' THEN 1 ELSE 0 END) != 0 THEN 'api'
212 WHEN MAX(CASE WHEN LOWER(TRIM(COALESCE(api_data_source, ''))) = 'estimated' THEN 1 ELSE 0 END) != 0 THEN 'estimated'
213 ELSE NULL
214 END AS api_data_source"
215 .to_string(),
216 );
217 }
218
219 Some(format!(
220 "(SELECT {} FROM message_metrics GROUP BY message_id) mm",
221 select_parts.join(", ")
222 ))
223}
224
225fn analytics_source_filter_matches_key(filter: &SourceFilter, key: &str) -> bool {
226 let normalized_key = normalized_analytics_source_id_value(key);
227 match filter {
228 SourceFilter::All => true,
229 SourceFilter::Local => normalized_key == crate::sources::provenance::LOCAL_SOURCE_ID,
230 SourceFilter::Remote => normalized_key != crate::sources::provenance::LOCAL_SOURCE_ID,
231 SourceFilter::Specific(source_id) => {
232 normalized_key == normalized_analytics_source_id_value(source_id)
233 }
234 }
235}
236
237fn push_source_filter_clause(
238 parts: &mut Vec<String>,
239 _params: &mut Vec<ParamValue>,
240 filter: &SourceFilter,
241 normalized_source_sql: &str,
242) {
243 match filter {
244 SourceFilter::All => {}
245 SourceFilter::Local => {
246 parts.push(format!(
247 "{normalized_source_sql} = {}",
248 sql_string_literal(crate::sources::provenance::LOCAL_SOURCE_ID)
249 ));
250 }
251 SourceFilter::Remote => {
252 parts.push(format!(
253 "{normalized_source_sql} != {}",
254 sql_string_literal(crate::sources::provenance::LOCAL_SOURCE_ID)
255 ));
256 }
257 SourceFilter::Specific(source_id) => {
258 let normalized_source_id = normalized_analytics_source_id_value(source_id);
259 parts.push(format!(
260 "{normalized_source_sql} = {}",
261 sql_string_literal(&normalized_source_id)
262 ));
263 }
264 }
265}
266
267#[derive(Debug, Default)]
269struct RollupStats {
270 row_count: i64,
271 min_day: Option<i64>,
272 max_day: Option<i64>,
273 last_updated: Option<i64>,
274}
275
276fn rollup_stats_from_summary_row(row: &Row) -> RollupStats {
277 RollupStats {
278 row_count: row.get_typed::<i64>(0).unwrap_or(0),
279 min_day: row.get_typed::<Option<i64>>(1).unwrap_or(None),
280 max_day: row.get_typed::<Option<i64>>(2).unwrap_or(None),
281 last_updated: row.get_typed::<Option<i64>>(3).unwrap_or(None),
282 }
283}
284
285#[derive(Clone, Copy)]
287enum AnalyticsTimeColumn<'a> {
288 Day(&'a str),
289 Hour(&'a str),
290 TimestampMs(&'a str),
291}
292
293fn normalized_analytics_model_family_sql_expr(column: &str) -> String {
294 format!(
295 "CASE WHEN TRIM(COALESCE({column}, '')) = '' THEN 'unknown' ELSE TRIM(COALESCE({column}, '')) END"
296 )
297}
298
299fn build_where_parts_for_columns<'a>(
300 filter: &'a AnalyticsFilter,
301 agent_column_sql: Option<String>,
302 source_column_sql: String,
303 workspace_column: Option<&'a str>,
304) -> (Vec<String>, Vec<ParamValue>) {
305 let mut parts: Vec<String> = Vec::new();
306 let mut params: Vec<ParamValue> = Vec::new();
307
308 if !filter.agents.is_empty() {
309 if let Some(normalized_agent_sql) = agent_column_sql.as_deref() {
310 if filter.agents.len() == 1 {
311 parts.push(format!(
312 "{normalized_agent_sql} = {}",
313 sql_string_literal(&normalized_analytics_agent_value(
314 filter.agents[0].as_str()
315 ))
316 ));
317 } else {
318 let agent_literals: Vec<String> = filter
319 .agents
320 .iter()
321 .map(|agent| {
322 sql_string_literal(&normalized_analytics_agent_value(agent.as_str()))
323 })
324 .collect();
325 parts.push(format!(
326 "{normalized_agent_sql} IN ({})",
327 agent_literals.join(", ")
328 ));
329 }
330 } else {
331 parts.push("1 = 0".to_string());
332 return (parts, params);
333 }
334 }
335
336 push_source_filter_clause(&mut parts, &mut params, &filter.source, &source_column_sql);
337
338 if let Some(workspace_column) = workspace_column
339 && !filter.workspace_ids.is_empty()
340 {
341 if filter.workspace_ids.len() == 1 {
342 parts.push(format!("{workspace_column} = {}", filter.workspace_ids[0]));
343 } else {
344 let workspace_literals: Vec<String> = filter
345 .workspace_ids
346 .iter()
347 .map(|workspace_id| workspace_id.to_string())
348 .collect();
349 parts.push(format!(
350 "{workspace_column} IN ({})",
351 workspace_literals.join(", ")
352 ));
353 }
354 }
355
356 (parts, params)
357}
358
359fn build_filtered_where_sql<'a>(
360 filter: &'a AnalyticsFilter,
361 workspace_column: Option<&'a str>,
362 agent_column_sql: Option<String>,
363 source_column_sql: String,
364 time_column: Option<AnalyticsTimeColumn<'a>>,
365) -> (String, Vec<ParamValue>) {
366 let (mut parts, params) = build_where_parts_for_columns(
367 filter,
368 agent_column_sql,
369 source_column_sql,
370 workspace_column,
371 );
372
373 match time_column {
374 Some(AnalyticsTimeColumn::Day(column)) => {
375 let (day_min, day_max) = bucketing::resolve_day_range(filter);
376 if let Some(min) = day_min {
377 parts.push(format!("{column} >= {min}"));
378 }
379 if let Some(max) = day_max {
380 parts.push(format!("{column} <= {max}"));
381 }
382 }
383 Some(AnalyticsTimeColumn::Hour(column)) => {
384 let (hour_min, hour_max) = bucketing::resolve_hour_range(filter);
385 if let Some(min) = hour_min {
386 parts.push(format!("{column} >= {min}"));
387 }
388 if let Some(max) = hour_max {
389 parts.push(format!("{column} <= {max}"));
390 }
391 }
392 Some(AnalyticsTimeColumn::TimestampMs(column)) => {
393 let normalized_column = normalized_epoch_millis_sql(column);
394 if let Some(min) = filter.since_ms {
395 parts.push(format!("{normalized_column} >= {min}"));
396 }
397 if let Some(max) = filter.until_ms {
398 parts.push(format!("{normalized_column} <= {max}"));
399 }
400 }
401 None => {}
402 }
403
404 let where_sql = if parts.is_empty() {
405 String::new()
406 } else {
407 format!(" WHERE {}", parts.join(" AND "))
408 };
409
410 (where_sql, params)
411}
412
413pub fn build_where_parts<'a>(
423 filter: &'a AnalyticsFilter,
424 workspace_column: Option<&'a str>,
425) -> (Vec<String>, Vec<ParamValue>) {
426 build_where_parts_for_columns(
427 filter,
428 Some(normalized_analytics_agent_sql_expr("agent_slug")),
429 normalized_analytics_source_id_sql_expr("source_id"),
430 workspace_column,
431 )
432}
433fn message_metrics_time_sql(conn: &Connection) -> Option<String> {
434 let has_messages = table_exists(conn, "messages");
435 let has_conversations = table_exists(conn, "conversations");
436 let joins_available = has_messages && has_conversations;
437 let has_message_created_at =
438 joins_available && table_has_column(conn, "messages", "created_at");
439 let has_conversation_started_at =
440 joins_available && table_has_column(conn, "conversations", "started_at");
441 let has_message_metrics_created_at = table_has_column(conn, "message_metrics", "created_at_ms");
442
443 let mut timestamp_terms: Vec<&str> = Vec::new();
444 if has_message_created_at {
445 timestamp_terms.push("m.created_at");
446 }
447 if has_message_metrics_created_at {
448 timestamp_terms.push("mm.created_at_ms");
449 }
450 if has_conversation_started_at {
451 timestamp_terms.push("c.started_at");
452 }
453
454 if timestamp_terms.is_empty() {
455 None
456 } else {
457 Some(format!("COALESCE({}, 0)", timestamp_terms.join(", ")))
458 }
459}
460
461fn message_metrics_from_sql_and_source_sql(conn: &Connection) -> (String, String) {
462 let message_metrics_sql = canonical_message_metrics_from_sql(conn)
463 .unwrap_or_else(|| "message_metrics mm".to_string());
464 if table_exists(conn, "messages") && table_exists(conn, "conversations") {
465 let source_sql = if table_has_column(conn, "conversations", "origin_host") {
466 normalized_analytics_source_identity_with_fallback_sql_expr(
467 "mm.source_id",
468 "c.source_id",
469 "c.origin_host",
470 )
471 } else {
472 normalized_analytics_source_id_with_fallback_sql_expr("mm.source_id", "c.source_id")
473 };
474 (
475 format!(
476 "{message_metrics_sql} JOIN messages m ON m.id = mm.message_id JOIN conversations c ON c.id = m.conversation_id"
477 ),
478 source_sql,
479 )
480 } else {
481 (
482 message_metrics_sql,
483 normalized_analytics_source_id_sql_expr("mm.source_id"),
484 )
485 }
486}
487
488fn token_usage_from_sql_agent_and_source_sql(
489 conn: &Connection,
490) -> (String, Option<String>, String) {
491 let has_agent_id = table_has_column(conn, "token_usage", "agent_id");
492 let has_agents = table_exists(conn, "agents") && has_agent_id;
493 let has_conversation_id = table_has_column(conn, "token_usage", "conversation_id");
494 let has_conversations = table_exists(conn, "conversations") && has_conversation_id;
495 let has_message_id = table_has_column(conn, "token_usage", "message_id");
496
497 let mut from_sql = if has_message_id {
498 let mut select_items = vec!["message_id".to_string()];
499 for column in [
500 "conversation_id",
501 "agent_id",
502 "workspace_id",
503 "source_id",
504 "timestamp_ms",
505 "day_id",
506 "model_name",
507 "model_family",
508 "total_tokens",
509 "input_tokens",
510 "output_tokens",
511 "cache_read_tokens",
512 "cache_creation_tokens",
513 "thinking_tokens",
514 "estimated_cost_usd",
515 "role",
516 "content_chars",
517 "tool_call_count",
518 ] {
519 if table_has_column(conn, "token_usage", column) {
520 select_items.push(format!("MAX({column}) AS {column}"));
521 }
522 }
523 if table_has_column(conn, "token_usage", "data_source") {
524 select_items.push(
525 "CASE
526 WHEN MAX(CASE WHEN LOWER(TRIM(COALESCE(data_source, ''))) = 'api' THEN 1 ELSE 0 END) != 0 THEN 'api'
527 WHEN MAX(CASE WHEN LOWER(TRIM(COALESCE(data_source, ''))) = 'estimated' THEN 1 ELSE 0 END) != 0 THEN 'estimated'
528 ELSE NULL
529 END AS data_source"
530 .to_string(),
531 );
532 }
533 format!(
534 "(SELECT {} FROM token_usage GROUP BY message_id) tu",
535 select_items.join(", ")
536 )
537 } else {
538 "token_usage tu".to_string()
539 };
540 if has_agents {
541 from_sql.push_str(" LEFT JOIN agents a ON a.id = tu.agent_id");
542 }
543
544 let source_sql = if has_conversations {
545 from_sql.push_str(" LEFT JOIN conversations c ON c.id = tu.conversation_id");
546 if table_has_column(conn, "conversations", "origin_host") {
547 normalized_analytics_source_identity_with_fallback_sql_expr(
548 "tu.source_id",
549 "c.source_id",
550 "c.origin_host",
551 )
552 } else {
553 normalized_analytics_source_id_with_fallback_sql_expr("tu.source_id", "c.source_id")
554 }
555 } else {
556 normalized_analytics_source_id_sql_expr("tu.source_id")
557 };
558
559 (
560 from_sql,
561 has_agents.then(|| normalized_analytics_agent_sql_expr("a.slug")),
562 source_sql,
563 )
564}
565
566fn token_usage_agent_sql_or_unknown(agent_sql: Option<String>) -> String {
567 agent_sql.unwrap_or_else(|| "'unknown'".to_string())
568}
569
570fn token_usage_time_sql(conn: &Connection) -> Option<String> {
571 let has_timestamp_ms = table_has_column(conn, "token_usage", "timestamp_ms");
572 let has_conversations = table_exists(conn, "conversations");
573 let has_conversation_join =
574 has_conversations && table_has_column(conn, "token_usage", "conversation_id");
575 let has_conversation_started_at =
576 has_conversation_join && table_has_column(conn, "conversations", "started_at");
577
578 let mut timestamp_terms: Vec<&str> = Vec::new();
579 if has_timestamp_ms {
580 timestamp_terms.push("tu.timestamp_ms");
581 }
582 if has_conversation_started_at {
583 timestamp_terms.push("c.started_at");
584 }
585
586 if timestamp_terms.is_empty() {
587 None
588 } else {
589 Some(format!("COALESCE({}, 0)", timestamp_terms.join(", ")))
590 }
591}
592
593#[allow(clippy::too_many_arguments)]
594fn query_table_stats_from_source<'a>(
595 conn: &Connection,
596 required_table: &str,
597 from_sql: &str,
598 bucket_col: &str,
599 updated_col: Option<&str>,
600 filter: &'a AnalyticsFilter,
601 workspace_column: Option<&'a str>,
602 agent_column_sql: Option<String>,
603 source_column_sql: String,
604 time_column: Option<AnalyticsTimeColumn<'a>>,
605) -> RollupStats {
606 if !table_exists(conn, required_table) {
607 return RollupStats::default();
608 }
609
610 let (where_sql, params) = build_filtered_where_sql(
611 filter,
612 workspace_column,
613 agent_column_sql,
614 source_column_sql,
615 time_column,
616 );
617 let updated_expr = updated_col.map_or_else(|| "NULL".to_string(), |uc| format!("MAX({uc})"));
618 let sql = format!(
619 "SELECT COUNT(*), MIN({bucket_col}), MAX({bucket_col}), {updated_expr} FROM {from_sql}{where_sql}"
620 );
621
622 conn.query_row_map(&sql, ¶ms, |row: &Row| {
623 Ok(rollup_stats_from_summary_row(row))
624 })
625 .unwrap_or_default()
626}
627
628fn query_scalar_i64(conn: &Connection, sql: &str, params: &[ParamValue]) -> i64 {
629 conn.query_row_map(sql, params, |row: &Row| row.get_typed(0))
630 .unwrap_or(0)
631}
632
633fn query_total_messages_filtered(conn: &Connection, filter: &AnalyticsFilter) -> i64 {
634 if !table_exists(conn, "messages") || !table_exists(conn, "conversations") {
635 return 0;
636 }
637
638 let has_agents = table_exists(conn, "agents");
639 let canonical_message_metrics_sql = canonical_message_metrics_from_sql(conn);
640 let has_message_metrics_created_at = canonical_message_metrics_sql.is_some()
641 && table_has_column(conn, "message_metrics", "created_at_ms");
642 let mut from_sql = if has_agents {
643 "messages m JOIN conversations c ON c.id = m.conversation_id LEFT JOIN agents a ON a.id = c.agent_id"
644 .to_string()
645 } else {
646 "messages m JOIN conversations c ON c.id = m.conversation_id".to_string()
647 };
648 if let Some(message_metrics_sql) = &canonical_message_metrics_sql {
649 from_sql.push_str(" LEFT JOIN ");
650 from_sql.push_str(message_metrics_sql);
651 from_sql.push_str(" ON mm.message_id = m.id");
652 }
653 let source_sql = if table_has_column(conn, "conversations", "origin_host") {
654 normalized_analytics_source_identity_sql_expr("c.source_id", "c.origin_host")
655 } else {
656 normalized_analytics_source_id_sql_expr("c.source_id")
657 };
658 let message_time_sql = if has_message_metrics_created_at {
659 "COALESCE(m.created_at, mm.created_at_ms, c.started_at, 0)"
660 } else {
661 "COALESCE(m.created_at, c.started_at, 0)"
662 };
663 let (where_sql, params) = build_filtered_where_sql(
664 filter,
665 Some("c.workspace_id"),
666 has_agents.then(|| normalized_analytics_agent_sql_expr("a.slug")),
667 source_sql,
668 Some(AnalyticsTimeColumn::TimestampMs(message_time_sql)),
669 );
670
671 query_scalar_i64(
672 conn,
673 &format!("SELECT COUNT(*) FROM {from_sql}{where_sql}"),
674 ¶ms,
675 )
676}
677
678fn query_message_metrics_filtered_count(
679 conn: &Connection,
680 filter: &AnalyticsFilter,
681 extra_condition: Option<&str>,
682) -> i64 {
683 if !table_exists(conn, "message_metrics") {
684 return 0;
685 }
686
687 let (from_sql, source_sql) = message_metrics_from_sql_and_source_sql(conn);
688 let message_metrics_time_sql = message_metrics_time_sql(conn);
689 let time_column = message_metrics_time_sql
690 .as_deref()
691 .map(AnalyticsTimeColumn::TimestampMs)
692 .unwrap_or(AnalyticsTimeColumn::Day("mm.day_id"));
693 let (where_sql, params) = build_filtered_where_sql(
694 filter,
695 Some("mm.workspace_id"),
696 Some(normalized_analytics_agent_sql_expr("mm.agent_slug")),
697 source_sql,
698 Some(time_column),
699 );
700 let sql = match extra_condition {
701 Some(extra) if where_sql.is_empty() => {
702 format!("SELECT COUNT(*) FROM {from_sql} WHERE {extra}")
703 }
704 Some(extra) => format!("SELECT COUNT(*) FROM {from_sql}{where_sql} AND {extra}"),
705 None => format!("SELECT COUNT(*) FROM {from_sql}{where_sql}"),
706 };
707
708 query_scalar_i64(conn, &sql, ¶ms)
709}
710
711fn query_token_usage_filtered_count(
712 conn: &Connection,
713 filter: &AnalyticsFilter,
714 extra_condition: Option<&str>,
715) -> i64 {
716 if !table_exists(conn, "token_usage") {
717 return 0;
718 }
719
720 let (from_sql, agent_sql, source_sql) = token_usage_from_sql_agent_and_source_sql(conn);
721 let token_usage_time_sql = token_usage_time_sql(conn);
722 let time_column = token_usage_time_sql
723 .as_deref()
724 .map(AnalyticsTimeColumn::TimestampMs)
725 .unwrap_or(AnalyticsTimeColumn::Day("tu.day_id"));
726 let (where_sql, params) = build_filtered_where_sql(
727 filter,
728 Some("tu.workspace_id"),
729 agent_sql,
730 source_sql,
731 Some(time_column),
732 );
733 let sql = match extra_condition {
734 Some(extra) if where_sql.is_empty() => {
735 format!("SELECT COUNT(*) FROM {from_sql} WHERE {extra}")
736 }
737 Some(extra) => format!("SELECT COUNT(*) FROM {from_sql}{where_sql} AND {extra}"),
738 None => format!("SELECT COUNT(*) FROM {from_sql}{where_sql}"),
739 };
740
741 query_scalar_i64(conn, &sql, ¶ms)
742}
743
744fn track_b_requires_token_usage_fallback(filter: &AnalyticsFilter) -> bool {
745 !filter.workspace_ids.is_empty() || !matches!(filter.source, SourceFilter::All)
746}
747
748fn token_usage_supports_track_b_metric(conn: &Connection, metric: Metric) -> bool {
749 if !table_exists(conn, "token_usage") {
750 return false;
751 }
752
753 match metric {
754 Metric::ApiTotal => table_has_column(conn, "token_usage", "total_tokens"),
755 Metric::ApiInput => table_has_column(conn, "token_usage", "input_tokens"),
756 Metric::ApiOutput => table_has_column(conn, "token_usage", "output_tokens"),
757 Metric::CacheRead => table_has_column(conn, "token_usage", "cache_read_tokens"),
758 Metric::CacheCreation => table_has_column(conn, "token_usage", "cache_creation_tokens"),
759 Metric::Thinking => table_has_column(conn, "token_usage", "thinking_tokens"),
760 Metric::ContentEstTotal => table_has_column(conn, "token_usage", "content_chars"),
761 Metric::ToolCalls => table_has_column(conn, "token_usage", "tool_call_count"),
762 Metric::EstimatedCostUsd => table_has_column(conn, "token_usage", "estimated_cost_usd"),
763 Metric::PlanCount | Metric::CoveragePct | Metric::MessageCount => true,
764 }
765}
766
767fn message_metrics_supports_track_a_metric(conn: &Connection, metric: Metric) -> bool {
768 if !table_exists(conn, "message_metrics")
769 || !table_has_column(conn, "message_metrics", "message_id")
770 {
771 return false;
772 }
773
774 match metric {
775 Metric::ApiTotal => {
776 table_has_column(conn, "message_metrics", "api_input_tokens")
777 && table_has_column(conn, "message_metrics", "api_output_tokens")
778 && table_has_column(conn, "message_metrics", "api_cache_read_tokens")
779 && table_has_column(conn, "message_metrics", "api_cache_creation_tokens")
780 && table_has_column(conn, "message_metrics", "api_thinking_tokens")
781 }
782 Metric::ApiInput => table_has_column(conn, "message_metrics", "api_input_tokens"),
783 Metric::ApiOutput => table_has_column(conn, "message_metrics", "api_output_tokens"),
784 Metric::CacheRead => table_has_column(conn, "message_metrics", "api_cache_read_tokens"),
785 Metric::CacheCreation => {
786 table_has_column(conn, "message_metrics", "api_cache_creation_tokens")
787 }
788 Metric::Thinking => table_has_column(conn, "message_metrics", "api_thinking_tokens"),
789 Metric::ContentEstTotal => table_has_column(conn, "message_metrics", "content_tokens_est"),
790 Metric::CoveragePct => table_has_column(conn, "message_metrics", "api_data_source"),
791 Metric::MessageCount => true,
792 Metric::ToolCalls => table_has_column(conn, "message_metrics", "tool_call_count"),
793 Metric::PlanCount => table_has_column(conn, "message_metrics", "has_plan"),
794 Metric::EstimatedCostUsd => false,
795 }
796}
797
798fn track_a_breakdown_supports_raw_metric(conn: &Connection, metric: Metric) -> bool {
799 if !table_exists(conn, "messages") || !table_exists(conn, "conversations") {
800 return false;
801 }
802
803 match metric {
804 Metric::MessageCount => true,
805 Metric::EstimatedCostUsd => false,
806 _ => message_metrics_supports_track_a_metric(conn, metric),
807 }
808}
809
810fn track_a_breakdown_requires_raw_fallback(filter: &AnalyticsFilter, dim: Dim) -> bool {
811 matches!(dim, Dim::Source)
812 || (matches!(dim, Dim::Agent | Dim::Workspace)
813 && (track_a_timeseries_requires_source_fallback(filter)
814 || analytics_requires_exact_raw_time_filter(filter)))
815}
816
817fn track_b_breakdown_requires_token_usage_fallback(filter: &AnalyticsFilter, dim: Dim) -> bool {
818 matches!(dim, Dim::Source)
819 || track_b_requires_token_usage_fallback(filter)
820 || analytics_requires_exact_raw_time_filter(filter)
821}
822
823fn track_a_timeseries_requires_source_fallback(filter: &AnalyticsFilter) -> bool {
824 match &filter.source {
825 SourceFilter::Remote => true,
826 SourceFilter::Specific(source_id) => {
827 normalized_analytics_source_id_value(source_id.as_str())
828 != crate::sources::provenance::LOCAL_SOURCE_ID
829 }
830 SourceFilter::All | SourceFilter::Local => false,
831 }
832}
833
834fn track_a_tools_supports_raw_source_fallback(conn: &Connection) -> bool {
835 table_exists(conn, "messages")
836 && table_exists(conn, "conversations")
837 && table_exists(conn, "message_metrics")
838 && table_has_column(conn, "message_metrics", "message_id")
839 && table_has_column(conn, "message_metrics", "tool_call_count")
840}
841
842fn analytics_requires_exact_raw_time_filter(filter: &AnalyticsFilter) -> bool {
843 filter.since_ms.is_some() || filter.until_ms.is_some()
844}
845
846fn track_a_timeseries_requires_raw_fallback(filter: &AnalyticsFilter) -> bool {
847 track_a_timeseries_requires_source_fallback(filter)
848 || analytics_requires_exact_raw_time_filter(filter)
849}
850
851fn track_b_cost_timeseries_requires_token_usage_fallback(
852 filter: &AnalyticsFilter,
853 group_by: GroupBy,
854) -> bool {
855 matches!(group_by, GroupBy::Hour)
856 || track_b_requires_token_usage_fallback(filter)
857 || analytics_requires_exact_raw_time_filter(filter)
858}
859
860fn query_track_a_rollup_status_with_message_metrics_fallback(
861 conn: &Connection,
862 table: &str,
863 bucket_col: &str,
864 filter: &AnalyticsFilter,
865) -> RollupStats {
866 if !table_exists(conn, table) {
867 return RollupStats::default();
868 }
869
870 let default_stats = || {
871 query_table_stats_from_source(
872 conn,
873 table,
874 table,
875 bucket_col,
876 Some("last_updated"),
877 filter,
878 Some("workspace_id"),
879 Some(normalized_analytics_agent_sql_expr("agent_slug")),
880 normalized_analytics_source_id_sql_expr("source_id"),
881 Some(match bucket_col {
882 "hour_id" => AnalyticsTimeColumn::Hour("hour_id"),
883 "day_id" => AnalyticsTimeColumn::Day("day_id"),
884 _ => return RollupStats::default(),
885 }),
886 )
887 };
888
889 if !(track_a_timeseries_requires_source_fallback(filter)
890 || analytics_requires_exact_raw_time_filter(filter))
891 || !table_exists(conn, "message_metrics")
892 || !table_exists(conn, "messages")
893 || !table_exists(conn, "conversations")
894 || !table_has_column(conn, "message_metrics", "message_id")
895 || !table_has_column(conn, "message_metrics", bucket_col)
896 {
897 return default_stats();
898 }
899
900 let (message_metrics_from_sql, message_metrics_source_sql) =
901 message_metrics_from_sql_and_source_sql(conn);
902 let message_metrics_bucket_col = match bucket_col {
903 "hour_id" => "mm.hour_id",
904 "day_id" => "mm.day_id",
905 _ => return default_stats(),
906 };
907 let message_metrics_agent_sql = normalized_analytics_agent_sql_expr("mm.agent_slug");
908 let message_metrics_time_sql = message_metrics_time_sql(conn);
909 let (where_sql, params) = build_filtered_where_sql(
910 filter,
911 Some("mm.workspace_id"),
912 Some(message_metrics_agent_sql.clone()),
913 message_metrics_source_sql.clone(),
914 Some(
915 message_metrics_time_sql
916 .as_deref()
917 .map(AnalyticsTimeColumn::TimestampMs)
918 .unwrap_or(match bucket_col {
919 "hour_id" => AnalyticsTimeColumn::Hour("mm.hour_id"),
920 "day_id" => AnalyticsTimeColumn::Day("mm.day_id"),
921 _ => return default_stats(),
922 }),
923 ),
924 );
925 let rollup_agent_sql = normalized_analytics_agent_sql_expr("rollup.agent_slug");
926 let rollup_source_sql = normalized_analytics_source_id_with_fallback_sql_expr(
927 "rollup.source_id",
928 "filtered_keys.source_id",
929 );
930 let sql = format!(
931 "SELECT COUNT(*), MIN(bucket_id), MAX(bucket_id), MAX(last_updated)\n FROM (\n SELECT DISTINCT\n rollup.{bucket_col} AS bucket_id,\n rollup.last_updated AS last_updated\n FROM {table} rollup\n JOIN (\n SELECT {message_metrics_bucket_col} AS bucket_id,\n {message_metrics_agent_sql} AS agent_slug,\n mm.workspace_id AS workspace_id,\n {message_metrics_source_sql} AS source_id\n FROM {message_metrics_from_sql}\n {where_sql}\n GROUP BY {message_metrics_bucket_col},\n {message_metrics_agent_sql},\n mm.workspace_id,\n {message_metrics_source_sql}\n ) filtered_keys\n ON rollup.{bucket_col} = filtered_keys.bucket_id\n AND {rollup_agent_sql} = filtered_keys.agent_slug\n AND rollup.workspace_id = filtered_keys.workspace_id\n AND {rollup_source_sql} = filtered_keys.source_id\n ) matched_track_a_rollups"
932 );
933
934 conn.query_row_map(&sql, ¶ms, |row: &Row| {
935 Ok(rollup_stats_from_summary_row(row))
936 })
937 .unwrap_or_default()
938}
939
940fn query_token_daily_stats_status(conn: &Connection, filter: &AnalyticsFilter) -> RollupStats {
941 if !table_exists(conn, "token_daily_stats") {
942 return RollupStats::default();
943 }
944
945 if !(track_b_requires_token_usage_fallback(filter)
946 || analytics_requires_exact_raw_time_filter(filter))
947 || !table_exists(conn, "token_usage")
948 {
949 return query_table_stats_from_source(
950 conn,
951 "token_daily_stats",
952 "token_daily_stats",
953 "day_id",
954 Some("last_updated"),
955 filter,
956 None,
957 Some(normalized_analytics_agent_sql_expr("agent_slug")),
958 normalized_analytics_source_id_sql_expr("source_id"),
959 Some(AnalyticsTimeColumn::Day("day_id")),
960 );
961 }
962
963 let (token_usage_from_sql, token_usage_agent_sql, token_usage_source_sql) =
964 token_usage_from_sql_agent_and_source_sql(conn);
965 let token_usage_agent_sql = token_usage_agent_sql_or_unknown(token_usage_agent_sql);
966 let token_usage_model_sql = normalized_analytics_model_family_sql_expr("tu.model_family");
967 let token_usage_time_sql = token_usage_time_sql(conn);
968 let (where_sql, params) = build_filtered_where_sql(
969 filter,
970 Some("tu.workspace_id"),
971 Some(token_usage_agent_sql.clone()),
972 token_usage_source_sql.clone(),
973 Some(
974 token_usage_time_sql
975 .as_deref()
976 .map(AnalyticsTimeColumn::TimestampMs)
977 .unwrap_or(AnalyticsTimeColumn::Day("tu.day_id")),
978 ),
979 );
980 let tds_agent_sql = normalized_analytics_agent_sql_expr("tds.agent_slug");
981 let tds_source_sql = normalized_analytics_source_id_with_fallback_sql_expr(
982 "tds.source_id",
983 "filtered_keys.source_id",
984 );
985 let tds_model_sql = normalized_analytics_model_family_sql_expr("tds.model_family");
986 let sql = format!(
987 "SELECT COUNT(*), MIN(day_id), MAX(day_id), MAX(last_updated) FROM ( SELECT DISTINCT tds.day_id AS day_id, tds.agent_slug AS agent_slug, tds.source_id AS source_id, tds.model_family AS model_family, tds.last_updated AS last_updated FROM token_daily_stats tds JOIN ( SELECT tu.day_id AS day_id, {token_usage_agent_sql} AS agent_slug, {token_usage_source_sql} AS source_id, {token_usage_model_sql} AS model_family FROM {token_usage_from_sql} {where_sql} GROUP BY tu.day_id, {token_usage_agent_sql}, {token_usage_source_sql}, {token_usage_model_sql} ) filtered_keys ON tds.day_id = filtered_keys.day_id AND {tds_agent_sql} = filtered_keys.agent_slug AND {tds_source_sql} = filtered_keys.source_id AND {tds_model_sql} = filtered_keys.model_family ) matched_token_daily_stats"
988 );
989
990 conn.query_row_map(&sql, ¶ms, |row: &Row| {
991 Ok(rollup_stats_from_summary_row(row))
992 })
993 .unwrap_or_default()
994}
995
996pub fn query_status(conn: &Connection, filter: &AnalyticsFilter) -> AnalyticsResult<StatusResult> {
1002 const TABLE_MESSAGE_METRICS: &str = "message_metrics";
1003 const TABLE_USAGE_HOURLY: &str = "usage_hourly";
1004 const TABLE_USAGE_DAILY: &str = "usage_daily";
1005 const TABLE_TOKEN_USAGE: &str = "token_usage";
1006 const TABLE_TOKEN_DAILY_STATS: &str = "token_daily_stats";
1007
1008 let has_message_metrics = table_exists(conn, TABLE_MESSAGE_METRICS);
1009 let has_usage_hourly = table_exists(conn, TABLE_USAGE_HOURLY);
1010 let has_usage_daily = table_exists(conn, TABLE_USAGE_DAILY);
1011 let has_token_usage = table_exists(conn, TABLE_TOKEN_USAGE);
1012 let has_token_daily_stats = table_exists(conn, TABLE_TOKEN_DAILY_STATS);
1013
1014 let (message_metrics_from_sql, message_metrics_source_sql) =
1015 message_metrics_from_sql_and_source_sql(conn);
1016 let message_metrics_time_sql = message_metrics_time_sql(conn);
1017 let mm = query_table_stats_from_source(
1018 conn,
1019 TABLE_MESSAGE_METRICS,
1020 &message_metrics_from_sql,
1021 "mm.day_id",
1022 None,
1023 filter,
1024 Some("mm.workspace_id"),
1025 Some(normalized_analytics_agent_sql_expr("mm.agent_slug")),
1026 message_metrics_source_sql,
1027 Some(
1028 message_metrics_time_sql
1029 .as_deref()
1030 .map(AnalyticsTimeColumn::TimestampMs)
1031 .unwrap_or(AnalyticsTimeColumn::Day("mm.day_id")),
1032 ),
1033 );
1034 let uh = query_track_a_rollup_status_with_message_metrics_fallback(
1035 conn,
1036 TABLE_USAGE_HOURLY,
1037 "hour_id",
1038 filter,
1039 );
1040 let ud = query_track_a_rollup_status_with_message_metrics_fallback(
1041 conn,
1042 TABLE_USAGE_DAILY,
1043 "day_id",
1044 filter,
1045 );
1046 let (token_usage_from_sql, token_usage_agent_sql, token_usage_source_sql) =
1047 token_usage_from_sql_agent_and_source_sql(conn);
1048 let token_usage_time_sql = token_usage_time_sql(conn);
1049 let tu = query_table_stats_from_source(
1050 conn,
1051 TABLE_TOKEN_USAGE,
1052 &token_usage_from_sql,
1053 "tu.day_id",
1054 None,
1055 filter,
1056 Some("tu.workspace_id"),
1057 token_usage_agent_sql,
1058 token_usage_source_sql,
1059 Some(
1060 token_usage_time_sql
1061 .as_deref()
1062 .map(AnalyticsTimeColumn::TimestampMs)
1063 .unwrap_or(AnalyticsTimeColumn::Day("tu.day_id")),
1064 ),
1065 );
1066 let tds = query_token_daily_stats_status(conn, filter);
1067
1068 let total_messages = query_total_messages_filtered(conn, filter);
1069
1070 let api_coverage_pct = if has_message_metrics && mm.row_count > 0 {
1071 let api_count =
1072 query_message_metrics_filtered_count(conn, filter, Some("api_data_source = 'api'"));
1073 (api_count as f64 / mm.row_count as f64) * 100.0
1074 } else {
1075 0.0
1076 };
1077
1078 let model_coverage_pct = if has_token_usage && tu.row_count > 0 {
1079 let with_model = query_token_usage_filtered_count(
1080 conn,
1081 filter,
1082 Some("model_name IS NOT NULL AND TRIM(model_name) != ''"),
1083 );
1084 (with_model as f64 / tu.row_count as f64) * 100.0
1085 } else {
1086 0.0
1087 };
1088
1089 let estimate_only_pct = if has_token_usage && tu.row_count > 0 {
1090 let estimates =
1091 query_token_usage_filtered_count(conn, filter, Some("data_source = 'estimated'"));
1092 (estimates as f64 / tu.row_count as f64) * 100.0
1093 } else {
1094 0.0
1095 };
1096
1097 let mm_coverage_pct = if total_messages > 0 {
1098 (mm.row_count as f64 / total_messages as f64) * 100.0
1099 } else {
1100 0.0
1101 };
1102
1103 let mut drift_signals: Vec<DriftSignal> = Vec::new();
1104
1105 let now_ms = std::time::SystemTime::now()
1106 .duration_since(std::time::UNIX_EPOCH)
1107 .map(|d| d.as_millis() as i64)
1108 .unwrap_or(0);
1109 let stale_threshold_ms: i64 = 86_400_000;
1110
1111 let track_a_fresh = is_recently_updated(uh.last_updated, now_ms, stale_threshold_ms);
1112 let track_b_fresh = is_recently_updated(tds.last_updated, now_ms, stale_threshold_ms);
1113
1114 if track_a_fresh && !track_b_fresh && has_token_daily_stats {
1115 drift_signals.push(DriftSignal {
1116 signal: "track_freshness_mismatch".into(),
1117 detail:
1118 "Track A (usage_hourly/daily) is fresh but Track B (token_daily_stats) is stale"
1119 .into(),
1120 severity: "warning".into(),
1121 });
1122 }
1123 if track_b_fresh && !track_a_fresh && has_usage_hourly {
1124 drift_signals.push(DriftSignal {
1125 signal: "track_freshness_mismatch".into(),
1126 detail:
1127 "Track B (token_daily_stats) is fresh but Track A (usage_hourly/daily) is stale"
1128 .into(),
1129 severity: "warning".into(),
1130 });
1131 }
1132
1133 if mm.row_count > 0 && uh.row_count == 0 && has_usage_hourly {
1134 drift_signals.push(DriftSignal {
1135 signal: "missing_rollups".into(),
1136 detail: "message_metrics has data but usage_hourly is empty — rebuild needed".into(),
1137 severity: "error".into(),
1138 });
1139 }
1140 if mm.row_count > 0 && ud.row_count == 0 && has_usage_daily {
1141 drift_signals.push(DriftSignal {
1142 signal: "missing_rollups".into(),
1143 detail: "message_metrics has data but usage_daily is empty — rebuild needed".into(),
1144 severity: "error".into(),
1145 });
1146 }
1147 if tu.row_count > 0 && tds.row_count == 0 && has_token_daily_stats {
1148 drift_signals.push(DriftSignal {
1149 signal: "missing_rollups".into(),
1150 detail: "token_usage has data but token_daily_stats is empty — rebuild needed".into(),
1151 severity: "error".into(),
1152 });
1153 }
1154
1155 if total_messages > 100 && mm.row_count == 0 && tu.row_count == 0 {
1156 drift_signals.push(DriftSignal {
1157 signal: "no_analytics_data".into(),
1158 detail: format!("{total_messages} messages indexed but no analytics computed"),
1159 severity: "error".into(),
1160 });
1161 }
1162
1163 let has_error_drift = drift_signals.iter().any(|s| s.severity == "error");
1164 let has_warning_drift = drift_signals.iter().any(|s| s.severity == "warning");
1165
1166 let recommended_action = if has_error_drift {
1167 if mm.row_count == 0 && tu.row_count == 0 {
1168 "rebuild_all"
1169 } else if mm.row_count > 0 && (uh.row_count == 0 || ud.row_count == 0) {
1170 "rebuild_track_a"
1171 } else if tu.row_count > 0 && tds.row_count == 0 {
1172 "rebuild_track_b"
1173 } else {
1174 "rebuild_all"
1175 }
1176 } else if has_warning_drift {
1177 if track_a_fresh && !track_b_fresh {
1178 "rebuild_track_b"
1179 } else if track_b_fresh && !track_a_fresh {
1180 "rebuild_track_a"
1181 } else {
1182 "none"
1183 }
1184 } else {
1185 "none"
1186 };
1187
1188 let make_table_info = |name: &str, exists: bool, stats: &RollupStats| TableInfo {
1189 table: name.into(),
1190 exists,
1191 row_count: stats.row_count,
1192 min_day_id: stats.min_day,
1193 max_day_id: stats.max_day,
1194 last_updated: stats.last_updated,
1195 };
1196
1197 Ok(StatusResult {
1198 tables: vec![
1199 make_table_info(TABLE_MESSAGE_METRICS, has_message_metrics, &mm),
1200 make_table_info(TABLE_USAGE_HOURLY, has_usage_hourly, &uh),
1201 make_table_info(TABLE_USAGE_DAILY, has_usage_daily, &ud),
1202 make_table_info(TABLE_TOKEN_USAGE, has_token_usage, &tu),
1203 make_table_info(TABLE_TOKEN_DAILY_STATS, has_token_daily_stats, &tds),
1204 ],
1205 coverage: CoverageInfo {
1206 total_messages,
1207 message_metrics_coverage_pct: (mm_coverage_pct * 100.0).round() / 100.0,
1208 api_token_coverage_pct: (api_coverage_pct * 100.0).round() / 100.0,
1209 model_name_coverage_pct: (model_coverage_pct * 100.0).round() / 100.0,
1210 estimate_only_pct: (estimate_only_pct * 100.0).round() / 100.0,
1211 },
1212 drift: DriftInfo {
1213 signals: drift_signals,
1214 track_a_fresh,
1215 track_b_fresh,
1216 },
1217 recommended_action: recommended_action.into(),
1218 })
1219}
1220
1221pub fn query_tokens_timeseries(
1227 conn: &Connection,
1228 filter: &AnalyticsFilter,
1229 group_by: GroupBy,
1230) -> AnalyticsResult<TimeseriesResult> {
1231 let query_start = std::time::Instant::now();
1232
1233 if track_a_timeseries_requires_raw_fallback(filter)
1234 && table_exists(conn, "messages")
1235 && table_exists(conn, "conversations")
1236 {
1237 return query_track_a_timeseries_from_raw(conn, filter, group_by, query_start);
1238 }
1239
1240 let (table, bucket_col) = match group_by {
1242 GroupBy::Hour => ("usage_hourly", "hour_id"),
1243 _ => ("usage_daily", "day_id"),
1244 };
1245
1246 if !table_exists(conn, table) {
1248 return Ok(TimeseriesResult {
1249 buckets: vec![],
1250 totals: UsageBucket::default(),
1251 source_table: table.into(),
1252 group_by,
1253 elapsed_ms: query_start.elapsed().as_millis() as u64,
1254 path: "none".into(),
1255 });
1256 }
1257
1258 let (day_min, day_max) = bucketing::resolve_day_range(filter);
1260 let (hour_min, hour_max) = bucketing::resolve_hour_range(filter);
1261
1262 let (dim_parts, dim_params) = build_where_parts(filter, Some("workspace_id"));
1263 let mut where_parts = dim_parts;
1264 let mut bind_values = dim_params;
1265
1266 match group_by {
1267 GroupBy::Hour => {
1268 if let Some(min) = hour_min {
1269 bind_values.push(ParamValue::from(min));
1270 where_parts.push(format!("{bucket_col} >= ?{}", bind_values.len()));
1271 }
1272 if let Some(max) = hour_max {
1273 bind_values.push(ParamValue::from(max));
1274 where_parts.push(format!("{bucket_col} <= ?{}", bind_values.len()));
1275 }
1276 }
1277 _ => {
1278 if let Some(min) = day_min {
1279 bind_values.push(ParamValue::from(min));
1280 where_parts.push(format!("{bucket_col} >= ?{}", bind_values.len()));
1281 }
1282 if let Some(max) = day_max {
1283 bind_values.push(ParamValue::from(max));
1284 where_parts.push(format!("{bucket_col} <= ?{}", bind_values.len()));
1285 }
1286 }
1287 }
1288
1289 let where_clause = if where_parts.is_empty() {
1290 String::new()
1291 } else {
1292 format!(" WHERE {}", where_parts.join(" AND "))
1293 };
1294
1295 let has_plan_token_rollups = table_has_plan_token_rollups(conn, table);
1296 let plan_content_expr = if has_plan_token_rollups {
1297 "SUM(plan_content_tokens_est_total)"
1298 } else {
1299 "0"
1300 };
1301 let plan_api_expr = if has_plan_token_rollups {
1302 "SUM(plan_api_tokens_total)"
1303 } else {
1304 "0"
1305 };
1306
1307 let sql = format!(
1308 "SELECT {bucket_col},
1309 SUM(message_count),
1310 SUM(user_message_count),
1311 SUM(assistant_message_count),
1312 SUM(tool_call_count),
1313 SUM(plan_message_count),
1314 {plan_content_expr},
1315 {plan_api_expr},
1316 SUM(api_coverage_message_count),
1317 SUM(content_tokens_est_total),
1318 SUM(content_tokens_est_user),
1319 SUM(content_tokens_est_assistant),
1320 SUM(api_tokens_total),
1321 SUM(api_input_tokens_total),
1322 SUM(api_output_tokens_total),
1323 SUM(api_cache_read_tokens_total),
1324 SUM(api_cache_creation_tokens_total),
1325 SUM(api_thinking_tokens_total)
1326 FROM (
1327 SELECT * FROM {table}
1328 {where_clause}
1329 ) filtered_tokens_timeseries
1330 GROUP BY {bucket_col}
1331 ORDER BY {bucket_col}"
1332 );
1333
1334 let param_values: Vec<ParamValue> = bind_values.clone();
1335
1336 let raw_buckets: Vec<(i64, UsageBucket)> = conn
1337 .query_map_collect(&sql, ¶m_values, |row: &Row| {
1338 Ok((
1339 row.get_typed::<i64>(0)?,
1340 UsageBucket {
1341 message_count: row.get_typed(1)?,
1342 user_message_count: row.get_typed(2)?,
1343 assistant_message_count: row.get_typed(3)?,
1344 tool_call_count: row.get_typed(4)?,
1345 plan_message_count: row.get_typed(5)?,
1346 plan_content_tokens_est_total: row.get_typed(6)?,
1347 plan_api_tokens_total: row.get_typed(7)?,
1348 api_coverage_message_count: row.get_typed(8)?,
1349 content_tokens_est_total: row.get_typed(9)?,
1350 content_tokens_est_user: row.get_typed(10)?,
1351 content_tokens_est_assistant: row.get_typed(11)?,
1352 api_tokens_total: row.get_typed(12)?,
1353 api_input_tokens_total: row.get_typed(13)?,
1354 api_output_tokens_total: row.get_typed(14)?,
1355 api_cache_read_tokens_total: row.get_typed(15)?,
1356 api_cache_creation_tokens_total: row.get_typed(16)?,
1357 api_thinking_tokens_total: row.get_typed(17)?,
1358 ..Default::default()
1359 },
1360 ))
1361 })
1362 .map_err(|e| analytics_query_error("Analytics query failed", e))?;
1363
1364 let final_buckets: Vec<(String, UsageBucket)> = match group_by {
1366 GroupBy::Hour => raw_buckets
1367 .into_iter()
1368 .map(|(id, row)| (bucketing::hour_id_to_iso(id), row))
1369 .collect(),
1370 GroupBy::Day => raw_buckets
1371 .into_iter()
1372 .map(|(id, row)| (bucketing::day_id_to_iso(id), row))
1373 .collect(),
1374 GroupBy::Week => {
1375 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
1376 for (day_id, row) in raw_buckets {
1377 let key = bucketing::day_id_to_iso_week(day_id);
1378 merged.entry(key).or_default().merge(&row);
1379 }
1380 merged.into_iter().collect()
1381 }
1382 GroupBy::Month => {
1383 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
1384 for (day_id, row) in raw_buckets {
1385 let key = bucketing::day_id_to_month(day_id);
1386 merged.entry(key).or_default().merge(&row);
1387 }
1388 merged.into_iter().collect()
1389 }
1390 };
1391
1392 let mut totals = UsageBucket::default();
1394 for (_, row) in &final_buckets {
1395 totals.merge(row);
1396 }
1397
1398 let elapsed_ms = query_start.elapsed().as_millis() as u64;
1399
1400 Ok(TimeseriesResult {
1401 buckets: final_buckets,
1402 totals,
1403 source_table: table.into(),
1404 group_by,
1405 elapsed_ms,
1406 path: "rollup".into(),
1407 })
1408}
1409
1410fn query_track_a_timeseries_from_raw(
1411 conn: &Connection,
1412 filter: &AnalyticsFilter,
1413 group_by: GroupBy,
1414 query_start: std::time::Instant,
1415) -> AnalyticsResult<TimeseriesResult> {
1416 let has_agents = table_exists(conn, "agents");
1417 let has_origin_host = table_has_column(conn, "conversations", "origin_host");
1418 let canonical_message_metrics_sql = canonical_message_metrics_from_sql(conn);
1419 let join_message_metrics = canonical_message_metrics_sql.is_some();
1420 let has_message_metrics_created_at =
1421 join_message_metrics && table_has_column(conn, "message_metrics", "created_at_ms");
1422 let has_content_tokens_est =
1423 join_message_metrics && table_has_column(conn, "message_metrics", "content_tokens_est");
1424 let has_api_input_tokens =
1425 join_message_metrics && table_has_column(conn, "message_metrics", "api_input_tokens");
1426 let has_api_output_tokens =
1427 join_message_metrics && table_has_column(conn, "message_metrics", "api_output_tokens");
1428 let has_api_cache_read_tokens =
1429 join_message_metrics && table_has_column(conn, "message_metrics", "api_cache_read_tokens");
1430 let has_api_cache_creation_tokens = join_message_metrics
1431 && table_has_column(conn, "message_metrics", "api_cache_creation_tokens");
1432 let has_api_thinking_tokens =
1433 join_message_metrics && table_has_column(conn, "message_metrics", "api_thinking_tokens");
1434 let has_api_data_source =
1435 join_message_metrics && table_has_column(conn, "message_metrics", "api_data_source");
1436 let has_tool_call_count =
1437 join_message_metrics && table_has_column(conn, "message_metrics", "tool_call_count");
1438 let has_has_plan =
1439 join_message_metrics && table_has_column(conn, "message_metrics", "has_plan");
1440
1441 let source_id_sql = "TRIM(COALESCE(c.source_id, ''))";
1442 let origin_host_sql = if has_origin_host {
1443 "TRIM(COALESCE(c.origin_host, ''))"
1444 } else {
1445 "''"
1446 };
1447
1448 let mut from_sql = String::from("messages m JOIN conversations c ON c.id = m.conversation_id");
1449 if has_agents {
1450 from_sql.push_str(" LEFT JOIN agents a ON a.id = c.agent_id");
1451 }
1452 if let Some(message_metrics_sql) = &canonical_message_metrics_sql {
1453 from_sql.push_str(" LEFT JOIN ");
1454 from_sql.push_str(message_metrics_sql);
1455 from_sql.push_str(" ON mm.message_id = m.id");
1456 }
1457
1458 let filter_for_sql = AnalyticsFilter {
1459 source: SourceFilter::All,
1460 ..filter.clone()
1461 };
1462 let message_time_sql = if has_message_metrics_created_at {
1463 if join_message_metrics {
1464 "COALESCE(m.created_at, mm.created_at_ms, c.started_at, 0)"
1465 } else {
1466 "COALESCE(m.created_at, (SELECT MAX(message_metrics.created_at_ms) FROM message_metrics WHERE message_metrics.message_id = m.id), c.started_at, 0)"
1467 }
1468 } else {
1469 "COALESCE(m.created_at, c.started_at, 0)"
1470 };
1471 let (where_sql, params) = build_filtered_where_sql(
1472 &filter_for_sql,
1473 Some("c.workspace_id"),
1474 has_agents.then(|| normalized_analytics_agent_sql_expr("a.slug")),
1475 sql_string_literal("all"),
1476 Some(AnalyticsTimeColumn::TimestampMs(message_time_sql)),
1477 );
1478
1479 let content_tokens_expr = if has_content_tokens_est {
1480 "COALESCE(mm.content_tokens_est, 0)"
1481 } else {
1482 "0"
1483 };
1484 let api_input_expr = if has_api_input_tokens {
1485 "COALESCE(mm.api_input_tokens, 0)"
1486 } else {
1487 "0"
1488 };
1489 let api_output_expr = if has_api_output_tokens {
1490 "COALESCE(mm.api_output_tokens, 0)"
1491 } else {
1492 "0"
1493 };
1494 let api_cache_read_expr = if has_api_cache_read_tokens {
1495 "COALESCE(mm.api_cache_read_tokens, 0)"
1496 } else {
1497 "0"
1498 };
1499 let api_cache_creation_expr = if has_api_cache_creation_tokens {
1500 "COALESCE(mm.api_cache_creation_tokens, 0)"
1501 } else {
1502 "0"
1503 };
1504 let api_thinking_expr = if has_api_thinking_tokens {
1505 "COALESCE(mm.api_thinking_tokens, 0)"
1506 } else {
1507 "0"
1508 };
1509 let api_covered_expr = if has_api_data_source {
1510 "CASE WHEN mm.api_data_source = 'api' THEN 1 ELSE 0 END"
1511 } else {
1512 "0"
1513 };
1514 let tool_call_expr = if has_tool_call_count {
1515 "COALESCE(mm.tool_call_count, 0)"
1516 } else {
1517 "0"
1518 };
1519 let has_plan_expr = if has_has_plan {
1520 "CASE WHEN COALESCE(mm.has_plan, 0) != 0 THEN 1 ELSE 0 END"
1521 } else {
1522 "0"
1523 };
1524
1525 let sql = format!(
1526 "SELECT m.conversation_id,
1527 m.role,
1528 {message_time_sql},
1529 {content_tokens_expr},
1530 {api_input_expr},
1531 {api_output_expr},
1532 {api_cache_read_expr},
1533 {api_cache_creation_expr},
1534 {api_thinking_expr},
1535 {api_covered_expr},
1536 {tool_call_expr},
1537 {has_plan_expr},
1538 {source_id_sql},
1539 {origin_host_sql}
1540 FROM {from_sql}{where_sql}"
1541 );
1542
1543 let row_buckets: Vec<(String, String, i64, UsageBucket)> = conn
1544 .query_map_collect(&sql, ¶ms, |row: &Row| {
1545 let role: String = row.get_typed(1)?;
1547 let created_at_ms: i64 = row.get_typed(2)?;
1548 let content_tokens_est: i64 = row.get_typed(3)?;
1549 let api_input_tokens: i64 = row.get_typed(4)?;
1550 let api_output_tokens: i64 = row.get_typed(5)?;
1551 let api_cache_read_tokens: i64 = row.get_typed(6)?;
1552 let api_cache_creation_tokens: i64 = row.get_typed(7)?;
1553 let api_thinking_tokens: i64 = row.get_typed(8)?;
1554 let api_covered: i64 = row.get_typed(9)?;
1555 let tool_call_count: i64 = row.get_typed(10)?;
1556 let has_plan: i64 = row.get_typed(11)?;
1557 let source_id: String = row.get_typed(12)?;
1558 let origin_host: String = row.get_typed(13)?;
1559
1560 let normalized_created_at_ms = normalize_epoch_millis(created_at_ms);
1561 let bucket_id = match group_by {
1562 GroupBy::Hour => crate::storage::sqlite::FrankenStorage::hour_id_from_millis(
1563 normalized_created_at_ms,
1564 ),
1565 GroupBy::Day | GroupBy::Week | GroupBy::Month => {
1566 crate::storage::sqlite::FrankenStorage::day_id_from_millis(
1567 normalized_created_at_ms,
1568 )
1569 }
1570 };
1571
1572 let mut bucket = UsageBucket {
1573 message_count: 1,
1574 user_message_count: i64::from(role == "user"),
1575 assistant_message_count: i64::from(role == "assistant"),
1576 tool_call_count,
1577 plan_message_count: has_plan,
1578 api_coverage_message_count: api_covered,
1579 content_tokens_est_total: content_tokens_est,
1580 content_tokens_est_user: if role == "user" {
1581 content_tokens_est
1582 } else {
1583 0
1584 },
1585 content_tokens_est_assistant: if role == "assistant" {
1586 content_tokens_est
1587 } else {
1588 0
1589 },
1590 api_input_tokens_total: api_input_tokens,
1591 api_output_tokens_total: api_output_tokens,
1592 api_cache_read_tokens_total: api_cache_read_tokens,
1593 api_cache_creation_tokens_total: api_cache_creation_tokens,
1594 api_thinking_tokens_total: api_thinking_tokens,
1595 plan_content_tokens_est_total: if has_plan != 0 { content_tokens_est } else { 0 },
1596 ..Default::default()
1597 };
1598 bucket.api_tokens_total = api_input_tokens
1599 + api_output_tokens
1600 + api_cache_read_tokens
1601 + api_cache_creation_tokens
1602 + api_thinking_tokens;
1603 if has_plan != 0 && api_covered != 0 {
1604 bucket.plan_api_tokens_total = bucket.api_tokens_total;
1605 }
1606
1607 Ok((source_id, origin_host, bucket_id, bucket))
1608 })
1609 .map_err(|e| analytics_query_error("Analytics query failed", e))?;
1610
1611 let mut grouped_buckets: BTreeMap<i64, UsageBucket> = BTreeMap::new();
1612 for (source_id, origin_host, bucket_id, bucket) in row_buckets {
1613 let normalized_key = normalized_analytics_source_identity_value(&source_id, &origin_host);
1614 if !analytics_source_filter_matches_key(&filter.source, &normalized_key) {
1615 continue;
1616 }
1617 grouped_buckets.entry(bucket_id).or_default().merge(&bucket);
1618 }
1619
1620 let raw_buckets: Vec<(i64, UsageBucket)> = grouped_buckets.into_iter().collect();
1621 let final_buckets: Vec<(String, UsageBucket)> = match group_by {
1622 GroupBy::Hour => raw_buckets
1623 .into_iter()
1624 .map(|(id, row)| (bucketing::hour_id_to_iso(id), row))
1625 .collect(),
1626 GroupBy::Day => raw_buckets
1627 .into_iter()
1628 .map(|(id, row)| (bucketing::day_id_to_iso(id), row))
1629 .collect(),
1630 GroupBy::Week => {
1631 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
1632 for (day_id, row) in raw_buckets {
1633 let key = bucketing::day_id_to_iso_week(day_id);
1634 merged.entry(key).or_default().merge(&row);
1635 }
1636 merged.into_iter().collect()
1637 }
1638 GroupBy::Month => {
1639 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
1640 for (day_id, row) in raw_buckets {
1641 let key = bucketing::day_id_to_month(day_id);
1642 merged.entry(key).or_default().merge(&row);
1643 }
1644 merged.into_iter().collect()
1645 }
1646 };
1647
1648 let mut totals = UsageBucket::default();
1649 for (_, row) in &final_buckets {
1650 totals.merge(row);
1651 }
1652
1653 Ok(TimeseriesResult {
1654 buckets: final_buckets,
1655 totals,
1656 source_table: if join_message_metrics {
1657 "message_metrics".into()
1658 } else {
1659 "messages".into()
1660 },
1661 group_by,
1662 elapsed_ms: query_start.elapsed().as_millis() as u64,
1663 path: "raw".into(),
1664 })
1665}
1666
1667fn query_cost_timeseries_from_token_usage(
1678 conn: &Connection,
1679 filter: &AnalyticsFilter,
1680 group_by: GroupBy,
1681 query_start: std::time::Instant,
1682) -> AnalyticsResult<TimeseriesResult> {
1683 if !table_exists(conn, "token_usage") {
1684 return Ok(TimeseriesResult {
1685 buckets: vec![],
1686 totals: UsageBucket::default(),
1687 source_table: "token_usage".into(),
1688 group_by,
1689 elapsed_ms: query_start.elapsed().as_millis() as u64,
1690 path: "none".into(),
1691 });
1692 }
1693
1694 let has_input_tokens = table_has_column(conn, "token_usage", "input_tokens");
1695 let has_output_tokens = table_has_column(conn, "token_usage", "output_tokens");
1696 let has_cache_read_tokens = table_has_column(conn, "token_usage", "cache_read_tokens");
1697 let has_cache_creation_tokens = table_has_column(conn, "token_usage", "cache_creation_tokens");
1698 let has_thinking_tokens = table_has_column(conn, "token_usage", "thinking_tokens");
1699 let has_estimated_cost = table_has_column(conn, "token_usage", "estimated_cost_usd");
1700 let has_role = table_has_column(conn, "token_usage", "role");
1701 let has_content_chars = table_has_column(conn, "token_usage", "content_chars");
1702 let has_tool_call_count = table_has_column(conn, "token_usage", "tool_call_count");
1703
1704 let (token_usage_from_sql, token_usage_agent_sql, token_usage_source_sql) =
1705 token_usage_from_sql_agent_and_source_sql(conn);
1706 let token_usage_time_sql = token_usage_time_sql(conn);
1707 let has_exact_time = token_usage_time_sql.is_some();
1708 let time_column = token_usage_time_sql
1709 .as_deref()
1710 .map(AnalyticsTimeColumn::TimestampMs)
1711 .unwrap_or(AnalyticsTimeColumn::Day("tu.day_id"));
1712 let (where_sql, params) = build_filtered_where_sql(
1713 filter,
1714 Some("tu.workspace_id"),
1715 token_usage_agent_sql,
1716 token_usage_source_sql,
1717 Some(time_column),
1718 );
1719
1720 let sql = format!(
1721 "SELECT {},
1722 {} AS role,
1723 {},
1724 {},
1725 {},
1726 {},
1727 {},
1728 {},
1729 {},
1730 {},
1731 {}
1732 FROM {token_usage_from_sql}
1733 {where_sql}",
1734 token_usage_time_sql.as_deref().unwrap_or("tu.day_id"),
1735 if has_role { "tu.role" } else { "''" },
1736 if has_tool_call_count {
1737 "COALESCE(tu.tool_call_count, 0)"
1738 } else {
1739 "0"
1740 },
1741 if has_input_tokens {
1742 "COALESCE(tu.input_tokens, 0)"
1743 } else {
1744 "0"
1745 },
1746 if has_output_tokens {
1747 "COALESCE(tu.output_tokens, 0)"
1748 } else {
1749 "0"
1750 },
1751 if has_cache_read_tokens {
1752 "COALESCE(tu.cache_read_tokens, 0)"
1753 } else {
1754 "0"
1755 },
1756 if has_cache_creation_tokens {
1757 "COALESCE(tu.cache_creation_tokens, 0)"
1758 } else {
1759 "0"
1760 },
1761 if has_thinking_tokens {
1762 "COALESCE(tu.thinking_tokens, 0)"
1763 } else {
1764 "0"
1765 },
1766 "COALESCE(tu.total_tokens, 0)",
1767 if has_content_chars {
1768 "COALESCE(tu.content_chars, 0)"
1769 } else {
1770 "0"
1771 },
1772 if has_estimated_cost {
1773 "COALESCE(tu.estimated_cost_usd, 0.0)"
1774 } else {
1775 "0.0"
1776 },
1777 );
1778
1779 let raw_rows: Vec<(i64, UsageBucket)> = conn
1780 .query_map_collect(&sql, ¶ms, |row: &Row| {
1781 let raw_time_value: i64 = row.get_typed(0)?;
1782 let role: String = row.get_typed(1)?;
1783 let tool_calls: i64 = row.get_typed(2)?;
1784 let input_tok: i64 = row.get_typed(3)?;
1785 let output_tok: i64 = row.get_typed(4)?;
1786 let cache_read: i64 = row.get_typed(5)?;
1787 let cache_create: i64 = row.get_typed(6)?;
1788 let thinking: i64 = row.get_typed(7)?;
1789 let grand_total: i64 = row.get_typed(8)?;
1790 let content_chars: i64 = row.get_typed(9)?;
1791 let cost: f64 = row.get_typed(10)?;
1792
1793 let bucket_id = if has_exact_time {
1794 let normalized_created_at_ms = normalize_epoch_millis(raw_time_value);
1795 match group_by {
1796 GroupBy::Hour => crate::storage::sqlite::FrankenStorage::hour_id_from_millis(
1797 normalized_created_at_ms,
1798 ),
1799 GroupBy::Day | GroupBy::Week | GroupBy::Month => {
1800 crate::storage::sqlite::FrankenStorage::day_id_from_millis(
1801 normalized_created_at_ms,
1802 )
1803 }
1804 }
1805 } else {
1806 raw_time_value
1807 };
1808
1809 Ok((
1810 bucket_id,
1811 UsageBucket {
1812 message_count: 1,
1813 user_message_count: i64::from(role == "user"),
1814 assistant_message_count: i64::from(role == "assistant"),
1815 tool_call_count: tool_calls,
1816 api_coverage_message_count: 1,
1817 content_tokens_est_total: content_chars / 4,
1818 api_tokens_total: grand_total,
1819 api_input_tokens_total: input_tok,
1820 api_output_tokens_total: output_tok,
1821 api_cache_read_tokens_total: cache_read,
1822 api_cache_creation_tokens_total: cache_create,
1823 api_thinking_tokens_total: thinking,
1824 estimated_cost_usd: cost,
1825 ..Default::default()
1826 },
1827 ))
1828 })
1829 .map_err(|e| analytics_query_error("Cost timeseries query failed", e))?;
1830
1831 let mut grouped_buckets: BTreeMap<i64, UsageBucket> = BTreeMap::new();
1832 for (bucket_id, bucket) in raw_rows {
1833 grouped_buckets.entry(bucket_id).or_default().merge(&bucket);
1834 }
1835
1836 let raw_buckets: Vec<(i64, UsageBucket)> = grouped_buckets.into_iter().collect();
1837 let final_buckets: Vec<(String, UsageBucket)> = match group_by {
1838 GroupBy::Hour if has_exact_time => raw_buckets
1839 .into_iter()
1840 .map(|(id, row)| (bucketing::hour_id_to_iso(id), row))
1841 .collect(),
1842 GroupBy::Hour | GroupBy::Day => raw_buckets
1843 .into_iter()
1844 .map(|(id, row)| (bucketing::day_id_to_iso(id), row))
1845 .collect(),
1846 GroupBy::Week => {
1847 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
1848 for (day_id, row) in raw_buckets {
1849 let key = bucketing::day_id_to_iso_week(day_id);
1850 merged.entry(key).or_default().merge(&row);
1851 }
1852 merged.into_iter().collect()
1853 }
1854 GroupBy::Month => {
1855 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
1856 for (day_id, row) in raw_buckets {
1857 let key = bucketing::day_id_to_month(day_id);
1858 merged.entry(key).or_default().merge(&row);
1859 }
1860 merged.into_iter().collect()
1861 }
1862 };
1863
1864 let mut totals = UsageBucket::default();
1865 for (_, row) in &final_buckets {
1866 totals.merge(row);
1867 }
1868
1869 Ok(TimeseriesResult {
1870 buckets: final_buckets,
1871 totals,
1872 source_table: "token_usage".into(),
1873 group_by,
1874 elapsed_ms: query_start.elapsed().as_millis() as u64,
1875 path: "raw".into(),
1876 })
1877}
1878
1879pub fn query_cost_timeseries(
1880 conn: &Connection,
1881 filter: &AnalyticsFilter,
1882 group_by: GroupBy,
1883) -> AnalyticsResult<TimeseriesResult> {
1884 let query_start = std::time::Instant::now();
1885
1886 let table = "token_daily_stats";
1887
1888 if track_b_cost_timeseries_requires_token_usage_fallback(filter, group_by)
1889 && token_usage_supports_track_b_metric(conn, Metric::ApiTotal)
1890 && token_usage_supports_track_b_metric(conn, Metric::EstimatedCostUsd)
1891 {
1892 return query_cost_timeseries_from_token_usage(conn, filter, group_by, query_start);
1893 }
1894
1895 if !table_exists(conn, table) {
1896 return Ok(TimeseriesResult {
1897 buckets: vec![],
1898 totals: UsageBucket::default(),
1899 source_table: table.into(),
1900 group_by,
1901 elapsed_ms: query_start.elapsed().as_millis() as u64,
1902 path: "none".into(),
1903 });
1904 }
1905
1906 let (day_min, day_max) = bucketing::resolve_day_range(filter);
1908 let (dim_parts, dim_params) = build_where_parts(filter, None);
1909 let mut where_parts = dim_parts;
1910 let mut bind_values = dim_params;
1911
1912 if let Some(min) = day_min {
1913 bind_values.push(ParamValue::from(min));
1914 where_parts.push(format!("day_id >= ?{}", bind_values.len()));
1915 }
1916 if let Some(max) = day_max {
1917 bind_values.push(ParamValue::from(max));
1918 where_parts.push(format!("day_id <= ?{}", bind_values.len()));
1919 }
1920
1921 if table == "token_daily_stats" {
1924 where_parts.push("model_family != 'all'".into());
1925 where_parts.push("agent_slug != 'all'".into());
1926 where_parts.push("LOWER(TRIM(COALESCE(source_id, ''))) != 'all'".into());
1927 }
1928
1929 let where_clause = if where_parts.is_empty() {
1930 String::new()
1931 } else {
1932 format!(" WHERE {}", where_parts.join(" AND "))
1933 };
1934
1935 let sql = format!(
1936 "SELECT day_id,
1937 SUM(api_call_count),
1938 SUM(user_message_count),
1939 SUM(assistant_message_count),
1940 SUM(total_tool_calls),
1941 SUM(total_input_tokens),
1942 SUM(total_output_tokens),
1943 SUM(total_cache_read_tokens),
1944 SUM(total_cache_creation_tokens),
1945 SUM(total_thinking_tokens),
1946 SUM(grand_total_tokens),
1947 SUM(total_content_chars),
1948 SUM(estimated_cost_usd)
1949 FROM (
1950 SELECT * FROM {table}
1951 {where_clause}
1952 ) filtered_cost_timeseries
1953 GROUP BY day_id
1954 ORDER BY day_id"
1955 );
1956
1957 let param_values: Vec<ParamValue> = bind_values.clone();
1958
1959 let raw_buckets: Vec<(i64, UsageBucket)> = conn
1960 .query_map_collect(&sql, ¶m_values, |row: &Row| {
1961 let day_id: i64 = row.get_typed(0)?;
1962 let api_call_count: i64 = row.get_typed(1)?;
1963 let user_msg: i64 = row.get_typed(2)?;
1964 let asst_msg: i64 = row.get_typed(3)?;
1965 let tool_calls: i64 = row.get_typed(4)?;
1966 let input_tok: i64 = row.get_typed(5)?;
1967 let output_tok: i64 = row.get_typed(6)?;
1968 let cache_read: i64 = row.get_typed(7)?;
1969 let cache_create: i64 = row.get_typed(8)?;
1970 let thinking: i64 = row.get_typed(9)?;
1971 let grand_total: i64 = row.get_typed(10)?;
1972 let content_chars: i64 = row.get_typed(11)?;
1973 let cost: f64 = row.get_typed(12)?;
1974
1975 Ok((
1976 day_id,
1977 UsageBucket {
1978 message_count: api_call_count,
1979 user_message_count: user_msg,
1980 assistant_message_count: asst_msg,
1981 tool_call_count: tool_calls,
1982 api_coverage_message_count: api_call_count, content_tokens_est_total: content_chars / 4,
1984 api_tokens_total: grand_total,
1985 api_input_tokens_total: input_tok,
1986 api_output_tokens_total: output_tok,
1987 api_cache_read_tokens_total: cache_read,
1988 api_cache_creation_tokens_total: cache_create,
1989 api_thinking_tokens_total: thinking,
1990 estimated_cost_usd: cost,
1991 ..Default::default()
1992 },
1993 ))
1994 })
1995 .map_err(|e| analytics_query_error("Cost timeseries query failed", e))?;
1996
1997 let final_buckets: Vec<(String, UsageBucket)> = match group_by {
1999 GroupBy::Hour | GroupBy::Day => raw_buckets
2000 .into_iter()
2001 .map(|(id, row)| (bucketing::day_id_to_iso(id), row))
2002 .collect(),
2003 GroupBy::Week => {
2004 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
2005 for (day_id, row) in raw_buckets {
2006 let key = bucketing::day_id_to_iso_week(day_id);
2007 merged.entry(key).or_default().merge(&row);
2008 }
2009 merged.into_iter().collect()
2010 }
2011 GroupBy::Month => {
2012 let mut merged: BTreeMap<String, UsageBucket> = BTreeMap::new();
2013 for (day_id, row) in raw_buckets {
2014 let key = bucketing::day_id_to_month(day_id);
2015 merged.entry(key).or_default().merge(&row);
2016 }
2017 merged.into_iter().collect()
2018 }
2019 };
2020
2021 let mut totals = UsageBucket::default();
2023 for (_, row) in &final_buckets {
2024 totals.merge(row);
2025 }
2026
2027 let elapsed_ms = query_start.elapsed().as_millis() as u64;
2028
2029 Ok(TimeseriesResult {
2030 buckets: final_buckets,
2031 totals,
2032 source_table: table.into(),
2033 group_by,
2034 elapsed_ms,
2035 path: "rollup".into(),
2036 })
2037}
2038
2039fn breakdown_route(dim: Dim, metric: Metric) -> (&'static str, &'static str, bool) {
2044 match (dim, metric) {
2045 (Dim::Model, _) => ("token_daily_stats", "model_family", true),
2046 (Dim::Agent, Metric::EstimatedCostUsd) => ("token_daily_stats", "agent_slug", true),
2047 (Dim::Source, Metric::EstimatedCostUsd) => ("token_daily_stats", "source_id", true),
2048 (Dim::Agent, _) => ("usage_daily", "agent_slug", false),
2049 (Dim::Workspace, _) => ("usage_daily", "workspace_id", false),
2050 (Dim::Source, _) => ("usage_daily", "source_id", false),
2051 }
2052}
2053
2054fn query_track_b_breakdown_from_token_usage(
2055 conn: &Connection,
2056 filter: &AnalyticsFilter,
2057 dim: Dim,
2058 metric: Metric,
2059 limit: usize,
2060 query_start: std::time::Instant,
2061) -> AnalyticsResult<BreakdownResult> {
2062 if !table_exists(conn, "token_usage") {
2063 return Ok(BreakdownResult {
2064 rows: vec![],
2065 dim,
2066 metric,
2067 source_table: "token_usage".into(),
2068 elapsed_ms: query_start.elapsed().as_millis() as u64,
2069 });
2070 }
2071
2072 let has_input_tokens = table_has_column(conn, "token_usage", "input_tokens");
2073 let has_output_tokens = table_has_column(conn, "token_usage", "output_tokens");
2074 let has_cache_read_tokens = table_has_column(conn, "token_usage", "cache_read_tokens");
2075 let has_cache_creation_tokens = table_has_column(conn, "token_usage", "cache_creation_tokens");
2076 let has_thinking_tokens = table_has_column(conn, "token_usage", "thinking_tokens");
2077 let has_estimated_cost = table_has_column(conn, "token_usage", "estimated_cost_usd");
2078 let has_role = table_has_column(conn, "token_usage", "role");
2079 let has_content_chars = table_has_column(conn, "token_usage", "content_chars");
2080 let has_tool_call_count = table_has_column(conn, "token_usage", "tool_call_count");
2081
2082 let sum_or_zero = |expr: &str, present: bool| {
2083 if present {
2084 format!("SUM(COALESCE({expr}, 0))")
2085 } else {
2086 "SUM(0)".to_string()
2089 }
2090 };
2091 let count_role = |role: &str| {
2092 if has_role {
2093 format!("SUM(CASE WHEN tu.role = '{}' THEN 1 ELSE 0 END)", role)
2094 } else {
2095 "SUM(0)".to_string()
2096 }
2097 };
2098
2099 let (token_usage_from_sql, token_usage_agent_sql, token_usage_source_sql) =
2100 token_usage_from_sql_agent_and_source_sql(conn);
2101 let token_usage_agent_sql = token_usage_agent_sql_or_unknown(token_usage_agent_sql);
2102 let key_sql = match dim {
2103 Dim::Model => normalized_analytics_model_family_sql_expr("tu.model_family"),
2104 Dim::Agent => token_usage_agent_sql.clone(),
2105 Dim::Source => token_usage_source_sql.clone(),
2106 _ => unreachable!(
2107 "track-b token_usage fallback only supports model, agent, and source breakdowns"
2108 ),
2109 };
2110 let input_sql = sum_or_zero("tu.input_tokens", has_input_tokens);
2111 let output_sql = sum_or_zero("tu.output_tokens", has_output_tokens);
2112 let cache_read_sql = sum_or_zero("tu.cache_read_tokens", has_cache_read_tokens);
2113 let cache_creation_sql = sum_or_zero("tu.cache_creation_tokens", has_cache_creation_tokens);
2114 let thinking_sql = sum_or_zero("tu.thinking_tokens", has_thinking_tokens);
2115 let total_sql = sum_or_zero("tu.total_tokens", true);
2116 let content_chars_sql = sum_or_zero("tu.content_chars", has_content_chars);
2117 let estimated_cost_sql = if has_estimated_cost {
2118 "SUM(COALESCE(tu.estimated_cost_usd, 0.0))".to_string()
2119 } else {
2120 "0.0".to_string()
2121 };
2122 let tool_calls_sql = sum_or_zero("tu.tool_call_count", has_tool_call_count);
2123 let user_count_sql = count_role("user");
2124 let assistant_count_sql = count_role("assistant");
2125
2126 let order_sql = match metric {
2127 Metric::ApiTotal => total_sql.clone(),
2128 Metric::ApiInput => input_sql.clone(),
2129 Metric::ApiOutput => output_sql.clone(),
2130 Metric::CacheRead => cache_read_sql.clone(),
2131 Metric::CacheCreation => cache_creation_sql.clone(),
2132 Metric::Thinking => thinking_sql.clone(),
2133 Metric::ContentEstTotal => content_chars_sql.clone(),
2134 Metric::ToolCalls => tool_calls_sql.clone(),
2135 Metric::PlanCount | Metric::CoveragePct | Metric::MessageCount => "COUNT(*)".to_string(),
2136 Metric::EstimatedCostUsd => estimated_cost_sql.clone(),
2137 };
2138
2139 let token_usage_time_sql = token_usage_time_sql(conn);
2140 let time_column = token_usage_time_sql
2141 .as_deref()
2142 .map(AnalyticsTimeColumn::TimestampMs)
2143 .unwrap_or(AnalyticsTimeColumn::Day("tu.day_id"));
2144 let (where_sql, params) = build_filtered_where_sql(
2145 filter,
2146 Some("tu.workspace_id"),
2147 Some(token_usage_agent_sql.clone()),
2148 token_usage_source_sql,
2149 Some(time_column),
2150 );
2151
2152 let sql = format!(
2153 "SELECT {key_sql},
2154 COUNT(*),
2155 {user_count_sql},
2156 {assistant_count_sql},
2157 {tool_calls_sql},
2158 {input_sql},
2159 {output_sql},
2160 {cache_read_sql},
2161 {cache_creation_sql},
2162 {thinking_sql},
2163 {total_sql},
2164 {content_chars_sql},
2165 {estimated_cost_sql}
2166 FROM {token_usage_from_sql}
2167 {where_sql}
2168 GROUP BY {key_sql}
2169 ORDER BY {order_sql} DESC
2170 LIMIT {limit}"
2171 );
2172
2173 let raw_rows = conn
2174 .query_map_collect(&sql, ¶ms, |row: &Row| {
2175 let get_i64 = |idx| {
2176 row.get_typed::<i64>(idx)
2177 .or_else(|_| row.get_typed::<f64>(idx).map(|value| value.round() as i64))
2178 };
2179 let get_f64 = |idx| {
2180 row.get_typed::<f64>(idx)
2181 .or_else(|_| row.get_typed::<i64>(idx).map(|value| value as f64))
2182 };
2183
2184 let key: String = row.get_typed(0)?;
2185 let message_count: i64 = get_i64(1)?;
2186 let user_message_count: i64 = get_i64(2)?;
2187 let assistant_message_count: i64 = get_i64(3)?;
2188 let tool_call_count: i64 = get_i64(4)?;
2189 let api_input_tokens_total: i64 = get_i64(5)?;
2190 let api_output_tokens_total: i64 = get_i64(6)?;
2191 let api_cache_read_tokens_total: i64 = get_i64(7)?;
2192 let api_cache_creation_tokens_total: i64 = get_i64(8)?;
2193 let api_thinking_tokens_total: i64 = get_i64(9)?;
2194 let api_tokens_total: i64 = get_i64(10)?;
2195 let total_content_chars: i64 = get_i64(11)?;
2196 let estimated_cost_usd: f64 = get_f64(12)?;
2197 let bucket = UsageBucket {
2198 message_count,
2199 user_message_count,
2200 assistant_message_count,
2201 tool_call_count,
2202 api_coverage_message_count: message_count,
2203 content_tokens_est_total: total_content_chars / 4,
2204 api_tokens_total,
2205 api_input_tokens_total,
2206 api_output_tokens_total,
2207 api_cache_read_tokens_total,
2208 api_cache_creation_tokens_total,
2209 api_thinking_tokens_total,
2210 estimated_cost_usd,
2211 ..Default::default()
2212 };
2213 Ok((key, bucket))
2214 })
2215 .map_err(|e| analytics_query_error("Breakdown query failed", e))?;
2216
2217 let rows = raw_rows
2218 .into_iter()
2219 .map(|(key, bucket)| {
2220 let value = match metric {
2221 Metric::ApiTotal => bucket.api_tokens_total,
2222 Metric::ApiInput => bucket.api_input_tokens_total,
2223 Metric::ApiOutput => bucket.api_output_tokens_total,
2224 Metric::CacheRead => bucket.api_cache_read_tokens_total,
2225 Metric::CacheCreation => bucket.api_cache_creation_tokens_total,
2226 Metric::Thinking => bucket.api_thinking_tokens_total,
2227 Metric::ContentEstTotal => bucket.content_tokens_est_total,
2228 Metric::ToolCalls => bucket.tool_call_count,
2229 Metric::PlanCount => 0,
2230 Metric::CoveragePct => {
2231 super::derive::safe_pct(bucket.api_coverage_message_count, bucket.message_count)
2232 .round() as i64
2233 }
2234 Metric::MessageCount => bucket.message_count,
2235 Metric::EstimatedCostUsd => bucket.estimated_cost_usd.round() as i64,
2236 };
2237 breakdown_row_with_value(key, bucket, value)
2238 })
2239 .collect();
2240
2241 Ok(BreakdownResult {
2242 rows,
2243 dim,
2244 metric,
2245 source_table: "token_usage".into(),
2246 elapsed_ms: query_start.elapsed().as_millis() as u64,
2247 })
2248}
2249
2250fn query_track_a_breakdown_from_raw(
2251 conn: &Connection,
2252 filter: &AnalyticsFilter,
2253 dim: Dim,
2254 metric: Metric,
2255 limit: usize,
2256 query_start: std::time::Instant,
2257) -> AnalyticsResult<BreakdownResult> {
2258 let has_agents = table_exists(conn, "agents");
2259 let has_origin_host = table_has_column(conn, "conversations", "origin_host");
2260 let canonical_message_metrics_sql = canonical_message_metrics_from_sql(conn);
2261 let join_message_metrics =
2262 !matches!(metric, Metric::MessageCount) && canonical_message_metrics_sql.is_some();
2263 let message_metrics_has_message_id = table_exists(conn, "message_metrics")
2264 && table_has_column(conn, "message_metrics", "message_id");
2265 let has_api_data_source =
2266 join_message_metrics && table_has_column(conn, "message_metrics", "api_data_source");
2267 let has_tool_call_count =
2268 join_message_metrics && table_has_column(conn, "message_metrics", "tool_call_count");
2269 let has_has_plan =
2270 join_message_metrics && table_has_column(conn, "message_metrics", "has_plan");
2271 let has_message_metrics_created_at = message_metrics_has_message_id
2272 && table_has_column(conn, "message_metrics", "created_at_ms");
2273
2274 let source_id_sql = "TRIM(COALESCE(c.source_id, ''))";
2275 let origin_host_sql = if has_origin_host {
2276 "TRIM(COALESCE(c.origin_host, ''))"
2277 } else {
2278 "''"
2279 };
2280
2281 let mut from_sql = String::from("messages m JOIN conversations c ON c.id = m.conversation_id");
2282 if has_agents {
2283 from_sql.push_str(" LEFT JOIN agents a ON a.id = c.agent_id");
2284 }
2285 if let Some(message_metrics_sql) = &canonical_message_metrics_sql {
2286 from_sql.push_str(" LEFT JOIN ");
2287 from_sql.push_str(message_metrics_sql);
2288 from_sql.push_str(" ON mm.message_id = m.id");
2289 }
2290
2291 let filter_for_sql = AnalyticsFilter {
2292 source: SourceFilter::All,
2293 ..filter.clone()
2294 };
2295 let message_time_sql = if has_message_metrics_created_at {
2296 if join_message_metrics {
2297 "COALESCE(m.created_at, mm.created_at_ms, c.started_at, 0)"
2298 } else {
2299 "COALESCE(m.created_at, (SELECT MAX(message_metrics.created_at_ms) FROM message_metrics WHERE message_metrics.message_id = m.id), c.started_at, 0)"
2300 }
2301 } else {
2302 "COALESCE(m.created_at, c.started_at, 0)"
2303 };
2304 let (where_sql, params) = build_filtered_where_sql(
2305 &filter_for_sql,
2306 Some("c.workspace_id"),
2307 has_agents.then(|| normalized_analytics_agent_sql_expr("a.slug")),
2308 sql_string_literal("all"),
2309 Some(AnalyticsTimeColumn::TimestampMs(message_time_sql)),
2310 );
2311
2312 let dim_key_expr = match dim {
2313 Dim::Source => "''".to_string(),
2314 Dim::Agent => {
2315 if has_agents {
2316 normalized_analytics_agent_sql_expr("a.slug")
2317 } else {
2318 "'unknown'".to_string()
2319 }
2320 }
2321 Dim::Workspace => "CAST(COALESCE(c.workspace_id, 0) AS TEXT)".to_string(),
2322 Dim::Model => unreachable!("track A raw breakdown does not support model dimension"),
2323 };
2324 let content_tokens_expr = if join_message_metrics {
2325 "COALESCE(mm.content_tokens_est, 0)"
2326 } else {
2327 "0"
2328 };
2329 let api_input_expr = if join_message_metrics {
2330 "COALESCE(mm.api_input_tokens, 0)"
2331 } else {
2332 "0"
2333 };
2334 let api_output_expr = if join_message_metrics {
2335 "COALESCE(mm.api_output_tokens, 0)"
2336 } else {
2337 "0"
2338 };
2339 let api_cache_read_expr = if join_message_metrics {
2340 "COALESCE(mm.api_cache_read_tokens, 0)"
2341 } else {
2342 "0"
2343 };
2344 let api_cache_creation_expr = if join_message_metrics {
2345 "COALESCE(mm.api_cache_creation_tokens, 0)"
2346 } else {
2347 "0"
2348 };
2349 let api_thinking_expr = if join_message_metrics {
2350 "COALESCE(mm.api_thinking_tokens, 0)"
2351 } else {
2352 "0"
2353 };
2354 let api_covered_expr = if has_api_data_source {
2355 "CASE WHEN mm.api_data_source = 'api' THEN 1 ELSE 0 END"
2356 } else {
2357 "0"
2358 };
2359 let tool_call_expr = if has_tool_call_count {
2360 "COALESCE(mm.tool_call_count, 0)"
2361 } else {
2362 "0"
2363 };
2364 let has_plan_expr = if has_has_plan {
2365 "CASE WHEN COALESCE(mm.has_plan, 0) != 0 THEN 1 ELSE 0 END"
2366 } else {
2367 "0"
2368 };
2369
2370 let sql = format!(
2371 "SELECT m.conversation_id,
2372 {dim_key_expr},
2373 m.role,
2374 {content_tokens_expr},
2375 {api_input_expr},
2376 {api_output_expr},
2377 {api_cache_read_expr},
2378 {api_cache_creation_expr},
2379 {api_thinking_expr},
2380 {api_covered_expr},
2381 {tool_call_expr},
2382 {has_plan_expr},
2383 {source_id_sql},
2384 {origin_host_sql}
2385 FROM {from_sql}{where_sql}"
2386 );
2387
2388 let row_buckets = conn
2389 .query_map_collect(&sql, ¶ms, |row: &Row| {
2390 let dim_key: String = row.get_typed(1)?;
2392 let role: String = row.get_typed(2)?;
2393 let content_tokens_est: i64 = row.get_typed(3)?;
2394 let api_input_tokens: i64 = row.get_typed(4)?;
2395 let api_output_tokens: i64 = row.get_typed(5)?;
2396 let api_cache_read_tokens: i64 = row.get_typed(6)?;
2397 let api_cache_creation_tokens: i64 = row.get_typed(7)?;
2398 let api_thinking_tokens: i64 = row.get_typed(8)?;
2399 let api_covered: i64 = row.get_typed(9)?;
2400 let tool_call_count: i64 = row.get_typed(10)?;
2401 let has_plan: i64 = row.get_typed(11)?;
2402 let source_id: String = row.get_typed(12)?;
2403 let origin_host: String = row.get_typed(13)?;
2404
2405 let mut bucket = UsageBucket {
2406 message_count: 1,
2407 user_message_count: i64::from(role == "user"),
2408 assistant_message_count: i64::from(role == "assistant"),
2409 tool_call_count,
2410 plan_message_count: has_plan,
2411 api_coverage_message_count: api_covered,
2412 content_tokens_est_total: content_tokens_est,
2413 content_tokens_est_user: if role == "user" {
2414 content_tokens_est
2415 } else {
2416 0
2417 },
2418 content_tokens_est_assistant: if role == "assistant" {
2419 content_tokens_est
2420 } else {
2421 0
2422 },
2423 api_input_tokens_total: api_input_tokens,
2424 api_output_tokens_total: api_output_tokens,
2425 api_cache_read_tokens_total: api_cache_read_tokens,
2426 api_cache_creation_tokens_total: api_cache_creation_tokens,
2427 api_thinking_tokens_total: api_thinking_tokens,
2428 ..Default::default()
2429 };
2430 bucket.api_tokens_total = api_input_tokens
2431 + api_output_tokens
2432 + api_cache_read_tokens
2433 + api_cache_creation_tokens
2434 + api_thinking_tokens;
2435 Ok((source_id, origin_host, dim_key, bucket))
2436 })
2437 .map_err(|e| analytics_query_error("Breakdown query failed", e))?;
2438
2439 let mut grouped_buckets: BTreeMap<String, UsageBucket> = BTreeMap::new();
2440 for (source_id, origin_host, dim_key, bucket) in row_buckets {
2441 let normalized_source_key =
2442 normalized_analytics_source_identity_value(&source_id, &origin_host);
2443 if !analytics_source_filter_matches_key(&filter.source, &normalized_source_key) {
2444 continue;
2445 }
2446
2447 let group_key = if matches!(dim, Dim::Source) {
2448 normalized_source_key
2449 } else {
2450 dim_key
2451 };
2452 grouped_buckets.entry(group_key).or_default().merge(&bucket);
2453 }
2454
2455 let mut rows: Vec<BreakdownRow> = grouped_buckets
2456 .into_iter()
2457 .map(|(key, bucket)| {
2458 let value = match metric {
2459 Metric::ApiTotal => bucket.api_tokens_total,
2460 Metric::ApiInput => bucket.api_input_tokens_total,
2461 Metric::ApiOutput => bucket.api_output_tokens_total,
2462 Metric::CacheRead => bucket.api_cache_read_tokens_total,
2463 Metric::CacheCreation => bucket.api_cache_creation_tokens_total,
2464 Metric::Thinking => bucket.api_thinking_tokens_total,
2465 Metric::ContentEstTotal => bucket.content_tokens_est_total,
2466 Metric::ToolCalls => bucket.tool_call_count,
2467 Metric::PlanCount => bucket.plan_message_count,
2468 Metric::CoveragePct => {
2469 super::derive::safe_pct(bucket.api_coverage_message_count, bucket.message_count)
2470 .round() as i64
2471 }
2472 Metric::MessageCount => bucket.message_count,
2473 Metric::EstimatedCostUsd => bucket.estimated_cost_usd.round() as i64,
2474 };
2475 breakdown_row_with_value(key, bucket, value)
2476 })
2477 .collect();
2478
2479 rows.sort_by(|a, b| b.value.cmp(&a.value).then_with(|| a.key.cmp(&b.key)));
2480 rows.truncate(limit);
2481
2482 Ok(BreakdownResult {
2483 rows,
2484 dim,
2485 metric,
2486 source_table: if join_message_metrics {
2487 "message_metrics".into()
2488 } else {
2489 "messages".into()
2490 },
2491 elapsed_ms: query_start.elapsed().as_millis() as u64,
2492 })
2493}
2494
2495pub fn query_breakdown(
2501 conn: &Connection,
2502 filter: &AnalyticsFilter,
2503 dim: Dim,
2504 metric: Metric,
2505 limit: usize,
2506) -> AnalyticsResult<BreakdownResult> {
2507 let query_start = std::time::Instant::now();
2508
2509 let (table, dim_col, use_track_b) = breakdown_route(dim, metric);
2512 if use_track_b
2513 && token_usage_supports_track_b_metric(conn, metric)
2514 && track_b_breakdown_requires_token_usage_fallback(filter, dim)
2515 {
2516 return query_track_b_breakdown_from_token_usage(
2517 conn,
2518 filter,
2519 dim,
2520 metric,
2521 limit,
2522 query_start,
2523 );
2524 }
2525 if !use_track_b
2526 && track_a_breakdown_supports_raw_metric(conn, metric)
2527 && track_a_breakdown_requires_raw_fallback(filter, dim)
2528 {
2529 return query_track_a_breakdown_from_raw(conn, filter, dim, metric, limit, query_start);
2530 }
2531 let dim_col_sql = match dim {
2532 Dim::Source => normalized_analytics_source_id_sql_expr(dim_col),
2533 Dim::Agent => normalized_analytics_agent_sql_expr(dim_col),
2534 _ => dim_col.to_string(),
2535 };
2536
2537 if !table_exists(conn, table) {
2538 return Ok(BreakdownResult {
2539 rows: vec![],
2540 dim,
2541 metric,
2542 source_table: table.into(),
2543 elapsed_ms: query_start.elapsed().as_millis() as u64,
2544 });
2545 }
2546
2547 let filter_for_sql = if matches!(dim, Dim::Source) {
2549 AnalyticsFilter {
2550 source: SourceFilter::All,
2551 ..filter.clone()
2552 }
2553 } else {
2554 filter.clone()
2555 };
2556 let (day_min, day_max) = bucketing::resolve_day_range(filter);
2557 let (dim_parts, dim_params) = build_where_parts(
2558 &filter_for_sql,
2559 if use_track_b {
2560 None
2561 } else {
2562 Some("workspace_id")
2563 },
2564 );
2565 let mut where_parts = dim_parts;
2566 let mut bind_values = dim_params;
2567
2568 if let Some(min) = day_min {
2569 bind_values.push(ParamValue::from(min));
2570 where_parts.push(format!("day_id >= ?{}", bind_values.len()));
2571 }
2572 if let Some(max) = day_max {
2573 bind_values.push(ParamValue::from(max));
2574 where_parts.push(format!("day_id <= ?{}", bind_values.len()));
2575 }
2576
2577 if use_track_b {
2580 where_parts.push("model_family != 'all'".into());
2581 where_parts.push("agent_slug != 'all'".into());
2582 where_parts.push("source_id != 'all'".into());
2583 }
2584
2585 let where_clause = if where_parts.is_empty() {
2586 String::new()
2587 } else {
2588 format!(" WHERE {}", where_parts.join(" AND "))
2589 };
2590
2591 let sql_limit = if matches!(dim, Dim::Source) && !matches!(filter.source, SourceFilter::All) {
2596 None
2597 } else {
2598 Some(limit)
2599 };
2600 let sql = if use_track_b {
2601 build_breakdown_sql_track_b(&dim_col_sql, &metric, &where_clause, sql_limit)
2603 } else {
2604 let has_plan_token_rollups = table_has_plan_token_rollups(conn, "usage_daily");
2606 build_breakdown_sql_track_a(
2607 &dim_col_sql,
2608 &metric,
2609 &where_clause,
2610 sql_limit,
2611 has_plan_token_rollups,
2612 )
2613 };
2614
2615 let param_values: Vec<ParamValue> = bind_values.clone();
2616
2617 let mut rows = if use_track_b {
2618 read_breakdown_rows_track_b(conn, &sql, ¶m_values, &metric)?
2619 } else {
2620 read_breakdown_rows_track_a(conn, &sql, ¶m_values, &metric)?
2621 };
2622
2623 if matches!(dim, Dim::Source) {
2624 rows.retain(|row| analytics_source_filter_matches_key(&filter.source, &row.key));
2625 rows.truncate(limit);
2626 }
2627
2628 let elapsed_ms = query_start.elapsed().as_millis() as u64;
2629
2630 Ok(BreakdownResult {
2631 rows,
2632 dim,
2633 metric,
2634 source_table: table.into(),
2635 elapsed_ms,
2636 })
2637}
2638
2639fn build_breakdown_sql_track_a(
2641 dim_col: &str,
2642 metric: &Metric,
2643 where_clause: &str,
2644 limit: Option<usize>,
2645 has_plan_token_rollups: bool,
2646) -> String {
2647 let (sort_value_sql, order_by_expr) = match metric {
2648 Metric::CoveragePct => (
2649 "SUM(api_coverage_message_count)".to_string(),
2650 "CASE
2651 WHEN SUM(message_count) = 0 THEN 0.0
2652 ELSE CAST(SUM(api_coverage_message_count) AS REAL) / CAST(SUM(message_count) AS REAL)
2653 END"
2654 .to_string(),
2655 ),
2656 _ => {
2657 let order_col = metric.rollup_column().unwrap_or("api_tokens_total");
2658 (format!("SUM({order_col})"), format!("SUM({order_col})"))
2659 }
2660 };
2661 let plan_content_expr = if has_plan_token_rollups {
2662 "SUM(plan_content_tokens_est_total)"
2663 } else {
2664 "0"
2665 };
2666 let plan_api_expr = if has_plan_token_rollups {
2667 "SUM(plan_api_tokens_total)"
2668 } else {
2669 "0"
2670 };
2671 let limit_clause = limit
2672 .map(|limit| {
2673 format!(
2674 "
2675 LIMIT {limit}"
2676 )
2677 })
2678 .unwrap_or_default();
2679 format!(
2680 "SELECT CAST({dim_col} AS TEXT),
2681 SUM(message_count),
2682 SUM(user_message_count),
2683 SUM(assistant_message_count),
2684 SUM(tool_call_count),
2685 SUM(plan_message_count),
2686 {plan_content_expr},
2687 {plan_api_expr},
2688 SUM(api_coverage_message_count),
2689 SUM(content_tokens_est_total),
2690 SUM(content_tokens_est_user),
2691 SUM(content_tokens_est_assistant),
2692 SUM(api_tokens_total),
2693 SUM(api_input_tokens_total),
2694 SUM(api_output_tokens_total),
2695 SUM(api_cache_read_tokens_total),
2696 SUM(api_cache_creation_tokens_total),
2697 SUM(api_thinking_tokens_total),
2698 {sort_value_sql}
2699 FROM (
2700 SELECT * FROM usage_daily
2701 {where_clause}
2702 ) filtered_usage_daily
2703 GROUP BY CAST({dim_col} AS TEXT)
2704 ORDER BY {order_by_expr} DESC, CAST({dim_col} AS TEXT) ASC{limit_clause}"
2705 )
2706}
2707
2708fn build_breakdown_sql_track_b(
2710 dim_col: &str,
2711 metric: &Metric,
2712 where_clause: &str,
2713 limit: Option<usize>,
2714) -> String {
2715 let order_col = match metric {
2717 Metric::ApiTotal => "grand_total_tokens",
2718 Metric::ApiInput => "total_input_tokens",
2719 Metric::ApiOutput => "total_output_tokens",
2720 Metric::CacheRead => "total_cache_read_tokens",
2721 Metric::CacheCreation => "total_cache_creation_tokens",
2722 Metric::Thinking => "total_thinking_tokens",
2723 Metric::ContentEstTotal => "total_content_chars",
2724 Metric::ToolCalls => "total_tool_calls",
2725 Metric::PlanCount => "api_call_count",
2728 Metric::CoveragePct => "api_call_count",
2730 Metric::MessageCount => "api_call_count",
2731 Metric::EstimatedCostUsd => "estimated_cost_usd",
2732 };
2733 let limit_clause = limit
2734 .map(|limit| {
2735 format!(
2736 "
2737 LIMIT {limit}"
2738 )
2739 })
2740 .unwrap_or_default();
2741 format!(
2742 "SELECT {dim_col},
2743 SUM(api_call_count),
2744 SUM(user_message_count),
2745 SUM(assistant_message_count),
2746 SUM(total_tool_calls),
2747 SUM(total_input_tokens),
2748 SUM(total_output_tokens),
2749 SUM(total_cache_read_tokens),
2750 SUM(total_cache_creation_tokens),
2751 SUM(total_thinking_tokens),
2752 SUM(grand_total_tokens),
2753 SUM(total_content_chars),
2754 SUM(estimated_cost_usd),
2755 SUM({order_col})
2756 FROM (
2757 SELECT * FROM token_daily_stats
2758 {where_clause}
2759 ) filtered_token_daily_stats
2760 GROUP BY {dim_col}
2761 ORDER BY SUM({order_col}) DESC{limit_clause}"
2762 )
2763}
2764
2765fn read_breakdown_rows_track_a(
2767 conn: &Connection,
2768 sql: &str,
2769 params: &[ParamValue],
2770 metric: &Metric,
2771) -> AnalyticsResult<Vec<BreakdownRow>> {
2772 let raw_rows = conn
2773 .query_map_collect(sql, params, |row: &Row| {
2774 let key: String = row.get_typed(0)?;
2775 let bucket = UsageBucket {
2776 message_count: row.get_typed(1)?,
2777 user_message_count: row.get_typed(2)?,
2778 assistant_message_count: row.get_typed(3)?,
2779 tool_call_count: row.get_typed(4)?,
2780 plan_message_count: row.get_typed(5)?,
2781 plan_content_tokens_est_total: row.get_typed(6)?,
2782 plan_api_tokens_total: row.get_typed(7)?,
2783 api_coverage_message_count: row.get_typed(8)?,
2784 content_tokens_est_total: row.get_typed(9)?,
2785 content_tokens_est_user: row.get_typed(10)?,
2786 content_tokens_est_assistant: row.get_typed(11)?,
2787 api_tokens_total: row.get_typed(12)?,
2788 api_input_tokens_total: row.get_typed(13)?,
2789 api_output_tokens_total: row.get_typed(14)?,
2790 api_cache_read_tokens_total: row.get_typed(15)?,
2791 api_cache_creation_tokens_total: row.get_typed(16)?,
2792 api_thinking_tokens_total: row.get_typed(17)?,
2793 ..Default::default()
2794 };
2795 let sort_value: i64 = row.get_typed(18)?;
2796 Ok((key, bucket, sort_value))
2797 })
2798 .map_err(|e| analytics_query_error("Breakdown query failed", e))?;
2799
2800 let mut result = Vec::new();
2801 for (key, bucket, sort_value) in raw_rows {
2802 let value = match metric {
2804 Metric::CoveragePct => {
2805 let pct = super::derive::safe_pct(
2806 bucket.api_coverage_message_count,
2807 bucket.message_count,
2808 );
2809 pct.round() as i64
2810 }
2811 Metric::EstimatedCostUsd => 0,
2813 _ => sort_value,
2814 };
2815 result.push(breakdown_row_with_value(key, bucket, value));
2816 }
2817 Ok(result)
2818}
2819
2820fn read_breakdown_rows_track_b(
2822 conn: &Connection,
2823 sql: &str,
2824 params: &[ParamValue],
2825 metric: &Metric,
2826) -> AnalyticsResult<Vec<BreakdownRow>> {
2827 let raw_rows = conn
2828 .query_map_collect(sql, params, |row: &Row| {
2829 let key: String = row.get_typed(0)?;
2830 let api_call_count: i64 = row.get_typed(1)?;
2831 let user_message_count: i64 = row.get_typed(2)?;
2832 let assistant_message_count: i64 = row.get_typed(3)?;
2833 let total_tool_calls: i64 = row.get_typed(4)?;
2834 let total_input: i64 = row.get_typed(5)?;
2835 let total_output: i64 = row.get_typed(6)?;
2836 let total_cache_read: i64 = row.get_typed(7)?;
2837 let total_cache_creation: i64 = row.get_typed(8)?;
2838 let total_thinking: i64 = row.get_typed(9)?;
2839 let grand_total: i64 = row.get_typed(10)?;
2840 let total_content_chars: i64 = row.get_typed(11)?;
2841 let estimated_cost: f64 = row.get_typed(12)?;
2842 let sort_value: i64 = match row.get_typed::<f64>(13) {
2846 Ok(v) => v.round() as i64,
2847 Err(_) => row.get_typed(13)?,
2848 };
2849
2850 let bucket = UsageBucket {
2852 message_count: api_call_count,
2853 user_message_count,
2854 assistant_message_count,
2855 tool_call_count: total_tool_calls,
2856 api_coverage_message_count: api_call_count, content_tokens_est_total: total_content_chars / 4, api_tokens_total: grand_total,
2859 api_input_tokens_total: total_input,
2860 api_output_tokens_total: total_output,
2861 api_cache_read_tokens_total: total_cache_read,
2862 api_cache_creation_tokens_total: total_cache_creation,
2863 api_thinking_tokens_total: total_thinking,
2864 estimated_cost_usd: estimated_cost,
2865 ..Default::default()
2866 };
2867
2868 Ok((key, bucket, sort_value))
2869 })
2870 .map_err(|e| analytics_query_error("Breakdown query failed", e))?;
2871
2872 let mut result = Vec::new();
2873 for (key, bucket, sort_value) in raw_rows {
2874 let value = match metric {
2875 Metric::CoveragePct => {
2876 super::derive::safe_pct(bucket.api_coverage_message_count, bucket.message_count)
2877 .round() as i64
2878 }
2879 Metric::ContentEstTotal => bucket.content_tokens_est_total,
2880 Metric::PlanCount => 0,
2881 _ => sort_value,
2882 };
2883 result.push(breakdown_row_with_value(key, bucket, value));
2884 }
2885 Ok(result)
2886}
2887
2888fn query_tools_from_raw(
2897 conn: &Connection,
2898 filter: &AnalyticsFilter,
2899 query_start: std::time::Instant,
2900 limit: usize,
2901) -> AnalyticsResult<ToolReport> {
2902 let has_agents = table_exists(conn, "agents");
2903 let has_origin_host = table_has_column(conn, "conversations", "origin_host");
2904 let canonical_message_metrics_sql = canonical_message_metrics_from_sql(conn);
2905 let has_message_metrics_created_at = canonical_message_metrics_sql.is_some()
2906 && table_has_column(conn, "message_metrics", "created_at_ms");
2907 let has_content_tokens_est = table_has_column(conn, "message_metrics", "content_tokens_est");
2908 let has_api_input_tokens = table_has_column(conn, "message_metrics", "api_input_tokens");
2909 let has_api_output_tokens = table_has_column(conn, "message_metrics", "api_output_tokens");
2910 let has_api_cache_read_tokens =
2911 table_has_column(conn, "message_metrics", "api_cache_read_tokens");
2912 let has_api_cache_creation_tokens =
2913 table_has_column(conn, "message_metrics", "api_cache_creation_tokens");
2914 let has_api_thinking_tokens = table_has_column(conn, "message_metrics", "api_thinking_tokens");
2915
2916 let source_id_sql = "TRIM(COALESCE(c.source_id, ''))";
2917 let origin_host_sql = if has_origin_host {
2918 "TRIM(COALESCE(c.origin_host, ''))"
2919 } else {
2920 "''"
2921 };
2922
2923 let mut from_sql = String::from("messages m JOIN conversations c ON c.id = m.conversation_id");
2924 if has_agents {
2925 from_sql.push_str(" LEFT JOIN agents a ON a.id = c.agent_id");
2926 }
2927 if let Some(message_metrics_sql) = &canonical_message_metrics_sql {
2928 from_sql.push_str(" LEFT JOIN ");
2929 from_sql.push_str(message_metrics_sql);
2930 from_sql.push_str(" ON mm.message_id = m.id");
2931 }
2932
2933 let filter_for_sql = AnalyticsFilter {
2934 source: SourceFilter::All,
2935 ..filter.clone()
2936 };
2937 let message_time_sql = if has_message_metrics_created_at {
2938 "COALESCE(m.created_at, mm.created_at_ms, c.started_at, 0)"
2939 } else {
2940 "COALESCE(m.created_at, c.started_at, 0)"
2941 };
2942 let (where_sql, params) = build_filtered_where_sql(
2943 &filter_for_sql,
2944 Some("c.workspace_id"),
2945 has_agents.then(|| normalized_analytics_agent_sql_expr("a.slug")),
2946 sql_string_literal("all"),
2947 Some(AnalyticsTimeColumn::TimestampMs(message_time_sql)),
2948 );
2949
2950 let agent_sql = if has_agents {
2951 normalized_analytics_agent_sql_expr("a.slug")
2952 } else {
2953 "'unknown'".to_string()
2954 };
2955 let tool_call_expr = "COALESCE(mm.tool_call_count, 0)";
2956 let content_tokens_expr = if has_content_tokens_est {
2957 "COALESCE(mm.content_tokens_est, 0)"
2958 } else {
2959 "0"
2960 };
2961 let api_input_expr = if has_api_input_tokens {
2962 "COALESCE(mm.api_input_tokens, 0)"
2963 } else {
2964 "0"
2965 };
2966 let api_output_expr = if has_api_output_tokens {
2967 "COALESCE(mm.api_output_tokens, 0)"
2968 } else {
2969 "0"
2970 };
2971 let api_cache_read_expr = if has_api_cache_read_tokens {
2972 "COALESCE(mm.api_cache_read_tokens, 0)"
2973 } else {
2974 "0"
2975 };
2976 let api_cache_creation_expr = if has_api_cache_creation_tokens {
2977 "COALESCE(mm.api_cache_creation_tokens, 0)"
2978 } else {
2979 "0"
2980 };
2981 let api_thinking_expr = if has_api_thinking_tokens {
2982 "COALESCE(mm.api_thinking_tokens, 0)"
2983 } else {
2984 "0"
2985 };
2986 let api_tokens_expr = format!(
2987 "({api_input_expr} + {api_output_expr} + {api_cache_read_expr} + {api_cache_creation_expr} + {api_thinking_expr})"
2988 );
2989
2990 let sql = format!(
2991 "SELECT m.conversation_id,
2992 {agent_sql},
2993 {tool_call_expr},
2994 {content_tokens_expr},
2995 {api_tokens_expr},
2996 {source_id_sql},
2997 {origin_host_sql}
2998 FROM {from_sql}{where_sql}"
2999 );
3000
3001 let raw_rows = conn
3002 .query_map_collect(&sql, ¶ms, |row: &Row| {
3003 let key: String = row.get_typed(1)?;
3005 let tool_call_count: i64 = row.get_typed(2)?;
3006 let content_tokens_est: i64 = row.get_typed(3)?;
3007 let api_tokens_total: i64 = row.get_typed(4)?;
3008 let source_id: String = row.get_typed(5)?;
3009 let origin_host: String = row.get_typed(6)?;
3010
3011 Ok((
3012 source_id,
3013 origin_host,
3014 key,
3015 tool_call_count,
3016 content_tokens_est,
3017 api_tokens_total,
3018 ))
3019 })
3020 .map_err(|e| analytics_query_error("Tool report query failed", e))?;
3021
3022 let mut grouped_rows: BTreeMap<String, (i64, i64, i64, i64)> = BTreeMap::new();
3023 for (
3024 source_id,
3025 origin_host,
3026 key,
3027 tool_call_count,
3028 content_tokens_est_total,
3029 api_tokens_total,
3030 ) in raw_rows
3031 {
3032 let normalized_source_key =
3033 normalized_analytics_source_identity_value(&source_id, &origin_host);
3034 if !analytics_source_filter_matches_key(&filter.source, &normalized_source_key) {
3035 continue;
3036 }
3037
3038 let entry = grouped_rows.entry(key).or_default();
3039 entry.0 += tool_call_count;
3040 entry.1 += 1;
3041 entry.2 += api_tokens_total;
3042 entry.3 += content_tokens_est_total;
3043 }
3044
3045 let mut rows: Vec<ToolRow> = grouped_rows
3046 .into_iter()
3047 .map(
3048 |(
3049 key,
3050 (tool_call_count, message_count, api_tokens_total, content_tokens_est_total),
3051 )| {
3052 let tool_calls_per_1k_api_tokens = if api_tokens_total > 0 {
3053 Some(tool_call_count as f64 / (api_tokens_total as f64 / 1000.0))
3054 } else {
3055 None
3056 };
3057 let tool_calls_per_1k_content_tokens = if content_tokens_est_total > 0 {
3058 Some(tool_call_count as f64 / (content_tokens_est_total as f64 / 1000.0))
3059 } else {
3060 None
3061 };
3062 ToolRow {
3063 key,
3064 tool_call_count,
3065 message_count,
3066 api_tokens_total,
3067 tool_calls_per_1k_api_tokens,
3068 tool_calls_per_1k_content_tokens,
3069 }
3070 },
3071 )
3072 .collect();
3073
3074 rows.sort_by(|a, b| {
3075 b.tool_call_count
3076 .cmp(&a.tool_call_count)
3077 .then_with(|| a.key.cmp(&b.key))
3078 });
3079
3080 let total_tool_calls = rows.iter().map(|row| row.tool_call_count).sum();
3081 let total_messages = rows.iter().map(|row| row.message_count).sum();
3082 let total_api_tokens = rows.iter().map(|row| row.api_tokens_total).sum();
3083
3084 rows.truncate(limit);
3085
3086 Ok(ToolReport {
3087 rows,
3088 total_tool_calls,
3089 total_messages,
3090 total_api_tokens,
3091 source_table: "message_metrics".into(),
3092 elapsed_ms: query_start.elapsed().as_millis() as u64,
3093 })
3094}
3095
3096pub fn query_tools(
3097 conn: &Connection,
3098 filter: &AnalyticsFilter,
3099 group_by: GroupBy,
3100 limit: usize,
3101) -> AnalyticsResult<ToolReport> {
3102 let query_start = std::time::Instant::now();
3103
3104 if track_a_timeseries_requires_raw_fallback(filter)
3105 && track_a_tools_supports_raw_source_fallback(conn)
3106 {
3107 return query_tools_from_raw(conn, filter, query_start, limit);
3108 }
3109
3110 let (table, bucket_col) = match group_by {
3111 GroupBy::Hour => ("usage_hourly", "hour_id"),
3112 _ => ("usage_daily", "day_id"),
3113 };
3114
3115 if !table_exists(conn, table) {
3116 return Ok(ToolReport {
3117 rows: vec![],
3118 total_tool_calls: 0,
3119 total_messages: 0,
3120 total_api_tokens: 0,
3121 source_table: table.into(),
3122 elapsed_ms: query_start.elapsed().as_millis() as u64,
3123 });
3124 }
3125
3126 let (day_min, day_max) = bucketing::resolve_day_range(filter);
3128 let (hour_min, hour_max) = bucketing::resolve_hour_range(filter);
3129 let (dim_parts, dim_params) = build_where_parts(filter, Some("workspace_id"));
3130 let mut where_parts = dim_parts;
3131 let mut bind_values = dim_params;
3132
3133 match group_by {
3134 GroupBy::Hour => {
3135 if let Some(min) = hour_min {
3136 bind_values.push(ParamValue::from(min));
3137 where_parts.push(format!("{bucket_col} >= ?{}", bind_values.len()));
3138 }
3139 if let Some(max) = hour_max {
3140 bind_values.push(ParamValue::from(max));
3141 where_parts.push(format!("{bucket_col} <= ?{}", bind_values.len()));
3142 }
3143 }
3144 _ => {
3145 if let Some(min) = day_min {
3146 bind_values.push(ParamValue::from(min));
3147 where_parts.push(format!("{bucket_col} >= ?{}", bind_values.len()));
3148 }
3149 if let Some(max) = day_max {
3150 bind_values.push(ParamValue::from(max));
3151 where_parts.push(format!("{bucket_col} <= ?{}", bind_values.len()));
3152 }
3153 }
3154 }
3155
3156 let where_clause = if where_parts.is_empty() {
3157 String::new()
3158 } else {
3159 format!(" WHERE {}", where_parts.join(" AND "))
3160 };
3161
3162 let tool_agent_sql = normalized_analytics_agent_sql_expr("agent_slug");
3164 let sql = format!(
3165 "SELECT {tool_agent_sql},
3166 SUM(tool_call_count),
3167 SUM(message_count),
3168 SUM(api_tokens_total),
3169 SUM(content_tokens_est_total)
3170 FROM (
3171 SELECT * FROM {table}
3172 {where_clause}
3173 ) filtered_tool_usage
3174 GROUP BY {tool_agent_sql}
3175 ORDER BY SUM(tool_call_count) DESC"
3176 );
3177
3178 let param_values: Vec<ParamValue> = bind_values.clone();
3179
3180 let tool_rows = conn
3181 .query_map_collect(&sql, ¶m_values, |row: &Row| {
3182 let key: String = row.get_typed(0)?;
3183 let tool_call_count: i64 = row.get_typed(1)?;
3184 let message_count: i64 = row.get_typed(2)?;
3185 let api_tokens_total: i64 = row.get_typed(3)?;
3186 let content_tokens_est_total: i64 = row.get_typed(4)?;
3187
3188 let tool_calls_per_1k_api = if api_tokens_total > 0 {
3189 Some(tool_call_count as f64 / (api_tokens_total as f64 / 1000.0))
3190 } else {
3191 None
3192 };
3193 let tool_calls_per_1k_content = if content_tokens_est_total > 0 {
3194 Some(tool_call_count as f64 / (content_tokens_est_total as f64 / 1000.0))
3195 } else {
3196 None
3197 };
3198
3199 Ok(ToolRow {
3200 key,
3201 tool_call_count,
3202 message_count,
3203 api_tokens_total,
3204 tool_calls_per_1k_api_tokens: tool_calls_per_1k_api,
3205 tool_calls_per_1k_content_tokens: tool_calls_per_1k_content,
3206 })
3207 })
3208 .map_err(|e| analytics_query_error("Tool report query failed", e))?;
3209
3210 let mut rows = Vec::new();
3211 let mut total_tool_calls: i64 = 0;
3212 let mut total_messages: i64 = 0;
3213 let mut total_api_tokens: i64 = 0;
3214
3215 for r in tool_rows {
3216 total_tool_calls += r.tool_call_count;
3217 total_messages += r.message_count;
3218 total_api_tokens += r.api_tokens_total;
3219 rows.push(r);
3220 }
3221 rows.truncate(limit);
3222
3223 let elapsed_ms = query_start.elapsed().as_millis() as u64;
3224
3225 Ok(ToolReport {
3226 rows,
3227 total_tool_calls,
3228 total_messages,
3229 total_api_tokens,
3230 source_table: table.into(),
3231 elapsed_ms,
3232 })
3233}
3234
3235pub fn query_session_scatter(
3246 conn: &Connection,
3247 filter: &AnalyticsFilter,
3248 limit: usize,
3249) -> AnalyticsResult<Vec<SessionScatterPoint>> {
3250 if !table_exists(conn, "conversations") || !table_exists(conn, "messages") {
3251 return Ok(Vec::new());
3252 }
3253
3254 let has_agents = table_exists(conn, "agents");
3255 if !has_agents && !filter.agents.is_empty() {
3256 return Ok(Vec::new());
3257 }
3258
3259 let mut where_parts: Vec<String> = Vec::new();
3260 let mut bind_values: Vec<ParamValue> = Vec::new();
3261
3262 let canonical_message_metrics_sql = canonical_message_metrics_from_sql(conn);
3263 let has_message_metrics = canonical_message_metrics_sql.is_some();
3264 let has_token_usage = table_exists(conn, "token_usage");
3265 let has_mm_created_at =
3266 has_message_metrics && table_has_column(conn, "message_metrics", "created_at_ms");
3267 let has_tu_timestamp = has_token_usage && table_has_column(conn, "token_usage", "timestamp_ms");
3268
3269 if !filter.agents.is_empty() {
3271 let normalized_agent_sql = normalized_analytics_agent_sql_expr("a.slug");
3272 let agent_literals: Vec<String> = filter
3273 .agents
3274 .iter()
3275 .map(|agent| sql_string_literal(&normalized_analytics_agent_value(agent.as_str())))
3276 .collect();
3277 where_parts.push(format!(
3278 "{normalized_agent_sql} IN ({})",
3279 agent_literals.join(", ")
3280 ));
3281 }
3282
3283 let normalized_source_sql = if table_has_column(conn, "conversations", "origin_host") {
3285 normalized_analytics_source_identity_sql_expr("c.source_id", "c.origin_host")
3286 } else {
3287 normalized_analytics_source_id_sql_expr("c.source_id")
3288 };
3289 push_source_filter_clause(
3290 &mut where_parts,
3291 &mut bind_values,
3292 &filter.source,
3293 &normalized_source_sql,
3294 );
3295
3296 if !filter.workspace_ids.is_empty() {
3298 let placeholders: Vec<String> = filter
3299 .workspace_ids
3300 .iter()
3301 .map(|workspace_id| {
3302 bind_values.push(ParamValue::from(*workspace_id));
3303 format!("?{}", bind_values.len())
3304 })
3305 .collect();
3306 where_parts.push(format!(
3307 "COALESCE(c.workspace_id, 0) IN ({})",
3308 placeholders.join(", ")
3309 ));
3310 }
3311
3312 let has_conv_rollup = table_has_column(conn, "conversations", "grand_total_tokens");
3313 let has_mm_api_source =
3314 has_message_metrics && table_has_column(conn, "message_metrics", "api_data_source");
3315
3316 let message_metrics_join = canonical_message_metrics_sql
3317 .as_deref()
3318 .map(|message_metrics_sql| {
3319 format!(" LEFT JOIN {message_metrics_sql} ON mm.message_id = m.id")
3320 })
3321 .unwrap_or_default();
3322 let token_usage_join = if has_token_usage {
3323 if has_tu_timestamp {
3324 " LEFT JOIN (SELECT message_id, MAX(COALESCE(total_tokens, 0)) AS total_tokens, MAX(timestamp_ms) AS timestamp_ms FROM token_usage GROUP BY message_id) tu ON tu.message_id = m.id"
3325 } else {
3326 " LEFT JOIN (SELECT message_id, MAX(COALESCE(total_tokens, 0)) AS total_tokens FROM token_usage GROUP BY message_id) tu ON tu.message_id = m.id"
3327 }
3328 } else {
3329 ""
3330 };
3331
3332 let mm_api_sum = "COALESCE(mm.api_input_tokens, 0)
3333 + COALESCE(mm.api_output_tokens, 0)
3334 + COALESCE(mm.api_cache_read_tokens, 0)
3335 + COALESCE(mm.api_cache_creation_tokens, 0)
3336 + COALESCE(mm.api_thinking_tokens, 0)";
3337 let mm_has_api_values = "COALESCE(
3338 mm.api_input_tokens,
3339 mm.api_output_tokens,
3340 mm.api_cache_read_tokens,
3341 mm.api_cache_creation_tokens,
3342 mm.api_thinking_tokens
3343 ) IS NOT NULL";
3344 let message_token_expr = if has_message_metrics && has_token_usage {
3345 if has_mm_api_source {
3346 format!(
3347 "CASE
3348 WHEN mm.message_id IS NULL THEN COALESCE(tu.total_tokens, 0)
3349 WHEN LOWER(TRIM(COALESCE(mm.api_data_source, 'api'))) = 'estimated'
3350 THEN COALESCE(tu.total_tokens, 0)
3351 WHEN {mm_has_api_values} THEN {mm_api_sum}
3352 ELSE COALESCE(tu.total_tokens, 0)
3353 END"
3354 )
3355 } else {
3356 format!(
3357 "CASE
3358 WHEN mm.message_id IS NULL THEN COALESCE(tu.total_tokens, 0)
3359 WHEN {mm_has_api_values} THEN {mm_api_sum}
3360 ELSE COALESCE(tu.total_tokens, 0)
3361 END"
3362 )
3363 }
3364 } else if has_message_metrics {
3365 format!(
3366 "CASE
3367 WHEN mm.message_id IS NOT NULL THEN {mm_api_sum}
3368 ELSE 0
3369 END"
3370 )
3371 } else if has_token_usage {
3372 "COALESCE(tu.total_tokens, 0)".to_string()
3373 } else {
3374 "0".to_string()
3375 };
3376 let normalize_sql = |expr: &str| {
3377 format!("CASE WHEN {expr} BETWEEN 0 AND 100000000000 THEN {expr} * 1000 ELSE {expr} END")
3378 };
3379 let normalized_created_at = normalize_sql("m.created_at");
3380 let normalized_mm_created_at = normalize_sql("mm.created_at_ms");
3381 let normalized_tu_timestamp = normalize_sql("tu.timestamp_ms");
3382 let normalized_started_at = normalize_sql("c_msg.started_at");
3383 let message_timestamp_expr = match (has_mm_created_at, has_tu_timestamp) {
3384 (true, true) => format!(
3385 "CASE
3386 WHEN m.created_at IS NOT NULL THEN {normalized_created_at}
3387 WHEN mm.created_at_ms IS NOT NULL THEN {normalized_mm_created_at}
3388 WHEN tu.timestamp_ms IS NOT NULL THEN {normalized_tu_timestamp}
3389 WHEN c_msg.started_at IS NOT NULL THEN {normalized_started_at}
3390 ELSE 0
3391 END"
3392 ),
3393 (true, false) => format!(
3394 "CASE
3395 WHEN m.created_at IS NOT NULL THEN {normalized_created_at}
3396 WHEN mm.created_at_ms IS NOT NULL THEN {normalized_mm_created_at}
3397 WHEN c_msg.started_at IS NOT NULL THEN {normalized_started_at}
3398 ELSE 0
3399 END"
3400 ),
3401 (false, true) => format!(
3402 "CASE
3403 WHEN m.created_at IS NOT NULL THEN {normalized_created_at}
3404 WHEN tu.timestamp_ms IS NOT NULL THEN {normalized_tu_timestamp}
3405 WHEN c_msg.started_at IS NOT NULL THEN {normalized_started_at}
3406 ELSE 0
3407 END"
3408 ),
3409 (false, false) => format!(
3410 "CASE
3411 WHEN m.created_at IS NOT NULL THEN {normalized_created_at}
3412 WHEN c_msg.started_at IS NOT NULL THEN {normalized_started_at}
3413 ELSE 0
3414 END"
3415 ),
3416 };
3417 let per_message_sql = format!(
3418 "(SELECT m.id AS message_id,
3419 m.conversation_id AS conversation_id,
3420 {message_token_expr} AS message_api_tokens,
3421 {message_timestamp_expr} AS event_ts_ms
3422 FROM messages m
3423 JOIN conversations c_msg ON c_msg.id = m.conversation_id
3424 {message_metrics_join}
3425 {token_usage_join}) msg"
3426 );
3427 if let Some(min) = filter.since_ms {
3428 bind_values.push(ParamValue::from(min));
3429 where_parts.push(format!("msg.event_ts_ms >= ?{}", bind_values.len()));
3430 }
3431 if let Some(max) = filter.until_ms {
3432 bind_values.push(ParamValue::from(max));
3433 where_parts.push(format!("msg.event_ts_ms <= ?{}", bind_values.len()));
3434 }
3435
3436 let where_clause = if where_parts.is_empty() {
3437 String::new()
3438 } else {
3439 format!(" WHERE {}", where_parts.join(" AND "))
3440 };
3441
3442 let detailed_token_expr = "SUM(COALESCE(msg.message_api_tokens, 0))";
3443 let token_expr = if has_conv_rollup {
3444 format!(
3445 "CASE
3446 WHEN MAX(COALESCE(c.grand_total_tokens, 0)) > ({detailed_token_expr})
3447 THEN MAX(COALESCE(c.grand_total_tokens, 0))
3448 ELSE ({detailed_token_expr})
3449 END"
3450 )
3451 } else {
3452 detailed_token_expr.to_string()
3453 };
3454
3455 let agents_join = if has_agents {
3456 "LEFT JOIN agents a ON a.id = c.agent_id"
3457 } else {
3458 ""
3459 };
3460
3461 let sql = format!(
3462 "SELECT {normalized_source_sql},
3463 c.source_path,
3464 COUNT(msg.message_id) AS message_count,
3465 {token_expr} AS api_tokens_total
3466 FROM conversations c
3467 JOIN {per_message_sql} ON msg.conversation_id = c.id
3468 {agents_join}
3469 {where_clause}
3470 GROUP BY c.id, {normalized_source_sql}, c.source_path
3471 HAVING COUNT(msg.message_id) > 0
3472 ORDER BY api_tokens_total DESC, message_count DESC
3473 LIMIT {limit}"
3474 );
3475
3476 let param_values: Vec<ParamValue> = bind_values.clone();
3477
3478 let points = conn
3479 .query_map_collect(&sql, ¶m_values, |row: &Row| {
3480 Ok(SessionScatterPoint {
3481 source_id: row.get_typed(0)?,
3482 source_path: row.get_typed(1)?,
3483 message_count: row.get_typed(2)?,
3484 api_tokens_total: row.get_typed::<Option<i64>>(3)?.unwrap_or(0),
3485 })
3486 })
3487 .map_err(|e| analytics_query_error("Session scatter query failed", e))?;
3488
3489 Ok(points)
3490}
3491
3492pub fn query_unpriced_models(
3500 conn: &Connection,
3501 limit: usize,
3502) -> AnalyticsResult<UnpricedModelsReport> {
3503 if !table_exists(conn, "token_usage")
3504 || !table_has_column(conn, "token_usage", "total_tokens")
3505 || !table_has_column(conn, "token_usage", "estimated_cost_usd")
3506 {
3507 return Ok(UnpricedModelsReport {
3508 models: Vec::new(),
3509 total_unpriced_tokens: 0,
3510 total_priced_tokens: 0,
3511 });
3512 }
3513
3514 let has_model_name = table_has_column(conn, "token_usage", "model_name");
3515 let (from_sql, _, _) = token_usage_from_sql_agent_and_source_sql(conn);
3516 let models_sql = if has_model_name {
3517 format!(
3518 "SELECT CASE
3519 WHEN TRIM(COALESCE(tu.model_name, '')) = '' THEN '(none)'
3520 ELSE TRIM(COALESCE(tu.model_name, ''))
3521 END AS model,
3522 SUM(COALESCE(tu.total_tokens, 0)) AS tot,
3523 COUNT(*) AS cnt
3524 FROM {from_sql}
3525 WHERE tu.estimated_cost_usd IS NULL
3526 GROUP BY model
3527 ORDER BY tot DESC
3528 LIMIT ?1"
3529 )
3530 } else {
3531 format!(
3532 "SELECT '(none)' AS model,
3533 SUM(COALESCE(tu.total_tokens, 0)) AS tot,
3534 COUNT(*) AS cnt
3535 FROM {from_sql}
3536 WHERE tu.estimated_cost_usd IS NULL
3537 HAVING COUNT(*) > 0
3538 LIMIT ?1"
3539 )
3540 };
3541
3542 let models: Vec<UnpricedModel> = conn
3543 .query_map_collect(
3544 &models_sql,
3545 &[ParamValue::from(limit as i64)],
3546 |row: &Row| {
3547 Ok(UnpricedModel {
3548 model_name: row.get_typed(0)?,
3549 total_tokens: row.get_typed(1)?,
3550 row_count: row.get_typed(2)?,
3551 })
3552 },
3553 )
3554 .map_err(|e| AnalyticsError::Db(e.to_string()))?;
3555
3556 let total_unpriced_tokens: i64 = conn
3557 .query_row_map(
3558 &format!(
3559 "SELECT SUM(COALESCE(tu.total_tokens, 0))
3560 FROM {from_sql}
3561 WHERE tu.estimated_cost_usd IS NULL"
3562 ),
3563 &[],
3564 |r: &Row| Ok(r.get_typed::<Option<i64>>(0)?.unwrap_or(0)),
3565 )
3566 .unwrap_or(0);
3567
3568 let total_priced_tokens: i64 = conn
3569 .query_row_map(
3570 &format!(
3571 "SELECT SUM(COALESCE(tu.total_tokens, 0))
3572 FROM {from_sql}
3573 WHERE tu.estimated_cost_usd IS NOT NULL"
3574 ),
3575 &[],
3576 |r: &Row| Ok(r.get_typed::<Option<i64>>(0)?.unwrap_or(0)),
3577 )
3578 .unwrap_or(0);
3579
3580 Ok(UnpricedModelsReport {
3581 models,
3582 total_unpriced_tokens,
3583 total_priced_tokens,
3584 })
3585}
3586
3587#[cfg(test)]
3592mod tests {
3593 use super::*;
3594
3595 #[test]
3596 fn build_where_parts_empty_filter() {
3597 let f = AnalyticsFilter::default();
3598 let (parts, params) = build_where_parts(&f, None);
3599 assert!(parts.is_empty());
3600 assert!(params.is_empty());
3601 }
3602
3603 #[test]
3604 fn build_where_parts_single_agent() {
3605 let f = AnalyticsFilter {
3606 agents: vec!["claude_code".into()],
3607 ..Default::default()
3608 };
3609 let (parts, params) = build_where_parts(&f, None);
3610 assert_eq!(parts.len(), 1);
3611 assert!(parts[0].contains("TRIM(COALESCE(agent_slug, ''))"));
3612 assert!(parts[0].contains("'claude_code'"));
3613 assert!(params.is_empty());
3614 }
3615
3616 #[test]
3617 fn build_where_parts_single_agent_normalizes_trimmed_unknown_alias() {
3618 let f = AnalyticsFilter {
3619 agents: vec![" ".into()],
3620 ..Default::default()
3621 };
3622 let (parts, params) = build_where_parts(&f, None);
3623 assert_eq!(parts.len(), 1);
3624 assert!(parts[0].contains("TRIM(COALESCE(agent_slug, ''))"));
3625 assert!(parts[0].contains("'unknown'"));
3626 assert!(params.is_empty());
3627 }
3628
3629 #[test]
3630 fn build_where_parts_multiple_agents() {
3631 let f = AnalyticsFilter {
3632 agents: vec!["claude_code".into(), "codex".into(), "aider".into()],
3633 ..Default::default()
3634 };
3635 let (parts, params) = build_where_parts(&f, None);
3636 assert_eq!(parts.len(), 1);
3637 assert!(parts[0].contains("TRIM(COALESCE(agent_slug, ''))"));
3638 assert!(parts[0].contains("'claude_code'"));
3639 assert!(parts[0].contains("'codex'"));
3640 assert!(parts[0].contains("'aider'"));
3641 assert!(params.is_empty());
3642 }
3643
3644 #[test]
3645 fn build_where_parts_source_local() {
3646 let f = AnalyticsFilter {
3647 source: SourceFilter::Local,
3648 ..Default::default()
3649 };
3650 let (parts, params) = build_where_parts(&f, None);
3651 assert_eq!(parts.len(), 1);
3652 assert!(parts[0].contains("CASE WHEN TRIM(COALESCE(source_id, '')) = ''"));
3653 assert!(parts[0].contains("= 'local'"));
3654 assert!(params.is_empty());
3655 }
3656
3657 #[test]
3658 fn build_where_parts_source_remote() {
3659 let f = AnalyticsFilter {
3660 source: SourceFilter::Remote,
3661 ..Default::default()
3662 };
3663 let (parts, params) = build_where_parts(&f, None);
3664 assert_eq!(parts.len(), 1);
3665 assert!(parts[0].contains("CASE WHEN TRIM(COALESCE(source_id, '')) = ''"));
3666 assert!(parts[0].contains("!= 'local'"));
3667 assert!(params.is_empty());
3668 }
3669
3670 #[test]
3671 fn build_where_parts_source_specific() {
3672 let f = AnalyticsFilter {
3673 source: SourceFilter::Specific("myhost.local".into()),
3674 ..Default::default()
3675 };
3676 let (parts, params) = build_where_parts(&f, None);
3677 assert_eq!(parts.len(), 1);
3678 assert!(parts[0].contains("CASE WHEN TRIM(COALESCE(source_id, '')) = ''"));
3679 assert!(parts[0].contains("= 'myhost.local'"));
3680 assert!(params.is_empty());
3681 }
3682
3683 #[test]
3684 fn build_where_parts_source_specific_normalizes_trimmed_local_alias() {
3685 let f = AnalyticsFilter {
3686 source: SourceFilter::Specific(" LOCAL ".into()),
3687 ..Default::default()
3688 };
3689 let (parts, params) = build_where_parts(&f, None);
3690 assert_eq!(parts.len(), 1);
3691 assert!(parts[0].contains("CASE WHEN TRIM(COALESCE(source_id, '')) = ''"));
3692 assert!(parts[0].contains("= 'local'"));
3693 assert!(params.is_empty());
3694 }
3695
3696 #[test]
3697 fn build_where_parts_combined() {
3698 let f = AnalyticsFilter {
3699 agents: vec!["codex".into()],
3700 source: SourceFilter::Local,
3701 ..Default::default()
3702 };
3703 let (parts, params) = build_where_parts(&f, None);
3704 assert_eq!(parts.len(), 2);
3705 assert!(parts[0].contains("TRIM(COALESCE(agent_slug, ''))"));
3706 assert!(parts[0].contains("'codex'"));
3707 assert!(parts[1].contains("= 'local'"));
3708 assert!(params.is_empty());
3709 }
3710
3711 #[test]
3712 fn build_where_parts_workspace_filter_enabled() {
3713 let f = AnalyticsFilter {
3714 workspace_ids: vec![7, 42],
3715 ..Default::default()
3716 };
3717 let (parts, params) = build_where_parts(&f, Some("workspace_id"));
3718 assert_eq!(parts.len(), 1);
3719 assert!(parts[0].contains("workspace_id IN (7, 42)"));
3720 assert!(params.is_empty());
3721 }
3722
3723 #[test]
3724 fn build_where_parts_workspace_filter_disabled() {
3725 let f = AnalyticsFilter {
3726 workspace_ids: vec![7, 42],
3727 ..Default::default()
3728 };
3729 let (parts, params) = build_where_parts(&f, None);
3730 assert!(parts.is_empty());
3731 assert!(params.is_empty());
3732 }
3733
3734 fn setup_usage_daily_db() -> Connection {
3740 let conn = Connection::open(":memory:").unwrap();
3741 conn.execute_batch(
3742 "CREATE TABLE usage_daily (
3743 day_id INTEGER NOT NULL,
3744 agent_slug TEXT NOT NULL,
3745 workspace_id INTEGER NOT NULL DEFAULT 0,
3746 source_id TEXT NOT NULL DEFAULT 'local',
3747 message_count INTEGER NOT NULL DEFAULT 0,
3748 user_message_count INTEGER NOT NULL DEFAULT 0,
3749 assistant_message_count INTEGER NOT NULL DEFAULT 0,
3750 tool_call_count INTEGER NOT NULL DEFAULT 0,
3751 plan_message_count INTEGER NOT NULL DEFAULT 0,
3752 plan_content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
3753 plan_api_tokens_total INTEGER NOT NULL DEFAULT 0,
3754 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
3755 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
3756 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
3757 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
3758 api_tokens_total INTEGER NOT NULL DEFAULT 0,
3759 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
3760 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
3761 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
3762 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
3763 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
3764 last_updated INTEGER NOT NULL DEFAULT 0,
3765 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
3766 );",
3767 )
3768 .unwrap();
3769
3770 let rows = [
3772 (
3773 20250,
3774 "claude_code",
3775 1,
3776 "local",
3777 100,
3778 50,
3779 50,
3780 20,
3781 5,
3782 80,
3783 40000,
3784 20000,
3785 20000,
3786 60000,
3787 30000,
3788 25000,
3789 3000,
3790 1500,
3791 500,
3792 ),
3793 (
3794 20250, "codex", 1, "local", 50, 25, 25, 10, 2, 40, 20000, 10000, 10000, 30000,
3795 15000, 12000, 2000, 800, 200,
3796 ),
3797 (
3798 20250, "aider", 2, "remote", 30, 15, 15, 5, 0, 0, 12000, 6000, 6000, 0, 0, 0, 0, 0,
3799 0,
3800 ),
3801 (
3802 20251,
3803 "claude_code",
3804 1,
3805 "local",
3806 120,
3807 60,
3808 60,
3809 25,
3810 8,
3811 100,
3812 50000,
3813 25000,
3814 25000,
3815 80000,
3816 40000,
3817 32000,
3818 5000,
3819 2000,
3820 1000,
3821 ),
3822 (
3823 20251, "codex", 1, "local", 60, 30, 30, 15, 3, 50, 25000, 12500, 12500, 40000,
3824 20000, 16000, 2500, 1000, 500,
3825 ),
3826 ];
3827
3828 for r in &rows {
3829 conn.execute_compat(
3830 "INSERT INTO usage_daily (day_id, agent_slug, workspace_id, source_id,
3831 message_count, user_message_count, assistant_message_count,
3832 tool_call_count, plan_message_count, api_coverage_message_count,
3833 content_tokens_est_total, content_tokens_est_user, content_tokens_est_assistant,
3834 api_tokens_total, api_input_tokens_total, api_output_tokens_total,
3835 api_cache_read_tokens_total, api_cache_creation_tokens_total,
3836 api_thinking_tokens_total)
3837 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
3838 frankensqlite::params![
3839 r.0, r.1, r.2, r.3, r.4, r.5, r.6, r.7, r.8, r.9, r.10, r.11, r.12, r.13, r.14,
3840 r.15, r.16, r.17, r.18
3841 ],
3842 )
3843 .unwrap();
3844 }
3845
3846 conn
3847 }
3848
3849 #[allow(dead_code)]
3850 fn setup_usage_daily_legacy_db() -> Connection {
3852 let conn = Connection::open(":memory:").unwrap();
3853 conn.execute_batch(
3854 "CREATE TABLE usage_daily (
3855 day_id INTEGER NOT NULL,
3856 agent_slug TEXT NOT NULL,
3857 workspace_id INTEGER NOT NULL DEFAULT 0,
3858 source_id TEXT NOT NULL DEFAULT 'local',
3859 message_count INTEGER NOT NULL DEFAULT 0,
3860 user_message_count INTEGER NOT NULL DEFAULT 0,
3861 assistant_message_count INTEGER NOT NULL DEFAULT 0,
3862 tool_call_count INTEGER NOT NULL DEFAULT 0,
3863 plan_message_count INTEGER NOT NULL DEFAULT 0,
3864 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
3865 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
3866 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
3867 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
3868 api_tokens_total INTEGER NOT NULL DEFAULT 0,
3869 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
3870 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
3871 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
3872 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
3873 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
3874 last_updated INTEGER NOT NULL DEFAULT 0,
3875 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
3876 );
3877 INSERT INTO usage_daily VALUES
3878 (20254, 'codex', 1, 'local',
3879 3, 1, 2, 4, 1, 2,
3880 900, 300, 600,
3881 1200, 600, 500, 50, 30, 20,
3882 0);",
3883 )
3884 .unwrap();
3885 conn
3886 }
3887
3888 fn setup_usage_hourly_db() -> Connection {
3889 let conn = Connection::open(":memory:").unwrap();
3890 conn.execute_batch(
3891 "CREATE TABLE usage_hourly (
3892 hour_id INTEGER NOT NULL,
3893 agent_slug TEXT NOT NULL,
3894 workspace_id INTEGER NOT NULL DEFAULT 0,
3895 source_id TEXT NOT NULL DEFAULT 'local',
3896 message_count INTEGER NOT NULL DEFAULT 0,
3897 user_message_count INTEGER NOT NULL DEFAULT 0,
3898 assistant_message_count INTEGER NOT NULL DEFAULT 0,
3899 tool_call_count INTEGER NOT NULL DEFAULT 0,
3900 plan_message_count INTEGER NOT NULL DEFAULT 0,
3901 plan_content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
3902 plan_api_tokens_total INTEGER NOT NULL DEFAULT 0,
3903 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
3904 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
3905 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
3906 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
3907 api_tokens_total INTEGER NOT NULL DEFAULT 0,
3908 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
3909 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
3910 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
3911 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
3912 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
3913 last_updated INTEGER NOT NULL DEFAULT 0,
3914 PRIMARY KEY (hour_id, agent_slug, workspace_id, source_id)
3915 );",
3916 )
3917 .unwrap();
3918
3919 conn.execute_compat(
3920 "INSERT INTO usage_hourly (
3921 hour_id, agent_slug, workspace_id, source_id,
3922 message_count, user_message_count, assistant_message_count,
3923 tool_call_count, plan_message_count,
3924 plan_content_tokens_est_total, plan_api_tokens_total,
3925 api_coverage_message_count,
3926 content_tokens_est_total, content_tokens_est_user, content_tokens_est_assistant,
3927 api_tokens_total, api_input_tokens_total, api_output_tokens_total,
3928 api_cache_read_tokens_total, api_cache_creation_tokens_total, api_thinking_tokens_total,
3929 last_updated
3930 ) VALUES
3931 (?1, 'codex', 1, 'local',
3932 10, 4, 6, 3, 1,
3933 200, 400,
3934 8,
3935 1200, 500, 700,
3936 1400, 700, 550, 100, 25, 25,
3937 ?2)",
3938 frankensqlite::params![1000_i64, 1_i64],
3939 )
3940 .unwrap();
3941
3942 conn.execute_compat(
3943 "INSERT INTO usage_hourly (
3944 hour_id, agent_slug, workspace_id, source_id,
3945 message_count, user_message_count, assistant_message_count,
3946 tool_call_count, plan_message_count,
3947 plan_content_tokens_est_total, plan_api_tokens_total,
3948 api_coverage_message_count,
3949 content_tokens_est_total, content_tokens_est_user, content_tokens_est_assistant,
3950 api_tokens_total, api_input_tokens_total, api_output_tokens_total,
3951 api_cache_read_tokens_total, api_cache_creation_tokens_total, api_thinking_tokens_total,
3952 last_updated
3953 ) VALUES
3954 (?1, 'codex', 1, 'local',
3955 20, 9, 11, 5, 2,
3956 400, 700,
3957 17,
3958 2200, 900, 1300,
3959 2600, 1300, 1000, 200, 50, 50,
3960 ?2)",
3961 frankensqlite::params![1001_i64, 2_i64],
3962 )
3963 .unwrap();
3964 conn
3965 }
3966
3967 fn setup_tools_remote_source_fallback_db() -> Connection {
3968 let conn = Connection::open(":memory:").unwrap();
3969 conn.execute_batch(
3970 "CREATE TABLE agents (
3971 id INTEGER PRIMARY KEY,
3972 slug TEXT NOT NULL
3973 );
3974 CREATE TABLE conversations (
3975 id INTEGER PRIMARY KEY,
3976 agent_id INTEGER NOT NULL,
3977 workspace_id INTEGER,
3978 source_id TEXT NOT NULL,
3979 origin_host TEXT,
3980 source_path TEXT NOT NULL,
3981 started_at INTEGER
3982 );
3983 CREATE TABLE messages (
3984 id INTEGER PRIMARY KEY,
3985 conversation_id INTEGER NOT NULL,
3986 idx INTEGER NOT NULL,
3987 role TEXT NOT NULL,
3988 created_at INTEGER,
3989 content TEXT NOT NULL
3990 );
3991 CREATE TABLE message_metrics (
3992 message_id INTEGER PRIMARY KEY,
3993 created_at_ms INTEGER,
3994 tool_call_count INTEGER NOT NULL DEFAULT 0,
3995 content_tokens_est INTEGER,
3996 api_input_tokens INTEGER,
3997 api_output_tokens INTEGER,
3998 api_cache_read_tokens INTEGER,
3999 api_cache_creation_tokens INTEGER,
4000 api_thinking_tokens INTEGER
4001 );
4002 CREATE TABLE usage_daily (
4003 day_id INTEGER NOT NULL,
4004 agent_slug TEXT NOT NULL,
4005 workspace_id INTEGER NOT NULL DEFAULT 0,
4006 source_id TEXT NOT NULL DEFAULT 'local',
4007 message_count INTEGER NOT NULL DEFAULT 0,
4008 user_message_count INTEGER NOT NULL DEFAULT 0,
4009 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4010 tool_call_count INTEGER NOT NULL DEFAULT 0,
4011 plan_message_count INTEGER NOT NULL DEFAULT 0,
4012 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
4013 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
4014 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
4015 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
4016 api_tokens_total INTEGER NOT NULL DEFAULT 0,
4017 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
4018 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
4019 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
4020 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
4021 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
4022 last_updated INTEGER NOT NULL DEFAULT 0,
4023 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
4024 );",
4025 )
4026 .unwrap();
4027
4028 conn.execute("INSERT INTO agents (id, slug) VALUES (1, 'codex')")
4029 .unwrap();
4030 conn.execute("INSERT INTO agents (id, slug) VALUES (2, 'claude_code')")
4031 .unwrap();
4032
4033 conn.execute(
4034 "INSERT INTO conversations
4035 (id, agent_id, workspace_id, source_id, origin_host, source_path, started_at)
4036 VALUES (1, 1, 1, 'local', '', '/sessions/local.jsonl', 1700000000000)",
4037 )
4038 .unwrap();
4039 conn.execute(
4040 "INSERT INTO conversations
4041 (id, agent_id, workspace_id, source_id, origin_host, source_path, started_at)
4042 VALUES (2, 2, 2, ' ', 'remote-ci', '/sessions/remote.jsonl', 1700000001000)",
4043 )
4044 .unwrap();
4045
4046 conn.execute(
4047 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4048 VALUES (11, 1, 0, 'assistant', 1700000000000, 'local tool')",
4049 )
4050 .unwrap();
4051 conn.execute(
4052 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4053 VALUES (21, 2, 0, 'assistant', 1700000001000, 'remote tool')",
4054 )
4055 .unwrap();
4056
4057 conn.execute(
4058 "INSERT INTO message_metrics
4059 (message_id, created_at_ms, tool_call_count, content_tokens_est,
4060 api_input_tokens, api_output_tokens, api_cache_read_tokens,
4061 api_cache_creation_tokens, api_thinking_tokens)
4062 VALUES (11, 1700000000000, 2, 30, 10, 20, 0, 0, 0)",
4063 )
4064 .unwrap();
4065 conn.execute(
4066 "INSERT INTO message_metrics
4067 (message_id, created_at_ms, tool_call_count, content_tokens_est,
4068 api_input_tokens, api_output_tokens, api_cache_read_tokens,
4069 api_cache_creation_tokens, api_thinking_tokens)
4070 VALUES (21, 1700000001000, 7, 90, 30, 70, 0, 0, 0)",
4071 )
4072 .unwrap();
4073
4074 conn.execute(
4075 "INSERT INTO usage_daily
4076 (day_id, agent_slug, workspace_id, source_id, message_count,
4077 assistant_message_count, tool_call_count, content_tokens_est_total,
4078 content_tokens_est_assistant, api_tokens_total, api_input_tokens_total,
4079 api_output_tokens_total, last_updated)
4080 VALUES (20250, 'codex', 1, 'local', 1, 1, 2, 30, 30, 30, 10, 20, 1)",
4081 )
4082 .unwrap();
4083 conn.execute(
4084 "INSERT INTO usage_daily
4085 (day_id, agent_slug, workspace_id, source_id, message_count,
4086 assistant_message_count, tool_call_count, content_tokens_est_total,
4087 content_tokens_est_assistant, api_tokens_total, api_input_tokens_total,
4088 api_output_tokens_total, last_updated)
4089 VALUES (20250, 'claude_code', 2, ' ', 1, 1, 7, 90, 90, 100, 30, 70, 1)",
4090 )
4091 .unwrap();
4092
4093 conn
4094 }
4095
4096 fn setup_token_daily_stats_db() -> Connection {
4098 let conn = Connection::open(":memory:").unwrap();
4099 conn.execute_batch(
4100 "CREATE TABLE token_daily_stats (
4101 day_id INTEGER NOT NULL,
4102 agent_slug TEXT NOT NULL,
4103 source_id TEXT NOT NULL DEFAULT 'all',
4104 model_family TEXT NOT NULL DEFAULT 'all',
4105 api_call_count INTEGER NOT NULL DEFAULT 0,
4106 user_message_count INTEGER NOT NULL DEFAULT 0,
4107 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4108 tool_message_count INTEGER NOT NULL DEFAULT 0,
4109 total_input_tokens INTEGER NOT NULL DEFAULT 0,
4110 total_output_tokens INTEGER NOT NULL DEFAULT 0,
4111 total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
4112 total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
4113 total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
4114 grand_total_tokens INTEGER NOT NULL DEFAULT 0,
4115 total_content_chars INTEGER NOT NULL DEFAULT 0,
4116 total_tool_calls INTEGER NOT NULL DEFAULT 0,
4117 estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
4118 session_count INTEGER NOT NULL DEFAULT 0,
4119 last_updated INTEGER NOT NULL,
4120 PRIMARY KEY (day_id, agent_slug, source_id, model_family)
4121 );",
4122 )
4123 .unwrap();
4124
4125 let now = std::time::SystemTime::now()
4127 .duration_since(std::time::UNIX_EPOCH)
4128 .unwrap()
4129 .as_secs() as i64;
4130 conn.execute_compat(
4131 "INSERT INTO token_daily_stats VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
4132 frankensqlite::params![20250, "claude_code", "local", "opus", 80, 40, 40, 5, 30000, 25000, 3000, 1500, 500, 60000, 160000, 20, 1.50, 3, now],
4133 ).unwrap();
4134 conn.execute_compat(
4135 "INSERT INTO token_daily_stats VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
4136 frankensqlite::params![20250, "claude_code", "local", "sonnet", 40, 20, 20, 2, 10000, 8000, 1000, 500, 200, 19700, 80000, 8, 0.40, 2, now],
4137 ).unwrap();
4138 conn.execute_compat(
4139 "INSERT INTO token_daily_stats VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
4140 frankensqlite::params![20250, "codex", "local", "gpt-4o", 50, 25, 25, 3, 15000, 12000, 2000, 800, 0, 29800, 100000, 10, 0.80, 1, now],
4141 ).unwrap();
4142
4143 conn
4144 }
4145
4146 fn setup_status_freshness_db(
4147 hourly_last_updated: i64,
4148 track_b_last_updated: i64,
4149 ) -> Connection {
4150 let conn = Connection::open(":memory:").unwrap();
4151 conn.execute_batch(
4152 "CREATE TABLE usage_hourly (
4153 hour_id INTEGER NOT NULL,
4154 last_updated INTEGER NOT NULL
4155 );
4156 CREATE TABLE token_daily_stats (
4157 day_id INTEGER NOT NULL,
4158 last_updated INTEGER NOT NULL
4159 );",
4160 )
4161 .unwrap();
4162
4163 conn.execute_compat(
4164 "INSERT INTO usage_hourly (hour_id, last_updated) VALUES (?1, ?2)",
4165 frankensqlite::params![123_i64, hourly_last_updated],
4166 )
4167 .unwrap();
4168 conn.execute_compat(
4169 "INSERT INTO token_daily_stats (day_id, last_updated) VALUES (?1, ?2)",
4170 frankensqlite::params![456_i64, track_b_last_updated],
4171 )
4172 .unwrap();
4173
4174 conn
4175 }
4176
4177 fn setup_session_scatter_db() -> Connection {
4178 let conn = Connection::open(":memory:").unwrap();
4179 conn.execute_batch(
4180 "CREATE TABLE agents (
4181 id INTEGER PRIMARY KEY,
4182 slug TEXT NOT NULL
4183 );
4184 CREATE TABLE conversations (
4185 id INTEGER PRIMARY KEY,
4186 agent_id INTEGER NOT NULL,
4187 workspace_id INTEGER,
4188 source_id TEXT NOT NULL,
4189 origin_host TEXT,
4190 source_path TEXT NOT NULL,
4191 started_at INTEGER,
4192 grand_total_tokens INTEGER
4193 );
4194 CREATE TABLE messages (
4195 id INTEGER PRIMARY KEY,
4196 conversation_id INTEGER NOT NULL,
4197 idx INTEGER NOT NULL,
4198 role TEXT NOT NULL,
4199 created_at INTEGER,
4200 content TEXT NOT NULL
4201 );
4202 CREATE TABLE message_metrics (
4203 message_id INTEGER PRIMARY KEY,
4204 api_input_tokens INTEGER,
4205 api_output_tokens INTEGER,
4206 api_cache_read_tokens INTEGER,
4207 api_cache_creation_tokens INTEGER,
4208 api_thinking_tokens INTEGER
4209 );",
4210 )
4211 .unwrap();
4212
4213 conn.execute("INSERT INTO agents (id, slug) VALUES (1, 'codex')")
4214 .unwrap();
4215 conn.execute("INSERT INTO agents (id, slug) VALUES (2, 'claude_code')")
4216 .unwrap();
4217
4218 conn.execute(
4219 "INSERT INTO conversations
4220 (id, agent_id, workspace_id, source_id, source_path, started_at, grand_total_tokens)
4221 VALUES (1, 1, 10, 'local', '/sessions/a.jsonl', 1700000000000, 1000)",
4222 )
4223 .unwrap();
4224 conn.execute(
4225 "INSERT INTO conversations
4226 (id, agent_id, workspace_id, source_id, source_path, started_at, grand_total_tokens)
4227 VALUES (2, 2, 20, 'remote-ci', '/sessions/b.jsonl', 1700000000000, 2300)",
4228 )
4229 .unwrap();
4230
4231 conn.execute(
4233 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4234 VALUES (11, 1, 0, 'user', 1700000001000, 'a1')",
4235 )
4236 .unwrap();
4237 conn.execute(
4238 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4239 VALUES (12, 1, 1, 'assistant', 1700000002000, 'a2')",
4240 )
4241 .unwrap();
4242 conn.execute(
4243 "INSERT INTO message_metrics
4244 (message_id, api_input_tokens, api_output_tokens, api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens)
4245 VALUES (11, 200, 250, 0, 0, 50)",
4246 )
4247 .unwrap();
4248 conn.execute(
4249 "INSERT INTO message_metrics
4250 (message_id, api_input_tokens, api_output_tokens, api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens)
4251 VALUES (12, 200, 300, 0, 0, 0)",
4252 )
4253 .unwrap();
4254
4255 conn.execute(
4257 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4258 VALUES (21, 2, 0, 'user', 1700000001000, 'b1')",
4259 )
4260 .unwrap();
4261 conn.execute(
4262 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4263 VALUES (22, 2, 1, 'assistant', 1700000002000, 'b2')",
4264 )
4265 .unwrap();
4266 conn.execute(
4267 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4268 VALUES (23, 2, 2, 'assistant', 1700000003000, 'b3')",
4269 )
4270 .unwrap();
4271 conn.execute(
4272 "INSERT INTO message_metrics
4273 (message_id, api_input_tokens, api_output_tokens, api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens)
4274 VALUES (21, 300, 500, 0, 0, 0)",
4275 )
4276 .unwrap();
4277 conn.execute(
4278 "INSERT INTO message_metrics
4279 (message_id, api_input_tokens, api_output_tokens, api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens)
4280 VALUES (22, 500, 500, 0, 0, 0)",
4281 )
4282 .unwrap();
4283 conn.execute(
4284 "INSERT INTO message_metrics
4285 (message_id, api_input_tokens, api_output_tokens, api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens)
4286 VALUES (23, 200, 300, 0, 0, 0)",
4287 )
4288 .unwrap();
4289
4290 conn
4291 }
4292
4293 fn setup_session_scatter_with_token_usage_fallback_db() -> Connection {
4294 let conn = setup_session_scatter_db();
4295 conn.execute_batch(
4296 "CREATE TABLE token_usage (
4297 message_id INTEGER PRIMARY KEY,
4298 total_tokens INTEGER
4299 );",
4300 )
4301 .unwrap();
4302
4303 conn.execute("INSERT INTO token_usage (message_id, total_tokens) VALUES (11, 999)")
4305 .unwrap();
4306 conn.execute(
4308 "UPDATE message_metrics
4309 SET api_input_tokens = NULL,
4310 api_output_tokens = NULL,
4311 api_cache_read_tokens = NULL,
4312 api_cache_creation_tokens = NULL,
4313 api_thinking_tokens = NULL
4314 WHERE message_id = 12",
4315 )
4316 .unwrap();
4317 conn.execute("INSERT INTO token_usage (message_id, total_tokens) VALUES (12, 900)")
4318 .unwrap();
4319
4320 conn
4321 }
4322
4323 fn setup_session_scatter_with_api_source_column_db() -> Connection {
4324 let conn = setup_session_scatter_with_token_usage_fallback_db();
4325 conn.execute("ALTER TABLE message_metrics ADD COLUMN api_data_source TEXT")
4326 .unwrap();
4327 conn.execute(
4330 "UPDATE message_metrics
4331 SET api_data_source = 'api'
4332 WHERE message_id IN (11, 12)",
4333 )
4334 .unwrap();
4335 conn
4336 }
4337
4338 fn setup_duplicate_message_metrics_raw_db() -> Connection {
4339 let conn = Connection::open(":memory:").unwrap();
4340 conn.execute_batch(
4341 "CREATE TABLE agents (
4342 id INTEGER PRIMARY KEY,
4343 slug TEXT NOT NULL
4344 );
4345 CREATE TABLE conversations (
4346 id INTEGER PRIMARY KEY,
4347 agent_id INTEGER NOT NULL,
4348 workspace_id INTEGER,
4349 source_id TEXT NOT NULL,
4350 origin_host TEXT,
4351 source_path TEXT NOT NULL,
4352 started_at INTEGER,
4353 grand_total_tokens INTEGER
4354 );
4355 CREATE TABLE messages (
4356 id INTEGER PRIMARY KEY,
4357 conversation_id INTEGER NOT NULL,
4358 idx INTEGER NOT NULL,
4359 role TEXT NOT NULL,
4360 created_at INTEGER,
4361 content TEXT NOT NULL
4362 );
4363 CREATE TABLE message_metrics (
4364 id INTEGER PRIMARY KEY,
4365 message_id INTEGER NOT NULL,
4366 created_at_ms INTEGER,
4367 tool_call_count INTEGER,
4368 content_tokens_est INTEGER,
4369 api_input_tokens INTEGER,
4370 api_output_tokens INTEGER,
4371 api_cache_read_tokens INTEGER,
4372 api_cache_creation_tokens INTEGER,
4373 api_thinking_tokens INTEGER,
4374 api_data_source TEXT,
4375 has_plan INTEGER
4376 );",
4377 )
4378 .unwrap();
4379
4380 conn.execute("INSERT INTO agents (id, slug) VALUES (1, 'codex')")
4381 .unwrap();
4382 conn.execute(
4383 "INSERT INTO conversations
4384 (id, agent_id, workspace_id, source_id, source_path, started_at, grand_total_tokens)
4385 VALUES (1, 1, 10, 'local', '/sessions/dup.jsonl', 1700000000000, 1200)",
4386 )
4387 .unwrap();
4388 conn.execute(
4389 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4390 VALUES (11, 1, 0, 'user', 1700000001000, 'dup-a')",
4391 )
4392 .unwrap();
4393 conn.execute(
4394 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4395 VALUES (12, 1, 1, 'assistant', 1700000002000, 'dup-b')",
4396 )
4397 .unwrap();
4398 conn.execute_batch(
4399 "INSERT INTO message_metrics
4400 (id, message_id, created_at_ms, tool_call_count, content_tokens_est,
4401 api_input_tokens, api_output_tokens, api_cache_read_tokens,
4402 api_cache_creation_tokens, api_thinking_tokens, api_data_source, has_plan)
4403 VALUES
4404 (1, 11, 1700000001000, 3, 100, 200, 300, 0, 0, 0, 'api', 1),
4405 (2, 11, 1700000001000, 3, 100, 200, 300, 0, 0, 0, 'api', 1),
4406 (3, 12, 1700000002000, 4, 120, 250, 450, 0, 0, 0, 'api', 0);",
4407 )
4408 .unwrap();
4409
4410 conn
4411 }
4412
4413 fn setup_status_filter_db() -> Connection {
4414 let conn = Connection::open(":memory:").unwrap();
4415 let now_ms = std::time::SystemTime::now()
4416 .duration_since(std::time::UNIX_EPOCH)
4417 .unwrap()
4418 .as_millis() as i64;
4419 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
4420 let day11_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(11);
4421 conn.execute_batch(
4422 "CREATE TABLE agents (
4423 id INTEGER PRIMARY KEY,
4424 slug TEXT NOT NULL
4425 );
4426 CREATE TABLE conversations (
4427 id INTEGER PRIMARY KEY,
4428 agent_id INTEGER NOT NULL,
4429 workspace_id INTEGER,
4430 source_id TEXT NOT NULL,
4431 source_path TEXT NOT NULL,
4432 started_at INTEGER
4433 );
4434 CREATE TABLE messages (
4435 id INTEGER PRIMARY KEY,
4436 conversation_id INTEGER NOT NULL,
4437 idx INTEGER NOT NULL,
4438 role TEXT NOT NULL,
4439 created_at INTEGER,
4440 content TEXT NOT NULL
4441 );
4442 CREATE TABLE message_metrics (
4443 message_id INTEGER PRIMARY KEY,
4444 created_at_ms INTEGER NOT NULL,
4445 hour_id INTEGER NOT NULL,
4446 day_id INTEGER NOT NULL,
4447 agent_slug TEXT NOT NULL,
4448 workspace_id INTEGER NOT NULL DEFAULT 0,
4449 source_id TEXT NOT NULL DEFAULT 'local',
4450 role TEXT NOT NULL,
4451 content_chars INTEGER NOT NULL,
4452 content_tokens_est INTEGER NOT NULL,
4453 api_input_tokens INTEGER,
4454 api_output_tokens INTEGER,
4455 api_cache_read_tokens INTEGER,
4456 api_cache_creation_tokens INTEGER,
4457 api_thinking_tokens INTEGER,
4458 api_data_source TEXT NOT NULL DEFAULT 'estimated'
4459 );
4460 CREATE TABLE usage_hourly (
4461 hour_id INTEGER NOT NULL,
4462 agent_slug TEXT NOT NULL,
4463 workspace_id INTEGER NOT NULL DEFAULT 0,
4464 source_id TEXT NOT NULL DEFAULT 'local',
4465 message_count INTEGER NOT NULL DEFAULT 0,
4466 user_message_count INTEGER NOT NULL DEFAULT 0,
4467 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4468 tool_call_count INTEGER NOT NULL DEFAULT 0,
4469 plan_message_count INTEGER NOT NULL DEFAULT 0,
4470 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
4471 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
4472 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
4473 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
4474 api_tokens_total INTEGER NOT NULL DEFAULT 0,
4475 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
4476 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
4477 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
4478 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
4479 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
4480 last_updated INTEGER NOT NULL DEFAULT 0,
4481 PRIMARY KEY (hour_id, agent_slug, workspace_id, source_id)
4482 );
4483 CREATE TABLE usage_daily (
4484 day_id INTEGER NOT NULL,
4485 agent_slug TEXT NOT NULL,
4486 workspace_id INTEGER NOT NULL DEFAULT 0,
4487 source_id TEXT NOT NULL DEFAULT 'local',
4488 message_count INTEGER NOT NULL DEFAULT 0,
4489 user_message_count INTEGER NOT NULL DEFAULT 0,
4490 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4491 tool_call_count INTEGER NOT NULL DEFAULT 0,
4492 plan_message_count INTEGER NOT NULL DEFAULT 0,
4493 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
4494 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
4495 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
4496 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
4497 api_tokens_total INTEGER NOT NULL DEFAULT 0,
4498 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
4499 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
4500 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
4501 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
4502 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
4503 last_updated INTEGER NOT NULL DEFAULT 0,
4504 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
4505 );
4506 CREATE TABLE token_usage (
4507 id INTEGER PRIMARY KEY AUTOINCREMENT,
4508 message_id INTEGER NOT NULL,
4509 conversation_id INTEGER NOT NULL,
4510 agent_id INTEGER NOT NULL,
4511 workspace_id INTEGER,
4512 source_id TEXT NOT NULL DEFAULT 'local',
4513 timestamp_ms INTEGER NOT NULL,
4514 day_id INTEGER NOT NULL,
4515 model_name TEXT,
4516 model_family TEXT,
4517 total_tokens INTEGER,
4518 data_source TEXT NOT NULL DEFAULT 'api'
4519 );
4520 CREATE TABLE token_daily_stats (
4521 day_id INTEGER NOT NULL,
4522 agent_slug TEXT NOT NULL,
4523 source_id TEXT NOT NULL DEFAULT 'all',
4524 model_family TEXT NOT NULL DEFAULT 'all',
4525 api_call_count INTEGER NOT NULL DEFAULT 0,
4526 user_message_count INTEGER NOT NULL DEFAULT 0,
4527 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4528 tool_message_count INTEGER NOT NULL DEFAULT 0,
4529 total_input_tokens INTEGER NOT NULL DEFAULT 0,
4530 total_output_tokens INTEGER NOT NULL DEFAULT 0,
4531 total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
4532 total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
4533 total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
4534 grand_total_tokens INTEGER NOT NULL DEFAULT 0,
4535 total_content_chars INTEGER NOT NULL DEFAULT 0,
4536 total_tool_calls INTEGER NOT NULL DEFAULT 0,
4537 estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
4538 session_count INTEGER NOT NULL DEFAULT 0,
4539 last_updated INTEGER NOT NULL,
4540 PRIMARY KEY (day_id, agent_slug, source_id, model_family)
4541 );",
4542 )
4543 .unwrap();
4544
4545 conn.execute("INSERT INTO agents (id, slug) VALUES (1, 'codex')")
4546 .unwrap();
4547 conn.execute("INSERT INTO agents (id, slug) VALUES (2, 'claude_code')")
4548 .unwrap();
4549
4550 conn.execute(&format!(
4551 "INSERT INTO conversations (id, agent_id, workspace_id, source_id, source_path, started_at)
4552 VALUES (1, 1, 1, 'local', '/sessions/a.jsonl', {day10_ms})"
4553 ))
4554 .unwrap();
4555 conn.execute(&format!(
4556 "INSERT INTO conversations (id, agent_id, workspace_id, source_id, source_path, started_at)
4557 VALUES (2, 2, 2, 'remote-ci', '/sessions/b.jsonl', {day11_ms})"
4558 ))
4559 .unwrap();
4560
4561 conn.execute(&format!(
4562 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4563 VALUES (11, 1, 0, 'user', {}, 'a1')",
4564 day10_ms + 100,
4565 ))
4566 .unwrap();
4567 conn.execute(&format!(
4568 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4569 VALUES (12, 1, 1, 'assistant', {}, 'a2')",
4570 day10_ms + 200,
4571 ))
4572 .unwrap();
4573 conn.execute(&format!(
4574 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4575 VALUES (21, 2, 0, 'assistant', {}, 'b1')",
4576 day11_ms + 100,
4577 ))
4578 .unwrap();
4579
4580 conn.execute_batch(
4581 &format!(
4582 "INSERT INTO message_metrics
4583 (message_id, created_at_ms, hour_id, day_id, agent_slug, workspace_id, source_id,
4584 role, content_chars, content_tokens_est, api_input_tokens, api_output_tokens,
4585 api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens, api_data_source)
4586 VALUES
4587 (11, {day10_ms} + 100, 240, 10, 'codex', 1, 'local', 'user', 10, 3, 5, 7, 0, 0, 0, 'api'),
4588 (12, {day10_ms} + 200, 240, 10, 'codex', 1, 'local', 'assistant', 12, 4, 8, 9, 0, 0, 0, 'api'),
4589 (21, {day11_ms} + 100, 264, 11, 'claude_code', 2, 'remote-ci', 'assistant', 14, 5, NULL, NULL, NULL, NULL, NULL, 'estimated');
4590 INSERT INTO usage_hourly
4591 (hour_id, agent_slug, workspace_id, source_id, message_count, user_message_count,
4592 assistant_message_count, api_coverage_message_count, api_tokens_total, last_updated)
4593 VALUES
4594 (240, 'codex', 1, 'local', 2, 1, 1, 2, 29, {now_ms}),
4595 (264, 'claude_code', 2, 'remote-ci', 1, 0, 1, 0, 0, {now_ms});
4596 INSERT INTO usage_daily
4597 (day_id, agent_slug, workspace_id, source_id, message_count, user_message_count,
4598 assistant_message_count, api_coverage_message_count, api_tokens_total, last_updated)
4599 VALUES
4600 (10, 'codex', 1, 'local', 2, 1, 1, 2, 29, {now_ms}),
4601 (11, 'claude_code', 2, 'remote-ci', 1, 0, 1, 0, 0, {now_ms});
4602 INSERT INTO token_usage
4603 (message_id, conversation_id, agent_id, workspace_id, source_id, timestamp_ms, day_id,
4604 model_name, model_family, total_tokens, data_source)
4605 VALUES
4606 (11, 1, 1, 1, 'local', {day10_ms} + 100, 10, 'gpt-4o-mini', 'gpt-4o', 12, 'api'),
4607 (12, 1, 1, 1, 'local', {day10_ms} + 200, 10, 'gpt-4o-mini', 'gpt-4o', 17, 'api'),
4608 (21, 2, 2, 2, 'remote-ci', {day11_ms} + 100, 11, NULL, 'claude', 11, 'estimated');
4609 INSERT INTO token_daily_stats
4610 (day_id, agent_slug, source_id, model_family, api_call_count, user_message_count,
4611 assistant_message_count, grand_total_tokens, session_count, last_updated)
4612 VALUES
4613 (10, 'codex', 'local', 'gpt-4o', 2, 1, 1, 29, 1, {now_ms}),
4614 (11, 'claude_code', 'remote-ci', 'claude', 0, 0, 1, 11, 1, {now_ms});"
4615 ),
4616 )
4617 .unwrap();
4618
4619 conn
4620 }
4621
4622 #[test]
4623 fn normalize_epoch_millis_preserves_negative_millisecond_values() {
4624 assert_eq!(normalize_epoch_millis(-1_000), -1_000);
4625 assert_eq!(normalize_epoch_millis(-86_400_000), -86_400_000);
4626 assert_eq!(normalize_epoch_millis(1_700_000_000), 1_700_000_000_000);
4627 }
4628
4629 fn setup_legacy_status_filter_db_without_message_metrics_created_at() -> Connection {
4630 let conn = Connection::open(":memory:").unwrap();
4631 let now_ms = std::time::SystemTime::now()
4632 .duration_since(std::time::UNIX_EPOCH)
4633 .unwrap()
4634 .as_millis() as i64;
4635 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
4636 conn.execute_batch(
4637 "CREATE TABLE agents (
4638 id INTEGER PRIMARY KEY,
4639 slug TEXT NOT NULL
4640 );
4641 CREATE TABLE conversations (
4642 id INTEGER PRIMARY KEY,
4643 agent_id INTEGER NOT NULL,
4644 workspace_id INTEGER,
4645 source_id TEXT NOT NULL,
4646 source_path TEXT NOT NULL,
4647 started_at INTEGER
4648 );
4649 CREATE TABLE messages (
4650 id INTEGER PRIMARY KEY,
4651 conversation_id INTEGER NOT NULL,
4652 idx INTEGER NOT NULL,
4653 role TEXT NOT NULL,
4654 created_at INTEGER,
4655 content TEXT NOT NULL
4656 );
4657 CREATE TABLE message_metrics (
4658 message_id INTEGER PRIMARY KEY,
4659 hour_id INTEGER NOT NULL,
4660 day_id INTEGER NOT NULL,
4661 agent_slug TEXT NOT NULL,
4662 workspace_id INTEGER NOT NULL DEFAULT 0,
4663 source_id TEXT NOT NULL DEFAULT 'local',
4664 role TEXT NOT NULL,
4665 content_chars INTEGER NOT NULL,
4666 content_tokens_est INTEGER NOT NULL,
4667 api_input_tokens INTEGER,
4668 api_output_tokens INTEGER,
4669 api_cache_read_tokens INTEGER,
4670 api_cache_creation_tokens INTEGER,
4671 api_thinking_tokens INTEGER,
4672 api_data_source TEXT NOT NULL DEFAULT 'estimated'
4673 );
4674 CREATE TABLE usage_hourly (
4675 hour_id INTEGER NOT NULL,
4676 agent_slug TEXT NOT NULL,
4677 workspace_id INTEGER NOT NULL DEFAULT 0,
4678 source_id TEXT NOT NULL DEFAULT 'local',
4679 message_count INTEGER NOT NULL DEFAULT 0,
4680 user_message_count INTEGER NOT NULL DEFAULT 0,
4681 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4682 tool_call_count INTEGER NOT NULL DEFAULT 0,
4683 plan_message_count INTEGER NOT NULL DEFAULT 0,
4684 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
4685 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
4686 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
4687 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
4688 api_tokens_total INTEGER NOT NULL DEFAULT 0,
4689 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
4690 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
4691 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
4692 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
4693 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
4694 last_updated INTEGER NOT NULL DEFAULT 0,
4695 PRIMARY KEY (hour_id, agent_slug, workspace_id, source_id)
4696 );
4697 CREATE TABLE usage_daily (
4698 day_id INTEGER NOT NULL,
4699 agent_slug TEXT NOT NULL,
4700 workspace_id INTEGER NOT NULL DEFAULT 0,
4701 source_id TEXT NOT NULL DEFAULT 'local',
4702 message_count INTEGER NOT NULL DEFAULT 0,
4703 user_message_count INTEGER NOT NULL DEFAULT 0,
4704 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4705 tool_call_count INTEGER NOT NULL DEFAULT 0,
4706 plan_message_count INTEGER NOT NULL DEFAULT 0,
4707 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
4708 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
4709 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
4710 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
4711 api_tokens_total INTEGER NOT NULL DEFAULT 0,
4712 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
4713 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
4714 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
4715 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
4716 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
4717 last_updated INTEGER NOT NULL DEFAULT 0,
4718 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
4719 );",
4720 )
4721 .unwrap();
4722
4723 conn.execute("INSERT INTO agents (id, slug) VALUES (1, 'codex')")
4724 .unwrap();
4725 conn.execute(&format!(
4726 "INSERT INTO conversations (id, agent_id, workspace_id, source_id, source_path, started_at)
4727 VALUES (1, 1, 1, 'local', '/sessions/legacy-a.jsonl', {day10_ms})"
4728 ))
4729 .unwrap();
4730 conn.execute(&format!(
4731 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4732 VALUES (11, 1, 0, 'user', {}, 'legacy-a1')",
4733 day10_ms + 100,
4734 ))
4735 .unwrap();
4736 conn.execute(&format!(
4737 "INSERT INTO messages (id, conversation_id, idx, role, created_at, content)
4738 VALUES (12, 1, 1, 'assistant', {}, 'legacy-a2')",
4739 day10_ms + 200,
4740 ))
4741 .unwrap();
4742
4743 conn.execute_batch(
4744 &format!(
4745 "INSERT INTO message_metrics
4746 (message_id, hour_id, day_id, agent_slug, workspace_id, source_id,
4747 role, content_chars, content_tokens_est, api_input_tokens, api_output_tokens,
4748 api_cache_read_tokens, api_cache_creation_tokens, api_thinking_tokens, api_data_source)
4749 VALUES
4750 (11, 240, 10, 'codex', 1, 'local', 'user', 10, 3, 5, 7, 0, 0, 0, 'api'),
4751 (12, 240, 10, 'codex', 1, 'local', 'assistant', 12, 4, 8, 9, 0, 0, 0, 'api');
4752 INSERT INTO usage_hourly
4753 (hour_id, agent_slug, workspace_id, source_id, message_count, user_message_count,
4754 assistant_message_count, api_coverage_message_count, api_tokens_total, last_updated)
4755 VALUES
4756 (240, 'codex', 1, 'local', 2, 1, 1, 2, 29, {now_ms});
4757 INSERT INTO usage_daily
4758 (day_id, agent_slug, workspace_id, source_id, message_count, user_message_count,
4759 assistant_message_count, api_coverage_message_count, api_tokens_total, last_updated)
4760 VALUES
4761 (10, 'codex', 1, 'local', 2, 1, 1, 2, 29, {now_ms});"
4762 ),
4763 )
4764 .unwrap();
4765
4766 conn
4767 }
4768
4769 fn setup_legacy_track_b_filter_db_without_token_usage_timestamp() -> Connection {
4770 let conn = setup_legacy_status_filter_db_without_message_metrics_created_at();
4771 let now_ms = std::time::SystemTime::now()
4772 .duration_since(std::time::UNIX_EPOCH)
4773 .unwrap()
4774 .as_millis() as i64;
4775 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
4776
4777 conn.execute_batch(
4778 "CREATE TABLE token_usage (
4779 id INTEGER PRIMARY KEY AUTOINCREMENT,
4780 message_id INTEGER NOT NULL,
4781 conversation_id INTEGER NOT NULL,
4782 agent_id INTEGER NOT NULL,
4783 workspace_id INTEGER,
4784 source_id TEXT NOT NULL DEFAULT 'local',
4785 day_id INTEGER NOT NULL,
4786 model_name TEXT,
4787 model_family TEXT,
4788 total_tokens INTEGER,
4789 estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
4790 data_source TEXT NOT NULL DEFAULT 'api'
4791 );
4792 CREATE TABLE token_daily_stats (
4793 day_id INTEGER NOT NULL,
4794 agent_slug TEXT NOT NULL,
4795 source_id TEXT NOT NULL DEFAULT 'all',
4796 model_family TEXT NOT NULL DEFAULT 'all',
4797 api_call_count INTEGER NOT NULL DEFAULT 0,
4798 user_message_count INTEGER NOT NULL DEFAULT 0,
4799 assistant_message_count INTEGER NOT NULL DEFAULT 0,
4800 tool_message_count INTEGER NOT NULL DEFAULT 0,
4801 total_input_tokens INTEGER NOT NULL DEFAULT 0,
4802 total_output_tokens INTEGER NOT NULL DEFAULT 0,
4803 total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
4804 total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
4805 total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
4806 grand_total_tokens INTEGER NOT NULL DEFAULT 0,
4807 total_content_chars INTEGER NOT NULL DEFAULT 0,
4808 total_tool_calls INTEGER NOT NULL DEFAULT 0,
4809 estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
4810 session_count INTEGER NOT NULL DEFAULT 0,
4811 last_updated INTEGER NOT NULL,
4812 PRIMARY KEY (day_id, agent_slug, source_id, model_family)
4813 );",
4814 )
4815 .unwrap();
4816
4817 conn.execute_batch(
4818 &format!(
4819 "INSERT INTO token_usage
4820 (message_id, conversation_id, agent_id, workspace_id, source_id, day_id,
4821 model_name, model_family, total_tokens, estimated_cost_usd, data_source)
4822 VALUES
4823 (11, 1, 1, 1, 'local', 10, 'gpt-4o-mini', 'gpt-4o', 12, 0.12, 'api'),
4824 (12, 1, 1, 1, 'local', 10, 'gpt-4o-mini', 'gpt-4o', 17, 0.17, 'api');
4825 INSERT INTO token_daily_stats
4826 (day_id, agent_slug, source_id, model_family, api_call_count, user_message_count,
4827 assistant_message_count, grand_total_tokens, estimated_cost_usd, session_count, last_updated)
4828 VALUES
4829 (10, 'codex', 'local', 'gpt-4o', 2, 1, 1, 29, 0.29, 1, {now_ms});
4830 UPDATE conversations SET started_at = {day10_ms} + 100 WHERE id = 1;"
4831 ),
4832 )
4833 .unwrap();
4834
4835 conn
4836 }
4837
4838 #[test]
4839 fn query_status_treats_millisecond_timestamps_as_fresh() {
4840 let now_ms = std::time::SystemTime::now()
4841 .duration_since(std::time::UNIX_EPOCH)
4842 .unwrap()
4843 .as_millis() as i64;
4844 let conn = setup_status_freshness_db(now_ms - 1_000, now_ms - 2_000);
4845
4846 let result = query_status(&conn, &AnalyticsFilter::default()).unwrap();
4847
4848 assert!(result.drift.track_a_fresh);
4849 assert!(result.drift.track_b_fresh);
4850 assert_eq!(result.recommended_action, "none");
4851 }
4852
4853 #[test]
4854 fn query_status_supports_legacy_second_timestamps() {
4855 let now_secs = std::time::SystemTime::now()
4856 .duration_since(std::time::UNIX_EPOCH)
4857 .unwrap()
4858 .as_secs() as i64;
4859 let conn = setup_status_freshness_db(now_secs - 5, now_secs - 10);
4860
4861 let result = query_status(&conn, &AnalyticsFilter::default()).unwrap();
4862
4863 assert!(result.drift.track_a_fresh);
4864 assert!(result.drift.track_b_fresh);
4865 }
4866
4867 #[test]
4868 fn query_status_detects_millisecond_freshness_mismatch() {
4869 let now_ms = std::time::SystemTime::now()
4870 .duration_since(std::time::UNIX_EPOCH)
4871 .unwrap()
4872 .as_millis() as i64;
4873 let stale_ms = now_ms - (3 * 86_400_000);
4874 let conn = setup_status_freshness_db(now_ms - 1_000, stale_ms);
4875
4876 let result = query_status(&conn, &AnalyticsFilter::default()).unwrap();
4877
4878 assert!(result.drift.track_a_fresh);
4879 assert!(!result.drift.track_b_fresh);
4880 assert_eq!(result.recommended_action, "rebuild_track_b");
4881 assert!(
4882 result
4883 .drift
4884 .signals
4885 .iter()
4886 .any(|signal| signal.signal == "track_freshness_mismatch")
4887 );
4888 }
4889
4890 #[test]
4891 fn query_status_deduplicates_duplicate_token_usage_rows_in_coverage() {
4892 let conn = setup_status_filter_db();
4893 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
4894 conn.execute(&format!(
4895 "INSERT INTO token_usage
4896 (message_id, conversation_id, agent_id, workspace_id, source_id, timestamp_ms, day_id,
4897 model_name, model_family, total_tokens, data_source)
4898 VALUES
4899 (11, 1, 1, 1, 'local', {}, 10, NULL, 'gpt-4o', 12, 'estimated')",
4900 day10_ms + 100
4901 ))
4902 .unwrap();
4903
4904 let filter = AnalyticsFilter {
4905 since_ms: Some(day10_ms),
4906 until_ms: Some(day10_ms + 1_000),
4907 agents: vec!["codex".into()],
4908 source: SourceFilter::Local,
4909 workspace_ids: vec![1],
4910 };
4911
4912 let result = query_status(&conn, &filter).unwrap();
4913
4914 assert_eq!(status_table_row_count(&result, "token_usage"), 2);
4915 assert_eq!(result.coverage.model_name_coverage_pct, 100.0);
4916 assert_eq!(result.coverage.estimate_only_pct, 0.0);
4917 }
4918
4919 #[test]
4920 fn query_status_blank_duplicate_token_usage_data_source_does_not_override_estimated() {
4921 let conn = setup_status_filter_db();
4922 let day11_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(11);
4923 conn.execute("UPDATE token_usage SET data_source = 'estimated' WHERE message_id = 21")
4924 .unwrap();
4925 conn.execute(&format!(
4926 "INSERT INTO token_usage
4927 (message_id, conversation_id, agent_id, workspace_id, source_id, timestamp_ms, day_id,
4928 model_name, model_family, total_tokens, data_source)
4929 VALUES
4930 (21, 2, 2, 2, 'remote-ci', {}, 11, NULL, 'claude', 11, ' ')",
4931 day11_ms + 100
4932 ))
4933 .unwrap();
4934
4935 let filter = AnalyticsFilter {
4936 agents: vec!["claude_code".into()],
4937 source: SourceFilter::Specific("remote-ci".into()),
4938 workspace_ids: vec![2],
4939 ..Default::default()
4940 };
4941
4942 let result = query_status(&conn, &filter).unwrap();
4943
4944 assert_eq!(status_table_row_count(&result, "token_usage"), 1);
4945 assert_eq!(result.coverage.total_messages, 1);
4946 assert_eq!(result.coverage.estimate_only_pct, 100.0);
4947 }
4948
4949 fn status_table_row_count(result: &StatusResult, table: &str) -> i64 {
4950 result
4951 .tables
4952 .iter()
4953 .find(|info| info.table == table)
4954 .map(|info| info.row_count)
4955 .unwrap_or(-1)
4956 }
4957
4958 #[test]
4959 fn query_status_applies_dimensional_filters_to_tables_and_coverage() {
4960 let conn = setup_status_filter_db();
4961 let filter = AnalyticsFilter {
4962 since_ms: Some(crate::storage::sqlite::FrankenStorage::millis_from_day_id(
4963 10,
4964 )),
4965 until_ms: Some(crate::storage::sqlite::FrankenStorage::millis_from_day_id(10) + 1_000),
4966 agents: vec![" codex ".into()],
4967 source: SourceFilter::Specific(" LOCAL ".into()),
4968 workspace_ids: vec![1],
4969 };
4970
4971 let result = query_status(&conn, &filter).unwrap();
4972
4973 assert_eq!(result.coverage.total_messages, 2);
4974 assert_eq!(status_table_row_count(&result, "message_metrics"), 2);
4975 assert_eq!(status_table_row_count(&result, "usage_hourly"), 1);
4976 assert_eq!(status_table_row_count(&result, "usage_daily"), 1);
4977 assert_eq!(status_table_row_count(&result, "token_usage"), 2);
4978 assert_eq!(status_table_row_count(&result, "token_daily_stats"), 1);
4979 assert_eq!(result.coverage.message_metrics_coverage_pct, 100.0);
4980 assert_eq!(result.coverage.api_token_coverage_pct, 100.0);
4981 assert_eq!(result.coverage.model_name_coverage_pct, 100.0);
4982 assert_eq!(result.coverage.estimate_only_pct, 0.0);
4983 assert_eq!(result.recommended_action, "none");
4984 }
4985
4986 #[test]
4987 fn query_status_subday_filter_excludes_same_day_rollup_rows_without_raw_matches() {
4988 let conn = setup_status_filter_db();
4989 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
4990 conn.execute(&format!(
4991 "UPDATE messages SET created_at = {} WHERE conversation_id = 1",
4992 day10_ms + 10_000
4993 ))
4994 .unwrap();
4995 conn.execute(&format!(
4996 "UPDATE message_metrics SET created_at_ms = {} WHERE agent_slug = 'codex'",
4997 day10_ms + 10_000
4998 ))
4999 .unwrap();
5000 conn.execute(&format!(
5001 "UPDATE token_usage SET timestamp_ms = {} WHERE agent_id = 1",
5002 day10_ms + 10_000
5003 ))
5004 .unwrap();
5005
5006 let filter = AnalyticsFilter {
5007 since_ms: Some(day10_ms),
5008 until_ms: Some(day10_ms + 500),
5009 agents: vec!["codex".into()],
5010 ..Default::default()
5011 };
5012
5013 let result = query_status(&conn, &filter).unwrap();
5014
5015 assert_eq!(result.coverage.total_messages, 0);
5016 assert_eq!(status_table_row_count(&result, "message_metrics"), 0);
5017 assert_eq!(status_table_row_count(&result, "usage_hourly"), 0);
5018 assert_eq!(status_table_row_count(&result, "usage_daily"), 0);
5019 assert_eq!(status_table_row_count(&result, "token_usage"), 0);
5020 assert_eq!(status_table_row_count(&result, "token_daily_stats"), 0);
5021 assert_eq!(result.recommended_action, "none");
5022 }
5023
5024 #[test]
5025 fn query_status_uses_exact_raw_timestamps_for_subday_coverage_counts() {
5026 let conn = setup_status_filter_db();
5027 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5028 conn.execute(&format!(
5029 "UPDATE messages SET created_at = {} WHERE id = 12",
5030 day10_ms + 10_000
5031 ))
5032 .unwrap();
5033 conn.execute(&format!(
5034 "UPDATE message_metrics SET created_at_ms = {} WHERE message_id = 12",
5035 day10_ms + 10_000
5036 ))
5037 .unwrap();
5038 conn.execute(&format!(
5039 "UPDATE token_usage SET timestamp_ms = {}, model_name = NULL, data_source = 'estimated' WHERE message_id = 12",
5040 day10_ms + 10_000
5041 ))
5042 .unwrap();
5043
5044 let filter = AnalyticsFilter {
5045 since_ms: Some(day10_ms),
5046 until_ms: Some(day10_ms + 500),
5047 agents: vec!["codex".into()],
5048 source: SourceFilter::Local,
5049 workspace_ids: vec![1],
5050 };
5051
5052 let result = query_status(&conn, &filter).unwrap();
5053
5054 assert_eq!(result.coverage.total_messages, 1);
5055 assert_eq!(status_table_row_count(&result, "message_metrics"), 1);
5056 assert_eq!(status_table_row_count(&result, "token_usage"), 1);
5057 assert_eq!(result.coverage.message_metrics_coverage_pct, 100.0);
5058 assert_eq!(result.coverage.api_token_coverage_pct, 100.0);
5059 assert_eq!(result.coverage.model_name_coverage_pct, 100.0);
5060 assert_eq!(result.coverage.estimate_only_pct, 0.0);
5061 }
5062
5063 #[test]
5064 fn query_status_uses_message_metrics_timestamp_when_message_created_at_missing() {
5065 let conn = setup_status_filter_db();
5066 conn.execute("UPDATE messages SET created_at = NULL WHERE conversation_id = 1")
5067 .unwrap();
5068
5069 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5070 let filter = AnalyticsFilter {
5071 since_ms: Some(day10_ms),
5072 until_ms: Some(day10_ms + 86_399_999),
5073 agents: vec!["codex".into()],
5074 source: SourceFilter::Local,
5075 workspace_ids: vec![1],
5076 };
5077
5078 let result = query_status(&conn, &filter).unwrap();
5079
5080 assert_eq!(result.coverage.total_messages, 2);
5081 assert_eq!(status_table_row_count(&result, "message_metrics"), 2);
5082 assert_eq!(result.coverage.message_metrics_coverage_pct, 100.0);
5083 }
5084
5085 #[test]
5086 fn query_status_uses_message_created_at_when_message_metrics_timestamp_column_is_missing() {
5087 let conn = setup_legacy_status_filter_db_without_message_metrics_created_at();
5088 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5089 conn.execute(&format!(
5090 "UPDATE messages SET created_at = {} WHERE conversation_id = 1",
5091 day10_ms + 10_000
5092 ))
5093 .unwrap();
5094
5095 let filter = AnalyticsFilter {
5096 since_ms: Some(day10_ms),
5097 until_ms: Some(day10_ms + 500),
5098 agents: vec!["codex".into()],
5099 source: SourceFilter::Local,
5100 workspace_ids: vec![1],
5101 };
5102
5103 let result = query_status(&conn, &filter).unwrap();
5104
5105 assert_eq!(result.coverage.total_messages, 0);
5106 assert_eq!(status_table_row_count(&result, "message_metrics"), 0);
5107 assert_eq!(status_table_row_count(&result, "usage_hourly"), 0);
5108 assert_eq!(status_table_row_count(&result, "usage_daily"), 0);
5109 assert_eq!(result.recommended_action, "none");
5110 }
5111
5112 #[test]
5113 fn query_status_uses_conversation_started_at_when_message_metrics_timestamp_column_is_missing()
5114 {
5115 let conn = setup_legacy_status_filter_db_without_message_metrics_created_at();
5116 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5117 conn.execute("UPDATE messages SET created_at = NULL WHERE conversation_id = 1")
5118 .unwrap();
5119 conn.execute(&format!(
5120 "UPDATE conversations SET started_at = {} WHERE id = 1",
5121 day10_ms + 10_000
5122 ))
5123 .unwrap();
5124
5125 let filter = AnalyticsFilter {
5126 since_ms: Some(day10_ms),
5127 until_ms: Some(day10_ms + 500),
5128 agents: vec!["codex".into()],
5129 source: SourceFilter::Local,
5130 workspace_ids: vec![1],
5131 };
5132
5133 let result = query_status(&conn, &filter).unwrap();
5134
5135 assert_eq!(result.coverage.total_messages, 0);
5136 assert_eq!(status_table_row_count(&result, "message_metrics"), 0);
5137 assert_eq!(status_table_row_count(&result, "usage_hourly"), 0);
5138 assert_eq!(status_table_row_count(&result, "usage_daily"), 0);
5139 assert_eq!(result.recommended_action, "none");
5140 }
5141
5142 #[test]
5143 fn query_status_uses_conversation_started_at_when_token_usage_timestamp_column_is_missing() {
5144 let conn = setup_legacy_track_b_filter_db_without_token_usage_timestamp();
5145 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5146 conn.execute(&format!(
5147 "UPDATE conversations SET started_at = {} WHERE id = 1",
5148 day10_ms + 10_000
5149 ))
5150 .unwrap();
5151
5152 let filter = AnalyticsFilter {
5153 since_ms: Some(day10_ms),
5154 until_ms: Some(day10_ms + 500),
5155 agents: vec!["codex".into()],
5156 source: SourceFilter::Local,
5157 workspace_ids: vec![1],
5158 };
5159
5160 let result = query_status(&conn, &filter).unwrap();
5161
5162 assert_eq!(status_table_row_count(&result, "token_usage"), 0);
5163 assert_eq!(status_table_row_count(&result, "token_daily_stats"), 0);
5164 assert_eq!(result.coverage.model_name_coverage_pct, 0.0);
5165 assert_eq!(result.coverage.estimate_only_pct, 0.0);
5166 assert_eq!(result.recommended_action, "rebuild_track_b");
5167 }
5168
5169 #[test]
5170 fn query_cost_timeseries_uses_conversation_started_at_when_token_usage_timestamp_column_is_missing()
5171 {
5172 let conn = setup_legacy_track_b_filter_db_without_token_usage_timestamp();
5173 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5174 conn.execute(&format!(
5175 "UPDATE conversations SET started_at = {} WHERE id = 1",
5176 day10_ms + 10_000
5177 ))
5178 .unwrap();
5179
5180 let filter = AnalyticsFilter {
5181 since_ms: Some(day10_ms),
5182 until_ms: Some(day10_ms + 500),
5183 agents: vec!["codex".into()],
5184 source: SourceFilter::Local,
5185 workspace_ids: vec![1],
5186 };
5187
5188 let result = query_cost_timeseries(&conn, &filter, GroupBy::Hour).unwrap();
5189
5190 assert_eq!(result.source_table, "token_usage");
5191 assert!(result.buckets.is_empty());
5192 assert_eq!(result.totals.api_tokens_total, 0);
5193 assert_eq!(result.totals.estimated_cost_usd, 0.0);
5194 }
5195
5196 #[test]
5197 fn query_breakdown_model_api_total_uses_conversation_started_at_when_token_usage_timestamp_column_is_missing()
5198 {
5199 let conn = setup_legacy_track_b_filter_db_without_token_usage_timestamp();
5200 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5201 conn.execute(&format!(
5202 "UPDATE conversations SET started_at = {} WHERE id = 1",
5203 day10_ms + 10_000
5204 ))
5205 .unwrap();
5206
5207 let filter = AnalyticsFilter {
5208 since_ms: Some(day10_ms),
5209 until_ms: Some(day10_ms + 500),
5210 agents: vec!["codex".into()],
5211 source: SourceFilter::Local,
5212 workspace_ids: vec![1],
5213 };
5214
5215 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5216
5217 assert_eq!(result.source_table, "token_usage");
5218 assert!(result.rows.is_empty());
5219 }
5220
5221 #[test]
5222 fn query_status_unknown_workspace_filter_returns_empty_subset() {
5223 let conn = setup_status_filter_db();
5224 let filter = AnalyticsFilter {
5225 workspace_ids: vec![999],
5226 ..Default::default()
5227 };
5228
5229 let result = query_status(&conn, &filter).unwrap();
5230
5231 assert_eq!(result.coverage.total_messages, 0);
5232 assert_eq!(status_table_row_count(&result, "message_metrics"), 0);
5233 assert_eq!(status_table_row_count(&result, "usage_hourly"), 0);
5234 assert_eq!(status_table_row_count(&result, "usage_daily"), 0);
5235 assert_eq!(status_table_row_count(&result, "token_usage"), 0);
5236 assert_eq!(status_table_row_count(&result, "token_daily_stats"), 0);
5237 assert!(result.drift.signals.is_empty());
5238 assert_eq!(result.recommended_action, "none");
5239 }
5240
5241 #[test]
5242 fn query_status_source_filter_matches_blank_remote_raw_source_ids_via_origin_host() {
5243 let conn = setup_status_filter_db();
5244 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5245 .unwrap();
5246 conn.execute(
5247 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5248 )
5249 .unwrap();
5250 conn.execute(
5251 "UPDATE message_metrics SET source_id = ' ' WHERE agent_slug = 'claude_code'",
5252 )
5253 .unwrap();
5254 conn.execute("UPDATE usage_hourly SET source_id = ' ' WHERE agent_slug = 'claude_code'")
5255 .unwrap();
5256 conn.execute("UPDATE usage_daily SET source_id = ' ' WHERE agent_slug = 'claude_code'")
5257 .unwrap();
5258 conn.execute("UPDATE token_usage SET source_id = ' ' WHERE conversation_id = 2")
5259 .unwrap();
5260 conn.execute(
5261 "UPDATE token_daily_stats SET source_id = ' ' WHERE agent_slug = 'claude_code'",
5262 )
5263 .unwrap();
5264
5265 let filter = AnalyticsFilter {
5266 source: SourceFilter::Specific("remote-ci".into()),
5267 ..Default::default()
5268 };
5269 let result = query_status(&conn, &filter).unwrap();
5270
5271 assert_eq!(result.coverage.total_messages, 1);
5272 assert_eq!(status_table_row_count(&result, "message_metrics"), 1);
5273 assert_eq!(status_table_row_count(&result, "usage_hourly"), 1);
5274 assert_eq!(status_table_row_count(&result, "usage_daily"), 1);
5275 assert_eq!(status_table_row_count(&result, "token_usage"), 1);
5276 assert_eq!(status_table_row_count(&result, "token_daily_stats"), 1);
5277 assert_eq!(result.recommended_action, "none");
5278 }
5279
5280 #[test]
5281 fn query_breakdown_by_agent_returns_ordered_rows() {
5282 let conn = setup_usage_daily_db();
5283 let filter = AnalyticsFilter::default();
5284 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
5285
5286 assert_eq!(result.dim, Dim::Agent);
5287 assert_eq!(result.metric, Metric::ApiTotal);
5288 assert!(!result.rows.is_empty());
5289 assert_eq!(result.rows[0].key, "claude_code");
5291 for i in 1..result.rows.len() {
5293 assert!(result.rows[i - 1].value >= result.rows[i].value);
5294 }
5295 }
5296
5297 #[test]
5298 fn query_breakdown_by_agent_coalesces_trimmed_and_blank_agent_slugs() {
5299 let conn = setup_usage_daily_db();
5300 conn.execute("UPDATE usage_daily SET agent_slug = ' codex ' WHERE agent_slug = 'codex'")
5301 .unwrap();
5302 conn.execute("UPDATE usage_daily SET agent_slug = ' ' WHERE agent_slug = 'aider'")
5303 .unwrap();
5304
5305 let result = query_breakdown(
5306 &conn,
5307 &AnalyticsFilter::default(),
5308 Dim::Agent,
5309 Metric::ToolCalls,
5310 10,
5311 )
5312 .unwrap();
5313
5314 let codex = result.rows.iter().find(|row| row.key == "codex").unwrap();
5315 assert_eq!(codex.bucket.tool_call_count, 25);
5316
5317 let unknown = result.rows.iter().find(|row| row.key == "unknown").unwrap();
5318 assert_eq!(unknown.bucket.tool_call_count, 5);
5319 }
5320
5321 #[test]
5322 fn query_breakdown_by_agent_coverage_pct_orders_by_coverage_before_limit() {
5323 let conn = setup_usage_daily_db();
5324 conn.execute(
5325 "UPDATE usage_daily
5326 SET api_coverage_message_count = CASE agent_slug
5327 WHEN 'claude_code' THEN 10
5328 WHEN 'codex' THEN message_count
5329 ELSE api_coverage_message_count
5330 END",
5331 )
5332 .unwrap();
5333
5334 let result = query_breakdown(
5335 &conn,
5336 &AnalyticsFilter::default(),
5337 Dim::Agent,
5338 Metric::CoveragePct,
5339 1,
5340 )
5341 .unwrap();
5342
5343 assert_eq!(result.source_table, "usage_daily");
5344 assert_eq!(result.rows.len(), 1);
5345 assert_eq!(result.rows[0].key, "codex");
5346 assert_eq!(result.rows[0].value, 100);
5347 }
5348
5349 #[test]
5350 fn query_breakdown_by_source_filters_correctly() {
5351 let conn = setup_usage_daily_db();
5352 let filter = AnalyticsFilter {
5353 source: SourceFilter::Local,
5354 ..Default::default()
5355 };
5356 let result =
5357 query_breakdown(&conn, &filter, Dim::Source, Metric::MessageCount, 10).unwrap();
5358
5359 assert_eq!(result.rows.len(), 1);
5361 assert_eq!(result.rows[0].key, "local");
5362 }
5363
5364 #[test]
5365 fn query_breakdown_by_source_specific_filter_applies_before_limit_on_track_a_rollup() {
5366 let conn = setup_usage_daily_db();
5367 let filter = AnalyticsFilter {
5368 source: SourceFilter::Specific("remote".into()),
5369 ..Default::default()
5370 };
5371 let result = query_breakdown(&conn, &filter, Dim::Source, Metric::MessageCount, 1).unwrap();
5372
5373 assert_eq!(result.source_table, "usage_daily");
5374 assert_eq!(result.rows.len(), 1);
5375 assert_eq!(result.rows[0].key, "remote");
5376 assert_eq!(result.rows[0].value, 30);
5377 }
5378
5379 #[test]
5380 fn query_breakdown_by_source_coalesces_trimmed_local_ids() {
5381 let conn = setup_usage_daily_db();
5382 conn.execute_compat(
5383 "INSERT INTO usage_daily (day_id, agent_slug, workspace_id, source_id,
5384 message_count, user_message_count, assistant_message_count,
5385 tool_call_count, plan_message_count, api_coverage_message_count,
5386 content_tokens_est_total, content_tokens_est_user, content_tokens_est_assistant,
5387 api_tokens_total, api_input_tokens_total, api_output_tokens_total,
5388 api_cache_read_tokens_total, api_cache_creation_tokens_total,
5389 api_thinking_tokens_total)
5390 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
5391 frankensqlite::params![
5392 20252,
5393 "cursor",
5394 3,
5395 " LOCAL ",
5396 5,
5397 2,
5398 3,
5399 1,
5400 0,
5401 1,
5402 500,
5403 250,
5404 250,
5405 700,
5406 300,
5407 300,
5408 50,
5409 30,
5410 20
5411 ],
5412 )
5413 .unwrap();
5414
5415 let result = query_breakdown(
5416 &conn,
5417 &AnalyticsFilter::default(),
5418 Dim::Source,
5419 Metric::MessageCount,
5420 10,
5421 )
5422 .unwrap();
5423 let local_rows: Vec<_> = result
5424 .rows
5425 .iter()
5426 .filter(|row| row.key == "local")
5427 .collect();
5428
5429 assert_eq!(local_rows.len(), 1);
5430 assert_eq!(local_rows[0].value, 335);
5431 }
5432
5433 #[test]
5434 fn query_breakdown_by_source_with_cost_metric_coalesces_trimmed_local_ids() {
5435 let conn = setup_token_daily_stats_db();
5436 let now = std::time::SystemTime::now()
5437 .duration_since(std::time::UNIX_EPOCH)
5438 .unwrap()
5439 .as_secs() as i64;
5440 conn.execute_compat(
5441 "INSERT INTO token_daily_stats VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
5442 frankensqlite::params![20251, "cursor", " LOCAL ", "sonnet", 10, 5, 5, 1, 1500, 1200, 0, 0, 0, 2700, 9000, 1, 0.25, 1, now],
5443 )
5444 .unwrap();
5445
5446 let result = query_breakdown(
5447 &conn,
5448 &AnalyticsFilter::default(),
5449 Dim::Source,
5450 Metric::EstimatedCostUsd,
5451 10,
5452 )
5453 .unwrap();
5454
5455 assert_eq!(result.rows.len(), 1);
5456 assert_eq!(result.rows[0].key, "local");
5457 assert_eq!(result.rows[0].value, 3);
5458 }
5459
5460 #[test]
5461 fn query_breakdown_by_source_specific_filter_applies_before_limit_on_track_b_rollup() {
5462 let conn = setup_token_daily_stats_db();
5463 let now = std::time::SystemTime::now()
5464 .duration_since(std::time::UNIX_EPOCH)
5465 .unwrap()
5466 .as_secs() as i64;
5467 conn.execute_compat(
5468 "INSERT INTO token_daily_stats VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
5469 frankensqlite::params![20250, "claude_code", "remote-ci", "sonnet", 5, 2, 3, 1, 1200, 900, 0, 0, 0, 2100, 6000, 1, 0.6, 1, now],
5470 )
5471 .unwrap();
5472
5473 let filter = AnalyticsFilter {
5474 source: SourceFilter::Specific("remote-ci".into()),
5475 ..Default::default()
5476 };
5477 let result =
5478 query_breakdown(&conn, &filter, Dim::Source, Metric::EstimatedCostUsd, 1).unwrap();
5479
5480 assert_eq!(result.source_table, "token_daily_stats");
5481 assert_eq!(result.rows.len(), 1);
5482 assert_eq!(result.rows[0].key, "remote-ci");
5483 assert_eq!(result.rows[0].value, 1);
5484 }
5485
5486 #[test]
5487 fn query_breakdown_by_source_message_count_recovers_blank_remote_usage_daily_source_via_origin_host()
5488 {
5489 let conn = setup_status_filter_db();
5490 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5491 .unwrap();
5492 conn.execute(
5493 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5494 )
5495 .unwrap();
5496 conn.execute("UPDATE usage_daily SET source_id = ' ' WHERE agent_slug = 'claude_code'")
5497 .unwrap();
5498
5499 let result = query_breakdown(
5500 &conn,
5501 &AnalyticsFilter::default(),
5502 Dim::Source,
5503 Metric::MessageCount,
5504 10,
5505 )
5506 .unwrap();
5507
5508 assert_eq!(result.source_table, "messages");
5509 let remote = result
5510 .rows
5511 .iter()
5512 .find(|row| row.key == "remote-ci")
5513 .expect("remote source row should exist");
5514 assert_eq!(remote.value, 1);
5515 assert_eq!(remote.message_count, 1);
5516 let local = result
5517 .rows
5518 .iter()
5519 .find(|row| row.key == "local")
5520 .expect("local source row should exist");
5521 assert_eq!(local.value, 2);
5522 }
5523
5524 #[test]
5525 fn query_breakdown_by_source_api_total_matches_blank_remote_usage_daily_source_via_origin_host()
5526 {
5527 let conn = setup_status_filter_db();
5528 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5529 .unwrap();
5530 conn.execute(
5531 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5532 )
5533 .unwrap();
5534 conn.execute("UPDATE usage_daily SET source_id = ' ' WHERE agent_slug = 'claude_code'")
5535 .unwrap();
5536 conn.execute(
5537 "UPDATE message_metrics SET api_input_tokens = 13, api_output_tokens = 7, api_data_source = 'api' WHERE message_id = 21",
5538 )
5539 .unwrap();
5540
5541 let filter = AnalyticsFilter {
5542 source: SourceFilter::Specific("remote-ci".into()),
5543 ..Default::default()
5544 };
5545 let result = query_breakdown(&conn, &filter, Dim::Source, Metric::ApiTotal, 10).unwrap();
5546
5547 assert_eq!(result.source_table, "message_metrics");
5548 assert_eq!(result.rows.len(), 1);
5549 assert_eq!(result.rows[0].key, "remote-ci");
5550 assert_eq!(result.rows[0].value, 20);
5551 assert_eq!(result.rows[0].bucket.api_tokens_total, 20);
5552 assert_eq!(result.rows[0].bucket.api_coverage_message_count, 1);
5553 }
5554
5555 #[test]
5556 fn query_breakdown_source_with_cost_metric_source_filter_matches_blank_remote_token_usage_source_via_origin_host()
5557 {
5558 let conn = setup_status_filter_db();
5559 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5560 .unwrap();
5561 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
5562 .unwrap();
5563 conn.execute(
5564 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5565 )
5566 .unwrap();
5567 conn.execute(
5568 "UPDATE token_usage SET source_id = ' ', estimated_cost_usd = 0.4 WHERE conversation_id = 2",
5569 )
5570 .unwrap();
5571 conn.execute(
5572 "UPDATE token_daily_stats SET source_id = ' ' WHERE agent_slug = 'claude_code'",
5573 )
5574 .unwrap();
5575
5576 let filter = AnalyticsFilter {
5577 source: SourceFilter::Specific("remote-ci".into()),
5578 ..Default::default()
5579 };
5580 let result =
5581 query_breakdown(&conn, &filter, Dim::Source, Metric::EstimatedCostUsd, 10).unwrap();
5582
5583 assert_eq!(result.source_table, "token_usage");
5584 assert_eq!(result.rows.len(), 1);
5585 assert_eq!(result.rows[0].key, "remote-ci");
5586 assert_eq!(result.rows[0].message_count, 1);
5587 assert!((result.rows[0].bucket.estimated_cost_usd - 0.4).abs() < 0.001);
5588 }
5589
5590 #[test]
5591 fn query_breakdown_agent_with_cost_metric_source_filter_matches_blank_remote_token_usage_source_via_origin_host()
5592 {
5593 let conn = setup_status_filter_db();
5594 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5595 .unwrap();
5596 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
5597 .unwrap();
5598 conn.execute(
5599 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5600 )
5601 .unwrap();
5602 conn.execute(
5603 "UPDATE token_usage SET source_id = ' ', estimated_cost_usd = 0.4 WHERE conversation_id = 2",
5604 )
5605 .unwrap();
5606 conn.execute(
5607 "UPDATE token_daily_stats SET source_id = ' ' WHERE agent_slug = 'claude_code'",
5608 )
5609 .unwrap();
5610
5611 let filter = AnalyticsFilter {
5612 source: SourceFilter::Specific("remote-ci".into()),
5613 ..Default::default()
5614 };
5615 let result =
5616 query_breakdown(&conn, &filter, Dim::Agent, Metric::EstimatedCostUsd, 10).unwrap();
5617
5618 assert_eq!(result.source_table, "token_usage");
5619 assert_eq!(result.rows.len(), 1);
5620 assert_eq!(result.rows[0].key, "claude_code");
5621 assert_eq!(result.rows[0].message_count, 1);
5622 assert!((result.rows[0].bucket.estimated_cost_usd - 0.4).abs() < 0.001);
5623 }
5624
5625 #[test]
5626 fn query_breakdown_source_with_cost_metric_default_uses_token_usage_to_recover_blank_remote_source_via_origin_host()
5627 {
5628 let conn = setup_status_filter_db();
5629 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5630 .unwrap();
5631 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
5632 .unwrap();
5633 conn.execute(
5634 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5635 )
5636 .unwrap();
5637 conn.execute(
5638 "UPDATE token_usage SET source_id = ' ', estimated_cost_usd = 0.4 WHERE conversation_id = 2",
5639 )
5640 .unwrap();
5641 conn.execute(
5642 "UPDATE token_daily_stats SET source_id = ' ', estimated_cost_usd = 0.4 WHERE agent_slug = 'claude_code'",
5643 )
5644 .unwrap();
5645
5646 let result = query_breakdown(
5647 &conn,
5648 &AnalyticsFilter::default(),
5649 Dim::Source,
5650 Metric::EstimatedCostUsd,
5651 10,
5652 )
5653 .unwrap();
5654
5655 assert_eq!(result.source_table, "token_usage");
5656 let remote = result
5657 .rows
5658 .iter()
5659 .find(|row| row.key == "remote-ci")
5660 .expect("remote source row should exist");
5661 assert_eq!(remote.message_count, 1);
5662 assert!((remote.bucket.estimated_cost_usd - 0.4).abs() < 0.001);
5663 }
5664
5665 #[test]
5666 fn query_breakdown_workspace_filter_applies_on_track_a() {
5667 let conn = setup_usage_daily_db();
5668 let filter = AnalyticsFilter {
5669 workspace_ids: vec![2],
5670 ..Default::default()
5671 };
5672 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::MessageCount, 10).unwrap();
5673 assert_eq!(result.rows.len(), 1);
5674 assert_eq!(result.rows[0].key, "aider");
5675 assert_eq!(result.rows[0].value, 30);
5676 }
5677
5678 #[test]
5679 fn query_breakdown_by_agent_tool_calls_matches_blank_remote_usage_daily_source_via_origin_host()
5680 {
5681 let conn = setup_tools_remote_source_fallback_db();
5682 let filter = AnalyticsFilter {
5683 source: SourceFilter::Specific("remote-ci".into()),
5684 ..Default::default()
5685 };
5686
5687 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ToolCalls, 10).unwrap();
5688
5689 assert_eq!(result.source_table, "message_metrics");
5690 assert_eq!(result.rows.len(), 1);
5691 assert_eq!(result.rows[0].key, "claude_code");
5692 assert_eq!(result.rows[0].message_count, 1);
5693 assert_eq!(result.rows[0].value, 7);
5694 assert_eq!(result.rows[0].bucket.tool_call_count, 7);
5695 }
5696
5697 #[test]
5698 fn query_breakdown_by_agent_plan_count_matches_blank_remote_usage_daily_source_via_origin_host()
5699 {
5700 let conn = setup_tools_remote_source_fallback_db();
5701 conn.execute("ALTER TABLE message_metrics ADD COLUMN has_plan INTEGER NOT NULL DEFAULT 0")
5702 .unwrap();
5703 conn.execute("UPDATE message_metrics SET has_plan = 1 WHERE message_id = 21")
5704 .unwrap();
5705 let filter = AnalyticsFilter {
5706 source: SourceFilter::Specific("remote-ci".into()),
5707 ..Default::default()
5708 };
5709
5710 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::PlanCount, 10).unwrap();
5711
5712 assert_eq!(result.source_table, "message_metrics");
5713 assert_eq!(result.rows.len(), 1);
5714 assert_eq!(result.rows[0].key, "claude_code");
5715 assert_eq!(result.rows[0].message_count, 1);
5716 assert_eq!(result.rows[0].value, 1);
5717 assert_eq!(result.rows[0].bucket.plan_message_count, 1);
5718 }
5719
5720 #[test]
5721 fn query_breakdown_by_agent_message_count_uses_message_metrics_timestamp_when_message_created_at_missing()
5722 {
5723 let conn = setup_tools_remote_source_fallback_db();
5724 conn.execute("UPDATE messages SET created_at = NULL WHERE id = 21")
5725 .unwrap();
5726 let filter = AnalyticsFilter {
5727 source: SourceFilter::Specific("remote-ci".into()),
5728 since_ms: Some(1_700_000_000_500),
5729 until_ms: Some(1_700_000_001_500),
5730 ..Default::default()
5731 };
5732
5733 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::MessageCount, 10).unwrap();
5734
5735 assert_eq!(result.source_table, "messages");
5736 assert_eq!(result.rows.len(), 1);
5737 assert_eq!(result.rows[0].key, "claude_code");
5738 assert_eq!(result.rows[0].value, 1);
5739 assert_eq!(result.rows[0].message_count, 1);
5740 }
5741
5742 #[test]
5743 fn query_breakdown_by_agent_api_total_uses_message_metrics_timestamp_when_message_created_at_missing()
5744 {
5745 let conn = setup_tools_remote_source_fallback_db();
5746 conn.execute("UPDATE messages SET created_at = NULL WHERE id = 21")
5747 .unwrap();
5748 let filter = AnalyticsFilter {
5749 source: SourceFilter::Specific("remote-ci".into()),
5750 since_ms: Some(1_700_000_000_500),
5751 until_ms: Some(1_700_000_001_500),
5752 ..Default::default()
5753 };
5754
5755 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
5756
5757 assert_eq!(result.source_table, "message_metrics");
5758 assert_eq!(result.rows.len(), 1);
5759 assert_eq!(result.rows[0].key, "claude_code");
5760 assert_eq!(result.rows[0].value, 100);
5761 assert_eq!(result.rows[0].bucket.api_tokens_total, 100);
5762 }
5763
5764 #[test]
5765 fn query_breakdown_by_agent_api_total_subday_filter_excludes_same_day_rollup_rows_without_raw_matches()
5766 {
5767 let conn = setup_status_filter_db();
5768 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5769 let later_ms = day10_ms + (12 * 60 * 60 * 1000);
5770
5771 conn.execute(&format!(
5772 "UPDATE messages SET created_at = {later_ms} WHERE conversation_id = 1"
5773 ))
5774 .unwrap();
5775 conn.execute(&format!(
5776 "UPDATE message_metrics SET created_at_ms = {later_ms} WHERE agent_slug = 'codex'"
5777 ))
5778 .unwrap();
5779
5780 let filter = AnalyticsFilter {
5781 since_ms: Some(day10_ms),
5782 until_ms: Some(day10_ms + 500),
5783 agents: vec!["codex".into()],
5784 ..Default::default()
5785 };
5786 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
5787
5788 assert_eq!(result.source_table, "message_metrics");
5789 assert!(result.rows.is_empty());
5790 }
5791
5792 #[test]
5793 fn query_breakdown_model_api_total_deduplicates_duplicate_token_usage_rows() {
5794 let conn = setup_status_filter_db();
5795 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5796 conn.execute(&format!(
5797 "INSERT INTO token_usage
5798 (message_id, conversation_id, agent_id, workspace_id, source_id, timestamp_ms, day_id,
5799 model_name, model_family, total_tokens, data_source)
5800 VALUES
5801 (11, 1, 1, 1, 'local', {}, 10, 'gpt-4o-mini', 'gpt-4o', 12, 'api')",
5802 day10_ms + 100
5803 ))
5804 .unwrap();
5805
5806 let filter = AnalyticsFilter {
5807 workspace_ids: vec![1],
5808 agents: vec!["codex".into()],
5809 source: SourceFilter::Local,
5810 ..Default::default()
5811 };
5812 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5813
5814 assert_eq!(result.source_table, "token_usage");
5815 assert_eq!(result.rows.len(), 1);
5816 assert_eq!(result.rows[0].message_count, 2);
5817 assert_eq!(result.rows[0].value, 29);
5818 assert_eq!(result.rows[0].bucket.api_tokens_total, 29);
5819 }
5820
5821 #[test]
5822 fn query_breakdown_by_agent_api_total_matches_blank_remote_usage_daily_source_via_origin_host()
5823 {
5824 let conn = setup_tools_remote_source_fallback_db();
5825 let filter = AnalyticsFilter {
5826 source: SourceFilter::Specific("remote-ci".into()),
5827 ..Default::default()
5828 };
5829
5830 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
5831
5832 assert_eq!(result.source_table, "message_metrics");
5833 assert_eq!(result.rows.len(), 1);
5834 assert_eq!(result.rows[0].key, "claude_code");
5835 assert_eq!(result.rows[0].message_count, 1);
5836 assert_eq!(result.rows[0].value, 100);
5837 assert_eq!(result.rows[0].bucket.api_tokens_total, 100);
5838 }
5839
5840 #[test]
5841 fn query_breakdown_by_workspace_message_count_matches_blank_remote_usage_daily_source_via_origin_host()
5842 {
5843 let conn = setup_tools_remote_source_fallback_db();
5844 let filter = AnalyticsFilter {
5845 source: SourceFilter::Specific("remote-ci".into()),
5846 ..Default::default()
5847 };
5848
5849 let result =
5850 query_breakdown(&conn, &filter, Dim::Workspace, Metric::MessageCount, 10).unwrap();
5851
5852 assert_eq!(result.source_table, "messages");
5853 assert_eq!(result.rows.len(), 1);
5854 assert_eq!(result.rows[0].key, "2");
5855 assert_eq!(result.rows[0].message_count, 1);
5856 assert_eq!(result.rows[0].value, 1);
5857 }
5858
5859 #[test]
5860 fn query_breakdown_model_workspace_filter_uses_token_usage_and_normalizes_filters() {
5861 let conn = setup_status_filter_db();
5862 let filter = AnalyticsFilter {
5863 workspace_ids: vec![1],
5864 agents: vec![" codex ".into()],
5865 source: SourceFilter::Specific(" LOCAL ".into()),
5866 ..Default::default()
5867 };
5868 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5869
5870 assert_eq!(result.source_table, "token_usage");
5871 assert_eq!(result.rows.len(), 1);
5872 assert_eq!(result.rows[0].key, "gpt-4o");
5873 assert_eq!(result.rows[0].message_count, 2);
5874 assert_eq!(result.rows[0].value, 29);
5875 assert_eq!(result.rows[0].bucket.api_tokens_total, 29);
5876 }
5877
5878 #[test]
5879 fn query_breakdown_model_api_total_subday_filter_excludes_same_day_rollup_rows_without_raw_matches()
5880 {
5881 let conn = setup_status_filter_db();
5882 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
5883 let later_ms = day10_ms + (12 * 60 * 60 * 1000);
5884
5885 conn.execute(&format!(
5886 "UPDATE token_usage SET timestamp_ms = {later_ms} WHERE conversation_id = 1"
5887 ))
5888 .unwrap();
5889
5890 let filter = AnalyticsFilter {
5891 since_ms: Some(day10_ms),
5892 until_ms: Some(day10_ms + 500),
5893 agents: vec!["codex".into()],
5894 ..Default::default()
5895 };
5896 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5897
5898 assert_eq!(result.source_table, "token_usage");
5899 assert!(result.rows.is_empty());
5900 }
5901
5902 #[test]
5903 fn query_breakdown_model_unknown_workspace_filter_returns_empty() {
5904 let conn = setup_status_filter_db();
5905 let filter = AnalyticsFilter {
5906 workspace_ids: vec![999],
5907 ..Default::default()
5908 };
5909 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5910
5911 assert_eq!(result.source_table, "token_usage");
5912 assert!(result.rows.is_empty());
5913 }
5914
5915 #[test]
5916 fn query_breakdown_model_workspace_filter_matches_blank_remote_token_usage_source_via_origin_host()
5917 {
5918 let conn = setup_status_filter_db();
5919 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5920 .unwrap();
5921 conn.execute(
5922 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5923 )
5924 .unwrap();
5925 conn.execute("UPDATE token_usage SET source_id = ' ' WHERE conversation_id = 2")
5926 .unwrap();
5927 conn.execute(
5928 "UPDATE token_daily_stats SET source_id = ' ' WHERE agent_slug = 'claude_code'",
5929 )
5930 .unwrap();
5931
5932 let filter = AnalyticsFilter {
5933 workspace_ids: vec![2],
5934 agents: vec!["claude_code".into()],
5935 source: SourceFilter::Specific("remote-ci".into()),
5936 ..Default::default()
5937 };
5938 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5939
5940 assert_eq!(result.source_table, "token_usage");
5941 assert_eq!(result.rows.len(), 1);
5942 assert_eq!(result.rows[0].key, "claude");
5943 assert_eq!(result.rows[0].message_count, 1);
5944 assert_eq!(result.rows[0].value, 11);
5945 assert_eq!(result.rows[0].bucket.api_tokens_total, 11);
5946 }
5947
5948 #[test]
5949 fn query_breakdown_model_source_filter_matches_blank_remote_token_daily_stats_source_via_origin_host()
5950 {
5951 let conn = setup_status_filter_db();
5952 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
5953 .unwrap();
5954 conn.execute(
5955 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
5956 )
5957 .unwrap();
5958 conn.execute("UPDATE token_usage SET source_id = ' ' WHERE conversation_id = 2")
5959 .unwrap();
5960 conn.execute(
5961 "UPDATE token_daily_stats SET source_id = ' ' WHERE agent_slug = 'claude_code'",
5962 )
5963 .unwrap();
5964
5965 let filter = AnalyticsFilter {
5966 agents: vec!["claude_code".into()],
5967 source: SourceFilter::Specific("remote-ci".into()),
5968 ..Default::default()
5969 };
5970 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5971
5972 assert_eq!(result.source_table, "token_usage");
5973 assert_eq!(result.rows.len(), 1);
5974 assert_eq!(result.rows[0].key, "claude");
5975 assert_eq!(result.rows[0].message_count, 1);
5976 assert_eq!(result.rows[0].value, 11);
5977 assert_eq!(result.rows[0].bucket.api_tokens_total, 11);
5978 }
5979
5980 #[test]
5981 fn query_breakdown_by_model_uses_track_b() {
5982 let conn = setup_token_daily_stats_db();
5983 let filter = AnalyticsFilter::default();
5984 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::ApiTotal, 10).unwrap();
5985
5986 assert_eq!(result.source_table, "token_daily_stats");
5987 assert_eq!(result.rows.len(), 3); assert_eq!(result.rows[0].key, "opus");
5990 }
5991
5992 #[test]
5993 fn query_breakdown_limit_caps_rows() {
5994 let conn = setup_usage_daily_db();
5995 let filter = AnalyticsFilter::default();
5996 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 2).unwrap();
5997
5998 assert_eq!(result.rows.len(), 2);
5999 }
6000
6001 #[test]
6002 fn query_breakdown_missing_table_returns_empty() {
6003 let conn = Connection::open(":memory:").unwrap();
6004 let filter = AnalyticsFilter::default();
6005 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
6006 assert!(result.rows.is_empty());
6007 }
6008
6009 #[test]
6010 fn query_breakdown_result_to_json_shape() {
6011 let conn = setup_usage_daily_db();
6012 let filter = AnalyticsFilter::default();
6013 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
6014
6015 let json = result.to_cli_json();
6016 assert_eq!(json["dim"], "agent");
6017 assert_eq!(json["metric"], "api_total");
6018 assert!(json["rows"].is_array());
6019 assert!(json["row_count"].is_number());
6020 assert!(json["_meta"]["elapsed_ms"].is_number());
6021 }
6022
6023 #[test]
6024 fn query_unpriced_models_totals_include_hidden_models_beyond_limit() {
6025 let conn = Connection::open(":memory:").unwrap();
6026 conn.execute_batch(
6027 "CREATE TABLE token_usage (
6028 model_name TEXT,
6029 total_tokens INTEGER,
6030 estimated_cost_usd REAL
6031 );
6032 INSERT INTO token_usage (model_name, total_tokens, estimated_cost_usd) VALUES
6033 ('model-a', 100, NULL),
6034 ('model-b', 40, NULL),
6035 ('model-c', 10, NULL),
6036 ('model-priced', 25, 0.5);",
6037 )
6038 .unwrap();
6039
6040 let result = query_unpriced_models(&conn, 1).unwrap();
6041
6042 assert_eq!(result.models.len(), 1);
6043 assert_eq!(result.models[0].model_name, "model-a");
6044 assert_eq!(result.models[0].total_tokens, 100);
6045 assert_eq!(result.total_unpriced_tokens, 150);
6046 assert_eq!(result.total_priced_tokens, 25);
6047 }
6048
6049 #[test]
6050 fn query_unpriced_models_deduplicates_duplicate_token_usage_rows() {
6051 let conn = Connection::open(":memory:").unwrap();
6052 conn.execute_batch(
6053 "CREATE TABLE token_usage (
6054 message_id INTEGER,
6055 model_name TEXT,
6056 total_tokens INTEGER,
6057 estimated_cost_usd REAL
6058 );
6059 INSERT INTO token_usage (message_id, model_name, total_tokens, estimated_cost_usd) VALUES
6060 (1, 'model-a', 100, NULL),
6061 (1, 'model-a', 100, NULL),
6062 (2, 'model-b', 40, NULL),
6063 (3, 'model-priced', 25, 0.5),
6064 (3, 'model-priced', 25, 0.5);",
6065 )
6066 .unwrap();
6067
6068 let result = query_unpriced_models(&conn, 10).unwrap();
6069
6070 assert_eq!(result.models.len(), 2);
6071 assert_eq!(result.models[0].model_name, "model-a");
6072 assert_eq!(result.models[0].total_tokens, 100);
6073 assert_eq!(result.models[0].row_count, 1);
6074 assert_eq!(result.models[1].model_name, "model-b");
6075 assert_eq!(result.models[1].total_tokens, 40);
6076 assert_eq!(result.models[1].row_count, 1);
6077 assert_eq!(result.total_unpriced_tokens, 140);
6078 assert_eq!(result.total_priced_tokens, 25);
6079 }
6080
6081 #[test]
6082 fn query_unpriced_models_coalesces_blank_model_names_into_none_bucket() {
6083 let conn = Connection::open(":memory:").unwrap();
6084 conn.execute_batch(
6085 "CREATE TABLE token_usage (
6086 model_name TEXT,
6087 total_tokens INTEGER,
6088 estimated_cost_usd REAL
6089 );
6090 INSERT INTO token_usage (model_name, total_tokens, estimated_cost_usd) VALUES
6091 (NULL, 100, NULL),
6092 (' ', 40, NULL),
6093 (' model-a ', 10, NULL);",
6094 )
6095 .unwrap();
6096
6097 let result = query_unpriced_models(&conn, 10).unwrap();
6098
6099 assert_eq!(result.models.len(), 2);
6100 assert_eq!(result.models[0].model_name, "(none)");
6101 assert_eq!(result.models[0].total_tokens, 140);
6102 assert_eq!(result.models[0].row_count, 2);
6103 assert_eq!(result.models[1].model_name, "model-a");
6104 assert_eq!(result.models[1].total_tokens, 10);
6105 assert_eq!(result.total_unpriced_tokens, 150);
6106 }
6107
6108 #[test]
6109 fn query_unpriced_models_missing_estimated_cost_column_returns_empty_report() {
6110 let conn = Connection::open(":memory:").unwrap();
6111 conn.execute_batch(
6112 "CREATE TABLE token_usage (
6113 model_name TEXT,
6114 total_tokens INTEGER
6115 );
6116 INSERT INTO token_usage (model_name, total_tokens) VALUES
6117 ('model-a', 100),
6118 ('model-b', 40);",
6119 )
6120 .unwrap();
6121
6122 let result = query_unpriced_models(&conn, 10).unwrap();
6123
6124 assert!(result.models.is_empty());
6125 assert_eq!(result.total_unpriced_tokens, 0);
6126 assert_eq!(result.total_priced_tokens, 0);
6127 }
6128
6129 #[test]
6130 fn query_unpriced_models_without_model_name_column_uses_none_bucket() {
6131 let conn = Connection::open(":memory:").unwrap();
6132 conn.execute_batch(
6133 "CREATE TABLE token_usage (
6134 total_tokens INTEGER,
6135 estimated_cost_usd REAL
6136 );
6137 INSERT INTO token_usage (total_tokens, estimated_cost_usd) VALUES
6138 (100, NULL),
6139 (40, NULL),
6140 (25, 0.5);",
6141 )
6142 .unwrap();
6143
6144 let result = query_unpriced_models(&conn, 10).unwrap();
6145
6146 assert_eq!(result.models.len(), 1);
6147 assert_eq!(result.models[0].model_name, "(none)");
6148 assert_eq!(result.models[0].total_tokens, 140);
6149 assert_eq!(result.models[0].row_count, 2);
6150 assert_eq!(result.total_unpriced_tokens, 140);
6151 assert_eq!(result.total_priced_tokens, 25);
6152 }
6153
6154 #[test]
6155 fn query_tools_returns_agent_breakdown() {
6156 let conn = setup_usage_daily_db();
6157 let filter = AnalyticsFilter::default();
6158 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6159
6160 assert!(!result.rows.is_empty());
6161 assert_eq!(result.rows[0].key, "claude_code");
6163 assert_eq!(result.rows[0].tool_call_count, 45);
6164
6165 let sum: i64 = result.rows.iter().map(|r| r.tool_call_count).sum();
6167 assert_eq!(result.total_tool_calls, sum);
6168 }
6169
6170 #[test]
6171 fn query_tools_normalizes_grouped_agent_slugs() {
6172 let conn = setup_usage_daily_db();
6173 conn.execute("UPDATE usage_daily SET agent_slug = ' codex ' WHERE agent_slug = 'codex'")
6174 .unwrap();
6175 conn.execute("UPDATE usage_daily SET agent_slug = '' WHERE agent_slug = 'aider'")
6176 .unwrap();
6177
6178 let result = query_tools(&conn, &AnalyticsFilter::default(), GroupBy::Day, 10).unwrap();
6179
6180 let codex = result.rows.iter().find(|row| row.key == "codex").unwrap();
6181 assert_eq!(codex.tool_call_count, 25);
6182
6183 let unknown = result.rows.iter().find(|row| row.key == "unknown").unwrap();
6184 assert_eq!(unknown.tool_call_count, 5);
6185 }
6186
6187 #[test]
6188 fn query_tools_workspace_filter_applies() {
6189 let conn = setup_usage_daily_db();
6190 let filter = AnalyticsFilter {
6191 workspace_ids: vec![2],
6192 ..Default::default()
6193 };
6194 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6195 assert_eq!(result.rows.len(), 1);
6196 assert_eq!(result.rows[0].key, "aider");
6197 assert_eq!(result.rows[0].tool_call_count, 5);
6198 }
6199
6200 #[test]
6201 fn query_tools_totals_include_hidden_rows_beyond_limit() {
6202 let conn = setup_usage_daily_db();
6203 let result = query_tools(&conn, &AnalyticsFilter::default(), GroupBy::Day, 1).unwrap();
6204
6205 assert_eq!(result.rows.len(), 1);
6206 assert_eq!(result.rows[0].key, "claude_code");
6207 assert_eq!(result.rows[0].tool_call_count, 45);
6208 assert_eq!(result.total_tool_calls, 75);
6209 assert_eq!(result.total_messages, 360);
6210 assert_eq!(result.total_api_tokens, 210_000);
6211 }
6212
6213 #[test]
6214 fn query_tools_raw_totals_include_hidden_rows_beyond_limit() {
6215 let conn = setup_tools_remote_source_fallback_db();
6216 let filter = AnalyticsFilter {
6217 since_ms: Some(1_700_000_000_000),
6218 until_ms: Some(1_700_000_002_000),
6219 ..Default::default()
6220 };
6221 let result = query_tools(&conn, &filter, GroupBy::Day, 1).unwrap();
6222
6223 assert_eq!(result.source_table, "message_metrics");
6224 assert_eq!(result.rows.len(), 1);
6225 assert_eq!(result.rows[0].key, "claude_code");
6226 assert_eq!(result.rows[0].tool_call_count, 7);
6227 assert_eq!(result.total_tool_calls, 9);
6228 assert_eq!(result.total_messages, 2);
6229 assert_eq!(result.total_api_tokens, 130);
6230 }
6231
6232 #[test]
6233 fn query_tools_source_filter_matches_blank_remote_usage_daily_source_via_origin_host() {
6234 let conn = setup_tools_remote_source_fallback_db();
6235 let filter = AnalyticsFilter {
6236 source: SourceFilter::Specific("remote-ci".into()),
6237 ..Default::default()
6238 };
6239
6240 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6241
6242 assert_eq!(result.source_table, "message_metrics");
6243 assert_eq!(result.rows.len(), 1);
6244 assert_eq!(result.rows[0].key, "claude_code");
6245 assert_eq!(result.rows[0].tool_call_count, 7);
6246 assert_eq!(result.rows[0].message_count, 1);
6247 assert_eq!(result.rows[0].api_tokens_total, 100);
6248 assert_eq!(result.total_tool_calls, 7);
6249 assert_eq!(result.total_messages, 1);
6250 assert_eq!(result.total_api_tokens, 100);
6251 }
6252
6253 #[test]
6254 fn query_tools_subday_filter_excludes_same_day_rollup_rows_without_raw_matches() {
6255 let conn = setup_tools_remote_source_fallback_db();
6256 let day_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(20250);
6257 let later_ms = day_ms + (12 * 60 * 60 * 1000);
6258 conn.execute(&format!(
6259 "UPDATE messages SET created_at = {later_ms} WHERE conversation_id = 1"
6260 ))
6261 .unwrap();
6262 conn.execute(&format!(
6263 "UPDATE message_metrics SET created_at_ms = {later_ms} WHERE message_id = 11"
6264 ))
6265 .unwrap();
6266
6267 let filter = AnalyticsFilter {
6268 since_ms: Some(day_ms),
6269 until_ms: Some(day_ms + 500),
6270 agents: vec!["codex".into()],
6271 ..Default::default()
6272 };
6273 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6274
6275 assert_eq!(result.source_table, "message_metrics");
6276 assert!(result.rows.is_empty());
6277 assert_eq!(result.total_tool_calls, 0);
6278 assert_eq!(result.total_messages, 0);
6279 assert_eq!(result.total_api_tokens, 0);
6280 }
6281
6282 #[test]
6283 fn query_tools_derived_metrics_correct() {
6284 let conn = setup_usage_daily_db();
6285 let filter = AnalyticsFilter::default();
6286 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6287
6288 for row in &result.rows {
6289 if row.api_tokens_total > 0 {
6290 let expected = row.tool_call_count as f64 / (row.api_tokens_total as f64 / 1000.0);
6291 assert!((row.tool_calls_per_1k_api_tokens.unwrap() - expected).abs() < 0.001);
6292 }
6293 }
6294 }
6295
6296 #[test]
6297 fn query_tools_missing_table_returns_empty() {
6298 let conn = Connection::open(":memory:").unwrap();
6299 let filter = AnalyticsFilter::default();
6300 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6301 assert!(result.rows.is_empty());
6302 assert_eq!(result.total_tool_calls, 0);
6303 }
6304
6305 #[test]
6306 fn query_tools_report_to_json_shape() {
6307 let conn = setup_usage_daily_db();
6308 let filter = AnalyticsFilter::default();
6309 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6310
6311 let json = result.to_cli_json();
6312 assert!(json["rows"].is_array());
6313 assert!(json["totals"]["tool_call_count"].is_number());
6314 assert!(json["_meta"]["elapsed_ms"].is_number());
6315 }
6316
6317 #[test]
6318 fn query_tools_hour_group_uses_usage_hourly() {
6319 let conn = setup_usage_hourly_db();
6320 let filter = AnalyticsFilter::default();
6321 let result = query_tools(&conn, &filter, GroupBy::Hour, 10).unwrap();
6322
6323 assert_eq!(result.source_table, "usage_hourly");
6324 assert_eq!(result.rows.len(), 1);
6325 assert_eq!(result.rows[0].key, "codex");
6326 assert_eq!(result.rows[0].tool_call_count, 8);
6327 assert_eq!(result.rows[0].message_count, 30);
6328 assert_eq!(result.rows[0].api_tokens_total, 4000);
6329 }
6330
6331 #[test]
6332 fn query_total_messages_filtered_deduplicates_duplicate_message_metrics_rows() {
6333 let conn = setup_duplicate_message_metrics_raw_db();
6334 let filter = AnalyticsFilter {
6335 since_ms: Some(1_700_000_000_000),
6336 until_ms: Some(1_700_000_003_000),
6337 ..Default::default()
6338 };
6339
6340 assert_eq!(query_total_messages_filtered(&conn, &filter), 2);
6341 }
6342
6343 #[test]
6344 fn query_total_messages_filtered_uses_conversation_started_at_when_message_timestamps_missing()
6345 {
6346 let conn = setup_duplicate_message_metrics_raw_db();
6347 conn.execute("UPDATE messages SET created_at = NULL")
6348 .unwrap();
6349 conn.execute("UPDATE message_metrics SET created_at_ms = NULL")
6350 .unwrap();
6351 let filter = AnalyticsFilter {
6352 since_ms: Some(1_700_000_000_000),
6353 until_ms: Some(1_700_000_000_500),
6354 ..Default::default()
6355 };
6356
6357 assert_eq!(query_total_messages_filtered(&conn, &filter), 2);
6358 }
6359
6360 #[test]
6361 fn query_tokens_timeseries_deduplicates_duplicate_message_metrics_rows() {
6362 let conn = setup_duplicate_message_metrics_raw_db();
6363 let filter = AnalyticsFilter {
6364 since_ms: Some(1_700_000_000_000),
6365 until_ms: Some(1_700_000_003_000),
6366 ..Default::default()
6367 };
6368 let result = query_tokens_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6369
6370 assert_eq!(result.source_table, "message_metrics");
6371 assert_eq!(result.path, "raw");
6372 assert_eq!(result.totals.message_count, 2);
6373 assert_eq!(result.totals.tool_call_count, 7);
6374 assert_eq!(result.totals.api_tokens_total, 1_200);
6375 }
6376
6377 #[test]
6378 fn query_tokens_timeseries_uses_conversation_started_at_when_message_timestamps_missing() {
6379 let conn = setup_duplicate_message_metrics_raw_db();
6380 conn.execute("UPDATE messages SET created_at = NULL")
6381 .unwrap();
6382 conn.execute("UPDATE message_metrics SET created_at_ms = NULL")
6383 .unwrap();
6384 let filter = AnalyticsFilter {
6385 since_ms: Some(1_700_000_000_000),
6386 until_ms: Some(1_700_000_000_500),
6387 ..Default::default()
6388 };
6389 let result = query_tokens_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6390
6391 assert_eq!(result.source_table, "message_metrics");
6392 assert_eq!(result.path, "raw");
6393 assert_eq!(result.totals.message_count, 2);
6394 assert_eq!(result.totals.api_tokens_total, 1_200);
6395 }
6396
6397 #[test]
6398 fn query_breakdown_by_agent_api_total_deduplicates_duplicate_message_metrics_rows() {
6399 let conn = setup_duplicate_message_metrics_raw_db();
6400 let filter = AnalyticsFilter {
6401 since_ms: Some(1_700_000_000_000),
6402 until_ms: Some(1_700_000_003_000),
6403 ..Default::default()
6404 };
6405 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
6406
6407 assert_eq!(result.source_table, "message_metrics");
6408 assert_eq!(result.rows.len(), 1);
6409 assert_eq!(result.rows[0].key, "codex");
6410 assert_eq!(result.rows[0].message_count, 2);
6411 assert_eq!(result.rows[0].value, 1_200);
6412 }
6413
6414 #[test]
6415 fn query_breakdown_by_agent_message_count_uses_conversation_started_at_when_message_timestamps_missing()
6416 {
6417 let conn = setup_duplicate_message_metrics_raw_db();
6418 conn.execute("UPDATE messages SET created_at = NULL")
6419 .unwrap();
6420 conn.execute("UPDATE message_metrics SET created_at_ms = NULL")
6421 .unwrap();
6422 let filter = AnalyticsFilter {
6423 since_ms: Some(1_700_000_000_000),
6424 until_ms: Some(1_700_000_000_500),
6425 ..Default::default()
6426 };
6427 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::MessageCount, 10).unwrap();
6428
6429 assert_eq!(result.source_table, "messages");
6430 assert_eq!(result.rows.len(), 1);
6431 assert_eq!(result.rows[0].key, "codex");
6432 assert_eq!(result.rows[0].value, 2);
6433 }
6434
6435 #[test]
6436 fn query_tools_deduplicates_duplicate_message_metrics_rows() {
6437 let conn = setup_duplicate_message_metrics_raw_db();
6438 let filter = AnalyticsFilter {
6439 since_ms: Some(1_700_000_000_000),
6440 until_ms: Some(1_700_000_003_000),
6441 ..Default::default()
6442 };
6443 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6444
6445 assert_eq!(result.source_table, "message_metrics");
6446 assert_eq!(result.rows.len(), 1);
6447 assert_eq!(result.rows[0].key, "codex");
6448 assert_eq!(result.rows[0].tool_call_count, 7);
6449 assert_eq!(result.rows[0].message_count, 2);
6450 assert_eq!(result.rows[0].api_tokens_total, 1_200);
6451 }
6452
6453 #[test]
6454 fn query_tools_uses_conversation_started_at_when_message_timestamps_missing() {
6455 let conn = setup_duplicate_message_metrics_raw_db();
6456 conn.execute("UPDATE messages SET created_at = NULL")
6457 .unwrap();
6458 conn.execute("UPDATE message_metrics SET created_at_ms = NULL")
6459 .unwrap();
6460 let filter = AnalyticsFilter {
6461 since_ms: Some(1_700_000_000_000),
6462 until_ms: Some(1_700_000_000_500),
6463 ..Default::default()
6464 };
6465 let result = query_tools(&conn, &filter, GroupBy::Day, 10).unwrap();
6466
6467 assert_eq!(result.source_table, "message_metrics");
6468 assert_eq!(result.rows.len(), 1);
6469 assert_eq!(result.rows[0].key, "codex");
6470 assert_eq!(result.rows[0].message_count, 2);
6471 assert_eq!(result.rows[0].tool_call_count, 7);
6472 }
6473
6474 #[test]
6475 fn query_session_scatter_deduplicates_duplicate_message_metrics_rows() {
6476 let conn = setup_duplicate_message_metrics_raw_db();
6477 let points = query_session_scatter(&conn, &AnalyticsFilter::default(), 10).unwrap();
6478
6479 assert_eq!(points.len(), 1);
6480 assert_eq!(points[0].source_path, "/sessions/dup.jsonl");
6481 assert_eq!(points[0].message_count, 2);
6482 assert_eq!(points[0].api_tokens_total, 1_200);
6483 }
6484
6485 #[test]
6486 fn query_session_scatter_returns_sorted_points() {
6487 let conn = setup_session_scatter_db();
6488 let points = query_session_scatter(&conn, &AnalyticsFilter::default(), 10).unwrap();
6489
6490 assert_eq!(points.len(), 2);
6491 assert_eq!(points[0].source_path, "/sessions/b.jsonl");
6492 assert_eq!(points[0].message_count, 3);
6493 assert_eq!(points[0].api_tokens_total, 2300);
6494
6495 assert_eq!(points[1].source_path, "/sessions/a.jsonl");
6496 assert_eq!(points[1].message_count, 2);
6497 assert_eq!(points[1].api_tokens_total, 1000);
6498 }
6499
6500 #[test]
6501 fn query_session_scatter_applies_agent_and_source_filters() {
6502 let conn = setup_session_scatter_db();
6503 let filter = AnalyticsFilter {
6504 agents: vec!["codex".into()],
6505 source: SourceFilter::Local,
6506 ..Default::default()
6507 };
6508
6509 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6510 assert_eq!(points.len(), 1);
6511 assert_eq!(points[0].source_id, "local");
6512 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6513 assert_eq!(points[0].message_count, 2);
6514 assert_eq!(points[0].api_tokens_total, 1000);
6515 }
6516
6517 #[test]
6518 fn query_session_scatter_without_agents_table_still_returns_points() {
6519 let conn = setup_session_scatter_db();
6520 conn.execute_batch("DROP TABLE agents;").unwrap();
6521
6522 let points = query_session_scatter(&conn, &AnalyticsFilter::default(), 10).unwrap();
6523 assert_eq!(points.len(), 2);
6524 assert_eq!(points[0].source_path, "/sessions/b.jsonl");
6525 assert_eq!(points[1].source_path, "/sessions/a.jsonl");
6526 }
6527
6528 #[test]
6529 fn query_session_scatter_with_missing_agent_row_keeps_session_without_filter() {
6530 let conn = setup_session_scatter_db();
6531 conn.execute("DELETE FROM agents WHERE id = 2").unwrap();
6532
6533 let points = query_session_scatter(&conn, &AnalyticsFilter::default(), 10).unwrap();
6534 assert_eq!(points.len(), 2);
6535 assert_eq!(points[0].source_path, "/sessions/b.jsonl");
6536 assert_eq!(points[0].api_tokens_total, 2300);
6537 }
6538
6539 #[test]
6540 fn query_session_scatter_normalizes_trimmed_agent_filter_and_agent_slug() {
6541 let conn = setup_session_scatter_db();
6542 conn.execute("UPDATE agents SET slug = ' codex ' WHERE id = 1")
6543 .unwrap();
6544
6545 let filter = AnalyticsFilter {
6546 agents: vec![" codex ".into()],
6547 ..Default::default()
6548 };
6549
6550 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6551 assert_eq!(points.len(), 1);
6552 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6553 assert_eq!(points[0].api_tokens_total, 1000);
6554 }
6555
6556 #[test]
6557 fn query_session_scatter_normalizes_trimmed_local_source_ids() {
6558 let conn = setup_session_scatter_db();
6559 conn.execute("UPDATE conversations SET source_id = ' LOCAL ' WHERE id = 1")
6560 .unwrap();
6561
6562 let filter = AnalyticsFilter {
6563 agents: vec!["codex".into()],
6564 source: SourceFilter::Local,
6565 ..Default::default()
6566 };
6567
6568 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6569 assert_eq!(points.len(), 1);
6570 assert_eq!(points[0].source_id, "local");
6571 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6572 }
6573
6574 #[test]
6575 fn query_session_scatter_matches_blank_remote_source_id_via_origin_host() {
6576 let conn = setup_session_scatter_db();
6577 conn.execute(
6578 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
6579 )
6580 .unwrap();
6581
6582 let filter = AnalyticsFilter {
6583 source: SourceFilter::Specific("remote-ci".into()),
6584 ..Default::default()
6585 };
6586
6587 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6588 assert_eq!(points.len(), 1);
6589 assert_eq!(points[0].source_id, "remote-ci");
6590 assert_eq!(points[0].source_path, "/sessions/b.jsonl");
6591 assert_eq!(points[0].message_count, 3);
6592 assert_eq!(points[0].api_tokens_total, 2300);
6593 }
6594
6595 #[test]
6596 fn query_session_scatter_falls_back_to_token_usage_when_mm_tokens_missing() {
6597 let conn = setup_session_scatter_with_token_usage_fallback_db();
6598 let filter = AnalyticsFilter {
6599 agents: vec!["codex".into()],
6600 source: SourceFilter::Local,
6601 ..Default::default()
6602 };
6603
6604 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6605 assert_eq!(points.len(), 1);
6606 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6607 assert_eq!(points[0].message_count, 2);
6608 assert_eq!(points[0].api_tokens_total, 1400);
6611 }
6612
6613 #[test]
6614 fn query_session_scatter_aggregates_duplicate_token_usage_rows_per_message() {
6615 let conn = setup_session_scatter_db();
6616 conn.execute_batch(
6617 "CREATE TABLE token_usage (
6618 id INTEGER PRIMARY KEY,
6619 message_id INTEGER NOT NULL,
6620 total_tokens INTEGER
6621 );",
6622 )
6623 .unwrap();
6624 conn.execute("INSERT INTO token_usage (id, message_id, total_tokens) VALUES (1, 11, 600)")
6625 .unwrap();
6626 conn.execute("INSERT INTO token_usage (id, message_id, total_tokens) VALUES (2, 11, 700)")
6627 .unwrap();
6628
6629 let filter = AnalyticsFilter {
6630 agents: vec!["codex".into()],
6631 source: SourceFilter::Local,
6632 ..Default::default()
6633 };
6634
6635 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6636 assert_eq!(points.len(), 1);
6637 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6638 assert_eq!(points[0].message_count, 2);
6639 assert_eq!(points[0].api_tokens_total, 1000);
6642 }
6643
6644 #[test]
6645 fn query_session_scatter_uses_max_token_usage_total_for_duplicate_rows_per_message() {
6646 let conn = setup_session_scatter_db();
6647 conn.execute_batch(
6648 "CREATE TABLE token_usage (
6649 id INTEGER PRIMARY KEY,
6650 message_id INTEGER NOT NULL,
6651 total_tokens INTEGER
6652 );",
6653 )
6654 .unwrap();
6655 conn.execute(
6656 "UPDATE message_metrics
6657 SET api_input_tokens = NULL,
6658 api_output_tokens = NULL,
6659 api_cache_read_tokens = NULL,
6660 api_cache_creation_tokens = NULL,
6661 api_thinking_tokens = NULL
6662 WHERE message_id = 12",
6663 )
6664 .unwrap();
6665 conn.execute("INSERT INTO token_usage (id, message_id, total_tokens) VALUES (1, 12, 400)")
6666 .unwrap();
6667 conn.execute("INSERT INTO token_usage (id, message_id, total_tokens) VALUES (2, 12, 900)")
6668 .unwrap();
6669
6670 let filter = AnalyticsFilter {
6671 agents: vec!["codex".into()],
6672 source: SourceFilter::Local,
6673 ..Default::default()
6674 };
6675
6676 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6677 assert_eq!(points.len(), 1);
6678 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6679 assert_eq!(points[0].message_count, 2);
6680 assert_eq!(points[0].api_tokens_total, 1400);
6683 }
6684
6685 #[test]
6686 fn query_session_scatter_falls_back_to_conversation_rollup_when_detailed_tokens_are_sparse() {
6687 let conn = setup_session_scatter_db();
6688 conn.execute(
6689 "UPDATE message_metrics
6690 SET api_input_tokens = NULL,
6691 api_output_tokens = NULL,
6692 api_cache_read_tokens = NULL,
6693 api_cache_creation_tokens = NULL,
6694 api_thinking_tokens = NULL
6695 WHERE message_id = 12",
6696 )
6697 .unwrap();
6698
6699 let filter = AnalyticsFilter {
6700 agents: vec!["codex".into()],
6701 source: SourceFilter::Local,
6702 ..Default::default()
6703 };
6704
6705 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6706 assert_eq!(points.len(), 1);
6707 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6708 assert_eq!(points[0].message_count, 2);
6709 assert_eq!(points[0].api_tokens_total, 1000);
6712 }
6713
6714 #[test]
6715 fn query_session_scatter_uses_message_metrics_timestamp_when_message_created_at_missing() {
6716 let conn = setup_session_scatter_db();
6717 conn.execute("ALTER TABLE message_metrics ADD COLUMN created_at_ms INTEGER")
6718 .unwrap();
6719 conn.execute(
6720 "UPDATE message_metrics
6721 SET created_at_ms = CASE message_id
6722 WHEN 11 THEN 1700000001000
6723 WHEN 12 THEN 1700000002000
6724 WHEN 21 THEN 1700000001000
6725 WHEN 22 THEN 1700000002000
6726 WHEN 23 THEN 1700000003000
6727 ELSE 0
6728 END",
6729 )
6730 .unwrap();
6731 conn.execute("UPDATE messages SET created_at = NULL WHERE conversation_id = 2")
6732 .unwrap();
6733
6734 let filter = AnalyticsFilter {
6735 source: SourceFilter::Specific("remote-ci".into()),
6736 since_ms: Some(1_700_000_000_500),
6737 until_ms: Some(1_700_000_003_500),
6738 ..Default::default()
6739 };
6740
6741 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6742 assert_eq!(points.len(), 1);
6743 assert_eq!(points[0].source_path, "/sessions/b.jsonl");
6744 assert_eq!(points[0].api_tokens_total, 2300);
6745 }
6746
6747 #[test]
6748 fn query_session_scatter_uses_token_usage_timestamp_when_message_created_at_missing() {
6749 let conn = setup_session_scatter_with_token_usage_fallback_db();
6750 conn.execute("ALTER TABLE token_usage ADD COLUMN timestamp_ms INTEGER")
6751 .unwrap();
6752 conn.execute(
6753 "UPDATE token_usage
6754 SET timestamp_ms = CASE message_id
6755 WHEN 11 THEN 1700000001000
6756 WHEN 12 THEN 1700000002000
6757 ELSE 0
6758 END",
6759 )
6760 .unwrap();
6761 conn.execute("UPDATE messages SET created_at = NULL WHERE conversation_id = 1")
6762 .unwrap();
6763
6764 let filter = AnalyticsFilter {
6765 agents: vec!["codex".into()],
6766 source: SourceFilter::Local,
6767 since_ms: Some(1_700_000_000_500),
6768 until_ms: Some(1_700_000_002_500),
6769 ..Default::default()
6770 };
6771
6772 let points = query_session_scatter(&conn, &filter, 10).unwrap();
6773
6774 assert_eq!(points.len(), 1);
6775 assert_eq!(points[0].source_path, "/sessions/a.jsonl");
6776 assert_eq!(points[0].api_tokens_total, 1400);
6777 }
6778
6779 #[test]
6780 fn query_cost_timeseries_deduplicates_duplicate_token_usage_rows() {
6781 let conn = setup_status_filter_db();
6782 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
6783 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
6784 .unwrap();
6785 conn.execute(
6786 "UPDATE token_usage
6787 SET estimated_cost_usd = CASE message_id
6788 WHEN 11 THEN 0.2
6789 WHEN 12 THEN 0.3
6790 WHEN 21 THEN 0.4
6791 ELSE 0.0
6792 END",
6793 )
6794 .unwrap();
6795 conn.execute(&format!(
6796 "INSERT INTO token_usage
6797 (message_id, conversation_id, agent_id, workspace_id, source_id, timestamp_ms, day_id,
6798 model_name, model_family, total_tokens, data_source, estimated_cost_usd)
6799 VALUES
6800 (11, 1, 1, 1, 'local', {}, 10, 'gpt-4o-mini', 'gpt-4o', 12, 'api', 0.2)",
6801 day10_ms + 100
6802 ))
6803 .unwrap();
6804
6805 let filter = AnalyticsFilter {
6806 workspace_ids: vec![1],
6807 agents: vec!["codex".into()],
6808 source: SourceFilter::Local,
6809 ..Default::default()
6810 };
6811 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6812
6813 assert_eq!(result.source_table, "token_usage");
6814 assert_eq!(result.path, "raw");
6815 assert_eq!(result.buckets.len(), 1);
6816 assert_eq!(result.buckets[0].1.message_count, 2);
6817 assert_eq!(result.buckets[0].1.api_tokens_total, 29);
6818 assert!((result.buckets[0].1.estimated_cost_usd - 0.5).abs() < 0.001);
6819 assert_eq!(result.totals.api_tokens_total, 29);
6820 assert!((result.totals.estimated_cost_usd - 0.5).abs() < 0.001);
6821 }
6822
6823 #[test]
6824 fn query_session_scatter_with_api_source_column_preserves_legacy_mm_rows() {
6825 let conn = setup_session_scatter_with_api_source_column_db();
6826 let points = query_session_scatter(&conn, &AnalyticsFilter::default(), 10).unwrap();
6827
6828 assert_eq!(points.len(), 2);
6829 let session_a = points
6830 .iter()
6831 .find(|p| p.source_path == "/sessions/a.jsonl")
6832 .expect("session A should exist");
6833 let session_b = points
6834 .iter()
6835 .find(|p| p.source_path == "/sessions/b.jsonl")
6836 .expect("session B should exist");
6837
6838 assert_eq!(session_a.api_tokens_total, 1400);
6840 assert_eq!(session_b.api_tokens_total, 2300);
6843 }
6844
6845 #[test]
6846 fn query_breakdown_with_agent_filter() {
6847 let conn = setup_usage_daily_db();
6848 let filter = AnalyticsFilter {
6849 agents: vec!["codex".into()],
6850 ..Default::default()
6851 };
6852 let result = query_breakdown(&conn, &filter, Dim::Agent, Metric::ApiTotal, 10).unwrap();
6853
6854 assert_eq!(result.rows.len(), 1);
6855 assert_eq!(result.rows[0].key, "codex");
6856 assert_eq!(result.rows[0].value, 70000);
6858 }
6859
6860 #[test]
6861 fn metric_display_roundtrip() {
6862 assert_eq!(Metric::ApiTotal.to_string(), "api_total");
6863 assert_eq!(Metric::ToolCalls.to_string(), "tool_calls");
6864 assert_eq!(Metric::CoveragePct.to_string(), "coverage_pct");
6865 }
6866
6867 #[test]
6868 fn dim_display_roundtrip() {
6869 assert_eq!(Dim::Agent.to_string(), "agent");
6870 assert_eq!(Dim::Model.to_string(), "model");
6871 assert_eq!(Dim::Workspace.to_string(), "workspace");
6872 assert_eq!(Dim::Source.to_string(), "source");
6873 }
6874
6875 #[test]
6876 fn metric_rollup_column_coverage_pct_is_none() {
6877 assert!(Metric::CoveragePct.rollup_column().is_none());
6878 }
6879
6880 #[test]
6881 fn metric_rollup_column_api_total_is_some() {
6882 assert_eq!(Metric::ApiTotal.rollup_column(), Some("api_tokens_total"));
6883 }
6884
6885 #[test]
6890 fn query_cost_timeseries_returns_cost_data() {
6891 let conn = setup_token_daily_stats_db();
6892 let filter = AnalyticsFilter::default();
6893 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6894
6895 assert_eq!(result.source_table, "token_daily_stats");
6896 assert_eq!(result.buckets.len(), 1); let (_, bucket) = &result.buckets[0];
6898 assert!((bucket.estimated_cost_usd - 2.70).abs() < 0.01);
6900 assert_eq!(bucket.api_tokens_total, 109_500);
6902 assert_eq!(bucket.message_count, 170);
6904 }
6905
6906 #[test]
6907 fn query_cost_timeseries_totals_match_bucket_sums() {
6908 let conn = setup_token_daily_stats_db();
6909 let filter = AnalyticsFilter::default();
6910 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6911
6912 let sum_cost: f64 = result
6913 .buckets
6914 .iter()
6915 .map(|(_, b)| b.estimated_cost_usd)
6916 .sum();
6917 assert!((result.totals.estimated_cost_usd - sum_cost).abs() < 0.001);
6918 }
6919
6920 #[test]
6921 fn query_tokens_timeseries_source_filter_matches_blank_remote_usage_daily_source_via_origin_host()
6922 {
6923 let conn = setup_status_filter_db();
6924 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
6925 .unwrap();
6926 conn.execute(
6927 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
6928 )
6929 .unwrap();
6930 conn.execute(
6931 "UPDATE message_metrics SET source_id = ' ' WHERE agent_slug = 'claude_code'",
6932 )
6933 .unwrap();
6934 conn.execute("UPDATE usage_hourly SET source_id = ' ' WHERE agent_slug = 'claude_code'")
6935 .unwrap();
6936 conn.execute("UPDATE usage_daily SET source_id = ' ' WHERE agent_slug = 'claude_code'")
6937 .unwrap();
6938
6939 let filter = AnalyticsFilter {
6940 source: SourceFilter::Specific("remote-ci".into()),
6941 ..Default::default()
6942 };
6943 let result = query_tokens_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6944
6945 assert_eq!(result.source_table, "message_metrics");
6946 assert_eq!(result.buckets.len(), 1);
6947 assert_eq!(result.buckets[0].1.message_count, 1);
6948 assert_eq!(result.buckets[0].1.assistant_message_count, 1);
6949 assert_eq!(result.buckets[0].1.content_tokens_est_total, 5);
6950 assert_eq!(result.totals.message_count, 1);
6951 assert_eq!(result.totals.content_tokens_est_total, 5);
6952 }
6953
6954 #[test]
6955 fn query_tokens_timeseries_subday_filter_excludes_same_day_rollup_rows_without_raw_matches() {
6956 let conn = setup_status_filter_db();
6957 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
6958 let later_ms = day10_ms + (12 * 60 * 60 * 1000);
6959
6960 conn.execute(&format!(
6961 "UPDATE messages SET created_at = {later_ms} WHERE conversation_id = 1"
6962 ))
6963 .unwrap();
6964 conn.execute(&format!(
6965 "UPDATE message_metrics SET created_at_ms = {later_ms} WHERE agent_slug = 'codex'"
6966 ))
6967 .unwrap();
6968
6969 let filter = AnalyticsFilter {
6970 since_ms: Some(day10_ms),
6971 until_ms: Some(day10_ms + 500),
6972 agents: vec!["codex".into()],
6973 ..Default::default()
6974 };
6975 let result = query_tokens_timeseries(&conn, &filter, GroupBy::Day).unwrap();
6976
6977 assert_eq!(result.source_table, "message_metrics");
6978 assert_eq!(result.path, "raw");
6979 assert!(result.buckets.is_empty());
6980 assert_eq!(result.totals.message_count, 0);
6981 assert_eq!(result.totals.api_tokens_total, 0);
6982 }
6983
6984 #[test]
6985 fn query_tokens_timeseries_uses_legacy_second_message_metrics_timestamps_for_exact_filters() {
6986 let conn = setup_status_filter_db();
6987 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
6988 let second_ts = (day10_ms + 100) / 1000;
6989
6990 conn.execute("UPDATE messages SET created_at = NULL WHERE conversation_id = 1")
6991 .unwrap();
6992 conn.execute(&format!(
6993 "UPDATE message_metrics SET created_at_ms = {second_ts} WHERE agent_slug = 'codex'"
6994 ))
6995 .unwrap();
6996
6997 let filter = AnalyticsFilter {
6998 since_ms: Some(day10_ms),
6999 until_ms: Some(day10_ms + 500),
7000 agents: vec!["codex".into()],
7001 ..Default::default()
7002 };
7003 let result = query_tokens_timeseries(&conn, &filter, GroupBy::Day).unwrap();
7004
7005 assert_eq!(result.source_table, "message_metrics");
7006 assert_eq!(result.path, "raw");
7007 assert_eq!(result.buckets.len(), 1);
7008 assert_eq!(result.buckets[0].1.message_count, 2);
7009 assert_eq!(result.buckets[0].1.api_tokens_total, 29);
7010 assert_eq!(result.totals.message_count, 2);
7011 assert_eq!(result.totals.api_tokens_total, 29);
7012 }
7013
7014 #[test]
7015 fn query_cost_timeseries_source_filter_matches_blank_remote_token_daily_stats_source_via_origin_host()
7016 {
7017 let conn = setup_status_filter_db();
7018 conn.execute("ALTER TABLE conversations ADD COLUMN origin_host TEXT")
7019 .unwrap();
7020 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
7021 .unwrap();
7022 conn.execute(
7023 "UPDATE conversations SET source_id = ' ', origin_host = 'remote-ci' WHERE id = 2",
7024 )
7025 .unwrap();
7026 conn.execute("UPDATE token_usage SET source_id = ' ', estimated_cost_usd = 0.4 WHERE conversation_id = 2")
7027 .unwrap();
7028 conn.execute(
7029 "UPDATE token_daily_stats SET source_id = ' ' WHERE agent_slug = 'claude_code'",
7030 )
7031 .unwrap();
7032
7033 let filter = AnalyticsFilter {
7034 source: SourceFilter::Specific("remote-ci".into()),
7035 ..Default::default()
7036 };
7037 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
7038
7039 assert_eq!(result.source_table, "token_usage");
7040 assert_eq!(result.buckets.len(), 1);
7041 assert_eq!(result.buckets[0].1.api_tokens_total, 11);
7042 assert!((result.buckets[0].1.estimated_cost_usd - 0.4).abs() < 0.001);
7043 assert_eq!(result.totals.api_tokens_total, 11);
7044 assert!((result.totals.estimated_cost_usd - 0.4).abs() < 0.001);
7045 }
7046
7047 #[test]
7048 fn query_cost_timeseries_subday_filter_excludes_same_day_rollup_rows_without_raw_matches() {
7049 let conn = setup_status_filter_db();
7050 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
7051 let later_ms = day10_ms + (12 * 60 * 60 * 1000);
7052 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
7053 .unwrap();
7054 conn.execute(
7055 "UPDATE token_usage
7056 SET estimated_cost_usd = CASE message_id
7057 WHEN 11 THEN 0.2
7058 WHEN 12 THEN 0.3
7059 WHEN 21 THEN 0.4
7060 ELSE 0.0
7061 END",
7062 )
7063 .unwrap();
7064 conn.execute(&format!(
7065 "UPDATE token_usage SET timestamp_ms = {later_ms} WHERE conversation_id = 1"
7066 ))
7067 .unwrap();
7068
7069 let filter = AnalyticsFilter {
7070 since_ms: Some(day10_ms),
7071 until_ms: Some(day10_ms + 500),
7072 agents: vec!["codex".into()],
7073 ..Default::default()
7074 };
7075 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
7076
7077 assert_eq!(result.source_table, "token_usage");
7078 assert_eq!(result.path, "raw");
7079 assert!(result.buckets.is_empty());
7080 assert_eq!(result.totals.api_tokens_total, 0);
7081 assert!((result.totals.estimated_cost_usd - 0.0).abs() < 0.001);
7082 }
7083
7084 #[test]
7085 fn query_cost_timeseries_uses_legacy_second_token_usage_timestamps_for_exact_filters() {
7086 let conn = setup_status_filter_db();
7087 let day10_ms = crate::storage::sqlite::FrankenStorage::millis_from_day_id(10);
7088 let second_ts = (day10_ms + 100) / 1000;
7089 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
7090 .unwrap();
7091 conn.execute(
7092 "UPDATE token_usage
7093 SET estimated_cost_usd = CASE message_id
7094 WHEN 11 THEN 0.2
7095 WHEN 12 THEN 0.3
7096 WHEN 21 THEN 0.4
7097 ELSE 0.0
7098 END",
7099 )
7100 .unwrap();
7101 conn.execute(&format!(
7102 "UPDATE token_usage SET timestamp_ms = {second_ts} WHERE conversation_id = 1"
7103 ))
7104 .unwrap();
7105
7106 let filter = AnalyticsFilter {
7107 since_ms: Some(day10_ms),
7108 until_ms: Some(day10_ms + 500),
7109 agents: vec!["codex".into()],
7110 ..Default::default()
7111 };
7112 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
7113
7114 assert_eq!(result.source_table, "token_usage");
7115 assert_eq!(result.path, "raw");
7116 assert_eq!(result.buckets.len(), 1);
7117 assert_eq!(result.buckets[0].1.api_tokens_total, 29);
7118 assert!((result.buckets[0].1.estimated_cost_usd - 0.5).abs() < 0.001);
7119 assert_eq!(result.totals.api_tokens_total, 29);
7120 assert!((result.totals.estimated_cost_usd - 0.5).abs() < 0.001);
7121 }
7122
7123 #[test]
7124 fn query_cost_timeseries_hour_group_uses_token_usage_hour_buckets() {
7125 let conn = setup_status_filter_db();
7126 conn.execute("ALTER TABLE token_usage ADD COLUMN estimated_cost_usd REAL")
7127 .unwrap();
7128 conn.execute(
7129 "UPDATE token_usage
7130 SET estimated_cost_usd = CASE message_id
7131 WHEN 11 THEN 0.2
7132 WHEN 12 THEN 0.3
7133 WHEN 21 THEN 0.4
7134 ELSE 0.0
7135 END",
7136 )
7137 .unwrap();
7138
7139 let result =
7140 query_cost_timeseries(&conn, &AnalyticsFilter::default(), GroupBy::Hour).unwrap();
7141
7142 assert_eq!(result.source_table, "token_usage");
7143 assert_eq!(result.path, "raw");
7144 assert_eq!(result.buckets.len(), 2);
7145 assert!(
7146 result
7147 .buckets
7148 .iter()
7149 .all(|(bucket, _)| bucket.contains('T'))
7150 );
7151 assert_eq!(result.totals.api_tokens_total, 40);
7152 assert!((result.totals.estimated_cost_usd - 0.9).abs() < 0.001);
7153 }
7154
7155 #[test]
7156 fn query_cost_timeseries_missing_table_returns_empty() {
7157 let conn = Connection::open(":memory:").unwrap();
7158 let filter = AnalyticsFilter::default();
7159 let result = query_cost_timeseries(&conn, &filter, GroupBy::Day).unwrap();
7160
7161 assert!(result.buckets.is_empty());
7162 assert_eq!(result.totals.estimated_cost_usd, 0.0);
7163 assert_eq!(result.path, "none");
7164 }
7165
7166 #[test]
7167 fn query_breakdown_agent_with_cost_metric_normalizes_trimmed_agent_slug() {
7168 let conn = setup_token_daily_stats_db();
7169 conn.execute(
7170 "UPDATE token_daily_stats SET agent_slug = ' codex ' WHERE agent_slug = 'codex'",
7171 )
7172 .unwrap();
7173
7174 let filter = AnalyticsFilter {
7175 agents: vec![" codex ".into()],
7176 ..Default::default()
7177 };
7178 let result =
7179 query_breakdown(&conn, &filter, Dim::Agent, Metric::EstimatedCostUsd, 10).unwrap();
7180
7181 assert_eq!(result.rows.len(), 1);
7182 assert_eq!(result.rows[0].key, "codex");
7183 assert!((result.rows[0].bucket.estimated_cost_usd - 0.80).abs() < f64::EPSILON);
7184 }
7185
7186 #[test]
7187 fn query_breakdown_agent_with_cost_metric_uses_track_b() {
7188 let conn = setup_token_daily_stats_db();
7189 let filter = AnalyticsFilter::default();
7190 let result =
7191 query_breakdown(&conn, &filter, Dim::Agent, Metric::EstimatedCostUsd, 10).unwrap();
7192
7193 assert_eq!(result.source_table, "token_daily_stats");
7195 assert!(!result.rows.is_empty());
7196 assert_eq!(result.rows[0].key, "claude_code");
7198 assert!((result.rows[0].bucket.estimated_cost_usd - 1.90).abs() < 0.01);
7199 assert!((result.rows[1].bucket.estimated_cost_usd - 0.80).abs() < 0.01);
7200 }
7201
7202 #[test]
7203 fn query_breakdown_workspace_with_cost_metric_uses_track_a_zero_values() {
7204 let conn = setup_usage_daily_db();
7205 let filter = AnalyticsFilter::default();
7206 let result =
7207 query_breakdown(&conn, &filter, Dim::Workspace, Metric::EstimatedCostUsd, 10).unwrap();
7208
7209 assert_eq!(result.source_table, "usage_daily");
7210 assert!(!result.rows.is_empty());
7211 assert!(result.rows.iter().all(|r| r.value == 0));
7212 assert!(
7213 result
7214 .rows
7215 .iter()
7216 .all(|r| r.bucket.estimated_cost_usd == 0.0)
7217 );
7218 }
7219
7220 #[test]
7221 fn query_breakdown_model_with_cost_metric_orders_by_cost() {
7222 let conn = setup_token_daily_stats_db();
7223 let filter = AnalyticsFilter::default();
7224 let result =
7225 query_breakdown(&conn, &filter, Dim::Model, Metric::EstimatedCostUsd, 10).unwrap();
7226
7227 assert_eq!(result.rows[0].key, "opus");
7229 assert!((result.rows[0].bucket.estimated_cost_usd - 1.50).abs() < 0.01);
7230 }
7231
7232 #[test]
7233 fn query_breakdown_model_content_est_total_uses_content_chars() {
7234 let conn = setup_token_daily_stats_db();
7235 let filter = AnalyticsFilter::default();
7236 let result =
7237 query_breakdown(&conn, &filter, Dim::Model, Metric::ContentEstTotal, 10).unwrap();
7238
7239 assert_eq!(result.rows[0].key, "opus");
7241 assert_eq!(result.rows[0].value, 40_000);
7242 assert_eq!(result.rows[1].key, "gpt-4o");
7243 assert_eq!(result.rows[1].value, 25_000);
7244 }
7245
7246 #[test]
7247 fn query_breakdown_model_coverage_pct_is_derived() {
7248 let conn = setup_token_daily_stats_db();
7249 let filter = AnalyticsFilter::default();
7250 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::CoveragePct, 10).unwrap();
7251
7252 assert!(!result.rows.is_empty());
7253 assert!(result.rows.iter().all(|r| r.value == 100));
7254 }
7255
7256 #[test]
7257 fn query_breakdown_model_plan_count_is_zero_on_track_b() {
7258 let conn = setup_token_daily_stats_db();
7259 let filter = AnalyticsFilter::default();
7260 let result = query_breakdown(&conn, &filter, Dim::Model, Metric::PlanCount, 10).unwrap();
7261
7262 assert!(!result.rows.is_empty());
7263 assert!(result.rows.iter().all(|r| r.value == 0));
7264 }
7265}