#![allow(
dead_code,
unsafe_code,
clippy::format_push_string,
clippy::uninlined_format_args
)]
use std::collections::HashMap;
use std::ffi::CString;
use std::io::{self, Read};
use std::os::fd::FromRawFd;
use std::path::PathBuf;
use std::time::{Duration, Instant};
/// Baseline environment variables for the child process started by `spawn_pty`.
///
/// `spawn_pty` copies these pairs into a map first and then inserts
/// `PtyConfig::env_overrides` on top, so a caller can replace any entry.
/// `NO_COLOR` and `FORCE_COLOR` are deliberately present with an empty value
/// rather than being left out of the table.
// NOTE(review): presumably the empty values are meant to neutralise whatever the
// parent environment carries for those two variables — confirm that the program
// under test treats an empty value the same as an unset one.
pub const DEFAULT_ENV: &[(&str, &str)] = &[
("TERM", "xterm-kitty"),
("TERM_PROGRAM", "kitty"),
("COLORTERM", "truecolor"),
("LANG", "C.UTF-8"),
("LC_ALL", "C.UTF-8"),
("NO_COLOR", ""),
("FORCE_COLOR", ""),
];
/// Outcome of one `spawn_pty` run: the captured output plus metadata about the child.
#[derive(Clone, Debug)]
pub struct PtyResult {
/// Exit status when the child exited normally; `None` when it terminated any
/// other way or was killed because the timeout elapsed.
pub exit_code: Option<i32>,
/// Raw bytes read from the PTY master, in the order they arrived.
pub output: Vec<u8>,
/// Wall-clock time measured from the start of `spawn_pty` until the result was built.
pub duration: Duration,
/// Variables explicitly set for the child: `DEFAULT_ENV` with the configured
/// overrides applied on top.
pub env: HashMap<String, String>,
/// The binary path (lossily converted to UTF-8) followed by its arguments.
pub command: Vec<String>,
/// Timing metrics computed from `output` and `duration`.
pub timing: super::metrics::TimingMetrics,
}
impl PtyResult {
    /// Returns the timing metrics recorded for this run.
    pub fn metrics(&self) -> &super::metrics::TimingMetrics {
        &self.timing
    }

    /// Checks the recorded timing against `thresholds` and returns the outcome.
    ///
    /// Delegates to `TimingMetrics::check_thresholds`.
    pub fn check_timing_thresholds(
        &self,
        thresholds: &super::metrics::MetricThresholds,
    ) -> super::metrics::ThresholdCheckResult {
        self.timing.check_thresholds(thresholds)
    }

    /// Asserts that the recorded timing satisfies `thresholds`.
    ///
    /// Delegates to `TimingMetrics::assert_within_thresholds`.
    // NOTE(review): presumably panics on a violation, as the name suggests —
    // confirm against the metrics module.
    pub fn assert_timing_within_thresholds(&self, thresholds: &super::metrics::MetricThresholds) {
        self.timing.assert_within_thresholds(thresholds);
    }

    /// Serialises the timing metrics to JSON.
    ///
    /// # Errors
    ///
    /// Propagates the `serde_json::Error` reported by `TimingMetrics::to_json`.
    pub fn timing_report_json(&self) -> Result<String, serde_json::Error> {
        self.timing.to_json()
    }

    /// Builds a sequence report from the captured output.
    pub fn sequence_report(&self) -> super::analysis::SequenceReport {
        super::analysis::SequenceReport::from_output(&self.output)
    }

    /// Returns `true` when `seq` occurs anywhere in the captured output.
    ///
    /// An empty `seq` is reported as not contained, in line with
    /// [`count_sequence`](Self::count_sequence) returning `0` for it.
    pub fn contains_sequence(&self, seq: &[u8]) -> bool {
        // `slice::windows` panics when asked for zero-width windows, so the
        // empty needle has to be answered before the scan.
        if seq.is_empty() {
            return false;
        }
        self.output.windows(seq.len()).any(|window| window == seq)
    }

    /// Returns `true` when the output contains `ESC [ ? n h` (DEC private mode set).
    pub fn contains_csi_private_set(&self, n: u16) -> bool {
        let seq = format!("\x1b[?{n}h");
        self.contains_sequence(seq.as_bytes())
    }

