Skip to main content

agent_procs/daemon/
process_manager.rs

use crate::daemon::log_writer::{self, OutputLine};
use crate::paths;
use crate::protocol::{ProcessInfo, ProcessState, Response, Stream as ProtoStream};
use crate::session::IdCounter;
use std::collections::HashMap;
use std::os::unix::process::ExitStatusExt;
use std::process::{ExitStatus, Stdio};
use std::time::{Duration, Instant};
use tokio::process::{Child, Command};
use tokio::sync::broadcast;
10
/// Byte limit (50 MiB) handed to `log_writer::capture_output` for each captured
/// stdout/stderr stream.
const DEFAULT_MAX_LOG_BYTES: u64 = 50 << 20;
12
13pub struct ManagedProcess {
14    pub name: String,
15    pub id: String,
16    pub command: String,
17    pub child: Option<Child>,
18    pub pid: u32,
19    pub started_at: Instant,
20    pub exit_code: Option<i32>,
21}
22
23pub struct ProcessManager {
24    processes: HashMap<String, ManagedProcess>,
25    id_counter: IdCounter,
26    session: String,
27    pub output_tx: broadcast::Sender<OutputLine>,
28}
29
30impl ProcessManager {
31    pub fn new(session: &str) -> Self {
32        let (output_tx, _) = broadcast::channel(1024);
33        Self {
34            processes: HashMap::new(),
35            id_counter: IdCounter::new(),
36            session: session.to_string(),
37            output_tx,
38        }
39    }
40
41    pub async fn spawn_process(&mut self, command: &str, name: Option<String>, cwd: Option<&str>, env: Option<&HashMap<String, String>>) -> Response {
42        let id = self.id_counter.next_id();
43        let name = name.unwrap_or_else(|| id.clone());
44
45        if self.processes.contains_key(&name) {
46            return Response::Error { code: 1, message: format!("process already exists: {}", name) };
47        }
48
49        let log_dir = paths::log_dir(&self.session);
50        let _ = std::fs::create_dir_all(&log_dir);
51
52        let mut cmd = Command::new("sh");
53        cmd.arg("-c").arg(command).stdout(Stdio::piped()).stderr(Stdio::piped());
54        if let Some(dir) = cwd {
55            cmd.current_dir(dir);
56        }
57        if let Some(env_vars) = env {
58            cmd.envs(env_vars);
59        }
60        // Put child in its own process group so we can signal the entire tree
61        unsafe {
62            cmd.pre_exec(|| {
63                nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
64                    .map_err(std::io::Error::other)?;
65                Ok(())
66            });
67        }
68
69        let mut child = match cmd.spawn()
70        {
71            Ok(c) => c,
72            Err(e) => return Response::Error { code: 1, message: format!("failed to spawn: {}", e) },
73        };
74
75        let pid = child.id().unwrap_or(0);
76
77        // Spawn output capture tasks via log_writer
78        if let Some(stdout) = child.stdout.take() {
79            let tx = self.output_tx.clone();
80            let pname = name.clone();
81            let path = log_dir.join(format!("{}.stdout", name));
82            tokio::spawn(async move {
83                log_writer::capture_output(stdout, &path, &pname, ProtoStream::Stdout, tx, DEFAULT_MAX_LOG_BYTES).await;
84            });
85        }
86        if let Some(stderr) = child.stderr.take() {
87            let tx = self.output_tx.clone();
88            let pname = name.clone();
89            let path = log_dir.join(format!("{}.stderr", name));
90            tokio::spawn(async move {
91                log_writer::capture_output(stderr, &path, &pname, ProtoStream::Stderr, tx, DEFAULT_MAX_LOG_BYTES).await;
92            });
93        }
94
95        self.processes.insert(name.clone(), ManagedProcess {
96            name: name.clone(), id: id.clone(), command: command.to_string(),
97            child: Some(child), pid, started_at: Instant::now(), exit_code: None,
98        });
99
100        Response::RunOk { name, id, pid }
101    }
102
103    pub async fn stop_process(&mut self, target: &str) -> Response {
104        let proc = match self.find_mut(target) {
105            Some(p) => p,
106            None => return Response::Error { code: 2, message: format!("process not found: {}", target) },
107        };
108
109        if let Some(ref child) = proc.child {
110            let raw_pid = child.id().unwrap_or(0) as i32;
111            if raw_pid > 0 {
112                // Signal the entire process group (child PID == PGID due to setpgid in pre_exec)
113                let pgid = nix::unistd::Pid::from_raw(raw_pid);
114                let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
115            }
116        }
117
118        // Wait up to 10s for graceful exit, then SIGKILL
119        if let Some(ref mut child) = proc.child {
120            let wait_result = tokio::time::timeout(
121                Duration::from_secs(10),
122                child.wait()
123            ).await;
124
125            match wait_result {
126                Ok(Ok(status)) => {
127                    proc.exit_code = status.code();
128                }
129                _ => {
130                    // Timed out or error — force kill the process group
131                    let raw_pid = proc.pid as i32;
132                    if raw_pid > 0 {
133                        let pgid = nix::unistd::Pid::from_raw(raw_pid);
134                        let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
135                    }
136                    let _ = child.wait().await;
137                    proc.exit_code = Some(-9);
138                }
139            }
140            proc.child = None;
141        }
142
143        Response::Ok { message: format!("stopped {}", target) }
144    }
145
146    pub async fn stop_all(&mut self) -> Response {
147        let names: Vec<String> = self.processes.keys().cloned().collect();
148        for name in names {
149            self.stop_process(&name).await;
150        }
151        Response::Ok { message: "all processes stopped".into() }
152    }
153
154    pub async fn restart_process(&mut self, target: &str) -> Response {
155        let (command, name) = match self.find(target) {
156            Some(p) => (p.command.clone(), p.name.clone()),
157            None => return Response::Error { code: 2, message: format!("process not found: {}", target) },
158        };
159        self.stop_process(target).await;
160        self.processes.remove(&name);
161        self.spawn_process(&command, Some(name), None, None).await
162    }
163
164    pub fn status(&mut self) -> Response {
165        self.refresh_exit_states();
166        let mut infos: Vec<ProcessInfo> = self.processes.values()
167            .map(|p| ProcessInfo {
168                name: p.name.clone(), id: p.id.clone(), pid: p.pid,
169                state: if p.child.is_some() { ProcessState::Running } else { ProcessState::Exited },
170                exit_code: p.exit_code,
171                uptime_secs: if p.child.is_some() { Some(p.started_at.elapsed().as_secs()) } else { None },
172                command: p.command.clone(),
173            })
174            .collect();
175        infos.sort_by(|a, b| a.name.cmp(&b.name));
176        Response::Status { processes: infos }
177    }
178
179    pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
180        self.refresh_exit_states();
181        self.find(target).map(|p| if p.child.is_none() { p.exit_code } else { None })
182    }
183
184    fn refresh_exit_states(&mut self) {
185        for proc in self.processes.values_mut() {
186            if proc.child.is_some() && proc.exit_code.is_none() {
187                if let Some(ref mut child) = proc.child {
188                    if let Ok(Some(status)) = child.try_wait() {
189                        proc.exit_code = status.code();
190                        proc.child = None;
191                    }
192                }
193            }
194        }
195    }
196
197    pub fn has_process(&self, target: &str) -> bool {
198        self.find(target).is_some()
199    }
200
201    fn find(&self, target: &str) -> Option<&ManagedProcess> {
202        self.processes.get(target)
203            .or_else(|| self.processes.values().find(|p| p.id == target))
204    }
205
206    fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
207        if self.processes.contains_key(target) {
208            self.processes.get_mut(target)
209        } else {
210            self.processes.values_mut().find(|p| p.id == target)
211        }
212    }
213}