1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7 extract::Json,
8 extract::Query,
9 extract::State,
10 http::{header, Request, StatusCode},
11 middleware::{self, Next},
12 response::sse::{Event as SseEvent, KeepAlive, Sse},
13 response::{IntoResponse, Response},
14 routing::get,
15 Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29
30#[cfg(feature = "team-server")]
31pub mod team;
32
33use std::pin::Pin;
35
/// Stream wrapper used for SSE responses: it forwards items from `inner`
/// unchanged and reports a disconnect to `metrics` when the wrapper is dropped.
pub(crate) struct SseDisconnectGuard<I> {
    /// The wrapped, heap-pinned stream that actually produces the items.
    pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
    /// Metrics handle on which `record_sse_disconnect` is invoked from `Drop`.
    pub(crate) metrics: Arc<ContextOsMetrics>,
}
40
41impl<I> Stream for SseDisconnectGuard<I> {
42 type Item = I;
43
44 fn poll_next(
45 mut self: Pin<&mut Self>,
46 cx: &mut std::task::Context<'_>,
47 ) -> std::task::Poll<Option<Self::Item>> {
48 self.inner.as_mut().poll_next(cx)
49 }
50}
51
52impl<I> Drop for SseDisconnectGuard<I> {
53 fn drop(&mut self) {
54 self.metrics.record_sse_disconnect();
55 }
56}
57
/// Maximum number of characters kept by [`sanitize_id`].
const MAX_ID_LEN: usize = 64;

