Skip to main content

kaizen/visualization/
build.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2
3use super::activity::activity;
4use super::types::*;
5use crate::core::event::{SessionRecord, SessionStatus};
6use crate::store::Store;
7use anyhow::{Result, ensure};
8use serde::{Deserialize, Serialize};
9
/// Largest age (ms) of a session's latest event for the session to count as active: 5 minutes.
const ACTIVE_TTL_MS: u64 = 5 * 60 * 1_000;
/// Age (ms) of the latest event from which an open session is reported as orphaned: 30 minutes.
const ORPHAN_TTL_MS: u64 = 30 * 60 * 1_000;
12
13#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
14pub struct VisualizationLimits {
15    /// Maximum latest sessions materialized for summaries.
16    pub sessions: usize,
17    /// Maximum latest events materialized for selected detail.
18    pub selected_events: usize,
19    /// Maximum spans materialized for selected detail.
20    pub selected_spans: usize,
21    /// Maximum files materialized for selected detail.
22    pub selected_files: usize,
23}
24
25#[derive(Clone, Debug, Serialize, Deserialize)]
26pub struct VisualizationQuery {
27    pub workspace: String,
28    pub selected_session_id: Option<String>,
29    pub now_ms: u64,
30    /// Compute heatmap bins only for surfaces that render them.
31    pub include_activity: bool,
32    /// Fall back to latest session when requested selection is absent or invalid.
33    pub select_latest: bool,
34    pub limits: VisualizationLimits,
35}
36
/// Number of rows that were actually materialized while building a report.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct MaterializedRows {
    /// Session summaries loaded.
    pub(crate) sessions: usize,
    /// Events loaded for the selected session (0 when nothing is selected).
    pub(crate) selected_events: usize,
    /// Spans loaded for the selected session, counted across the whole span tree.
    pub(crate) selected_spans: usize,
    /// Files loaded for the selected session (0 when nothing is selected).
    pub(crate) selected_files: usize,
}
44
45pub(crate) struct BuiltReport {
46    pub(crate) report: VisualizationReport,
47    pub(crate) materialized: MaterializedRows,
48}
49
50pub fn build_report(store: &Store, query: VisualizationQuery) -> Result<VisualizationReport> {
51    Ok(build_report_observed(store, query)?.report)
52}
53
54pub(crate) fn build_report_observed(
55    store: &Store,
56    query: VisualizationQuery,
57) -> Result<BuiltReport> {
58    validate(&query.limits)?;
59    let active_since_ms = query.now_ms.saturating_sub(ACTIVE_TTL_MS);
60    let (totals, quality) = store.visualization_totals(&query.workspace, active_since_ms)?;
61    let sessions =
62        store.visualization_sessions(&query.workspace, query.limits.sessions, query.now_ms)?;
63    let selected = selected_detail(store, &query, sessions.first())?;
64    let activity = activity_report(store, &query)?;
65    let materialized = counts(&sessions, &selected);
66    Ok(BuiltReport {
67        report: report(query, totals, quality, sessions, selected, activity),
68        materialized,
69    })
70}
71
72fn validate(limits: &VisualizationLimits) -> Result<()> {
73    ensure_positive(limits.sessions, "session")?;
74    ensure_positive(limits.selected_events, "event")?;
75    ensure_positive(limits.selected_spans, "span")?;
76    ensure_positive(limits.selected_files, "file")?;
77    Ok(())
78}
79
80fn ensure_positive(limit: usize, kind: &str) -> Result<()> {
81    ensure!(limit > 0, "visualization {kind} limit must be positive");
82    Ok(())
83}
84
85fn activity_report(store: &Store, query: &VisualizationQuery) -> Result<ActivityReport> {
86    if query.include_activity {
87        activity(store, &query.workspace, query.now_ms)
88    } else {
89        Ok(Default::default())
90    }
91}
92
93fn selected_detail(
94    store: &Store,
95    query: &VisualizationQuery,
96    latest: Option<&TraceSummary>,
97) -> Result<Option<TraceDetail>> {
98    let Some(session) = selected_session(store, query, latest)? else {
99        return Ok(None);
100    };
101    let id = session.id.clone();
102    Ok(Some(TraceDetail {
103        session,
104        events: store.list_latest_events_for_session(&id, query.limits.selected_events)?,
105        spans: store.limited_session_span_tree(&id, query.limits.selected_spans)?,
106        files: store.limited_files_for_session(&id, query.limits.selected_files)?,
107    }))
108}
109
110fn selected_session(
111    store: &Store,
112    query: &VisualizationQuery,
113    latest: Option<&TraceSummary>,
114) -> Result<Option<SessionRecord>> {
115    if let Some(session) = requested_session(store, query)? {
116        return Ok(Some(session));
117    }
118    if !query.select_latest {
119        return Ok(None);
120    }
121    latest
122        .map(|summary| store.get_session(&summary.id))
123        .transpose()
124        .map(Option::flatten)
125}
126
127fn requested_session(store: &Store, query: &VisualizationQuery) -> Result<Option<SessionRecord>> {
128    let Some(id) = query.selected_session_id.as_deref() else {
129        return Ok(None);
130    };
131    Ok(store
132        .get_session(id)?
133        .filter(|session| session.workspace == query.workspace))
134}
135
136fn counts(sessions: &[TraceSummary], selected: &Option<TraceDetail>) -> MaterializedRows {
137    MaterializedRows {
138        sessions: sessions.len(),
139        selected_events: selected.as_ref().map_or(0, |detail| detail.events.len()),
140        selected_spans: selected
141            .as_ref()
142            .map_or(0, |detail| span_count(&detail.spans)),
143        selected_files: selected.as_ref().map_or(0, |detail| detail.files.len()),
144    }
145}
146
147fn span_count(spans: &[crate::store::SpanNode]) -> usize {
148    spans
149        .iter()
150        .map(|node| 1 + span_count(&node.children))
151        .sum()
152}
153
154fn report(
155    query: VisualizationQuery,
156    totals: VisualizationTotals,
157    quality: DataQuality,
158    sessions: Vec<TraceSummary>,
159    selected: Option<TraceDetail>,
160    activity: ActivityReport,
161) -> VisualizationReport {
162    VisualizationReport {
163        generated_at_ms: query.now_ms,
164        workspace: query.workspace,
165        totals,
166        activity,
167        sessions,
168        selected,
169        quality,
170    }
171}
172
173pub(crate) fn derive_status(
174    session: &SessionRecord,
175    last_event_ms: Option<u64>,
176    error_count: u64,
177    now_ms: u64,
178) -> (DerivedStatus, String) {
179    if error_count > 0 {
180        return (DerivedStatus::Errored, "error event".into());
181    }
182    if session.status == SessionStatus::Done || session.ended_at_ms.is_some() {
183        return (DerivedStatus::Done, "session ended".into());
184    }
185    match last_event_ms {
186        Some(ts) if now_ms.saturating_sub(ts) <= ACTIVE_TTL_MS => {
187            (DerivedStatus::Active, "recent event".into())
188        }
189        Some(ts) if now_ms.saturating_sub(ts) >= ORPHAN_TTL_MS => {
190            (DerivedStatus::Orphaned, "stale open session".into())
191        }
192        Some(_) => (DerivedStatus::Idle, "no recent event".into()),
193        None => (DerivedStatus::Idle, "no events".into()),
194    }
195}