construct/tools/progress.rs
1//! Progress-notification hook for long-running tools.
2//!
3//! This module provides an additive, no-op-by-default surface that tools can
4//! use to emit progress events during execution. Callers that care about
5//! progress (e.g. a future MCP server wrapping Construct tools and forwarding
6//! `notifications/progress`) can supply a real [`ProgressSink`]; everyone else
7//! gets a [`NoopProgressSink`] and pays no cost.
8//!
9//! # Design
10//!
11//! The [`Tool`](crate::tools::Tool) trait gained a new default method,
12//! [`Tool::execute_with_progress`], that takes a `&dyn ProgressSink` alongside
13//! the existing `args`. Its default body simply calls
14//! [`Tool::execute`](crate::tools::Tool::execute) and drops the sink, so the
15//! ~93 existing tool implementations compile and behave exactly as before.
16//! Tool authors that want to emit progress opt in by overriding
17//! `execute_with_progress`.
18//!
19//! # MCP alignment
20//!
21//! The [`ProgressSink::notify`] shape (`progress_token`, `progress`,
22//! `total: Option<u64>`, `message: Option<&str>`) mirrors the MCP
23//! `notifications/progress` payload so that a future adapter can forward
24//! events verbatim.
25
26use std::sync::Arc;
27use std::sync::atomic::{AtomicU64, Ordering};
28
/// Identifier for one stream of progress events.
///
/// Each [`ProgressSink::notify`] call carries a token so a consumer can tell
/// which operation an event belongs to. The token is a plain `u64` newtype —
/// `Copy`, hashable and comparable for equality — so handing it around never
/// allocates.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ProgressToken(pub u64);

