Skip to main content

rustyclaw_core/
process_manager.rs

1//! Background process session management for RustyClaw.
2//!
3//! Provides a registry of background exec sessions that can be polled,
4//! written to, and killed by the agent.
5//!
//! Uses `std::process` together with platform-specific non-blocking pipe reads
//! (`poll()` on Unix, `PeekNamedPipe` on Windows) and exposes a sync API for
//! compatibility with the sync tool execute interface.
8
9use std::collections::HashMap;
10use std::io::Write;
11use std::process::{Child, Command, Stdio};
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
/// Unique identifier for a background session.
pub type SessionId = String;

/// Generate a short human-readable session ID.
///
/// The ID has the form `adjective-noun-N`, e.g. `swift-owl-2a`. The word pair
/// is derived from the current wall-clock time purely for readability; the
/// trailing hexadecimal sequence number comes from a process-wide counter, so
/// two IDs generated by the same process never collide (the 64 possible word
/// pairs alone would collide quickly).
fn generate_session_id() -> SessionId {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    const ADJECTIVES: [&str; 8] = [
        "warm", "cool", "swift", "calm", "bold", "keen", "bright", "quick",
    ];
    const NOUNS: [&str; 8] = ["rook", "hawk", "wolf", "bear", "fox", "owl", "lynx", "crow"];

    // Monotonic per-process sequence number; this is what guarantees uniqueness.
    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    // A clock set before the Unix epoch falls back to 0 instead of panicking.
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();

    let adjective = ADJECTIVES[(millis % ADJECTIVES.len() as u128) as usize];
    // Divide by the adjective count first so the noun varies independently.
    let noun = NOUNS[((millis / ADJECTIVES.len() as u128) % NOUNS.len() as u128) as usize];
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!("{}-{}-{:x}", adjective, noun, seq)
}
37
/// Lifecycle state of a background session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// Process is still running.
    Running,
    /// Process exited on its own with the given exit code.
    Exited(i32),
    /// Process ended without an exit code (e.g. terminated by a signal) or
    /// was killed through the session.
    Killed,
    /// Process exceeded its timeout and was killed.
    TimedOut,
}
50
51/// A background exec session.
52pub struct ExecSession {
53    /// Session identifier.
54    pub id: SessionId,
55    /// The command that was executed.
56    pub command: String,
57    /// Working directory.
58    pub working_dir: String,
59    /// When the session started.
60    pub started_at: Instant,
61    /// Timeout duration (if set).
62    pub timeout: Option<Duration>,
63    /// Current status.
64    pub status: SessionStatus,
65    /// Accumulated stdout output.
66    stdout_buffer: Vec<u8>,
67    /// Accumulated stderr output.
68    stderr_buffer: Vec<u8>,
69    /// Combined output (interleaved stdout + stderr for display).
70    combined_output: String,
71    /// Last read position for polling.
72    last_read_pos: usize,
73    /// The child process handle.
74    child: Option<Child>,
75    /// Exit code (set when process exits).
76    exit_code: Option<i32>,
77}
78
79impl ExecSession {
80    /// Create a new session for a running process.
81    pub fn new(
82        command: String,
83        working_dir: String,
84        timeout: Option<Duration>,
85        child: Child,
86    ) -> Self {
87        Self {
88            id: generate_session_id(),
89            command,
90            working_dir,
91            started_at: Instant::now(),
92            timeout,
93            status: SessionStatus::Running,
94            stdout_buffer: Vec::new(),
95            stderr_buffer: Vec::new(),
96            combined_output: String::new(),
97            last_read_pos: 0,
98            child: Some(child),
99            exit_code: None,
100        }
101    }
102
103    /// Check if the process has exceeded its timeout.
104    pub fn is_timed_out(&self) -> bool {
105        if let Some(timeout) = self.timeout {
106            self.started_at.elapsed() > timeout
107        } else {
108            false
109        }
110    }
111
112    /// Get the elapsed time since the session started.
113    pub fn elapsed(&self) -> Duration {
114        self.started_at.elapsed()
115    }
116
117    /// Append output to the combined buffer.
118    pub fn append_output(&mut self, text: &str) {
119        self.combined_output.push_str(text);
120    }
121
122    /// Get new output since the last poll.
123    pub fn poll_output(&mut self) -> &str {
124        let new_output = &self.combined_output[self.last_read_pos..];
125        self.last_read_pos = self.combined_output.len();
126        new_output
127    }
128
129    /// Get the full output log.
130    pub fn full_output(&self) -> &str {
131        &self.combined_output
132    }
133
134    /// Get output with line-based offset and limit.
135    pub fn log_output(&self, offset: Option<usize>, limit: Option<usize>) -> String {
136        let lines: Vec<&str> = self.combined_output.lines().collect();
137        let total = lines.len();
138
139        // If offset is None, grab the last `limit` lines
140        let (start, end) = match (offset, limit) {
141            (None, Some(lim)) => {
142                let start = total.saturating_sub(lim);
143                (start, total)
144            }
145            (Some(off), Some(lim)) => {
146                let start = off.min(total);
147                let end = (start + lim).min(total);
148                (start, end)
149            }
150            (Some(off), None) => {
151                let start = off.min(total);
152                (start, total)
153            }
154            (None, None) => (0, total),
155        };
156
157        lines[start..end].join("\n")
158    }
159
160    /// Try to read any available output from the child process.
161    /// Returns true if any output was read.
162    ///
163    /// Uses platform-specific non-blocking I/O.
164    pub fn try_read_output(&mut self) -> bool {
165        let Some(ref mut child) = self.child else {
166            return false;
167        };
168
169        let mut read_any = false;
170
171        // Try to read from stdout
172        if let Some(ref mut stdout) = child.stdout {
173            let mut buf = [0u8; 4096];
174            if let Ok(n) = read_nonblocking(stdout, &mut buf) {
175                if n > 0 {
176                    let text = String::from_utf8_lossy(&buf[..n]);
177                    self.combined_output.push_str(&text);
178                    self.stdout_buffer.extend_from_slice(&buf[..n]);
179                    read_any = true;
180                }
181            }
182        }
183
184        // Try to read from stderr
185        if let Some(ref mut stderr) = child.stderr {
186            let mut buf = [0u8; 4096];
187            if let Ok(n) = read_nonblocking(stderr, &mut buf) {
188                if n > 0 {
189                    let text = String::from_utf8_lossy(&buf[..n]);
190                    self.combined_output.push_str(&text);
191                    self.stderr_buffer.extend_from_slice(&buf[..n]);
192                    read_any = true;
193                }
194            }
195        }
196
197        read_any
198    }
199
200    /// Check if the process has exited and update status.
201    pub fn check_exit(&mut self) -> bool {
202        let Some(ref mut child) = self.child else {
203            return true; // Already exited
204        };
205
206        match child.try_wait() {
207            Ok(Some(status)) => {
208                self.exit_code = status.code();
209                self.status = if let Some(code) = status.code() {
210                    SessionStatus::Exited(code)
211                } else {
212                    SessionStatus::Killed
213                };
214
215                // Read any remaining output
216                self.try_read_output();
217
218                true
219            }
220            Ok(None) => {
221                // Still running, check timeout
222                let timed_out = self
223                    .timeout
224                    .map(|t| self.started_at.elapsed() > t)
225                    .unwrap_or(false);
226                if timed_out {
227                    let _ = child.kill();
228                    self.status = SessionStatus::TimedOut;
229                    self.exit_code = None;
230                    return true;
231                }
232                false
233            }
234            Err(_) => {
235                self.status = SessionStatus::Killed;
236                true
237            }
238        }
239    }
240
241    /// Write data to the process stdin.
242    pub fn write_stdin(&mut self, data: &str) -> Result<(), String> {
243        let Some(ref mut child) = self.child else {
244            return Err("Process has exited".to_string());
245        };
246
247        let Some(ref mut stdin) = child.stdin else {
248            return Err("Process stdin not available".to_string());
249        };
250
251        stdin
252            .write_all(data.as_bytes())
253            .map_err(|e| format!("Failed to write to stdin: {}", e))?;
254        stdin
255            .flush()
256            .map_err(|e| format!("Failed to flush stdin: {}", e))?;
257
258        Ok(())
259    }
260
261    /// Translate named keys to escape sequences and write them to stdin.
262    ///
263    /// Supports key names: Enter, Tab, Escape, Space, Backspace,
264    /// Up, Down, Left, Right, Home, End, PageUp, PageDown, Delete, Insert,
265    /// Ctrl-A..Ctrl-Z, Ctrl-C, F1..F12, and plain text.
266    ///
267    /// Multiple keys can be separated by spaces:  `"Enter"`, `"Ctrl-C"`,
268    /// `"Up Up Down Down Left Right"`.
269    pub fn send_keys(&mut self, keys: &str) -> Result<usize, String> {
270        let bytes = translate_keys(keys)?;
271        let len = bytes.len();
272
273        let Some(ref mut child) = self.child else {
274            return Err("Process has exited".to_string());
275        };
276        let Some(ref mut stdin) = child.stdin else {
277            return Err("Process stdin not available".to_string());
278        };
279
280        stdin
281            .write_all(&bytes)
282            .map_err(|e| format!("Failed to send keys: {}", e))?;
283        stdin
284            .flush()
285            .map_err(|e| format!("Failed to flush after send-keys: {}", e))?;
286
287        Ok(len)
288    }
289
290    /// Kill the process.
291    pub fn kill(&mut self) -> Result<(), String> {
292        let Some(ref mut child) = self.child else {
293            return Ok(()); // Already gone
294        };
295
296        child
297            .kill()
298            .map_err(|e| format!("Failed to kill process: {}", e))?;
299
300        self.status = SessionStatus::Killed;
301        Ok(())
302    }
303}
304
305// ── Non-blocking read helpers ───────────────────────────────────────────────
306
307/// Non-blocking read helper for Unix using poll().
308#[cfg(unix)]
309fn read_nonblocking<R: std::io::Read + std::os::unix::io::AsRawFd>(
310    reader: &mut R,
311    buf: &mut [u8],
312) -> std::io::Result<usize> {
313    let fd = reader.as_raw_fd();
314
315    // Use poll() to check if data is available (more portable than fcntl tricks)
316    let mut poll_fd = libc::pollfd {
317        fd,
318        events: libc::POLLIN,
319        revents: 0,
320    };
321
322    // Poll with 0 timeout (non-blocking check)
323    let ready = unsafe { libc::poll(&mut poll_fd, 1, 0) };
324
325    if ready > 0 && (poll_fd.revents & libc::POLLIN) != 0 {
326        // Data available, do a regular read
327        reader.read(buf)
328    } else {
329        // No data available
330        Ok(0)
331    }
332}
333
/// Non-blocking read helper for Windows built on `PeekNamedPipe`.
///
/// Asks the pipe how many bytes are waiting and reads at most that many (and
/// at most `buf.len()`). Returns `Ok(0)` when nothing is waiting or when the
/// peek itself fails; a failed peek is deliberately treated as "no data".
#[cfg(windows)]
fn read_nonblocking<R: std::io::Read + std::os::windows::io::AsRawHandle>(
    reader: &mut R,
    buf: &mut [u8],
) -> std::io::Result<usize> {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Foundation::HANDLE;
    use windows_sys::Win32::System::Pipes::PeekNamedPipe;

    let pipe = reader.as_raw_handle() as HANDLE;
    let mut pending: u32 = 0;

    // Only the "total bytes available" out-parameter is requested; no data is
    // copied by the peek itself.
    let peek_succeeded = unsafe {
        PeekNamedPipe(
            pipe,
            std::ptr::null_mut(),
            0,
            std::ptr::null_mut(),
            &mut pending,
            std::ptr::null_mut(),
        )
    } != 0;

    if !peek_succeeded || pending == 0 {
        return Ok(0);
    }

    // Never ask for more than is waiting, so the read cannot block.
    let wanted = buf.len().min(pending as usize);
    reader.read(&mut buf[..wanted])
}
368
/// Fallback used on targets that are neither Unix nor Windows.
///
/// No readiness probe is implemented for such targets, so this never touches
/// the reader and always reports that zero bytes were read.
#[cfg(not(any(unix, windows)))]
fn read_nonblocking<R>(_reader: &mut R, _buf: &mut [u8]) -> std::io::Result<usize>
where
    R: std::io::Read,
{
    Ok(0)
}
375
376// ── Key translation ─────────────────────────────────────────────────────────
377
/// Look up the byte sequence for a named key (`name` must be lower-case).
fn named_key_bytes(name: &str) -> Option<&'static [u8]> {
    let bytes: &'static [u8] = match name {
        // Basic control characters
        "enter" | "return" | "cr" => b"\n",
        "tab" => b"\t",
        "escape" | "esc" => b"\x1b",
        "space" => b" ",
        "backspace" | "bs" => b"\x7f",
        "delete" | "del" => b"\x1b[3~",
        "insert" | "ins" => b"\x1b[2~",

        // Arrow keys
        "up" => b"\x1b[A",
        "down" => b"\x1b[B",
        "right" => b"\x1b[C",
        "left" => b"\x1b[D",

        // Navigation
        "home" => b"\x1b[H",
        "end" => b"\x1b[F",
        "pageup" | "pgup" => b"\x1b[5~",
        "pagedown" | "pgdn" => b"\x1b[6~",

        // Function keys
        "f1" => b"\x1bOP",
        "f2" => b"\x1bOQ",
        "f3" => b"\x1bOR",
        "f4" => b"\x1bOS",
        "f5" => b"\x1b[15~",
        "f6" => b"\x1b[17~",
        "f7" => b"\x1b[18~",
        "f8" => b"\x1b[19~",
        "f9" => b"\x1b[20~",
        "f10" => b"\x1b[21~",
        "f11" => b"\x1b[23~",
        "f12" => b"\x1b[24~",

        _ => return None,
    };
    Some(bytes)
}

