Skip to main content

deepseek/agent/builtin_tools/
monitor.rs

1//! `Monitor` tool — long-running watcher.
2//!
3//! Spawns a shell command and reads its stdout line-by-line until the process
4//! exits or `timeout_ms` elapses. Returns the collected lines as a single
5//! result string. Use this for tasks where you'd otherwise poll repeatedly
6//! (`tail -f log | grep ERROR`, `inotifywait -m`, status pollers).
7//!
8//! Note: the `Tool` trait returns one result per invocation; we don't stream
9//! events back to the loop in v0.3.0. The lines are concatenated and returned
10//! when the watcher exits.
11
12use std::process::Stdio;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use serde_json::{json, Value};
17use tokio::io::{AsyncBufReadExt, BufReader};
18use tokio::process::Command;
19use tokio::time::timeout;
20
21use crate::agent::tool::{Tool, ToolDefinition};
22
/// Timeout applied when the caller does not supply `timeout_ms`: 5 minutes.
const DEFAULT_TIMEOUT_MS: u64 = 5 * 60 * 1_000;
/// Upper bound on the effective timeout: 1 hour.
const MAX_TIMEOUT_MS: u64 = 60 * 60 * 1_000;
/// Maximum number of stdout lines kept in the result; later lines are dropped.
const MAX_LINES_RETURNED: usize = 1_000;
26
/// The `Monitor` built-in tool.
///
/// A field-less unit struct: it carries no state, so it is trivially
/// copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct MonitorTool;
28
29#[async_trait]
30impl Tool for MonitorTool {
31    fn name(&self) -> &str {
32        "Monitor"
33    }
34
35    fn definition(&self) -> ToolDefinition {
36        ToolDefinition {
37            name: self.name().to_string(),
38            description: "Run a shell command and stream its stdout. Returns when the \
39                          process exits or `timeout_ms` elapses (default 5 min, max 1 h). \
40                          Lines beyond 1000 are truncated. Use this instead of polling \
41                          when you want to react to events (CI status changes, log \
42                          ERROR lines, file writes via inotifywait)."
43                .into(),
44            parameters: json!({
45                "type": "object",
46                "properties": {
47                    "command": { "type": "string" },
48                    "description": {
49                        "type": "string",
50                        "description": "Short human-readable label, surfaced in events."
51                    },
52                    "timeout_ms": {
53                        "type": "integer",
54                        "minimum": 1000,
55                        "maximum": MAX_TIMEOUT_MS,
56                    },
57                },
58                "required": ["command"]
59            }),
60        }
61    }
62
63    async fn call_json(&self, args: Value) -> Result<String, String> {
64        let command = args
65            .get("command")
66            .and_then(Value::as_str)
67            .ok_or_else(|| "Monitor: missing string `command`".to_string())?;
68        let label = args
69            .get("description")
70            .and_then(Value::as_str)
71            .unwrap_or("monitor");
72        let timeout_ms = args
73            .get("timeout_ms")
74            .and_then(Value::as_u64)
75            .unwrap_or(DEFAULT_TIMEOUT_MS)
76            .min(MAX_TIMEOUT_MS);
77
78        let mut child = Command::new("/bin/sh")
79            .arg("-c")
80            .arg(command)
81            .stdout(Stdio::piped())
82            .stderr(Stdio::piped())
83            .spawn()
84            .map_err(|e| format!("Monitor: spawn failed: {e}"))?;
85
86        let stdout = child
87            .stdout
88            .take()
89            .ok_or_else(|| "Monitor: stdout pipe missing".to_string())?;
90        let mut lines = BufReader::new(stdout).lines();
91
92        let mut collected: Vec<String> = Vec::new();
93        let mut truncated = false;
94
95        let read_loop = async {
96            while let Some(line) = lines.next_line().await.transpose() {
97                match line {
98                    Ok(s) => {
99                        if collected.len() >= MAX_LINES_RETURNED {
100                            truncated = true;
101                            // Keep draining so the child doesn't block on a
102                            // full pipe, but stop appending.
103                            continue;
104                        }
105                        collected.push(s);
106                    }
107                    Err(e) => {
108                        return Err(format!("Monitor: read error: {e}"));
109                    }
110                }
111            }
112            Ok::<(), String>(())
113        };
114
115        let result = timeout(Duration::from_millis(timeout_ms), read_loop).await;
116
117        let timed_out = result.is_err();
118        if timed_out {
119            // Best-effort: try to kill the child so it doesn't outlive us.
120            let _ = child.start_kill();
121        } else if let Ok(Err(e)) = result {
122            return Err(e);
123        }
124
125        // Reap exit status (non-blocking after kill).
126        let exit = match child.wait().await {
127            Ok(s) => s.code().unwrap_or(-1),
128            Err(_) => -1,
129        };
130
131        Ok(serde_json::to_string(&json!({
132            "label": label,
133            "exit_code": exit,
134            "lines": collected,
135            "truncated": truncated,
136            "timed_out": timed_out,
137        }))
138        .unwrap())
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the tool with `args` and parses the JSON payload it returns.
    async fn run(args: Value) -> Value {
        let raw = MonitorTool
            .call_json(args)
            .await
            .expect("Monitor call should succeed");
        serde_json::from_str(&raw).expect("Monitor should return valid JSON")
    }

    /// Extracts the `lines` array of a result payload as string slices.
    fn lines_of(payload: &Value) -> Vec<&str> {
        payload["lines"]
            .as_array()
            .expect("`lines` should be an array")
            .iter()
            .map(|line| line.as_str().expect("each line should be a string"))
            .collect()
    }

    #[tokio::test]
    async fn echoes_a_few_lines() {
        let v = run(json!({
            "command": "printf 'a\\nb\\nc\\n'",
            "description": "test echo",
            "timeout_ms": 5000
        }))
        .await;
        assert_eq!(lines_of(&v), ["a", "b", "c"]);
        assert_eq!(v["label"], "test echo");
        assert_eq!(v["exit_code"].as_i64(), Some(0));
        assert_eq!(v["truncated"].as_bool(), Some(false));
        assert_eq!(v["timed_out"].as_bool(), Some(false));
    }

    #[tokio::test]
    async fn timeout_terminates_runaway() {
        let v = run(json!({
            "command": "while true; do echo tick; sleep 1; done",
            "timeout_ms": 1500
        }))
        .await;
        assert_eq!(v["timed_out"].as_bool(), Some(true));
        assert_eq!(v["truncated"].as_bool(), Some(false));
        // Whatever was captured before the deadline must be the loop's output.
        assert!(lines_of(&v).iter().all(|line| *line == "tick"));
        // No `description` was given, so the default label is reported.
        assert_eq!(v["label"], "monitor");
    }

    #[tokio::test]
    async fn truncates_after_line_cap() {
        let v = run(json!({
            "command": "i=0; while [ $i -lt 1100 ]; do echo $i; i=$((i+1)); done",
            "timeout_ms": 10000
        }))
        .await;
        let lines = lines_of(&v);
        assert_eq!(lines.len(), MAX_LINES_RETURNED);
        assert_eq!(lines.first().copied(), Some("0"));
        assert_eq!(
            lines.last().copied(),
            Some((MAX_LINES_RETURNED - 1).to_string().as_str())
        );
        assert_eq!(v["truncated"].as_bool(), Some(true));
        assert_eq!(v["timed_out"].as_bool(), Some(false));
        assert_eq!(v["exit_code"].as_i64(), Some(0));
    }

    #[tokio::test]
    async fn reports_nonzero_exit_code() {
        let v = run(json!({ "command": "exit 3", "timeout_ms": 5000 })).await;
        assert!(lines_of(&v).is_empty());
        assert_eq!(v["exit_code"].as_i64(), Some(3));
        assert_eq!(v["timed_out"].as_bool(), Some(false));
    }

    #[tokio::test]
    async fn rejects_missing_or_non_string_command() {
        let expected = "Monitor: missing string `command`";
        let missing = MonitorTool.call_json(json!({})).await;
        assert_eq!(missing.unwrap_err(), expected);
        let wrong_type = MonitorTool.call_json(json!({ "command": 42 })).await;
        assert_eq!(wrong_type.unwrap_err(), expected);
    }
}