1#![allow(
2 clippy::cast_precision_loss,
3 clippy::cast_possible_truncation,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 clippy::match_same_arms
7)]
8use std::sync::OnceLock;
20
21use serde_json::{Value, json};
22
23use synwire_core::BoxFuture;
24use synwire_core::error::{SynwireError, ToolError};
25use synwire_core::tools::{Tool, ToolOutput, ToolResultStatus, ToolSchema};
26
27use crate::process_registry::ProcessStatus;
28use crate::visibility::ProcessVisibilityScope;
29
30fn tool_err(msg: impl Into<String>) -> SynwireError {
33 SynwireError::Tool(ToolError::InvocationFailed {
34 message: msg.into(),
35 })
36}
37
38fn validation_err(msg: impl Into<String>) -> SynwireError {
39 SynwireError::Tool(ToolError::ValidationFailed {
40 message: msg.into(),
41 })
42}
43
44fn parse_pid(input: &Value) -> Result<u32, SynwireError> {
45 input["pid"]
46 .as_u64()
47 .and_then(|v| u32::try_from(v).ok())
48 .ok_or_else(|| validation_err("'pid' must be a positive integer"))
49}
50
51pub struct ListProcessesTool {
58 scope: ProcessVisibilityScope,
59 schema: OnceLock<ToolSchema>,
60}
61
62impl ListProcessesTool {
63 pub const fn new(scope: ProcessVisibilityScope) -> Self {
65 Self {
66 scope,
67 schema: OnceLock::new(),
68 }
69 }
70}
71
72impl Tool for ListProcessesTool {
73 fn name(&self) -> &'static str {
74 "list_processes"
75 }
76
77 fn description(&self) -> &'static str {
78 "List all running processes spawned by this agent and any visible sub-agents. \
79 Returns PID, command, agent label, and live CPU/memory statistics when available."
80 }
81
82 fn schema(&self) -> &ToolSchema {
83 self.schema.get_or_init(|| ToolSchema {
84 name: "list_processes".into(),
85 description: self.description().into(),
86 parameters: json!({
87 "type": "object",
88 "properties": {},
89 "required": []
90 }),
91 })
92 }
93
94 fn invoke(&self, _input: Value) -> BoxFuture<'_, Result<ToolOutput, SynwireError>> {
95 Box::pin(async move {
96 let visible = self.scope.visible_running().await;
97 let processes: Vec<serde_json::Value> = visible
98 .iter()
99 .map(|(label, r)| {
100 json!({
101 "pid": r.pid,
102 "command": format!("{} {}", r.command, r.args.join(" ")),
103 "agent": label.as_deref().unwrap_or("self"),
104 "cpu_pct": r.cpu_usage_ns.map(|ns| ns as f64 / 1_000_000_000.0),
105 "mem_bytes": r.memory_bytes,
106 "state": "running",
107 })
108 })
109 .collect();
110
111 let content =
112 serde_json::to_string_pretty(&processes).map_err(|e| tool_err(e.to_string()))?;
113 Ok(ToolOutput {
114 content,
115 ..Default::default()
116 })
117 })
118 }
119}
120
121pub struct KillProcessTool {
128 scope: ProcessVisibilityScope,
129 schema: OnceLock<ToolSchema>,
130}
131
132impl KillProcessTool {
133 pub const fn new(scope: ProcessVisibilityScope) -> Self {
135 Self {
136 scope,
137 schema: OnceLock::new(),
138 }
139 }
140}
141
142impl Tool for KillProcessTool {
143 fn name(&self) -> &'static str {
144 "kill_process"
145 }
146
147 fn description(&self) -> &'static str {
148 "Send a signal to a process spawned by this agent. Defaults to SIGTERM. \
149 Use SIGKILL only if the process does not respond to SIGTERM. \
150 Only processes tracked by this agent can be targeted (not sub-agent processes)."
151 }
152
153 fn schema(&self) -> &ToolSchema {
154 self.schema.get_or_init(|| ToolSchema {
155 name: "kill_process".into(),
156 description: self.description().into(),
157 parameters: json!({
158 "type": "object",
159 "properties": {
160 "pid": {
161 "type": "integer",
162 "description": "The process ID to signal.",
163 "minimum": 1
164 },
165 "signal": {
166 "type": "string",
167 "description": "Signal name (SIGTERM, SIGKILL, SIGINT, SIGHUP). Defaults to SIGTERM.",
168 "enum": ["SIGTERM", "SIGKILL", "SIGINT", "SIGHUP", "SIGSTOP", "SIGCONT"],
169 "default": "SIGTERM"
170 }
171 },
172 "required": ["pid"]
173 }),
174 })
175 }
176
177 fn invoke(&self, input: Value) -> BoxFuture<'_, Result<ToolOutput, SynwireError>> {
178 Box::pin(async move {
179 let pid = parse_pid(&input)?;
180 let signal_name = input["signal"].as_str().unwrap_or("SIGTERM").to_uppercase();
181
182 {
184 let reg = self.scope.own.read().await;
185 if reg.get(pid).is_none() {
186 return Ok(ToolOutput {
187 content: format!("process {pid} is not tracked by this agent"),
188 status: ToolResultStatus::Failure,
189 ..Default::default()
190 });
191 }
192 if reg.get(pid).map(|r| &r.status) != Some(&ProcessStatus::Running) {
193 return Ok(ToolOutput {
194 content: format!("process {pid} is not running"),
195 status: ToolResultStatus::Failure,
196 ..Default::default()
197 });
198 }
199 }
200
201 send_signal_to_pid(pid, &signal_name)?;
202
203 let signal_num = signal_name_to_number(&signal_name);
204 {
205 let mut reg = self.scope.own.write().await;
206 reg.mark_signaled(pid, signal_num);
207 }
208
209 Ok(ToolOutput {
210 content: format!("sent {signal_name} to process {pid}"),
211 ..Default::default()
212 })
213 })
214 }
215}
216
217pub struct ProcessStatsTool {
221 scope: ProcessVisibilityScope,
222 schema: OnceLock<ToolSchema>,
223}
224
225impl ProcessStatsTool {
226 pub const fn new(scope: ProcessVisibilityScope) -> Self {
228 Self {
229 scope,
230 schema: OnceLock::new(),
231 }
232 }
233}
234
235impl Tool for ProcessStatsTool {
236 fn name(&self) -> &'static str {
237 "process_stats"
238 }
239
240 fn description(&self) -> &'static str {
241 "Retrieve detailed statistics for a specific process tracked by this agent \
242 or a visible sub-agent, including PID, command, start time, CPU usage, \
243 memory usage, and current status."
244 }
245
246 fn schema(&self) -> &ToolSchema {
247 self.schema.get_or_init(|| ToolSchema {
248 name: "process_stats".into(),
249 description: self.description().into(),
250 parameters: json!({
251 "type": "object",
252 "properties": {
253 "pid": {
254 "type": "integer",
255 "description": "The process ID to query.",
256 "minimum": 1
257 }
258 },
259 "required": ["pid"]
260 }),
261 })
262 }
263
264 fn invoke(&self, input: Value) -> BoxFuture<'_, Result<ToolOutput, SynwireError>> {
265 Box::pin(async move {
266 let pid = parse_pid(&input)?;
267
268 let (agent_label, record) =
269 self.scope.find(pid).await.ok_or_else(|| {
270 tool_err(format!("process {pid} is not visible to this agent"))
271 })?;
272
273 let status_str = match &record.status {
274 ProcessStatus::Running => "running".to_string(),
275 ProcessStatus::Exited { code } => format!("exited({code})"),
276 ProcessStatus::Signaled { signal } => format!("signaled({signal})"),
277 ProcessStatus::Unknown => "unknown".to_string(),
278 };
279
280 let stats = json!({
281 "pid": record.pid,
282 "command": record.command,
283 "args": record.args,
284 "agent": agent_label.as_deref().unwrap_or("self"),
285 "started_at": record.started_at.to_rfc3339(),
286 "status": status_str,
287 "cpu_usage_ns": record.cpu_usage_ns,
288 "memory_bytes": record.memory_bytes,
289 "cgroup_path": record.cgroup_path,
290 "has_captured_output": record.output.is_some(),
291 });
292
293 let content =
294 serde_json::to_string_pretty(&stats).map_err(|e| tool_err(e.to_string()))?;
295 Ok(ToolOutput {
296 content,
297 ..Default::default()
298 })
299 })
300 }
301}
302
303pub struct WaitForProcessTool {
312 scope: ProcessVisibilityScope,
313 schema: OnceLock<ToolSchema>,
314}
315
316impl WaitForProcessTool {
317 pub const fn new(scope: ProcessVisibilityScope) -> Self {
319 Self {
320 scope,
321 schema: OnceLock::new(),
322 }
323 }
324}
325
326impl Tool for WaitForProcessTool {
327 fn name(&self) -> &'static str {
328 "wait_for_process"
329 }
330
331 fn description(&self) -> &'static str {
332 "Wait for a process to finish and return its exit status. \
333 Defaults to 30 second timeout. Returns the final status \
334 (exit code or signal) when the process completes, or \
335 'timeout' if it is still running after the deadline."
336 }
337
338 fn schema(&self) -> &ToolSchema {
339 self.schema.get_or_init(|| ToolSchema {
340 name: "wait_for_process".into(),
341 description: self.description().into(),
342 parameters: json!({
343 "type": "object",
344 "properties": {
345 "pid": {
346 "type": "integer",
347 "description": "The process ID to wait for.",
348 "minimum": 1
349 },
350 "timeout_ms": {
351 "type": "integer",
352 "description": "Maximum time to wait in milliseconds (default: 30000).",
353 "minimum": 100,
354 "default": 30000
355 }
356 },
357 "required": ["pid"]
358 }),
359 })
360 }
361
362 fn invoke(&self, input: Value) -> BoxFuture<'_, Result<ToolOutput, SynwireError>> {
363 Box::pin(async move {
364 let pid = parse_pid(&input)?;
365 let timeout_ms = input["timeout_ms"].as_u64().unwrap_or(30_000);
366
367 let start = std::time::Instant::now();
368 loop {
369 if let Some((_label, record)) = self.scope.find(pid).await {
370 match &record.status {
371 ProcessStatus::Running => { }
372 ProcessStatus::Exited { code } => {
373 return Ok(ToolOutput {
374 content: format!("exited with code {code}"),
375 ..Default::default()
376 });
377 }
378 ProcessStatus::Signaled { signal } => {
379 return Ok(ToolOutput {
380 content: format!("killed by signal {signal}"),
381 ..Default::default()
382 });
383 }
384 ProcessStatus::Unknown => {
385 return Ok(ToolOutput {
386 content: "unknown status".to_string(),
387 ..Default::default()
388 });
389 }
390 }
391 } else {
392 return Ok(ToolOutput {
393 content: format!("process {pid} is not visible to this agent"),
394 status: ToolResultStatus::Failure,
395 ..Default::default()
396 });
397 }
398
399 if start.elapsed().as_millis() as u64 >= timeout_ms {
400 return Ok(ToolOutput {
401 content: format!(
402 "timeout: process {pid} still running after {timeout_ms}ms"
403 ),
404 status: ToolResultStatus::Failure,
405 ..Default::default()
406 });
407 }
408
409 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
410 }
411 })
412 }
413}
414
415pub struct ReadProcessOutputTool {
422 scope: ProcessVisibilityScope,
423 schema: OnceLock<ToolSchema>,
424}
425
426impl ReadProcessOutputTool {
427 pub const fn new(scope: ProcessVisibilityScope) -> Self {
429 Self {
430 scope,
431 schema: OnceLock::new(),
432 }
433 }
434}
435
436impl Tool for ReadProcessOutputTool {
437 fn name(&self) -> &'static str {
438 "read_process_output"
439 }
440
441 fn description(&self) -> &'static str {
442 "Read captured stdout or stderr from a process. Only available for \
443 non-interactive processes spawned with output capture. For combined \
444 mode, 'stdout' returns the combined stream. Returns the output content \
445 as a string."
446 }
447
448 fn schema(&self) -> &ToolSchema {
449 self.schema.get_or_init(|| ToolSchema {
450 name: "read_process_output".into(),
451 description: self.description().into(),
452 parameters: json!({
453 "type": "object",
454 "properties": {
455 "pid": {
456 "type": "integer",
457 "description": "The process ID whose output to read.",
458 "minimum": 1
459 },
460 "stream": {
461 "type": "string",
462 "description": "Which stream to read: 'stdout' (or combined output), 'stderr'.",
463 "enum": ["stdout", "stderr"],
464 "default": "stdout"
465 }
466 },
467 "required": ["pid"]
468 }),
469 })
470 }
471
472 fn invoke(&self, input: Value) -> BoxFuture<'_, Result<ToolOutput, SynwireError>> {
473 Box::pin(async move {
474 let pid = parse_pid(&input)?;
475 let stream = input["stream"].as_str().unwrap_or("stdout");
476
477 let (_label, record) =
478 self.scope.find(pid).await.ok_or_else(|| {
479 tool_err(format!("process {pid} is not visible to this agent"))
480 })?;
481
482 let captured = record
483 .output
484 .as_ref()
485 .ok_or_else(|| tool_err("no captured output for this process"))?;
486
487 let content = match stream {
488 "stderr" => captured
489 .read_stderr()
490 .map_err(|e| tool_err(e.to_string()))?
491 .unwrap_or_else(|| "(streams are combined — use 'stdout' to read)".to_string()),
492 _ => captured
493 .read_stdout()
494 .map_err(|e| tool_err(e.to_string()))?,
495 };
496
497 Ok(ToolOutput {
498 content,
499 ..Default::default()
500 })
501 })
502 }
503}
504
505#[cfg(target_os = "linux")]
508fn send_signal_to_pid(pid: u32, signal_name: &str) -> Result<(), SynwireError> {
509 use nix::sys::signal::kill;
510 use nix::unistd::Pid;
511
512 let sig = parse_signal(signal_name)?;
513 kill(Pid::from_raw(pid as i32), sig)
514 .map_err(|e| tool_err(format!("failed to send {signal_name} to pid {pid}: {e}")))
515}
516
517#[cfg(target_os = "macos")]
518fn send_signal_to_pid(pid: u32, signal_name: &str) -> Result<(), SynwireError> {
519 use nix::sys::signal::kill;
520 use nix::unistd::Pid;
521
522 let sig = parse_signal(signal_name)?;
523 kill(Pid::from_raw(pid as i32), sig)
524 .map_err(|e| tool_err(format!("failed to send {signal_name} to pid {pid}: {e}")))
525}
526
/// Fallback for targets other than Linux and macOS: always returns a tool
/// error, since signal sending is not implemented there.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn send_signal_to_pid(_pid: u32, _signal_name: &str) -> Result<(), SynwireError> {
    let unsupported = tool_err("signal sending not supported on this platform");
    Err(unsupported)
}
531
532#[cfg(any(target_os = "linux", target_os = "macos"))]
533fn parse_signal(name: &str) -> Result<nix::sys::signal::Signal, SynwireError> {
534 use nix::sys::signal::Signal;
535 match name {
536 "SIGTERM" => Ok(Signal::SIGTERM),
537 "SIGKILL" => Ok(Signal::SIGKILL),
538 "SIGINT" => Ok(Signal::SIGINT),
539 "SIGHUP" => Ok(Signal::SIGHUP),
540 "SIGSTOP" => Ok(Signal::SIGSTOP),
541 "SIGCONT" => Ok(Signal::SIGCONT),
542 other => Err(validation_err(format!("unknown signal: {other}"))),
543 }
544}
545
/// Maps a signal name to its numeric value on the current platform.
///
/// `SIGHUP`, `SIGINT`, `SIGKILL` and `SIGTERM` have the same numbers on
/// Linux and macOS. `SIGSTOP` and `SIGCONT` do not: macOS uses 17 and 19,
/// while Linux on x86/ARM uses 19 and 18 (signal(7) lists other values for
/// some architectures, which are not covered here).
///
/// Unrecognised names fall back to 15 (`SIGTERM`).
fn signal_name_to_number(name: &str) -> i32 {
    // Job-control signal numbers differ between the supported platforms.
    let (sigstop, sigcont) = if cfg!(target_os = "macos") {
        (17, 19)
    } else {
        (19, 18)
    };

    match name {
        "SIGHUP" => 1,
        "SIGINT" => 2,
        "SIGKILL" => 9,
        "SIGTERM" => 15,
        "SIGSTOP" => sigstop,
        "SIGCONT" => sigcont,
        _ => 15,
    }
}