completeq_rs/oneshot/
receiver.rs

1use std::{
2    cell::Cell,
3    sync::{Arc, Mutex},
4    task::Poll,
5};
6
7use async_timer_rs::Timer;
8use futures::FutureExt;
9
10use crate::{result::ReceiveResult, user_event::UserEvent};
11
12use super::{inner::CompleteQImpl, sender::EventSender};
13
14/// Event receiver endpoint.
15///
16/// We can create an associated [`EventSender`] instance from this instance.
17pub struct EventReceiver<E: UserEvent, T: Timer> {
18    event_id: E::ID,
19    inner: Arc<Mutex<CompleteQImpl<E>>>,
20    timer: Cell<Option<T>>,
21}
22
23impl<E: UserEvent, T: Timer> EventReceiver<E, T> {
24    pub(crate) fn new(
25        event_id: E::ID,
26        inner: Arc<Mutex<CompleteQImpl<E>>>,
27        timer: Option<T>,
28    ) -> Self {
29        // Open a new channel or reset an existing channel configuration data.
30        _ = inner.lock().unwrap().open_channel(event_id.clone());
31
32        Self {
33            event_id,
34            inner,
35            timer: Cell::new(timer),
36        }
37    }
38
39    /// Get receiver bound event_id
40    pub fn event_id(&self) -> E::ID {
41        self.event_id.clone()
42    }
43
44    /// Create an associated [`Sender`](EventSender) instance
45    pub fn sender(&self) -> EventSender<E> {
46        EventSender::new(self.event_id.clone(), self.inner.clone())
47    }
48}
49
50impl<E: UserEvent, T: Timer> Drop for EventReceiver<E, T> {
51    fn drop(&mut self) {
52        self.inner
53            .lock()
54            .unwrap()
55            .close_channel(self.event_id.clone());
56    }
57}
58
59impl<E: UserEvent, T: Timer + Unpin> std::future::Future for EventReceiver<E, T> {
60    type Output = ReceiveResult<E>;
61    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
62        let timer = self.timer.take();
63
64        if let Some(mut timer) = timer {
65            match timer.poll_unpin(cx) {
66                Poll::Pending => {
67                    self.timer.set(Some(timer));
68                }
69                Poll::Ready(_) => {
70                    // Remove pending poll operation .
71                    self.inner
72                        .lock()
73                        .unwrap()
74                        .remove_pending_poll(self.event_id.clone());
75
76                    // Return timeout error
77                    return Poll::Ready(ReceiveResult::Timeout);
78                }
79            }
80        }
81
82        let poll = self
83            .inner
84            .lock()
85            .unwrap()
86            .poll_once(self.event_id.clone(), cx.waker().clone());
87
88        match poll {
89            Poll::Pending => Poll::Pending,
90            Poll::Ready(result) => {
91                // Remove pending poll operation .
92                self.inner
93                    .lock()
94                    .unwrap()
95                    .remove_pending_poll(self.event_id.clone());
96
97                // Return timeout error
98                return Poll::Ready(result);
99            }
100        }
101    }
102}