Skip to main content

construct/tools/
progress.rs

1//! Progress-notification hook for long-running tools.
2//!
3//! This module provides an additive, no-op-by-default surface that tools can
4//! use to emit progress events during execution. Callers that care about
5//! progress (e.g. a future MCP server wrapping Construct tools and forwarding
6//! `notifications/progress`) can supply a real [`ProgressSink`]; everyone else
7//! gets a [`NoopProgressSink`] and pays no cost.
8//!
9//! # Design
10//!
11//! The [`Tool`](crate::tools::Tool) trait gained a new default method,
12//! [`Tool::execute_with_progress`], that takes a `&dyn ProgressSink` alongside
13//! the existing `args`. Its default body simply calls
14//! [`Tool::execute`](crate::tools::Tool::execute) and drops the sink, so the
15//! ~93 existing tool implementations compile and behave exactly as before.
16//! Tool authors that want to emit progress opt in by overriding
17//! `execute_with_progress`.
18//!
19//! # MCP alignment
20//!
21//! The [`ProgressSink::notify`] shape (`progress_token`, `progress`,
22//! `total: Option<u64>`, `message: Option<&str>`) mirrors the MCP
23//! `notifications/progress` payload so that a future adapter can forward
24//! events verbatim.
25
26use std::sync::Arc;
27use std::sync::atomic::{AtomicU64, Ordering};
28
/// Lightweight identifier for a single stream of progress events.
///
/// A tool is handed a token (or allocates one) and attaches it to every
/// [`ProgressSink::notify`] call, which lets MCP callers match each
/// notification to the request that started the work. It is deliberately a
/// plain `u64` newtype so the hot path never allocates.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ProgressToken(pub u64);

