Skip to main content

solo_api/
mcp_progress.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! v0.11.0 P3 — per-tool progress events for long-running MCP tool calls.
4//!
5//! When an MCP `tools/call` request carries `_meta.progressToken` in its
6//! params, the dispatcher builds a [`ProgressReporter`] wired to the
7//! caller's [`crate::mcp_session::SessionState`]. The reporter is passed
8//! into the tool handler, which calls [`ProgressReporter::report`] at
9//! sensible checkpoints. Each call publishes a JSON-RPC
10//! `notifications/progress` envelope onto the session's broadcast
11//! channel, where the GET SSE stream subscriber forwards it to the
12//! client.
13//!
14//! ## Wire shape (Decision C — MCP spec verbatim)
15//!
16//! ```json
17//! {
18//!   "jsonrpc": "2.0",
19//!   "method": "notifications/progress",
20//!   "params": {
21//!     "progressToken": "<echo of the client's token>",
22//!     "progress": 5,
23//!     "total": 12,
24//!     "message": "Processing cluster 5/12"
25//!   }
26//! }
27//! ```
28//!
29//! The `progressToken` is whatever shape the client sent — string OR
30//! number per the spec. [`ProgressToken`] is `serde_json::Value`-backed
31//! so we preserve the wire shape losslessly on echo.
32//!
33//! ## Stdio backward compat
34//!
35//! [`ProgressReporter`] is constructed only for HTTP requests that have
36//! both an attached `SessionState` (planted by the session middleware)
37//! and a `_meta.progressToken` in their `tools/call` params. The stdio
38//! path (rmcp `call_tool`) and the unit tests that drive `dispatch_tool`
39//! directly pass `None` — handlers see `Option<ProgressReporter>` and
40//! skip reporting silently when it's `None`.
41//!
42//! ## Backpressure
43//!
44//! [`crate::mcp_session::SessionState::publish_event`] is lossy on the
45//! broadcast channel (slow subscribers see a `lagged` event on
46//! reconnect) but lossless to the replay buffer for `Last-Event-ID`
47//! resume. Progress events are inherently best-effort — a client that
48//! falls behind 256 events still sees the final tool result on the
49//! POST side.
50
51use std::sync::Arc;
52
53use serde::{Deserialize, Serialize};
54
55use crate::mcp_session::{McpEventKind, SessionState};
56
/// JSON-RPC `method` string used for every progress notification, as
/// named by the MCP Streamable HTTP transport spec.
///
/// Kept as a single constant so that every site that builds the
/// envelope agrees on the exact wire spelling — the GET stream client
/// matches on this string to route into its progress handler.
pub const MCP_NOTIFICATION_PROGRESS_METHOD: &str = "notifications/progress";
63
/// Result-count (`top_k`) threshold above which `memory_search_docs`
/// calls emit progress events.
///
/// Below 100 results the search completes fast enough that progress
/// notifications add wire-overhead with no UX benefit (Decision C).
///
/// NOTE(review): the comparison itself lives in the handler, not in this
/// module — confirm there whether a request for exactly 100 results
/// counts as "above" the threshold.
pub const MCP_SEARCH_DOCS_PROGRESS_TOP_K_THRESHOLD: u32 = 100;
68
/// Batch-size threshold above which `memory_remember_batch` calls emit
/// per-item embedding progress.
///
/// Below 50 items the batch completes inside one or two embedder
/// round-trips and progress notifications add wire-overhead with no UX
/// benefit (Decision C).
///
/// NOTE(review): the comparison itself lives in the handler, not in this
/// module — confirm there whether a batch of exactly 50 items counts as
/// "above" the threshold.
pub const MCP_REMEMBER_BATCH_PROGRESS_ITEM_THRESHOLD: usize = 50;
74
/// Cadence, in items processed, at which `memory_remember_batch` emits
/// a progress event during its embed loop.
///
/// Plan §6 P3 spec: "every 25 rows". With the value at 25, a 51-item
/// batch emits at items 25 and 50, followed by the final terminal
/// event.
pub const MCP_REMEMBER_BATCH_PROGRESS_EMIT_EVERY: usize = 25;
80
81/// The progress token an MCP client passes in `tools/call` params
82/// `_meta.progressToken`. Spec allows EITHER a string OR a number;
83/// the server echoes back whatever shape the client sent. Held as
84/// `serde_json::Value` so we don't lose the original wire shape on
85/// the round-trip.
86///
87/// Created from the request `params._meta.progressToken` JSON value
88/// via [`ProgressToken::from_meta`]; the dispatcher's `handle_tools_call`
89/// is the sole caller.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ProgressToken(pub serde_json::Value);
92
93impl ProgressToken {
94    /// Read `_meta.progressToken` from the `tools/call` params JSON
95    /// blob. Returns `None` if either the `_meta` key is absent, the
96    /// `progressToken` key is absent, or the value is `null`. The spec
97    /// only permits string and number — any other JSON type returns
98    /// `None` so unexpected shapes are treated as "client did not
99    /// request progress".
100    pub fn from_meta(params: &serde_json::Value) -> Option<Self> {
101        let meta = params.get("_meta")?;
102        let token = meta.get("progressToken")?;
103        match token {
104            serde_json::Value::Null => None,
105            serde_json::Value::String(_) | serde_json::Value::Number(_) => {
106                Some(Self(token.clone()))
107            }
108            // Reject objects/arrays/bools — the spec only permits
109            // string OR number.
110            _ => None,
111        }
112    }
113
114    /// The wire-shape value to echo back in `params.progressToken` of
115    /// every emitted `notifications/progress` envelope.
116    pub fn as_value(&self) -> &serde_json::Value {
117        &self.0
118    }
119}
120
121/// A handle that long-running tool handlers call to emit progress
122/// notifications during their work. Wraps the
123/// [`crate::mcp_session::SessionState`] that owns the broadcast channel
124/// + replay buffer; calls into `SessionState::publish_event` for the
125/// actual fan-out.
126///
127/// Cloneable so handlers can move it across `tokio::spawn` boundaries
128/// (the underlying `Arc<SessionState>` + token clone are cheap).
129///
130/// Constructed only when the dispatch path has BOTH an attached
131/// `Arc<SessionState>` AND a parsed `_meta.progressToken`. Callers
132/// (`dispatch_tool` + downstream handlers) accept `Option<ProgressReporter>`
133/// and skip reporting silently when it's `None`.
134#[derive(Debug, Clone)]
135pub struct ProgressReporter {
136    session: Arc<SessionState>,
137    token: ProgressToken,
138}
139
140impl ProgressReporter {
141    /// Build a reporter wired to a session + the client's token. Sole
142    /// caller today is `McpDispatcher::handle_tools_call` (and tests).
143    pub fn new(session: Arc<SessionState>, token: ProgressToken) -> Self {
144        Self { session, token }
145    }
146
147    /// Echo the original wire-shape token back to the client. Used by
148    /// the GET stream sink to correlate progress events with in-flight
149    /// `tools/call` requests on the JSON-RPC reply side.
150    pub fn token(&self) -> &ProgressToken {
151        &self.token
152    }
153
154    /// Publish one `notifications/progress` envelope to the session's
155    /// event stream. The envelope:
156    ///
157    /// ```json
158    /// {
159    ///   "jsonrpc": "2.0",
160    ///   "method": "notifications/progress",
161    ///   "params": {
162    ///     "progressToken": <client's original token>,
163    ///     "progress": <current>,
164    ///     "total": <expected total or null>,
165    ///     "message": <optional human-readable string or null>
166    ///   }
167    /// }
168    /// ```
169    ///
170    /// Returns the per-session event id assigned by
171    /// `SessionState::publish_event` so callers can correlate the
172    /// emit with the resulting stream entry. Lossy on the broadcast
173    /// side (no live subscriber = silent drop) but lossless to the
174    /// replay buffer.
175    pub fn report(&self, progress: u64, total: Option<u64>, message: Option<&str>) -> u64 {
176        let mut params = serde_json::Map::with_capacity(4);
177        params.insert("progressToken".to_string(), self.token.0.clone());
178        params.insert(
179            "progress".to_string(),
180            serde_json::Value::Number(progress.into()),
181        );
182        if let Some(t) = total {
183            params.insert("total".to_string(), serde_json::Value::Number(t.into()));
184        }
185        if let Some(m) = message {
186            params.insert(
187                "message".to_string(),
188                serde_json::Value::String(m.to_string()),
189            );
190        }
191        let envelope = serde_json::json!({
192            "jsonrpc": "2.0",
193            "method": MCP_NOTIFICATION_PROGRESS_METHOD,
194            "params": serde_json::Value::Object(params),
195        });
196        self.session.publish_event(McpEventKind::Progress, envelope)
197    }
198}
199
200/// Helper for handlers: emit a progress event if-and-only-if a
201/// reporter is present. Reads cleaner at every checkpoint than the
202/// `if let Some(r) = …` pattern repeated 4x per handler.
203///
204/// The `_unused` reply is intentional: callers don't generally care
205/// about the assigned event id; the `_unused = …` pattern in the
206/// handlers documents the intent without polluting the call site.
207pub fn report_if_some(
208    reporter: Option<&ProgressReporter>,
209    progress: u64,
210    total: Option<u64>,
211    message: Option<&str>,
212) {
213    if let Some(r) = reporter {
214        let _id = r.report(progress, total, message);
215    }
216}
217
#[cfg(test)]
mod tests {
    // Unit tests for `_meta.progressToken` parsing and for the shape of
    // the `notifications/progress` envelopes the reporter publishes.
    use super::*;
    use serde_json::json;
    use solo_core::TenantId;

