Skip to main content

kaizen/visualization/
build.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2
3use super::activity::activity;
4use super::rollup::{cost_session_count, counts, has_tokens, pct, token_totals};
5use super::types::*;
6use crate::core::event::{Event, EventKind, SessionRecord, SessionStatus};
7use crate::store::Store;
8use anyhow::Result;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12const ACTIVE_TTL_MS: u64 = 5 * 60_000;
13const ORPHAN_TTL_MS: u64 = 30 * 60_000;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
16pub struct VisualizationQuery {
17    pub workspace: String,
18    pub selected_session_id: Option<String>,
19    pub now_ms: u64,
20    pub day_start_hour: u8,
21}
22
23pub fn build_report(store: &Store, query: VisualizationQuery) -> Result<VisualizationReport> {
24    let sessions = store.list_sessions(&query.workspace)?;
25    let pairs = store.workspace_events(&query.workspace)?;
26    let selected = selected_detail(store, &query)?;
27    let summaries = trace_summaries(&sessions, &pairs, query.now_ms);
28    Ok(VisualizationReport {
29        generated_at_ms: query.now_ms,
30        workspace: query.workspace,
31        totals: totals(&sessions, &pairs),
32        activity: activity(&pairs, query.now_ms),
33        sessions: summaries,
34        selected,
35        quality: quality(&sessions, &pairs),
36    })
37}
38
39fn selected_detail(store: &Store, query: &VisualizationQuery) -> Result<Option<TraceDetail>> {
40    let Some(id) = query.selected_session_id.as_deref() else {
41        return Ok(None);
42    };
43    let Some(session) = store.get_session(id)? else {
44        return Ok(None);
45    };
46    Ok(Some(TraceDetail {
47        session,
48        events: store.list_events_for_session(id)?,
49        spans: store.session_span_tree(id)?,
50        files: store.files_for_session(id)?,
51    }))
52}
53
54fn totals(sessions: &[SessionRecord], pairs: &[(SessionRecord, Event)]) -> VisualizationTotals {
55    let tokens = token_totals(pairs.iter().map(|(_, e)| e));
56    VisualizationTotals {
57        session_count: sessions.len() as u64,
58        running_count: sessions
59            .iter()
60            .filter(|s| s.status != SessionStatus::Done)
61            .count() as u64,
62        event_count: pairs.len() as u64,
63        error_count: pairs
64            .iter()
65            .filter(|(_, e)| e.kind == EventKind::Error)
66            .count() as u64,
67        tool_call_count: pairs
68            .iter()
69            .filter(|(_, e)| e.kind == EventKind::ToolCall)
70            .count() as u64,
71        cost_usd_e6: pairs.iter().filter_map(|(_, e)| e.cost_usd_e6).sum(),
72        tokens,
73    }
74}
75
76fn trace_summaries(
77    sessions: &[SessionRecord],
78    pairs: &[(SessionRecord, Event)],
79    now_ms: u64,
80) -> Vec<TraceSummary> {
81    let grouped = group_events(pairs);
82    sessions
83        .iter()
84        .take(100)
85        .map(|s| {
86            trace_summary(
87                s,
88                grouped.get(s.id.as_str()).cloned().unwrap_or_default(),
89                now_ms,
90            )
91        })
92        .collect()
93}
94
95fn trace_summary(session: &SessionRecord, events: Vec<&Event>, now_ms: u64) -> TraceSummary {
96    let (status, status_reason) = derived_status(session, &events, now_ms);
97    TraceSummary {
98        id: session.id.clone(),
99        agent: session.agent.clone(),
100        model: session.model.clone(),
101        status,
102        status_reason,
103        started_at_ms: session.started_at_ms,
104        ended_at_ms: session.ended_at_ms,
105        last_event_ms: events.iter().map(|e| e.ts_ms).max(),
106        event_count: events.len() as u64,
107        error_count: events.iter().filter(|e| e.kind == EventKind::Error).count() as u64,
108        tool_call_count: events
109            .iter()
110            .filter(|e| e.kind == EventKind::ToolCall)
111            .count() as u64,
112        cost_usd_e6: events.iter().filter_map(|e| e.cost_usd_e6).sum(),
113        tokens: token_totals(events.iter().copied()),
114        top_tools: top_tools(&events),
115    }
116}
117
118fn quality(sessions: &[SessionRecord], pairs: &[(SessionRecord, Event)]) -> DataQuality {
119    let token_events = pairs.iter().filter(|(_, e)| has_tokens(e)).count();
120    let cost_events = pairs
121        .iter()
122        .filter(|(_, e)| e.cost_usd_e6.is_some())
123        .count();
124    DataQuality {
125        token_coverage_pct: pct(pairs.len(), token_events),
126        cost_coverage_pct: pct(pairs.len(), cost_events),
127        partial_cost_sessions: sessions.len().saturating_sub(cost_session_count(pairs)) as u64,
128        warnings: empty_warnings(sessions, pairs),
129        ..DataQuality::default()
130    }
131}
132
133fn derived_status(s: &SessionRecord, events: &[&Event], now_ms: u64) -> (DerivedStatus, String) {
134    if events.iter().any(|e| e.kind == EventKind::Error) {
135        return (DerivedStatus::Errored, "error event".into());
136    }
137    if s.status == SessionStatus::Done || s.ended_at_ms.is_some() {
138        return (DerivedStatus::Done, "session ended".into());
139    }
140    match events.iter().map(|e| e.ts_ms).max() {
141        Some(ts) if now_ms.saturating_sub(ts) <= ACTIVE_TTL_MS => {
142            (DerivedStatus::Active, "recent event".into())
143        }
144        Some(ts) if now_ms.saturating_sub(ts) >= ORPHAN_TTL_MS => {
145            (DerivedStatus::Orphaned, "stale open session".into())
146        }
147        Some(_) => (DerivedStatus::Idle, "no recent event".into()),
148        None => (DerivedStatus::Idle, "no events".into()),
149    }
150}
151
152fn group_events(pairs: &[(SessionRecord, Event)]) -> HashMap<&str, Vec<&Event>> {
153    let mut out: HashMap<&str, Vec<&Event>> = HashMap::new();
154    pairs
155        .iter()
156        .for_each(|(s, e)| out.entry(s.id.as_str()).or_default().push(e));
157    out
158}
159
160fn top_tools(events: &[&Event]) -> Vec<(String, u64)> {
161    let mut counts = counts(events.iter().filter_map(|e| e.tool.as_deref()));
162    counts.truncate(5);
163    counts
164}
165
166fn empty_warnings(sessions: &[SessionRecord], pairs: &[(SessionRecord, Event)]) -> Vec<String> {
167    (sessions.is_empty() || pairs.is_empty())
168        .then(|| "no local telemetry for workspace".to_string())
169        .into_iter()
170        .collect()
171}