#![allow(dead_code)]
use std::io::{BufRead, BufReader, Read, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use tempfile::TempDir;
/// Environment variable set on every `ezpn` process launched by these
/// helpers; its value is the directory that holds the session sockets.
pub const SOCKET_DIR_ENV: &str = "EZPN_TEST_SOCKET_DIR";
/// Pause between two consecutive polls in [`wait_for`].
pub const POLL_INTERVAL: Duration = Duration::from_millis(25);
/// Timeout used by [`wait_for_default`].
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Returns the path of the `ezpn` executable under test.
///
/// The path is fixed at compile time from the `CARGO_BIN_EXE_ezpn`
/// environment variable.
pub fn ezpn_binary() -> PathBuf {
    let exe: &str = env!("CARGO_BIN_EXE_ezpn");
    PathBuf::from(exe)
}
pub fn wait_for<F, T>(label: &str, timeout: Duration, mut f: F) -> Result<T, String>
where
F: FnMut() -> Option<T>,
{
let deadline = Instant::now() + timeout;
loop {
if let Some(v) = f() {
return Ok(v);
}
if Instant::now() >= deadline {
return Err(format!("wait_for({}) timed out after {:?}", label, timeout));
}
thread::sleep(POLL_INTERVAL);
}
}
/// Same as [`wait_for`], using [`DEFAULT_TIMEOUT`] as the time limit.
pub fn wait_for_default<F, T>(label: &str, f: F) -> Result<T, String>
where
    F: FnMut() -> Option<T>,
{
    let timeout = DEFAULT_TIMEOUT;
    wait_for(label, timeout, f)
}
/// Polls until `path` exists on disk or `timeout` elapses.
///
/// # Errors
///
/// Propagates the timeout error produced by [`wait_for`]; the label embedded
/// in that message is `path exists: <path>`.
pub fn wait_for_path(path: &Path, timeout: Duration) -> Result<(), String> {
    let label = format!("path exists: {}", path.display());
    wait_for(&label, timeout, || path.exists().then_some(()))
}
/// Polls the shared `capture` buffer until it contains the bytes of `needle`
/// or `timeout` elapses.
///
/// A poll during which the mutex cannot be locked (poisoned) counts as
/// "not found yet".
///
/// # Errors
///
/// Propagates the timeout error produced by [`wait_for`]; the label embedded
/// in that message is `output contains "<needle>"`.
pub fn wait_for_output(
    capture: &Arc<Mutex<Vec<u8>>>,
    needle: &str,
    timeout: Duration,
) -> Result<(), String> {
    let label = format!("output contains {:?}", needle);
    let wanted = needle.as_bytes();
    wait_for(&label, timeout, || match capture.lock() {
        Ok(seen) if twoway_contains(&seen, wanted) => Some(()),
        _ => None,
    })
}
/// Reports whether `needle` occurs as a contiguous run inside `haystack`.
///
/// An empty `needle` matches any haystack, including an empty one.
fn twoway_contains(haystack: &[u8], needle: &[u8]) -> bool {
    // The emptiness test must come first: `windows(0)` panics.
    needle.is_empty()
        || haystack
            .windows(needle.len())
            .any(|candidate| candidate == needle)
}
/// Per-test scratch environment rooted in a temporary directory.
pub struct TestEnv {
/// Temporary directory that backs this environment.
pub temp: TempDir,
}
impl TestEnv {
    /// Creates a fresh temporary directory (prefix `ezpn-it-`) containing
    /// empty `sockets` and `snapshots` sub-directories.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory or either sub-directory cannot be
    /// created.
    pub fn new() -> Self {
        let temp = tempfile::Builder::new()
            .prefix("ezpn-it-")
            .tempdir()
            .expect("create tempdir");
        for (sub_dir, failure) in [("sockets", "mkdir sockets"), ("snapshots", "mkdir snapshots")]
        {
            std::fs::create_dir_all(temp.path().join(sub_dir)).expect(failure);
        }
        Self { temp }
    }

    /// Root of the temporary directory.
    pub fn root(&self) -> &Path {
        self.temp.path()
    }

    /// Directory that holds the session sockets (`<root>/sockets`).
    pub fn socket_dir(&self) -> PathBuf {
        self.root().join("sockets")
    }

    /// Directory reserved for snapshots (`<root>/snapshots`).
    pub fn snapshot_dir(&self) -> PathBuf {
        self.root().join("snapshots")
    }

    /// Socket path for session `name`:
    /// `<root>/sockets/ezpn-session-<name>.sock`.
    pub fn session_socket(&self, name: &str) -> PathBuf {
        let file_name = format!("ezpn-session-{}.sock", name);
        self.socket_dir().join(file_name)
    }
}
/// Handle to one spawned daemon process together with its captured output.
pub struct DaemonHandle {
/// Session name the daemon was started with.
pub session: String,
/// Path of the session socket belonging to this daemon.
pub socket: PathBuf,
/// Bytes captured so far from the daemon's stdout.
pub stdout: Arc<Mutex<Vec<u8>>>,
/// Bytes captured so far from the daemon's stderr.
pub stderr: Arc<Mutex<Vec<u8>>>,
// The child process; `None` once `shutdown` has taken it.
child: Option<Child>,
}
impl DaemonHandle {
    /// OS process id of the daemon, or `None` after [`shutdown`](Self::shutdown)
    /// has run.
    pub fn pid(&self) -> Option<u32> {
        let child = self.child.as_ref()?;
        Some(child.id())
    }

    /// Stops the daemon and removes its socket file. Every step is best
    /// effort: failures are ignored.
    ///
    /// While the child process is still held, this runs `ezpn kill <session>`,
    /// waits up to two seconds for the socket file to disappear, and then
    /// kills and reaps the child unconditionally. The socket file is removed
    /// afterwards in every case, so repeated calls are harmless.
    pub fn shutdown(&mut self) {
        if let Some(mut child) = self.child.take() {
            // Ask the daemon to exit through its own CLI first.
            let mut kill = Command::new(ezpn_binary());
            kill.arg("kill")
                .arg(&self.session)
                .env(SOCKET_DIR_ENV, parent_dir(&self.socket))
                .stdin(Stdio::null())
                .stdout(Stdio::null())
                .stderr(Stdio::null());
            let _ = kill.status();

            let socket = &self.socket;
            let _ = wait_for("daemon socket removed", Duration::from_secs(2), || {
                (!socket.exists()).then_some(())
            });

            // Whatever happened above, make sure the process is gone and reaped.
            let _ = child.kill();
            let _ = child.wait();
        }
        let _ = std::fs::remove_file(&self.socket);
    }
}
impl Drop for DaemonHandle {
fn drop(&mut self) {
self.shutdown();
}
}
/// Returns the parent directory of `p` as an owned path, or `"."` when
/// [`Path::parent`] yields `None` (a root or an empty path).
fn parent_dir(p: &Path) -> PathBuf {
    match p.parent() {
        Some(dir) => dir.to_path_buf(),
        None => PathBuf::from("."),
    }
}
pub fn spawn_daemon(env: &TestEnv, session: &str) -> DaemonHandle {
let socket_dir = env.socket_dir();
std::fs::create_dir_all(&socket_dir).expect("mkdir socket dir");
let mut cmd = Command::new(ezpn_binary());
cmd.env(SOCKET_DIR_ENV, &socket_dir)
.env("SHELL", "/bin/sh")
.env_remove("EZPN")
.arg("--server")
.arg(session)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = cmd.spawn().expect("spawn ezpn daemon");
let stdout_buf = Arc::new(Mutex::new(Vec::new()));
let stderr_buf = Arc::new(Mutex::new(Vec::new()));
if let Some(out) = child.stdout.take() {
spawn_capture(out, Arc::clone(&stdout_buf));
}
if let Some(err) = child.stderr.take() {
spawn_capture(err, Arc::clone(&stderr_buf));
}
let socket = socket_dir.join(format!("ezpn-session-{}.sock", session));
let mut handle = DaemonHandle {
session: session.to_string(),
socket: socket.clone(),
stdout: stdout_buf,
stderr: stderr_buf,
child: Some(child),
};
if let Err(e) = wait_for_path(&socket, Duration::from_secs(5)) {
handle.shutdown();
panic!("daemon never created socket {}: {}", socket.display(), e);
}
handle
}
/// Spawns a detached thread that copies everything readable from `r` into
/// `sink` until end of stream or a read error.
///
/// Reads that fail with `ErrorKind::Interrupted` are retried, because that
/// error is transient and does not mean the stream is finished. Chunks that
/// arrive while the mutex is poisoned are dropped.
fn spawn_capture<R: Read + Send + 'static>(mut r: R, sink: Arc<Mutex<Vec<u8>>>) {
    thread::spawn(move || {
        let mut buf = [0u8; 4096];
        loop {
            match r.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => {
                    if let Ok(mut sink) = sink.lock() {
                        sink.extend_from_slice(&buf[..n]);
                    }
                }
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            }
        }
    });
}
/// Client side of one connection to a daemon's session socket.
pub struct AttachClient {
/// Handle the `send_*` methods write framed messages to.
pub stream: UnixStream,
/// Buffered handle on the same socket, for callers that read from it directly.
pub reader: BufReader<UnixStream>,
/// Column count requested when the client attached.
pub cols: u16,
/// Row count requested when the client attached.
pub rows: u16,
// Payload bytes accumulated by the background frame collector.
output: Arc<Mutex<Vec<u8>>>,
}
impl AttachClient {
    /// Shared handle to the bytes collected from the daemon so far.
    pub fn output(&self) -> Arc<Mutex<Vec<u8>>> {
        self.output.clone()
    }

