1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7 extract::Json,
8 extract::Query,
9 extract::State,
10 http::{header, Request, StatusCode},
11 middleware::{self, Next},
12 response::sse::{Event as SseEvent, KeepAlive, Sse},
13 response::{IntoResponse, Response},
14 routing::get,
15 Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29
30#[cfg(feature = "team-server")]
31pub mod team;
32
33use std::pin::Pin;
35
36pub(crate) struct SseDisconnectGuard<I> {
37 pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
38 pub(crate) metrics: Arc<ContextOsMetrics>,
39}
40
41impl<I> Stream for SseDisconnectGuard<I> {
42 type Item = I;
43
44 fn poll_next(
45 mut self: Pin<&mut Self>,
46 cx: &mut std::task::Context<'_>,
47 ) -> std::task::Poll<Option<Self::Item>> {
48 self.inner.as_mut().poll_next(cx)
49 }
50}
51
52impl<I> Drop for SseDisconnectGuard<I> {
53 fn drop(&mut self) {
54 self.metrics.record_sse_disconnect();
55 }
56}
57
/// Upper bound on the length (in characters) of a sanitized workspace/channel id.
const MAX_ID_LEN: usize = 64;

/// Normalizes a client-supplied workspace/channel id.
///
/// Keeps only ASCII alphanumerics plus `-`, `_` and `.`, truncated to
/// [`MAX_ID_LEN`] characters. Input that is blank or contains none of the
/// allowed characters maps to `"default"`.
fn sanitize_id(raw: &str) -> String {
    let is_allowed = |&c: &char| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.');
    let cleaned: String = raw
        .trim()
        .chars()
        .filter(is_allowed)
        .take(MAX_ID_LEN)
        .collect();
    if cleaned.is_empty() {
        String::from("default")
    } else {
        cleaned
    }
}
76
/// Configuration for the Streamable HTTP / REST server started by [`serve`].
#[derive(Clone, Debug)]
pub struct HttpServerConfig {
    /// Host name or IP address to bind to.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
    /// Project root handed to every `LeanCtxServer` instance.
    pub project_root: PathBuf,
    /// Explicit Bearer token; `None` or an empty string means "not configured".
    pub auth_token: Option<String>,
    /// Forwarded to the MCP transport's stateful-mode setting.
    pub stateful_mode: bool,
    /// Forwarded to the MCP transport's JSON-response setting.
    pub json_response: bool,
    /// Turns off the MCP transport's allowed-host (DNS rebinding) check.
    pub disable_host_check: bool,
    /// Explicit allowed `Host` values for the MCP transport; empty means
    /// "derive from `host`".
    pub allowed_hosts: Vec<String>,
    /// Maximum request body size in bytes (applied via `DefaultBodyLimit`).
    pub max_body_bytes: usize,
    /// Maximum number of in-flight requests; excess requests get HTTP 429.
    pub max_concurrency: usize,
    /// Token-bucket refill rate, in requests per second.
    pub max_rps: u32,
    /// Token-bucket capacity (burst size).
    pub rate_burst: u32,
    /// Timeout applied to `/v1/tools/call`, in milliseconds.
    pub request_timeout_ms: u64,
}

