meerkat-tools 0.4.13

Tool validation and dispatch for Meerkat
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
//! Wait tool for pausing execution

use crate::builtin::{BuiltinTool, BuiltinToolError, ToolOutput};
use async_trait::async_trait;
use meerkat_core::ToolDef;
use meerkat_core::time_compat::{Duration, Instant};
use meerkat_core::wait_interrupt::WaitInterruptReceiver;
use serde::Deserialize;
use serde_json::{Value, json};

// Re-export for backward compatibility
pub use meerkat_core::wait_interrupt::WaitInterrupt;

/// Maximum wait time in seconds (1 minute).
///
/// With comms interrupt wired in, waits are interrupted early when peer
/// messages arrive. Note: budget checks happen at loop boundaries (CallingLlm),
/// not during tool dispatch, so a long wait can overshoot max_duration by the
/// full requested delay in non-comms sessions or when no interrupt arrives.
/// 60s keeps budget overshoot bounded to a single minute.
const MAX_WAIT_SECONDS: f64 = 60.0;

/// Tool for pausing execution for a specified duration
///
/// This tool allows agents to wait before continuing, which is essential for:
/// - Waiting between status checks on async operations
/// - Rate limiting when interacting with external services
/// - Coordinating timing-sensitive workflows
///
/// The wait can be interrupted by incoming messages if an interrupt receiver is configured.
#[derive(Debug, Clone)]
pub struct WaitTool {
    /// Optional interrupt receiver - when a message arrives, wait is interrupted
    interrupt_rx: Option<WaitInterruptReceiver>,
}

impl WaitTool {
    /// Create a new WaitTool without interrupt support
    pub fn new() -> Self {
        Self { interrupt_rx: None }
    }

    /// Create a WaitTool with an interrupt receiver
    ///
    /// When a message is sent on the channel, the wait will be interrupted early
    /// and return with status "interrupted" along with the reason.
    pub fn with_interrupt(rx: WaitInterruptReceiver) -> Self {
        Self {
            interrupt_rx: Some(rx),
        }
    }
}

