Skip to main content

rustyclaw_core/
process_manager.rs

1//! Background process session management for RustyClaw.
2//!
3//! Provides a registry of background exec sessions that can be polled,
4//! written to, and killed by the agent.
5
6use std::collections::HashMap;
7use std::io::{Read, Write};
8use std::process::{Child, Command, Stdio};
9use std::sync::{Arc, Mutex};
10use std::time::{Duration, Instant};
11
/// Identifier under which a background session is registered.
pub type SessionId = String;

/// Build a short, human-readable session ID of the form `adjective-noun`.
///
/// Both words are picked from fixed eight-entry tables using the current
/// wall-clock time in milliseconds, so only 64 distinct IDs exist and two
/// calls within the same millisecond return the same ID.
fn generate_session_id() -> SessionId {
    const ADJECTIVES: [&str; 8] = [
        "warm", "cool", "swift", "calm", "bold", "keen", "bright", "quick",
    ];
    const NOUNS: [&str; 8] = ["rook", "hawk", "wolf", "bear", "fox", "owl", "lynx", "crow"];

    // Milliseconds since the Unix epoch; a clock set before the epoch counts as zero.
    let millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_millis())
        .unwrap_or(0);

    // The remainder selects the adjective; the quotient (by the fixed divisor 8)
    // selects the noun.
    let adjective = ADJECTIVES[(millis % ADJECTIVES.len() as u128) as usize];
    let noun = NOUNS[((millis / 8) % NOUNS.len() as u128) as usize];

    [adjective, noun].join("-")
}
32
/// Lifecycle state of a background session.
///
/// `Running` is the only non-terminal state; the process manager treats every
/// other variant as "completed".
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// The process has not been observed to exit yet.
    Running,
    /// The process exited on its own with the given exit code.
    Exited(i32),
    /// The process ended without an exit code (e.g. terminated by a signal),
    /// was killed on request, or its state could no longer be queried.
    Killed,
    /// The session's timeout elapsed and the process was killed because of it.
    TimedOut,
}
45
46/// A background exec session.
47pub struct ExecSession {
48    /// Session identifier.
49    pub id: SessionId,
50    /// The command that was executed.
51    pub command: String,
52    /// Working directory.
53    pub working_dir: String,
54    /// When the session started.
55    pub started_at: Instant,
56    /// Timeout duration (if set).
57    pub timeout: Option<Duration>,
58    /// Current status.
59    pub status: SessionStatus,
60    /// Accumulated stdout output.
61    stdout_buffer: Vec<u8>,
62    /// Accumulated stderr output.
63    stderr_buffer: Vec<u8>,
64    /// Combined output (interleaved stdout + stderr for display).
65    combined_output: String,
66    /// Last read position for polling.
67    last_read_pos: usize,
68    /// The child process handle.
69    child: Option<Child>,
70    /// Exit code (set when process exits).
71    exit_code: Option<i32>,
72}
73
74impl ExecSession {
75    /// Create a new session for a running process.
76    pub fn new(
77        command: String,
78        working_dir: String,
79        timeout: Option<Duration>,
80        child: Child,
81    ) -> Self {
82        Self {
83            id: generate_session_id(),
84            command,
85            working_dir,
86            started_at: Instant::now(),
87            timeout,
88            status: SessionStatus::Running,
89            stdout_buffer: Vec::new(),
90            stderr_buffer: Vec::new(),
91            combined_output: String::new(),
92            last_read_pos: 0,
93            child: Some(child),
94            exit_code: None,
95        }
96    }
97
98    /// Check if the process has exceeded its timeout.
99    pub fn is_timed_out(&self) -> bool {
100        if let Some(timeout) = self.timeout {
101            self.started_at.elapsed() > timeout
102        } else {
103            false
104        }
105    }
106
107    /// Get the elapsed time since the session started.
108    pub fn elapsed(&self) -> Duration {
109        self.started_at.elapsed()
110    }
111
112    /// Append output to the combined buffer.
113    pub fn append_output(&mut self, text: &str) {
114        self.combined_output.push_str(text);
115    }
116
117    /// Get new output since the last poll.
118    pub fn poll_output(&mut self) -> &str {
119        let new_output = &self.combined_output[self.last_read_pos..];
120        self.last_read_pos = self.combined_output.len();
121        new_output
122    }
123
124    /// Get the full output log.
125    pub fn full_output(&self) -> &str {
126        &self.combined_output
127    }
128
129    /// Get output with line-based offset and limit.
130    pub fn log_output(&self, offset: Option<usize>, limit: Option<usize>) -> String {
131        let lines: Vec<&str> = self.combined_output.lines().collect();
132        let total = lines.len();
133
134        // If offset is None, grab the last `limit` lines
135        let (start, end) = match (offset, limit) {
136            (None, Some(lim)) => {
137                let start = total.saturating_sub(lim);
138                (start, total)
139            }
140            (Some(off), Some(lim)) => {
141                let start = off.min(total);
142                let end = (start + lim).min(total);
143                (start, end)
144            }
145            (Some(off), None) => {
146                let start = off.min(total);
147                (start, total)
148            }
149            (None, None) => (0, total),
150        };
151
152        lines[start..end].join("\n")
153    }
154
155    /// Try to read any available output from the child process.
156    /// Returns true if any output was read.
157    pub fn try_read_output(&mut self) -> bool {
158        let Some(ref mut child) = self.child else {
159            return false;
160        };
161
162        let mut read_any = false;
163
164        // Try to read from stdout
165        if let Some(ref mut stdout) = child.stdout {
166            let mut buf = [0u8; 4096];
167            // Non-blocking read attempt
168            if let Ok(n) = read_nonblocking(stdout, &mut buf) {
169                if n > 0 {
170                    let text = String::from_utf8_lossy(&buf[..n]);
171                    self.combined_output.push_str(&text);
172                    self.stdout_buffer.extend_from_slice(&buf[..n]);
173                    read_any = true;
174                }
175            }
176        }
177
178        // Try to read from stderr
179        if let Some(ref mut stderr) = child.stderr {
180            let mut buf = [0u8; 4096];
181            if let Ok(n) = read_nonblocking(stderr, &mut buf) {
182                if n > 0 {
183                    let text = String::from_utf8_lossy(&buf[..n]);
184                    self.combined_output.push_str(&text);
185                    self.stderr_buffer.extend_from_slice(&buf[..n]);
186                    read_any = true;
187                }
188            }
189        }
190
191        read_any
192    }
193
194    /// Check if the process has exited and update status.
195    pub fn check_exit(&mut self) -> bool {
196        let Some(ref mut child) = self.child else {
197            return true; // Already exited
198        };
199
200        match child.try_wait() {
201            Ok(Some(status)) => {
202                self.exit_code = status.code();
203                self.status = if let Some(code) = status.code() {
204                    SessionStatus::Exited(code)
205                } else {
206                    SessionStatus::Killed
207                };
208
209                // Read any remaining output
210                self.try_read_output();
211
212                true
213            }
214            Ok(None) => {
215                // Still running, check timeout
216                let timed_out = self.timeout
217                    .map(|t| self.started_at.elapsed() > t)
218                    .unwrap_or(false);
219                if timed_out {
220                    let _ = child.kill();
221                    self.status = SessionStatus::TimedOut;
222                    self.exit_code = None;
223                    return true;
224                }
225                false
226            }
227            Err(_) => {
228                self.status = SessionStatus::Killed;
229                true
230            }
231        }
232    }
233
234    /// Write data to the process stdin.
235    pub fn write_stdin(&mut self, data: &str) -> Result<(), String> {
236        let Some(ref mut child) = self.child else {
237            return Err("Process has exited".to_string());
238        };
239
240        let Some(ref mut stdin) = child.stdin else {
241            return Err("Process stdin not available".to_string());
242        };
243
244        stdin
245            .write_all(data.as_bytes())
246            .map_err(|e| format!("Failed to write to stdin: {}", e))?;
247        stdin
248            .flush()
249            .map_err(|e| format!("Failed to flush stdin: {}", e))?;
250
251        Ok(())
252    }
253
254    /// Translate named keys to escape sequences and write them to stdin.
255    ///
256    /// Supports key names: Enter, Tab, Escape, Space, Backspace,
257    /// Up, Down, Left, Right, Home, End, PageUp, PageDown, Delete, Insert,
258    /// Ctrl-A..Ctrl-Z, Ctrl-C, F1..F12, and plain text.
259    ///
260    /// Multiple keys can be separated by spaces:  `"Enter"`, `"Ctrl-C"`,
261    /// `"Up Up Down Down Left Right"`.
262    pub fn send_keys(&mut self, keys: &str) -> Result<usize, String> {
263        let bytes = translate_keys(keys)?;
264        let len = bytes.len();
265
266        let Some(ref mut child) = self.child else {
267            return Err("Process has exited".to_string());
268        };
269        let Some(ref mut stdin) = child.stdin else {
270            return Err("Process stdin not available".to_string());
271        };
272
273        stdin
274            .write_all(&bytes)
275            .map_err(|e| format!("Failed to send keys: {}", e))?;
276        stdin
277            .flush()
278            .map_err(|e| format!("Failed to flush after send-keys: {}", e))?;
279
280        Ok(len)
281    }
282
283    /// Kill the process.
284    pub fn kill(&mut self) -> Result<(), String> {
285        let Some(ref mut child) = self.child else {
286            return Ok(()); // Already gone
287        };
288
289        child
290            .kill()
291            .map_err(|e| format!("Failed to kill process: {}", e))?;
292
293        self.status = SessionStatus::Killed;
294        Ok(())
295    }
296}
297
298/// Non-blocking read helper (Unix-specific for now).
299#[cfg(unix)]
300fn read_nonblocking<R: Read + std::os::unix::io::AsRawFd>(
301    reader: &mut R,
302    buf: &mut [u8],
303) -> std::io::Result<usize> {
304    let fd = reader.as_raw_fd();
305
306    // Set non-blocking
307    unsafe {
308        let flags = libc::fcntl(fd, libc::F_GETFL);
309        libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
310    }
311
312    let result = reader.read(buf);
313
314    // Restore blocking mode
315    unsafe {
316        let flags = libc::fcntl(fd, libc::F_GETFL);
317        libc::fcntl(fd, libc::F_SETFL, flags & !libc::O_NONBLOCK);
318    }
319
320    match result {
321        Ok(n) => Ok(n),
322        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(0),
323        Err(e) => Err(e),
324    }
325}
326
/// Placeholder for platforms without the Unix non-blocking implementation.
///
/// No read is attempted: the function always reports `Ok(0)` ("no data
/// available") and leaves `_buf` untouched, so output is not captured here.
#[cfg(not(unix))]
fn read_nonblocking<R: Read>(_reader: &mut R, _buf: &mut [u8]) -> std::io::Result<usize> {
    Ok(0)
}
333
334// ── Key translation ─────────────────────────────────────────────────────────
335
/// Translate a space-separated list of named keys into raw bytes.
///
/// Each token is matched case-insensitively against known key names
/// (`Enter`, `Tab`, arrows, `F1`..`F12`, ...). Tokens of the form `Ctrl-X` or
/// `C-X` become the matching control byte. Any other token is sent as literal
/// UTF-8 text with its original casing; the whitespace between tokens is not
/// sent (use `Space`).
///
/// # Errors
///
/// Returns a message when a `Ctrl-`/`C-` token is not followed by exactly one
/// supported character (`a`-`z`, `@`, `[`, `\`, `]`, `^`, `_`).
pub fn translate_keys(keys: &str) -> Result<Vec<u8>, String> {
    let mut out = Vec::new();

    for token in keys.split_whitespace() {
        let lowered = token.to_lowercase();
        match lowered.as_str() {
            // Basic control characters
            "enter" | "return" | "cr" => out.push(b'\n'),
            "tab" => out.push(b'\t'),
            "escape" | "esc" => out.push(0x1b),
            "space" => out.push(b' '),
            "backspace" | "bs" => out.push(0x7f),
            "delete" | "del" => out.extend_from_slice(b"\x1b[3~"),
            "insert" | "ins" => out.extend_from_slice(b"\x1b[2~"),

            // Arrow keys
            "up" => out.extend_from_slice(b"\x1b[A"),
            "down" => out.extend_from_slice(b"\x1b[B"),
            "right" => out.extend_from_slice(b"\x1b[C"),
            "left" => out.extend_from_slice(b"\x1b[D"),

            // Navigation
            "home" => out.extend_from_slice(b"\x1b[H"),
            "end" => out.extend_from_slice(b"\x1b[F"),
            "pageup" | "pgup" => out.extend_from_slice(b"\x1b[5~"),
            "pagedown" | "pgdn" => out.extend_from_slice(b"\x1b[6~"),

            // Function keys
            "f1" => out.extend_from_slice(b"\x1bOP"),
            "f2" => out.extend_from_slice(b"\x1bOQ"),
            "f3" => out.extend_from_slice(b"\x1bOR"),
            "f4" => out.extend_from_slice(b"\x1bOS"),
            "f5" => out.extend_from_slice(b"\x1b[15~"),
            "f6" => out.extend_from_slice(b"\x1b[17~"),
            "f7" => out.extend_from_slice(b"\x1b[18~"),
            "f8" => out.extend_from_slice(b"\x1b[19~"),
            "f9" => out.extend_from_slice(b"\x1b[20~"),
            "f10" => out.extend_from_slice(b"\x1b[21~"),
            "f11" => out.extend_from_slice(b"\x1b[23~"),
            "f12" => out.extend_from_slice(b"\x1b[24~"),

            other => {
                // Strip the prefix instead of splitting on the last '-', so
                // that "ctrl-a-b" is rejected rather than read as Ctrl-B.
                let ctrl_key = other
                    .strip_prefix("ctrl-")
                    .or_else(|| other.strip_prefix("c-"));
                match ctrl_key {
                    Some(key) => out.push(ctrl_byte(key, token)?),
                    // Literal text fallback
                    None => out.extend_from_slice(token.as_bytes()),
                }
            }
        }
    }

    Ok(out)
}

