completeq_rs/oneshot/
receiver.rs1use std::{
2 cell::Cell,
3 sync::{Arc, Mutex},
4 task::Poll,
5};
6
7use async_timer_rs::Timer;
8use futures::FutureExt;
9
10use crate::{result::ReceiveResult, user_event::UserEvent};
11
12use super::{inner::CompleteQImpl, sender::EventSender};
13
14pub struct EventReceiver<E: UserEvent, T: Timer> {
18 event_id: E::ID,
19 inner: Arc<Mutex<CompleteQImpl<E>>>,
20 timer: Cell<Option<T>>,
21}
22
23impl<E: UserEvent, T: Timer> EventReceiver<E, T> {
24 pub(crate) fn new(
25 event_id: E::ID,
26 inner: Arc<Mutex<CompleteQImpl<E>>>,
27 timer: Option<T>,
28 ) -> Self {
29 _ = inner.lock().unwrap().open_channel(event_id.clone());
31
32 Self {
33 event_id,
34 inner,
35 timer: Cell::new(timer),
36 }
37 }
38
39 pub fn event_id(&self) -> E::ID {
41 self.event_id.clone()
42 }
43
44 pub fn sender(&self) -> EventSender<E> {
46 EventSender::new(self.event_id.clone(), self.inner.clone())
47 }
48}
49
50impl<E: UserEvent, T: Timer> Drop for EventReceiver<E, T> {
51 fn drop(&mut self) {
52 self.inner
53 .lock()
54 .unwrap()
55 .close_channel(self.event_id.clone());
56 }
57}
58
59impl<E: UserEvent, T: Timer + Unpin> std::future::Future for EventReceiver<E, T> {
60 type Output = ReceiveResult<E>;
61 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
62 let timer = self.timer.take();
63
64 if let Some(mut timer) = timer {
65 match timer.poll_unpin(cx) {
66 Poll::Pending => {
67 self.timer.set(Some(timer));
68 }
69 Poll::Ready(_) => {
70 self.inner
72 .lock()
73 .unwrap()
74 .remove_pending_poll(self.event_id.clone());
75
76 return Poll::Ready(ReceiveResult::Timeout);
78 }
79 }
80 }
81
82 let poll = self
83 .inner
84 .lock()
85 .unwrap()
86 .poll_once(self.event_id.clone(), cx.waker().clone());
87
88 match poll {
89 Poll::Pending => Poll::Pending,
90 Poll::Ready(result) => {
91 self.inner
93 .lock()
94 .unwrap()
95 .remove_pending_poll(self.event_id.clone());
96
97 return Poll::Ready(result);
99 }
100 }
101 }
102}