solo_api/mcp_progress.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! v0.11.0 P3 — per-tool progress events for long-running MCP tool calls.
4//!
5//! When an MCP `tools/call` request carries `_meta.progressToken` in its
6//! params, the dispatcher builds a [`ProgressReporter`] wired to the
7//! caller's [`crate::mcp_session::SessionState`]. The reporter is passed
8//! into the tool handler, which calls [`ProgressReporter::report`] at
9//! sensible checkpoints. Each call publishes a JSON-RPC
10//! `notifications/progress` envelope onto the session's broadcast
11//! channel, where the GET SSE stream subscriber forwards it to the
12//! client.
13//!
14//! ## Wire shape (Decision C — MCP spec verbatim)
15//!
16//! ```json
17//! {
18//! "jsonrpc": "2.0",
19//! "method": "notifications/progress",
20//! "params": {
21//! "progressToken": "<echo of the client's token>",
22//! "progress": 5,
23//! "total": 12,
24//! "message": "Processing cluster 5/12"
25//! }
26//! }
27//! ```
28//!
29//! The `progressToken` is whatever shape the client sent — string OR
30//! number per the spec. [`ProgressToken`] is `serde_json::Value`-backed
31//! so we preserve the wire shape losslessly on echo.
32//!
33//! ## Stdio backward compat
34//!
35//! [`ProgressReporter`] is constructed only for HTTP requests that have
36//! both an attached `SessionState` (planted by the session middleware)
37//! and a `_meta.progressToken` in their `tools/call` params. The stdio
38//! path (rmcp `call_tool`) and the unit tests that drive `dispatch_tool`
39//! directly pass `None` — handlers see `Option<ProgressReporter>` and
40//! skip reporting silently when it's `None`.
41//!
42//! ## Backpressure
43//!
44//! [`crate::mcp_session::SessionState::publish_event`] is lossy on the
45//! broadcast channel (slow subscribers see a `lagged` event on
46//! reconnect) but lossless to the replay buffer for `Last-Event-ID`
47//! resume. Progress events are inherently best-effort — a client that
48//! falls behind 256 events still sees the final tool result on the
49//! POST side.
50
51use std::sync::Arc;
52
53use serde::{Deserialize, Serialize};
54
55use crate::mcp_session::{McpEventKind, SessionState};
56
/// JSON-RPC method name for `notifications/progress` per the MCP
/// Streamable HTTP transport spec. Held as a const so any code that
/// emits the envelope agrees on the exact wire spelling — the GET
/// stream client matches on this string to route into its progress
/// handler.
///
/// Within this file it is written into the `method` field of the
/// envelope built by `ProgressReporter::report`.
pub const MCP_NOTIFICATION_PROGRESS_METHOD: &str = "notifications/progress";

/// Threshold above which `memory_search_docs` calls emit progress events.
/// Below 100 results the search completes fast enough that progress
/// notifications add wire-overhead with no UX benefit (Decision C).
///
/// NOTE(review): the comparison against this value lives in the tool
/// handler, not in this file — confirm there whether a `top_k` of
/// exactly 100 is treated as "above" (`>=`) or not (`>`).
pub const MCP_SEARCH_DOCS_PROGRESS_TOP_K_THRESHOLD: u32 = 100;

/// Threshold above which `memory_remember_batch` calls emit per-item
/// embedding progress. Below 50 items the batch completes inside one
/// or two embedder round-trips and progress notifications add
/// wire-overhead with no UX benefit (Decision C).
///
/// NOTE(review): as with the search threshold, the strict-vs-inclusive
/// boundary for a batch of exactly 50 items is decided at the call
/// site, which is not visible here — confirm against the handler.
pub const MCP_REMEMBER_BATCH_PROGRESS_ITEM_THRESHOLD: usize = 50;

