use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};
/// One direction of a fake PTY: a FIFO byte queue plus a "closed" flag.
///
/// Instances are only ever used behind `Arc<Mutex<..>>` in this file, so a
/// single buffer can be shared by several `FakePty` handles.
/// The derived `Default` yields an empty, open buffer.
#[derive(Debug, Default)]
struct SharedBuffer {
/// Pending bytes; producers append at the back, consumers drain from the front.
data: VecDeque<u8>,
/// Set by `FakePty::close`. Once set, reads report end-of-stream after the
/// queue is drained and writes fail with `BrokenPipe`.
closed: bool,
}
/// In-memory fake of a PTY endpoint backed by two shared byte queues.
///
/// Reads are served from `read_buf` and writes are appended to `write_buf`.
/// Both buffers sit behind `Arc<Mutex<..>>`, so clones of a `FakePty` (and the
/// opposite end of a `FakePtyPair`) observe the same data.
#[derive(Debug)]
pub struct FakePty {
/// Bytes that `Read::read` on this handle returns; filled by `queue_input`.
read_buf: Arc<Mutex<SharedBuffer>>,
/// Bytes accepted by `Write::write` on this handle; emptied by `take_output`.
write_buf: Arc<Mutex<SharedBuffer>>,
}
impl FakePty {
#[must_use]
pub fn new() -> Self {
Self {
read_buf: Arc::new(Mutex::new(SharedBuffer::default())),
write_buf: Arc::new(Mutex::new(SharedBuffer::default())),
}
}
pub fn queue_input(&self, data: &[u8]) {
let mut buf = self
.read_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
buf.data.extend(data);
}
pub fn queue_input_str(&self, s: &str) {
self.queue_input(s.as_bytes());
}
#[must_use]
pub fn take_output(&self) -> Vec<u8> {
let mut buf = self
.write_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
buf.data.drain(..).collect()
}
#[must_use]
pub fn take_output_str(&self) -> String {
String::from_utf8_lossy(&self.take_output()).into_owned()
}
#[must_use]
pub fn has_output(&self) -> bool {
!self
.write_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.data
.is_empty()
}
#[must_use]
pub fn has_input(&self) -> bool {
!self
.read_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.data
.is_empty()
}
pub fn close(&self) {
self.read_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.closed = true;
self.write_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.closed = true;
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.read_buf
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.closed
}
pub fn send_output(&self, data: &[u8]) {
self.queue_input(data);
}
pub fn send_line(&self, line: &str) {
self.queue_input_str(line);
self.queue_input(b"\r\n");
}
pub fn send_prompt(&self, prompt: &str) {
self.queue_input_str(prompt);
}
#[must_use]
pub fn get_typed(&self) -> String {
self.take_output_str()
}
}
impl Default for FakePty {
fn default() -> Self {
Self::new()
}
}
// Silences clippy's suggestion to release the mutex guard earlier: the guard
// is needed from the first emptiness check until the drain has finished.
#[allow(clippy::significant_drop_tightening)]
impl Read for FakePty {
    /// Moves up to `buf.len()` queued input bytes into the front of `buf`.
    ///
    /// Returns `Ok(0)` when the queue is empty and the buffer is closed
    /// (end of stream). Queued bytes remain readable after closing. A
    /// poisoned mutex is recovered rather than reported.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::WouldBlock`] when the queue is empty but the
    /// buffer is still open; this call never blocks waiting for data.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut pending = self
            .read_buf
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if pending.data.is_empty() {
            return if pending.closed {
                Ok(0)
            } else {
                Err(io::Error::new(
                    io::ErrorKind::WouldBlock,
                    "No data available",
                ))
            };
        }

        let count = buf.len().min(pending.data.len());
        // `drain(..count)` yields exactly `count` bytes, so the zip fills the
        // first `count` slots of `buf` and leaves the rest untouched.
        for (slot, byte) in buf.iter_mut().zip(pending.data.drain(..count)) {
            *slot = byte;
        }
        Ok(count)
    }
}
// Silences clippy's suggestion to release the mutex guard earlier: the guard
// is needed for both the closed check and the append that follows it.
#[allow(clippy::significant_drop_tightening)]
impl Write for FakePty {
    /// Appends all of `buf` to the output queue and reports `buf.len()`.
    ///
    /// A poisoned mutex is recovered rather than reported.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::BrokenPipe`] if the output buffer has been
    /// closed; nothing is queued in that case.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut sink = self
            .write_buf
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if sink.closed {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "PTY closed"))
        } else {
            sink.data.extend(buf.iter().copied());
            Ok(buf.len())
        }
    }

    /// No-op: written bytes land in the shared queue immediately.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
impl Clone for FakePty {
    /// Returns a second handle to the *same* buffers.
    ///
    /// Only the `Arc` reference counts are bumped; no queued bytes are copied,
    /// so data queued or consumed through one handle is visible through the
    /// other.
    fn clone(&self) -> Self {
        let Self {
            read_buf,
            write_buf,
        } = self;
        Self {
            read_buf: Arc::clone(read_buf),
            write_buf: Arc::clone(write_buf),
        }
    }
}
/// Two `FakePty` handles wired back to back.
///
/// As built by `FakePtyPair::new`, each side's write buffer is the other
/// side's read buffer, so bytes written to one end can be read from the other.
#[derive(Debug)]
pub struct FakePtyPair {
/// The end created first; its buffers are shared (crossed) with `slave`.
pub master: FakePty,
/// The opposite end: reads what `master` writes and vice versa.
pub slave: FakePty,
}
impl FakePtyPair {
    /// Builds a connected pair of fake PTY ends with empty, open buffers.
    ///
    /// The two ends share the same two buffers with the roles swapped:
    /// whatever `master` writes becomes readable from `slave`, and whatever
    /// `slave` writes becomes readable from `master`.
    #[must_use]
    pub fn new() -> Self {
        let master = FakePty::new();
        // Cross the wires: the slave reads the master's output queue and
        // writes into the master's input queue.
        let slave_reads = Arc::clone(&master.write_buf);
        let slave_writes = Arc::clone(&master.read_buf);
        Self {
            slave: FakePty {
                read_buf: slave_reads,
                write_buf: slave_writes,
            },
            master,
        }
    }
}
impl Default for FakePtyPair {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes queued with `queue_input` come back out of `Read::read`.
    #[test]
    fn fake_pty_read_write() {
        let mut pty = FakePty::new();
        pty.queue_input(b"Hello");
        assert!(pty.has_input());

        let mut scratch = [0u8; 10];
        let received = pty.read(&mut scratch).unwrap();
        assert_eq!(&scratch[..received], b"Hello");
    }

    /// Data written to one end of a pair is readable from the other end, in
    /// both directions.
    #[test]
    fn fake_pty_pair() {
        let FakePtyPair {
            mut master,
            mut slave,
        } = FakePtyPair::new();
        let mut scratch = [0u8; 20];

        master.write_all(b"command\n").unwrap();
        let received = slave.read(&mut scratch).unwrap();
        assert_eq!(&scratch[..received], b"command\n");

        slave.write_all(b"response").unwrap();
        let received = master.read(&mut scratch).unwrap();
        assert_eq!(&scratch[..received], b"response");
    }

    /// `send_line` appends CRLF while `send_prompt` queues the text verbatim.
    #[test]
    fn send_helpers() {
        let mut pty = FakePty::new();
        pty.send_line("Hello, World!");
        pty.send_prompt("$ ");

        let mut scratch = [0u8; 50];
        let received = pty.read(&mut scratch).unwrap();
        assert_eq!(&scratch[..received], b"Hello, World!\r\n$ ");
    }
}