1use super::activity::activity;
4use super::rollup::{cost_session_count, counts, has_tokens, pct, token_totals};
5use super::types::*;
6use crate::core::event::{Event, EventKind, SessionRecord, SessionStatus};
7use crate::store::Store;
8use anyhow::Result;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
/// Largest age (in milliseconds) of a session's newest event for the session
/// to still be reported as active: five minutes.
const ACTIVE_TTL_MS: u64 = 5 * 60 * 1_000;
/// Smallest age (in milliseconds) of a session's newest event at which an
/// unfinished session is reported as orphaned: thirty minutes.
const ORPHAN_TTL_MS: u64 = 30 * 60 * 1_000;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct VisualizationQuery {
17 pub workspace: String,
18 pub selected_session_id: Option<String>,
19 pub now_ms: u64,
20 pub day_start_hour: u8,
21}
22
23pub fn build_report(store: &Store, query: VisualizationQuery) -> Result<VisualizationReport> {
24 let sessions = store.list_sessions(&query.workspace)?;
25 let pairs = store.workspace_events(&query.workspace)?;
26 let selected = selected_detail(store, &query)?;
27 let summaries = trace_summaries(&sessions, &pairs, query.now_ms);
28 Ok(VisualizationReport {
29 generated_at_ms: query.now_ms,
30 workspace: query.workspace,
31 totals: totals(&sessions, &pairs),
32 activity: activity(&pairs, query.now_ms),
33 sessions: summaries,
34 selected,
35 quality: quality(&sessions, &pairs),
36 })
37}
38
39fn selected_detail(store: &Store, query: &VisualizationQuery) -> Result<Option<TraceDetail>> {
40 let Some(id) = query.selected_session_id.as_deref() else {
41 return Ok(None);
42 };
43 let Some(session) = store.get_session(id)? else {
44 return Ok(None);
45 };
46 Ok(Some(TraceDetail {
47 session,
48 events: store.list_events_for_session(id)?,
49 spans: store.session_span_tree(id)?,
50 files: store.files_for_session(id)?,
51 }))
52}
53
54fn totals(sessions: &[SessionRecord], pairs: &[(SessionRecord, Event)]) -> VisualizationTotals {
55 let tokens = token_totals(pairs.iter().map(|(_, e)| e));
56 VisualizationTotals {
57 session_count: sessions.len() as u64,
58 running_count: sessions
59 .iter()
60 .filter(|s| s.status != SessionStatus::Done)
61 .count() as u64,
62 event_count: pairs.len() as u64,
63 error_count: pairs
64 .iter()
65 .filter(|(_, e)| e.kind == EventKind::Error)
66 .count() as u64,
67 tool_call_count: pairs
68 .iter()
69 .filter(|(_, e)| e.kind == EventKind::ToolCall)
70 .count() as u64,
71 cost_usd_e6: pairs.iter().filter_map(|(_, e)| e.cost_usd_e6).sum(),
72 tokens,
73 }
74}
75
76fn trace_summaries(
77 sessions: &[SessionRecord],
78 pairs: &[(SessionRecord, Event)],
79 now_ms: u64,
80) -> Vec<TraceSummary> {
81 let grouped = group_events(pairs);
82 sessions
83 .iter()
84 .take(100)
85 .map(|s| {
86 trace_summary(
87 s,
88 grouped.get(s.id.as_str()).cloned().unwrap_or_default(),
89 now_ms,
90 )
91 })
92 .collect()
93}
94
95fn trace_summary(session: &SessionRecord, events: Vec<&Event>, now_ms: u64) -> TraceSummary {
96 let (status, status_reason) = derived_status(session, &events, now_ms);
97 TraceSummary {
98 id: session.id.clone(),
99 agent: session.agent.clone(),
100 model: session.model.clone(),
101 status,
102 status_reason,
103 started_at_ms: session.started_at_ms,
104 ended_at_ms: session.ended_at_ms,
105 last_event_ms: events.iter().map(|e| e.ts_ms).max(),
106 event_count: events.len() as u64,
107 error_count: events.iter().filter(|e| e.kind == EventKind::Error).count() as u64,
108 tool_call_count: events
109 .iter()
110 .filter(|e| e.kind == EventKind::ToolCall)
111 .count() as u64,
112 cost_usd_e6: events.iter().filter_map(|e| e.cost_usd_e6).sum(),
113 tokens: token_totals(events.iter().copied()),
114 top_tools: top_tools(&events),
115 }
116}
117
118fn quality(sessions: &[SessionRecord], pairs: &[(SessionRecord, Event)]) -> DataQuality {
119 let token_events = pairs.iter().filter(|(_, e)| has_tokens(e)).count();
120 let cost_events = pairs
121 .iter()
122 .filter(|(_, e)| e.cost_usd_e6.is_some())
123 .count();
124 DataQuality {
125 token_coverage_pct: pct(pairs.len(), token_events),
126 cost_coverage_pct: pct(pairs.len(), cost_events),
127 partial_cost_sessions: sessions.len().saturating_sub(cost_session_count(pairs)) as u64,
128 warnings: empty_warnings(sessions, pairs),
129 ..DataQuality::default()
130 }
131}
132
133fn derived_status(s: &SessionRecord, events: &[&Event], now_ms: u64) -> (DerivedStatus, String) {
134 if events.iter().any(|e| e.kind == EventKind::Error) {
135 return (DerivedStatus::Errored, "error event".into());
136 }
137 if s.status == SessionStatus::Done || s.ended_at_ms.is_some() {
138 return (DerivedStatus::Done, "session ended".into());
139 }
140 match events.iter().map(|e| e.ts_ms).max() {
141 Some(ts) if now_ms.saturating_sub(ts) <= ACTIVE_TTL_MS => {
142 (DerivedStatus::Active, "recent event".into())
143 }
144 Some(ts) if now_ms.saturating_sub(ts) >= ORPHAN_TTL_MS => {
145 (DerivedStatus::Orphaned, "stale open session".into())
146 }
147 Some(_) => (DerivedStatus::Idle, "no recent event".into()),
148 None => (DerivedStatus::Idle, "no events".into()),
149 }
150}
151
152fn group_events(pairs: &[(SessionRecord, Event)]) -> HashMap<&str, Vec<&Event>> {
153 let mut out: HashMap<&str, Vec<&Event>> = HashMap::new();
154 pairs
155 .iter()
156 .for_each(|(s, e)| out.entry(s.id.as_str()).or_default().push(e));
157 out
158}
159
160fn top_tools(events: &[&Event]) -> Vec<(String, u64)> {
161 let mut counts = counts(events.iter().filter_map(|e| e.tool.as_deref()));
162 counts.truncate(5);
163 counts
164}
165
166fn empty_warnings(sessions: &[SessionRecord], pairs: &[(SessionRecord, Event)]) -> Vec<String> {
167 (sessions.is_empty() || pairs.is_empty())
168 .then(|| "no local telemetry for workspace".to_string())
169 .into_iter()
170 .collect()
171}