1use crate::daemon::log_writer::OutputLine;
2use crate::daemon::process_manager::ProcessManager;
3use crate::protocol::{self, ErrorCode, Response, Stream as ProtoStream};
4use std::collections::HashMap;
5use tokio::sync::{broadcast, mpsc, oneshot, watch};
6use tokio::time::{self, Duration, MissedTickBehavior};
7
/// How often the actor's background tick polls children for exit-state
/// changes and drives restart scheduling (see [`ProcessManagerActor::run`]).
const EXIT_REFRESH_INTERVAL: Duration = Duration::from_millis(200);
9
/// Snapshot of the name -> port mapping for running processes, published
/// over a `watch` channel (see [`ProcessManagerActor::new`]) so the proxy
/// side can react to changes.
#[derive(Debug, Clone, PartialEq)]
pub struct ProxyState {
    /// Process name -> local port it is currently serving on.
    pub port_map: HashMap<String, u16>,
}
16
/// Messages handled by the process-manager actor.
///
/// Variants carrying a `reply` sender are request/response pairs sent via
/// [`PmHandle`]; `AutoRestart` and `WatchRestart` are fire-and-forget
/// commands the actor schedules for itself.
enum PmCommand {
    /// Spawn a new process, optionally with restart policy and file watch.
    Spawn {
        command: String,
        name: Option<String>,
        cwd: Option<String>,
        env: Option<HashMap<String, String>>,
        port: Option<u16>,
        restart: Option<crate::protocol::RestartPolicy>,
        watch: Option<crate::protocol::WatchConfig>,
        reply: oneshot::Sender<Response>,
    },
    /// Stop the process identified by `target`.
    Stop {
        target: String,
        reply: oneshot::Sender<Response>,
    },
    /// Stop every managed process.
    StopAll {
        reply: oneshot::Sender<Response>,
    },
    /// Manually restart the process identified by `target`.
    Restart {
        target: String,
        reply: oneshot::Sender<Response>,
    },
    /// Report status of all processes (exit states refreshed first).
    Status {
        reply: oneshot::Sender<Response>,
    },
    /// Handled identically to `Status` (see `handle_command`).
    StatusSnapshot {
        reply: oneshot::Sender<Response>,
    },
    /// Whether a process identified by `target` is known to the manager.
    HasProcess {
        target: String,
        reply: oneshot::Sender<bool>,
    },
    /// The session name this daemon was started for.
    SessionName {
        reply: oneshot::Sender<String>,
    },
    // NOTE(review): nested option — presumably outer None = unknown process,
    // inner Option<i32> = exit code when exited; confirm against
    // ProcessManager::is_process_exited.
    #[allow(clippy::option_option)]
    IsProcessExited {
        target: String,
        reply: oneshot::Sender<Option<Option<i32>>>,
    },
    /// Enable URL rewriting through the proxy. Replies with the already
    /// configured port if the proxy was enabled before, `None` otherwise.
    EnableProxy {
        proxy_port: u16,
        reply: oneshot::Sender<Option<u16>>,
    },
    /// Subscribe to the merged output stream of all processes.
    Subscribe {
        reply: oneshot::Sender<broadcast::Receiver<OutputLine>>,
    },
    /// Self-sent after a restart delay elapses (restart-policy path).
    AutoRestart {
        name: String,
    },
    /// Self-sent by the watcher forwarder task when a watched file changes.
    WatchRestart {
        name: String,
    },
}
75
76fn actor_error(msg: &str) -> Response {
77 Response::Error {
78 code: ErrorCode::General,
79 message: msg.into(),
80 }
81}
82
/// Cloneable client handle to the process-manager actor. All access to the
/// underlying [`ProcessManager`] goes through the actor's command channel.
#[derive(Clone)]
pub struct PmHandle {
    // Command channel into the actor's run loop.
    tx: mpsc::Sender<PmCommand>,
}
88
89impl PmHandle {
90 pub async fn spawn_process(
91 &self,
92 command: String,
93 name: Option<String>,
94 cwd: Option<String>,
95 env: Option<HashMap<String, String>>,
96 port: Option<u16>,
97 ) -> Response {
98 self.spawn_process_supervised(command, name, cwd, env, port, None, None)
99 .await
100 }
101
102 #[allow(clippy::too_many_arguments)]
103 pub async fn spawn_process_supervised(
104 &self,
105 command: String,
106 name: Option<String>,
107 cwd: Option<String>,
108 env: Option<HashMap<String, String>>,
109 port: Option<u16>,
110 restart: Option<crate::protocol::RestartPolicy>,
111 watch: Option<crate::protocol::WatchConfig>,
112 ) -> Response {
113 let (reply, rx) = oneshot::channel();
114 let _ = self
115 .tx
116 .send(PmCommand::Spawn {
117 command,
118 name,
119 cwd,
120 env,
121 port,
122 restart,
123 watch,
124 reply,
125 })
126 .await;
127 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
128 }
129
130 pub async fn stop_process(&self, target: &str) -> Response {
131 let (reply, rx) = oneshot::channel();
132 let _ = self
133 .tx
134 .send(PmCommand::Stop {
135 target: target.to_string(),
136 reply,
137 })
138 .await;
139 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
140 }
141
142 pub async fn stop_all(&self) -> Response {
143 let (reply, rx) = oneshot::channel();
144 let _ = self.tx.send(PmCommand::StopAll { reply }).await;
145 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
146 }
147
148 pub async fn restart_process(&self, target: &str) -> Response {
149 let (reply, rx) = oneshot::channel();
150 let _ = self
151 .tx
152 .send(PmCommand::Restart {
153 target: target.to_string(),
154 reply,
155 })
156 .await;
157 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
158 }
159
160 pub async fn status(&self) -> Response {
161 let (reply, rx) = oneshot::channel();
162 let _ = self.tx.send(PmCommand::Status { reply }).await;
163 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
164 }
165
166 pub async fn status_snapshot(&self) -> Response {
167 let (reply, rx) = oneshot::channel();
168 let _ = self.tx.send(PmCommand::StatusSnapshot { reply }).await;
169 rx.await.unwrap_or_else(|_| actor_error("actor stopped"))
170 }
171
172 pub async fn has_process(&self, target: &str) -> bool {
173 let (reply, rx) = oneshot::channel();
174 let _ = self
175 .tx
176 .send(PmCommand::HasProcess {
177 target: target.to_string(),
178 reply,
179 })
180 .await;
181 rx.await.unwrap_or(false)
182 }
183
184 pub async fn session_name(&self) -> String {
185 let (reply, rx) = oneshot::channel();
186 let _ = self.tx.send(PmCommand::SessionName { reply }).await;
187 rx.await.unwrap_or_default()
188 }
189
190 pub async fn is_process_exited(&self, target: &str) -> Option<Option<i32>> {
191 let (reply, rx) = oneshot::channel();
192 let _ = self
193 .tx
194 .send(PmCommand::IsProcessExited {
195 target: target.to_string(),
196 reply,
197 })
198 .await;
199 rx.await.unwrap_or(None)
200 }
201
202 pub async fn enable_proxy(&self, proxy_port: u16) -> Option<u16> {
204 let (reply, rx) = oneshot::channel();
205 let _ = self
206 .tx
207 .send(PmCommand::EnableProxy { proxy_port, reply })
208 .await;
209 rx.await.unwrap_or(None)
210 }
211
212 pub async fn subscribe(&self) -> broadcast::Receiver<OutputLine> {
213 let (reply, rx) = oneshot::channel();
214 let _ = self.tx.send(PmCommand::Subscribe { reply }).await;
215 rx.await.expect("actor should be alive for subscribe")
216 }
217}
218
/// Single-owner actor around [`ProcessManager`]: all mutation is serialized
/// through one command queue, so the manager itself needs no locking.
pub struct ProcessManagerActor {
    pm: ProcessManager,
    // Command input from all PmHandle clones.
    rx: mpsc::Receiver<PmCommand>,
    // Clone of the command sender, used to schedule delayed commands
    // (AutoRestart / WatchRestart) back to this actor.
    self_tx: mpsc::Sender<PmCommand>,
    // Publishes name->port changes to proxy-side watchers.
    proxy_state_tx: watch::Sender<ProxyState>,
    // Set once via EnableProxy; when present, advertised URLs are rewritten.
    proxy_port: Option<u16>,
}
228
229impl ProcessManagerActor {
230 pub fn new(session: &str) -> (PmHandle, watch::Receiver<ProxyState>, Self) {
232 let (tx, rx) = mpsc::channel(256);
233 let pm = ProcessManager::new(session);
234
235 let initial_state = ProxyState {
236 port_map: HashMap::new(),
237 };
238 let (proxy_state_tx, proxy_state_rx) = watch::channel(initial_state);
239
240 let handle = PmHandle { tx: tx.clone() };
241 let actor = Self {
242 pm,
243 rx,
244 self_tx: tx,
245 proxy_state_tx,
246 proxy_port: None,
247 };
248
249 (handle, proxy_state_rx, actor)
250 }
251
    /// Actor main loop: serve commands until every sender is dropped,
    /// while a periodic tick refreshes child exit states and drives
    /// restart scheduling.
    pub async fn run(mut self) {
        let mut exit_refresh = time::interval(EXIT_REFRESH_INTERVAL);
        // If ticks fall behind, delay rather than burst-firing catch-ups.
        exit_refresh.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                cmd = self.rx.recv() => match cmd {
                    Some(cmd) => self.handle_command(cmd).await,
                    // All PmHandles dropped: shut the actor down.
                    None => break,
                },
                _ = exit_refresh.tick() => {
                    // Publish only when an exit state actually changed.
                    if self.pm.refresh_exit_states() {
                        self.publish_proxy_state();
                    }
                    self.schedule_restarts();
                }
            }
        }
    }