    /// A brand-new session for the default tenant, shared via `Arc` the
    /// same way the reporter expects to receive it.
    fn fresh_session() -> Arc<SessionState> {
        Arc::new(SessionState::new(TenantId::default_tenant(), None))
    }

    #[test]
    fn progress_token_from_meta_reads_string_token() {
        let params = json!({
            "name": "memory_ingest_document",
            "_meta": { "progressToken": "abc-123" },
        });
        let token = ProgressToken::from_meta(&params).expect("string token");
        assert_eq!(token.as_value(), &json!("abc-123"));
    }

    #[test]
    fn progress_token_from_meta_reads_number_token() {
        let params = json!({
            "name": "memory_search_docs",
            "_meta": { "progressToken": 42 },
        });
        let token = ProgressToken::from_meta(&params).expect("number token");
        assert_eq!(token.as_value(), &json!(42));
    }

    #[test]
    fn progress_token_from_meta_returns_none_without_meta() {
        let params = json!({"name": "memory_inspect"});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_token_from_meta_returns_none_when_meta_lacks_token() {
        // `_meta` is present but carries no `progressToken` key.
        let params = json!({"_meta": {"somethingElse": 1}});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_token_from_meta_returns_none_for_non_object_inputs() {
        // Neither a non-object params blob nor a non-object `_meta` can
        // hold a token; both must degrade to `None` rather than panic.
        assert!(ProgressToken::from_meta(&json!("not an object")).is_none());
        assert!(ProgressToken::from_meta(&json!({"_meta": "not an object"})).is_none());
    }

    #[test]
    fn progress_token_from_meta_returns_none_for_null_token() {
        let params = json!({"_meta": {"progressToken": null}});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_token_from_meta_rejects_object_token() {
        let params = json!({"_meta": {"progressToken": {"nope": 1}}});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_token_from_meta_rejects_bool_and_array_tokens() {
        // The remaining JSON types the spec does not allow as tokens.
        let with_bool = json!({"_meta": {"progressToken": true}});
        assert!(ProgressToken::from_meta(&with_bool).is_none());
        let with_array = json!({"_meta": {"progressToken": ["abc-123"]}});
        assert!(ProgressToken::from_meta(&with_array).is_none());
    }

    #[test]
    fn progress_reporter_token_returns_original_token() {
        let reporter = ProgressReporter::new(fresh_session(), ProgressToken(json!("tok-1")));
        assert_eq!(reporter.token().as_value(), &json!("tok-1"));
    }

    #[test]
    fn progress_reporter_publishes_spec_shape() {
        let session = fresh_session();
        let mut rx = session.subscribe_events();
        let reporter = ProgressReporter::new(
            session.clone(),
            ProgressToken(json!("tok-1")),
        );
        let _id = reporter.report(3, Some(12), Some("crunching"));
        let event = rx.try_recv().expect("event published");
        assert_eq!(event.event, McpEventKind::Progress);
        // Spec shape: jsonrpc + method + params{progressToken, progress, total, message}.
        assert_eq!(event.data["jsonrpc"], json!("2.0"));
        assert_eq!(
            event.data["method"],
            json!(MCP_NOTIFICATION_PROGRESS_METHOD)
        );
        assert_eq!(event.data["params"]["progressToken"], json!("tok-1"));
        assert_eq!(event.data["params"]["progress"], json!(3));
        assert_eq!(event.data["params"]["total"], json!(12));
        assert_eq!(event.data["params"]["message"], json!("crunching"));
    }

    #[test]
    fn progress_reporter_omits_total_and_message_when_absent() {
        let session = fresh_session();
        let mut rx = session.subscribe_events();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!(7)));
        let _id = reporter.report(1, None, None);
        let event = rx.try_recv().expect("event published");
        assert_eq!(event.data["params"]["progressToken"], json!(7));
        assert_eq!(event.data["params"]["progress"], json!(1));
        // Per the spec, total and message are optional — absent keys
        // (not null) when the handler doesn't have a value to emit.
        assert!(
            event.data["params"].get("total").is_none(),
            "total key must be omitted when None"
        );
        assert!(
            event.data["params"].get("message").is_none(),
            "message key must be omitted when None"
        );
    }

    #[test]
    fn progress_reporter_emits_monotonic_event_ids() {
        let session = fresh_session();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!("t")));
        let id_a = reporter.report(1, None, None);
        let id_b = reporter.report(2, None, None);
        let id_c = reporter.report(3, None, None);
        assert!(id_a < id_b);
        assert!(id_b < id_c);
    }

    #[test]
    fn report_if_some_noop_when_none() {
        // No session, no reporter — must not panic.
        report_if_some(None, 1, Some(2), Some("noop"));
    }

    #[test]
    fn report_if_some_publishes_when_present() {
        let session = fresh_session();
        let mut rx = session.subscribe_events();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!("t")));
        report_if_some(Some(&reporter), 1, Some(1), Some("hi"));
        let event = rx.try_recv().expect("event published");
        // The helper must forward every argument, not just `progress`.
        assert_eq!(event.data["params"]["progressToken"], json!("t"));
        assert_eq!(event.data["params"]["progress"], json!(1));
        assert_eq!(event.data["params"]["total"], json!(1));
        assert_eq!(event.data["params"]["message"], json!("hi"));
    }
}