    /// Returns `true` when the output contains `ESC [ ? n l` (DEC private mode reset).
    pub fn contains_csi_private_reset(&self, n: u16) -> bool {
        let seq = format!("\x1b[?{n}l");
        self.contains_sequence(seq.as_bytes())
    }

    /// Returns `true` when the output contains the `ESC ] 8 ;` hyperlink prefix.
    pub fn contains_osc8_hyperlink(&self) -> bool {
        self.contains_sequence(b"\x1b]8;")
    }

    /// Counts the positions in the output at which `seq` starts.
    ///
    /// Every window is compared, so overlapping occurrences are each counted.
    /// An empty `seq` yields `0`.
    pub fn count_sequence(&self, seq: &[u8]) -> usize {
        if seq.is_empty() {
            return 0;
        }
        self.output
            .windows(seq.len())
            .filter(|window| *window == seq)
            .count()
    }

    /// Renders the output as text with control bytes spelled out.
    ///
    /// Printable ASCII is copied through; escape becomes `ESC`; bell, backspace,
    /// tab and carriage return become `<BEL>`, `<BS>`, `<TAB>` and `<CR>`; a line
    /// feed becomes `<LF>` followed by a real newline; every other byte is shown
    /// as `<XX>` in upper-case hex.
    pub fn output_readable(&self) -> String {
        use std::fmt::Write;
        let mut result = String::new();
        for &b in &self.output {
            match b {
                0x1b => result.push_str("ESC"),
                0x07 => result.push_str("<BEL>"),
                0x08 => result.push_str("<BS>"),
                0x09 => result.push_str("<TAB>"),
                0x0a => result.push_str("<LF>\n"),
                0x0d => result.push_str("<CR>"),
                b if (0x20..0x7f).contains(&b) => result.push(b as char),
                b => {
                    // Writing into a `String` cannot fail.
                    let _ = write!(result, "<{b:02X}>");
                }
            }
        }
        result
    }

    /// Renders the output as lower-case, two-digit hex bytes separated by single spaces.
    pub fn output_hex(&self) -> String {
        use std::fmt::Write;
        // Two digits plus a separator per byte; formatting straight into one
        // buffer avoids a temporary `String` for every byte.
        let mut hex = String::with_capacity(self.output.len().saturating_mul(3));
        for (index, byte) in self.output.iter().enumerate() {
            if index > 0 {
                hex.push(' ');
            }
            // Writing into a `String` cannot fail.
            let _ = write!(hex, "{byte:02x}");
        }
        hex
    }
}
/// Describes one PTY run: what to execute, which environment entries to override,
/// how long to wait, and how large the terminal should be.
#[derive(Clone, Debug)]
pub struct PtyConfig {
    /// Program to execute.
    pub binary: PathBuf,
    /// Arguments passed after the program name.
    pub args: Vec<String>,
    /// Environment entries applied on top of the built-in defaults.
    pub env_overrides: HashMap<String, String>,
    /// Maximum time the run may take.
    pub timeout: Duration,
    /// Terminal size as `(columns, rows)`.
    pub size: (u16, u16),
}

impl Default for PtyConfig {
    /// The debug `demo_showcase` binary with no arguments, no overrides, a
    /// 60-second timeout and an 80x24 terminal.
    fn default() -> Self {
        let binary = PathBuf::from("target/debug/demo_showcase");
        let timeout = Duration::from_secs(60);
        Self {
            binary,
            args: Vec::new(),
            env_overrides: HashMap::new(),
            timeout,
            size: (80, 24),
        }
    }
}

impl PtyConfig {
    /// Preset that runs the `demo_showcase` tour and exits afterwards, capped at
    /// 600 frames at 30 fps, with a 30-second timeout.
    pub fn demo_showcase_tour() -> Self {
        const TOUR_ARGS: [&str; 6] = [
            "--tour",
            "--exit-after-tour",
            "--max-frames",
            "600",
            "--fps",
            "30",
        ];
        let args = TOUR_ARGS.iter().map(|flag| (*flag).to_string()).collect();
        Self {
            binary: PathBuf::from("target/debug/demo_showcase"),
            args,
            timeout: Duration::from_secs(30),
            ..Self::default()
        }
    }

