Skip to main content

kaizen/shell/
remote_observe.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Merge `remote_events` aggregates into `summary` / `insights` / `metrics` for non-local sources.
3
4use crate::core::config::Config;
5use crate::core::data_source::DataSource;
6use crate::metrics::types::{MetricsReport, RankedTool};
7use crate::store::{GuidanceReport, InsightsStats, RemoteEventAgg, Store, SummaryStats};
8use anyhow::Result;
9use std::collections::HashMap;
10use std::path::Path;
11
12/// Load provider-side aggregates when `team_id` + `workspace_hash` (from config + workspace) are set.
13/// Returns `Ok(None)` if sync context is missing; `Err` on DB errors.
14pub fn try_remote_event_agg(
15    store: &Store,
16    cfg: &Config,
17    workspace: &Path,
18) -> Result<Option<RemoteEventAgg>> {
19    if cfg.sync.team_id.trim().is_empty() {
20        return Ok(None);
21    }
22    let Some(ctx) = crate::sync::ingest_ctx(cfg, workspace.to_path_buf()) else {
23        return Ok(None);
24    };
25    let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx) else {
26        return Ok(None);
27    };
28    let agg = store.remote_event_aggregate(&cfg.sync.team_id, &wh)?;
29    Ok(Some(agg))
30}
31
/// Sum per-key counts across several `(key, count)` row sets.
///
/// Counts that share a key are combined with saturating addition, so an overflowing total
/// clamps at `u64::MAX` instead of panicking (debug builds) or wrapping (release builds).
/// The result is ordered by count descending, with ties broken by key ascending.
fn merge_count_rows(rows: impl IntoIterator<Item = Vec<(String, u64)>>) -> Vec<(String, u64)> {
    let mut totals: HashMap<String, u64> = HashMap::new();
    for (key, count) in rows.into_iter().flatten() {
        let total = totals.entry(key).or_insert(0);
        *total = total.saturating_add(count);
    }
    let mut merged: Vec<_> = totals.into_iter().collect();
    // Keys are unique after aggregation, so the comparator is a total order and an
    // unstable sort yields the same result as a stable one.
    merged.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    merged
}
43
44fn summary_from_remote(r: &RemoteEventAgg) -> SummaryStats {
45    SummaryStats {
46        session_count: r.session_count,
47        total_cost_usd_e6: r.total_cost_usd_e6,
48        by_agent: r.by_agent.clone(),
49        by_model: r.by_model.clone(),
50        top_tools: r.top_tools.clone(),
51    }
52}
53
54/// Merge or replace headline stats for `DataSource` (see `specs/observe-pipeline.qnt`).
55pub fn merge_summary_stats(
56    local: SummaryStats,
57    remote: &RemoteEventAgg,
58    source: DataSource,
59) -> SummaryStats {
60    if remote.event_count == 0 {
61        return local;
62    }
63    match source {
64        DataSource::Local => local,
65        DataSource::Provider => summary_from_remote(remote),
66        DataSource::Mixed => SummaryStats {
67            session_count: local.session_count.saturating_add(remote.session_count),
68            total_cost_usd_e6: local
69                .total_cost_usd_e6
70                .saturating_add(remote.total_cost_usd_e6),
71            by_agent: merge_count_rows(vec![local.by_agent, remote.by_agent.clone()]),
72            by_model: merge_count_rows(vec![local.by_model, remote.by_model.clone()]),
73            top_tools: merge_count_rows(vec![local.top_tools, remote.top_tools.clone()]),
74        },
75    }
76}
77
78fn insights_from_remote(r: RemoteEventAgg) -> InsightsStats {
79    let RemoteEventAgg {
80        session_count,
81        event_count,
82        total_cost_usd_e6,
83        sessions_with_cost,
84        sessions_by_day,
85        top_tools,
86        ..
87    } = r;
88    let top_tools = top_tools.into_iter().take(5).collect();
89    InsightsStats {
90        total_sessions: session_count,
91        running_sessions: 0,
92        total_events: event_count,
93        sessions_by_day,
94        recent: vec![],
95        top_tools,
96        total_cost_usd_e6,
97        sessions_with_cost,
98    }
99}
100
101fn merge_insights_mixed(local: &InsightsStats, r: &RemoteEventAgg) -> InsightsStats {
102    let mut out = local.clone();
103    out.total_sessions = out.total_sessions.saturating_add(r.session_count);
104    out.total_events = out.total_events.saturating_add(r.event_count);
105    out.total_cost_usd_e6 = out.total_cost_usd_e6.saturating_add(r.total_cost_usd_e6);
106    out.sessions_with_cost = out.sessions_with_cost.saturating_add(r.sessions_with_cost);
107    if out.sessions_by_day.len() == r.sessions_by_day.len() {
108        for (i, (_, c)) in r.sessions_by_day.iter().enumerate() {
109            out.sessions_by_day[i].1 = out.sessions_by_day[i].1.saturating_add(*c);
110        }
111    }
112    out.top_tools = merge_count_rows(vec![
113        local.top_tools.clone(),
114        r.top_tools.iter().take(10).cloned().collect(),
115    ])
116    .into_iter()
117    .take(5)
118    .collect();
119    out
120}
121
122pub fn merge_insights_stats(
123    local: InsightsStats,
124    remote: &RemoteEventAgg,
125    source: DataSource,
126) -> InsightsStats {
127    if remote.event_count == 0 {
128        return local;
129    }
130    match source {
131        DataSource::Local => local,
132        DataSource::Provider => insights_from_remote(remote.clone()),
133        DataSource::Mixed => merge_insights_mixed(&local, remote),
134    }
135}
136
137/// Session count in guidance teaser: for `mixed`, add remote session cardinality; for `provider`, prefer the larger of local vs remote when remote is loaded.
138pub fn merge_guidance_sessions_in_window(
139    mut report: GuidanceReport,
140    remote: &RemoteEventAgg,
141    source: DataSource,
142) -> GuidanceReport {
143    if remote.event_count == 0 {
144        return report;
145    }
146    match source {
147        DataSource::Local => {}
148        DataSource::Provider => {
149            report.sessions_in_window = report.sessions_in_window.max(remote.session_count);
150        }
151        DataSource::Mixed => {
152            report.sessions_in_window = report
153                .sessions_in_window
154                .saturating_add(remote.session_count);
155        }
156    }
157    report
158}
159
160fn merge_one_tool_row(target: &mut Vec<RankedTool>, row: RankedTool) {
161    if let Some(existing) = target.iter_mut().find(|t| t.tool == row.tool) {
162        existing.calls = existing.calls.saturating_add(row.calls);
163        existing.total_tokens = existing.total_tokens.saturating_add(row.total_tokens);
164        existing.total_reasoning_tokens = existing
165            .total_reasoning_tokens
166            .saturating_add(row.total_reasoning_tokens);
167        existing.p50_ms = match (existing.p50_ms, row.p50_ms) {
168            (Some(a), Some(b)) => Some(a.max(b)),
169            (a, b) => a.or(b),
170        };
171        existing.p95_ms = match (existing.p95_ms, row.p95_ms) {
172            (Some(a), Some(b)) => Some(a.max(b)),
173            (a, b) => a.or(b),
174        };
175        return;
176    }
177    target.push(row);
178}
179
180/// Fold remote event-derived tool call / token stats into the tool ranking sections only (files stay local).
181pub fn apply_remote_to_metrics(
182    mut report: MetricsReport,
183    remote: &RemoteEventAgg,
184    source: DataSource,
185) -> MetricsReport {
186    if source == DataSource::Local || remote.event_count == 0 {
187        return report;
188    }
189    let token_map: HashMap<String, u64> = remote.tool_token_totals.iter().cloned().collect();
190    for (tool, calls) in &remote.top_tools {
191        let toks = token_map.get(tool).copied().unwrap_or(0);
192        let row = RankedTool {
193            tool: tool.clone(),
194            calls: *calls,
195            p50_ms: None,
196            p95_ms: None,
197            total_tokens: toks,
198            total_reasoning_tokens: 0,
199        };
200        merge_one_tool_row(&mut report.slowest_tools, row.clone());
201        merge_one_tool_row(&mut report.highest_token_tools, row);
202    }
203    report
204        .slowest_tools
205        .sort_by(|a, b| b.calls.cmp(&a.calls).then_with(|| a.tool.cmp(&b.tool)));
206    report.highest_token_tools.sort_by(|a, b| {
207        b.total_tokens
208            .cmp(&a.total_tokens)
209            .then_with(|| a.tool.cmp(&b.tool))
210    });
211    report.slowest_tools.truncate(10);
212    report.highest_token_tools.truncate(10);
213    report
214}