Skip to main content

agent_procs/daemon/
actor.rs

1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6use tokio::time::{self, Duration, MissedTickBehavior};
7
8const EXIT_REFRESH_INTERVAL: Duration = Duration::from_millis(200);
9
10/// State published via the watch channel for lock-free proxy reads.
11#[derive(Debug, Clone, PartialEq)]
12pub struct ProxyState {
13    /// Running process name → backend port.
14    pub port_map: HashMap<String, u16>,
15}
16
17/// Commands sent to the actor via [`PmHandle`].
18enum PmCommand {
19    Spawn {
20        command: String,
21        name: Option<String>,
22        cwd: Option<String>,
23        env: Option<HashMap<String, String>>,
24        port: Option<u16>,
25        reply: oneshot::Sender<Response>,
26    },
27    Stop {
28        target: String,
29        reply: oneshot::Sender<Response>,
30    },
31    StopAll {
32        reply: oneshot::Sender<Response>,
33    },
34    Restart {
35        target: String,
36        reply: oneshot::Sender<Response>,
37    },
38    Status {
39        reply: oneshot::Sender<Response>,
40    },
41    StatusSnapshot {
42        reply: oneshot::Sender<Response>,
43    },
44    HasProcess {
45        target: String,
46        reply: oneshot::Sender<bool>,
47    },
48    SessionName {
49        reply: oneshot::Sender<String>,
50    },
51    #[allow(clippy::option_option)]
52    IsProcessExited {
53        target: String,
54        reply: oneshot::Sender<Option<Option<i32>>>,
55    },
56    /// Returns `Some(existing_port)` if proxy is already enabled, `None` if newly enabled.
57    EnableProxy {
58        proxy_port: u16,
59        reply: oneshot::Sender<Option<u16>>,
60    },
61    Subscribe {
62        reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
63    },
64}
65
66fn actor_error(msg: &str) -> Response {
67    Response::Error {
68        code: ErrorCode::General,
69        message: msg.into(),
70    }
71}
72
73/// Cloneable handle for sending commands to the [`ProcessManagerActor`].
74#[derive(Clone)]
75pub struct PmHandle {
76    tx: mpsc::Sender<PmCommand>,
77}
78
79impl PmHandle {
80    pub async fn spawn_process(
81        &self,
82        command: String,
83        name: Option<String>,
84        cwd: Option<String>,
85        env: Option<HashMap<String, String>>,
86        port: Option<u16>,
87    ) -> Response {
88        let (reply, rx) = oneshot::channel();
89        let _ = self
90            .tx
91            .send(PmCommand::Spawn {
92                command,
93                name,
94                cwd,
95                env,
96                port,
97                reply,
98            })
99            .await;
100        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
101    }
102
103    pub async fn stop_process(&self, target: &str) -> Response {
104        let (reply, rx) = oneshot::channel();
105        let _ = self
106            .tx
107            .send(PmCommand::Stop {
108                target: target.to_string(),
109                reply,
110            })
111            .await;
112        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
113    }
114
115    pub async fn stop_all(&self) -> Response {
116        let (reply, rx) = oneshot::channel();
117        let _ = self.tx.send(PmCommand::StopAll { reply }).await;
118        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
119    }
120
121    pub async fn restart_process(&self, target: &str) -> Response {
122        let (reply, rx) = oneshot::channel();
123        let _ = self
124            .tx
125            .send(PmCommand::Restart {
126                target: target.to_string(),
127                reply,
128            })
129            .await;
130        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
131    }
132
133    pub async fn status(&self) -> Response {
134        let (reply, rx) = oneshot::channel();
135        let _ = self.tx.send(PmCommand::Status { reply }).await;
136        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
137    }
138
139    pub async fn status_snapshot(&self) -> Response {
140        let (reply, rx) = oneshot::channel();
141        let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
142        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
143    }
144
145    pub async fn has_process(&self, target: &str) -> bool {
146        let (reply, rx) = oneshot::channel();
147        let _ = self
148            .tx
149            .send(PmCommand::HasProcess {
150                target: target.to_string(),
151                reply,
152            })
153            .await;
154        rx.await.unwrap_or(false)
155    }
156
157    pub async fn session_name(&self) -> String {
158        let (reply, rx) = oneshot::channel();
159        let _ = self.tx.send(PmCommand::SessionName { reply }).await;
160        rx.await.unwrap_or_default()
161    }
162
163    pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
164        let (reply, rx) = oneshot::channel();
165        let _ = self
166            .tx
167            .send(PmCommand::IsProcessExited {
168                target: target.to_string(),
169                reply,
170            })
171            .await;
172        rx.await.unwrap_or(None)
173    }
174
175    /// Enable proxy with the given port. Returns `Some(existing_port)` if already enabled.
176    pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
177        let (reply, rx) = oneshot::channel();
178        let _ = self
179            .tx
180            .send(PmCommand::EnableProxy { proxy_port, reply })
181            .await;
182        rx.await.unwrap_or(None)
183    }
184
185    pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
186        let (reply, rx) = oneshot::channel();
187        let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
188        rx.await.expect("actor should be alive for subscribe")
189    }
190}
191
192/// Actor that owns the [`ProcessManager`] and processes commands sequentially.
193pub struct ProcessManagerActor {
194    pm: ProcessManager,
195    rx: mpsc::Receiver<PmCommand>,
196    proxy_state_tx: watch::Sender<ProxyState>,
197    proxy_port: Option<u16>,
198}
199
200impl ProcessManagerActor {
201    /// Create the actor, its handle, and the proxy state watch channel.
202    pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
203        let (tx, rx) = mpsc::channel(256);
204        let pm = ProcessManager::new(session);
205
206        let initial_state = ProxyState {
207            port_map: HashMap::new(),
208        };
209        let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
210
211        let handle = PmHandle { tx };
212        let actor = Self {
213            pm,
214            rx,
215            proxy_state_tx,
216            proxy_port: None,
217        };
218
219        (handle, proxy_state_rx, actor)
220    }
221
222    /// Run the actor loop until all senders are dropped.
223    pub async fn run(mut self) {
224        let mut exit_refresh = time::interval(EXIT_REFRESH_INTERVAL);
225        exit_refresh.set_missed_tick_behavior(MissedTickBehavior::Delay);
226
227        loop {
228            tokio::select! {
229                cmd = self.rx.recv() => match cmd {
230                    Some(cmd) => self.handle_command(cmd).await,
231                    None => break,
232                },
233                _ = exit_refresh.tick() => {
234                    if self.pm.refresh_exit_states() {
235                        self.publish_proxy_state();
236                    }
237                }
238            }
239        }
240    }
241
242    async fn handle_command(&mut self, cmd: PmCommand) {
243        match cmd {
244            PmCommand::Spawn {
245                command,
246                name,
247                cwd,
248                env,
249                port,
250                reply,
251            } => {
252                let mut resp = self
253                    .pm
254                    .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
255                    .await;
256                if let Response::RunOk {
257                    ref name,
258                    ref mut url,
259                    port: Some(p),
260                    ..
261                } = resp
262                    && let Some(pp) = self.proxy_port
263                {
264                    *url = Some(protocol::process_url(name, p, Some(pp)));
265                }
266                self.publish_proxy_state();
267                let _ = reply.send(resp);
268            }
269            PmCommand::Stop { target, reply } => {
270                let resp = self.pm.stop_process(&target).await;
271                self.publish_proxy_state();
272                let _ = reply.send(resp);
273            }
274            PmCommand::StopAll { reply } => {
275                let resp = self.pm.stop_all().await;
276                self.publish_proxy_state();
277                let _ = reply.send(resp);
278            }
279            PmCommand::Restart { target, reply } => {
280                let resp = self.pm.restart_process(&target).await;
281                self.publish_proxy_state();
282                let _ = reply.send(resp);
283            }
284            PmCommand::Status { reply } => {
285                let _ = reply.send(self.build_status());
286            }
287            PmCommand::StatusSnapshot { reply } => {
288                let _ = reply.send(self.build_status_snapshot());
289            }
290            PmCommand::HasProcess { target, reply } => {
291                let _ = reply.send(self.pm.has_process(&target));
292            }
293            PmCommand::SessionName { reply } => {
294                let _ = reply.send(self.pm.session_name().to_string());
295            }
296            PmCommand::IsProcessExited { target, reply } => {
297                let _ = reply.send(self.pm.is_process_exited(&target));
298            }
299            PmCommand::EnableProxy { proxy_port, reply } => {
300                if let Some(existing) = self.proxy_port {
301                    let _ = reply.send(Some(existing));
302                } else {
303                    self.proxy_port = Some(proxy_port);
304                    self.pm.enable_proxy();
305                    self.publish_proxy_state();
306                    let _ = reply.send(None);
307                }
308            }
309            PmCommand::Subscribe { reply } => {
310                let _ = reply.send(self.pm.output_tx.subscribe());
311            }
312        }
313    }
314
315    /// Build a status response with proxy URL rewriting applied.
316    fn build_status(&mut self) -> Response {
317        if self.pm.refresh_exit_states() {
318            self.publish_proxy_state();
319        }
320        let mut resp = self.pm.status_snapshot();
321        self.rewrite_urls(&mut resp);
322        resp
323    }
324
325    /// Build a status snapshot with proxy URL rewriting applied.
326    fn build_status_snapshot(&mut self) -> Response {
327        if self.pm.refresh_exit_states() {
328            self.publish_proxy_state();
329        }
330        let mut resp = self.pm.status_snapshot();
331        self.rewrite_urls(&mut resp);
332        resp
333    }
334
335    /// Rewrite process URLs to proxy form when proxy is enabled.
336    fn rewrite_urls(&self, resp: &mut Response) {
337        let Some(pp) = self.proxy_port else { return };
338        if let Response::Status { ref mut processes } = *resp {
339            for p in processes.iter_mut() {
340                if let Some(port) = p.port {
341                    p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
342                }
343            }
344        }
345    }
346
347    /// Publish current port map to the watch channel for lock-free proxy reads.
348    /// Skips the update if the port map hasn't changed.
349    fn publish_proxy_state(&self) {
350        let new_map = self.pm.running_ports();
351        let current = self.proxy_state_tx.borrow();
352        if current.port_map == new_map {
353            return;
354        }
355        drop(current);
356        let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
357    }
358}