Skip to main content

rustyclaw/
process_manager.rs

//! Background process session management for RustyClaw.
//!
//! Provides a registry of background exec sessions that can be polled,
//! written to, and killed by the agent.
5
6use std::collections::HashMap;
7use std::io::{Read, Write};
8use std::process::{Child, Command, Stdio};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant};
11
/// Key under which a background session is registered and looked up.
pub type SessionId = String;
14
15/// Generate a short human-readable session ID.
16fn generate_session_id() -> SessionId {
17    use std::time::{SystemTime, UNIX_EPOCH};
18    let timestamp = SystemTime::now()
19        .duration_since(UNIX_EPOCH)
20        .unwrap_or_default()
21        .as_millis();
22
23    // Simple adjective-noun pattern for readability
24    let adjectives = ["warm", "cool", "swift", "calm", "bold", "keen", "bright", "quick"];
25    let nouns = ["rook", "hawk", "wolf", "bear", "fox", "owl", "lynx", "crow"];
26
27    let adj_idx = (timestamp % adjectives.len() as u128) as usize;
28    let noun_idx = ((timestamp / 8) % nouns.len() as u128) as usize;
29
30    format!("{}-{}", adjectives[adj_idx], nouns[noun_idx])
31}
32
/// Status of a background session.
///
/// Every payload is a plain integer, so the type is `Eq` as well as
/// `PartialEq` and can be used wherever total equality is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// Process is still running.
    Running,
    /// Process exited with the given code.
    Exited(i32),
    /// Process was killed by a signal.
    Killed,
    /// Process timed out and was killed.
    TimedOut,
}
45
46/// A background exec session.
47pub struct ExecSession {
48    /// Session identifier.
49    pub id: SessionId,
50    /// The command that was executed.
51    pub command: String,
52    /// Working directory.
53    pub working_dir: String,
54    /// When the session started.
55    pub started_at: Instant,
56    /// Timeout duration (if set).
57    pub timeout: Option<Duration>,
58    /// Current status.
59    pub status: SessionStatus,
60    /// Accumulated stdout output.
61    stdout_buffer: Vec<u8>,
62    /// Accumulated stderr output.
63    stderr_buffer: Vec<u8>,
64    /// Combined output (interleaved stdout + stderr for display).
65    combined_output: String,
66    /// Last read position for polling.
67    last_read_pos: usize,
68    /// The child process handle.
69    child: Option<Child>,
70    /// Exit code (set when process exits).
71    exit_code: Option<i32>,
72}
73
74impl ExecSession {
75    /// Create a new session for a running process.
76    pub fn new(
77        command: String,
78        working_dir: String,
79        timeout: Option<Duration>,
80        child: Child,
81    ) -> Self {
82        Self {
83            id: generate_session_id(),
84            command,
85            working_dir,
86            started_at: Instant::now(),
87            timeout,
88            status: SessionStatus::Running,
89            stdout_buffer: Vec::new(),
90            stderr_buffer: Vec::new(),
91            combined_output: String::new(),
92            last_read_pos: 0,
93            child: Some(child),
94            exit_code: None,
95        }
96    }
97
98    /// Check if the process has exceeded its timeout.
99    pub fn is_timed_out(&self) -> bool {
100        if let Some(timeout) = self.timeout {
101            self.started_at.elapsed() > timeout
102        } else {
103            false
104        }
105    }
106
107    /// Get the elapsed time since the session started.
108    pub fn elapsed(&self) -> Duration {
109        self.started_at.elapsed()
110    }
111
112    /// Append output to the combined buffer.
113    pub fn append_output(&mut self, text: &str) {
114        self.combined_output.push_str(text);
115    }
116
117    /// Get new output since the last poll.
118    pub fn poll_output(&mut self) -> &str {
119        let new_output = &self.combined_output[self.last_read_pos..];
120        self.last_read_pos = self.combined_output.len();
121        new_output
122    }
123
124    /// Get the full output log.
125    pub fn full_output(&self) -> &str {
126        &self.combined_output
127    }
128
129    /// Get output with line-based offset and limit.
130    pub fn log_output(&self, offset: Option<usize>, limit: Option<usize>) -> String {
131        let lines: Vec<&str> = self.combined_output.lines().collect();
132        let total = lines.len();
133
134        // If offset is None, grab the last `limit` lines
135        let (start, end) = match (offset, limit) {
136            (None, Some(lim)) => {
137                let start = total.saturating_sub(lim);
138                (start, total)
139            }
140            (Some(off), Some(lim)) => {
141                let start = off.min(total);
142                let end = (start + lim).min(total);
143                (start, end)
144            }
145            (Some(off), None) => {
146                let start = off.min(total);
147                (start, total)
148            }
149            (None, None) => (0, total),
150        };
151
152        lines[start..end].join("\n")
153    }
154
155    /// Try to read any available output from the child process.
156    /// Returns true if any output was read.
157    pub fn try_read_output(&mut self) -> bool {
158        let Some(ref mut child) = self.child else {
159            return false;
160        };
161
162        let mut read_any = false;
163
164        // Try to read from stdout
165        if let Some(ref mut stdout) = child.stdout {
166            let mut buf = [0u8; 4096];
167            // Non-blocking read attempt
168            if let Ok(n) = read_nonblocking(stdout, &mut buf) {
169                if n > 0 {
170                    let text = String::from_utf8_lossy(&buf[..n]);
171                    self.combined_output.push_str(&text);
172                    self.stdout_buffer.extend_from_slice(&buf[..n]);
173                    read_any = true;
174                }
175            }
176        }
177
178        // Try to read from stderr
179        if let Some(ref mut stderr) = child.stderr {
180            let mut buf = [0u8; 4096];
181            if let Ok(n) = read_nonblocking(stderr, &mut buf) {
182                if n > 0 {
183                    let text = String::from_utf8_lossy(&buf[..n]);
184                    self.combined_output.push_str(&text);
185                    self.stderr_buffer.extend_from_slice(&buf[..n]);
186                    read_any = true;
187                }
188            }
189        }
190
191        read_any
192    }
193
194    /// Check if the process has exited and update status.
195    pub fn check_exit(&mut self) -> bool {
196        let Some(ref mut child) = self.child else {
197            return true; // Already exited
198        };
199
200        match child.try_wait() {
201            Ok(Some(status)) => {
202                self.exit_code = status.code();
203                self.status = if let Some(code) = status.code() {
204                    SessionStatus::Exited(code)
205                } else {
206                    SessionStatus::Killed
207                };
208
209                // Read any remaining output
210                self.try_read_output();
211
212                true
213            }
214            Ok(None) => {
215                // Still running, check timeout
216                let timed_out = self.timeout
217                    .map(|t| self.started_at.elapsed() > t)
218                    .unwrap_or(false);
219                if timed_out {
220                    let _ = child.kill();
221                    self.status = SessionStatus::TimedOut;
222                    self.exit_code = None;
223                    return true;
224                }
225                false
226            }
227            Err(_) => {
228                self.status = SessionStatus::Killed;
229                true
230            }
231        }
232    }
233
234    /// Write data to the process stdin.
235    pub fn write_stdin(&mut self, data: &str) -> Result<(), String> {
236        let Some(ref mut child) = self.child else {
237            return Err("Process has exited".to_string());
238        };
239
240        let Some(ref mut stdin) = child.stdin else {
241            return Err("Process stdin not available".to_string());
242        };
243
244        stdin
245            .write_all(data.as_bytes())
246            .map_err(|e| format!("Failed to write to stdin: {}", e))?;
247        stdin
248            .flush()
249            .map_err(|e| format!("Failed to flush stdin: {}", e))?;
250
251        Ok(())
252    }
253
254    /// Kill the process.
255    pub fn kill(&mut self) -> Result<(), String> {
256        let Some(ref mut child) = self.child else {
257            return Ok(()); // Already gone
258        };
259
260        child
261            .kill()
262            .map_err(|e| format!("Failed to kill process: {}", e))?;
263
264        self.status = SessionStatus::Killed;
265        Ok(())
266    }
267}
268
269/// Non-blocking read helper (Unix-specific for now).
270#[cfg(unix)]
271fn read_nonblocking<R: Read + std::os::unix::io::AsRawFd>(
272    reader: &mut R,
273    buf: &mut [u8],
274) -> std::io::Result<usize> {
275    let fd = reader.as_raw_fd();
276
277    // Set non-blocking
278    unsafe {
279        let flags = libc::fcntl(fd, libc::F_GETFL);
280        libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
281    }
282
283    let result = reader.read(buf);
284
285    // Restore blocking mode
286    unsafe {
287        let flags = libc::fcntl(fd, libc::F_GETFL);
288        libc::fcntl(fd, libc::F_SETFL, flags & !libc::O_NONBLOCK);
289    }
290
291    match result {
292        Ok(n) => Ok(n),
293        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(0),
294        Err(e) => Err(e),
295    }
296}
297
/// Placeholder used on platforms without the Unix descriptor API.
///
/// No read is attempted: `reader` and `buf` are left untouched and the call
/// always reports that zero bytes were read.
#[cfg(not(unix))]
fn read_nonblocking<R: Read>(reader: &mut R, buf: &mut [u8]) -> std::io::Result<usize> {
    // Both arguments are intentionally unused by this simplified fallback.
    let _ = (reader, buf);
    Ok(0)
}
304
305/// Global process session manager.
306pub struct ProcessManager {
307    sessions: HashMap<SessionId, ExecSession>,
308}
309
310impl ProcessManager {
311    /// Create a new process manager.
312    pub fn new() -> Self {
313        Self {
314            sessions: HashMap::new(),
315        }
316    }
317
318    /// Start a new background process.
319    pub fn spawn(
320        &mut self,
321        command: &str,
322        working_dir: &str,
323        timeout_secs: Option<u64>,
324    ) -> Result<SessionId, String> {
325        let timeout = timeout_secs.map(Duration::from_secs);
326
327        let child = Command::new("sh")
328            .arg("-c")
329            .arg(command)
330            .current_dir(working_dir)
331            .stdin(Stdio::piped())
332            .stdout(Stdio::piped())
333            .stderr(Stdio::piped())
334            .spawn()
335            .map_err(|e| format!("Failed to spawn process: {}", e))?;
336
337        let session = ExecSession::new(
338            command.to_string(),
339            working_dir.to_string(),
340            timeout,
341            child,
342        );
343
344        let id = session.id.clone();
345        self.sessions.insert(id.clone(), session);
346
347        Ok(id)
348    }
349
350    /// Insert an existing session into the manager.
351    pub fn insert(&mut self, session: ExecSession) -> SessionId {
352        let id = session.id.clone();
353        self.sessions.insert(id.clone(), session);
354        id
355    }
356
357    /// Get a session by ID.
358    pub fn get(&self, id: &str) -> Option<&ExecSession> {
359        self.sessions.get(id)
360    }
361
362    /// Get a mutable session by ID.
363    pub fn get_mut(&mut self, id: &str) -> Option<&mut ExecSession> {
364        self.sessions.get_mut(id)
365    }
366
367    /// List all sessions.
368    pub fn list(&self) -> Vec<&ExecSession> {
369        self.sessions.values().collect()
370    }
371
372    /// List active (running) sessions.
373    pub fn list_active(&self) -> Vec<&ExecSession> {
374        self.sessions
375            .values()
376            .filter(|s| s.status == SessionStatus::Running)
377            .collect()
378    }
379
380    /// Remove a session.
381    pub fn remove(&mut self, id: &str) -> Option<ExecSession> {
382        self.sessions.remove(id)
383    }
384
385    /// Poll all sessions for updates.
386    pub fn poll_all(&mut self) {
387        for session in self.sessions.values_mut() {
388            if session.status == SessionStatus::Running {
389                session.try_read_output();
390                session.check_exit();
391            }
392        }
393    }
394
395    /// Clear completed sessions.
396    pub fn clear_completed(&mut self) {
397        self.sessions.retain(|_, s| s.status == SessionStatus::Running);
398    }
399}
400
401impl Default for ProcessManager {
402    fn default() -> Self {
403        Self::new()
404    }
405}
406
407/// Thread-safe process manager.
408pub type SharedProcessManager = Arc<Mutex<ProcessManager>>;
409
410/// Create a new shared process manager.
411pub fn new_shared_manager() -> SharedProcessManager {
412    Arc::new(Mutex::new(ProcessManager::new()))
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a session that has no child process and the given captured output.
    fn session_with_output(output: &str) -> ExecSession {
        ExecSession {
            id: "test".to_string(),
            command: "echo test".to_string(),
            working_dir: "/tmp".to_string(),
            started_at: Instant::now(),
            timeout: None,
            status: SessionStatus::Running,
            stdout_buffer: Vec::new(),
            stderr_buffer: Vec::new(),
            combined_output: output.to_string(),
            last_read_pos: 0,
            child: None,
            exit_code: None,
        }
    }

    #[test]
    fn test_session_id_generation() {
        let id1 = generate_session_id();
        let id2 = generate_session_id();
        // IDs are hyphen-separated words with no empty component.
        for id in [&id1, &id2] {
            assert!(id.contains('-'));
            assert!(id.split('-').count() >= 2);
            assert!(id.split('-').all(|part| !part.is_empty()));
        }
    }

    #[test]
    fn test_process_manager_creation() {
        let manager = ProcessManager::new();
        assert!(manager.list().is_empty());
        assert!(manager.list_active().is_empty());
    }

    #[test]
    fn test_log_output_with_limits() {
        let session = session_with_output("line1\nline2\nline3\nline4\nline5\n");

        // No offset: the last 2 lines.
        let output = session.log_output(None, Some(2));
        assert_eq!(output, "line4\nline5");

        // Offset 1 (0-indexed) with limit 2: the 2nd and 3rd lines.
        let output = session.log_output(Some(1), Some(2));
        assert_eq!(output, "line2\nline3");

        // Offset only: everything from that line to the end.
        let output = session.log_output(Some(3), None);
        assert_eq!(output, "line4\nline5");

        // Neither: the whole log, without the trailing newline.
        let output = session.log_output(None, None);
        assert_eq!(output, "line1\nline2\nline3\nline4\nline5");

        // Offset past the end: nothing.
        let output = session.log_output(Some(10), Some(2));
        assert_eq!(output, "");
    }

    #[test]
    fn test_poll_output_returns_only_new_text() {
        let mut session = session_with_output("abc");
        assert_eq!(session.poll_output(), "abc");
        assert_eq!(session.poll_output(), "");

        session.append_output("def");
        assert_eq!(session.poll_output(), "def");
        assert_eq!(session.full_output(), "abcdef");
    }
}