1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7 extract::Json,
8 extract::Query,
9 extract::State,
10 http::{header, Request, StatusCode},
11 middleware::{self, Next},
12 response::sse::{Event as SseEvent, KeepAlive, Sse},
13 response::{IntoResponse, Response},
14 routing::get,
15 Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29
30#[cfg(feature = "team-server")]
31pub mod team;
32
33use std::pin::Pin;
35
/// Stream wrapper that reports an SSE disconnect to the metrics when it is dropped.
pub(crate) struct SseDisconnectGuard<I> {
    /// The wrapped, boxed stream that actually produces the items.
    pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
    /// Metrics handle on which `record_sse_disconnect` is called when the guard is dropped.
    pub(crate) metrics: Arc<ContextOsMetrics>,
}
40
41impl<I> Stream for SseDisconnectGuard<I> {
42 type Item = I;
43
44 fn poll_next(
45 mut self: Pin<&mut Self>,
46 cx: &mut std::task::Context<'_>,
47 ) -> std::task::Poll<Option<Self::Item>> {
48 self.inner.as_mut().poll_next(cx)
49 }
50}
51
/// Dropping the guard is how the end of an SSE connection gets recorded.
impl<I> Drop for SseDisconnectGuard<I> {
    fn drop(&mut self) {
        // Called exactly once per guard, whenever the wrapping stream is dropped.
        self.metrics.record_sse_disconnect();
    }
}
57
/// Upper bound on the number of characters kept in a sanitized identifier.
const MAX_ID_LEN: usize = 64;

