1use crate::daemon::log_writer::{self, OutputLine};
2use crate::paths;
3use crate::protocol::{ProcessInfo, ProcessState, Response, Stream as ProtoStream};
4use crate::session::IdCounter;
5use std::collections::HashMap;
6use std::process::Stdio;
7use std::time::{Duration, Instant};
8use tokio::process::{Child, Command};
9use tokio::sync::broadcast;
10
11const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024; pub struct ManagedProcess {
14 pub name: String,
15 pub id: String,
16 pub command: String,
17 pub cwd: Option<String>,
18 pub env: HashMap<String, String>,
19 pub child: Option<Child>,
20 pub pid: u32,
21 pub started_at: Instant,
22 pub exit_code: Option<i32>,
23}
24
25pub struct ProcessManager {
26 processes: HashMap<String, ManagedProcess>,
27 id_counter: IdCounter,
28 session: String,
29 pub output_tx: broadcast::Sender<OutputLine>,
30}
31
32impl ProcessManager {
33 pub fn new(session: &str) -> Self {
34 let (output_tx, _) = broadcast::channel(1024);
35 Self {
36 processes: HashMap::new(),
37 id_counter: IdCounter::new(),
38 session: session.to_string(),
39 output_tx,
40 }
41 }
42
43 pub async fn spawn_process(
44 &mut self,
45 command: &str,
46 name: Option<String>,
47 cwd: Option<&str>,
48 env: Option<&HashMap<String, String>>,
49 ) -> Response {
50 let id = self.id_counter.next_id();
51 let name = name.unwrap_or_else(|| id.clone());
52
53 if self.processes.contains_key(&name) {
54 return Response::Error {
55 code: 1,
56 message: format!("process already exists: {}", name),
57 };
58 }
59
60 let log_dir = paths::log_dir(&self.session);
61 let _ = std::fs::create_dir_all(&log_dir);
62
63 let mut cmd = Command::new("sh");
64 cmd.arg("-c")
65 .arg(command)
66 .stdout(Stdio::piped())
67 .stderr(Stdio::piped());
68 if let Some(dir) = cwd {
69 cmd.current_dir(dir);
70 }
71 if let Some(env_vars) = env {
72 cmd.envs(env_vars);
73 }
74 unsafe {
76 cmd.pre_exec(|| {
77 nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
78 .map_err(std::io::Error::other)?;
79 Ok(())
80 });
81 }
82
83 let mut child = match cmd.spawn() {
84 Ok(c) => c,
85 Err(e) => {
86 return Response::Error {
87 code: 1,
88 message: format!("failed to spawn: {}", e),
89 }
90 }
91 };
92
93 let pid = child.id().unwrap_or(0);
94
95 if let Some(stdout) = child.stdout.take() {
97 let tx = self.output_tx.clone();
98 let pname = name.clone();
99 let path = log_dir.join(format!("{}.stdout", name));
100 tokio::spawn(async move {
101 log_writer::capture_output(
102 stdout,
103 &path,
104 &pname,
105 ProtoStream::Stdout,
106 tx,
107 DEFAULT_MAX_LOG_BYTES,
108 )
109 .await;
110 });
111 }
112 if let Some(stderr) = child.stderr.take() {
113 let tx = self.output_tx.clone();
114 let pname = name.clone();
115 let path = log_dir.join(format!("{}.stderr", name));
116 tokio::spawn(async move {
117 log_writer::capture_output(
118 stderr,
119 &path,
120 &pname,
121 ProtoStream::Stderr,
122 tx,
123 DEFAULT_MAX_LOG_BYTES,
124 )
125 .await;
126 });
127 }
128
129 self.processes.insert(
130 name.clone(),
131 ManagedProcess {
132 name: name.clone(),
133 id: id.clone(),
134 command: command.to_string(),
135 cwd: cwd.map(|s| s.to_string()),
136 env: env.cloned().unwrap_or_default(),
137 child: Some(child),
138 pid,
139 started_at: Instant::now(),
140 exit_code: None,
141 },
142 );
143
144 Response::RunOk { name, id, pid }
145 }
146
147 pub async fn stop_process(&mut self, target: &str) -> Response {
148 let proc = match self.find_mut(target) {
149 Some(p) => p,
150 None => {
151 return Response::Error {
152 code: 2,
153 message: format!("process not found: {}", target),
154 }
155 }
156 };
157
158 if let Some(ref child) = proc.child {
159 let raw_pid = child.id().unwrap_or(0) as i32;
160 if raw_pid > 0 {
161 let pgid = nix::unistd::Pid::from_raw(raw_pid);
163 let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
164 }
165 }
166
167 if let Some(ref mut child) = proc.child {
169 let wait_result = tokio::time::timeout(Duration::from_secs(10), child.wait()).await;
170
171 match wait_result {
172 Ok(Ok(status)) => {
173 proc.exit_code = status.code();
174 }
175 _ => {
176 let raw_pid = proc.pid as i32;
178 if raw_pid > 0 {
179 let pgid = nix::unistd::Pid::from_raw(raw_pid);
180 let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
181 }
182 let _ = child.wait().await;
183 proc.exit_code = Some(-9);
184 }
185 }
186 proc.child = None;
187 }
188
189 Response::Ok {
190 message: format!("stopped {}", target),
191 }
192 }
193
194 pub async fn stop_all(&mut self) -> Response {
195 let names: Vec<String> = self.processes.keys().cloned().collect();
196 for name in names {
197 self.stop_process(&name).await;
198 }
199 self.processes.clear();
200 Response::Ok {
201 message: "all processes stopped".into(),
202 }
203 }
204
205 pub async fn restart_process(&mut self, target: &str) -> Response {
206 let (command, name, cwd, env) = match self.find(target) {
207 Some(p) => (
208 p.command.clone(),
209 p.name.clone(),
210 p.cwd.clone(),
211 p.env.clone(),
212 ),
213 None => {
214 return Response::Error {
215 code: 2,
216 message: format!("process not found: {}", target),
217 }
218 }
219 };
220 self.stop_process(target).await;
221 self.processes.remove(&name);
222 let env = if env.is_empty() { None } else { Some(env) };
223 self.spawn_process(&command, Some(name), cwd.as_deref(), env.as_ref())
224 .await
225 }
226
227 pub fn status(&mut self) -> Response {
228 self.refresh_exit_states();
229 let mut infos: Vec<ProcessInfo> = self
230 .processes
231 .values()
232 .map(|p| ProcessInfo {
233 name: p.name.clone(),
234 id: p.id.clone(),
235 pid: p.pid,
236 state: if p.child.is_some() {
237 ProcessState::Running
238 } else {
239 ProcessState::Exited
240 },
241 exit_code: p.exit_code,
242 uptime_secs: if p.child.is_some() {
243 Some(p.started_at.elapsed().as_secs())
244 } else {
245 None
246 },
247 command: p.command.clone(),
248 })
249 .collect();
250 infos.sort_by(|a, b| a.name.cmp(&b.name));
251 Response::Status { processes: infos }
252 }
253
254 pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
255 self.refresh_exit_states();
256 self.find(target)
257 .map(|p| if p.child.is_none() { p.exit_code } else { None })
258 }
259
260 fn refresh_exit_states(&mut self) {
261 for proc in self.processes.values_mut() {
262 if proc.child.is_some() && proc.exit_code.is_none() {
263 if let Some(ref mut child) = proc.child {
264 if let Ok(Some(status)) = child.try_wait() {
265 proc.exit_code = status.code();
266 proc.child = None;
267 }
268 }
269 }
270 }
271 }
272
273 pub fn has_process(&self, target: &str) -> bool {
274 self.find(target).is_some()
275 }
276
277 fn find(&self, target: &str) -> Option<&ManagedProcess> {
278 self.processes
279 .get(target)
280 .or_else(|| self.processes.values().find(|p| p.id == target))
281 }
282
283 fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
284 if self.processes.contains_key(target) {
285 self.processes.get_mut(target)
286 } else {
287 self.processes.values_mut().find(|p| p.id == target)
288 }
289 }
290}