1use std::collections::{HashMap, HashSet};
2
3#[cfg(feature = "team-server")]
4use axum::Extension;
5use axum::{extract::Query, http::StatusCode, response::IntoResponse, Json};
6use serde::{Deserialize, Serialize};
7
8use crate::core::context_os::redaction::{redact_event_payload, RedactionLevel};
9use crate::core::context_os::{ContextEventKindV1, ContextEventV1};
10
11#[cfg(feature = "team-server")]
12use super::team::TeamRequestContext;
13
/// Picks the workspace id a request should operate on.
///
/// With the `team-server` feature enabled, a present team context always wins
/// and its `workspace_id` is used. Otherwise the workspace supplied in the
/// query string is used, falling back to `"default"` when it is absent.
fn resolve_workspace(
    query_ws: Option<String>,
    #[cfg(feature = "team-server")] team_ctx: Option<&TeamRequestContext>,
    #[cfg(not(feature = "team-server"))] _team_ctx: Option<&()>,
) -> String {
    // The team context (when compiled in and present) takes priority over
    // whatever the caller put in the query string.
    #[cfg(feature = "team-server")]
    let query_ws = team_ctx.map(|ctx| ctx.workspace_id.clone()).or(query_ws);

    match query_ws {
        Some(workspace) => workspace,
        None => "default".to_string(),
    }
}
28
29#[derive(Deserialize)]
30pub struct SummaryQuery {
31 #[serde(rename = "workspaceId")]
32 pub workspace_id: Option<String>,
33 #[serde(rename = "channelId")]
34 pub channel_id: Option<String>,
35 pub limit: Option<usize>,
36}
37
/// Aggregated view of a channel's events, as produced by `build_summary`.
///
/// Serialized with camelCase field names.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ContextSummary {
    /// Workspace the summary was built for.
    pub workspace_id: String,
    /// Channel the summary was built for.
    pub channel_id: String,
    /// Number of events that were summarized.
    pub total_events: usize,
    /// Highest `version` seen among the events (0 when there are none or all
    /// versions are below 0).
    pub latest_version: i64,
    /// Distinct actors that appear on the summarized events.
    pub active_agents: Vec<String>,
    /// The most recent decisions (at most 10) derived from the events.
    pub recent_decisions: Vec<DecisionSummary>,
    /// One entry per "knowledge remembered" event.
    pub knowledge_delta: Vec<KnowledgeDelta>,
    /// Knowledge entries written by more than one distinct actor.
    pub conflict_alerts: Vec<ConflictAlert>,
    /// Number of events per event kind string.
    pub event_counts_by_kind: HashMap<String, usize>,
}
51
/// A single decision extracted from a session-mutated or
/// knowledge-remembered event.
///
/// Serialized with camelCase field names.
#[derive(Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DecisionSummary {
    /// Actor of the event, or an empty string when the event has no actor.
    pub agent: String,
    /// Payload `tool` string, or an empty string when missing.
    pub tool: String,
    /// Payload `action` string, if present.
    pub action: Option<String>,
    /// Payload `reasoning` string as it appears after summary-level
    /// redaction, if present.
    pub reasoning: Option<String>,
    /// Event timestamp formatted as RFC 3339.
    pub timestamp: String,
}
61
/// A knowledge entry recorded by a "knowledge remembered" event.
///
/// Serialized with camelCase field names.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct KnowledgeDelta {
    /// Payload `category` string, or an empty string when missing.
    pub category: String,
    /// Payload `key` string, or an empty string when missing.
    pub key: String,
    /// Actor of the event, or an empty string when the event has no actor.
    pub agent: String,
    /// Event timestamp formatted as RFC 3339.
    pub timestamp: String,
}
70
/// Flags a knowledge `(category, key)` pair that was written by more than one
/// distinct actor within the summarized events.
///
/// Serialized with camelCase field names.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConflictAlert {
    /// Category of the contested knowledge entry.
    pub category: String,
    /// Key of the contested knowledge entry.
    pub key: String,
    /// The distinct actors that wrote the entry.
    pub agents: Vec<String>,
}
78
79#[derive(Deserialize)]
80pub struct SearchQuery {
81 pub q: String,
82 #[serde(rename = "workspaceId")]
83 pub workspace_id: Option<String>,
84 #[serde(rename = "channelId")]
85 pub channel_id: Option<String>,
86 pub limit: Option<usize>,
87}
88
89pub async fn v1_events_search(
90 #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
91 Query(q): Query<SearchQuery>,
92) -> impl IntoResponse {
93 #[cfg(feature = "team-server")]
94 let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
95 #[cfg(not(feature = "team-server"))]
96 let ws = resolve_workspace(q.workspace_id, None);
97 let limit = q.limit.unwrap_or(20).min(100);
98
99 let rt = crate::core::context_os::runtime();
100 let mut results = rt.bus.search(&ws, q.channel_id.as_deref(), &q.q, limit);
101 for ev in &mut results {
102 redact_event_payload(ev, RedactionLevel::Summary);
103 }
104
105 (
106 StatusCode::OK,
107 Json(serde_json::json!({
108 "query": q.q,
109 "workspaceId": ws,
110 "channelId": q.channel_id,
111 "results": results,
112 "count": results.len(),
113 })),
114 )
115}
116
117#[derive(Deserialize)]
118pub struct LineageQuery {
119 pub id: i64,
120 pub depth: Option<usize>,
121 #[serde(rename = "workspaceId")]
122 pub workspace_id: Option<String>,
123}
124
125pub async fn v1_event_lineage(
126 #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
127 Query(q): Query<LineageQuery>,
128) -> impl IntoResponse {
129 #[cfg(feature = "team-server")]
130 let ws = resolve_workspace(q.workspace_id.clone(), team_ctx.as_ref().map(|e| &e.0));
131 #[cfg(not(feature = "team-server"))]
132 let ws = resolve_workspace(q.workspace_id.clone(), None);
133 let depth = q.depth.unwrap_or(20).min(50);
134
135 let rt = crate::core::context_os::runtime();
136 let mut chain = rt.bus.lineage(q.id, &ws, depth);
137 for ev in &mut chain {
138 redact_event_payload(ev, RedactionLevel::Summary);
139 }
140
141 (
142 StatusCode::OK,
143 Json(serde_json::json!({
144 "eventId": q.id,
145 "chain": chain,
146 "depth": chain.len(),
147 })),
148 )
149}
150
151pub async fn v1_context_summary(
152 #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
153 Query(q): Query<SummaryQuery>,
154) -> impl IntoResponse {
155 #[cfg(feature = "team-server")]
156 let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
157 #[cfg(not(feature = "team-server"))]
158 let ws = resolve_workspace(q.workspace_id, None);
159 let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
160 let limit = q.limit.unwrap_or(100).min(500);
161
162 let rt = crate::core::context_os::runtime();
163 let mut events = rt.bus.read(&ws, &ch, 0, limit);
164 for ev in &mut events {
165 redact_event_payload(ev, RedactionLevel::Summary);
166 }
167
168 let summary = build_summary(&ws, &ch, &events);
169 (
170 StatusCode::OK,
171 Json(serde_json::to_value(summary).unwrap_or_default()),
172 )
173}
174
175fn build_summary(ws: &str, ch: &str, events: &[ContextEventV1]) -> ContextSummary {
176 let mut agents: HashSet<String> = HashSet::new();
177 let mut kind_counts: HashMap<String, usize> = HashMap::new();
178 let mut decisions = Vec::new();
179 let mut knowledge_deltas = Vec::new();
180 let mut latest_version: i64 = 0;
181
182 let mut knowledge_writers: HashMap<(String, String), HashSet<String>> = HashMap::new();
184
185 for ev in events {
186 if let Some(ref actor) = ev.actor {
187 agents.insert(actor.clone());
188 }
189 *kind_counts.entry(ev.kind.clone()).or_insert(0) += 1;
190 latest_version = latest_version.max(ev.version);
191
192 let p = &ev.payload;
193 let tool = p
194 .get("tool")
195 .and_then(|v| v.as_str())
196 .unwrap_or("")
197 .to_string();
198 let action = p.get("action").and_then(|v| v.as_str()).map(String::from);
199 let reasoning = {
200 let mut payload_clone = p.clone();
201 crate::core::context_os::redact_payload_value(
202 &mut payload_clone,
203 crate::core::context_os::RedactionLevel::Summary,
204 );
205 payload_clone
206 .get("reasoning")
207 .and_then(|v| v.as_str())
208 .map(String::from)
209 };
210
211 if ev.kind == ContextEventKindV1::SessionMutated.as_str()
212 || ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str()
213 {
214 decisions.push(DecisionSummary {
215 agent: ev.actor.clone().unwrap_or_default(),
216 tool: tool.clone(),
217 action: action.clone(),
218 reasoning,
219 timestamp: ev.timestamp.to_rfc3339(),
220 });
221 }
222
223 if ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str() {
224 let cat = p
225 .get("category")
226 .and_then(|v| v.as_str())
227 .unwrap_or("")
228 .to_string();
229 let key = p
230 .get("key")
231 .and_then(|v| v.as_str())
232 .unwrap_or("")
233 .to_string();
234 knowledge_deltas.push(KnowledgeDelta {
235 category: cat.clone(),
236 key: key.clone(),
237 agent: ev.actor.clone().unwrap_or_default(),
238 timestamp: ev.timestamp.to_rfc3339(),
239 });
240
241 if let Some(ref actor) = ev.actor {
242 knowledge_writers
243 .entry((cat, key))
244 .or_default()
245 .insert(actor.clone());
246 }
247 }
248 }
249
250 let conflict_alerts: Vec<ConflictAlert> = knowledge_writers
251 .into_iter()
252 .filter(|(_, writers)| writers.len() > 1)
253 .map(|((cat, key), writers)| ConflictAlert {
254 category: cat,
255 key,
256 agents: writers.into_iter().collect(),
257 })
258 .collect();
259
260 let recent_limit = 10;
261 let decisions: Vec<_> = if decisions.len() > recent_limit {
262 decisions[decisions.len() - recent_limit..].to_vec()
263 } else {
264 decisions
265 };
266
267 ContextSummary {
268 workspace_id: ws.to_string(),
269 channel_id: ch.to_string(),
270 total_events: events.len(),
271 latest_version,
272 active_agents: agents.into_iter().collect(),
273 recent_decisions: decisions,
274 knowledge_delta: knowledge_deltas,
275 conflict_alerts,
276 event_counts_by_kind: kind_counts,
277 }
278}