// lean_ctx/http_server/mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7    extract::Json,
8    extract::Query,
9    extract::State,
10    http::{header, Request, StatusCode},
11    middleware::{self, Next},
12    response::sse::{Event as SseEvent, KeepAlive, Sse},
13    response::{IntoResponse, Response},
14    routing::get,
15    Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29pub mod savings_ingest;
30pub mod team;
31
32/// Wrapper stream that calls `record_sse_disconnect` on drop.
33use std::pin::Pin;
34
35pub(crate) struct SseDisconnectGuard<I> {
36    pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
37    pub(crate) metrics: Arc<ContextOsMetrics>,
38}
39
40impl<I> Stream for SseDisconnectGuard<I> {
41    type Item = I;
42
43    fn poll_next(
44        mut self: Pin<&mut Self>,
45        cx: &mut std::task::Context<'_>,
46    ) -> std::task::Poll<Option<Self::Item>> {
47        self.inner.as_mut().poll_next(cx)
48    }
49}
50
51impl<I> Drop for SseDisconnectGuard<I> {
52    fn drop(&mut self) {
53        self.metrics.record_sse_disconnect();
54    }
55}
56
/// Maximum number of characters kept in a sanitized identifier.
const MAX_ID_LEN: usize = 64;

/// Normalizes a caller-supplied workspace/channel identifier.
///
/// Keeps only ASCII alphanumerics plus `-`, `_` and `.`, truncates the result to
/// [`MAX_ID_LEN`] characters, and returns `"default"` when nothing usable remains.
///
/// Identifiers that consist solely of dots (`"."`, `".."`, ...) are also mapped to
/// `"default"`: they carry no information and would be dangerous if an identifier
/// were ever used as a path segment.
fn sanitize_id(raw: &str) -> String {
    const FALLBACK: &str = "default";

    // Whitespace is not in the allowed set, so filtering also covers trimming.
    let cleaned: String = raw
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || matches!(*c, '-' | '_' | '.'))
        .take(MAX_ID_LEN)
        .collect();

    // `all` is vacuously true for the empty string, so this covers both the
    // "nothing left" case and the dot-only case.
    if cleaned.chars().all(|c| c == '.') {
        FALLBACK.to_string()
    } else {
        cleaned
    }
}
75
/// Settings for the HTTP server: bind address, authentication and request limits.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host to bind. `validate` refuses non-loopback hosts without an auth token.
    pub host: String,
    /// TCP port to bind.
    pub port: u16,
    /// Project root handed to the MCP and REST server instances.
    pub project_root: PathBuf,
    /// Bearer token expected by the auth middleware, if configured.
    pub auth_token: Option<String>,
    /// Forwarded to the MCP transport's stateful-mode setting.
    pub stateful_mode: bool,
    /// Forwarded to the MCP transport's JSON-response setting.
    pub json_response: bool,
    /// When set, the MCP transport's allowed-hosts check is disabled.
    pub disable_host_check: bool,
    /// Explicit allowed-hosts list passed to the MCP transport when non-empty.
    pub allowed_hosts: Vec<String>,
    /// Request body size limit, in bytes.
    pub max_body_bytes: usize,
    /// Number of requests that may be in flight at once.
    pub max_concurrency: usize,
    /// Rate limiter refill rate, in requests per second.
    pub max_rps: u32,
    /// Rate limiter bucket capacity.
    pub rate_burst: u32,
    /// Timeout applied to REST tool calls, in milliseconds.
    pub request_timeout_ms: u64,
}
92
93impl Default for HttpServerConfig {
94    fn default() -> Self {
95        let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
96        Self {
97            host: "127.0.0.1".to_string(),
98            port: 8080,
99            project_root,
100            auth_token: None,
101            stateful_mode: false,
102            json_response: true,
103            disable_host_check: false,
104            allowed_hosts: Vec::new(),
105            max_body_bytes: 2 * 1024 * 1024,
106            max_concurrency: 32,
107            max_rps: 50,
108            rate_burst: 100,
109            request_timeout_ms: 30_000,
110        }
111    }
112}
113
114impl HttpServerConfig {
115    pub fn validate(&self) -> Result<()> {
116        let host = self.host.trim().to_lowercase();
117        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
118        if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
119            return Err(anyhow!(
120                "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
121            ));
122        }
123        Ok(())
124    }
125
126    pub fn effective_auth_token(&self) -> Option<String> {
127        if let Some(ref token) = self.auth_token {
128            if !token.is_empty() {
129                return Some(token.clone());
130            }
131        }
132        let host = self.host.trim().to_lowercase();
133        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
134        if is_loopback {
135            let auto_token = crate::core::session_token::generate_token();
136            eprintln!(
137                "[lean-ctx] Auto-generated auth token for loopback: {auto_token}\n\
138                 Pass as Bearer token or set --auth-token explicitly."
139            );
140            Some(auto_token)
141        } else {
142            None
143        }
144    }
145
146    fn mcp_http_config(&self) -> StreamableHttpServerConfig {
147        let mut cfg = StreamableHttpServerConfig::default()
148            .with_stateful_mode(self.stateful_mode)
149            .with_json_response(self.json_response);
150
151        if self.disable_host_check {
152            tracing::warn!(
153                "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
154                 Do NOT use this in production or on non-loopback interfaces."
155            );
156            cfg = cfg.disable_allowed_hosts();
157            return cfg;
158        }
159
160        if !self.allowed_hosts.is_empty() {
161            cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
162            return cfg;
163        }
164
165        // Keep rmcp's secure loopback defaults; also allow the configured host (if it's loopback).
166        let host = self.host.trim();
167        if host == "127.0.0.1" || host == "localhost" || host == "::1" {
168            cfg.allowed_hosts.push(host.to_string());
169        }
170
171        cfg
172    }
173}
174
175#[derive(Clone)]
176struct AppState {
177    token: Option<String>,
178    concurrency: Arc<tokio::sync::Semaphore>,
179    rate: Arc<RateLimiter>,
180    project_root: String,
181    timeout: Duration,
182    server: LeanCtxServer,
183}
184
185#[derive(Debug)]
186struct RateLimiter {
187    max_rps: f64,
188    burst: f64,
189    state: tokio::sync::Mutex<RateState>,
190}
191
192#[derive(Debug, Clone, Copy)]
193struct RateState {
194    tokens: f64,
195    last: Instant,
196}
197
198impl RateLimiter {
199    fn new(max_rps: u32, burst: u32) -> Self {
200        let now = Instant::now();
201        Self {
202            max_rps: (max_rps.max(1)) as f64,
203            burst: (burst.max(1)) as f64,
204            state: tokio::sync::Mutex::new(RateState {
205                tokens: (burst.max(1)) as f64,
206                last: now,
207            }),
208        }
209    }
210
211    async fn allow(&self) -> bool {
212        let mut s = self.state.lock().await;
213        let now = Instant::now();
214        let elapsed = now.saturating_duration_since(s.last);
215        let refill = elapsed.as_secs_f64() * self.max_rps;
216        s.tokens = (s.tokens + refill).min(self.burst);
217        s.last = now;
218        if s.tokens >= 1.0 {
219            s.tokens -= 1.0;
220            true
221        } else {
222            false
223        }
224    }
225}
226
227async fn auth_middleware(
228    State(state): State<AppState>,
229    req: Request<axum::body::Body>,
230    next: Next,
231) -> Response {
232    if state.token.is_none() {
233        return next.run(req).await;
234    }
235
236    if req.uri().path() == "/health" {
237        return next.run(req).await;
238    }
239
240    let expected = state.token.as_deref().unwrap_or("");
241    let Some(h) = req.headers().get(header::AUTHORIZATION) else {
242        return json_error(
243            StatusCode::UNAUTHORIZED,
244            "unauthorized",
245            "missing Authorization header",
246        );
247    };
248    let Ok(s) = h.to_str() else {
249        return json_error(
250            StatusCode::UNAUTHORIZED,
251            "unauthorized",
252            "malformed Authorization header",
253        );
254    };
255    let Some(token) = s
256        .strip_prefix("Bearer ")
257        .or_else(|| s.strip_prefix("bearer "))
258    else {
259        return json_error(
260            StatusCode::UNAUTHORIZED,
261            "unauthorized",
262            "Authorization must use the Bearer scheme",
263        );
264    };
265    if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
266        return json_error(
267            StatusCode::UNAUTHORIZED,
268            "unauthorized",
269            "invalid bearer token",
270        );
271    }
272
273    next.run(req).await
274}
275
276/// Structured REST error envelope: `{ "error": <human message>, "error_code": <stable code> }`.
277///
278/// `error_code` is the stable, machine-readable string SDKs switch on; `error` carries the
279/// human-facing message. Used for every REST (non-A2A) error so clients branch on a code
280/// instead of parsing prose. The A2A JSON-RPC surface keeps its own `-32xxx` envelope.
281pub(crate) fn json_error(status: StatusCode, error_code: &str, message: &str) -> Response {
282    (
283        status,
284        Json(serde_json::json!({ "error": message, "error_code": error_code })),
285    )
286        .into_response()
287}
288
289fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
290    use subtle::ConstantTimeEq;
291    if a.len() != b.len() {
292        return false;
293    }
294    bool::from(a.ct_eq(b))
295}
296
297async fn rate_limit_middleware(
298    State(state): State<AppState>,
299    req: Request<axum::body::Body>,
300    next: Next,
301) -> Response {
302    if !state.rate.allow().await {
303        return StatusCode::TOO_MANY_REQUESTS.into_response();
304    }
305    next.run(req).await
306}
307
308async fn concurrency_middleware(
309    State(state): State<AppState>,
310    req: Request<axum::body::Body>,
311    next: Next,
312) -> Response {
313    let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
314        return StatusCode::TOO_MANY_REQUESTS.into_response();
315    };
316    let resp = next.run(req).await;
317    drop(permit);
318    resp
319}
320
321async fn health() -> impl IntoResponse {
322    (StatusCode::OK, "ok\n")
323}
324
325async fn v1_shutdown() -> impl IntoResponse {
326    tokio::spawn(async {
327        tokio::time::sleep(Duration::from_millis(100)).await;
328        std::process::exit(0);
329    });
330    (StatusCode::OK, "shutting down\n")
331}
332
333#[derive(Debug, Deserialize)]
334#[serde(rename_all = "camelCase")]
335struct ToolCallBody {
336    name: String,
337    #[serde(default)]
338    arguments: Option<Value>,
339    #[serde(default)]
340    _workspace_id: Option<String>,
341    #[serde(default)]
342    _channel_id: Option<String>,
343}
344
345#[derive(Debug, Deserialize)]
346#[serde(rename_all = "camelCase")]
347struct EventsQuery {
348    #[serde(default)]
349    workspace_id: Option<String>,
350    #[serde(default)]
351    channel_id: Option<String>,
352    #[serde(default)]
353    since: Option<i64>,
354    #[serde(default)]
355    limit: Option<usize>,
356    /// Comma-separated event kind filter (e.g. `tool_call,session_start`).
357    /// When set, only matching events are delivered via SSE.
358    #[serde(default)]
359    kind: Option<String>,
360}
361
362async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
363    let _ = state;
364    let v = crate::core::mcp_manifest::manifest_value();
365    (StatusCode::OK, Json(v))
366}
367
368#[derive(Debug, Deserialize)]
369#[serde(rename_all = "camelCase")]
370struct ToolsQuery {
371    #[serde(default)]
372    offset: Option<usize>,
373    #[serde(default)]
374    limit: Option<usize>,
375}
376
377async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
378    let _ = state;
379    let v = crate::core::mcp_manifest::manifest_value();
380    let tools = v
381        .get("tools")
382        .and_then(|t| t.get("granular"))
383        .cloned()
384        .unwrap_or(Value::Array(vec![]));
385
386    let all = tools.as_array().cloned().unwrap_or_default();
387    let total = all.len();
388    let offset = q.offset.unwrap_or(0).min(total);
389    let limit = q.limit.unwrap_or(200).min(500);
390    let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
391
392    (
393        StatusCode::OK,
394        Json(serde_json::json!({
395            "tools": page,
396            "total": total,
397            "offset": offset,
398            "limit": limit,
399        })),
400    )
401}
402
403async fn v1_tool_call(
404    State(state): State<AppState>,
405    Json(body): Json<ToolCallBody>,
406) -> impl IntoResponse {
407    let engine = ContextEngine::from_server(state.server.clone());
408    match tokio::time::timeout(
409        state.timeout,
410        engine.call_tool_value(&body.name, body.arguments),
411    )
412    .await
413    {
414        Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
415        Ok(Err(e)) => {
416            tracing::warn!("tool call error: {e}");
417            json_error(
418                StatusCode::BAD_REQUEST,
419                "tool_error",
420                "tool execution failed",
421            )
422        }
423        Err(_) => json_error(
424            StatusCode::GATEWAY_TIMEOUT,
425            "request_timeout",
426            "tool call timed out",
427        ),
428    }
429}
430
431async fn v1_events(
432    State(state): State<AppState>,
433    Query(q): Query<EventsQuery>,
434) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
435    use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
436
437    let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
438    let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
439    let _ = &state.project_root;
440    let since = q.since.unwrap_or(0);
441    let limit = q.limit.unwrap_or(200).min(1000);
442    let redaction = RedactionLevel::RefsOnly;
443
444    let kind_filter: Option<Vec<String>> = q
445        .kind
446        .as_deref()
447        .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
448
449    let rt = crate::core::context_os::runtime();
450    let replay = rt.bus.read(&ws, &ch, since, limit);
451
452    let replay = if let Some(ref kinds) = kind_filter {
453        replay
454            .into_iter()
455            .filter(|ev| kinds.contains(&ev.kind))
456            .collect()
457    } else {
458        replay
459    };
460
461    let rx = if let Some(ref kinds) = kind_filter {
462        let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
463        let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
464        if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
465            crate::core::context_os::SubscriptionKind::Filtered(sub)
466        } else {
467            tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
468            let (_, rx) = broadcast::channel::<ContextEventV1>(1);
469            crate::core::context_os::SubscriptionKind::Unfiltered(rx)
470        }
471    } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
472        crate::core::context_os::SubscriptionKind::Unfiltered(sub)
473    } else {
474        tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
475        let (_, rx) = broadcast::channel::<ContextEventV1>(1);
476        crate::core::context_os::SubscriptionKind::Unfiltered(rx)
477    };
478
479    rt.metrics.record_sse_connect();
480    rt.metrics.record_events_replayed(replay.len() as u64);
481    rt.metrics.record_workspace_active(&ws);
482
483    let bus = rt.bus.clone();
484    let metrics = rt.metrics.clone();
485    let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
486
487    let stream = futures::stream::unfold(
488        (
489            pending,
490            rx,
491            ws.clone(),
492            ch.clone(),
493            since,
494            redaction,
495            bus,
496            metrics,
497        ),
498        |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
499            if let Some(mut ev) = pending.pop_front() {
500                last_id = ev.id;
501                redact_event_payload(&mut ev, redaction);
502                let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
503                let evt = SseEvent::default()
504                    .id(ev.id.to_string())
505                    .event(ev.kind)
506                    .data(data);
507                return Some((
508                    Ok(evt),
509                    (pending, rx, ws, ch, last_id, redaction, bus, metrics),
510                ));
511            }
512
513            loop {
514                match rx.recv().await {
515                    Ok(mut ev) if ev.id > last_id => {
516                        last_id = ev.id;
517                        redact_event_payload(&mut ev, redaction);
518                        let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
519                        let evt = SseEvent::default()
520                            .id(ev.id.to_string())
521                            .event(ev.kind)
522                            .data(data);
523                        return Some((
524                            Ok(evt),
525                            (pending, rx, ws, ch, last_id, redaction, bus, metrics),
526                        ));
527                    }
528                    Ok(_) => {}
529                    Err(broadcast::error::RecvError::Closed) => return None,
530                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
531                        let missed = bus.read(&ws, &ch, last_id, skipped as usize);
532                        metrics.record_events_replayed(missed.len() as u64);
533                        for ev in missed {
534                            last_id = last_id.max(ev.id);
535                            pending.push_back(ev);
536                        }
537                    }
538                }
539            }
540        },
541    );
542
543    let metrics_ref = rt.metrics.clone();
544    let guarded = SseDisconnectGuard {
545        inner: Box::pin(stream),
546        metrics: metrics_ref,
547    };
548
549    Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
550}
551
552#[derive(Debug, Deserialize)]
553struct AuditEventsQuery {
554    #[serde(default = "default_audit_limit")]
555    limit: usize,
556}
557
558fn default_audit_limit() -> usize {
559    100
560}
561
562async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
563    let capped = q.limit.min(1000);
564    let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
565    let trail_events = crate::core::audit_trail::load_recent(capped);
566
567    Json(serde_json::json!({
568        "cross_project_events": boundary_events,
569        "audit_trail": trail_events,
570    }))
571}
572
573async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
574    let rt = crate::core::context_os::runtime();
575    let snap = rt.metrics.snapshot();
576    (
577        StatusCode::OK,
578        Json(serde_json::to_value(snap).unwrap_or_default()),
579    )
580}
581
582const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
583const MAX_HANDOFF_FILES: usize = 50;
584
585async fn v1_a2a_handoff(
586    State(state): State<AppState>,
587    Json(body): Json<Value>,
588) -> impl IntoResponse {
589    let envelope = match crate::core::a2a_transport::parse_envelope(
590        &serde_json::to_string(&body).unwrap_or_default(),
591    ) {
592        Ok(env) => env,
593        Err(e) => {
594            tracing::warn!("a2a handoff parse error: {e}");
595            return (
596                StatusCode::BAD_REQUEST,
597                Json(serde_json::json!({"error": "invalid_envelope"})),
598            );
599        }
600    };
601
602    if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
603        tracing::warn!(
604            "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
605            envelope.payload_json.len()
606        );
607        return (
608            StatusCode::PAYLOAD_TOO_LARGE,
609            Json(serde_json::json!({"error": "payload_too_large"})),
610        );
611    }
612
613    let rt = crate::core::context_os::runtime();
614    rt.bus.append(
615        &state.project_root,
616        "a2a",
617        &crate::core::context_os::ContextEventKindV1::SessionMutated,
618        Some(&envelope.sender.agent_id),
619        serde_json::json!({
620            "type": "handoff_received",
621            "content_type": format!("{:?}", envelope.content_type),
622            "sender": envelope.sender.agent_id,
623            "payload_size": envelope.payload_json.len(),
624        }),
625    );
626
627    match envelope.content_type {
628        crate::core::a2a_transport::TransportContentType::ContextPackage => {
629            let dir = std::path::Path::new(&state.project_root)
630                .join(".lean-ctx")
631                .join("handoffs")
632                .join("packages");
633            let _ = std::fs::create_dir_all(&dir);
634            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
635            let out = dir.join(format!(
636                "ctx-{}.{}",
637                chrono::Utc::now().format("%Y%m%d_%H%M%S"),
638                crate::core::contracts::PACKAGE_EXTENSION
639            ));
640            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
641                tracing::error!("a2a handoff write failed: {e}");
642                return (
643                    StatusCode::INTERNAL_SERVER_ERROR,
644                    Json(serde_json::json!({"error": "write_failed"})),
645                );
646            }
647            (
648                StatusCode::OK,
649                Json(serde_json::json!({
650                    "status": "received",
651                    "content_type": "context_package",
652                })),
653            )
654        }
655        crate::core::a2a_transport::TransportContentType::HandoffBundle => {
656            let dir = std::path::Path::new(&state.project_root)
657                .join(".lean-ctx")
658                .join("handoffs");
659            let _ = std::fs::create_dir_all(&dir);
660            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
661            let out = dir.join(format!(
662                "received-{}.json",
663                chrono::Utc::now().format("%Y%m%d_%H%M%S")
664            ));
665            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
666                tracing::error!("a2a handoff write failed: {e}");
667                return (
668                    StatusCode::INTERNAL_SERVER_ERROR,
669                    Json(serde_json::json!({"error": "write_failed"})),
670                );
671            }
672            (
673                StatusCode::OK,
674                Json(serde_json::json!({
675                    "status": "received",
676                    "content_type": "handoff_bundle",
677                })),
678            )
679        }
680        _ => (
681            StatusCode::OK,
682            Json(serde_json::json!({
683                "status": "received",
684                "content_type": format!("{:?}", envelope.content_type),
685            })),
686        ),
687    }
688}
689
/// Makes room for one new file in `dir` by deleting the oldest regular files.
///
/// Nothing happens while the directory holds fewer than `max_files` regular files.
/// Otherwise the oldest files (by modification time; files without a readable mtime
/// count as oldest) are removed until `max_files - 1` remain. Subdirectories are
/// ignored, and unreadable directories or failed deletions are skipped silently.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };

    let mut dated: Vec<(std::time::SystemTime, std::path::PathBuf)> = Vec::new();
    for entry in entries.flatten() {
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        dated.push((mtime, entry.path()));
    }

    if dated.len() < max_files {
        return;
    }

    dated.sort_by_key(|(mtime, _)| *mtime);
    // Keep one slot free for the file the caller is about to write.
    let keep = max_files.saturating_sub(1);
    let surplus = dated.len().saturating_sub(keep);
    for (_, path) in dated.into_iter().take(surplus) {
        let _ = std::fs::remove_file(path);
    }
}
715
716async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
717    let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
718        Ok(r) => r,
719        Err(e) => {
720            tracing::debug!("a2a JSON-RPC parse error: {e}");
721            return (
722                StatusCode::BAD_REQUEST,
723                Json(serde_json::json!({
724                    "jsonrpc": "2.0",
725                    "id": null,
726                    "error": {"code": -32700, "message": "invalid request"}
727                })),
728            );
729        }
730    };
731    let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
732    let json = serde_json::to_value(resp).unwrap_or_default();
733    (StatusCode::OK, Json(json))
734}
735
736async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
737    let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
738    (
739        StatusCode::OK,
740        [(header::CONTENT_TYPE, "application/json")],
741        Json(card),
742    )
743}
744
745async fn mcp_server_card() -> impl IntoResponse {
746    let card = serde_json::json!({
747        "name": "lean-ctx",
748        "version": env!("CARGO_PKG_VERSION"),
749        "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
750        "capabilities": {
751            "tools": true,
752            "resources": false,
753            "prompts": false,
754            "sampling": false
755        },
756        "tool_categories": [
757            {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
758            {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
759            {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
760            {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
761        ],
762        "features": {
763            "compression": "deterministic AST-based, 40-70% token reduction",
764            "caching": "session-scoped with zstd, re-reads ~13 tokens",
765            "audit_trail": "SHA-256 chained JSONL",
766            "rbac": "5 built-in roles with capability-based access",
767            "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
768            "secret_detection": "8 regex patterns + custom"
769        },
770        "security": {
771            "path_jail": true,
772            "rate_limiting": true,
773            "budget_tracking": true,
774            "signed_handoffs": true,
775            "timing_safe_auth": true
776        }
777    });
778    Json(card)
779}
780
781async fn v1_agents_register(
782    State(state): State<AppState>,
783    Json(body): Json<Value>,
784) -> impl IntoResponse {
785    let agent_type = body
786        .get("agent_type")
787        .and_then(|v| v.as_str())
788        .unwrap_or("unknown");
789    let role = body.get("role").and_then(|v| v.as_str());
790    let project_root = body
791        .get("project_root")
792        .and_then(|v| v.as_str())
793        .unwrap_or(&state.project_root);
794
795    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
796    let agent_id = registry.register(agent_type, role, project_root);
797    let _ = registry.save();
798
799    Json(serde_json::json!({
800        "agent_id": agent_id,
801        "status": "registered"
802    }))
803}
804
805async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
806    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
807    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
808    registry.update_heartbeat(agent_id);
809    let _ = registry.save();
810    Json(serde_json::json!({"status": "ok"}))
811}
812
813async fn v1_agents_list() -> impl IntoResponse {
814    let registry = crate::core::agents::AgentRegistry::load_or_create();
815    let active = registry.list_active(None);
816    Json(serde_json::json!({
817        "agents": active.iter().map(|a| serde_json::json!({
818            "agent_id": a.agent_id,
819            "agent_type": a.agent_type,
820            "role": a.role,
821            "status": a.status.to_string(),
822            "last_active": a.last_active.to_rfc3339(),
823        })).collect::<Vec<_>>()
824    }))
825}
826
827async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
828    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
829    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
830    registry.set_status(
831        agent_id,
832        crate::core::agents::AgentStatus::Finished,
833        Some("deregistered via API"),
834    );
835    let _ = registry.save();
836    Json(serde_json::json!({"status": "deregistered"}))
837}
838
839async fn v1_agents_events_sse(
840) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
841    let stream = futures::stream::unfold(0usize, |last_count| async move {
842        loop {
843            tokio::time::sleep(Duration::from_secs(5)).await;
844            let registry = crate::core::agents::AgentRegistry::load_or_create();
845            let active = registry.list_active(None);
846            let count = active.len();
847            if count != last_count {
848                let data = serde_json::json!({
849                    "type": "agents_changed",
850                    "active_count": count,
851                    "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
852                });
853                return Some((
854                    Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
855                    count,
856                ));
857            }
858        }
859    });
860
861    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
862}
863
864fn build_app_router(cfg: &HttpServerConfig) -> Router {
865    let project_root = cfg.project_root.to_string_lossy().to_string();
866    let service_project_root = project_root.clone();
867    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
868        Ok(LeanCtxServer::new_shared_with_context(
869            &service_project_root,
870            "default",
871            "default",
872        ))
873    };
874    let mcp_http = StreamableHttpService::new(
875        service_factory,
876        Arc::new(
877            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
878        ),
879        cfg.mcp_http_config(),
880    );
881
882    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");
883
884    let state = AppState {
885        token: cfg.effective_auth_token(),
886        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
887        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
888        project_root,
889        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
890        server: rest_server,
891    };
892
893    Router::new()
894        .route("/health", get(health))
895        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
896        .route("/v1/manifest", get(v1_manifest))
897        .route("/v1/tools", get(v1_tools))
898        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
899        .route("/v1/events", get(v1_events))
900        .route(
901            "/v1/context/summary",
902            get(context_views::v1_context_summary),
903        )
904        .route("/v1/events/search", get(context_views::v1_events_search))
905        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
906        .route("/v1/metrics", get(v1_metrics))
907        .route("/v1/audit/events", get(v1_audit_events))
908        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
909        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
910        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
911        .route("/.well-known/mcp-server.json", get(mcp_server_card))
912        .route("/a2a", axum::routing::post(a2a_jsonrpc))
913        .route(
914            "/v1/agents/register",
915            axum::routing::post(v1_agents_register),
916        )
917        .route(
918            "/v1/agents/heartbeat",
919            axum::routing::post(v1_agents_heartbeat),
920        )
921        .route("/v1/agents/list", get(v1_agents_list))
922        .route(
923            "/v1/agents/deregister",
924            axum::routing::post(v1_agents_deregister),
925        )
926        .route("/v1/agents/events", get(v1_agents_events_sse))
927        .fallback_service(mcp_http)
928        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
929        .layer(middleware::from_fn_with_state(
930            state.clone(),
931            rate_limit_middleware,
932        ))
933        .layer(middleware::from_fn_with_state(
934            state.clone(),
935            concurrency_middleware,
936        ))
937        .layer(middleware::from_fn_with_state(
938            state.clone(),
939            auth_middleware,
940        ))
941        .with_state(state)
942}
943
944pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
945    crate::core::protocol::set_mcp_context(true);
946    cfg.validate()?;
947
948    let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
949        .parse()
950        .context("invalid host/port")?;
951
952    let app = build_app_router(&cfg);
953
954    let listener = tokio::net::TcpListener::bind(addr)
955        .await
956        .with_context(|| format!("bind {addr}"))?;
957
958    tracing::info!(
959        "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
960        cfg.project_root.display()
961    );
962
963    axum::serve(listener, app)
964        .with_graceful_shutdown(async move {
965            let _ = tokio::signal::ctrl_c().await;
966        })
967        .await
968        .context("http server")?;
969    Ok(())
970}
971
/// Lets axum serve connections accepted from a Windows named pipe.
///
/// The "address" reported for every connection, and for the listener itself,
/// is the pipe's name.
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next pipe connection, retrying indefinitely on failure.
    ///
    /// Each accept error is logged and followed by a 100 ms pause before the
    /// next attempt, so this method only returns once a pipe is connected.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            let e = match self.accept_pipe().await {
                Ok(pipe) => return (pipe, self.name().to_string()),
                Err(e) => e,
            };
            tracing::error!("named pipe accept error: {e}");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }

    /// Returns the pipe name; this never fails.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let pipe_name = self.name().to_string();
        Ok(pipe_name)
    }
}
993
994/// Serve the daemon over a platform-independent IPC channel (UDS on Unix,
995/// Named Pipes on Windows).
996pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
997    cfg.validate()?;
998
999    match addr {
1000        #[cfg(unix)]
1001        crate::ipc::DaemonAddr::Unix(ref path) => {
1002            let app = build_app_router(&cfg);
1003            let listener = crate::ipc::bind_listener(&addr)?;
1004
1005            tracing::info!(
1006                "lean-ctx daemon listening on {} (project_root={})",
1007                path.display(),
1008                cfg.project_root.display()
1009            );
1010
1011            axum::serve(listener, app.into_make_service())
1012                .with_graceful_shutdown(async move {
1013                    let _ = tokio::signal::ctrl_c().await;
1014                })
1015                .await
1016                .context("ipc server")?;
1017            Ok(())
1018        }
1019        #[cfg(windows)]
1020        crate::ipc::DaemonAddr::NamedPipe(ref name) => {
1021            let app = build_app_router(&cfg);
1022            let listener = crate::ipc::bind_listener(&addr)?;
1023
1024            tracing::info!(
1025                "lean-ctx daemon listening on {} (project_root={})",
1026                name,
1027                cfg.project_root.display()
1028            );
1029
1030            axum::serve(listener, app.into_make_service())
1031                .with_graceful_shutdown(async move {
1032                    let _ = tokio::signal::ctrl_c().await;
1033                })
1034                .await
1035                .context("ipc server")?;
1036            Ok(())
1037        }
1038    }
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use futures::StreamExt;
    use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
    use serde_json::json;
    use tower::ServiceExt;

    /// Upper bound on body chunks inspected while waiting for an SSE message.
    const MAX_SSE_CHUNKS: usize = 32;

    /// Builds an `AppState` for tests with a 30 s timeout and a REST server
    /// rooted at `root`; only the token, permit count and limiter vary.
    fn test_state(root: &str, token: Option<&str>, permits: usize, rate: RateLimiter) -> AppState {
        AppState {
            token: token.map(str::to_string),
            concurrency: Arc::new(tokio::sync::Semaphore::new(permits)),
            rate: Arc::new(rate),
            project_root: root.to_string(),
            timeout: Duration::from_secs(30),
            server: LeanCtxServer::new_shared_with_context(root, "default", "default"),
        }
    }

    /// Starts a `GET` request for `uri` with the `Host: localhost` header set.
    fn local_get(uri: &str) -> axum::http::request::Builder {
        Request::builder()
            .method("GET")
            .uri(uri)
            .header("Host", "localhost")
    }

    /// Reads body chunks until a blank line (`\n\n`) has been seen, the stream
    /// ends or errors, a 2 s per-chunk timeout fires, or the chunk budget is
    /// used up; returns everything collected, lossily decoded as UTF-8.
    async fn read_first_sse_message(body: Body) -> String {
        let mut chunks = body.into_data_stream();
        let mut collected: Vec<u8> = Vec::new();
        for _ in 0..MAX_SSE_CHUNKS {
            match tokio::time::timeout(Duration::from_secs(2), chunks.next()).await {
                Ok(Some(Ok(bytes))) => collected.extend_from_slice(&bytes),
                _ => break,
            }
            if collected.windows(2).any(|pair| pair == b"\n\n") {
                break;
            }
        }
        String::from_utf8_lossy(&collected).into_owned()
    }

    /// With a token configured, a request without a bearer header is rejected
    /// with 401 before it reaches the MCP fallback service.
    #[tokio::test]
    async fn auth_token_blocks_requests_without_bearer_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().into_owned();

        let factory_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(&factory_root, "default", "default"))
        };
        let mcp_cfg = StreamableHttpServerConfig::default()
            .with_stateful_mode(false)
            .with_json_response(true);
        let session_manager = Arc::new(
            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
        );
        let mcp_http = StreamableHttpService::new(service_factory, session_manager, mcp_cfg);

        let state = test_state(&root_str, Some("secret"), 4, RateLimiter::new(50, 100));

        let app = Router::new()
            .fallback_service(mcp_http)
            .layer(middleware::from_fn_with_state(state.clone(), auth_middleware))
            .with_state(state);

        let payload = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        .to_string();

        let req = Request::builder()
            .method("POST")
            .uri("/")
            .header("Host", "localhost")
            .header("Accept", "application/json, text/event-stream")
            .header("Content-Type", "application/json")
            .body(Body::from(payload))
            .expect("request");

        let resp = app.clone().oneshot(req).await.expect("resp");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// Two servers produced by the same factory must not share `client_name`.
    #[tokio::test]
    async fn mcp_service_factory_isolates_per_client_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().into_owned();

        // Mirrors the serve() setup: service_factory must create a fresh server per MCP session.
        let factory_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::convert::Infallible> {
            Ok(LeanCtxServer::new_shared_with_context(&factory_root, "default", "default"))
        };

        let first = service_factory().expect("server 1");
        let second = service_factory().expect("server 2");

        // If the two servers accidentally shared the same Arc-backed fields, the second write
        // would clobber the first. Deliberately independent of rmcp's InitializeRequestParams API.
        *first.client_name.write().await = "client-a".to_string();
        *second.client_name.write().await = "client-b".to_string();

        let name_a = first.client_name.read().await.clone();
        let name_b = second.client_name.read().await.clone();
        assert_eq!(name_a, "client-a");
        assert_eq!(name_b, "client-b");
    }

    /// With a limiter built from `RateLimiter::new(1, 1)`, the first request
    /// succeeds and the immediately following one is answered with 429.
    #[tokio::test]
    async fn rate_limit_returns_429_when_exhausted() {
        let state = test_state(".", None, 16, RateLimiter::new(1, 1));

        let app = Router::new()
            .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
            .layer(middleware::from_fn_with_state(state.clone(), rate_limit_middleware))
            .with_state(state);

        let req1 = local_get("/limited").body(Body::empty()).expect("req1");
        let resp1 = app.clone().oneshot(req1).await.expect("resp1");
        assert_eq!(resp1.status(), StatusCode::OK);

        let req2 = local_get("/limited").body(Body::empty()).expect("req2");
        let resp2 = app.clone().oneshot(req2).await.expect("resp2");
        assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    /// The audit endpoint answers 200 with a JSON object whose
    /// `cross_project_events` and `audit_trail` members are arrays.
    #[tokio::test]
    async fn audit_events_endpoint_returns_json() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().into_owned();

        let state = test_state(&root_str, None, 16, RateLimiter::new(50, 100));

        let app = Router::new()
            .route("/v1/audit/events", get(v1_audit_events))
            .with_state(state);

        let req = local_get("/v1/audit/events?limit=10")
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let raw = axum::body::to_bytes(resp.into_body(), 1_000_000)
            .await
            .unwrap();
        let parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        assert!(parsed.get("cross_project_events").unwrap().is_array());
        assert!(parsed.get("audit_trail").unwrap().is_array());
    }

    /// An event appended to the bus before subscribing is replayed over SSE
    /// when the client asks for `since=0`.
    #[tokio::test]
    async fn events_endpoint_replays_tool_call_event() {
        use crate::core::context_os::{self, ContextEventKindV1};

        let dir = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
        std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
        let root_str = dir.path().to_string_lossy().into_owned();

        let state = test_state(&root_str, None, 16, RateLimiter::new(50, 100));

        let app = Router::new()
            .route("/v1/events", get(v1_events))
            .with_state(state);

        // Directly append an event to the bus — no fire-and-forget timing dependency.
        let rt = context_os::runtime();
        rt.bus.append(
            "ws1",
            "ch1",
            &ContextEventKindV1::ToolCallRecorded,
            Some("test-agent"),
            json!({"tool": "ctx_session", "action": "status"}),
        );

        let req = local_get("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
            .header("Accept", "text/event-stream")
            .body(Body::empty())
            .expect("req");
        let resp = app.clone().oneshot(req).await.expect("events");
        assert_eq!(resp.status(), StatusCode::OK);

        let msg = read_first_sse_message(resp.into_body()).await;
        assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
        assert!(msg.contains("\"ws1\""), "msg={msg:?}");
        assert!(msg.contains("\"ch1\""), "msg={msg:?}");
    }
}