1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6use tokio::time::{self, Duration, MissedTickBehavior};
7
8const EXIT_REFRESH_INTERVAL: Duration = Duration::from_millis(200);
9
10#[derive(Debug, Clone, PartialEq)]
12pub struct ProxyState {
13 pub port_map: HashMap<String, u16>,
15}
16
17enum PmCommand {
19 Spawn {
20 command: String,
21 name: Option<String>,
22 cwd: Option<String>,
23 env: Option<HashMap<String, String>>,
24 port: Option<u16>,
25 reply: oneshot::Sender<Response>,
26 },
27 Stop {
28 target: String,
29 reply: oneshot::Sender<Response>,
30 },
31 StopAll {
32 reply: oneshot::Sender<Response>,
33 },
34 Restart {
35 target: String,
36 reply: oneshot::Sender<Response>,
37 },
38 Status {
39 reply: oneshot::Sender<Response>,
40 },
41 StatusSnapshot {
42 reply: oneshot::Sender<Response>,
43 },
44 HasProcess {
45 target: String,
46 reply: oneshot::Sender<bool>,
47 },
48 SessionName {
49 reply: oneshot::Sender<String>,
50 },
51 #[allow(clippy::option_option)]
52 IsProcessExited {
53 target: String,
54 reply: oneshot::Sender<Option<Option<i32>>>,
55 },
56 EnableProxy {
58 proxy_port: u16,
59 reply: oneshot::Sender<Option<u16>>,
60 },
61 Subscribe {
62 reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
63 },
64}
65
66fn actor_error(msg: &str) -> Response {
67 Response::Error {
68 code: ErrorCode::General,
69 message: msg.into(),
70 }
71}
72
73#[derive(Clone)]
75pub struct PmHandle {
76 tx: mpsc::Sender<PmCommand>,
77}
78
79impl PmHandle {
80 pub async fn spawn_process(
81 &self,
82 command: String,
83 name: Option<String>,
84 cwd: Option<String>,
85 env: Option<HashMap<String, String>>,
86 port: Option<u16>,
87 ) -> Response {
88 let (reply, rx) = oneshot::channel();
89 let _ = self
90 .tx
91 .send(PmCommand::Spawn {
92 command,
93 name,
94 cwd,
95 env,
96 port,
97 reply,
98 })
99 .await;
100 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
101 }
102
103 pub async fn stop_process(&self, target: &str) -> Response {
104 let (reply, rx) = oneshot::channel();
105 let _ = self
106 .tx
107 .send(PmCommand::Stop {
108 target: target.to_string(),
109 reply,
110 })
111 .await;
112 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
113 }
114
115 pub async fn stop_all(&self) -> Response {
116 let (reply, rx) = oneshot::channel();
117 let _ = self.tx.send(PmCommand::StopAll { reply }).await;
118 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
119 }
120
121 pub async fn restart_process(&self, target: &str) -> Response {
122 let (reply, rx) = oneshot::channel();
123 let _ = self
124 .tx
125 .send(PmCommand::Restart {
126 target: target.to_string(),
127 reply,
128 })
129 .await;
130 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
131 }
132
133 pub async fn status(&self) -> Response {
134 let (reply, rx) = oneshot::channel();
135 let _ = self.tx.send(PmCommand::Status { reply }).await;
136 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
137 }
138
139 pub async fn status_snapshot(&self) -> Response {
140 let (reply, rx) = oneshot::channel();
141 let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
142 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
143 }
144
145 pub async fn has_process(&self, target: &str) -> bool {
146 let (reply, rx) = oneshot::channel();
147 let _ = self
148 .tx
149 .send(PmCommand::HasProcess {
150 target: target.to_string(),
151 reply,
152 })
153 .await;
154 rx.await.unwrap_or(false)
155 }
156
157 pub async fn session_name(&self) -> String {
158 let (reply, rx) = oneshot::channel();
159 let _ = self.tx.send(PmCommand::SessionName { reply }).await;
160 rx.await.unwrap_or_default()
161 }
162
163 pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
164 let (reply, rx) = oneshot::channel();
165 let _ = self
166 .tx
167 .send(PmCommand::IsProcessExited {
168 target: target.to_string(),
169 reply,
170 })
171 .await;
172 rx.await.unwrap_or(None)
173 }
174
175 pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
177 let (reply, rx) = oneshot::channel();
178 let _ = self
179 .tx
180 .send(PmCommand::EnableProxy { proxy_port, reply })
181 .await;
182 rx.await.unwrap_or(None)
183 }
184
185 pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
186 let (reply, rx) = oneshot::channel();
187 let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
188 rx.await.expect("actor should be alive for subscribe")
189 }
190}
191
192pub struct ProcessManagerActor {
194 pm: ProcessManager,
195 rx: mpsc::Receiver<PmCommand>,
196 proxy_state_tx: watch::Sender<ProxyState>,
197 proxy_port: Option<u16>,
198}
199
200impl ProcessManagerActor {
201 pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
203 let (tx, rx) = mpsc::channel(256);
204 let pm = ProcessManager::new(session);
205
206 let initial_state = ProxyState {
207 port_map: HashMap::new(),
208 };
209 let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
210
211 let handle = PmHandle { tx };
212 let actor = Self {
213 pm,
214 rx,
215 proxy_state_tx,
216 proxy_port: None,
217 };
218
219 (handle, proxy_state_rx, actor)
220 }
221
222 pub async fn run(mut self) {
224 let mut exit_refresh = time::interval(EXIT_REFRESH_INTERVAL);
225 exit_refresh.set_missed_tick_behavior(MissedTickBehavior::Delay);
226
227 loop {
228 tokio::select! {
229 cmd = self.rx.recv() => match cmd {
230 Some(cmd) => self.handle_command(cmd).await,
231 None => break,
232 },
233 _ = exit_refresh.tick() => {
234 if self.pm.refresh_exit_states() {
235 self.publish_proxy_state();
236 }
237 }
238 }
239 }
240 }
241
242 async fn handle_command(&mut self, cmd: PmCommand) {
243 match cmd {
244 PmCommand::Spawn {
245 command,
246 name,
247 cwd,
248 env,
249 port,
250 reply,
251 } => {
252 let mut resp = self
253 .pm
254 .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
255 .await;
256 if let Response::RunOk {
257 ref name,
258 ref mut url,
259 port: Some(p),
260 ..
261 } = resp
262 && let Some(pp) = self.proxy_port
263 {
264 *url = Some(protocol::process_url(name, p, Some(pp)));
265 }
266 self.publish_proxy_state();
267 let _ = reply.send(resp);
268 }
269 PmCommand::Stop { target, reply } => {
270 let resp = self.pm.stop_process(&target).await;
271 self.publish_proxy_state();
272 let _ = reply.send(resp);
273 }
274 PmCommand::StopAll { reply } => {
275 let resp = self.pm.stop_all().await;
276 self.publish_proxy_state();
277 let _ = reply.send(resp);
278 }
279 PmCommand::Restart { target, reply } => {
280 let resp = self.pm.restart_process(&target).await;
281 self.publish_proxy_state();
282 let _ = reply.send(resp);
283 }
284 PmCommand::Status { reply } => {
285 let _ = reply.send(self.build_status());
286 }
287 PmCommand::StatusSnapshot { reply } => {
288 let _ = reply.send(self.build_status_snapshot());
289 }
290 PmCommand::HasProcess { target, reply } => {
291 let _ = reply.send(self.pm.has_process(&target));
292 }
293 PmCommand::SessionName { reply } => {
294 let _ = reply.send(self.pm.session_name().to_string());
295 }
296 PmCommand::IsProcessExited { target, reply } => {
297 let _ = reply.send(self.pm.is_process_exited(&target));
298 }
299 PmCommand::EnableProxy { proxy_port, reply } => {
300 if let Some(existing) = self.proxy_port {
301 let _ = reply.send(Some(existing));
302 } else {
303 self.proxy_port = Some(proxy_port);
304 self.pm.enable_proxy();
305 self.publish_proxy_state();
306 let _ = reply.send(None);
307 }
308 }
309 PmCommand::Subscribe { reply } => {
310 let _ = reply.send(self.pm.output_tx.subscribe());
311 }
312 }
313 }
314
315 fn build_status(&mut self) -> Response {
317 if self.pm.refresh_exit_states() {
318 self.publish_proxy_state();
319 }
320 let mut resp = self.pm.status_snapshot();
321 self.rewrite_urls(&mut resp);
322 resp
323 }
324
325 fn build_status_snapshot(&mut self) -> Response {
327 if self.pm.refresh_exit_states() {
328 self.publish_proxy_state();
329 }
330 let mut resp = self.pm.status_snapshot();
331 self.rewrite_urls(&mut resp);
332 resp
333 }
334
335 fn rewrite_urls(&self, resp: &mut Response) {
337 let Some(pp) = self.proxy_port else { return };
338 if let Response::Status { ref mut processes } = *resp {
339 for p in processes.iter_mut() {
340 if let Some(port) = p.port {
341 p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
342 }
343 }
344 }
345 }
346
347 fn publish_proxy_state(&self) {
350 let new_map = self.pm.running_ports();
351 let current = self.proxy_state_tx.borrow();
352 if current.port_map == new_map {
353 return;
354 }
355 drop(current);
356 let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
357 }
358}