// deepseek/agent/builtin_tools/monitor.rs
use std::process::Stdio;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use serde_json::{json, Value};
17use tokio::io::{AsyncBufReadExt, BufReader};
18use tokio::process::Command;
19use tokio::time::timeout;
20
21use crate::agent::tool::{Tool, ToolDefinition};
22
/// Timeout, in milliseconds, used when the caller supplies no `timeout_ms` (5 minutes).
const DEFAULT_TIMEOUT_MS: u64 = 5 * 60 * 1_000;
/// Upper bound, in milliseconds, on the timeout a caller may request (1 hour).
const MAX_TIMEOUT_MS: u64 = 60 * 60 * 1_000;
/// Maximum number of stdout lines kept in the result returned to the caller.
const MAX_LINES_RETURNED: usize = 1_000;
26
/// Tool that runs a shell command and reports the lines it printed to stdout.
///
/// The type carries no state, so it is cheap to copy and can be created with
/// either `MonitorTool` or `MonitorTool::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct MonitorTool;
28
29#[async_trait]
30impl Tool for MonitorTool {
31 fn name(&self) -> &str {
32 "Monitor"
33 }
34
35 fn definition(&self) -> ToolDefinition {
36 ToolDefinition {
37 name: self.name().to_string(),
38 description: "Run a shell command and stream its stdout. Returns when the \
39 process exits or `timeout_ms` elapses (default 5 min, max 1 h). \
40 Lines beyond 1000 are truncated. Use this instead of polling \
41 when you want to react to events (CI status changes, log \
42 ERROR lines, file writes via inotifywait)."
43 .into(),
44 parameters: json!({
45 "type": "object",
46 "properties": {
47 "command": { "type": "string" },
48 "description": {
49 "type": "string",
50 "description": "Short human-readable label, surfaced in events."
51 },
52 "timeout_ms": {
53 "type": "integer",
54 "minimum": 1000,
55 "maximum": MAX_TIMEOUT_MS,
56 },
57 },
58 "required": ["command"]
59 }),
60 }
61 }
62
63 async fn call_json(&self, args: Value) -> Result<String, String> {
64 let command = args
65 .get("command")
66 .and_then(Value::as_str)
67 .ok_or_else(|| "Monitor: missing string `command`".to_string())?;
68 let label = args
69 .get("description")
70 .and_then(Value::as_str)
71 .unwrap_or("monitor");
72 let timeout_ms = args
73 .get("timeout_ms")
74 .and_then(Value::as_u64)
75 .unwrap_or(DEFAULT_TIMEOUT_MS)
76 .min(MAX_TIMEOUT_MS);
77
78 let mut child = Command::new("/bin/sh")
79 .arg("-c")
80 .arg(command)
81 .stdout(Stdio::piped())
82 .stderr(Stdio::piped())
83 .spawn()
84 .map_err(|e| format!("Monitor: spawn failed: {e}"))?;
85
86 let stdout = child
87 .stdout
88 .take()
89 .ok_or_else(|| "Monitor: stdout pipe missing".to_string())?;
90 let mut lines = BufReader::new(stdout).lines();
91
92 let mut collected: Vec<String> = Vec::new();
93 let mut truncated = false;
94
95 let read_loop = async {
96 while let Some(line) = lines.next_line().await.transpose() {
97 match line {
98 Ok(s) => {
99 if collected.len() >= MAX_LINES_RETURNED {
100 truncated = true;
101 continue;
104 }
105 collected.push(s);
106 }
107 Err(e) => {
108 return Err(format!("Monitor: read error: {e}"));
109 }
110 }
111 }
112 Ok::<(), String>(())
113 };
114
115 let result = timeout(Duration::from_millis(timeout_ms), read_loop).await;
116
117 let timed_out = result.is_err();
118 if timed_out {
119 let _ = child.start_kill();
121 } else if let Ok(Err(e)) = result {
122 return Err(e);
123 }
124
125 let exit = match child.wait().await {
127 Ok(s) => s.code().unwrap_or(-1),
128 Err(_) => -1,
129 };
130
131 Ok(serde_json::to_string(&json!({
132 "label": label,
133 "exit_code": exit,
134 "lines": collected,
135 "truncated": truncated,
136 "timed_out": timed_out,
137 }))
138 .unwrap())
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Invokes the tool with `args` and decodes the JSON document it returns.
    /// Panics when the call fails or the reply is not valid JSON.
    async fn invoke(args: Value) -> Value {
        let reply = MonitorTool.call_json(args).await.unwrap();
        serde_json::from_str(&reply).unwrap()
    }

    /// A short-lived command reports its stdout lines in order, a zero exit
    /// code and no timeout.
    #[tokio::test]
    async fn echoes_a_few_lines() {
        let report = invoke(json!({
            "command": "printf 'a\\nb\\nc\\n'",
            "description": "test echo",
            "timeout_ms": 5000
        }))
        .await;

        let printed: Vec<&str> = report["lines"]
            .as_array()
            .unwrap()
            .iter()
            .map(|entry| entry.as_str().unwrap())
            .collect();
        assert_eq!(printed, ["a", "b", "c"]);
        assert_eq!(report["exit_code"].as_i64(), Some(0));
        assert_eq!(report["timed_out"].as_bool(), Some(false));
    }

    /// A command that never exits on its own is reported as timed out.
    #[tokio::test]
    async fn timeout_terminates_runaway() {
        let report = invoke(json!({
            "command": "while true; do echo tick; sleep 1; done",
            "timeout_ms": 1500
        }))
        .await;

        assert_eq!(report["timed_out"].as_bool(), Some(true));
    }
}