impl ProgressToken {
    /// Returns the wrapped integer, e.g. for serializing into an MCP payload.
    pub const fn value(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
44
45/// Sink that receives progress emissions from a running tool.
46///
47/// Implementations must be cheap to clone/share (`Send + Sync`) because the
48/// same sink may be handed to many tool invocations concurrently. All methods
49/// take `&self`; internal synchronization is the sink's responsibility.
50pub trait ProgressSink: Send + Sync {
51    /// Allocate a fresh progress token for a new operation.
52    ///
53    /// The default implementation hands out monotonically increasing tokens
54    /// starting at 0; real sinks (e.g. MCP) will typically override this to
55    /// mint tokens that match an inbound request ID.
56    fn new_token(&self) -> ProgressToken {
57        // Fall back to a process-local counter. Real sinks should override.
58        static COUNTER: AtomicU64 = AtomicU64::new(0);
59        ProgressToken(COUNTER.fetch_add(1, Ordering::Relaxed))
60    }
61
62    /// Emit a progress event.
63    ///
64    /// - `token`: correlates this event with an operation.
65    /// - `progress`: current work completed (monotonically non-decreasing).
66    /// - `total`: optional total units of work, if known up front.
67    /// - `message`: optional human-readable status string.
68    fn notify(
69        &self,
70        token: ProgressToken,
71        progress: u64,
72        total: Option<u64>,
73        message: Option<&str>,
74    );
75}
76
77/// Zero-cost sink used whenever a caller does not care about progress.
78///
79/// `notify` is a no-op and will be inlined away by the optimizer.
80#[derive(Debug, Default, Clone, Copy)]
81pub struct NoopProgressSink;
82
83impl ProgressSink for NoopProgressSink {
84    #[inline]
85    fn notify(
86        &self,
87        _token: ProgressToken,
88        _progress: u64,
89        _total: Option<u64>,
90        _message: Option<&str>,
91    ) {
92        // Intentionally empty.
93    }
94}
95
96/// Process-wide default no-op sink, handed out when no sink is supplied.
97///
98/// Returned as `&'static dyn ProgressSink` so callers can pass it without
99/// allocating.
100pub fn noop_sink() -> &'static dyn ProgressSink {
101    static SINK: NoopProgressSink = NoopProgressSink;
102    &SINK
103}
104
105/// Ergonomic scoped handle: binds a [`ProgressToken`] to a [`ProgressSink`]
106/// so tool code can emit without repeating the token argument.
107///
108/// Construct one at the top of a long-running tool's `execute_with_progress`
109/// and call [`ProgressHandle::update`] as work proceeds.
110pub struct ProgressHandle<'a> {
111    sink: &'a dyn ProgressSink,
112    token: ProgressToken,
113    total: Option<u64>,
114}
115
116impl<'a> ProgressHandle<'a> {
117    /// Create a handle for a new operation on `sink` with an optional known total.
118    pub fn new(sink: &'a dyn ProgressSink, total: Option<u64>) -> Self {
119        let token = sink.new_token();
120        Self { sink, token, total }
121    }
122
123    /// Create a handle reusing an existing token (e.g. one supplied by the caller).
124    pub fn with_token(
125        sink: &'a dyn ProgressSink,
126        token: ProgressToken,
127        total: Option<u64>,
128    ) -> Self {
129        Self { sink, token, total }
130    }
131
132    /// Token backing this handle.
133    pub fn token(&self) -> ProgressToken {
134        self.token
135    }
136
137    /// Emit a progress update.
138    pub fn update(&self, progress: u64, message: Option<&str>) {
139        self.sink.notify(self.token, progress, self.total, message);
140    }
141}
142
143/// Shared/owned sink alias, for callers that need to stash one in a struct.
144pub type SharedProgressSink = Arc<dyn ProgressSink>;
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::Tool;
    use crate::tools::traits::ToolResult;
    use async_trait::async_trait;
    use std::sync::Mutex;

    /// One captured `notify` call: `(token, progress, total, message)`.
    type Event = (u64, u64, Option<u64>, Option<String>);

    /// Recording sink used to assert that emissions really flow through the
    /// progress entry points.
    #[derive(Default)]
    struct RecordingSink {
        events: Mutex<Vec<Event>>,
    }

    impl RecordingSink {
        /// Copy of everything recorded so far; the lock is released on return.
        fn recorded(&self) -> Vec<Event> {
            self.events.lock().unwrap().clone()
        }
    }

    impl ProgressSink for RecordingSink {
        fn notify(
            &self,
            token: ProgressToken,
            progress: u64,
            total: Option<u64>,
            message: Option<&str>,
        ) {
            let event = (token.value(), progress, total, message.map(str::to_owned));
            self.events.lock().unwrap().push(event);
        }
    }

    #[test]
    fn noop_sink_swallows_emissions() {
        let sink = NoopProgressSink;
        // Should be safe to call arbitrarily many times without side effects.
        let t = sink.new_token();
        sink.notify(t, 0, Some(10), Some("starting"));
        sink.notify(t, 5, Some(10), None);
        sink.notify(t, 10, Some(10), Some("done"));

        // The `noop_sink()` helper returns a usable static sink too.
        let s = noop_sink();
        s.notify(ProgressToken(42), 1, None, None);
    }

    #[test]
    fn default_new_token_is_strictly_increasing() {
        // `NoopProgressSink` does not override `new_token`, so this exercises
        // the trait's default counter. Other tests may bump the same counter
        // concurrently, hence `>` rather than an exact difference.
        let sink = NoopProgressSink;
        let first = sink.new_token();
        let second = sink.new_token();
        assert!(second.value() > first.value());
    }

    #[test]
    fn handle_with_token_reuses_supplied_token() {
        let sink = RecordingSink::default();
        let handle = ProgressHandle::with_token(&sink, ProgressToken(99), None);
        assert_eq!(handle.token(), ProgressToken(99));

        handle.update(3, None);
        assert_eq!(sink.recorded(), [(99, 3, None, None)]);
    }

    /// Tool that overrides `execute_with_progress` and emits a couple of
    /// events. Used to prove the new entry point wires through to the sink.
    struct ProgressyTool;

    #[async_trait]
    impl Tool for ProgressyTool {
        fn name(&self) -> &str {
            "progressy"
        }
        fn description(&self) -> &str {
            "emits progress for tests"
        }
        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({ "type": "object" })
        }
        async fn execute(&self, _args: serde_json::Value) -> anyhow::Result<ToolResult> {
            // Non-progress path returns the same thing so behaviour is identical
            // when a caller skips the progress entry point.
            Ok(ToolResult {
                success: true,
                output: "ok".into(),
                error: None,
            })
        }
        async fn execute_with_progress(
            &self,
            args: serde_json::Value,
            sink: &dyn ProgressSink,
        ) -> anyhow::Result<ToolResult> {
            let handle = ProgressHandle::new(sink, Some(2));
            handle.update(1, Some("halfway"));
            handle.update(2, Some("finished"));
            self.execute(args).await
        }
    }

    #[tokio::test]
    async fn overridden_execute_with_progress_forwards_to_sink() {
        let sink = RecordingSink::default();
        let tool = ProgressyTool;

        let result = tool
            .execute_with_progress(serde_json::json!({}), &sink)
            .await
            .unwrap();

        assert!(result.success);
        let events = sink.recorded();
        assert_eq!(events.len(), 2);
        // Both updates came from one handle, so they must share one token.
        assert_eq!(events[0].0, events[1].0);
        // The token value depends on a shared counter, so compare the rest.
        let payloads: Vec<_> = events
            .iter()
            .map(|(_, progress, total, message)| (*progress, *total, message.as_deref()))
            .collect();
        assert_eq!(
            payloads,
            [
                (1, Some(2), Some("halfway")),
                (2, Some(2), Some("finished")),
            ]
        );
    }

    /// Tool that does NOT override `execute_with_progress`. The default impl
    /// must fall through to `execute` and never touch the sink.
    struct LegacyTool;

    #[async_trait]
    impl Tool for LegacyTool {
        fn name(&self) -> &str {
            "legacy"
        }
        fn description(&self) -> &str {
            "legacy tool, no progress"
        }
        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({ "type": "object" })
        }
        async fn execute(&self, _args: serde_json::Value) -> anyhow::Result<ToolResult> {
            Ok(ToolResult {
                success: true,
                output: "legacy-ok".into(),
                error: None,
            })
        }
    }

    #[tokio::test]
    async fn default_execute_with_progress_ignores_sink() {
        let sink = RecordingSink::default();
        let tool = LegacyTool;

        let result = tool
            .execute_with_progress(serde_json::json!({}), &sink)
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(result.output, "legacy-ok");
        assert!(sink.recorded().is_empty());
    }
}