Skip to main content

agent_procs/daemon/
process_manager.rs

1use crate::daemon::log_writer::{self, OutputLine};
2use crate::daemon::port_allocator::PortAllocator;
3use crate::paths;
4use crate::protocol::{
5    ErrorCode, ProcessInfo, ProcessState, Response, RestartMode, RestartPolicy,
6    Stream as ProtoStream, WatchConfig, process_url,
7};
8use crate::session::IdCounter;
9use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::Arc;
12use std::sync::atomic::AtomicU64;
13use std::time::{Duration, Instant};
14use tokio::process::{Child, Command};
15use tokio::sync::broadcast;
16
/// Per-stream log size limit (50 MiB) passed to `log_writer::capture_output` together with
/// `log_writer::DEFAULT_MAX_ROTATED_FILES`; presumably the size at which a log file is
/// rotated — confirm against `log_writer`.
const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024; // 50MB
18
/// Reports whether `name` can serve as a single DNS label for subdomain routing.
///
/// A valid label is 1–63 bytes long, consists only of ASCII lowercase letters, ASCII digits
/// and `-`, and neither begins nor ends with `-`.
#[must_use]
pub fn is_valid_dns_label(name: &str) -> bool {
    let allowed = |c: char| matches!(c, 'a'..='z' | '0'..='9' | '-');
    match name.as_bytes() {
        [] => false,
        bytes if bytes.len() > 63 => false,
        // '-' is ASCII, so checking the first/last byte is the same as the first/last char.
        [b'-', ..] | [.., b'-'] => false,
        _ => name.chars().all(allowed),
    }
}
32
/// One process tracked by `ProcessManager` (stored keyed by `name`), together with its
/// output-capture plumbing and supervisor bookkeeping.
pub struct ManagedProcess {
    /// Unique name; also the stem of the `<name>.stdout` / `<name>.stderr` log files.
    pub name: String,
    /// Generated id; `ProcessManager::find` accepts it in place of the name.
    pub id: String,
    /// Shell command line, executed via `sh -c`.
    pub command: String,
    /// Working directory requested at spawn time, if any.
    pub cwd: Option<String>,
    /// User-supplied environment (the injected `PORT`/`HOST` variables are not stored here).
    pub env: HashMap<String, String>,
    /// Handle to the running child; `None` once it has exited or been stopped.
    pub child: Option<Child>,
    /// OS PID captured at spawn (also the process-group id); 0 if it was unavailable.
    pub pid: u32,
    /// When this record was created (right after spawning, for live processes).
    pub started_at: Instant,
    /// Exit code once known. `None` while running and when the process died from a signal;
    /// `stop_process` records `-9` after a forced kill.
    pub exit_code: Option<i32>,
    /// Port handed to the process via `PORT`, if any.
    pub port: Option<u16>,
    // Supervisor fields
    /// Auto-restart policy; with `None` the process is never considered for auto-restart.
    pub restart_policy: Option<RestartPolicy>,
    /// File-watch configuration, if the process is watched.
    pub watch_config: Option<WatchConfig>,
    /// Restarts performed so far; compared against the policy's `max_restarts`.
    pub restart_count: u32,
    /// Set by `stop_process`; manually stopped processes are never auto-restarted.
    pub manually_stopped: bool,
    /// When set, `classify_restart_candidates` skips the process — presumably because a
    /// restart is already scheduled (TODO: confirm with the supervisor).
    pub restart_pending: bool,
    /// Restarts exhausted (`mark_failed`) or a respawn failed; reported as
    /// `ProcessState::Failed` while not running.
    pub failed: bool,
    /// Sender paired with the stdout capture task; dropping it signals that task to drain.
    pub supervisor_tx: Option<tokio::sync::mpsc::Sender<String>>,
    /// Join handles of the stdout/stderr capture tasks.
    pub capture_handles: Vec<tokio::task::JoinHandle<()>>,
    /// File-watcher handle, if one is attached; `spawn_process` itself never sets it.
    pub watch_handle: Option<crate::daemon::watcher::WatchHandle>,
}
55
/// Owns every process of one daemon session: spawning, stopping, restarting, exit-state
/// tracking, status reporting and port assignment.
pub struct ProcessManager {
    /// Process records keyed by process name.
    processes: HashMap<String, ManagedProcess>,
    /// Generates ids for new processes (an id also serves as the default name).
    id_counter: IdCounter,
    /// Session name; selects the log directory via `paths::log_dir`.
    session: String,
    /// Broadcast channel receiving captured output lines from every process.
    pub output_tx: broadcast::Sender<OutputLine>,
    /// Auto-assigns ports and records whether the proxy is enabled.
    port_allocator: PortAllocator,
}
63
64impl ProcessManager {
65    pub fn new(session: &str) -> Self {
66        let (output_tx, _) = broadcast::channel(1024);
67        Self {
68            processes: HashMap::new(),
69            id_counter: IdCounter::new(),
70            session: session.to_string(),
71            output_tx,
72            port_allocator: PortAllocator::new(),
73        }
74    }
75
76    #[allow(unsafe_code, clippy::unused_async)]
77    pub async fn spawn_process(
78        &mut self,
79        command: &str,
80        name: Option<String>,
81        cwd: Option<&str>,
82        env: Option<&HashMap<String, String>>,
83        port: Option<u16>,
84    ) -> Response {
85        let id = self.id_counter.next_id();
86        let name = name.unwrap_or_else(|| id.clone());
87
88        // Reject names that could cause path traversal in log files
89        if name.contains('/') || name.contains('\\') || name.contains("..") || name.contains('\0') {
90            return Response::Error {
91                code: ErrorCode::General,
92                message: format!("invalid process name: {}", name),
93            };
94        }
95
96        // When proxy is active, names must be valid DNS labels for subdomain routing
97        if self.port_allocator.is_proxy_enabled() && !is_valid_dns_label(&name) {
98            return Response::Error {
99                code: ErrorCode::General,
100                message: format!(
101                    "invalid process name for proxy: '{}' (must be lowercase alphanumeric/hyphens, max 63 chars)",
102                    name
103                ),
104            };
105        }
106
107        // Resolve the port: use explicit port, auto-assign if proxy is enabled, or None
108        let resolved_port = if let Some(p) = port {
109            Some(p)
110        } else if self.port_allocator.is_proxy_enabled() {
111            let assigned: std::collections::HashSet<u16> = self
112                .processes
113                .values()
114                .filter(|p| p.child.is_some())
115                .filter_map(|p| p.port)
116                .collect();
117            match self.port_allocator.auto_assign_port(&assigned) {
118                Ok(p) => Some(p),
119                Err(e) => {
120                    return Response::Error {
121                        code: ErrorCode::General,
122                        message: e.to_string(),
123                    };
124                }
125            }
126        } else {
127            None
128        };
129
130        if self.processes.contains_key(&name) {
131            return Response::Error {
132                code: ErrorCode::General,
133                message: format!("process already exists: {}", name),
134            };
135        }
136
137        let log_dir = paths::log_dir(&self.session);
138        let _ = std::fs::create_dir_all(&log_dir);
139
140        let mut cmd = Command::new("sh");
141        cmd.arg("-c")
142            .arg(command)
143            .stdout(Stdio::piped())
144            .stderr(Stdio::piped());
145        if let Some(dir) = cwd {
146            cmd.current_dir(dir);
147        }
148        if let Some(p) = resolved_port {
149            // Inject PORT and HOST; user-supplied env takes precedence
150            let mut merged_env: HashMap<String, String> = HashMap::new();
151            merged_env.insert("PORT".to_string(), p.to_string());
152            merged_env.insert("HOST".to_string(), "127.0.0.1".to_string());
153            if let Some(env_vars) = env {
154                for (k, v) in env_vars {
155                    merged_env.insert(k.clone(), v.clone());
156                }
157            }
158            cmd.envs(&merged_env);
159        } else if let Some(env_vars) = env {
160            cmd.envs(env_vars);
161        }
162        // SAFETY: `setpgid(0, 0)` creates a new process group with the child as
163        // leader.  This must happen before exec so that all grandchildren inherit
164        // the group.  The parent uses this PGID to signal the entire tree on stop.
165        unsafe {
166            cmd.pre_exec(|| {
167                nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
168                    .map_err(std::io::Error::other)?;
169                Ok(())
170            });
171        }
172
173        let mut child = match cmd.spawn() {
174            Ok(c) => c,
175            Err(e) => {
176                return Response::Error {
177                    code: ErrorCode::General,
178                    message: format!("failed to spawn: {}", e),
179                };
180            }
181        };
182
183        let pid = child.id().unwrap_or(0);
184
185        // Shared sequence counter for interleaved ordering across streams
186        let seq_counter = Arc::new(AtomicU64::new(0));
187
188        // Spawn output capture tasks via log_writer
189        // Supervisor channel: stdout gets the real sender, stderr gets a dummy
190        let (sup_tx, sup_rx_stdout) = tokio::sync::mpsc::channel::<String>(16);
191        let (stderr_sup_sender, sup_rx_stderr) = tokio::sync::mpsc::channel::<String>(16);
192        drop(stderr_sup_sender);
193
194        let mut capture_handles = Vec::new();
195
196        if let Some(stdout) = child.stdout.take() {
197            let tx = self.output_tx.clone();
198            let pname = name.clone();
199            let path = log_dir.join(format!("{}.stdout", name));
200            let seq = Arc::clone(&seq_counter);
201            let handle = tokio::spawn(async move {
202                log_writer::capture_output(
203                    stdout,
204                    &path,
205                    &pname,
206                    ProtoStream::Stdout,
207                    tx,
208                    DEFAULT_MAX_LOG_BYTES,
209                    log_writer::DEFAULT_MAX_ROTATED_FILES,
210                    seq,
211                    sup_rx_stdout,
212                )
213                .await;
214            });
215            capture_handles.push(handle);
216        }
217        if let Some(stderr) = child.stderr.take() {
218            let tx = self.output_tx.clone();
219            let pname = name.clone();
220            let path = log_dir.join(format!("{}.stderr", name));
221            let seq = Arc::clone(&seq_counter);
222            let handle = tokio::spawn(async move {
223                log_writer::capture_output(
224                    stderr,
225                    &path,
226                    &pname,
227                    ProtoStream::Stderr,
228                    tx,
229                    DEFAULT_MAX_LOG_BYTES,
230                    log_writer::DEFAULT_MAX_ROTATED_FILES,
231                    seq,
232                    sup_rx_stderr,
233                )
234                .await;
235            });
236            capture_handles.push(handle);
237        }
238
239        self.processes.insert(
240            name.clone(),
241            ManagedProcess {
242                name: name.clone(),
243                id: id.clone(),
244                command: command.to_string(),
245                cwd: cwd.map(std::string::ToString::to_string),
246                env: env.cloned().unwrap_or_default(),
247                child: Some(child),
248                pid,
249                started_at: Instant::now(),
250                exit_code: None,
251                port: resolved_port,
252                restart_policy: None,
253                watch_config: None,
254                restart_count: 0,
255                manually_stopped: false,
256                restart_pending: false,
257                failed: false,
258                supervisor_tx: Some(sup_tx),
259                capture_handles,
260                watch_handle: None,
261            },
262        );
263
264        let url = resolved_port.map(|p| process_url(&name, p, None));
265        Response::RunOk {
266            name,
267            id,
268            pid,
269            port: resolved_port,
270            url,
271        }
272    }
273
274    pub async fn stop_process(&mut self, target: &str) -> Response {
275        let proc = match self.find_mut(target) {
276            Some(p) => p,
277            None => {
278                return Response::Error {
279                    code: ErrorCode::NotFound,
280                    message: format!("process not found: {}", target),
281                };
282            }
283        };
284
285        proc.manually_stopped = true;
286
287        if let Some(ref child) = proc.child {
288            let raw_pid = child.id().unwrap_or(0) as i32;
289            if raw_pid > 0 {
290                // Signal the entire process group (child PID == PGID due to setpgid in pre_exec)
291                let pgid = nix::unistd::Pid::from_raw(raw_pid);
292                let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
293            }
294        }
295
296        // Wait up to 10s for graceful exit, then SIGKILL
297        if let Some(ref mut child) = proc.child {
298            let wait_result = tokio::time::timeout(Duration::from_secs(10), child.wait()).await;
299
300            match wait_result {
301                Ok(Ok(status)) => {
302                    proc.exit_code = status.code();
303                }
304                _ => {
305                    // Timed out or error — force kill the process group
306                    let raw_pid = proc.pid as i32;
307                    if raw_pid > 0 {
308                        let pgid = nix::unistd::Pid::from_raw(raw_pid);
309                        let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
310                    }
311                    let _ = child.wait().await;
312                    proc.exit_code = Some(-9);
313                }
314            }
315            proc.child = None;
316        }
317
318        Response::Ok {
319            message: format!("stopped {}", target),
320        }
321    }
322
323    pub async fn stop_all(&mut self) -> Response {
324        let names: Vec<String> = self.processes.keys().cloned().collect();
325        for name in names {
326            let _ = self.stop_process(&name).await;
327        }
328        self.processes.clear();
329        Response::Ok {
330            message: "all processes stopped".into(),
331        }
332    }
333
334    pub async fn restart_process(&mut self, target: &str) -> Response {
335        let (command, name, cwd, env, port, restart_policy, watch_config) = match self.find(target)
336        {
337            Some(p) => (
338                p.command.clone(),
339                p.name.clone(),
340                p.cwd.clone(),
341                p.env.clone(),
342                p.port,
343                p.restart_policy.clone(),
344                p.watch_config.clone(),
345            ),
346            None => {
347                return Response::Error {
348                    code: ErrorCode::NotFound,
349                    message: format!("process not found: {}", target),
350                };
351            }
352        };
353        // Clear supervisor flags — restart is intentional
354        if let Some(p) = self.find_mut(target) {
355            p.manually_stopped = false;
356            p.restart_count = 0;
357            p.failed = false;
358            p.restart_pending = false;
359        }
360        let _ = self.stop_process(target).await;
361        self.processes.remove(&name);
362        let env = if env.is_empty() { None } else { Some(env) };
363        let resp = self
364            .spawn_process(
365                &command,
366                Some(name.clone()),
367                cwd.as_deref(),
368                env.as_ref(),
369                port,
370            )
371            .await;
372        // Re-attach restart/watch config so manual restart preserves supervisor behavior
373        if let Response::RunOk { .. } = resp
374            && let Some(p) = self.find_mut(&name)
375        {
376            p.restart_policy = restart_policy;
377            p.watch_config = watch_config;
378        }
379        resp
380    }
381
382    pub fn enable_proxy(&mut self) {
383        self.port_allocator.enable_proxy();
384    }
385
386    pub fn status(&mut self) -> Response {
387        self.refresh_exit_states();
388        Response::Status {
389            processes: self.build_process_infos(),
390        }
391    }
392
393    /// Returns `None` if process not found or still running.
394    /// Returns `Some(exit_code)` if process has exited (`exit_code` is None for signal kills).
395    pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
396        self.refresh_exit_states();
397        self.find(target).and_then(|p| {
398            if p.child.is_none() {
399                Some(p.exit_code)
400            } else {
401                None
402            }
403        })
404    }
405
406    pub(crate) fn refresh_exit_states(&mut self) -> bool {
407        let mut changed = false;
408        for proc in self.processes.values_mut() {
409            if proc.child.is_some()
410                && proc.exit_code.is_none()
411                && let Some(ref mut child) = proc.child
412                && let Ok(Some(status)) = child.try_wait()
413            {
414                proc.exit_code = status.code();
415                proc.child = None;
416                changed = true;
417            }
418        }
419        changed
420    }
421
422    pub fn session_name(&self) -> &str {
423        &self.session
424    }
425
426    pub fn has_process(&self, target: &str) -> bool {
427        self.find(target).is_some()
428    }
429
430    /// Returns a map of running process names to their assigned ports.
431    pub fn running_ports(&self) -> HashMap<String, u16> {
432        self.processes
433            .iter()
434            .filter_map(|(name, p)| {
435                if p.child.is_some() {
436                    p.port.map(|port| (name.clone(), port))
437                } else {
438                    None
439                }
440            })
441            .collect()
442    }
443
444    /// Non-mutating status snapshot for use by the proxy status page.
445    /// May show stale exit states since it skips `refresh_exit_states()`.
446    pub fn status_snapshot(&self) -> Response {
447        Response::Status {
448            processes: self.build_process_infos(),
449        }
450    }
451
452    fn build_process_infos(&self) -> Vec<ProcessInfo> {
453        let mut infos: Vec<ProcessInfo> = self
454            .processes
455            .values()
456            .map(|p| ProcessInfo {
457                name: p.name.clone(),
458                id: p.id.clone(),
459                pid: p.pid,
460                state: if p.child.is_some() {
461                    ProcessState::Running
462                } else if p.failed {
463                    ProcessState::Failed
464                } else {
465                    ProcessState::Exited
466                },
467                exit_code: p.exit_code,
468                uptime_secs: if p.child.is_some() {
469                    Some(p.started_at.elapsed().as_secs())
470                } else {
471                    None
472                },
473                command: p.command.clone(),
474                port: p.port,
475                url: p.port.map(|port| process_url(&p.name, port, None)),
476                restart_count: if p.restart_count > 0 {
477                    Some(p.restart_count)
478                } else {
479                    None
480                },
481                max_restarts: p.restart_policy.as_ref().and_then(|rp| rp.max_restarts),
482                restart_policy: p.restart_policy.as_ref().map(|rp| match rp.mode {
483                    RestartMode::Always => "always".into(),
484                    RestartMode::OnFailure => "on-failure".into(),
485                    RestartMode::Never => "never".into(),
486                }),
487                watched: if p.watch_config.is_some() {
488                    Some(true)
489                } else {
490                    None
491                },
492            })
493            .collect();
494        infos.sort_by(|a, b| a.name.cmp(&b.name));
495        infos
496    }
497
498    pub(crate) fn find(&self, target: &str) -> Option<&ManagedProcess> {
499        self.processes
500            .get(target)
501            .or_else(|| self.processes.values().find(|p| p.id == target))
502    }
503
504    pub(crate) fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
505        if self.processes.contains_key(target) {
506            self.processes.get_mut(target)
507        } else {
508            self.processes.values_mut().find(|p| p.id == target)
509        }
510    }
511
512    /// Classify exited processes that have a restart policy into "restartable" vs "exhausted".
513    /// Returns `(restartable, exhausted)` name vectors in a single pass.
514    pub fn classify_restart_candidates(&self) -> (Vec<String>, Vec<String>) {
515        let mut restartable = Vec::new();
516        let mut exhausted = Vec::new();
517        for p in self.processes.values() {
518            if p.child.is_some() || p.manually_stopped || p.restart_pending || p.failed {
519                continue;
520            }
521            let Some(ref policy) = p.restart_policy else {
522                continue;
523            };
524            if !policy.mode.should_restart(p.exit_code) {
525                continue;
526            }
527            if policy
528                .max_restarts
529                .is_some_and(|max| p.restart_count >= max)
530            {
531                exhausted.push(p.name.clone());
532            } else {
533                restartable.push(p.name.clone());
534            }
535        }
536        (restartable, exhausted)
537    }
538
539    /// Mark a process as failed (max restarts exhausted).
540    pub fn mark_failed(&mut self, target: &str) {
541        if let Some(p) = self.find_mut(target) {
542            p.failed = true;
543        }
544    }
545
546    /// Re-spawn a process in place, preserving supervisor metadata.
547    /// Drains capture tasks, rotates logs, re-spawns, and carries over metadata.
548    /// On spawn failure, reinserts a tombstone record with failed=true.
549    pub async fn respawn_in_place(&mut self, target: &str) -> Result<(), String> {
550        let proc = self
551            .find(target)
552            .ok_or_else(|| format!("process not found: {}", target))?;
553
554        // 1. Save metadata and spawn args
555        let command = proc.command.clone();
556        let name = proc.name.clone();
557        let cwd = proc.cwd.clone();
558        let env = proc.env.clone();
559        let port = proc.port;
560        let restart_policy = proc.restart_policy.clone();
561        let watch_config = proc.watch_config.clone();
562        let restart_count = proc.restart_count;
563        let restart_pending = proc.restart_pending;
564
565        // 2. Drop supervisor sender (signals capture task to drain)
566        if let Some(proc) = self.find_mut(target) {
567            proc.supervisor_tx = None;
568        }
569
570        // 3. Await capture task JoinHandles
571        if let Some(proc) = self.find_mut(target) {
572            let handles = std::mem::take(&mut proc.capture_handles);
573            for h in handles {
574                let _ = h.await;
575            }
576        }
577
578        // 4. Rotate log files
579        let log_dir = crate::paths::log_dir(&self.session);
580        let stdout_path = log_dir.join(format!("{}.stdout", name));
581        let stderr_path = log_dir.join(format!("{}.stderr", name));
582        log_writer::rotate_if_exists(&stdout_path).await;
583        log_writer::rotate_if_exists(&stderr_path).await;
584
585        // 5. Remove old record
586        self.processes.remove(&name);
587
588        // 6. Spawn fresh
589        let env_opt = if env.is_empty() {
590            None
591        } else {
592            Some(env.clone())
593        };
594        let resp = self
595            .spawn_process(
596                &command,
597                Some(name.clone()),
598                cwd.as_deref(),
599                env_opt.as_ref(),
600                port,
601            )
602            .await;
603
604        match resp {
605            Response::RunOk { .. } => {
606                // 7. Copy metadata to new record
607                if let Some(p) = self.find_mut(&name) {
608                    p.restart_policy = restart_policy;
609                    p.watch_config = watch_config;
610                    p.restart_count = restart_count;
611                    p.failed = false;
612                }
613                Ok(())
614            }
615            Response::Error { message, .. } => {
616                // 8. Reinsert tombstone
617                self.processes.insert(
618                    name.clone(),
619                    ManagedProcess {
620                        name: name.clone(),
621                        id: "tombstone".into(),
622                        command,
623                        cwd,
624                        env,
625                        child: None,
626                        pid: 0,
627                        started_at: Instant::now(),
628                        exit_code: None,
629                        port,
630                        restart_policy,
631                        watch_config,
632                        restart_count,
633                        manually_stopped: false,
634                        restart_pending,
635                        failed: true,
636                        supervisor_tx: None,
637                        capture_handles: Vec::new(),
638                        watch_handle: None,
639                    },
640                );
641                Err(message)
642            }
643            _ => Err("unexpected response from spawn".into()),
644        }
645    }
646}
647
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a manager for `session` running a short-lived `echo hello` named `worker`.
    async fn manager_with_echo_worker(session: &str) -> ProcessManager {
        let mut pm = ProcessManager::new(session);
        let resp = pm
            .spawn_process("echo hello", Some("worker".into()), None, None, None)
            .await;
        assert!(matches!(resp, Response::RunOk { .. }));
        pm
    }

    /// Gives the short-lived child time to exit, then lets the manager observe the exit.
    async fn let_worker_exit(pm: &mut ProcessManager) {
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        pm.refresh_exit_states();
    }

    #[test]
    fn test_valid_dns_labels() {
        for label in ["api", "my-app", "a", "a1", "123"] {
            assert!(is_valid_dns_label(label));
        }
    }

    #[test]
    fn test_invalid_dns_labels() {
        let too_long = "a".repeat(64); // > 63 chars
        let rejected = [
            "",
            "-start",
            "end-",
            "UPPER",
            "has.dot",
            "has space",
            too_long.as_str(),
            "has_underscore",
        ];
        for label in rejected {
            assert!(!is_valid_dns_label(label));
        }
    }

    #[tokio::test]
    async fn test_respawn_in_place_preserves_metadata() {
        let mut pm = manager_with_echo_worker("test-respawn").await;

        // Attach supervisor metadata that must survive the respawn.
        if let Some(worker) = pm.find_mut("worker") {
            worker.restart_policy = Some(RestartPolicy {
                mode: RestartMode::OnFailure,
                max_restarts: Some(5),
                restart_delay_ms: 1000,
            });
            worker.restart_count = 3;
        }

        let_worker_exit(&mut pm).await;
        assert!(pm.respawn_in_place("worker").await.is_ok());

        let respawned = pm.find("worker").unwrap();
        assert!(respawned.child.is_some()); // new process running
        assert_eq!(respawned.restart_count, 3);
        let policy = respawned.restart_policy.as_ref();
        assert!(policy.is_some());
        assert_eq!(policy.unwrap().mode, RestartMode::OnFailure);
        assert!(!respawned.failed);
    }

    #[tokio::test]
    async fn test_respawn_in_place_tombstone_on_failure() {
        let mut pm = manager_with_echo_worker("test-tombstone").await;

        if let Some(worker) = pm.find_mut("worker") {
            worker.restart_policy = Some(RestartPolicy {
                mode: RestartMode::Always,
                max_restarts: Some(3),
                restart_delay_ms: 1000,
            });
            worker.restart_count = 2;
            // A path-traversal name makes the re-spawn fail name validation.
            worker.name = "work/er".to_string();
        }

        // Re-key the record under the corrupted name.
        let record = pm.processes.remove("worker").unwrap();
        pm.processes.insert("work/er".to_string(), record);

        let_worker_exit(&mut pm).await;
        assert!(pm.respawn_in_place("work/er").await.is_err());

        // A tombstone keeps the record and its supervisor metadata.
        let tombstone = pm.find("work/er").unwrap();
        assert!(tombstone.child.is_none());
        assert!(tombstone.failed);
        assert_eq!(tombstone.restart_count, 2); // preserved
        assert!(tombstone.restart_policy.is_some());
    }
}