/// Map the (lower-cased) character after a `Ctrl-` prefix to its control byte;
/// `token` is the original spelling, used only in error messages.
fn ctrl_byte(key: &str, token: &str) -> Result<u8, String> {
    let &[b] = key.as_bytes() else {
        return Err(format!("Invalid Ctrl- key: {}", token));
    };

    // Ctrl-A = 0x01, Ctrl-Z = 0x1A, Ctrl-[ = 0x1B, etc.
    match b {
        b'a'..=b'z' => Ok(b - b'a' + 1),
        b'@' => Ok(0),
        b'[' => Ok(0x1b),
        b'\\' => Ok(0x1c),
        b']' => Ok(0x1d),
        b'^' => Ok(0x1e),
        b'_' => Ok(0x1f),
        _ => Err(format!("Unknown Ctrl- key: {}", token)),
    }
}
409
410/// Global process session manager.
411pub struct ProcessManager {
412    sessions: HashMap<SessionId, ExecSession>,
413}
414
415impl ProcessManager {
416    /// Create a new process manager.
417    pub fn new() -> Self {
418        Self {
419            sessions: HashMap::new(),
420        }
421    }
422
423    /// Start a new background process.
424    pub fn spawn(
425        &mut self,
426        command: &str,
427        working_dir: &str,
428        timeout_secs: Option<u64>,
429    ) -> Result<SessionId, String> {
430        let timeout = timeout_secs.map(Duration::from_secs);
431
432        let child = Command::new("sh")
433            .arg("-c")
434            .arg(command)
435            .current_dir(working_dir)
436            .stdin(Stdio::piped())
437            .stdout(Stdio::piped())
438            .stderr(Stdio::piped())
439            .spawn()
440            .map_err(|e| format!("Failed to spawn process: {}", e))?;
441
442        let session = ExecSession::new(
443            command.to_string(),
444            working_dir.to_string(),
445            timeout,
446            child,
447        );
448
449        let id = session.id.clone();
450        self.sessions.insert(id.clone(), session);
451
452        Ok(id)
453    }
454
455    /// Insert an existing session into the manager.
456    pub fn insert(&mut self, session: ExecSession) -> SessionId {
457        let id = session.id.clone();
458        self.sessions.insert(id.clone(), session);
459        id
460    }
461
462    /// Get a session by ID.
463    pub fn get(&self, id: &str) -> Option<&ExecSession> {
464        self.sessions.get(id)
465    }
466
467    /// Get a mutable session by ID.
468    pub fn get_mut(&mut self, id: &str) -> Option<&mut ExecSession> {
469        self.sessions.get_mut(id)
470    }
471
472    /// List all sessions.
473    pub fn list(&self) -> Vec<&ExecSession> {
474        self.sessions.values().collect()
475    }
476
477    /// List active (running) sessions.
478    pub fn list_active(&self) -> Vec<&ExecSession> {
479        self.sessions
480            .values()
481            .filter(|s| s.status == SessionStatus::Running)
482            .collect()
483    }
484
485    /// Remove a session.
486    pub fn remove(&mut self, id: &str) -> Option<ExecSession> {
487        self.sessions.remove(id)
488    }
489
490    /// Poll all sessions for updates.
491    pub fn poll_all(&mut self) {
492        for session in self.sessions.values_mut() {
493            if session.status == SessionStatus::Running {
494                session.try_read_output();
495                session.check_exit();
496            }
497        }
498    }
499
500    /// Clear completed sessions.
501    pub fn clear_completed(&mut self) {
502        self.sessions.retain(|_, s| s.status == SessionStatus::Running);
503    }
504}
505
506impl Default for ProcessManager {
507    fn default() -> Self {
508        Self::new()
509    }
510}
511
512/// Thread-safe process manager.
513pub type SharedProcessManager = Arc<Mutex<ProcessManager>>;
514
515/// Create a new shared process manager.
516pub fn new_shared_manager() -> SharedProcessManager {
517    Arc::new(Mutex::new(ProcessManager::new()))
518}
519
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a session without a child process whose combined output is `output`.
    fn session_with_output(output: &str) -> ExecSession {
        ExecSession {
            id: "test".to_string(),
            command: "echo test".to_string(),
            working_dir: "/tmp".to_string(),
            started_at: Instant::now(),
            timeout: None,
            status: SessionStatus::Running,
            stdout_buffer: Vec::new(),
            stderr_buffer: Vec::new(),
            combined_output: output.to_string(),
            last_read_pos: 0,
            child: None,
            exit_code: None,
        }
    }

    #[test]
    fn test_session_id_generation() {
        // Every generated ID has the `adjective-noun` shape, i.e. contains a hyphen.
        for id in [generate_session_id(), generate_session_id()] {
            assert!(id.contains('-'));
        }
    }

    #[test]
    fn test_process_manager_creation() {
        // A fresh manager holds no sessions.
        assert!(ProcessManager::new().list().is_empty());
    }

    #[test]
    fn test_log_output_with_limits() {
        let session = session_with_output("line1\nline2\nline3\nline4\nline5\n");

        // No offset: the last two lines.
        let tail = session.log_output(None, Some(2));
        assert_eq!(tail, "line4\nline5");

        // Zero-based offset 1 with limit 2: the second and third lines.
        let window = session.log_output(Some(1), Some(2));
        assert_eq!(window, "line2\nline3");
    }
}