Skip to main content

synaps_cli/tools/shell/
pty.rs

1//! PTY abstraction — spawn processes on a pseudo-terminal, async read/write.
2//!
3//! Wraps `portable-pty` to provide an async-friendly handle with:
4//! - Spawning commands on a PTY (with cwd, env, size)
5//! - Non-blocking reads via a background reader thread + mpsc channel
6//! - Synchronous writes to the PTY master
7//! - Resize support
8//! - Alive-check and graceful cleanup on Drop
9
10use std::collections::HashMap;
11use std::io::Write;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use portable_pty::{CommandBuilder, MasterPty, PtySize, native_pty_system, Child, ChildKiller};
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19
20use crate::{Result, RuntimeError};
21
22/// Async-friendly wrapper around a PTY master/child pair.
23///
24/// The reader runs on a blocking Tokio thread and pushes raw byte chunks
25/// into an unbounded mpsc channel. Consumers drain the channel via
26/// `try_read_output()`.
27pub struct PtyHandle {
28    /// Master PTY — retained for resize operations.
29    master: Box<dyn MasterPty + Send>,
30    /// Writer end of the PTY master (bytes written here reach the child's stdin).
31    writer: Box<dyn Write + Send>,
32    /// Handle to the blocking reader task (for cleanup tracking).
33    _reader_task: JoinHandle<()>,
34    /// Receiving end of the output channel fed by the reader task.
35    output_rx: mpsc::UnboundedReceiver<Vec<u8>>,
36    /// Child process handle — used for try_wait / kill.
37    child: Box<dyn Child + Send + Sync>,
38    /// Cached alive flag — once the child exits, this stays false.
39    alive: Arc<AtomicBool>,
40    /// Separate killer handle so Drop can kill even if child is borrowed.
41    killer: Box<dyn ChildKiller + Send + Sync>,
42}
43
44impl PtyHandle {
45    /// Spawn a command on a new PTY.
46    ///
47    /// # Arguments
48    /// * `command` — the program (and optional arguments) to run, e.g. `"bash"` or `"ssh user@host"`.
49    /// * `working_dir` — optional working directory for the child process.
50    /// * `env` — additional environment variables (merged on top of inherited env).
51    /// * `rows` / `cols` — initial terminal dimensions.
52    pub fn spawn(
53        command: &str,
54        working_dir: Option<&str>,
55        env: HashMap<String, String>,
56        rows: u16,
57        cols: u16,
58    ) -> Result<Self> {
59        // 1. Open a PTY pair with the requested size.
60        let pty_system = native_pty_system();
61        let pair = pty_system
62            .openpty(PtySize {
63                rows,
64                cols,
65                pixel_width: 0,
66                pixel_height: 0,
67            })
68            .map_err(|e| RuntimeError::Tool(format!("Failed to open PTY: {e}")))?;
69
70        // 2. Build the command.
71        //    We split on whitespace for simple cases ("bash -l", "ssh user@host").
72        let parts: Vec<&str> = command.split_whitespace().collect();
73        let program = parts
74            .first()
75            .ok_or_else(|| RuntimeError::Tool("Empty command string".to_string()))?;
76        let mut cmd = CommandBuilder::new(program);
77        for arg in parts.iter().skip(1) {
78            cmd.arg(arg);
79        }
80
81        // Set working directory if provided.
82        if let Some(dir) = working_dir {
83            cmd.cwd(dir);
84        }
85
86        // Inject environment variables; always set TERM.
87        cmd.env("TERM", "xterm-256color");
88        for (k, v) in &env {
89            cmd.env(k, v);
90        }
91
92        // 3. Spawn the child on the slave side.
93        let child = pair
94            .slave
95            .spawn_command(cmd)
96            .map_err(|e| RuntimeError::Tool(format!("Failed to spawn command: {e}")))?;
97
98        // Drop the slave — the child process owns its end now.
99        drop(pair.slave);
100
101        // 4. Obtain writer and reader from the master.
102        let writer = pair
103            .master
104            .take_writer()
105            .map_err(|e| RuntimeError::Tool(format!("Failed to take PTY writer: {e}")))?;
106
107        let mut reader = pair
108            .master
109            .try_clone_reader()
110            .map_err(|e| RuntimeError::Tool(format!("Failed to clone PTY reader: {e}")))?;
111
112        // 5. Spawn a blocking reader task that pushes chunks into the channel.
113        let (output_tx, output_rx) = mpsc::unbounded_channel::<Vec<u8>>();
114        let alive = Arc::new(AtomicBool::new(true));
115        let reader_alive = alive.clone();
116
117        let reader_task = tokio::task::spawn_blocking(move || {
118            let mut buf = [0u8; 4096];
119            loop {
120                match reader.read(&mut buf) {
121                    Ok(0) => {
122                        // EOF — child closed its side.
123                        break;
124                    }
125                    Ok(n) => {
126                        if output_tx.send(buf[..n].to_vec()).is_err() {
127                            // Receiver dropped — no one is listening anymore.
128                            break;
129                        }
130                    }
131                    Err(_) => {
132                        // Read error (child exited, fd closed, etc.) — exit cleanly.
133                        break;
134                    }
135                }
136            }
137            reader_alive.store(false, Ordering::SeqCst);
138        });
139
140        // 6. Clone a killer for Drop usage.
141        let killer = child.clone_killer();
142
143        Ok(PtyHandle {
144            master: pair.master,
145            writer,
146            _reader_task: reader_task,
147            output_rx,
148            child,
149            alive,
150            killer,
151        })
152    }
153
154    /// Write raw bytes to the PTY (reaches the child's stdin).
155    pub fn write(&mut self, input: &[u8]) -> Result<()> {
156        self.writer
157            .write_all(input)
158            .map_err(|e| RuntimeError::Tool(format!("PTY write failed: {e}")))?;
159        self.writer
160            .flush()
161            .map_err(|e| RuntimeError::Tool(format!("PTY flush failed: {e}")))?;
162        Ok(())
163    }
164
165    /// Read all available output from the PTY, waiting up to `timeout` for data.
166    ///
167    /// Behavior:
168    /// 1. Drain everything currently in the channel (non-blocking).
169    /// 2. If nothing was found, wait up to `timeout` for the first chunk.
170    /// 3. After getting something (or timing out), drain any remaining buffered data.
171    ///
172    /// Returns an empty `Vec` if no data arrived within the timeout.
173    pub async fn try_read_output(&mut self, timeout: Duration) -> Vec<u8> {
174        let mut collected = Vec::new();
175
176        // Phase 1: non-blocking drain of everything already queued.
177        while let Ok(chunk) = self.output_rx.try_recv() {
178            collected.extend_from_slice(&chunk);
179        }
180
181        // Phase 2: if we got nothing, wait up to `timeout` for at least one chunk.
182        if collected.is_empty() {
183            match tokio::time::timeout(timeout, self.output_rx.recv()).await {
184                Ok(Some(chunk)) => {
185                    collected.extend_from_slice(&chunk);
186                }
187                Ok(None) | Err(_) => {
188                    // Channel closed or timeout — return whatever we have (empty).
189                    return collected;
190                }
191            }
192
193            // Phase 3: drain any additional chunks that arrived while we waited.
194            while let Ok(chunk) = self.output_rx.try_recv() {
195                collected.extend_from_slice(&chunk);
196            }
197        }
198
199        collected
200    }
201
202    /// Resize the PTY to new dimensions.
203    pub fn resize(&self, rows: u16, cols: u16) -> Result<()> {
204        self.master
205            .resize(PtySize {
206                rows,
207                cols,
208                pixel_width: 0,
209                pixel_height: 0,
210            })
211            .map_err(|e| RuntimeError::Tool(format!("PTY resize failed: {e}")))
212    }
213
214    /// Check whether the child process is still running.
215    ///
216    /// Once the child exits, subsequent calls return `false` without syscalls.
217    pub fn is_alive(&mut self) -> bool {
218        if !self.alive.load(Ordering::SeqCst) {
219            return false;
220        }
221        match self.child.try_wait() {
222            Ok(Some(_status)) => {
223                // Child exited.
224                self.alive.store(false, Ordering::SeqCst);
225                false
226            }
227            Ok(None) => true,
228            Err(_) => {
229                // If we can't query, assume dead.
230                self.alive.store(false, Ordering::SeqCst);
231                false
232            }
233        }
234    }
235}
236
237impl Drop for PtyHandle {
238    fn drop(&mut self) {
239        if self.alive.load(Ordering::SeqCst) {
240            let _ = self.killer.kill();
241        }
242    }
243}
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248    use std::collections::HashMap;
249
250    #[tokio::test]
251    async fn test_spawn_echo_hello() {
252        let mut handle = PtyHandle::spawn(
253            "echo hello",
254            None,
255            HashMap::new(),
256            24,
257            80,
258        )
259        .expect("failed to spawn echo");
260
261        // Give the process time to produce output and exit.
262        let output = handle
263            .try_read_output(Duration::from_secs(3))
264            .await;
265
266        let text = String::from_utf8_lossy(&output);
267        assert!(
268            text.contains("hello"),
269            "expected 'hello' in output, got: {text:?}"
270        );
271    }
272
273    #[tokio::test]
274    async fn test_cat_echo_back() {
275        let mut handle = PtyHandle::spawn(
276            "cat",
277            None,
278            HashMap::new(),
279            24,
280            80,
281        )
282        .expect("failed to spawn cat");
283
284        // Write input — cat will echo it back via the PTY.
285        handle.write(b"test\n").expect("write failed");
286
287        let output = handle
288            .try_read_output(Duration::from_secs(3))
289            .await;
290
291        let text = String::from_utf8_lossy(&output);
292        assert!(
293            text.contains("test"),
294            "expected 'test' in output, got: {text:?}"
295        );
296    }
297
298    #[tokio::test]
299    async fn test_exit_code_detection() {
300        let mut handle = PtyHandle::spawn(
301            "bash -c exit 42",
302            None,
303            HashMap::new(),
304            24,
305            80,
306        )
307        .expect("failed to spawn bash exit");
308
309        // Wait for the process to finish — read until EOF / timeout.
310        let _ = handle
311            .try_read_output(Duration::from_secs(3))
312            .await;
313
314        // Small additional delay to let try_wait catch up.
315        tokio::time::sleep(Duration::from_millis(200)).await;
316
317        assert!(
318            !handle.is_alive(),
319            "expected process to have exited"
320        );
321    }
322}