Skip to main content

lean_ctx/http_server/
context_views.rs

1use std::collections::{HashMap, HashSet};
2
3use axum::Extension;
4use axum::{extract::Query, http::StatusCode, response::IntoResponse, Json};
5use serde::{Deserialize, Serialize};
6
7use crate::core::context_os::redaction::{redact_event_payload, RedactionLevel};
8use crate::core::context_os::{ContextEventKindV1, ContextEventV1};
9
10use super::team::TeamRequestContext;
11
12/// When running behind the team server, the workspace is bound to the
13/// authenticated token's header. The query parameter is ignored.
14/// In standalone mode (no TeamRequestContext), the query parameter is used.
15fn resolve_workspace(query_ws: Option<String>, team_ctx: Option<&TeamRequestContext>) -> String {
16    if let Some(ctx) = team_ctx {
17        return ctx.workspace_id.clone();
18    }
19    query_ws.unwrap_or_else(|| "default".to_string())
20}
21
22#[derive(Deserialize)]
23pub struct SummaryQuery {
24    #[serde(rename = "workspaceId")]
25    pub workspace_id: Option<String>,
26    #[serde(rename = "channelId")]
27    pub channel_id: Option<String>,
28    pub limit: Option<usize>,
29}
30
31#[derive(Serialize)]
32#[serde(rename_all = "camelCase")]
33pub struct ContextSummary {
34    pub workspace_id: String,
35    pub channel_id: String,
36    pub total_events: usize,
37    pub latest_version: i64,
38    pub active_agents: Vec<String>,
39    pub recent_decisions: Vec<DecisionSummary>,
40    pub knowledge_delta: Vec<KnowledgeDelta>,
41    pub conflict_alerts: Vec<ConflictAlert>,
42    pub event_counts_by_kind: HashMap<String, usize>,
43}
44
45#[derive(Clone, Serialize)]
46#[serde(rename_all = "camelCase")]
47pub struct DecisionSummary {
48    pub agent: String,
49    pub tool: String,
50    pub action: Option<String>,
51    pub reasoning: Option<String>,
52    pub timestamp: String,
53}
54
55#[derive(Serialize)]
56#[serde(rename_all = "camelCase")]
57pub struct KnowledgeDelta {
58    pub category: String,
59    pub key: String,
60    pub agent: String,
61    pub timestamp: String,
62}
63
64#[derive(Serialize)]
65#[serde(rename_all = "camelCase")]
66pub struct ConflictAlert {
67    pub category: String,
68    pub key: String,
69    pub agents: Vec<String>,
70}
71
72#[derive(Deserialize)]
73pub struct SearchQuery {
74    pub q: String,
75    #[serde(rename = "workspaceId")]
76    pub workspace_id: Option<String>,
77    #[serde(rename = "channelId")]
78    pub channel_id: Option<String>,
79    pub limit: Option<usize>,
80}
81
82pub async fn v1_events_search(
83    team_ctx: Option<Extension<TeamRequestContext>>,
84    Query(q): Query<SearchQuery>,
85) -> impl IntoResponse {
86    let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
87    let limit = q.limit.unwrap_or(20).min(100);
88
89    let rt = crate::core::context_os::runtime();
90    let mut results = rt.bus.search(&ws, q.channel_id.as_deref(), &q.q, limit);
91    for ev in &mut results {
92        redact_event_payload(ev, RedactionLevel::Summary);
93    }
94
95    (
96        StatusCode::OK,
97        Json(serde_json::json!({
98            "query": q.q,
99            "workspaceId": ws,
100            "channelId": q.channel_id,
101            "results": results,
102            "count": results.len(),
103        })),
104    )
105}
106
107#[derive(Deserialize)]
108pub struct LineageQuery {
109    pub id: i64,
110    pub depth: Option<usize>,
111    #[serde(rename = "workspaceId")]
112    pub workspace_id: Option<String>,
113}
114
115pub async fn v1_event_lineage(
116    team_ctx: Option<Extension<TeamRequestContext>>,
117    Query(q): Query<LineageQuery>,
118) -> impl IntoResponse {
119    let ws = resolve_workspace(q.workspace_id.clone(), team_ctx.as_ref().map(|e| &e.0));
120    let depth = q.depth.unwrap_or(20).min(50);
121
122    let rt = crate::core::context_os::runtime();
123    let mut chain = rt.bus.lineage(q.id, &ws, depth);
124    for ev in &mut chain {
125        redact_event_payload(ev, RedactionLevel::Summary);
126    }
127
128    (
129        StatusCode::OK,
130        Json(serde_json::json!({
131            "eventId": q.id,
132            "chain": chain,
133            "depth": chain.len(),
134        })),
135    )
136}
137
138pub async fn v1_context_summary(
139    team_ctx: Option<Extension<TeamRequestContext>>,
140    Query(q): Query<SummaryQuery>,
141) -> impl IntoResponse {
142    let ws = resolve_workspace(q.workspace_id, team_ctx.as_ref().map(|e| &e.0));
143    let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
144    let limit = q.limit.unwrap_or(100).min(500);
145
146    let rt = crate::core::context_os::runtime();
147    let mut events = rt.bus.read(&ws, &ch, 0, limit);
148    for ev in &mut events {
149        redact_event_payload(ev, RedactionLevel::Summary);
150    }
151
152    let summary = build_summary(&ws, &ch, &events);
153    (
154        StatusCode::OK,
155        Json(serde_json::to_value(summary).unwrap_or_default()),
156    )
157}
158
159fn build_summary(ws: &str, ch: &str, events: &[ContextEventV1]) -> ContextSummary {
160    let mut agents: HashSet<String> = HashSet::new();
161    let mut kind_counts: HashMap<String, usize> = HashMap::new();
162    let mut decisions = Vec::new();
163    let mut knowledge_deltas = Vec::new();
164    let mut latest_version: i64 = 0;
165
166    // Track knowledge writes per category/key for conflict detection.
167    let mut knowledge_writers: HashMap<(String, String), HashSet<String>> = HashMap::new();
168
169    for ev in events {
170        if let Some(ref actor) = ev.actor {
171            agents.insert(actor.clone());
172        }
173        *kind_counts.entry(ev.kind.clone()).or_insert(0) += 1;
174        latest_version = latest_version.max(ev.version);
175
176        let p = &ev.payload;
177        let tool = p
178            .get("tool")
179            .and_then(|v| v.as_str())
180            .unwrap_or("")
181            .to_string();
182        let action = p.get("action").and_then(|v| v.as_str()).map(String::from);
183        let reasoning = {
184            let mut payload_clone = p.clone();
185            crate::core::context_os::redact_payload_value(
186                &mut payload_clone,
187                crate::core::context_os::RedactionLevel::Summary,
188            );
189            payload_clone
190                .get("reasoning")
191                .and_then(|v| v.as_str())
192                .map(String::from)
193        };
194
195        if ev.kind == ContextEventKindV1::SessionMutated.as_str()
196            || ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str()
197        {
198            decisions.push(DecisionSummary {
199                agent: ev.actor.clone().unwrap_or_default(),
200                tool: tool.clone(),
201                action: action.clone(),
202                reasoning,
203                timestamp: ev.timestamp.to_rfc3339(),
204            });
205        }
206
207        if ev.kind == ContextEventKindV1::KnowledgeRemembered.as_str() {
208            let cat = p
209                .get("category")
210                .and_then(|v| v.as_str())
211                .unwrap_or("")
212                .to_string();
213            let key = p
214                .get("key")
215                .and_then(|v| v.as_str())
216                .unwrap_or("")
217                .to_string();
218            knowledge_deltas.push(KnowledgeDelta {
219                category: cat.clone(),
220                key: key.clone(),
221                agent: ev.actor.clone().unwrap_or_default(),
222                timestamp: ev.timestamp.to_rfc3339(),
223            });
224
225            if let Some(ref actor) = ev.actor {
226                knowledge_writers
227                    .entry((cat, key))
228                    .or_default()
229                    .insert(actor.clone());
230            }
231        }
232    }
233
234    let conflict_alerts: Vec<ConflictAlert> = knowledge_writers
235        .into_iter()
236        .filter(|(_, writers)| writers.len() > 1)
237        .map(|((cat, key), writers)| ConflictAlert {
238            category: cat,
239            key,
240            agents: writers.into_iter().collect(),
241        })
242        .collect();
243
244    let recent_limit = 10;
245    let decisions: Vec<_> = if decisions.len() > recent_limit {
246        decisions[decisions.len() - recent_limit..].to_vec()
247    } else {
248        decisions
249    };
250
251    ContextSummary {
252        workspace_id: ws.to_string(),
253        channel_id: ch.to_string(),
254        total_events: events.len(),
255        latest_version,
256        active_agents: agents.into_iter().collect(),
257        recent_decisions: decisions,
258        knowledge_delta: knowledge_deltas,
259        conflict_alerts,
260        event_counts_by_kind: kind_counts,
261    }
262}