1use std::collections::{BTreeSet, HashMap};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicI64, Ordering};
4use std::sync::Arc;
5
6use anyhow::{anyhow, Context, Result};
7use axum::{
8 body::{self, Body},
9 extract::{Extension, Json, Query, State},
10 http::{header, Request, StatusCode},
11 middleware::{self, Next},
12 response::sse::{Event as SseEvent, KeepAlive, Sse},
13 response::{IntoResponse, Response},
14 routing::get,
15 Router,
16};
17use futures::Stream;
18use md5::{Digest, Md5};
19use rmcp::{
20 handler::server::ServerHandler,
21 model::{
22 CallToolRequest, CallToolRequestParams, CallToolResult, ClientJsonRpcMessage,
23 ClientRequest, JsonRpcRequest, NumberOrString, ServerJsonRpcMessage, ServerResult,
24 },
25 service::{serve_directly, RequestContext, RoleServer},
26 transport::{OneshotTransport, StreamableHttpService},
27};
28use serde::{Deserialize, Serialize};
29use serde_json::{json, Map, Value};
30use sha2::Sha256;
31use tokio::io::AsyncWriteExt;
32use tokio::sync::broadcast;
33use tokio::time::Duration;
34
35use crate::tools::LeanCtxServer;
36#[cfg(test)]
37mod tests;
38
39const WORKSPACE_ARG_KEY: &str = "workspaceId";
40const CHANNEL_ARG_KEY: &str = "channelId";
41const WORKSPACE_HEADER: &str = "x-leanctx-workspace";
42
43#[derive(Clone, Debug, Serialize, Deserialize)]
44#[serde(rename_all = "camelCase")]
45pub struct TeamServerConfig {
46 pub host: String,
47 pub port: u16,
48 pub default_workspace_id: String,
49 pub workspaces: Vec<TeamWorkspaceConfig>,
50 #[serde(default)]
51 pub tokens: Vec<TeamTokenConfig>,
52 pub audit_log_path: PathBuf,
53 #[serde(default)]
54 pub disable_host_check: bool,
55 #[serde(default)]
56 pub allowed_hosts: Vec<String>,
57 #[serde(default = "default_max_body_bytes")]
58 pub max_body_bytes: usize,
59 #[serde(default = "default_max_concurrency")]
60 pub max_concurrency: usize,
61 #[serde(default = "default_max_rps")]
62 pub max_rps: u32,
63 #[serde(default = "default_rate_burst")]
64 pub rate_burst: u32,
65 #[serde(default = "default_request_timeout_ms")]
66 pub request_timeout_ms: u64,
67 #[serde(default)]
68 pub stateful_mode: bool,
69 #[serde(default = "default_true")]
70 pub json_response: bool,
71}
72
73fn default_true() -> bool {
74 true
75}
76fn default_max_body_bytes() -> usize {
77 2 * 1024 * 1024
78}
79fn default_max_concurrency() -> usize {
80 32
81}
82fn default_max_rps() -> u32 {
83 50
84}
85fn default_rate_burst() -> u32 {
86 100
87}
88fn default_request_timeout_ms() -> u64 {
89 30_000
90}
91
92#[derive(Clone, Debug, Serialize, Deserialize)]
93#[serde(rename_all = "camelCase")]
94pub struct TeamWorkspaceConfig {
95 pub id: String,
96 pub label: Option<String>,
97 pub root: PathBuf,
98}
99
100#[derive(Clone, Debug, Serialize, Deserialize)]
101#[serde(rename_all = "camelCase")]
102pub struct TeamTokenConfig {
103 pub id: String,
104 pub sha256_hex: String,
106 pub scopes: Vec<TeamScope>,
107}
108
109#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum TeamScope {
112 Search,
113 Graph,
114 Artifacts,
115 Index,
116 Events,
117 SessionMutations,
118 Knowledge,
119 Audit,
120}
121
122impl TeamServerConfig {
123 pub fn load(path: &Path) -> Result<Self> {
124 let s =
125 std::fs::read_to_string(path).with_context(|| format!("read {}", path.display()))?;
126 let cfg: Self =
127 serde_json::from_str(&s).with_context(|| format!("parse {}", path.display()))?;
128 cfg.validate()?;
129 Ok(cfg)
130 }
131
132 pub fn save(&self, path: &Path) -> Result<()> {
133 let s = serde_json::to_string_pretty(self).context("serialize TeamServerConfig")?;
134 std::fs::write(path, format!("{s}\n")).with_context(|| format!("write {}", path.display()))
135 }
136
137 pub fn validate(&self) -> Result<()> {
138 if self.workspaces.is_empty() {
139 return Err(anyhow!("team server requires at least 1 workspace"));
140 }
141 let mut ws_ids = BTreeSet::new();
142 for ws in &self.workspaces {
143 let id = ws.id.trim();
144 if id.is_empty() {
145 return Err(anyhow!("workspace id must be non-empty"));
146 }
147 if !ws_ids.insert(id.to_string()) {
148 return Err(anyhow!("duplicate workspace id: {id}"));
149 }
150 if !ws.root.exists() {
151 return Err(anyhow!(
152 "workspace root does not exist: {}",
153 ws.root.display()
154 ));
155 }
156 }
157 if !ws_ids.contains(self.default_workspace_id.trim()) {
158 return Err(anyhow!(
159 "defaultWorkspaceId '{}' not found in workspaces",
160 self.default_workspace_id
161 ));
162 }
163
164 let mut token_ids = BTreeSet::new();
165 for t in &self.tokens {
166 let id = t.id.trim();
167 if id.is_empty() {
168 return Err(anyhow!("token id must be non-empty"));
169 }
170 if !token_ids.insert(id.to_string()) {
171 return Err(anyhow!("duplicate token id: {id}"));
172 }
173 if t.scopes.is_empty() {
174 return Err(anyhow!("token '{id}' must have at least 1 scope"));
175 }
176 parse_sha256_hex(&t.sha256_hex)
177 .with_context(|| format!("token '{id}' invalid sha256Hex"))?;
178 }
179
180 if let Some(parent) = self.audit_log_path.parent() {
181 if !parent.as_os_str().is_empty() && !parent.exists() {
182 return Err(anyhow!(
183 "auditLogPath parent does not exist: {}",
184 parent.display()
185 ));
186 }
187 }
188 Ok(())
189 }
190
191 pub fn validate_for_serve(&self) -> Result<()> {
192 self.validate()?;
193 if self.tokens.is_empty() {
194 return Err(anyhow!("team server requires at least 1 token"));
195 }
196 Ok(())
197 }
198}
199
200#[derive(Clone)]
201struct TeamAuthContext {
202 token_id: String,
203 scopes: BTreeSet<TeamScope>,
204}
205
206#[derive(Clone)]
207pub struct TeamRequestContext {
208 pub workspace_id: String,
209}
210
211#[derive(Clone)]
212pub struct TeamState {
213 auth: Arc<Vec<TeamTokenConfig>>,
214 engine: Arc<TeamContextEngine>,
215 audit: Arc<tokio::sync::Mutex<tokio::fs::File>>,
216 pub savings_store_dir: Arc<tokio::sync::Mutex<std::path::PathBuf>>,
217}
218
219#[derive(Clone)]
220pub struct TeamAppState {
221 concurrency: Arc<tokio::sync::Semaphore>,
222 rate: Arc<super::RateLimiter>,
223 timeout: Duration,
224 pub team: Arc<TeamState>,
225 max_body_bytes: usize,
226}
227
228#[derive(Debug, Deserialize)]
229#[serde(rename_all = "camelCase")]
230struct ToolCallBody {
231 name: String,
232 #[serde(default)]
233 arguments: Option<Value>,
234 #[serde(default)]
235 workspace_id: Option<String>,
236 #[serde(default)]
237 channel_id: Option<String>,
238}
239
240#[derive(Debug, Deserialize)]
241#[serde(rename_all = "camelCase")]
242struct ToolsQuery {
243 #[serde(default)]
244 offset: Option<usize>,
245 #[serde(default)]
246 limit: Option<usize>,
247}
248
249#[derive(Debug, Deserialize)]
250#[serde(rename_all = "camelCase")]
251struct EventsQuery {
252 #[serde(default)]
253 since: Option<i64>,
254 #[serde(default)]
255 limit: Option<usize>,
256 #[serde(default)]
257 channel_id: Option<String>,
258}
259
260#[derive(Clone)]
261struct TeamCtxServer {
262 default_workspace_id: String,
263 roots: Arc<HashMap<String, String>>,
264}
265
266impl TeamCtxServer {
267 fn default_root(&self) -> &str {
268 self.roots
269 .get(&self.default_workspace_id)
270 .expect("default workspace root")
271 }
272
273 fn rewrite_dot_paths(args: &mut Map<String, Value>, root: &str) {
274 for k in ["path", "target_directory", "targetDirectory"] {
275 let Some(Value::String(s)) = args.get(k) else {
276 continue;
277 };
278 let t = s.trim();
279 if t.is_empty() || t == "." {
280 args.insert(k.to_string(), Value::String(root.to_string()));
281 }
282 }
283 }
284
285 fn pick_workspace(
286 &self,
287 args: &mut Map<String, Value>,
288 ) -> std::result::Result<(String, String), rmcp::ErrorData> {
289 let ws = args
290 .get(WORKSPACE_ARG_KEY)
291 .and_then(|v| v.as_str())
292 .unwrap_or(self.default_workspace_id.as_str())
293 .to_string();
294 args.remove(WORKSPACE_ARG_KEY);
295
296 let root = self
297 .roots
298 .get(&ws)
299 .cloned()
300 .ok_or_else(|| rmcp::ErrorData::invalid_params("unknown workspaceId", None))?;
301 Self::rewrite_dot_paths(args, &root);
302 Ok((ws, root))
303 }
304
305 fn make_server(&self, workspace_id: &str, channel_id: &str) -> LeanCtxServer {
306 let root = self
307 .roots
308 .get(workspace_id)
309 .cloned()
310 .unwrap_or_else(|| self.default_root().to_string());
311 LeanCtxServer::new_shared_with_context(&root, workspace_id, channel_id)
312 }
313}
314
315impl ServerHandler for TeamCtxServer {
316 fn get_info(&self) -> rmcp::model::ServerInfo {
317 let s = self.make_server(&self.default_workspace_id, "default");
318 <LeanCtxServer as ServerHandler>::get_info(&s)
319 }
320
321 async fn initialize(
322 &self,
323 request: rmcp::model::InitializeRequestParams,
324 context: RequestContext<RoleServer>,
325 ) -> std::result::Result<rmcp::model::InitializeResult, rmcp::ErrorData> {
326 let s = self.make_server(&self.default_workspace_id, "default");
327 <LeanCtxServer as ServerHandler>::initialize(&s, request, context).await
328 }
329
330 async fn list_tools(
331 &self,
332 request: Option<rmcp::model::PaginatedRequestParams>,
333 context: RequestContext<RoleServer>,
334 ) -> std::result::Result<rmcp::model::ListToolsResult, rmcp::ErrorData> {
335 let s = self.make_server(&self.default_workspace_id, "default");
336 <LeanCtxServer as ServerHandler>::list_tools(&s, request, context).await
337 }
338
339 async fn call_tool(
340 &self,
341 mut request: CallToolRequestParams,
342 context: RequestContext<RoleServer>,
343 ) -> std::result::Result<CallToolResult, rmcp::ErrorData> {
344 let mut args = request.arguments.take().unwrap_or_default();
345 let (ws, root) = self.pick_workspace(&mut args)?;
346 let channel = args
347 .get(CHANNEL_ARG_KEY)
348 .and_then(|v| v.as_str())
349 .unwrap_or("default")
350 .to_string();
351 args.remove(CHANNEL_ARG_KEY);
352 Self::rewrite_dot_paths(&mut args, &root);
354 request.arguments = Some(args);
355 let s = LeanCtxServer::new_shared_with_context(&root, &ws, &channel);
356 <LeanCtxServer as ServerHandler>::call_tool(&s, request, context).await
357 }
358}
359
360struct TeamContextEngine {
361 server: TeamCtxServer,
362 next_id: AtomicI64,
363}
364
365impl TeamContextEngine {
366 fn new(server: TeamCtxServer) -> Self {
367 Self {
368 server,
369 next_id: AtomicI64::new(1),
370 }
371 }
372
373 fn manifest_value() -> Value {
374 crate::core::mcp_manifest::manifest_value()
375 }
376
377 async fn call_tool_value(&self, name: &str, arguments: Option<Value>) -> Result<Value> {
378 let result = self.call_tool_result(name, arguments).await?;
379 serde_json::to_value(result).map_err(|e| anyhow!("serialize CallToolResult: {e}"))
380 }
381
382 async fn call_tool_result(
383 &self,
384 name: &str,
385 arguments: Option<Value>,
386 ) -> Result<CallToolResult> {
387 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
388 let req_id = NumberOrString::Number(id);
389
390 let args_obj: Map<String, Value> = match arguments {
391 None => Map::new(),
392 Some(Value::Object(m)) => m,
393 Some(other) => {
394 return Err(anyhow!(
395 "tool arguments must be a JSON object (got {other})"
396 ))
397 }
398 };
399
400 let params = CallToolRequestParams::new(name.to_string()).with_arguments(args_obj);
401 let call: CallToolRequest = CallToolRequest::new(params);
402 let client_req = ClientRequest::CallToolRequest(call);
403 let msg = ClientJsonRpcMessage::Request(JsonRpcRequest::new(req_id, client_req));
404
405 let (transport, mut rx) = OneshotTransport::<RoleServer>::new(msg);
406 let service = serve_directly(self.server.clone(), transport, None);
407 tokio::spawn(async move {
408 let _ = service.waiting().await;
409 });
410
411 let Some(server_msg) = rx.recv().await else {
412 return Err(anyhow!("no response from tool call"));
413 };
414
415 match server_msg {
416 ServerJsonRpcMessage::Response(r) => match r.result {
417 ServerResult::CallToolResult(result) => Ok(result),
418 other => Err(anyhow!("unexpected server result: {other:?}")),
419 },
420 ServerJsonRpcMessage::Error(e) => Err(anyhow!("{e:?}")).context("tool call error"),
421 ServerJsonRpcMessage::Notification(_) => Err(anyhow!("unexpected notification")),
422 ServerJsonRpcMessage::Request(_) => Err(anyhow!("unexpected request")),
423 }
424 }
425}
426
427fn sha256_hex(bytes: &[u8]) -> String {
428 let mut h = Sha256::new();
429 h.update(bytes);
430 let digest = h.finalize();
431 hex_lower(&digest)
432}
433
434fn hex_lower(bytes: &[u8]) -> String {
435 const HEX: &[u8; 16] = b"0123456789abcdef";
436 let mut out = Vec::with_capacity(bytes.len() * 2);
437 for &b in bytes {
438 out.push(HEX[(b >> 4) as usize]);
439 out.push(HEX[(b & 0x0f) as usize]);
440 }
441 String::from_utf8(out).unwrap_or_default()
442}
443
444fn parse_sha256_hex(s: &str) -> Result<Vec<u8>> {
445 let s = s.trim();
446 if s.len() != 64 {
447 return Err(anyhow!("sha256 hex must be 64 chars"));
448 }
449 let mut out = Vec::with_capacity(32);
450 let bytes = s.as_bytes();
451 let to_nibble = |c: u8| -> Option<u8> {
452 match c {
453 b'0'..=b'9' => Some(c - b'0'),
454 b'a'..=b'f' => Some(c - b'a' + 10),
455 b'A'..=b'F' => Some(c - b'A' + 10),
456 _ => None,
457 }
458 };
459 for i in (0..64).step_by(2) {
460 let hi = to_nibble(bytes[i]).ok_or_else(|| anyhow!("invalid hex"))?;
461 let lo = to_nibble(bytes[i + 1]).ok_or_else(|| anyhow!("invalid hex"))?;
462 out.push((hi << 4) | lo);
463 }
464 Ok(out)
465}
466
467fn required_scopes(tool_name: &str, args: Option<&Value>) -> Option<BTreeSet<TeamScope>> {
468 if matches!(tool_name, "ctx_shell" | "ctx_execute" | "ctx_edit") {
469 return None;
470 }
471
472 if tool_name == "ctx" {
473 let Value::Object(m) = args? else {
474 return None;
475 };
476 let sub = m.get("tool")?.as_str()?.trim();
477 if sub.is_empty() {
478 return None;
479 }
480 let canonical = if sub.starts_with("ctx_") {
481 sub.to_string()
482 } else {
483 format!("ctx_{sub}")
484 };
485 let mut m2 = m.clone();
486 m2.remove("tool");
487 return required_scopes(&canonical, Some(&Value::Object(m2)));
488 }
489
490 let mut s = BTreeSet::new();
491 match tool_name {
492 "ctx_read" | "ctx_multi_read" | "ctx_smart_read" | "ctx_search" | "ctx_tree"
494 | "ctx_outline" | "ctx_expand" | "ctx_delta" | "ctx_dedup" | "ctx_prefetch"
495 | "ctx_preload" | "ctx_review" | "ctx_response" | "ctx_task" | "ctx_overview"
496 | "ctx_architecture" | "ctx_benchmark" | "ctx_cost" | "ctx_intent" | "ctx_heatmap"
497 | "ctx_gain" | "ctx_analyze" | "ctx_discover_tools" | "ctx_discover" | "ctx_symbol"
498 | "ctx_index" | "ctx_metrics" | "ctx_cache" | "ctx_agent" => {
499 s.insert(TeamScope::Search);
500 Some(s)
501 }
502 "ctx_pack" => {
504 s.insert(TeamScope::Search);
505 s.insert(TeamScope::Graph);
506 Some(s)
507 }
508 "ctx_graph" | "ctx_impact" | "ctx_callgraph" | "ctx_routes" => {
510 s.insert(TeamScope::Graph);
511
512 if tool_name == "ctx_graph" {
513 let action = args
514 .and_then(|v| v.get("action"))
515 .and_then(|v| v.as_str())
516 .unwrap_or("");
517 if matches!(
518 action,
519 "index-build"
520 | "index-build-full"
521 | "index-build-background"
522 | "index-build-full-background"
523 ) {
524 s.insert(TeamScope::Index);
525 }
526 }
527
528 Some(s)
529 }
530 "ctx_semantic_search" => {
531 s.insert(TeamScope::Search);
532 if args
533 .and_then(|v| v.get("artifacts"))
534 .and_then(Value::as_bool)
535 .unwrap_or(false)
536 {
537 s.insert(TeamScope::Artifacts);
538 }
539 if args
540 .and_then(|v| v.get("action"))
541 .and_then(|v| v.as_str())
542 .is_some_and(|v| v.eq_ignore_ascii_case("reindex"))
543 {
544 s.insert(TeamScope::Index);
545 }
546 Some(s)
547 }
548 "ctx_session" | "ctx_handoff" | "ctx_workflow" | "ctx_compress" | "ctx_share" => {
550 s.insert(TeamScope::SessionMutations);
551 Some(s)
552 }
553 "ctx_knowledge" | "ctx_knowledge_relations" => {
555 s.insert(TeamScope::Knowledge);
556 Some(s)
557 }
558 "ctx_artifacts" | "ctx_proof" | "ctx_verify" => {
560 s.insert(TeamScope::Artifacts);
561 Some(s)
562 }
563 _ => None,
564 }
565}
566
567async fn team_rate_limit_middleware(
568 State(state): State<TeamAppState>,
569 req: Request<Body>,
570 next: Next,
571) -> Response {
572 if req.uri().path() == "/health" {
573 return next.run(req).await;
574 }
575 if !state.rate.allow().await {
576 return StatusCode::TOO_MANY_REQUESTS.into_response();
577 }
578 next.run(req).await
579}
580
581async fn team_concurrency_middleware(
582 State(state): State<TeamAppState>,
583 req: Request<Body>,
584 next: Next,
585) -> Response {
586 if req.uri().path() == "/health" {
587 return next.run(req).await;
588 }
589 let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
590 return StatusCode::TOO_MANY_REQUESTS.into_response();
591 };
592 let resp = next.run(req).await;
593 drop(permit);
594 resp
595}
596
597async fn team_auth_middleware(
598 State(state): State<TeamAppState>,
599 mut req: Request<Body>,
600 next: Next,
601) -> Response {
602 if req.uri().path() == "/health" {
603 return next.run(req).await;
604 }
605
606 let Some(h) = req.headers().get(header::AUTHORIZATION) else {
607 return super::json_error(
608 StatusCode::UNAUTHORIZED,
609 "unauthorized",
610 "missing Authorization header",
611 );
612 };
613 let Ok(s) = h.to_str() else {
614 return super::json_error(
615 StatusCode::UNAUTHORIZED,
616 "unauthorized",
617 "malformed Authorization header",
618 );
619 };
620 let Some(token) = s
621 .strip_prefix("Bearer ")
622 .or_else(|| s.strip_prefix("bearer "))
623 else {
624 return super::json_error(
625 StatusCode::UNAUTHORIZED,
626 "unauthorized",
627 "Authorization must use the Bearer scheme",
628 );
629 };
630
631 let token_hash = sha256_hex(token.as_bytes());
632 let mut matched: Option<TeamTokenConfig> = None;
633 for t in state.team.auth.iter() {
634 if super::constant_time_eq(token_hash.as_bytes(), t.sha256_hex.as_bytes()) {
635 matched = Some(t.clone());
636 break;
637 }
638 }
639 let Some(tok) = matched else {
640 return super::json_error(
641 StatusCode::UNAUTHORIZED,
642 "unauthorized",
643 "invalid bearer token",
644 );
645 };
646 let tok_scopes: BTreeSet<TeamScope> = tok.scopes.iter().copied().collect();
647
648 let workspace_id = req
649 .headers()
650 .get(WORKSPACE_HEADER)
651 .and_then(|h| h.to_str().ok())
652 .map(|s| s.trim().to_string())
653 .filter(|s| !s.is_empty())
654 .unwrap_or_else(|| state.team.engine.server.default_workspace_id.clone());
655 if !state.team.engine.server.roots.contains_key(&workspace_id) {
656 return super::json_error(
657 StatusCode::BAD_REQUEST,
658 "unknown_workspace",
659 "unknown workspace",
660 );
661 }
662 let workspace_id_for_audit = workspace_id.clone();
663
664 req.extensions_mut().insert(TeamAuthContext {
665 token_id: tok.id.clone(),
666 scopes: tok_scopes.clone(),
667 });
668 req.extensions_mut()
669 .insert(TeamRequestContext { workspace_id });
670
671 let path0 = req.uri().path();
673 if path0 == "/v1/events" {
674 let allow = tok_scopes.contains(&TeamScope::Events);
675 let _ = audit_write(
676 &state.team.audit,
677 &tok.id,
678 &workspace_id_for_audit,
679 None,
680 Some("events"),
681 allow,
682 if allow { None } else { Some("scope_denied") },
683 None,
684 )
685 .await;
686 if !allow {
687 return super::json_error(
688 StatusCode::FORBIDDEN,
689 "scope_denied",
690 "token lacks required scope: events",
691 );
692 }
693 }
694
695 if path0 == "/v1/metrics" {
696 let allow = tok_scopes.contains(&TeamScope::Audit);
697 let _ = audit_write(
698 &state.team.audit,
699 &tok.id,
700 &workspace_id_for_audit,
701 None,
702 Some("metrics"),
703 allow,
704 if allow { None } else { Some("scope_denied") },
705 None,
706 )
707 .await;
708 if !allow {
709 return super::json_error(
710 StatusCode::FORBIDDEN,
711 "scope_denied",
712 "token lacks required scope: audit",
713 );
714 }
715 }
716
717 let path = req.uri().path().to_string();
719 if path != "/v1/tools/call"
720 && path != "/v1/tools"
721 && path != "/v1/manifest"
722 && path != "/health"
723 {
724 if req.method() != axum::http::Method::POST {
725 return next.run(req).await;
726 }
727
728 let (parts, body0) = req.into_parts();
729 let Ok(bytes) = body::to_bytes(body0, state.max_body_bytes).await else {
730 return super::json_error(
731 StatusCode::BAD_REQUEST,
732 "invalid_request",
733 "could not read request body",
734 );
735 };
736
737 let mut allow = false;
738 let mut denied_reason: Option<String> = None;
739 if let Ok(v) = serde_json::from_slice::<Value>(&bytes) {
740 if v.is_array() {
741 denied_reason = Some("batch_requests_not_supported".to_string());
742 let _ = audit_write(
743 &state.team.audit,
744 &tok.id,
745 &workspace_id_for_audit,
746 None,
747 None,
748 false,
749 denied_reason.as_deref(),
750 None,
751 )
752 .await;
753 } else {
754 let method = v.get("method").and_then(|m| m.as_str()).unwrap_or("");
755 if method.eq_ignore_ascii_case("tools/call") {
756 let tool = v
757 .pointer("/params/name")
758 .and_then(|x| x.as_str())
759 .unwrap_or("");
760 let args = v.pointer("/params/arguments");
761 let req_scopes = required_scopes(tool, args);
762 allow = match req_scopes {
763 None => false,
764 Some(reqs) => reqs.is_subset(&tok_scopes),
765 };
766 if !allow {
767 denied_reason = Some("scope_denied".to_string());
768 }
769 let _ = audit_write(
770 &state.team.audit,
771 &tok.id,
772 &workspace_id_for_audit,
773 Some(tool),
774 Some(method),
775 allow,
776 denied_reason.as_deref(),
777 args,
778 )
779 .await;
780 } else {
781 allow = true;
782 }
783 }
784 }
785
786 if !allow {
787 return super::json_error(
788 StatusCode::FORBIDDEN,
789 "scope_denied",
790 "token lacks required scope for this tool",
791 );
792 }
793
794 req = Request::from_parts(parts, Body::from(bytes));
795 }
796
797 next.run(req).await
798}
799
800async fn audit_write(
801 file: &tokio::sync::Mutex<tokio::fs::File>,
802 token_id: &str,
803 workspace_id: &str,
804 tool: Option<&str>,
805 method: Option<&str>,
806 allowed: bool,
807 denied_reason: Option<&str>,
808 args: Option<&Value>,
809) -> Result<()> {
810 let args_hash = args
811 .map(|a| {
812 let s = a.to_string();
813 let mut hasher = Md5::new();
814 hasher.update(s.as_bytes());
815 format!("{:x}", hasher.finalize())
816 })
817 .unwrap_or_default();
818
819 let ts = chrono::Local::now().to_rfc3339();
820 let rec = json!({
821 "ts": ts,
822 "tokenId": token_id,
823 "workspaceId": workspace_id,
824 "tool": tool,
825 "method": method,
826 "allowed": allowed,
827 "deniedReason": denied_reason,
828 "argumentsMd5": args_hash,
829 });
830
831 let mut guard = file.lock().await;
832 guard.write_all(rec.to_string().as_bytes()).await?;
833 guard.write_all(b"\n").await?;
834 guard.flush().await?;
835 Ok(())
836}
837
838async fn audit_event(
840 file: &tokio::sync::Mutex<tokio::fs::File>,
841 token_id: &str,
842 workspace_id: &str,
843 channel_id: &str,
844 event_kind: &str,
845 actor: Option<&str>,
846 event_id: i64,
847) -> Result<()> {
848 let ts = chrono::Local::now().to_rfc3339();
849 let rec = json!({
850 "ts": ts,
851 "type": "context_event",
852 "tokenId": token_id,
853 "workspaceId": workspace_id,
854 "channelId": channel_id,
855 "eventKind": event_kind,
856 "actor": actor,
857 "eventId": event_id,
858 });
859
860 let mut guard = file.lock().await;
861 guard.write_all(rec.to_string().as_bytes()).await?;
862 guard.write_all(b"\n").await?;
863 guard.flush().await?;
864 Ok(())
865}
866
867async fn v1_manifest(State(_state): State<TeamAppState>) -> impl IntoResponse {
868 let v = TeamContextEngine::manifest_value();
869 (StatusCode::OK, Json(v))
870}
871
872async fn v1_tools(
873 State(_state): State<TeamAppState>,
874 Query(q): Query<ToolsQuery>,
875) -> impl IntoResponse {
876 let v = TeamContextEngine::manifest_value();
877 let tools = v
878 .get("tools")
879 .and_then(|t| t.get("granular"))
880 .cloned()
881 .unwrap_or(Value::Array(vec![]));
882
883 let all = tools.as_array().cloned().unwrap_or_default();
884 let total = all.len();
885 let offset = q.offset.unwrap_or(0).min(total);
886 let limit = q.limit.unwrap_or(200).min(500);
887 let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
888
889 (
890 StatusCode::OK,
891 Json(json!({
892 "tools": page,
893 "total": total,
894 "offset": offset,
895 "limit": limit,
896 })),
897 )
898}
899
900async fn v1_tool_call(
901 State(state): State<TeamAppState>,
902 Extension(auth): Extension<TeamAuthContext>,
903 Extension(ctx): Extension<TeamRequestContext>,
904 Json(body): Json<ToolCallBody>,
905) -> impl IntoResponse {
906 let workspace_id = body
907 .workspace_id
908 .clone()
909 .unwrap_or_else(|| ctx.workspace_id.clone());
910 if !state.team.engine.server.roots.contains_key(&workspace_id) {
911 let _ = audit_write(
912 &state.team.audit,
913 &auth.token_id,
914 &workspace_id,
915 Some(&body.name),
916 Some("/v1/tools/call"),
917 false,
918 Some("unknown_workspace"),
919 body.arguments.as_ref(),
920 )
921 .await;
922 return super::json_error(
923 StatusCode::BAD_REQUEST,
924 "unknown_workspace",
925 "unknown workspace",
926 );
927 }
928
929 let mut args = match body.arguments {
930 None => Value::Object(Map::new()),
931 Some(Value::Object(m)) => Value::Object(m),
932 Some(other) => {
933 let _ = audit_write(
934 &state.team.audit,
935 &auth.token_id,
936 &workspace_id,
937 Some(&body.name),
938 Some("/v1/tools/call"),
939 false,
940 Some("invalid_arguments"),
941 Some(&other),
942 )
943 .await;
944 return super::json_error(
945 StatusCode::BAD_REQUEST,
946 "invalid_arguments",
947 &format!("tool arguments must be a JSON object (got {other})"),
948 );
949 }
950 };
951
952 if let Value::Object(ref mut m) = args {
953 m.insert(
954 WORKSPACE_ARG_KEY.to_string(),
955 Value::String(workspace_id.clone()),
956 );
957 if let Some(ch) = body.channel_id.as_deref() {
958 if !ch.trim().is_empty() {
959 m.insert(
960 CHANNEL_ARG_KEY.to_string(),
961 Value::String(ch.trim().to_string()),
962 );
963 }
964 }
965 }
966
967 let required = required_scopes(&body.name, Some(&args));
968 let allowed = match required {
969 None => false,
970 Some(reqs) => reqs.is_subset(&auth.scopes),
971 };
972 if !allowed {
973 let _ = audit_write(
974 &state.team.audit,
975 &auth.token_id,
976 &workspace_id,
977 Some(&body.name),
978 Some("/v1/tools/call"),
979 false,
980 Some("scope_denied"),
981 Some(&args),
982 )
983 .await;
984 return super::json_error(
985 StatusCode::FORBIDDEN,
986 "scope_denied",
987 "token lacks required scope for this tool",
988 );
989 }
990
991 let tool_name = body.name.clone();
992 let call = tokio::time::timeout(
993 state.timeout,
994 state
995 .team
996 .engine
997 .call_tool_value(&tool_name, Some(args.clone())),
998 )
999 .await;
1000
1001 match call {
1002 Ok(Ok(v)) => {
1003 let _ = audit_write(
1004 &state.team.audit,
1005 &auth.token_id,
1006 &workspace_id,
1007 Some(&tool_name),
1008 Some("/v1/tools/call"),
1009 true,
1010 None,
1011 Some(&args),
1012 )
1013 .await;
1014 (StatusCode::OK, Json(json!({ "result": v }))).into_response()
1015 }
1016 Ok(Err(e)) => {
1017 let _ = audit_write(
1018 &state.team.audit,
1019 &auth.token_id,
1020 &workspace_id,
1021 Some(&tool_name),
1022 Some("/v1/tools/call"),
1023 true,
1024 Some("tool_error"),
1025 Some(&args),
1026 )
1027 .await;
1028 {
1029 tracing::warn!("team tool call error: {e}");
1030 super::json_error(
1031 StatusCode::BAD_REQUEST,
1032 "tool_error",
1033 "tool execution failed",
1034 )
1035 }
1036 }
1037 Err(_) => {
1038 let _ = audit_write(
1039 &state.team.audit,
1040 &auth.token_id,
1041 &workspace_id,
1042 Some(&tool_name),
1043 Some("/v1/tools/call"),
1044 true,
1045 Some("request_timeout"),
1046 Some(&args),
1047 )
1048 .await;
1049 super::json_error(
1050 StatusCode::GATEWAY_TIMEOUT,
1051 "request_timeout",
1052 "tool call timed out",
1053 )
1054 }
1055 }
1056}
1057
1058async fn v1_events(
1059 State(state): State<TeamAppState>,
1060 Extension(auth): Extension<TeamAuthContext>,
1061 Extension(ctx): Extension<TeamRequestContext>,
1062 Query(q): Query<EventsQuery>,
1063) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
1064 let ws = ctx.workspace_id;
1065 let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
1066 let since = q.since.unwrap_or(0);
1067 let limit = q.limit.unwrap_or(200).min(1000);
1068
1069 let _ = audit_event(
1070 &state.team.audit,
1071 &auth.token_id,
1072 &ws,
1073 &ch,
1074 "sse_subscribe",
1075 None,
1076 since,
1077 )
1078 .await;
1079
1080 let rt = crate::core::context_os::runtime();
1081 let replay = rt.bus.read(&ws, &ch, since, limit);
1082 let rx = if let Some(rx) = rt.bus.subscribe(&ws, &ch) {
1083 rx
1084 } else {
1085 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
1086 let (_, rx) = tokio::sync::broadcast::channel::<crate::core::context_os::ContextEventV1>(1);
1087 rx
1088 };
1089 rt.metrics.record_sse_connect();
1090 rt.metrics.record_events_replayed(replay.len() as u64);
1091 rt.metrics.record_workspace_active(&ws);
1092
1093 let bus = rt.bus.clone();
1094 let metrics = rt.metrics.clone();
1095 let pending: std::collections::VecDeque<crate::core::context_os::ContextEventV1> =
1096 replay.into();
1097
1098 use crate::core::context_os::{redact_event_payload, RedactionLevel};
1099 let redaction = RedactionLevel::RefsOnly;
1100
1101 let stream = futures::stream::unfold(
1102 (
1103 pending,
1104 rx,
1105 ws.clone(),
1106 ch.clone(),
1107 since,
1108 redaction,
1109 bus,
1110 metrics,
1111 ),
1112 |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
1113 if let Some(mut ev) = pending.pop_front() {
1114 last_id = ev.id;
1115 redact_event_payload(&mut ev, redaction);
1116 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
1117 let evt = SseEvent::default()
1118 .id(ev.id.to_string())
1119 .event(ev.kind)
1120 .data(data);
1121 return Some((
1122 Ok(evt),
1123 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
1124 ));
1125 }
1126
1127 loop {
1128 match rx.recv().await {
1129 Ok(mut ev) if ev.id > last_id => {
1130 last_id = ev.id;
1131 redact_event_payload(&mut ev, redaction);
1132 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
1133 let evt = SseEvent::default()
1134 .id(ev.id.to_string())
1135 .event(ev.kind)
1136 .data(data);
1137 return Some((
1138 Ok(evt),
1139 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
1140 ));
1141 }
1142 Ok(_) => {}
1143 Err(broadcast::error::RecvError::Closed) => return None,
1144 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1145 let missed = bus.read(&ws, &ch, last_id, skipped as usize);
1146 metrics.record_events_replayed(missed.len() as u64);
1147 for ev in missed {
1148 last_id = last_id.max(ev.id);
1149 pending.push_back(ev);
1150 }
1151 }
1152 }
1153 }
1154 },
1155 );
1156
1157 let metrics_ref = rt.metrics.clone();
1158 let guarded = super::SseDisconnectGuard {
1159 inner: Box::pin(stream),
1160 metrics: metrics_ref,
1161 };
1162
1163 Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1164}
1165
1166async fn v1_team_metrics(State(_state): State<TeamAppState>) -> impl IntoResponse {
1167 let rt = crate::core::context_os::runtime();
1168 let snap = rt.metrics.snapshot();
1169 (
1170 StatusCode::OK,
1171 Json(serde_json::to_value(snap).unwrap_or_default()),
1172 )
1173}
1174
1175fn streamable_http_config(cfg: &TeamServerConfig) -> rmcp::transport::StreamableHttpServerConfig {
1176 let mut out = rmcp::transport::StreamableHttpServerConfig::default()
1177 .with_stateful_mode(cfg.stateful_mode)
1178 .with_json_response(cfg.json_response);
1179
1180 if cfg.disable_host_check {
1181 out = out.disable_allowed_hosts();
1182 return out;
1183 }
1184 if !cfg.allowed_hosts.is_empty() {
1185 out = out.with_allowed_hosts(cfg.allowed_hosts.clone());
1186 return out;
1187 }
1188 let host = cfg.host.trim();
1189 if host == "127.0.0.1" || host == "localhost" || host == "::1" {
1190 out.allowed_hosts.push(host.to_string());
1191 }
1192 out
1193}
1194
1195pub async fn serve_team(cfg: TeamServerConfig) -> Result<()> {
1196 cfg.validate_for_serve()?;
1197
1198 let addr: std::net::SocketAddr = format!("{}:{}", cfg.host, cfg.port)
1199 .parse()
1200 .context("invalid host/port")?;
1201
1202 let team_server = TeamCtxServer {
1203 default_workspace_id: cfg.default_workspace_id.clone(),
1204 roots: Arc::new(
1205 cfg.workspaces
1206 .iter()
1207 .map(|w| (w.id.clone(), w.root.to_string_lossy().to_string()))
1208 .collect(),
1209 ),
1210 };
1211 let engine = Arc::new(TeamContextEngine::new(team_server.clone()));
1212
1213 let audit_file = tokio::fs::OpenOptions::new()
1214 .create(true)
1215 .append(true)
1216 .open(&cfg.audit_log_path)
1217 .await
1218 .with_context(|| format!("open audit log {}", cfg.audit_log_path.display()))?;
1219
1220 let savings_dir = cfg
1221 .audit_log_path
1222 .parent()
1223 .unwrap_or(std::path::Path::new("."))
1224 .join("savings");
1225 let team = Arc::new(TeamState {
1226 auth: Arc::new(cfg.tokens.clone()),
1227 engine,
1228 audit: Arc::new(tokio::sync::Mutex::new(audit_file)),
1229 savings_store_dir: Arc::new(tokio::sync::Mutex::new(savings_dir)),
1230 });
1231
1232 let state = TeamAppState {
1233 concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
1234 rate: Arc::new(super::RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
1235 timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
1236 team,
1237 max_body_bytes: cfg.max_body_bytes,
1238 };
1239
1240 let service_factory =
1241 move || -> std::result::Result<TeamCtxServer, std::io::Error> { Ok(team_server.clone()) };
1242 let mcp_http = StreamableHttpService::new(
1243 service_factory,
1244 Arc::new(
1245 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
1246 ),
1247 streamable_http_config(&cfg),
1248 );
1249
1250 let app = Router::new()
1251 .route("/health", get(super::health))
1252 .route("/v1/manifest", get(v1_manifest))
1253 .route("/v1/tools", get(v1_tools))
1254 .route("/v1/tools/call", axum::routing::post(v1_tool_call))
1255 .route("/v1/events", get(v1_events))
1256 .route(
1257 "/v1/context/summary",
1258 get(super::context_views::v1_context_summary),
1259 )
1260 .route(
1261 "/v1/events/search",
1262 get(super::context_views::v1_events_search),
1263 )
1264 .route(
1265 "/v1/events/lineage",
1266 get(super::context_views::v1_event_lineage),
1267 )
1268 .route("/v1/metrics", get(v1_team_metrics))
1269 .route(
1270 "/api/v1/savings/ingest",
1271 axum::routing::post(super::savings_ingest::v1_savings_ingest),
1272 )
1273 .fallback_service(mcp_http)
1274 .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
1275 .layer(middleware::from_fn_with_state(
1276 state.clone(),
1277 team_rate_limit_middleware,
1278 ))
1279 .layer(middleware::from_fn_with_state(
1280 state.clone(),
1281 team_concurrency_middleware,
1282 ))
1283 .layer(middleware::from_fn_with_state(
1284 state.clone(),
1285 team_auth_middleware,
1286 ))
1287 .with_state(state);
1288
1289 let listener = tokio::net::TcpListener::bind(addr)
1290 .await
1291 .with_context(|| format!("bind {addr}"))?;
1292
1293 tracing::info!(
1294 "lean-ctx TEAM server listening on http://{addr} (workspaces={}, audit={})",
1295 cfg.workspaces.len(),
1296 cfg.audit_log_path.display()
1297 );
1298
1299 axum::serve(listener, app)
1300 .with_graceful_shutdown(async move {
1301 let _ = tokio::signal::ctrl_c().await;
1302 })
1303 .await
1304 .context("team http server")?;
1305 Ok(())
1306}
1307
1308pub fn create_token() -> Result<(String, String)> {
1309 let mut bytes = [0u8; 32];
1310 getrandom::fill(&mut bytes).map_err(|e| anyhow!("getrandom: {e}"))?;
1311 let token = hex_lower(&bytes);
1312 let hash = sha256_hex(token.as_bytes());
1313 Ok((token, hash))
1314}