1use crate::daemon::log_writer::{self, OutputLine};
2use crate::paths;
3use crate::protocol::{ProcessInfo, ProcessState, Response, Stream as ProtoStream};
4use crate::session::IdCounter;
5use std::collections::HashMap;
6use std::process::Stdio;
7use std::time::{Duration, Instant};
8use tokio::process::{Child, Command};
9use tokio::sync::broadcast;
10
11const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024; pub struct ManagedProcess {
14 pub name: String,
15 pub id: String,
16 pub command: String,
17 pub child: Option<Child>,
18 pub pid: u32,
19 pub started_at: Instant,
20 pub exit_code: Option<i32>,
21}
22
23pub struct ProcessManager {
24 processes: HashMap<String, ManagedProcess>,
25 id_counter: IdCounter,
26 session: String,
27 pub output_tx: broadcast::Sender<OutputLine>,
28}
29
30impl ProcessManager {
31 pub fn new(session: &str) -> Self {
32 let (output_tx, _) = broadcast::channel(1024);
33 Self {
34 processes: HashMap::new(),
35 id_counter: IdCounter::new(),
36 session: session.to_string(),
37 output_tx,
38 }
39 }
40
41 pub async fn spawn_process(&mut self, command: &str, name: Option<String>, cwd: Option<&str>, env: Option<&HashMap<String, String>>) -> Response {
42 let id = self.id_counter.next_id();
43 let name = name.unwrap_or_else(|| id.clone());
44
45 if self.processes.contains_key(&name) {
46 return Response::Error { code: 1, message: format!("process already exists: {}", name) };
47 }
48
49 let log_dir = paths::log_dir(&self.session);
50 let _ = std::fs::create_dir_all(&log_dir);
51
52 let mut cmd = Command::new("sh");
53 cmd.arg("-c").arg(command).stdout(Stdio::piped()).stderr(Stdio::piped());
54 if let Some(dir) = cwd {
55 cmd.current_dir(dir);
56 }
57 if let Some(env_vars) = env {
58 cmd.envs(env_vars);
59 }
60 unsafe {
62 cmd.pre_exec(|| {
63 nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
64 .map_err(std::io::Error::other)?;
65 Ok(())
66 });
67 }
68
69 let mut child = match cmd.spawn()
70 {
71 Ok(c) => c,
72 Err(e) => return Response::Error { code: 1, message: format!("failed to spawn: {}", e) },
73 };
74
75 let pid = child.id().unwrap_or(0);
76
77 if let Some(stdout) = child.stdout.take() {
79 let tx = self.output_tx.clone();
80 let pname = name.clone();
81 let path = log_dir.join(format!("{}.stdout", name));
82 tokio::spawn(async move {
83 log_writer::capture_output(stdout, &path, &pname, ProtoStream::Stdout, tx, DEFAULT_MAX_LOG_BYTES).await;
84 });
85 }
86 if let Some(stderr) = child.stderr.take() {
87 let tx = self.output_tx.clone();
88 let pname = name.clone();
89 let path = log_dir.join(format!("{}.stderr", name));
90 tokio::spawn(async move {
91 log_writer::capture_output(stderr, &path, &pname, ProtoStream::Stderr, tx, DEFAULT_MAX_LOG_BYTES).await;
92 });
93 }
94
95 self.processes.insert(name.clone(), ManagedProcess {
96 name: name.clone(), id: id.clone(), command: command.to_string(),
97 child: Some(child), pid, started_at: Instant::now(), exit_code: None,
98 });
99
100 Response::RunOk { name, id, pid }
101 }
102
103 pub async fn stop_process(&mut self, target: &str) -> Response {
104 let proc = match self.find_mut(target) {
105 Some(p) => p,
106 None => return Response::Error { code: 2, message: format!("process not found: {}", target) },
107 };
108
109 if let Some(ref child) = proc.child {
110 let raw_pid = child.id().unwrap_or(0) as i32;
111 if raw_pid > 0 {
112 let pgid = nix::unistd::Pid::from_raw(raw_pid);
114 let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
115 }
116 }
117
118 if let Some(ref mut child) = proc.child {
120 let wait_result = tokio::time::timeout(
121 Duration::from_secs(10),
122 child.wait()
123 ).await;
124
125 match wait_result {
126 Ok(Ok(status)) => {
127 proc.exit_code = status.code();
128 }
129 _ => {
130 let raw_pid = proc.pid as i32;
132 if raw_pid > 0 {
133 let pgid = nix::unistd::Pid::from_raw(raw_pid);
134 let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
135 }
136 let _ = child.wait().await;
137 proc.exit_code = Some(-9);
138 }
139 }
140 proc.child = None;
141 }
142
143 Response::Ok { message: format!("stopped {}", target) }
144 }
145
146 pub async fn stop_all(&mut self) -> Response {
147 let names: Vec<String> = self.processes.keys().cloned().collect();
148 for name in names {
149 self.stop_process(&name).await;
150 }
151 Response::Ok { message: "all processes stopped".into() }
152 }
153
154 pub async fn restart_process(&mut self, target: &str) -> Response {
155 let (command, name) = match self.find(target) {
156 Some(p) => (p.command.clone(), p.name.clone()),
157 None => return Response::Error { code: 2, message: format!("process not found: {}", target) },
158 };
159 self.stop_process(target).await;
160 self.processes.remove(&name);
161 self.spawn_process(&command, Some(name), None, None).await
162 }
163
164 pub fn status(&mut self) -> Response {
165 self.refresh_exit_states();
166 let mut infos: Vec<ProcessInfo> = self.processes.values()
167 .map(|p| ProcessInfo {
168 name: p.name.clone(), id: p.id.clone(), pid: p.pid,
169 state: if p.child.is_some() { ProcessState::Running } else { ProcessState::Exited },
170 exit_code: p.exit_code,
171 uptime_secs: if p.child.is_some() { Some(p.started_at.elapsed().as_secs()) } else { None },
172 command: p.command.clone(),
173 })
174 .collect();
175 infos.sort_by(|a, b| a.name.cmp(&b.name));
176 Response::Status { processes: infos }
177 }
178
179 pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
180 self.refresh_exit_states();
181 self.find(target).map(|p| if p.child.is_none() { p.exit_code } else { None })
182 }
183
184 fn refresh_exit_states(&mut self) {
185 for proc in self.processes.values_mut() {
186 if proc.child.is_some() && proc.exit_code.is_none() {
187 if let Some(ref mut child) = proc.child {
188 if let Ok(Some(status)) = child.try_wait() {
189 proc.exit_code = status.code();
190 proc.child = None;
191 }
192 }
193 }
194 }
195 }
196
197 pub fn has_process(&self, target: &str) -> bool {
198 self.find(target).is_some()
199 }
200
201 fn find(&self, target: &str) -> Option<&ManagedProcess> {
202 self.processes.get(target)
203 .or_else(|| self.processes.values().find(|p| p.id == target))
204 }
205
206 fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
207 if self.processes.contains_key(target) {
208 self.processes.get_mut(target)
209 } else {
210 self.processes.values_mut().find(|p| p.id == target)
211 }
212 }
213}