use std::io::{self, Cursor, Read, Write};
use std::path::PathBuf;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::time::Duration;
use tear_core::InProcess;
use tear_types::wire::{read_msg, Response};
use crate::DaemonHandle;
/// In-memory stand-in for a bidirectional stream.
///
/// Reads are served from a byte buffer supplied at construction time; writes
/// are forwarded, one byte per channel message, to an [`Sender<u8>`].
pub struct DuplexStream {
    /// Read side: a cursor over the canned input bytes.
    r: Cursor<Vec<u8>>,
    /// Write side: every written byte is sent on this channel.
    w: Sender<u8>,
}

impl DuplexStream {
    /// Builds a stream that yields `input` when read and pushes written bytes
    /// into `sink`.
    #[must_use]
    pub fn new(input: Vec<u8>, sink: Sender<u8>) -> Self {
        let r = Cursor::new(input);
        DuplexStream { r, w: sink }
    }
}

impl Read for DuplexStream {
    /// Reads from the canned input; returns `Ok(0)` once it is exhausted.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.r, buf)
    }
}

impl Write for DuplexStream {
    /// Sends each byte of `buf` on the channel, in order.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::BrokenPipe`] as soon as a send fails (the
    /// receiving end is gone). Bytes sent before the failure are not recalled.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        buf.iter()
            .try_for_each(|&byte| self.w.send(byte))
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "rx dropped"))?;
        Ok(buf.len())
    }

    /// Nothing is buffered locally, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Collects every byte currently arriving on `rx` and decodes it into
/// [`Response`] messages.
///
/// Bytes are gathered until a 50 ms `recv_timeout` returns an error (no byte
/// arrived in time, or the sender is disconnected). The gathered bytes are then
/// decoded with `read_msg` until it first returns an error; that error and any
/// bytes left undecoded are discarded.
#[must_use]
pub fn drain_responses(rx: &Receiver<u8>) -> Vec<Response> {
    let quiet_period = Duration::from_millis(50);

    // Phase 1: pull bytes until the channel goes quiet or closes.
    let raw: Vec<u8> = std::iter::from_fn(|| rx.recv_timeout(quiet_period).ok()).collect();

    // Phase 2: decode back-to-back messages until the first failure.
    let mut reader = Cursor::new(raw);
    std::iter::from_fn(|| read_msg::<_, Response>(&mut reader).ok()).collect()
}
/// Owns a daemon started on a per-instance socket path and stops it when
/// dropped.
pub struct DaemonHarness {
    /// Socket path handed to `crate::start`.
    socket: PathBuf,
    /// Handle of the running daemon; taken (left as `None`) in `drop`.
    daemon: Option<DaemonHandle>,
}

impl DaemonHarness {
    /// Starts a daemon on a fresh socket path inside the system temp directory.
    ///
    /// The file name combines `label`, the current process id and a
    /// process-wide counter, so each harness created in this process gets its
    /// own path.
    ///
    /// # Panics
    ///
    /// Panics with `daemon start` if `crate::start` does not yield a handle.
    pub fn new(label: &str) -> Self {
        use std::sync::atomic::{AtomicU32, Ordering};
        static SEQ: AtomicU32 = AtomicU32::new(0);

        let pid = std::process::id();
        let seq = SEQ.fetch_add(1, Ordering::Relaxed);
        let socket = std::env::temp_dir().join(format!("tear-{label}-{pid}-{seq}.sock"));

        // Best-effort removal of a leftover file at the same path; the result
        // is deliberately ignored.
        std::fs::remove_file(&socket).ok();

        let backend = Arc::new(InProcess::new());
        let handle = crate::start(socket.clone(), backend).expect("daemon start");

        // NOTE(review): presumably gives the daemon time to begin listening
        // before callers connect — confirm against `crate::start`.
        std::thread::sleep(Duration::from_millis(50));

        DaemonHarness {
            socket,
            daemon: Some(handle),
        }
    }

    /// Path of the socket this harness's daemon was started on.
    pub fn socket(&self) -> &std::path::Path {
        self.socket.as_path()
    }

    /// Borrows the handle of the running daemon.
    ///
    /// # Panics
    ///
    /// Panics with `daemon dropped` if the handle has already been taken; in
    /// the code shown here that only happens inside `drop`.
    pub fn daemon(&self) -> &DaemonHandle {
        let handle = self.daemon.as_ref();
        handle.expect("daemon dropped")
    }
}

impl Drop for DaemonHarness {
    /// Stops the daemon if its handle is still held.
    fn drop(&mut self) {
        // NOTE(review): the socket file is not removed here — confirm whether
        // `DaemonHandle::stop` takes care of it.
        let Some(handle) = self.daemon.take() else {
            return;
        };
        handle.stop();
    }
}