synwire_agent/sandbox/
process.rs1use std::collections::HashMap;
4
5use tokio::sync::Mutex;
6
7use serde::{Deserialize, Serialize};
8use synwire_core::BoxFuture;
9use synwire_core::vfs::error::VfsError;
10use synwire_core::vfs::types::{ExecuteResponse, JobInfo, ProcessInfo};
11use tokio::process::Command;
12use uuid::Uuid;
13
14#[derive(Debug)]
16pub struct ProcessManager {
17 jobs: Mutex<HashMap<String, JobInfo>>,
18 bg_pids: Mutex<HashMap<String, u32>>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ProcessList {
25 pub processes: Vec<ProcessInfo>,
27}
28
29impl Default for ProcessManager {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl ProcessManager {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 jobs: Mutex::new(HashMap::new()),
41 bg_pids: Mutex::new(HashMap::new()),
42 }
43 }
44
45 pub fn list_processes(&self) -> BoxFuture<'_, Result<ProcessList, VfsError>> {
47 Box::pin(async move {
48 let output = Command::new("ps")
49 .args(["-eo", "pid,ppid,comm,pcpu,rss,stat"])
50 .output()
51 .await
52 .map_err(VfsError::Io)?;
53
54 let text = String::from_utf8_lossy(&output.stdout);
55 let mut processes = Vec::new();
56 for line in text.lines().skip(1) {
57 let cols: Vec<&str> = line.split_whitespace().collect();
58 if cols.len() < 6 {
59 continue;
60 }
61 let pid = cols[0].parse::<u32>().unwrap_or(0);
62 let parent_pid = cols[1].parse::<u32>().ok();
63 let command = cols[2].to_string();
64 let cpu_pct = cols[3].parse::<f32>().ok();
65 let mem_kb = cols[4].parse::<u64>().ok();
66 let state = cols[5].to_string();
67 processes.push(ProcessInfo {
68 pid,
69 command,
70 cpu_pct,
71 mem_bytes: mem_kb.map(|k| k * 1024),
72 parent_pid,
73 state,
74 });
75 }
76 Ok(ProcessList { processes })
77 })
78 }
79
80 pub fn kill_process(&self, pid: u32) -> BoxFuture<'_, Result<(), VfsError>> {
82 Box::pin(async move {
83 let output = Command::new("kill")
84 .arg(pid.to_string())
85 .output()
86 .await
87 .map_err(VfsError::Io)?;
88 if !output.status.success() {
89 return Err(VfsError::NotFound(format!("process {pid}")));
90 }
91 Ok(())
92 })
93 }
94
95 pub fn spawn_background<'a>(
97 &'a self,
98 cmd: &'a str,
99 args: &'a [String],
100 ) -> BoxFuture<'a, Result<String, VfsError>> {
101 Box::pin(async move {
102 let job_id = Uuid::new_v4().to_string();
103 let child = Command::new(cmd).args(args).spawn().map_err(VfsError::Io)?;
104
105 let pid = child.id();
106 let _ = self.jobs.lock().await.insert(
107 job_id.clone(),
108 JobInfo {
109 id: job_id.clone(),
110 pid,
111 command: format!("{cmd} {}", args.join(" ")),
112 status: "running".to_string(),
113 },
114 );
115
116 if let Some(pid) = pid {
117 let _ = self.bg_pids.lock().await.insert(job_id.clone(), pid);
118 }
119 Ok(job_id)
120 })
121 }
122
123 pub async fn list_jobs(&self) -> Vec<JobInfo> {
125 self.jobs.lock().await.values().cloned().collect()
126 }
127
128 pub fn execute<'a>(
130 &'a self,
131 cmd: &'a str,
132 args: &'a [String],
133 ) -> BoxFuture<'a, Result<ExecuteResponse, VfsError>> {
134 Box::pin(async move {
135 let output = Command::new(cmd)
136 .args(args)
137 .output()
138 .await
139 .map_err(VfsError::Io)?;
140 Ok(ExecuteResponse {
141 exit_code: output.status.code().unwrap_or(-1),
142 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
143 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
144 })
145 })
146 }
147}
148
#[cfg(test)]
// Test code is allowed to panic on unexpected failures.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// `list_processes` should report at least one process on a live system.
    #[tokio::test]
    async fn test_list_processes_returns_data() {
        let backend = ProcessManager::new();
        let list = backend.list_processes().await.expect("list_processes");
        assert!(!list.processes.is_empty());
    }

    /// A spawned background job must show up in `list_jobs`.
    #[tokio::test]
    async fn test_spawn_background_job() {
        let backend = ProcessManager::new();
        let job_id = backend
            .spawn_background("sleep", &["60".to_string()])
            .await
            .expect("spawn");
        let jobs = backend.list_jobs().await;
        let registered = jobs.iter().any(|j| j.id == job_id);

        // Clean up before asserting so a failing assertion does not leave the
        // 60-second `sleep` running. The kill result is best-effort.
        let pid = backend.bg_pids.lock().await.get(&job_id).copied();
        if let Some(pid) = pid {
            let _ = backend.kill_process(pid).await;
        }

        assert!(registered);
    }

    /// `execute` should capture stdout and a zero exit code for `echo`.
    #[tokio::test]
    async fn test_execute_returns_output() {
        let backend = ProcessManager::new();
        let resp = backend
            .execute("echo", &["hello".to_string()])
            .await
            .expect("execute");
        assert_eq!(resp.exit_code, 0);
        assert!(resp.stdout.contains("hello"));
    }
}