impl Default for HttpServerConfig {
    /// Loopback-only defaults: `127.0.0.1:8080`, the current working directory
    /// as project root (falling back to `.` when it cannot be determined), no
    /// explicit auth token, and conservative body/concurrency/rate/timeout limits.
    fn default() -> Self {
        let project_root = match std::env::current_dir() {
            Ok(dir) => dir,
            Err(_) => PathBuf::from("."),
        };
        Self {
            project_root,
            host: String::from("127.0.0.1"),
            port: 8080,
            auth_token: None,
            stateful_mode: false,
            json_response: true,
            disable_host_check: false,
            allowed_hosts: Vec::new(),
            max_body_bytes: 2 * 1024 * 1024,
            max_concurrency: 32,
            max_rps: 50,
            rate_burst: 100,
            request_timeout_ms: 30_000,
        }
    }
}
114
115impl HttpServerConfig {
116 pub fn validate(&self) -> Result<()> {
117 let host = self.host.trim().to_lowercase();
118 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
119 if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
120 return Err(anyhow!(
121 "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
122 ));
123 }
124 Ok(())
125 }
126
127 pub fn effective_auth_token(&self) -> Option<String> {
128 if let Some(ref token) = self.auth_token {
129 if !token.is_empty() {
130 return Some(token.clone());
131 }
132 }
133 let host = self.host.trim().to_lowercase();
134 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
135 if is_loopback {
136 let auto_token = crate::core::session_token::generate_token();
137 eprintln!(
138 "[lean-ctx] Auto-generated auth token for loopback: {auto_token}\n\
139 Pass as Bearer token or set --auth-token explicitly."
140 );
141 Some(auto_token)
142 } else {
143 None
144 }
145 }
146
147 fn mcp_http_config(&self) -> StreamableHttpServerConfig {
148 let mut cfg = StreamableHttpServerConfig::default()
149 .with_stateful_mode(self.stateful_mode)
150 .with_json_response(self.json_response);
151
152 if self.disable_host_check {
153 tracing::warn!(
154 "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
155 Do NOT use this in production or on non-loopback interfaces."
156 );
157 cfg = cfg.disable_allowed_hosts();
158 return cfg;
159 }
160
161 if !self.allowed_hosts.is_empty() {
162 cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
163 return cfg;
164 }
165
166 let host = self.host.trim();
168 if host == "127.0.0.1" || host == "localhost" || host == "::1" {
169 cfg.allowed_hosts.push(host.to_string());
170 }
171
172 cfg
173 }
174}
175
/// Shared state handed to every REST handler and middleware layer.
#[derive(Clone)]
struct AppState {
    /// Bearer token enforced by `auth_middleware`; `None` disables auth.
    token: Option<String>,
    /// Bounds the number of in-flight requests (see `concurrency_middleware`).
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Global token-bucket limiter (see `rate_limit_middleware`).
    rate: Arc<RateLimiter>,
    /// Project root, converted lossily from the configured `PathBuf`.
    project_root: String,
    /// Timeout applied to tool execution in `v1_tool_call`.
    timeout: Duration,
    /// Server instance used by the REST tool-call endpoint.
    server: LeanCtxServer,
}
185
186#[derive(Debug)]
187struct RateLimiter {
188 max_rps: f64,
189 burst: f64,
190 state: tokio::sync::Mutex<RateState>,
191}
192
193#[derive(Debug, Clone, Copy)]
194struct RateState {
195 tokens: f64,
196 last: Instant,
197}
198
199impl RateLimiter {
200 fn new(max_rps: u32, burst: u32) -> Self {
201 let now = Instant::now();
202 Self {
203 max_rps: (max_rps.max(1)) as f64,
204 burst: (burst.max(1)) as f64,
205 state: tokio::sync::Mutex::new(RateState {
206 tokens: (burst.max(1)) as f64,
207 last: now,
208 }),
209 }
210 }
211
212 async fn allow(&self) -> bool {
213 let mut s = self.state.lock().await;
214 let now = Instant::now();
215 let elapsed = now.saturating_duration_since(s.last);
216 let refill = elapsed.as_secs_f64() * self.max_rps;
217 s.tokens = (s.tokens + refill).min(self.burst);
218 s.last = now;
219 if s.tokens >= 1.0 {
220 s.tokens -= 1.0;
221 true
222 } else {
223 false
224 }
225 }
226}
227
228async fn auth_middleware(
229 State(state): State<AppState>,
230 req: Request<axum::body::Body>,
231 next: Next,
232) -> Response {
233 if state.token.is_none() {
234 return next.run(req).await;
235 }
236
237 if req.uri().path() == "/health" {
238 return next.run(req).await;
239 }
240
241 let expected = state.token.as_deref().unwrap_or("");
242 let Some(h) = req.headers().get(header::AUTHORIZATION) else {
243 return json_error(
244 StatusCode::UNAUTHORIZED,
245 "unauthorized",
246 "missing Authorization header",
247 );
248 };
249 let Ok(s) = h.to_str() else {
250 return json_error(
251 StatusCode::UNAUTHORIZED,
252 "unauthorized",
253 "malformed Authorization header",
254 );
255 };
256 let Some(token) = s
257 .strip_prefix("Bearer ")
258 .or_else(|| s.strip_prefix("bearer "))
259 else {
260 return json_error(
261 StatusCode::UNAUTHORIZED,
262 "unauthorized",
263 "Authorization must use the Bearer scheme",
264 );
265 };
266 if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
267 return json_error(
268 StatusCode::UNAUTHORIZED,
269 "unauthorized",
270 "invalid bearer token",
271 );
272 }
273
274 next.run(req).await
275}
276
277pub(crate) fn json_error(status: StatusCode, error_code: &str, message: &str) -> Response {
283 (
284 status,
285 Json(serde_json::json!({ "error": message, "error_code": error_code })),
286 )
287 .into_response()
288}
289
290fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
291 use subtle::ConstantTimeEq;
292 if a.len() != b.len() {
293 return false;
294 }
295 bool::from(a.ct_eq(b))
296}
297
298async fn rate_limit_middleware(
299 State(state): State<AppState>,
300 req: Request<axum::body::Body>,
301 next: Next,
302) -> Response {
303 if !state.rate.allow().await {
304 return StatusCode::TOO_MANY_REQUESTS.into_response();
305 }
306 next.run(req).await
307}
308
309async fn concurrency_middleware(
310 State(state): State<AppState>,
311 req: Request<axum::body::Body>,
312 next: Next,
313) -> Response {
314 let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
315 return StatusCode::TOO_MANY_REQUESTS.into_response();
316 };
317 let resp = next.run(req).await;
318 drop(permit);
319 resp
320}
321
322async fn health() -> impl IntoResponse {
323 (StatusCode::OK, "ok\n")
324}
325
326async fn v1_shutdown() -> impl IntoResponse {
327 tokio::spawn(async {
328 tokio::time::sleep(Duration::from_millis(100)).await;
329 std::process::exit(0);
330 });
331 (StatusCode::OK, "shutting down\n")
332}
333
/// JSON body of `POST /v1/tools/call` (field names are camelCase on the wire).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolCallBody {
    /// Name of the tool to invoke.
    name: String,
    /// Tool arguments, forwarded unchanged to `ContextEngine::call_tool_value`.
    #[serde(default)]
    arguments: Option<Value>,
    /// Accepted but not read by `v1_tool_call`.
    /// NOTE(review): serde's camelCase rule presumably maps this to `workspaceId` — confirm.
    #[serde(default)]
    _workspace_id: Option<String>,
    /// Accepted but not read by `v1_tool_call`.
    #[serde(default)]
    _channel_id: Option<String>,
}
345
/// Query parameters for `GET /v1/events` (camelCase on the wire).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsQuery {
    /// Workspace id; passed through `sanitize_id`, defaults to `"default"`.
    #[serde(default)]
    workspace_id: Option<String>,
    /// Channel id; passed through `sanitize_id`, defaults to `"default"`.
    #[serde(default)]
    channel_id: Option<String>,
    /// Last event id the client already has; passed to the bus replay and
    /// used as the initial de-duplication cursor (defaults to 0).
    #[serde(default)]
    since: Option<i64>,
    /// Maximum number of replayed events (default 200, capped at 1000).
    #[serde(default)]
    limit: Option<usize>,
    /// Optional comma-separated list of event kinds to include.
    #[serde(default)]
    kind: Option<String>,
}
362
363async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
364 let _ = state;
365 let v = crate::core::mcp_manifest::manifest_value();
366 (StatusCode::OK, Json(v))
367}
368
/// Pagination parameters for `GET /v1/tools`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolsQuery {
    /// Index of the first tool to return (default 0, clamped to the total).
    #[serde(default)]
    offset: Option<usize>,
    /// Page size (default 200, capped at 500).
    #[serde(default)]
    limit: Option<usize>,
}
377
378async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
379 let _ = state;
380 let v = crate::core::mcp_manifest::manifest_value();
381 let tools = v
382 .get("tools")
383 .and_then(|t| t.get("granular"))
384 .cloned()
385 .unwrap_or(Value::Array(vec![]));
386
387 let all = tools.as_array().cloned().unwrap_or_default();
388 let total = all.len();
389 let offset = q.offset.unwrap_or(0).min(total);
390 let limit = q.limit.unwrap_or(200).min(500);
391 let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
392
393 (
394 StatusCode::OK,
395 Json(serde_json::json!({
396 "tools": page,
397 "total": total,
398 "offset": offset,
399 "limit": limit,
400 })),
401 )
402}
403
404async fn v1_tool_call(
405 State(state): State<AppState>,
406 Json(body): Json<ToolCallBody>,
407) -> impl IntoResponse {
408 let engine = ContextEngine::from_server(state.server.clone());
409 match tokio::time::timeout(
410 state.timeout,
411 engine.call_tool_value(&body.name, body.arguments),
412 )
413 .await
414 {
415 Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
416 Ok(Err(e)) => {
417 tracing::warn!("tool call error: {e}");
418 json_error(
419 StatusCode::BAD_REQUEST,
420 "tool_error",
421 "tool execution failed",
422 )
423 }
424 Err(_) => json_error(
425 StatusCode::GATEWAY_TIMEOUT,
426 "request_timeout",
427 "tool call timed out",
428 ),
429 }
430}
431
432async fn v1_events(
433 State(state): State<AppState>,
434 Query(q): Query<EventsQuery>,
435) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
436 use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
437
438 let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
439 let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
440 let _ = &state.project_root;
441 let since = q.since.unwrap_or(0);
442 let limit = q.limit.unwrap_or(200).min(1000);
443 let redaction = RedactionLevel::RefsOnly;
444
445 let kind_filter: Option<Vec<String>> = q
446 .kind
447 .as_deref()
448 .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
449
450 let rt = crate::core::context_os::runtime();
451 let replay = rt.bus.read(&ws, &ch, since, limit);
452
453 let replay = if let Some(ref kinds) = kind_filter {
454 replay
455 .into_iter()
456 .filter(|ev| kinds.contains(&ev.kind))
457 .collect()
458 } else {
459 replay
460 };
461
462 let rx = if let Some(ref kinds) = kind_filter {
463 let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
464 let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
465 if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
466 crate::core::context_os::SubscriptionKind::Filtered(sub)
467 } else {
468 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
469 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
470 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
471 }
472 } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
473 crate::core::context_os::SubscriptionKind::Unfiltered(sub)
474 } else {
475 tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
476 let (_, rx) = broadcast::channel::<ContextEventV1>(1);
477 crate::core::context_os::SubscriptionKind::Unfiltered(rx)
478 };
479
480 rt.metrics.record_sse_connect();
481 rt.metrics.record_events_replayed(replay.len() as u64);
482 rt.metrics.record_workspace_active(&ws);
483
484 let bus = rt.bus.clone();
485 let metrics = rt.metrics.clone();
486 let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
487
488 let stream = futures::stream::unfold(
489 (
490 pending,
491 rx,
492 ws.clone(),
493 ch.clone(),
494 since,
495 redaction,
496 bus,
497 metrics,
498 ),
499 |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
500 if let Some(mut ev) = pending.pop_front() {
501 last_id = ev.id;
502 redact_event_payload(&mut ev, redaction);
503 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
504 let evt = SseEvent::default()
505 .id(ev.id.to_string())
506 .event(ev.kind)
507 .data(data);
508 return Some((
509 Ok(evt),
510 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
511 ));
512 }
513
514 loop {
515 match rx.recv().await {
516 Ok(mut ev) if ev.id > last_id => {
517 last_id = ev.id;
518 redact_event_payload(&mut ev, redaction);
519 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
520 let evt = SseEvent::default()
521 .id(ev.id.to_string())
522 .event(ev.kind)
523 .data(data);
524 return Some((
525 Ok(evt),
526 (pending, rx, ws, ch, last_id, redaction, bus, metrics),
527 ));
528 }
529 Ok(_) => {}
530 Err(broadcast::error::RecvError::Closed) => return None,
531 Err(broadcast::error::RecvError::Lagged(skipped)) => {
532 let missed = bus.read(&ws, &ch, last_id, skipped as usize);
533 metrics.record_events_replayed(missed.len() as u64);
534 for ev in missed {
535 last_id = last_id.max(ev.id);
536 pending.push_back(ev);
537 }
538 }
539 }
540 }
541 },
542 );
543
544 let metrics_ref = rt.metrics.clone();
545 let guarded = SseDisconnectGuard {
546 inner: Box::pin(stream),
547 metrics: metrics_ref,
548 };
549
550 Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
551}
552
/// Query parameters for `GET /v1/audit/events`.
#[derive(Debug, Deserialize)]
struct AuditEventsQuery {
    /// Requested number of records per audit source (the handler caps it at 1000).
    #[serde(default = "default_audit_limit")]
    limit: usize,
}

