1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7 extract::Json,
8 extract::Query,
9 extract::State,
10 http::{header, Request, StatusCode},
11 middleware::{self, Next},
12 response::sse::{Event as SseEvent, KeepAlive, Sse},
13 response::{IntoResponse, Response},
14 routing::get,
15 Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29pub mod savings_ingest;
30pub mod team;
31
32use std::pin::Pin;
34
35pub(crate) struct SseDisconnectGuard<I> {
36 pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
37 pub(crate) metrics: Arc<ContextOsMetrics>,
38}
39
40impl<I> Stream for SseDisconnectGuard<I> {
41 type Item = I;
42
43 fn poll_next(
44 mut self: Pin<&mut Self>,
45 cx: &mut std::task::Context<'_>,
46 ) -> std::task::Poll<Option<Self::Item>> {
47 self.inner.as_mut().poll_next(cx)
48 }
49}
50
51impl<I> Drop for SseDisconnectGuard<I> {
52 fn drop(&mut self) {
53 self.metrics.record_sse_disconnect();
54 }
55}
56
/// Maximum number of characters kept by [`sanitize_id`].
const MAX_ID_LEN: usize = 64;

/// Normalizes a caller-supplied identifier.
///
/// Surrounding whitespace is trimmed, every character other than ASCII
/// letters, digits, `-`, `_` and `.` is dropped, and the result is capped at
/// [`MAX_ID_LEN`] characters. If nothing survives, `"default"` is returned.
fn sanitize_id(raw: &str) -> String {
    let mut id = String::new();
    for ch in raw.trim().chars() {
        // Only ASCII is ever pushed, so the byte length equals the character count.
        if id.len() == MAX_ID_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
            id.push(ch);
        }
    }
    if id.is_empty() {
        id.push_str("default");
    }
    id
}
75
/// Settings for the HTTP / IPC server.
#[derive(Clone, Debug)]
pub struct HttpServerConfig {
    /// Host to bind to. `127.0.0.1`, `localhost` and `::1` count as loopback.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
    /// Project root handed to the server instances and used for on-disk handoffs.
    pub project_root: PathBuf,
    /// Bearer token clients must present; required for non-loopback hosts.
    pub auth_token: Option<String>,
    /// Forwarded to the streamable-HTTP transport's stateful-mode switch.
    pub stateful_mode: bool,
    /// Forwarded to the streamable-HTTP transport's JSON-response switch.
    pub json_response: bool,
    /// When set, the transport's allowed-hosts check is turned off.
    pub disable_host_check: bool,
    /// Explicit allow list for the transport's host check (used when non-empty).
    pub allowed_hosts: Vec<String>,
    /// Request body size limit in bytes.
    pub max_body_bytes: usize,
    /// Maximum number of requests handled at the same time.
    pub max_concurrency: usize,
    /// Token-bucket refill rate (requests per second).
    pub max_rps: u32,
    /// Token-bucket capacity.
    pub rate_burst: u32,
    /// Timeout for REST tool calls, in milliseconds.
    pub request_timeout_ms: u64,
}

