Skip to main content

lean_ctx/http_server/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7    extract::Json,
8    extract::Query,
9    extract::State,
10    http::{header, Request, StatusCode},
11    middleware::{self, Next},
12    response::sse::{Event as SseEvent, KeepAlive, Sse},
13    response::{IntoResponse, Response},
14    routing::get,
15    Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::core::context_os::ContextOsMetrics;
25use crate::engine::ContextEngine;
26use crate::tools::LeanCtxServer;
27
28pub mod context_views;
29
30#[cfg(feature = "team-server")]
31pub mod team;
32
use std::pin::Pin;

/// Wrapper stream that calls `record_sse_disconnect` on drop.
///
/// Items are forwarded unchanged from `inner`; the only added behaviour is the
/// metrics call made by the `Drop` implementation.
pub(crate) struct SseDisconnectGuard<I> {
    /// The wrapped stream, type-erased behind a pinned box.
    pub(crate) inner: Pin<Box<dyn Stream<Item = I> + Send>>,
    /// Metrics handle that is notified when this guard is dropped.
    pub(crate) metrics: Arc<ContextOsMetrics>,
}
40
41impl<I> Stream for SseDisconnectGuard<I> {
42    type Item = I;
43
44    fn poll_next(
45        mut self: Pin<&mut Self>,
46        cx: &mut std::task::Context<'_>,
47    ) -> std::task::Poll<Option<Self::Item>> {
48        self.inner.as_mut().poll_next(cx)
49    }
50}
51
52impl<I> Drop for SseDisconnectGuard<I> {
53    fn drop(&mut self) {
54        self.metrics.record_sse_disconnect();
55    }
56}
57
/// Maximum number of characters kept by [`sanitize_id`].
const MAX_ID_LEN: usize = 64;

