1use std::collections::HashMap;
7use std::io::{Read, Write};
8use std::process::{Child, Command, Stdio};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant};
11
/// Identifier under which a session is stored and looked up.
///
/// Plain `String` alias; the value is produced by `generate_session_id`.
pub type SessionId = String;
14
15fn generate_session_id() -> SessionId {
17 use std::time::{SystemTime, UNIX_EPOCH};
18 let timestamp = SystemTime::now()
19 .duration_since(UNIX_EPOCH)
20 .unwrap_or_default()
21 .as_millis();
22
23 let adjectives = ["warm", "cool", "swift", "calm", "bold", "keen", "bright", "quick"];
25 let nouns = ["rook", "hawk", "wolf", "bear", "fox", "owl", "lynx", "crow"];
26
27 let adj_idx = (timestamp % adjectives.len() as u128) as usize;
28 let noun_idx = ((timestamp / 8) % nouns.len() as u128) as usize;
29
30 format!("{}-{}", adjectives[adj_idx], nouns[noun_idx])
31}
32
/// Lifecycle state of an [`ExecSession`].
#[derive(Debug, Clone, PartialEq)]
pub enum SessionStatus {
    /// Initial state of a freshly created session.
    Running,
    /// The process finished and reported the contained exit code.
    Exited(i32),
    /// The process ended without an exit code, was killed on request, or its
    /// state could not be queried.
    Killed,
    /// The session exceeded its configured timeout and was killed for it.
    TimedOut,
}
45
/// A spawned command together with its captured output and lifecycle state.
pub struct ExecSession {
    /// Identifier of this session (generated in `ExecSession::new`).
    pub id: SessionId,
    /// The command string this session was created for.
    pub command: String,
    /// Directory the command was started in.
    pub working_dir: String,
    /// Moment the session object was created; basis for timeout checks.
    pub started_at: Instant,
    /// Maximum allowed run time; `None` means no limit.
    pub timeout: Option<Duration>,
    /// Current lifecycle state.
    pub status: SessionStatus,
    /// Raw bytes read from the child's stdout so far.
    stdout_buffer: Vec<u8>,
    /// Raw bytes read from the child's stderr so far.
    stderr_buffer: Vec<u8>,
    /// Lossily UTF-8-decoded stdout and stderr chunks in the order they were
    /// read, plus any text added through `append_output`.
    combined_output: String,
    /// Byte offset into `combined_output` up to which `poll_output` has
    /// already handed text to the caller.
    last_read_pos: usize,
    /// Handle of the underlying process; `None` when the session has no
    /// process attached.
    child: Option<Child>,
    /// Exit code recorded when the process was observed to have exited.
    exit_code: Option<i32>,
}
73
74impl ExecSession {
75 pub fn new(
77 command: String,
78 working_dir: String,
79 timeout: Option<Duration>,
80 child: Child,
81 ) -> Self {
82 Self {
83 id: generate_session_id(),
84 command,
85 working_dir,
86 started_at: Instant::now(),
87 timeout,
88 status: SessionStatus::Running,
89 stdout_buffer: Vec::new(),
90 stderr_buffer: Vec::new(),
91 combined_output: String::new(),
92 last_read_pos: 0,
93 child: Some(child),
94 exit_code: None,
95 }
96 }
97
98 pub fn is_timed_out(&self) -> bool {
100 if let Some(timeout) = self.timeout {
101 self.started_at.elapsed() > timeout
102 } else {
103 false
104 }
105 }
106
107 pub fn elapsed(&self) -> Duration {
109 self.started_at.elapsed()
110 }
111
112 pub fn append_output(&mut self, text: &str) {
114 self.combined_output.push_str(text);
115 }
116
117 pub fn poll_output(&mut self) -> &str {
119 let new_output = &self.combined_output[self.last_read_pos..];
120 self.last_read_pos = self.combined_output.len();
121 new_output
122 }
123
124 pub fn full_output(&self) -> &str {
126 &self.combined_output
127 }
128
129 pub fn log_output(&self, offset: Option<usize>, limit: Option<usize>) -> String {
131 let lines: Vec<&str> = self.combined_output.lines().collect();
132 let total = lines.len();
133
134 let (start, end) = match (offset, limit) {
136 (None, Some(lim)) => {
137 let start = total.saturating_sub(lim);
138 (start, total)
139 }
140 (Some(off), Some(lim)) => {
141 let start = off.min(total);
142 let end = (start + lim).min(total);
143 (start, end)
144 }
145 (Some(off), None) => {
146 let start = off.min(total);
147 (start, total)
148 }
149 (None, None) => (0, total),
150 };
151
152 lines[start..end].join("\n")
153 }
154
155 pub fn try_read_output(&mut self) -> bool {
158 let Some(ref mut child) = self.child else {
159 return false;
160 };
161
162 let mut read_any = false;
163
164 if let Some(ref mut stdout) = child.stdout {
166 let mut buf = [0u8; 4096];
167 if let Ok(n) = read_nonblocking(stdout, &mut buf) {
169 if n > 0 {
170 let text = String::from_utf8_lossy(&buf[..n]);
171 self.combined_output.push_str(&text);
172 self.stdout_buffer.extend_from_slice(&buf[..n]);
173 read_any = true;
174 }
175 }
176 }
177
178 if let Some(ref mut stderr) = child.stderr {
180 let mut buf = [0u8; 4096];
181 if let Ok(n) = read_nonblocking(stderr, &mut buf) {
182 if n > 0 {
183 let text = String::from_utf8_lossy(&buf[..n]);
184 self.combined_output.push_str(&text);
185 self.stderr_buffer.extend_from_slice(&buf[..n]);
186 read_any = true;
187 }
188 }
189 }
190
191 read_any
192 }
193
194 pub fn check_exit(&mut self) -> bool {
196 let Some(ref mut child) = self.child else {
197 return true; };
199
200 match child.try_wait() {
201 Ok(Some(status)) => {
202 self.exit_code = status.code();
203 self.status = if let Some(code) = status.code() {
204 SessionStatus::Exited(code)
205 } else {
206 SessionStatus::Killed
207 };
208
209 self.try_read_output();
211
212 true
213 }
214 Ok(None) => {
215 let timed_out = self.timeout
217 .map(|t| self.started_at.elapsed() > t)
218 .unwrap_or(false);
219 if timed_out {
220 let _ = child.kill();
221 self.status = SessionStatus::TimedOut;
222 self.exit_code = None;
223 return true;
224 }
225 false
226 }
227 Err(_) => {
228 self.status = SessionStatus::Killed;
229 true
230 }
231 }
232 }
233
234 pub fn write_stdin(&mut self, data: &str) -> Result<(), String> {
236 let Some(ref mut child) = self.child else {
237 return Err("Process has exited".to_string());
238 };
239
240 let Some(ref mut stdin) = child.stdin else {
241 return Err("Process stdin not available".to_string());
242 };
243
244 stdin
245 .write_all(data.as_bytes())
246 .map_err(|e| format!("Failed to write to stdin: {}", e))?;
247 stdin
248 .flush()
249 .map_err(|e| format!("Failed to flush stdin: {}", e))?;
250
251 Ok(())
252 }
253
254 pub fn kill(&mut self) -> Result<(), String> {
256 let Some(ref mut child) = self.child else {
257 return Ok(()); };
259
260 child
261 .kill()
262 .map_err(|e| format!("Failed to kill process: {}", e))?;
263
264 self.status = SessionStatus::Killed;
265 Ok(())
266 }
267}
268
269#[cfg(unix)]
271fn read_nonblocking<R: Read + std::os::unix::io::AsRawFd>(
272 reader: &mut R,
273 buf: &mut [u8],
274) -> std::io::Result<usize> {
275 let fd = reader.as_raw_fd();
276
277 unsafe {
279 let flags = libc::fcntl(fd, libc::F_GETFL);
280 libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
281 }
282
283 let result = reader.read(buf);
284
285 unsafe {
287 let flags = libc::fcntl(fd, libc::F_GETFL);
288 libc::fcntl(fd, libc::F_SETFL, flags & !libc::O_NONBLOCK);
289 }
290
291 match result {
292 Ok(n) => Ok(n),
293 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(0),
294 Err(e) => Err(e),
295 }
296}
297
298#[cfg(not(unix))]
299fn read_nonblocking<R: Read>(reader: &mut R, buf: &mut [u8]) -> std::io::Result<usize> {
300 Ok(0)
303}
304
/// Registry of execution sessions, keyed by session id.
pub struct ProcessManager {
    /// All sessions currently tracked, running or finished.
    sessions: HashMap<SessionId, ExecSession>,
}
309
310impl ProcessManager {
311 pub fn new() -> Self {
313 Self {
314 sessions: HashMap::new(),
315 }
316 }
317
318 pub fn spawn(
320 &mut self,
321 command: &str,
322 working_dir: &str,
323 timeout_secs: Option<u64>,
324 ) -> Result<SessionId, String> {
325 let timeout = timeout_secs.map(Duration::from_secs);
326
327 let child = Command::new("sh")
328 .arg("-c")
329 .arg(command)
330 .current_dir(working_dir)
331 .stdin(Stdio::piped())
332 .stdout(Stdio::piped())
333 .stderr(Stdio::piped())
334 .spawn()
335 .map_err(|e| format!("Failed to spawn process: {}", e))?;
336
337 let session = ExecSession::new(
338 command.to_string(),
339 working_dir.to_string(),
340 timeout,
341 child,
342 );
343
344 let id = session.id.clone();
345 self.sessions.insert(id.clone(), session);
346
347 Ok(id)
348 }
349
350 pub fn insert(&mut self, session: ExecSession) -> SessionId {
352 let id = session.id.clone();
353 self.sessions.insert(id.clone(), session);
354 id
355 }
356
357 pub fn get(&self, id: &str) -> Option<&ExecSession> {
359 self.sessions.get(id)
360 }
361
362 pub fn get_mut(&mut self, id: &str) -> Option<&mut ExecSession> {
364 self.sessions.get_mut(id)
365 }
366
367 pub fn list(&self) -> Vec<&ExecSession> {
369 self.sessions.values().collect()
370 }
371
372 pub fn list_active(&self) -> Vec<&ExecSession> {
374 self.sessions
375 .values()
376 .filter(|s| s.status == SessionStatus::Running)
377 .collect()
378 }
379
380 pub fn remove(&mut self, id: &str) -> Option<ExecSession> {
382 self.sessions.remove(id)
383 }
384
385 pub fn poll_all(&mut self) {
387 for session in self.sessions.values_mut() {
388 if session.status == SessionStatus::Running {
389 session.try_read_output();
390 session.check_exit();
391 }
392 }
393 }
394
395 pub fn clear_completed(&mut self) {
397 self.sessions.retain(|_, s| s.status == SessionStatus::Running);
398 }
399}
400
401impl Default for ProcessManager {
402 fn default() -> Self {
403 Self::new()
404 }
405}
406
/// A [`ProcessManager`] behind `Arc<Mutex<..>>`, so one manager can be shared
/// between owners and accessed under a lock.
pub type SharedProcessManager = Arc<Mutex<ProcessManager>>;
409
410pub fn new_shared_manager() -> SharedProcessManager {
412 Arc::new(Mutex::new(ProcessManager::new()))
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a running session without a process whose captured output is
    /// `output`.
    fn session_with_output(output: &str) -> ExecSession {
        ExecSession {
            id: "test".to_string(),
            command: "echo test".to_string(),
            working_dir: "/tmp".to_string(),
            started_at: Instant::now(),
            timeout: None,
            status: SessionStatus::Running,
            stdout_buffer: Vec::new(),
            stderr_buffer: Vec::new(),
            combined_output: output.to_string(),
            last_read_pos: 0,
            child: None,
            exit_code: None,
        }
    }

    /// Generated ids contain the `-` separator.
    #[test]
    fn test_session_id_generation() {
        let first = generate_session_id();
        let second = generate_session_id();
        for id in &[first, second] {
            assert!(id.contains('-'));
        }
    }

    /// A new manager tracks no sessions.
    #[test]
    fn test_process_manager_creation() {
        assert!(ProcessManager::new().list().is_empty());
    }

    /// `log_output` honours both the tail form and the offset/limit form.
    #[test]
    fn test_log_output_with_limits() {
        let session = session_with_output("line1\nline2\nline3\nline4\nline5\n");

        // Limit only: the last two lines.
        let tail = session.log_output(None, Some(2));
        assert_eq!(tail, "line4\nline5");

        // Offset and limit: two lines starting at zero-based line 1.
        let window = session.log_output(Some(1), Some(2));
        assert_eq!(window, "line2\nline3");
    }
}
459}