Skip to main content

lean_ctx/http_server/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7    extract::Json,
8    extract::Query,
9    extract::State,
10    http::{header, Request, StatusCode},
11    middleware::{self, Next},
12    response::sse::{Event as SseEvent, KeepAlive, Sse},
13    response::{IntoResponse, Response},
14    routing::get,
15    Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29
30#[cfg(feature = "team-server")]
31pub mod team;
32
33/// Wrapper stream that calls `record_sse_disconnect` on drop.
34use std::pin::Pin;
35
36pub(crate) struct SseDisconnectGuard<I> {
37    pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
38    pub(crate) metrics: Arc<ContextOsMetrics>,
39}
40
41impl<I> Stream for SseDisconnectGuard<I> {
42    type Item = I;
43
44    fn poll_next(
45        mut self: Pin<&mut Self>,
46        cx: &mut std::task::Context<'_>,
47    ) -> std::task::Poll<Option<Self::Item>> {
48        self.inner.as_mut().poll_next(cx)
49    }
50}
51
52impl<I> Drop for SseDisconnectGuard<I> {
53    fn drop(&mut self) {
54        self.metrics.record_sse_disconnect();
55    }
56}
57
/// Maximum number of characters kept by [`sanitize_id`].
const MAX_ID_LEN: usize = 64;

/// Normalises a caller-supplied identifier.
///
/// Surrounding whitespace is ignored, every character other than ASCII
/// letters, ASCII digits, `-`, `_` and `.` is dropped, and at most
/// [`MAX_ID_LEN`] of the remaining characters are kept. When nothing
/// survives, the literal `"default"` is returned.
fn sanitize_id(raw: &str) -> String {
    let is_allowed = |c: &char| c.is_ascii_alphanumeric() || matches!(*c, '-' | '_' | '.');

    let id: String = raw
        .trim()
        .chars()
        .filter(is_allowed)
        .take(MAX_ID_LEN)
        .collect();

    // Covers both blank input and input made only of disallowed characters.
    if id.is_empty() {
        String::from("default")
    } else {
        id
    }
}
76
/// Settings for the lean-ctx HTTP server.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Host name or IP address to bind to.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
    /// Root directory of the project the server operates on.
    pub project_root: PathBuf,
    /// Bearer token clients must present; `None` or empty means "not configured".
    pub auth_token: Option<String>,
    /// Passed to the MCP streamable-HTTP transport's stateful-mode setting.
    pub stateful_mode: bool,
    /// Passed to the MCP streamable-HTTP transport's JSON-response setting.
    pub json_response: bool,
    /// Turns off the transport's allowed-hosts check when `true`.
    pub disable_host_check: bool,
    /// Explicit allowed-hosts list; when non-empty it replaces the transport defaults.
    pub allowed_hosts: Vec<String>,
    /// Maximum accepted request body size in bytes.
    pub max_body_bytes: usize,
    /// Maximum number of requests processed at the same time.
    pub max_concurrency: usize,
    /// Sustained request rate (requests per second) allowed by the rate limiter.
    pub max_rps: u32,
    /// Burst capacity of the rate limiter.
    pub rate_burst: u32,
    /// Request timeout in milliseconds.
    pub request_timeout_ms: u64,
}

