use std::{
cell::Cell,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
task::Poll,
};
use async_timer_rs::Timer;
use futures::FutureExt;
use crate::{result::ReceiveResult, user_event::UserEvent};
use super::{inner::CompleteQImpl, sender::EventSender};
/// Receiving end of a single event channel, usable as a future that resolves
/// to a [`ReceiveResult`].
///
/// Creating a receiver opens the channel for its event id on the shared queue,
/// and dropping it closes that channel again.
pub struct EventReceiver<E, T>
where
    E: UserEvent,
    T: Timer,
{
    /// Length limit handed to `open_channel` when the receiver is created.
    max_len: usize,
    /// Id of the event whose channel this receiver opens, polls and closes.
    event_id: E::ID,
    /// Value drawn from the shared id counter at construction; passed to the
    /// queue on every poll to identify this receiver.
    pub(crate) receiver_id: usize,
    /// Shared queue state, guarded by a mutex.
    inner: Arc<Mutex<CompleteQImpl<E>>>,
    /// Optional timer; once it completes, polling yields `ReceiveResult::Timeout`.
    /// Stored in a `Cell` so it can be moved out and back through a shared reference.
    timer: Cell<Option<T>>,
}
impl<E, T> EventReceiver<E, T>
where
    E: UserEvent,
    T: Timer,
{
    /// Creates a receiver for `event_id` and opens its channel on the shared queue.
    ///
    /// The receiver id is taken from `receiver_id_seq` (post-incremented), and
    /// `max_len` is forwarded to `open_channel`. `timer`, when present, is polled
    /// before the queue each time the receiver is polled as a future.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub(crate) fn new(
        event_id: E::ID,
        max_len: usize,
        receiver_id_seq: Arc<AtomicUsize>,
        inner: Arc<Mutex<CompleteQImpl<E>>>,
        timer: Option<T>,
    ) -> Self {
        // Reserve the id first, then register the channel under the lock.
        let receiver_id = receiver_id_seq.fetch_add(1, Ordering::SeqCst);

        inner
            .lock()
            .unwrap()
            .open_channel(event_id.clone(), max_len);

        let timer = Cell::new(timer);
        Self {
            max_len,
            event_id,
            receiver_id,
            inner,
            timer,
        }
    }

    /// Returns a clone of the event id this receiver was created for.
    pub fn event_id(&self) -> E::ID {
        self.event_id.clone()
    }

    /// Returns the `max_len` value this receiver was created with.
    pub fn max_len(&self) -> usize {
        self.max_len
    }

    /// Builds an [`EventSender`] for the same event id, sharing this receiver's queue.
    pub fn sender(&self) -> EventSender<E> {
        let event_id = self.event_id.clone();
        let queue = Arc::clone(&self.inner);
        EventSender::new(event_id, queue)
    }
}
/// Closes this receiver's channel on the shared queue when the receiver is dropped.
impl<E: UserEvent, T: Timer> Drop for EventReceiver<E, T> {
    fn drop(&mut self) {
        // A poisoned mutex must not make `drop` panic: if this drop runs while
        // the thread is already unwinding, a second panic aborts the process.
        // Recover the guard instead so the channel is still closed.
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .close_channel(self.event_id.clone());
    }
}
/// Polling order: the optional timer is checked first, then the shared queue.
impl<E, T> std::future::Future for EventReceiver<E, T>
where
    E: UserEvent,
    T: Timer + Unpin,
{
    type Output = ReceiveResult<E>;

    /// Resolves to `ReceiveResult::Timeout` once the timer completes; otherwise
    /// returns whatever `poll_once` on the shared queue reports for this receiver.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // The timer sits in a `Cell`, so it is moved out to be polled and only
        // put back while it is still pending.
        if let Some(mut deadline) = self.timer.take() {
            if deadline.poll_unpin(cx).is_ready() {
                // Timed out: withdraw this receiver's pending poll before reporting.
                self.inner
                    .lock()
                    .unwrap()
                    .remove_pending_poll(self.receiver_id, self.event_id.clone());
                return Poll::Ready(ReceiveResult::Timeout);
            }
            self.timer.set(Some(deadline));
        }

        let waker = cx.waker().clone();
        self.inner
            .lock()
            .unwrap()
            .poll_once(self.receiver_id, self.event_id.clone(), waker)
    }
}