    /// Appends one command-line argument.
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        let owned: String = arg.into();
        self.args.push(owned);
        self
    }

    /// Sets an environment override, replacing any earlier value for `key`.
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, setting): (String, String) = (key.into(), value.into());
        self.env_overrides.insert(name, setting);
        self
    }

    /// Replaces the timeout.
    pub const fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Replaces the terminal size; stored as `(cols, rows)`.
    pub const fn size(mut self, cols: u16, rows: u16) -> Self {
        self.size = (cols, rows);
        self
    }
}
/// Runs `config.binary` attached to a freshly opened pseudo-terminal and captures
/// everything it writes until it exits or `config.timeout` elapses.
///
/// The child becomes a session leader with the PTY as its controlling terminal and
/// as stdin, stdout and stderr. Its window size comes from `config.size`. Its
/// environment is the parent's, with [`DEFAULT_ENV`] and then
/// `config.env_overrides` set on top.
///
/// When the timeout elapses the child is sent `SIGKILL` and reaped, and the output
/// gathered so far is returned with `exit_code: None`.
///
/// # Errors
///
/// Returns an error when the binary path, an argument or an environment entry
/// contains an interior NUL byte, when `openpty` or `fork` fails, or when reading
/// the PTY master fails with anything other than `WouldBlock`, `Interrupted` or
/// `EIO`. A binary that cannot be executed is not reported as an error: the child
/// exits with status 127 instead.
#[cfg(unix)]
#[allow(clippy::too_many_lines)]
pub fn spawn_pty(config: &PtyConfig) -> io::Result<PtyResult> {
    // Turns text into a C string, reporting interior NUL bytes as an error
    // instead of panicking.
    fn to_c_string(text: &str) -> io::Result<CString> {
        CString::new(text).map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
    }

    // Force-terminates and reaps the child so no exit path leaves it running or
    // lingering as a zombie.
    fn kill_and_reap(pid: libc::pid_t) {
        let mut status: libc::c_int = 0;
        // SAFETY: `pid` is a child of this process that has not been reaped yet,
        // and `status` is a valid, writable `c_int`.
        unsafe {
            libc::kill(pid, libc::SIGKILL);
            libc::waitpid(pid, std::ptr::from_mut(&mut status), 0);
        }
    }

    let start = Instant::now();

    // Defaults first, then caller overrides so the latter win.
    let mut env: HashMap<String, String> = DEFAULT_ENV
        .iter()
        .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
        .collect();
    for (k, v) in &config.env_overrides {
        env.insert(k.clone(), v.clone());
    }
    let command: Vec<String> = std::iter::once(config.binary.to_string_lossy().into_owned())
        .chain(config.args.iter().cloned())
        .collect();

    // Everything the child needs is converted and allocated here, before `fork`:
    // the child of a possibly multi-threaded process must not allocate or panic,
    // and invalid input can still be reported to the caller at this point.
    let env_c: Vec<(CString, CString)> = env
        .iter()
        .map(|(key, value)| -> io::Result<(CString, CString)> {
            Ok((to_c_string(key)?, to_c_string(value)?))
        })
        .collect::<io::Result<_>>()?;
    let args_c: Vec<CString> = command
        .iter()
        .map(|part| to_c_string(part))
        .collect::<io::Result<_>>()?;
    // NULL-terminated argv; `args_c[0]` is the binary itself.
    let args_ptrs: Vec<*const libc::c_char> = args_c
        .iter()
        .map(|s| s.as_ptr())
        .chain(std::iter::once(std::ptr::null()))
        .collect();

    let mut master_fd: libc::c_int = 0;
    let mut slave_fd: libc::c_int = 0;
    // SAFETY: both out-pointers refer to live `c_int`s; the optional name,
    // termios and winsize arguments are allowed to be null.
    let ret = unsafe {
        libc::openpty(
            std::ptr::from_mut(&mut master_fd),
            std::ptr::from_mut(&mut slave_fd),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
        )
    };
    if ret != 0 {
        return Err(io::Error::last_os_error());
    }

    let winsize = libc::winsize {
        ws_row: config.size.1,
        ws_col: config.size.0,
        ws_xpixel: 0,
        ws_ypixel: 0,
    };
    // Best effort: a failure here only leaves the terminal at its default size.
    // SAFETY: `slave_fd` is the descriptor just returned by `openpty` and
    // `winsize` outlives the call.
    unsafe {
        libc::ioctl(slave_fd, libc::TIOCSWINSZ, &winsize);
    }

    // SAFETY: the child branch below restricts itself to libc calls on data
    // prepared before the fork and ends in `execvp` or `_exit`.
    let pid = unsafe { libc::fork() };
    if pid < 0 {
        // Capture errno before `close` can overwrite it, then release both ends
        // of the PTY so a failed fork does not leak descriptors.
        let fork_error = io::Error::last_os_error();
        // SAFETY: both descriptors were returned by `openpty` and are still open.
        unsafe {
            libc::close(master_fd);
            libc::close(slave_fd);
        }
        return Err(fork_error);
    }
    if pid == 0 {
        // SAFETY: runs in the forked child only. All pointers refer to buffers
        // built before the fork; there is no Rust allocation and no panic path.
        // NOTE: `setenv` is not on the POSIX async-signal-safe list; it is kept
        // because `execvp` must see the inherited environment plus these entries.
        unsafe {
            libc::setsid();
            libc::ioctl(slave_fd, libc::TIOCSCTTY, 0);
            libc::dup2(slave_fd, 0);
            libc::dup2(slave_fd, 1);
            libc::dup2(slave_fd, 2);
            if slave_fd > 2 {
                libc::close(slave_fd);
            }
            libc::close(master_fd);
            for (key, value) in &env_c {
                libc::setenv(key.as_ptr(), value.as_ptr(), 1);
            }
            libc::execvp(args_ptrs[0], args_ptrs.as_ptr());
            // Only reached when `execvp` failed.
            libc::_exit(127);
        }
    }

    // Parent: the slave end belongs to the child now.
    // SAFETY: `slave_fd` is open in this process and is not used again.
    unsafe {
        libc::close(slave_fd);
    }
    // Non-blocking reads let the loop below keep polling the child and the deadline.
    // SAFETY: `master_fd` is the open master descriptor from `openpty`.
    unsafe {
        let flags = libc::fcntl(master_fd, libc::F_GETFL);
        libc::fcntl(master_fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
    }

    let mut output = Vec::new();
    let mut buf = [0u8; 4096];
    let deadline = Instant::now() + config.timeout;
    // SAFETY: `master_fd` is open and owned exclusively by this function; the
    // `File` takes over closing it on every exit path.
    let mut master = unsafe { std::fs::File::from_raw_fd(master_fd) };

    loop {
        if Instant::now() > deadline {
            kill_and_reap(pid);
            break;
        }

        let mut status: libc::c_int = 0;
        // SAFETY: `pid` is our unreaped child and `status` is a valid `c_int`.
        let wait_result =
            unsafe { libc::waitpid(pid, std::ptr::from_mut(&mut status), libc::WNOHANG) };
        if wait_result == pid {
            // The child is gone: collect whatever is still buffered in the PTY.
            while let Ok(n) = master.read(&mut buf) {
                if n == 0 {
                    break;
                }
                output.extend_from_slice(&buf[..n]);
            }
            let exit_code = if libc::WIFEXITED(status) {
                Some(libc::WEXITSTATUS(status))
            } else {
                None
            };
            let duration = start.elapsed();
            let timing = compute_timing_metrics(&output, duration);
            return Ok(PtyResult {
                exit_code,
                output,
                duration,
                env,
                command,
                timing,
            });
        }

        match master.read(&mut buf) {
            Ok(0) => {
                std::thread::sleep(Duration::from_millis(10));
            }
            Ok(n) => {
                output.extend_from_slice(&buf[..n]);
            }
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
                std::thread::sleep(Duration::from_millis(10));
            }
            // A signal interrupted the read; simply poll again.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
            // `EIO` is tolerated: back off and let the next `waitpid` poll
            // decide whether the child has finished.
            Err(err) if err.raw_os_error() == Some(libc::EIO) => {
                std::thread::sleep(Duration::from_millis(50));
            }
            Err(err) => {
                // Do not leave the child running behind a failed capture.
                kill_and_reap(pid);
                return Err(err);
            }
        }
    }

    // Timed out: the child was killed and reaped above, so no exit code exists.
    let duration = start.elapsed();
    let timing = compute_timing_metrics(&output, duration);
    Ok(PtyResult {
        exit_code: None,
        output,
        duration,
        env,
        command,
        timing,
    })
}
/// Builds the timing metrics for a finished run.
///
/// Starts from the total runtime and then lets the metrics object derive its
/// startup estimate, frame count, throughput and tour steps from the captured
/// `output`.
fn compute_timing_metrics(
    output: &[u8],
    total_duration: Duration,
) -> super::metrics::TimingMetrics {
    let captured_len = output.len();
    let mut timing = super::metrics::TimingMetrics::from_duration(total_duration);
    // The derivation steps run in a fixed order on the same metrics value.
    timing.estimate_startup_from_output(output, total_duration);
    timing.count_frames_from_output(output);
    timing.calculate_throughput(captured_len);
    timing.extract_tour_steps(output);
    timing
}
/// Escape-sequence byte strings, each named after the terminal feature it toggles.
///
/// All but the last are DEC private mode sequences of the form `ESC [ ? <n> h`
/// (set) or `ESC [ ? <n> l` (reset).
pub mod sequences {
// Mode 1049: alternate screen.
pub const ALT_SCREEN_ENTER: &[u8] = b"\x1b[?1049h";
pub const ALT_SCREEN_LEAVE: &[u8] = b"\x1b[?1049l";
// Mode 25: cursor visibility (`l` hides, `h` shows).
pub const CURSOR_HIDE: &[u8] = b"\x1b[?25l";
pub const CURSOR_SHOW: &[u8] = b"\x1b[?25h";
// Modes 9, 1000, 1002, 1003 and 1006: mouse reporting variants.
pub const MOUSE_X10_ENABLE: &[u8] = b"\x1b[?9h";
pub const MOUSE_BUTTON_ENABLE: &[u8] = b"\x1b[?1000h";
pub const MOUSE_MOTION_ENABLE: &[u8] = b"\x1b[?1002h";
pub const MOUSE_ALL_ENABLE: &[u8] = b"\x1b[?1003h";
pub const MOUSE_SGR_ENABLE: &[u8] = b"\x1b[?1006h";
pub const MOUSE_BUTTON_DISABLE: &[u8] = b"\x1b[?1000l";
pub const MOUSE_MOTION_DISABLE: &[u8] = b"\x1b[?1002l";
pub const MOUSE_SGR_DISABLE: &[u8] = b"\x1b[?1006l";
// Mode 2026: synchronized output.
pub const SYNC_OUTPUT_BEGIN: &[u8] = b"\x1b[?2026h";
pub const SYNC_OUTPUT_END: &[u8] = b"\x1b[?2026l";
// Mode 2004: bracketed paste.
pub const BRACKETED_PASTE_ENABLE: &[u8] = b"\x1b[?2004h";
pub const BRACKETED_PASTE_DISABLE: &[u8] = b"\x1b[?2004l";
// Mode 1004: focus reporting.
pub const FOCUS_ENABLE: &[u8] = b"\x1b[?1004h";
pub const FOCUS_DISABLE: &[u8] = b"\x1b[?1004l";
// OSC 8 (`ESC ] 8 ;`): start of a hyperlink sequence.
pub const OSC8_PREFIX: &[u8] = b"\x1b]8;";
}
/// Prints a summary of `result` to stderr and, when artifacts are enabled for
/// `test_name`, saves the raw output and derived reports through an
/// `ArtifactManager`.
///
/// The stderr listing skips environment entries with an empty value; `info.txt`
/// lists every entry. Both list the environment sorted by variable name so that
/// logs and artifacts are identical from run to run.
pub fn log_pty_result(result: &PtyResult, test_name: &str) {
    use super::analysis::SequenceReport;
    use super::artifacts::{ArtifactManager, SequenceAnalysis, output_to_readable};

    // `HashMap` iteration order is arbitrary, so sort once and reuse the view.
    let mut env_entries: Vec<(&String, &String)> = result.env.iter().collect();
    env_entries.sort_unstable();

    eprintln!("=== PTY Test: {test_name} ===");
    eprintln!("Command: {:?}", result.command);
    eprintln!("Exit code: {:?}", result.exit_code);
    eprintln!("Duration: {:?}", result.duration);
    eprintln!("Output bytes: {}", result.output.len());
    eprintln!("Environment:");
    for (k, v) in &env_entries {
        if !v.is_empty() {
            eprintln!(" {k}={v}");
        }
    }

    let artifacts = ArtifactManager::new(test_name);
    if artifacts.is_enabled() {
        eprintln!(
            "Artifacts directory: {}",
            artifacts.artifact_dir().display()
        );

        // Raw capture and its human-readable decoding.
        artifacts.save_raw_output(&result.output);
        let readable = output_to_readable(&result.output);
        artifacts.save_decoded_output(&readable);

        // Escape-sequence analysis and reports.
        let analysis = SequenceAnalysis::from_output(&result.output);
        artifacts.save_sequence_analysis(&analysis);
        let report = SequenceReport::from_output(&result.output);
        // The JSON report is skipped when serialisation fails; the text report
        // below is written regardless.
        if let Ok(json) = report.to_json() {
            artifacts.save_text("sequence_report.json", &json);
        }
        artifacts.save_text("sequence_report.txt", &report.to_human_readable());
        let validation = report.validate_pairs();
        artifacts.save_text("pair_validation.txt", &validation.to_human_readable());

        // Run metadata.
        let info = format!(
            "Command: {:?}\nExit code: {:?}\nDuration: {:?}\nOutput bytes: {}\n\nEnvironment:\n{}",
            result.command,
            result.exit_code,
            result.duration,
            result.output.len(),
            env_entries
                .iter()
                .map(|(k, v)| format!(" {k}={v}"))
                .collect::<Vec<_>>()
                .join("\n")
        );
        artifacts.save_text("info.txt", &info);
        artifacts.save_text("output.hex", &result.output_hex());

        // Timing metrics, as JSON when serialisable and always as a text summary.
        if let Ok(timing_json) = result.timing.to_json() {
            artifacts.save_text("timing_metrics.json", &timing_json);
        }
        let timing_summary = format_timing_summary(&result.timing);
        artifacts.save_text("timing_summary.txt", &timing_summary);
    }
}
/// Formats `metrics` as a plain-text summary, one metric per line.
///
/// Optional metrics are left out when absent. The frames-per-second figure is
/// derived from the average frame time and is omitted when that time is zero,
/// since the division would otherwise print `inf`. Tour step durations are
/// listed under their own heading when there are any.
fn format_timing_summary(metrics: &super::metrics::TimingMetrics) -> String {
    use std::fmt::Write;

    let mut summary = String::from("=== Timing Metrics Summary ===\n\n");

    // Formatting straight into the buffer avoids a temporary `String` per line;
    // writing into a `String` cannot fail, so the results are discarded.
    let _ = writeln!(summary, "Total Runtime: {:?}", metrics.total_runtime);
    if let Some(startup) = metrics.startup_time {
        let _ = writeln!(summary, "Startup Time: {:?}", startup);
    }
    if let Some(first_frame) = metrics.first_frame_time {
        let _ = writeln!(summary, "First Frame: {:?}", first_frame);
    }
    let _ = writeln!(summary, "Frame Count: {}", metrics.frame_count);
    if let Some(avg_frame) = metrics.avg_frame_time {
        let seconds_per_frame = avg_frame.as_secs_f64();
        if seconds_per_frame > 0.0 {
            let _ = writeln!(
                summary,
                "Avg Frame Time: {:?} ({:.1} fps)",
                avg_frame,
                1.0 / seconds_per_frame
            );
        } else {
            let _ = writeln!(summary, "Avg Frame Time: {:?}", avg_frame);
        }
    }
    if let Some(throughput) = metrics.output_throughput_bps {
        let _ = writeln!(summary, "Output Throughput: {:.0} bytes/sec", throughput);
    }
    if !metrics.tour_step_durations.is_empty() {
        summary.push_str("\nTour Step Durations:\n");
        for step in &metrics.tour_step_durations {
            let _ = writeln!(
                summary,
                " Step {}: {:?} ({} bytes)",
                step.step_number, step.duration, step.output_bytes
            );
        }
    }
    summary
}