Skip to main content

agent_procs/daemon/
process_manager.rs

1use crate::daemon::log_writer::{self, OutputLine};
2use crate::daemon::port_allocator::PortAllocator;
3use crate::paths;
4use crate::protocol::{ErrorCode, ProcessInfo, ProcessState, Response, Stream as ProtoStream, process_url};
5use crate::session::IdCounter;
6use std::collections::HashMap;
7use std::process::Stdio;
8use std::time::{Duration, Instant};
9use tokio::process::{Child, Command};
10use tokio::sync::broadcast;
11
/// Byte limit (50 MiB) handed to `log_writer::capture_output` for each captured stream.
/// NOTE(review): presumably the size at which a log file is rotated — confirm in `log_writer`.
const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024; // 50 MiB
13
/// Checks whether `name` can be used as a single DNS label.
///
/// A label is accepted when it is 1 to 63 bytes long, consists solely of ASCII
/// lowercase letters, ASCII digits and `-`, and neither begins nor ends with `-`.
#[must_use]
pub fn is_valid_dns_label(name: &str) -> bool {
    let bytes = name.as_bytes();

    // Length is measured in bytes; any multi-byte character is rejected by the
    // alphabet check below anyway.
    let length_ok = (1..=63).contains(&bytes.len());
    let edges_ok = bytes.first() != Some(&b'-') && bytes.last() != Some(&b'-');

    length_ok
        && edges_ok
        && bytes
            .iter()
            .all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-'))
}
27
/// Bookkeeping record for one process spawned by [`ProcessManager`].
pub struct ManagedProcess {
    /// Process name; also the key under which the record is stored and the stem of
    /// its `<name>.stdout` / `<name>.stderr` log files.
    pub name: String,
    /// Identifier produced by the manager's `IdCounter`; accepted as an alternative
    /// lookup key wherever a process target is resolved.
    pub id: String,
    /// Shell command line that was passed to `sh -c`.
    pub command: String,
    /// Working directory requested at spawn time, if any.
    pub cwd: Option<String>,
    /// Caller-supplied environment variables (empty when none were given).
    pub env: HashMap<String, String>,
    /// Handle to the child; `Some` while the process is considered running and reset
    /// to `None` once its exit has been observed or it has been stopped.
    pub child: Option<Child>,
    /// OS pid captured at spawn time (`0` if it could not be obtained). Because the
    /// child calls `setpgid(0, 0)` before exec, this is also its process-group id.
    pub pid: u32,
    /// Moment the process was spawned; used to compute uptime.
    pub started_at: Instant,
    /// Exit code once known. Stays `None` while running and when the exit status
    /// carries no code; set to `Some(-9)` when the manager had to force-kill.
    pub exit_code: Option<i32>,
    /// Port associated with the process: the explicit one, an auto-assigned one when
    /// the proxy is enabled, or `None`.
    pub port: Option<u16>,
}
40
/// Owns every process spawned for one session and mediates spawn/stop/restart/status.
pub struct ProcessManager {
    /// Tracked processes keyed by process name.
    processes: HashMap<String, ManagedProcess>,
    /// Source of the ids handed to newly spawned processes.
    id_counter: IdCounter,
    /// Session name; selects the log directory via `paths::log_dir`.
    session: String,
    /// Broadcast sender cloned into each output-capture task.
    /// NOTE(review): consumers presumably obtain receivers via `subscribe()` — confirm at call sites.
    pub output_tx: broadcast::Sender<OutputLine>,
    /// Tracks whether the proxy is enabled and auto-assigns ports when it is.
    port_allocator: PortAllocator,
}
48
49impl ProcessManager {
50    pub fn new(session: &str) -> Self {
51        let (output_tx, _) = broadcast::channel(1024);
52        Self {
53            processes: HashMap::new(),
54            id_counter: IdCounter::new(),
55            session: session.to_string(),
56            output_tx,
57            port_allocator: PortAllocator::new(),
58        }
59    }
60
61    #[allow(unsafe_code, clippy::unused_async)]
62    pub async fn spawn_process(
63        &mut self,
64        command: &str,
65        name: Option<String>,
66        cwd: Option<&str>,
67        env: Option<&HashMap<String, String>>,
68        port: Option<u16>,
69    ) -> Response {
70        let id = self.id_counter.next_id();
71        let name = name.unwrap_or_else(|| id.clone());
72
73        // Reject names that could cause path traversal in log files
74        if name.contains('/') || name.contains('\\') || name.contains("..") || name.contains('\0') {
75            return Response::Error {
76                code: ErrorCode::General,
77                message: format!("invalid process name: {}", name),
78            };
79        }
80
81        // When proxy is active, names must be valid DNS labels for subdomain routing
82        if self.port_allocator.is_proxy_enabled() && !is_valid_dns_label(&name) {
83            return Response::Error {
84                code: ErrorCode::General,
85                message: format!(
86                    "invalid process name for proxy: '{}' (must be lowercase alphanumeric/hyphens, max 63 chars)",
87                    name
88                ),
89            };
90        }
91
92        // Resolve the port: use explicit port, auto-assign if proxy is enabled, or None
93        let resolved_port = if let Some(p) = port {
94            Some(p)
95        } else if self.port_allocator.is_proxy_enabled() {
96            let assigned: std::collections::HashSet<u16> =
97                self.processes.values().filter_map(|p| p.port).collect();
98            match self.port_allocator.auto_assign_port(&assigned) {
99                Ok(p) => Some(p),
100                Err(e) => {
101                    return Response::Error {
102                        code: ErrorCode::General,
103                        message: e.to_string(),
104                    };
105                }
106            }
107        } else {
108            None
109        };
110
111        if self.processes.contains_key(&name) {
112            return Response::Error {
113                code: ErrorCode::General,
114                message: format!("process already exists: {}", name),
115            };
116        }
117
118        let log_dir = paths::log_dir(&self.session);
119        let _ = std::fs::create_dir_all(&log_dir);
120
121        let mut cmd = Command::new("sh");
122        cmd.arg("-c")
123            .arg(command)
124            .stdout(Stdio::piped())
125            .stderr(Stdio::piped());
126        if let Some(dir) = cwd {
127            cmd.current_dir(dir);
128        }
129        if let Some(p) = resolved_port {
130            // Inject PORT and HOST; user-supplied env takes precedence
131            let mut merged_env: HashMap<String, String> = HashMap::new();
132            merged_env.insert("PORT".to_string(), p.to_string());
133            merged_env.insert("HOST".to_string(), "127.0.0.1".to_string());
134            if let Some(env_vars) = env {
135                for (k, v) in env_vars {
136                    merged_env.insert(k.clone(), v.clone());
137                }
138            }
139            cmd.envs(&merged_env);
140        } else if let Some(env_vars) = env {
141            cmd.envs(env_vars);
142        }
143        // SAFETY: `setpgid(0, 0)` creates a new process group with the child as
144        // leader.  This must happen before exec so that all grandchildren inherit
145        // the group.  The parent uses this PGID to signal the entire tree on stop.
146        unsafe {
147            cmd.pre_exec(|| {
148                nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
149                    .map_err(std::io::Error::other)?;
150                Ok(())
151            });
152        }
153
154        let mut child = match cmd.spawn() {
155            Ok(c) => c,
156            Err(e) => {
157                return Response::Error {
158                    code: ErrorCode::General,
159                    message: format!("failed to spawn: {}", e),
160                };
161            }
162        };
163
164        let pid = child.id().unwrap_or(0);
165
166        // Spawn output capture tasks via log_writer
167        if let Some(stdout) = child.stdout.take() {
168            let tx = self.output_tx.clone();
169            let pname = name.clone();
170            let path = log_dir.join(format!("{}.stdout", name));
171            tokio::spawn(async move {
172                log_writer::capture_output(
173                    stdout,
174                    &path,
175                    &pname,
176                    ProtoStream::Stdout,
177                    tx,
178                    DEFAULT_MAX_LOG_BYTES,
179                    log_writer::DEFAULT_MAX_ROTATED_FILES,
180                )
181                .await;
182            });
183        }
184        if let Some(stderr) = child.stderr.take() {
185            let tx = self.output_tx.clone();
186            let pname = name.clone();
187            let path = log_dir.join(format!("{}.stderr", name));
188            tokio::spawn(async move {
189                log_writer::capture_output(
190                    stderr,
191                    &path,
192                    &pname,
193                    ProtoStream::Stderr,
194                    tx,
195                    DEFAULT_MAX_LOG_BYTES,
196                    log_writer::DEFAULT_MAX_ROTATED_FILES,
197                )
198                .await;
199            });
200        }
201
202        self.processes.insert(
203            name.clone(),
204            ManagedProcess {
205                name: name.clone(),
206                id: id.clone(),
207                command: command.to_string(),
208                cwd: cwd.map(std::string::ToString::to_string),
209                env: env.cloned().unwrap_or_default(),
210                child: Some(child),
211                pid,
212                started_at: Instant::now(),
213                exit_code: None,
214                port: resolved_port,
215            },
216        );
217
218        let url = resolved_port.map(|p| process_url(&name, p, None));
219        Response::RunOk {
220            name,
221            id,
222            pid,
223            port: resolved_port,
224            url,
225        }
226    }
227
228    pub async fn stop_process(&mut self, target: &str) -> Response {
229        let proc = match self.find_mut(target) {
230            Some(p) => p,
231            None => {
232                return Response::Error {
233                    code: ErrorCode::NotFound,
234                    message: format!("process not found: {}", target),
235                };
236            }
237        };
238
239        if let Some(ref child) = proc.child {
240            let raw_pid = child.id().unwrap_or(0) as i32;
241            if raw_pid > 0 {
242                // Signal the entire process group (child PID == PGID due to setpgid in pre_exec)
243                let pgid = nix::unistd::Pid::from_raw(raw_pid);
244                let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
245            }
246        }
247
248        // Wait up to 10s for graceful exit, then SIGKILL
249        if let Some(ref mut child) = proc.child {
250            let wait_result = tokio::time::timeout(Duration::from_secs(10), child.wait()).await;
251
252            match wait_result {
253                Ok(Ok(status)) => {
254                    proc.exit_code = status.code();
255                }
256                _ => {
257                    // Timed out or error — force kill the process group
258                    let raw_pid = proc.pid as i32;
259                    if raw_pid > 0 {
260                        let pgid = nix::unistd::Pid::from_raw(raw_pid);
261                        let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
262                    }
263                    let _ = child.wait().await;
264                    proc.exit_code = Some(-9);
265                }
266            }
267            proc.child = None;
268        }
269
270        Response::Ok {
271            message: format!("stopped {}", target),
272        }
273    }
274
275    pub async fn stop_all(&mut self) -> Response {
276        let names: Vec<String> = self.processes.keys().cloned().collect();
277        for name in names {
278            let _ = self.stop_process(&name).await;
279        }
280        self.processes.clear();
281        Response::Ok {
282            message: "all processes stopped".into(),
283        }
284    }
285
286    pub async fn restart_process(&mut self, target: &str) -> Response {
287        let (command, name, cwd, env, port) = match self.find(target) {
288            Some(p) => (
289                p.command.clone(),
290                p.name.clone(),
291                p.cwd.clone(),
292                p.env.clone(),
293                p.port,
294            ),
295            None => {
296                return Response::Error {
297                    code: ErrorCode::NotFound,
298                    message: format!("process not found: {}", target),
299                };
300            }
301        };
302        let _ = self.stop_process(target).await;
303        self.processes.remove(&name);
304        let env = if env.is_empty() { None } else { Some(env) };
305        self.spawn_process(&command, Some(name), cwd.as_deref(), env.as_ref(), port)
306            .await
307    }
308
    /// Switches the port allocator into proxy mode by delegating to
    /// `PortAllocator::enable_proxy`.
    /// NOTE(review): presumably this makes `is_proxy_enabled()` return true, which in
    /// turn makes later spawns enforce DNS-label names and auto-assign ports — confirm.
    pub fn enable_proxy(&mut self) {
        self.port_allocator.enable_proxy();
    }
312
313    pub fn status(&mut self) -> Response {
314        self.refresh_exit_states();
315        Response::Status {
316            processes: self.build_process_infos(),
317        }
318    }
319
320    /// Returns `None` if process not found or still running.
321    /// Returns `Some(exit_code)` if process has exited (`exit_code` is None for signal kills).
322    pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
323        self.refresh_exit_states();
324        self.find(target).and_then(|p| {
325            if p.child.is_none() {
326                Some(p.exit_code)
327            } else {
328                None
329            }
330        })
331    }
332
333    fn refresh_exit_states(&mut self) {
334        for proc in self.processes.values_mut() {
335            if proc.child.is_some() && proc.exit_code.is_none() {
336                if let Some(ref mut child) = proc.child {
337                    if let Ok(Some(status)) = child.try_wait() {
338                        proc.exit_code = status.code();
339                        proc.child = None;
340                    }
341                }
342            }
343        }
344    }
345
    /// Returns the session name this manager was created with.
    pub fn session_name(&self) -> &str {
        &self.session
    }
