use crate::{simulation::Simulator, Event};
use std::time::{Duration, SystemTime};
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
time::timeout,
};
/// Sending half of the Tokio MPSC channel that carries [`Event`]s.
///
/// This is the type returned by `InputQueue::subscribe`.
pub type InputSender = Sender<Event>;
/// Queue of externally produced [`Event`]s, built on a bounded Tokio MPSC channel.
///
/// Both halves of the channel are stored here; additional senders are obtained through
/// `subscribe`, and the receiving half is drained by `wait_event`.
#[derive(Debug)]
pub struct InputQueue {
/// Sending half of the channel; cloned and handed out by `subscribe`.
sender: Sender<Event>,
/// Receiving half of the channel; polled while waiting for external events.
receiver: Receiver<Event>,
/// Optional time span during which `wait_event` keeps listening for further events
/// after its initial wait has finished. `None` disables this extra phase.
window: Option<Duration>,
}
impl InputQueue {
    /// Builds a queue on top of a fresh bounded channel.
    ///
    /// * `buffer` - capacity forwarded to the channel constructor.
    /// * `window` - optional extra listening span used by [`InputQueue::wait_event`] once its
    ///   initial wait is over; `None` disables that phase.
    ///
    /// NOTE(review): Tokio's bounded `channel` is documented to panic on a zero capacity —
    /// confirm callers never pass `buffer == 0`.
    #[inline]
    pub fn new(buffer: usize, window: Option<Duration>) -> Self {
        let (sender, receiver) = channel(buffer);
        Self {
            sender,
            receiver,
            window,
        }
    }

    /// Returns a new sender connected to this queue, obtained by cloning the stored one.
    #[inline]
    pub fn subscribe(&self) -> InputSender {
        self.sender.clone()
    }

    /// Waits for external events and injects them into `component`.
    ///
    /// The call proceeds in two phases:
    ///
    /// 1. Wait for at most one event until `t_next` is reached. A `t_next` that already lies in
    ///    the past yields a zero timeout; `None` uses [`Duration::MAX`] as the timeout.
    /// 2. If a window is configured, keep receiving and injecting events until
    ///    `min(t_next, now + window)`, where `now` is sampled right after phase 1.
    ///
    /// All deadlines are computed from [`SystemTime`], i.e. the wall clock.
    #[inline]
    pub async fn wait_event(&mut self, t_next: Option<SystemTime>, component: &impl Simulator) {
        // Phase 1: a single receive attempt bounded by the time left until `t_next`.
        let duration = t_next.map_or(Duration::MAX, |at| {
            at.duration_since(SystemTime::now()).unwrap_or_default()
        });
        tracing::debug!("waiting for external events or timeout of {duration:?}");
        self.inject_timeout(duration, component).await;

        // Phase 2 only exists when a window was configured.
        let window = match self.window {
            Some(window) => window,
            None => return,
        };

        // The window never extends beyond `t_next`.
        let window_end = SystemTime::now() + window;
        let t_max = t_next.map_or(window_end, |at| at.min(window_end));

        // `duration_since` turns into an error once `t_max` lies in the past, ending the loop.
        loop {
            match t_max.duration_since(SystemTime::now()) {
                Ok(duration) => {
                    tracing::debug!(
                        "waiting for external events within the window of {duration:?}"
                    );
                    self.inject_timeout(duration, component).await;
                }
                Err(_) => break,
            }
        }
    }

    /// Receives at most one event within `duration` and injects it into `component`.
    ///
    /// An elapsed timeout is only logged. A failed injection is logged and the event is skipped.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!` if the channel reports that no sender is left. The queue stores
    /// a sender of its own, which is presumably why this case is treated as impossible.
    #[inline]
    async fn inject_timeout(&mut self, duration: Duration, component: &impl Simulator) {
        let received = timeout(duration, self.receiver.recv()).await;
        let event = match received {
            Ok(Some(event)) => event,
            Ok(None) => {
                tracing::error!("all senders have been dropped");
                unreachable!();
            }
            Err(_) => {
                tracing::debug!("timeout expired without any external events");
                return;
            }
        };

        tracing::info!("injecting input event {event}");
        // NOTE(review): the safety contract of this `unsafe` call is not visible from here —
        // document the invariant that makes it sound (`SAFETY:`) once confirmed.
        if let Err(e) = unsafe { component.get_component().inject(event) } {
            tracing::error!("failed to inject event: {e:?}. Skipping.");
        }
    }
}