1use crate::daemon::log_writer::{self, OutputLine};
2use crate::daemon::port_allocator::PortAllocator;
3use crate::paths;
4use crate::protocol::{
5 ErrorCode, ProcessInfo, ProcessState, Response, RestartMode, RestartPolicy,
6 Stream as ProtoStream, WatchConfig, process_url,
7};
8use crate::session::IdCounter;
9use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::Arc;
12use std::sync::atomic::AtomicU64;
13use std::time::{Duration, Instant};
14use tokio::process::{Child, Command};
15use tokio::sync::broadcast;
16
/// Maximum size of a single captured log file before rotation (50 MiB).
const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024;

/// Returns `true` if `name` is a valid DNS label: 1–63 bytes of lowercase
/// ASCII alphanumerics and hyphens, not beginning or ending with a hyphen.
#[must_use]
pub fn is_valid_dns_label(name: &str) -> bool {
    let bytes = name.as_bytes();
    if bytes.is_empty() || bytes.len() > 63 {
        return false;
    }
    // Hyphens are allowed only in interior positions.
    if bytes[0] == b'-' || bytes[bytes.len() - 1] == b'-' {
        return false;
    }
    // Any non-ASCII byte fails these tests, matching the char-wise check.
    bytes
        .iter()
        .all(|&b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
}
32
/// One process owned by the daemon, together with its log-capture tasks and
/// restart bookkeeping.
pub struct ManagedProcess {
    /// User-visible name; also the key in `ProcessManager::processes`.
    pub name: String,
    /// Session-unique id minted at spawn time; lookups match it as a fallback.
    pub id: String,
    /// Command line the process was started with (run via `sh -c`).
    pub command: String,
    /// Working directory, if one was requested at spawn.
    pub cwd: Option<String>,
    /// Extra environment variables supplied at spawn.
    pub env: HashMap<String, String>,
    /// Live child handle; `None` once the process has exited or been stopped.
    pub child: Option<Child>,
    /// OS pid captured at spawn (0 if it was unavailable).
    pub pid: u32,
    /// Start time of the current incarnation; used for uptime reporting.
    pub started_at: Instant,
    /// Exit code after termination; `Some(-9)` is the sentinel used after a
    /// forced SIGKILL in `stop_process`.
    pub exit_code: Option<i32>,
    /// Port bound to the process (explicit, or auto-assigned in proxy mode).
    pub port: Option<u16>,
    /// Automatic restart policy, if configured.
    pub restart_policy: Option<RestartPolicy>,
    /// File-watch configuration, if configured.
    pub watch_config: Option<WatchConfig>,
    /// Number of automatic restarts performed so far.
    pub restart_count: u32,
    /// Set when the user explicitly stopped the process; suppresses
    /// automatic restarts (see `classify_restart_candidates`).
    pub manually_stopped: bool,
    /// Set while a restart is scheduled but not yet performed.
    pub restart_pending: bool,
    /// Set when the process is considered permanently failed (e.g. the
    /// respawn tombstone); excluded from restart candidates.
    pub failed: bool,
    /// Sender side of the stdout capture task's supervisor channel; cleared
    /// (dropped) by `respawn_in_place` before awaiting the capture tasks.
    pub supervisor_tx: Option<tokio::sync::mpsc::Sender<String>>,
    /// Join handles for the stdout/stderr capture tasks.
    pub capture_handles: Vec<tokio::task::JoinHandle<()>>,
    /// Handle for the file watcher, if watching is enabled.
    pub watch_handle: Option<crate::daemon::watcher::WatchHandle>,
}
55
/// Owns all managed processes for one session, plus the shared channel that
/// fans captured output lines out to subscribers.
pub struct ProcessManager {
    /// Processes keyed by name; lookups also fall back to matching by id.
    processes: HashMap<String, ManagedProcess>,
    /// Counter used to mint per-session process ids.
    id_counter: IdCounter,
    /// Session name; selects the log directory used for capture files.
    session: String,
    /// Broadcast channel carrying captured stdout/stderr lines.
    pub output_tx: broadcast::Sender<OutputLine>,
    /// Assigns ports automatically when proxy mode is enabled.
    port_allocator: PortAllocator,
}
63
64impl ProcessManager {
65 pub fn new(session: &str) -> Self {
66 let (output_tx, _) = broadcast::channel(1024);
67 Self {
68 processes: HashMap::new(),
69 id_counter: IdCounter::new(),
70 session: session.to_string(),
71 output_tx,
72 port_allocator: PortAllocator::new(),
73 }
74 }
75
76 #[allow(unsafe_code, clippy::unused_async)]
77 pub async fn spawn_process(
78 &mut self,
79 command: &str,
80 name: Option<String>,
81 cwd: Option<&str>,
82 env: Option<&HashMap<String, String>>,
83 port: Option<u16>,
84 ) -> Response {
85 let id = self.id_counter.next_id();
86 let name = name.unwrap_or_else(|| id.clone());
87
88 if name.contains('/') || name.contains('\\') || name.contains("..") || name.contains('\0') {
90 return Response::Error {
91 code: ErrorCode::General,
92 message: format!("invalid process name: {}", name),
93 };
94 }
95
96 if self.port_allocator.is_proxy_enabled() && !is_valid_dns_label(&name) {
98 return Response::Error {
99 code: ErrorCode::General,
100 message: format!(
101 "invalid process name for proxy: '{}' (must be lowercase alphanumeric/hyphens, max 63 chars)",
102 name
103 ),
104 };
105 }
106
107 let resolved_port = if let Some(p) = port {
109 Some(p)
110 } else if self.port_allocator.is_proxy_enabled() {
111 let assigned: std::collections::HashSet<u16> = self
112 .processes
113 .values()
114 .filter(|p| p.child.is_some())
115 .filter_map(|p| p.port)
116 .collect();
117 match self.port_allocator.auto_assign_port(&assigned) {
118 Ok(p) => Some(p),
119 Err(e) => {
120 return Response::Error {
121 code: ErrorCode::General,
122 message: e.to_string(),
123 };
124 }
125 }
126 } else {
127 None
128 };
129
130 if self.processes.contains_key(&name) {
131 return Response::Error {
132 code: ErrorCode::General,
133 message: format!("process already exists: {}", name),
134 };
135 }
136
137 let log_dir = paths::log_dir(&self.session);
138 let _ = std::fs::create_dir_all(&log_dir);
139
140 let mut cmd = Command::new("sh");
141 cmd.arg("-c")
142 .arg(command)
143 .stdout(Stdio::piped())
144 .stderr(Stdio::piped());
145 if let Some(dir) = cwd {
146 cmd.current_dir(dir);
147 }
148 if let Some(p) = resolved_port {
149 let mut merged_env: HashMap<String, String> = HashMap::new();
151 merged_env.insert("PORT".to_string(), p.to_string());
152 merged_env.insert("HOST".to_string(), "127.0.0.1".to_string());
153 if let Some(env_vars) = env {
154 for (k, v) in env_vars {
155 merged_env.insert(k.clone(), v.clone());
156 }
157 }
158 cmd.envs(&merged_env);
159 } else if let Some(env_vars) = env {
160 cmd.envs(env_vars);
161 }
162 unsafe {
166 cmd.pre_exec(|| {
167 nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
168 .map_err(std::io::Error::other)?;
169 Ok(())
170 });
171 }
172
173 let mut child = match cmd.spawn() {
174 Ok(c) => c,
175 Err(e) => {
176 return Response::Error {
177 code: ErrorCode::General,
178 message: format!("failed to spawn: {}", e),
179 };
180 }
181 };
182
183 let pid = child.id().unwrap_or(0);
184
185 let seq_counter = Arc::new(AtomicU64::new(0));
187
188 let (sup_tx, sup_rx_stdout) = tokio::sync::mpsc::channel::<String>(16);
191 let (stderr_sup_sender, sup_rx_stderr) = tokio::sync::mpsc::channel::<String>(16);
192 drop(stderr_sup_sender);
193
194 let mut capture_handles = Vec::new();
195
196 if let Some(stdout) = child.stdout.take() {
197 let tx = self.output_tx.clone();
198 let pname = name.clone();
199 let path = log_dir.join(format!("{}.stdout", name));
200 let seq = Arc::clone(&seq_counter);
201 let handle = tokio::spawn(async move {
202 log_writer::capture_output(
203 stdout,
204 &path,
205 &pname,
206 ProtoStream::Stdout,
207 tx,
208 DEFAULT_MAX_LOG_BYTES,
209 log_writer::DEFAULT_MAX_ROTATED_FILES,
210 seq,
211 sup_rx_stdout,
212 )
213 .await;
214 });
215 capture_handles.push(handle);
216 }
217 if let Some(stderr) = child.stderr.take() {
218 let tx = self.output_tx.clone();
219 let pname = name.clone();
220 let path = log_dir.join(format!("{}.stderr", name));
221 let seq = Arc::clone(&seq_counter);
222 let handle = tokio::spawn(async move {
223 log_writer::capture_output(
224 stderr,
225 &path,
226 &pname,
227 ProtoStream::Stderr,
228 tx,
229 DEFAULT_MAX_LOG_BYTES,
230 log_writer::DEFAULT_MAX_ROTATED_FILES,
231 seq,
232 sup_rx_stderr,
233 )
234 .await;
235 });
236 capture_handles.push(handle);
237 }
238
239 self.processes.insert(
240 name.clone(),
241 ManagedProcess {
242 name: name.clone(),
243 id: id.clone(),
244 command: command.to_string(),
245 cwd: cwd.map(std::string::ToString::to_string),
246 env: env.cloned().unwrap_or_default(),
247 child: Some(child),
248 pid,
249 started_at: Instant::now(),
250 exit_code: None,
251 port: resolved_port,
252 restart_policy: None,
253 watch_config: None,
254 restart_count: 0,
255 manually_stopped: false,
256 restart_pending: false,
257 failed: false,
258 supervisor_tx: Some(sup_tx),
259 capture_handles,
260 watch_handle: None,
261 },
262 );
263
264 let url = resolved_port.map(|p| process_url(&name, p, None));
265 Response::RunOk {
266 name,
267 id,
268 pid,
269 port: resolved_port,
270 url,
271 }
272 }
273
    /// Stop a process by name or id: SIGTERM its process group, wait up to
    /// 10 seconds for a graceful exit, then escalate to SIGKILL.
    ///
    /// Marks the process manually stopped so restart policies do not
    /// resurrect it; the entry stays in the table with `child = None` so its
    /// final state remains visible in status output.
    pub async fn stop_process(&mut self, target: &str) -> Response {
        let proc = match self.find_mut(target) {
            Some(p) => p,
            None => {
                return Response::Error {
                    code: ErrorCode::NotFound,
                    message: format!("process not found: {}", target),
                };
            }
        };

        // Suppress automatic restarts for this process.
        proc.manually_stopped = true;

        if let Some(ref child) = proc.child {
            let raw_pid = child.id().unwrap_or(0) as i32;
            if raw_pid > 0 {
                // The child was made its own process-group leader at spawn
                // (pre_exec setpgid), so its pid doubles as the pgid.
                let pgid = nix::unistd::Pid::from_raw(raw_pid);
                let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
            }
        }

        if let Some(ref mut child) = proc.child {
            // Allow 10s for a graceful exit after SIGTERM.
            let wait_result = tokio::time::timeout(Duration::from_secs(10), child.wait()).await;

            match wait_result {
                Ok(Ok(status)) => {
                    proc.exit_code = status.code();
                }
                _ => {
                    // Timed out (or wait failed): force-kill the whole group,
                    // using the pid recorded at spawn time.
                    let raw_pid = proc.pid as i32;
                    if raw_pid > 0 {
                        let pgid = nix::unistd::Pid::from_raw(raw_pid);
                        let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
                    }
                    let _ = child.wait().await;
                    // Sentinel meaning "terminated by SIGKILL".
                    proc.exit_code = Some(-9);
                }
            }
            proc.child = None;
        }

        Response::Ok {
            message: format!("stopped {}", target),
        }
    }
322
323 pub async fn stop_all(&mut self) -> Response {
324 let names: Vec<String> = self.processes.keys().cloned().collect();
325 for name in names {
326 let _ = self.stop_process(&name).await;
327 }
328 self.processes.clear();
329 Response::Ok {
330 message: "all processes stopped".into(),
331 }
332 }
333
    /// Manually restart `target`: snapshot its spawn parameters, stop and
    /// remove it, then respawn under the same name, carrying the restart
    /// policy and watch config over to the new incarnation. Restart
    /// bookkeeping (count, failed, pending) starts fresh.
    pub async fn restart_process(&mut self, target: &str) -> Response {
        // Snapshot everything needed for the respawn before tearing down.
        let (command, name, cwd, env, port, restart_policy, watch_config) = match self.find(target)
        {
            Some(p) => (
                p.command.clone(),
                p.name.clone(),
                p.cwd.clone(),
                p.env.clone(),
                p.port,
                p.restart_policy.clone(),
                p.watch_config.clone(),
            ),
            None => {
                return Response::Error {
                    code: ErrorCode::NotFound,
                    message: format!("process not found: {}", target),
                };
            }
        };
        // Clear restart bookkeeping before stopping. NOTE(review): the entry
        // is removed below and respawned fresh, so this mainly clears state
        // observable while stop_process is awaited.
        if let Some(p) = self.find_mut(target) {
            p.manually_stopped = false;
            p.restart_count = 0;
            p.failed = false;
            p.restart_pending = false;
        }
        let _ = self.stop_process(target).await;
        self.processes.remove(&name);
        // spawn_process expects None rather than an empty env map.
        let env = if env.is_empty() { None } else { Some(env) };
        let resp = self
            .spawn_process(
                &command,
                Some(name.clone()),
                cwd.as_deref(),
                env.as_ref(),
                port,
            )
            .await;
        // Restore policy and watch config on the freshly spawned process.
        if let Response::RunOk { .. } = resp
            && let Some(p) = self.find_mut(&name)
        {
            p.restart_policy = restart_policy;
            p.watch_config = watch_config;
        }
        resp
    }
381
    /// Enable proxy mode: subsequent spawns require DNS-label names and get
    /// ports auto-assigned (see `spawn_process`).
    pub fn enable_proxy(&mut self) {
        self.port_allocator.enable_proxy();
    }
385
    /// Reap any newly exited children, then report the state of all
    /// managed processes.
    pub fn status(&mut self) -> Response {
        self.refresh_exit_states();
        Response::Status {
            processes: self.build_process_infos(),
        }
    }
392
393 pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
396 self.refresh_exit_states();
397 self.find(target).and_then(|p| {
398 if p.child.is_none() {
399 Some(p.exit_code)
400 } else {
401 None
402 }
403 })
404 }
405
406 pub(crate) fn refresh_exit_states(&mut self) -> bool {
407 let mut changed = false;
408 for proc in self.processes.values_mut() {
409 if proc.child.is_some()
410 && proc.exit_code.is_none()
411 && let Some(ref mut child) = proc.child
412 && let Ok(Some(status)) = child.try_wait()
413 {
414 proc.exit_code = status.code();
415 proc.child = None;
416 changed = true;
417 }
418 }
419 changed
420 }
421
    /// Name of the session this manager belongs to.
    pub fn session_name(&self) -> &str {
        &self.session
    }
425
    /// True if a process with this name or id exists (running or exited).
    pub fn has_process(&self, target: &str) -> bool {
        self.find(target).is_some()
    }
429
430 pub fn running_ports(&self) -> HashMap<String, u16> {
432 self.processes
433 .iter()
434 .filter_map(|(name, p)| {
435 if p.child.is_some() {
436 p.port.map(|port| (name.clone(), port))
437 } else {
438 None
439 }
440 })
441 .collect()
442 }
443
    /// Report process state without reaping exited children first; usable
    /// from `&self` contexts (compare `status`, which refreshes first).
    pub fn status_snapshot(&self) -> Response {
        Response::Status {
            processes: self.build_process_infos(),
        }
    }
451
452 fn build_process_infos(&self) -> Vec<ProcessInfo> {
453 let mut infos: Vec<ProcessInfo> = self
454 .processes
455 .values()
456 .map(|p| ProcessInfo {
457 name: p.name.clone(),
458 id: p.id.clone(),
459 pid: p.pid,
460 state: if p.child.is_some() {
461 ProcessState::Running
462 } else if p.failed {
463 ProcessState::Failed
464 } else {
465 ProcessState::Exited
466 },
467 exit_code: p.exit_code,
468 uptime_secs: if p.child.is_some() {
469 Some(p.started_at.elapsed().as_secs())
470 } else {
471 None
472 },
473 command: p.command.clone(),
474 port: p.port,
475 url: p.port.map(|port| process_url(&p.name, port, None)),
476 restart_count: if p.restart_count > 0 {
477 Some(p.restart_count)
478 } else {
479 None
480 },
481 max_restarts: p.restart_policy.as_ref().and_then(|rp| rp.max_restarts),
482 restart_policy: p.restart_policy.as_ref().map(|rp| match rp.mode {
483 RestartMode::Always => "always".into(),
484 RestartMode::OnFailure => "on-failure".into(),
485 RestartMode::Never => "never".into(),
486 }),
487 watched: if p.watch_config.is_some() {
488 Some(true)
489 } else {
490 None
491 },
492 })
493 .collect();
494 infos.sort_by(|a, b| a.name.cmp(&b.name));
495 infos
496 }
497
498 pub(crate) fn find(&self, target: &str) -> Option<&ManagedProcess> {
499 self.processes
500 .get(target)
501 .or_else(|| self.processes.values().find(|p| p.id == target))
502 }
503
    /// Mutable counterpart of `find`: lookup by name first, then by id.
    pub(crate) fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
        // Two-phase lookup: `get_mut(...).or_else(...)` would hold the first
        // mutable borrow across the fallback scan, so probe the key first.
        if self.processes.contains_key(target) {
            self.processes.get_mut(target)
        } else {
            self.processes.values_mut().find(|p| p.id == target)
        }
    }
