use std::{io, io::BufRead, os::unix::net::UnixStream, path::Path, time};
use anyhow::anyhow;
pub struct Events {
lines: io::Lines<io::BufReader<UnixStream>>,
}
impl Events {
pub fn new<P: AsRef<Path>>(sock: P) -> anyhow::Result<Self> {
let mut sleep_dur = time::Duration::from_millis(5);
for _ in 0..12 {
if let Ok(s) = UnixStream::connect(&sock) {
return Ok(Events { lines: io::BufReader::new(s).lines() });
} else {
std::thread::sleep(sleep_dur);
sleep_dur *= 2;
}
}
Err(anyhow!("timed out waiting for connection to event sock"))
}
pub fn waiter<S, SI>(mut self, events: SI) -> EventWaiter
where
S: Into<String>,
SI: IntoIterator<Item = S>,
{
let events: Vec<String> = events.into_iter().map(|s| s.into()).collect();
assert!(!events.is_empty());
let (tx, rx) = crossbeam_channel::bounded(events.len());
let waiter = EventWaiter { matched: rx };
std::thread::spawn(move || {
let mut return_lines = false;
let mut offset = 0;
'LINELOOP: for line in &mut self.lines {
match line {
Ok(l) => {
if events[offset] == l {
if offset == events.len() - 1 {
return_lines = true;
break 'LINELOOP;
} else {
tx.send(WaiterEvent::Event(l)).unwrap();
}
offset += 1;
}
}
Err(e) => {
eprintln!("error scanning for event '{}': {:?}", events[offset], e);
}
}
}
if return_lines {
tx.send(WaiterEvent::Done((events[offset].clone(), self.lines))).unwrap();
}
});
waiter
}
pub fn await_event(&mut self, event: &str) -> anyhow::Result<()> {
for line in &mut self.lines {
let line = line?;
if line == event {
return Ok(());
}
}
Ok(())
}
}
pub struct EventWaiter {
matched: crossbeam_channel::Receiver<WaiterEvent>,
}
enum WaiterEvent {
Event(String),
Done((String, io::Lines<io::BufReader<UnixStream>>)),
}
impl EventWaiter {
pub fn wait_event(&mut self, event: &str) -> anyhow::Result<()> {
eprintln!("waiting for event '{event}'");
match self.matched.recv()? {
WaiterEvent::Event(e) => {
if e == event {
Ok(())
} else {
Err(anyhow!("Got '{}' event, want '{}'", e, event))
}
}
WaiterEvent::Done((e, _)) => {
if e == event {
Ok(())
} else {
Err(anyhow!("Got '{}' event, want '{}'", e, event))
}
}
}
}
pub fn wait_final_event(self, event: &str) -> anyhow::Result<Events> {
eprintln!("waiting for final event '{event}'");
match self.matched.recv()? {
WaiterEvent::Event(e) => {
Err(anyhow!("Got non-fianl '{}' event, want final '{}'", e, event))
}
WaiterEvent::Done((e, lines)) => {
if e == event {
Ok(Events { lines })
} else {
Err(anyhow!("Got '{}' event, want '{}'", e, event))
}
}
}
}
}