/// Reduces a caller-supplied identifier to a safe, bounded form.
///
/// Only ASCII letters, ASCII digits, `-`, `_` and `.` are kept, and at most
/// [`MAX_ID_LEN`] of them. If nothing survives (empty, whitespace-only or
/// all-rejected input) the literal `"default"` is returned.
fn sanitize_id(raw: &str) -> String {
    const FALLBACK: &str = "default";

    let mut cleaned = String::new();
    let mut kept = 0usize;
    for ch in raw.trim().chars() {
        if kept == MAX_ID_LEN {
            break;
        }
        if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
            cleaned.push(ch);
            kept += 1;
        }
    }

    if cleaned.is_empty() {
        return FALLBACK.to_string();
    }
    cleaned
}
76
/// Settings for the HTTP server built by `build_app_router` and run by `serve`.
#[derive(Clone, Debug)]
pub struct HttpServerConfig {
    /// Host to bind to. `validate` demands an auth token unless this is
    /// `127.0.0.1`, `localhost` or `::1`.
    pub host: String,
    /// TCP port to bind to.
    pub port: u16,
    /// Project root handed to the server instances and used for on-disk handoffs.
    pub project_root: PathBuf,
    /// Bearer token required on requests; `None` or an empty string disables auth.
    pub auth_token: Option<String>,
    /// Forwarded to the MCP transport via `with_stateful_mode`.
    pub stateful_mode: bool,
    /// Forwarded to the MCP transport via `with_json_response`.
    pub json_response: bool,
    /// When `true`, the MCP transport's allowed-hosts check is disabled.
    pub disable_host_check: bool,
    /// Explicit allowed-hosts list for the MCP transport; used only when non-empty.
    pub allowed_hosts: Vec<String>,
    /// Request body size limit, in bytes.
    pub max_body_bytes: usize,
    /// Number of permits in the concurrency semaphore (at least 1 is used).
    pub max_concurrency: usize,
    /// Refill rate of the rate limiter, in requests per second (at least 1 is used).
    pub max_rps: u32,
    /// Capacity of the rate limiter's token bucket (at least 1 is used).
    pub rate_burst: u32,
    /// Timeout applied to REST tool calls, in milliseconds (at least 1 is used).
    pub request_timeout_ms: u64,
}
93
94impl Default for HttpServerConfig {
95    fn default() -> Self {
96        let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
97        Self {
98            host: "127.0.0.1".to_string(),
99            port: 8080,
100            project_root,
101            auth_token: None,
102            stateful_mode: false,
103            json_response: true,
104            disable_host_check: false,
105            allowed_hosts: Vec::new(),
106            max_body_bytes: 2 * 1024 * 1024,
107            max_concurrency: 32,
108            max_rps: 50,
109            rate_burst: 100,
110            request_timeout_ms: 30_000,
111        }
112    }
113}
114
115impl HttpServerConfig {
116    pub fn validate(&self) -> Result<()> {
117        let host = self.host.trim().to_lowercase();
118        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
119        if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
120            return Err(anyhow!(
121                "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
122            ));
123        }
124        Ok(())
125    }
126
127    fn mcp_http_config(&self) -> StreamableHttpServerConfig {
128        let mut cfg = StreamableHttpServerConfig::default()
129            .with_stateful_mode(self.stateful_mode)
130            .with_json_response(self.json_response);
131
132        if self.disable_host_check {
133            tracing::warn!(
134                "⚠ --disable-host-check is active: DNS rebinding protection is OFF. \
135                 Do NOT use this in production or on non-loopback interfaces."
136            );
137            cfg = cfg.disable_allowed_hosts();
138            return cfg;
139        }
140
141        if !self.allowed_hosts.is_empty() {
142            cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
143            return cfg;
144        }
145
146        // Keep rmcp's secure loopback defaults; also allow the configured host (if it's loopback).
147        let host = self.host.trim();
148        if host == "127.0.0.1" || host == "localhost" || host == "::1" {
149            cfg.allowed_hosts.push(host.to_string());
150        }
151
152        cfg
153    }
154}
155
/// State shared by the middleware layers and the REST handlers.
#[derive(Clone)]
struct AppState {
    /// Expected bearer token; `None` makes `auth_middleware` pass every request.
    token: Option<String>,
    /// Permit pool consulted by `concurrency_middleware`.
    concurrency: Arc<tokio::sync::Semaphore>,
    /// Limiter consulted by `rate_limit_middleware`.
    rate: Arc<RateLimiter>,
    /// Project root as a (lossily converted) string.
    project_root: String,
    /// Upper bound on the duration of a REST tool call.
    timeout: Duration,
    /// Server instance the REST tool-call handler builds its engine from.
    server: LeanCtxServer,
}
165
/// Token-bucket rate limiter; see `allow` for the refill/consume step.
#[derive(Debug)]
struct RateLimiter {
    /// Tokens added per second of elapsed time.
    max_rps: f64,
    /// Maximum number of tokens the bucket can hold.
    burst: f64,
    /// Current bucket level and the instant of the last update.
    state: tokio::sync::Mutex<RateState>,
}
172
/// Mutable part of [`RateLimiter`], guarded by its mutex.
#[derive(Debug, Clone, Copy)]
struct RateState {
    /// Tokens currently available (fractional while refilling).
    tokens: f64,
    /// Instant at which `tokens` was last brought up to date.
    last: Instant,
}
178
179impl RateLimiter {
180    fn new(max_rps: u32, burst: u32) -> Self {
181        let now = Instant::now();
182        Self {
183            max_rps: (max_rps.max(1)) as f64,
184            burst: (burst.max(1)) as f64,
185            state: tokio::sync::Mutex::new(RateState {
186                tokens: (burst.max(1)) as f64,
187                last: now,
188            }),
189        }
190    }
191
192    async fn allow(&self) -> bool {
193        let mut s = self.state.lock().await;
194        let now = Instant::now();
195        let elapsed = now.saturating_duration_since(s.last);
196        let refill = elapsed.as_secs_f64() * self.max_rps;
197        s.tokens = (s.tokens + refill).min(self.burst);
198        s.last = now;
199        if s.tokens >= 1.0 {
200            s.tokens -= 1.0;
201            true
202        } else {
203            false
204        }
205    }
206}
207
208async fn auth_middleware(
209    State(state): State<AppState>,
210    req: Request<axum::body::Body>,
211    next: Next,
212) -> Response {
213    if state.token.is_none() {
214        return next.run(req).await;
215    }
216
217    if req.uri().path() == "/health" {
218        return next.run(req).await;
219    }
220
221    let expected = state.token.as_deref().unwrap_or("");
222    let Some(h) = req.headers().get(header::AUTHORIZATION) else {
223        return StatusCode::UNAUTHORIZED.into_response();
224    };
225    let Ok(s) = h.to_str() else {
226        return StatusCode::UNAUTHORIZED.into_response();
227    };
228    let Some(token) = s
229        .strip_prefix("Bearer ")
230        .or_else(|| s.strip_prefix("bearer "))
231    else {
232        return StatusCode::UNAUTHORIZED.into_response();
233    };
234    if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
235        return StatusCode::UNAUTHORIZED.into_response();
236    }
237
238    next.run(req).await
239}
240
241fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
242    use subtle::ConstantTimeEq;
243    if a.len() != b.len() {
244        return false;
245    }
246    bool::from(a.ct_eq(b))
247}
248
249async fn rate_limit_middleware(
250    State(state): State<AppState>,
251    req: Request<axum::body::Body>,
252    next: Next,
253) -> Response {
254    if !state.rate.allow().await {
255        return StatusCode::TOO_MANY_REQUESTS.into_response();
256    }
257    next.run(req).await
258}
259
260async fn concurrency_middleware(
261    State(state): State<AppState>,
262    req: Request<axum::body::Body>,
263    next: Next,
264) -> Response {
265    let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
266        return StatusCode::TOO_MANY_REQUESTS.into_response();
267    };
268    let resp = next.run(req).await;
269    drop(permit);
270    resp
271}
272
273async fn health() -> impl IntoResponse {
274    (StatusCode::OK, "ok\n")
275}
276
277async fn v1_shutdown() -> impl IntoResponse {
278    tokio::spawn(async {
279        tokio::time::sleep(Duration::from_millis(100)).await;
280        std::process::exit(0);
281    });
282    (StatusCode::OK, "shutting down\n")
283}
284
/// JSON body accepted by the REST tool-call handler (camelCase keys).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolCallBody {
    /// Name of the tool to invoke.
    name: String,
    /// Optional tool arguments, passed through as raw JSON.
    #[serde(default)]
    arguments: Option<Value>,
    /// Accepted for deserialization; not read by the handler in this file.
    #[serde(default)]
    _workspace_id: Option<String>,
    /// Accepted for deserialization; not read by the handler in this file.
    #[serde(default)]
    _channel_id: Option<String>,
}
296
/// Query parameters of the SSE event stream (camelCase keys).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsQuery {
    /// Workspace to read from; sanitized, `"default"` when absent.
    #[serde(default)]
    workspace_id: Option<String>,
    /// Channel to read from; sanitized, `"default"` when absent.
    #[serde(default)]
    channel_id: Option<String>,
    /// Event id to resume after; live events are delivered only when their id
    /// is greater. Defaults to 0.
    #[serde(default)]
    since: Option<i64>,
    /// Maximum number of events replayed on connect; defaults to 200, capped at 1000.
    #[serde(default)]
    limit: Option<usize>,
    /// Comma-separated event kind filter (e.g. `tool_call,session_start`).
    /// When set, only matching events are delivered via SSE.
    #[serde(default)]
    kind: Option<String>,
}
313
314async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
315    let _ = state;
316    let v = crate::core::mcp_manifest::manifest_value();
317    (StatusCode::OK, Json(v))
318}
319
/// Pagination parameters of the tool listing.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToolsQuery {
    /// Number of tools to skip; defaults to 0 and is clamped to the total.
    #[serde(default)]
    offset: Option<usize>,
    /// Page size; defaults to 200 and is capped at 500.
    #[serde(default)]
    limit: Option<usize>,
}
328
329async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
330    let _ = state;
331    let v = crate::core::mcp_manifest::manifest_value();
332    let tools = v
333        .get("tools")
334        .and_then(|t| t.get("granular"))
335        .cloned()
336        .unwrap_or(Value::Array(vec![]));
337
338    let all = tools.as_array().cloned().unwrap_or_default();
339    let total = all.len();
340    let offset = q.offset.unwrap_or(0).min(total);
341    let limit = q.limit.unwrap_or(200).min(500);
342    let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
343
344    (
345        StatusCode::OK,
346        Json(serde_json::json!({
347            "tools": page,
348            "total": total,
349            "offset": offset,
350            "limit": limit,
351        })),
352    )
353}
354
355async fn v1_tool_call(
356    State(state): State<AppState>,
357    Json(body): Json<ToolCallBody>,
358) -> impl IntoResponse {
359    let engine = ContextEngine::from_server(state.server.clone());
360    match tokio::time::timeout(
361        state.timeout,
362        engine.call_tool_value(&body.name, body.arguments),
363    )
364    .await
365    {
366        Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
367        Ok(Err(e)) => {
368            tracing::warn!("tool call error: {e}");
369            (
370                StatusCode::BAD_REQUEST,
371                Json(serde_json::json!({ "error": "tool_error", "code": "TOOL_ERROR" })),
372            )
373                .into_response()
374        }
375        Err(_) => (
376            StatusCode::GATEWAY_TIMEOUT,
377            Json(serde_json::json!({ "error": "request_timeout" })),
378        )
379            .into_response(),
380    }
381}
382
383async fn v1_events(
384    State(state): State<AppState>,
385    Query(q): Query<EventsQuery>,
386) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
387    use crate::core::context_os::{redact_event_payload, ContextEventV1, RedactionLevel};
388
389    let ws = sanitize_id(&q.workspace_id.unwrap_or_else(|| "default".to_string()));
390    let ch = sanitize_id(&q.channel_id.unwrap_or_else(|| "default".to_string()));
391    let _ = &state.project_root;
392    let since = q.since.unwrap_or(0);
393    let limit = q.limit.unwrap_or(200).min(1000);
394    let redaction = RedactionLevel::RefsOnly;
395
396    let kind_filter: Option<Vec<String>> = q
397        .kind
398        .as_deref()
399        .map(|k| k.split(',').map(|s| s.trim().to_string()).collect());
400
401    let rt = crate::core::context_os::runtime();
402    let replay = rt.bus.read(&ws, &ch, since, limit);
403
404    let replay = if let Some(ref kinds) = kind_filter {
405        replay
406            .into_iter()
407            .filter(|ev| kinds.contains(&ev.kind))
408            .collect()
409    } else {
410        replay
411    };
412
413    let rx = if let Some(ref kinds) = kind_filter {
414        let kind_refs: Vec<&str> = kinds.iter().map(String::as_str).collect();
415        let filter = crate::core::context_os::TopicFilter::kinds(&kind_refs);
416        if let Some(sub) = rt.bus.subscribe_filtered(&ws, &ch, filter) {
417            crate::core::context_os::SubscriptionKind::Filtered(sub)
418        } else {
419            tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
420            let (_, rx) = broadcast::channel::<ContextEventV1>(1);
421            crate::core::context_os::SubscriptionKind::Unfiltered(rx)
422        }
423    } else if let Some(sub) = rt.bus.subscribe(&ws, &ch) {
424        crate::core::context_os::SubscriptionKind::Unfiltered(sub)
425    } else {
426        tracing::warn!("SSE subscriber limit reached for {ws}/{ch}");
427        let (_, rx) = broadcast::channel::<ContextEventV1>(1);
428        crate::core::context_os::SubscriptionKind::Unfiltered(rx)
429    };
430
431    rt.metrics.record_sse_connect();
432    rt.metrics.record_events_replayed(replay.len() as u64);
433    rt.metrics.record_workspace_active(&ws);
434
435    let bus = rt.bus.clone();
436    let metrics = rt.metrics.clone();
437    let pending: std::collections::VecDeque<ContextEventV1> = replay.into();
438
439    let stream = futures::stream::unfold(
440        (
441            pending,
442            rx,
443            ws.clone(),
444            ch.clone(),
445            since,
446            redaction,
447            bus,
448            metrics,
449        ),
450        |(mut pending, mut rx, ws, ch, mut last_id, redaction, bus, metrics)| async move {
451            if let Some(mut ev) = pending.pop_front() {
452                last_id = ev.id;
453                redact_event_payload(&mut ev, redaction);
454                let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
455                let evt = SseEvent::default()
456                    .id(ev.id.to_string())
457                    .event(ev.kind)
458                    .data(data);
459                return Some((
460                    Ok(evt),
461                    (pending, rx, ws, ch, last_id, redaction, bus, metrics),
462                ));
463            }
464
465            loop {
466                match rx.recv().await {
467                    Ok(mut ev) if ev.id > last_id => {
468                        last_id = ev.id;
469                        redact_event_payload(&mut ev, redaction);
470                        let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
471                        let evt = SseEvent::default()
472                            .id(ev.id.to_string())
473                            .event(ev.kind)
474                            .data(data);
475                        return Some((
476                            Ok(evt),
477                            (pending, rx, ws, ch, last_id, redaction, bus, metrics),
478                        ));
479                    }
480                    Ok(_) => {}
481                    Err(broadcast::error::RecvError::Closed) => return None,
482                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
483                        let missed = bus.read(&ws, &ch, last_id, skipped as usize);
484                        metrics.record_events_replayed(missed.len() as u64);
485                        for ev in missed {
486                            last_id = last_id.max(ev.id);
487                            pending.push_back(ev);
488                        }
489                    }
490                }
491            }
492        },
493    );
494
495    let metrics_ref = rt.metrics.clone();
496    let guarded = SseDisconnectGuard {
497        inner: Box::pin(stream),
498        metrics: metrics_ref,
499    };
500
501    Sse::new(guarded).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
502}
503
/// Query parameters of the audit-events endpoint.
#[derive(Debug, Deserialize)]
struct AuditEventsQuery {
    /// Maximum number of entries to load from each audit source; defaults to
    /// `default_audit_limit()` and is capped at 1000 by the handler.
    #[serde(default = "default_audit_limit")]
    limit: usize,
}
509
/// Serde default for `AuditEventsQuery::limit`.
fn default_audit_limit() -> usize {
    const DEFAULT_AUDIT_LIMIT: usize = 100;
    DEFAULT_AUDIT_LIMIT
}
513
514async fn v1_audit_events(Query(q): Query<AuditEventsQuery>) -> impl IntoResponse {
515    let capped = q.limit.min(1000);
516    let boundary_events = crate::core::memory_boundary::load_audit_events(capped);
517    let trail_events = crate::core::audit_trail::load_recent(capped);
518
519    Json(serde_json::json!({
520        "cross_project_events": boundary_events,
521        "audit_trail": trail_events,
522    }))
523}
524
525async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
526    let rt = crate::core::context_os::runtime();
527    let snap = rt.metrics.snapshot();
528    (
529        StatusCode::OK,
530        Json(serde_json::to_value(snap).unwrap_or_default()),
531    )
532}
533
534const MAX_HANDOFF_PAYLOAD_BYTES: usize = 1_000_000;
535const MAX_HANDOFF_FILES: usize = 50;
536
537async fn v1_a2a_handoff(
538    State(state): State<AppState>,
539    Json(body): Json<Value>,
540) -> impl IntoResponse {
541    let envelope = match crate::core::a2a_transport::parse_envelope(
542        &serde_json::to_string(&body).unwrap_or_default(),
543    ) {
544        Ok(env) => env,
545        Err(e) => {
546            tracing::warn!("a2a handoff parse error: {e}");
547            return (
548                StatusCode::BAD_REQUEST,
549                Json(serde_json::json!({"error": "invalid_envelope"})),
550            );
551        }
552    };
553
554    if envelope.payload_json.len() > MAX_HANDOFF_PAYLOAD_BYTES {
555        tracing::warn!(
556            "a2a handoff payload too large: {} bytes (limit {MAX_HANDOFF_PAYLOAD_BYTES})",
557            envelope.payload_json.len()
558        );
559        return (
560            StatusCode::PAYLOAD_TOO_LARGE,
561            Json(serde_json::json!({"error": "payload_too_large"})),
562        );
563    }
564
565    let rt = crate::core::context_os::runtime();
566    rt.bus.append(
567        &state.project_root,
568        "a2a",
569        &crate::core::context_os::ContextEventKindV1::SessionMutated,
570        Some(&envelope.sender.agent_id),
571        serde_json::json!({
572            "type": "handoff_received",
573            "content_type": format!("{:?}", envelope.content_type),
574            "sender": envelope.sender.agent_id,
575            "payload_size": envelope.payload_json.len(),
576        }),
577    );
578
579    match envelope.content_type {
580        crate::core::a2a_transport::TransportContentType::ContextPackage => {
581            let dir = std::path::Path::new(&state.project_root)
582                .join(".lean-ctx")
583                .join("handoffs")
584                .join("packages");
585            let _ = std::fs::create_dir_all(&dir);
586            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
587            let out = dir.join(format!(
588                "ctx-{}.lctxpkg",
589                chrono::Utc::now().format("%Y%m%d_%H%M%S")
590            ));
591            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
592                tracing::error!("a2a handoff write failed: {e}");
593                return (
594                    StatusCode::INTERNAL_SERVER_ERROR,
595                    Json(serde_json::json!({"error": "write_failed"})),
596                );
597            }
598            (
599                StatusCode::OK,
600                Json(serde_json::json!({
601                    "status": "received",
602                    "content_type": "context_package",
603                })),
604            )
605        }
606        crate::core::a2a_transport::TransportContentType::HandoffBundle => {
607            let dir = std::path::Path::new(&state.project_root)
608                .join(".lean-ctx")
609                .join("handoffs");
610            let _ = std::fs::create_dir_all(&dir);
611            evict_oldest_files(&dir, MAX_HANDOFF_FILES);
612            let out = dir.join(format!(
613                "received-{}.json",
614                chrono::Utc::now().format("%Y%m%d_%H%M%S")
615            ));
616            if let Err(e) = std::fs::write(&out, &envelope.payload_json) {
617                tracing::error!("a2a handoff write failed: {e}");
618                return (
619                    StatusCode::INTERNAL_SERVER_ERROR,
620                    Json(serde_json::json!({"error": "write_failed"})),
621                );
622            }
623            (
624                StatusCode::OK,
625                Json(serde_json::json!({
626                    "status": "received",
627                    "content_type": "handoff_bundle",
628                })),
629            )
630        }
631        _ => (
632            StatusCode::OK,
633            Json(serde_json::json!({
634                "status": "received",
635                "content_type": format!("{:?}", envelope.content_type),
636            })),
637        ),
638    }
639}
640
/// Deletes the oldest regular files in `dir` so that fewer than `max_files` remain.
///
/// Nothing happens while the directory holds fewer than `max_files` files.
/// Otherwise files are removed oldest-modified first until `max_files - 1` are
/// left, which leaves room for one new file. Sub-directories are ignored, an
/// unreadable directory is a no-op, and individual removal errors are ignored.
/// Files whose modification time is unavailable sort as the Unix epoch.
fn evict_oldest_files(dir: &std::path::Path, max_files: usize) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };

    let mut files = Vec::new();
    for entry in entries {
        let Ok(entry) = entry else { continue };
        let Ok(meta) = entry.metadata() else { continue };
        if !meta.is_file() {
            continue;
        }
        let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
        files.push((mtime, entry.path()));
    }

    if files.len() < max_files {
        return;
    }

    let keep = max_files.saturating_sub(1);
    let surplus = files.len().saturating_sub(keep);
    files.sort_by_key(|(mtime, _)| *mtime);
    for (_, path) in files.drain(..surplus) {
        let _ = std::fs::remove_file(path);
    }
}
666
667async fn a2a_jsonrpc(Json(body): Json<Value>) -> impl IntoResponse {
668    let req: crate::core::a2a::a2a_compat::JsonRpcRequest = match serde_json::from_value(body) {
669        Ok(r) => r,
670        Err(e) => {
671            tracing::debug!("a2a JSON-RPC parse error: {e}");
672            return (
673                StatusCode::BAD_REQUEST,
674                Json(serde_json::json!({
675                    "jsonrpc": "2.0",
676                    "id": null,
677                    "error": {"code": -32700, "message": "invalid request"}
678                })),
679            );
680        }
681    };
682    let resp = crate::core::a2a::a2a_compat::handle_a2a_jsonrpc(&req);
683    let json = serde_json::to_value(resp).unwrap_or_default();
684    (StatusCode::OK, Json(json))
685}
686
687async fn v1_a2a_agent_card(State(state): State<AppState>) -> impl IntoResponse {
688    let card = crate::core::a2a::agent_card::build_agent_card(&state.project_root);
689    (
690        StatusCode::OK,
691        [(header::CONTENT_TYPE, "application/json")],
692        Json(card),
693    )
694}
695
696async fn mcp_server_card() -> impl IntoResponse {
697    let card = serde_json::json!({
698        "name": "lean-ctx",
699        "version": env!("CARGO_PKG_VERSION"),
700        "description": "Context Infrastructure Layer — compression, caching, governance for AI agents",
701        "capabilities": {
702            "tools": true,
703            "resources": false,
704            "prompts": false,
705            "sampling": false
706        },
707        "tool_categories": [
708            {"name": "file_operations", "tools": ["ctx_read", "ctx_search", "ctx_tree", "ctx_edit"], "avg_token_cost": 150},
709            {"name": "session_management", "tools": ["ctx_session", "ctx_compress", "ctx_dedup", "ctx_preload"], "avg_token_cost": 80},
710            {"name": "intelligence", "tools": ["ctx_knowledge", "ctx_semantic_search", "ctx_graph", "ctx_overview"], "avg_token_cost": 200},
711            {"name": "agent_ops", "tools": ["ctx_agent", "ctx_handoff", "ctx_task", "ctx_share"], "avg_token_cost": 120}
712        ],
713        "features": {
714            "compression": "deterministic AST-based, 40-70% token reduction",
715            "caching": "session-scoped with zstd, re-reads ~13 tokens",
716            "audit_trail": "SHA-256 chained JSONL",
717            "rbac": "5 built-in roles with capability-based access",
718            "sandboxing": "Level 0 (subprocess) + Level 1 (OS-level)",
719            "secret_detection": "8 regex patterns + custom"
720        },
721        "security": {
722            "path_jail": true,
723            "rate_limiting": true,
724            "budget_tracking": true,
725            "signed_handoffs": true,
726            "timing_safe_auth": true
727        }
728    });
729    Json(card)
730}
731
732async fn v1_agents_register(
733    State(state): State<AppState>,
734    Json(body): Json<Value>,
735) -> impl IntoResponse {
736    let agent_type = body
737        .get("agent_type")
738        .and_then(|v| v.as_str())
739        .unwrap_or("unknown");
740    let role = body.get("role").and_then(|v| v.as_str());
741    let project_root = body
742        .get("project_root")
743        .and_then(|v| v.as_str())
744        .unwrap_or(&state.project_root);
745
746    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
747    let agent_id = registry.register(agent_type, role, project_root);
748    let _ = registry.save();
749
750    Json(serde_json::json!({
751        "agent_id": agent_id,
752        "status": "registered"
753    }))
754}
755
756async fn v1_agents_heartbeat(Json(body): Json<Value>) -> impl IntoResponse {
757    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
758    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
759    registry.update_heartbeat(agent_id);
760    let _ = registry.save();
761    Json(serde_json::json!({"status": "ok"}))
762}
763
764async fn v1_agents_list() -> impl IntoResponse {
765    let registry = crate::core::agents::AgentRegistry::load_or_create();
766    let active = registry.list_active(None);
767    Json(serde_json::json!({
768        "agents": active.iter().map(|a| serde_json::json!({
769            "agent_id": a.agent_id,
770            "agent_type": a.agent_type,
771            "role": a.role,
772            "status": a.status.to_string(),
773            "last_active": a.last_active.to_rfc3339(),
774        })).collect::<Vec<_>>()
775    }))
776}
777
778async fn v1_agents_deregister(Json(body): Json<Value>) -> impl IntoResponse {
779    let agent_id = body.get("agent_id").and_then(|v| v.as_str()).unwrap_or("");
780    let mut registry = crate::core::agents::AgentRegistry::load_or_create();
781    registry.set_status(
782        agent_id,
783        crate::core::agents::AgentStatus::Finished,
784        Some("deregistered via API"),
785    );
786    let _ = registry.save();
787    Json(serde_json::json!({"status": "deregistered"}))
788}
789
790async fn v1_agents_events_sse(
791) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
792    let stream = futures::stream::unfold(0usize, |last_count| async move {
793        loop {
794            tokio::time::sleep(Duration::from_secs(5)).await;
795            let registry = crate::core::agents::AgentRegistry::load_or_create();
796            let active = registry.list_active(None);
797            let count = active.len();
798            if count != last_count {
799                let data = serde_json::json!({
800                    "type": "agents_changed",
801                    "active_count": count,
802                    "agents": active.iter().map(|a| &a.agent_id).collect::<Vec<_>>(),
803                });
804                return Some((
805                    Ok::<_, std::convert::Infallible>(SseEvent::default().data(data.to_string())),
806                    count,
807                ));
808            }
809        }
810    });
811
812    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
813}
814
815fn build_app_router(cfg: &HttpServerConfig) -> Router {
816    let project_root = cfg.project_root.to_string_lossy().to_string();
817    let service_project_root = project_root.clone();
818    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
819        Ok(LeanCtxServer::new_shared_with_context(
820            &service_project_root,
821            "default",
822            "default",
823        ))
824    };
825    let mcp_http = StreamableHttpService::new(
826        service_factory,
827        Arc::new(
828            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
829        ),
830        cfg.mcp_http_config(),
831    );
832
833    let rest_server = LeanCtxServer::new_shared_with_context(&project_root, "default", "default");
834
835    let state = AppState {
836        token: cfg.auth_token.clone().filter(|t| !t.is_empty()),
837        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
838        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
839        project_root,
840        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
841        server: rest_server,
842    };
843
844    Router::new()
845        .route("/health", get(health))
846        .route("/v1/shutdown", axum::routing::post(v1_shutdown))
847        .route("/v1/manifest", get(v1_manifest))
848        .route("/v1/tools", get(v1_tools))
849        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
850        .route("/v1/events", get(v1_events))
851        .route(
852            "/v1/context/summary",
853            get(context_views::v1_context_summary),
854        )
855        .route("/v1/events/search", get(context_views::v1_events_search))
856        .route("/v1/events/lineage", get(context_views::v1_event_lineage))
857        .route("/v1/metrics", get(v1_metrics))
858        .route("/v1/audit/events", get(v1_audit_events))
859        .route("/v1/a2a/handoff", axum::routing::post(v1_a2a_handoff))
860        .route("/v1/a2a/agent-card", get(v1_a2a_agent_card))
861        .route("/.well-known/agent.json", get(v1_a2a_agent_card))
862        .route("/.well-known/mcp-server.json", get(mcp_server_card))
863        .route("/a2a", axum::routing::post(a2a_jsonrpc))
864        .route(
865            "/v1/agents/register",
866            axum::routing::post(v1_agents_register),
867        )
868        .route(
869            "/v1/agents/heartbeat",
870            axum::routing::post(v1_agents_heartbeat),
871        )
872        .route("/v1/agents/list", get(v1_agents_list))
873        .route(
874            "/v1/agents/deregister",
875            axum::routing::post(v1_agents_deregister),
876        )
877        .route("/v1/agents/events", get(v1_agents_events_sse))
878        .fallback_service(mcp_http)
879        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
880        .layer(middleware::from_fn_with_state(
881            state.clone(),
882            rate_limit_middleware,
883        ))
884        .layer(middleware::from_fn_with_state(
885            state.clone(),
886            concurrency_middleware,
887        ))
888        .layer(middleware::from_fn_with_state(
889            state.clone(),
890            auth_middleware,
891        ))
892        .with_state(state)
893}
894
895pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
896    crate::core::protocol::set_mcp_context(true);
897    cfg.validate()?;
898
899    let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
900        .parse()
901        .context("invalid host/port")?;
902
903    let app = build_app_router(&cfg);
904
905    let listener = tokio::net::TcpListener::bind(addr)
906        .await
907        .with_context(|| format!("bind {addr}"))?;
908
909    tracing::info!(
910        "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
911        cfg.project_root.display()
912    );
913
914    axum::serve(listener, app)
915        .with_graceful_shutdown(async move {
916            let _ = tokio::signal::ctrl_c().await;
917        })
918        .await
919        .context("http server")?;
920    Ok(())
921}
922
#[cfg(windows)]
impl axum::serve::Listener for crate::ipc::NamedPipeListener {
    type Io = tokio::net::windows::named_pipe::NamedPipeServer;
    type Addr = String;

