synaps_cli/tools/shell/
pty.rs1use std::collections::HashMap;
11use std::io::Write;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use portable_pty::{CommandBuilder, MasterPty, PtySize, native_pty_system, Child, ChildKiller};
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19
20use crate::{Result, RuntimeError};
21
22pub struct PtyHandle {
28 master: Box<dyn MasterPty + Send>,
30 writer: Box<dyn Write + Send>,
32 _reader_task: JoinHandle<()>,
34 output_rx: mpsc::UnboundedReceiver<Vec<u8>>,
36 child: Box<dyn Child + Send + Sync>,
38 alive: Arc<AtomicBool>,
40 killer: Box<dyn ChildKiller + Send + Sync>,
42}
43
44impl PtyHandle {
45 pub fn spawn(
53 command: &str,
54 working_dir: Option<&str>,
55 env: HashMap<String, String>,
56 rows: u16,
57 cols: u16,
58 ) -> Result<Self> {
59 let pty_system = native_pty_system();
61 let pair = pty_system
62 .openpty(PtySize {
63 rows,
64 cols,
65 pixel_width: 0,
66 pixel_height: 0,
67 })
68 .map_err(|e| RuntimeError::Tool(format!("Failed to open PTY: {e}")))?;
69
70 let parts: Vec<&str> = command.split_whitespace().collect();
73 let program = parts
74 .first()
75 .ok_or_else(|| RuntimeError::Tool("Empty command string".to_string()))?;
76 let mut cmd = CommandBuilder::new(program);
77 for arg in parts.iter().skip(1) {
78 cmd.arg(arg);
79 }
80
81 if let Some(dir) = working_dir {
83 cmd.cwd(dir);
84 }
85
86 cmd.env("TERM", "xterm-256color");
88 for (k, v) in &env {
89 cmd.env(k, v);
90 }
91
92 let child = pair
94 .slave
95 .spawn_command(cmd)
96 .map_err(|e| RuntimeError::Tool(format!("Failed to spawn command: {e}")))?;
97
98 drop(pair.slave);
100
101 let writer = pair
103 .master
104 .take_writer()
105 .map_err(|e| RuntimeError::Tool(format!("Failed to take PTY writer: {e}")))?;
106
107 let mut reader = pair
108 .master
109 .try_clone_reader()
110 .map_err(|e| RuntimeError::Tool(format!("Failed to clone PTY reader: {e}")))?;
111
112 let (output_tx, output_rx) = mpsc::unbounded_channel::<Vec<u8>>();
114 let alive = Arc::new(AtomicBool::new(true));
115 let reader_alive = alive.clone();
116
117 let reader_task = tokio::task::spawn_blocking(move || {
118 let mut buf = [0u8; 4096];
119 loop {
120 match reader.read(&mut buf) {
121 Ok(0) => {
122 break;
124 }
125 Ok(n) => {
126 if output_tx.send(buf[..n].to_vec()).is_err() {
127 break;
129 }
130 }
131 Err(_) => {
132 break;
134 }
135 }
136 }
137 reader_alive.store(false, Ordering::SeqCst);
138 });
139
140 let killer = child.clone_killer();
142
143 Ok(PtyHandle {
144 master: pair.master,
145 writer,
146 _reader_task: reader_task,
147 output_rx,
148 child,
149 alive,
150 killer,
151 })
152 }
153
154 pub fn write(&mut self, input: &[u8]) -> Result<()> {
156 self.writer
157 .write_all(input)
158 .map_err(|e| RuntimeError::Tool(format!("PTY write failed: {e}")))?;
159 self.writer
160 .flush()
161 .map_err(|e| RuntimeError::Tool(format!("PTY flush failed: {e}")))?;
162 Ok(())
163 }
164
165 pub async fn try_read_output(&mut self, timeout: Duration) -> Vec<u8> {
174 let mut collected = Vec::new();
175
176 while let Ok(chunk) = self.output_rx.try_recv() {
178 collected.extend_from_slice(&chunk);
179 }
180
181 if collected.is_empty() {
183 match tokio::time::timeout(timeout, self.output_rx.recv()).await {
184 Ok(Some(chunk)) => {
185 collected.extend_from_slice(&chunk);
186 }
187 Ok(None) | Err(_) => {
188 return collected;
190 }
191 }
192
193 while let Ok(chunk) = self.output_rx.try_recv() {
195 collected.extend_from_slice(&chunk);
196 }
197 }
198
199 collected
200 }
201
202 pub fn resize(&self, rows: u16, cols: u16) -> Result<()> {
204 self.master
205 .resize(PtySize {
206 rows,
207 cols,
208 pixel_width: 0,
209 pixel_height: 0,
210 })
211 .map_err(|e| RuntimeError::Tool(format!("PTY resize failed: {e}")))
212 }
213
214 pub fn is_alive(&mut self) -> bool {
218 if !self.alive.load(Ordering::SeqCst) {
219 return false;
220 }
221 match self.child.try_wait() {
222 Ok(Some(_status)) => {
223 self.alive.store(false, Ordering::SeqCst);
225 false
226 }
227 Ok(None) => true,
228 Err(_) => {
229 self.alive.store(false, Ordering::SeqCst);
231 false
232 }
233 }
234 }
235}
236
237impl Drop for PtyHandle {
238 fn drop(&mut self) {
239 if self.alive.load(Ordering::SeqCst) {
240 let _ = self.killer.kill();
241 }
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Upper bound on how long a test waits for the first chunk of output.
    const READ_TIMEOUT: Duration = Duration::from_secs(3);

    /// Spawns `command` on a 24x80 PTY with no working directory and no extra
    /// environment, panicking with `failure_msg` if the spawn fails.
    fn spawn_24x80(command: &str, failure_msg: &str) -> PtyHandle {
        PtyHandle::spawn(command, None, HashMap::new(), 24, 80).expect(failure_msg)
    }

    /// Output printed by a short-lived command is captured.
    #[tokio::test]
    async fn test_spawn_echo_hello() {
        let mut handle = spawn_24x80("echo hello", "failed to spawn echo");

        let output = handle.try_read_output(READ_TIMEOUT).await;
        let text = String::from_utf8_lossy(&output);

        assert!(
            text.contains("hello"),
            "expected 'hello' in output, got: {text:?}"
        );
    }

    /// Input written to the PTY shows up in the output read back from it.
    #[tokio::test]
    async fn test_cat_echo_back() {
        let mut handle = spawn_24x80("cat", "failed to spawn cat");

        handle.write(b"test\n").expect("write failed");

        let output = handle.try_read_output(READ_TIMEOUT).await;
        let text = String::from_utf8_lossy(&output);

        assert!(
            text.contains("test"),
            "expected 'test' in output, got: {text:?}"
        );
    }

    /// A process that terminates is eventually reported as not alive.
    #[tokio::test]
    async fn test_exit_code_detection() {
        // `spawn` splits on whitespace, so bash receives `-c`, `exit` and
        // `42` as three separate arguments. Only termination is checked
        // here, not the exit status.
        let mut handle = spawn_24x80("bash -c exit 42", "failed to spawn bash exit");

        // Wait for output or end of stream; the bytes themselves are irrelevant.
        let _ = handle.try_read_output(READ_TIMEOUT).await;

        // Short grace period for the exit to become observable.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let still_running = handle.is_alive();
        assert!(
            !still_running,
            "expected process to have exited"
        );
    }
}