//! Process-manager actor for the daemon (agent_procs/daemon/actor.rs).
1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response, Stream as ProtoStream};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6use tokio::time::{self, Duration, MissedTickBehavior};
7
/// How often the actor loop polls child exit states and schedules any
/// pending auto-restarts (see [`ProcessManagerActor::run`]).
const EXIT_REFRESH_INTERVAL: Duration = Duration::from_millis(200);
9
/// State published via the watch channel for lock-free proxy reads.
///
/// Equality (`PartialEq`/`Eq`) is used by the actor to skip redundant watch
/// updates when the port map has not changed; `Default` gives the empty
/// initial state.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProxyState {
    /// Running process name → backend port.
    pub port_map: HashMap<String, u16>,
}
16
/// Commands sent to the actor via [`PmHandle`].
enum PmCommand {
    /// Spawn a new process; replies with the spawn outcome.
    Spawn {
        command: String,
        name: Option<String>,
        cwd: Option<String>,
        env: Option<HashMap<String, String>>,
        port: Option<u16>,
        restart: Option<crate::protocol::RestartPolicy>,
        watch: Option<crate::protocol::WatchConfig>,
        reply: oneshot::Sender<Response>,
    },
    /// Stop the single process identified by `target`.
    Stop {
        target: String,
        reply: oneshot::Sender<Response>,
    },
    /// Stop every managed process.
    StopAll {
        reply: oneshot::Sender<Response>,
    },
    /// Restart the process identified by `target`.
    Restart {
        target: String,
        reply: oneshot::Sender<Response>,
    },
    /// Full status report (URLs rewritten to proxy form when enabled).
    Status {
        reply: oneshot::Sender<Response>,
    },
    /// Handled identically to `Status` by the actor (shared match arm).
    StatusSnapshot {
        reply: oneshot::Sender<Response>,
    },
    /// Whether a process matching `target` exists.
    HasProcess {
        target: String,
        reply: oneshot::Sender<bool>,
    },
    /// The daemon's session name.
    SessionName {
        reply: oneshot::Sender<String>,
    },
    /// Exit-state query. The nested `Option` mirrors
    /// `ProcessManager::is_process_exited`; presumably the outer level means
    /// "exited at all" and the inner the exit code — confirm in ProcessManager.
    #[allow(clippy::option_option)]
    IsProcessExited {
        target: String,
        reply: oneshot::Sender<Option<Option<i32>>>,
    },
    /// Returns `Some(existing_port)` if proxy is already enabled, `None` if newly enabled.
    EnableProxy {
        proxy_port: u16,
        reply: oneshot::Sender<Option<u16>>,
    },
    /// Subscribe to the live process-output broadcast stream.
    Subscribe {
        reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
    },
    /// Internal: delayed auto-restart for a crashed process.
    AutoRestart {
        name: String,
    },
    /// Internal: file watcher triggered restart.
    WatchRestart {
        name: String,
    },
}
75
76fn actor_error(msg: &str) -> Response {
77    Response::Error {
78        code: ErrorCode::General,
79        message: msg.into(),
80    }
81}
82
/// Cloneable handle for sending commands to the [`ProcessManagerActor`].
#[derive(Clone)]
pub struct PmHandle {
    // Command mailbox into the actor loop; shared by all clones of the handle.
    tx: mpsc::Sender<PmCommand>,
}
88
89impl PmHandle {
90    pub async fn spawn_process(
91        &self,
92        command: String,
93        name: Option<String>,
94        cwd: Option<String>,
95        env: Option<HashMap<String, String>>,
96        port: Option<u16>,
97    ) -> Response {
98        self.spawn_process_supervised(command, name, cwd, env, port, None, None)
99            .await
100    }
101
102    #[allow(clippy::too_many_arguments)]
103    pub async fn spawn_process_supervised(
104        &self,
105        command: String,
106        name: Option<String>,
107        cwd: Option<String>,
108        env: Option<HashMap<String, String>>,
109        port: Option<u16>,
110        restart: Option<crate::protocol::RestartPolicy>,
111        watch: Option<crate::protocol::WatchConfig>,
112    ) -> Response {
113        let (reply, rx) = oneshot::channel();
114        let _ = self
115            .tx
116            .send(PmCommand::Spawn {
117                command,
118                name,
119                cwd,
120                env,
121                port,
122                restart,
123                watch,
124                reply,
125            })
126            .await;
127        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
128    }
129
130    pub async fn stop_process(&self, target: &str) -> Response {
131        let (reply, rx) = oneshot::channel();
132        let _ = self
133            .tx
134            .send(PmCommand::Stop {
135                target: target.to_string(),
136                reply,
137            })
138            .await;
139        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
140    }
141
142    pub async fn stop_all(&self) -> Response {
143        let (reply, rx) = oneshot::channel();
144        let _ = self.tx.send(PmCommand::StopAll { reply }).await;
145        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
146    }
147
148    pub async fn restart_process(&self, target: &str) -> Response {
149        let (reply, rx) = oneshot::channel();
150        let _ = self
151            .tx
152            .send(PmCommand::Restart {
153                target: target.to_string(),
154                reply,
155            })
156            .await;
157        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
158    }
159
160    pub async fn status(&self) -> Response {
161        let (reply, rx) = oneshot::channel();
162        let _ = self.tx.send(PmCommand::Status { reply }).await;
163        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
164    }
165
166    pub async fn status_snapshot(&self) -> Response {
167        let (reply, rx) = oneshot::channel();
168        let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
169        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
170    }
171
172    pub async fn has_process(&self, target: &str) -> bool {
173        let (reply, rx) = oneshot::channel();
174        let _ = self
175            .tx
176            .send(PmCommand::HasProcess {
177                target: target.to_string(),
178                reply,
179            })
180            .await;
181        rx.await.unwrap_or(false)
182    }
183
184    pub async fn session_name(&self) -> String {
185        let (reply, rx) = oneshot::channel();
186        let _ = self.tx.send(PmCommand::SessionName { reply }).await;
187        rx.await.unwrap_or_default()
188    }
189
190    pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
191        let (reply, rx) = oneshot::channel();
192        let _ = self
193            .tx
194            .send(PmCommand::IsProcessExited {
195                target: target.to_string(),
196                reply,
197            })
198            .await;
199        rx.await.unwrap_or(None)
200    }
201
202    /// Enable proxy with the given port. Returns `Some(existing_port)` if already enabled.
203    pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
204        let (reply, rx) = oneshot::channel();
205        let _ = self
206            .tx
207            .send(PmCommand::EnableProxy { proxy_port, reply })
208            .await;
209        rx.await.unwrap_or(None)
210    }
211
212    pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
213        let (reply, rx) = oneshot::channel();
214        let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
215        rx.await.expect("actor should be alive for subscribe")
216    }
217}
218
/// Actor that owns the [`ProcessManager`] and processes commands sequentially.
pub struct ProcessManagerActor {
    // Sole owner of process state; sequential command handling means no locks.
    pm: ProcessManager,
    // Command mailbox fed by every `PmHandle` clone (and by `self_tx`).
    rx: mpsc::Receiver<PmCommand>,
    /// Clone of the sender for scheduling internal commands (e.g. delayed restart).
    self_tx: mpsc::Sender<PmCommand>,
    // Publishes `ProxyState` snapshots for lock-free reads by the proxy.
    proxy_state_tx: watch::Sender<ProxyState>,
    // `Some(port)` once `EnableProxy` has been processed; drives URL rewriting.
    proxy_port: Option<u16>,
}
228
229impl ProcessManagerActor {
230    /// Create the actor, its handle, and the proxy state watch channel.
231    pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
232        let (tx, rx) = mpsc::channel(256);
233        let pm = ProcessManager::new(session);
234
235        let initial_state = ProxyState {
236            port_map: HashMap::new(),
237        };
238        let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
239
240        let handle = PmHandle { tx: tx.clone() };
241        let actor = Self {
242            pm,
243            rx,
244            self_tx: tx,
245            proxy_state_tx,
246            proxy_port: None,
247        };
248
249        (handle, proxy_state_rx, actor)
250    }
251
252    /// Run the actor loop until all senders are dropped.
253    pub async fn run(mut self) {
254        let mut exit_refresh = time::interval(EXIT_REFRESH_INTERVAL);
255        exit_refresh.set_missed_tick_behavior(MissedTickBehavior::Delay);
256
257        loop {
258            tokio::select! {
259                cmd = self.rx.recv() => match cmd {
260                    Some(cmd) => self.handle_command(cmd).await,
261                    None => break,
262                },
263                _ = exit_refresh.tick() => {
264                    if self.pm.refresh_exit_states() {
265                        self.publish_proxy_state();
266                    }
267                    // Check for processes needing restart
268                    self.schedule_restarts();
269                }
270            }
271        }
272    }
273
274    /// Check for exited processes eligible for auto-restart, schedule delayed restarts.
275    /// Also detects processes that have exhausted their `max_restarts` and marks them failed.
276    fn schedule_restarts(&mut self) {
277        let (restartable, exhausted) = self.pm.classify_restart_candidates();
278
279        // Mark exhausted processes as failed
280        for name in &exhausted {
281            if let Some(p) = self.pm.find(name)
282                && let Some(ref policy) = p.restart_policy
283                && let Some(max) = policy.max_restarts
284                && let Some(ref tx) = p.supervisor_tx
285            {
286                let msg = format!("[agent-procs] Max restarts ({}) exhausted", max);
287                let _ = tx.try_send(msg);
288            }
289            self.pm.mark_failed(name);
290        }
291        if !exhausted.is_empty() {
292            self.publish_proxy_state();
293        }
294
295        // Schedule delayed restarts for eligible processes
296        for name in restartable {
297            if let Some(p) = self.pm.find_mut(&name) {
298                p.restart_pending = true;
299                let delay_ms = p
300                    .restart_policy
301                    .as_ref()
302                    .map_or(1000, |rp| rp.restart_delay_ms);
303                let tx = self.self_tx.clone();
304                let name_clone = name.clone();
305                tokio::spawn(async move {
306                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
307                    let _ = tx.send(PmCommand::AutoRestart { name: name_clone }).await;
308                });
309            }
310        }
311    }
312
    /// Dispatch a single [`PmCommand`] against the owned [`ProcessManager`],
    /// republishing proxy state after any mutating operation.
    async fn handle_command(&mut self, cmd: PmCommand) {
        match cmd {
            PmCommand::Spawn {
                command,
                name,
                cwd,
                env,
                port,
                restart,
                watch,
                reply,
            } => {
                let mut resp = self
                    .pm
                    .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
                    .await;
                // Attach restart/watch config if spawn succeeded
                let has_watch = watch.is_some();
                if let Response::RunOk { ref name, .. } = resp
                    && let Some(p) = self.pm.find_mut(name)
                {
                    p.restart_policy = restart;
                    p.watch_config = watch;
                }
                // Set up file watcher if watch config present
                if has_watch
                    && let Response::RunOk { ref name, .. } = resp
                    && let Err(e) = self.setup_watcher(name)
                {
                    // Process started but watcher failed — annotate the log.
                    // setup_watcher already cleared watch_config so status won't
                    // falsely report the process as watched.
                    let msg = format!("[agent-procs] Watch setup failed: {}", e);
                    if let Some(p) = self.pm.find(name)
                        && let Some(tx) = &p.supervisor_tx
                    {
                        let _ = tx.try_send(msg);
                    }
                }
                // Rewrite the reported URL to proxy form when a proxy is active.
                if let Response::RunOk {
                    ref name,
                    ref mut url,
                    port: Some(p),
                    ..
                } = resp
                    && let Some(pp) = self.proxy_port
                {
                    *url = Some(protocol::process_url(name, p, Some(pp)));
                }
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            PmCommand::Stop { target, reply } => {
                // Tear down watcher on stop
                if let Some(p) = self.pm.find_mut(&target) {
                    p.watch_handle = None;
                }
                let resp = self.pm.stop_process(&target).await;
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            PmCommand::StopAll { reply } => {
                let resp = self.pm.stop_all().await;
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            PmCommand::Restart { target, reply } => {
                let resp = self.pm.restart_process(&target).await;
                // Recreate file watcher if watch config was preserved
                if let Response::RunOk { ref name, .. } = resp {
                    let _ = self.setup_watcher(name);
                }
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            // Status and StatusSnapshot produce the same payload.
            PmCommand::Status { reply } | PmCommand::StatusSnapshot { reply } => {
                let _ = reply.send(self.build_status());
            }
            PmCommand::HasProcess { target, reply } => {
                let _ = reply.send(self.pm.has_process(&target));
            }
            PmCommand::SessionName { reply } => {
                let _ = reply.send(self.pm.session_name().to_string());
            }
            PmCommand::IsProcessExited { target, reply } => {
                let _ = reply.send(self.pm.is_process_exited(&target));
            }
            PmCommand::EnableProxy { proxy_port, reply } => {
                // First enable wins; later callers get the active port back.
                if let Some(existing) = self.proxy_port {
                    let _ = reply.send(Some(existing));
                } else {
                    self.proxy_port = Some(proxy_port);
                    self.pm.enable_proxy();
                    self.publish_proxy_state();
                    let _ = reply.send(None);
                }
            }
            PmCommand::Subscribe { reply } => {
                let _ = reply.send(self.pm.output_tx.subscribe());
            }
            PmCommand::AutoRestart { name } => {
                self.handle_auto_restart(&name).await;
            }
            PmCommand::WatchRestart { name } => {
                self.handle_watch_restart(&name).await;
            }
        }
    }
421
    /// Perform a delayed auto-restart for `name`, scheduled by
    /// [`Self::schedule_restarts`]. Guards are re-checked because the process
    /// may have been stopped or respawned while the delay timer was pending.
    async fn handle_auto_restart(&mut self, name: &str) {
        // Re-check guards
        let should_restart = self
            .pm
            .find(name)
            .is_some_and(|p| p.child.is_none() && !p.manually_stopped && p.restart_pending);
        if !should_restart {
            // Clear the pending flag so a stale schedule can't linger.
            if let Some(p) = self.pm.find_mut(name) {
                p.restart_pending = false;
            }
            return;
        }

        // Capture exit code before respawn (respawn_in_place removes the old record)
        let prev_exit_code = self.pm.find(name).and_then(|p| p.exit_code);

        // Increment count
        if let Some(p) = self.pm.find_mut(name) {
            p.restart_count += 1;
        }

        // Respawn
        match self.pm.respawn_in_place(name).await {
            Ok(()) => {
                // Send success annotation to new capture task
                if let Some(p) = self.pm.find(name) {
                    let count = p.restart_count;
                    let max = p.restart_policy.as_ref().and_then(|rp| rp.max_restarts);
                    // No exit code recorded → treat as signal-terminated.
                    let exit = prev_exit_code.map_or("signal".into(), |c: i32| c.to_string());
                    let msg = match max {
                        Some(m) => {
                            format!(
                                "[agent-procs] Restarted (exit {}, attempt {}/{})",
                                exit, count, m
                            )
                        }
                        None => {
                            format!("[agent-procs] Restarted (exit {}, attempt {})", exit, count)
                        }
                    };
                    if let Some(tx) = &p.supervisor_tx {
                        let _ = tx.send(msg).await;
                    }
                }
                // Recreate file watcher for the new process (old one was
                // dropped when respawn_in_place removed the process record)
                let _ = self.setup_watcher(name);
            }
            Err(err) => {
                // Broadcast failure (live only)
                let msg = format!("[agent-procs] Restart failed: {}", err);
                let _ = self.pm.output_tx.send(OutputLine {
                    process: name.to_string(),
                    stream: ProtoStream::Stdout,
                    line: msg,
                });
            }
        }

        // Clear pending
        if let Some(p) = self.pm.find_mut(name) {
            p.restart_pending = false;
        }
        self.publish_proxy_state();
    }
487
    /// Restart `name` in response to a file-watcher event. Skipped when the
    /// process was manually stopped. Resets the restart counter and failed
    /// flag, since a watch restart is intentional rather than a crash.
    async fn handle_watch_restart(&mut self, name: &str) {
        let should_restart = self.pm.find(name).is_some_and(|p| !p.manually_stopped);
        if !should_restart {
            return;
        }

        // Stop process (without manually_stopped)
        if self.pm.find(name).is_some_and(|p| p.child.is_some()) {
            let _ = self.pm.stop_process(name).await;
            // Clear manually_stopped (stop_process sets it)
            if let Some(p) = self.pm.find_mut(name) {
                p.manually_stopped = false;
            }
        }

        // Reset restart count (watch restarts are intentional)
        if let Some(p) = self.pm.find_mut(name) {
            p.restart_count = 0;
            p.failed = false;
        }

        // Respawn
        match self.pm.respawn_in_place(name).await {
            Ok(()) => {
                if let Some(p) = self.pm.find(name)
                    && let Some(tx) = &p.supervisor_tx
                {
                    let _ = tx
                        .send("[agent-procs] File changed, restarted".to_string())
                        .await;
                }
                // Recreate watcher for the new process
                let _ = self.setup_watcher(name);
            }
            Err(err) => {
                // Broadcast the failure on the live output stream.
                let _ = self.pm.output_tx.send(OutputLine {
                    process: name.to_string(),
                    stream: ProtoStream::Stdout,
                    line: format!("[agent-procs] Watch restart failed: {}", err),
                });
            }
        }

        self.publish_proxy_state();
    }
533
534    /// Set up a file watcher for a process if it has a `WatchConfig`.
535    /// Returns `Err` with an error message if the watcher could not be created.
536    /// On failure, clears `watch_config` so status/TUI don't report a phantom watcher.
537    fn setup_watcher(&mut self, name: &str) -> Result<(), String> {
538        let (paths, ignore, cwd) = {
539            let Some(p) = self.pm.find(name) else {
540                return Ok(());
541            };
542            let Some(ref wc) = p.watch_config else {
543                return Ok(());
544            };
545            (
546                wc.paths.clone(),
547                wc.ignore.clone(),
548                p.cwd.clone().unwrap_or_else(|| ".".to_string()),
549            )
550        };
551
552        let base_dir = std::path::PathBuf::from(&cwd);
553        let ignore_refs: Option<Vec<String>> = ignore;
554        let ignore_slice = ignore_refs.as_deref();
555
556        let tx = self.self_tx.clone();
557        let proc_name = name.to_string();
558        let (restart_tx, mut restart_rx) = tokio::sync::mpsc::channel::<String>(16);
559
560        // Forward watch events to PmCommand::WatchRestart
561        tokio::spawn(async move {
562            while let Some(name) = restart_rx.recv().await {
563                let _ = tx.send(PmCommand::WatchRestart { name }).await;
564            }
565        });
566
567        match crate::daemon::watcher::create_watcher(
568            &paths,
569            ignore_slice,
570            &base_dir,
571            proc_name,
572            restart_tx,
573        ) {
574            Ok(handle) => {
575                if let Some(p) = self.pm.find_mut(name) {
576                    p.watch_handle = Some(handle);
577                }
578                Ok(())
579            }
580            Err(e) => {
581                tracing::warn!(process = %name, error = %e, "failed to create file watcher");
582                // Clear watch_config so status/TUI don't report as watched
583                if let Some(p) = self.pm.find_mut(name) {
584                    p.watch_config = None;
585                }
586                Err(e)
587            }
588        }
589    }
590
591    /// Build a status response with proxy URL rewriting applied.
592    fn build_status(&mut self) -> Response {
593        if self.pm.refresh_exit_states() {
594            self.publish_proxy_state();
595        }
596        let mut resp = self.pm.status_snapshot();
597        self.rewrite_urls(&mut resp);
598        resp
599    }
600
601    /// Rewrite process URLs to proxy form when proxy is enabled.
602    fn rewrite_urls(&self, resp: &mut Response) {
603        let Some(pp) = self.proxy_port else { return };
604        if let Response::Status { ref mut processes } = *resp {
605            for p in processes.iter_mut() {
606                if let Some(port) = p.port {
607                    p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
608                }
609            }
610        }
611    }
612
    /// Publish current port map to the watch channel for lock-free proxy reads.
    /// Skips the update if the port map hasn't changed.
    fn publish_proxy_state(&self) {
        let new_map = self.pm.running_ports();
        let current = self.proxy_state_tx.borrow();
        if current.port_map == new_map {
            return;
        }
        // Release the watch read guard before sending, so the write does not
        // happen while a borrow of the current value is still held.
        drop(current);
        // Send result ignored: failure only means no receivers remain.
        let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
    }
624}