1use std::{
2 sync::{atomic::AtomicUsize, Arc, Mutex},
3 time::Duration,
4};
5
6use crate::user_event::{AutoIncEvent, UserEvent};
7
8mod inner;
9use async_timer_rs::{Timer, TimerWithContext};
10use inner::*;
11mod receiver;
12pub use receiver::*;
13mod sender;
14pub use sender::*;
15
16#[derive(Clone)]
20pub struct CompleteQ<E: UserEvent> {
21 event: E,
22 receiver_id_seq: Arc<AtomicUsize>,
24
25 inner: Arc<Mutex<CompleteQImpl<E>>>,
26}
27
28impl<E: UserEvent> CompleteQ<E>
29where
30 E: 'static,
31{
32 pub fn new() -> Self {
33 Self {
34 event: E::default(),
35 receiver_id_seq: Arc::new(AtomicUsize::new(1)),
36 inner: Default::default(),
37 }
38 }
39
40 pub fn cancel_all(&self) {
41 self.inner.lock().unwrap().cancel_all();
42 }
43
44 pub fn complete_one(&self, event_id: E::ID, event_arg: E::Argument) -> EventSend<E> {
45 EventSend::new(event_id, event_arg, self.inner.clone())
46 }
47
48 pub fn wait_for(
50 &self,
51 event_id: E::ID,
52 max_len: usize,
53 ) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
54 EventReceiver::new(
55 event_id,
56 max_len,
57 self.receiver_id_seq.clone(),
58 self.inner.clone(),
59 None,
60 )
61 }
62
63 pub fn wait_for_timeout<T: TimerWithContext>(
65 &self,
66 event_id: E::ID,
67 max_len: usize,
68 timeout: Duration,
69 ) -> EventReceiver<E, T> {
70 self.wait_for_with_timer(event_id, max_len, T::new(timeout))
71 }
72
73 pub fn wait_for_timeout_with_context<T: TimerWithContext, C>(
74 &self,
75 event_id: E::ID,
76 max_len: usize,
77 timeout: Duration,
78 context: C,
79 ) -> EventReceiver<E, T>
80 where
81 C: AsMut<T::Context>,
82 {
83 self.wait_for_with_timer(event_id, max_len, T::new_with_context(timeout, context))
84 }
85
86 pub fn wait_for_with_timer<T: Timer>(
87 &self,
88 event_id: E::ID,
89 max_len: usize,
90 timer: T,
91 ) -> EventReceiver<E, T> {
92 let receiver = EventReceiver::new(
93 event_id,
94 max_len,
95 self.receiver_id_seq.clone(),
96 self.inner.clone(),
97 Some(timer),
98 );
99
100 receiver
101 }
102}
103
104impl<E: AutoIncEvent> CompleteQ<E>
105where
106 E: 'static,
107{
108 pub fn wait_one(
110 &mut self,
111 max_len: usize,
112 ) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
113 let event_id = self.event.next();
114 self.wait_for(event_id, max_len)
115 }
116
117 pub fn wait_one_with_timer<T: Timer>(
118 &mut self,
119 max_len: usize,
120 timer: T,
121 ) -> EventReceiver<E, T> {
122 let event_id = self.event.next();
123
124 self.wait_for_with_timer(event_id, max_len, timer)
125 }
126
127 pub fn wait_one_timeout<T: Timer>(
128 &mut self,
129 max_len: usize,
130 duration: Duration,
131 ) -> EventReceiver<E, T> {
132 let event_id = self.event.next();
133
134 self.wait_for_with_timer(event_id, max_len, T::new(duration))
135 }
136
137 pub fn wait_one_timeout_with_context<T: TimerWithContext, C>(
138 &mut self,
139 max_len: usize,
140 duration: Duration,
141 context: C,
142 ) -> EventReceiver<E, T>
143 where
144 C: AsMut<T::Context>,
145 {
146 let event_id = self.event.next();
147
148 self.wait_for_with_timer(event_id, max_len, T::new_with_context(duration, context))
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use crate::{error::CompleteQError, user_event::RPCResponser};
155
156 use super::CompleteQ;
157
158 #[derive(Default)]
159 struct NullArgument;
160
161 type Event = RPCResponser<NullArgument>;
162
163 #[async_std::test]
164 async fn one_send_one_recv() -> anyhow::Result<()> {
165 _ = pretty_env_logger::try_init();
166
167 let mut q = CompleteQ::<Event>::new();
168
169 let receiver = q.wait_one(10);
170
171 let sender = receiver.sender();
172
173 async_std::task::spawn(async move {
174 sender.send(Default::default()).await.completed()?;
175
176 Ok::<(), CompleteQError>(())
177 });
178
179 receiver.await.success()?;
180
181 Ok(())
182 }
183}