Skip to main content

agent_procs/daemon/
actor.rs

1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6
7/// State published via the watch channel for lock-free proxy reads.
8#[derive(Debug, Clone, PartialEq)]
9pub struct ProxyState {
10    /// Running process name → backend port.
11    pub port_map: HashMap<String, u16>,
12}
13
14/// Commands sent to the actor via [`PmHandle`].
15enum PmCommand {
16    Spawn {
17        command: String,
18        name: Option<String>,
19        cwd: Option<String>,
20        env: Option<HashMap<String, String>>,
21        port: Option<u16>,
22        reply: oneshot::Sender<Response>,
23    },
24    Stop {
25        target: String,
26        reply: oneshot::Sender<Response>,
27    },
28    StopAll {
29        reply: oneshot::Sender<Response>,
30    },
31    Restart {
32        target: String,
33        reply: oneshot::Sender<Response>,
34    },
35    Status {
36        reply: oneshot::Sender<Response>,
37    },
38    StatusSnapshot {
39        reply: oneshot::Sender<Response>,
40    },
41    HasProcess {
42        target: String,
43        reply: oneshot::Sender<bool>,
44    },
45    SessionName {
46        reply: oneshot::Sender<String>,
47    },
48    #[allow(clippy::option_option)]
49    IsProcessExited {
50        target: String,
51        reply: oneshot::Sender<Option<Option<i32>>>,
52    },
53    /// Returns `Some(existing_port)` if proxy is already enabled, `None` if newly enabled.
54    EnableProxy {
55        proxy_port: u16,
56        reply: oneshot::Sender<Option<u16>>,
57    },
58    Subscribe {
59        reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
60    },
61}
62
63fn actor_error(msg: &str) -> Response {
64    Response::Error {
65        code: ErrorCode::General,
66        message: msg.into(),
67    }
68}
69
70/// Cloneable handle for sending commands to the [`ProcessManagerActor`].
71#[derive(Clone)]
72pub struct PmHandle {
73    tx: mpsc::Sender<PmCommand>,
74}
75
76impl PmHandle {
77    pub async fn spawn_process(
78        &self,
79        command: String,
80        name: Option<String>,
81        cwd: Option<String>,
82        env: Option<HashMap<String, String>>,
83        port: Option<u16>,
84    ) -> Response {
85        let (reply, rx) = oneshot::channel();
86        let _ = self
87            .tx
88            .send(PmCommand::Spawn {
89                command,
90                name,
91                cwd,
92                env,
93                port,
94                reply,
95            })
96            .await;
97        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
98    }
99
100    pub async fn stop_process(&self, target: &str) -> Response {
101        let (reply, rx) = oneshot::channel();
102        let _ = self
103            .tx
104            .send(PmCommand::Stop {
105                target: target.to_string(),
106                reply,
107            })
108            .await;
109        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
110    }
111
112    pub async fn stop_all(&self) -> Response {
113        let (reply, rx) = oneshot::channel();
114        let _ = self.tx.send(PmCommand::StopAll { reply }).await;
115        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
116    }
117
118    pub async fn restart_process(&self, target: &str) -> Response {
119        let (reply, rx) = oneshot::channel();
120        let _ = self
121            .tx
122            .send(PmCommand::Restart {
123                target: target.to_string(),
124                reply,
125            })
126            .await;
127        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
128    }
129
130    pub async fn status(&self) -> Response {
131        let (reply, rx) = oneshot::channel();
132        let _ = self.tx.send(PmCommand::Status { reply }).await;
133        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
134    }
135
136    pub async fn status_snapshot(&self) -> Response {
137        let (reply, rx) = oneshot::channel();
138        let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
139        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
140    }
141
142    pub async fn has_process(&self, target: &str) -> bool {
143        let (reply, rx) = oneshot::channel();
144        let _ = self
145            .tx
146            .send(PmCommand::HasProcess {
147                target: target.to_string(),
148                reply,
149            })
150            .await;
151        rx.await.unwrap_or(false)
152    }
153
154    pub async fn session_name(&self) -> String {
155        let (reply, rx) = oneshot::channel();
156        let _ = self.tx.send(PmCommand::SessionName { reply }).await;
157        rx.await.unwrap_or_default()
158    }
159
160    pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
161        let (reply, rx) = oneshot::channel();
162        let _ = self
163            .tx
164            .send(PmCommand::IsProcessExited {
165                target: target.to_string(),
166                reply,
167            })
168            .await;
169        rx.await.unwrap_or(None)
170    }
171
172    /// Enable proxy with the given port. Returns `Some(existing_port)` if already enabled.
173    pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
174        let (reply, rx) = oneshot::channel();
175        let _ = self
176            .tx
177            .send(PmCommand::EnableProxy { proxy_port, reply })
178            .await;
179        rx.await.unwrap_or(None)
180    }
181
182    pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
183        let (reply, rx) = oneshot::channel();
184        let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
185        rx.await.expect("actor should be alive for subscribe")
186    }
187}
188
189/// Actor that owns the [`ProcessManager`] and processes commands sequentially.
190pub struct ProcessManagerActor {
191    pm: ProcessManager,
192    rx: mpsc::Receiver<PmCommand>,
193    proxy_state_tx: watch::Sender<ProxyState>,
194    proxy_port: Option<u16>,
195}
196
197impl ProcessManagerActor {
198    /// Create the actor, its handle, and the proxy state watch channel.
199    pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
200        let (tx, rx) = mpsc::channel(256);
201        let pm = ProcessManager::new(session);
202
203        let initial_state = ProxyState {
204            port_map: HashMap::new(),
205        };
206        let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
207
208        let handle = PmHandle { tx };
209        let actor = Self {
210            pm,
211            rx,
212            proxy_state_tx,
213            proxy_port: None,
214        };
215
216        (handle, proxy_state_rx, actor)
217    }
218
219    /// Run the actor loop until all senders are dropped.
220    pub async fn run(mut self) {
221        while let Some(cmd) = self.rx.recv().await {
222            self.handle_command(cmd).await;
223        }
224    }
225
226    async fn handle_command(&mut self, cmd: PmCommand) {
227        match cmd {
228            PmCommand::Spawn {
229                command,
230                name,
231                cwd,
232                env,
233                port,
234                reply,
235            } => {
236                let mut resp = self
237                    .pm
238                    .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
239                    .await;
240                if let Response::RunOk {
241                    ref name,
242                    ref mut url,
243                    port: Some(p),
244                    ..
245                } = resp
246                {
247                    if let Some(pp) = self.proxy_port {
248                        *url = Some(protocol::process_url(name, p, Some(pp)));
249                    }
250                }
251                self.publish_proxy_state();
252                let _ = reply.send(resp);
253            }
254            PmCommand::Stop { target, reply } => {
255                let resp = self.pm.stop_process(&target).await;
256                self.publish_proxy_state();
257                let _ = reply.send(resp);
258            }
259            PmCommand::StopAll { reply } => {
260                let resp = self.pm.stop_all().await;
261                self.publish_proxy_state();
262                let _ = reply.send(resp);
263            }
264            PmCommand::Restart { target, reply } => {
265                let resp = self.pm.restart_process(&target).await;
266                self.publish_proxy_state();
267                let _ = reply.send(resp);
268            }
269            PmCommand::Status { reply } => {
270                let _ = reply.send(self.build_status());
271            }
272            PmCommand::StatusSnapshot { reply } => {
273                let _ = reply.send(self.build_status_snapshot());
274            }
275            PmCommand::HasProcess { target, reply } => {
276                let _ = reply.send(self.pm.has_process(&target));
277            }
278            PmCommand::SessionName { reply } => {
279                let _ = reply.send(self.pm.session_name().to_string());
280            }
281            PmCommand::IsProcessExited { target, reply } => {
282                let _ = reply.send(self.pm.is_process_exited(&target));
283            }
284            PmCommand::EnableProxy { proxy_port, reply } => {
285                if let Some(existing) = self.proxy_port {
286                    let _ = reply.send(Some(existing));
287                } else {
288                    self.proxy_port = Some(proxy_port);
289                    self.pm.enable_proxy();
290                    self.publish_proxy_state();
291                    let _ = reply.send(None);
292                }
293            }
294            PmCommand::Subscribe { reply } => {
295                let _ = reply.send(self.pm.output_tx.subscribe());
296            }
297        }
298    }
299
300    /// Build a status response with proxy URL rewriting applied.
301    fn build_status(&mut self) -> Response {
302        let mut resp = self.pm.status();
303        self.rewrite_urls(&mut resp);
304        resp
305    }
306
307    /// Build a status snapshot with proxy URL rewriting applied.
308    fn build_status_snapshot(&self) -> Response {
309        let mut resp = self.pm.status_snapshot();
310        self.rewrite_urls(&mut resp);
311        resp
312    }
313
314    /// Rewrite process URLs to proxy form when proxy is enabled.
315    fn rewrite_urls(&self, resp: &mut Response) {
316        let Some(pp) = self.proxy_port else { return };
317        if let Response::Status { ref mut processes } = *resp {
318            for p in processes.iter_mut() {
319                if let Some(port) = p.port {
320                    p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
321                }
322            }
323        }
324    }
325
326    /// Publish current port map to the watch channel for lock-free proxy reads.
327    /// Skips the update if the port map hasn't changed.
328    fn publish_proxy_state(&self) {
329        let new_map = self.pm.running_ports();
330        let current = self.proxy_state_tx.borrow();
331        if current.port_map == new_map {
332            return;
333        }
334        drop(current);
335        let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
336    }
337}