Skip to main content

agent_procs/daemon/
actor.rs

1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6
7/// State published via the watch channel for lock-free proxy reads.
8#[derive(Debug, Clone, PartialEq)]
9pub struct ProxyState {
10    /// Running process name → backend port.
11    pub port_map: HashMap<String, u16>,
12}
13
14/// Commands sent to the actor via [`PmHandle`].
15enum PmCommand {
16    Spawn {
17        command: String,
18        name: Option<String>,
19        cwd: Option<String>,
20        env: Option<HashMap<String, String>>,
21        port: Option<u16>,
22        reply: oneshot::Sender<Response>,
23    },
24    Stop {
25        target: String,
26        reply: oneshot::Sender<Response>,
27    },
28    StopAll {
29        reply: oneshot::Sender<Response>,
30    },
31    Restart {
32        target: String,
33        reply: oneshot::Sender<Response>,
34    },
35    Status {
36        reply: oneshot::Sender<Response>,
37    },
38    StatusSnapshot {
39        reply: oneshot::Sender<Response>,
40    },
41    HasProcess {
42        target: String,
43        reply: oneshot::Sender<bool>,
44    },
45    SessionName {
46        reply: oneshot::Sender<String>,
47    },
48    #[allow(clippy::option_option)]
49    IsProcessExited {
50        target: String,
51        reply: oneshot::Sender<Option<Option<i32>>>,
52    },
53    /// Returns `Some(existing_port)` if proxy is already enabled, `None` if newly enabled.
54    EnableProxy {
55        proxy_port: u16,
56        reply: oneshot::Sender<Option<u16>>,
57    },
58    Subscribe {
59        reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
60    },
61}
62
63fn actor_error(msg: &str) -> Response {
64    Response::Error {
65        code: ErrorCode::General,
66        message: msg.into(),
67    }
68}
69
70/// Cloneable handle for sending commands to the [`ProcessManagerActor`].
71#[derive(Clone)]
72pub struct PmHandle {
73    tx: mpsc::Sender<PmCommand>,
74}
75
76impl PmHandle {
77    pub async fn spawn_process(
78        &self,
79        command: String,
80        name: Option<String>,
81        cwd: Option<String>,
82        env: Option<HashMap<String, String>>,
83        port: Option<u16>,
84    ) -> Response {
85        let (reply, rx) = oneshot::channel();
86        let _ = self
87            .tx
88            .send(PmCommand::Spawn {
89                command,
90                name,
91                cwd,
92                env,
93                port,
94                reply,
95            })
96            .await;
97        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
98    }
99
100    pub async fn stop_process(&self, target: &str) -> Response {
101        let (reply, rx) = oneshot::channel();
102        let _ = self
103            .tx
104            .send(PmCommand::Stop {
105                target: target.to_string(),
106                reply,
107            })
108            .await;
109        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
110    }
111
112    pub async fn stop_all(&self) -> Response {
113        let (reply, rx) = oneshot::channel();
114        let _ = self.tx.send(PmCommand::StopAll { reply }).await;
115        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
116    }
117
118    pub async fn restart_process(&self, target: &str) -> Response {
119        let (reply, rx) = oneshot::channel();
120        let _ = self
121            .tx
122            .send(PmCommand::Restart {
123                target: target.to_string(),
124                reply,
125            })
126            .await;
127        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
128    }
129
130    pub async fn status(&self) -> Response {
131        let (reply, rx) = oneshot::channel();
132        let _ = self.tx.send(PmCommand::Status { reply }).await;
133        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
134    }
135
136    pub async fn status_snapshot(&self) -> Response {
137        let (reply, rx) = oneshot::channel();
138        let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
139        rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
140    }
141
142    pub async fn has_process(&self, target: &str) -> bool {
143        let (reply, rx) = oneshot::channel();
144        let _ = self
145            .tx
146            .send(PmCommand::HasProcess {
147                target: target.to_string(),
148                reply,
149            })
150            .await;
151        rx.await.unwrap_or(false)
152    }
153
154    pub async fn session_name(&self) -> String {
155        let (reply, rx) = oneshot::channel();
156        let _ = self.tx.send(PmCommand::SessionName { reply }).await;
157        rx.await.unwrap_or_default()
158    }
159
160    pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
161        let (reply, rx) = oneshot::channel();
162        let _ = self
163            .tx
164            .send(PmCommand::IsProcessExited {
165                target: target.to_string(),
166                reply,
167            })
168            .await;
169        rx.await.unwrap_or(None)
170    }
171
172    /// Enable proxy with the given port. Returns `Some(existing_port)` if already enabled.
173    pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
174        let (reply, rx) = oneshot::channel();
175        let _ = self
176            .tx
177            .send(PmCommand::EnableProxy { proxy_port, reply })
178            .await;
179        rx.await.unwrap_or(None)
180    }
181
182    pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
183        let (reply, rx) = oneshot::channel();
184        let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
185        rx.await.expect("actor should be alive for subscribe")
186    }
187}
188
189/// Actor that owns the [`ProcessManager`] and processes commands sequentially.
190pub struct ProcessManagerActor {
191    pm: ProcessManager,
192    rx: mpsc::Receiver<PmCommand>,
193    proxy_state_tx: watch::Sender<ProxyState>,
194    proxy_port: Option<u16>,
195}
196
197impl ProcessManagerActor {
198    /// Create the actor, its handle, and the proxy state watch channel.
199    pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
200        let (tx, rx) = mpsc::channel(256);
201        let pm = ProcessManager::new(session);
202
203        let initial_state = ProxyState {
204            port_map: HashMap::new(),
205        };
206        let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
207
208        let handle = PmHandle { tx };
209        let actor = Self {
210            pm,
211            rx,
212            proxy_state_tx,
213            proxy_port: None,
214        };
215
216        (handle, proxy_state_rx, actor)
217    }
218
219    /// Run the actor loop until all senders are dropped.
220    pub async fn run(mut self) {
221        while let Some(cmd) = self.rx.recv().await {
222            self.handle_command(cmd).await;
223        }
224    }
225
226    async fn handle_command(&mut self, cmd: PmCommand) {
227        match cmd {
228            PmCommand::Spawn {
229                command,
230                name,
231                cwd,
232                env,
233                port,
234                reply,
235            } => {
236                let mut resp = self
237                    .pm
238                    .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
239                    .await;
240                if let Response::RunOk {
241                    ref name,
242                    ref mut url,
243                    port: Some(p),
244                    ..
245                } = resp
246                    && let Some(pp) = self.proxy_port
247                {
248                    *url = Some(protocol::process_url(name, p, Some(pp)));
249                }
250                self.publish_proxy_state();
251                let _ = reply.send(resp);
252            }
253            PmCommand::Stop { target, reply } => {
254                let resp = self.pm.stop_process(&target).await;
255                self.publish_proxy_state();
256                let _ = reply.send(resp);
257            }
258            PmCommand::StopAll { reply } => {
259                let resp = self.pm.stop_all().await;
260                self.publish_proxy_state();
261                let _ = reply.send(resp);
262            }
263            PmCommand::Restart { target, reply } => {
264                let resp = self.pm.restart_process(&target).await;
265                self.publish_proxy_state();
266                let _ = reply.send(resp);
267            }
268            PmCommand::Status { reply } => {
269                let _ = reply.send(self.build_status());
270            }
271            PmCommand::StatusSnapshot { reply } => {
272                let _ = reply.send(self.build_status_snapshot());
273            }
274            PmCommand::HasProcess { target, reply } => {
275                let _ = reply.send(self.pm.has_process(&target));
276            }
277            PmCommand::SessionName { reply } => {
278                let _ = reply.send(self.pm.session_name().to_string());
279            }
280            PmCommand::IsProcessExited { target, reply } => {
281                let _ = reply.send(self.pm.is_process_exited(&target));
282            }
283            PmCommand::EnableProxy { proxy_port, reply } => {
284                if let Some(existing) = self.proxy_port {
285                    let _ = reply.send(Some(existing));
286                } else {
287                    self.proxy_port = Some(proxy_port);
288                    self.pm.enable_proxy();
289                    self.publish_proxy_state();
290                    let _ = reply.send(None);
291                }
292            }
293            PmCommand::Subscribe { reply } => {
294                let _ = reply.send(self.pm.output_tx.subscribe());
295            }
296        }
297    }
298
299    /// Build a status response with proxy URL rewriting applied.
300    fn build_status(&mut self) -> Response {
301        let mut resp = self.pm.status();
302        self.rewrite_urls(&mut resp);
303        resp
304    }
305
306    /// Build a status snapshot with proxy URL rewriting applied.
307    fn build_status_snapshot(&self) -> Response {
308        let mut resp = self.pm.status_snapshot();
309        self.rewrite_urls(&mut resp);
310        resp
311    }
312
313    /// Rewrite process URLs to proxy form when proxy is enabled.
314    fn rewrite_urls(&self, resp: &mut Response) {
315        let Some(pp) = self.proxy_port else { return };
316        if let Response::Status { ref mut processes } = *resp {
317            for p in processes.iter_mut() {
318                if let Some(port) = p.port {
319                    p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
320                }
321            }
322        }
323    }
324
325    /// Publish current port map to the watch channel for lock-free proxy reads.
326    /// Skips the update if the port map hasn't changed.
327    fn publish_proxy_state(&self) {
328        let new_map = self.pm.running_ports();
329        let current = self.proxy_state_tx.borrow();
330        if current.port_map == new_map {
331            return;
332        }
333        drop(current);
334        let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
335    }
336}