impl Default for HttpServerConfig {
    /// Loopback-only defaults: `127.0.0.1:8080`, no configured token, the
    /// current working directory (or `.` when it cannot be determined) as the
    /// project root, and conservative resource limits.
    fn default() -> Self {
        Self {
            host: String::from("127.0.0.1"),
            port: 8080,
            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
            auth_token: None,
            stateful_mode: false,
            json_response: true,
            disable_host_check: false,
            allowed_hosts: Vec::new(),
            max_body_bytes: 2 * 1024 * 1024,
            max_concurrency: 32,
            max_rps: 50,
            rate_burst: 100,
            request_timeout_ms: 30_000,
        }
    }
}
114
115impl HttpServerConfig {
116    pub fn validate(&self) -> Result<()> {
117        let host = self.host.trim().to_lowercase();
118        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
119        if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
120            return Err(anyhow!(
121                "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
122            ));
123        }
124        Ok(())
125    }
126
127    pub fn effective_auth_token(&self) -> Option<String> {
128        if let Some(ref token) = self.auth_token {
129            if !token.is_empty() {
130                return Some(token.clone());
131            }
132        }
133        let host = self.host.trim().to_lowercase();
134        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
135        if is_loopback {
136            let auto_token = crate::core::session_token::generate_token();
137            eprintln!(
138                "[lean-ctx] Auto-generated auth token for loopback: {auto_token}\n\
139                 Pass as Bearer token or set --auth-token explicitly."
140            );
141            Some(auto_token)
142        } else {
143            None
144        }
145    }
146
147    fn mcp_http_config(&self) -> StreamableHttpServerConfig {
148        let mut cfg = StreamableHttpServerConfig::default()
149            .with_stateful_mode(self.stateful_mode)
150            .with_json_response(self.json_response);
151
152        if self.disable_host_check {
153            tracing::warn!(
154                "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
155                 Do NOT use this in production or on non-loopback interfaces."
156            );
157            cfg = cfg.disable_allowed_hosts();
158            return cfg;
159        }
160
161        if !self.allowed_hosts.is_empty() {
162            cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
163            return cfg;
164        }
165
166        // Keep rmcp's secure loopback defaults; also allow the configured host (if it's loopback).
167        let host = self.host.trim();
168        if host == "127.0.0.1" || host == "localhost" || host == "::1" {
169            cfg.allowed_hosts.push(host.to_string());
170        }
171
172        cfg
173    }
174}
175
/// Shared state handed to the middlewares and handlers through axum's `State` extractor.
#[derive(Clone)]
struct AppState {
    /// Expected bearer token; when `None`, `auth_middleware` lets every request through.
    token: Option<String>,
    /// Semaphore from which `concurrency_middleware` takes one permit per in-flight request.
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Rate limiter consulted by `rate_limit_middleware`.
    rate: Arc<RateLimiter>,
    /// Project root as a string (lossy conversion of `HttpServerConfig::project_root`).
    project_root: String,
    /// Upper bound applied to a tool call in `v1_tool_call`.
    timeout: Duration,
    /// Server instance from which `v1_tool_call` builds its `ContextEngine`.
    server: LeanCtxServer,
}
185
186#[derive(Debug)]
187struct RateLimiter {
188    max_rps: f64,
189    burst: f64,
190    state: tokio::sync::Mutex<RateState>,
191}
192
193#[derive(Debug, Clone, Copy)]
194struct RateState {
195    tokens: f64,
196    last: Instant,
197}
198
199impl RateLimiter {
200    fn new(max_rps: u32, burst: u32) -> Self {
201        let now = Instant::now();
202        Self {
203            max_rps: (max_rps.max(1)) as f64,
204            burst: (burst.max(1)) as f64,
205            state: tokio::sync::Mutex::new(RateState {
206                tokens: (burst.max(1)) as f64,
207                last: now,
208            }),
209        }
210    }
211
212    async fn allow(&self) -> bool {
213        let mut s = self.state.lock().await;
214        let now = Instant::now();
215        let elapsed = now.saturating_duration_since(s.last);
216        let refill = elapsed.as_secs_f64() * self.max_rps;
217        s.tokens = (s.tokens + refill).min(self.burst);
218        s.last = now;
219        if s.tokens >= 1.0 {
220            s.tokens -= 1.0;
221            true
222        } else {
223            false
224        }
225    }
226}
227
228async fn auth_middleware(
229    State(state): State<AppState>,
230    req: Request<axum::body::Body>,
231    next: Next,
232) -> Response {
233    if state.token.is_none() {
234        return next.run(req).await;
235    }
236
237    if req.uri().path() == "/health" {
238        return next.run(req).await;
239    }
240
241    let expected = state.token.as_deref().unwrap_or("");
242    let Some(h) = req.headers().get(header::AUTHORIZATION) else {
243        return StatusCode::UNAUTHORIZED.into_response();
244    };
245    let Ok(s) = h.to_str() else {
246        return StatusCode::UNAUTHORIZED.into_response();
247    };
248    let Some(token) = s
249        .strip_prefix("Bearer ")
250        .or_else(|| s.strip_prefix("bearer "))
251    else {
252        return StatusCode::UNAUTHORIZED.into_response();
253    };
254    if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
255        return StatusCode::UNAUTHORIZED.into_response();
256    }
257
258    next.run(req).await
259}
260
261fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
262    use subtle::ConstantTimeEq;
263    if a.len() != b.len() {
264        return false;
265    }
266    bool::from(a.ct_eq(b))
267}
268
269async fn rate_limit_middleware(
270    State(state): State<AppState>,
271    req: Request<axum::body::Body>,
272    next: Next,
273) -> Response {
274    if !state.rate.allow().await {
275        return StatusCode::TOO_MANY_REQUESTS.into_response();
276    }
277    next.run(req).await
278}
279
280async fn concurrency_middleware(
281    State(state): State<AppState>,
282    req: Request<axum::body::Body>,
283    next: Next,
284) -> Response {
285    let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
286        return StatusCode::TOO_MANY_REQUESTS.into_response();
287    };
288    let resp = next.run(req).await;
289    drop(permit);
290    resp
291}
292
293async fn health() -> impl IntoResponse {
294    (StatusCode::OK, "ok\n")
295}
296
297async fn v1_shutdown() -> impl IntoResponse {
298    tokio::spawn(async {
299        tokio::time::sleep(Duration::from_millis(100)).await;
300        std::process::exit(0);
301    });
302    (StatusCode::OK, "shutting down\n")
303}
304
/// JSON body accepted by the tool-call endpoint (`v1_tool_call`).
///
/// Keys are deserialised with serde's `camelCase` renaming.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolCallBody {
    /// Name of the tool to invoke.
    name: String,
    /// Arguments forwarded to the tool; optional.
    #[serde(default)]
    arguments: Option<Value>,
    /// Accepted but not read by the handler in this file.
    // NOTE(review): presumably `workspaceId` on the wire — confirm how serde's
    // camelCase rule treats the leading underscore.
    #[serde(default)]
    _workspace_id: Option<String>,
    /// Accepted but not read by the handler in this file.
    #[serde(default)]
    _channel_id: Option<String>,
}
316
/// Query parameters of the SSE events endpoint (`v1_events`).
///
/// Keys are deserialised with serde's `camelCase` renaming; every parameter is optional.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsQuery {
    /// Workspace to read from; the handler sanitises it and falls back to `default`.
    #[serde(default)]
    workspace_id: Option<String>,
    /// Channel to read from; the handler sanitises it and falls back to `default`.
    #[serde(default)]
    channel_id: Option<String>,
    /// Event-id cursor passed to the bus read (0 when absent); live events are
    /// only delivered when their id is greater than the last id seen.
    #[serde(default)]
    since: Option<i64>,
    /// Maximum number of events to replay; the handler defaults to 200 and caps at 1000.
    #[serde(default)]
    limit: Option<usize>,
    /// Comma-separated event kind filter (e.g. `tool_call,session_start`).
    /// When set, only matching events are delivered via SSE.
    #[serde(default)]
    kind: Option<String>,
}
333
334async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
335    let _ = state;
336    let v = crate::core::mcp_manifest::manifest_value();
337    (StatusCode::OK, Json(v))
338}
339
340#[derive(Debug, Deserialize)]
341#[serde(rename_all = "camelCase")]
342struct ToolsQuery {
343    #[serde(default)]
344    offset: Option<usize>,
345    #[serde(default)]
346    limit: Option<usize>,
347}
348
349async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
350    let _ = state;
351    let v = crate::core::mcp_manifest::manifest_value();
352    let tools = v
353        .get("tools")
354        .and_then(|t| t.get("granular"))
355        .cloned()
356        .unwrap_or(Value::Array(vec![]));
357
358    let all = tools.as_array().cloned().unwrap_or_default();
359    let total = all.len();
360    let offset = q.offset.unwrap_or(0).min(total);
361    let limit = q.limit.unwrap_or(200).min(500);
362    let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
363
364    (
365        StatusCode::OK,
366        Json(serde_json::json!({
367            "tools": page,
368            "total": total,
369            "offset": offset,
370            "limit": limit,
371        })),
372    )
373}
374
375async fn v1_tool_call(
376    State(state): State<AppState>,
377    Json(body): Json<ToolCallBody>,
378) -> impl IntoResponse {
379    let engine = ContextEngine::from_server(state.server.clone());
380    match tokio::time::timeout(
381        state.timeout,
382        engine.call_tool_value(&body.name, body.arguments),
383    )
384    .await
385    {
386        Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
387        Ok(Err(e)) => {
388            tracing::warn!("tool call error: {e}");
389            (
390                StatusCode::BAD_REQUEST,
391                Json(serde_json::json!({ "error": "tool_error", "code": "TOOL_ERROR" })),
392            )
393                .into_response()
394        }
395        Err(_) => (
396            StatusCode::GATEWAY_TIMEOUT,
397            Json(serde_json::json!({ "error": "request_timeout" })),
398        )
399            .into_response(),
400    }
401}
402
403async fn v1_events(
404    State(state): State<AppState>,
405    Query(q): Query<EventsQuery>,
406) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
407    use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
408
409    let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
410    let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
411    let _ = &state.project_root;
412    let since = q.since.unwrap_or(0);
413    let limit = q.limit.unwrap_or(200).min(1000);
414    let redaction = RedactionLevel::RefsOnly;
415
416    let kind_filter: Option<Vec<String>> = q
417        .kind
418        .as_deref()
419        .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
420
421    let rt = crate::core::context_os::runtime();
422    let replay = rt.bus.read(&ws, &ch, since, limit);
423
424    let replay = if let Some(ref kinds) = kind_filter {
425        replay
426            .into_iter()
427            .filter(|ev| kinds.contains(&ev.kind))
428            .collect()
429    } else {
430        replay
431    };
432
433    let rx = if let Some(ref kinds) = kind_filter {
434        let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
435        let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
436        if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
437            crate::core::context_os::SubscriptionKind::Filtered(sub)
438        } else {
439            tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
440            let (_, rx) = broadcast::channel::<ContextEventV1>(1);
441            crate::core::context_os::SubscriptionKind::Unfiltered(rx)
442        }
443    } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
444        crate::core::context_os::SubscriptionKind::Unfiltered(sub)
445    } else {
446        tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
447        let (_, rx) = broadcast::channel::<ContextEventV1>(1);
448        crate::core::context_os::SubscriptionKind::Unfiltered(rx)
449    };
450
451    rt.metrics.record_sse_connect();
452    rt.metrics.record_events_replayed(replay.len() as u64);
453    rt.metrics.record_workspace_active(&ws);
454
455    let bus = rt.bus.clone();
456    let metrics = rt.metrics.clone();
457    let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
458
459    let stream = futures::stream::unfold(
460        (
461            pending,
462            rx,
463            ws.clone(),
464            ch.clone(),
465            since,
466            redaction,
467            bus,
468            metrics,
469        ),
470        |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
471            if let Some(mut ev) = pending.pop_front() {
472                last_id = ev.id;
473                redact_event_payload(&mut ev, redaction);
474                let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
475                let evt = SseEvent::default()
476                    .id(ev.id.to_string())
477                    .event(ev.kind)
478                    .data(data);
479                return Some((
480                    Ok(evt),
481                    (pending, rx, ws, ch, last_id, redaction, bus, metrics),
482                ));
483            }
484
485            loop {
486                match rx.recv().await {
487                    Ok(mut ev) if ev.id > last_id => {
488                        last_id = ev.id;
489                        redact_event_payload(&mut ev, redaction);
490                        let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
491                        let evt = SseEvent::default()
492                            .id(ev.id.to_string())
493                            .event(ev.kind)
494                            .data(data);
495                        return Some((
496                            Ok(evt),
497                            (pending, rx, ws, ch, last_id, redaction, bus, metrics),
498                        ));
499                    }
500                    Ok(_) => {}
501                    Err(broadcast::error::RecvError::Closed) => return None,
502                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
503                        let missed = bus.read(&ws, &ch, last_id, skipped as usize);
504                        metrics.record_events_replayed(missed.len() as u64);
505                        for ev in missed {
506                            last_id = last_id.max(ev.id);
507                            pending.push_back(ev);
508                        }
509                    }
510                }
511            }
512        },
513    );
514
515    let metrics_ref = rt.metrics.clone();
516    let guarded = SseDisconnectGuard {
517        inner: Box::pin(stream),
518        metrics: metrics_ref,
519    };
520
521    Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
522}
523
524#[derive(Debug, Deserialize)]
525struct AuditEventsQuery {
526    #[serde(default = "default_audit_limit")]
527    limit: usize,
528}
529
530fn default_audit_limit() -> usize {
531    100
532}
533
534async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
535    let capped = q.limit.min(1000);
536    let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
537    let trail_events = crate::core::audit_trail::load_recent(capped);
538
539    Json(serde_json::json!({
540        "cross_project_events": boundary_events,
541        "audit_trail": trail_events,
542    }))
543}
544
545async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
546    let rt = crate::core::context_os::runtime();
547    let snap = rt.metrics.snapshot();
548    (
549        StatusCode::OK,
550        Json(serde_json::to_value(snap).unwrap_or_default()),
551    )
552}
553
554const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
555const MAX_HANDOFF_FILES: usize = 50;
556
557async fn v1_a2a_handoff(
558    State(state): State<AppState>,
559    Json(body): Json<Value>,
560) -> impl IntoResponse {
561    let envelope = match crate::core::a2a_transport::parse_envelope(
562        &serde_json::to_string(&body).unwrap_or_default(),
563    ) {
564        Ok(env) => env,
565        Err(e) => {
566            tracing::warn!("a2a handoff parse error: {e}");
567            return (
568                StatusCode::BAD_REQUEST,
569                Json(serde_json::json!({"error": "invalid_envelope"})),
570            );
571        }
572    };
573
574    if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
575        tracing::warn!(
576            "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
577            envelope.payload_json.len()
578        );
579        return (
580            StatusCode::PAYLOAD_TOO_LARGE,
581            Json(serde_json::json!({"error": "payload_too_large"})),
582        );
583    }
584
585    let rt = crate::core::context_os::runtime();
586    rt.bus.append(
587        &state.project_root,
588        "a2a",
589        &crate::core::context_os::ContextEventKindV1::SessionMutated,
590        Some(&envelope.sender.agent_id),
591        serde_json::json!({
592            "type": "handoff_received",
593            "content_type": format!("{:?}", envelope.content_type),
594            "sender": envelope.sender.agent_id,
595            "payload_size": envelope.payload_json.len(),
596        }),
597    );
598
599    match envelope.content_type {
600        crate::core::a2a_transport::TransportContentType::ContextPackage => {
601            let dir = std::path::Path::new(&state.project_root)
602                .join(".lean-ctx")
603                .join("handoffs")
604                .join("packages");
605            let _ = std::fs::create_dir_all(&dir);
606            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
607            let out = dir.join(format!(
608                "ctx-{}.{}",
609                chrono::Utc::now().format("%Y%m%d_%H%M%S"),
610                crate::core::contracts::PACKAGE_EXTENSION
611            ));
612            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
613                tracing::error!("a2a handoff write failed: {e}");
614                return (
615                    StatusCode::INTERNAL_SERVER_ERROR,
616                    Json(serde_json::json!({"error": "write_failed"})),
617                );
618            }
619            (
620                StatusCode::OK,
621                Json(serde_json::json!({
622                    "status": "received",
623                    "content_type": "context_package",
624                })),
625            )
626        }
627        crate::core::a2a_transport::TransportContentType::HandoffBundle => {
628            let dir = std::path::Path::new(&state.project_root)
629                .join(".lean-ctx")
630                .join("handoffs");
631            let _ = std::fs::create_dir_all(&dir);
632            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
633            let out = dir.join(format!(
634                "received-{}.json",
635                chrono::Utc::now().format("%Y%m%d_%H%M%S")
636            ));
637            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
638                tracing::error!("a2a handoff write failed: {e}");
639                return (
640                    StatusCode::INTERNAL_SERVER_ERROR,
641                    Json(serde_json::json!({"error": "write_failed"})),
642                );
643            }
644            (
645                StatusCode::OK,
646                Json(serde_json::json!({
647                    "status": "received",
648                    "content_type": "handoff_bundle",
649                })),
650            )
651        }
652        _ => (
653            StatusCode::OK,
654            Json(serde_json::json!({
655                "status": "received",
656                "content_type": format!("{:?}", envelope.content_type),
657            })),
658        ),
659    }
660}
661
/// Makes room for one new file in `dir` by deleting the oldest regular files.
///
/// Nothing happens while the directory holds fewer than `max_files` regular
/// files. Otherwise the files with the oldest modification time are removed
/// until `max_files - 1` remain. Sub-directories are ignored, unreadable
/// entries are skipped, and removal errors are ignored (best effort). A file
/// whose modification time is unavailable is treated as oldest.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };

    let mut by_age: Vec<(std::time::SystemTime, std::path::PathBuf)> = Vec::new();
    for entry in entries.flatten() {
        let Ok(meta) = entry.metadata() else {
            continue;
        };
        if !meta.is_file() {
            continue;
        }
        let modified = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        by_age.push((modified, entry.path()));
    }

    if by_age.len() < max_files {
        return;
    }

    // Keep one slot free for the file the caller is about to write.
    let keep = max_files.saturating_sub(1);
    let excess = by_age.len().saturating_sub(keep);

    by_age.sort_by(|a, b| a.0.cmp(&b.0));
    for (_, path) in by_age.into_iter().take(excess) {
        let _ = std::fs::remove_file(path);
    }
}
687
688async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
689    let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
690        Ok(r) => r,
691        Err(e) => {
692            tracing::debug!("a2a JSON-RPC parse error: {e}");
693            return (
694                StatusCode::BAD_REQUEST,
695                Json(serde_json::json!({
696                    "jsonrpc": "2.0",
697                    "id": null,
698                    "error": {"code": -32700, "message": "invalid request"}
699                })),
700            );
701        }
702    };
703    let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
704    let json = serde_json::to_value(resp).unwrap_or_default();
705    (StatusCode::OK, Json(json))
706}
707
708async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
709    let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
710    (
711        StatusCode::OK,
712        [(header::CONTENT_TYPE, "application/json")],
713        Json(card),
714    )
715}
716
717async fn mcp_server_card() -> impl IntoResponse {
718    let card = serde_json::json!({
719        "name": "lean-ctx",
720        "version": env!("CARGO_PKG_VERSION"),
721        "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
722        "capabilities": {
723            "tools": true,
724            "resources": false,
725            "prompts": false,
726            "sampling": false
727        },
728        "tool_categories": [
729            {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
730            {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
731            {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
732            {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
733        ],
734        "features": {
735            "compression": "deterministic AST-based, 40-70% token reduction",
736            "caching": "session-scoped with zstd, re-reads ~13 tokens",
737            "audit_trail": "SHA-256 chained JSONL",
738            "rbac": "5 built-in roles with capability-based access",
739            "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
740            "secret_detection": "8 regex patterns + custom"
741        },
742        "security": {
743            "path_jail": true,
744            "rate_limiting": true,
745            "budget_tracking": true,
746            "signed_handoffs": true,
747            "timing_safe_auth": true
748        }
749    });
750    Json(card)
751}
752
753async fn v1_agents_register(
754    State(state): State<AppState>,
755    Json(body): Json<Value>,
756) -> impl IntoResponse {
757    let agent_type = body
758        .get("agent_type")
759        .and_then(|v| v.as_str())
760        .unwrap_or("unknown");
761    let role = body.get("role").and_then(|v| v.as_str());
762    let project_root = body
763        .get("project_root")
764        .and_then(|v| v.as_str())
765        .unwrap_or(&state.project_root);
766
767    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
768    let agent_id = registry.register(agent_type, role, project_root);
769    let _ = registry.save();
770
771    Json(serde_json::json!({
772        "agent_id": agent_id,
773        "status": "registered"
774    }))
775}
776
777async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
778    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
779    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
780    registry.update_heartbeat(agent_id);
781    let _ = registry.save();
782    Json(serde_json::json!({"status": "ok"}))
783}
784
785async fn v1_agents_list() -> impl IntoResponse {
786    let registry = crate::core::agents::AgentRegistry::load_or_create();
787    let active = registry.list_active(None);
788    Json(serde_json::json!({
789        "agents": active.iter().map(|a| serde_json::json!({
790            "agent_id": a.agent_id,
791            "agent_type": a.agent_type,
792            "role": a.role,
793            "status": a.status.to_string(),
794            "last_active": a.last_active.to_rfc3339(),
795        })).collect::<Vec<_>>()
796    }))
797}
798
799async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
800    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
801    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
802    registry.set_status(
803        agent_id,
804        crate::core::agents::AgentStatus::Finished,
805        Some("deregistered via API"),
806    );
807    let _ = registry.save();
808    Json(serde_json::json!({"status": "deregistered"}))
809}
810
811async fn v1_agents_events_sse(
812) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
813    let stream = futures::stream::unfold(0usize, |last_count| async move {
814        loop {
815            tokio::time::sleep(Duration::from_secs(5)).await;
816            let registry = crate::core::agents::AgentRegistry::load_or_create();
817            let active = registry.list_active(None);
818            let count = active.len();
819            if count != last_count {
820                let data = serde_json::json!({
821                    "type": "agents_changed",
822                    "active_count": count,
823                    "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
824                });
825                return Some((
826                    Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
827                    count,
828                ));
829            }
830        }
831    });
832
833    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
834}
835
836fn build_app_router(cfg: &HttpServerConfig) -> Router {
837    let project_root = cfg.project_root.to_string_lossy().to_string();
838    let service_project_root = project_root.clone();
839    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
840        Ok(LeanCtxServer::new_shared_with_context(
841            &service_project_root,
842            "default",
843            "default",
844        ))
845    };
846    let mcp_http = StreamableHttpService::new(
847        service_factory,
848        Arc::new(
849            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
850        ),
851        cfg.mcp_http_config(),
852    );
853
854    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");
855
856    let state = AppState {
857        token: cfg.effective_auth_token(),
858        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
859        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
860        project_root,
861        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
862        server: rest_server,
863    };
864
865    Router::new()
866        .route("/health", get(health))
867        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
868        .route("/v1/manifest", get(v1_manifest))
869        .route("/v1/tools", get(v1_tools))
870        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
871        .route("/v1/events", get(v1_events))
872        .route(
873            "/v1/context/summary",
874            get(context_views::v1_context_summary),
875        )
876        .route("/v1/events/search", get(context_views::v1_events_search))
877        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
878        .route("/v1/metrics", get(v1_metrics))
879        .route("/v1/audit/events", get(v1_audit_events))
880        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
881        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
882        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
883        .route("/.well-known/mcp-server.json", get(mcp_server_card))
884        .route("/a2a", axum::routing::post(a2a_jsonrpc))
885        .route(
886            "/v1/agents/register",
887            axum::routing::post(v1_agents_register),
888        )
889        .route(
890            "/v1/agents/heartbeat",
891            axum::routing::post(v1_agents_heartbeat),
892        )
893        .route("/v1/agents/list", get(v1_agents_list))
894        .route(
895            "/v1/agents/deregister",
896            axum::routing::post(v1_agents_deregister),
897        )
898        .route("/v1/agents/events", get(v1_agents_events_sse))
899        .fallback_service(mcp_http)
900        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
901        .layer(middleware::from_fn_with_state(
902            state.clone(),
903            rate_limit_middleware,
904        ))
905        .layer(middleware::from_fn_with_state(
906            state.clone(),
907            concurrency_middleware,
908        ))
909        .layer(middleware::from_fn_with_state(
910            state.clone(),
911            auth_middleware,
912        ))
913        .with_state(state)
914}
915
916pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
917    crate::core::protocol::set_mcp_context(true);
918    cfg.validate()?;
919
920    let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
921        .parse()
922        .context("invalid host/port")?;
923
924    let app = build_app_router(&cfg);
925
926    let listener = tokio::net::TcpListener::bind(addr)
927        .await
928        .with_context(|| format!("bind {addr}"))?;
929
930    tracing::info!(
931        "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
932        cfg.project_root.display()
933    );
934
935    axum::serve(listener, app)
936        .with_graceful_shutdown(async move {
937            let _ = tokio::signal::ctrl_c().await;
938        })
939        .await
940        .context("http server")?;
941    Ok(())
942}
943
/// Adapts the Windows named-pipe listener to axum's `Listener` trait so it can
/// be passed to `axum::serve`.
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next pipe connection and returns it with the pipe name.
    ///
    /// Accept errors are logged and retried after a short pause, so this only
    /// returns once a connection has been accepted.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        const ACCEPT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

        loop {
            let accepted = self.accept_pipe().await;
            if let Err(e) = &accepted {
                tracing::error!("named pipe accept error: {e}");
                tokio::time::sleep(ACCEPT_RETRY_DELAY).await;
                continue;
            }
            if let Ok(pipe) = accepted {
                break (pipe, self.name().to_string());
            }
        }
    }

    /// Reports the pipe name as this listener's local address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let pipe_name = self.name().to_string();
        Ok(pipe_name)
    }
}
965
966/// Serve the daemon over a platform-independent IPC channel (UDS on Unix,
967/// Named Pipes on Windows).
968pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
969    cfg.validate()?;
970
971    match addr {
972        #[cfg(unix)]
973        crate::ipc::DaemonAddr::Unix(ref path) => {
974            let app = build_app_router(&cfg);
975            let listener = crate::ipc::bind_listener(&addr)?;
976
977            tracing::info!(
978                "lean-ctx daemon listening on {} (project_root={})",
979                path.display(),
980                cfg.project_root.display()
981            );
982
983            axum::serve(listener, app.into_make_service())
984                .with_graceful_shutdown(async move {
985                    let _ = tokio::signal::ctrl_c().await;
986                })
987                .await
988                .context("ipc server")?;
989            Ok(())
990        }
991        #[cfg(windows)]
992        crate::ipc::DaemonAddr::NamedPipe(ref name) => {
993            let app = build_app_router(&cfg);
994            let listener = crate::ipc::bind_listener(&addr)?;
995
996            tracing::info!(
997                "lean-ctx daemon listening on {} (project_root={})",
998                name,
999                cfg.project_root.display()
1000            );
1001
1002            axum::serve(listener, app.into_make_service())
1003                .with_graceful_shutdown(async move {
1004                    let _ = tokio::signal::ctrl_c().await;
1005                })
1006                .await
1007                .context("ipc server")?;
1008            Ok(())
1009        }
1010    }
1011}
1012
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use futures::StreamExt;
    use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
    use serde_json::json;
    use tower::ServiceExt;

    /// Builds an `AppState` rooted at `root` with a 30 second timeout and a
    /// server created for the "default"/"default" context.
    fn test_state(root: &str, token: Option<&str>, permits: usize, rate: RateLimiter) -> AppState {
        AppState {
            token: token.map(str::to_string),
            concurrency: Arc::new(tokio::sync::Semaphore::new(permits)),
            rate: Arc::new(rate),
            project_root: root.to_string(),
            timeout: Duration::from_secs(30),
            server: LeanCtxServer::new_shared_with_context(root, "default", "default"),
        }
    }

    /// Starts a GET request for `uri` that already carries `Host: localhost`.
    fn get_from_localhost(uri: &str) -> axum::http::request::Builder {
        Request::builder()
            .method("GET")
            .uri(uri)
            .header("Host", "localhost")
    }

    /// Reads body chunks until a blank line (`\n\n`) has been seen, a chunk
    /// takes longer than two seconds, the stream ends or errors, or 32 chunks
    /// have been read. Returns whatever was collected, lossily decoded.
    async fn read_first_sse_message(body: Body) -> String {
        const MAX_CHUNKS: usize = 32;
        let per_chunk_timeout = Duration::from_secs(2);

        let mut chunks = body.into_data_stream();
        let mut collected = Vec::<u8>::new();
        for _ in 0..MAX_CHUNKS {
            match tokio::time::timeout(per_chunk_timeout, chunks.next()).await {
                Ok(Some(Ok(chunk))) => collected.extend_from_slice(&chunk),
                _ => break,
            }
            let saw_blank_line = collected.windows(2).any(|pair| pair == b"\n\n");
            if saw_blank_line {
                break;
            }
        }
        String::from_utf8_lossy(&collected).into_owned()
    }

    /// A request without a bearer header is rejected when a token is configured.
    #[tokio::test]
    async fn auth_token_blocks_requests_without_bearer_header() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_string_lossy().to_string();

        let factory_root = root.clone();
        let make_server = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &factory_root,
                "default",
                "default",
            ))
        };
        let mcp_cfg = StreamableHttpServerConfig::default()
            .with_stateful_mode(false)
            .with_json_response(true);
        let mcp_http = StreamableHttpService::new(
            make_server,
            Arc::new(
                rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
            ),
            mcp_cfg,
        );

        let state = test_state(&root, Some("secret"), 4, RateLimiter::new(50, 100));
        let auth_layer = middleware::from_fn_with_state(state.clone(), auth_middleware);
        let app = Router::new()
            .fallback_service(mcp_http)
            .layer(auth_layer)
            .with_state(state);

        let payload = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        .to_string();

        // Deliberately no Authorization header.
        let req = Request::builder()
            .method("POST")
            .uri("/")
            .header("Host", "localhost")
            .header("Accept", "application/json, text/event-stream")
            .header("Content-Type", "application/json")
            .body(Body::from(payload))
            .expect("request");

        let resp = app.oneshot(req).await.expect("resp");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// Two servers produced by the same factory must not share client state.
    #[tokio::test]
    async fn mcp_service_factory_isolates_per_client_state() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_string_lossy().to_string();

        // Same shape as the factory used by serve(): every MCP session has to get
        // its own freshly built server.
        let factory_root = root.clone();
        let make_server = move || -> Result<LeanCtxServer, std::convert::Infallible> {
            Ok(LeanCtxServer::new_shared_with_context(
                &factory_root,
                "default",
                "default",
            ))
        };

        let first = make_server().expect("server 1");
        let second = make_server().expect("server 2");

        // Were the Arc-backed fields shared between the two servers, the second
        // write would overwrite the first. Checking it this way keeps the test
        // independent of rmcp's InitializeRequestParams API.
        *first.client_name.write().await = "client-a".to_string();
        *second.client_name.write().await = "client-b".to_string();

        let seen_by_first = first.client_name.read().await.clone();
        let seen_by_second = second.client_name.read().await.clone();
        assert_eq!(seen_by_first, "client-a");
        assert_eq!(seen_by_second, "client-b");
    }

    /// With a limiter built as `RateLimiter::new(1, 1)`, the second back-to-back
    /// request is answered with 429.
    #[tokio::test]
    async fn rate_limit_returns_429_when_exhausted() {
        let state = test_state(".", None, 16, RateLimiter::new(1, 1));
        let limiter_layer = middleware::from_fn_with_state(state.clone(), rate_limit_middleware);
        let app = Router::new()
            .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
            .layer(limiter_layer)
            .with_state(state);

        let req1 = get_from_localhost("/limited")
            .body(Body::empty())
            .expect("req1");
        let resp1 = app.clone().oneshot(req1).await.expect("resp1");
        assert_eq!(resp1.status(), StatusCode::OK);

        let req2 = get_from_localhost("/limited")
            .body(Body::empty())
            .expect("req2");
        let resp2 = app.clone().oneshot(req2).await.expect("resp2");
        assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    /// The audit endpoint answers 200 with a JSON object holding both arrays.
    #[tokio::test]
    async fn audit_events_endpoint_returns_json() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_string_lossy().to_string();

        let state = test_state(&root, None, 16, RateLimiter::new(50, 100));
        let app = Router::new()
            .route("/v1/audit/events", get(v1_audit_events))
            .with_state(state);

        let req = get_from_localhost("/v1/audit/events?limit=10")
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let raw = axum::body::to_bytes(resp.into_body(), 1_000_000)
            .await
            .unwrap();
        let parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        for key in ["cross_project_events", "audit_trail"] {
            assert!(parsed.get(key).unwrap().is_array());
        }
    }

    /// An event appended to the bus is replayed by the SSE events endpoint.
    #[tokio::test]
    async fn events_endpoint_replays_tool_call_event() {
        use crate::core::context_os::{self, ContextEventKindV1};

        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(tmp.path().join(".git")).expect("git marker");
        std::fs::write(tmp.path().join("a.txt"), "ok").expect("file");
        let root = tmp.path().to_string_lossy().to_string();

        let state = test_state(&root, None, 16, RateLimiter::new(50, 100));
        let app = Router::new()
            .route("/v1/events", get(v1_events))
            .with_state(state);

        // Write the event straight onto the bus so the test does not depend on
        // the timing of any fire-and-forget recording path.
        context_os::runtime().bus.append(
            "ws1",
            "ch1",
            &ContextEventKindV1::ToolCallRecorded,
            Some("test-agent"),
            json!({"tool": "ctx_session", "action": "status"}),
        );

        let req = get_from_localhost("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
            .header("Accept", "text/event-stream")
            .body(Body::empty())
            .expect("req");
        let resp = app.oneshot(req).await.expect("events");
        assert_eq!(resp.status(), StatusCode::OK);

        let msg = read_first_sse_message(resp.into_body()).await;
        for needle in ["event: tool_call_recorded", "\"ws1\"", "\"ch1\""] {
            assert!(msg.contains(needle), "msg={msg:?}");
        }
    }
}