1use anyhow::{anyhow, Context, Result};
11use portable_pty::{native_pty_system, Child, CommandBuilder, MasterPty, PtySize};
12use std::collections::hash_map::DefaultHasher;
13use std::collections::VecDeque;
14use std::hash::{Hash, Hasher};
15use std::io::{Read, Write};
16use std::path::Path;
17use std::process::Command;
18use std::sync::mpsc::{self, TryRecvError};
19use std::sync::Arc;
20use std::thread;
21use std::time::{Duration, Instant};
22use tokio::sync::Mutex;
23use tracing::{debug, info, warn};
24
/// Terminal width, in columns, used when the PTY is first opened.
pub const DEFAULT_COLS: u16 = 200;
/// Terminal height, in rows, used when the PTY is first opened.
pub const DEFAULT_ROWS: u16 = 50;

/// Byte threshold above which buffered shell output is treated as truncated.
const MAX_OUTPUT_SIZE: usize = 1_000_000;

/// Maximum number of completed output lines retained in the scrollback ring.
pub const RING_BUFFER_LINES: usize = 2_000;

/// Marker that opens the shell prompt printed through `PROMPT_COMMAND`.
const WCGW_PROMPT_PATTERN: &str = "◉";
/// Marker that closes the shell prompt printed through `PROMPT_COMMAND`.
const WCGW_PROMPT_END: &str = "──➤";
40
41fn attachable_command(restricted_mode: bool) -> (CommandBuilder, Option<String>) {
42 let requested = std::env::var("WINX_ATTACH_TERMINAL")
43 .or_else(|_| std::env::var("WINX_USE_SCREEN"))
44 .unwrap_or_default();
45 if !requested.is_empty() && requested != "0" && requested != "false" {
46 let session = format!("winx-{}-{}", std::process::id(), timestamp_millis());
47 if requested == "tmux" && command_available("tmux") {
48 let mut cmd = CommandBuilder::new("tmux");
49 cmd.args(["new-session", "-A", "-s", &session, "bash"]);
50 if restricted_mode {
51 cmd.arg("-r");
52 }
53 return (cmd, Some(format!("tmux attach -t {session}")));
54 }
55 if command_available("screen") {
56 let mut cmd = CommandBuilder::new("screen");
57 cmd.args(["-q", "-S", &session, "bash"]);
58 if restricted_mode {
59 cmd.arg("-r");
60 }
61 return (cmd, Some(format!("screen -x {session}")));
62 }
63 }
64
65 let mut cmd = CommandBuilder::new("bash");
66 if restricted_mode {
67 cmd.arg("-r");
68 }
69 (cmd, None)
70}
71
/// Reports whether `command` resolves to something runnable according to `sh`.
///
/// Runs `command -v` in a `sh` child process. The name is handed to the shell
/// as the positional parameter `$1` instead of being spliced into the script
/// text, so whitespace or shell metacharacters in `command` are treated as part
/// of the name and are never executed.
///
/// Returns `false` for an empty name, when `sh` cannot be started, or when the
/// lookup exits with a non-zero status.
fn command_available(command: &str) -> bool {
    if command.is_empty() {
        return false;
    }
    Command::new("sh")
        // The argument after the script becomes `$0`; `command` becomes `$1`.
        .args(["-c", r#"command -v "$1""#, "sh", command])
        .output()
        .is_ok_and(|output| output.status.success())
}
78
/// Returns the number of milliseconds elapsed since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn timestamp_millis() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
84
/// An interactive shell process attached to a pseudo-terminal.
///
/// Output produced by the shell is forwarded by a background reader thread
/// over a channel and consumed by the methods on this type.
pub struct PtyShell {
    /// Master side of the PTY pair; used to resize the terminal.
    master: Box<dyn MasterPty + Send>,
    /// Handle to the process spawned on the PTY slave; polled by `is_alive`.
    child: Box<dyn Child + Send + Sync>,
    /// Writer taken from the PTY master; all input to the shell goes through it.
    writer: Box<dyn Write + Send>,
    /// Receiving end of the channel fed by the background reader thread.
    output_rx: mpsc::Receiver<String>,
    /// Terminal dimensions last applied to the PTY.
    size: PtySize,
    /// Text of the most recent command passed to `send_command`.
    pub last_command: String,
    /// Output accumulated by `read_output` since the last `send_command`.
    pub output_buffer: String,
    /// Set by `send_command`; cleared once `read_output` considers the command complete.
    pub command_running: bool,
    /// Byte threshold above which buffered output is treated as truncated.
    max_output_size: usize,
    /// Whether buffered output has exceeded `max_output_size`; reset by `send_command`.
    pub output_truncated: bool,
    /// Most recent completed output lines, capped at `RING_BUFFER_LINES` entries.
    pub line_ring: VecDeque<String>,
    /// Output received after the last newline, i.e. a line that is not finished yet.
    line_ring_partial: String,
    /// Reset to `None` by `send_command`. Presumably holds a `fingerprint` of
    /// the output last handed back to a caller — TODO confirm against callers.
    pub last_returned_hash: Option<u64>,
    /// Command a user can run to attach to the session when the shell was
    /// started under tmux or screen; `None` for a plain bash shell.
    pub attach_hint: Option<String>,
}
122
123impl std::fmt::Debug for PtyShell {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("PtyShell")
126 .field("size", &format!("{}x{}", self.size.cols, self.size.rows))
127 .field("last_command", &self.last_command)
128 .field("command_running", &self.command_running)
129 .field("output_truncated", &self.output_truncated)
130 .field("output_buffer_len", &self.output_buffer.len())
131 .field("attach_hint", &self.attach_hint)
132 .finish_non_exhaustive()
133 }
134}
135
136impl PtyShell {
137 pub fn new(initial_dir: &Path, restricted_mode: bool) -> Result<Self> {
146 info!(
147 "Creating new PTY shell (restricted: {}) in {}",
148 restricted_mode,
149 initial_dir.display()
150 );
151
152 let pty_system = native_pty_system();
154
155 let size =
157 PtySize { rows: DEFAULT_ROWS, cols: DEFAULT_COLS, pixel_width: 0, pixel_height: 0 };
158
159 let pair = pty_system.openpty(size).context("Failed to open PTY pair")?;
161
162 let (mut cmd, attach_hint) = attachable_command(restricted_mode);
164
165 cmd.env("TERM", "xterm-256color");
167 cmd.env("COLORTERM", "truecolor");
168 cmd.env("PAGER", "cat");
169 cmd.env("GIT_PAGER", "cat");
170 cmd.env("COLUMNS", DEFAULT_COLS.to_string());
171 cmd.env("ROWS", DEFAULT_ROWS.to_string());
172 cmd.env("PROMPT_COMMAND", r#"printf "◉ %s──➤ " "$PWD""#);
175 cmd.cwd(initial_dir);
176
177 let child = pair.slave.spawn_command(cmd).context("Failed to spawn bash in PTY")?;
179
180 let mut reader = pair.master.try_clone_reader().context("Failed to clone PTY reader")?;
182 let writer = pair.master.take_writer().context("Failed to take PTY writer")?;
183
184 let (output_tx, output_rx) = mpsc::channel::<String>();
186
187 thread::spawn(move || {
190 let mut buf = [0u8; 4096];
191 loop {
192 match reader.read(&mut buf) {
193 Ok(0) => {
194 break;
196 }
197 Ok(n) => {
198 let chunk = String::from_utf8_lossy(&buf[..n]).to_string();
199 if output_tx.send(chunk).is_err() {
200 break;
202 }
203 }
204 Err(e) => {
205 debug!("PTY reader thread error: {}", e);
206 break;
207 }
208 }
209 }
210 debug!("PTY reader thread exiting");
211 });
212
213 let mut shell = Self {
215 master: pair.master,
216 child,
217 writer,
218 output_rx,
219 size,
220 last_command: String::new(),
221 output_buffer: String::new(),
222 command_running: false,
223 max_output_size: MAX_OUTPUT_SIZE,
224 output_truncated: false,
225 line_ring: VecDeque::with_capacity(RING_BUFFER_LINES),
226 line_ring_partial: String::new(),
227 last_returned_hash: None,
228 attach_hint,
229 };
230
231 shell.initialize_prompt()?;
233
234 debug!("PTY shell created successfully");
235 Ok(shell)
236 }
237
238 fn initialize_prompt(&mut self) -> Result<()> {
240 let prompt_statement =
243 r#"export GIT_PAGER=cat PAGER=cat PROMPT_COMMAND='printf "◉ %s──➤ " "$PWD"'"#;
244
245 self.write_command(prompt_statement)?;
246
247 std::thread::sleep(Duration::from_millis(100));
249 let _ = self.drain_output();
250
251 Ok(())
252 }
253
254 fn write_command(&mut self, command: &str) -> Result<()> {
256 let cmd_with_newline = format!("{command}\n");
258 self.writer.write_all(cmd_with_newline.as_bytes()).context("Failed to write to PTY")?;
259 self.writer.flush().context("Failed to flush PTY")?;
260 Ok(())
261 }
262
263 fn drain_output(&mut self) -> String {
265 let mut output = String::new();
266 let deadline = Instant::now() + Duration::from_millis(200);
267
268 while Instant::now() < deadline {
270 match self.output_rx.try_recv() {
271 Ok(chunk) => {
272 output.push_str(&chunk);
273
274 if output.len() > self.max_output_size {
276 self.output_truncated = true;
277 break;
278 }
279 }
280 Err(TryRecvError::Empty) => {
281 thread::sleep(Duration::from_millis(10));
283 }
284 Err(TryRecvError::Disconnected) => {
285 break;
287 }
288 }
289 }
290
291 output
292 }
293
294 pub fn clear_to_run(&mut self, max_wait_secs: f32) -> Result<bool> {
301 let (_, complete) = self.read_output(max_wait_secs.min(0.5))?;
304 if complete {
305 return Ok(true);
306 }
307
308 debug!("clear_to_run: prompt not seen, sending Ctrl+C");
310 self.send_interrupt()?;
311
312 let (_, drained) = self.read_output(max_wait_secs)?;
314 Ok(drained)
315 }
316
317 pub fn send_command(&mut self, command: &str) -> Result<()> {
319 debug!("PTY sending command: {}", command);
320
321 self.output_buffer.clear();
323 self.output_truncated = false;
324 self.last_command = command.to_string();
325 self.command_running = true;
326 self.last_returned_hash = None;
329
330 self.write_command(command)?;
332
333 Ok(())
334 }
335
336 fn ingest_into_ring(&mut self, chunk: &str) {
339 let combined = if self.line_ring_partial.is_empty() {
340 chunk.to_string()
341 } else {
342 let mut s = std::mem::take(&mut self.line_ring_partial);
343 s.push_str(chunk);
344 s
345 };
346
347 let mut last_nl_end: Option<usize> = None;
348 for (idx, ch) in combined.char_indices() {
349 if ch == '\n' {
350 let end = idx + ch.len_utf8();
351 let start = last_nl_end.unwrap_or(0);
352 let line = combined[start..idx].trim_end_matches('\r').to_string();
353 if self.line_ring.len() == RING_BUFFER_LINES {
354 self.line_ring.pop_front();
355 }
356 self.line_ring.push_back(line);
357 last_nl_end = Some(end);
358 }
359 }
360
361 if let Some(end) = last_nl_end {
362 self.line_ring_partial = combined[end..].to_string();
363 } else {
364 self.line_ring_partial = combined;
365 }
366 }
367
368 pub fn collect_scrollback(&self, lines: usize) -> String {
371 if lines == 0 {
372 return String::new();
373 }
374 let start = self.line_ring.len().saturating_sub(lines);
375 let mut out = String::new();
376 for line in self.line_ring.iter().skip(start) {
377 out.push_str(line);
378 out.push('\n');
379 }
380 if !self.line_ring_partial.is_empty() {
381 out.push_str(&self.line_ring_partial);
382 }
383 out
384 }
385
386 pub fn fingerprint(text: &str) -> u64 {
388 let mut hasher = DefaultHasher::new();
389 text.hash(&mut hasher);
390 hasher.finish()
391 }
392
393 pub fn read_output(&mut self, timeout_secs: f32) -> Result<(String, bool)> {
398 let timeout = Duration::from_secs_f32(timeout_secs.clamp(0.1, 60.0));
399 let start = Instant::now();
400 let mut complete = false;
401 let mut no_data_count = 0;
402 let mut prompt_detected_at: Option<Instant> = None;
403
404 while start.elapsed() < timeout {
405 match self.output_rx.try_recv() {
406 Ok(chunk) => {
407 self.output_buffer.push_str(&chunk);
408 self.ingest_into_ring(&chunk);
409 no_data_count = 0;
410
411 if prompt_detected_at.is_none()
413 && (Self::check_prompt_complete(&chunk)
414 || Self::check_prompt_complete(&self.output_buffer))
415 {
416 prompt_detected_at = Some(Instant::now());
417 debug!("Prompt detected, draining remaining output...");
418 }
419
420 if self.output_buffer.len() > self.max_output_size {
422 self.output_truncated = true;
423 let truncate_msg = "\n(...output truncated...)\n";
424 let keep_size = self.max_output_size / 2;
425 self.output_buffer = format!(
426 "{}{}",
427 truncate_msg,
428 &self.output_buffer[self.output_buffer.len() - keep_size..]
429 );
430 }
431 }
432 Err(TryRecvError::Empty) => {
433 thread::sleep(Duration::from_millis(10));
435 no_data_count += 1;
436
437 if let Some(detected_time) = prompt_detected_at {
439 if detected_time.elapsed() > Duration::from_millis(100) {
441 complete = true;
442 debug!("Command completed - prompt detected and drained");
443 break;
444 }
445 } else if no_data_count > 10 && Self::check_prompt_complete(&self.output_buffer)
446 {
447 prompt_detected_at = Some(Instant::now());
449 debug!("Prompt detected after wait, draining...");
450 }
451 }
452 Err(TryRecvError::Disconnected) => {
453 warn!("PTY reader disconnected");
455 complete = true;
456 break;
457 }
458 }
459 }
460
461 if complete || prompt_detected_at.is_some() {
462 self.command_running = false;
463 complete = true;
464 }
465
466 Ok((self.output_buffer.clone(), complete))
467 }
468
469 fn check_prompt_complete(text: &str) -> bool {
471 text.contains(WCGW_PROMPT_PATTERN) && text.contains(WCGW_PROMPT_END)
473 }
474
475 pub fn send_interrupt(&mut self) -> Result<()> {
477 debug!("PTY sending Ctrl+C");
478 self.writer
479 .write_all(&[0x03]) .context("Failed to send Ctrl+C")?;
481 self.writer.flush()?;
482 Ok(())
483 }
484
485 pub fn send_eof(&mut self) -> Result<()> {
487 debug!("PTY sending Ctrl+D");
488 self.writer
489 .write_all(&[0x04]) .context("Failed to send Ctrl+D")?;
491 self.writer.flush()?;
492 Ok(())
493 }
494
495 pub fn send_suspend(&mut self) -> Result<()> {
497 debug!("PTY sending Ctrl+Z");
498 self.writer
499 .write_all(&[0x1A]) .context("Failed to send Ctrl+Z")?;
501 self.writer.flush()?;
502 Ok(())
503 }
504
505 pub fn send_text(&mut self, text: &str) -> Result<()> {
507 debug!("PTY sending text: {:?}", text);
508 self.send_bytes(text.as_bytes()).context("Failed to send text")?;
509 Ok(())
510 }
511
512 pub fn send_bytes(&mut self, bytes: &[u8]) -> Result<()> {
514 self.writer.write_all(bytes).context("Failed to send bytes")?;
515 self.writer.flush()?;
516 Ok(())
517 }
518
519 pub fn send_special_key(&mut self, key: &str) -> Result<()> {
521 let bytes: &[u8] = match key {
522 "Enter" => b"\r",
523 "Tab" => b"\t",
524 "Backspace" => b"\x7F",
525 "Escape" => b"\x1B",
526 "Up" | "KeyUp" => b"\x1B[A",
527 "Down" | "KeyDown" => b"\x1B[B",
528 "Right" | "KeyRight" => b"\x1B[C",
529 "Left" | "KeyLeft" => b"\x1B[D",
530 "Home" => b"\x1B[H",
531 "End" => b"\x1B[F",
532 "PageUp" => b"\x1B[5~",
533 "PageDown" => b"\x1B[6~",
534 "Delete" => b"\x1B[3~",
535 "Insert" => b"\x1B[2~",
536 "CtrlC" | "Ctrl-C" => b"\x03",
537 "CtrlD" | "Ctrl-D" => b"\x04",
538 "CtrlZ" | "Ctrl-Z" => b"\x1A",
539 "CtrlL" | "Ctrl-L" => b"\x0C",
540 _ => return Err(anyhow!("Unknown special key: {key}")),
541 };
542
543 debug!("PTY sending special key: {} ({:?})", key, bytes);
544 self.send_bytes(bytes)?;
545 Ok(())
546 }
547
548 pub fn resize(&mut self, cols: u16, rows: u16) -> Result<()> {
550 debug!("PTY resizing to {}x{}", cols, rows);
551
552 let new_size = PtySize { rows, cols, pixel_width: 0, pixel_height: 0 };
553
554 self.master.resize(new_size).context("Failed to resize PTY")?;
555
556 self.size = new_size;
557 Ok(())
558 }
559
560 pub fn get_size(&self) -> (u16, u16) {
562 (self.size.cols, self.size.rows)
563 }
564
565 pub fn is_alive(&mut self) -> bool {
567 self.child.try_wait().is_ok_and(|status| status.is_none())
568 }
569}
570
/// Reference-counted, `tokio` mutex-guarded slot that may hold a [`PtyShell`].
pub type SharedPtyShell = Arc<Mutex<Option<PtyShell>>>;
573
574pub fn create_shared_pty(initial_dir: &Path, restricted_mode: bool) -> Result<SharedPtyShell> {
576 let shell = PtyShell::new(initial_dir, restricted_mode)?;
577 Ok(Arc::new(Mutex::new(Some(shell))))
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A shell can be spawned in a fresh temporary directory.
    #[test]
    fn test_pty_shell_creation() -> Result<()> {
        let workdir = TempDir::new()?;
        let spawned = PtyShell::new(workdir.path(), false);
        assert!(spawned.is_ok(), "Failed to create PTY shell: {:?}", spawned.err());
        Ok(())
    }

    /// Output of a simple `echo` shows up in the captured text.
    #[test]
    fn test_pty_shell_echo() -> Result<()> {
        let workdir = TempDir::new()?;
        let mut shell = PtyShell::new(workdir.path(), false)?;

        shell.send_command("echo 'hello pty'")?;
        let output = shell.read_output(2.0)?.0;

        assert!(output.contains("hello pty"), "Output should contain 'hello pty': {output}");
        Ok(())
    }

    /// A chained command runs through to its final `echo`.
    #[test]
    fn test_pty_shell_pwd() -> Result<()> {
        let workdir = TempDir::new()?;
        let mut shell = PtyShell::new(workdir.path(), false)?;

        shell.send_command("pwd && echo 'pwd_done'")?;
        let output = shell.read_output(2.0)?.0;

        assert!(output.contains("pwd_done"), "Output should contain 'pwd_done': {output}");
        Ok(())
    }

    /// Resizing succeeds and the new dimensions are reported back.
    #[test]
    fn test_pty_resize() -> Result<()> {
        let workdir = TempDir::new()?;
        let mut shell = PtyShell::new(workdir.path(), false)?;

        assert!(shell.resize(120, 40).is_ok());

        let reported = shell.get_size();
        assert_eq!(reported.0, 120);
        assert_eq!(reported.1, 40);
        Ok(())
    }
}