1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6
7#[derive(Debug, Clone, PartialEq)]
9pub struct ProxyState {
10 pub port_map: HashMap<String, u16>,
12}
13
14enum PmCommand {
16 Spawn {
17 command: String,
18 name: Option<String>,
19 cwd: Option<String>,
20 env: Option<HashMap<String, String>>,
21 port: Option<u16>,
22 reply: oneshot::Sender<Response>,
23 },
24 Stop {
25 target: String,
26 reply: oneshot::Sender<Response>,
27 },
28 StopAll {
29 reply: oneshot::Sender<Response>,
30 },
31 Restart {
32 target: String,
33 reply: oneshot::Sender<Response>,
34 },
35 Status {
36 reply: oneshot::Sender<Response>,
37 },
38 StatusSnapshot {
39 reply: oneshot::Sender<Response>,
40 },
41 HasProcess {
42 target: String,
43 reply: oneshot::Sender<bool>,
44 },
45 SessionName {
46 reply: oneshot::Sender<String>,
47 },
48 #[allow(clippy::option_option)]
49 IsProcessExited {
50 target: String,
51 reply: oneshot::Sender<Option<Option<i32>>>,
52 },
53 EnableProxy {
55 proxy_port: u16,
56 reply: oneshot::Sender<Option<u16>>,
57 },
58 Subscribe {
59 reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
60 },
61}
62
63fn actor_error(msg: &str) -> Response {
64 Response::Error {
65 code: ErrorCode::General,
66 message: msg.into(),
67 }
68}
69
70#[derive(Clone)]
72pub struct PmHandle {
73 tx: mpsc::Sender<PmCommand>,
74}
75
76impl PmHandle {
77 pub async fn spawn_process(
78 &self,
79 command: String,
80 name: Option<String>,
81 cwd: Option<String>,
82 env: Option<HashMap<String, String>>,
83 port: Option<u16>,
84 ) -> Response {
85 let (reply, rx) = oneshot::channel();
86 let _ = self
87 .tx
88 .send(PmCommand::Spawn {
89 command,
90 name,
91 cwd,
92 env,
93 port,
94 reply,
95 })
96 .await;
97 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
98 }
99
100 pub async fn stop_process(&self, target: &str) -> Response {
101 let (reply, rx) = oneshot::channel();
102 let _ = self
103 .tx
104 .send(PmCommand::Stop {
105 target: target.to_string(),
106 reply,
107 })
108 .await;
109 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
110 }
111
112 pub async fn stop_all(&self) -> Response {
113 let (reply, rx) = oneshot::channel();
114 let _ = self.tx.send(PmCommand::StopAll { reply }).await;
115 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
116 }
117
118 pub async fn restart_process(&self, target: &str) -> Response {
119 let (reply, rx) = oneshot::channel();
120 let _ = self
121 .tx
122 .send(PmCommand::Restart {
123 target: target.to_string(),
124 reply,
125 })
126 .await;
127 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
128 }
129
130 pub async fn status(&self) -> Response {
131 let (reply, rx) = oneshot::channel();
132 let _ = self.tx.send(PmCommand::Status { reply }).await;
133 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
134 }
135
136 pub async fn status_snapshot(&self) -> Response {
137 let (reply, rx) = oneshot::channel();
138 let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
139 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
140 }
141
142 pub async fn has_process(&self, target: &str) -> bool {
143 let (reply, rx) = oneshot::channel();
144 let _ = self
145 .tx
146 .send(PmCommand::HasProcess {
147 target: target.to_string(),
148 reply,
149 })
150 .await;
151 rx.await.unwrap_or(false)
152 }
153
154 pub async fn session_name(&self) -> String {
155 let (reply, rx) = oneshot::channel();
156 let _ = self.tx.send(PmCommand::SessionName { reply }).await;
157 rx.await.unwrap_or_default()
158 }
159
160 pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
161 let (reply, rx) = oneshot::channel();
162 let _ = self
163 .tx
164 .send(PmCommand::IsProcessExited {
165 target: target.to_string(),
166 reply,
167 })
168 .await;
169 rx.await.unwrap_or(None)
170 }
171
172 pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
174 let (reply, rx) = oneshot::channel();
175 let _ = self
176 .tx
177 .send(PmCommand::EnableProxy { proxy_port, reply })
178 .await;
179 rx.await.unwrap_or(None)
180 }
181
182 pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
183 let (reply, rx) = oneshot::channel();
184 let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
185 rx.await.expect("actor should be alive for subscribe")
186 }
187}
188
189pub struct ProcessManagerActor {
191 pm: ProcessManager,
192 rx: mpsc::Receiver<PmCommand>,
193 proxy_state_tx: watch::Sender<ProxyState>,
194 proxy_port: Option<u16>,
195}
196
197impl ProcessManagerActor {
198 pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
200 let (tx, rx) = mpsc::channel(256);
201 let pm = ProcessManager::new(session);
202
203 let initial_state = ProxyState {
204 port_map: HashMap::new(),
205 };
206 let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
207
208 let handle = PmHandle { tx };
209 let actor = Self {
210 pm,
211 rx,
212 proxy_state_tx,
213 proxy_port: None,
214 };
215
216 (handle, proxy_state_rx, actor)
217 }
218
219 pub async fn run(mut self) {
221 while let Some(cmd) = self.rx.recv().await {
222 self.handle_command(cmd).await;
223 }
224 }
225
226 async fn handle_command(&mut self, cmd: PmCommand) {
227 match cmd {
228 PmCommand::Spawn {
229 command,
230 name,
231 cwd,
232 env,
233 port,
234 reply,
235 } => {
236 let mut resp = self
237 .pm
238 .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
239 .await;
240 if let Response::RunOk {
241 ref name,
242 ref mut url,
243 port: Some(p),
244 ..
245 } = resp
246 {
247 if let Some(pp) = self.proxy_port {
248 *url = Some(protocol::process_url(name, p, Some(pp)));
249 }
250 }
251 self.publish_proxy_state();
252 let _ = reply.send(resp);
253 }
254 PmCommand::Stop { target, reply } => {
255 let resp = self.pm.stop_process(&target).await;
256 self.publish_proxy_state();
257 let _ = reply.send(resp);
258 }
259 PmCommand::StopAll { reply } => {
260 let resp = self.pm.stop_all().await;
261 self.publish_proxy_state();
262 let _ = reply.send(resp);
263 }
264 PmCommand::Restart { target, reply } => {
265 let resp = self.pm.restart_process(&target).await;
266 self.publish_proxy_state();
267 let _ = reply.send(resp);
268 }
269 PmCommand::Status { reply } => {
270 let _ = reply.send(self.build_status());
271 }
272 PmCommand::StatusSnapshot { reply } => {
273 let _ = reply.send(self.build_status_snapshot());
274 }
275 PmCommand::HasProcess { target, reply } => {
276 let _ = reply.send(self.pm.has_process(&target));
277 }
278 PmCommand::SessionName { reply } => {
279 let _ = reply.send(self.pm.session_name().to_string());
280 }
281 PmCommand::IsProcessExited { target, reply } => {
282 let _ = reply.send(self.pm.is_process_exited(&target));
283 }
284 PmCommand::EnableProxy { proxy_port, reply } => {
285 if let Some(existing) = self.proxy_port {
286 let _ = reply.send(Some(existing));
287 } else {
288 self.proxy_port = Some(proxy_port);
289 self.pm.enable_proxy();
290 self.publish_proxy_state();
291 let _ = reply.send(None);
292 }
293 }
294 PmCommand::Subscribe { reply } => {
295 let _ = reply.send(self.pm.output_tx.subscribe());
296 }
297 }
298 }
299
300 fn build_status(&mut self) -> Response {
302 let mut resp = self.pm.status();
303 self.rewrite_urls(&mut resp);
304 resp
305 }
306
307 fn build_status_snapshot(&self) -> Response {
309 let mut resp = self.pm.status_snapshot();
310 self.rewrite_urls(&mut resp);
311 resp
312 }
313
314 fn rewrite_urls(&self, resp: &mut Response) {
316 let Some(pp) = self.proxy_port else { return };
317 if let Response::Status { ref mut processes } = *resp {
318 for p in processes.iter_mut() {
319 if let Some(port) = p.port {
320 p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
321 }
322 }
323 }
324 }
325
326 fn publish_proxy_state(&self) {
329 let new_map = self.pm.running_ports();
330 let current = self.proxy_state_tx.borrow();
331 if current.port_map == new_map {
332 return;
333 }
334 drop(current);
335 let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
336 }
337}