Skip to main content

agent_procs/daemon/
process_manager.rs

1use crate::daemon::log_writer::{self, OutputLine};
2use crate::paths;
3use crate::protocol::{ProcessInfo, ProcessState, Response, Stream as ProtoStream};
4use crate::session::IdCounter;
5use std::collections::HashMap;
6use std::process::Stdio;
7use std::time::{Duration, Instant};
8use tokio::process::{Child, Command};
9use tokio::sync::broadcast;
10
/// Per-stream byte limit (50 MiB) handed to `log_writer::capture_output` for every
/// captured stdout/stderr pipe. How the limit is enforced is decided by the log writer.
const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024;
12
13pub struct ManagedProcess {
14    pub name: String,
15    pub id: String,
16    pub command: String,
17    pub cwd: Option<String>,
18    pub env: HashMap<String, String>,
19    pub child: Option<Child>,
20    pub pid: u32,
21    pub started_at: Instant,
22    pub exit_code: Option<i32>,
23}
24
25pub struct ProcessManager {
26    processes: HashMap<String, ManagedProcess>,
27    id_counter: IdCounter,
28    session: String,
29    pub output_tx: broadcast::Sender<OutputLine>,
30}
31
32impl ProcessManager {
33    pub fn new(session: &str) -> Self {
34        let (output_tx, _) = broadcast::channel(1024);
35        Self {
36            processes: HashMap::new(),
37            id_counter: IdCounter::new(),
38            session: session.to_string(),
39            output_tx,
40        }
41    }
42
43    pub async fn spawn_process(
44        &mut self,
45        command: &str,
46        name: Option<String>,
47        cwd: Option<&str>,
48        env: Option<&HashMap<String, String>>,
49    ) -> Response {
50        let id = self.id_counter.next_id();
51        let name = name.unwrap_or_else(|| id.clone());
52
53        if self.processes.contains_key(&name) {
54            return Response::Error {
55                code: 1,
56                message: format!("process already exists: {}", name),
57            };
58        }
59
60        let log_dir = paths::log_dir(&self.session);
61        let _ = std::fs::create_dir_all(&log_dir);
62
63        let mut cmd = Command::new("sh");
64        cmd.arg("-c")
65            .arg(command)
66            .stdout(Stdio::piped())
67            .stderr(Stdio::piped());
68        if let Some(dir) = cwd {
69            cmd.current_dir(dir);
70        }
71        if let Some(env_vars) = env {
72            cmd.envs(env_vars);
73        }
74        // Put child in its own process group so we can signal the entire tree
75        unsafe {
76            cmd.pre_exec(|| {
77                nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
78                    .map_err(std::io::Error::other)?;
79                Ok(())
80            });
81        }
82
83        let mut child = match cmd.spawn() {
84            Ok(c) => c,
85            Err(e) => {
86                return Response::Error {
87                    code: 1,
88                    message: format!("failed to spawn: {}", e),
89                }
90            }
91        };
92
93        let pid = child.id().unwrap_or(0);
94
95        // Spawn output capture tasks via log_writer
96        if let Some(stdout) = child.stdout.take() {
97            let tx = self.output_tx.clone();
98            let pname = name.clone();
99            let path = log_dir.join(format!("{}.stdout", name));
100            tokio::spawn(async move {
101                log_writer::capture_output(
102                    stdout,
103                    &path,
104                    &pname,
105                    ProtoStream::Stdout,
106                    tx,
107                    DEFAULT_MAX_LOG_BYTES,
108                )
109                .await;
110            });
111        }
112        if let Some(stderr) = child.stderr.take() {
113            let tx = self.output_tx.clone();
114            let pname = name.clone();
115            let path = log_dir.join(format!("{}.stderr", name));
116            tokio::spawn(async move {
117                log_writer::capture_output(
118                    stderr,
119                    &path,
120                    &pname,
121                    ProtoStream::Stderr,
122                    tx,
123                    DEFAULT_MAX_LOG_BYTES,
124                )
125                .await;
126            });
127        }
128
129        self.processes.insert(
130            name.clone(),
131            ManagedProcess {
132                name: name.clone(),
133                id: id.clone(),
134                command: command.to_string(),
135                cwd: cwd.map(|s| s.to_string()),
136                env: env.cloned().unwrap_or_default(),
137                child: Some(child),
138                pid,
139                started_at: Instant::now(),
140                exit_code: None,
141            },
142        );
143
144        Response::RunOk { name, id, pid }
145    }
146
147    pub async fn stop_process(&mut self, target: &str) -> Response {
148        let proc = match self.find_mut(target) {
149            Some(p) => p,
150            None => {
151                return Response::Error {
152                    code: 2,
153                    message: format!("process not found: {}", target),
154                }
155            }
156        };
157
158        if let Some(ref child) = proc.child {
159            let raw_pid = child.id().unwrap_or(0) as i32;
160            if raw_pid > 0 {
161                // Signal the entire process group (child PID == PGID due to setpgid in pre_exec)
162                let pgid = nix::unistd::Pid::from_raw(raw_pid);
163                let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
164            }
165        }
166
167        // Wait up to 10s for graceful exit, then SIGKILL
168        if let Some(ref mut child) = proc.child {
169            let wait_result = tokio::time::timeout(Duration::from_secs(10), child.wait()).await;
170
171            match wait_result {
172                Ok(Ok(status)) => {
173                    proc.exit_code = status.code();
174                }
175                _ => {
176                    // Timed out or error — force kill the process group
177                    let raw_pid = proc.pid as i32;
178                    if raw_pid > 0 {
179                        let pgid = nix::unistd::Pid::from_raw(raw_pid);
180                        let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
181                    }
182                    let _ = child.wait().await;
183                    proc.exit_code = Some(-9);
184                }
185            }
186            proc.child = None;
187        }
188
189        Response::Ok {
190            message: format!("stopped {}", target),
191        }
192    }
193
194    pub async fn stop_all(&mut self) -> Response {
195        let names: Vec<String> = self.processes.keys().cloned().collect();
196        for name in names {
197            self.stop_process(&name).await;
198        }
199        self.processes.clear();
200        Response::Ok {
201            message: "all processes stopped".into(),
202        }
203    }
204
205    pub async fn restart_process(&mut self, target: &str) -> Response {
206        let (command, name, cwd, env) = match self.find(target) {
207            Some(p) => (
208                p.command.clone(),
209                p.name.clone(),
210                p.cwd.clone(),
211                p.env.clone(),
212            ),
213            None => {
214                return Response::Error {
215                    code: 2,
216                    message: format!("process not found: {}", target),
217                }
218            }
219        };
220        self.stop_process(target).await;
221        self.processes.remove(&name);
222        let env = if env.is_empty() { None } else { Some(env) };
223        self.spawn_process(&command, Some(name), cwd.as_deref(), env.as_ref())
224            .await
225    }
226
227    pub fn status(&mut self) -> Response {
228        self.refresh_exit_states();
229        let mut infos: Vec<ProcessInfo> = self
230            .processes
231            .values()
232            .map(|p| ProcessInfo {
233                name: p.name.clone(),
234                id: p.id.clone(),
235                pid: p.pid,
236                state: if p.child.is_some() {
237                    ProcessState::Running
238                } else {
239                    ProcessState::Exited
240                },
241                exit_code: p.exit_code,
242                uptime_secs: if p.child.is_some() {
243                    Some(p.started_at.elapsed().as_secs())
244                } else {
245                    None
246                },
247                command: p.command.clone(),
248            })
249            .collect();
250        infos.sort_by(|a, b| a.name.cmp(&b.name));
251        Response::Status { processes: infos }
252    }
253
254    pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
255        self.refresh_exit_states();
256        self.find(target)
257            .map(|p| if p.child.is_none() { p.exit_code } else { None })
258    }
259
260    fn refresh_exit_states(&mut self) {
261        for proc in self.processes.values_mut() {
262            if proc.child.is_some() && proc.exit_code.is_none() {
263                if let Some(ref mut child) = proc.child {
264                    if let Ok(Some(status)) = child.try_wait() {
265                        proc.exit_code = status.code();
266                        proc.child = None;
267                    }
268                }
269            }
270        }
271    }
272
273    pub fn has_process(&self, target: &str) -> bool {
274        self.find(target).is_some()
275    }
276
277    fn find(&self, target: &str) -> Option<&ManagedProcess> {
278        self.processes
279            .get(target)
280            .or_else(|| self.processes.values().find(|p| p.id == target))
281    }
282
283    fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
284        if self.processes.contains_key(target) {
285            self.processes.get_mut(target)
286        } else {
287            self.processes.values_mut().find(|p| p.id == target)
288        }
289    }
290}