message_io/
events.rs

1use crossbeam_channel::{self, Sender, Receiver, select};
2
3use std::time::{Instant, Duration};
4use std::collections::{BTreeMap};
5
6/// As a shortcut, it returns the sender and receiver queue as a tuple.
7///
8/// Equivalent to:
9/// ```
10/// struct MyEvent; // or usually an enum
11///
12/// use message_io::events::EventReceiver;
13///
14/// let event_queue = EventReceiver::<MyEvent>::default();
15/// let event_sender = event_queue.sender().clone();
16/// ```
17pub fn split<E: Send + 'static>() -> (EventSender<E>, EventReceiver<E>) {
18    let event_queue = EventReceiver::default();
19    let event_sender = event_queue.sender().clone();
20
21    (event_sender, event_queue)
22}
23
/// Identifier of a scheduled timer.
///
/// It can be used to cancel the timed event it refers to.
/// The id wraps the [`Instant`] at which the timer expires, so two ids built from the
/// very same instant compare equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TimerId(Instant);
28
// Internal commands that share a single queue for every timer operation.
// Each command travels paired with the `Instant` identifying the timer it applies to.
enum TimerCommand<E> {
    // Schedule the carried event.
    Create(E),
    // Discard the timer scheduled for the paired instant, if there is one.
    Cancel,
}
34
35/// A generic and synchronized queue where the user can send and receive events.
36/// See [`EventSender`] to see how send events.
37/// This entity can be used as an utility for the [`crate::network`] module redirecting the
38/// network events to process them later from here.
39pub struct EventReceiver<E> {
40    event_sender: EventSender<E>, // Should be before receiver in order to drop first.
41    receiver: Receiver<E>,
42    timer_receiver: Receiver<(Instant, TimerCommand<E>)>,
43    priority_receiver: Receiver<E>,
44    timers: BTreeMap<Instant, E>,
45}
46
47impl<E> Default for EventReceiver<E>
48where E: Send + 'static
49{
50    /// Creates a new event queue for generic incoming events.
51    fn default() -> Self {
52        let (sender, receiver) = crossbeam_channel::unbounded();
53        let (timer_sender, timer_receiver) = crossbeam_channel::unbounded();
54        let (priority_sender, priority_receiver) = crossbeam_channel::unbounded();
55        EventReceiver {
56            event_sender: EventSender::new(sender, timer_sender, priority_sender),
57            receiver,
58            timer_receiver,
59            priority_receiver,
60            timers: BTreeMap::new(),
61        }
62    }
63}
64
65impl<E> EventReceiver<E>
66where E: Send + 'static
67{
68    /// Returns the internal sender reference to this queue.
69    /// This reference can be safety cloned and shared to other threads
70    /// in order to get several senders to the same queue.
71    pub fn sender(&self) -> &EventSender<E> {
72        &self.event_sender
73    }
74
75    fn enque_timers(&mut self) {
76        for timer in self.timer_receiver.try_iter() {
77            match timer.1 {
78                TimerCommand::Create(e) => self.timers.insert(timer.0, e),
79                TimerCommand::Cancel => self.timers.remove(&timer.0),
80            };
81        }
82    }
83
84    /// Blocks the current thread until an event is received by this queue.
85    pub fn receive(&mut self) -> E {
86        self.enque_timers();
87        // Since [`EventReceiver`] always has a sender attribute,
88        // any call to [`receive()`] always has a living sender in that time
89        // and the channel never can be considered disconnected.
90        if !self.priority_receiver.is_empty() {
91            self.priority_receiver.recv().unwrap()
92        }
93        else if self.timers.is_empty() {
94            select! {
95                recv(self.receiver) -> event => event.unwrap(),
96                recv(self.priority_receiver) -> event => event.unwrap(),
97            }
98        }
99        else {
100            let next_instant = *self.timers.iter().next().unwrap().0;
101            if next_instant <= Instant::now() {
102                self.timers.remove(&next_instant).unwrap()
103            }
104            else {
105                select! {
106                    recv(self.receiver) -> event => event.unwrap(),
107                    recv(self.priority_receiver) -> event => event.unwrap(),
108                    recv(crossbeam_channel::at(next_instant)) -> _ => {
109                        self.timers.remove(&next_instant).unwrap()
110                    }
111                }
112            }
113        }
114    }
115
116    /// Blocks the current thread until an event is received by this queue or timeout is exceeded.
117    /// If timeout is reached a None is returned, otherwise the event is returned.
118    pub fn receive_timeout(&mut self, timeout: Duration) -> Option<E> {
119        self.enque_timers();
120
121        if !self.priority_receiver.is_empty() {
122            Some(self.priority_receiver.recv().unwrap())
123        }
124        else if self.timers.is_empty() {
125            select! {
126                recv(self.receiver) -> event => Some(event.unwrap()),
127                recv(self.priority_receiver) -> event => Some(event.unwrap()),
128                default(timeout) => None
129            }
130        }
131        else {
132            let next_instant = *self.timers.iter().next().unwrap().0;
133            if next_instant <= Instant::now() {
134                self.timers.remove(&next_instant)
135            }
136            else {
137                select! {
138                    recv(self.receiver) -> event => Some(event.unwrap()),
139                    recv(self.priority_receiver) -> event => Some(event.unwrap()),
140                    recv(crossbeam_channel::at(next_instant)) -> _ => {
141                        self.timers.remove(&next_instant)
142                    }
143                    default(timeout) => None
144                }
145            }
146        }
147    }
148
149    /// Attempts to receive an event without blocking.
150    /// Returns Some(E) if an event was received by this queue, otherwise returns None.
151    pub fn try_receive(&mut self) -> Option<E> {
152        self.enque_timers();
153
154        if let Ok(priority_event) = self.priority_receiver.try_recv() {
155            return Some(priority_event);
156        }
157        else if let Some(next_instant) = self.timers.iter().next() {
158            if *next_instant.0 <= Instant::now() {
159                let instant = *next_instant.0;
160                return self.timers.remove(&instant);
161            }
162        }
163        else if let Ok(event) = self.receiver.try_recv() {
164            return Some(event);
165        }
166
167        None
168    }
169}
170
171/// Struct used to send events into a [`EventReceiver`].
172/// This type can only be generated by the receiver `EventReceiver`.
173pub struct EventSender<E> {
174    sender: Sender<E>,
175    timer_sender: Sender<(Instant, TimerCommand<E>)>,
176    priority_sender: Sender<E>,
177}
178
179impl<E> EventSender<E>
180where E: Send + 'static
181{
182    fn new(
183        sender: Sender<E>,
184        timer_sender: Sender<(Instant, TimerCommand<E>)>,
185        priority_sender: Sender<E>,
186    ) -> EventSender<E> {
187        EventSender { sender, timer_sender, priority_sender }
188    }
189
190    /// Send instantly an event to the event queue.
191    pub fn send(&self, event: E) {
192        self.sender.send(event).ok();
193    }
194
195    /// Send instantly an event that would be process before any other event sent
196    /// by the [`EventSender::send()`] method.
197    /// Successive calls to send_with_priority will maintain the order of arrival.
198    pub fn send_with_priority(&self, event: E) {
199        self.priority_sender.send(event).ok();
200    }
201
202    /// Send a timed event to the [`EventReceiver`].
203    /// The event only will be sent after the specific duration, never before.
204    /// If the [`EventSender`] is dropped, the event will be generated as well unless
205    /// [`EventSender::cancel_timer()`] be called.
206    pub fn send_with_timer(&self, event: E, duration: Duration) -> TimerId {
207        let when = Instant::now() + duration;
208        self.timer_sender.send((when, TimerCommand::Create(event))).ok();
209        TimerId(when)
210    }
211
212    /// Remove a timer previously sent by [`EventSender::send_with_timer()`].
213    /// The timer will not be receive by the [`EventReceiver`].
214    pub fn cancel_timer(&self, timer_id: TimerId) {
215        self.timer_sender.send((timer_id.0, TimerCommand::Cancel)).ok();
216    }
217}
218
219impl<E> Clone for EventSender<E>
220where E: Send + 'static
221{
222    fn clone(&self) -> Self {
223        EventSender::new(
224            self.sender.clone(),
225            self.timer_sender.clone(),
226            self.priority_sender.clone(),
227        )
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    // Extra margin added to the timeouts so the tests also pass on CI machines that offer
    // really slow resources. For a standard execution, a value of 1ms is enough in 99% of
    // the cases.
    const DELAY: u64 = 2000; //ms

    // Duration of the shortest timer used by the tests.
    const TIMER_MS: u64 = 100; //ms

    // Plain constants: every value is known at compile time, no lazy initialization needed.
    const ZERO_MS: Duration = Duration::from_millis(0);
    const TIMER_TIME: Duration = Duration::from_millis(TIMER_MS);
    const TIMEOUT: Duration = Duration::from_millis(TIMER_MS * 2 + DELAY);

    #[test]
    fn waiting_timer_event() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("Timed", TIMER_TIME);
        assert_eq!(queue.receive_timeout(TIMEOUT).unwrap(), "Timed");
    }

    #[test]
    fn standard_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send("first");
        queue.sender().send("second");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "first");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "second");
    }

    #[test]
    fn priority_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send("standard");
        queue.sender().send_with_priority("priority_first");
        queue.sender().send_with_priority("priority_second");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "priority_first");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "priority_second");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "standard");
    }

    #[test]
    fn timer_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed_last", TIMER_TIME * 2);
        queue.sender().send_with_timer("timed_short", TIMER_TIME);

        std::thread::sleep(TIMEOUT);
        // Both timers have already expired at this point.

        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "timed_short");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "timed_last");
    }

    #[test]
    fn default_and_timer_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", TIMER_TIME);
        queue.sender().send("standard_first");
        queue.sender().send("standard_second");

        std::thread::sleep(TIMEOUT);
        // The timer has already expired at this point.

        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "timed");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "standard_first");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "standard_second");
    }

    #[test]
    fn priority_and_timer_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", TIMER_TIME);
        queue.sender().send_with_priority("priority");

        std::thread::sleep(TIMEOUT);
        // The timer has already expired at this point.

        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "priority");
        assert_eq!(queue.receive_timeout(ZERO_MS).unwrap(), "timed");
    }

    #[test]
    fn drop_queue_before_sender() {
        let queue = EventReceiver::<()>::default();
        let sender = queue.sender().clone();
        drop(queue);
        drop(sender);
    }

    #[test]
    fn standard_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send("first");
        queue.sender().send("second");
        assert_eq!(queue.try_receive().unwrap(), "first");
        assert_eq!(queue.try_receive().unwrap(), "second");
        assert_eq!(queue.try_receive(), None);
    }

    #[test]
    fn priority_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send("standard");
        queue.sender().send_with_priority("priority_first");
        queue.sender().send_with_priority("priority_second");
        assert_eq!(queue.try_receive().unwrap(), "priority_first");
        assert_eq!(queue.try_receive().unwrap(), "priority_second");
        assert_eq!(queue.try_receive().unwrap(), "standard");
        assert_eq!(queue.try_receive(), None);
    }

    #[test]
    fn timer_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed_last", TIMER_TIME * 2);
        queue.sender().send_with_timer("timed_short", TIMER_TIME);

        assert_eq!(queue.try_receive(), None);
        std::thread::sleep(TIMER_TIME);
        // The short timer has already expired at this point.
        assert_eq!(queue.try_receive().unwrap(), "timed_short");
        std::thread::sleep(TIMER_TIME);
        // The long timer has already expired at this point.
        assert_eq!(queue.try_receive().unwrap(), "timed_last");
        assert_eq!(queue.try_receive(), None);
    }

    #[test]
    fn default_and_timer_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", TIMER_TIME);
        queue.sender().send("standard_first");
        queue.sender().send("standard_second");

        std::thread::sleep(TIMEOUT);
        // The timer has already expired at this point.

        assert_eq!(queue.try_receive().unwrap(), "timed");
        assert_eq!(queue.try_receive().unwrap(), "standard_first");
        assert_eq!(queue.try_receive().unwrap(), "standard_second");
        assert_eq!(queue.try_receive(), None);
    }

    #[test]
    fn priority_and_timer_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", TIMER_TIME);
        queue.sender().send_with_priority("priority");

        std::thread::sleep(TIMEOUT);
        // The timer has already expired at this point.

        assert_eq!(queue.try_receive().unwrap(), "priority");
        assert_eq!(queue.try_receive().unwrap(), "timed");
        assert_eq!(queue.try_receive(), None);
    }

    #[test]
    fn cancel_timers() {
        let mut queue = EventReceiver::default();
        let id = queue.sender().send_with_timer("timed", TIMER_TIME);
        queue.sender().cancel_timer(id);

        std::thread::sleep(TIMEOUT);
        // The timer would have expired at this point, but it was cancelled.

        assert_eq!(queue.try_receive(), None);
    }
}
399}