273
    /// Split exited processes into restart candidates and those whose
    /// restart budget is exhausted. Exhausted ones are marked failed and
    /// their supervisors notified; candidates get a delayed `AutoRestart`
    /// command sent back to this actor.
    fn schedule_restarts(&mut self) {
        let (restartable, exhausted) = self.pm.classify_restart_candidates();

        for name in &exhausted {
            // Tell the supervisor stream (if any) why we gave up.
            if let Some(p) = self.pm.find(name)
                && let Some(ref policy) = p.restart_policy
                && let Some(max) = policy.max_restarts
                && let Some(ref tx) = p.supervisor_tx
            {
                let msg = format!("[agent-procs] Max restarts ({}) exhausted", max);
                let _ = tx.try_send(msg);
            }
            self.pm.mark_failed(name);
        }
        if !exhausted.is_empty() {
            self.publish_proxy_state();
        }

        for name in restartable {
            if let Some(p) = self.pm.find_mut(&name) {
                // Checked by handle_auto_restart before actually respawning.
                p.restart_pending = true;
                // Fall back to a 1000 ms delay when no policy specifies one.
                let delay_ms = p
                    .restart_policy
                    .as_ref()
                    .map_or(1000, |rp| rp.restart_delay_ms);
                let tx = self.self_tx.clone();
                let name_clone = name.clone();
                // Sleep off-actor, then send the restart command to self.
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                    let _ = tx.send(PmCommand::AutoRestart { name: name_clone }).await;
                });
            }
        }
    }
