use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use expectrl::{session::OsSession, Eof, Expect, Regex, Session};
/// Upper bound on how long a single `expect` call may wait; `spawn_yosh`
/// installs it on every session it creates.
const TIMEOUT: Duration = Duration::from_secs(15);
/// Per-process sequence number appended to temporary directory names so that
/// two directories created within the same clock reading still differ.
static TEMP_DIR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A scratch directory under the system temp dir that is removed (best effort)
/// when the value is dropped.
struct TempDir {
    path: PathBuf,
}

impl TempDir {
    /// Creates a new directory named `yosh-pty-test-<nanos>-<seq>` inside
    /// `std::env::temp_dir()`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reads earlier than the Unix epoch or if the
    /// directory cannot be created.
    fn new() -> Self {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let seq = TEMP_DIR_COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir_name = format!("yosh-pty-test-{}-{}", nanos, seq);
        let path = std::env::temp_dir().join(dir_name);
        std::fs::create_dir_all(&path).unwrap();
        Self { path }
    }

    /// Returns the location of the directory on disk.
    fn path(&self) -> &PathBuf {
        &self.path
    }
}

impl Drop for TempDir {
    /// Deletes the directory and everything in it; failures are ignored
    /// because there is nothing useful to do about them during drop.
    fn drop(&mut self) {
        std::fs::remove_dir_all(&self.path).ok();
    }
}
/// Launches the `yosh` binary built by Cargo inside a PTY session.
///
/// The child runs with `TERM=dumb` and with `HOME` pointing at a freshly
/// created temporary directory. The returned `TempDir` must be kept alive for
/// as long as the shell needs that directory, since dropping it deletes it.
///
/// # Panics
///
/// Panics if the process cannot be spawned.
fn spawn_yosh() -> (OsSession, TempDir) {
    let home = TempDir::new();

    let mut command = Command::new(env!("CARGO_BIN_EXE_yosh"));
    command.env("TERM", "dumb").env("HOME", home.path());

    let mut shell = Session::spawn(command).expect("failed to spawn yosh");
    shell.set_expect_timeout(Some(TIMEOUT));
    (shell, home)
}
/// Blocks until the literal `"$ "` appears in the session output, then pauses
/// briefly via `wait_for_raw_mode`.
///
/// # Panics
///
/// Panics with "prompt not found" if the expect call fails (for example when
/// the session's expect timeout elapses).
fn wait_for_prompt(session: &mut OsSession) {
    let prompt = session.expect("$ ");
    prompt.expect("prompt not found");
    wait_for_raw_mode();
}
/// Blocks until the literal `"> "` (the continuation prompt) appears in the
/// session output, then pauses briefly via `wait_for_raw_mode`.
///
/// # Panics
///
/// Panics with "PS2 prompt not found" if the expect call fails.
fn wait_for_ps2(session: &mut OsSession) {
    let continuation = session.expect("> ");
    continuation.expect("PS2 prompt not found");
    wait_for_raw_mode();
}
/// Sleeps for 50 ms.
///
/// NOTE(review): judging by the name, this gives the shell time to switch the
/// terminal into raw mode after a prompt is printed — confirm against the
/// shell's line editor.
fn wait_for_raw_mode() {
    let settle = Duration::from_millis(50);
    std::thread::sleep(settle);
}
/// Waits until `text` appears at the start of an output line.
///
/// The pattern is a line break (`\n`, optionally preceded by `\r`) followed
/// immediately by `text`, so text that is merely echoed after a prompt on the
/// same line does not match.
///
/// `text` is matched literally: regex metacharacters in it are escaped before
/// the pattern is built, so a `.` in a file name only matches a dot and input
/// containing `$`, `(`, `*` and the like cannot corrupt the pattern.
///
/// # Panics
///
/// Panics with `msg` followed by the underlying error if the expect call
/// fails.
fn expect_output(session: &mut OsSession, text: &str, msg: &str) {
    let pattern = format!("\r?\n{}", escape_regex_literal(text));
    session
        .expect(Regex(&pattern))
        .unwrap_or_else(|e| panic!("{}: {}", msg, e));
}

