use std::time::Duration;
use crate::error::{Error, Result};
use crate::event::Event;
/// A live subscription to a stream of [`Event`]s.
///
/// Pairs the receiving side of the event channel with the [`CancelHandle`]
/// passed to [`Subscription::new`]. The handle is never read; it is stored so
/// that it is dropped (running its cancel callback) together with the
/// subscription.
pub struct Subscription {
    /// Receiving side of the event channel.
    rx: EventReceiver,
    /// Kept only for its `Drop` behavior.
    _cancel: CancelHandle,
}

impl Subscription {
    /// Creates a subscription from an event receiver and the handle that
    /// should be dropped along with it.
    pub fn new(rx: EventReceiver, cancel: CancelHandle) -> Self {
        Self {
            rx,
            _cancel: cancel,
        }
    }

    /// Returns the next event if one is available right now, without blocking.
    ///
    /// Returns `None` when the receiver has nothing to hand out.
    pub fn try_recv(&self) -> Option<Event> {
        self.rx.try_recv()
    }

    /// Blocks for up to `timeout` waiting for the next event.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Timeout`] carrying the requested `timeout` when the
    /// receiver yields no event. A disconnected channel is reported the same
    /// way, because the receiver wrapper does not distinguish the two cases.
    pub fn recv(&self, timeout: Duration) -> Result<Event> {
        self.rx
            .recv_timeout(timeout)
            .ok_or(Error::Timeout { elapsed: timeout })
    }

    /// Waits up to `timeout` for the first event accepted by `predicate`.
    ///
    /// Events rejected by `predicate` are consumed and discarded. A zero
    /// `timeout` fails immediately without looking at the queue.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Timeout`] with the time actually spent waiting when
    /// the deadline passes without a matching event, or as soon as the channel
    /// is found to be disconnected (no further event can arrive, so waiting
    /// out the rest of the timeout would be pointless).
    pub fn wait_for(
        &self,
        mut predicate: impl FnMut(&Event) -> bool,
        timeout: Duration,
    ) -> Result<Event> {
        let start = std::time::Instant::now();
        loop {
            let remaining = timeout.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                return Err(Error::Timeout {
                    elapsed: start.elapsed(),
                });
            }
            // Receive on the underlying channel directly: the `Option`-based
            // wrapper hides disconnection, which would turn this loop into a
            // busy spin until the deadline once all senders are gone.
            match self.rx.rx.recv_timeout(remaining) {
                Ok(event) if predicate(&event) => return Ok(event),
                // Not the event we are waiting for: drop it and keep waiting.
                Ok(_) => {}
                // Deadline reached; the check at the top of the loop reports it.
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
                // Mirrors `recv`, which also reports a dead channel as a timeout.
                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                    return Err(Error::Timeout {
                        elapsed: start.elapsed(),
                    });
                }
            }
        }
    }

    /// Returns an iterator that borrows this subscription and blocks in
    /// `next` until an event arrives.
    pub fn iter(&self) -> SubscriptionIter<'_> {
        SubscriptionIter { sub: self }
    }
}
pub struct SubscriptionIter<'a> {
sub: &'a Subscription,
}
impl<'a> Iterator for SubscriptionIter<'a> {
type Item = Event;
fn next(&mut self) -> Option<Event> {
loop {
match self.sub.rx.recv_timeout(Duration::from_millis(100)) {
Some(event) => return Some(event),
None => {
continue;
}
}
}
}
}
/// Thin wrapper around the receiving half of a `std::sync::mpsc` channel of
/// [`Event`]s.
///
/// Both receive methods flatten every channel error into `None`, so callers
/// cannot tell "nothing available" apart from "channel disconnected".
pub struct EventReceiver {
    /// Underlying standard-library channel receiver.
    rx: std::sync::mpsc::Receiver<Event>,
}

impl EventReceiver {
    /// Wraps an existing channel receiver.
    pub fn new(rx: std::sync::mpsc::Receiver<Event>) -> Self {
        EventReceiver { rx }
    }

    /// Returns a queued event without blocking, or `None` if there is none.
    pub fn try_recv(&self) -> Option<Event> {
        match self.rx.try_recv() {
            Ok(event) => Some(event),
            // Queue is empty right now.
            Err(std::sync::mpsc::TryRecvError::Empty) => None,
            // Channel is disconnected; reported the same way as "empty".
            Err(std::sync::mpsc::TryRecvError::Disconnected) => None,
        }
    }

    /// Waits up to `timeout` for an event, returning `None` if none is
    /// received.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<Event> {
        match self.rx.recv_timeout(timeout) {
            Ok(event) => Some(event),
            // Nothing arrived within `timeout`.
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => None,
            // Channel is disconnected; reported the same way as a timeout.
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => None,
        }
    }
}
/// Guard that runs a cancellation callback when it is dropped.
///
/// A handle built with [`CancelHandle::noop`] carries no callback and does
/// nothing on drop.
pub struct CancelHandle {
    /// Callback still waiting to run; `None` for a no-op handle or once the
    /// callback has been taken out in `drop`.
    _cancel_fn: Option<Box<dyn FnOnce() + Send>>,
}

impl CancelHandle {
    /// Creates a handle that invokes `cancel_fn` when the handle is dropped.
    pub fn new(cancel_fn: impl FnOnce() + Send + 'static) -> Self {
        let callback: Box<dyn FnOnce() + Send> = Box::new(cancel_fn);
        CancelHandle {
            _cancel_fn: Some(callback),
        }
    }

    /// Creates a handle with no callback attached.
    pub fn noop() -> Self {
        CancelHandle { _cancel_fn: None }
    }
}

impl Drop for CancelHandle {
    /// Runs the stored callback, if any.
    fn drop(&mut self) {
        // An `FnOnce` has to be called by value, so move it out of the slot;
        // that also leaves `None` behind.
        let pending = self._cancel_fn.take();
        if let Some(callback) = pending {
            callback();
        }
    }
}