completeq_rs/
channel.rs

1use std::{
2    sync::{atomic::AtomicUsize, Arc, Mutex},
3    time::Duration,
4};
5
6use crate::user_event::{AutoIncEvent, UserEvent};
7
8mod inner;
9use async_timer_rs::{Timer, TimerWithContext};
10use inner::*;
11mod receiver;
12pub use receiver::*;
13mod sender;
14pub use sender::*;
15
16/// CompleteQ structure is a central scheduler for certain types of completion events
17///
18/// The generic parameter `E` represents a user-defined event type
19#[derive(Clone)]
20pub struct CompleteQ<E: UserEvent> {
21    event: E,
22    /// receiver id generator
23    receiver_id_seq: Arc<AtomicUsize>,
24
25    inner: Arc<Mutex<CompleteQImpl<E>>>,
26}
27
28impl<E: UserEvent> CompleteQ<E>
29where
30    E: 'static,
31{
32    pub fn new() -> Self {
33        Self {
34            event: E::default(),
35            receiver_id_seq: Arc::new(AtomicUsize::new(1)),
36            inner: Default::default(),
37        }
38    }
39
40    pub fn cancel_all(&self) {
41        self.inner.lock().unwrap().cancel_all();
42    }
43
44    pub fn complete_one(&self, event_id: E::ID, event_arg: E::Argument) -> EventSend<E> {
45        EventSend::new(event_id, event_arg, self.inner.clone())
46    }
47
48    /// Create a new event receiver with provide event_id
49    pub fn wait_for(
50        &self,
51        event_id: E::ID,
52        max_len: usize,
53    ) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
54        EventReceiver::new(
55            event_id,
56            max_len,
57            self.receiver_id_seq.clone(),
58            self.inner.clone(),
59            None,
60        )
61    }
62
63    /// [`wait_for`](CompleteQ::wait_for)  operation with timeout
64    pub fn wait_for_timeout<T: TimerWithContext>(
65        &self,
66        event_id: E::ID,
67        max_len: usize,
68        timeout: Duration,
69    ) -> EventReceiver<E, T> {
70        self.wait_for_with_timer(event_id, max_len, T::new(timeout))
71    }
72
73    pub fn wait_for_timeout_with_context<T: TimerWithContext, C>(
74        &self,
75        event_id: E::ID,
76        max_len: usize,
77        timeout: Duration,
78        context: C,
79    ) -> EventReceiver<E, T>
80    where
81        C: AsMut<T::Context>,
82    {
83        self.wait_for_with_timer(event_id, max_len, T::new_with_context(timeout, context))
84    }
85
86    pub fn wait_for_with_timer<T: Timer>(
87        &self,
88        event_id: E::ID,
89        max_len: usize,
90        timer: T,
91    ) -> EventReceiver<E, T> {
92        let receiver = EventReceiver::new(
93            event_id,
94            max_len,
95            self.receiver_id_seq.clone(),
96            self.inner.clone(),
97            Some(timer),
98        );
99
100        receiver
101    }
102}
103
104impl<E: AutoIncEvent> CompleteQ<E>
105where
106    E: 'static,
107{
108    /// Create a new event receiver with automatic generate event_id
109    pub fn wait_one(
110        &mut self,
111        max_len: usize,
112    ) -> EventReceiver<E, async_timer_rs::hashed::Timeout> {
113        let event_id = self.event.next();
114        self.wait_for(event_id, max_len)
115    }
116
117    pub fn wait_one_with_timer<T: Timer>(
118        &mut self,
119        max_len: usize,
120        timer: T,
121    ) -> EventReceiver<E, T> {
122        let event_id = self.event.next();
123
124        self.wait_for_with_timer(event_id, max_len, timer)
125    }
126
127    pub fn wait_one_timeout<T: Timer>(
128        &mut self,
129        max_len: usize,
130        duration: Duration,
131    ) -> EventReceiver<E, T> {
132        let event_id = self.event.next();
133
134        self.wait_for_with_timer(event_id, max_len, T::new(duration))
135    }
136
137    pub fn wait_one_timeout_with_context<T: TimerWithContext, C>(
138        &mut self,
139        max_len: usize,
140        duration: Duration,
141        context: C,
142    ) -> EventReceiver<E, T>
143    where
144        C: AsMut<T::Context>,
145    {
146        let event_id = self.event.next();
147
148        self.wait_for_with_timer(event_id, max_len, T::new_with_context(duration, context))
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use crate::{error::CompleteQError, user_event::RPCResponser};
155
156    use super::CompleteQ;
157
158    #[derive(Default)]
159    struct NullArgument;
160
161    type Event = RPCResponser<NullArgument>;
162
163    #[async_std::test]
164    async fn one_send_one_recv() -> anyhow::Result<()> {
165        _ = pretty_env_logger::try_init();
166
167        let mut q = CompleteQ::<Event>::new();
168
169        let receiver = q.wait_one(10);
170
171        let sender = receiver.sender();
172
173        async_std::task::spawn(async move {
174            sender.send(Default::default()).await.completed()?;
175
176            Ok::<(), CompleteQError>(())
177        });
178
179        receiver.await.success()?;
180
181        Ok(())
182    }
183}