/// Normalizes a caller-supplied identifier.
///
/// Surrounding whitespace is ignored, every character other than ASCII letters,
/// digits, `-`, `_` and `.` is dropped, and the result is capped at
/// [`MAX_ID_LEN`] characters. If nothing survives, `"default"` is returned.
fn sanitize_id(raw: &str) -> String {
    let mut cleaned = String::new();
    cleaned.extend(
        raw.trim()
            .chars()
            .filter(|c| matches!(*c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.'))
            .take(MAX_ID_LEN),
    );

    if cleaned.is_empty() {
        String::from("default")
    } else {
        cleaned
    }
}
76
/// Settings for the HTTP / IPC server built by `build_app_router`.
#[derive(Clone, Debug)]
pub struct HttpServerConfig {
    /// Host to bind to; `validate` treats `127.0.0.1`, `localhost` and `::1` as loopback.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
    /// Project root handed to the server instances.
    pub project_root: PathBuf,
    /// Bearer token; `validate` requires a non-empty one for non-loopback hosts.
    pub auth_token: Option<String>,
    /// Forwarded to `StreamableHttpServerConfig::with_stateful_mode`.
    pub stateful_mode: bool,
    /// Forwarded to `StreamableHttpServerConfig::with_json_response`.
    pub json_response: bool,
    /// When set, the MCP transport's allowed-hosts check is disabled.
    pub disable_host_check: bool,
    /// Explicit allowed-hosts list; when non-empty it replaces the transport's list.
    pub allowed_hosts: Vec<String>,
    /// Request body limit passed to `DefaultBodyLimit::max`.
    pub max_body_bytes: usize,
    /// Number of permits in the concurrency semaphore (at least 1 is used).
    pub max_concurrency: usize,
    /// Refill rate of the rate limiter, in requests per second.
    pub max_rps: u32,
    /// Capacity of the rate limiter's token bucket.
    pub rate_burst: u32,
    /// Timeout applied to REST tool calls, in milliseconds (at least 1 is used).
    pub request_timeout_ms: u64,
}
93
94impl Default for HttpServerConfig {
95 fn default() -> Self {
96 let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
97 Self {
98 host: "127.0.0.1".to_string(),
99 port: 8080,
100 project_root,
101 auth_token: None,
102 stateful_mode: false,
103 json_response: true,
104 disable_host_check: false,
105 allowed_hosts: Vec::new(),
106 max_body_bytes: 2 * 1024 * 1024,
107 max_concurrency: 32,
108 max_rps: 50,
109 rate_burst: 100,
110 request_timeout_ms: 30_000,
111 }
112 }
113}
114
115impl HttpServerConfig {
116 pub fn validate(&self) -> Result<()> {
117 let host = self.host.trim().to_lowercase();
118 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
119 if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
120 return Err(anyhow!(
121 "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
122 ));
123 }
124 Ok(())
125 }
126
127 pub fn effective_auth_token(&self) -> Option<String> {
128 if let Some(ref token) = self.auth_token {
129 if !token.is_empty() {
130 return Some(token.clone());
131 }
132 }
133 let host = self.host.trim().to_lowercase();
134 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
135 if is_loopback {
136 let auto_token = crate::core::session_token::generate_token();
137 eprintln!(
138 "[lean-ctx] Auto-generated auth token for loopback: {auto_token}\n\
139 Pass as Bearer token or set --auth-token explicitly."
140 );
141 Some(auto_token)
142 } else {
143 None
144 }
145 }
146
147 fn mcp_http_config(&self) -> StreamableHttpServerConfig {
148 let mut cfg = StreamableHttpServerConfig::default()
149 .with_stateful_mode(self.stateful_mode)
150 .with_json_response(self.json_response);
151
152 if self.disable_host_check {
153 tracing::warn!(
154 "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
155 Do NOT use this in production or on non-loopback interfaces."
156 );
157 cfg = cfg.disable_allowed_hosts();
158 return cfg;
159 }
160
161 if !self.allowed_hosts.is_empty() {
162 cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
163 return cfg;
164 }
165
166 let host = self.host.trim();
168 if host == "127.0.0.1" || host == "localhost" || host == "::1" {
169 cfg.allowed_hosts.push(host.to_string());
170 }
171
172 cfg
173 }
174}
175
/// Shared state handed to the middleware layers and the REST handlers.
#[derive(Clone)]
struct AppState {
    /// Expected bearer token; `None` makes `auth_middleware` let every request through.
    token: Option<String>,
    /// Permit pool consulted by `concurrency_middleware`.
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Token bucket consulted by `rate_limit_middleware`.
    rate: Arc<RateLimiter>,
    /// Project root as a string.
    project_root: String,
    /// Upper bound on how long `v1_tool_call` waits for a tool to finish.
    timeout: Duration,
    /// Server instance from which `v1_tool_call` builds its `ContextEngine`.
    server: LeanCtxServer,
}
185
/// Token-bucket rate limiter shared by all requests.
#[derive(Debug)]
struct RateLimiter {
    /// Tokens added per second of elapsed time.
    max_rps: f64,
    /// Maximum number of tokens the bucket can hold.
    burst: f64,
    /// Mutable bucket state, guarded by an async mutex.
    state: tokio::sync::Mutex<RateState>,
}
192
/// Mutable part of the [`RateLimiter`] bucket.
#[derive(Debug, Clone, Copy)]
struct RateState {
    /// Tokens currently available (fractional while refilling).
    tokens: f64,
    /// Instant of the most recent `allow` call (or of construction).
    last: Instant,
}
198
199impl RateLimiter {
200 fn new(max_rps: u32, burst: u32) -> Self {
201 let now = Instant::now();
202 Self {
203 max_rps: (max_rps.max(1)) as f64,
204 burst: (burst.max(1)) as f64,
205 state: tokio::sync::Mutex::new(RateState {
206 tokens: (burst.max(1)) as f64,
207 last: now,
208 }),
209 }
210 }
211
212 async fn allow(&self) -> bool {
213 let mut s = self.state.lock().await;
214 let now = Instant::now();
215 let elapsed = now.saturating_duration_since(s.last);
216 let refill = elapsed.as_secs_f64() * self.max_rps;
217 s.tokens = (s.tokens + refill).min(self.burst);
218 s.last = now;
219 if s.tokens >= 1.0 {
220 s.tokens -= 1.0;
221 true
222 } else {
223 false
224 }
225 }
226}
227
228async fn auth_middleware(
229 State(state): State<AppState>,
230 req: Request<axum::body::Body>,
231 next: Next,
232) -> Response {
233 if state.token.is_none() {
234 return next.run(req).await;
235 }
236
237 if req.uri().path() == "/health" {
238 return next.run(req).await;
239 }
240
241 let expected = state.token.as_deref().unwrap_or("");
242 let Some(h) = req.headers().get(header::AUTHORIZATION) else {
243 return StatusCode::UNAUTHORIZED.into_response();
244 };
245 let Ok(s) = h.to_str() else {
246 return StatusCode::UNAUTHORIZED.into_response();
247 };
248 let Some(token) = s
249 .strip_prefix("Bearer ")
250 .or_else(|| s.strip_prefix("bearer "))
251 else {
252 return StatusCode::UNAUTHORIZED.into_response();
253 };
254 if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
255 return StatusCode::UNAUTHORIZED.into_response();
256 }
257
258 next.run(req).await
259}
260
261fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
262 use subtle::ConstantTimeEq;
263 if a.len() != b.len() {
264 return false;
265 }
266 bool::from(a.ct_eq(b))
267}
268
269async fn rate_limit_middleware(
270 State(state): State<AppState>,
271 req: Request<axum::body::Body>,
272 next: Next,
273) -> Response {
274 if !state.rate.allow().await {
275 return StatusCode::TOO_MANY_REQUESTS.into_response();
276 }
277 next.run(req).await
278}
279
280async fn concurrency_middleware(
281 State(state): State<AppState>,
282 req: Request<axum::body::Body>,
283 next: Next,
284) -> Response {
285 let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
286 return StatusCode::TOO_MANY_REQUESTS.into_response();
287 };
288 let resp = next.run(req).await;
289 drop(permit);
290 resp
291}
292
293async fn health() -> impl IntoResponse {
294 (StatusCode::OK, "ok\n")
295}
296
297async fn v1_shutdown() -> impl IntoResponse {
298 tokio::spawn(async {
299 tokio::time::sleep(Duration::from_millis(100)).await;
300 std::process::exit(0);
301 });
302 (StatusCode::OK, "shutting down\n")
303}
304
/// JSON body accepted by `POST /v1/tools/call`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolCallBody {
    /// Name of the tool to invoke.
    name: String,
    /// Optional tool arguments, passed to the engine unchanged.
    #[serde(default)]
    arguments: Option<Value>,
    /// Accepted on input; not read by `v1_tool_call`.
    #[serde(default)]
    _workspace_id: Option<String>,
    /// Accepted on input; not read by `v1_tool_call`.
    #[serde(default)]
    _channel_id: Option<String>,
}
316
/// Query parameters accepted by `GET /v1/events`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsQuery {
    /// Workspace to read from; sanitized, `"default"` when absent.
    #[serde(default)]
    workspace_id: Option<String>,
    /// Channel to read from; sanitized, `"default"` when absent.
    #[serde(default)]
    channel_id: Option<String>,
    /// Event id passed to the bus as the replay starting point (0 when absent).
    #[serde(default)]
    since: Option<i64>,
    /// Maximum number of replayed events (200 when absent, capped at 1000).
    #[serde(default)]
    limit: Option<usize>,
    /// Comma-separated list of event kinds to keep.
    #[serde(default)]
    kind: Option<String>,
}
333
334async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
335 let _ = state;
336 let v = crate::core::mcp_manifest::manifest_value();
337 (StatusCode::OK, Json(v))
338}
339
/// Paging parameters accepted by `GET /v1/tools`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolsQuery {
    /// Index of the first tool to return (0 when absent, clamped to the total).
    #[serde(default)]
    offset: Option<usize>,
    /// Page size (200 when absent, capped at 500).
    #[serde(default)]
    limit: Option<usize>,
}
348
349async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
350 let _ = state;
351 let v = crate::core::mcp_manifest::manifest_value();
352 let tools = v
353 .get("tools")
354 .and_then(|t| t.get("granular"))
355 .cloned()
356 .unwrap_or(Value::Array(vec![]));
357
358 let all = tools.as_array().cloned().unwrap_or_default();
359 let total = all.len();
360 let offset = q.offset.unwrap_or(0).min(total);
361 let limit = q.limit.unwrap_or(200).min(500);
362 let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
363
364 (
365 StatusCode::OK,
366 Json(serde_json::json!({
367 "tools": page,
368 "total": total,
369 "offset": offset,
370 "limit": limit,
371 })),
372 )
373}
374
375async fn v1_tool_call(
376 State(state): State<AppState>,
377 Json(body): Json<ToolCallBody>,
378) -> impl IntoResponse {
379 let engine = ContextEngine::from_server(state.server.clone());
380 match tokio::time::timeout(
381 state.timeout,
382 engine.call_tool_value(&body.name, body.arguments),
383 )
384 .await
385 {
386 Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
387 Ok(Err(e)) => {
388 tracing::warn!("tool call error: {e}");
389 (
390 StatusCode::BAD_REQUEST,
391 Json(serde_json::json!({ "error": "tool_error", "code": "TOOL_ERROR" })),
392 )
393 .into_response()
394 }
395 Err(_) => (
396 StatusCode::GATEWAY_TIMEOUT,
397 Json(serde_json::json!({ "error": "request_timeout" })),
398 )
399 .into_response(),
400 }
401}
402
403async fn v1_events(
404 State(state): State<AppState>,
405 Query(q): Query<EventsQuery>,
406) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
407 use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
408
409 let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
410 let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
411 let _ = &state.project_root;
412 let since = q.since.unwrap_or(0);
413 let limit = q.limit.unwrap_or(200).min(1000);
414 let redaction = RedactionLevel::RefsOnly;
415
416 let kind_filter: Option<Vec<String>> = q
417 .kind
418 .as_deref()
419 .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
420
421 let rt = crate::core::context_os::runtime();
422 let replay = rt.bus.read(&ws, &ch, since, limit);
423
424 let replay = if let Some(ref kinds) = kind_filter {
425 replay
426 .into_iter()
427 .filter(|ev| kinds.contains(&ev.kind))
428 .collect()
429 } else {
430 replay
431 };
432
433 let rx = if let Some(ref kinds) = kind_filter {
434 let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
435 let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
436 if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
437 crate::core::context_os::SubscriptionKind::Filtered(sub)
438 } else {
439 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
440 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
441 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
442 }
443 } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
444 crate::core::context_os::SubscriptionKind::Unfiltered(sub)
445 } else {
446 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
447 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
448 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
449 };
450
451 rt.metrics.record_sse_connect();
452 rt.metrics.record_events_replayed(replay.len() as u64);
453 rt.metrics.record_workspace_active(&ws);
454
455 let bus = rt.bus.clone();
456 let metrics = rt.metrics.clone();
457 let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
458
459 let stream = futures::stream::unfold(
460 (
461 pending,
462 rx,
463 ws.clone(),
464 ch.clone(),
465 since,
466 redaction,
467 bus,
468 metrics,
469 ),
470 |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
471 if let Some(mut ev) = pending.pop_front() {
472 last_id = ev.id;
473 redact_event_payload(&mut ev, redaction);
474 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
475 let evt = SseEvent::default()
476 .id(ev.id.to_string())
477 .event(ev.kind)
478 .data(data);
479 return Some((
480 Ok(evt),
481 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
482 ));
483 }
484
485 loop {
486 match rx.recv().await {
487 Ok(mut ev) if ev.id > last_id => {
488 last_id = ev.id;
489 redact_event_payload(&mut ev, redaction);
490 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
491 let evt = SseEvent::default()
492 .id(ev.id.to_string())
493 .event(ev.kind)
494 .data(data);
495 return Some((
496 Ok(evt),
497 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
498 ));
499 }
500 Ok(_) => {}
501 Err(broadcast::error::RecvError::Closed) => return None,
502 Err(broadcast::error::RecvError::Lagged(skipped)) => {
503 let missed = bus.read(&ws, &ch, last_id, skipped as usize);
504 metrics.record_events_replayed(missed.len() as u64);
505 for ev in missed {
506 last_id = last_id.max(ev.id);
507 pending.push_back(ev);
508 }
509 }
510 }
511 }
512 },
513 );
514
515 let metrics_ref = rt.metrics.clone();
516 let guarded = SseDisconnectGuard {
517 inner: Box::pin(stream),
518 metrics: metrics_ref,
519 };
520
521 Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
522}
523
/// Query parameters accepted by `GET /v1/audit/events`.
#[derive(Debug, Deserialize)]
struct AuditEventsQuery {
    /// Number of entries requested per source; `default_audit_limit()` when absent.
    #[serde(default = "default_audit_limit")]
    limit: usize,
}
529
/// Serde default for `AuditEventsQuery::limit`.
fn default_audit_limit() -> usize {
    const DEFAULT_AUDIT_LIMIT: usize = 100;
    DEFAULT_AUDIT_LIMIT
}
533
534async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
535 let capped = q.limit.min(1000);
536 let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
537 let trail_events = crate::core::audit_trail::load_recent(capped);
538
539 Json(serde_json::json!({
540 "cross_project_events": boundary_events,
541 "audit_trail": trail_events,
542 }))
543}
544
545async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
546 let rt = crate::core::context_os::runtime();
547 let snap = rt.metrics.snapshot();
548 (
549 StatusCode::OK,
550 Json(serde_json::to_value(snap).unwrap_or_default()),
551 )
552}
553
/// Largest handoff payload (`payload_json` length in bytes) that `v1_a2a_handoff` accepts.
const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
/// File-count limit passed to `evict_oldest_files` for each handoff directory.
const MAX_HANDOFF_FILES: usize = 50;
556
557async fn v1_a2a_handoff(
558 State(state): State<AppState>,
559 Json(body): Json<Value>,
560) -> impl IntoResponse {
561 let envelope = match crate::core::a2a_transport::parse_envelope(
562 &serde_json::to_string(&body).unwrap_or_default(),
563 ) {
564 Ok(env) => env,
565 Err(e) => {
566 tracing::warn!("a2a handoff parse error: {e}");
567 return (
568 StatusCode::BAD_REQUEST,
569 Json(serde_json::json!({"error": "invalid_envelope"})),
570 );
571 }
572 };
573
574 if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
575 tracing::warn!(
576 "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
577 envelope.payload_json.len()
578 );
579 return (
580 StatusCode::PAYLOAD_TOO_LARGE,
581 Json(serde_json::json!({"error": "payload_too_large"})),
582 );
583 }
584
585 let rt = crate::core::context_os::runtime();
586 rt.bus.append(
587 &state.project_root,
588 "a2a",
589 &crate::core::context_os::ContextEventKindV1::SessionMutated,
590 Some(&envelope.sender.agent_id),
591 serde_json::json!({
592 "type": "handoff_received",
593 "content_type": format!("{:?}", envelope.content_type),
594 "sender": envelope.sender.agent_id,
595 "payload_size": envelope.payload_json.len(),
596 }),
597 );
598
599 match envelope.content_type {
600 crate::core::a2a_transport::TransportContentType::ContextPackage => {
601 let dir = std::path::Path::new(&state.project_root)
602 .join(".lean-ctx")
603 .join("handoffs")
604 .join("packages");
605 let _ = std::fs::create_dir_all(&dir);
606 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
607 let out = dir.join(format!(
608 "ctx-{}.{}",
609 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
610 crate::core::contracts::PACKAGE_EXTENSION
611 ));
612 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
613 tracing::error!("a2a handoff write failed: {e}");
614 return (
615 StatusCode::INTERNAL_SERVER_ERROR,
616 Json(serde_json::json!({"error": "write_failed"})),
617 );
618 }
619 (
620 StatusCode::OK,
621 Json(serde_json::json!({
622 "status": "received",
623 "content_type": "context_package",
624 })),
625 )
626 }
627 crate::core::a2a_transport::TransportContentType::HandoffBundle => {
628 let dir = std::path::Path::new(&state.project_root)
629 .join(".lean-ctx")
630 .join("handoffs");
631 let _ = std::fs::create_dir_all(&dir);
632 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
633 let out = dir.join(format!(
634 "received-{}.json",
635 chrono::Utc::now().format("%Y%m%d_%H%M%S")
636 ));
637 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
638 tracing::error!("a2a handoff write failed: {e}");
639 return (
640 StatusCode::INTERNAL_SERVER_ERROR,
641 Json(serde_json::json!({"error": "write_failed"})),
642 );
643 }
644 (
645 StatusCode::OK,
646 Json(serde_json::json!({
647 "status": "received",
648 "content_type": "handoff_bundle",
649 })),
650 )
651 }
652 _ => (
653 StatusCode::OK,
654 Json(serde_json::json!({
655 "status": "received",
656 "content_type": format!("{:?}", envelope.content_type),
657 })),
658 ),
659 }
660}
661
/// Makes room for one more file in `dir`.
///
/// Does nothing while `dir` holds fewer than `max_files` regular files (or
/// cannot be read). Otherwise the files with the oldest modification time are
/// removed until `max_files - 1` remain. Sub-directories are ignored and
/// removal errors are not reported.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };

    let mut files: Vec<(std::time::SystemTime, std::path::PathBuf)> = Vec::new();
    for entry in entries.flatten() {
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        // Files without a readable mtime count as the oldest possible.
        let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        files.push((mtime, entry.path()));
    }

    if files.len() < max_files {
        return;
    }

    files.sort_by(|a, b| a.0.cmp(&b.0));
    let keep = max_files.saturating_sub(1);
    let excess = files.len().saturating_sub(keep);
    for (_, path) in files.iter().take(excess) {
        let _ = std::fs::remove_file(path);
    }
}
687
688async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
689 let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
690 Ok(r) => r,
691 Err(e) => {
692 tracing::debug!("a2a JSON-RPC parse error: {e}");
693 return (
694 StatusCode::BAD_REQUEST,
695 Json(serde_json::json!({
696 "jsonrpc": "2.0",
697 "id": null,
698 "error": {"code": -32700, "message": "invalid request"}
699 })),
700 );
701 }
702 };
703 let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
704 let json = serde_json::to_value(resp).unwrap_or_default();
705 (StatusCode::OK, Json(json))
706}
707
708async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
709 let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
710 (
711 StatusCode::OK,
712 [(header::CONTENT_TYPE, "application/json")],
713 Json(card),
714 )
715}
716
/// Serves the static MCP server card (`/.well-known/mcp-server.json`).
///
/// Everything except `version` (taken from `CARGO_PKG_VERSION` at compile
/// time) is a hard-coded description of the server.
async fn mcp_server_card() -> impl IntoResponse {
    let card = serde_json::json!({
        "name": "lean-ctx",
        "version": env!("CARGO_PKG_VERSION"),
        "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
        "capabilities": {
            "tools": true,
            "resources": false,
            "prompts": false,
            "sampling": false
        },
        "tool_categories": [
            {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
            {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
            {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
            {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
        ],
        "features": {
            "compression": "deterministic AST-based, 40-70% token reduction",
            "caching": "session-scoped with zstd, re-reads ~13 tokens",
            "audit_trail": "SHA-256 chained JSONL",
            "rbac": "5 built-in roles with capability-based access",
            "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
            "secret_detection": "8 regex patterns + custom"
        },
        "security": {
            "path_jail": true,
            "rate_limiting": true,
            "budget_tracking": true,
            "signed_handoffs": true,
            "timing_safe_auth": true
        }
    });
    Json(card)
}
752
753async fn v1_agents_register(
754 State(state): State<AppState>,
755 Json(body): Json<Value>,
756) -> impl IntoResponse {
757 let agent_type = body
758 .get("agent_type")
759 .and_then(|v| v.as_str())
760 .unwrap_or("unknown");
761 let role = body.get("role").and_then(|v| v.as_str());
762 let project_root = body
763 .get("project_root")
764 .and_then(|v| v.as_str())
765 .unwrap_or(&state.project_root);
766
767 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
768 let agent_id = registry.register(agent_type, role, project_root);
769 let _ = registry.save();
770
771 Json(serde_json::json!({
772 "agent_id": agent_id,
773 "status": "registered"
774 }))
775}
776
777async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
778 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
779 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
780 registry.update_heartbeat(agent_id);
781 let _ = registry.save();
782 Json(serde_json::json!({"status": "ok"}))
783}
784
785async fn v1_agents_list() -> impl IntoResponse {
786 let registry = crate::core::agents::AgentRegistry::load_or_create();
787 let active = registry.list_active(None);
788 Json(serde_json::json!({
789 "agents": active.iter().map(|a| serde_json::json!({
790 "agent_id": a.agent_id,
791 "agent_type": a.agent_type,
792 "role": a.role,
793 "status": a.status.to_string(),
794 "last_active": a.last_active.to_rfc3339(),
795 })).collect::<Vec<_>>()
796 }))
797}
798
799async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
800 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
801 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
802 registry.set_status(
803 agent_id,
804 crate::core::agents::AgentStatus::Finished,
805 Some("deregistered via API"),
806 );
807 let _ = registry.save();
808 Json(serde_json::json!({"status": "deregistered"}))
809}
810
811async fn v1_agents_events_sse(
812) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
813 let stream = futures::stream::unfold(0usize, |last_count| async move {
814 loop {
815 tokio::time::sleep(Duration::from_secs(5)).await;
816 let registry = crate::core::agents::AgentRegistry::load_or_create();
817 let active = registry.list_active(None);
818 let count = active.len();
819 if count != last_count {
820 let data = serde_json::json!({
821 "type": "agents_changed",
822 "active_count": count,
823 "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
824 });
825 return Some((
826 Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
827 count,
828 ));
829 }
830 }
831 });
832
833 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
834}
835
/// Assembles the complete router: REST routes, the MCP streamable-HTTP
/// service as fallback, and the body-limit, rate-limit, concurrency and auth
/// layers.
///
/// Calls `cfg.effective_auth_token()`, which may generate a token and print
/// it to stderr.
fn build_app_router(cfg: &HttpServerConfig) -> Router {
    let project_root = cfg.project_root.to_string_lossy().to_string();
    let service_project_root = project_root.clone();
    // Factory handed to the MCP transport; each call creates a server via
    // `new_shared_with_context` for the same project root.
    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
        Ok(LeanCtxServer::new_shared_with_context(
            &service_project_root,
            "default",
            "default",
        ))
    };
    let mcp_http = StreamableHttpService::new(
        service_factory,
        Arc::new(
            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
        ),
        cfg.mcp_http_config(),
    );

    // Server instance used by the REST tool-call endpoint.
    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");

    let state = AppState {
        token: cfg.effective_auth_token(),
        // Zero would make the limits unusable, so both are raised to at least 1.
        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
        project_root,
        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
        server: rest_server,
    };

    Router::new()
        .route("/health", get(health))
        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
        .route("/v1/manifest", get(v1_manifest))
        .route("/v1/tools", get(v1_tools))
        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
        .route("/v1/events", get(v1_events))
        .route(
            "/v1/context/summary",
            get(context_views::v1_context_summary),
        )
        .route("/v1/events/search", get(context_views::v1_events_search))
        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
        .route("/v1/metrics", get(v1_metrics))
        .route("/v1/audit/events", get(v1_audit_events))
        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
        .route("/.well-known/mcp-server.json", get(mcp_server_card))
        .route("/a2a", axum::routing::post(a2a_jsonrpc))
        .route(
            "/v1/agents/register",
            axum::routing::post(v1_agents_register),
        )
        .route(
            "/v1/agents/heartbeat",
            axum::routing::post(v1_agents_heartbeat),
        )
        .route("/v1/agents/list", get(v1_agents_list))
        .route(
            "/v1/agents/deregister",
            axum::routing::post(v1_agents_deregister),
        )
        .route("/v1/agents/events", get(v1_agents_events_sse))
        // Every path without an explicit route goes to the MCP transport.
        .fallback_service(mcp_http)
        // NOTE(review): axum wraps earlier layers with later ones, so auth
        // presumably runs first, then concurrency, then rate limiting — confirm.
        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            concurrency_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            auth_middleware,
        ))
        .with_state(state)
}
915
916pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
917 crate::core::protocol::set_mcp_context(true);
918 cfg.validate()?;
919
920 let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
921 .parse()
922 .context("invalid host/port")?;
923
924 let app = build_app_router(&cfg);
925
926 let listener = tokio::net::TcpListener::bind(addr)
927 .await
928 .with_context(|| format!("bind {addr}"))?;
929
930 tracing::info!(
931 "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
932 cfg.project_root.display()
933 );
934
935 axum::serve(listener, app)
936 .with_graceful_shutdown(async move {
937 let _ = tokio::signal::ctrl_c().await;
938 })
939 .await
940 .context("http server")?;
941 Ok(())
942}
943
/// Lets `axum::serve` accept connections from a Windows named pipe.
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next pipe client. Accept errors are logged and retried
    /// after 100 ms, so this only returns with a connected pipe.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        let retry_delay = std::time::Duration::from_millis(100);
        loop {
            let err = match self.accept_pipe().await {
                Ok(pipe) => return (pipe, self.name().to_string()),
                Err(e) => e,
            };
            tracing::error!("named pipe accept error: {err}");
            tokio::time::sleep(retry_delay).await;
        }
    }

    /// The pipe name doubles as the local address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let name = self.name().to_string();
        Ok(name)
    }
}
965
966pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
969 cfg.validate()?;
970
971 match addr {
972 #[cfg(unix)]
973 crate::ipc::DaemonAddr::Unix(ref path) => {
974 let app = build_app_router(&cfg);
975 let listener = crate::ipc::bind_listener(&addr)?;
976
977 tracing::info!(
978 "lean-ctx daemon listening on {} (project_root={})",
979 path.display(),
980 cfg.project_root.display()
981 );
982
983 axum::serve(listener, app.into_make_service())
984 .with_graceful_shutdown(async move {
985 let _ = tokio::signal::ctrl_c().await;
986 })
987 .await
988 .context("ipc server")?;
989 Ok(())
990 }
991 #[cfg(windows)]
992 crate::ipc::DaemonAddr::NamedPipe(ref name) => {
993 let app = build_app_router(&cfg);
994 let listener = crate::ipc::bind_listener(&addr)?;
995
996 tracing::info!(
997 "lean-ctx daemon listening on {} (project_root={})",
998 name,
999 cfg.project_root.display()
1000 );
1001
1002 axum::serve(listener, app.into_make_service())
1003 .with_graceful_shutdown(async move {
1004 let _ = tokio::signal::ctrl_c().await;
1005 })
1006 .await
1007 .context("ipc server")?;
1008 Ok(())
1009 }
1010 }
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015 use super::*;
1016 use axum::body::Body;
1017 use axum::http::Request;
1018 use futures::StreamExt;
1019 use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
1020 use serde_json::json;
1021 use tower::ServiceExt;
1022
1023 async fn read_first_sse_message(body: Body) -> String {
1024 let mut stream = body.into_data_stream();
1025 let mut buf: Vec<u8> = Vec::new();
1026 for _ in 0..32 {
1027 let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
1028 let Ok(Some(Ok(bytes))) = next else {
1029 break;
1030 };
1031 buf.extend_from_slice(&bytes);
1032 if buf.windows(2).any(|w| w == b"\n\n") {
1033 break;
1034 }
1035 }
1036 String::from_utf8_lossy(&buf).to_string()
1037 }
1038
    /// With a token configured, a request without an `Authorization` header
    /// must be rejected with `401` before it reaches the MCP service.
    #[tokio::test]
    async fn auth_token_blocks_requests_without_bearer_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();
        let service_project_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &service_project_root,
                "default",
                "default",
            ))
        };
        let cfg = StreamableHttpServerConfig::default()
            .with_stateful_mode(false)
            .with_json_response(true);

        let mcp_http = StreamableHttpService::new(
            service_factory,
            Arc::new(
                rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
            ),
            cfg,
        );

        // State with a configured token, so the auth middleware is active.
        let state = AppState {
            token: Some("secret".to_string()),
            concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
            rate: Arc::new(RateLimiter::new(50, 100)),
            project_root: root_str.clone(),
            timeout: Duration::from_secs(30),
            server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
        };

        // Only the auth layer is installed in front of the MCP fallback service.
        let app = Router::new()
            .fallback_service(mcp_http)
            .layer(middleware::from_fn_with_state(
                state.clone(),
                auth_middleware,
            ))
            .with_state(state);

        let body = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        .to_string();

        // Deliberately no `Authorization` header.
        let req = Request::builder()
            .method("POST")
            .uri("/")
            .header("Host", "localhost")
            .header("Accept", "application/json, text/event-stream")
            .header("Content-Type", "application/json")
            .body(Body::from(body))
            .expect("request");

        let resp = app.clone().oneshot(req).await.expect("resp");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }
1100
    /// Two servers produced by the same factory must keep separate
    /// `client_name` values.
    #[tokio::test]
    async fn mcp_service_factory_isolates_per_client_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();

        let service_project_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::convert::Infallible> {
            Ok(LeanCtxServer::new_shared_with_context(
                &service_project_root,
                "default",
                "default",
            ))
        };

        let s1 = service_factory().expect("server 1");
        let s2 = service_factory().expect("server 2");

        // Write a different name into each server ...
        *s1.client_name.write().await = "client-a".to_string();
        *s2.client_name.write().await = "client-b".to_string();

        // ... and check that neither write leaked into the other instance.
        let a = s1.client_name.read().await.clone();
        let b = s2.client_name.read().await.clone();
        assert_eq!(a, "client-a");
        assert_eq!(b, "client-b");
    }
1129
1130 #[tokio::test]
1131 async fn rate_limit_returns_429_when_exhausted() {
1132 let state = AppState {
1133 token: None,
1134 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1135 rate: Arc::new(RateLimiter::new(1, 1)),
1136 project_root: ".".to_string(),
1137 timeout: Duration::from_secs(30),
1138 server: LeanCtxServer::new_shared_with_context(".", "default", "default"),
1139 };
1140
1141 let app = Router::new()
1142 .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
1143 .layer(middleware::from_fn_with_state(
1144 state.clone(),
1145 rate_limit_middleware,
1146 ))
1147 .with_state(state);
1148
1149 let req1 = Request::builder()
1150 .method("GET")
1151 .uri("/limited")
1152 .header("Host", "localhost")
1153 .body(Body::empty())
1154 .expect("req1");
1155 let resp1 = app.clone().oneshot(req1).await.expect("resp1");
1156 assert_eq!(resp1.status(), StatusCode::OK);
1157
1158 let req2 = Request::builder()
1159 .method("GET")
1160 .uri("/limited")
1161 .header("Host", "localhost")
1162 .body(Body::empty())
1163 .expect("req2");
1164 let resp2 = app.clone().oneshot(req2).await.expect("resp2");
1165 assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
1166 }
1167
1168 #[tokio::test]
1169 async fn audit_events_endpoint_returns_json() {
1170 let dir = tempfile::tempdir().expect("tempdir");
1171 let root_str = dir.path().to_string_lossy().to_string();
1172
1173 let state = AppState {
1174 token: None,
1175 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1176 rate: Arc::new(RateLimiter::new(50, 100)),
1177 project_root: root_str.clone(),
1178 timeout: Duration::from_secs(30),
1179 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1180 };
1181
1182 let app = Router::new()
1183 .route("/v1/audit/events", get(v1_audit_events))
1184 .with_state(state);
1185
1186 let req = Request::builder()
1187 .method("GET")
1188 .uri("/v1/audit/events?limit=10")
1189 .header("Host", "localhost")
1190 .body(Body::empty())
1191 .unwrap();
1192
1193 let resp = app.oneshot(req).await.unwrap();
1194 assert_eq!(resp.status(), StatusCode::OK);
1195
1196 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1197 .await
1198 .unwrap();
1199 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1200 assert!(json.get("cross_project_events").unwrap().is_array());
1201 assert!(json.get("audit_trail").unwrap().is_array());
1202 }
1203
1204 #[tokio::test]
1205 async fn events_endpoint_replays_tool_call_event() {
1206 use crate::core::context_os::{self, ContextEventKindV1};
1207
1208 let dir = tempfile::tempdir().expect("tempdir");
1209 std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
1210 std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
1211 let root_str = dir.path().to_string_lossy().to_string();
1212
1213 let state = AppState {
1214 token: None,
1215 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1216 rate: Arc::new(RateLimiter::new(50, 100)),
1217 project_root: root_str.clone(),
1218 timeout: Duration::from_secs(30),
1219 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1220 };
1221
1222 let app = Router::new()
1223 .route("/v1/events", get(v1_events))
1224 .with_state(state);
1225
1226 let rt = context_os::runtime();
1228 rt.bus.append(
1229 "ws1",
1230 "ch1",
1231 &ContextEventKindV1::ToolCallRecorded,
1232 Some("test-agent"),
1233 json!({"tool": "ctx_session", "action": "status"}),
1234 );
1235
1236 let req = Request::builder()
1237 .method("GET")
1238 .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
1239 .header("Host", "localhost")
1240 .header("Accept", "text/event-stream")
1241 .body(Body::empty())
1242 .expect("req");
1243 let resp = app.clone().oneshot(req).await.expect("events");
1244 assert_eq!(resp.status(), StatusCode::OK);
1245
1246 let msg = read_first_sse_message(resp.into_body()).await;
1247 assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
1248 assert!(msg.contains("\"ws1\""), "msg={msg:?}");
1249 assert!(msg.contains("\"ch1\""), "msg={msg:?}");
1250 }
1251}