511
512 pub fn classify_restart_candidates(&self) -> (Vec<String>, Vec<String>) {
515 let mut restartable = Vec::new();
516 let mut exhausted = Vec::new();
517 for p in self.processes.values() {
518 if p.child.is_some() || p.manually_stopped || p.restart_pending || p.failed {
519 continue;
520 }
521 let Some(ref policy) = p.restart_policy else {
522 continue;
523 };
524 if !policy.mode.should_restart(p.exit_code) {
525 continue;
526 }
527 if policy
528 .max_restarts
529 .is_some_and(|max| p.restart_count >= max)
530 {
531 exhausted.push(p.name.clone());
532 } else {
533 restartable.push(p.name.clone());
534 }
535 }
536 (restartable, exhausted)
537 }
538
    /// Mark `target` as permanently failed, excluding it from future restart
    /// candidates; no-op if the process does not exist.
    pub fn mark_failed(&mut self, target: &str) {
        if let Some(p) = self.find_mut(target) {
            p.failed = true;
        }
    }
545
    /// Respawn `target` in place (crash/watch restart path): drain its
    /// capture tasks, rotate its log files, and start a new incarnation that
    /// keeps the restart policy, watch config, and restart count.
    ///
    /// On spawn failure the process is re-inserted as a failed "tombstone"
    /// entry so it stays visible in status and is not restarted again.
    pub async fn respawn_in_place(&mut self, target: &str) -> Result<(), String> {
        let proc = self
            .find(target)
            .ok_or_else(|| format!("process not found: {}", target))?;

        // Snapshot spawn parameters and bookkeeping to carry over.
        let command = proc.command.clone();
        let name = proc.name.clone();
        let cwd = proc.cwd.clone();
        let env = proc.env.clone();
        let port = proc.port;
        let restart_policy = proc.restart_policy.clone();
        let watch_config = proc.watch_config.clone();
        let restart_count = proc.restart_count;
        let restart_pending = proc.restart_pending;

        // Drop the supervisor sender; the stdout capture task holds the
        // receiving end (wired up in spawn_process).
        if let Some(proc) = self.find_mut(target) {
            proc.supervisor_tx = None;
        }

        // Wait for both capture tasks to finish before touching log files.
        if let Some(proc) = self.find_mut(target) {
            let handles = std::mem::take(&mut proc.capture_handles);
            for h in handles {
                let _ = h.await;
            }
        }

        // Rotate the old incarnation's logs so the new one starts fresh.
        let log_dir = crate::paths::log_dir(&self.session);
        let stdout_path = log_dir.join(format!("{}.stdout", name));
        let stderr_path = log_dir.join(format!("{}.stderr", name));
        log_writer::rotate_if_exists(&stdout_path).await;
        log_writer::rotate_if_exists(&stderr_path).await;

        self.processes.remove(&name);

        // spawn_process expects None rather than an empty env map.
        let env_opt = if env.is_empty() {
            None
        } else {
            Some(env.clone())
        };
        let resp = self
            .spawn_process(
                &command,
                Some(name.clone()),
                cwd.as_deref(),
                env_opt.as_ref(),
                port,
            )
            .await;

        match resp {
            Response::RunOk { .. } => {
                // Carry restart bookkeeping over to the new incarnation.
                if let Some(p) = self.find_mut(&name) {
                    p.restart_policy = restart_policy;
                    p.watch_config = watch_config;
                    p.restart_count = restart_count;
                    p.failed = false;
                }
                Ok(())
            }
            Response::Error { message, .. } => {
                // Spawn failed: re-insert a dead, failed entry so status
                // keeps reporting the process and restarts stop (failed
                // processes are skipped by classify_restart_candidates).
                self.processes.insert(
                    name.clone(),
                    ManagedProcess {
                        name: name.clone(),
                        id: "tombstone".into(),
                        command,
                        cwd,
                        env,
                        child: None,
                        pid: 0,
                        started_at: Instant::now(),
                        exit_code: None,
                        port,
                        restart_policy,
                        watch_config,
                        restart_count,
                        manually_stopped: false,
                        restart_pending,
                        failed: true,
                        supervisor_tx: None,
                        capture_handles: Vec::new(),
                        watch_handle: None,
                    },
                );
                Err(message)
            }
            _ => Err("unexpected response from spawn".into()),
        }
    }
