use super::{ListenerMsg, Port};
use std::ops::{Add, Sub};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
/// Worker that polls a set of ports and forwards what they produce, plus
/// optional periodic ticks, through an `mpsc` channel.
///
/// `U` is the type parameter forwarded to `Port` and `ListenerMsg`.
pub(super) struct EventListenerWorker<U>
where
U: Eq + PartialEq + Clone + PartialOrd + Send,
{
/// Ports visited by `poll`; a port is polled only when its `should_poll()` is true.
ports: Vec<Port<U>>,
/// Sending half of the channel on which `ListenerMsg` values are delivered.
sender: mpsc::Sender<ListenerMsg<U>>,
/// Shared flag: while it reads `true`, `run` skips polling and sleeps 25 ms per iteration.
paused: Arc<RwLock<bool>>,
/// Shared flag: `run` leaves its loop once it reads `false`.
running: Arc<RwLock<bool>>,
/// Instant at (or after) which the next tick is due; initialised to "now" by `new`.
next_tick: Instant,
/// Time between two ticks; `None` disables ticks entirely.
tick_interval: Option<Duration>,
}
impl<U> EventListenerWorker<U>
where
U: Eq + PartialEq + Clone + PartialOrd + Send + 'static,
{
pub(super) fn new(
ports: Vec<Port<U>>,
sender: mpsc::Sender<ListenerMsg<U>>,
paused: Arc<RwLock<bool>>,
running: Arc<RwLock<bool>>,
tick_interval: Option<Duration>,
) -> Self {
Self {
ports,
sender,
paused,
running,
next_tick: Instant::now(),
tick_interval,
}
}
fn calc_next_tick(&mut self) {
self.next_tick = Instant::now().add(self.tick_interval.unwrap());
}
fn next_event(&self) -> Duration {
let now = Instant::now();
let fallback_time = now.add(Duration::from_secs(60));
let min_listener_event = self
.ports
.iter()
.map(|x| x.next_poll())
.min()
.unwrap_or(fallback_time);
let next_tick = match self.tick_interval.is_some() {
false => fallback_time,
true => self.next_tick,
};
let min_time = std::cmp::min(min_listener_event, next_tick);
if min_time > now {
min_time.sub(now)
} else {
Duration::ZERO
}
}
fn running(&self) -> bool {
if let Ok(lock) = self.running.read() {
return *lock;
}
true
}
fn paused(&self) -> bool {
if let Ok(lock) = self.paused.read() {
return *lock;
}
false
}
fn should_tick(&self) -> bool {
match self.tick_interval {
None => false,
Some(_) => self.next_tick <= Instant::now(),
}
}
fn send_tick(&mut self) -> Result<(), mpsc::SendError<ListenerMsg<U>>> {
match self.sender.send(ListenerMsg::Tick) {
Err(err) => Err(err),
Ok(_) => {
self.calc_next_tick();
Ok(())
}
}
}
#[allow(clippy::needless_collect)]
fn poll(&mut self) -> Result<(), mpsc::SendError<ListenerMsg<U>>> {
let msg: Vec<ListenerMsg<U>> = self
.ports
.iter_mut()
.filter_map(|x| {
if x.should_poll() {
let msg = match x.poll() {
Ok(Some(ev)) => Some(ListenerMsg::User(ev)),
Ok(None) => None,
Err(err) => Some(ListenerMsg::Error(err)),
};
x.calc_next_poll();
msg
} else {
None
}
})
.collect();
match msg
.into_iter()
.map(|x| self.sender.send(x))
.filter(|x| x.is_err())
.map(|x| x.err().unwrap())
.next()
{
None => Ok(()),
Some(e) => Err(e),
}
}
pub(super) fn run(&mut self) {
loop {
if !self.running() {
break;
}
if self.paused() {
thread::sleep(Duration::from_millis(25));
continue;
}
if self.poll().is_err() {
break;
}
if self.should_tick() && self.send_tick().is_err() {
break;
}
thread::sleep(self.next_event());
}
}
}
#[cfg(test)]
mod test {
    use super::super::ListenerResult;
    use super::*;
    use crate::core::event::{Key, KeyEvent};
    use crate::mock::MockEvent;
    use crate::mock::MockPoll;
    use crate::Event;

