Skip to main content

lean_ctx/http_server/mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7    extract::Json,
8    extract::Query,
9    extract::State,
10    http::{header, Request, StatusCode},
11    middleware::{self, Next},
12    response::sse::{Event as SseEvent, KeepAlive, Sse},
13    response::{IntoResponse, Response},
14    routing::get,
15    Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::engine::ContextEngine;
25use crate::tools::LeanCtxServer;
26
27#[cfg(feature = "team-server")]
28pub mod team;
29
/// Settings for the lean-ctx HTTP / MCP Streamable HTTP server (`serve` and `serve_uds`).
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Interface to bind; non-loopback hosts are rejected by `validate` unless `auth_token` is set.
    pub host: String,
    /// TCP port to listen on (ignored by `serve_uds`).
    pub port: u16,
    /// Project directory handed to every `LeanCtxServer` the server creates.
    pub project_root: PathBuf,
    /// Bearer token required on every request except `/health`; `None` or empty disables auth.
    pub auth_token: Option<String>,
    /// Forwarded to rmcp's `StreamableHttpServerConfig::with_stateful_mode`.
    pub stateful_mode: bool,
    /// Forwarded to rmcp's `StreamableHttpServerConfig::with_json_response`.
    pub json_response: bool,
    /// Turns off rmcp's Host-header allow-list entirely.
    pub disable_host_check: bool,
    /// Explicit Host-header allow-list; when empty, rmcp's defaults are kept.
    pub allowed_hosts: Vec<String>,
    /// Limit installed via axum's `DefaultBodyLimit`.
    pub max_body_bytes: usize,
    /// Maximum in-flight requests (raised to at least 1); excess requests get 429.
    pub max_concurrency: usize,
    /// Token-bucket refill rate in requests per second (raised to at least 1).
    pub max_rps: u32,
    /// Token-bucket capacity, i.e. the allowed burst (raised to at least 1).
    pub rate_burst: u32,
    /// Timeout for a single `/v1/tools/call` execution, in milliseconds (at least 1).
    pub request_timeout_ms: u64,
}
46
47impl Default for HttpServerConfig {
48    fn default() -> Self {
49        let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
50        Self {
51            host: "127.0.0.1".to_string(),
52            port: 8080,
53            project_root,
54            auth_token: None,
55            stateful_mode: false,
56            json_response: true,
57            disable_host_check: false,
58            allowed_hosts: Vec::new(),
59            max_body_bytes: 2 * 1024 * 1024,
60            max_concurrency: 32,
61            max_rps: 50,
62            rate_burst: 100,
63            request_timeout_ms: 30_000,
64        }
65    }
66}
67
68impl HttpServerConfig {
69    pub fn validate(&self) -> Result<()> {
70        let host = self.host.trim().to_lowercase();
71        let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
72        if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
73            return Err(anyhow!(
74                "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
75            ));
76        }
77        Ok(())
78    }
79
80    fn mcp_http_config(&self) -> StreamableHttpServerConfig {
81        let mut cfg = StreamableHttpServerConfig::default()
82            .with_stateful_mode(self.stateful_mode)
83            .with_json_response(self.json_response);
84
85        if self.disable_host_check {
86            cfg = cfg.disable_allowed_hosts();
87            return cfg;
88        }
89
90        if !self.allowed_hosts.is_empty() {
91            cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
92            return cfg;
93        }
94
95        // Keep rmcp's secure loopback defaults; also allow the configured host (if it's loopback).
96        let host = self.host.trim();
97        if host == "127.0.0.1" || host == "localhost" || host == "::1" {
98            cfg.allowed_hosts.push(host.to_string());
99        }
100
101        cfg
102    }
103}
104
105#[derive(Clone)]
106struct AppState {
107    token: Option<String>,
108    concurrency: Arc<tokio::sync::Semaphore>,
109    rate: Arc<RateLimiter>,
110    project_root: String,
111    timeout: Duration,
112}
113
114#[derive(Debug)]
115struct RateLimiter {
116    max_rps: f64,
117    burst: f64,
118    state: tokio::sync::Mutex<RateState>,
119}
120
121#[derive(Debug, Clone, Copy)]
122struct RateState {
123    tokens: f64,
124    last: Instant,
125}
126
127impl RateLimiter {
128    fn new(max_rps: u32, burst: u32) -> Self {
129        let now = Instant::now();
130        Self {
131            max_rps: (max_rps.max(1)) as f64,
132            burst: (burst.max(1)) as f64,
133            state: tokio::sync::Mutex::new(RateState {
134                tokens: (burst.max(1)) as f64,
135                last: now,
136            }),
137        }
138    }
139
140    async fn allow(&self) -> bool {
141        let mut s = self.state.lock().await;
142        let now = Instant::now();
143        let elapsed = now.saturating_duration_since(s.last);
144        let refill = elapsed.as_secs_f64() * self.max_rps;
145        s.tokens = (s.tokens + refill).min(self.burst);
146        s.last = now;
147        if s.tokens >= 1.0 {
148            s.tokens -= 1.0;
149            true
150        } else {
151            false
152        }
153    }
154}
155
156async fn auth_middleware(
157    State(state): State<AppState>,
158    req: Request<axum::body::Body>,
159    next: Next,
160) -> Response {
161    if state.token.is_none() {
162        return next.run(req).await;
163    }
164
165    if req.uri().path() == "/health" {
166        return next.run(req).await;
167    }
168
169    let expected = state.token.as_deref().unwrap_or("");
170    let Some(h) = req.headers().get(header::AUTHORIZATION) else {
171        return StatusCode::UNAUTHORIZED.into_response();
172    };
173    let Ok(s) = h.to_str() else {
174        return StatusCode::UNAUTHORIZED.into_response();
175    };
176    let Some(token) = s
177        .strip_prefix("Bearer ")
178        .or_else(|| s.strip_prefix("bearer "))
179    else {
180        return StatusCode::UNAUTHORIZED.into_response();
181    };
182    if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
183        return StatusCode::UNAUTHORIZED.into_response();
184    }
185
186    next.run(req).await
187}
188
/// Compares two byte strings for equality without exiting early on a mismatch.
///
/// Used to check a presented bearer token (`a`) against the configured secret (`b`). Every
/// byte of `b` is visited, and a length mismatch is folded into the result instead of causing
/// an early return. The running time therefore depends only on `b.len()`, not on the length
/// or contents of `a`. An attacker varying the presented token cannot learn the secret's
/// length or a matching prefix from timing.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // Any non-zero bit means "not equal"; seed with the length difference.
    let mut diff = a.len() ^ b.len();
    for (i, &y) in b.iter().enumerate() {
        // Bytes missing from a shorter `a` read as 0; the length term already forces `false`.
        let x = a.get(i).copied().unwrap_or(0);
        diff |= usize::from(x ^ y);
    }
    diff == 0
}
198
199async fn rate_limit_middleware(
200    State(state): State<AppState>,
201    req: Request<axum::body::Body>,
202    next: Next,
203) -> Response {
204    if req.uri().path() == "/health" {
205        return next.run(req).await;
206    }
207    if !state.rate.allow().await {
208        return StatusCode::TOO_MANY_REQUESTS.into_response();
209    }
210    next.run(req).await
211}
212
213async fn concurrency_middleware(
214    State(state): State<AppState>,
215    req: Request<axum::body::Body>,
216    next: Next,
217) -> Response {
218    if req.uri().path() == "/health" {
219        return next.run(req).await;
220    }
221    let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
222        return StatusCode::TOO_MANY_REQUESTS.into_response();
223    };
224    let resp = next.run(req).await;
225    drop(permit);
226    resp
227}
228
229async fn health() -> impl IntoResponse {
230    (StatusCode::OK, "ok\n")
231}
232
233#[derive(Debug, Deserialize)]
234#[serde(rename_all = "camelCase")]
235struct ToolCallBody {
236    name: String,
237    #[serde(default)]
238    arguments: Option<Value>,
239    #[serde(default)]
240    workspace_id: Option<String>,
241    #[serde(default)]
242    channel_id: Option<String>,
243}
244
245#[derive(Debug, Deserialize)]
246#[serde(rename_all = "camelCase")]
247struct EventsQuery {
248    #[serde(default)]
249    workspace_id: Option<String>,
250    #[serde(default)]
251    channel_id: Option<String>,
252    #[serde(default)]
253    since: Option<i64>,
254    #[serde(default)]
255    limit: Option<usize>,
256}
257
258async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
259    let _ = state;
260    let v = crate::core::mcp_manifest::manifest_value();
261    (StatusCode::OK, Json(v))
262}
263
264#[derive(Debug, Deserialize)]
265#[serde(rename_all = "camelCase")]
266struct ToolsQuery {
267    #[serde(default)]
268    offset: Option<usize>,
269    #[serde(default)]
270    limit: Option<usize>,
271}
272
273async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
274    let _ = state;
275    let v = crate::core::mcp_manifest::manifest_value();
276    let tools = v
277        .get("tools")
278        .and_then(|t| t.get("granular"))
279        .cloned()
280        .unwrap_or(Value::Array(vec![]));
281
282    let all = tools.as_array().cloned().unwrap_or_default();
283    let total = all.len();
284    let offset = q.offset.unwrap_or(0).min(total);
285    let limit = q.limit.unwrap_or(200).min(500);
286    let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
287
288    (
289        StatusCode::OK,
290        Json(serde_json::json!({
291            "tools": page,
292            "total": total,
293            "offset": offset,
294            "limit": limit,
295        })),
296    )
297}
298
299async fn v1_tool_call(
300    State(state): State<AppState>,
301    Json(body): Json<ToolCallBody>,
302) -> impl IntoResponse {
303    let ws = body.workspace_id.as_deref().unwrap_or("default");
304    let ch = body.channel_id.as_deref().unwrap_or("default");
305    let server = LeanCtxServer::new_shared_with_context(&state.project_root, ws, ch);
306    let engine = ContextEngine::from_server(server);
307    match tokio::time::timeout(
308        state.timeout,
309        engine.call_tool_value(&body.name, body.arguments),
310    )
311    .await
312    {
313        Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
314        Ok(Err(e)) => (
315            StatusCode::BAD_REQUEST,
316            Json(serde_json::json!({ "error": e.to_string() })),
317        )
318            .into_response(),
319        Err(_) => (
320            StatusCode::GATEWAY_TIMEOUT,
321            Json(serde_json::json!({ "error": "request_timeout" })),
322        )
323            .into_response(),
324    }
325}
326
327async fn v1_events(
328    State(_state): State<AppState>,
329    Query(q): Query<EventsQuery>,
330) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
331    use crate::core::context_os::{redact_event_payload, RedactionLevel};
332
333    let ws = q.workspace_id.unwrap_or_else(|| "default".to_string());
334    let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
335    let since = q.since.unwrap_or(0);
336    let limit = q.limit.unwrap_or(200).min(1000);
337    let redaction = RedactionLevel::RefsOnly;
338
339    let rt = crate::core::context_os::runtime();
340    let replay = rt.bus.read(&ws, &ch, since, limit);
341    let rx = rt.bus.subscribe();
342    rt.metrics.record_sse_connect();
343    rt.metrics.record_events_replayed(replay.len() as u64);
344    rt.metrics.record_workspace_active(&ws);
345
346    let stream = futures::stream::unfold(
347        (
348            replay.into_iter(),
349            rx,
350            ws.clone(),
351            ch.clone(),
352            since,
353            redaction,
354        ),
355        |(mut replay_it, mut rx, ws, ch, mut last_id, redaction)| async move {
356            if let Some(mut ev) = replay_it.next() {
357                last_id = ev.id;
358                redact_event_payload(&mut ev, redaction);
359                let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
360                let evt = SseEvent::default()
361                    .id(ev.id.to_string())
362                    .event(ev.kind)
363                    .data(data);
364                return Some((Ok(evt), (replay_it, rx, ws, ch, last_id, redaction)));
365            }
366
367            loop {
368                match rx.recv().await {
369                    Ok(mut ev) => {
370                        if ev.workspace_id == ws && ev.channel_id == ch && ev.id > last_id {
371                            last_id = ev.id;
372                            redact_event_payload(&mut ev, redaction);
373                            let data =
374                                serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
375                            let evt = SseEvent::default()
376                                .id(ev.id.to_string())
377                                .event(ev.kind)
378                                .data(data);
379                            return Some((Ok(evt), (replay_it, rx, ws, ch, last_id, redaction)));
380                        }
381                    }
382                    Err(broadcast::error::RecvError::Closed) => return None,
383                    Err(broadcast::error::RecvError::Lagged(_)) => {}
384                }
385            }
386        },
387    );
388
389    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
390}
391
392async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
393    let rt = crate::core::context_os::runtime();
394    let snap = rt.metrics.snapshot();
395    (
396        StatusCode::OK,
397        Json(serde_json::to_value(snap).unwrap_or_default()),
398    )
399}
400
401pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
402    cfg.validate()?;
403
404    let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
405        .parse()
406        .context("invalid host/port")?;
407
408    let project_root = cfg.project_root.to_string_lossy().to_string();
409    // IMPORTANT: Create a fresh server per MCP session in *shared* mode.
410    // This avoids per-client state clobbering while still sharing the Context OS store.
411    let service_project_root = project_root.clone();
412    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
413        Ok(LeanCtxServer::new_shared_with_context(
414            &service_project_root,
415            "default",
416            "default",
417        ))
418    };
419    let mcp_http = StreamableHttpService::new(
420        service_factory,
421        Arc::new(
422            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
423        ),
424        cfg.mcp_http_config(),
425    );
426
427    let state = AppState {
428        token: cfg.auth_token.clone().filter(|t| !t.is_empty()),
429        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
430        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
431        project_root: project_root.clone(),
432        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
433    };
434
435    let app = Router::new()
436        .route("/health", get(health))
437        .route("/v1/manifest", get(v1_manifest))
438        .route("/v1/tools", get(v1_tools))
439        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
440        .route("/v1/events", get(v1_events))
441        .route("/v1/metrics", get(v1_metrics))
442        .fallback_service(mcp_http)
443        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
444        .layer(middleware::from_fn_with_state(
445            state.clone(),
446            rate_limit_middleware,
447        ))
448        .layer(middleware::from_fn_with_state(
449            state.clone(),
450            concurrency_middleware,
451        ))
452        .layer(middleware::from_fn_with_state(
453            state.clone(),
454            auth_middleware,
455        ))
456        .with_state(state);
457
458    let listener = tokio::net::TcpListener::bind(addr)
459        .await
460        .with_context(|| format!("bind {addr}"))?;
461
462    tracing::info!(
463        "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
464        cfg.project_root.display()
465    );
466
467    axum::serve(listener, app)
468        .with_graceful_shutdown(async move {
469            let _ = tokio::signal::ctrl_c().await;
470        })
471        .await
472        .context("http server")?;
473    Ok(())
474}
475
476#[cfg(unix)]
477pub async fn serve_uds(cfg: HttpServerConfig, socket_path: PathBuf) -> Result<()> {
478    cfg.validate()?;
479
480    if socket_path.exists() {
481        std::fs::remove_file(&socket_path)
482            .with_context(|| format!("remove stale socket {}", socket_path.display()))?;
483    }
484
485    let project_root = cfg.project_root.to_string_lossy().to_string();
486    let service_project_root = project_root.clone();
487    let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
488        Ok(LeanCtxServer::new_shared_with_context(
489            &service_project_root,
490            "default",
491            "default",
492        ))
493    };
494    let mcp_http = StreamableHttpService::new(
495        service_factory,
496        Arc::new(
497            rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
498        ),
499        cfg.mcp_http_config(),
500    );
501
502    let state = AppState {
503        token: cfg.auth_token.clone().filter(|t| !t.is_empty()),
504        concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
505        rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
506        project_root: project_root.clone(),
507        timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
508    };
509
510    let app = Router::new()
511        .route("/health", get(health))
512        .route("/v1/manifest", get(v1_manifest))
513        .route("/v1/tools", get(v1_tools))
514        .route("/v1/tools/call", axum::routing::post(v1_tool_call))
515        .route("/v1/events", get(v1_events))
516        .route("/v1/metrics", get(v1_metrics))
517        .fallback_service(mcp_http)
518        .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
519        .layer(middleware::from_fn_with_state(
520            state.clone(),
521            rate_limit_middleware,
522        ))
523        .layer(middleware::from_fn_with_state(
524            state.clone(),
525            concurrency_middleware,
526        ))
527        .layer(middleware::from_fn_with_state(
528            state.clone(),
529            auth_middleware,
530        ))
531        .with_state(state);
532
533    let listener = tokio::net::UnixListener::bind(&socket_path)
534        .with_context(|| format!("bind UDS {}", socket_path.display()))?;
535
536    tracing::info!(
537        "lean-ctx daemon listening on {} (project_root={})",
538        socket_path.display(),
539        cfg.project_root.display()
540    );
541
542    axum::serve(listener, app.into_make_service())
543        .with_graceful_shutdown(async move {
544            let _ = tokio::signal::ctrl_c().await;
545        })
546        .await
547        .context("uds server")?;
548    Ok(())
549}
550
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use futures::StreamExt;
    use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
    use serde_json::json;
    use tower::ServiceExt;

    /// Reads body chunks until the first complete SSE message (terminated by a blank line).
    ///
    /// Gives up after 32 chunks or a 2-second wait for any single chunk.
    async fn read_first_sse_message(body: Body) -> String {
        let mut stream = body.into_data_stream();
        let mut buf: Vec<u8> = Vec::new();
        for _ in 0..32 {
            let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
            let Ok(Some(Ok(bytes))) = next else {
                break;
            };
            buf.extend_from_slice(&bytes);
            if buf.windows(2).any(|w| w == b"\n\n") {
                break;
            }
        }
        String::from_utf8_lossy(&buf).to_string()
    }

    #[tokio::test]
    async fn auth_token_blocks_requests_without_bearer_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();
        let service_project_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &service_project_root,
                "default",
                "default",
            ))
        };
        let cfg = StreamableHttpServerConfig::default()
            .with_stateful_mode(false)
            .with_json_response(true);

        let mcp_http = StreamableHttpService::new(
            service_factory,
            Arc::new(
                rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
            ),
            cfg,
        );

        let state = AppState {
            token: Some("secret".to_string()),
            concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
            rate: Arc::new(RateLimiter::new(50, 100)),
            project_root: root_str.clone(),
            timeout: Duration::from_millis(30_000),
        };

        let app = Router::new()
            .fallback_service(mcp_http)
            .layer(middleware::from_fn_with_state(
                state.clone(),
                auth_middleware,
            ))
            .with_state(state);

        let body = json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        .to_string();

        let req = Request::builder()
            .method("POST")
            .uri("/")
            .header("Host", "localhost")
            .header("Accept", "application/json, text/event-stream")
            .header("Content-Type", "application/json")
            .body(Body::from(body))
            .expect("request");

        let resp = app.clone().oneshot(req).await.expect("resp");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn mcp_service_factory_isolates_per_client_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let root_str = dir.path().to_string_lossy().to_string();

        // Mirrors the serve() setup (same signature, including the `std::io::Error` error
        // type): service_factory must create a fresh server per MCP session.
        let service_project_root = root_str.clone();
        let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
            Ok(LeanCtxServer::new_shared_with_context(
                &service_project_root,
                "default",
                "default",
            ))
        };

        let s1 = service_factory().expect("server 1");
        let s2 = service_factory().expect("server 2");

        // If the two servers accidentally shared the same Arc-backed fields, these writes
        // would clobber each other. This test stays independent of rmcp's
        // InitializeRequestParams API.
        *s1.client_name.write().await = "client-a".to_string();
        *s2.client_name.write().await = "client-b".to_string();

        let a = s1.client_name.read().await.clone();
        let b = s2.client_name.read().await.clone();
        assert_eq!(a, "client-a");
        assert_eq!(b, "client-b");
    }

    #[tokio::test]
    async fn rate_limit_returns_429_when_exhausted() {
        let state = AppState {
            token: None,
            concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
            rate: Arc::new(RateLimiter::new(1, 1)),
            project_root: ".".to_string(),
            timeout: Duration::from_millis(30_000),
        };

        let app = Router::new()
            .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
            .layer(middleware::from_fn_with_state(
                state.clone(),
                rate_limit_middleware,
            ))
            .with_state(state);

        let req1 = Request::builder()
            .method("GET")
            .uri("/limited")
            .header("Host", "localhost")
            .body(Body::empty())
            .expect("req1");
        let resp1 = app.clone().oneshot(req1).await.expect("resp1");
        assert_eq!(resp1.status(), StatusCode::OK);

        let req2 = Request::builder()
            .method("GET")
            .uri("/limited")
            .header("Host", "localhost")
            .body(Body::empty())
            .expect("req2");
        let resp2 = app.clone().oneshot(req2).await.expect("resp2");
        assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
    }

    #[tokio::test]
    async fn events_endpoint_replays_tool_call_event() {
        let dir = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
        std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
        let root_str = dir.path().to_string_lossy().to_string();

        let state = AppState {
            token: None,
            concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
            rate: Arc::new(RateLimiter::new(50, 100)),
            project_root: root_str.clone(),
            timeout: Duration::from_millis(30_000),
        };

        let app = Router::new()
            .route("/v1/tools/call", axum::routing::post(v1_tool_call))
            .route("/v1/events", get(v1_events))
            .with_state(state);

        let body = json!({
            "name": "ctx_session",
            "arguments": { "action": "status" },
            "workspaceId": "ws1",
            "channelId": "ch1"
        })
        .to_string();
        let req = Request::builder()
            .method("POST")
            .uri("/v1/tools/call")
            .header("Host", "localhost")
            .header("Content-Type", "application/json")
            .body(Body::from(body))
            .expect("req");
        let resp = app.clone().oneshot(req).await.expect("call");
        assert_eq!(resp.status(), StatusCode::OK);

        // Subscribe with replay semantics; read the first SSE message.
        let req = Request::builder()
            .method("GET")
            .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
            .header("Host", "localhost")
            .header("Accept", "text/event-stream")
            .body(Body::empty())
            .expect("req");
        let resp = app.clone().oneshot(req).await.expect("events");
        assert_eq!(resp.status(), StatusCode::OK);

        let msg = read_first_sse_message(resp.into_body()).await;
        assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
        assert!(msg.contains("\"workspaceId\":\"ws1\""), "msg={msg:?}");
        assert!(msg.contains("\"channelId\":\"ch1\""), "msg={msg:?}");
    }

    #[test]
    fn validate_requires_token_only_for_non_loopback_hosts() {
        for host in ["127.0.0.1", "localhost", "::1"] {
            let cfg = HttpServerConfig {
                host: host.to_string(),
                ..Default::default()
            };
            assert!(cfg.validate().is_ok(), "loopback host {host} should not need a token");
        }

        let mut cfg = HttpServerConfig {
            host: "0.0.0.0".to_string(),
            ..Default::default()
        };
        assert!(cfg.validate().is_err());
        cfg.auth_token = Some(String::new());
        assert!(cfg.validate().is_err());
        cfg.auth_token = Some("t".to_string());
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn constant_time_eq_matches_only_identical_inputs() {
        assert!(constant_time_eq(b"secret", b"secret"));
        assert!(constant_time_eq(b"", b""));
        assert!(!constant_time_eq(b"secret", b"secreT"));
        assert!(!constant_time_eq(b"secre", b"secret"));
        assert!(!constant_time_eq(b"secrets", b"secret"));
    }
}