use anyhow::{anyhow, Context, Result};
use portable_pty::{native_pty_system, Child, CommandBuilder, MasterPty, PtySize};
use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::io::{Read, Write};
use std::path::Path;
use std::process::Command;
use std::sync::mpsc::{self, TryRecvError};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
pub const DEFAULT_COLS: u16 = 200;
pub const DEFAULT_ROWS: u16 = 50;
const MAX_OUTPUT_SIZE: usize = 1_000_000;
pub const RING_BUFFER_LINES: usize = 2_000;
const WCGW_PROMPT_PATTERN: &str = "◉";
const WCGW_PROMPT_END: &str = "──➤";
fn attachable_command(restricted_mode: bool) -> (CommandBuilder, Option<String>, bool) {
let requested = std::env::var("WINX_ATTACH_TERMINAL")
.or_else(|_| std::env::var("WINX_USE_SCREEN"))
.unwrap_or_default();
if !requested.is_empty() && requested != "0" && requested != "false" {
let session = format!("winx-{}-{}", std::process::id(), timestamp_millis());
if requested == "tmux" && command_available("tmux") {
let mut cmd = CommandBuilder::new("tmux");
cmd.args(["new-session", "-A", "-s", &session, "bash"]);
if restricted_mode {
cmd.arg("-r");
}
return (cmd, Some(format!("tmux attach -t {session}")), false);
}
if command_available("screen") {
ensure_screenrc();
cleanup_orphaned_screens();
let mut cmd = CommandBuilder::new("screen");
cmd.args(["-q", "-S", &session, "bash"]);
if restricted_mode {
cmd.arg("-r");
}
return (cmd, Some(format!("screen -x {session}")), false);
}
}
let shell = preferred_shell(restricted_mode);
let is_zsh = shell == "zsh";
let mut cmd = CommandBuilder::new(&shell);
if restricted_mode && !is_zsh {
cmd.arg("-r");
}
(cmd, None, is_zsh)
}
fn preferred_shell(restricted_mode: bool) -> String {
if !restricted_mode {
if let Ok(requested) = std::env::var("WINX_SHELL") {
if requested == "zsh" && command_available("zsh") {
return "zsh".to_string();
}
}
}
"bash".to_string()
}
fn command_available(command: &str) -> bool {
Command::new("sh")
.args(["-c", &format!("command -v {command}")])
.output()
.is_ok_and(|output| output.status.success())
}
fn ensure_screenrc() {
let Some(home) = home::home_dir() else {
return;
};
let screenrc = home.join(".screenrc");
if screenrc.exists() {
return;
}
let _ = std::fs::write(
&screenrc,
"defscrollback 10000\ntermcapinfo xterm* ti@:te@\nstartup_message off\n",
);
}
fn cleanup_orphaned_screens() {
let Ok(output) = Command::new("screen").arg("-ls").output() else {
return;
};
let listing = String::from_utf8_lossy(&output.stdout);
for line in listing.lines() {
let Some(session) = line.split_whitespace().next() else {
continue;
};
let Some((_, name)) = session.split_once('.') else {
continue;
};
if let Some(creator_pid) = winx_creator_pid(name) {
if !process_exists(creator_pid) {
let _ = Command::new("screen").args(["-S", session, "-X", "quit"]).output();
}
}
}
}
fn winx_creator_pid(name: &str) -> Option<u32> {
name.strip_prefix("winx-")?.split('-').next()?.parse::<u32>().ok()
}
fn process_exists(pid: u32) -> bool {
std::path::Path::new("/proc").join(pid.to_string()).exists()
}
fn timestamp_millis() -> u128 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |duration| duration.as_millis())
}
pub struct PtyShell {
master: Box<dyn MasterPty + Send>,
child: Box<dyn Child + Send + Sync>,
writer: Box<dyn Write + Send>,
output_rx: mpsc::Receiver<String>,
size: PtySize,
pub last_command: String,
pub output_buffer: String,
pub command_running: bool,
max_output_size: usize,
pub output_truncated: bool,
pub line_ring: VecDeque<String>,
line_ring_partial: String,
pub last_returned_hash: Option<u64>,
pub attach_hint: Option<String>,
}
impl std::fmt::Debug for PtyShell {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PtyShell")
.field("size", &format!("{}x{}", self.size.cols, self.size.rows))
.field("last_command", &self.last_command)
.field("command_running", &self.command_running)
.field("output_truncated", &self.output_truncated)
.field("output_buffer_len", &self.output_buffer.len())
.field("attach_hint", &self.attach_hint)
.finish_non_exhaustive()
}
}
impl Drop for PtyShell {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
impl PtyShell {
pub fn new(initial_dir: &Path, restricted_mode: bool) -> Result<Self> {
info!(
"Creating new PTY shell (restricted: {}) in {}",
restricted_mode,
initial_dir.display()
);
let pty_system = native_pty_system();
let size =
PtySize { rows: DEFAULT_ROWS, cols: DEFAULT_COLS, pixel_width: 0, pixel_height: 0 };
let pair = pty_system.openpty(size).context("Failed to open PTY pair")?;
let (mut cmd, attach_hint, is_zsh) = attachable_command(restricted_mode);
cmd.env("TERM", "xterm-256color");
cmd.env("COLORTERM", "truecolor");
cmd.env("PAGER", "cat");
cmd.env("GIT_PAGER", "cat");
cmd.env("COLUMNS", DEFAULT_COLS.to_string());
cmd.env("ROWS", DEFAULT_ROWS.to_string());
cmd.env("PROMPT_COMMAND", r#"printf "◉ %s──➤ " "$PWD""#);
cmd.cwd(initial_dir);
let child = pair.slave.spawn_command(cmd).context("Failed to spawn bash in PTY")?;
let mut reader = pair.master.try_clone_reader().context("Failed to clone PTY reader")?;
let writer = pair.master.take_writer().context("Failed to take PTY writer")?;
let (output_tx, output_rx) = mpsc::channel::<String>();
thread::spawn(move || {
let mut buf = [0u8; 4096];
loop {
match reader.read(&mut buf) {
Ok(0) => {
break;
}
Ok(n) => {
let chunk = String::from_utf8_lossy(&buf[..n]).to_string();
if output_tx.send(chunk).is_err() {
break;
}
}
Err(e) => {
debug!("PTY reader thread error: {}", e);
break;
}
}
}
debug!("PTY reader thread exiting");
});
let mut shell = Self {
master: pair.master,
child,
writer,
output_rx,
size,
last_command: String::new(),
output_buffer: String::new(),
command_running: false,
max_output_size: MAX_OUTPUT_SIZE,
output_truncated: false,
line_ring: VecDeque::with_capacity(RING_BUFFER_LINES),
line_ring_partial: String::new(),
last_returned_hash: None,
attach_hint,
};
shell.initialize_prompt(is_zsh)?;
debug!("PTY shell created successfully");
Ok(shell)
}
fn initialize_prompt(&mut self, is_zsh: bool) -> Result<()> {
let prompt_statement = if is_zsh {
r#"export GIT_PAGER=cat PAGER=cat; precmd_functions=(); preexec_functions=(); PROMPT=''; RPROMPT=''; precmd() { printf "◉ %s──➤ " "$PWD" }"#
} else {
r#"export GIT_PAGER=cat PAGER=cat PROMPT_COMMAND='printf "◉ %s──➤ " "$PWD"'; PS1=''"#
};
self.write_command(prompt_statement)?;
std::thread::sleep(Duration::from_millis(100));
let _ = self.drain_output();
Ok(())
}
fn write_command(&mut self, command: &str) -> Result<()> {
let cmd_with_newline = format!("{command}\n");
self.writer.write_all(cmd_with_newline.as_bytes()).context("Failed to write to PTY")?;
self.writer.flush().context("Failed to flush PTY")?;
Ok(())
}
fn drain_output(&mut self) -> String {
let mut output = String::new();
let deadline = Instant::now() + Duration::from_millis(200);
while Instant::now() < deadline {
match self.output_rx.try_recv() {
Ok(chunk) => {
output.push_str(&chunk);
if output.len() > self.max_output_size {
self.output_truncated = true;
break;
}
}
Err(TryRecvError::Empty) => {
thread::sleep(Duration::from_millis(10));
}
Err(TryRecvError::Disconnected) => {
break;
}
}
}
output
}
pub fn clear_to_run(&mut self, max_wait_secs: f32) -> Result<bool> {
let (_, complete) = self.read_output(max_wait_secs.min(0.5))?;
if complete {
return Ok(true);
}
debug!("clear_to_run: prompt not seen, sending Ctrl+C");
self.send_interrupt()?;
let (_, drained) = self.read_output(max_wait_secs)?;
Ok(drained)
}
pub fn send_command(&mut self, command: &str) -> Result<()> {
debug!("PTY sending command: {}", command);
self.output_buffer.clear();
self.output_truncated = false;
self.last_command = command.to_string();
self.command_running = true;
self.last_returned_hash = None;
self.write_command(command)?;
Ok(())
}
fn ingest_into_ring(&mut self, chunk: &str) {
let combined = if self.line_ring_partial.is_empty() {
chunk.to_string()
} else {
let mut s = std::mem::take(&mut self.line_ring_partial);
s.push_str(chunk);
s
};
let mut last_nl_end: Option<usize> = None;
for (idx, ch) in combined.char_indices() {
if ch == '\n' {
let end = idx + ch.len_utf8();
let start = last_nl_end.unwrap_or(0);
let line = combined[start..idx].trim_end_matches('\r').to_string();
if self.line_ring.len() == RING_BUFFER_LINES {
self.line_ring.pop_front();
}
self.line_ring.push_back(line);
last_nl_end = Some(end);
}
}
if let Some(end) = last_nl_end {
self.line_ring_partial = combined[end..].to_string();
} else {
self.line_ring_partial = combined;
}
}
pub fn collect_scrollback(&self, lines: usize) -> String {
if lines == 0 {
return String::new();
}
let start = self.line_ring.len().saturating_sub(lines);
let mut out = String::new();
for line in self.line_ring.iter().skip(start) {
out.push_str(line);
out.push('\n');
}
if !self.line_ring_partial.is_empty() {
out.push_str(&self.line_ring_partial);
}
crate::state::terminal::render_terminal_output(&out).join("\n")
}
pub fn fingerprint(text: &str) -> u64 {
let mut hasher = DefaultHasher::new();
text.hash(&mut hasher);
hasher.finish()
}
pub fn read_output(&mut self, timeout_secs: f32) -> Result<(String, bool)> {
let timeout = Duration::from_secs_f32(timeout_secs.clamp(0.1, 60.0));
let start = Instant::now();
let mut complete = false;
let mut no_data_count = 0;
let mut prompt_detected_at: Option<Instant> = None;
while start.elapsed() < timeout {
match self.output_rx.try_recv() {
Ok(chunk) => {
self.output_buffer.push_str(&chunk);
self.ingest_into_ring(&chunk);
no_data_count = 0;
if prompt_detected_at.is_none()
&& (Self::check_prompt_complete(&chunk)
|| Self::check_prompt_complete(&self.output_buffer))
{
prompt_detected_at = Some(Instant::now());
debug!("Prompt detected, draining remaining output...");
}
if self.output_buffer.len() > self.max_output_size {
self.output_truncated = true;
let truncate_msg = "\n(...output truncated...)\n";
let keep_size = self.max_output_size / 2;
let mut cut = self.output_buffer.len() - keep_size;
while cut < self.output_buffer.len()
&& !self.output_buffer.is_char_boundary(cut)
{
cut += 1;
}
self.output_buffer =
format!("{truncate_msg}{}", &self.output_buffer[cut..]);
}
}
Err(TryRecvError::Empty) => {
thread::sleep(Duration::from_millis(10));
no_data_count += 1;
if let Some(detected_time) = prompt_detected_at {
if detected_time.elapsed() > Duration::from_millis(100) {
complete = true;
debug!("Command completed - prompt detected and drained");
break;
}
} else if no_data_count > 10 && Self::check_prompt_complete(&self.output_buffer)
{
prompt_detected_at = Some(Instant::now());
debug!("Prompt detected after wait, draining...");
}
}
Err(TryRecvError::Disconnected) => {
warn!("PTY reader disconnected");
complete = true;
break;
}
}
}
if complete || prompt_detected_at.is_some() {
self.command_running = false;
complete = true;
}
Ok((self.output_buffer.clone(), complete))
}
fn check_prompt_complete(text: &str) -> bool {
text.lines().rev().find(|line| !line.trim().is_empty()).is_some_and(|last| {
let clean = crate::state::terminal::strip_ansi_codes(last);
let clean = clean.trim_end();
clean.contains(WCGW_PROMPT_PATTERN) && clean.ends_with(WCGW_PROMPT_END)
})
}
pub fn send_interrupt(&mut self) -> Result<()> {
debug!("PTY sending Ctrl+C");
self.writer
.write_all(&[0x03]) .context("Failed to send Ctrl+C")?;
self.writer.flush()?;
Ok(())
}
pub fn send_eof(&mut self) -> Result<()> {
debug!("PTY sending Ctrl+D");
self.writer
.write_all(&[0x04]) .context("Failed to send Ctrl+D")?;
self.writer.flush()?;
Ok(())
}
pub fn send_suspend(&mut self) -> Result<()> {
debug!("PTY sending Ctrl+Z");
self.writer
.write_all(&[0x1A]) .context("Failed to send Ctrl+Z")?;
self.writer.flush()?;
Ok(())
}
pub fn send_text(&mut self, text: &str) -> Result<()> {
debug!("PTY sending text: {:?}", text);
self.send_bytes(text.as_bytes()).context("Failed to send text")?;
Ok(())
}
pub fn send_bytes(&mut self, bytes: &[u8]) -> Result<()> {
self.writer.write_all(bytes).context("Failed to send bytes")?;
self.writer.flush()?;
Ok(())
}
pub fn send_special_key(&mut self, key: &str) -> Result<()> {
let bytes: &[u8] = match key {
"Enter" => b"\r",
"Tab" => b"\t",
"Backspace" => b"\x7F",
"Escape" => b"\x1B",
"Up" | "KeyUp" => b"\x1B[A",
"Down" | "KeyDown" => b"\x1B[B",
"Right" | "KeyRight" => b"\x1B[C",
"Left" | "KeyLeft" => b"\x1B[D",
"Home" => b"\x1B[H",
"End" => b"\x1B[F",
"PageUp" => b"\x1B[5~",
"PageDown" => b"\x1B[6~",
"Delete" => b"\x1B[3~",
"Insert" => b"\x1B[2~",
"CtrlC" | "Ctrl-C" => b"\x03",
"CtrlD" | "Ctrl-D" => b"\x04",
"CtrlZ" | "Ctrl-Z" => b"\x1A",
"CtrlL" | "Ctrl-L" => b"\x0C",
_ => return Err(anyhow!("Unknown special key: {key}")),
};
debug!("PTY sending special key: {} ({:?})", key, bytes);
self.send_bytes(bytes)?;
Ok(())
}
pub fn resize(&mut self, cols: u16, rows: u16) -> Result<()> {
debug!("PTY resizing to {}x{}", cols, rows);
let new_size = PtySize { rows, cols, pixel_width: 0, pixel_height: 0 };
self.master.resize(new_size).context("Failed to resize PTY")?;
self.size = new_size;
Ok(())
}
pub fn get_size(&self) -> (u16, u16) {
(self.size.cols, self.size.rows)
}
pub fn is_alive(&mut self) -> bool {
self.child.try_wait().is_ok_and(|status| status.is_none())
}
}
pub type SharedPtyShell = Arc<Mutex<Option<PtyShell>>>;
pub fn create_shared_pty(initial_dir: &Path, restricted_mode: bool) -> Result<SharedPtyShell> {
let shell = PtyShell::new(initial_dir, restricted_mode)?;
Ok(Arc::new(Mutex::new(Some(shell))))
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn prompt_detection_is_suffix_anchored() {
assert!(PtyShell::check_prompt_complete("out\nmore\n◉ /home/x──➤ "));
assert!(PtyShell::check_prompt_complete("◉ /home/x──➤ \u{1b}[K"));
assert!(!PtyShell::check_prompt_complete("menu: ◉ start ──➤ stop\nstill running"));
assert!(!PtyShell::check_prompt_complete("◉ /home/x──➤ ls -la"));
assert!(!PtyShell::check_prompt_complete("just some output\n"));
}
#[test]
fn test_pty_shell_creation() -> Result<()> {
let temp_dir = TempDir::new()?;
let result = PtyShell::new(temp_dir.path(), false);
assert!(result.is_ok(), "Failed to create PTY shell: {:?}", result.err());
Ok(())
}
#[test]
fn test_pty_shell_echo() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut shell = PtyShell::new(temp_dir.path(), false)?;
shell.send_command("echo 'hello pty'")?;
let (output, _complete) = shell.read_output(2.0)?;
assert!(output.contains("hello pty"), "Output should contain 'hello pty': {output}");
Ok(())
}
#[test]
fn test_pty_shell_pwd() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut shell = PtyShell::new(temp_dir.path(), false)?;
shell.send_command("pwd && echo 'pwd_done'")?;
let (output, _complete) = shell.read_output(2.0)?;
assert!(output.contains("pwd_done"), "Output should contain 'pwd_done': {output}");
Ok(())
}
#[test]
fn test_pty_resize() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut shell = PtyShell::new(temp_dir.path(), false)?;
let result = shell.resize(120, 40);
assert!(result.is_ok());
let (cols, rows) = shell.get_size();
assert_eq!(cols, 120);
assert_eq!(rows, 40);
Ok(())
}
}