1mod inner;
2use async_timer_rs::{Timer, TimerWithContext};
3use inner::*;
4mod receiver;
5pub use receiver::*;
6mod sender;
7pub use sender::*;
8
9use std::{
10 sync::{Arc, Mutex},
11 time::Duration,
12};
13
14use crate::{
15 result::EmitResult,
16 user_event::{AutoIncEvent, UserEvent},
17};
18
19#[derive(Clone)]
23pub struct CompleteQ<E: UserEvent> {
24 event: E,
25
26 inner: Arc<Mutex<CompleteQImpl<E>>>,
27}
28
29impl<E: UserEvent> CompleteQ<E>
30where
31 E: 'static,
32{
33 pub fn new() -> Self {
34 Self {
35 event: E::default(),
36 inner: Arc::new(Mutex::new(CompleteQImpl::new())),
37 }
38 }
39
40 pub fn complete_one(&self, event_id: E::ID, event_arg: E::Argument) -> EmitResult {
41 EventSender::new(event_id, self.inner.clone()).send(event_arg)
42 }
43
44 pub fn cancel_all(&self) {
45 self.inner.lock().unwrap().cancel_all();
46 }
47
48 pub fn wait_for(&self, event_id: E::ID) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
50 EventReceiver::new(event_id, self.inner.clone(), None)
51 }
52
53 pub fn wait_for_with_timer<T: Timer>(&self, event_id: E::ID, timer: T) -> EventReceiver<E, T> {
55 let receiver = EventReceiver::new(event_id, self.inner.clone(), Some(timer));
56
57 receiver
58 }
59 pub fn wait_for_timeout<T: Timer>(
63 &self,
64 event_id: E::ID,
65 duration: Duration,
66 ) -> EventReceiver<E, T> {
67 EventReceiver::new(event_id, self.inner.clone(), Some(T::new(duration)))
68 }
69
70 pub fn wait_for_timeout_with_context<T: TimerWithContext, C>(
76 &self,
77 event_id: E::ID,
78 duration: Duration,
79 context: C,
80 ) -> EventReceiver<E, T>
81 where
82 C: AsMut<T::Context>,
83 {
84 self.wait_for_with_timer(event_id, T::new_with_context(duration, context))
85 }
86}
87
88impl<E: AutoIncEvent> CompleteQ<E>
89where
90 E: 'static,
91{
92 pub fn wait_one(&mut self) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
94 let event_id = self.event.next();
95 self.wait_for(event_id)
96 }
97
98 pub fn wait_one_timeout<T: Timer>(&mut self, timeout: Duration) -> EventReceiver<E, T> {
100 let event_id = self.event.next();
101
102 self.wait_for_timeout(event_id, timeout)
103 }
104
105 pub fn wait_one_timeout_with_context<T: TimerWithContext, C>(
106 &mut self,
107 timeout: Duration,
108 context: C,
109 ) -> EventReceiver<E, T>
110 where
111 C: AsMut<T::Context>,
112 {
113 let event_id = self.event.next();
114
115 self.wait_for_timeout_with_context(event_id, timeout, context)
116 }
117
118 pub fn wait_one_with_timer<T: Timer>(&mut self, timer: T) -> EventReceiver<E, T> {
119 let event_id = self.event.next();
120
121 self.wait_for_with_timer(event_id, timer)
122 }
123}
124
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use async_timer_rs::hashed::Timeout;

    use crate::{error::CompleteQError, user_event::RPCResponser};

    use super::CompleteQ;

    /// Empty payload type used as the event argument in these tests.
    #[derive(Default)]
    struct NullArgument;

    type Event = RPCResponser<NullArgument>;

    /// A receiver completes successfully once its paired sender sends a value
    /// from another task.
    #[async_std::test]
    async fn one_send_one_recv() -> anyhow::Result<()> {
        _ = pretty_env_logger::try_init();

        let mut q = CompleteQ::<Event>::new();

        let receiver = q.wait_one();

        let sender = receiver.sender();

        async_std::task::spawn(async move {
            sender.send(Default::default()).completed()?;

            Ok::<(), CompleteQError>(())
        });

        receiver.await.success()?;

        Ok(())
    }

    /// A receiver with a two-second timer and no sender resolves as a timeout
    /// after two (whole) seconds.
    #[async_std::test]
    async fn rec_timeout() -> anyhow::Result<()> {
        _ = pretty_env_logger::try_init();

        let mut q = CompleteQ::<Event>::new();

        // `Instant` is monotonic, so the measurement cannot be skewed (or
        // fail) when the wall clock is adjusted while the test is waiting.
        let started = Instant::now();

        let receiver = q.wait_one_timeout::<Timeout>(Duration::from_secs(2));

        assert!(receiver.await.is_timeout());

        assert_eq!(started.elapsed().as_secs(), 2);

        Ok(())
    }
}