    /// Waits for the next pipe client, retrying after a short pause whenever
    /// accepting fails; the pipe name is reported as the peer address.
    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        // Pause between failed accepts so a persistent error does not spin.
        const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

        loop {
            let e = match self.accept_pipe().await {
                Ok(pipe) => return (pipe, self.name().to_string()),
                Err(e) => e,
            };
            tracing::error!("named pipe accept error: {e}");
            tokio::time::sleep(RETRY_DELAY).await;
        }
    }

    /// Reports the pipe name as this listener's local address.
    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        let pipe_name = self.name().to_string();
        Ok(pipe_name)
    }
}
944
945/// Serve the daemon over a platform-independent IPC channel (UDS on Unix,
946/// Named Pipes on Windows).
947pub async fn serve_ipc(cfg: HttpServerConfig, addr: crate::ipc::DaemonAddr) -> Result<()> {
948    cfg.validate()?;
949
950    match addr {
951        #[cfg(unix)]
952        crate::ipc::DaemonAddr::Unix(ref path) => {
953            let app = build_app_router(&cfg);
954            let listener = crate::ipc::bind_listener(&addr)?;
955
956            tracing::info!(
957                "lean-ctx daemon listening on {} (project_root={})",
958                path.display(),
959                cfg.project_root.display()
960            );
961
962            axum::serve(listener, app.into_make_service())
963                .with_graceful_shutdown(async move {
964                    let _ = tokio::signal::ctrl_c().await;
965                })
966                .await
967                .context("ipc server")?;
968            Ok(())
969        }
970        #[cfg(windows)]
971        crate::ipc::DaemonAddr::NamedPipe(ref name) => {
972            let app = build_app_router(&cfg);
973            let listener = crate::ipc::bind_listener(&addr)?;
974
975            tracing::info!(
976                "lean-ctx daemon listening on {} (project_root={})",
977                name,
978                cfg.project_root.display()
979            );
980
981            axum::serve(listener, app.into_make_service())
982                .with_graceful_shutdown(async move {
983                    let _ = tokio::signal::ctrl_c().await;
984                })
985                .await
986                .context("ipc server")?;
987            Ok(())
988        }
989    }
990}
991
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use futures::StreamExt;
    use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
    use serde_json::json;
    use tower::ServiceExt;

    /// Assembles the `AppState` used by these tests: a 30-second timeout and a
    /// server bound to `root` with the "default" workspace/channel ids.
    fn test_state(
        root: &str,
        token: Option<&str>,
        permits: usize,
        rate: RateLimiter,
    ) -> AppState {
        AppState {
            token: token.map(String::from),
            concurrency: Arc::new(tokio::sync::Semaphore::new(permits)),
            rate: Arc::new(rate),
            project_root: root.to_string(),
            timeout: Duration::from_secs(30),
            server: LeanCtxServer::new_shared_with_context(root, "default", "default"),
        }
    }

    /// Collects body chunks until a blank line (`\n\n`) has been seen, the
    /// stream ends or errors, a 2-second read times out, or 32 chunks were read.
    async fn read_first_sse_message(body: Body) -> String {
        let mut chunks = body.into_data_stream();
        let mut collected = Vec::<u8>::new();
        for _ in 0..32 {
            match tokio::time::timeout(Duration::from_secs(2), chunks.next()).await {
                Ok(Some(Ok(bytes))) => collected.extend_from_slice(&bytes),
                _ => break,
            }
            if collected.windows(2).any(|pair| pair == b"\n\n") {
                break;
            }
        }
        String::from_utf8_lossy(&collected).into_owned()
    }

    /// A configured token must cause requests without a bearer header to be
    /// rejected with 401 before they reach the MCP service.
    #[tokio::test]
    async fn auth_token_blocks_requests_without_bearer_header() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_string_lossy().to_string();

        let factory_root = root.clone();
        let make_server = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &factory_root,
                "default",
                "default",
            ))
        };
        let transport_cfg = StreamableHttpServerConfig::default()
            .with_stateful_mode(false)
            .with_json_response(true);
        let session_manager = Arc::new(
            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
        );
        let mcp_http = StreamableHttpService::new(make_server, session_manager, transport_cfg);

        let state = test_state(&root, Some("secret"), 4, RateLimiter::new(50, 100));
        let auth_layer = middleware::from_fn_with_state(state.clone(), auth_middleware);
        let app = Router::new()
            .fallback_service(mcp_http)
            .layer(auth_layer)
            .with_state(state);

        let payload = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        .to_string();

        let req = Request::builder()
            .method("POST")
            .uri("/")
            .header("Host", "localhost")
            .header("Accept", "application/json, text/event-stream")
            .header("Content-Type", "application/json")
            .body(Body::from(payload))
            .expect("request");

        let resp = app.clone().oneshot(req).await.expect("resp");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// Two servers produced by the same factory must not observe each other's
    /// `client_name` writes.
    #[tokio::test]
    async fn mcp_service_factory_isolates_per_client_state() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_string_lossy().to_string();

        // Same shape as the serve() setup: the factory has to hand out a fresh
        // server for every MCP session.
        let factory_root = root.clone();
        let make_server = move || -> Result<LeanCtxServer, std::convert::Infallible> {
            Ok(LeanCtxServer::new_shared_with_context(
                &factory_root,
                "default",
                "default",
            ))
        };

        let first = make_server().expect("server 1");
        let second = make_server().expect("server 2");

        // Were the Arc-backed fields shared by accident, the second write would
        // overwrite the first. Checking it this way keeps the test independent
        // of rmcp's InitializeRequestParams API.
        *first.client_name.write().await = "client-a".to_string();
        *second.client_name.write().await = "client-b".to_string();

        let seen_by_first = first.client_name.read().await.clone();
        let seen_by_second = second.client_name.read().await.clone();
        assert_eq!(seen_by_first, "client-a");
        assert_eq!(seen_by_second, "client-b");
    }

    /// With a `RateLimiter::new(1, 1)` budget the first request passes and the
    /// immediately following one is answered with 429.
    #[tokio::test]
    async fn rate_limit_returns_429_when_exhausted() {
        let state = test_state(".", None, 16, RateLimiter::new(1, 1));

        let limiter_layer = middleware::from_fn_with_state(state.clone(), rate_limit_middleware);
        let app = Router::new()
            .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
            .layer(limiter_layer)
            .with_state(state);

        let first_req = Request::builder()
            .method("GET")
            .uri("/limited")
            .header("Host", "localhost")
            .body(Body::empty())
            .expect("req1");
        let first_resp = app.clone().oneshot(first_req).await.expect("resp1");
        assert_eq!(first_resp.status(), StatusCode::OK);

        let second_req = Request::builder()
            .method("GET")
            .uri("/limited")
            .header("Host", "localhost")
            .body(Body::empty())
            .expect("req2");
        let second_resp = app.clone().oneshot(second_req).await.expect("resp2");
        assert_eq!(second_resp.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    /// `/v1/audit/events` answers 200 with a JSON object whose
    /// `cross_project_events` and `audit_trail` members are arrays.
    #[tokio::test]
    async fn audit_events_endpoint_returns_json() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_string_lossy().to_string();

        let state = test_state(&root, None, 16, RateLimiter::new(50, 100));
        let app = Router::new()
            .route("/v1/audit/events", get(v1_audit_events))
            .with_state(state);

        let req = Request::builder()
            .method("GET")
            .uri("/v1/audit/events?limit=10")
            .header("Host", "localhost")
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let raw = axum::body::to_bytes(resp.into_body(), 1_000_000)
            .await
            .unwrap();
        let parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        assert!(parsed.get("cross_project_events").unwrap().is_array());
        assert!(parsed.get("audit_trail").unwrap().is_array());
    }

    /// An event appended to the bus is replayed as the first SSE message of
    /// `/v1/events` for the matching workspace and channel.
    #[tokio::test]
    async fn events_endpoint_replays_tool_call_event() {
        use crate::core::context_os::{self, ContextEventKindV1};

        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(tmp.path().join(".git")).expect("git marker");
        std::fs::write(tmp.path().join("a.txt"), "ok").expect("file");
        let root = tmp.path().to_string_lossy().to_string();

        let state = test_state(&root, None, 16, RateLimiter::new(50, 100));
        let app = Router::new()
            .route("/v1/events", get(v1_events))
            .with_state(state);

        // Append straight to the bus so the test does not depend on the timing
        // of a fire-and-forget tool call.
        let runtime = context_os::runtime();
        runtime.bus.append(
            "ws1",
            "ch1",
            &ContextEventKindV1::ToolCallRecorded,
            Some("test-agent"),
            json!({"tool": "ctx_session", "action": "status"}),
        );

        let req = Request::builder()
            .method("GET")
            .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
            .header("Host", "localhost")
            .header("Accept", "text/event-stream")
            .body(Body::empty())
            .expect("req");
        let resp = app.clone().oneshot(req).await.expect("events");
        assert_eq!(resp.status(), StatusCode::OK);

        let msg = read_first_sse_message(resp.into_body()).await;
        assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
        assert!(msg.contains("\"ws1\""), "msg={msg:?}");
        assert!(msg.contains("\"ch1\""), "msg={msg:?}");
    }
}