1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6
/// Port table shared with proxy-side consumers through a `watch` channel.
///
/// `Eq` is derived because the only field has total equality, and `Default`
/// yields the empty table the actor starts from.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProxyState {
    /// Port per name, as reported by `ProcessManager::running_ports`
    /// (keys are presumably process names — confirm against that method).
    pub port_map: HashMap<String, u16>,
}
13
14enum PmCommand {
16 Spawn {
17 command: String,
18 name: Option<String>,
19 cwd: Option<String>,
20 env: Option<HashMap<String, String>>,
21 port: Option<u16>,
22 reply: oneshot::Sender<Response>,
23 },
24 Stop {
25 target: String,
26 reply: oneshot::Sender<Response>,
27 },
28 StopAll {
29 reply: oneshot::Sender<Response>,
30 },
31 Restart {
32 target: String,
33 reply: oneshot::Sender<Response>,
34 },
35 Status {
36 reply: oneshot::Sender<Response>,
37 },
38 StatusSnapshot {
39 reply: oneshot::Sender<Response>,
40 },
41 HasProcess {
42 target: String,
43 reply: oneshot::Sender<bool>,
44 },
45 SessionName {
46 reply: oneshot::Sender<String>,
47 },
48 #[allow(clippy::option_option)]
49 IsProcessExited {
50 target: String,
51 reply: oneshot::Sender<Option<Option<i32>>>,
52 },
53 EnableProxy {
55 proxy_port: u16,
56 reply: oneshot::Sender<Option<u16>>,
57 },
58 Subscribe {
59 reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
60 },
61}
62
63fn actor_error(msg: &str) -> Response {
64 Response::Error {
65 code: ErrorCode::General,
66 message: msg.into(),
67 }
68}
69
/// Cloneable front end to the process-manager actor.
///
/// Each method sends one `PmCommand` over `tx` and waits for the reply.
#[derive(Clone)]
pub struct PmHandle {
    /// Sending half of the actor's command queue.
    tx: mpsc::Sender<PmCommand>,
}
75
76impl PmHandle {
77 pub async fn spawn_process(
78 &self,
79 command: String,
80 name: Option<String>,
81 cwd: Option<String>,
82 env: Option<HashMap<String, String>>,
83 port: Option<u16>,
84 ) -> Response {
85 let (reply, rx) = oneshot::channel();
86 let _ = self
87 .tx
88 .send(PmCommand::Spawn {
89 command,
90 name,
91 cwd,
92 env,
93 port,
94 reply,
95 })
96 .await;
97 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
98 }
99
100 pub async fn stop_process(&self, target: &str) -> Response {
101 let (reply, rx) = oneshot::channel();
102 let _ = self
103 .tx
104 .send(PmCommand::Stop {
105 target: target.to_string(),
106 reply,
107 })
108 .await;
109 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
110 }
111
112 pub async fn stop_all(&self) -> Response {
113 let (reply, rx) = oneshot::channel();
114 let _ = self.tx.send(PmCommand::StopAll { reply }).await;
115 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
116 }
117
118 pub async fn restart_process(&self, target: &str) -> Response {
119 let (reply, rx) = oneshot::channel();
120 let _ = self
121 .tx
122 .send(PmCommand::Restart {
123 target: target.to_string(),
124 reply,
125 })
126 .await;
127 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
128 }
129
130 pub async fn status(&self) -> Response {
131 let (reply, rx) = oneshot::channel();
132 let _ = self.tx.send(PmCommand::Status { reply }).await;
133 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
134 }
135
136 pub async fn status_snapshot(&self) -> Response {
137 let (reply, rx) = oneshot::channel();
138 let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
139 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
140 }
141
142 pub async fn has_process(&self, target: &str) -> bool {
143 let (reply, rx) = oneshot::channel();
144 let _ = self
145 .tx
146 .send(PmCommand::HasProcess {
147 target: target.to_string(),
148 reply,
149 })
150 .await;
151 rx.await.unwrap_or(false)
152 }
153
154 pub async fn session_name(&self) -> String {
155 let (reply, rx) = oneshot::channel();
156 let _ = self.tx.send(PmCommand::SessionName { reply }).await;
157 rx.await.unwrap_or_default()
158 }
159
160 pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
161 let (reply, rx) = oneshot::channel();
162 let _ = self
163 .tx
164 .send(PmCommand::IsProcessExited {
165 target: target.to_string(),
166 reply,
167 })
168 .await;
169 rx.await.unwrap_or(None)
170 }
171
172 pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
174 let (reply, rx) = oneshot::channel();
175 let _ = self
176 .tx
177 .send(PmCommand::EnableProxy { proxy_port, reply })
178 .await;
179 rx.await.unwrap_or(None)
180 }
181
182 pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
183 let (reply, rx) = oneshot::channel();
184 let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
185 rx.await.expect("actor should be alive for subscribe")
186 }
187}
188
/// Actor state: owns the `ProcessManager` and serves `PmCommand`s one at a time.
pub struct ProcessManagerActor {
    /// The process manager every command is applied to.
    pm: ProcessManager,
    /// Receiving half of the command queue fed by `PmHandle`.
    rx: mpsc::Receiver<PmCommand>,
    /// Publishes the running-ports map whenever it changes.
    proxy_state_tx: watch::Sender<ProxyState>,
    /// Proxy port recorded by the first `EnableProxy` command; `None` until then.
    proxy_port: Option<u16>,
}
196
197impl ProcessManagerActor {
198 pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
200 let (tx, rx) = mpsc::channel(256);
201 let pm = ProcessManager::new(session);
202
203 let initial_state = ProxyState {
204 port_map: HashMap::new(),
205 };
206 let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
207
208 let handle = PmHandle { tx };
209 let actor = Self {
210 pm,
211 rx,
212 proxy_state_tx,
213 proxy_port: None,
214 };
215
216 (handle, proxy_state_rx, actor)
217 }
218
219 pub async fn run(mut self) {
221 while let Some(cmd) = self.rx.recv().await {
222 self.handle_command(cmd).await;
223 }
224 }
225
226 async fn handle_command(&mut self, cmd: PmCommand) {
227 match cmd {
228 PmCommand::Spawn {
229 command,
230 name,
231 cwd,
232 env,
233 port,
234 reply,
235 } => {
236 let mut resp = self
237 .pm
238 .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
239 .await;
240 if let Response::RunOk {
241 ref name,
242 ref mut url,
243 port: Some(p),
244 ..
245 } = resp
246 && let Some(pp) = self.proxy_port
247 {
248 *url = Some(protocol::process_url(name, p, Some(pp)));
249 }
250 self.publish_proxy_state();
251 let _ = reply.send(resp);
252 }
253 PmCommand::Stop { target, reply } => {
254 let resp = self.pm.stop_process(&target).await;
255 self.publish_proxy_state();
256 let _ = reply.send(resp);
257 }
258 PmCommand::StopAll { reply } => {
259 let resp = self.pm.stop_all().await;
260 self.publish_proxy_state();
261 let _ = reply.send(resp);
262 }
263 PmCommand::Restart { target, reply } => {
264 let resp = self.pm.restart_process(&target).await;
265 self.publish_proxy_state();
266 let _ = reply.send(resp);
267 }
268 PmCommand::Status { reply } => {
269 let _ = reply.send(self.build_status());
270 }
271 PmCommand::StatusSnapshot { reply } => {
272 let _ = reply.send(self.build_status_snapshot());
273 }
274 PmCommand::HasProcess { target, reply } => {
275 let _ = reply.send(self.pm.has_process(&target));
276 }
277 PmCommand::SessionName { reply } => {
278 let _ = reply.send(self.pm.session_name().to_string());
279 }
280 PmCommand::IsProcessExited { target, reply } => {
281 let _ = reply.send(self.pm.is_process_exited(&target));
282 }
283 PmCommand::EnableProxy { proxy_port, reply } => {
284 if let Some(existing) = self.proxy_port {
285 let _ = reply.send(Some(existing));
286 } else {
287 self.proxy_port = Some(proxy_port);
288 self.pm.enable_proxy();
289 self.publish_proxy_state();
290 let _ = reply.send(None);
291 }
292 }
293 PmCommand::Subscribe { reply } => {
294 let _ = reply.send(self.pm.output_tx.subscribe());
295 }
296 }
297 }
298
299 fn build_status(&mut self) -> Response {
301 let mut resp = self.pm.status();
302 self.rewrite_urls(&mut resp);
303 resp
304 }
305
306 fn build_status_snapshot(&self) -> Response {
308 let mut resp = self.pm.status_snapshot();
309 self.rewrite_urls(&mut resp);
310 resp
311 }
312
313 fn rewrite_urls(&self, resp: &mut Response) {
315 let Some(pp) = self.proxy_port else { return };
316 if let Response::Status { ref mut processes } = *resp {
317 for p in processes.iter_mut() {
318 if let Some(port) = p.port {
319 p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
320 }
321 }
322 }
323 }
324
325 fn publish_proxy_state(&self) {
328 let new_map = self.pm.running_ports();
329 let current = self.proxy_state_tx.borrow();
330 if current.port_map == new_map {
331 return;
332 }
333 drop(current);
334 let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
335 }
336}