impl Default for HttpServerConfig {
    /// Loopback-only defaults: `127.0.0.1:8080`, no token, the current
    /// directory (or `.` if it cannot be determined) as project root.
    fn default() -> Self {
        Self {
            host: String::from("127.0.0.1"),
            port: 8080,
            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
            auth_token: None,
            stateful_mode: false,
            json_response: true,
            disable_host_check: false,
            allowed_hosts: Vec::new(),
            max_body_bytes: 2 * 1024 * 1024,
            max_concurrency: 32,
            max_rps: 50,
            rate_burst: 100,
            request_timeout_ms: 30_000,
        }
    }
}
113
114impl HttpServerConfig {
115 pub fn validate(&self) -> Result<()> {
116 let host = self.host.trim().to_lowercase();
117 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
118 if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
119 return Err(anyhow!(
120 "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
121 ));
122 }
123 Ok(())
124 }
125
126 pub fn effective_auth_token(&self) -> Option<String> {
127 if let Some(ref token) = self.auth_token {
128 if !token.is_empty() {
129 return Some(token.clone());
130 }
131 }
132 let host = self.host.trim().to_lowercase();
133 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
134 if is_loopback {
135 let auto_token = crate::core::session_token::generate_token();
136 eprintln!(
137 "[lean-ctx] Auto-generated auth token for loopback: {auto_token}\n\
138 Pass as Bearer token or set --auth-token explicitly."
139 );
140 Some(auto_token)
141 } else {
142 None
143 }
144 }
145
146 fn mcp_http_config(&self) -> StreamableHttpServerConfig {
147 let mut cfg = StreamableHttpServerConfig::default()
148 .with_stateful_mode(self.stateful_mode)
149 .with_json_response(self.json_response);
150
151 if self.disable_host_check {
152 tracing::warn!(
153 "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
154 Do NOT use this in production or on non-loopback interfaces."
155 );
156 cfg = cfg.disable_allowed_hosts();
157 return cfg;
158 }
159
160 if !self.allowed_hosts.is_empty() {
161 cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
162 return cfg;
163 }
164
165 let host = self.host.trim();
167 if host == "127.0.0.1" || host == "localhost" || host == "::1" {
168 cfg.allowed_hosts.push(host.to_string());
169 }
170
171 cfg
172 }
173}
174
175#[derive(Clone)]
176struct AppState {
177 token: Option<String>,
178 concurrency: Arc<tokio::sync::Semaphore>,
179 rate: Arc<RateLimiter>,
180 project_root: String,
181 timeout: Duration,
182 server: LeanCtxServer,
183}
184
185#[derive(Debug)]
186struct RateLimiter {
187 max_rps: f64,
188 burst: f64,
189 state: tokio::sync::Mutex<RateState>,
190}
191
192#[derive(Debug, Clone, Copy)]
193struct RateState {
194 tokens: f64,
195 last: Instant,
196}
197
198impl RateLimiter {
199 fn new(max_rps: u32, burst: u32) -> Self {
200 let now = Instant::now();
201 Self {
202 max_rps: (max_rps.max(1)) as f64,
203 burst: (burst.max(1)) as f64,
204 state: tokio::sync::Mutex::new(RateState {
205 tokens: (burst.max(1)) as f64,
206 last: now,
207 }),
208 }
209 }
210
211 async fn allow(&self) -> bool {
212 let mut s = self.state.lock().await;
213 let now = Instant::now();
214 let elapsed = now.saturating_duration_since(s.last);
215 let refill = elapsed.as_secs_f64() * self.max_rps;
216 s.tokens = (s.tokens + refill).min(self.burst);
217 s.last = now;
218 if s.tokens >= 1.0 {
219 s.tokens -= 1.0;
220 true
221 } else {
222 false
223 }
224 }
225}
226
227async fn auth_middleware(
228 State(state): State<AppState>,
229 req: Request<axum::body::Body>,
230 next: Next,
231) -> Response {
232 if state.token.is_none() {
233 return next.run(req).await;
234 }
235
236 if req.uri().path() == "/health" {
237 return next.run(req).await;
238 }
239
240 let expected = state.token.as_deref().unwrap_or("");
241 let Some(h) = req.headers().get(header::AUTHORIZATION) else {
242 return json_error(
243 StatusCode::UNAUTHORIZED,
244 "unauthorized",
245 "missing Authorization header",
246 );
247 };
248 let Ok(s) = h.to_str() else {
249 return json_error(
250 StatusCode::UNAUTHORIZED,
251 "unauthorized",
252 "malformed Authorization header",
253 );
254 };
255 let Some(token) = s
256 .strip_prefix("Bearer ")
257 .or_else(|| s.strip_prefix("bearer "))
258 else {
259 return json_error(
260 StatusCode::UNAUTHORIZED,
261 "unauthorized",
262 "Authorization must use the Bearer scheme",
263 );
264 };
265 if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
266 return json_error(
267 StatusCode::UNAUTHORIZED,
268 "unauthorized",
269 "invalid bearer token",
270 );
271 }
272
273 next.run(req).await
274}
275
276pub(crate) fn json_error(status: StatusCode, error_code: &str, message: &str) -> Response {
282 (
283 status,
284 Json(serde_json::json!({ "error": message, "error_code": error_code })),
285 )
286 .into_response()
287}
288
289fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
290 use subtle::ConstantTimeEq;
291 if a.len() != b.len() {
292 return false;
293 }
294 bool::from(a.ct_eq(b))
295}
296
297async fn rate_limit_middleware(
298 State(state): State<AppState>,
299 req: Request<axum::body::Body>,
300 next: Next,
301) -> Response {
302 if !state.rate.allow().await {
303 return StatusCode::TOO_MANY_REQUESTS.into_response();
304 }
305 next.run(req).await
306}
307
308async fn concurrency_middleware(
309 State(state): State<AppState>,
310 req: Request<axum::body::Body>,
311 next: Next,
312) -> Response {
313 let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
314 return StatusCode::TOO_MANY_REQUESTS.into_response();
315 };
316 let resp = next.run(req).await;
317 drop(permit);
318 resp
319}
320
321async fn health() -> impl IntoResponse {
322 (StatusCode::OK, "ok\n")
323}
324
325async fn v1_shutdown() -> impl IntoResponse {
326 tokio::spawn(async {
327 tokio::time::sleep(Duration::from_millis(100)).await;
328 std::process::exit(0);
329 });
330 (StatusCode::OK, "shutting down\n")
331}
332
333#[derive(Debug, Deserialize)]
334#[serde(rename_all = "camelCase")]
335struct ToolCallBody {
336 name: String,
337 #[serde(default)]
338 arguments: Option<Value>,
339 #[serde(default)]
340 _workspace_id: Option<String>,
341 #[serde(default)]
342 _channel_id: Option<String>,
343}
344
345#[derive(Debug, Deserialize)]
346#[serde(rename_all = "camelCase")]
347struct EventsQuery {
348 #[serde(default)]
349 workspace_id: Option<String>,
350 #[serde(default)]
351 channel_id: Option<String>,
352 #[serde(default)]
353 since: Option<i64>,
354 #[serde(default)]
355 limit: Option<usize>,
356 #[serde(default)]
359 kind: Option<String>,
360}
361
362async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
363 let _ = state;
364 let v = crate::core::mcp_manifest::manifest_value();
365 (StatusCode::OK, Json(v))
366}
367
368#[derive(Debug, Deserialize)]
369#[serde(rename_all = "camelCase")]
370struct ToolsQuery {
371 #[serde(default)]
372 offset: Option<usize>,
373 #[serde(default)]
374 limit: Option<usize>,
375}
376
377async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
378 let _ = state;
379 let v = crate::core::mcp_manifest::manifest_value();
380 let tools = v
381 .get("tools")
382 .and_then(|t| t.get("granular"))
383 .cloned()
384 .unwrap_or(Value::Array(vec![]));
385
386 let all = tools.as_array().cloned().unwrap_or_default();
387 let total = all.len();
388 let offset = q.offset.unwrap_or(0).min(total);
389 let limit = q.limit.unwrap_or(200).min(500);
390 let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
391
392 (
393 StatusCode::OK,
394 Json(serde_json::json!({
395 "tools": page,
396 "total": total,
397 "offset": offset,
398 "limit": limit,
399 })),
400 )
401}
402
403async fn v1_tool_call(
404 State(state): State<AppState>,
405 Json(body): Json<ToolCallBody>,
406) -> impl IntoResponse {
407 let engine = ContextEngine::from_server(state.server.clone());
408 match tokio::time::timeout(
409 state.timeout,
410 engine.call_tool_value(&body.name, body.arguments),
411 )
412 .await
413 {
414 Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
415 Ok(Err(e)) => {
416 tracing::warn!("tool call error: {e}");
417 json_error(
418 StatusCode::BAD_REQUEST,
419 "tool_error",
420 "tool execution failed",
421 )
422 }
423 Err(_) => json_error(
424 StatusCode::GATEWAY_TIMEOUT,
425 "request_timeout",
426 "tool call timed out",
427 ),
428 }
429}
430
431async fn v1_events(
432 State(state): State<AppState>,
433 Query(q): Query<EventsQuery>,
434) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
435 use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
436
437 let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
438 let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
439 let _ = &state.project_root;
440 let since = q.since.unwrap_or(0);
441 let limit = q.limit.unwrap_or(200).min(1000);
442 let redaction = RedactionLevel::RefsOnly;
443
444 let kind_filter: Option<Vec<String>> = q
445 .kind
446 .as_deref()
447 .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
448
449 let rt = crate::core::context_os::runtime();
450 let replay = rt.bus.read(&ws, &ch, since, limit);
451
452 let replay = if let Some(ref kinds) = kind_filter {
453 replay
454 .into_iter()
455 .filter(|ev| kinds.contains(&ev.kind))
456 .collect()
457 } else {
458 replay
459 };
460
461 let rx = if let Some(ref kinds) = kind_filter {
462 let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
463 let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
464 if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
465 crate::core::context_os::SubscriptionKind::Filtered(sub)
466 } else {
467 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
468 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
469 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
470 }
471 } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
472 crate::core::context_os::SubscriptionKind::Unfiltered(sub)
473 } else {
474 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
475 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
476 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
477 };
478
479 rt.metrics.record_sse_connect();
480 rt.metrics.record_events_replayed(replay.len() as u64);
481 rt.metrics.record_workspace_active(&ws);
482
483 let bus = rt.bus.clone();
484 let metrics = rt.metrics.clone();
485 let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
486
487 let stream = futures::stream::unfold(
488 (
489 pending,
490 rx,
491 ws.clone(),
492 ch.clone(),
493 since,
494 redaction,
495 bus,
496 metrics,
497 ),
498 |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
499 if let Some(mut ev) = pending.pop_front() {
500 last_id = ev.id;
501 redact_event_payload(&mut ev, redaction);
502 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
503 let evt = SseEvent::default()
504 .id(ev.id.to_string())
505 .event(ev.kind)
506 .data(data);
507 return Some((
508 Ok(evt),
509 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
510 ));
511 }
512
513 loop {
514 match rx.recv().await {
515 Ok(mut ev) if ev.id > last_id => {
516 last_id = ev.id;
517 redact_event_payload(&mut ev, redaction);
518 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
519 let evt = SseEvent::default()
520 .id(ev.id.to_string())
521 .event(ev.kind)
522 .data(data);
523 return Some((
524 Ok(evt),
525 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
526 ));
527 }
528 Ok(_) => {}
529 Err(broadcast::error::RecvError::Closed) => return None,
530 Err(broadcast::error::RecvError::Lagged(skipped)) => {
531 let missed = bus.read(&ws, &ch, last_id, skipped as usize);
532 metrics.record_events_replayed(missed.len() as u64);
533 for ev in missed {
534 last_id = last_id.max(ev.id);
535 pending.push_back(ev);
536 }
537 }
538 }
539 }
540 },
541 );
542
543 let metrics_ref = rt.metrics.clone();
544 let guarded = SseDisconnectGuard {
545 inner: Box::pin(stream),
546 metrics: metrics_ref,
547 };
548
549 Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
550}
551
552#[derive(Debug, Deserialize)]
553struct AuditEventsQuery {
554 #[serde(default = "default_audit_limit")]
555 limit: usize,
556}
557
/// Serde default for `AuditEventsQuery::limit`.
fn default_audit_limit() -> usize {
    const DEFAULT_AUDIT_LIMIT: usize = 100;
    DEFAULT_AUDIT_LIMIT
}
561
562async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
563 let capped = q.limit.min(1000);
564 let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
565 let trail_events = crate::core::audit_trail::load_recent(capped);
566
567 Json(serde_json::json!({
568 "cross_project_events": boundary_events,
569 "audit_trail": trail_events,
570 }))
571}
572
573async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
574 let rt = crate::core::context_os::runtime();
575 let snap = rt.metrics.snapshot();
576 (
577 StatusCode::OK,
578 Json(serde_json::to_value(snap).unwrap_or_default()),
579 )
580}
581
582const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
583const MAX_HANDOFF_FILES: usize = 50;
584
585async fn v1_a2a_handoff(
586 State(state): State<AppState>,
587 Json(body): Json<Value>,
588) -> impl IntoResponse {
589 let envelope = match crate::core::a2a_transport::parse_envelope(
590 &serde_json::to_string(&body).unwrap_or_default(),
591 ) {
592 Ok(env) => env,
593 Err(e) => {
594 tracing::warn!("a2a handoff parse error: {e}");
595 return (
596 StatusCode::BAD_REQUEST,
597 Json(serde_json::json!({"error": "invalid_envelope"})),
598 );
599 }
600 };
601
602 if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
603 tracing::warn!(
604 "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
605 envelope.payload_json.len()
606 );
607 return (
608 StatusCode::PAYLOAD_TOO_LARGE,
609 Json(serde_json::json!({"error": "payload_too_large"})),
610 );
611 }
612
613 let rt = crate::core::context_os::runtime();
614 rt.bus.append(
615 &state.project_root,
616 "a2a",
617 &crate::core::context_os::ContextEventKindV1::SessionMutated,
618 Some(&envelope.sender.agent_id),
619 serde_json::json!({
620 "type": "handoff_received",
621 "content_type": format!("{:?}", envelope.content_type),
622 "sender": envelope.sender.agent_id,
623 "payload_size": envelope.payload_json.len(),
624 }),
625 );
626
627 match envelope.content_type {
628 crate::core::a2a_transport::TransportContentType::ContextPackage => {
629 let dir = std::path::Path::new(&state.project_root)
630 .join(".lean-ctx")
631 .join("handoffs")
632 .join("packages");
633 let _ = std::fs::create_dir_all(&dir);
634 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
635 let out = dir.join(format!(
636 "ctx-{}.{}",
637 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
638 crate::core::contracts::PACKAGE_EXTENSION
639 ));
640 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
641 tracing::error!("a2a handoff write failed: {e}");
642 return (
643 StatusCode::INTERNAL_SERVER_ERROR,
644 Json(serde_json::json!({"error": "write_failed"})),
645 );
646 }
647 (
648 StatusCode::OK,
649 Json(serde_json::json!({
650 "status": "received",
651 "content_type": "context_package",
652 })),
653 )
654 }
655 crate::core::a2a_transport::TransportContentType::HandoffBundle => {
656 let dir = std::path::Path::new(&state.project_root)
657 .join(".lean-ctx")
658 .join("handoffs");
659 let _ = std::fs::create_dir_all(&dir);
660 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
661 let out = dir.join(format!(
662 "received-{}.json",
663 chrono::Utc::now().format("%Y%m%d_%H%M%S")
664 ));
665 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
666 tracing::error!("a2a handoff write failed: {e}");
667 return (
668 StatusCode::INTERNAL_SERVER_ERROR,
669 Json(serde_json::json!({"error": "write_failed"})),
670 );
671 }
672 (
673 StatusCode::OK,
674 Json(serde_json::json!({
675 "status": "received",
676 "content_type": "handoff_bundle",
677 })),
678 )
679 }
680 _ => (
681 StatusCode::OK,
682 Json(serde_json::json!({
683 "status": "received",
684 "content_type": format!("{:?}", envelope.content_type),
685 })),
686 ),
687 }
688}
689
/// Makes room for one more file in `dir`.
///
/// Only regular files directly inside `dir` are considered. When there are
/// already `max_files` or more, the oldest ones (by modification time, unknown
/// times sorting first) are deleted until `max_files - 1` remain. Unreadable
/// directories or entries and failed deletions are silently skipped.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };

    let mut candidates = Vec::new();
    for entry in entries {
        let Ok(entry) = entry else { continue };
        let Ok(meta) = entry.metadata() else { continue };
        if !meta.is_file() {
            continue;
        }
        let modified = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        candidates.push((modified, entry.path()));
    }

    if candidates.len() < max_files {
        return;
    }

    // Stable sort, oldest first.
    candidates.sort_by(|a, b| a.0.cmp(&b.0));

    let keep = max_files.saturating_sub(1);
    let excess = candidates.len().saturating_sub(keep);
    for (_, path) in candidates.into_iter().take(excess) {
        let _ = std::fs::remove_file(path);
    }
}
715
716async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
717 let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
718 Ok(r) => r,
719 Err(e) => {
720 tracing::debug!("a2a JSON-RPC parse error: {e}");
721 return (
722 StatusCode::BAD_REQUEST,
723 Json(serde_json::json!({
724 "jsonrpc": "2.0",
725 "id": null,
726 "error": {"code": -32700, "message": "invalid request"}
727 })),
728 );
729 }
730 };
731 let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
732 let json = serde_json::to_value(resp).unwrap_or_default();
733 (StatusCode::OK, Json(json))
734}
735
736async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
737 let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
738 (
739 StatusCode::OK,
740 [(header::CONTENT_TYPE, "application/json")],
741 Json(card),
742 )
743}
744
745async fn mcp_server_card() -> impl IntoResponse {
746 let card = serde_json::json!({
747 "name": "lean-ctx",
748 "version": env!("CARGO_PKG_VERSION"),
749 "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
750 "capabilities": {
751 "tools": true,
752 "resources": false,
753 "prompts": false,
754 "sampling": false
755 },
756 "tool_categories": [
757 {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
758 {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
759 {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
760 {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
761 ],
762 "features": {
763 "compression": "deterministic AST-based, 40-70% token reduction",
764 "caching": "session-scoped with zstd, re-reads ~13 tokens",
765 "audit_trail": "SHA-256 chained JSONL",
766 "rbac": "5 built-in roles with capability-based access",
767 "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
768 "secret_detection": "8 regex patterns + custom"
769 },
770 "security": {
771 "path_jail": true,
772 "rate_limiting": true,
773 "budget_tracking": true,
774 "signed_handoffs": true,
775 "timing_safe_auth": true
776 }
777 });
778 Json(card)
779}
780
781async fn v1_agents_register(
782 State(state): State<AppState>,
783 Json(body): Json<Value>,
784) -> impl IntoResponse {
785 let agent_type = body
786 .get("agent_type")
787 .and_then(|v| v.as_str())
788 .unwrap_or("unknown");
789 let role = body.get("role").and_then(|v| v.as_str());
790 let project_root = body
791 .get("project_root")
792 .and_then(|v| v.as_str())
793 .unwrap_or(&state.project_root);
794
795 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
796 let agent_id = registry.register(agent_type, role, project_root);
797 let _ = registry.save();
798
799 Json(serde_json::json!({
800 "agent_id": agent_id,
801 "status": "registered"
802 }))
803}
804
805async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
806 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
807 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
808 registry.update_heartbeat(agent_id);
809 let _ = registry.save();
810 Json(serde_json::json!({"status": "ok"}))
811}
812
813async fn v1_agents_list() -> impl IntoResponse {
814 let registry = crate::core::agents::AgentRegistry::load_or_create();
815 let active = registry.list_active(None);
816 Json(serde_json::json!({
817 "agents": active.iter().map(|a| serde_json::json!({
818 "agent_id": a.agent_id,
819 "agent_type": a.agent_type,
820 "role": a.role,
821 "status": a.status.to_string(),
822 "last_active": a.last_active.to_rfc3339(),
823 })).collect::<Vec<_>>()
824 }))
825}
826
827async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
828 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
829 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
830 registry.set_status(
831 agent_id,
832 crate::core::agents::AgentStatus::Finished,
833 Some("deregistered via API"),
834 );
835 let _ = registry.save();
836 Json(serde_json::json!({"status": "deregistered"}))
837}
838
839async fn v1_agents_events_sse(
840) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
841 let stream = futures::stream::unfold(0usize, |last_count| async move {
842 loop {
843 tokio::time::sleep(Duration::from_secs(5)).await;
844 let registry = crate::core::agents::AgentRegistry::load_or_create();
845 let active = registry.list_active(None);
846 let count = active.len();
847 if count != last_count {
848 let data = serde_json::json!({
849 "type": "agents_changed",
850 "active_count": count,
851 "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
852 });
853 return Some((
854 Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
855 count,
856 ));
857 }
858 }
859 });
860
861 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
862}
863
864fn build_app_router(cfg: &HttpServerConfig) -> Router {
865 let project_root = cfg.project_root.to_string_lossy().to_string();
866 let service_project_root = project_root.clone();
867 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
868 Ok(LeanCtxServer::new_shared_with_context(
869 &service_project_root,
870 "default",
871 "default",
872 ))
873 };
874 let mcp_http = StreamableHttpService::new(
875 service_factory,
876 Arc::new(
877 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
878 ),
879 cfg.mcp_http_config(),
880 );
881
882 let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");
883
884 let state = AppState {
885 token: cfg.effective_auth_token(),
886 concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
887 rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
888 project_root,
889 timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
890 server: rest_server,
891 };
892
893 Router::new()
894 .route("/health", get(health))
895 .route("/v1/shutdown", axum::routing::post(v1_shutdown))
896 .route("/v1/manifest", get(v1_manifest))
897 .route("/v1/tools", get(v1_tools))
898 .route("/v1/tools/call", axum::routing::post(v1_tool_call))
899 .route("/v1/events", get(v1_events))
900 .route(
901 "/v1/context/summary",
902 get(context_views::v1_context_summary),
903 )
904 .route("/v1/events/search", get(context_views::v1_events_search))
905 .route("/v1/events/lineage", get(context_views::v1_event_lineage))
906 .route("/v1/metrics", get(v1_metrics))
907 .route("/v1/audit/events", get(v1_audit_events))
908 .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
909 .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
910 .route("/.well-known/agent.json", get(v1_a2a_agent_card))
911 .route("/.well-known/mcp-server.json", get(mcp_server_card))
912 .route("/a2a", axum::routing::post(a2a_jsonrpc))
913 .route(
914 "/v1/agents/register",
915 axum::routing::post(v1_agents_register),
916 )
917 .route(
918 "/v1/agents/heartbeat",
919 axum::routing::post(v1_agents_heartbeat),
920 )
921 .route("/v1/agents/list", get(v1_agents_list))
922 .route(
923 "/v1/agents/deregister",
924 axum::routing::post(v1_agents_deregister),
925 )
926 .route("/v1/agents/events", get(v1_agents_events_sse))
927 .fallback_service(mcp_http)
928 .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
929 .layer(middleware::from_fn_with_state(
930 state.clone(),
931 rate_limit_middleware,
932 ))
933 .layer(middleware::from_fn_with_state(
934 state.clone(),
935 concurrency_middleware,
936 ))
937 .layer(middleware::from_fn_with_state(
938 state.clone(),
939 auth_middleware,
940 ))
941 .with_state(state)
942}
943
944pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
945 crate::core::protocol::set_mcp_context(true);
946 cfg.validate()?;
947
948 let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
949 .parse()
950 .context("invalid host/port")?;
951
952 let app = build_app_router(&cfg);
953
954 let listener = tokio::net::TcpListener::bind(addr)
955 .await
956 .with_context(|| format!("bind {addr}"))?;
957
958 tracing::info!(
959 "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
960 cfg.project_root.display()
961 );
962
963 axum::serve(listener, app)
964 .with_graceful_shutdown(async move {
965 let _ = tokio::signal::ctrl_c().await;
966 })
967 .await
968 .context("http server")?;
969 Ok(())
970}
971
/// Lets axum serve connections accepted from a Windows named pipe.
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next pipe client. Accept errors are logged and retried
    /// after 100 ms; the pipe name serves as the peer address.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

        loop {
            let e = match self.accept_pipe().await {
                Ok(pipe) => return (pipe, self.name().to_string()),
                Err(e) => e,
            };
            tracing::error!("named pipe accept error: {e}");
            tokio::time::sleep(RETRY_DELAY).await;
        }
    }

    /// Reports the pipe name as the local address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let name = self.name().to_string();
        Ok(name)
    }
}
993
994pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
997 cfg.validate()?;
998
999 match addr {
1000 #[cfg(unix)]
1001 crate::ipc::DaemonAddr::Unix(ref path) => {
1002 let app = build_app_router(&cfg);
1003 let listener = crate::ipc::bind_listener(&addr)?;
1004
1005 tracing::info!(
1006 "lean-ctx daemon listening on {} (project_root={})",
1007 path.display(),
1008 cfg.project_root.display()
1009 );
1010
1011 axum::serve(listener, app.into_make_service())
1012 .with_graceful_shutdown(async move {
1013 let _ = tokio::signal::ctrl_c().await;
1014 })
1015 .await
1016 .context("ipc server")?;
1017 Ok(())
1018 }
1019 #[cfg(windows)]
1020 crate::ipc::DaemonAddr::NamedPipe(ref name) => {
1021 let app = build_app_router(&cfg);
1022 let listener = crate::ipc::bind_listener(&addr)?;
1023
1024 tracing::info!(
1025 "lean-ctx daemon listening on {} (project_root={})",
1026 name,
1027 cfg.project_root.display()
1028 );
1029
1030 axum::serve(listener, app.into_make_service())
1031 .with_graceful_shutdown(async move {
1032 let _ = tokio::signal::ctrl_c().await;
1033 })
1034 .await
1035 .context("ipc server")?;
1036 Ok(())
1037 }
1038 }
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043 use super::*;
1044 use axum::body::Body;
1045 use axum::http::Request;
1046 use futures::StreamExt;
1047 use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
1048 use serde_json::json;
1049 use tower::ServiceExt;
1050
1051 async fn read_first_sse_message(body: Body) -> String {
1052 let mut stream = body.into_data_stream();
1053 let mut buf: Vec<u8> = Vec::new();
1054 for _ in 0..32 {
1055 let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
1056 let Ok(Some(Ok(bytes))) = next else {
1057 break;
1058 };
1059 buf.extend_from_slice(&bytes);
1060 if buf.windows(2).any(|w| w == b"\n\n") {
1061 break;
1062 }
1063 }
1064 String::from_utf8_lossy(&buf).to_string()
1065 }
1066
1067 #[tokio::test]
1068 async fn auth_token_blocks_requests_without_bearer_header() {
1069 let dir = tempfile::tempdir().expect("tempdir");
1070 let root_str = dir.path().to_string_lossy().to_string();
1071 let service_project_root = root_str.clone();
1072 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
1073 Ok(LeanCtxServer::new_shared_with_context(
1074 &service_project_root,
1075 "default",
1076 "default",
1077 ))
1078 };
1079 let cfg = StreamableHttpServerConfig::default()
1080 .with_stateful_mode(false)
1081 .with_json_response(true);
1082
1083 let mcp_http = StreamableHttpService::new(
1084 service_factory,
1085 Arc::new(
1086 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
1087 ),
1088 cfg,
1089 );
1090
1091 let state = AppState {
1092 token: Some("secret".to_string()),
1093 concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
1094 rate: Arc::new(RateLimiter::new(50, 100)),
1095 project_root: root_str.clone(),
1096 timeout: Duration::from_secs(30),
1097 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1098 };
1099
1100 let app = Router::new()
1101 .fallback_service(mcp_http)
1102 .layer(middleware::from_fn_with_state(
1103 state.clone(),
1104 auth_middleware,
1105 ))
1106 .with_state(state);
1107
1108 let body = json!({
1109 "jsonrpc": "2.0",
1110 "id": 1,
1111 "method": "tools/list",
1112 "params": {}
1113 })
1114 .to_string();
1115
1116 let req = Request::builder()
1117 .method("POST")
1118 .uri("/")
1119 .header("Host", "localhost")
1120 .header("Accept", "application/json, text/event-stream")
1121 .header("Content-Type", "application/json")
1122 .body(Body::from(body))
1123 .expect("request");
1124
1125 let resp = app.clone().oneshot(req).await.expect("resp");
1126 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1127 }
1128
1129 #[tokio::test]
1130 async fn mcp_service_factory_isolates_per_client_state() {
1131 let dir = tempfile::tempdir().expect("tempdir");
1132 let root_str = dir.path().to_string_lossy().to_string();
1133
1134 let service_project_root = root_str.clone();
1136 let service_factory = move || -> Result<LeanCtxServer, std::convert::Infallible> {
1137 Ok(LeanCtxServer::new_shared_with_context(
1138 &service_project_root,
1139 "default",
1140 "default",
1141 ))
1142 };
1143
1144 let s1 = service_factory().expect("server 1");
1145 let s2 = service_factory().expect("server 2");
1146
1147 *s1.client_name.write().await = "client-a".to_string();
1150 *s2.client_name.write().await = "client-b".to_string();
1151
1152 let a = s1.client_name.read().await.clone();
1153 let b = s2.client_name.read().await.clone();
1154 assert_eq!(a, "client-a");
1155 assert_eq!(b, "client-b");
1156 }
1157
1158 #[tokio::test]
1159 async fn rate_limit_returns_429_when_exhausted() {
1160 let state = AppState {
1161 token: None,
1162 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1163 rate: Arc::new(RateLimiter::new(1, 1)),
1164 project_root: ".".to_string(),
1165 timeout: Duration::from_secs(30),
1166 server: LeanCtxServer::new_shared_with_context(".", "default", "default"),
1167 };
1168
1169 let app = Router::new()
1170 .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
1171 .layer(middleware::from_fn_with_state(
1172 state.clone(),
1173 rate_limit_middleware,
1174 ))
1175 .with_state(state);
1176
1177 let req1 = Request::builder()
1178 .method("GET")
1179 .uri("/limited")
1180 .header("Host", "localhost")
1181 .body(Body::empty())
1182 .expect("req1");
1183 let resp1 = app.clone().oneshot(req1).await.expect("resp1");
1184 assert_eq!(resp1.status(), StatusCode::OK);
1185
1186 let req2 = Request::builder()
1187 .method("GET")
1188 .uri("/limited")
1189 .header("Host", "localhost")
1190 .body(Body::empty())
1191 .expect("req2");
1192 let resp2 = app.clone().oneshot(req2).await.expect("resp2");
1193 assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
1194 }
1195
1196 #[tokio::test]
1197 async fn audit_events_endpoint_returns_json() {
1198 let dir = tempfile::tempdir().expect("tempdir");
1199 let root_str = dir.path().to_string_lossy().to_string();
1200
1201 let state = AppState {
1202 token: None,
1203 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1204 rate: Arc::new(RateLimiter::new(50, 100)),
1205 project_root: root_str.clone(),
1206 timeout: Duration::from_secs(30),
1207 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1208 };
1209
1210 let app = Router::new()
1211 .route("/v1/audit/events", get(v1_audit_events))
1212 .with_state(state);
1213
1214 let req = Request::builder()
1215 .method("GET")
1216 .uri("/v1/audit/events?limit=10")
1217 .header("Host", "localhost")
1218 .body(Body::empty())
1219 .unwrap();
1220
1221 let resp = app.oneshot(req).await.unwrap();
1222 assert_eq!(resp.status(), StatusCode::OK);
1223
1224 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1225 .await
1226 .unwrap();
1227 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1228 assert!(json.get("cross_project_events").unwrap().is_array());
1229 assert!(json.get("audit_trail").unwrap().is_array());
1230 }
1231
1232 #[tokio::test]
1233 async fn events_endpoint_replays_tool_call_event() {
1234 use crate::core::context_os::{self, ContextEventKindV1};
1235
1236 let dir = tempfile::tempdir().expect("tempdir");
1237 std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
1238 std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
1239 let root_str = dir.path().to_string_lossy().to_string();
1240
1241 let state = AppState {
1242 token: None,
1243 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1244 rate: Arc::new(RateLimiter::new(50, 100)),
1245 project_root: root_str.clone(),
1246 timeout: Duration::from_secs(30),
1247 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1248 };
1249
1250 let app = Router::new()
1251 .route("/v1/events", get(v1_events))
1252 .with_state(state);
1253
1254 let rt = context_os::runtime();
1256 rt.bus.append(
1257 "ws1",
1258 "ch1",
1259 &ContextEventKindV1::ToolCallRecorded,
1260 Some("test-agent"),
1261 json!({"tool": "ctx_session", "action": "status"}),
1262 );
1263
1264 let req = Request::builder()
1265 .method("GET")
1266 .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
1267 .header("Host", "localhost")
1268 .header("Accept", "text/event-stream")
1269 .body(Body::empty())
1270 .expect("req");
1271 let resp = app.clone().oneshot(req).await.expect("events");
1272 assert_eq!(resp.status(), StatusCode::OK);
1273
1274 let msg = read_first_sse_message(resp.into_body()).await;
1275 assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
1276 assert!(msg.contains("\"ws1\""), "msg={msg:?}");
1277 assert!(msg.contains("\"ch1\""), "msg={msg:?}");
1278 }
1279}