completeq_rs/channel/
receiver.rs1use std::{
2 cell::Cell,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc, Mutex,
6 },
7 task::Poll,
8};
9
10use async_timer_rs::Timer;
11use futures::FutureExt;
12
13use crate::{result::ReceiveResult, user_event::UserEvent};
14
15use super::{inner::CompleteQImpl, sender::EventSender};
16
17pub struct EventReceiver<E: UserEvent, T: Timer> {
21 max_len: usize,
22 event_id: E::ID,
23 pub(crate) receiver_id: usize,
24 inner: Arc<Mutex<CompleteQImpl<E>>>,
25 timer: Cell<Option<T>>,
26}
27
28impl<E: UserEvent, T: Timer> EventReceiver<E, T> {
29 pub(crate) fn new(
30 event_id: E::ID,
31 max_len: usize,
32 receiver_id_seq: Arc<AtomicUsize>,
33 inner: Arc<Mutex<CompleteQImpl<E>>>,
34 timer: Option<T>,
35 ) -> Self {
36 let id = receiver_id_seq.fetch_add(1, Ordering::SeqCst);
37
38 inner
40 .lock()
41 .unwrap()
42 .open_channel(event_id.clone(), max_len);
43
44 Self {
45 max_len,
46 event_id,
47 receiver_id: id,
48 inner,
49 timer: Cell::new(timer),
50 }
51 }
52
53 pub fn event_id(&self) -> E::ID {
55 self.event_id.clone()
56 }
57
58 pub fn max_len(&self) -> usize {
59 self.max_len
60 }
61
62 pub fn sender(&self) -> EventSender<E> {
64 EventSender::new(self.event_id.clone(), self.inner.clone())
65 }
66}
67
68impl<E: UserEvent, T: Timer> Drop for EventReceiver<E, T> {
69 fn drop(&mut self) {
70 self.inner
71 .lock()
72 .unwrap()
73 .close_channel(self.event_id.clone());
74 }
75}
76
77impl<E: UserEvent, T: Timer + Unpin> std::future::Future for EventReceiver<E, T> {
78 type Output = ReceiveResult<E>;
79 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
80 let timer = self.timer.take();
81
82 if let Some(mut timer) = timer {
83 match timer.poll_unpin(cx) {
84 Poll::Pending => {
85 self.timer.set(Some(timer));
86 }
87 Poll::Ready(_) => {
88 self.inner
90 .lock()
91 .unwrap()
92 .remove_pending_poll(self.receiver_id, self.event_id.clone());
93
94 return Poll::Ready(ReceiveResult::Timeout);
96 }
97 }
98 }
99
100 self.inner.lock().unwrap().poll_once(
101 self.receiver_id,
102 self.event_id.clone(),
103 cx.waker().clone(),
104 )
105 }
106}