synaps_cli/tools/shell/pty.rs
1//! PTY abstraction — spawn processes on a pseudo-terminal, async read/write.
2//!
3//! Wraps `portable-pty` to provide an async-friendly handle with:
4//! - Spawning commands on a PTY (with cwd, env, size)
5//! - Non-blocking reads via a background reader thread + mpsc channel
6//! - Synchronous writes to the PTY master
7//! - Resize support
8//! - Alive-check and graceful cleanup on Drop
9
10use std::collections::HashMap;
11use std::io::Write;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use portable_pty::{CommandBuilder, MasterPty, PtySize, native_pty_system, Child, ChildKiller};
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19
20use crate::{Result, RuntimeError};
21
22/// Async-friendly wrapper around a PTY master/child pair.
23///
24/// The reader runs on a blocking Tokio thread and pushes raw byte chunks
25/// into an unbounded mpsc channel. Consumers drain the channel via
26/// `try_read_output()`.
27pub struct PtyHandle {
28 /// Master PTY — retained for resize operations.
29 master: Box<dyn MasterPty + Send>,
30 /// Writer end of the PTY master (bytes written here reach the child's stdin).
31 writer: Box<dyn Write + Send>,
32 /// Handle to the blocking reader task (for cleanup tracking).
33 _reader_task: JoinHandle<()>,
34 /// Receiving end of the output channel fed by the reader task.
35 output_rx: mpsc::UnboundedReceiver<Vec<u8>>,
36 /// Child process handle — used for try_wait / kill.
37 child: Box<dyn Child + Send + Sync>,
38 /// Cached alive flag — once the child exits, this stays false.
39 alive: Arc<AtomicBool>,
40 /// Separate killer handle so Drop can kill even if child is borrowed.
41 killer: Box<dyn ChildKiller + Send + Sync>,
42}
43
44impl PtyHandle {
45 /// Spawn a command on a new PTY.
46 ///
47 /// # Arguments
48 /// * `command` — the program (and optional arguments) to run, e.g. `"bash"` or `"ssh user@host"`.
49 /// * `working_dir` — optional working directory for the child process.
50 /// * `env` — additional environment variables (merged on top of inherited env).
51 /// * `rows` / `cols` — initial terminal dimensions.
52 pub fn spawn(
53 command: &str,
54 working_dir: Option<&str>,
55 env: HashMap<String, String>,
56 rows: u16,
57 cols: u16,
58 ) -> Result<Self> {
59 // 1. Open a PTY pair with the requested size.
60 let pty_system = native_pty_system();
61 let pair = pty_system
62 .openpty(PtySize {
63 rows,
64 cols,
65 pixel_width: 0,
66 pixel_height: 0,
67 })
68 .map_err(|e| RuntimeError::Tool(format!("Failed to open PTY: {e}")))?;
69
70 // 2. Build the command.
71 // We split on whitespace for simple cases ("bash -l", "ssh user@host").
72 let parts: Vec<&str> = command.split_whitespace().collect();
73 let program = parts
74 .first()
75 .ok_or_else(|| RuntimeError::Tool("Empty command string".to_string()))?;
76 let mut cmd = CommandBuilder::new(program);
77 for arg in parts.iter().skip(1) {
78 cmd.arg(arg);
79 }
80
81 // Set working directory if provided.
82 if let Some(dir) = working_dir {
83 cmd.cwd(dir);
84 }
85
86 // Inject environment variables; always set TERM.
87 cmd.env("TERM", "xterm-256color");
88 for (k, v) in &env {
89 cmd.env(k, v);
90 }
91
92 // 3. Spawn the child on the slave side.
93 let child = pair
94 .slave
95 .spawn_command(cmd)
96 .map_err(|e| RuntimeError::Tool(format!("Failed to spawn command: {e}")))?;
97
98 // Drop the slave — the child process owns its end now.
99 drop(pair.slave);
100
101 // 4. Obtain writer and reader from the master.
102 let writer = pair
103 .master
104 .take_writer()
105 .map_err(|e| RuntimeError::Tool(format!("Failed to take PTY writer: {e}")))?;
106
107 let mut reader = pair
108 .master
109 .try_clone_reader()
110 .map_err(|e| RuntimeError::Tool(format!("Failed to clone PTY reader: {e}")))?;
111
112 // 5. Spawn a blocking reader task that pushes chunks into the channel.
113 let (output_tx, output_rx) = mpsc::unbounded_channel::<Vec<u8>>();
114 let alive = Arc::new(AtomicBool::new(true));
115 let reader_alive = alive.clone();
116
117 let reader_task = tokio::task::spawn_blocking(move || {
118 let mut buf = [0u8; 4096];
119 loop {
120 match reader.read(&mut buf) {
121 Ok(0) => {
122 // EOF — child closed its side.
123 break;
124 }
125 Ok(n) => {
126 if output_tx.send(buf[..n].to_vec()).is_err() {
127 // Receiver dropped — no one is listening anymore.
128 break;
129 }
130 }
131 Err(_) => {
132 // Read error (child exited, fd closed, etc.) — exit cleanly.
133 break;
134 }
135 }
136 }
137 reader_alive.store(false, Ordering::SeqCst);
138 });
139
140 // 6. Clone a killer for Drop usage.
141 let killer = child.clone_killer();
142
143 Ok(PtyHandle {
144 master: pair.master,
145 writer,
146 _reader_task: reader_task,
147 output_rx,
148 child,
149 alive,
150 killer,
151 })
152 }
153
154 /// Write raw bytes to the PTY (reaches the child's stdin).
155 pub fn write(&mut self, input: &[u8]) -> Result<()> {
156 self.writer
157 .write_all(input)
158 .map_err(|e| RuntimeError::Tool(format!("PTY write failed: {e}")))?;
159 self.writer
160 .flush()
161 .map_err(|e| RuntimeError::Tool(format!("PTY flush failed: {e}")))?;
162 Ok(())
163 }
164
165 /// Read all available output from the PTY, waiting up to `timeout` for data.
166 ///
167 /// Behavior:
168 /// 1. Drain everything currently in the channel (non-blocking).
169 /// 2. If nothing was found, wait up to `timeout` for the first chunk.
170 /// 3. After getting something (or timing out), drain any remaining buffered data.
171 ///
172 /// Returns an empty `Vec` if no data arrived within the timeout.
173 pub async fn try_read_output(&mut self, timeout: Duration) -> Vec<u8> {
174 let mut collected = Vec::new();
175
176 // Phase 1: non-blocking drain of everything already queued.
177 while let Ok(chunk) = self.output_rx.try_recv() {
178 collected.extend_from_slice(&chunk);
179 }
180
181 // Phase 2: if we got nothing, wait up to `timeout` for at least one chunk.
182 if collected.is_empty() {
183 match tokio::time::timeout(timeout, self.output_rx.recv()).await {
184 Ok(Some(chunk)) => {
185 collected.extend_from_slice(&chunk);
186 }
187 Ok(None) | Err(_) => {
188 // Channel closed or timeout — return whatever we have (empty).
189 return collected;
190 }
191 }
192
193 // Phase 3: drain any additional chunks that arrived while we waited.
194 while let Ok(chunk) = self.output_rx.try_recv() {
195 collected.extend_from_slice(&chunk);
196 }
197 }
198
199 collected
200 }
201
202 /// Resize the PTY to new dimensions.
203 pub fn resize(&self, rows: u16, cols: u16) -> Result<()> {
204 self.master
205 .resize(PtySize {
206 rows,
207 cols,
208 pixel_width: 0,
209 pixel_height: 0,
210 })
211 .map_err(|e| RuntimeError::Tool(format!("PTY resize failed: {e}")))
212 }
213
214 /// Check whether the child process is still running.
215 ///
216 /// Once the child exits, subsequent calls return `false` without syscalls.
217 pub fn is_alive(&mut self) -> bool {
218 if !self.alive.load(Ordering::SeqCst) {
219 return false;
220 }
221 match self.child.try_wait() {
222 Ok(Some(_status)) => {
223 // Child exited.
224 self.alive.store(false, Ordering::SeqCst);
225 false
226 }
227 Ok(None) => true,
228 Err(_) => {
229 // If we can't query, assume dead.
230 self.alive.store(false, Ordering::SeqCst);
231 false
232 }
233 }
234 }
235}
236
237impl Drop for PtyHandle {
238 fn drop(&mut self) {
239 if self.alive.load(Ordering::SeqCst) {
240 let _ = self.killer.kill();
241 // Reap the child after kill — without a wait(), the SIGKILL'd
242 // child lingers as a zombie until the parent process exits.
243 // SIGKILL takes effect near-instantly; bound the reap attempts
244 // so Drop can never hang on a pathological child.
245 for _ in 0..5 {
246 match self.child.try_wait() {
247 Ok(Some(_)) | Err(_) => break, // reaped (or unreapable)
248 Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)),
249 }
250 }
251 }
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Upper bound each test waits for the first chunk of PTY output.
    const READ_TIMEOUT: Duration = Duration::from_secs(3);

    /// Spawn `command` on a 24-row by 80-column PTY with no working
    /// directory or extra environment, panicking with `failure` on error.
    fn spawn_24x80(command: &str, failure: &str) -> PtyHandle {
        PtyHandle::spawn(command, None, HashMap::new(), 24, 80).expect(failure)
    }

    /// Perform one `try_read_output` call and decode the bytes lossily.
    async fn read_lossy(handle: &mut PtyHandle) -> String {
        let bytes = handle.try_read_output(READ_TIMEOUT).await;
        String::from_utf8_lossy(&bytes).into_owned()
    }

    /// A short-lived command's output is delivered through the channel.
    #[tokio::test]
    async fn test_spawn_echo_hello() {
        let mut handle = spawn_24x80("echo hello", "failed to spawn echo");

        let text = read_lossy(&mut handle).await;
        assert!(
            text.contains("hello"),
            "expected 'hello' in output, got: {text:?}"
        );
    }

    /// Bytes written to the PTY come back as output while `cat` is running.
    #[tokio::test]
    async fn test_cat_echo_back() {
        let mut handle = spawn_24x80("cat", "failed to spawn cat");

        handle.write(b"test\n").expect("write failed");

        let text = read_lossy(&mut handle).await;
        assert!(
            text.contains("test"),
            "expected 'test' in output, got: {text:?}"
        );
    }

    /// After the child terminates, `is_alive()` reports `false`.
    ///
    /// NOTE(review): `spawn` splits on whitespace, so bash receives `-c`,
    /// `exit` and `42` as three separate arguments; the exit status is
    /// presumably not 42. Only termination is asserted here, not the code.
    #[tokio::test]
    async fn test_exit_code_detection() {
        let mut handle = spawn_24x80("bash -c exit 42", "failed to spawn bash exit");

        // Wait for output, channel close, or the timeout — result unused.
        let _ = handle.try_read_output(READ_TIMEOUT).await;

        // Extra grace period so the exit is observable via try_wait.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let still_running = handle.is_alive();
        assert!(
            !still_running,
            "expected process to have exited"
        );
    }
}