/// How often (in items processed) `memory_remember_batch` emits a
/// progress event during the embed loop. Plan §6 P3 spec: "every 25
/// rows". Set to 25 so a 51-item batch emits at items 25 + 50 + the
/// final terminal event.
///
/// The unit is items (`usize`), matching
/// `MCP_REMEMBER_BATCH_PROGRESS_ITEM_THRESHOLD`.
pub const MCP_REMEMBER_BATCH_PROGRESS_EMIT_EVERY: usize = 25;
80
81/// The progress token an MCP client passes in `tools/call` params
82/// `_meta.progressToken`. Spec allows EITHER a string OR a number;
83/// the server echoes back whatever shape the client sent. Held as
84/// `serde_json::Value` so we don't lose the original wire shape on
85/// the round-trip.
86///
87/// Created from the request `params._meta.progressToken` JSON value
88/// via [`ProgressToken::from_meta`]; the dispatcher's `handle_tools_call`
89/// is the sole caller.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ProgressToken(pub serde_json::Value);
92
93impl ProgressToken {
94 /// Read `_meta.progressToken` from the `tools/call` params JSON
95 /// blob. Returns `None` if either the `_meta` key is absent, the
96 /// `progressToken` key is absent, or the value is `null`. The spec
97 /// only permits string and number — any other JSON type returns
98 /// `None` so unexpected shapes are treated as "client did not
99 /// request progress".
100 pub fn from_meta(params: &serde_json::Value) -> Option<Self> {
101 let meta = params.get("_meta")?;
102 let token = meta.get("progressToken")?;
103 match token {
104 serde_json::Value::Null => None,
105 serde_json::Value::String(_) | serde_json::Value::Number(_) => {
106 Some(Self(token.clone()))
107 }
108 // Reject objects/arrays/bools — the spec only permits
109 // string OR number.
110 _ => None,
111 }
112 }
113
114 /// The wire-shape value to echo back in `params.progressToken` of
115 /// every emitted `notifications/progress` envelope.
116 pub fn as_value(&self) -> &serde_json::Value {
117 &self.0
118 }
119}
120
121/// A handle that long-running tool handlers call to emit progress
122/// notifications during their work. Wraps the
123/// [`crate::mcp_session::SessionState`] that owns the broadcast channel
124/// + replay buffer; calls into `SessionState::publish_event` for the
125/// actual fan-out.
126///
127/// Cloneable so handlers can move it across `tokio::spawn` boundaries
128/// (the underlying `Arc<SessionState>` + token clone are cheap).
129///
130/// Constructed only when the dispatch path has BOTH an attached
131/// `Arc<SessionState>` AND a parsed `_meta.progressToken`. Callers
132/// (`dispatch_tool` + downstream handlers) accept `Option<ProgressReporter>`
133/// and skip reporting silently when it's `None`.
134#[derive(Debug, Clone)]
135pub struct ProgressReporter {
136 session: Arc<SessionState>,
137 token: ProgressToken,
138}
139
140impl ProgressReporter {
141 /// Build a reporter wired to a session + the client's token. Sole
142 /// caller today is `McpDispatcher::handle_tools_call` (and tests).
143 pub fn new(session: Arc<SessionState>, token: ProgressToken) -> Self {
144 Self { session, token }
145 }
146
147 /// Echo the original wire-shape token back to the client. Used by
148 /// the GET stream sink to correlate progress events with in-flight
149 /// `tools/call` requests on the JSON-RPC reply side.
150 pub fn token(&self) -> &ProgressToken {
151 &self.token
152 }
153
154 /// Publish one `notifications/progress` envelope to the session's
155 /// event stream. The envelope:
156 ///
157 /// ```json
158 /// {
159 /// "jsonrpc": "2.0",
160 /// "method": "notifications/progress",
161 /// "params": {
162 /// "progressToken": <client's original token>,
163 /// "progress": <current>,
164 /// "total": <expected total or null>,
165 /// "message": <optional human-readable string or null>
166 /// }
167 /// }
168 /// ```
169 ///
170 /// Returns the per-session event id assigned by
171 /// `SessionState::publish_event` so callers can correlate the
172 /// emit with the resulting stream entry. Lossy on the broadcast
173 /// side (no live subscriber = silent drop) but lossless to the
174 /// replay buffer.
175 pub fn report(&self, progress: u64, total: Option<u64>, message: Option<&str>) -> u64 {
176 let mut params = serde_json::Map::with_capacity(4);
177 params.insert("progressToken".to_string(), self.token.0.clone());
178 params.insert(
179 "progress".to_string(),
180 serde_json::Value::Number(progress.into()),
181 );
182 if let Some(t) = total {
183 params.insert("total".to_string(), serde_json::Value::Number(t.into()));
184 }
185 if let Some(m) = message {
186 params.insert(
187 "message".to_string(),
188 serde_json::Value::String(m.to_string()),
189 );
190 }
191 let envelope = serde_json::json!({
192 "jsonrpc": "2.0",
193 "method": MCP_NOTIFICATION_PROGRESS_METHOD,
194 "params": serde_json::Value::Object(params),
195 });
196 self.session.publish_event(McpEventKind::Progress, envelope)
197 }
198}
199
200/// Helper for handlers: emit a progress event if-and-only-if a
201/// reporter is present. Reads cleaner at every checkpoint than the
202/// `if let Some(r) = …` pattern repeated 4x per handler.
203///
204/// The `_unused` reply is intentional: callers don't generally care
205/// about the assigned event id; the `_unused = …` pattern in the
206/// handlers documents the intent without polluting the call site.
207pub fn report_if_some(
208 reporter: Option<&ProgressReporter>,
209 progress: u64,
210 total: Option<u64>,
211 message: Option<&str>,
212) {
213 if let Some(r) = reporter {
214 let _id = r.report(progress, total, message);
215 }
216}
217
/// Unit tests for token parsing and the published envelope shape.
///
/// Each reporter test builds a fresh `SessionState`, subscribes to its
/// event channel where needed, and inspects what `report` published.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use solo_core::TenantId;

    /// Build a brand-new session for the default tenant so tests do not
    /// share event state.
    fn fresh_session() -> Arc<SessionState> {
        Arc::new(SessionState::new(TenantId::default_tenant(), None))
    }

    #[test]
    fn progress_token_from_meta_reads_string_token() {
        let params = json!({
            "name": "memory_ingest_document",
            "_meta": { "progressToken": "abc-123" },
        });
        let token = ProgressToken::from_meta(&params).expect("string token");
        assert_eq!(token.as_value(), &json!("abc-123"));
    }

    #[test]
    fn progress_token_from_meta_reads_number_token() {
        let params = json!({
            "name": "memory_search_docs",
            "_meta": { "progressToken": 42 },
        });
        let token = ProgressToken::from_meta(&params).expect("number token");
        assert_eq!(token.as_value(), &json!(42));
    }

    #[test]
    fn progress_token_from_meta_returns_none_without_meta() {
        let params = json!({"name": "memory_inspect"});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_token_from_meta_returns_none_for_null_token() {
        let params = json!({"_meta": {"progressToken": null}});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_token_from_meta_rejects_object_token() {
        let params = json!({"_meta": {"progressToken": {"nope": 1}}});
        assert!(ProgressToken::from_meta(&params).is_none());
    }

    #[test]
    fn progress_reporter_publishes_spec_shape() {
        let session = fresh_session();
        let mut rx = session.subscribe_events();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!("tok-1")));
        let _id = reporter.report(3, Some(12), Some("crunching"));
        let event = rx.try_recv().expect("event published");
        assert_eq!(event.event, McpEventKind::Progress);
        // Spec shape: jsonrpc + method + params{progressToken, progress, total, message}.
        assert_eq!(event.data["jsonrpc"], json!("2.0"));
        assert_eq!(
            event.data["method"],
            json!(MCP_NOTIFICATION_PROGRESS_METHOD)
        );
        assert_eq!(event.data["params"]["progressToken"], json!("tok-1"));
        assert_eq!(event.data["params"]["progress"], json!(3));
        assert_eq!(event.data["params"]["total"], json!(12));
        assert_eq!(event.data["params"]["message"], json!("crunching"));
    }

    #[test]
    fn progress_reporter_omits_total_and_message_when_absent() {
        let session = fresh_session();
        let mut rx = session.subscribe_events();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!(7)));
        let _id = reporter.report(1, None, None);
        let event = rx.try_recv().expect("event published");
        assert_eq!(event.data["params"]["progressToken"], json!(7));
        assert_eq!(event.data["params"]["progress"], json!(1));
        // Per the spec, total and message are optional — absent keys
        // (not null) when the handler doesn't have a value to emit.
        assert!(
            event.data["params"].get("total").is_none(),
            "total key must be omitted when None"
        );
        assert!(
            event.data["params"].get("message").is_none(),
            "message key must be omitted when None"
        );
    }

    #[test]
    fn progress_reporter_emits_monotonic_event_ids() {
        let session = fresh_session();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!("t")));
        let id_a = reporter.report(1, None, None);
        let id_b = reporter.report(2, None, None);
        let id_c = reporter.report(3, None, None);
        assert!(id_a < id_b);
        assert!(id_b < id_c);
    }

    #[test]
    fn report_if_some_noop_when_none() {
        // No session, no reporter — must not panic.
        report_if_some(None, 1, Some(2), Some("noop"));
    }

    #[test]
    fn report_if_some_publishes_when_present() {
        let session = fresh_session();
        let mut rx = session.subscribe_events();
        let reporter = ProgressReporter::new(session.clone(), ProgressToken(json!("t")));
        report_if_some(Some(&reporter), 1, Some(1), Some("hi"));
        let event = rx.try_recv().expect("event published");
        assert_eq!(event.data["params"]["progress"], json!(1));
    }
}