    /// Sends `json` as the payload of an event message (tag `0x01`).
    pub fn send_event(&mut self, json: &[u8]) -> std::io::Result<()> {
        let tag: u8 = 0x01;
        write_msg(&mut self.stream, tag, json)
    }

    /// Sends a resize message (tag `0x03`) carrying `cols` and `rows`.
    ///
    /// The `cols` / `rows` fields of the client are not updated.
    pub fn send_resize(&mut self, cols: u16, rows: u16) -> std::io::Result<()> {
        let tag: u8 = 0x03;
        let size = encode_resize(cols, rows);
        write_msg(&mut self.stream, tag, &size)
    }

    /// Sends a detach message (tag `0x02`) with an empty payload.
    pub fn send_detach(&mut self) -> std::io::Result<()> {
        let tag: u8 = 0x02;
        let nothing: [u8; 0] = [];
        write_msg(&mut self.stream, tag, &nothing)
    }
}
/// Connects to `daemon`'s socket, announces a `cols` x `rows` terminal with
/// an initial resize message, and starts a background thread that collects
/// the daemon's output frames.
///
/// Three handles to the same socket are created: one for writing, one for
/// the collector thread and one wrapped in the client's `reader`. Socket
/// options are shared between clones, so the 500 ms read timeout set here
/// applies to all of them.
///
/// # Panics
///
/// Panics if connecting, setting the timeout, cloning the stream or sending
/// the initial resize fails.
pub fn attach_client(daemon: &DaemonHandle, cols: u16, rows: u16) -> AttachClient {
    let base = UnixStream::connect(&daemon.socket).expect("connect to daemon");
    let read_timeout = Duration::from_millis(500);
    base.set_read_timeout(Some(read_timeout))
        .expect("set read timeout");

    let mut writer = base.try_clone().expect("clone stream");
    let initial_size = encode_resize(cols, rows);
    write_msg(&mut writer, 0x03, &initial_size).expect("send initial resize");

    let collector_stream = base.try_clone().expect("clone reader");
    let output = Arc::new(Mutex::new(Vec::new()));
    spawn_frame_collector(collector_stream, Arc::clone(&output));

    let reader = BufReader::new(base);
    AttachClient {
        stream: writer,
        reader,
        cols,
        rows,
        output,
    }
}
/// Sends `text` to the daemon as a paste event, i.e. the JSON object
/// `{"Paste":"<escaped text>"}`.
pub fn type_text(client: &mut AttachClient, text: &str) -> std::io::Result<()> {
    let mut event = String::from("{\"Paste\":");
    event.push_str(&serde_json_string(text));
    event.push('}');
    client.send_event(event.as_bytes())
}
/// Renders `s` as a double-quoted JSON string literal.
///
/// `"`, `\`, newline, carriage return and tab get their two-character
/// escapes; every other character below U+0020 becomes `\u00XX`; everything
/// else is copied through unchanged.
fn serde_json_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(escape) = short_escape {
            quoted.push_str(escape);
        } else if u32::from(ch) < 0x20 {
            quoted.push_str(&format!("\\u{:04x}", u32::from(ch)));
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('"');
    quoted
}
/// Writes one framed message to `w` and flushes it.
///
/// Wire layout: one `tag` byte, the payload length as a big-endian `u32`,
/// then the payload bytes.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidInput` if the payload is longer than
/// `u32::MAX` bytes; a truncated length would corrupt the framing. Write
/// and flush errors from `w` are propagated.
fn write_msg<W: Write>(w: &mut W, tag: u8, payload: &[u8]) -> std::io::Result<()> {
    let len = <u32 as std::convert::TryFrom<usize>>::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "payload exceeds u32::MAX bytes",
        )
    })?;
    // Send tag and length together so the header goes out in a single write.
    let mut header = [0u8; 5];
    header[0] = tag;
    header[1..].copy_from_slice(&len.to_be_bytes());
    w.write_all(&header)?;
    if !payload.is_empty() {
        w.write_all(payload)?;
    }
    w.flush()
}
/// Reads one framed message from `r`: a tag byte, a big-endian `u32`
/// payload length, then that many payload bytes.
///
/// Returns the tag and the payload.
///
/// # Errors
///
/// Propagates any error from `read_exact`, including `UnexpectedEof` when
/// the stream ends in the middle of a frame.
fn read_msg<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
    let mut tag_bytes = [0u8; 1];
    r.read_exact(&mut tag_bytes)?;
    let mut len_bytes = [0u8; 4];
    r.read_exact(&mut len_bytes)?;

    let [tag] = tag_bytes;
    let body_len = u32::from_be_bytes(len_bytes) as usize;
    let mut body = vec![0u8; body_len];
    if !body.is_empty() {
        r.read_exact(&mut body)?;
    }
    Ok((tag, body))
}
/// Packs a terminal size into the 4-byte resize payload: `cols` followed by
/// `rows`, each as a big-endian `u16`.
fn encode_resize(cols: u16, rows: u16) -> [u8; 4] {
    let [cols_hi, cols_lo] = cols.to_be_bytes();
    let [rows_hi, rows_lo] = rows.to_be_bytes();
    [cols_hi, cols_lo, rows_hi, rows_lo]
}
/// Spawns a detached thread that reads framed messages from `stream` and
/// appends the payload of every `0x81` message to `sink`.
///
/// The thread stops on a `0x82` or `0x83` message, on end of stream, or on
/// a read error. Messages with any other tag are skipped.
///
/// Read timeouts are not treated as errors. Socket options are shared
/// between clones of a `UnixStream`, so a read timeout configured on any
/// clone also fires here. Without the retry, an idle connection would end
/// the collector, and a timeout in the middle of a frame would lose the
/// bytes already consumed. Retrying inside the reader keeps `read_exact`
/// from ever seeing the timeout.
///
/// NOTE(review): the retry assumes a blocking socket with a timeout; on a
/// socket in non-blocking mode it would spin.
fn spawn_frame_collector(stream: UnixStream, sink: Arc<Mutex<Vec<u8>>>) {
    /// `Read` adapter that retries reads which merely timed out.
    struct PatientReader<R>(R);

    impl<R: Read> Read for PatientReader<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            loop {
                match self.0.read(buf) {
                    Err(e)
                        if matches!(
                            e.kind(),
                            std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
                        ) =>
                    {
                        continue
                    }
                    other => return other,
                }
            }
        }
    }

    thread::spawn(move || {
        let mut reader = BufReader::new(PatientReader(stream));
        loop {
            match read_msg(&mut reader) {
                Ok((0x81, payload)) => {
                    if let Ok(mut sink) = sink.lock() {
                        sink.extend_from_slice(&payload);
                    }
                }
                // Either of these tags ends the collection.
                Ok((0x82, _)) | Ok((0x83, _)) => break,
                Ok(_) => {}
                Err(_) => break,
            }
        }
    });
}
/// Runs `ezpn ls` against `env`'s socket directory and returns its stdout,
/// lossily decoded as UTF-8.
///
/// # Panics
///
/// Panics if the command cannot be run.
pub fn ls(env: &TestEnv) -> String {
    let mut cmd = Command::new(ezpn_binary());
    cmd.env(SOCKET_DIR_ENV, env.socket_dir()).arg("ls");
    let captured = cmd.output().expect("run ezpn ls");
    String::from_utf8_lossy(&captured.stdout).into_owned()
}
/// Runs `ezpn kill <session>` against `env`'s socket directory and returns
/// the command's complete output (exit status, stdout, stderr).
///
/// # Panics
///
/// Panics if the command cannot be run.
pub fn kill_session(env: &TestEnv, session: &str) -> std::process::Output {
    let mut cmd = Command::new(ezpn_binary());
    cmd.env(SOCKET_DIR_ENV, env.socket_dir())
        .arg("kill")
        .arg(session);
    cmd.output().expect("run ezpn kill")
}
/// Reads lines from `reader` until `pred` accepts one, and returns every
/// line read so far (including the matching one) concatenated.
///
/// The deadline is only checked between reads, so it is effective only if
/// reads on `reader` return periodically, e.g. a socket with a read timeout.
/// Reads that fail with `WouldBlock`, `TimedOut` or `Interrupted` are
/// retried until the deadline. `read_line` leaves the valid UTF-8 bytes it
/// has already consumed in the buffer when it fails, so the same buffer is
/// reused on retry and a partially received line is completed rather than
/// lost.
///
/// # Errors
///
/// * `"eof before predicate matched"` if the stream ends first. A final
///   unterminated line is still offered to `pred`.
/// * `"drain_until read error: <err>"` for any non-transient read error.
/// * `"drain_until timed out after <timeout>"` once the deadline has passed.
pub fn drain_until<R: BufRead>(
    reader: &mut R,
    pred: impl Fn(&str) -> bool,
    timeout: Duration,
) -> Result<String, String> {
    let deadline = Instant::now() + timeout;
    let mut acc = String::new();
    // Persists across retries so partial lines survive a read timeout.
    let mut line = String::new();
    while Instant::now() < deadline {
        match reader.read_line(&mut line) {
            Ok(n) => {
                let at_eof = n == 0;
                if !line.is_empty() {
                    acc.push_str(&line);
                    if pred(&line) {
                        return Ok(acc);
                    }
                    line.clear();
                }
                if at_eof {
                    return Err("eof before predicate matched".to_string());
                }
            }
            Err(e)
                if matches!(
                    e.kind(),
                    std::io::ErrorKind::WouldBlock
                        | std::io::ErrorKind::TimedOut
                        | std::io::ErrorKind::Interrupted
                ) =>
            {
                // Transient: go round again and re-check the deadline.
            }
            Err(e) => return Err(format!("drain_until read error: {}", e)),
        }
    }
    Err(format!("drain_until timed out after {:?}", timeout))
}