646}
647
648#[cfg(test)]
649mod tests {
650 use super::*;
651
652 #[test]
653 fn test_valid_dns_labels() {
654 assert!(is_valid_dns_label("api"));
655 assert!(is_valid_dns_label("my-app"));
656 assert!(is_valid_dns_label("a"));
657 assert!(is_valid_dns_label("a1"));
658 assert!(is_valid_dns_label("123"));
659 }
660
661 #[test]
662 fn test_invalid_dns_labels() {
663 assert!(!is_valid_dns_label(""));
664 assert!(!is_valid_dns_label("-start"));
665 assert!(!is_valid_dns_label("end-"));
666 assert!(!is_valid_dns_label("UPPER"));
667 assert!(!is_valid_dns_label("has.dot"));
668 assert!(!is_valid_dns_label("has space"));
669 assert!(!is_valid_dns_label(&"a".repeat(64))); assert!(!is_valid_dns_label("has_underscore"));
671 }
672
673 #[tokio::test]
674 async fn test_respawn_in_place_preserves_metadata() {
675 let mut pm = ProcessManager::new("test-respawn");
676 let resp = pm
677 .spawn_process("echo hello", Some("worker".into()), None, None, None)
678 .await;
679 assert!(matches!(resp, Response::RunOk { .. }));
680
681 if let Some(p) = pm.find_mut("worker") {
683 p.restart_policy = Some(RestartPolicy {
684 mode: RestartMode::OnFailure,
685 max_restarts: Some(5),
686 restart_delay_ms: 1000,
687 });
688 p.restart_count = 3;
689 }
690
691 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
693 pm.refresh_exit_states();
694
695 let result = pm.respawn_in_place("worker").await;
697 assert!(result.is_ok());
698
699 let p = pm.find("worker").unwrap();
701 assert!(p.child.is_some()); assert_eq!(p.restart_count, 3);
703 assert!(p.restart_policy.is_some());
704 assert_eq!(
705 p.restart_policy.as_ref().unwrap().mode,
706 RestartMode::OnFailure
707 );
708 assert!(!p.failed);
709 }
710
711 #[tokio::test]
712 async fn test_respawn_in_place_tombstone_on_failure() {
713 let mut pm = ProcessManager::new("test-tombstone");
714 let resp = pm
715 .spawn_process("echo hello", Some("worker".into()), None, None, None)
716 .await;
717 assert!(matches!(resp, Response::RunOk { .. }));
718
719 if let Some(p) = pm.find_mut("worker") {
720 p.restart_policy = Some(RestartPolicy {
721 mode: RestartMode::Always,
722 max_restarts: Some(3),
723 restart_delay_ms: 1000,
724 });
725 p.restart_count = 2;
726 p.name = "work/er".to_string();
728 }
729
730 let proc = pm.processes.remove("worker").unwrap();
732 pm.processes.insert("work/er".to_string(), proc);
733
734 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
735 pm.refresh_exit_states();
736
737 let result = pm.respawn_in_place("work/er").await;
738 assert!(result.is_err());
739
740 let p = pm.find("work/er").unwrap();
742 assert!(p.child.is_none());
743 assert!(p.failed);
744 assert_eq!(p.restart_count, 2); assert!(p.restart_policy.is_some());
746 }
747}