Skip to main content

lean_ctx/http_server/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7    extract::Json,
8    extract::Query,
9    extract::State,
10    http::{header, Request, StatusCode},
11    middleware::{self, Next},
12    response::sse::{Event as SseEvent, KeepAlive, Sse},
13    response::{IntoResponse, Response},
14    routing::get,
15    Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29
30#[cfg(feature = "team-server")]
31pub mod team;
32
33/// Wrapper stream that calls `record_sse_disconnect` on drop.
34use std::pin::Pin;
35
36pub(crate) struct SseDisconnectGuard<I> {
37    pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
38    pub(crate) metrics: Arc<ContextOsMetrics>,
39}
40
41impl<I> Stream for SseDisconnectGuard<I> {
42    type Item = I;
43
44    fn poll_next(
45        mut self: Pin<&mut Self>,
46        cx: &mut std::task::Context<'_>,
47    ) -> std::task::Poll<Option<Self::Item>> {
48        self.inner.as_mut().poll_next(cx)
49    }
50}
51
52impl<I> Drop for SseDisconnectGuard<I> {
53    fn drop(&mut self) {
54        self.metrics.record_sse_disconnect();
55    }
56}
57
/// Maximum number of characters kept by [`sanitize_id`].
const MAX_ID_LEN: usize = 64;

/// Normalises a caller-supplied workspace/channel identifier.
///
/// Surrounding whitespace is trimmed, then only ASCII alphanumerics and `-`, `_`, `.` are
/// kept, up to [`MAX_ID_LEN`] characters. If nothing survives (empty, blank or entirely
/// disallowed input) the id `"default"` is returned.
///
/// NOTE(review): dot-only ids such as `".."` pass through unchanged — confirm no caller
/// uses the result as a filesystem path component.
fn sanitize_id(raw: &str) -> String {
    let mut out = String::new();
    for ch in raw.trim().chars() {
        // Every kept char is ASCII, so the byte length equals the char count.
        if out.len() == MAX_ID_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
            out.push(ch);
        }
    }
    if out.is_empty() {
        String::from("default")
    } else {
        out
    }
}
76
/// Settings for the lean-ctx HTTP server (MCP streamable-HTTP plus the REST `/v1` API).
#[derive(Clone, Debug)]
pub struct HttpServerConfig {
    /// Interface to bind; loopback hosts may run without an explicit token.
    pub host: String,
    /// TCP port to bind.
    pub port: u16,
    /// Project directory the server operates on.
    pub project_root: PathBuf,
    /// Bearer token required on requests; see `effective_auth_token` for the fallback.
    pub auth_token: Option<String>,
    /// Forwarded to rmcp's `with_stateful_mode`.
    pub stateful_mode: bool,
    /// Forwarded to rmcp's `with_json_response`.
    pub json_response: bool,
    /// Turns off rmcp's allowed-host (DNS-rebinding) check entirely.
    pub disable_host_check: bool,
    /// Explicit allowed `Host` values; when non-empty they replace the loopback defaults.
    pub allowed_hosts: Vec<String>,
    /// Body size cap configured through `DefaultBodyLimit`.
    pub max_body_bytes: usize,
    /// Maximum number of requests handled at once.
    pub max_concurrency: usize,
    /// Steady-state refill rate of the global rate limiter.
    pub max_rps: u32,
    /// Bucket size (burst capacity) of the global rate limiter.
    pub rate_burst: u32,
    /// Deadline for a single REST tool call, in milliseconds.
    pub request_timeout_ms: u64,
}