    use pretty_assertions::assert_eq;

    /// A worker under test together with the handles a test needs to observe
    /// and control it.
    struct Fixture {
        worker: EventListenerWorker<MockEvent>,
        /// Receiving half of the worker's channel; kept alive so sends succeed.
        rx: mpsc::Receiver<ListenerMsg<MockEvent>>,
        /// The test's own handle on the worker's `running` flag.
        running: Arc<RwLock<bool>>,
    }

    /// Builds a running, non-paused worker over `ports` with the given tick interval.
    fn fixture(ports: Vec<Port<MockEvent>>, tick_interval: Option<Duration>) -> Fixture {
        let (tx, rx) = mpsc::channel();
        let paused = Arc::new(RwLock::new(false));
        let running = Arc::new(RwLock::new(true));
        let worker = EventListenerWorker::<MockEvent>::new(
            ports,
            tx,
            paused,
            Arc::clone(&running),
            tick_interval,
        );
        Fixture {
            worker,
            rx,
            running,
        }
    }

    /// Builds a port backed by the default `MockPoll`, polled every `secs` seconds.
    fn mock_port(secs: u64) -> Port<MockEvent> {
        Port::new(Box::new(MockPoll::default()), Duration::from_secs(secs))
    }

    /// Clears the shared `running` flag; the write guard is released before returning.
    fn stop(running: &RwLock<bool>) {
        *running.write().expect("`running` lock should not be poisoned") = false;
    }

    #[test]
    fn worker_should_send_poll() {
        let mut fx = fixture(vec![mock_port(5)], None);
        assert!(fx.worker.poll().is_ok());
        assert!(fx.worker.next_event() <= Duration::from_secs(5));
        // The polled event must have been forwarded on the channel
        assert_eq!(
            ListenerResult::from(fx.rx.recv().ok().unwrap())
                .ok()
                .unwrap(),
            Some(Event::Keyboard(KeyEvent::from(Key::Enter)))
        );
    }

    #[test]
    fn worker_should_send_tick() {
        let mut fx = fixture(vec![mock_port(5)], Some(Duration::from_secs(1)));
        assert!(fx.worker.send_tick().is_ok());
        // Sending a tick reschedules the next one in the future
        assert!(fx.worker.next_tick > Instant::now());
        assert_eq!(
            ListenerResult::from(fx.rx.recv().ok().unwrap())
                .ok()
                .unwrap(),
            Some(Event::Tick)
        );
    }

    #[test]
    fn worker_should_calc_times_correctly_with_tick() {
        let mut fx = fixture(vec![mock_port(5)], Some(Duration::from_secs(1)));
        assert!(fx.worker.running());
        // The first tick is due immediately
        assert!(fx.worker.next_event() <= Duration::from_secs(1));
        assert!(fx.worker.next_tick <= Instant::now());
        assert!(fx.worker.should_tick());
        // After rescheduling, the tick is at least one interval away
        let expected_next_tick = Instant::now() + Duration::from_secs(1);
        fx.worker.calc_next_tick();
        assert!(fx.worker.next_tick >= expected_next_tick);
        assert!(fx.worker.next_event() <= Duration::from_secs(1));
        assert!(!fx.worker.should_tick());
        // The worker observes the shared flag being cleared
        stop(&fx.running);
        assert!(!fx.worker.running());
    }

    #[test]
    fn worker_should_calc_times_correctly_without_tick() {
        let fx = fixture(vec![mock_port(3)], None);
        assert!(fx.worker.running());
        assert!(!fx.worker.paused());
        assert!(fx.worker.next_event() <= Duration::from_secs(3));
        assert!(fx.worker.next_tick <= Instant::now());
        // Without a tick interval the worker never ticks
        assert!(!fx.worker.should_tick());
        assert!(fx.worker.next_event() <= Duration::from_secs(3));
        // The worker observes the shared flag being cleared
        stop(&fx.running);
        assert!(!fx.worker.running());
    }

    #[test]
    #[should_panic]
    fn worker_should_panic_when_trying_next_tick_without_it() {
        let mut fx = fixture(vec![], None);
        fx.worker.calc_next_tick();
    }
}