1use crate::daemon::log_writer::{self, OutputLine};
2use crate::daemon::port_allocator::PortAllocator;
3use crate::paths;
4use crate::protocol::{ErrorCode, ProcessInfo, ProcessState, Response, Stream as ProtoStream, process_url};
5use crate::session::IdCounter;
6use std::collections::HashMap;
7use std::process::Stdio;
8use std::time::{Duration, Instant};
9use tokio::process::{Child, Command};
10use tokio::sync::broadcast;
11
/// Size limit, in bytes, handed to the log writer for each captured stream (50 MiB).
const DEFAULT_MAX_LOG_BYTES: u64 = 50 * 1024 * 1024;

/// Reports whether `name` can be used as a single DNS label.
///
/// A label is accepted when it is 1 to 63 bytes long, does not begin or end
/// with `-`, and consists solely of ASCII lowercase letters, ASCII digits and
/// `-`. Anything else (uppercase, dots, underscores, whitespace, non-ASCII)
/// is rejected.
#[must_use]
pub fn is_valid_dns_label(name: &str) -> bool {
    let bytes = name.as_bytes();

    let length_ok = (1..=63).contains(&bytes.len());
    let hyphen_at_edge = bytes.first() == Some(&b'-') || bytes.last() == Some(&b'-');

    // Every byte of a multi-byte UTF-8 sequence is >= 0x80, so a byte-wise
    // check rejects non-ASCII input exactly like a char-wise check would.
    length_ok
        && !hyphen_at_edge
        && bytes
            .iter()
            .all(|b| matches!(b, b'a'..=b'z' | b'0'..=b'9' | b'-'))
}
27
28pub struct ManagedProcess {
29 pub name: String,
30 pub id: String,
31 pub command: String,
32 pub cwd: Option<String>,
33 pub env: HashMap<String, String>,
34 pub child: Option<Child>,
35 pub pid: u32,
36 pub started_at: Instant,
37 pub exit_code: Option<i32>,
38 pub port: Option<u16>,
39}
40
41pub struct ProcessManager {
42 processes: HashMap<String, ManagedProcess>,
43 id_counter: IdCounter,
44 session: String,
45 pub output_tx: broadcast::Sender<OutputLine>,
46 port_allocator: PortAllocator,
47}
48
49impl ProcessManager {
50 pub fn new(session: &str) -> Self {
51 let (output_tx, _) = broadcast::channel(1024);
52 Self {
53 processes: HashMap::new(),
54 id_counter: IdCounter::new(),
55 session: session.to_string(),
56 output_tx,
57 port_allocator: PortAllocator::new(),
58 }
59 }
60
61 #[allow(unsafe_code, clippy::unused_async)]
62 pub async fn spawn_process(
63 &mut self,
64 command: &str,
65 name: Option<String>,
66 cwd: Option<&str>,
67 env: Option<&HashMap<String, String>>,
68 port: Option<u16>,
69 ) -> Response {
70 let id = self.id_counter.next_id();
71 let name = name.unwrap_or_else(|| id.clone());
72
73 if name.contains('/') || name.contains('\\') || name.contains("..") || name.contains('\0') {
75 return Response::Error {
76 code: ErrorCode::General,
77 message: format!("invalid process name: {}", name),
78 };
79 }
80
81 if self.port_allocator.is_proxy_enabled() && !is_valid_dns_label(&name) {
83 return Response::Error {
84 code: ErrorCode::General,
85 message: format!(
86 "invalid process name for proxy: '{}' (must be lowercase alphanumeric/hyphens, max 63 chars)",
87 name
88 ),
89 };
90 }
91
92 let resolved_port = if let Some(p) = port {
94 Some(p)
95 } else if self.port_allocator.is_proxy_enabled() {
96 let assigned: std::collections::HashSet<u16> =
97 self.processes.values().filter_map(|p| p.port).collect();
98 match self.port_allocator.auto_assign_port(&assigned) {
99 Ok(p) => Some(p),
100 Err(e) => {
101 return Response::Error {
102 code: ErrorCode::General,
103 message: e.to_string(),
104 };
105 }
106 }
107 } else {
108 None
109 };
110
111 if self.processes.contains_key(&name) {
112 return Response::Error {
113 code: ErrorCode::General,
114 message: format!("process already exists: {}", name),
115 };
116 }
117
118 let log_dir = paths::log_dir(&self.session);
119 let _ = std::fs::create_dir_all(&log_dir);
120
121 let mut cmd = Command::new("sh");
122 cmd.arg("-c")
123 .arg(command)
124 .stdout(Stdio::piped())
125 .stderr(Stdio::piped());
126 if let Some(dir) = cwd {
127 cmd.current_dir(dir);
128 }
129 if let Some(p) = resolved_port {
130 let mut merged_env: HashMap<String, String> = HashMap::new();
132 merged_env.insert("PORT".to_string(), p.to_string());
133 merged_env.insert("HOST".to_string(), "127.0.0.1".to_string());
134 if let Some(env_vars) = env {
135 for (k, v) in env_vars {
136 merged_env.insert(k.clone(), v.clone());
137 }
138 }
139 cmd.envs(&merged_env);
140 } else if let Some(env_vars) = env {
141 cmd.envs(env_vars);
142 }
143 unsafe {
147 cmd.pre_exec(|| {
148 nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
149 .map_err(std::io::Error::other)?;
150 Ok(())
151 });
152 }
153
154 let mut child = match cmd.spawn() {
155 Ok(c) => c,
156 Err(e) => {
157 return Response::Error {
158 code: ErrorCode::General,
159 message: format!("failed to spawn: {}", e),
160 };
161 }
162 };
163
164 let pid = child.id().unwrap_or(0);
165
166 if let Some(stdout) = child.stdout.take() {
168 let tx = self.output_tx.clone();
169 let pname = name.clone();
170 let path = log_dir.join(format!("{}.stdout", name));
171 tokio::spawn(async move {
172 log_writer::capture_output(
173 stdout,
174 &path,
175 &pname,
176 ProtoStream::Stdout,
177 tx,
178 DEFAULT_MAX_LOG_BYTES,
179 log_writer::DEFAULT_MAX_ROTATED_FILES,
180 )
181 .await;
182 });
183 }
184 if let Some(stderr) = child.stderr.take() {
185 let tx = self.output_tx.clone();
186 let pname = name.clone();
187 let path = log_dir.join(format!("{}.stderr", name));
188 tokio::spawn(async move {
189 log_writer::capture_output(
190 stderr,
191 &path,
192 &pname,
193 ProtoStream::Stderr,
194 tx,
195 DEFAULT_MAX_LOG_BYTES,
196 log_writer::DEFAULT_MAX_ROTATED_FILES,
197 )
198 .await;
199 });
200 }
201
202 self.processes.insert(
203 name.clone(),
204 ManagedProcess {
205 name: name.clone(),
206 id: id.clone(),
207 command: command.to_string(),
208 cwd: cwd.map(std::string::ToString::to_string),
209 env: env.cloned().unwrap_or_default(),
210 child: Some(child),
211 pid,
212 started_at: Instant::now(),
213 exit_code: None,
214 port: resolved_port,
215 },
216 );
217
218 let url = resolved_port.map(|p| process_url(&name, p, None));
219 Response::RunOk {
220 name,
221 id,
222 pid,
223 port: resolved_port,
224 url,
225 }
226 }
227
228 pub async fn stop_process(&mut self, target: &str) -> Response {
229 let proc = match self.find_mut(target) {
230 Some(p) => p,
231 None => {
232 return Response::Error {
233 code: ErrorCode::NotFound,
234 message: format!("process not found: {}", target),
235 };
236 }
237 };
238
239 if let Some(ref child) = proc.child {
240 let raw_pid = child.id().unwrap_or(0) as i32;
241 if raw_pid > 0 {
242 let pgid = nix::unistd::Pid::from_raw(raw_pid);
244 let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGTERM);
245 }
246 }
247
248 if let Some(ref mut child) = proc.child {
250 let wait_result = tokio::time::timeout(Duration::from_secs(10), child.wait()).await;
251
252 match wait_result {
253 Ok(Ok(status)) => {
254 proc.exit_code = status.code();
255 }
256 _ => {
257 let raw_pid = proc.pid as i32;
259 if raw_pid > 0 {
260 let pgid = nix::unistd::Pid::from_raw(raw_pid);
261 let _ = nix::sys::signal::killpg(pgid, nix::sys::signal::Signal::SIGKILL);
262 }
263 let _ = child.wait().await;
264 proc.exit_code = Some(-9);
265 }
266 }
267 proc.child = None;
268 }
269
270 Response::Ok {
271 message: format!("stopped {}", target),
272 }
273 }
274
275 pub async fn stop_all(&mut self) -> Response {
276 let names: Vec<String> = self.processes.keys().cloned().collect();
277 for name in names {
278 let _ = self.stop_process(&name).await;
279 }
280 self.processes.clear();
281 Response::Ok {
282 message: "all processes stopped".into(),
283 }
284 }
285
286 pub async fn restart_process(&mut self, target: &str) -> Response {
287 let (command, name, cwd, env, port) = match self.find(target) {
288 Some(p) => (
289 p.command.clone(),
290 p.name.clone(),
291 p.cwd.clone(),
292 p.env.clone(),
293 p.port,
294 ),
295 None => {
296 return Response::Error {
297 code: ErrorCode::NotFound,
298 message: format!("process not found: {}", target),
299 };
300 }
301 };
302 let _ = self.stop_process(target).await;
303 self.processes.remove(&name);
304 let env = if env.is_empty() { None } else { Some(env) };
305 self.spawn_process(&command, Some(name), cwd.as_deref(), env.as_ref(), port)
306 .await
307 }
308
309 pub fn enable_proxy(&mut self) {
310 self.port_allocator.enable_proxy();
311 }
312
313 pub fn status(&mut self) -> Response {
314 self.refresh_exit_states();
315 Response::Status {
316 processes: self.build_process_infos(),
317 }
318 }
319
320 pub fn is_process_exited(&mut self, target: &str) -> Option<Option<i32>> {
323 self.refresh_exit_states();
324 self.find(target).and_then(|p| {
325 if p.child.is_none() {
326 Some(p.exit_code)
327 } else {
328 None
329 }
330 })
331 }
332
333 fn refresh_exit_states(&mut self) {
334 for proc in self.processes.values_mut() {
335 if proc.child.is_some() && proc.exit_code.is_none() {
336 if let Some(ref mut child) = proc.child {
337 if let Ok(Some(status)) = child.try_wait() {
338 proc.exit_code = status.code();
339 proc.child = None;
340 }
341 }
342 }
343 }
344 }
345
346 pub fn session_name(&self) -> &str {
347 &self.session
348 }
349
350 pub fn has_process(&self, target: &str) -> bool {
351 self.find(target).is_some()
352 }
353
354 pub fn running_ports(&self) -> HashMap<String, u16> {
356 self.processes
357 .iter()
358 .filter_map(|(name, p)| {
359 if p.child.is_some() {
360 p.port.map(|port| (name.clone(), port))
361 } else {
362 None
363 }
364 })
365 .collect()
366 }
367
368 pub fn status_snapshot(&self) -> Response {
371 Response::Status {
372 processes: self.build_process_infos(),
373 }
374 }
375
376 fn build_process_infos(&self) -> Vec<ProcessInfo> {
377 let mut infos: Vec<ProcessInfo> = self
378 .processes
379 .values()
380 .map(|p| ProcessInfo {
381 name: p.name.clone(),
382 id: p.id.clone(),
383 pid: p.pid,
384 state: if p.child.is_some() {
385 ProcessState::Running
386 } else {
387 ProcessState::Exited
388 },
389 exit_code: p.exit_code,
390 uptime_secs: if p.child.is_some() {
391 Some(p.started_at.elapsed().as_secs())
392 } else {
393 None
394 },
395 command: p.command.clone(),
396 port: p.port,
397 url: p.port.map(|port| process_url(&p.name, port, None)),
398 })
399 .collect();
400 infos.sort_by(|a, b| a.name.cmp(&b.name));
401 infos
402 }
403
404 fn find(&self, target: &str) -> Option<&ManagedProcess> {
405 self.processes
406 .get(target)
407 .or_else(|| self.processes.values().find(|p| p.id == target))
408 }
409
410 fn find_mut(&mut self, target: &str) -> Option<&mut ManagedProcess> {
411 if self.processes.contains_key(target) {
412 self.processes.get_mut(target)
413 } else {
414 self.processes.values_mut().find(|p| p.id == target)
415 }
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Labels made of lowercase letters, digits and inner hyphens are accepted.
    #[test]
    fn test_valid_dns_labels() {
        for label in ["api", "my-app", "a", "a1", "123"] {
            assert!(is_valid_dns_label(label), "expected {label:?} to be accepted");
        }
    }

    /// Empty, over-long, hyphen-edged and out-of-alphabet labels are rejected.
    #[test]
    fn test_invalid_dns_labels() {
        let too_long = "a".repeat(64);
        let rejected = [
            "",
            "-start",
            "end-",
            "UPPER",
            "has.dot",
            "has space",
            too_long.as_str(),
            "has_underscore",
        ];
        for label in rejected {
            assert!(!is_valid_dns_label(label), "expected {label:?} to be rejected");
        }
    }
}