1use std::collections::{HashMap, HashSet};
2
3#[cfg(feature = "team-server")]
4use axum::Extension;
5use axum::{extract::Query, http::StatusCode, response::IntoResponse, Json};
6use serde::{Deserialize, Serialize};
7
8use crate::core::context_os::redaction::{redact_event_payload, RedactionLevel};
9use crate::core::context_os::{ContextEventKindV1, ContextEventV1};
10
11#[cfg(feature = "team-server")]
12use super::team::TeamRequestContext;
13
/// Picks the workspace id a request should operate on.
///
/// Precedence:
/// 1. With the `team-server` feature enabled and a team context present, the
///    context's `workspace_id` always wins over the query parameter.
/// 2. Otherwise the `query_ws` value supplied by the caller is used.
/// 3. If neither is available, the literal `"default"` is returned.
///
/// Without the `team-server` feature the second parameter is an ignored
/// placeholder so call sites can pass `None` in both configurations.
fn resolve_workspace(
    query_ws: Option<String>,
    #[cfg(feature = "team-server")] team_ctx: Option<&TeamRequestContext>,
    #[cfg(not(feature = "team-server"))] _team_ctx: Option<&()>,
) -> String {
    // A team context, when present, overrides whatever the query asked for.
    #[cfg(feature = "team-server")]
    if let Some(team) = team_ctx {
        return team.workspace_id.clone();
    }

    match query_ws {
        Some(requested) => requested,
        None => String::from("default"),
    }
}
28
29#[derive(Deserialize)]
30pub struct SummaryQuery {
31 #[serde(rename = "workspaceId")]
32 pub workspace_id: Option<String>,
33 #[serde(rename = "channelId")]
34 pub channel_id: Option<String>,
35 pub limit: Option<usize>,
36}
37
38#[derive(Serialize)]
39#[serde(rename_all = "camelCase")]
40pub struct ContextSummary {
41 pub workspace_id: String,
42 pub channel_id: String,
43 pub total_events: usize,
44 pub latest_version: i64,
45 pub active_agents: Vec<String>,
46 pub recent_decisions: Vec<DecisionSummary>,
47 pub knowledge_delta: Vec<KnowledgeDelta>,
48 pub conflict_alerts: Vec<ConflictAlert>,
49 pub event_counts_by_kind: HashMap<String, usize>,
50}
51
52#[derive(Clone, Serialize)]
53#[serde(rename_all = "camelCase")]
54pub struct DecisionSummary {
55 pub agent: String,
56 pub tool: String,
57 pub action: Option<String>,
58 pub reasoning: Option<String>,
59 pub timestamp: String,
60}
61
62#[derive(Serialize)]
63#[serde(rename_all = "camelCase")]
64pub struct KnowledgeDelta {
65 pub category: String,
66 pub key: String,
67 pub agent: String,
68 pub timestamp: String,
69}
70
71#[derive(Serialize)]
72#[serde(rename_all = "camelCase")]
73pub struct ConflictAlert {
74 pub category: String,
75 pub key: String,
76 pub agents: Vec<String>,
77}
78
79#[derive(Deserialize)]
80pub struct SearchQuery {
81 pub q: String,
82 #[serde(rename = "workspaceId")]
83 pub workspace_id: Option<String>,
84 #[serde(rename = "channelId")]
85 pub channel_id: Option<String>,
86 pub limit: Option<usize>,
87}
88
89pub async fn v1_events_search(
90 #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
91 Query(q): Query<SearchQuery>,
92) -> impl IntoResponse {
93 #[cfg(feature = "team-server")]
94 let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
95 #[cfg(not(feature = "team-server"))]
96 let ws = resolve_workspace(q.workspace_id, None);
97 let limit = q.limit.unwrap_or(20).min(100);
98
99 let rt = crate::core::context_os::runtime();
100 let mut results = rt.bus.search(&ws, q.channel_id.as_deref(), &q.q, limit);
101 for ev in &mut results {
102 redact_event_payload(ev, RedactionLevel::Summary);
103 }
104
105 (
106 StatusCode::OK,
107 Json(serde_json::json!({
108 "query": q.q,
109 "workspaceId": ws,
110 "channelId": q.channel_id,
111 "results": results,
112 "count": results.len(),
113 })),
114 )
115}
116
117#[derive(Deserialize)]
118pub struct LineageQuery {
119 pub id: i64,
120 pub depth: Option<usize>,
121 #[serde(rename = "workspaceId")]
122 pub workspace_id: Option<String>,
123}
124
125pub async fn v1_event_lineage(
126 #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
127 Query(q): Query<LineageQuery>,
128) -> impl IntoResponse {
129 #[cfg(feature = "team-server")]
130 let ws = resolve_workspace(q.workspace_id.clone(), team_ctx.as_ref().map(|e| &e.0));
131 #[cfg(not(feature = "team-server"))]
132 let ws = resolve_workspace(q.workspace_id.clone(), None);
133 let depth = q.depth.unwrap_or(20).min(50);
134
135 let rt = crate::core::context_os::runtime();
136 let mut chain = rt.bus.lineage(q.id, &ws, depth);
137 for ev in &mut chain {
138 redact_event_payload(ev, RedactionLevel::Summary);
139 }
140
141 (
142 StatusCode::OK,
143 Json(serde_json::json!({
144 "eventId": q.id,
145 "chain": chain,
146 "depth": chain.len(),
147 })),
148 )
149}
150
151pub async fn v1_context_summary(
152 #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
153 Query(q): Query<SummaryQuery>,
154) -> impl IntoResponse {
155 #[cfg(feature = "team-server")]
156 let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
157 #[cfg(not(feature = "team-server"))]
158 let ws = resolve_workspace(q.workspace_id, None);
159 let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
160 let limit = q.limit.unwrap_or(100).min(500);
161
162 let rt = crate::core::context_os::runtime();
163 let events = rt.bus.read(&ws, &ch, 0, limit);
164
165 let summary = build_summary(&ws, &ch, &events);
166 (
167 StatusCode::OK,
168 Json(serde_json::to_value(summary).unwrap_or_default()),
169 )
170}
171
172fn build_summary(ws: &str, ch: &str, events: &[ContextEventV1]) -> ContextSummary {
173 let mut agents: HashSet<String> = HashSet::new();
174 let mut kind_counts: HashMap<String, usize> = HashMap::new();
175 let mut decisions = Vec::new();
176 let mut knowledge_deltas = Vec::new();
177 let mut latest_version: i64 = 0;
178
179 let mut knowledge_writers: HashMap<(String, String), HashSet<String>> = HashMap::new();
181
182 for ev in events {
183 if let Some(ref actor) = ev.actor {
184 agents.insert(actor.clone());
185 }
186 *kind_counts.entry(ev.kind.clone()).or_insert(0) += 1;
187 latest_version = latest_version.max(ev.version);
188
189 let p = &ev.payload;
190 let tool = p
191 .get("tool")
192 .and_then(|v| v.as_str())
193 .unwrap_or("")
194 .to_string();
195 let action = p.get("action").and_then(|v| v.as_str()).map(String::from);
196 let reasoning = p
197 .get("reasoning")
198 .and_then(|v| v.as_str())
199 .map(String::from);
200
201 if ev.kind == ContextEventKindV1::SessionMutated.as_str()
202 || ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str()
203 {
204 decisions.push(DecisionSummary {
205 agent: ev.actor.clone().unwrap_or_default(),
206 tool: tool.clone(),
207 action: action.clone(),
208 reasoning,
209 timestamp: ev.timestamp.to_rfc3339(),
210 });
211 }
212
213 if ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str() {
214 let cat = p
215 .get("category")
216 .and_then(|v| v.as_str())
217 .unwrap_or("")
218 .to_string();
219 let key = p
220 .get("key")
221 .and_then(|v| v.as_str())
222 .unwrap_or("")
223 .to_string();
224 knowledge_deltas.push(KnowledgeDelta {
225 category: cat.clone(),
226 key: key.clone(),
227 agent: ev.actor.clone().unwrap_or_default(),
228 timestamp: ev.timestamp.to_rfc3339(),
229 });
230
231 if let Some(ref actor) = ev.actor {
232 knowledge_writers
233 .entry((cat, key))
234 .or_default()
235 .insert(actor.clone());
236 }
237 }
238 }
239
240 let conflict_alerts: Vec<ConflictAlert> = knowledge_writers
241 .into_iter()
242 .filter(|(_, writers)| writers.len() > 1)
243 .map(|((cat, key), writers)| ConflictAlert {
244 category: cat,
245 key,
246 agents: writers.into_iter().collect(),
247 })
248 .collect();
249
250 let recent_limit = 10;
251 let decisions: Vec<_> = if decisions.len() > recent_limit {
252 decisions[decisions.len() - recent_limit..].to_vec()
253 } else {
254 decisions
255 };
256
257 ContextSummary {
258 workspace_id: ws.to_string(),
259 channel_id: ch.to_string(),
260 total_events: events.len(),
261 latest_version,
262 active_agents: agents.into_iter().collect(),
263 recent_decisions: decisions,
264 knowledge_delta: knowledge_deltas,
265 conflict_alerts,
266 event_counts_by_kind: kind_counts,
267 }
268}