/// Default `limit` for `AuditEventsQuery` when the parameter is omitted.
fn default_audit_limit() -> usize {
    100
}
562
563async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
564 let capped = q.limit.min(1000);
565 let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
566 let trail_events = crate::core::audit_trail::load_recent(capped);
567
568 Json(serde_json::json!({
569 "cross_project_events": boundary_events,
570 "audit_trail": trail_events,
571 }))
572}
573
574async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
575 let rt = crate::core::context_os::runtime();
576 let snap = rt.metrics.snapshot();
577 (
578 StatusCode::OK,
579 Json(serde_json::to_value(snap).unwrap_or_default()),
580 )
581}
582
/// Largest accepted A2A handoff payload (`payload_json.len()`), in bytes.
const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
/// Number of handoff files kept per directory; older ones are evicted first.
const MAX_HANDOFF_FILES: usize = 50;
585
586async fn v1_a2a_handoff(
587 State(state): State<AppState>,
588 Json(body): Json<Value>,
589) -> impl IntoResponse {
590 let envelope = match crate::core::a2a_transport::parse_envelope(
591 &serde_json::to_string(&body).unwrap_or_default(),
592 ) {
593 Ok(env) => env,
594 Err(e) => {
595 tracing::warn!("a2a handoff parse error: {e}");
596 return (
597 StatusCode::BAD_REQUEST,
598 Json(serde_json::json!({"error": "invalid_envelope"})),
599 );
600 }
601 };
602
603 if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
604 tracing::warn!(
605 "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
606 envelope.payload_json.len()
607 );
608 return (
609 StatusCode::PAYLOAD_TOO_LARGE,
610 Json(serde_json::json!({"error": "payload_too_large"})),
611 );
612 }
613
614 let rt = crate::core::context_os::runtime();
615 rt.bus.append(
616 &state.project_root,
617 "a2a",
618 &crate::core::context_os::ContextEventKindV1::SessionMutated,
619 Some(&envelope.sender.agent_id),
620 serde_json::json!({
621 "type": "handoff_received",
622 "content_type": format!("{:?}", envelope.content_type),
623 "sender": envelope.sender.agent_id,
624 "payload_size": envelope.payload_json.len(),
625 }),
626 );
627
628 match envelope.content_type {
629 crate::core::a2a_transport::TransportContentType::ContextPackage => {
630 let dir = std::path::Path::new(&state.project_root)
631 .join(".lean-ctx")
632 .join("handoffs")
633 .join("packages");
634 let _ = std::fs::create_dir_all(&dir);
635 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
636 let out = dir.join(format!(
637 "ctx-{}.{}",
638 chrono::Utc::now().format("%Y%m%d_%H%M%S"),
639 crate::core::contracts::PACKAGE_EXTENSION
640 ));
641 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
642 tracing::error!("a2a handoff write failed: {e}");
643 return (
644 StatusCode::INTERNAL_SERVER_ERROR,
645 Json(serde_json::json!({"error": "write_failed"})),
646 );
647 }
648 (
649 StatusCode::OK,
650 Json(serde_json::json!({
651 "status": "received",
652 "content_type": "context_package",
653 })),
654 )
655 }
656 crate::core::a2a_transport::TransportContentType::HandoffBundle => {
657 let dir = std::path::Path::new(&state.project_root)
658 .join(".lean-ctx")
659 .join("handoffs");
660 let _ = std::fs::create_dir_all(&dir);
661 evict_oldest_files(&dir, MAX_HANDOFF_FILES);
662 let out = dir.join(format!(
663 "received-{}.json",
664 chrono::Utc::now().format("%Y%m%d_%H%M%S")
665 ));
666 if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
667 tracing::error!("a2a handoff write failed: {e}");
668 return (
669 StatusCode::INTERNAL_SERVER_ERROR,
670 Json(serde_json::json!({"error": "write_failed"})),
671 );
672 }
673 (
674 StatusCode::OK,
675 Json(serde_json::json!({
676 "status": "received",
677 "content_type": "handoff_bundle",
678 })),
679 )
680 }
681 _ => (
682 StatusCode::OK,
683 Json(serde_json::json!({
684 "status": "received",
685 "content_type": format!("{:?}", envelope.content_type),
686 })),
687 ),
688 }
689}
690
/// Deletes the oldest regular files in `dir` so that at most `max_files - 1`
/// remain, leaving room for one new file.
///
/// Age is the modification time; a file whose mtime cannot be read sorts as
/// oldest. Subdirectories and unreadable entries are ignored. This is
/// best-effort housekeeping, so every I/O failure is silently tolerated.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(read_dir) = std::fs::read_dir(dir) else {
        return;
    };

    let mut by_age: Vec<(std::time::SystemTime, std::path::PathBuf)> = Vec::new();
    for entry in read_dir.flatten() {
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        by_age.push((mtime, entry.path()));
    }

    // Keep one slot free for the file the caller is about to write.
    let keep = max_files.saturating_sub(1);
    let excess = by_age.len().saturating_sub(keep);
    if excess == 0 {
        return;
    }

    by_age.sort_by_key(|(mtime, _)| *mtime);
    for (_, path) in by_age.into_iter().take(excess) {
        let _ = std::fs::remove_file(path);
    }
}
716
717async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
718 let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
719 Ok(r) => r,
720 Err(e) => {
721 tracing::debug!("a2a JSON-RPC parse error: {e}");
722 return (
723 StatusCode::BAD_REQUEST,
724 Json(serde_json::json!({
725 "jsonrpc": "2.0",
726 "id": null,
727 "error": {"code": -32700, "message": "invalid request"}
728 })),
729 );
730 }
731 };
732 let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
733 let json = serde_json::to_value(resp).unwrap_or_default();
734 (StatusCode::OK, Json(json))
735}
736
737async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
738 let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
739 (
740 StatusCode::OK,
741 [(header::CONTENT_TYPE, "application/json")],
742 Json(card),
743 )
744}
745
746async fn mcp_server_card() -> impl IntoResponse {
747 let card = serde_json::json!({
748 "name": "lean-ctx",
749 "version": env!("CARGO_PKG_VERSION"),
750 "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
751 "capabilities": {
752 "tools": true,
753 "resources": false,
754 "prompts": false,
755 "sampling": false
756 },
757 "tool_categories": [
758 {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
759 {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
760 {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
761 {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
762 ],
763 "features": {
764 "compression": "deterministic AST-based, 40-70% token reduction",
765 "caching": "session-scoped with zstd, re-reads ~13 tokens",
766 "audit_trail": "SHA-256 chained JSONL",
767 "rbac": "5 built-in roles with capability-based access",
768 "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
769 "secret_detection": "8 regex patterns + custom"
770 },
771 "security": {
772 "path_jail": true,
773 "rate_limiting": true,
774 "budget_tracking": true,
775 "signed_handoffs": true,
776 "timing_safe_auth": true
777 }
778 });
779 Json(card)
780}
781
782async fn v1_agents_register(
783 State(state): State<AppState>,
784 Json(body): Json<Value>,
785) -> impl IntoResponse {
786 let agent_type = body
787 .get("agent_type")
788 .and_then(|v| v.as_str())
789 .unwrap_or("unknown");
790 let role = body.get("role").and_then(|v| v.as_str());
791 let project_root = body
792 .get("project_root")
793 .and_then(|v| v.as_str())
794 .unwrap_or(&state.project_root);
795
796 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
797 let agent_id = registry.register(agent_type, role, project_root);
798 let _ = registry.save();
799
800 Json(serde_json::json!({
801 "agent_id": agent_id,
802 "status": "registered"
803 }))
804}
805
806async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
807 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
808 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
809 registry.update_heartbeat(agent_id);
810 let _ = registry.save();
811 Json(serde_json::json!({"status": "ok"}))
812}
813
814async fn v1_agents_list() -> impl IntoResponse {
815 let registry = crate::core::agents::AgentRegistry::load_or_create();
816 let active = registry.list_active(None);
817 Json(serde_json::json!({
818 "agents": active.iter().map(|a| serde_json::json!({
819 "agent_id": a.agent_id,
820 "agent_type": a.agent_type,
821 "role": a.role,
822 "status": a.status.to_string(),
823 "last_active": a.last_active.to_rfc3339(),
824 })).collect::<Vec<_>>()
825 }))
826}
827
828async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
829 let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
830 let mut registry = crate::core::agents::AgentRegistry::load_or_create();
831 registry.set_status(
832 agent_id,
833 crate::core::agents::AgentStatus::Finished,
834 Some("deregistered via API"),
835 );
836 let _ = registry.save();
837 Json(serde_json::json!({"status": "deregistered"}))
838}
839
840async fn v1_agents_events_sse(
841) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
842 let stream = futures::stream::unfold(0usize, |last_count| async move {
843 loop {
844 tokio::time::sleep(Duration::from_secs(5)).await;
845 let registry = crate::core::agents::AgentRegistry::load_or_create();
846 let active = registry.list_active(None);
847 let count = active.len();
848 if count != last_count {
849 let data = serde_json::json!({
850 "type": "agents_changed",
851 "active_count": count,
852 "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
853 });
854 return Some((
855 Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
856 count,
857 ));
858 }
859 }
860 });
861
862 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
863}
864
/// Builds the full axum router: REST/SSE endpoints under `/v1`, A2A and
/// discovery endpoints, and the MCP Streamable HTTP service as the fallback
/// for every other path.
///
/// Side effect: calls `cfg.effective_auth_token()`, which may generate a fresh
/// token for loopback hosts and print it to stderr.
fn build_app_router(cfg: &HttpServerConfig) -> Router {
    let project_root = cfg.project_root.to_string_lossy().to_string();
    let service_project_root = project_root.clone();
    // Factory handed to the MCP transport to create `LeanCtxServer` instances
    // (presumably one per MCP session — depends on rmcp's session manager).
    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
        Ok(LeanCtxServer::new_shared_with_context(
            &service_project_root,
            "default",
            "default",
        ))
    };
    let mcp_http = StreamableHttpService::new(
        service_factory,
        Arc::new(
            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
        ),
        cfg.mcp_http_config(),
    );

    // Separate server instance used by the REST `/v1/tools/call` endpoint.
    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");

    let state = AppState {
        token: cfg.effective_auth_token(),
        // `.max(1)` stops a zero config value from producing a semaphore that rejects everything.
        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
        project_root,
        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
        server: rest_server,
    };

    // Each `.layer` wraps everything added before it, so the LAST layer is the
    // outermost. Requests pass auth -> concurrency -> rate limit -> body limit
    // before reaching a route or the MCP fallback.
    // NOTE(review): because auth runs first, rejected (unauthenticated)
    // requests never reach the rate limiter, so token guessing is not rate
    // limited by this stack.
    // NOTE(review): `DefaultBodyLimit` configures axum's body extractors;
    // confirm whether the rmcp fallback service honours it.
    Router::new()
        .route("/health", get(health))
        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
        .route("/v1/manifest", get(v1_manifest))
        .route("/v1/tools", get(v1_tools))
        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
        .route("/v1/events", get(v1_events))
        .route(
            "/v1/context/summary",
            get(context_views::v1_context_summary),
        )
        .route("/v1/events/search", get(context_views::v1_events_search))
        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
        .route("/v1/metrics", get(v1_metrics))
        .route("/v1/audit/events", get(v1_audit_events))
        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
        .route("/.well-known/mcp-server.json", get(mcp_server_card))
        .route("/a2a", axum::routing::post(a2a_jsonrpc))
        .route(
            "/v1/agents/register",
            axum::routing::post(v1_agents_register),
        )
        .route(
            "/v1/agents/heartbeat",
            axum::routing::post(v1_agents_heartbeat),
        )
        .route("/v1/agents/list", get(v1_agents_list))
        .route(
            "/v1/agents/deregister",
            axum::routing::post(v1_agents_deregister),
        )
        .route("/v1/agents/events", get(v1_agents_events_sse))
        // Any path not matched above goes to the MCP Streamable HTTP service.
        .fallback_service(mcp_http)
        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            concurrency_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            auth_middleware,
        ))
        .with_state(state)
}
944
945pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
946 crate::core::protocol::set_mcp_context(true);
947 cfg.validate()?;
948
949 let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
950 .parse()
951 .context("invalid host/port")?;
952
953 let app = build_app_router(&cfg);
954
955 let listener = tokio::net::TcpListener::bind(addr)
956 .await
957 .with_context(|| format!("bind {addr}"))?;
958
959 tracing::info!(
960 "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
961 cfg.project_root.display()
962 );
963
964 axum::serve(listener, app)
965 .with_graceful_shutdown(async move {
966 let _ = tokio::signal::ctrl_c().await;
967 })
968 .await
969 .context("http server")?;
970 Ok(())
971}
972
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next client connection. Accept errors are logged and
    /// retried after 100 ms rather than surfaced to `axum::serve`.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            let err = match self.accept_pipe().await {
                Ok(pipe) => return (pipe, self.name().to_string()),
                Err(err) => err,
            };
            tracing::error!("named pipe accept error: {err}");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }

    /// Reports the pipe name as the listener's address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let name = self.name().to_string();
        Ok(name)
    }
}
994
/// Serves the router built by `build_app_router` over a local IPC endpoint
/// (a Unix domain socket on Unix, a named pipe on Windows) until Ctrl-C is
/// received.
///
/// # Errors
/// Fails when validation, binding the IPC listener, or serving fails.
pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
    // NOTE(review): unlike `serve`, this path does not visibly call
    // `crate::core::protocol::set_mcp_context(true)` — confirm that is intended.
    cfg.validate()?;

    // Each arm is compiled only on its platform, so the two near-identical
    // bodies cannot be merged: the listener types differ.
    match addr {
        #[cfg(unix)]
        crate::ipc::DaemonAddr::Unix(ref path) => {
            let app = build_app_router(&cfg);
            let listener = crate::ipc::bind_listener(&addr)?;

            tracing::info!(
                "lean-ctx daemon listening on {} (project_root={})",
                path.display(),
                cfg.project_root.display()
            );

            axum::serve(listener, app.into_make_service())
                .with_graceful_shutdown(async move {
                    let _ = tokio::signal::ctrl_c().await;
                })
                .await
                .context("ipc server")?;
            Ok(())
        }
        #[cfg(windows)]
        crate::ipc::DaemonAddr::NamedPipe(ref name) => {
            let app = build_app_router(&cfg);
            let listener = crate::ipc::bind_listener(&addr)?;

            tracing::info!(
                "lean-ctx daemon listening on {} (project_root={})",
                name,
                cfg.project_root.display()
            );

            axum::serve(listener, app.into_make_service())
                .with_graceful_shutdown(async move {
                    let _ = tokio::signal::ctrl_c().await;
                })
                .await
                .context("ipc server")?;
            Ok(())
        }
    }
}
1041
1042#[cfg(test)]
1043mod tests {
1044 use super::*;
1045 use axum::body::Body;
1046 use axum::http::Request;
1047 use futures::StreamExt;
1048 use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
1049 use serde_json::json;
1050 use tower::ServiceExt;
1051
1052 async fn read_first_sse_message(body: Body) -> String {
1053 let mut stream = body.into_data_stream();
1054 let mut buf: Vec<u8> = Vec::new();
1055 for _ in 0..32 {
1056 let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
1057 let Ok(Some(Ok(bytes))) = next else {
1058 break;
1059 };
1060 buf.extend_from_slice(&bytes);
1061 if buf.windows(2).any(|w| w == b"\n\n") {
1062 break;
1063 }
1064 }
1065 String::from_utf8_lossy(&buf).to_string()
1066 }
1067
1068 #[tokio::test]
1069 async fn auth_token_blocks_requests_without_bearer_header() {
1070 let dir = tempfile::tempdir().expect("tempdir");
1071 let root_str = dir.path().to_string_lossy().to_string();
1072 let service_project_root = root_str.clone();
1073 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
1074 Ok(LeanCtxServer::new_shared_with_context(
1075 &service_project_root,
1076 "default",
1077 "default",
1078 ))
1079 };
1080 let cfg = StreamableHttpServerConfig::default()
1081 .with_stateful_mode(false)
1082 .with_json_response(true);
1083
1084 let mcp_http = StreamableHttpService::new(
1085 service_factory,
1086 Arc::new(
1087 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
1088 ),
1089 cfg,
1090 );
1091
1092 let state = AppState {
1093 token: Some("secret".to_string()),
1094 concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
1095 rate: Arc::new(RateLimiter::new(50, 100)),
1096 project_root: root_str.clone(),
1097 timeout: Duration::from_secs(30),
1098 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1099 };
1100
1101 let app = Router::new()
1102 .fallback_service(mcp_http)
1103 .layer(middleware::from_fn_with_state(
1104 state.clone(),
1105 auth_middleware,
1106 ))
1107 .with_state(state);
1108
1109 let body = json!({
1110 "jsonrpc": "2.0",
1111 "id": 1,
1112 "method": "tools/list",
1113 "params": {}
1114 })
1115 .to_string();
1116
1117 let req = Request::builder()
1118 .method("POST")
1119 .uri("/")
1120 .header("Host", "localhost")
1121 .header("Accept", "application/json, text/event-stream")
1122 .header("Content-Type", "application/json")
1123 .body(Body::from(body))
1124 .expect("request");
1125
1126 let resp = app.clone().oneshot(req).await.expect("resp");
1127 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1128 }
1129
1130 #[tokio::test]
1131 async fn mcp_service_factory_isolates_per_client_state() {
1132 let dir = tempfile::tempdir().expect("tempdir");
1133 let root_str = dir.path().to_string_lossy().to_string();
1134
1135 let service_project_root = root_str.clone();
1137 let service_factory = move || -> Result<LeanCtxServer, std::convert::Infallible> {
1138 Ok(LeanCtxServer::new_shared_with_context(
1139 &service_project_root,
1140 "default",
1141 "default",
1142 ))
1143 };
1144
1145 let s1 = service_factory().expect("server 1");
1146 let s2 = service_factory().expect("server 2");
1147
1148 *s1.client_name.write().await = "client-a".to_string();
1151 *s2.client_name.write().await = "client-b".to_string();
1152
1153 let a = s1.client_name.read().await.clone();
1154 let b = s2.client_name.read().await.clone();
1155 assert_eq!(a, "client-a");
1156 assert_eq!(b, "client-b");
1157 }
1158
1159 #[tokio::test]
1160 async fn rate_limit_returns_429_when_exhausted() {
1161 let state = AppState {
1162 token: None,
1163 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1164 rate: Arc::new(RateLimiter::new(1, 1)),
1165 project_root: ".".to_string(),
1166 timeout: Duration::from_secs(30),
1167 server: LeanCtxServer::new_shared_with_context(".", "default", "default"),
1168 };
1169
1170 let app = Router::new()
1171 .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
1172 .layer(middleware::from_fn_with_state(
1173 state.clone(),
1174 rate_limit_middleware,
1175 ))
1176 .with_state(state);
1177
1178 let req1 = Request::builder()
1179 .method("GET")
1180 .uri("/limited")
1181 .header("Host", "localhost")
1182 .body(Body::empty())
1183 .expect("req1");
1184 let resp1 = app.clone().oneshot(req1).await.expect("resp1");
1185 assert_eq!(resp1.status(), StatusCode::OK);
1186
1187 let req2 = Request::builder()
1188 .method("GET")
1189 .uri("/limited")
1190 .header("Host", "localhost")
1191 .body(Body::empty())
1192 .expect("req2");
1193 let resp2 = app.clone().oneshot(req2).await.expect("resp2");
1194 assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
1195 }
1196
1197 #[tokio::test]
1198 async fn audit_events_endpoint_returns_json() {
1199 let dir = tempfile::tempdir().expect("tempdir");
1200 let root_str = dir.path().to_string_lossy().to_string();
1201
1202 let state = AppState {
1203 token: None,
1204 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1205 rate: Arc::new(RateLimiter::new(50, 100)),
1206 project_root: root_str.clone(),
1207 timeout: Duration::from_secs(30),
1208 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1209 };
1210
1211 let app = Router::new()
1212 .route("/v1/audit/events", get(v1_audit_events))
1213 .with_state(state);
1214
1215 let req = Request::builder()
1216 .method("GET")
1217 .uri("/v1/audit/events?limit=10")
1218 .header("Host", "localhost")
1219 .body(Body::empty())
1220 .unwrap();
1221
1222 let resp = app.oneshot(req).await.unwrap();
1223 assert_eq!(resp.status(), StatusCode::OK);
1224
1225 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1226 .await
1227 .unwrap();
1228 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
1229 assert!(json.get("cross_project_events").unwrap().is_array());
1230 assert!(json.get("audit_trail").unwrap().is_array());
1231 }
1232
1233 #[tokio::test]
1234 async fn events_endpoint_replays_tool_call_event() {
1235 use crate::core::context_os::{self, ContextEventKindV1};
1236
1237 let dir = tempfile::tempdir().expect("tempdir");
1238 std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
1239 std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
1240 let root_str = dir.path().to_string_lossy().to_string();
1241
1242 let state = AppState {
1243 token: None,
1244 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
1245 rate: Arc::new(RateLimiter::new(50, 100)),
1246 project_root: root_str.clone(),
1247 timeout: Duration::from_secs(30),
1248 server: LeanCtxServer::new_shared_with_context(&root_str, "default", "default"),
1249 };
1250
1251 let app = Router::new()
1252 .route("/v1/events", get(v1_events))
1253 .with_state(state);
1254
1255 let rt = context_os::runtime();
1257 rt.bus.append(
1258 "ws1",
1259 "ch1",
1260 &ContextEventKindV1::ToolCallRecorded,
1261 Some("test-agent"),
1262 json!({"tool": "ctx_session", "action": "status"}),
1263 );
1264
1265 let req = Request::builder()
1266 .method("GET")
1267 .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
1268 .header("Host", "localhost")
1269 .header("Accept", "text/event-stream")
1270 .body(Body::empty())
1271 .expect("req");
1272 let resp = app.clone().oneshot(req).await.expect("events");
1273 assert_eq!(resp.status(), StatusCode::OK);
1274
1275 let msg = read_first_sse_message(resp.into_body()).await;
1276 assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
1277 assert!(msg.contains("\"ws1\""), "msg={msg:?}");
1278 assert!(msg.contains("\"ch1\""), "msg={msg:?}");
1279 }
1280}