use std::io::{Read, Write};
use std::time::{Duration, Instant};
use cmdq::osc133::{Detector, Event};
use cmdq::queue::Queue;
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
const SHELL_INTEGRATION_BASH: &str = include_str!("../shell/integration.bash");
fn write_temp_files(suffix: &str) -> (std::path::PathBuf, std::path::PathBuf) {
let dir = std::env::temp_dir().join(format!("cmdq-e2e-{}-{}", std::process::id(), suffix));
std::fs::create_dir_all(&dir).unwrap();
let int_path = dir.join("integration.bash");
std::fs::write(&int_path, SHELL_INTEGRATION_BASH).unwrap();
let rcfile = dir.join("bashrc");
std::fs::write(
&rcfile,
format!("PS1='$ '\nsource \"{}\"\n", int_path.display()),
)
.unwrap();
(int_path, rcfile)
}
fn spawn_bash(
rcfile: &std::path::Path,
) -> (
Box<dyn Read + Send>,
Box<dyn Write + Send>,
Box<dyn portable_pty::Child + Send + Sync>,
) {
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.unwrap();
let mut cmd = CommandBuilder::new("/bin/bash");
cmd.arg("--noprofile");
cmd.arg("--rcfile");
cmd.arg(rcfile);
cmd.arg("-i");
cmd.env("CMDQ_ACTIVE", "1");
cmd.env("TERM", "xterm-256color");
let child = pair.slave.spawn_command(cmd).unwrap();
drop(pair.slave);
let reader = pair.master.try_clone_reader().unwrap();
let writer = pair.master.take_writer().unwrap();
(reader, writer, child)
}
#[test]
fn dispatches_queued_commands_after_running_one() {
if !std::path::Path::new("/bin/bash").exists() {
return;
}
let (_int, rcfile) = write_temp_files("dispatch");
let (reader, mut writer, mut child) = spawn_bash(&rcfile);
let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
{
let mut reader = reader;
std::thread::spawn(move || {
let mut buf = [0u8; 4096];
while let Ok(n) = reader.read(&mut buf) {
if n == 0 || tx.send(buf[..n].to_vec()).is_err() {
break;
}
}
});
}
let mut detector = Detector::new();
let mut queue = Queue::new();
queue.push("echo QUEUED_FIRST", false);
queue.push("echo QUEUED_SECOND", false);
let mut accum: Vec<u8> = Vec::new();
let deadline = Instant::now() + Duration::from_secs(15);
let mut wrote_initial = false;
while Instant::now() < deadline {
if let Ok(b) = rx.recv_timeout(Duration::from_millis(200)) {
accum.extend_from_slice(&b);
for ev in detector.feed(&b) {
match ev {
Event::PromptStart if !wrote_initial => {
wrote_initial = true;
writer.write_all(b"sleep 0.2; echo INITIAL_DONE\n").unwrap();
writer.flush().unwrap();
}
Event::CommandEnd { exit_code } => {
if !queue.is_empty()
&& let Some(item) = queue.pop_next_eligible(exit_code)
{
writer.write_all(item.command.as_bytes()).unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
}
}
_ => {}
}
}
}
if accum
.windows(b"QUEUED_SECOND".len())
.filter(|w| *w == b"QUEUED_SECOND")
.count()
> 0
&& accum
.windows(b"QUEUED_FIRST".len())
.filter(|w| *w == b"QUEUED_FIRST")
.count()
> 0
{
break;
}
}
writer.write_all(b"exit\n").unwrap();
let _ = child.wait();
let s = String::from_utf8_lossy(&accum);
assert!(s.contains("INITIAL_DONE"), "missing initial command output");
assert!(
s.contains("QUEUED_FIRST"),
"first queued command did not run"
);
assert!(
s.contains("QUEUED_SECOND"),
"second queued command did not run"
);
assert!(queue.is_empty(), "queue not drained");
let pos_initial = s.find("INITIAL_DONE").unwrap();
let echoes_only = |needle: &str, s: &str| -> Option<usize> {
let mut indices: Vec<usize> = s.match_indices(needle).map(|(i, _)| i).collect();
if indices.len() >= 2 {
Some(indices.remove(1))
} else {
indices.first().copied()
}
};
let pos_first = echoes_only("QUEUED_FIRST", &s).unwrap();
let pos_second = echoes_only("QUEUED_SECOND", &s).unwrap();
assert!(
pos_initial < pos_first,
"INITIAL should come before QUEUED_FIRST"
);
assert!(
pos_first < pos_second,
"QUEUED_FIRST should come before QUEUED_SECOND"
);
}
#[test]
fn conditional_skips_after_failure() {
if !std::path::Path::new("/bin/bash").exists() {
return;
}
let (_int, rcfile) = write_temp_files("cond");
let (reader, mut writer, mut child) = spawn_bash(&rcfile);
let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
{
let mut reader = reader;
std::thread::spawn(move || {
let mut buf = [0u8; 4096];
while let Ok(n) = reader.read(&mut buf) {
if n == 0 || tx.send(buf[..n].to_vec()).is_err() {
break;
}
}
});
}
let mut detector = Detector::new();
let mut queue = Queue::new();
queue.push("echo SHOULD_NOT_RUN", true);
queue.push("echo ALWAYS_RUNS", false);
let mut accum: Vec<u8> = Vec::new();
let deadline = Instant::now() + Duration::from_secs(15);
let mut wrote_initial = false;
while Instant::now() < deadline {
if let Ok(b) = rx.recv_timeout(Duration::from_millis(200)) {
accum.extend_from_slice(&b);
for ev in detector.feed(&b) {
match ev {
Event::PromptStart if !wrote_initial => {
wrote_initial = true;
writer.write_all(b"false\n").unwrap();
writer.flush().unwrap();
}
Event::CommandEnd { exit_code } => {
if !queue.is_empty()
&& let Some(item) = queue.pop_next_eligible(exit_code)
{
writer.write_all(item.command.as_bytes()).unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
}
}
_ => {}
}
}
}
let s = String::from_utf8_lossy(&accum);
if s.contains("ALWAYS_RUNS\r\n") || s.matches("ALWAYS_RUNS").count() >= 2 {
break;
}
}
writer.write_all(b"exit\n").unwrap();
let _ = child.wait();
let s = String::from_utf8_lossy(&accum);
assert!(
s.contains("ALWAYS_RUNS"),
"ALWAYS_RUNS missing in output: {}",
s
);
assert!(
!s.contains("SHOULD_NOT_RUN"),
"conditional command was not skipped"
);
}