Skip to main content

construct/mcp_server/
server.rs

//! Axum-based MCP daemon.
//!
//! Endpoints:
//! - `POST /session`                     — unauthenticated; mints `{ session_id, token, cwd }`.
//! - `POST /mcp`                         — authenticated JSON-RPC 2.0. Returns either plain
//!                                         JSON or an SSE stream depending on the method.
//! - `GET  /session/{session_id}/events` — authenticated (bearer token only); session-wide
//!                                         progress SSE stream.
//! - `GET  /health`                      — simple liveness probe.
//!
//! Session auth headers (required on `POST /mcp`):
//! - `Authorization: Bearer <token>`
//! - `X-Construct-Session: <session_id>`
//!
//! `POST /mcp` dispatch:
//! - `initialize`     → plain JSON response with server info + capabilities.
//! - `tools/list`     → plain JSON response listing all advertised tools.
//! - `tools/call`     → SSE stream. First emits zero or more
//!                      `notifications/progress` events (forwarded from the
//!                      tool's `execute_with_progress`), then one terminal
//!                      JSON-RPC response event, then `event: done`.
20
21use crate::config::Config;
22use crate::mcp_server::progress::McpProgressSink;
23use crate::mcp_server::registry::{
24    SkippedEntry, build_default_tools, build_tools_with_config, build_tools_with_runtime,
25};
26use crate::mcp_server::runtime::RuntimeHandles;
27use crate::mcp_server::session::{ProgressEvent, SessionStore, SharedSessionStore};
28use crate::tools::Tool;
29use crate::tools::mcp_protocol::{
30    INTERNAL_ERROR, INVALID_PARAMS, INVALID_REQUEST, JSONRPC_VERSION, JsonRpcError,
31    MCP_PROTOCOL_VERSION, METHOD_NOT_FOUND,
32};
33use axum::{
34    Json, Router,
35    extract::{Path, State},
36    http::{HeaderMap, StatusCode, header},
37    response::{
38        IntoResponse, Response,
39        sse::{Event, KeepAlive, Sse},
40    },
41    routing::{get, post},
42};
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use serde_json::{Value, json};
46use std::collections::HashMap;
47use std::convert::Infallible;
48use std::net::SocketAddr;
49use std::path::PathBuf;
50use std::sync::Arc;
51use std::sync::OnceLock;
52use std::time::Instant;
53use tokio::net::TcpListener;
54use tokio::sync::broadcast;
55use tokio::sync::mpsc;
56use tokio_stream::StreamExt;
57use tokio_stream::wrappers::BroadcastStream;
58use tokio_stream::wrappers::UnboundedReceiverStream;
59
60// ── Router state ───────────────────────────────────────────────────────────
61
62#[derive(Clone)]
63pub struct AppState {
64    pub sessions: SharedSessionStore,
65    pub tools: Arc<HashMap<String, Arc<dyn Tool>>>,
66}
67
68// ── Process start-time tracking (used by /health). ────────────────────────
69//
70// A `OnceLock` is fine here: the daemon process has a single start. Tests
71// that spin up multiple routers in-process are all inside the same pid, so
72// `started_at` is stable for the test binary lifetime — tests only assert
73// shape, not exact value.
74
75struct StartTime {
76    instant: Instant,
77    wall: DateTime<Utc>,
78}
79
80static START_TIME: OnceLock<StartTime> = OnceLock::new();
81
82fn start_time() -> &'static StartTime {
83    START_TIME.get_or_init(|| StartTime {
84        instant: Instant::now(),
85        wall: Utc::now(),
86    })
87}
88
89#[derive(Debug, Serialize)]
90struct HealthResponse {
91    status: &'static str,
92    pid: u32,
93    uptime_seconds: u64,
94    started_at: String,
95    protocol_version: &'static str,
96}
97
98fn build_health_response() -> HealthResponse {
99    let st = start_time();
100    HealthResponse {
101        status: "ok",
102        pid: std::process::id(),
103        uptime_seconds: st.instant.elapsed().as_secs(),
104        started_at: st.wall.to_rfc3339(),
105        protocol_version: MCP_PROTOCOL_VERSION,
106    }
107}
108
109// ── Public handles ─────────────────────────────────────────────────────────
110
111/// Handle returned by `serve_on` so tests can learn the bound port and shut
112/// the server down.
113pub struct McpServerHandle {
114    pub addr: SocketAddr,
115    pub shutdown: tokio::sync::oneshot::Sender<()>,
116    pub joined: tokio::task::JoinHandle<()>,
117}
118
119/// Build the Axum router. Exposed for tests.
120#[must_use]
121pub fn build_router(state: AppState) -> Router {
122    Router::new()
123        .route("/health", get(health_handler))
124        .route("/session", post(create_session_handler))
125        .route("/session/{session_id}/events", get(session_events_handler))
126        .route("/mcp", post(mcp_handler))
127        .with_state(state)
128}
129
130/// Build an [`AppState`] with Construct's baseline tool registry (no Config).
131///
132/// Kept as the M1 entry point — returns the curated set of ~16 tools and no
133/// integrations. Tests targeting the baseline surface call this directly.
134#[must_use]
135pub fn default_state(workspace_dir: &std::path::Path) -> (AppState, Vec<SkippedEntry>) {
136    let (tools, skipped) = build_default_tools(workspace_dir);
137    (build_app_state(tools), skipped)
138}
139
140/// Build an [`AppState`] using a loaded `Config` so integrations with creds
141/// (Notion, Jira, Composio, Google Workspace, etc.) and the skills meta-tools
142/// get registered alongside the baseline.
143#[must_use]
144pub fn state_from_config(
145    workspace_dir: &std::path::Path,
146    config: &Config,
147) -> (AppState, Vec<SkippedEntry>) {
148    let (tools, skipped) = build_tools_with_config(workspace_dir, config);
149    (build_app_state(tools), skipped)
150}
151
152/// Build an [`AppState`] using a loaded `Config` plus the gateway's live
153/// [`RuntimeHandles`]. This is the full-registry entry point used by the
154/// in-process daemon boot.
155#[must_use]
156pub fn state_from_runtime(
157    workspace_dir: &std::path::Path,
158    config: &Config,
159    runtime: &RuntimeHandles,
160) -> (AppState, Vec<SkippedEntry>) {
161    let (tools, skipped) = build_tools_with_runtime(workspace_dir, config, runtime);
162    (build_app_state(tools), skipped)
163}
164
165fn build_app_state(tools: Vec<Arc<dyn Tool>>) -> AppState {
166    let map: HashMap<String, Arc<dyn Tool>> = tools
167        .into_iter()
168        .map(|t| (t.name().to_string(), t))
169        .collect();
170    AppState {
171        sessions: Arc::new(SessionStore::new()),
172        tools: Arc::new(map),
173    }
174}
175
176/// Extend an existing [`AppState`] with additional tools (used by tests).
177pub fn state_with_tools(tools: Vec<Arc<dyn Tool>>) -> AppState {
178    let map: HashMap<String, Arc<dyn Tool>> = tools
179        .into_iter()
180        .map(|t| (t.name().to_string(), t))
181        .collect();
182    AppState {
183        sessions: Arc::new(SessionStore::new()),
184        tools: Arc::new(map),
185    }
186}
187
188/// Bind to `addr` (use `127.0.0.1:0` for ephemeral) and serve the router.
189/// Returns once the server is listening. Writes no discovery file.
190pub async fn serve_on(addr: SocketAddr, state: AppState) -> anyhow::Result<McpServerHandle> {
191    let listener = TcpListener::bind(addr).await?;
192    let bound = listener.local_addr()?;
193    let router = build_router(state);
194    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
195
196    let joined = tokio::spawn(async move {
197        let res = axum::serve(listener, router)
198            .with_graceful_shutdown(async move {
199                let _ = rx.await;
200            })
201            .await;
202        if let Err(e) = res {
203            tracing::error!("construct-mcp server exited: {e}");
204        }
205    });
206
207    Ok(McpServerHandle {
208        addr: bound,
209        shutdown: tx,
210        joined,
211    })
212}
213
214/// Legacy blocking entry point retained for tests that want to boot the MCP
215/// server with only a workspace directory (no gateway AppState).
216///
217/// The main daemon no longer calls this: the gateway wires the MCP task
218/// directly via [`state_from_runtime`] + [`serve_on`] so that live runtime
219/// handles (workspace manager, channel map, session store, …) can be
220/// threaded in.
221///
222/// Attempts to load a real Construct `Config`. On failure falls back to the
223/// baseline registry so the server still advertises the curated tool set.
224pub async fn run_daemon(workspace_dir: PathBuf) -> anyhow::Result<()> {
225    let _ = start_time();
226
227    let (state, skipped) = match Box::pin(Config::load_or_init()).await {
228        Ok(config) => {
229            tracing::info!(
230                "mcp-server: loaded Construct config from {}",
231                config.config_path.display()
232            );
233            state_from_config(&workspace_dir, &config)
234        }
235        Err(err) => {
236            tracing::warn!(
237                "mcp-server: failed to load Construct config ({err}) — continuing with baseline registry"
238            );
239            default_state(&workspace_dir)
240        }
241    };
242
243    for (name, reason) in &skipped {
244        tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
245    }
246    tracing::info!("mcp-server: advertising {} tools", state.tools.len());
247
248    let handle = serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), state).await?;
249    let url = format!("http://{}/mcp", handle.addr);
250    write_discovery_file(&url)?;
251    tracing::info!("mcp-server: listening on {url}");
252
253    let _ = tokio::signal::ctrl_c().await;
254    let _ = handle.shutdown.send(());
255    let _ = handle.joined.await;
256    cleanup_discovery_file();
257    Ok(())
258}
259
260/// Absolute path of the MCP discovery file (`~/.construct/mcp.json`).
261#[must_use]
262pub fn discovery_path() -> Option<PathBuf> {
263    directories::UserDirs::new().map(|u| u.home_dir().join(".construct").join("mcp.json"))
264}
265
266/// Write the MCP discovery file atomically (tempfile + rename) so external
267/// readers never observe a half-written JSON document. Payload shape is
268/// `{url, pid, started_at}` — frozen by contract, do not change.
269pub fn write_discovery_file(url: &str) -> anyhow::Result<()> {
270    let Some(path) = discovery_path() else {
271        anyhow::bail!("could not resolve home directory");
272    };
273    if let Some(parent) = path.parent() {
274        std::fs::create_dir_all(parent)?;
275    }
276    let payload = json!({
277        "url": url,
278        "pid": std::process::id(),
279        "started_at": Utc::now().to_rfc3339(),
280    });
281    let bytes = serde_json::to_vec_pretty(&payload)?;
282
283    // Atomic write: tempfile in the same directory, then rename over the
284    // target. Rename is atomic on POSIX filesystems so concurrent readers
285    // never see a partial write.
286    let parent = path
287        .parent()
288        .ok_or_else(|| anyhow::anyhow!("discovery path has no parent"))?;
289    let tmp_name = format!(
290        ".mcp.json.{}.{}",
291        std::process::id(),
292        Utc::now().timestamp_nanos_opt().unwrap_or(0)
293    );
294    let tmp_path = parent.join(tmp_name);
295    std::fs::write(&tmp_path, &bytes)?;
296    std::fs::rename(&tmp_path, &path)?;
297    Ok(())
298}
299
300/// Remove the MCP discovery file on shutdown. Safe to call when the file
301/// doesn't exist.
302pub fn cleanup_discovery_file() {
303    if let Some(path) = discovery_path() {
304        let _ = std::fs::remove_file(path);
305    }
306}
307
308// ── Handlers ───────────────────────────────────────────────────────────────
309
310async fn health_handler() -> Response {
311    (StatusCode::OK, Json(build_health_response())).into_response()
312}
313
314#[derive(Debug, Deserialize, Default)]
315struct CreateSessionBody {
316    cwd: Option<String>,
317    label: Option<String>,
318}
319
320#[derive(Debug, Serialize)]
321struct CreateSessionResponse {
322    session_id: String,
323    token: String,
324    cwd: String,
325}
326
327async fn create_session_handler(
328    State(state): State<AppState>,
329    body: Option<Json<CreateSessionBody>>,
330) -> Response {
331    let body = body.map(|Json(b)| b).unwrap_or_default();
332    let cwd = resolve_cwd(body.cwd.as_deref());
333    let sess = state.sessions.create(cwd.clone(), body.label).await;
334    let resp = CreateSessionResponse {
335        session_id: sess.id,
336        token: sess.token,
337        cwd: cwd.to_string_lossy().into_owned(),
338    };
339    (StatusCode::OK, Json(resp)).into_response()
340}
341
342fn resolve_cwd(supplied: Option<&str>) -> PathBuf {
343    if let Some(s) = supplied {
344        let p = PathBuf::from(shellexpand::tilde(s).into_owned());
345        if let Ok(canon) = p.canonicalize() {
346            return canon;
347        }
348        return p;
349    }
350    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
351}
352
353/// GET /session/<session_id>/events — session-wide progress SSE stream.
354///
355/// Auth: `Authorization: Bearer <token>` where `<token>` matches the token
356/// that was issued by `POST /session`. `X-Construct-Session` is not required
357/// here because the session id is already in the URL.
358///
359/// Every `ProgressEvent` published to the session's broadcast sender (by any
360/// in-flight `tools/call`) is forwarded as a single `event:` line containing
361/// the JSON-serialized payload. The stream terminates when the client
362/// disconnects.
363async fn session_events_handler(
364    State(state): State<AppState>,
365    Path(session_id): Path<String>,
366    headers: HeaderMap,
367) -> Response {
368    // Reuse the `Authorization: Bearer <token>` header; `X-Construct-Session`
369    // would duplicate `session_id` so we skip it.
370    let Some(token) = headers
371        .get(header::AUTHORIZATION)
372        .and_then(|v| v.to_str().ok())
373        .and_then(|s| s.strip_prefix("Bearer "))
374        .map(str::trim)
375    else {
376        return (
377            StatusCode::UNAUTHORIZED,
378            "missing Authorization: Bearer <token>",
379        )
380            .into_response();
381    };
382
383    let Some(_session) = state.sessions.authenticate(&session_id, token).await else {
384        return (StatusCode::UNAUTHORIZED, "invalid session or token").into_response();
385    };
386
387    // Subscribe to the session's broadcast channel. `authenticate` already
388    // proved the session exists, but we re-fetch the sender to clone it.
389    let Some(sender) = state.sessions.event_sender(&session_id).await else {
390        return (StatusCode::NOT_FOUND, "session vanished").into_response();
391    };
392
393    let rx = sender.subscribe();
394    let stream = BroadcastStream::new(rx).filter_map(|msg| match msg {
395        Ok(ev) => Some(Ok::<_, Infallible>(
396            Event::default().data(serde_json::to_string(&ev).unwrap_or_else(|_| "{}".into())),
397        )),
398        // Lagged: slow consumer missed N frames. We drop those frames
399        // silently — progress is advisory.
400        Err(_) => None,
401    });
402
403    Sse::new(stream)
404        .keep_alive(KeepAlive::default())
405        .into_response()
406}
407
408fn auth_or_401(headers: &HeaderMap) -> Result<(String, String), Response> {
409    let bearer = headers
410        .get(header::AUTHORIZATION)
411        .and_then(|v| v.to_str().ok())
412        .and_then(|s| s.strip_prefix("Bearer "))
413        .map(str::trim);
414    let session_id = headers
415        .get("x-construct-session")
416        .and_then(|v| v.to_str().ok())
417        .map(str::trim);
418    match (bearer, session_id) {
419        (Some(t), Some(s)) if !t.is_empty() && !s.is_empty() => Ok((s.to_string(), t.to_string())),
420        _ => Err((
421            StatusCode::UNAUTHORIZED,
422            "missing Authorization: Bearer <token> or X-Construct-Session header",
423        )
424            .into_response()),
425    }
426}
427
428async fn mcp_handler(
429    State(state): State<AppState>,
430    headers: HeaderMap,
431    Json(req): Json<Value>,
432) -> Response {
433    let (session_id, token) = match auth_or_401(&headers) {
434        Ok(pair) => pair,
435        Err(resp) => return resp,
436    };
437    let Some(session) = state.sessions.authenticate(&session_id, &token).await else {
438        return (StatusCode::UNAUTHORIZED, "invalid session or token").into_response();
439    };
440
441    // Pull id + method. `id` may be absent for JSON-RPC notifications; for
442    // method calls we require it.
443    let id = req.get("id").cloned();
444    let method = req
445        .get("method")
446        .and_then(Value::as_str)
447        .unwrap_or("")
448        .to_string();
449    let params = req.get("params").cloned().unwrap_or(Value::Null);
450
451    match method.as_str() {
452        "initialize" => plain_ok(id, initialize_result()),
453        "tools/list" => plain_ok(id, tools_list_result(&state)),
454        "tools/call" => stream_tool_call(state, session.events.clone(), id, params),
455        "notifications/initialized" | "notifications/cancelled" => {
456            // No response body for notifications.
457            StatusCode::ACCEPTED.into_response()
458        }
459        "" => plain_err(id, INVALID_REQUEST, "missing method"),
460        other => plain_err(id, METHOD_NOT_FOUND, &format!("unknown method: {other}")),
461    }
462}
463
464fn initialize_result() -> Value {
465    json!({
466        "protocolVersion": MCP_PROTOCOL_VERSION,
467        "capabilities": {
468            "tools": { "listChanged": false }
469        },
470        "serverInfo": {
471            "name": "construct-mcp",
472            "version": env!("CARGO_PKG_VERSION"),
473        }
474    })
475}
476
477fn tools_list_result(state: &AppState) -> Value {
478    let mut tools: Vec<Value> = state
479        .tools
480        .values()
481        .map(|t| {
482            json!({
483                "name": t.name(),
484                "description": t.description(),
485                "inputSchema": t.parameters_schema(),
486            })
487        })
488        .collect();
489    tools.sort_by(|a, b| a["name"].as_str().cmp(&b["name"].as_str()));
490    json!({ "tools": tools })
491}
492
493fn plain_ok(id: Option<Value>, result: Value) -> Response {
494    let body = json!({
495        "jsonrpc": JSONRPC_VERSION,
496        "id": id.unwrap_or(Value::Null),
497        "result": result,
498    });
499    (StatusCode::OK, Json(body)).into_response()
500}
501
502fn plain_err(id: Option<Value>, code: i32, msg: &str) -> Response {
503    let err = JsonRpcError {
504        code,
505        message: msg.to_string(),
506        data: None,
507    };
508    let body = json!({
509        "jsonrpc": JSONRPC_VERSION,
510        "id": id.unwrap_or(Value::Null),
511        "error": err,
512    });
513    (StatusCode::OK, Json(body)).into_response()
514}
515
516fn stream_tool_call(
517    state: AppState,
518    session_events: broadcast::Sender<ProgressEvent>,
519    id: Option<Value>,
520    params: Value,
521) -> Response {
522    let name = params
523        .get("name")
524        .and_then(Value::as_str)
525        .unwrap_or("")
526        .to_string();
527    let args = params.get("arguments").cloned().unwrap_or(Value::Null);
528    let meta_token = params
529        .get("_meta")
530        .and_then(|m| m.get("progressToken"))
531        .and_then(Value::as_u64);
532
533    let Some(tool) = state.tools.get(&name).cloned() else {
534        return plain_err(id, INVALID_PARAMS, &format!("unknown tool: {name}"));
535    };
536
537    let (tx, rx) = mpsc::unbounded_channel::<Value>();
538    let sink = Arc::new(McpProgressSink::with_session(
539        tx.clone(),
540        meta_token,
541        session_events,
542        name.clone(),
543    ));
544
545    // Kick off the tool in the background; the task pushes the terminal
546    // JSON-RPC response onto `tx` once finished.
547    let tx_final = tx.clone();
548    let id_for_task = id.clone();
549    tokio::spawn(async move {
550        let result = tool.execute_with_progress(args, sink.as_ref()).await;
551        let final_msg = match result {
552            Ok(tool_result) => {
553                let content = tool_result_to_content(&tool_result);
554                let payload = json!({
555                    "content": content,
556                    "isError": !tool_result.success,
557                });
558                json!({
559                    "jsonrpc": JSONRPC_VERSION,
560                    "id": id_for_task.unwrap_or(Value::Null),
561                    "result": payload,
562                })
563            }
564            Err(err) => {
565                json!({
566                    "jsonrpc": JSONRPC_VERSION,
567                    "id": id_for_task.unwrap_or(Value::Null),
568                    "error": {
569                        "code": INTERNAL_ERROR,
570                        "message": err.to_string(),
571                    }
572                })
573            }
574        };
575        let _ = tx_final.send(final_msg);
576        // Dropping the last sender will close `rx` for the SSE stream.
577    });
578
579    let event_stream = UnboundedReceiverStream::new(rx)
580        .map(|msg| Ok::<_, Infallible>(Event::default().data(msg.to_string())));
581
582    Sse::new(event_stream)
583        .keep_alive(KeepAlive::default())
584        .into_response()
585}
586
587fn tool_result_to_content(result: &crate::tools::traits::ToolResult) -> Value {
588    let text = if result.success {
589        result.output.clone()
590    } else {
591        result
592            .error
593            .clone()
594            .unwrap_or_else(|| result.output.clone())
595    };
596    json!([{ "type": "text", "text": text }])
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    // ── Shared helpers ────────────────────────────────────────────────────

    /// Baseline app state rooted at the system temp directory.
    fn fresh_app_state() -> AppState {
        default_state(&std::env::temp_dir()).0
    }

    /// Header map carrying only `Authorization: Bearer <token>`.
    fn bearer_headers(token: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        let value = format!("Bearer {token}").parse().unwrap();
        headers.insert(header::AUTHORIZATION, value);
        headers
    }

    /// `Content-Type` of `resp` as text, or `""` when absent or unreadable.
    fn content_type(resp: &Response) -> &str {
        resp.headers()
            .get(header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("")
    }

    // ── `/health` ─────────────────────────────────────────────────────────

    #[test]
    fn health_response_shape() {
        // Round-trip through JSON and check each documented field's type.
        let v = serde_json::to_value(&build_health_response()).unwrap();

        assert_eq!(v["status"], "ok");
        assert!(v["pid"].as_u64().is_some());
        assert!(v["uptime_seconds"].as_u64().is_some());
        assert!(
            v["started_at"].as_str().is_some_and(|s| !s.is_empty()),
            "started_at should be a non-empty rfc3339 string"
        );
        assert_eq!(v["protocol_version"], MCP_PROTOCOL_VERSION);
    }

    #[tokio::test]
    async fn health_handler_returns_200_json() {
        let resp = health_handler().await;
        assert_eq!(resp.status(), StatusCode::OK);

        // Best effort: response should carry a JSON content-type.
        let ct = content_type(&resp);
        assert!(
            ct.contains("application/json"),
            "expected JSON content-type, got: {ct}"
        );
    }

    // ── Session-wide progress SSE (`/session/<id>/events`) ────────────────

    #[tokio::test]
    async fn session_events_rejects_missing_auth() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;

        let resp =
            session_events_handler(State(state), Path(sess.id.clone()), HeaderMap::new()).await;

        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn session_events_rejects_wrong_token() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;
        let headers = bearer_headers("not-the-token");

        let resp = session_events_handler(State(state), Path(sess.id.clone()), headers).await;

        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn session_events_accepts_correct_token() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;
        let headers = bearer_headers(&sess.token);

        let resp = session_events_handler(State(state), Path(sess.id.clone()), headers).await;

        assert_eq!(resp.status(), StatusCode::OK);
        // SSE content-type marker.
        let ct = content_type(&resp);
        assert!(
            ct.contains("text/event-stream"),
            "expected SSE content-type, got: {ct}"
        );
    }

    #[tokio::test]
    async fn session_broadcast_round_trip_through_store() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;

        // Subscribe through the same store lookup the handler performs.
        let sender = state
            .sessions
            .event_sender(&sess.id)
            .await
            .expect("session present");
        let mut rx = sender.subscribe();

        // Publish a synthetic progress event, as a running tool would.
        let synthetic = ProgressEvent::new(
            5,
            2,
            Some(4),
            Some("half way".into()),
            Some("notion".into()),
        );
        sender.send(synthetic).expect("send ok");

        let got = rx.recv().await.expect("recv ok");
        assert_eq!(got.progress, 2);
        assert_eq!(got.tool.as_deref(), Some("notion"));
    }
}