1use crate::core::config::Config;
5use crate::core::data_source::DataSource;
6use crate::metrics::types::{MetricsReport, RankedTool};
7use crate::store::{GuidanceReport, InsightsStats, RemoteEventAgg, Store, SummaryStats};
8use anyhow::Result;
9use std::collections::HashMap;
10use std::path::Path;
11
12pub fn try_remote_event_agg(
15 store: &Store,
16 cfg: &Config,
17 workspace: &Path,
18) -> Result<Option<RemoteEventAgg>> {
19 if cfg.sync.team_id.trim().is_empty() {
20 return Ok(None);
21 }
22 let Some(ctx) = crate::sync::ingest_ctx(cfg, workspace.to_path_buf()) else {
23 return Ok(None);
24 };
25 let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx) else {
26 return Ok(None);
27 };
28 let agg = store.remote_event_aggregate(&cfg.sync.team_id, &wh)?;
29 Ok(Some(agg))
30}
31
32fn merge_count_rows(rows: impl IntoIterator<Item = Vec<(String, u64)>>) -> Vec<(String, u64)> {
33 let mut m: HashMap<String, u64> = HashMap::new();
34 for set in rows {
35 for (k, v) in set {
36 *m.entry(k).or_insert(0) += v;
37 }
38 }
39 let mut v: Vec<_> = m.into_iter().collect();
40 v.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
41 v
42}
43
44fn summary_from_remote(r: &RemoteEventAgg) -> SummaryStats {
45 SummaryStats {
46 session_count: r.session_count,
47 total_cost_usd_e6: r.total_cost_usd_e6,
48 by_agent: r.by_agent.clone(),
49 by_model: r.by_model.clone(),
50 top_tools: r.top_tools.clone(),
51 }
52}
53
54pub fn merge_summary_stats(
56 local: SummaryStats,
57 remote: &RemoteEventAgg,
58 source: DataSource,
59) -> SummaryStats {
60 if remote.event_count == 0 {
61 return local;
62 }
63 match source {
64 DataSource::Local => local,
65 DataSource::Provider => summary_from_remote(remote),
66 DataSource::Mixed => SummaryStats {
67 session_count: local.session_count.saturating_add(remote.session_count),
68 total_cost_usd_e6: local
69 .total_cost_usd_e6
70 .saturating_add(remote.total_cost_usd_e6),
71 by_agent: merge_count_rows(vec![local.by_agent, remote.by_agent.clone()]),
72 by_model: merge_count_rows(vec![local.by_model, remote.by_model.clone()]),
73 top_tools: merge_count_rows(vec![local.top_tools, remote.top_tools.clone()]),
74 },
75 }
76}
77
78fn insights_from_remote(r: RemoteEventAgg) -> InsightsStats {
79 let RemoteEventAgg {
80 session_count,
81 event_count,
82 total_cost_usd_e6,
83 sessions_with_cost,
84 sessions_by_day,
85 top_tools,
86 ..
87 } = r;
88 let top_tools = top_tools.into_iter().take(5).collect();
89 InsightsStats {
90 total_sessions: session_count,
91 running_sessions: 0,
92 total_events: event_count,
93 sessions_by_day,
94 recent: vec![],
95 top_tools,
96 total_cost_usd_e6,
97 sessions_with_cost,
98 }
99}
100
101fn merge_insights_mixed(local: &InsightsStats, r: &RemoteEventAgg) -> InsightsStats {
102 let mut out = local.clone();
103 out.total_sessions = out.total_sessions.saturating_add(r.session_count);
104 out.total_events = out.total_events.saturating_add(r.event_count);
105 out.total_cost_usd_e6 = out.total_cost_usd_e6.saturating_add(r.total_cost_usd_e6);
106 out.sessions_with_cost = out.sessions_with_cost.saturating_add(r.sessions_with_cost);
107 if out.sessions_by_day.len() == r.sessions_by_day.len() {
108 for (i, (_, c)) in r.sessions_by_day.iter().enumerate() {
109 out.sessions_by_day[i].1 = out.sessions_by_day[i].1.saturating_add(*c);
110 }
111 }
112 out.top_tools = merge_count_rows(vec![
113 local.top_tools.clone(),
114 r.top_tools.iter().take(10).cloned().collect(),
115 ])
116 .into_iter()
117 .take(5)
118 .collect();
119 out
120}
121
122pub fn merge_insights_stats(
123 local: InsightsStats,
124 remote: &RemoteEventAgg,
125 source: DataSource,
126) -> InsightsStats {
127 if remote.event_count == 0 {
128 return local;
129 }
130 match source {
131 DataSource::Local => local,
132 DataSource::Provider => insights_from_remote(remote.clone()),
133 DataSource::Mixed => merge_insights_mixed(&local, remote),
134 }
135}
136
137pub fn merge_guidance_sessions_in_window(
139 mut report: GuidanceReport,
140 remote: &RemoteEventAgg,
141 source: DataSource,
142) -> GuidanceReport {
143 if remote.event_count == 0 {
144 return report;
145 }
146 match source {
147 DataSource::Local => {}
148 DataSource::Provider => {
149 report.sessions_in_window = report.sessions_in_window.max(remote.session_count);
150 }
151 DataSource::Mixed => {
152 report.sessions_in_window = report
153 .sessions_in_window
154 .saturating_add(remote.session_count);
155 }
156 }
157 report
158}
159
160fn merge_one_tool_row(target: &mut Vec<RankedTool>, row: RankedTool) {
161 if let Some(existing) = target.iter_mut().find(|t| t.tool == row.tool) {
162 existing.calls = existing.calls.saturating_add(row.calls);
163 existing.total_tokens = existing.total_tokens.saturating_add(row.total_tokens);
164 existing.total_reasoning_tokens = existing
165 .total_reasoning_tokens
166 .saturating_add(row.total_reasoning_tokens);
167 existing.p50_ms = match (existing.p50_ms, row.p50_ms) {
168 (Some(a), Some(b)) => Some(a.max(b)),
169 (a, b) => a.or(b),
170 };
171 existing.p95_ms = match (existing.p95_ms, row.p95_ms) {
172 (Some(a), Some(b)) => Some(a.max(b)),
173 (a, b) => a.or(b),
174 };
175 return;
176 }
177 target.push(row);
178}
179
180pub fn apply_remote_to_metrics(
182 mut report: MetricsReport,
183 remote: &RemoteEventAgg,
184 source: DataSource,
185) -> MetricsReport {
186 if source == DataSource::Local || remote.event_count == 0 {
187 return report;
188 }
189 let token_map: HashMap<String, u64> = remote.tool_token_totals.iter().cloned().collect();
190 for (tool, calls) in &remote.top_tools {
191 let toks = token_map.get(tool).copied().unwrap_or(0);
192 let row = RankedTool {
193 tool: tool.clone(),
194 calls: *calls,
195 p50_ms: None,
196 p95_ms: None,
197 total_tokens: toks,
198 total_reasoning_tokens: 0,
199 };
200 merge_one_tool_row(&mut report.slowest_tools, row.clone());
201 merge_one_tool_row(&mut report.highest_token_tools, row);
202 }
203 report
204 .slowest_tools
205 .sort_by(|a, b| b.calls.cmp(&a.calls).then_with(|| a.tool.cmp(&b.tool)));
206 report.highest_token_tools.sort_by(|a, b| {
207 b.total_tokens
208 .cmp(&a.total_tokens)
209 .then_with(|| a.tool.cmp(&b.tool))
210 });
211 report.slowest_tools.truncate(10);
212 report.highest_token_tools.truncate(10);
213 report
214}