/// Normalizes a caller-supplied identifier.
///
/// Surrounding whitespace is trimmed, every character other than ASCII
/// alphanumerics, `-`, `_` and `.` is dropped, and the result is cut to
/// `MAX_ID_LEN` characters. If nothing usable remains, `"default"` is returned.
fn sanitize_id(raw: &str) -> String {
    let mut id = String::new();
    let mut kept = 0usize;

    for ch in raw.trim().chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.');
        if !allowed {
            continue;
        }
        if kept == MAX_ID_LEN {
            break;
        }
        id.push(ch);
        kept += 1;
    }

    if id.is_empty() {
        String::from("default")
    } else {
        id
    }
}
76
/// Settings for the HTTP / IPC server built by this module.
#[derive(Clone, Debug)]
pub struct HttpServerConfig {
    /// Host the TCP listener binds to (combined with `port` in `serve`).
    pub host: String,
    /// TCP port the listener binds to.
    pub port: u16,
    /// Project root handed to the MCP server instances and used for handoff storage.
    pub project_root: PathBuf,
    /// Optional bearer token; `validate` requires a non-empty one for non-loopback hosts.
    pub auth_token: Option<String>,
    /// Forwarded to `StreamableHttpServerConfig::with_stateful_mode`.
    pub stateful_mode: bool,
    /// Forwarded to `StreamableHttpServerConfig::with_json_response`.
    pub json_response: bool,
    /// When true, the MCP transport's allowed-hosts check is disabled.
    pub disable_host_check: bool,
    /// Explicit allowed-hosts list for the MCP transport; used when non-empty.
    pub allowed_hosts: Vec<String>,
    /// Request body size cap applied through `DefaultBodyLimit`.
    pub max_body_bytes: usize,
    /// Number of permits in the concurrency semaphore (at least 1 is used).
    pub max_concurrency: usize,
    /// Token-bucket refill rate in requests per second.
    pub max_rps: u32,
    /// Token-bucket capacity.
    pub rate_burst: u32,
    /// Timeout, in milliseconds, applied to REST tool calls.
    pub request_timeout_ms: u64,
}
93
94impl Default for HttpServerConfig {
95 fn default() -> Self {
96 let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
97 Self {
98 host: "127.0.0.1".to_string(),
99 port: 8080,
100 project_root,
101 auth_token: None,
102 stateful_mode: false,
103 json_response: true,
104 disable_host_check: false,
105 allowed_hosts: Vec::new(),
106 max_body_bytes: 2 * 1024 * 1024,
107 max_concurrency: 32,
108 max_rps: 50,
109 rate_burst: 100,
110 request_timeout_ms: 30_000,
111 }
112 }
113}
114
115impl HttpServerConfig {
116 pub fn validate(&self) -> Result<()> {
117 let host = self.host.trim().to_lowercase();
118 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
119 if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
120 return Err(anyhow!(
121 "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
122 ));
123 }
124 Ok(())
125 }
126
127 fn mcp_http_config(&self) -> StreamableHttpServerConfig {
128 let mut cfg = StreamableHttpServerConfig::default()
129 .with_stateful_mode(self.stateful_mode)
130 .with_json_response(self.json_response);
131
132 if self.disable_host_check {
133 tracing::warn!(
134 "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
135 Do NOT use this in production or on non-loopback interfaces."
136 );
137 cfg = cfg.disable_allowed_hosts();
138 return cfg;
139 }
140
141 if !self.allowed_hosts.is_empty() {
142 cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
143 return cfg;
144 }
145
146 let host = self.host.trim();
148 if host == "127.0.0.1" || host == "localhost" || host == "::1" {
149 cfg.allowed_hosts.push(host.to_string());
150 }
151
152 cfg
153 }
154}
155
/// Shared state handed to the middlewares and REST handlers.
#[derive(Clone)]
struct AppState {
    /// Expected bearer token; `None` disables the auth middleware.
    token: Option<String>,
    /// Permits limiting the number of requests processed at the same time.
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Token-bucket limiter consulted by `rate_limit_middleware`.
    rate: Arc<RateLimiter>,
    /// Project root as a string (lossy conversion of the configured path).
    project_root: String,
    /// Upper bound for a single REST tool call.
    timeout: Duration,
    /// Server instance the REST tool-call endpoint dispatches to.
    server: LeanCtxServer,
}
165
/// Token-bucket rate limiter shared by all requests.
#[derive(Debug)]
struct RateLimiter {
    /// Tokens added per second of elapsed time.
    max_rps: f64,
    /// Maximum number of tokens the bucket can hold.
    burst: f64,
    /// Current token count and the time of the last refill.
    state: tokio::sync::Mutex<RateState>,
}
172
/// Mutable part of the token bucket, guarded by the limiter's mutex.
#[derive(Debug, Clone, Copy)]
struct RateState {
    /// Tokens currently available (fractional while refilling).
    tokens: f64,
    /// Instant at which `tokens` was last refilled.
    last: Instant,
}
178
179impl RateLimiter {
180 fn new(max_rps: u32, burst: u32) -> Self {
181 let now = Instant::now();
182 Self {
183 max_rps: (max_rps.max(1)) as f64,
184 burst: (burst.max(1)) as f64,
185 state: tokio::sync::Mutex::new(RateState {
186 tokens: (burst.max(1)) as f64,
187 last: now,
188 }),
189 }
190 }
191
192 async fn allow(&self) -> bool {
193 let mut s = self.state.lock().await;
194 let now = Instant::now();
195 let elapsed = now.saturating_duration_since(s.last);
196 let refill = elapsed.as_secs_f64() * self.max_rps;
197 s.tokens = (s.tokens + refill).min(self.burst);
198 s.last = now;
199 if s.tokens >= 1.0 {
200 s.tokens -= 1.0;
201 true
202 } else {
203 false
204 }
205 }
206}
207
208async fn auth_middleware(
209 State(state): State<AppState>,
210 req: Request<axum::body::Body>,
211 next: Next,
212) -> Response {
213 if state.token.is_none() {
214 return next.run(req).await;
215 }
216
217 if req.uri().path() == "/health" {
218 return next.run(req).await;
219 }
220
221 let expected = state.token.as_deref().unwrap_or("");
222 let Some(h) = req.headers().get(header::AUTHORIZATION) else {
223 return StatusCode::UNAUTHORIZED.into_response();
224 };
225 let Ok(s) = h.to_str() else {
226 return StatusCode::UNAUTHORIZED.into_response();
227 };
228 let Some(token) = s
229 .strip_prefix("Bearer ")
230 .or_else(|| s.strip_prefix("bearer "))
231 else {
232 return StatusCode::UNAUTHORIZED.into_response();
233 };
234 if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
235 return StatusCode::UNAUTHORIZED.into_response();
236 }
237
238 next.run(req).await
239}
240
241fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
242 use subtle::ConstantTimeEq;
243 if a.len() != b.len() {
244 return false;
245 }
246 bool::from(a.ct_eq(b))
247}
248
249async fn rate_limit_middleware(
250 State(state): State<AppState>,
251 req: Request<axum::body::Body>,
252 next: Next,
253) -> Response {
254 if !state.rate.allow().await {
255 return StatusCode::TOO_MANY_REQUESTS.into_response();
256 }
257 next.run(req).await
258}
259
260async fn concurrency_middleware(
261 State(state): State<AppState>,
262 req: Request<axum::body::Body>,
263 next: Next,
264) -> Response {
265 let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
266 return StatusCode::TOO_MANY_REQUESTS.into_response();
267 };
268 let resp = next.run(req).await;
269 drop(permit);
270 resp
271}
272
273async fn health() -> impl IntoResponse {
274 (StatusCode::OK, "ok\n")
275}
276
277async fn v1_shutdown() -> impl IntoResponse {
278 tokio::spawn(async {
279 tokio::time::sleep(Duration::from_millis(100)).await;
280 std::process::exit(0);
281 });
282 (StatusCode::OK, "shutting down\n")
283}
284
/// JSON body accepted by the REST tool-call endpoint.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolCallBody {
    /// Name of the tool to invoke.
    name: String,
    /// Optional tool arguments, passed to the engine as-is.
    #[serde(default)]
    arguments: Option<Value>,
    /// Accepted for compatibility; not read by the handler in this file.
    #[serde(default)]
    _workspace_id: Option<String>,
    /// Accepted for compatibility; not read by the handler in this file.
    #[serde(default)]
    _channel_id: Option<String>,
}
296
/// Query parameters of the event-stream endpoint.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsQuery {
    /// Workspace to stream from; sanitized, `"default"` when absent.
    #[serde(default)]
    workspace_id: Option<String>,
    /// Channel to stream from; sanitized, `"default"` when absent.
    #[serde(default)]
    channel_id: Option<String>,
    /// Replay events after this id; `0` when absent.
    #[serde(default)]
    since: Option<i64>,
    /// Maximum number of replayed events; defaults to 200, capped at 1000.
    #[serde(default)]
    limit: Option<usize>,
    /// Optional comma-separated list of event kinds to keep.
    #[serde(default)]
    kind: Option<String>,
}
313
314async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
315 let _ = state;
316 let v = crate::core::mcp_manifest::manifest_value();
317 (StatusCode::OK, Json(v))
318}
319
/// Pagination parameters of the tool-listing endpoint.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolsQuery {
    /// Index of the first tool to return; `0` when absent, clamped to the total.
    #[serde(default)]
    offset: Option<usize>,
    /// Page size; defaults to 200 and is capped at 500.
    #[serde(default)]
    limit: Option<usize>,
}
328
329async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
330 let _ = state;
331 let v = crate::core::mcp_manifest::manifest_value();
332 let tools = v
333 .get("tools")
334 .and_then(|t| t.get("granular"))
335 .cloned()
336 .unwrap_or(Value::Array(vec![]));
337
338 let all = tools.as_array().cloned().unwrap_or_default();
339 let total = all.len();
340 let offset = q.offset.unwrap_or(0).min(total);
341 let limit = q.limit.unwrap_or(200).min(500);
342 let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
343
344 (
345 StatusCode::OK,
346 Json(serde_json::json!({
347 "tools": page,
348 "total": total,
349 "offset": offset,
350 "limit": limit,
351 })),
352 )
353}
354
355async fn v1_tool_call(
356 State(state): State<AppState>,
357 Json(body): Json<ToolCallBody>,
358) -> impl IntoResponse {
359 let engine = ContextEngine::from_server(state.server.clone());
360 match tokio::time::timeout(
361 state.timeout,
362 engine.call_tool_value(&body.name, body.arguments),
363 )
364 .await
365 {
366 Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
367 Ok(Err(e)) => {
368 tracing::warn!("tool call error: {e}");
369 (
370 StatusCode::BAD_REQUEST,
371 Json(serde_json::json!({ "error": "tool_error", "code": "TOOL_ERROR" })),
372 )
373 .into_response()
374 }
375 Err(_) => (
376 StatusCode::GATEWAY_TIMEOUT,
377 Json(serde_json::json!({ "error": "request_timeout" })),
378 )
379 .into_response(),
380 }
381}
382
383async fn v1_events(
384 State(state): State<AppState>,
385 Query(q): Query<EventsQuery>,
386) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
387 use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
388
389 let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
390 let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
391 let _ = &state.project_root;
392 let since = q.since.unwrap_or(0);
393 let limit = q.limit.unwrap_or(200).min(1000);
394 let redaction = RedactionLevel::RefsOnly;
395
396 let kind_filter: Option<Vec<String>> = q
397 .kind
398 .as_deref()
399 .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
400
401 let rt = crate::core::context_os::runtime();
402 let replay = rt.bus.read(&ws, &ch, since, limit);
403
404 let replay = if let Some(ref kinds) = kind_filter {
405 replay
406 .into_iter()
407 .filter(|ev| kinds.contains(&ev.kind))
408 .collect()
409 } else {
410 replay
411 };
412
413 let rx = if let Some(ref kinds) = kind_filter {
414 let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
415 let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
416 if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
417 crate::core::context_os::SubscriptionKind::Filtered(sub)
418 } else {
419 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
420 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
421 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
422 }
423 } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
424 crate::core::context_os::SubscriptionKind::Unfiltered(sub)
425 } else {
426 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
427 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
428 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
429 };
430
431 rt.metrics.record_sse_connect();
432 rt.metrics.record_events_replayed(replay.len() as u64);
433 rt.metrics.record_workspace_active(&ws);
434
435 let bus = rt.bus.clone();
436 let metrics = rt.metrics.clone();
437 let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
438
439 let stream = futures::stream::unfold(
440 (
441 pending,
442 rx,
443 ws.clone(),
444 ch.clone(),
445 since,
446 redaction,
447 bus,
448 metrics,
449 ),
450 |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
451 if let Some(mut ev) = pending.pop_front() {
452 last_id = ev.id;
453 redact_event_payload(&mut ev, redaction);
454 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
455 let evt = SseEvent::default()
456 .id(ev.id.to_string())
457 .event(ev.kind)
458 .data(data);
459 return Some((
460 Ok(evt),
461 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
462 ));
463 }
464
465 loop {
466 match rx.recv().await {
467 Ok(mut ev) if ev.id > last_id => {
468 last_id = ev.id;
469 redact_event_payload(&mut ev, redaction);
470 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
471 let evt = SseEvent::default()
472 .id(ev.id.to_string())
473 .event(ev.kind)
474 .data(data);
475 return Some((
476 Ok(evt),
477 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
478 ));
479 }
480 Ok(_) => {}
481 Err(broadcast::error::RecvError::Closed) => return None,
482 Err(broadcast::error::RecvError::Lagged(skipped)) => {
483 let missed = bus.read(&ws, &ch, last_id, skipped as usize);
484 metrics.record_events_replayed(missed.len() as u64);
485 for ev in missed {
486 last_id = last_id.max(ev.id);
487 pending.push_back(ev);
488 }
489 }
490 }
491 }
492 },
493 );
494
495 let metrics_ref = rt.metrics.clone();
496 let guarded = SseDisconnectGuard {
497 inner: Box::pin(stream),
498 metrics: metrics_ref,
499 };
500
501 Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
502}
503
/// Query parameters of the audit-events endpoint.
#[derive(Debug, Deserialize)]
struct AuditEventsQuery {
    /// Number of entries requested per source; 100 when absent, capped at 1000 by the handler.
    #[serde(default = "default_audit_limit")]
    limit: usize,
}
509
/// Serde default for `AuditEventsQuery::limit`.
fn default_audit_limit() -> usize {
    const DEFAULT_AUDIT_LIMIT: usize = 100;
    DEFAULT_AUDIT_LIMIT
}
513
514async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
515 let capped = q.limit.min(1000);
516 let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
517 let trail_events = crate::core::audit_trail::load_recent(capped);
518
519 Json(serde_json::json!({
520 "cross_project_events": boundary_events,
521 "audit_trail": trail_events,
522 }))
523}
524
525async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
526 let rt = crate::core::context_os::runtime();
527 let snap = rt.metrics.snapshot();
528 (
529 StatusCode::OK,
530 Json(serde_json::to_value(snap).unwrap_or_default()),
531 )
532}
533
534const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
535const MAX_HANDOFF_FILES: usize = 50;
536
537async fn v1_a2a_handoff(
538 State(state): State<AppState>,
539 Json(body): Json<Value>,
540) -> impl IntoResponse {
541 let envelope = match crate::core::a2a_transport::parse_envelope(
542 &serde_json::to_string(&body).unwrap_or_default(),
543 ) {
544 Ok(env) => env,
545 Err(e) => {
546 tracing::warn!("a2a handoff parse error: {e}");
547 return (
548 StatusCode::BAD_REQUEST,
549 Json(serde_json::json!({"error": "invalid_envelope"})),
550 );
551 }
552 };
553
554 if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
555 tracing::warn!(
556 "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
557 envelope.payload_json.len()
558 );
559 return (
560 StatusCode::PAYLOAD_TOO_LARGE,
561 Json(serde_json::json!({"error": "payload_too_large"})),
562 );
563 }
564
565 let rt = crate::core::context_os::runtime();
566 rt.bus.append(
567 &state.project_root,
568 "a2a",
569 &crate::core::context_os::ContextEventKindV1::SessionMutated,
570 Some(&envelope.sender.agent_id),
571 serde_json::json!({
572 "type": "handoff_received",
573 "content_type": format!("{:?}", envelope.content_type),
574 "sender": envelope.sender.agent_id,
575 "payload_size": envelope.payload_json.len(),
576 }),
577 );
578
579 match envelope.content_type {
580 crate::core::a2a_transport::TransportContentType::ContextPackage => {
581 let dir = std::path::Path::new(&state.project_root)
582 .join(".lean-ctx")
583 .join("handoffs")
584 .join("packages");
585 let _ = std::fs::create_dir_all(&dir);
586 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
587 let out = dir.join(format!(
588 "ctx-{}.lctxpkg",
589 chrono::Utc::now().format("%Y%m%d_%H%M%S")
590 ));
591 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
592 tracing::error!("a2a handoff write failed: {e}");
593 return (
594 StatusCode::INTERNAL_SERVER_ERROR,
595 Json(serde_json::json!({"error": "write_failed"})),
596 );
597 }
598 (
599 StatusCode::OK,
600 Json(serde_json::json!({
601 "status": "received",
602 "content_type": "context_package",
603 })),
604 )
605 }
606 crate::core::a2a_transport::TransportContentType::HandoffBundle => {
607 let dir = std::path::Path::new(&state.project_root)
608 .join(".lean-ctx")
609 .join("handoffs");
610 let _ = std::fs::create_dir_all(&dir);
611 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
612 let out = dir.join(format!(
613 "received-{}.json",
614 chrono::Utc::now().format("%Y%m%d_%H%M%S")
615 ));
616 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
617 tracing::error!("a2a handoff write failed: {e}");
618 return (
619 StatusCode::INTERNAL_SERVER_ERROR,
620 Json(serde_json::json!({"error": "write_failed"})),
621 );
622 }
623 (
624 StatusCode::OK,
625 Json(serde_json::json!({
626 "status": "received",
627 "content_type": "handoff_bundle",
628 })),
629 )
630 }
631 _ => (
632 StatusCode::OK,
633 Json(serde_json::json!({
634 "status": "received",
635 "content_type": format!("{:?}", envelope.content_type),
636 })),
637 ),
638 }
639}
640
/// Makes room for one more file in `dir`.
///
/// When the directory already holds `max_files` or more regular files, the
/// oldest ones (by modification time) are removed until `max_files - 1`
/// remain. Sub-directories are ignored, unreadable entries are skipped, and
/// removal errors are ignored (best effort). Nothing happens if the directory
/// cannot be read.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };

    let mut by_age: Vec<(std::time::SystemTime, std::path::PathBuf)> = Vec::new();
    for entry in entries.flatten() {
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        // Files without a readable mtime sort as the oldest.
        let modified = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        by_age.push((modified, entry.path()));
    }

    if by_age.len() < max_files {
        return;
    }

    by_age.sort_by(|left, right| left.0.cmp(&right.0));
    let keep = max_files.saturating_sub(1);
    let excess = by_age.len().saturating_sub(keep);
    for (_, path) in by_age.iter().take(excess) {
        let _ = std::fs::remove_file(path);
    }
}
666
667async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
668 let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
669 Ok(r) => r,
670 Err(e) => {
671 tracing::debug!("a2a JSON-RPC parse error: {e}");
672 return (
673 StatusCode::BAD_REQUEST,
674 Json(serde_json::json!({
675 "jsonrpc": "2.0",
676 "id": null,
677 "error": {"code": -32700, "message": "invalid request"}
678 })),
679 );
680 }
681 };
682 let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
683 let json = serde_json::to_value(resp).unwrap_or_default();
684 (StatusCode::OK, Json(json))
685}
686
687async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
688 let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
689 (
690 StatusCode::OK,
691 [(header::CONTENT_TYPE, "application/json")],
692 Json(card),
693 )
694}
695
696async fn mcp_server_card() -> impl IntoResponse {
697 let card = serde_json::json!({
698 "name": "lean-ctx",
699 "version": env!("CARGO_PKG_VERSION"),
700 "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
701 "capabilities": {
702 "tools": true,
703 "resources": false,
704 "prompts": false,
705 "sampling": false
706 },
707 "tool_categories": [
708 {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
709 {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
710 {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
711 {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
712 ],
713 "features": {
714 "compression": "deterministic AST-based, 40-70% token reduction",
715 "caching": "session-scoped with zstd, re-reads ~13 tokens",
716 "audit_trail": "SHA-256 chained JSONL",
717 "rbac": "5 built-in roles with capability-based access",
718 "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
719 "secret_detection": "8 regex patterns + custom"
720 },
721 "security": {
722 "path_jail": true,
723 "rate_limiting": true,
724 "budget_tracking": true,
725 "signed_handoffs": true,
726 "timing_safe_auth": true
727 }
728 });
729 Json(card)
730}
731
732async fn v1_agents_register(
733 State(state): State<AppState>,
734 Json(body): Json<Value>,
735) -> impl IntoResponse {
736 let agent_type = body
737 .get("agent_type")
738 .and_then(|v| v.as_str())
739 .unwrap_or("unknown");
740 let role = body.get("role").and_then(|v| v.as_str());
741 let project_root = body
742 .get("project_root")
743 .and_then(|v| v.as_str())
744 .unwrap_or(&state.project_root);
745
746 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
747 let agent_id = registry.register(agent_type, role, project_root);
748 let _ = registry.save();
749
750 Json(serde_json::json!({
751 "agent_id": agent_id,
752 "status": "registered"
753 }))
754}
755
756async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
757 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
758 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
759 registry.update_heartbeat(agent_id);
760 let _ = registry.save();
761 Json(serde_json::json!({"status": "ok"}))
762}
763
764async fn v1_agents_list() -> impl IntoResponse {
765 let registry = crate::core::agents::AgentRegistry::load_or_create();
766 let active = registry.list_active(None);
767 Json(serde_json::json!({
768 "agents": active.iter().map(|a| serde_json::json!({
769 "agent_id": a.agent_id,
770 "agent_type": a.agent_type,
771 "role": a.role,
772 "status": a.status.to_string(),
773 "last_active": a.last_active.to_rfc3339(),
774 })).collect::<Vec<_>>()
775 }))
776}
777
778async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
779 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
780 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
781 registry.set_status(
782 agent_id,
783 crate::core::agents::AgentStatus::Finished,
784 Some("deregistered via API"),
785 );
786 let _ = registry.save();
787 Json(serde_json::json!({"status": "deregistered"}))
788}
789
790async fn v1_agents_events_sse(
791) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
792 let stream = futures::stream::unfold(0usize, |last_count| async move {
793 loop {
794 tokio::time::sleep(Duration::from_secs(5)).await;
795 let registry = crate::core::agents::AgentRegistry::load_or_create();
796 let active = registry.list_active(None);
797 let count = active.len();
798 if count != last_count {
799 let data = serde_json::json!({
800 "type": "agents_changed",
801 "active_count": count,
802 "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
803 });
804 return Some((
805 Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
806 count,
807 ));
808 }
809 }
810 });
811
812 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
813}
814
/// Assembles the complete application router for the given configuration.
///
/// REST and well-known routes are registered explicitly; every other path
/// falls through to the MCP streamable-HTTP service. The body limit and the
/// rate-limit, concurrency and auth middlewares are layered over all of them.
fn build_app_router(cfg: &HttpServerConfig) -> Router {
    let project_root = cfg.project_root.to_string_lossy().to_string();
    let service_project_root = project_root.clone();
    // Factory invoked by the MCP transport to obtain a server instance.
    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
        Ok(LeanCtxServer::new_shared_with_context(
            &service_project_root,
            "default",
            "default",
        ))
    };
    let mcp_http = StreamableHttpService::new(
        service_factory,
        Arc::new(
            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
        ),
        cfg.mcp_http_config(),
    );

    // Server instance used by the REST tool-call endpoint.
    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");

    let state = AppState {
        // An empty token is treated the same as no token.
        token: cfg.auth_token.clone().filter(|t| !t.is_empty()),
        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
        project_root,
        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
        server: rest_server,
    };

    Router::new()
        .route("/health", get(health))
        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
        .route("/v1/manifest", get(v1_manifest))
        .route("/v1/tools", get(v1_tools))
        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
        .route("/v1/events", get(v1_events))
        .route(
            "/v1/context/summary",
            get(context_views::v1_context_summary),
        )
        .route("/v1/events/search", get(context_views::v1_events_search))
        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
        .route("/v1/metrics", get(v1_metrics))
        .route("/v1/audit/events", get(v1_audit_events))
        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
        .route("/.well-known/mcp-server.json", get(mcp_server_card))
        .route("/a2a", axum::routing::post(a2a_jsonrpc))
        .route(
            "/v1/agents/register",
            axum::routing::post(v1_agents_register),
        )
        .route(
            "/v1/agents/heartbeat",
            axum::routing::post(v1_agents_heartbeat),
        )
        .route("/v1/agents/list", get(v1_agents_list))
        .route(
            "/v1/agents/deregister",
            axum::routing::post(v1_agents_deregister),
        )
        .route("/v1/agents/events", get(v1_agents_events_sse))
        // Anything not matched above is handled by the MCP transport.
        .fallback_service(mcp_http)
        // NOTE(review): per axum's layering rules a later `.layer` wraps the
        // earlier ones, so a request should pass auth, then the concurrency
        // cap, then the rate limiter, then the body limit — confirm against
        // the axum middleware documentation before reordering.
        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            concurrency_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            auth_middleware,
        ))
        .with_state(state)
}
894
895pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
896 crate::core::protocol::set_mcp_context(true);
897 cfg.validate()?;
898
899 let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
900 .parse()
901 .context("invalid host/port")?;
902
903 let app = build_app_router(&cfg);
904
905 let listener = tokio::net::TcpListener::bind(addr)
906 .await
907 .with_context(|| format!("bind {addr}"))?;
908
909 tracing::info!(
910 "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
911 cfg.project_root.display()
912 );
913
914 axum::serve(listener, app)
915 .with_graceful_shutdown(async move {
916 let _ = tokio::signal::ctrl_c().await;
917 })
918 .await
919 .context("http server")?;
920 Ok(())
921}
922
/// Lets axum serve connections accepted from a Windows named pipe.
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next pipe client. Accept errors are logged and retried
    /// after a 100 ms pause, so this only returns on success.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            let err = match self.accept_pipe().await {
                Ok(pipe) => return (pipe, self.name().to_string()),
                Err(e) => e,
            };
            tracing::error!("named pipe accept error: {err}");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }

    /// Reports the pipe name as the local address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let name = self.name().to_string();
        Ok(name)
    }
}
944
945pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
948 cfg.validate()?;
949
950 match addr {
951 #[cfg(unix)]
952 crate::ipc::DaemonAddr::Unix(ref path) => {
953 let app = build_app_router(&cfg);
954 let listener = crate::ipc::bind_listener(&addr)?;
955
956 tracing::info!(
957 "lean-ctx daemon listening on {} (project_root={})",
958 path.display(),
959 cfg.project_root.display()
960 );
961
962 axum::serve(listener, app.into_make_service())
963 .with_graceful_shutdown(async move {
964 let _ = tokio::signal::ctrl_c().await;
965 })
966 .await
967 .context("ipc server")?;
968 Ok(())
969 }
970 #[cfg(windows)]
971 crate::ipc::DaemonAddr::NamedPipe(ref name) => {
972 let app = build_app_router(&cfg);
973 let listener = crate::ipc::bind_listener(&addr)?;
974
975 tracing::info!(
976 "lean-ctx daemon listening on {} (project_root={})",
977 name,
978 cfg.project_root.display()
979 );
980
981 axum::serve(listener, app.into_make_service())
982 .with_graceful_shutdown(async move {
983 let _ = tokio::signal::ctrl_c().await;
984 })
985 .await
986 .context("ipc server")?;
987 Ok(())
988 }
989 }
990}
991
992#[cfg(test)]
993mod tests {
994 use super::*;
995 use axum::body::Body;
996 use axum::http::Request;
997 use futures::StreamExt;
998 use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
999 use serde_json::json;
1000 use tower::ServiceExt;
1001
1002 async fn read_first_sse_message(body: Body) -> String {
1003 let mut stream = body.into_data_stream();
1004 let mut buf: Vec<u8> = Vec::new();
1005 for _ in 0..32 {
1006 let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
1007 let Ok(Some(Ok(bytes))) = next else {
1008 break;
1009 };
1010 buf.extend_from_slice(&bytes);
1011 if buf.windows(2).any(|w| w == b"\n\n") {
1012 break;
1013 }
1014 }
1015 String::from_utf8_lossy(&buf).to_string()
1016 }
1017
1018 #[tokio::test]
1019 async fn auth_token_blocks_requests_without_bearer_header() {
1020 let dir = tempfile::tempdir().expect("tempdir");
1021 let root_str = dir.path().to_string_lossy().to_string();
1022 let service_project_root = root_str.clone();
1023 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
1024 Ok(LeanCtxServer::new_shared_with_context(
1025 &service_project_root,
1026 "default",
1027 "default",
1028 ))
1029 };
1030 let cfg = StreamableHttpServerConfig::default()
1031 .with_stateful_mode(false)
1032 .with_json_response(true);
1033
1034 let mcp_http = StreamableHttpService::new(
1035 service_factory,
1036 Arc::new(
1037 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
1038 ),
1039 cfg,
1040 );
1041
1042 let state = AppState {
1043 token: Some("secret".to_string()),
1044 concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
1045 rate: Arc::new(RateLimiter::new(50, 100)),
1046 project_root: root_str.clone(),
1047 timeout: Duration::from_secs(30),
1048 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1049 };
1050
1051 let app = Router::new()
1052 .fallback_service(mcp_http)
1053 .layer(middleware::from_fn_with_state(
1054 state.clone(),
1055 auth_middleware,
1056 ))
1057 .with_state(state);
1058
1059 let body = json!({
1060 "jsonrpc": "2.0",
1061 "id": 1,
1062 "method": "tools/list",
1063 "params": {}
1064 })
1065 .to_string();
1066
1067 let req = Request::builder()
1068 .method("POST")
1069 .uri("/")
1070 .header("Host", "localhost")
1071 .header("Accept", "application/json, text/event-stream")
1072 .header("Content-Type", "application/json")
1073 .body(Body::from(body))
1074 .expect("request");
1075
1076 let resp = app.clone().oneshot(req).await.expect("resp");
1077 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1078 }
1079
1080 #[tokio::test]
1081 async fn mcp_service_factory_isolates_per_client_state() {
1082 let dir = tempfile::tempdir().expect("tempdir");
1083 let root_str = dir.path().to_string_lossy().to_string();
1084
1085 let service_project_root = root_str.clone();
1087 let service_factory = move || -> Result<LeanCtxServer, std::convert::Infallible> {
1088 Ok(LeanCtxServer::new_shared_with_context(
1089 &service_project_root,
1090 "default",
1091 "default",
1092 ))
1093 };
1094
1095 let s1 = service_factory().expect("server 1");
1096 let s2 = service_factory().expect("server 2");
1097
1098 *s1.client_name.write().await = "client-a".to_string();
1101 *s2.client_name.write().await = "client-b".to_string();
1102
1103 let a = s1.client_name.read().await.clone();
1104 let b = s2.client_name.read().await.clone();
1105 assert_eq!(a, "client-a");
1106 assert_eq!(b, "client-b");
1107 }
1108
1109 #[tokio::test]
1110 async fn rate_limit_returns_429_when_exhausted() {
1111 let state = AppState {
1112 token: None,
1113 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1114 rate: Arc::new(RateLimiter::new(1, 1)),
1115 project_root: ".".to_string(),
1116 timeout: Duration::from_secs(30),
1117 server: LeanCtxServer::new_shared_with_context(".", "default", "default"),
1118 };
1119
1120 let app = Router::new()
1121 .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
1122 .layer(middleware::from_fn_with_state(
1123 state.clone(),
1124 rate_limit_middleware,
1125 ))
1126 .with_state(state);
1127
1128 let req1 = Request::builder()
1129 .method("GET")
1130 .uri("/limited")
1131 .header("Host", "localhost")
1132 .body(Body::empty())
1133 .expect("req1");
1134 let resp1 = app.clone().oneshot(req1).await.expect("resp1");
1135 assert_eq!(resp1.status(), StatusCode::OK);
1136
1137 let req2 = Request::builder()
1138 .method("GET")
1139 .uri("/limited")
1140 .header("Host", "localhost")
1141 .body(Body::empty())
1142 .expect("req2");
1143 let resp2 = app.clone().oneshot(req2).await.expect("resp2");
1144 assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
1145 }
1146
1147 #[tokio::test]
1148 async fn audit_events_endpoint_returns_json() {
1149 let dir = tempfile::tempdir().expect("tempdir");
1150 let root_str = dir.path().to_string_lossy().to_string();
1151
1152 let state = AppState {
1153 token: None,
1154 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1155 rate: Arc::new(RateLimiter::new(50, 100)),
1156 project_root: root_str.clone(),
1157 timeout: Duration::from_secs(30),
1158 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1159 };
1160
1161 let app = Router::new()
1162 .route("/v1/audit/events", get(v1_audit_events))
1163 .with_state(state);
1164
1165 let req = Request::builder()
1166 .method("GET")
1167 .uri("/v1/audit/events?limit=10")
1168 .header("Host", "localhost")
1169 .body(Body::empty())
1170 .unwrap();
1171
1172 let resp = app.oneshot(req).await.unwrap();
1173 assert_eq!(resp.status(), StatusCode::OK);
1174
1175 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1176 .await
1177 .unwrap();
1178 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1179 assert!(json.get("cross_project_events").unwrap().is_array());
1180 assert!(json.get("audit_trail").unwrap().is_array());
1181 }
1182
1183 #[tokio::test]
1184 async fn events_endpoint_replays_tool_call_event() {
1185 use crate::core::context_os::{self, ContextEventKindV1};
1186
1187 let dir = tempfile::tempdir().expect("tempdir");
1188 std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
1189 std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
1190 let root_str = dir.path().to_string_lossy().to_string();
1191
1192 let state = AppState {
1193 token: None,
1194 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1195 rate: Arc::new(RateLimiter::new(50, 100)),
1196 project_root: root_str.clone(),
1197 timeout: Duration::from_secs(30),
1198 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1199 };
1200
1201 let app = Router::new()
1202 .route("/v1/events", get(v1_events))
1203 .with_state(state);
1204
1205 let rt = context_os::runtime();
1207 rt.bus.append(
1208 "ws1",
1209 "ch1",
1210 &ContextEventKindV1::ToolCallRecorded,
1211 Some("test-agent"),
1212 json!({"tool": "ctx_session", "action": "status"}),
1213 );
1214
1215 let req = Request::builder()
1216 .method("GET")
1217 .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
1218 .header("Host", "localhost")
1219 .header("Accept", "text/event-stream")
1220 .body(Body::empty())
1221 .expect("req");
1222 let resp = app.clone().oneshot(req).await.expect("events");
1223 assert_eq!(resp.status(), StatusCode::OK);
1224
1225 let msg = read_first_sse_message(resp.into_body()).await;
1226 assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
1227 assert!(msg.contains("\"ws1\""), "msg={msg:?}");
1228 assert!(msg.contains("\"ch1\""), "msg={msg:?}");
1229 }
1230}