312
    /// Dispatch a single actor command. Reply senders are fired with
    /// `let _ =` because the requester may have gone away.
    async fn handle_command(&mut self, cmd: PmCommand) {
        match cmd {
            PmCommand::Spawn {
                command,
                name,
                cwd,
                env,
                port,
                restart,
                watch,
                reply,
            } => {
                let mut resp = self
                    .pm
                    .spawn_process(&command, name, cwd.as_deref(), env.as_ref(), port)
                    .await;
                let has_watch = watch.is_some();
                // Attach supervision config to the freshly spawned entry.
                if let Response::RunOk { ref name, .. } = resp
                    && let Some(p) = self.pm.find_mut(name)
                {
                    p.restart_policy = restart;
                    p.watch_config = watch;
                }
                // Start the file watcher; report failures on the
                // supervisor stream rather than failing the spawn.
                if has_watch
                    && let Response::RunOk { ref name, .. } = resp
                    && let Err(e) = self.setup_watcher(name)
                {
                    let msg = format!("[agent-procs] Watch setup failed: {}", e);
                    if let Some(p) = self.pm.find(name)
                        && let Some(tx) = &p.supervisor_tx
                    {
                        let _ = tx.try_send(msg);
                    }
                }
                // When the proxy is enabled, advertise the proxied URL
                // instead of the direct one.
                if let Response::RunOk {
                    ref name,
                    ref mut url,
                    port: Some(p),
                    ..
                } = resp
                    && let Some(pp) = self.proxy_port
                {
                    *url = Some(protocol::process_url(name, p, Some(pp)));
                }
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            PmCommand::Stop { target, reply } => {
                // Drop the watcher first so a file event can't trigger a
                // restart of a process we are stopping.
                if let Some(p) = self.pm.find_mut(&target) {
                    p.watch_handle = None;
                }
                let resp = self.pm.stop_process(&target).await;
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            PmCommand::StopAll { reply } => {
                let resp = self.pm.stop_all().await;
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            PmCommand::Restart { target, reply } => {
                let resp = self.pm.restart_process(&target).await;
                // Re-arm the watcher for the new child (no-op when the
                // process has no watch config).
                if let Response::RunOk { ref name, .. } = resp {
                    let _ = self.setup_watcher(name);
                }
                self.publish_proxy_state();
                let _ = reply.send(resp);
            }
            // Both status variants are served from the same snapshot.
            PmCommand::Status { reply } | PmCommand::StatusSnapshot { reply } => {
                let _ = reply.send(self.build_status());
            }
            PmCommand::HasProcess { target, reply } => {
                let _ = reply.send(self.pm.has_process(&target));
            }
            PmCommand::SessionName { reply } => {
                let _ = reply.send(self.pm.session_name().to_string());
            }
            PmCommand::IsProcessExited { target, reply } => {
                let _ = reply.send(self.pm.is_process_exited(&target));
            }
            PmCommand::EnableProxy { proxy_port, reply } => {
                // First caller wins; later callers learn the active port.
                if let Some(existing) = self.proxy_port {
                    let _ = reply.send(Some(existing));
                } else {
                    self.proxy_port = Some(proxy_port);
                    self.pm.enable_proxy();
                    self.publish_proxy_state();
                    let _ = reply.send(None);
                }
            }
            PmCommand::Subscribe { reply } => {
                let _ = reply.send(self.pm.output_tx.subscribe());
            }
            PmCommand::AutoRestart { name } => {
                self.handle_auto_restart(&name).await;
            }
            PmCommand::WatchRestart { name } => {
                self.handle_watch_restart(&name).await;
            }
        }
    }
421
    /// Perform a policy-driven restart previously scheduled by
    /// `schedule_restarts`. Skips (clearing the pending flag) when the
    /// process has since restarted, been manually stopped, or was removed.
    async fn handle_auto_restart(&mut self, name: &str) {
        // Restart only if still dead, not user-stopped, and still pending.
        let should_restart = self
            .pm
            .find(name)
            .is_some_and(|p| p.child.is_none() && !p.manually_stopped && p.restart_pending);
        if !should_restart {
            if let Some(p) = self.pm.find_mut(name) {
                p.restart_pending = false;
            }
            return;
        }

        // Capture the exit code before respawn replaces the child state.
        let prev_exit_code = self.pm.find(name).and_then(|p| p.exit_code);

        if let Some(p) = self.pm.find_mut(name) {
            p.restart_count += 1;
        }

        match self.pm.respawn_in_place(name).await {
            Ok(()) => {
                if let Some(p) = self.pm.find(name) {
                    let count = p.restart_count;
                    let max = p.restart_policy.as_ref().and_then(|rp| rp.max_restarts);
                    // No recorded exit code is reported as "signal".
                    let exit = prev_exit_code.map_or("signal".into(), |c: i32| c.to_string());
                    let msg = match max {
                        Some(m) => {
                            format!(
                                "[agent-procs] Restarted (exit {}, attempt {}/{})",
                                exit, count, m
                            )
                        }
                        None => {
                            format!("[agent-procs] Restarted (exit {}, attempt {})", exit, count)
                        }
                    };
                    if let Some(tx) = &p.supervisor_tx {
                        let _ = tx.send(msg).await;
                    }
                }
                // Re-arm the file watcher for the new child, if configured.
                let _ = self.setup_watcher(name);
            }
            Err(err) => {
                // Surface the failure on the shared output stream.
                let msg = format!("[agent-procs] Restart failed: {}", err);
                let _ = self.pm.output_tx.send(OutputLine {
                    process: name.to_string(),
                    stream: ProtoStream::Stdout,
                    line: msg,
                });
            }
        }

        if let Some(p) = self.pm.find_mut(name) {
            p.restart_pending = false;
        }
        self.publish_proxy_state();
    }
487
    /// Restart `name` because a watched file changed. Restart bookkeeping
    /// is reset so the file-change restart does not count against the
    /// restart-policy budget.
    async fn handle_watch_restart(&mut self, name: &str) {
        // Never fight an explicit user stop.
        let should_restart = self.pm.find(name).is_some_and(|p| !p.manually_stopped);
        if !should_restart {
            return;
        }

        if self.pm.find(name).is_some_and(|p| p.child.is_some()) {
            let _ = self.pm.stop_process(name).await;
            // NOTE(review): presumably stop_process sets manually_stopped;
            // undo it here since this stop was watcher-initiated — confirm.
            if let Some(p) = self.pm.find_mut(name) {
                p.manually_stopped = false;
            }
        }

        // Fresh start: clear restart counters and the failed flag.
        if let Some(p) = self.pm.find_mut(name) {
            p.restart_count = 0;
            p.failed = false;
        }

        match self.pm.respawn_in_place(name).await {
            Ok(()) => {
                if let Some(p) = self.pm.find(name)
                    && let Some(tx) = &p.supervisor_tx
                {
                    let _ = tx
                        .send("[agent-procs] File changed, restarted".to_string())
                        .await;
                }
                // Re-arm the watcher for the new child.
                let _ = self.setup_watcher(name);
            }
            Err(err) => {
                // Surface the failure on the shared output stream.
                let _ = self.pm.output_tx.send(OutputLine {
                    process: name.to_string(),
                    stream: ProtoStream::Stdout,
                    line: format!("[agent-procs] Watch restart failed: {}", err),
                });
            }
        }

        self.publish_proxy_state();
    }
533
534 fn setup_watcher(&mut self, name: &str) -> Result<(), String> {
538 let (paths, ignore, cwd) = {
539 let Some(p) = self.pm.find(name) else {
540 return Ok(());
541 };
542 let Some(ref wc) = p.watch_config else {
543 return Ok(());
544 };
545 (
546 wc.paths.clone(),
547 wc.ignore.clone(),
548 p.cwd.clone().unwrap_or_else(|| ".".to_string()),
549 )
550 };
551
552 let base_dir = std::path::PathBuf::from(&cwd);
553 let ignore_refs: Option<Vec<String>> = ignore;
554 let ignore_slice = ignore_refs.as_deref();
555
556 let tx = self.self_tx.clone();
557 let proc_name = name.to_string();
558 let (restart_tx, mut restart_rx) = tokio::sync::mpsc::channel::<String>(16);
559
560 tokio::spawn(async move {
562 while let Some(name) = restart_rx.recv().await {
563 let _ = tx.send(PmCommand::WatchRestart { name }).await;
564 }
565 });
566
567 match crate::daemon::watcher::create_watcher(
568 &paths,
569 ignore_slice,
570 &base_dir,
571 proc_name,
572 restart_tx,
573 ) {
574 Ok(handle) => {
575 if let Some(p) = self.pm.find_mut(name) {
576 p.watch_handle = Some(handle);
577 }
578 Ok(())
579 }
580 Err(e) => {
581 tracing::warn!(process = %name, error = %e, "failed to create file watcher");
582 if let Some(p) = self.pm.find_mut(name) {
584 p.watch_config = None;
585 }
586 Err(e)
587 }
588 }
589 }
590
591 fn build_status(&mut self) -> Response {
593 if self.pm.refresh_exit_states() {
594 self.publish_proxy_state();
595 }
596 let mut resp = self.pm.status_snapshot();
597 self.rewrite_urls(&mut resp);
598 resp
599 }
600
601 fn rewrite_urls(&self, resp: &mut Response) {
603 let Some(pp) = self.proxy_port else { return };
604 if let Response::Status { ref mut processes } = *resp {
605 for p in processes.iter_mut() {
606 if let Some(port) = p.port {
607 p.url = Some(protocol::process_url(&p.name, port, Some(pp)));
608 }
609 }
610 }
611 }
612
613 fn publish_proxy_state(&self) {
616 let new_map = self.pm.running_ports();
617 let current = self.proxy_state_tx.borrow();
618 if current.port_map == new_map {
619 return;
620 }
621 drop(current);
622 let _ = self.proxy_state_tx.send(ProxyState { port_map: new_map });
623 }
624}