impl Default for HttpServerConfig {
    /// Loopback on port 8080 rooted at the current directory (or `.` if that cannot be
    /// determined), with conservative limits.
    fn default() -> Self {
        Self {
            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
            host: String::from("127.0.0.1"),
            port: 8080,
            auth_token: None,
            stateful_mode: false,
            json_response: true,
            disable_host_check: false,
            allowed_hosts: vec![],
            max_body_bytes: 2 << 20,
            max_concurrency: 32,
            max_rps: 50,
            rate_burst: 100,
            request_timeout_ms: 30 * 1_000,
        }
    }
}
114
115impl HttpServerConfig {
116    pub fn validate(&self) -> Result<()> {
117        let host = self.host.trim().to_lowercase();
118        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
119        if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
120            return Err(anyhow!(
121                "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
122            ));
123        }
124        Ok(())
125    }
126
127    pub fn effective_auth_token(&self) -> Option<String> {
128        if let Some(ref token) = self.auth_token {
129            if !token.is_empty() {
130                return Some(token.clone());
131            }
132        }
133        let host = self.host.trim().to_lowercase();
134        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
135        if is_loopback {
136            let auto_token = crate::core::session_token::generate_token();
137            eprintln!(
138                "[lean-ctx] Auto-generated auth token for loopback: {auto_token}\n\
139                 Pass as Bearer token or set --auth-token explicitly."
140            );
141            Some(auto_token)
142        } else {
143            None
144        }
145    }
146
147    fn mcp_http_config(&self) -> StreamableHttpServerConfig {
148        let mut cfg = StreamableHttpServerConfig::default()
149            .with_stateful_mode(self.stateful_mode)
150            .with_json_response(self.json_response);
151
152        if self.disable_host_check {
153            tracing::warn!(
154                "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
155                 Do NOT use this in production or on non-loopback interfaces."
156            );
157            cfg = cfg.disable_allowed_hosts();
158            return cfg;
159        }
160
161        if !self.allowed_hosts.is_empty() {
162            cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
163            return cfg;
164        }
165
166        // Keep rmcp's secure loopback defaults; also allow the configured host (if it's loopback).
167        let host = self.host.trim();
168        if host == "127.0.0.1" || host == "localhost" || host == "::1" {
169            cfg.allowed_hosts.push(host.to_string());
170        }
171
172        cfg
173    }
174}
175
176#[derive(Clone)]
177struct AppState {
178    token: Option<String>,
179    concurrency: Arc<tokio::sync::Semaphore>,
180    rate: Arc<RateLimiter>,
181    project_root: String,
182    timeout: Duration,
183    server: LeanCtxServer,
184}
185
/// Process-wide token-bucket rate limiter.
///
/// The bucket holds at most `burst` tokens and refills continuously at `max_rps` tokens
/// per second. Each admitted request consumes one token. Both parameters are clamped to at
/// least 1, so a zero setting can neither stop refilling nor reject every request forever.
#[derive(Debug)]
struct RateLimiter {
    max_rps: f64,
    burst: f64,
    // A blocking `std` mutex: the critical section is a few float operations and never
    // spans an `.await`, which is exactly where tokio recommends it over its async mutex.
    state: std::sync::Mutex<RateState>,
}

/// Current token balance and the instant it was last refilled.
#[derive(Debug, Clone, Copy)]
struct RateState {
    tokens: f64,
    last: Instant,
}

impl RateLimiter {
    /// Creates a full bucket (`burst` tokens) refilling at `max_rps` tokens per second.
    fn new(max_rps: u32, burst: u32) -> Self {
        let burst = f64::from(burst.max(1));
        Self {
            max_rps: f64::from(max_rps.max(1)),
            burst,
            state: std::sync::Mutex::new(RateState {
                tokens: burst,
                last: Instant::now(),
            }),
        }
    }