impl Default for WaitTool {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct WaitArgs {
    /// Duration to wait in seconds (max 60)
    #[schemars(
        description = "Number of seconds to wait (0.1 to 60)",
        range(min = 0.1, max = 60.0)
    )]
    seconds: f64,
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl BuiltinTool for WaitTool {
    fn name(&self) -> &'static str {
        "wait"
    }

    fn def(&self) -> ToolDef {
        ToolDef {
            name: "wait".into(),
            description: "Pause execution for the specified number of seconds. Use this to wait between status checks on async operations like sub-agents. Wait is interrupted early when peer messages arrive. Maximum wait time is 60 seconds (1 minute).".into(),
            input_schema: crate::schema::schema_for::<WaitArgs>(),
        }
    }

    fn default_enabled(&self) -> bool {
        true // Utility tools enabled by default
    }

    async fn call(&self, args: Value) -> Result<ToolOutput, BuiltinToolError> {
        #[cfg(target_arch = "wasm32")]
        use crate::tokio::time::sleep;
        #[cfg(not(target_arch = "wasm32"))]
        use tokio::time::sleep;

        let args: WaitArgs = serde_json::from_value(args)
            .map_err(|e| BuiltinToolError::invalid_args(format!("Invalid arguments: {e}")))?;

        // Clamp to valid range
        let seconds = args.seconds.clamp(0.0, MAX_WAIT_SECONDS);
        let duration = Duration::from_secs_f64(seconds);
        let start = Instant::now();

        // If we have an interrupt receiver, race between sleep and interrupt
        if let Some(ref rx) = self.interrupt_rx {
            let mut rx = rx.clone();
            // Mark the current value as seen - we only want to react to NEW interrupts
            // that arrive AFTER we start waiting, not stale ones from previous waits
            rx.borrow_and_update();

            // Use futures::future::select to avoid tokio::select! macro
            // which requires the tokio crate to be directly named in Cargo.toml.
            let interrupted = {
                let sleep_fut = sleep(duration);
                futures::pin_mut!(sleep_fut);

                let changed_fut = rx.changed();
                futures::pin_mut!(changed_fut);

                match futures::future::select(sleep_fut, changed_fut).await {
                    futures::future::Either::Left(_) => false,
                    futures::future::Either::Right((result, _)) => result.is_ok(),
                }
            };

            if interrupted && let Some(interrupt) = rx.borrow().as_ref() {
                let waited = start.elapsed().as_secs_f64();
                return Ok(ToolOutput::Json(json!({
                    "waited_seconds": waited,
                    "requested_seconds": seconds,
                    "status": "interrupted",
                    "reason": format!("Wait interrupted after {:.1}s: {}", waited, interrupt.reason)
                })));
            }
            // Completed normally (either sleep finished or channel closed without data)
            Ok(ToolOutput::Json(json!({
                "waited_seconds": seconds,
                "status": "complete"
            })))
        } else {
            // No interrupt receiver - just sleep
            sleep(duration).await;
            Ok(ToolOutput::Json(json!({
                "waited_seconds": seconds,
                "status": "complete"
            })))
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn test_wait_tool_name() {
        let tool = WaitTool::new();
        assert_eq!(tool.name(), "wait");
    }

    #[tokio::test]
    async fn test_wait_tool_interrupted_by_message() {
        let (tx, rx) = tokio::sync::watch::channel(None::<WaitInterrupt>);
        let tool = WaitTool::with_interrupt(rx);
        let start = Instant::now();

        // Spawn interrupt after 100ms
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            let _ = tx.send(Some(WaitInterrupt {
                reason: "Received message from sub-agent: Task completed".to_string(),
            }));
        });

        let result = tool
            .call(json!({"seconds": 10.0}))
            .await
            .unwrap()
            .into_json()
            .unwrap();

        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(1),
            "Should be interrupted quickly"
        );
        assert_eq!(result["status"], "interrupted");
        assert!(result["waited_seconds"].as_f64().unwrap() < 1.0);
        assert!(result["reason"].as_str().unwrap().contains("sub-agent"));
    }

    #[tokio::test]
    async fn test_wait_tool_completes_without_interrupt() {
        let (_tx, rx) = tokio::sync::watch::channel(None::<WaitInterrupt>);
        let tool = WaitTool::with_interrupt(rx);

        let result = tool
            .call(json!({"seconds": 0.1}))
            .await
            .unwrap()
            .into_json()
            .unwrap();

        assert_eq!(result["status"], "complete");
        assert_eq!(result["waited_seconds"], 0.1);
    }

    #[tokio::test]
    async fn test_wait_tool_without_interrupt_receiver() {
        // Original behavior - no interrupt receiver
        let tool = WaitTool::new();

        let result = tool
            .call(json!({"seconds": 0.1}))
            .await
            .unwrap()
            .into_json()
            .unwrap();

        assert_eq!(result["status"], "complete");
    }

    #[test]
    fn test_wait_tool_default_enabled() {
        let tool = WaitTool::new();
        assert!(tool.default_enabled());
    }

    #[test]
    fn test_wait_tool_def() {
        let tool = WaitTool::new();
        let def = tool.def();
        assert_eq!(def.name, "wait");
        assert!(def.description.contains("Pause execution"));
        assert!(def.input_schema.get("properties").is_some());
    }

    #[tokio::test]
    async fn test_wait_tool_short_wait() {
        let tool = WaitTool::new();
        let start = Instant::now();

        let result = tool
            .call(json!({"seconds": 0.1}))
            .await
            .unwrap()
            .into_json()
            .unwrap();

        let elapsed = start.elapsed();
        assert!(elapsed >= Duration::from_millis(100));
        assert!(elapsed < Duration::from_millis(200)); // Some tolerance

        assert_eq!(result["status"], "complete");
        assert_eq!(result["waited_seconds"], 0.1);
    }

    #[test]
    fn test_wait_tool_clamps_max_value() {
        // Test that values above MAX_WAIT_SECONDS get clamped
        // We can't test the actual wait easily, but we can verify the clamping logic
        let seconds = 9999.0_f64;
        let clamped = seconds.clamp(0.0, MAX_WAIT_SECONDS);
        assert_eq!(clamped, MAX_WAIT_SECONDS);
    }

    #[test]
    fn test_wait_tool_clamps_negative_value() {
        // Test that negative values get clamped to 0
        let seconds = -5.0_f64;
        let clamped = seconds.clamp(0.0, MAX_WAIT_SECONDS);
        assert_eq!(clamped, 0.0);
    }

    #[tokio::test]
    async fn test_wait_tool_invalid_args() {
        let tool = WaitTool::new();

        let result = tool.call(json!({"invalid": "args"})).await;
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(matches!(err, BuiltinToolError::InvalidArgs(_)));
    }

    #[tokio::test]
    async fn test_wait_tool_missing_seconds() {
        let tool = WaitTool::new();

        let result = tool.call(json!({})).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_wait_tool_stale_interrupt_does_not_affect_subsequent_waits() {
        // Regression test: a previous interrupt should not affect subsequent wait calls
        let (tx, rx) = tokio::sync::watch::channel(None::<WaitInterrupt>);
        let tool = WaitTool::with_interrupt(rx);

        // First: send an interrupt, then call wait - should NOT be interrupted
        tx.send(Some(WaitInterrupt {
            reason: "stale interrupt".to_string(),
        }))
        .unwrap();

        // Small delay to ensure value is set
        tokio::time::sleep(Duration::from_millis(10)).await;

        let start = Instant::now();
        let result = tool
            .call(json!({"seconds": 0.2}))
            .await
            .unwrap()
            .into_json()
            .unwrap();
        let elapsed = start.elapsed();

        // Should complete normally since interrupt was stale (sent before wait started)
        assert_eq!(result["status"], "complete");
        assert!(
            elapsed >= Duration::from_millis(180),
            "Should wait full duration, got {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn test_wait_tool_interrupt_returns_interrupted_status_with_reason() {
        // Validates the exact contract for comms-driven interrupts:
        // when an incoming peer message interrupts a wait, the result must
        // contain status="interrupted" and a reason string.
        let (tx, rx) = tokio::sync::watch::channel(None::<WaitInterrupt>);
        let tool = WaitTool::with_interrupt(rx);

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = tx.send(Some(WaitInterrupt {
                reason: "Incoming peer message".to_string(),
            }));
        });

        let result = tool
            .call(json!({"seconds": 30.0}))
            .await
            .unwrap()
            .into_json()
            .unwrap();

        assert_eq!(result["status"], "interrupted");
        assert!(
            result["reason"]
                .as_str()
                .unwrap()
                .contains("Incoming peer message"),
            "reason must include the interrupt source"
        );
        assert!(
            result["waited_seconds"].as_f64().unwrap() < 1.0,
            "should have been interrupted well before the full wait"
        );
        assert_eq!(result["requested_seconds"], 30.0);
    }

    #[tokio::test]
    async fn test_wait_tool_multiple_sequential_interrupts() {
        // After one interrupt, subsequent waits should still work and be
        // interruptible by new signals.
        let (tx, rx) = tokio::sync::watch::channel(None::<WaitInterrupt>);
        let tool = WaitTool::with_interrupt(rx);

        // First wait + interrupt
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = tx_clone.send(Some(WaitInterrupt {
                reason: "First interrupt".to_string(),
            }));
        });

        let result1 = tool
            .call(json!({"seconds": 10.0}))
            .await
            .unwrap()
            .into_json()
            .unwrap();
        assert_eq!(result1["status"], "interrupted");

        // Second wait + interrupt
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = tx.send(Some(WaitInterrupt {
                reason: "Second interrupt".to_string(),
            }));
        });

        let result2 = tool
            .call(json!({"seconds": 10.0}))
            .await
            .unwrap()
            .into_json()
            .unwrap();
        assert_eq!(result2["status"], "interrupted");
        assert!(
            result2["reason"]
                .as_str()
                .unwrap()
                .contains("Second interrupt")
        );
    }

    #[test]
    fn test_max_wait_seconds_is_60() {
        assert_eq!(MAX_WAIT_SECONDS, 60.0);
        // Verify clamping at the new cap
        let seconds = 120.0_f64;
        let clamped = seconds.clamp(0.0, MAX_WAIT_SECONDS);
        assert_eq!(clamped, 60.0);
    }
}