completeq_rs/
oneshot.rs

1mod inner;
2use async_timer_rs::{Timer, TimerWithContext};
3use inner::*;
4mod receiver;
5pub use receiver::*;
6mod sender;
7pub use sender::*;
8
9use std::{
10    sync::{Arc, Mutex},
11    time::Duration,
12};
13
14use crate::{
15    result::EmitResult,
16    user_event::{AutoIncEvent, UserEvent},
17};
18
19/// CompleteQ structure is a central scheduler for certain types of completion events
20///
21/// The generic parameter `E` represents a user-defined event type
22#[derive(Clone)]
23pub struct CompleteQ<E: UserEvent> {
24    event: E,
25
26    inner: Arc<Mutex<CompleteQImpl<E>>>,
27}
28
29impl<E: UserEvent> CompleteQ<E>
30where
31    E: 'static,
32{
33    pub fn new() -> Self {
34        Self {
35            event: E::default(),
36            inner: Arc::new(Mutex::new(CompleteQImpl::new())),
37        }
38    }
39
40    pub fn complete_one(&self, event_id: E::ID, event_arg: E::Argument) -> EmitResult {
41        EventSender::new(event_id, self.inner.clone()).send(event_arg)
42    }
43
44    pub fn cancel_all(&self) {
45        self.inner.lock().unwrap().cancel_all();
46    }
47
48    /// Create a new event receiver with provide event_id
49    pub fn wait_for(&self, event_id: E::ID) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
50        EventReceiver::new(event_id, self.inner.clone(), None)
51    }
52
53    /// [`wait_for`](CompleteQ::wait_for) with timer to trigger waiting timeout
54    pub fn wait_for_with_timer<T: Timer>(&self, event_id: E::ID, timer: T) -> EventReceiver<E, T> {
55        let receiver = EventReceiver::new(event_id, self.inner.clone(), Some(timer));
56
57        receiver
58    }
59    /// This function takes a timeout interval and creates a timer object internally
60    ///
61    /// See [`wait_for_with_timer`](CompleteQ::wait_for_with_timer) for more information
62    pub fn wait_for_timeout<T: Timer>(
63        &self,
64        event_id: E::ID,
65        duration: Duration,
66    ) -> EventReceiver<E, T> {
67        EventReceiver::new(event_id, self.inner.clone(), Some(T::new(duration)))
68    }
69
70    /// Compared to function [`wait_for_timeout`](CompleteQ::wait_for_timeout),
71    /// this function provides a [`C`](TimerWithContext::Context) configuration
72    /// parameter to timer creation
73    ///
74    /// See [`wait_for_with_timer`](CompleteQ::wait_for_with_timer) for more information
75    pub fn wait_for_timeout_with_context<T: TimerWithContext, C>(
76        &self,
77        event_id: E::ID,
78        duration: Duration,
79        context: C,
80    ) -> EventReceiver<E, T>
81    where
82        C: AsMut<T::Context>,
83    {
84        self.wait_for_with_timer(event_id, T::new_with_context(duration, context))
85    }
86}
87
88impl<E: AutoIncEvent> CompleteQ<E>
89where
90    E: 'static,
91{
92    /// Create a new event receiver with automatic generate event_id
93    pub fn wait_one(&mut self) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
94        let event_id = self.event.next();
95        self.wait_for(event_id)
96    }
97
98    /// [`wait_one`](CompleteQ::wait_one) operation with timeout
99    pub fn wait_one_timeout<T: Timer>(&mut self, timeout: Duration) -> EventReceiver<E, T> {
100        let event_id = self.event.next();
101
102        self.wait_for_timeout(event_id, timeout)
103    }
104
105    pub fn wait_one_timeout_with_context<T: TimerWithContext, C>(
106        &mut self,
107        timeout: Duration,
108        context: C,
109    ) -> EventReceiver<E, T>
110    where
111        C: AsMut<T::Context>,
112    {
113        let event_id = self.event.next();
114
115        self.wait_for_timeout_with_context(event_id, timeout, context)
116    }
117
118    pub fn wait_one_with_timer<T: Timer>(&mut self, timer: T) -> EventReceiver<E, T> {
119        let event_id = self.event.next();
120
121        self.wait_for_with_timer(event_id, timer)
122    }
123}
124
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use async_timer_rs::hashed::Timeout;

    use crate::{error::CompleteQError, user_event::RPCResponser};

    use super::CompleteQ;

    /// Empty payload type used as the event argument in these tests.
    #[derive(Default)]
    struct NullArgument;

    type Event = RPCResponser<NullArgument>;

    /// One receiver is completed by a sender running on a spawned task.
    #[async_std::test]
    async fn one_send_one_recv() -> anyhow::Result<()> {
        _ = pretty_env_logger::try_init();

        let mut q = CompleteQ::<Event>::new();

        let receiver = q.wait_one();

        let sender = receiver.sender();

        async_std::task::spawn(async move {
            sender.send(Default::default()).completed()?;

            Ok::<(), CompleteQError>(())
        });

        receiver.await.success()?;

        Ok(())
    }

    /// A receiver that is never completed reports a timeout after the
    /// configured duration.
    #[async_std::test]
    async fn rec_timeout() -> anyhow::Result<()> {
        _ = pretty_env_logger::try_init();

        let mut q = CompleteQ::<Event>::new();

        // `Instant` is monotonic, so the measurement cannot be skewed (or
        // fail) when the wall clock is adjusted while the test is running.
        let start = Instant::now();

        let receiver = q.wait_one_timeout::<Timeout>(Duration::from_secs(2));

        assert!(receiver.await.is_timeout());

        assert_eq!(start.elapsed().as_secs(), 2);

        Ok(())
    }
}