    /// Refills for the time elapsed since the previous call and tries to take one token.
    ///
    /// Returns `true` when the request is admitted.
    fn try_acquire(&self) -> bool {
        // The state is plain `Copy` data that is always left consistent, so a poisoned
        // lock can safely be recovered.
        let mut s = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(s.last);
        s.tokens = (s.tokens + elapsed.as_secs_f64() * self.max_rps).min(self.burst);
        s.last = now;
        if s.tokens >= 1.0 {
            s.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Async entry point used by the middleware; never actually suspends.
    async fn allow(&self) -> bool {
        self.try_acquire()
    }
}
227
228async fn auth_middleware(
229    State(state): State<AppState>,
230    req: Request<axum::body::Body>,
231    next: Next,
232) -> Response {
233    if state.token.is_none() {
234        return next.run(req).await;
235    }
236
237    if req.uri().path() == "/health" {
238        return next.run(req).await;
239    }
240
241    let expected = state.token.as_deref().unwrap_or("");
242    let Some(h) = req.headers().get(header::AUTHORIZATION) else {
243        return json_error(
244            StatusCode::UNAUTHORIZED,
245            "unauthorized",
246            "missing Authorization header",
247        );
248    };
249    let Ok(s) = h.to_str() else {
250        return json_error(
251            StatusCode::UNAUTHORIZED,
252            "unauthorized",
253            "malformed Authorization header",
254        );
255    };
256    let Some(token) = s
257        .strip_prefix("Bearer ")
258        .or_else(|| s.strip_prefix("bearer "))
259    else {
260        return json_error(
261            StatusCode::UNAUTHORIZED,
262            "unauthorized",
263            "Authorization must use the Bearer scheme",
264        );
265    };
266    if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
267        return json_error(
268            StatusCode::UNAUTHORIZED,
269            "unauthorized",
270            "invalid bearer token",
271        );
272    }
273
274    next.run(req).await
275}
276
277/// Structured REST error envelope: `{ "error": <human message>, "error_code": <stable code> }`.
278///
279/// `error_code` is the stable, machine-readable string SDKs switch on; `error` carries the
280/// human-facing message. Used for every REST (non-A2A) error so clients branch on a code
281/// instead of parsing prose. The A2A JSON-RPC surface keeps its own `-32xxx` envelope.
282pub(crate) fn json_error(status: StatusCode, error_code: &str, message: &str) -> Response {
283    (
284        status,
285        Json(serde_json::json!({ "error": message, "error_code": error_code })),
286    )
287        .into_response()
288}
289
290fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
291    use subtle::ConstantTimeEq;
292    if a.len() != b.len() {
293        return false;
294    }
295    bool::from(a.ct_eq(b))
296}
297
298async fn rate_limit_middleware(
299    State(state): State<AppState>,
300    req: Request<axum::body::Body>,
301    next: Next,
302) -> Response {
303    if !state.rate.allow().await {
304        return StatusCode::TOO_MANY_REQUESTS.into_response();
305    }
306    next.run(req).await
307}
308
309async fn concurrency_middleware(
310    State(state): State<AppState>,
311    req: Request<axum::body::Body>,
312    next: Next,
313) -> Response {
314    let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
315        return StatusCode::TOO_MANY_REQUESTS.into_response();
316    };
317    let resp = next.run(req).await;
318    drop(permit);
319    resp
320}
321
322async fn health() -> impl IntoResponse {
323    (StatusCode::OK, "ok\n")
324}
325
326async fn v1_shutdown() -> impl IntoResponse {
327    tokio::spawn(async {
328        tokio::time::sleep(Duration::from_millis(100)).await;
329        std::process::exit(0);
330    });
331    (StatusCode::OK, "shutting down\n")
332}
333
334#[derive(Debug, Deserialize)]
335#[serde(rename_all = "camelCase")]
336struct ToolCallBody {
337    name: String,
338    #[serde(default)]
339    arguments: Option<Value>,
340    #[serde(default)]
341    _workspace_id: Option<String>,
342    #[serde(default)]
343    _channel_id: Option<String>,
344}
345
346#[derive(Debug, Deserialize)]
347#[serde(rename_all = "camelCase")]
348struct EventsQuery {
349    #[serde(default)]
350    workspace_id: Option<String>,
351    #[serde(default)]
352    channel_id: Option<String>,
353    #[serde(default)]
354    since: Option<i64>,
355    #[serde(default)]
356    limit: Option<usize>,
357    /// Comma-separated event kind filter (e.g. `tool_call,session_start`).
358    /// When set, only matching events are delivered via SSE.
359    #[serde(default)]
360    kind: Option<String>,
361}
362
363async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
364    let _ = state;
365    let v = crate::core::mcp_manifest::manifest_value();
366    (StatusCode::OK, Json(v))
367}
368
369#[derive(Debug, Deserialize)]
370#[serde(rename_all = "camelCase")]
371struct ToolsQuery {
372    #[serde(default)]
373    offset: Option<usize>,
374    #[serde(default)]
375    limit: Option<usize>,
376}
377
378async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
379    let _ = state;
380    let v = crate::core::mcp_manifest::manifest_value();
381    let tools = v
382        .get("tools")
383        .and_then(|t| t.get("granular"))
384        .cloned()
385        .unwrap_or(Value::Array(vec![]));
386
387    let all = tools.as_array().cloned().unwrap_or_default();
388    let total = all.len();
389    let offset = q.offset.unwrap_or(0).min(total);
390    let limit = q.limit.unwrap_or(200).min(500);
391    let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
392
393    (
394        StatusCode::OK,
395        Json(serde_json::json!({
396            "tools": page,
397            "total": total,
398            "offset": offset,
399            "limit": limit,
400        })),
401    )
402}
403
404async fn v1_tool_call(
405    State(state): State<AppState>,
406    Json(body): Json<ToolCallBody>,
407) -> impl IntoResponse {
408    let engine = ContextEngine::from_server(state.server.clone());
409    match tokio::time::timeout(
410        state.timeout,
411        engine.call_tool_value(&body.name, body.arguments),
412    )
413    .await
414    {
415        Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
416        Ok(Err(e)) => {
417            tracing::warn!("tool call error: {e}");
418            json_error(
419                StatusCode::BAD_REQUEST,
420                "tool_error",
421                "tool execution failed",
422            )
423        }
424        Err(_) => json_error(
425            StatusCode::GATEWAY_TIMEOUT,
426            "request_timeout",
427            "tool call timed out",
428        ),
429    }
430}
431
432async fn v1_events(
433    State(state): State<AppState>,
434    Query(q): Query<EventsQuery>,
435) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
436    use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
437
438    let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
439    let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
440    let _ = &state.project_root;
441    let since = q.since.unwrap_or(0);
442    let limit = q.limit.unwrap_or(200).min(1000);
443    let redaction = RedactionLevel::RefsOnly;
444
445    let kind_filter: Option<Vec<String>> = q
446        .kind
447        .as_deref()
448        .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
449
450    let rt = crate::core::context_os::runtime();
451    let replay = rt.bus.read(&ws, &ch, since, limit);
452
453    let replay = if let Some(ref kinds) = kind_filter {
454        replay
455            .into_iter()
456            .filter(|ev| kinds.contains(&ev.kind))
457            .collect()
458    } else {
459        replay
460    };
461
462    let rx = if let Some(ref kinds) = kind_filter {
463        let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
464        let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
465        if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
466            crate::core::context_os::SubscriptionKind::Filtered(sub)
467        } else {
468            tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
469            let (_, rx) = broadcast::channel::<ContextEventV1>(1);
470            crate::core::context_os::SubscriptionKind::Unfiltered(rx)
471        }
472    } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
473        crate::core::context_os::SubscriptionKind::Unfiltered(sub)
474    } else {
475        tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
476        let (_, rx) = broadcast::channel::<ContextEventV1>(1);
477        crate::core::context_os::SubscriptionKind::Unfiltered(rx)
478    };
479
480    rt.metrics.record_sse_connect();
481    rt.metrics.record_events_replayed(replay.len() as u64);
482    rt.metrics.record_workspace_active(&ws);
483
484    let bus = rt.bus.clone();
485    let metrics = rt.metrics.clone();
486    let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
487
488    let stream = futures::stream::unfold(
489        (
490            pending,
491            rx,
492            ws.clone(),
493            ch.clone(),
494            since,
495            redaction,
496            bus,
497            metrics,
498        ),
499        |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
500            if let Some(mut ev) = pending.pop_front() {
501                last_id = ev.id;
502                redact_event_payload(&mut ev, redaction);
503                let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
504                let evt = SseEvent::default()
505                    .id(ev.id.to_string())
506                    .event(ev.kind)
507                    .data(data);
508                return Some((
509                    Ok(evt),
510                    (pending, rx, ws, ch, last_id, redaction, bus, metrics),
511                ));
512            }
513
514            loop {
515                match rx.recv().await {
516                    Ok(mut ev) if ev.id > last_id => {
517                        last_id = ev.id;
518                        redact_event_payload(&mut ev, redaction);
519                        let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
520                        let evt = SseEvent::default()
521                            .id(ev.id.to_string())
522                            .event(ev.kind)
523                            .data(data);
524                        return Some((
525                            Ok(evt),
526                            (pending, rx, ws, ch, last_id, redaction, bus, metrics),
527                        ));
528                    }
529                    Ok(_) => {}
530                    Err(broadcast::error::RecvError::Closed) => return None,
531                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
532                        let missed = bus.read(&ws, &ch, last_id, skipped as usize);
533                        metrics.record_events_replayed(missed.len() as u64);
534                        for ev in missed {
535                            last_id = last_id.max(ev.id);
536                            pending.push_back(ev);
537                        }
538                    }
539                }
540            }
541        },
542    );
543
544    let metrics_ref = rt.metrics.clone();
545    let guarded = SseDisconnectGuard {
546        inner: Box::pin(stream),
547        metrics: metrics_ref,
548    };
549
550    Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
551}
552
553#[derive(Debug, Deserialize)]
554struct AuditEventsQuery {
555    #[serde(default = "default_audit_limit")]
556    limit: usize,
557}
558
559fn default_audit_limit() -> usize {
560    100
561}
562
563async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
564    let capped = q.limit.min(1000);
565    let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
566    let trail_events = crate::core::audit_trail::load_recent(capped);
567
568    Json(serde_json::json!({
569        "cross_project_events": boundary_events,
570        "audit_trail": trail_events,
571    }))
572}
573
574async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
575    let rt = crate::core::context_os::runtime();
576    let snap = rt.metrics.snapshot();
577    (
578        StatusCode::OK,
579        Json(serde_json::to_value(snap).unwrap_or_default()),
580    )
581}
582
583const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
584const MAX_HANDOFF_FILES: usize = 50;
585
586async fn v1_a2a_handoff(
587    State(state): State<AppState>,
588    Json(body): Json<Value>,
589) -> impl IntoResponse {
590    let envelope = match crate::core::a2a_transport::parse_envelope(
591        &serde_json::to_string(&body).unwrap_or_default(),
592    ) {
593        Ok(env) => env,
594        Err(e) => {
595            tracing::warn!("a2a handoff parse error: {e}");
596            return (
597                StatusCode::BAD_REQUEST,
598                Json(serde_json::json!({"error": "invalid_envelope"})),
599            );
600        }
601    };
602
603    if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
604        tracing::warn!(
605            "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
606            envelope.payload_json.len()
607        );
608        return (
609            StatusCode::PAYLOAD_TOO_LARGE,
610            Json(serde_json::json!({"error": "payload_too_large"})),
611        );
612    }
613
614    let rt = crate::core::context_os::runtime();
615    rt.bus.append(
616        &state.project_root,
617        "a2a",
618        &crate::core::context_os::ContextEventKindV1::SessionMutated,
619        Some(&envelope.sender.agent_id),
620        serde_json::json!({
621            "type": "handoff_received",
622            "content_type": format!("{:?}", envelope.content_type),
623            "sender": envelope.sender.agent_id,
624            "payload_size": envelope.payload_json.len(),
625        }),
626    );
627
628    match envelope.content_type {
629        crate::core::a2a_transport::TransportContentType::ContextPackage => {
630            let dir = std::path::Path::new(&state.project_root)
631                .join(".lean-ctx")
632                .join("handoffs")
633                .join("packages");
634            let _ = std::fs::create_dir_all(&dir);
635            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
636            let out = dir.join(format!(
637                "ctx-{}.{}",
638                chrono::Utc::now().format("%Y%m%d_%H%M%S"),
639                crate::core::contracts::PACKAGE_EXTENSION
640            ));
641            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
642                tracing::error!("a2a handoff write failed: {e}");
643                return (
644                    StatusCode::INTERNAL_SERVER_ERROR,
645                    Json(serde_json::json!({"error": "write_failed"})),
646                );
647            }
648            (
649                StatusCode::OK,
650                Json(serde_json::json!({
651                    "status": "received",
652                    "content_type": "context_package",
653                })),
654            )
655        }
656        crate::core::a2a_transport::TransportContentType::HandoffBundle => {
657            let dir = std::path::Path::new(&state.project_root)
658                .join(".lean-ctx")
659                .join("handoffs");
660            let _ = std::fs::create_dir_all(&dir);
661            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
662            let out = dir.join(format!(
663                "received-{}.json",
664                chrono::Utc::now().format("%Y%m%d_%H%M%S")
665            ));
666            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
667                tracing::error!("a2a handoff write failed: {e}");
668                return (
669                    StatusCode::INTERNAL_SERVER_ERROR,
670                    Json(serde_json::json!({"error": "write_failed"})),
671                );
672            }
673            (
674                StatusCode::OK,
675                Json(serde_json::json!({
676                    "status": "received",
677                    "content_type": "handoff_bundle",
678                })),
679            )
680        }
681        _ => (
682            StatusCode::OK,
683            Json(serde_json::json!({
684                "status": "received",
685                "content_type": format!("{:?}", envelope.content_type),
686            })),
687        ),
688    }
689}
690
/// Makes room for one new file in `dir` by deleting its oldest regular files.
///
/// Behaviour:
/// - Does nothing while `dir` holds fewer than `max_files` regular files.
/// - Otherwise deletes the oldest files until `max_files - 1` remain (none when
///   `max_files` is 0).
/// - Age is modification time; an unreadable mtime counts as the Unix epoch, i.e. oldest.
/// - Subdirectories are never touched.
///
/// All I/O errors are ignored: this is best-effort housekeeping.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return;
    };

    let mut by_age: Vec<(std::time::SystemTime, std::path::PathBuf)> = Vec::new();
    for entry in listing.flatten() {
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        let modified = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        by_age.push((modified, entry.path()));
    }

    if by_age.len() < max_files {
        return;
    }

    // Stable sort: files with equal timestamps keep their directory-listing order.
    by_age.sort_by(|a, b| a.0.cmp(&b.0));
    let keep = max_files.saturating_sub(1);
    let excess = by_age.len().saturating_sub(keep);
    for (_, stale) in by_age.drain(..excess) {
        let _ = std::fs::remove_file(stale);
    }
}
716
717async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
718    let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
719        Ok(r) => r,
720        Err(e) => {
721            tracing::debug!("a2a JSON-RPC parse error: {e}");
722            return (
723                StatusCode::BAD_REQUEST,
724                Json(serde_json::json!({
725                    "jsonrpc": "2.0",
726                    "id": null,
727                    "error": {"code": -32700, "message": "invalid request"}
728                })),
729            );
730        }
731    };
732    let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
733    let json = serde_json::to_value(resp).unwrap_or_default();
734    (StatusCode::OK, Json(json))
735}
736
737async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
738    let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
739    (
740        StatusCode::OK,
741        [(header::CONTENT_TYPE, "application/json")],
742        Json(card),
743    )
744}
745
746async fn mcp_server_card() -> impl IntoResponse {
747    let card = serde_json::json!({
748        "name": "lean-ctx",
749        "version": env!("CARGO_PKG_VERSION"),
750        "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
751        "capabilities": {
752            "tools": true,
753            "resources": false,
754            "prompts": false,
755            "sampling": false
756        },
757        "tool_categories": [
758            {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
759            {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
760            {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
761            {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
762        ],
763        "features": {
764            "compression": "deterministic AST-based, 40-70% token reduction",
765            "caching": "session-scoped with zstd, re-reads ~13 tokens",
766            "audit_trail": "SHA-256 chained JSONL",
767            "rbac": "5 built-in roles with capability-based access",
768            "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
769            "secret_detection": "8 regex patterns + custom"
770        },
771        "security": {
772            "path_jail": true,
773            "rate_limiting": true,
774            "budget_tracking": true,
775            "signed_handoffs": true,
776            "timing_safe_auth": true
777        }
778    });
779    Json(card)
780}
781
782async fn v1_agents_register(
783    State(state): State<AppState>,
784    Json(body): Json<Value>,
785) -> impl IntoResponse {
786    let agent_type = body
787        .get("agent_type")
788        .and_then(|v| v.as_str())
789        .unwrap_or("unknown");
790    let role = body.get("role").and_then(|v| v.as_str());
791    let project_root = body
792        .get("project_root")
793        .and_then(|v| v.as_str())
794        .unwrap_or(&state.project_root);
795
796    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
797    let agent_id = registry.register(agent_type, role, project_root);
798    let _ = registry.save();
799
800    Json(serde_json::json!({
801        "agent_id": agent_id,
802        "status": "registered"
803    }))
804}
805
806async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
807    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
808    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
809    registry.update_heartbeat(agent_id);
810    let _ = registry.save();
811    Json(serde_json::json!({"status": "ok"}))
812}
813
814async fn v1_agents_list() -> impl IntoResponse {
815    let registry = crate::core::agents::AgentRegistry::load_or_create();
816    let active = registry.list_active(None);
817    Json(serde_json::json!({
818        "agents": active.iter().map(|a| serde_json::json!({
819            "agent_id": a.agent_id,
820            "agent_type": a.agent_type,
821            "role": a.role,
822            "status": a.status.to_string(),
823            "last_active": a.last_active.to_rfc3339(),
824        })).collect::<Vec<_>>()
825    }))
826}
827
828async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
829    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
830    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
831    registry.set_status(
832        agent_id,
833        crate::core::agents::AgentStatus::Finished,
834        Some("deregistered via API"),
835    );
836    let _ = registry.save();
837    Json(serde_json::json!({"status": "deregistered"}))
838}
839
840async fn v1_agents_events_sse(
841) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
842    let stream = futures::stream::unfold(0usize, |last_count| async move {
843        loop {
844            tokio::time::sleep(Duration::from_secs(5)).await;
845            let registry = crate::core::agents::AgentRegistry::load_or_create();
846            let active = registry.list_active(None);
847            let count = active.len();
848            if count != last_count {
849                let data = serde_json::json!({
850                    "type": "agents_changed",
851                    "active_count": count,
852                    "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
853                });
854                return Some((
855                    Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
856                    count,
857                ));
858            }
859        }
860    });
861
862    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
863}
864
865fn build_app_router(cfg: &HttpServerConfig) -> Router {
866    let project_root = cfg.project_root.to_string_lossy().to_string();
867    let service_project_root = project_root.clone();
868    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
869        Ok(LeanCtxServer::new_shared_with_context(
870            &service_project_root,
871            "default",
872            "default",
873        ))
874    };
875    let mcp_http = StreamableHttpService::new(
876        service_factory,
877        Arc::new(
878            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
879        ),
880        cfg.mcp_http_config(),
881    );
882
883    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");
884
885    let state = AppState {
886        token: cfg.effective_auth_token(),
887        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
888        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
889        project_root,
890        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
891        server: rest_server,
892    };
893
894    Router::new()
895        .route("/health", get(health))
896        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
897        .route("/v1/manifest", get(v1_manifest))
898        .route("/v1/tools", get(v1_tools))
899        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
900        .route("/v1/events", get(v1_events))
901        .route(
902            "/v1/context/summary",
903            get(context_views::v1_context_summary),
904        )
905        .route("/v1/events/search", get(context_views::v1_events_search))
906        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
907        .route("/v1/metrics", get(v1_metrics))
908        .route("/v1/audit/events", get(v1_audit_events))
909        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
910        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
911        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
912        .route("/.well-known/mcp-server.json", get(mcp_server_card))
913        .route("/a2a", axum::routing::post(a2a_jsonrpc))
914        .route(
915            "/v1/agents/register",
916            axum::routing::post(v1_agents_register),
917        )
918        .route(
919            "/v1/agents/heartbeat",
920            axum::routing::post(v1_agents_heartbeat),
921        )
922        .route("/v1/agents/list", get(v1_agents_list))
923        .route(
924            "/v1/agents/deregister",
925            axum::routing::post(v1_agents_deregister),
926        )
927        .route("/v1/agents/events", get(v1_agents_events_sse))
928        .fallback_service(mcp_http)
929        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
930        .layer(middleware::from_fn_with_state(
931            state.clone(),
932            rate_limit_middleware,
933        ))
934        .layer(middleware::from_fn_with_state(
935            state.clone(),
936            concurrency_middleware,
937        ))
938        .layer(middleware::from_fn_with_state(
939            state.clone(),
940            auth_middleware,
941        ))
942        .with_state(state)
943}
944
945pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
946    crate::core::protocol::set_mcp_context(true);
947    cfg.validate()?;
948
949    let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
950        .parse()
951        .context("invalid host/port")?;
952
953    let app = build_app_router(&cfg);
954
955    let listener = tokio::net::TcpListener::bind(addr)
956        .await
957        .with_context(|| format!("bind {addr}"))?;
958
959    tracing::info!(
960        "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
961        cfg.project_root.display()
962    );
963
964    axum::serve(listener, app)
965        .with_graceful_shutdown(async move {
966            let _ = tokio::signal::ctrl_c().await;
967        })
968        .await
969        .context("http server")?;
970    Ok(())
971}
972
/// Lets `axum::serve` accept HTTP connections over a Windows named pipe.
///
/// The peer "address" reported for every connection is the pipe name itself.
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next client on the pipe.
    ///
    /// Accept errors never propagate: each one is logged and retried after a
    /// 100 ms pause, so a transient failure does not stop the server.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            let failure = match self.accept_pipe().await {
                Ok(connection) => return (connection, self.name().to_string()),
                Err(failure) => failure,
            };
            tracing::error!("named pipe accept error: {failure}");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }

    /// Reports the pipe name as the listener's local address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let pipe_name = self.name().to_string();
        Ok(pipe_name)
    }
}
994
995/// Serve the daemon over a platform-independent IPC channel (UDS on Unix,
996/// Named Pipes on Windows).
997pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
998    cfg.validate()?;
999
1000    match addr {
1001        #[cfg(unix)]
1002        crate::ipc::DaemonAddr::Unix(ref path) => {
1003            let app = build_app_router(&cfg);
1004            let listener = crate::ipc::bind_listener(&addr)?;
1005
1006            tracing::info!(
1007                "lean-ctx daemon listening on {} (project_root={})",
1008                path.display(),
1009                cfg.project_root.display()
1010            );
1011
1012            axum::serve(listener, app.into_make_service())
1013                .with_graceful_shutdown(async move {
1014                    let _ = tokio::signal::ctrl_c().await;
1015                })
1016                .await
1017                .context("ipc server")?;
1018            Ok(())
1019        }
1020        #[cfg(windows)]
1021        crate::ipc::DaemonAddr::NamedPipe(ref name) => {
1022            let app = build_app_router(&cfg);
1023            let listener = crate::ipc::bind_listener(&addr)?;
1024
1025            tracing::info!(
1026                "lean-ctx daemon listening on {} (project_root={})",
1027                name,
1028                cfg.project_root.display()
1029            );
1030
1031            axum::serve(listener, app.into_make_service())
1032                .with_graceful_shutdown(async move {
1033                    let _ = tokio::signal::ctrl_c().await;
1034                })
1035                .await
1036                .context("ipc server")?;
1037            Ok(())
1038        }
1039    }
1040}
1041
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use futures::StreamExt;
    use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
    use serde_json::json;
    use tower::ServiceExt;

    /// Baseline `AppState` for handler and middleware tests.
    ///
    /// It has no auth token, generous concurrency and rate limits, and a REST
    /// server rooted at `root`. Tests that need something different override
    /// individual fields with struct-update syntax:
    /// `AppState { rate: ..., ..test_state(root) }`.
    fn test_state(root: &str) -> AppState {
        AppState {
            token: None,
            concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
            rate: Arc::new(RateLimiter::new(50, 100)),
            project_root: root.to_string(),
            timeout: Duration::from_secs(30),
            server: LeanCtxServer::new_shared_with_context(root, "default", "default"),
        }
    }

    /// Builds a bodyless `GET` request for `uri` with a `Host` header.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("GET")
            .uri(uri)
            .header("Host", "localhost")
            .body(Body::empty())
            .expect("request")
    }

    /// Collects body chunks until the first SSE message terminator (`\n\n`).
    ///
    /// Stops after 32 chunks, a 2 s stall, end of stream, or a body error.
    /// Returns whatever was read, decoded as lossy UTF-8.
    async fn read_first_sse_message(body: Body) -> String {
        let mut stream = body.into_data_stream();
        let mut buf: Vec<u8> = Vec::new();
        for _ in 0..32 {
            let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
            let Ok(Some(Ok(bytes))) = next else {
                break;
            };
            buf.extend_from_slice(&bytes);
            if buf.windows(2).any(|w| w == b"\n\n") {
                break;
            }
        }
        String::from_utf8_lossy(&buf).to_string()
    }

    #[tokio::test]
    async fn auth_token_blocks_requests_without_bearer_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();
        let service_project_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &service_project_root,
                "default",
                "default",
            ))
        };
        let cfg = StreamableHttpServerConfig::default()
            .with_stateful_mode(false)
            .with_json_response(true);

        let mcp_http = StreamableHttpService::new(
            service_factory,
            Arc::new(
                rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
            ),
            cfg,
        );

        let state = AppState {
            token: Some("secret".to_string()),
            concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
            ..test_state(&root_str)
        };

        let app = Router::new()
            .fallback_service(mcp_http)
            .layer(middleware::from_fn_with_state(
                state.clone(),
                auth_middleware,
            ))
            .with_state(state);

        let body = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        .to_string();

        // Deliberately no `Authorization` header.
        let req = Request::builder()
            .method("POST")
            .uri("/")
            .header("Host", "localhost")
            .header("Accept", "application/json, text/event-stream")
            .header("Content-Type", "application/json")
            .body(Body::from(body))
            .expect("request");

        let resp = app.clone().oneshot(req).await.expect("resp");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn mcp_service_factory_isolates_per_client_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();

        // Mirrors the factory in build_app_router(): it must create a fresh
        // server per MCP session.
        let service_project_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &service_project_root,
                "default",
                "default",
            ))
        };

        let s1 = service_factory().expect("server 1");
        let s2 = service_factory().expect("server 2");

        // If the two servers accidentally shared the same Arc-backed fields,
        // these writes would clobber each other. This test stays independent
        // of rmcp's InitializeRequestParams API.
        *s1.client_name.write().await = "client-a".to_string();
        *s2.client_name.write().await = "client-b".to_string();

        let a = s1.client_name.read().await.clone();
        let b = s2.client_name.read().await.clone();
        assert_eq!(a, "client-a");
        assert_eq!(b, "client-b");
    }

    #[tokio::test]
    async fn rate_limit_returns_429_when_exhausted() {
        // Burst of one: the first request consumes the only token.
        let state = AppState {
            rate: Arc::new(RateLimiter::new(1, 1)),
            ..test_state(".")
        };

        let app = Router::new()
            .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
            .layer(middleware::from_fn_with_state(
                state.clone(),
                rate_limit_middleware,
            ))
            .with_state(state);

        let resp1 = app
            .clone()
            .oneshot(get_request("/limited"))
            .await
            .expect("resp1");
        assert_eq!(resp1.status(), StatusCode::OK);

        let resp2 = app
            .clone()
            .oneshot(get_request("/limited"))
            .await
            .expect("resp2");
        assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    #[tokio::test]
    async fn audit_events_endpoint_returns_json() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();

        let app = Router::new()
            .route("/v1/audit/events", get(v1_audit_events))
            .with_state(test_state(&root_str));

        let resp = app
            .oneshot(get_request("/v1/audit/events?limit=10"))
            .await
            .expect("audit response");
        assert_eq!(resp.status(), StatusCode::OK);

        let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
            .await
            .expect("audit body");
        let json: serde_json::Value = serde_json::from_slice(&body).expect("audit body is JSON");
        assert!(json
            .get("cross_project_events")
            .expect("cross_project_events key")
            .is_array());
        assert!(json.get("audit_trail").expect("audit_trail key").is_array());
    }

    #[tokio::test]
    async fn events_endpoint_replays_tool_call_event() {
        use crate::core::context_os::{self, ContextEventKindV1};

        let dir = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
        std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
        let root_str = dir.path().to_string_lossy().to_string();

        let app = Router::new()
            .route("/v1/events", get(v1_events))
            .with_state(test_state(&root_str));

        // The bus is process-global, and the query below replays from
        // `since=0` with `limit=1`. Unique ids ensure the first event in this
        // stream is the one appended here, not one left by another test.
        let workspace = format!("ws-replay-{}", std::process::id());
        let channel = format!("ch-replay-{}", std::process::id());

        // Directly append an event to the bus, so there is no fire-and-forget
        // timing dependency.
        let rt = context_os::runtime();
        rt.bus.append(
            &workspace,
            &channel,
            &ContextEventKindV1::ToolCallRecorded,
            Some("test-agent"),
            json!({"tool": "ctx_session", "action": "status"}),
        );

        let uri = format!("/v1/events?workspaceId={workspace}&channelId={channel}&since=0&limit=1");
        let req = Request::builder()
            .method("GET")
            .uri(uri)
            .header("Host", "localhost")
            .header("Accept", "text/event-stream")
            .body(Body::empty())
            .expect("req");
        let resp = app.clone().oneshot(req).await.expect("events");
        assert_eq!(resp.status(), StatusCode::OK);

        let msg = read_first_sse_message(resp.into_body()).await;
        assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
        assert!(msg.contains(&format!("\"{workspace}\"")), "msg={msg:?}");
        assert!(msg.contains(&format!("\"{channel}\"")), "msg={msg:?}");
    }
}