/// Returns `text` with every regex metacharacter backslash-escaped so that it
/// matches itself literally when embedded in a pattern.
fn escape_regex_literal(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        let is_meta = matches!(
            ch,
            '\\' | '.' | '+' | '*' | '?' | '(' | ')' | '|' | '[' | ']' | '{' | '}' | '^' | '$'
        );
        if is_meta {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
/// Sends Ctrl+D (`0x04`) to the shell and waits for the session to reach EOF.
///
/// The outcome of the EOF wait is deliberately ignored; only a failure to
/// send the byte panics.
fn exit_shell(session: &mut OsSession) {
    let ctrl_d = "\x04";
    session.send(ctrl_d).unwrap();
    drop(session.expect(Eof));
}
/// Runs `echo hello` at the prompt and checks that `hello` is printed at the
/// start of an output line before the next prompt appears.
#[test]
fn test_pty_echo_command() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo hello\r").unwrap();
    expect_output(&mut shell, "hello", "echo output not found");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Sends Ctrl+D at an empty prompt and requires the session to reach EOF.
#[test]
fn test_pty_ctrl_d_exits() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    let ctrl_d = "\x04";
    shell.send(ctrl_d).unwrap();

    let eof = shell.expect(Eof);
    eof.expect("shell did not exit on Ctrl+D");
}
/// Types a partial line, sends Ctrl+C, and checks that a fresh prompt appears
/// and that a subsequent command still runs.
#[test]
fn test_pty_ctrl_c_interrupts_input() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    // Unsubmitted text followed by Ctrl+C (0x03).
    shell.send("partial input").unwrap();
    let ctrl_c = "\x03";
    shell.send(ctrl_c).unwrap();
    wait_for_prompt(&mut shell);

    shell.send("echo ok\r").unwrap();
    expect_output(&mut shell, "ok", "command after Ctrl+C failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Runs a command, then presses Up followed by Enter and checks that the same
/// output is produced a second time.
#[test]
fn test_pty_history_up_re_executes() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo first_cmd\r").unwrap();
    expect_output(&mut shell, "first_cmd", "first command output not found");
    wait_for_prompt(&mut shell);

    // ESC [ A is the Up-arrow escape sequence.
    let up_arrow = "\x1b[A";
    shell.send(up_arrow).unwrap();
    shell.send("\r").unwrap();
    expect_output(&mut shell, "first_cmd", "history re-execution failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Types `echoo`, deletes the extra character with DEL (`0x7f`), finishes the
/// line as ` works`, and checks that `works` is printed.
#[test]
fn test_pty_backspace_editing() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echoo").unwrap();
    let backspace = "\x7f";
    shell.send(backspace).unwrap();
    shell.send(" works\r").unwrap();
    expect_output(&mut shell, "works", "line editing with backspace failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Enters an `if … then … fi` construct one line at a time, expecting the
/// continuation prompt after each incomplete line and the body's output once
/// the construct is closed.
#[test]
fn test_pty_ps2_continuation() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("if true; then\r").unwrap();
    wait_for_ps2(&mut shell);

    shell.send("echo continued\r").unwrap();
    wait_for_ps2(&mut shell);

    shell.send("fi\r").unwrap();
    expect_output(&mut shell, "continued", "if-then-fi output not found");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Seeds history with two commands, opens the Ctrl+R search, narrows it to
/// `echo alpha`, and checks that accepting the match runs it again.
///
/// NOTE(review): the `2/2 > ` and `1/1 > ` strings are presumably a
/// "selected/total" match counter in the search UI — confirm against the
/// shell's implementation.
#[test]
fn test_pty_ctrl_r_history_search() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo alpha\r").unwrap();
    expect_output(&mut shell, "alpha", "first echo alpha failed");
    wait_for_prompt(&mut shell);

    shell.send("echo beta\r").unwrap();
    expect_output(&mut shell, "beta", "echo beta failed");
    wait_for_prompt(&mut shell);

    // Ctrl+R is 0x12.
    let ctrl_r = "\x12";
    shell.send(ctrl_r).unwrap();
    shell
        .expect("2/2 > ")
        .expect("Ctrl+R search UI did not appear");
    wait_for_raw_mode();

    shell.send("echo alpha").unwrap();
    shell
        .expect("1/1 > ")
        .expect("search query did not filter to unique match");

    // First Enter is sent to the search UI, second to the prompt.
    shell.send("\r").unwrap();
    wait_for_raw_mode();
    shell.send("\r").unwrap();
    expect_output(&mut shell, "alpha", "Ctrl+R history search failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Runs a command once, then types only its prefix, presses Right arrow and
/// Enter, and checks that the full earlier command's output appears.
#[test]
fn test_pty_autosuggest_accept_with_right_arrow() {
    let pause = Duration::from_millis(50);

    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo autosuggest_test_value\r").unwrap();
    expect_output(&mut shell, "autosuggest_test_value", "initial echo failed");
    wait_for_prompt(&mut shell);

    shell.send("echo auto").unwrap();
    std::thread::sleep(pause);

    // ESC [ C is the Right-arrow escape sequence.
    let right_arrow = "\x1b[C";
    shell.send(right_arrow).unwrap();
    std::thread::sleep(pause);

    shell.send("\r").unwrap();
    expect_output(
        &mut shell,
        "autosuggest_test_value",
        "autosuggest acceptance failed",
    );

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Creates a uniquely named file in the shell's `HOME`, types a prefix of its
/// name as an `echo` argument, presses Tab, and checks that the full file name
/// is echoed.
#[test]
fn test_pty_tab_completion() {
    let (mut shell, home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    let candidate = home.path().join("yosh_tab_test_unique.txt");
    std::fs::write(&candidate, "hello").unwrap();

    // Bare `cd` — presumably switches to $HOME, where the file lives.
    shell.send("cd\r").unwrap();
    wait_for_prompt(&mut shell);

    shell.send("echo yosh_tab").unwrap();
    std::thread::sleep(Duration::from_millis(50));
    shell.send("\t").unwrap();
    std::thread::sleep(Duration::from_millis(100));
    shell.send("\r").unwrap();

    expect_output(
        &mut shell,
        "yosh_tab_test_unique.txt",
        "Tab completion failed to complete and execute",
    );

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Types `ech`, presses Tab, appends ` hello`, and checks that `hello` is
/// printed once the line is submitted.
#[test]
fn test_pty_command_completion() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("ech").unwrap();
    std::thread::sleep(Duration::from_millis(50));

    shell.send("\t").unwrap();
    std::thread::sleep(Duration::from_millis(100));

    shell.send(" hello\r").unwrap();
    expect_output(&mut shell, "hello", "Command completion for 'echo' failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Types `echo hello | ca`, presses Tab, then sends `t` and Enter, and checks
/// that `hello` comes out of the pipeline.
#[test]
fn test_pty_command_completion_after_pipe() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo hello | ca").unwrap();
    std::thread::sleep(Duration::from_millis(50));

    shell.send("\t").unwrap();
    std::thread::sleep(Duration::from_millis(100));

    shell.send("t\r").unwrap();
    expect_output(&mut shell, "hello", "Command completion after pipe failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Creates a uniquely named file in the shell's `HOME`, types a prefix of its
/// name as a `cat` argument, presses Tab, and checks that the file's content
/// is printed.
#[test]
fn test_pty_path_completion_in_argument_position() {
    let (mut shell, home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    let candidate = home.path().join("yosh_argcomp_unique.txt");
    std::fs::write(&candidate, "content").unwrap();

    // Bare `cd` — presumably switches to $HOME, where the file lives.
    shell.send("cd\r").unwrap();
    wait_for_prompt(&mut shell);

    shell.send("cat yosh_argcomp").unwrap();
    std::thread::sleep(Duration::from_millis(50));
    shell.send("\t").unwrap();
    std::thread::sleep(Duration::from_millis(100));
    shell.send("\r").unwrap();

    expect_output(
        &mut shell,
        "content",
        "Path completion in argument position failed",
    );

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Types the keyword `if` without submitting it, cancels with Ctrl+C, and
/// checks that the shell returns to a prompt. No highlighting output is
/// inspected; the test only verifies the shell keeps working.
#[test]
fn test_pty_syntax_highlight_keyword() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("if").unwrap();
    std::thread::sleep(Duration::from_millis(100));

    let ctrl_c = "\x03";
    shell.send(ctrl_c).unwrap();
    wait_for_prompt(&mut shell);

    exit_shell(&mut shell);
}
/// Runs `echo hi` and checks that `hi` is printed. No highlighting output is
/// inspected; the test only verifies the command still executes.
#[test]
fn test_pty_syntax_highlight_valid_command() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo hi\r").unwrap();
    expect_output(&mut shell, "hi", "echo with highlighting failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Runs `echo pipe_ok | cat` and checks that `pipe_ok` is printed. No
/// highlighting output is inspected; the test only verifies the pipeline
/// still executes.
#[test]
fn test_pty_syntax_highlight_pipe() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("echo pipe_ok | cat\r").unwrap();
    expect_output(&mut shell, "pipe_ok", "pipe with highlighting failed");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Sets `PS1` to a prompt wrapped in ANSI colour escapes and checks that a
/// command entered afterwards still produces its output.
#[test]
fn ansi_colored_prompt() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    let set_ps1 = "PS1=$(printf '\\033[32m$ \\033[0m')\r";
    shell.send(set_ps1).unwrap();
    wait_for_raw_mode();

    // NOTE(review): the command text itself contains `$`; if the typed line is
    // echoed back, this match may be satisfied by the echo rather than by the
    // new prompt — confirm.
    shell.expect("$").expect("colored prompt not found");
    wait_for_raw_mode();

    shell.send("echo hello\r").unwrap();
    expect_output(&mut shell, "hello", "echo after colored prompt");

    exit_shell(&mut shell);
}
/// Sets `PS1` to a two-line prompt (`info line`, then `> `) and checks that a
/// command entered afterwards still produces its output.
#[test]
fn multi_line_prompt() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    let set_ps1 = "PS1=$(printf 'info line\\n> ')\r";
    shell.send(set_ps1).unwrap();
    wait_for_raw_mode();

    // NOTE(review): the command text itself contains `>`; if the typed line is
    // echoed back, this match may be satisfied by the echo rather than by the
    // new prompt — confirm.
    shell
        .expect(">")
        .expect("multi-line prompt char not found");
    wait_for_raw_mode();

    shell.send("echo works\r").unwrap();
    expect_output(&mut shell, "works", "echo after multi-line prompt");

    exit_shell(&mut shell);
}
/// Runs a command, delivers `SIGHUP` to the shell process, and checks that the
/// command was written to `.yosh_history` inside the shell's `HOME`.
///
/// The result of `kill` is asserted so that a failed signal delivery is
/// reported directly instead of surfacing later as a missing history file
/// after the expect timeout has elapsed.
#[test]
fn test_pty_sighup_saves_history() {
    let (mut s, tmpdir) = spawn_yosh();
    wait_for_prompt(&mut s);

    s.send("echo sighup_test_marker\r").unwrap();
    expect_output(&mut s, "sighup_test_marker", "echo output");
    wait_for_prompt(&mut s);

    let pid = s.get_process().pid();
    // SAFETY: `kill` takes only integer arguments and dereferences no memory.
    // The pid belongs to the child spawned for this test, and the session that
    // owns it is still alive.
    let rc = unsafe { libc::kill(pid.as_raw(), libc::SIGHUP) };
    assert_eq!(
        rc,
        0,
        "failed to send SIGHUP to yosh (pid {}): {}",
        pid.as_raw(),
        std::io::Error::last_os_error()
    );

    // Best effort: wait for the shell to go away before reading the file. The
    // assertions below decide whether the test passes.
    let _ = s.expect(Eof);

    let histfile = tmpdir.path().join(".yosh_history");
    let contents = std::fs::read_to_string(&histfile)
        .expect("history file should exist after SIGHUP");
    assert!(
        contents.contains("echo sighup_test_marker"),
        "history file should contain the command, got: {:?}",
        contents
    );
}
/// Runs `set +m` and then `fg`, expecting the shell to print a message
/// containing `no job control`.
#[test]
fn test_pty_set_plus_m_disables_job_control() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("set +m\r").unwrap();
    wait_for_prompt(&mut shell);

    shell.send("fg\r").unwrap();
    let diagnostic = shell.expect("no job control");
    diagnostic.expect("fg should report 'no job control' after set +m");

    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}
/// Turns job control off and back on (`set +m`, `set -m`), suspends a
/// foreground `sleep` with Ctrl+Z, and checks that `jobs` lists it as
/// `Stopped`. The job is then sent `kill %1` before the shell is closed.
#[test]
fn test_pty_set_minus_m_reenables_job_control() {
    let (mut shell, _home) = spawn_yosh();
    wait_for_prompt(&mut shell);

    shell.send("set +m\r").unwrap();
    wait_for_prompt(&mut shell);
    shell.send("set -m\r").unwrap();
    wait_for_prompt(&mut shell);

    shell.send("sleep 100\r").unwrap();
    // Pause before suspending — presumably so `sleep` is running in the
    // foreground by the time Ctrl+Z arrives.
    std::thread::sleep(Duration::from_millis(200));
    let ctrl_z = "\x1a";
    shell.send(ctrl_z).unwrap();
    wait_for_prompt(&mut shell);

    shell.send("jobs\r").unwrap();
    let listing = shell.expect("Stopped");
    listing.expect("jobs should show Stopped after Ctrl+Z suspend");
    wait_for_prompt(&mut shell);

    shell.send("kill %1\r").unwrap();
    wait_for_prompt(&mut shell);
    exit_shell(&mut shell);
}