use std::collections::VecDeque;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use running_process::pty::NativePtyProcess;
/// Reports whether the byte-exact MITM tests may run on this host.
///
/// Non-Windows targets always return `true`. On Windows the answer is `true`
/// only when the `RUNNING_PROCESS_FORCE_MITM_WINDOWS` environment variable is
/// set (to any value); otherwise it is `false`.
pub fn mitm_byte_exact_supported() -> bool {
    #[cfg(not(windows))]
    {
        true
    }
    #[cfg(windows)]
    {
        let forced = std::env::var_os("RUNNING_PROCESS_FORCE_MITM_WINDOWS").is_some();
        if !forced {
            // The build number is queried and then discarded; the result does
            // not influence the answer.
            // NOTE(review): presumably a placeholder for future per-build
            // gating — confirm before removing.
            let _ = windows_build_number();
        }
        forced
    }
}
/// Byte that `EchoerSession::spawn` waits for in the child's output before it
/// returns. The failure diagnostic refers to it as the "startup ACK" of
/// `testbin-stdin-echoer`.
pub const STARTUP_HANDSHAKE_BYTE: u8 = b'R';
/// Builds the panic message used when the echoer never sends its startup ACK.
///
/// * `drained` — every byte that was read while waiting for the ACK.
/// * `handshake` — how long the caller waited.
///
/// The message lists the host and backend descriptions, the drained bytes as
/// space-separated lowercase hex, and an ASCII preview in which every byte
/// outside `0x20..0x7f` is shown as `.`.
fn spawn_handshake_failure_diagnostic(drained: &[u8], handshake: Duration) -> String {
    let backend = backend_description();
    let host = host_description();

    // Build both renderings in a single pass over the drained bytes.
    let mut hex_tokens: Vec<String> = Vec::with_capacity(drained.len());
    let mut preview_ascii = String::with_capacity(drained.len());
    for &byte in drained {
        hex_tokens.push(format!("{byte:02x}"));
        // Space plus the graphic range 0x21..=0x7e is exactly 0x20..0x7f.
        let printable = byte == b' ' || byte.is_ascii_graphic();
        preview_ascii.push(if printable { char::from(byte) } else { '.' });
    }
    let drained_hex = hex_tokens.join(" ");

    format!(
        "testbin-stdin-echoer never emitted startup ACK in {handshake:?}\n \
         host: {host}\n \
         backend: {backend}\n \
         drained: {} bytes [{drained_hex}]\n \
         ascii: \"{preview_ascii}\"\n \
         hint: if drained matches `1b 5b 36 6e`, ConPTY is in non-passthrough mode and \
         synthesized a DSR cursor query — see #452 for the Windows Server 2025 \
         investigation. Re-run with RUNNING_PROCESS_FORCE_MITM_WINDOWS=1 to drive \
         this path manually.",
        drained.len(),
    )
}
/// Windows: the `Debug` rendering of the PTY backend kind currently reported
/// by `running_process::pty::current_backend_kind()`.
#[cfg(windows)]
fn backend_description() -> String {
    let kind = running_process::pty::current_backend_kind();
    format!("{kind:?}")
}
/// Non-Windows: there is no backend selection to report, so a fixed label is
/// returned.
#[cfg(not(windows))]
fn backend_description() -> String {
    String::from("n/a (POSIX PTY)")
}
/// Tells a test whether it should bail out early.
///
/// Returns `false` when `mitm_byte_exact_supported()` is `true`. Otherwise it
/// prints a `[SKIP]` notice (including the host description) to stderr and
/// returns `true`.
pub fn skip_unless_mitm_supported() -> bool {
    let must_skip = !mitm_byte_exact_supported();
    if must_skip {
        eprintln!(
            "[SKIP] byte-exact MITM substrate is currently disabled on Windows pending \
             investigation of Server 2025's ConPTY behavior (#448 / #449 follow-up). \
             Linux/macOS CI covers the substrate guarantee. Current host: {}",
            host_description()
        );
    }
    must_skip
}
/// Windows: one-line host summary containing the OS build number (`0` when it
/// could not be determined) and whether a sidecar `conpty.dll` was found.
#[cfg(windows)]
fn host_description() -> String {
    // Query the build number first, then the sidecar, matching the order in
    // which they appear in the message.
    let build = windows_build_number().unwrap_or(0);
    let sidecar = sidecar_conpty_dll_present();
    format!("Windows build {}, sidecar present = {}", build, sidecar)
}
/// Non-Windows: the host summary is simply the fixed label `POSIX`.
#[cfg(not(windows))]
fn host_description() -> String {
    String::from("POSIX")
}
/// Returns the Windows build number by running `cmd /c ver`.
///
/// The build number is taken from the third dot-separated field of the
/// command's output (for example `26100` in `[Version 10.0.26100.1234]`).
/// Only the leading ASCII digits of that field are parsed, so output that has
/// no fourth component — where the field ends in `]` and a line break, as in
/// `[Version 10.0.10240]` — is still recognised.
///
/// Returns `None` when the command cannot be run, the output has fewer than
/// three dot-separated fields, or the field does not start with a number that
/// fits in `u32`.
#[cfg(windows)]
fn windows_build_number() -> Option<u32> {
    let output = Command::new("cmd").args(["/c", "ver"]).output().ok()?;
    let stdout = String::from_utf8_lossy(&output.stdout);
    let field = stdout.split('.').nth(2)?;
    // Cut the field at the first non-digit so trailing `]`/CR/LF do not make
    // the parse fail.
    let digits_end = field
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(field.len());
    field[..digits_end].parse::<u32>().ok()
}
/// Windows: reports whether a sidecar `conpty.dll` exists in either of the two
/// locations this function probes.
///
/// 1. Next to the current executable.
/// 2. Under the user cache directory at
///    `running-process/conpty/<crate version>/<arch>/conpty.dll`, where
///    `<arch>` is `x64`, `arm64`, `x86` or `arm` depending on the compile
///    target.
///
/// Returns `false` when neither file exists, when no cache directory is
/// available, or when the target architecture is not one of the four above.
#[cfg(windows)]
fn sidecar_conpty_dll_present() -> bool {
    // Location 1: alongside the running executable.
    let beside_exe = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join("conpty.dll")))
        .map_or(false, |dll| dll.is_file());
    if beside_exe {
        return true;
    }

    // Location 2: the per-version, per-architecture cache directory.
    let mut dll = match dirs::cache_dir() {
        Some(root) => root,
        None => return false,
    };
    let arch_table = [
        (cfg!(target_arch = "x86_64"), "x64"),
        (cfg!(target_arch = "aarch64"), "arm64"),
        (cfg!(target_arch = "x86"), "x86"),
        (cfg!(target_arch = "arm"), "arm"),
    ];
    let arch = match arch_table.iter().find(|(matches_target, _)| *matches_target) {
        Some((_, dir_name)) => *dir_name,
        None => return false,
    };
    dll.extend([
        "running-process",
        "conpty",
        env!("CARGO_PKG_VERSION"),
        arch,
        "conpty.dll",
    ]);
    dll.is_file()
}
/// Builds the binary `name` from the `testbins` package and returns the path
/// of the produced executable.
///
/// Cargo is invoked with `--message-format=json`; its stderr is inherited so
/// build output stays visible. The executable path is taken from the first
/// `compiler-artifact` message whose target kind includes `bin` and that
/// carries an `executable` field.
///
/// After the path is found, the function polls for up to five seconds for the
/// file to exist and then returns the path whether or not it has appeared.
///
/// # Panics
///
/// Panics if cargo cannot be launched, if the build exits with a non-zero
/// status, or if no matching artifact message is present in the output.
pub fn testbin_path(name: &str) -> PathBuf {
    let build = Command::new(env!("CARGO"))
        .args([
            "build",
            "-p",
            "testbins",
            "--bin",
            name,
            "--message-format=json",
        ])
        .stderr(std::process::Stdio::inherit())
        .output()
        .unwrap_or_else(|e| panic!("cargo build for testbin {name} failed: {e}"));
    assert!(
        build.status.success(),
        "cargo build -p testbins --bin {name} returned non-zero status"
    );

    let messages = String::from_utf8_lossy(&build.stdout);
    let artifact = messages.lines().find_map(|line| {
        // Cheap textual pre-filter before paying for a JSON parse.
        if !(line.contains("\"compiler-artifact\"") && line.contains(name)) {
            return None;
        }
        let message = serde_json::from_str::<serde_json::Value>(line).ok()?;
        if message["reason"] != "compiler-artifact" {
            return None;
        }
        let kinds = message["target"]["kind"].as_array()?;
        if !kinds.iter().any(|k| k == "bin") {
            return None;
        }
        message["executable"].as_str().map(PathBuf::from)
    });

    let Some(exe_path) = artifact else {
        panic!("could not locate compiler-artifact for testbin {name}");
    };

    // Give the filesystem a bounded grace period to expose the new file.
    let give_up_at = Instant::now() + Duration::from_secs(5);
    loop {
        if exe_path.exists() || Instant::now() >= give_up_at {
            break;
        }
        std::thread::sleep(Duration::from_millis(50));
    }
    exe_path
}
/// A running `testbin-stdin-echoer` child attached to a PTY, plus any output
/// that was read past the startup handshake byte while spawning.
pub struct EchoerSession {
// The PTY-backed child process that all reads and writes go through.
process: NativePtyProcess,
// Bytes that followed the handshake byte in the data read during `spawn`.
// The drain methods hand these out before reading from the process again.
prefetched: Mutex<VecDeque<u8>>,
}
impl EchoerSession {
pub fn spawn(args: &[&str]) -> Self {
let bin = testbin_path("testbin-stdin-echoer");
let mut argv: Vec<String> = Vec::with_capacity(1 + args.len());
argv.push(bin.to_string_lossy().into_owned());
for a in args {
argv.push((*a).to_string());
}
let process = NativePtyProcess::new(argv, None, None, 24, 80, None)
.expect("construct NativePtyProcess");
process.start_impl().expect("start NativePtyProcess");
let session = Self {
process,
prefetched: Mutex::new(VecDeque::new()),
};
let handshake = Duration::from_secs(40);
let drained = session.drain_raw_until_byte(STARTUP_HANDSHAKE_BYTE, handshake);
let ack_pos = drained
.iter()
.position(|&b| b == STARTUP_HANDSHAKE_BYTE)
.unwrap_or_else(|| {
panic!(
"{}",
spawn_handshake_failure_diagnostic(&drained, handshake)
)
});
if ack_pos + 1 < drained.len() {
session
.prefetched
.lock()
.expect("prefetched mutex poisoned")
.extend(drained[ack_pos + 1..].iter().copied());
}
session
}
fn drain_raw_until_byte(&self, target: u8, timeout: Duration) -> Vec<u8> {
let mut out = Vec::new();
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let slice = remaining.min(Duration::from_millis(100));
match self.process.read_chunk_impl(Some(slice.as_secs_f64())) {
Ok(Some(bytes)) => {
out.extend_from_slice(&bytes);
if out.contains(&target) {
return out;
}
}
Ok(None) => {}
Err(e) => panic!("read_chunk_impl error: {e:?}"),
}
}
out
}
fn drain_prefetched(&self, max_bytes: usize) -> Vec<u8> {
let mut guard = self.prefetched.lock().expect("prefetched mutex poisoned");
let take = max_bytes.min(guard.len());
let mut out = Vec::with_capacity(take);
for _ in 0..take {
if let Some(b) = guard.pop_front() {
out.push(b);
}
}
out
}
pub fn write_stdin(&self, data: &[u8]) {
self.process
.write_impl(data, false)
.unwrap_or_else(|e| panic!("write_stdin({} bytes) failed: {e:?}", data.len()));
}
pub fn drain_for(&self, timeout: Duration, target_len: Option<usize>) -> Vec<u8> {
let mut out = self.drain_prefetched(target_len.unwrap_or(usize::MAX));
if let Some(target) = target_len {
if out.len() >= target {
return out;
}
}
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let slice = remaining.min(Duration::from_millis(100));
let chunk = self
.process
.read_chunk_impl(Some(slice.as_secs_f64()))
.expect("read_chunk_impl");
match chunk {
Some(bytes) => {
out.extend_from_slice(&bytes);
if let Some(target) = target_len {
if out.len() >= target {
return out;
}
}
}
None => {
if Instant::now() >= deadline {
break;
}
}
}
}
out
}
pub fn assert_received_exact(&self, expected: &[u8], timeout: Duration) {
let got = self.drain_for(timeout, Some(expected.len()));
if got.as_slice() != expected {
panic!(
"byte-exact mismatch:\n expected ({} bytes): {}\n got ({} bytes): {}",
expected.len(),
hex(expected),
got.len(),
hex(&got),
);
}
}
pub fn drain_until_contains(&self, needle: &[u8], timeout: Duration) -> Vec<u8> {
let mut out = self.drain_prefetched(usize::MAX);
if !out.is_empty() && out.windows(needle.len()).any(|w| w == needle) {
return out;
}
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let slice = remaining.min(Duration::from_millis(100));
match self.process.read_chunk_impl(Some(slice.as_secs_f64())) {
Ok(Some(bytes)) => {
out.extend_from_slice(&bytes);
if out.windows(needle.len()).any(|w| w == needle) {
return out;
}
}
Ok(None) => {}
Err(e) => panic!("read_chunk_impl error: {e:?}"),
}
}
out
}
pub fn close_stdin(&self) {
let _ = self.process.close_impl();
}
pub fn process(&self) -> &NativePtyProcess {
&self.process
}
}
// Teardown for a session: kill the child, then wait for it.
impl Drop for EchoerSession {
fn drop(&mut self) {
// Both results are deliberately discarded so that dropping a session never
// panics, even if the child has already exited.
let _ = self.process.kill_impl();
// NOTE(review): `Some(1.5)` is presumably a timeout in seconds — confirm
// against `wait_impl`.
let _ = self.process.wait_impl(Some(1.5));
}
}
/// Renders `bytes` as lowercase two-digit hex, each byte followed by a space,
/// sixteen bytes per row. Every row after the first is introduced by a
/// newline and one space of indent.
fn hex(bytes: &[u8]) -> String {
    let mut rendered = String::with_capacity(bytes.len() * 3);
    for (row, chunk) in bytes.chunks(16).enumerate() {
        if row != 0 {
            rendered.push_str("\n ");
        }
        for byte in chunk {
            rendered.push_str(&format!("{byte:02x} "));
        }
    }
    rendered
}