completeq_rs/channel/
receiver.rs

1use std::{
2    cell::Cell,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc, Mutex,
6    },
7    task::Poll,
8};
9
10use async_timer_rs::Timer;
11use futures::FutureExt;
12
13use crate::{result::ReceiveResult, user_event::UserEvent};
14
15use super::{inner::CompleteQImpl, sender::EventSender};
16
17/// Event receiver endpoint.
18///
19/// We can create an associated [`EventSender`] instance from this instance.
20pub struct EventReceiver<E: UserEvent, T: Timer> {
21    max_len: usize,
22    event_id: E::ID,
23    pub(crate) receiver_id: usize,
24    inner: Arc<Mutex<CompleteQImpl<E>>>,
25    timer: Cell<Option<T>>,
26}
27
28impl<E: UserEvent, T: Timer> EventReceiver<E, T> {
29    pub(crate) fn new(
30        event_id: E::ID,
31        max_len: usize,
32        receiver_id_seq: Arc<AtomicUsize>,
33        inner: Arc<Mutex<CompleteQImpl<E>>>,
34        timer: Option<T>,
35    ) -> Self {
36        let id = receiver_id_seq.fetch_add(1, Ordering::SeqCst);
37
38        // Open a new channel or reset an existing channel configuration data.
39        inner
40            .lock()
41            .unwrap()
42            .open_channel(event_id.clone(), max_len);
43
44        Self {
45            max_len,
46            event_id,
47            receiver_id: id,
48            inner,
49            timer: Cell::new(timer),
50        }
51    }
52
53    /// Get receiver bound event_id
54    pub fn event_id(&self) -> E::ID {
55        self.event_id.clone()
56    }
57
58    pub fn max_len(&self) -> usize {
59        self.max_len
60    }
61
62    /// Create an associated [`Sender`](EventSender) instance
63    pub fn sender(&self) -> EventSender<E> {
64        EventSender::new(self.event_id.clone(), self.inner.clone())
65    }
66}
67
68impl<E: UserEvent, T: Timer> Drop for EventReceiver<E, T> {
69    fn drop(&mut self) {
70        self.inner
71            .lock()
72            .unwrap()
73            .close_channel(self.event_id.clone());
74    }
75}
76
77impl<E: UserEvent, T: Timer + Unpin> std::future::Future for EventReceiver<E, T> {
78    type Output = ReceiveResult<E>;
79    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
80        let timer = self.timer.take();
81
82        if let Some(mut timer) = timer {
83            match timer.poll_unpin(cx) {
84                Poll::Pending => {
85                    self.timer.set(Some(timer));
86                }
87                Poll::Ready(_) => {
88                    // Remove pending poll operation .
89                    self.inner
90                        .lock()
91                        .unwrap()
92                        .remove_pending_poll(self.receiver_id, self.event_id.clone());
93
94                    // Return timeout error
95                    return Poll::Ready(ReceiveResult::Timeout);
96                }
97            }
98        }
99
100        self.inner.lock().unwrap().poll_once(
101            self.receiver_id,
102            self.event_id.clone(),
103            cx.waker().clone(),
104        )
105    }
106}