Skip to main content

lean_ctx/http_server/
context_views.rs

1use std::collections::{HashMap, HashSet};
2
3#[cfg(feature = "team-server")]
4use axum::Extension;
5use axum::{extract::Query, http::StatusCode, response::IntoResponse, Json};
6use serde::{Deserialize, Serialize};
7
8use crate::core::context_os::redaction::{redact_event_payload, RedactionLevel};
9use crate::core::context_os::{ContextEventKindV1, ContextEventV1};
10
11#[cfg(feature = "team-server")]
12use super::team::TeamRequestContext;
13
/// Picks the workspace id a request operates on.
///
/// With the `team-server` feature enabled and a `TeamRequestContext` present,
/// the context's `workspace_id` is returned and `query_ws` is ignored. In
/// every other case the query parameter is used, falling back to `"default"`
/// when it is absent.
fn resolve_workspace(
    query_ws: Option<String>,
    #[cfg(feature = "team-server")] team_ctx: Option<&TeamRequestContext>,
    #[cfg(not(feature = "team-server"))] _team_ctx: Option<&()>,
) -> String {
    #[cfg(feature = "team-server")]
    {
        // A team context always overrides whatever the caller asked for.
        if let Some(bound) = team_ctx {
            return bound.workspace_id.clone();
        }
    }

    match query_ws {
        Some(requested) => requested,
        None => String::from("default"),
    }
}
28
29#[derive(Deserialize)]
30pub struct SummaryQuery {
31    #[serde(rename = "workspaceId")]
32    pub workspace_id: Option<String>,
33    #[serde(rename = "channelId")]
34    pub channel_id: Option<String>,
35    pub limit: Option<usize>,
36}
37
/// JSON body produced by [`build_summary`] and returned by
/// [`v1_context_summary`]. Field names serialize in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ContextSummary {
    /// Workspace the summary was computed for.
    pub workspace_id: String,
    /// Channel the summary was computed for.
    pub channel_id: String,
    /// Number of events that were summarized.
    pub total_events: usize,
    /// Highest `version` seen among the summarized events; the running
    /// maximum starts at 0, so this is never negative.
    pub latest_version: i64,
    /// Distinct actors found on the summarized events.
    pub active_agents: Vec<String>,
    /// The last (at most 10) decision entries, in event order.
    pub recent_decisions: Vec<DecisionSummary>,
    /// One entry per `KnowledgeRemembered` event.
    pub knowledge_delta: Vec<KnowledgeDelta>,
    /// Category/key pairs that were written by more than one actor.
    pub conflict_alerts: Vec<ConflictAlert>,
    /// Number of summarized events per event kind.
    pub event_counts_by_kind: HashMap<String, usize>,
}
51
/// One decision entry in a [`ContextSummary`], built from a `SessionMutated`
/// or `KnowledgeRemembered` event. Field names serialize in camelCase.
#[derive(Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DecisionSummary {
    /// Actor of the event, or an empty string when the event has none.
    pub agent: String,
    /// String value of the payload's `"tool"` entry, or an empty string.
    pub tool: String,
    /// String value of the payload's `"action"` entry, if present.
    pub action: Option<String>,
    /// String value of the payload's `"reasoning"` entry, if present.
    pub reasoning: Option<String>,
    /// Event timestamp formatted with `to_rfc3339()`.
    pub timestamp: String,
}
61
/// One knowledge write in a [`ContextSummary`], built from a
/// `KnowledgeRemembered` event. Field names serialize in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct KnowledgeDelta {
    /// String value of the payload's `"category"` entry, or an empty string.
    pub category: String,
    /// String value of the payload's `"key"` entry, or an empty string.
    pub key: String,
    /// Actor of the event, or an empty string when the event has none.
    pub agent: String,
    /// Event timestamp formatted with `to_rfc3339()`.
    pub timestamp: String,
}
70
/// Flags a knowledge category/key pair that more than one actor wrote to
/// within the summarized events. Field names serialize in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConflictAlert {
    /// Category of the contested knowledge entry.
    pub category: String,
    /// Key of the contested knowledge entry.
    pub key: String,
    /// Distinct actors that wrote the entry (always more than one).
    pub agents: Vec<String>,
}
78
79#[derive(Deserialize)]
80pub struct SearchQuery {
81    pub q: String,
82    #[serde(rename = "workspaceId")]
83    pub workspace_id: Option<String>,
84    #[serde(rename = "channelId")]
85    pub channel_id: Option<String>,
86    pub limit: Option<usize>,
87}
88
89pub async fn v1_events_search(
90    #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
91    Query(q): Query<SearchQuery>,
92) -> impl IntoResponse {
93    #[cfg(feature = "team-server")]
94    let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
95    #[cfg(not(feature = "team-server"))]
96    let ws = resolve_workspace(q.workspace_id, None);
97    let limit = q.limit.unwrap_or(20).min(100);
98
99    let rt = crate::core::context_os::runtime();
100    let mut results = rt.bus.search(&ws, q.channel_id.as_deref(), &q.q, limit);
101    for ev in &mut results {
102        redact_event_payload(ev, RedactionLevel::Summary);
103    }
104
105    (
106        StatusCode::OK,
107        Json(serde_json::json!({
108            "query": q.q,
109            "workspaceId": ws,
110            "channelId": q.channel_id,
111            "results": results,
112            "count": results.len(),
113        })),
114    )
115}
116
117#[derive(Deserialize)]
118pub struct LineageQuery {
119    pub id: i64,
120    pub depth: Option<usize>,
121    #[serde(rename = "workspaceId")]
122    pub workspace_id: Option<String>,
123}
124
125pub async fn v1_event_lineage(
126    #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
127    Query(q): Query<LineageQuery>,
128) -> impl IntoResponse {
129    #[cfg(feature = "team-server")]
130    let ws = resolve_workspace(q.workspace_id.clone(), team_ctx.as_ref().map(|e| &e.0));
131    #[cfg(not(feature = "team-server"))]
132    let ws = resolve_workspace(q.workspace_id.clone(), None);
133    let depth = q.depth.unwrap_or(20).min(50);
134
135    let rt = crate::core::context_os::runtime();
136    let mut chain = rt.bus.lineage(q.id, &ws, depth);
137    for ev in &mut chain {
138        redact_event_payload(ev, RedactionLevel::Summary);
139    }
140
141    (
142        StatusCode::OK,
143        Json(serde_json::json!({
144            "eventId": q.id,
145            "chain": chain,
146            "depth": chain.len(),
147        })),
148    )
149}
150
151pub async fn v1_context_summary(
152    #[cfg(feature = "team-server")] team_ctx: Option<Extension<TeamRequestContext>>,
153    Query(q): Query<SummaryQuery>,
154) -> impl IntoResponse {
155    #[cfg(feature = "team-server")]
156    let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
157    #[cfg(not(feature = "team-server"))]
158    let ws = resolve_workspace(q.workspace_id, None);
159    let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
160    let limit = q.limit.unwrap_or(100).min(500);
161
162    let rt = crate::core::context_os::runtime();
163    let events = rt.bus.read(&ws, &ch, 0, limit);
164
165    let summary = build_summary(&ws, &ch, &events);
166    (
167        StatusCode::OK,
168        Json(serde_json::to_value(summary).unwrap_or_default()),
169    )
170}
171
172fn build_summary(ws: &str, ch: &str, events: &[ContextEventV1]) -> ContextSummary {
173    let mut agents: HashSet<String> = HashSet::new();
174    let mut kind_counts: HashMap<String, usize> = HashMap::new();
175    let mut decisions = Vec::new();
176    let mut knowledge_deltas = Vec::new();
177    let mut latest_version: i64 = 0;
178
179    // Track knowledge writes per category/key for conflict detection.
180    let mut knowledge_writers: HashMap<(String, String), HashSet<String>> = HashMap::new();
181
182    for ev in events {
183        if let Some(ref actor) = ev.actor {
184            agents.insert(actor.clone());
185        }
186        *kind_counts.entry(ev.kind.clone()).or_insert(0) += 1;
187        latest_version = latest_version.max(ev.version);
188
189        let p = &ev.payload;
190        let tool = p
191            .get("tool")
192            .and_then(|v| v.as_str())
193            .unwrap_or("")
194            .to_string();
195        let action = p.get("action").and_then(|v| v.as_str()).map(String::from);
196        let reasoning = p
197            .get("reasoning")
198            .and_then(|v| v.as_str())
199            .map(String::from);
200
201        if ev.kind == ContextEventKindV1::SessionMutated.as_str()
202            || ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str()
203        {
204            decisions.push(DecisionSummary {
205                agent: ev.actor.clone().unwrap_or_default(),
206                tool: tool.clone(),
207                action: action.clone(),
208                reasoning,
209                timestamp: ev.timestamp.to_rfc3339(),
210            });
211        }
212
213        if ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str() {
214            let cat = p
215                .get("category")
216                .and_then(|v| v.as_str())
217                .unwrap_or("")
218                .to_string();
219            let key = p
220                .get("key")
221                .and_then(|v| v.as_str())
222                .unwrap_or("")
223                .to_string();
224            knowledge_deltas.push(KnowledgeDelta {
225                category: cat.clone(),
226                key: key.clone(),
227                agent: ev.actor.clone().unwrap_or_default(),
228                timestamp: ev.timestamp.to_rfc3339(),
229            });
230
231            if let Some(ref actor) = ev.actor {
232                knowledge_writers
233                    .entry((cat, key))
234                    .or_default()
235                    .insert(actor.clone());
236            }
237        }
238    }
239
240    let conflict_alerts: Vec<ConflictAlert> = knowledge_writers
241        .into_iter()
242        .filter(|(_, writers)| writers.len() > 1)
243        .map(|((cat, key), writers)| ConflictAlert {
244            category: cat,
245            key,
246            agents: writers.into_iter().collect(),
247        })
248        .collect();
249
250    let recent_limit = 10;
251    let decisions: Vec<_> = if decisions.len() > recent_limit {
252        decisions[decisions.len() - recent_limit..].to_vec()
253    } else {
254        decisions
255    };
256
257    ContextSummary {
258        workspace_id: ws.to_string(),
259        channel_id: ch.to_string(),
260        total_events: events.len(),
261        latest_version,
262        active_agents: agents.into_iter().collect(),
263        recent_decisions: decisions,
264        knowledge_delta: knowledge_deltas,
265        conflict_alerts,
266        event_counts_by_kind: kind_counts,
267    }
268}