1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{anyhow, Context, Result};
6use axum::{
7 extract::Json,
8 extract::Query,
9 extract::State,
10 http::{header, Request, StatusCode},
11 middleware::{self, Next},
12 response::sse::{Event as SseEvent, KeepAlive, Sse},
13 response::{IntoResponse, Response},
14 routing::get,
15 Router,
16};
17use futures::Stream;
18use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
19use serde::Deserialize;
20use serde_json::Value;
21use tokio::sync::broadcast;
22use tokio::time::{Duration, Instant};
23
24use crate::engine::ContextEngine;
25use crate::tools::LeanCtxServer;
26
27#[cfg(feature = "team-server")]
28pub mod team;
29
30#[derive(Clone, Debug)]
31pub struct HttpServerConfig {
32 pub host: String,
33 pub port: u16,
34 pub project_root: PathBuf,
35 pub auth_token: Option<String>,
36 pub stateful_mode: bool,
37 pub json_response: bool,
38 pub disable_host_check: bool,
39 pub allowed_hosts: Vec<String>,
40 pub max_body_bytes: usize,
41 pub max_concurrency: usize,
42 pub max_rps: u32,
43 pub rate_burst: u32,
44 pub request_timeout_ms: u64,
45}
46
47impl Default for HttpServerConfig {
48 fn default() -> Self {
49 let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
50 Self {
51 host: "127.0.0.1".to_string(),
52 port: 8080,
53 project_root,
54 auth_token: None,
55 stateful_mode: false,
56 json_response: true,
57 disable_host_check: false,
58 allowed_hosts: Vec::new(),
59 max_body_bytes: 2 * 1024 * 1024,
60 max_concurrency: 32,
61 max_rps: 50,
62 rate_burst: 100,
63 request_timeout_ms: 30_000,
64 }
65 }
66}
67
68impl HttpServerConfig {
69 pub fn validate(&self) -> Result<()> {
70 let host = self.host.trim().to_lowercase();
71 let is_loopback = host == "127.0.0.1" || host == "localhost" || host == "::1";
72 if !is_loopback && self.auth_token.as_deref().unwrap_or("").is_empty() {
73 return Err(anyhow!(
74 "Refusing to bind to host='{host}' without auth. Provide --auth-token (or bind to 127.0.0.1)."
75 ));
76 }
77 Ok(())
78 }
79
80 fn mcp_http_config(&self) -> StreamableHttpServerConfig {
81 let mut cfg = StreamableHttpServerConfig::default()
82 .with_stateful_mode(self.stateful_mode)
83 .with_json_response(self.json_response);
84
85 if self.disable_host_check {
86 cfg = cfg.disable_allowed_hosts();
87 return cfg;
88 }
89
90 if !self.allowed_hosts.is_empty() {
91 cfg = cfg.with_allowed_hosts(self.allowed_hosts.clone());
92 return cfg;
93 }
94
95 let host = self.host.trim();
97 if host == "127.0.0.1" || host == "localhost" || host == "::1" {
98 cfg.allowed_hosts.push(host.to_string());
99 }
100
101 cfg
102 }
103}
104
105#[derive(Clone)]
106struct AppState {
107 token: Option<String>,
108 concurrency: Arc<tokio::sync::Semaphore>,
109 rate: Arc<RateLimiter>,
110 project_root: String,
111 timeout: Duration,
112}
113
114#[derive(Debug)]
115struct RateLimiter {
116 max_rps: f64,
117 burst: f64,
118 state: tokio::sync::Mutex<RateState>,
119}
120
121#[derive(Debug, Clone, Copy)]
122struct RateState {
123 tokens: f64,
124 last: Instant,
125}
126
127impl RateLimiter {
128 fn new(max_rps: u32, burst: u32) -> Self {
129 let now = Instant::now();
130 Self {
131 max_rps: (max_rps.max(1)) as f64,
132 burst: (burst.max(1)) as f64,
133 state: tokio::sync::Mutex::new(RateState {
134 tokens: (burst.max(1)) as f64,
135 last: now,
136 }),
137 }
138 }
139
140 async fn allow(&self) -> bool {
141 let mut s = self.state.lock().await;
142 let now = Instant::now();
143 let elapsed = now.saturating_duration_since(s.last);
144 let refill = elapsed.as_secs_f64() * self.max_rps;
145 s.tokens = (s.tokens + refill).min(self.burst);
146 s.last = now;
147 if s.tokens >= 1.0 {
148 s.tokens -= 1.0;
149 true
150 } else {
151 false
152 }
153 }
154}
155
156async fn auth_middleware(
157 State(state): State<AppState>,
158 req: Request<axum::body::Body>,
159 next: Next,
160) -> Response {
161 if state.token.is_none() {
162 return next.run(req).await;
163 }
164
165 if req.uri().path() == "/health" {
166 return next.run(req).await;
167 }
168
169 let expected = state.token.as_deref().unwrap_or("");
170 let Some(h) = req.headers().get(header::AUTHORIZATION) else {
171 return StatusCode::UNAUTHORIZED.into_response();
172 };
173 let Ok(s) = h.to_str() else {
174 return StatusCode::UNAUTHORIZED.into_response();
175 };
176 let Some(token) = s
177 .strip_prefix("Bearer ")
178 .or_else(|| s.strip_prefix("bearer "))
179 else {
180 return StatusCode::UNAUTHORIZED.into_response();
181 };
182 if !constant_time_eq(token.as_bytes(), expected.as_bytes()) {
183 return StatusCode::UNAUTHORIZED.into_response();
184 }
185
186 next.run(req).await
187}
188
189fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
190 if a.len() != b.len() {
191 return false;
192 }
193 a.iter()
194 .zip(b.iter())
195 .fold(0u8, |acc, (x, y)| acc | (x ^ y))
196 == 0
197}
198
199async fn rate_limit_middleware(
200 State(state): State<AppState>,
201 req: Request<axum::body::Body>,
202 next: Next,
203) -> Response {
204 if req.uri().path() == "/health" {
205 return next.run(req).await;
206 }
207 if !state.rate.allow().await {
208 return StatusCode::TOO_MANY_REQUESTS.into_response();
209 }
210 next.run(req).await
211}
212
213async fn concurrency_middleware(
214 State(state): State<AppState>,
215 req: Request<axum::body::Body>,
216 next: Next,
217) -> Response {
218 if req.uri().path() == "/health" {
219 return next.run(req).await;
220 }
221 let Ok(permit) = state.concurrency.clone().try_acquire_owned() else {
222 return StatusCode::TOO_MANY_REQUESTS.into_response();
223 };
224 let resp = next.run(req).await;
225 drop(permit);
226 resp
227}
228
229async fn health() -> impl IntoResponse {
230 (StatusCode::OK, "ok\n")
231}
232
233#[derive(Debug, Deserialize)]
234#[serde(rename_all = "camelCase")]
235struct ToolCallBody {
236 name: String,
237 #[serde(default)]
238 arguments: Option<Value>,
239 #[serde(default)]
240 workspace_id: Option<String>,
241 #[serde(default)]
242 channel_id: Option<String>,
243}
244
245#[derive(Debug, Deserialize)]
246#[serde(rename_all = "camelCase")]
247struct EventsQuery {
248 #[serde(default)]
249 workspace_id: Option<String>,
250 #[serde(default)]
251 channel_id: Option<String>,
252 #[serde(default)]
253 since: Option<i64>,
254 #[serde(default)]
255 limit: Option<usize>,
256}
257
258async fn v1_manifest(State(state): State<AppState>) -> impl IntoResponse {
259 let _ = state;
260 let v = crate::core::mcp_manifest::manifest_value();
261 (StatusCode::OK, Json(v))
262}
263
264#[derive(Debug, Deserialize)]
265#[serde(rename_all = "camelCase")]
266struct ToolsQuery {
267 #[serde(default)]
268 offset: Option<usize>,
269 #[serde(default)]
270 limit: Option<usize>,
271}
272
273async fn v1_tools(State(state): State<AppState>, Query(q): Query<ToolsQuery>) -> impl IntoResponse {
274 let _ = state;
275 let v = crate::core::mcp_manifest::manifest_value();
276 let tools = v
277 .get("tools")
278 .and_then(|t| t.get("granular"))
279 .cloned()
280 .unwrap_or(Value::Array(vec![]));
281
282 let all = tools.as_array().cloned().unwrap_or_default();
283 let total = all.len();
284 let offset = q.offset.unwrap_or(0).min(total);
285 let limit = q.limit.unwrap_or(200).min(500);
286 let page = all.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
287
288 (
289 StatusCode::OK,
290 Json(serde_json::json!({
291 "tools": page,
292 "total": total,
293 "offset": offset,
294 "limit": limit,
295 })),
296 )
297}
298
299async fn v1_tool_call(
300 State(state): State<AppState>,
301 Json(body): Json<ToolCallBody>,
302) -> impl IntoResponse {
303 let ws = body.workspace_id.as_deref().unwrap_or("default");
304 let ch = body.channel_id.as_deref().unwrap_or("default");
305 let server = LeanCtxServer::new_shared_with_context(&state.project_root, ws, ch);
306 let engine = ContextEngine::from_server(server);
307 match tokio::time::timeout(
308 state.timeout,
309 engine.call_tool_value(&body.name, body.arguments),
310 )
311 .await
312 {
313 Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::json!({ "result": v }))).into_response(),
314 Ok(Err(e)) => (
315 StatusCode::BAD_REQUEST,
316 Json(serde_json::json!({ "error": e.to_string() })),
317 )
318 .into_response(),
319 Err(_) => (
320 StatusCode::GATEWAY_TIMEOUT,
321 Json(serde_json::json!({ "error": "request_timeout" })),
322 )
323 .into_response(),
324 }
325}
326
327async fn v1_events(
328 State(_state): State<AppState>,
329 Query(q): Query<EventsQuery>,
330) -> Sse<impl Stream<Item = Result<SseEvent, std::convert::Infallible>>> {
331 use crate::core::context_os::{redact_event_payload, RedactionLevel};
332
333 let ws = q.workspace_id.unwrap_or_else(|| "default".to_string());
334 let ch = q.channel_id.unwrap_or_else(|| "default".to_string());
335 let since = q.since.unwrap_or(0);
336 let limit = q.limit.unwrap_or(200).min(1000);
337 let redaction = RedactionLevel::RefsOnly;
338
339 let rt = crate::core::context_os::runtime();
340 let replay = rt.bus.read(&ws, &ch, since, limit);
341 let rx = rt.bus.subscribe();
342 rt.metrics.record_sse_connect();
343 rt.metrics.record_events_replayed(replay.len() as u64);
344 rt.metrics.record_workspace_active(&ws);
345
346 let stream = futures::stream::unfold(
347 (
348 replay.into_iter(),
349 rx,
350 ws.clone(),
351 ch.clone(),
352 since,
353 redaction,
354 ),
355 |(mut replay_it, mut rx, ws, ch, mut last_id, redaction)| async move {
356 if let Some(mut ev) = replay_it.next() {
357 last_id = ev.id;
358 redact_event_payload(&mut ev, redaction);
359 let data = serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
360 let evt = SseEvent::default()
361 .id(ev.id.to_string())
362 .event(ev.kind)
363 .data(data);
364 return Some((Ok(evt), (replay_it, rx, ws, ch, last_id, redaction)));
365 }
366
367 loop {
368 match rx.recv().await {
369 Ok(mut ev) => {
370 if ev.workspace_id == ws && ev.channel_id == ch && ev.id > last_id {
371 last_id = ev.id;
372 redact_event_payload(&mut ev, redaction);
373 let data =
374 serde_json::to_string(&ev).unwrap_or_else(|_| "{}".to_string());
375 let evt = SseEvent::default()
376 .id(ev.id.to_string())
377 .event(ev.kind)
378 .data(data);
379 return Some((Ok(evt), (replay_it, rx, ws, ch, last_id, redaction)));
380 }
381 }
382 Err(broadcast::error::RecvError::Closed) => return None,
383 Err(broadcast::error::RecvError::Lagged(_)) => {}
384 }
385 }
386 },
387 );
388
389 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
390}
391
392async fn v1_metrics(State(_state): State<AppState>) -> impl IntoResponse {
393 let rt = crate::core::context_os::runtime();
394 let snap = rt.metrics.snapshot();
395 (
396 StatusCode::OK,
397 Json(serde_json::to_value(snap).unwrap_or_default()),
398 )
399}
400
401pub async fn serve(cfg: HttpServerConfig) -> Result<()> {
402 cfg.validate()?;
403
404 let addr: SocketAddr = format!("{}:{}", cfg.host, cfg.port)
405 .parse()
406 .context("invalid host/port")?;
407
408 let project_root = cfg.project_root.to_string_lossy().to_string();
409 let service_project_root = project_root.clone();
412 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
413 Ok(LeanCtxServer::new_shared_with_context(
414 &service_project_root,
415 "default",
416 "default",
417 ))
418 };
419 let mcp_http = StreamableHttpService::new(
420 service_factory,
421 Arc::new(
422 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
423 ),
424 cfg.mcp_http_config(),
425 );
426
427 let state = AppState {
428 token: cfg.auth_token.clone().filter(|t| !t.is_empty()),
429 concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
430 rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
431 project_root: project_root.clone(),
432 timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
433 };
434
435 let app = Router::new()
436 .route("/health", get(health))
437 .route("/v1/manifest", get(v1_manifest))
438 .route("/v1/tools", get(v1_tools))
439 .route("/v1/tools/call", axum::routing::post(v1_tool_call))
440 .route("/v1/events", get(v1_events))
441 .route("/v1/metrics", get(v1_metrics))
442 .fallback_service(mcp_http)
443 .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
444 .layer(middleware::from_fn_with_state(
445 state.clone(),
446 rate_limit_middleware,
447 ))
448 .layer(middleware::from_fn_with_state(
449 state.clone(),
450 concurrency_middleware,
451 ))
452 .layer(middleware::from_fn_with_state(
453 state.clone(),
454 auth_middleware,
455 ))
456 .with_state(state);
457
458 let listener = tokio::net::TcpListener::bind(addr)
459 .await
460 .with_context(|| format!("bind {addr}"))?;
461
462 tracing::info!(
463 "lean-ctx Streamable HTTP server listening on http://{addr} (project_root={})",
464 cfg.project_root.display()
465 );
466
467 axum::serve(listener, app)
468 .with_graceful_shutdown(async move {
469 let _ = tokio::signal::ctrl_c().await;
470 })
471 .await
472 .context("http server")?;
473 Ok(())
474}
475
476#[cfg(unix)]
477pub async fn serve_uds(cfg: HttpServerConfig, socket_path: PathBuf) -> Result<()> {
478 cfg.validate()?;
479
480 if socket_path.exists() {
481 std::fs::remove_file(&socket_path)
482 .with_context(|| format!("remove stale socket {}", socket_path.display()))?;
483 }
484
485 let project_root = cfg.project_root.to_string_lossy().to_string();
486 let service_project_root = project_root.clone();
487 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
488 Ok(LeanCtxServer::new_shared_with_context(
489 &service_project_root,
490 "default",
491 "default",
492 ))
493 };
494 let mcp_http = StreamableHttpService::new(
495 service_factory,
496 Arc::new(
497 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
498 ),
499 cfg.mcp_http_config(),
500 );
501
502 let state = AppState {
503 token: cfg.auth_token.clone().filter(|t| !t.is_empty()),
504 concurrency: Arc::new(tokio::sync::Semaphore::new(cfg.max_concurrency.max(1))),
505 rate: Arc::new(RateLimiter::new(cfg.max_rps, cfg.rate_burst)),
506 project_root: project_root.clone(),
507 timeout: Duration::from_millis(cfg.request_timeout_ms.max(1)),
508 };
509
510 let app = Router::new()
511 .route("/health", get(health))
512 .route("/v1/manifest", get(v1_manifest))
513 .route("/v1/tools", get(v1_tools))
514 .route("/v1/tools/call", axum::routing::post(v1_tool_call))
515 .route("/v1/events", get(v1_events))
516 .route("/v1/metrics", get(v1_metrics))
517 .fallback_service(mcp_http)
518 .layer(axum::extract::DefaultBodyLimit::max(cfg.max_body_bytes))
519 .layer(middleware::from_fn_with_state(
520 state.clone(),
521 rate_limit_middleware,
522 ))
523 .layer(middleware::from_fn_with_state(
524 state.clone(),
525 concurrency_middleware,
526 ))
527 .layer(middleware::from_fn_with_state(
528 state.clone(),
529 auth_middleware,
530 ))
531 .with_state(state);
532
533 let listener = tokio::net::UnixListener::bind(&socket_path)
534 .with_context(|| format!("bind UDS {}", socket_path.display()))?;
535
536 tracing::info!(
537 "lean-ctx daemon listening on {} (project_root={})",
538 socket_path.display(),
539 cfg.project_root.display()
540 );
541
542 axum::serve(listener, app.into_make_service())
543 .with_graceful_shutdown(async move {
544 let _ = tokio::signal::ctrl_c().await;
545 })
546 .await
547 .context("uds server")?;
548 Ok(())
549}
550
551#[cfg(test)]
552mod tests {
553 use super::*;
554 use axum::body::Body;
555 use axum::http::Request;
556 use futures::StreamExt;
557 use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
558 use serde_json::json;
559 use tower::ServiceExt;
560
561 async fn read_first_sse_message(body: Body) -> String {
562 let mut stream = body.into_data_stream();
563 let mut buf: Vec<u8> = Vec::new();
564 for _ in 0..32 {
565 let next = tokio::time::timeout(Duration::from_secs(2), stream.next()).await;
566 let Ok(Some(Ok(bytes))) = next else {
567 break;
568 };
569 buf.extend_from_slice(&bytes);
570 if buf.windows(2).any(|w| w == b"\n\n") {
571 break;
572 }
573 }
574 String::from_utf8_lossy(&buf).to_string()
575 }
576
577 #[tokio::test]
578 async fn auth_token_blocks_requests_without_bearer_header() {
579 let dir = tempfile::tempdir().expect("tempdir");
580 let root_str = dir.path().to_string_lossy().to_string();
581 let service_project_root = root_str.clone();
582 let service_factory = move || -> Result<LeanCtxServer, std::io::Error> {
583 Ok(LeanCtxServer::new_shared_with_context(
584 &service_project_root,
585 "default",
586 "default",
587 ))
588 };
589 let cfg = StreamableHttpServerConfig::default()
590 .with_stateful_mode(false)
591 .with_json_response(true);
592
593 let mcp_http = StreamableHttpService::new(
594 service_factory,
595 Arc::new(
596 rmcp::transport::streamable_http_server::session::local::LocalSessionManager::default(),
597 ),
598 cfg,
599 );
600
601 let state = AppState {
602 token: Some("secret".to_string()),
603 concurrency: Arc::new(tokio::sync::Semaphore::new(4)),
604 rate: Arc::new(RateLimiter::new(50, 100)),
605 project_root: root_str.clone(),
606 timeout: Duration::from_millis(30_000),
607 };
608
609 let app = Router::new()
610 .fallback_service(mcp_http)
611 .layer(middleware::from_fn_with_state(
612 state.clone(),
613 auth_middleware,
614 ))
615 .with_state(state);
616
617 let body = json!({
618 "jsonrpc": "2.0",
619 "id": 1,
620 "method": "tools/list",
621 "params": {}
622 })
623 .to_string();
624
625 let req = Request::builder()
626 .method("POST")
627 .uri("/")
628 .header("Host", "localhost")
629 .header("Accept", "application/json, text/event-stream")
630 .header("Content-Type", "application/json")
631 .body(Body::from(body))
632 .expect("request");
633
634 let resp = app.clone().oneshot(req).await.expect("resp");
635 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
636 }
637
638 #[tokio::test]
639 async fn mcp_service_factory_isolates_per_client_state() {
640 let dir = tempfile::tempdir().expect("tempdir");
641 let root_str = dir.path().to_string_lossy().to_string();
642
643 let service_project_root = root_str.clone();
645 let service_factory = move || -> Result<LeanCtxServer, std::convert::Infallible> {
646 Ok(LeanCtxServer::new_shared_with_context(
647 &service_project_root,
648 "default",
649 "default",
650 ))
651 };
652
653 let s1 = service_factory().expect("server 1");
654 let s2 = service_factory().expect("server 2");
655
656 *s1.client_name.write().await = "client-a".to_string();
659 *s2.client_name.write().await = "client-b".to_string();
660
661 let a = s1.client_name.read().await.clone();
662 let b = s2.client_name.read().await.clone();
663 assert_eq!(a, "client-a");
664 assert_eq!(b, "client-b");
665 }
666
667 #[tokio::test]
668 async fn rate_limit_returns_429_when_exhausted() {
669 let state = AppState {
670 token: None,
671 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
672 rate: Arc::new(RateLimiter::new(1, 1)),
673 project_root: ".".to_string(),
674 timeout: Duration::from_millis(30_000),
675 };
676
677 let app = Router::new()
678 .route("/limited", get(|| async { (StatusCode::OK, "ok\n") }))
679 .layer(middleware::from_fn_with_state(
680 state.clone(),
681 rate_limit_middleware,
682 ))
683 .with_state(state);
684
685 let req1 = Request::builder()
686 .method("GET")
687 .uri("/limited")
688 .header("Host", "localhost")
689 .body(Body::empty())
690 .expect("req1");
691 let resp1 = app.clone().oneshot(req1).await.expect("resp1");
692 assert_eq!(resp1.status(), StatusCode::OK);
693
694 let req2 = Request::builder()
695 .method("GET")
696 .uri("/limited")
697 .header("Host", "localhost")
698 .body(Body::empty())
699 .expect("req2");
700 let resp2 = app.clone().oneshot(req2).await.expect("resp2");
701 assert_eq!(resp2.status(), StatusCode::TOO_MANY_REQUESTS);
702 }
703
704 #[tokio::test]
705 async fn events_endpoint_replays_tool_call_event() {
706 let dir = tempfile::tempdir().expect("tempdir");
707 std::fs::create_dir_all(dir.path().join(".git")).expect("git marker");
708 std::fs::write(dir.path().join("a.txt"), "ok").expect("file");
709 let root_str = dir.path().to_string_lossy().to_string();
710
711 let state = AppState {
712 token: None,
713 concurrency: Arc::new(tokio::sync::Semaphore::new(16)),
714 rate: Arc::new(RateLimiter::new(50, 100)),
715 project_root: root_str.clone(),
716 timeout: Duration::from_millis(30_000),
717 };
718
719 let app = Router::new()
720 .route("/v1/tools/call", axum::routing::post(v1_tool_call))
721 .route("/v1/events", get(v1_events))
722 .with_state(state);
723
724 let body = json!({
725 "name": "ctx_session",
726 "arguments": { "action": "status" },
727 "workspaceId": "ws1",
728 "channelId": "ch1"
729 })
730 .to_string();
731 let req = Request::builder()
732 .method("POST")
733 .uri("/v1/tools/call")
734 .header("Host", "localhost")
735 .header("Content-Type", "application/json")
736 .body(Body::from(body))
737 .expect("req");
738 let resp = app.clone().oneshot(req).await.expect("call");
739 assert_eq!(resp.status(), StatusCode::OK);
740
741 let req = Request::builder()
743 .method("GET")
744 .uri("/v1/events?workspaceId=ws1&channelId=ch1&since=0&limit=1")
745 .header("Host", "localhost")
746 .header("Accept", "text/event-stream")
747 .body(Body::empty())
748 .expect("req");
749 let resp = app.clone().oneshot(req).await.expect("events");
750 assert_eq!(resp.status(), StatusCode::OK);
751
752 let msg = read_first_sse_message(resp.into_body()).await;
753 assert!(msg.contains("event: tool_call_recorded"), "msg={msg:?}");
754 assert!(msg.contains("\"workspaceId\":\"ws1\""), "msg={msg:?}");
755 assert!(msg.contains("\"channelId\":\"ch1\""), "msg={msg:?}");
756 }
757}