impl ProgressToken {
    /// Returns the wrapped `u64`, e.g. for writing into a serialized payload.
    pub const fn value(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
44
45/// Sink that receives progress emissions from a running tool.
46///
47/// Implementations must be cheap to clone/share (`Send + Sync`) because the
48/// same sink may be handed to many tool invocations concurrently. All methods
49/// take `&self`; internal synchronization is the sink's responsibility.
50pub trait ProgressSink: Send + Sync {
51 /// Allocate a fresh progress token for a new operation.
52 ///
53 /// The default implementation hands out monotonically increasing tokens
54 /// starting at 0; real sinks (e.g. MCP) will typically override this to
55 /// mint tokens that match an inbound request ID.
56 fn new_token(&self) -> ProgressToken {
57 // Fall back to a process-local counter. Real sinks should override.
58 static COUNTER: AtomicU64 = AtomicU64::new(0);
59 ProgressToken(COUNTER.fetch_add(1, Ordering::Relaxed))
60 }
61
62 /// Emit a progress event.
63 ///
64 /// - `token`: correlates this event with an operation.
65 /// - `progress`: current work completed (monotonically non-decreasing).
66 /// - `total`: optional total units of work, if known up front.
67 /// - `message`: optional human-readable status string.
68 fn notify(
69 &self,
70 token: ProgressToken,
71 progress: u64,
72 total: Option<u64>,
73 message: Option<&str>,
74 );
75}
76
77/// Zero-cost sink used whenever a caller does not care about progress.
78///
79/// `notify` is a no-op and will be inlined away by the optimizer.
80#[derive(Debug, Default, Clone, Copy)]
81pub struct NoopProgressSink;
82
83impl ProgressSink for NoopProgressSink {
84 #[inline]
85 fn notify(
86 &self,
87 _token: ProgressToken,
88 _progress: u64,
89 _total: Option<u64>,
90 _message: Option<&str>,
91 ) {
92 // Intentionally empty.
93 }
94}
95
96/// Process-wide default no-op sink, handed out when no sink is supplied.
97///
98/// Returned as `&'static dyn ProgressSink` so callers can pass it without
99/// allocating.
100pub fn noop_sink() -> &'static dyn ProgressSink {
101 static SINK: NoopProgressSink = NoopProgressSink;
102 &SINK
103}
104
105/// Ergonomic scoped handle: binds a [`ProgressToken`] to a [`ProgressSink`]
106/// so tool code can emit without repeating the token argument.
107///
108/// Construct one at the top of a long-running tool's `execute_with_progress`
109/// and call [`ProgressHandle::update`] as work proceeds.
110pub struct ProgressHandle<'a> {
111 sink: &'a dyn ProgressSink,
112 token: ProgressToken,
113 total: Option<u64>,
114}
115
116impl<'a> ProgressHandle<'a> {
117 /// Create a handle for a new operation on `sink` with an optional known total.
118 pub fn new(sink: &'a dyn ProgressSink, total: Option<u64>) -> Self {
119 let token = sink.new_token();
120 Self { sink, token, total }
121 }
122
123 /// Create a handle reusing an existing token (e.g. one supplied by the caller).
124 pub fn with_token(
125 sink: &'a dyn ProgressSink,
126 token: ProgressToken,
127 total: Option<u64>,
128 ) -> Self {
129 Self { sink, token, total }
130 }
131
132 /// Token backing this handle.
133 pub fn token(&self) -> ProgressToken {
134 self.token
135 }
136
137 /// Emit a progress update.
138 pub fn update(&self, progress: u64, message: Option<&str>) {
139 self.sink.notify(self.token, progress, self.total, message);
140 }
141}
142
/// Reference-counted, owned form of a sink.
///
/// Use this alias when a sink has to be stored in a struct or otherwise
/// outlive a borrow; `&*shared` yields the `&dyn ProgressSink` that the rest
/// of this module's API accepts.
pub type SharedProgressSink = Arc<dyn ProgressSink>;
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::Tool;
    use crate::tools::traits::ToolResult;
    use async_trait::async_trait;
    use std::sync::Mutex;

    /// Recording sink used to assert that emissions really flow through the
    /// trait entry point. Each event is stored as
    /// `(token, progress, total, message)`.
    #[derive(Default)]
    struct RecordingSink {
        events: Mutex<Vec<(u64, u64, Option<u64>, Option<String>)>>,
    }

    impl ProgressSink for RecordingSink {
        fn notify(
            &self,
            token: ProgressToken,
            progress: u64,
            total: Option<u64>,
            message: Option<&str>,
        ) {
            self.events.lock().unwrap().push((
                token.value(),
                progress,
                total,
                message.map(str::to_owned),
            ));
        }
    }

    #[test]
    fn noop_sink_swallows_emissions() {
        let sink = NoopProgressSink;
        // Should be safe to call arbitrarily many times without side effects.
        let t = sink.new_token();
        sink.notify(t, 0, Some(10), Some("starting"));
        sink.notify(t, 5, Some(10), None);
        sink.notify(t, 10, Some(10), Some("done"));

        // noop_sink() helper returns a usable static sink too.
        let s = noop_sink();
        s.notify(ProgressToken(42), 1, None, None);
    }

    /// The default `new_token` must never hand the same value out twice.
    /// Other tests may bump the shared counter concurrently, so only strict
    /// growth is asserted, not adjacency.
    #[test]
    fn default_new_token_is_strictly_increasing() {
        let sink = RecordingSink::default();
        let first = sink.new_token();
        let second = sink.new_token();
        assert!(second.value() > first.value());
    }

    /// `with_token` must keep the caller-supplied token and forward it, along
    /// with the handle's total, on every update.
    #[test]
    fn handle_with_token_reuses_supplied_token() {
        let sink = RecordingSink::default();
        let handle = ProgressHandle::with_token(&sink, ProgressToken(7), None);
        assert_eq!(handle.token(), ProgressToken(7));

        handle.update(3, Some("working"));

        let events = sink.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, 7);
        assert_eq!(events[0].1, 3);
        assert_eq!(events[0].2, None);
        assert_eq!(events[0].3.as_deref(), Some("working"));
    }

    /// Tool that overrides `execute_with_progress` and emits a couple of
    /// events. Used to prove the entry point wires through to the sink.
    struct ProgressyTool;

    #[async_trait]
    impl Tool for ProgressyTool {
        fn name(&self) -> &str {
            "progressy"
        }
        fn description(&self) -> &str {
            "emits progress for tests"
        }
        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({ "type": "object" })
        }
        async fn execute(&self, _args: serde_json::Value) -> anyhow::Result<ToolResult> {
            // Non-progress path returns the same thing so behaviour is identical
            // when a caller skips the progress entry point.
            Ok(ToolResult {
                success: true,
                output: "ok".into(),
                error: None,
            })
        }
        async fn execute_with_progress(
            &self,
            args: serde_json::Value,
            sink: &dyn ProgressSink,
        ) -> anyhow::Result<ToolResult> {
            let handle = ProgressHandle::new(sink, Some(2));
            handle.update(1, Some("halfway"));
            handle.update(2, Some("finished"));
            self.execute(args).await
        }
    }

    #[tokio::test]
    async fn overridden_execute_with_progress_forwards_to_sink() {
        let sink = RecordingSink::default();
        let tool = ProgressyTool;

        let result = tool
            .execute_with_progress(serde_json::json!({}), &sink)
            .await
            .unwrap();

        assert!(result.success);
        let events = sink.events.lock().unwrap();
        assert_eq!(events.len(), 2);
        // Both events come from one handle, so they must share one token.
        assert_eq!(events[0].0, events[1].0);
        assert_eq!(events[0].1, 1);
        assert_eq!(events[0].2, Some(2));
        assert_eq!(events[0].3.as_deref(), Some("halfway"));
        assert_eq!(events[1].1, 2);
        assert_eq!(events[1].2, Some(2));
        assert_eq!(events[1].3.as_deref(), Some("finished"));
    }

    /// Tool that does NOT override `execute_with_progress`. The default impl
    /// must fall through to `execute` and never touch the sink.
    struct LegacyTool;

    #[async_trait]
    impl Tool for LegacyTool {
        fn name(&self) -> &str {
            "legacy"
        }
        fn description(&self) -> &str {
            "legacy tool, no progress"
        }
        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({ "type": "object" })
        }
        async fn execute(&self, _args: serde_json::Value) -> anyhow::Result<ToolResult> {
            Ok(ToolResult {
                success: true,
                output: "legacy-ok".into(),
                error: None,
            })
        }
    }

    #[tokio::test]
    async fn default_execute_with_progress_ignores_sink() {
        let sink = RecordingSink::default();
        let tool = LegacyTool;

        let result = tool
            .execute_with_progress(serde_json::json!({}), &sink)
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(result.output, "legacy-ok");
        assert!(sink.events.lock().unwrap().is_empty());
    }
}