/// Map the (lower-case) character following `Ctrl-` to its control code.
fn ctrl_code(ch: u8) -> Option<u8> {
    match ch {
        // Ctrl-A = 0x01 ... Ctrl-Z = 0x1A
        b'a'..=b'z' => Some(ch - b'a' + 1),
        b'@' => Some(0),
        b'[' => Some(0x1b),
        b'\\' => Some(0x1c),
        b']' => Some(0x1d),
        b'^' => Some(0x1e),
        b'_' => Some(0x1f),
        _ => None,
    }
}

/// Translate a whitespace-separated list of named keys into raw bytes.
///
/// Each token is matched case-insensitively against the known key names
/// (`Enter`, `Tab`, `Esc`, arrows, `Home`/`End`, `PgUp`/`PgDn`, `F1`..`F12`,
/// ...). Tokens of the form `Ctrl-X` / `C-X` become the corresponding control
/// code. Any other token is emitted as literal UTF-8 text; the whitespace
/// between tokens is not emitted (use `Space` for a literal space).
///
/// # Errors
///
/// Returns a message for a `Ctrl-` token that is not followed by exactly one
/// character (e.g. `Ctrl-` or `Ctrl-Shift-A`) or whose character has no
/// control code (e.g. `Ctrl-1`).
pub fn translate_keys(keys: &str) -> Result<Vec<u8>, String> {
    let mut out = Vec::new();

    for token in keys.split_whitespace() {
        let lower = token.to_lowercase();

        if let Some(bytes) = named_key_bytes(&lower) {
            out.extend_from_slice(bytes);
            continue;
        }

        // Strip the prefix rather than splitting on '-', so everything after
        // the prefix is validated and `Ctrl-Shift-A` is rejected instead of
        // being silently sent as Ctrl-A.
        let ctrl_target = lower
            .strip_prefix("ctrl-")
            .or_else(|| lower.strip_prefix("c-"));

        match ctrl_target {
            Some(target) => match target.as_bytes() {
                [ch] => {
                    let code = ctrl_code(*ch)
                        .ok_or_else(|| format!("Unknown Ctrl- key: {}", token))?;
                    out.push(code);
                }
                _ => return Err(format!("Invalid Ctrl- key: {}", token)),
            },
            // Literal text fallback (original casing preserved).
            None => out.extend_from_slice(token.as_bytes()),
        }
    }

    Ok(out)
}
451
452/// Global process session manager.
453pub struct ProcessManager {
454    sessions: HashMap<SessionId, ExecSession>,
455}
456
457impl ProcessManager {
458    /// Create a new process manager.
459    pub fn new() -> Self {
460        Self {
461            sessions: HashMap::new(),
462        }
463    }
464
465    /// Start a new background process.
466    pub fn spawn(
467        &mut self,
468        command: &str,
469        working_dir: &str,
470        timeout_secs: Option<u64>,
471    ) -> Result<SessionId, String> {
472        let timeout = timeout_secs.map(Duration::from_secs);
473
474        // Use platform-appropriate shell
475        #[cfg(unix)]
476        let child = Command::new("sh")
477            .arg("-c")
478            .arg(command)
479            .current_dir(working_dir)
480            .stdin(Stdio::piped())
481            .stdout(Stdio::piped())
482            .stderr(Stdio::piped())
483            .spawn()
484            .map_err(|e| format!("Failed to spawn process: {}", e))?;
485
486        #[cfg(windows)]
487        let child = Command::new("cmd")
488            .arg("/C")
489            .arg(command)
490            .current_dir(working_dir)
491            .stdin(Stdio::piped())
492            .stdout(Stdio::piped())
493            .stderr(Stdio::piped())
494            .spawn()
495            .map_err(|e| format!("Failed to spawn process: {}", e))?;
496
497        #[cfg(not(any(unix, windows)))]
498        let child = Command::new("sh")
499            .arg("-c")
500            .arg(command)
501            .current_dir(working_dir)
502            .stdin(Stdio::piped())
503            .stdout(Stdio::piped())
504            .stderr(Stdio::piped())
505            .spawn()
506            .map_err(|e| format!("Failed to spawn process: {}", e))?;
507
508        let session =
509            ExecSession::new(command.to_string(), working_dir.to_string(), timeout, child);
510
511        let id = session.id.clone();
512        self.sessions.insert(id.clone(), session);
513
514        Ok(id)
515    }
516
517    /// Insert an existing session into the manager.
518    pub fn insert(&mut self, session: ExecSession) -> SessionId {
519        let id = session.id.clone();
520        self.sessions.insert(id.clone(), session);
521        id
522    }
523
524    /// Get a session by ID.
525    pub fn get(&self, id: &str) -> Option<&ExecSession> {
526        self.sessions.get(id)
527    }
528
529    /// Get a mutable session by ID.
530    pub fn get_mut(&mut self, id: &str) -> Option<&mut ExecSession> {
531        self.sessions.get_mut(id)
532    }
533
534    /// List all sessions.
535    pub fn list(&self) -> Vec<&ExecSession> {
536        self.sessions.values().collect()
537    }
538
539    /// List active (running) sessions.
540    pub fn list_active(&self) -> Vec<&ExecSession> {
541        self.sessions
542            .values()
543            .filter(|s| s.status == SessionStatus::Running)
544            .collect()
545    }
546
547    /// Remove a session.
548    pub fn remove(&mut self, id: &str) -> Option<ExecSession> {
549        self.sessions.remove(id)
550    }
551
552    /// Poll all sessions for updates.
553    pub fn poll_all(&mut self) {
554        for session in self.sessions.values_mut() {
555            if session.status == SessionStatus::Running {
556                session.try_read_output();
557                session.check_exit();
558            }
559        }
560    }
561
562    /// Clear completed sessions.
563    pub fn clear_completed(&mut self) {
564        self.sessions
565            .retain(|_, s| s.status == SessionStatus::Running);
566    }
567}
568
569impl Default for ProcessManager {
570    fn default() -> Self {
571        Self::new()
572    }
573}
574
575/// Thread-safe process manager.
576pub type SharedProcessManager = Arc<Mutex<ProcessManager>>;
577
578/// Create a new shared process manager.
579pub fn new_shared_manager() -> SharedProcessManager {
580    Arc::new(Mutex::new(ProcessManager::new()))
581}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a process-less session whose combined log holds `output`.
    fn session_with_output(output: &str) -> ExecSession {
        ExecSession {
            id: "test".to_string(),
            command: "echo test".to_string(),
            working_dir: "/tmp".to_string(),
            started_at: Instant::now(),
            timeout: None,
            status: SessionStatus::Running,
            stdout_buffer: Vec::new(),
            stderr_buffer: Vec::new(),
            combined_output: output.to_string(),
            last_read_pos: 0,
            child: None,
            exit_code: None,
        }
    }

    #[test]
    fn test_session_id_generation() {
        let id1 = generate_session_id();
        let id2 = generate_session_id();
        // IDs contain a hyphen
        assert!(id1.contains('-'));
        assert!(id2.contains('-'));
    }

    #[test]
    fn test_process_manager_creation() {
        let manager = ProcessManager::new();
        assert!(manager.list().is_empty());
        assert!(manager.list_active().is_empty());
    }

    #[test]
    fn test_log_output_with_limits() {
        let session = session_with_output("line1\nline2\nline3\nline4\nline5\n");

        // Get last 2 lines
        let output = session.log_output(None, Some(2));
        assert_eq!(output, "line4\nline5");

        // Get 2 lines starting at zero-based offset 1
        let output = session.log_output(Some(1), Some(2));
        assert_eq!(output, "line2\nline3");

        // Offset only: everything from that line to the end
        let output = session.log_output(Some(3), None);
        assert_eq!(output, "line4\nline5");

        // Offset past the end yields nothing
        let output = session.log_output(Some(99), Some(2));
        assert_eq!(output, "");

        // No window at all: the whole log
        let output = session.log_output(None, None);
        assert_eq!(output, "line1\nline2\nline3\nline4\nline5");
    }

    #[test]
    fn test_poll_output_advances_cursor() {
        let mut session = session_with_output("first\n");
        assert_eq!(session.poll_output(), "first\n");
        assert_eq!(session.poll_output(), "");

        session.append_output("second\n");
        assert_eq!(session.poll_output(), "second\n");
        assert_eq!(session.full_output(), "first\nsecond\n");
    }

    #[test]
    fn test_translate_keys_basic() {
        let keys = translate_keys("Enter").unwrap();
        assert_eq!(keys, vec![b'\n']);

        let keys = translate_keys("Ctrl-C").unwrap();
        assert_eq!(keys, vec![3]); // 0x03

        let keys = translate_keys("Up Down Left Right").unwrap();
        assert_eq!(keys, b"\x1b[A\x1b[B\x1b[D\x1b[C".to_vec());
    }

    #[test]
    fn test_translate_keys_literal() {
        let keys = translate_keys("hello").unwrap();
        assert_eq!(keys, b"hello".to_vec());
    }

    #[test]
    fn test_translate_keys_rejects_bad_ctrl() {
        assert!(translate_keys("Ctrl-1").is_err());
        assert!(translate_keys("Ctrl-").is_err());
    }

    #[test]
    #[cfg(unix)]
    fn test_spawn_and_poll() {
        let mut manager = ProcessManager::new();
        let id = manager.spawn("echo hello", "/tmp", None).unwrap();

        // Poll until the output shows up instead of relying on one fixed
        // sleep, which is flaky on a loaded machine.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            manager.poll_all();

            let session = manager.get(&id).unwrap();
            if session.full_output().contains("hello") {
                break;
            }

            assert!(
                Instant::now() < deadline,
                "timed out waiting for process output"
            );
            std::thread::sleep(Duration::from_millis(10));
        }
    }
}