Skip to main content

lean_ctx/http_server/team/
mod.rs

1use std::collections::{BTreeSet, HashMap};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicI64, Ordering};
4use std::sync::Arc;
5
6use anyhow::{anyhow, Context, Result};
7use axum::{
8    body::{self, Body},
9    extract::{Extension, Json, Query, State},
10    http::{header, Request, StatusCode},
11    middleware::{self, Next},
12    response::sse::{Event as SseEvent, KeepAlive, Sse},
13    response::{IntoResponse, Response},
14    routing::get,
15    Router,
16};
17use futures::Stream;
18use md5::{Digest, Md5};
19use rmcp::{
20    handler::server::ServerHandler,
21    model::{
22        CallToolRequest, CallToolRequestParams, CallToolResult, ClientJsonRpcMessage,
23        ClientRequest, JsonRpcRequest, NumberOrString, ServerJsonRpcMessage, ServerResult,
24    },
25    service::{serve_directly, RequestContext, RoleServer},
26    transport::{OneshotTransport, StreamableHttpService},
27};
28use serde::{Deserialize, Serialize};
29use serde_json::{json, Map, Value};
30use sha2::Sha256;
31use tokio::io::AsyncWriteExt;
32use tokio::sync::broadcast;
33use tokio::time::Duration;
34
35use crate::tools::LeanCtxServer;
36#[cfg(test)]
37mod tests;
38
39const WORKSPACE_ARG_KEY: &str = "workspaceId";
40const CHANNEL_ARG_KEY: &str = "channelId";
41const WORKSPACE_HEADER: &str = "x-leanctx-workspace";
42
43#[derive(Clone, Debug, Serialize, Deserialize)]
44#[serde(rename_all = "camelCase")]
45pub struct TeamServerConfig {
46    pub host: String,
47    pub port: u16,
48    pub default_workspace_id: String,
49    pub workspaces: Vec<TeamWorkspaceConfig>,
50    #[serde(default)]
51    pub tokens: Vec<TeamTokenConfig>,
52    pub audit_log_path: PathBuf,
53    #[serde(default)]
54    pub disable_host_check: bool,
55    #[serde(default)]
56    pub allowed_hosts: Vec<String>,
57    #[serde(default = "default_max_body_bytes")]
58    pub max_body_bytes: usize,
59    #[serde(default = "default_max_concurrency")]
60    pub max_concurrency: usize,
61    #[serde(default = "default_max_rps")]
62    pub max_rps: u32,
63    #[serde(default = "default_rate_burst")]
64    pub rate_burst: u32,
65    #[serde(default = "default_request_timeout_ms")]
66    pub request_timeout_ms: u64,
67    #[serde(default)]
68    pub stateful_mode: bool,
69    #[serde(default = "default_true")]
70    pub json_response: bool,
71}
72
73fn default_true() -> bool {
74    true
75}
76fn default_max_body_bytes() -> usize {
77    2 * 1024 * 1024
78}
79fn default_max_concurrency() -> usize {
80    32
81}
82fn default_max_rps() -> u32 {
83    50
84}
85fn default_rate_burst() -> u32 {
86    100
87}
88fn default_request_timeout_ms() -> u64 {
89    30_000
90}
91
92#[derive(Clone, Debug, Serialize, Deserialize)]
93#[serde(rename_all = "camelCase")]
94pub struct TeamWorkspaceConfig {
95    pub id: String,
96    pub label: Option<String>,
97    pub root: PathBuf,
98}
99
100#[derive(Clone, Debug, Serialize, Deserialize)]
101#[serde(rename_all = "camelCase")]
102pub struct TeamTokenConfig {
103    pub id: String,
104    /// Stored as lowercase hex of SHA-256(token).
105    pub sha256_hex: String,
106    pub scopes: Vec<TeamScope>,
107}
108
109#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum TeamScope {
112    Search,
113    Graph,
114    Artifacts,
115    Index,
116    Events,
117    SessionMutations,
118    Knowledge,
119    Audit,
120}
121
122impl TeamServerConfig {
123    pub fn load(path: &Path) -> Result<Self> {
124        let s =
125            std::fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
126        let cfg: Self =
127            serde_json::from_str(&s).with_context(|| format!("parse {}", path.display()))?;
128        cfg.validate()?;
129        Ok(cfg)
130    }
131
132    pub fn save(&self, path: &Path) -> Result<()> {
133        let s = serde_json::to_string_pretty(self).context("serialize TeamServerConfig")?;
134        std::fs::write(path, format!("{s}\n")).with_context(|| format!("write {}", path.display()))
135    }
136
137    pub fn validate(&self) -> Result<()> {
138        if self.workspaces.is_empty() {
139            return Err(anyhow!("team server requires at least 1 workspace"));
140        }
141        let mut ws_ids = BTreeSet::new();
142        for ws in &self.workspaces {
143            let id = ws.id.trim();
144            if id.is_empty() {
145                return Err(anyhow!("workspace id must be non-empty"));
146            }
147            if !ws_ids.insert(id.to_string()) {
148                return Err(anyhow!("duplicate workspace id: {id}"));
149            }
150            if !ws.root.exists() {
151                return Err(anyhow!(
152                    "workspace root does not exist: {}",
153                    ws.root.display()
154                ));
155            }
156        }
157        if !ws_ids.contains(self.default_workspace_id.trim()) {
158            return Err(anyhow!(
159                "defaultWorkspaceId '{}' not found in workspaces",
160                self.default_workspace_id
161            ));
162        }
163
164        let mut token_ids = BTreeSet::new();
165        for t in &self.tokens {
166            let id = t.id.trim();
167            if id.is_empty() {
168                return Err(anyhow!("token id must be non-empty"));
169            }
170            if !token_ids.insert(id.to_string()) {
171                return Err(anyhow!("duplicate token id: {id}"));
172            }
173            if t.scopes.is_empty() {
174                return Err(anyhow!("token '{id}' must have at least 1 scope"));
175            }
176            parse_sha256_hex(&t.sha256_hex)
177                .with_context(|| format!("token '{id}' invalid sha256Hex"))?;
178        }
179
180        if let Some(parent) = self.audit_log_path.parent() {
181            if !parent.as_os_str().is_empty() && !parent.exists() {
182                return Err(anyhow!(
183                    "auditLogPath parent does not exist: {}",
184                    parent.display()
185                ));
186            }
187        }
188        Ok(())
189    }
190
191    pub fn validate_for_serve(&self) -> Result<()> {
192        self.validate()?;
193        if self.tokens.is_empty() {
194            return Err(anyhow!("team server requires at least 1 token"));
195        }
196        Ok(())
197    }
198}
199
200#[derive(Clone)]
201struct TeamAuthContext {
202    token_id: String,
203    scopes: BTreeSet<TeamScope>,
204}
205
206#[derive(Clone)]
207pub struct TeamRequestContext {
208    pub workspace_id: String,
209}
210
211#[derive(Clone)]
212pub struct TeamState {
213    auth: Arc<Vec<TeamTokenConfig>>,
214    engine: Arc<TeamContextEngine>,
215    audit: Arc<tokio::sync::Mutex<tokio::fs::File>>,
216    pub savings_store_dir: Arc<tokio::sync::Mutex<std::path::PathBuf>>,
217}
218
219#[derive(Clone)]
220pub struct TeamAppState {
221    concurrency: Arc<tokio::sync::Semaphore>,
222    rate: Arc<super::RateLimiter>,
223    timeout: Duration,
224    pub team: Arc<TeamState>,
225    max_body_bytes: usize,
226}
227
228#[derive(Debug, Deserialize)]
229#[serde(rename_all = "camelCase")]
230struct ToolCallBody {
231    name: String,
232    #[serde(default)]
233    arguments: Option<Value>,
234    #[serde(default)]
235    workspace_id: Option<String>,
236    #[serde(default)]
237    channel_id: Option<String>,
238}
239
240#[derive(Debug, Deserialize)]
241#[serde(rename_all = "camelCase")]
242struct ToolsQuery {
243    #[serde(default)]
244    offset: Option<usize>,
245    #[serde(default)]
246    limit: Option<usize>,
247}
248
249#[derive(Debug, Deserialize)]
250#[serde(rename_all = "camelCase")]
251struct EventsQuery {
252    #[serde(default)]
253    since: Option<i64>,
254    #[serde(default)]
255    limit: Option<usize>,
256    #[serde(default)]
257    channel_id: Option<String>,
258}
259
260#[derive(Clone)]
261struct TeamCtxServer {
262    default_workspace_id: String,
263    roots: Arc<HashMap<String, String>>,
264}
265
266impl TeamCtxServer {
267    fn default_root(&self) -> &str {
268        self.roots
269            .get(&self.default_workspace_id)
270            .expect("default workspace root")
271    }
272
273    fn rewrite_dot_paths(args: &mut Map<String, Value>, root: &str) {
274        for k in ["path", "target_directory", "targetDirectory"] {
275            let Some(Value::String(s)) = args.get(k) else {
276                continue;
277            };
278            let t = s.trim();
279            if t.is_empty() || t == "." {
280                args.insert(k.to_string(), Value::String(root.to_string()));
281            }
282        }
283    }
284
285    fn pick_workspace(
286        &self,
287        args: &mut Map<String, Value>,
288    ) -> std::result::Result<(String, String), rmcp::ErrorData> {
289        let ws = args
290            .get(WORKSPACE_ARG_KEY)
291            .and_then(|v| v.as_str())
292            .unwrap_or(self.default_workspace_id.as_str())
293            .to_string();
294        args.remove(WORKSPACE_ARG_KEY);
295
296        let root = self
297            .roots
298            .get(&ws)
299            .cloned()
300            .ok_or_else(|| rmcp::ErrorData::invalid_params("unknown workspaceId", None))?;
301        Self::rewrite_dot_paths(args, &root);
302        Ok((ws, root))
303    }
304
305    fn make_server(&self, workspace_id: &str, channel_id: &str) -> LeanCtxServer {
306        let root = self
307            .roots
308            .get(workspace_id)
309            .cloned()
310            .unwrap_or_else(|| self.default_root().to_string());
311        LeanCtxServer::new_shared_with_context(&root, workspace_id, channel_id)
312    }
313}
314
315impl ServerHandler for TeamCtxServer {
316    fn get_info(&self) -> rmcp::model::ServerInfo {
317        let s = self.make_server(&self.default_workspace_id, "default");
318        <LeanCtxServer as ServerHandler>::get_info(&s)
319    }
320
321    async fn initialize(
322        &self,
323        request: rmcp::model::InitializeRequestParams,
324        context: RequestContext<RoleServer>,
325    ) -> std::result::Result<rmcp::model::InitializeResult, rmcp::ErrorData> {
326        let s = self.make_server(&self.default_workspace_id, "default");
327        <LeanCtxServer as ServerHandler>::initialize(&s, request, context).await
328    }
329
330    async fn list_tools(
331        &self,
332        request: Option<rmcp::model::PaginatedRequestParams>,
333        context: RequestContext<RoleServer>,
334    ) -> std::result::Result<rmcp::model::ListToolsResult, rmcp::ErrorData> {
335        let s = self.make_server(&self.default_workspace_id, "default");
336        <LeanCtxServer as ServerHandler>::list_tools(&s, request, context).await
337    }
338
339    async fn call_tool(
340        &self,
341        mut request: CallToolRequestParams,
342        context: RequestContext<RoleServer>,
343    ) -> std::result::Result<CallToolResult, rmcp::ErrorData> {
344        let mut args = request.arguments.take().unwrap_or_default();
345        let (ws, root) = self.pick_workspace(&mut args)?;
346        let channel = args
347            .get(CHANNEL_ARG_KEY)
348            .and_then(|v| v.as_str())
349            .unwrap_or("default")
350            .to_string();
351        args.remove(CHANNEL_ARG_KEY);
352        // Re-apply dot path rewriting against the resolved root.
353        Self::rewrite_dot_paths(&mut args, &root);
354        request.arguments = Some(args);
355        let s = LeanCtxServer::new_shared_with_context(&root, &ws, &channel);
356        <LeanCtxServer as ServerHandler>::call_tool(&s, request, context).await
357    }
358}
359
360struct TeamContextEngine {
361    server: TeamCtxServer,
362    next_id: AtomicI64,
363}
364
365impl TeamContextEngine {
366    fn new(server: TeamCtxServer) -> Self {
367        Self {
368            server,
369            next_id: AtomicI64::new(1),
370        }
371    }
372
373    fn manifest_value() -> Value {
374        crate::core::mcp_manifest::manifest_value()
375    }
376
377    async fn call_tool_value(&self, name: &str, arguments: Option<Value>) -> Result<Value> {
378        let result = self.call_tool_result(name, arguments).await?;
379        serde_json::to_value(result).map_err(|e| anyhow!("serialize CallToolResult: {e}"))
380    }
381
382    async fn call_tool_result(
383        &self,
384        name: &str,
385        arguments: Option<Value>,
386    ) -> Result<CallToolResult> {
387        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
388        let req_id = NumberOrString::Number(id);
389
390        let args_obj: Map<String, Value> = match arguments {
391            None => Map::new(),
392            Some(Value::Object(m)) => m,
393            Some(other) => {
394                return Err(anyhow!(
395                    "tool arguments must be a JSON object (got {other})"
396                ))
397            }
398        };
399
400        let params = CallToolRequestParams::new(name.to_string()).with_arguments(args_obj);
401        let call: CallToolRequest = CallToolRequest::new(params);
402        let client_req = ClientRequest::CallToolRequest(call);
403        let msg = ClientJsonRpcMessage::Request(JsonRpcRequest::new(req_id, client_req));
404
405        let (transport, mut rx) = OneshotTransport::<RoleServer>::new(msg);
406        let service = serve_directly(self.server.clone(), transport, None);
407        tokio::spawn(async move {
408            let _ = service.waiting().await;
409        });
410
411        let Some(server_msg) = rx.recv().await else {
412            return Err(anyhow!("no response from tool call"));
413        };
414
415        match server_msg {
416            ServerJsonRpcMessage::Response(r) => match r.result {
417                ServerResult::CallToolResult(result) => Ok(result),
418                other => Err(anyhow!("unexpected server result: {other:?}")),
419            },
420            ServerJsonRpcMessage::Error(e) => Err(anyhow!("{e:?}")).context("tool call error"),
421            ServerJsonRpcMessage::Notification(_) => Err(anyhow!("unexpected notification")),
422            ServerJsonRpcMessage::Request(_) => Err(anyhow!("unexpected request")),
423        }
424    }
425}
426
427fn sha256_hex(bytes: &[u8]) -> String {
428    let mut h = Sha256::new();
429    h.update(bytes);
430    let digest = h.finalize();
431    hex_lower(&digest)
432}
433
434fn hex_lower(bytes: &[u8]) -> String {
435    const HEX: &[u8; 16] = b"0123456789abcdef";
436    let mut out = Vec::with_capacity(bytes.len() * 2);
437    for &b in bytes {
438        out.push(HEX[(b >> 4) as usize]);
439        out.push(HEX[(b & 0x0f) as usize]);
440    }
441    String::from_utf8(out).unwrap_or_default()
442}
443
444fn parse_sha256_hex(s: &str) -> Result<Vec<u8>> {
445    let s = s.trim();
446    if s.len() != 64 {
447        return Err(anyhow!("sha256 hex must be 64 chars"));
448    }
449    let mut out = Vec::with_capacity(32);
450    let bytes = s.as_bytes();
451    let to_nibble = |c: u8| -> Option<u8> {
452        match c {
453            b'0'..=b'9' => Some(c - b'0'),
454            b'a'..=b'f' => Some(c - b'a' + 10),
455            b'A'..=b'F' => Some(c - b'A' + 10),
456            _ => None,
457        }
458    };
459    for i in (0..64).step_by(2) {
460        let hi = to_nibble(bytes[i]).ok_or_else(|| anyhow!("invalid hex"))?;
461        let lo = to_nibble(bytes[i + 1]).ok_or_else(|| anyhow!("invalid hex"))?;
462        out.push((hi << 4) | lo);
463    }
464    Ok(out)
465}
466
467fn required_scopes(tool_name: &str, args: Option<&Value>) -> Option<BTreeSet<TeamScope>> {
468    if matches!(tool_name, "ctx_shell" | "ctx_execute" | "ctx_edit") {
469        return None;
470    }
471
472    if tool_name == "ctx" {
473        let Value::Object(m) = args? else {
474            return None;
475        };
476        let sub = m.get("tool")?.as_str()?.trim();
477        if sub.is_empty() {
478            return None;
479        }
480        let canonical = if sub.starts_with("ctx_") {
481            sub.to_string()
482        } else {
483            format!("ctx_{sub}")
484        };
485        let mut m2 = m.clone();
486        m2.remove("tool");
487        return required_scopes(&canonical, Some(&Value::Object(m2)));
488    }
489
490    let mut s = BTreeSet::new();
491    match tool_name {
492        // Search scope (read/discovery/analysis)
493        "ctx_read" | "ctx_multi_read" | "ctx_smart_read" | "ctx_search" | "ctx_tree"
494        | "ctx_outline" | "ctx_expand" | "ctx_delta" | "ctx_dedup" | "ctx_prefetch"
495        | "ctx_preload" | "ctx_review" | "ctx_response" | "ctx_task" | "ctx_overview"
496        | "ctx_architecture" | "ctx_benchmark" | "ctx_cost" | "ctx_intent" | "ctx_heatmap"
497        | "ctx_gain" | "ctx_analyze" | "ctx_discover_tools" | "ctx_discover" | "ctx_symbol"
498        | "ctx_index" | "ctx_metrics" | "ctx_cache" | "ctx_agent" => {
499            s.insert(TeamScope::Search);
500            Some(s)
501        }
502        // Pack needs search + graph (it includes impact/graph-derived context)
503        "ctx_pack" => {
504            s.insert(TeamScope::Search);
505            s.insert(TeamScope::Graph);
506            Some(s)
507        }
508        // Graph scope
509        "ctx_graph" | "ctx_impact" | "ctx_callgraph" | "ctx_routes" => {
510            s.insert(TeamScope::Graph);
511
512            if tool_name == "ctx_graph" {
513                let action = args
514                    .and_then(|v| v.get("action"))
515                    .and_then(|v| v.as_str())
516                    .unwrap_or("");
517                if matches!(
518                    action,
519                    "index-build"
520                        | "index-build-full"
521                        | "index-build-background"
522                        | "index-build-full-background"
523                ) {
524                    s.insert(TeamScope::Index);
525                }
526            }
527
528            Some(s)
529        }
530        "ctx_semantic_search" => {
531            s.insert(TeamScope::Search);
532            if args
533                .and_then(|v| v.get("artifacts"))
534                .and_then(Value::as_bool)
535                .unwrap_or(false)
536            {
537                s.insert(TeamScope::Artifacts);
538            }
539            if args
540                .and_then(|v| v.get("action"))
541                .and_then(|v| v.as_str())
542                .is_some_and(|v| v.eq_ignore_ascii_case("reindex"))
543            {
544                s.insert(TeamScope::Index);
545            }
546            Some(s)
547        }
548        // Session-mutating tools
549        "ctx_session" | "ctx_handoff" | "ctx_workflow" | "ctx_compress" | "ctx_share" => {
550            s.insert(TeamScope::SessionMutations);
551            Some(s)
552        }
553        // Knowledge tools
554        "ctx_knowledge" | "ctx_knowledge_relations" => {
555            s.insert(TeamScope::Knowledge);
556            Some(s)
557        }
558        // Artifact + proof tools
559        "ctx_artifacts" | "ctx_proof" | "ctx_verify" => {
560            s.insert(TeamScope::Artifacts);
561            Some(s)
562        }
563        _ => None,
564    }
565}
566
567async fn team_rate_limit_middleware(
568    State(state): State<TeamAppState>,
569    req: Request<Body>,
570    next: Next,
571) -> Response {
572    if req.uri().path() == "/health" {
573        return next.run(req).await;
574    }
575    if !state.rate.allow().await {
576        return StatusCode::TOO_MANY_REQUESTS.into_response();
577    }
578    next.run(req).await
579}
580
581async fn team_concurrency_middleware(
582    State(state): State<TeamAppState>,
583    req: Request<Body>,
584    next: Next,
585) -> Response {
586    if req.uri().path() == "/health" {
587        return next.run(req).await;
588    }
589    let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
590        return StatusCode::TOO_MANY_REQUESTS.into_response();
591    };
592    let resp = next.run(req).await;
593    drop(permit);
594    resp
595}
596
597async fn team_auth_middleware(
598    State(state): State<TeamAppState>,
599    mut req: Request<Body>,
600    next: Next,
601) -> Response {
602    if req.uri().path() == "/health" {
603        return next.run(req).await;
604    }
605
606    let Some(h) = req.headers().get(header::AUTHORIZATION) else {
607        return super::json_error(
608            StatusCode::UNAUTHORIZED,
609            "unauthorized",
610            "missing Authorization header",
611        );
612    };
613    let Ok(s) = h.to_str() else {
614        return super::json_error(
615            StatusCode::UNAUTHORIZED,
616            "unauthorized",
617            "malformed Authorization header",
618        );
619    };
620    let Some(token) = s
621        .strip_prefix("Bearer ")
622        .or_else(|| s.strip_prefix("bearer "))
623    else {
624        return super::json_error(
625            StatusCode::UNAUTHORIZED,
626            "unauthorized",
627            "Authorization must use the Bearer scheme",
628        );
629    };
630
631    let token_hash = sha256_hex(token.as_bytes());
632    let mut matched: Option<TeamTokenConfig> = None;
633    for t in state.team.auth.iter() {
634        if super::constant_time_eq(token_hash.as_bytes(), t.sha256_hex.as_bytes()) {
635            matched = Some(t.clone());
636            break;
637        }
638    }
639    let Some(tok) = matched else {
640        return super::json_error(
641            StatusCode::UNAUTHORIZED,
642            "unauthorized",
643            "invalid bearer token",
644        );
645    };
646    let tok_scopes: BTreeSet<TeamScope> = tok.scopes.iter().copied().collect();
647
648    let workspace_id = req
649        .headers()
650        .get(WORKSPACE_HEADER)
651        .and_then(|h| h.to_str().ok())
652        .map(|s| s.trim().to_string())
653        .filter(|s| !s.is_empty())
654        .unwrap_or_else(|| state.team.engine.server.default_workspace_id.clone());
655    if !state.team.engine.server.roots.contains_key(&workspace_id) {
656        return super::json_error(
657            StatusCode::BAD_REQUEST,
658            "unknown_workspace",
659            "unknown workspace",
660        );
661    }
662    let workspace_id_for_audit = workspace_id.clone();
663
664    req.extensions_mut().insert(TeamAuthContext {
665        token_id: tok.id.clone(),
666        scopes: tok_scopes.clone(),
667    });
668    req.extensions_mut()
669        .insert(TeamRequestContext { workspace_id });
670
671    // Endpoint-level authz (non-tool endpoints).
672    let path0 = req.uri().path();
673    if path0 == "/v1/events" {
674        let allow = tok_scopes.contains(&TeamScope::Events);
675        let _ = audit_write(
676            &state.team.audit,
677            &tok.id,
678            &workspace_id_for_audit,
679            None,
680            Some("events"),
681            allow,
682            if allow { None } else { Some("scope_denied") },
683            None,
684        )
685        .await;
686        if !allow {
687            return super::json_error(
688                StatusCode::FORBIDDEN,
689                "scope_denied",
690                "token lacks required scope: events",
691            );
692        }
693    }
694
695    if path0 == "/v1/metrics" {
696        let allow = tok_scopes.contains(&TeamScope::Audit);
697        let _ = audit_write(
698            &state.team.audit,
699            &tok.id,
700            &workspace_id_for_audit,
701            None,
702            Some("metrics"),
703            allow,
704            if allow { None } else { Some("scope_denied") },
705            None,
706        )
707        .await;
708        if !allow {
709            return super::json_error(
710                StatusCode::FORBIDDEN,
711                "scope_denied",
712                "token lacks required scope: audit",
713            );
714        }
715    }
716
717    // Tool-level authz for MCP fallback (tools/call).
718    let path = req.uri().path().to_string();
719    if path != "/v1/tools/call"
720        && path != "/v1/tools"
721        && path != "/v1/manifest"
722        && path != "/health"
723    {
724        if req.method() != axum::http::Method::POST {
725            return next.run(req).await;
726        }
727
728        let (parts, body0) = req.into_parts();
729        let Ok(bytes) = body::to_bytes(body0, state.max_body_bytes).await else {
730            return super::json_error(
731                StatusCode::BAD_REQUEST,
732                "invalid_request",
733                "could not read request body",
734            );
735        };
736
737        let mut allow = false;
738        let mut denied_reason: Option<String> = None;
739        if let Ok(v) = serde_json::from_slice::<Value>(&bytes) {
740            if v.is_array() {
741                denied_reason = Some("batch_requests_not_supported".to_string());
742                let _ = audit_write(
743                    &state.team.audit,
744                    &tok.id,
745                    &workspace_id_for_audit,
746                    None,
747                    None,
748                    false,
749                    denied_reason.as_deref(),
750                    None,
751                )
752                .await;
753            } else {
754                let method = v.get("method").and_then(|m| m.as_str()).unwrap_or("");
755                if method.eq_ignore_ascii_case("tools/call") {
756                    let tool = v
757                        .pointer("/params/name")
758                        .and_then(|x| x.as_str())
759                        .unwrap_or("");
760                    let args = v.pointer("/params/arguments");
761                    let req_scopes = required_scopes(tool, args);
762                    allow = match req_scopes {
763                        None => false,
764                        Some(reqs) => reqs.is_subset(&tok_scopes),
765                    };
766                    if !allow {
767                        denied_reason = Some("scope_denied".to_string());
768                    }
769                    let _ = audit_write(
770                        &state.team.audit,
771                        &tok.id,
772                        &workspace_id_for_audit,
773                        Some(tool),
774                        Some(method),
775                        allow,
776                        denied_reason.as_deref(),
777                        args,
778                    )
779                    .await;
780                } else {
781                    allow = true;
782                }
783            }
784        }
785
786        if !allow {
787            return super::json_error(
788                StatusCode::FORBIDDEN,
789                "scope_denied",
790                "token lacks required scope for this tool",
791            );
792        }
793
794        req = Request::from_parts(parts, Body::from(bytes));
795    }
796
797    next.run(req).await
798}
799
800async fn audit_write(
801    file: &tokio::sync::Mutex<tokio::fs::File>,
802    token_id: &str,
803    workspace_id: &str,
804    tool: Option<&str>,
805    method: Option<&str>,
806    allowed: bool,
807    denied_reason: Option<&str>,
808    args: Option<&Value>,
809) -> Result<()> {
810    let args_hash = args
811        .map(|a| {
812            let s = a.to_string();
813            let mut hasher = Md5::new();
814            hasher.update(s.as_bytes());
815            format!("{:x}", hasher.finalize())
816        })
817        .unwrap_or_default();
818
819    let ts = chrono::Local::now().to_rfc3339();
820    let rec = json!({
821        "ts": ts,
822        "tokenId": token_id,
823        "workspaceId": workspace_id,
824        "tool": tool,
825        "method": method,
826        "allowed": allowed,
827        "deniedReason": denied_reason,
828        "argumentsMd5": args_hash,
829    });
830
831    let mut guard = file.lock().await;
832    guard.write_all(rec.to_string().as_bytes()).await?;
833    guard.write_all(b"\n").await?;
834    guard.flush().await?;
835    Ok(())
836}
837
838/// Event-level audit entry: records who triggered which Context OS event.
839async fn audit_event(
840    file: &tokio::sync::Mutex<tokio::fs::File>,
841    token_id: &str,
842    workspace_id: &str,
843    channel_id: &str,
844    event_kind: &str,
845    actor: Option<&str>,
846    event_id: i64,
847) -> Result<()> {
848    let ts = chrono::Local::now().to_rfc3339();
849    let rec = json!({
850        "ts": ts,
851        "type": "context_event",
852        "tokenId": token_id,
853        "workspaceId": workspace_id,
854        "channelId": channel_id,
855        "eventKind": event_kind,
856        "actor": actor,
857        "eventId": event_id,
858    });
859
860    let mut guard = file.lock().await;
861    guard.write_all(rec.to_string().as_bytes()).await?;
862    guard.write_all(b"\n").await?;
863    guard.flush().await?;
864    Ok(())
865}
866
867async fn v1_manifest(State(_state): State<TeamAppState>) -> impl IntoResponse {
868    let v = TeamContextEngine::manifest_value();
869    (StatusCode::OK, Json(v))
870}
871
872async fn v1_tools(
873    State(_state): State<TeamAppState>,
874    Query(q): Query<ToolsQuery>,
875) -> impl IntoResponse {
876    let v = TeamContextEngine::manifest_value();
877    let tools = v
878        .get("tools")
879        .and_then(|t| t.get("granular"))
880        .cloned()
881        .unwrap_or(Value::Array(vec![]));
882
883    let all = tools.as_array().cloned().unwrap_or_default();
884    let total = all.len();
885    let offset = q.offset.unwrap_or(0).min(total);
886    let limit = q.limit.unwrap_or(200).min(500);
887    let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
888
889    (
890        StatusCode::OK,
891        Json(json!({
892            "tools": page,
893            "total": total,
894            "offset": offset,
895            "limit": limit,
896        })),
897    )
898}
899
900async fn v1_tool_call(
901    State(state): State<TeamAppState>,
902    Extension(auth): Extension<TeamAuthContext>,
903    Extension(ctx): Extension<TeamRequestContext>,
904    Json(body): Json<ToolCallBody>,
905) -> impl IntoResponse {
906    let workspace_id = body
907        .workspace_id
908        .clone()
909        .unwrap_or_else(|| ctx.workspace_id.clone());
910    if !state.team.engine.server.roots.contains_key(&workspace_id) {
911        let _ = audit_write(
912            &state.team.audit,
913            &auth.token_id,
914            &workspace_id,
915            Some(&body.name),
916            Some("/v1/tools/call"),
917            false,
918            Some("unknown_workspace"),
919            body.arguments.as_ref(),
920        )
921        .await;
922        return super::json_error(
923            StatusCode::BAD_REQUEST,
924            "unknown_workspace",
925            "unknown workspace",
926        );
927    }
928
929    let mut args = match body.arguments {
930        None => Value::Object(Map::new()),
931        Some(Value::Object(m)) => Value::Object(m),
932        Some(other) => {
933            let _ = audit_write(
934                &state.team.audit,
935                &auth.token_id,
936                &workspace_id,
937                Some(&body.name),
938                Some("/v1/tools/call"),
939                false,
940                Some("invalid_arguments"),
941                Some(&other),
942            )
943            .await;
944            return super::json_error(
945                StatusCode::BAD_REQUEST,
946                "invalid_arguments",
947                &format!("tool arguments must be a JSON object (got {other})"),
948            );
949        }
950    };
951
952    if let Value::Object(ref mut m) = args {
953        m.insert(
954            WORKSPACE_ARG_KEY.to_string(),
955            Value::String(workspace_id.clone()),
956        );
957        if let Some(ch) = body.channel_id.as_deref() {
958            if !ch.trim().is_empty() {
959                m.insert(
960                    CHANNEL_ARG_KEY.to_string(),
961                    Value::String(ch.trim().to_string()),
962                );
963            }
964        }
965    }
966
967    let required = required_scopes(&body.name, Some(&args));
968    let allowed = match required {
969        None => false,
970        Some(reqs) => reqs.is_subset(&auth.scopes),
971    };
972    if !allowed {
973        let _ = audit_write(
974            &state.team.audit,
975            &auth.token_id,
976            &workspace_id,
977            Some(&body.name),
978            Some("/v1/tools/call"),
979            false,
980            Some("scope_denied"),
981            Some(&args),
982        )
983        .await;
984        return super::json_error(
985            StatusCode::FORBIDDEN,
986            "scope_denied",
987            "token lacks required scope for this tool",
988        );
989    }
990
991    let tool_name = body.name.clone();
992    let call = tokio::time::timeout(
993        state.timeout,
994        state
995            .team
996            .engine
997            .call_tool_value(&tool_name, Some(args.clone())),
998    )
999    .await;
1000
1001    match call {
1002        Ok(Ok(v)) => {
1003            let _ = audit_write(
1004                &state.team.audit,
1005                &auth.token_id,
1006                &workspace_id,
1007                Some(&tool_name),
1008                Some("/v1/tools/call"),
1009                true,
1010                None,
1011                Some(&args),
1012            )
1013            .await;
1014            (StatusCode::OK, Json(json!({ "result": v }))).into_response()
1015        }
1016        Ok(Err(e)) => {
1017            let _ = audit_write(
1018                &state.team.audit,
1019                &auth.token_id,
1020                &workspace_id,
1021                Some(&tool_name),
1022                Some("/v1/tools/call"),
1023                true,
1024                Some("tool_error"),
1025                Some(&args),
1026            )
1027            .await;
1028            {
1029                tracing::warn!("team tool call error: {e}");
1030                super::json_error(
1031                    StatusCode::BAD_REQUEST,
1032                    "tool_error",
1033                    "tool execution failed",
1034                )
1035            }
1036        }
1037        Err(_) => {
1038            let _ = audit_write(
1039                &state.team.audit,
1040                &auth.token_id,
1041                &workspace_id,
1042                Some(&tool_name),
1043                Some("/v1/tools/call"),
1044                true,
1045                Some("request_timeout"),
1046                Some(&args),
1047            )
1048            .await;
1049            super::json_error(
1050                StatusCode::GATEWAY_TIMEOUT,
1051                "request_timeout",
1052                "tool call timed out",
1053            )
1054        }
1055    }
1056}
1057
1058async fn v1_events(
1059    State(state): State<TeamAppState>,
1060    Extension(auth): Extension<TeamAuthContext>,
1061    Extension(ctx): Extension<TeamRequestContext>,
1062    Query(q): Query<EventsQuery>,
1063) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
1064    let ws = ctx.workspace_id;
1065    let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
1066    let since = q.since.unwrap_or(0);
1067    let limit = q.limit.unwrap_or(200).min(1000);
1068
1069    let _ = audit_event(
1070        &state.team.audit,
1071        &auth.token_id,
1072        &ws,
1073        &ch,
1074        "sse_subscribe",
1075        None,
1076        since,
1077    )
1078    .await;
1079
1080    let rt = crate::core::context_os::runtime();
1081    let replay = rt.bus.read(&ws, &ch, since, limit);
1082    let rx = if let Some(rx) = rt.bus.subscribe(&ws, &ch) {
1083        rx
1084    } else {
1085        tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
1086        let (_, rx) = tokio::sync::broadcast::channel::<crate::core::context_os::ContextEventV1>(1);
1087        rx
1088    };
1089    rt.metrics.record_sse_connect();
1090    rt.metrics.record_events_replayed(replay.len() as u64);
1091    rt.metrics.record_workspace_active(&ws);
1092
1093    let bus = rt.bus.clone();
1094    let metrics = rt.metrics.clone();
1095    let pending: std::collections::VecDeque<crate::core::context_os::ContextEventV1> =
1096        replay.into();
1097
1098    use crate::core::context_os::{redact_event_payload, RedactionLevel};
1099    let redaction = RedactionLevel::RefsOnly;
1100
1101    let stream = futures::stream::unfold(
1102        (
1103            pending,
1104            rx,
1105            ws.clone(),
1106            ch.clone(),
1107            since,
1108            redaction,
1109            bus,
1110            metrics,
1111        ),
1112        |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
1113            if let Some(mut ev) = pending.pop_front() {
1114                last_id = ev.id;
1115                redact_event_payload(&mut ev, redaction);
1116                let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
1117                let evt = SseEvent::default()
1118                    .id(ev.id.to_string())
1119                    .event(ev.kind)
1120                    .data(data);
1121                return Some((
1122                    Ok(evt),
1123                    (pending, rx, ws, ch, last_id, redaction, bus, metrics),
1124                ));
1125            }
1126
1127            loop {
1128                match rx.recv().await {
1129                    Ok(mut ev) if ev.id > last_id => {
1130                        last_id = ev.id;
1131                        redact_event_payload(&mut ev, redaction);
1132                        let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
1133                        let evt = SseEvent::default()
1134                            .id(ev.id.to_string())
1135                            .event(ev.kind)
1136                            .data(data);
1137                        return Some((
1138                            Ok(evt),
1139                            (pending, rx, ws, ch, last_id, redaction, bus, metrics),
1140                        ));
1141                    }
1142                    Ok(_) => {}
1143                    Err(broadcast::error::RecvError::Closed) => return None,
1144                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
1145                        let missed = bus.read(&ws, &ch, last_id, skipped as usize);
1146                        metrics.record_events_replayed(missed.len() as u64);
1147                        for ev in missed {
1148                            last_id = last_id.max(ev.id);
1149                            pending.push_back(ev);
1150                        }
1151                    }
1152                }
1153            }
1154        },
1155    );
1156
1157    let metrics_ref = rt.metrics.clone();
1158    let guarded = super::SseDisconnectGuard {
1159        inner: Box::pin(stream),
1160        metrics: metrics_ref,
1161    };
1162
1163    Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1164}
1165
1166async fn v1_team_metrics(State(_state): State<TeamAppState>) -> impl IntoResponse {
1167    let rt = crate::core::context_os::runtime();
1168    let snap = rt.metrics.snapshot();
1169    (
1170        StatusCode::OK,
1171        Json(serde_json::to_value(snap).unwrap_or_default()),
1172    )
1173}
1174
1175fn streamable_http_config(cfg: &TeamServerConfig) -> rmcp::transport::StreamableHttpServerConfig {
1176    let mut out = rmcp::transport::StreamableHttpServerConfig::default()
1177        .with_stateful_mode(cfg.stateful_mode)
1178        .with_json_response(cfg.json_response);
1179
1180    if cfg.disable_host_check {
1181        out = out.disable_allowed_hosts();
1182        return out;
1183    }
1184    if !cfg.allowed_hosts.is_empty() {
1185        out = out.with_allowed_hosts(cfg.allowed_hosts.clone());
1186        return out;
1187    }
1188    let host = cfg.host.trim();
1189    if host == "127.0.0.1" || host == "localhost" || host == "::1" {
1190        out.allowed_hosts.push(host.to_string());
1191    }
1192    out
1193}
1194
1195pub async fn serve_team(cfg: TeamServerConfig) -> Result<()> {
1196    cfg.validate_for_serve()?;
1197
1198    let addr: std::net::SocketAddr = format!("{}:{}", cfg.host, cfg.port)
1199        .parse()
1200        .context("invalid host/port")?;
1201
1202    let team_server = TeamCtxServer {
1203        default_workspace_id: cfg.default_workspace_id.clone(),
1204        roots: Arc::new(
1205            cfg.workspaces
1206                .iter()
1207                .map(|w| (w.id.clone(), w.root.to_string_lossy().to_string()))
1208                .collect(),
1209        ),
1210    };
1211    let engine = Arc::new(TeamContextEngine::new(team_server.clone()));
1212
1213    let audit_file = tokio::fs::OpenOptions::new()
1214        .create(true)
1215        .append(true)
1216        .open(&cfg.audit_log_path)
1217        .await
1218        .with_context(|| format!("open audit log {}", cfg.audit_log_path.display()))?;
1219
1220    let savings_dir = cfg
1221        .audit_log_path
1222        .parent()
1223        .unwrap_or(std::path::Path::new("."))
1224        .join("savings");
1225    let team = Arc::new(TeamState {
1226        auth: Arc::new(cfg.tokens.clone()),
1227        engine,
1228        audit: Arc::new(tokio::sync::Mutex::new(audit_file)),
1229        savings_store_dir: Arc::new(tokio::sync::Mutex::new(savings_dir)),
1230    });
1231
1232    let state = TeamAppState {
1233        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
1234        rate: Arc::new(super::RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
1235        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
1236        team,
1237        max_body_bytes: cfg.max_body_bytes,
1238    };
1239
1240    let service_factory =
1241        move || -> std::result::Result<TeamCtxServer, std::io::Error> { Ok(team_server.clone()) };
1242    let mcp_http = StreamableHttpService::new(
1243        service_factory,
1244        Arc::new(
1245            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
1246        ),
1247        streamable_http_config(&cfg),
1248    );
1249
1250    let app = Router::new()
1251        .route("/health", get(super::health))
1252        .route("/v1/manifest", get(v1_manifest))
1253        .route("/v1/tools", get(v1_tools))
1254        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
1255        .route("/v1/events", get(v1_events))
1256        .route(
1257            "/v1/context/summary",
1258            get(super::context_views::v1_context_summary),
1259        )
1260        .route(
1261            "/v1/events/search",
1262            get(super::context_views::v1_events_search),
1263        )
1264        .route(
1265            "/v1/events/lineage",
1266            get(super::context_views::v1_event_lineage),
1267        )
1268        .route("/v1/metrics", get(v1_team_metrics))
1269        .route(
1270            "/api/v1/savings/ingest",
1271            axum::routing::post(super::savings_ingest::v1_savings_ingest),
1272        )
1273        .fallback_service(mcp_http)
1274        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
1275        .layer(middleware::from_fn_with_state(
1276            state.clone(),
1277            team_rate_limit_middleware,
1278        ))
1279        .layer(middleware::from_fn_with_state(
1280            state.clone(),
1281            team_concurrency_middleware,
1282        ))
1283        .layer(middleware::from_fn_with_state(
1284            state.clone(),
1285            team_auth_middleware,
1286        ))
1287        .with_state(state);
1288
1289    let listener = tokio::net::TcpListener::bind(addr)
1290        .await
1291        .with_context(|| format!("bind {addr}"))?;
1292
1293    tracing::info!(
1294        "lean-ctx TEAM server listening on http://{addr} (workspaces={}, audit={})",
1295        cfg.workspaces.len(),
1296        cfg.audit_log_path.display()
1297    );
1298
1299    axum::serve(listener, app)
1300        .with_graceful_shutdown(async move {
1301            let _ = tokio::signal::ctrl_c().await;
1302        })
1303        .await
1304        .context("team http server")?;
1305    Ok(())
1306}
1307
1308pub fn create_token() -> Result<(String, String)> {
1309    let mut bytes = [0u8; 32];
1310    getrandom::fill(&mut bytes).map_err(|e| anyhow!("getrandom: {e}"))?;
1311    let token = hex_lower(&bytes);
1312    let hash = sha256_hex(token.as_bytes());
1313    Ok((token, hash))
1314}