349
    /// Returns `true` if a tracked process matches `target` by name or by id,
    /// regardless of whether it is still running.
    pub fn has_process(&self, target: &str) -> bool {
        self.find(target).is_some()
    }
353
354    /// Returns a map of running process names to their assigned ports.
355    pub fn running_ports(&self) -> HashMap<String, u16> {
356        self.processes
357            .iter()
358            .filter_map(|(name, p)| {
359                if p.child.is_some() {
360                    p.port.map(|port| (name.clone(), port))
361                } else {
362                    None
363                }
364            })
365            .collect()
366    }
367
368    /// Non-mutating status snapshot for use by the proxy status page.
369    /// May show stale exit states since it skips `refresh_exit_states()`.
370    pub fn status_snapshot(&self) -> Response {
371        Response::Status {
372            processes: self.build_process_infos(),
373        }
374    }
375
376    fn build_process_infos(&self) -> Vec<ProcessInfo> {
377        let mut infos: Vec<ProcessInfo> = self
378            .processes
379            .values()
380            .map(|p| ProcessInfo {
381                name: p.name.clone(),
382                id: p.id.clone(),
383                pid: p.pid,
384                state: if p.child.is_some() {
385                    ProcessState::Running
386                } else {
387                    ProcessState::Exited
388                },
389                exit_code: p.exit_code,
390                uptime_secs: if p.child.is_some() {
391                    Some(p.started_at.elapsed().as_secs())
392                } else {
393                    None
394                },
395                command: p.command.clone(),
396                port: p.port,
397                url: p.port.map(|port| process_url(&p.name, port, None)),
398            })
399            .collect();
400        infos.sort_by(|a, b| a.name.cmp(&b.name));
401        infos
402    }
403
404    fn find(&self, target: &str) -> Option<&ManagedProcess> {
405        self.processes
406            .get(target)
407            .or_else(|| self.processes.values().find(|p| p.id == target))
408    }
409
410    fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
411        if self.processes.contains_key(target) {
412            self.processes.get_mut(target)
413        } else {
414            self.processes.values_mut().find(|p| p.id == target)
415        }
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Typical labels that must be accepted.
    #[test]
    fn test_valid_dns_labels() {
        assert!(is_valid_dns_label("api"));
        assert!(is_valid_dns_label("my-app"));
        assert!(is_valid_dns_label("a"));
        assert!(is_valid_dns_label("a1"));
        assert!(is_valid_dns_label("123"));
    }

    /// Labels that violate the length, alphabet or hyphen-placement rules.
    #[test]
    fn test_invalid_dns_labels() {
        assert!(!is_valid_dns_label(""));
        assert!(!is_valid_dns_label("-start"));
        assert!(!is_valid_dns_label("end-"));
        assert!(!is_valid_dns_label("UPPER"));
        assert!(!is_valid_dns_label("has.dot"));
        assert!(!is_valid_dns_label("has space"));
        assert!(!is_valid_dns_label(&"a".repeat(64))); // > 63 chars
        assert!(!is_valid_dns_label("has_underscore"));
    }

    /// Boundary cases: the exact length limit, a lone hyphen, and non-ASCII input.
    #[test]
    fn test_dns_label_boundaries() {
        assert!(is_valid_dns_label(&"a".repeat(63))); // exactly at the limit
        assert!(is_valid_dns_label("a-b-c"));
        assert!(!is_valid_dns_label("-")); // starts and ends with a hyphen
        assert!(!is_valid_dns_label("caf\u{e9}")); // non-ASCII letter
    }
}
443}