1use crossbeam_channel::{self, Sender, Receiver, select};
2
3use std::time::{Instant, Duration};
4use std::collections::{BTreeMap};
5
6pub fn split<E: Send + 'static>() -> (EventSender<E>, EventReceiver<E>) {
18 let event_queue = EventReceiver::default();
19 let event_sender = event_queue.sender().clone();
20
21 (event_sender, event_queue)
22}
23
/// Identifier of a timer created with `EventSender::send_with_timer`.
///
/// It wraps the instant at which the timer expires; that instant is also the
/// value used to cancel the timer later.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TimerId(Instant);
28
/// Command sent through the timer channel, always paired with the timer's
/// expiration instant.
enum TimerCommand<E> {
    /// Register a timer that delivers the contained event.
    Create(E),
    /// Remove the timer registered for the paired instant.
    Cancel,
}
34
/// Receiving end of the event queue.
///
/// Events arrive through three internal channels (standard events, priority
/// events and timer commands); timers that have not expired yet are kept in
/// `timers` until they are due.
pub struct EventReceiver<E> {
    // Sender owned by the receiver itself and exposed through `sender()`.
    // Fields are dropped in declaration order, so this sender is dropped before
    // the receivers below.
    // NOTE(review): the ordering looks intentional — confirm before reordering.
    event_sender: EventSender<E>,
    // Channel carrying standard events.
    receiver: Receiver<E>,
    // Channel carrying timer creation/cancellation commands with their instant.
    timer_receiver: Receiver<(Instant, TimerCommand<E>)>,
    // Channel carrying priority events.
    priority_receiver: Receiver<E>,
    // Pending timers keyed by expiration instant; the ordered map makes the
    // first entry the next timer to expire.
    // NOTE(review): two timers with exactly the same instant share a key, so
    // the later insertion replaces the earlier one.
    timers: BTreeMap<Instant, E>,
}
46
47impl<E> Default for EventReceiver<E>
48where E: Send + 'static
49{
50 fn default() -> Self {
52 let (sender, receiver) = crossbeam_channel::unbounded();
53 let (timer_sender, timer_receiver) = crossbeam_channel::unbounded();
54 let (priority_sender, priority_receiver) = crossbeam_channel::unbounded();
55 EventReceiver {
56 event_sender: EventSender::new(sender, timer_sender, priority_sender),
57 receiver,
58 timer_receiver,
59 priority_receiver,
60 timers: BTreeMap::new(),
61 }
62 }
63}
64
65impl<E> EventReceiver<E>
66where E: Send + 'static
67{
68 pub fn sender(&self) -> &EventSender<E> {
72 &self.event_sender
73 }
74
75 fn enque_timers(&mut self) {
76 for timer in self.timer_receiver.try_iter() {
77 match timer.1 {
78 TimerCommand::Create(e) => self.timers.insert(timer.0, e),
79 TimerCommand::Cancel => self.timers.remove(&timer.0),
80 };
81 }
82 }
83
84 pub fn receive(&mut self) -> E {
86 self.enque_timers();
87 if !self.priority_receiver.is_empty() {
91 self.priority_receiver.recv().unwrap()
92 }
93 else if self.timers.is_empty() {
94 select! {
95 recv(self.receiver) -> event => event.unwrap(),
96 recv(self.priority_receiver) -> event => event.unwrap(),
97 }
98 }
99 else {
100 let next_instant = *self.timers.iter().next().unwrap().0;
101 if next_instant <= Instant::now() {
102 self.timers.remove(&next_instant).unwrap()
103 }
104 else {
105 select! {
106 recv(self.receiver) -> event => event.unwrap(),
107 recv(self.priority_receiver) -> event => event.unwrap(),
108 recv(crossbeam_channel::at(next_instant)) -> _ => {
109 self.timers.remove(&next_instant).unwrap()
110 }
111 }
112 }
113 }
114 }
115
116 pub fn receive_timeout(&mut self, timeout: Duration) -> Option<E> {
119 self.enque_timers();
120
121 if !self.priority_receiver.is_empty() {
122 Some(self.priority_receiver.recv().unwrap())
123 }
124 else if self.timers.is_empty() {
125 select! {
126 recv(self.receiver) -> event => Some(event.unwrap()),
127 recv(self.priority_receiver) -> event => Some(event.unwrap()),
128 default(timeout) => None
129 }
130 }
131 else {
132 let next_instant = *self.timers.iter().next().unwrap().0;
133 if next_instant <= Instant::now() {
134 self.timers.remove(&next_instant)
135 }
136 else {
137 select! {
138 recv(self.receiver) -> event => Some(event.unwrap()),
139 recv(self.priority_receiver) -> event => Some(event.unwrap()),
140 recv(crossbeam_channel::at(next_instant)) -> _ => {
141 self.timers.remove(&next_instant)
142 }
143 default(timeout) => None
144 }
145 }
146 }
147 }
148
149 pub fn try_receive(&mut self) -> Option<E> {
152 self.enque_timers();
153
154 if let Ok(priority_event) = self.priority_receiver.try_recv() {
155 return Some(priority_event);
156 }
157 else if let Some(next_instant) = self.timers.iter().next() {
158 if *next_instant.0 <= Instant::now() {
159 let instant = *next_instant.0;
160 return self.timers.remove(&instant);
161 }
162 }
163 else if let Ok(event) = self.receiver.try_recv() {
164 return Some(event);
165 }
166
167 None
168 }
169}
170
/// Sending end of the event queue.
///
/// Holds one channel sender per kind of message: standard events, timer
/// commands and priority events.
pub struct EventSender<E> {
    // Channel for standard events.
    sender: Sender<E>,
    // Channel for timer creation/cancellation commands, paired with the
    // timer's expiration instant.
    timer_sender: Sender<(Instant, TimerCommand<E>)>,
    // Channel for priority events.
    priority_sender: Sender<E>,
}
178
179impl<E> EventSender<E>
180where E: Send + 'static
181{
182 fn new(
183 sender: Sender<E>,
184 timer_sender: Sender<(Instant, TimerCommand<E>)>,
185 priority_sender: Sender<E>,
186 ) -> EventSender<E> {
187 EventSender { sender, timer_sender, priority_sender }
188 }
189
190 pub fn send(&self, event: E) {
192 self.sender.send(event).ok();
193 }
194
195 pub fn send_with_priority(&self, event: E) {
199 self.priority_sender.send(event).ok();
200 }
201
202 pub fn send_with_timer(&self, event: E, duration: Duration) -> TimerId {
207 let when = Instant::now() + duration;
208 self.timer_sender.send((when, TimerCommand::Create(event))).ok();
209 TimerId(when)
210 }
211
212 pub fn cancel_timer(&self, timer_id: TimerId) {
215 self.timer_sender.send((timer_id.0, TimerCommand::Cancel)).ok();
216 }
217}
218
219impl<E> Clone for EventSender<E>
220where E: Send + 'static
221{
222 fn clone(&self) -> Self {
223 EventSender::new(
224 self.sender.clone(),
225 self.timer_sender.clone(),
226 self.priority_sender.clone(),
227 )
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    // Extra slack, in milliseconds, added on top of the timer durations when
    // computing `TIMEOUT`.
    const DELAY: u64 = 2000;

    lazy_static::lazy_static! {
        // Zero-length timeout: `receive_timeout` only returns what is already available.
        static ref ZERO_MS: Duration = Duration::from_millis(0);
        // Base duration used for the timers created in the tests.
        static ref TIMER_TIME: Duration = Duration::from_millis(100);
        // Twice the base timer duration plus `DELAY`; long enough for every timer
        // created in these tests to have expired.
        static ref TIMEOUT: Duration = *TIMER_TIME * 2 + Duration::from_millis(DELAY);
    }

    // A timer event is delivered by a blocking receive once it expires.
    #[test]
    fn waiting_timer_event() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("Timed", *TIMER_TIME);
        assert_eq!(queue.receive_timeout(*TIMEOUT).unwrap(), "Timed");
    }

    // Standard events come out in the order they were sent.
    #[test]
    fn standard_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send("first");
        queue.sender().send("second");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "first");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "second");
    }

    // Priority events are delivered before standard events, in sending order.
    #[test]
    fn priority_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send("standard");
        queue.sender().send_with_priority("priority_first");
        queue.sender().send_with_priority("priority_second");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "priority_first");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "priority_second");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "standard");
    }

    // Expired timers are delivered by expiration time, not by creation order.
    #[test]
    fn timer_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed_last", *TIMER_TIME * 2);
        queue.sender().send_with_timer("timed_short", *TIMER_TIME);

        // Wait until both timers have expired.
        std::thread::sleep(*TIMEOUT);
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "timed_short");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "timed_last");
    }

    // An expired timer is delivered before already queued standard events.
    #[test]
    fn default_and_timer_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", *TIMER_TIME);
        queue.sender().send("standard_first");
        queue.sender().send("standard_second");

        // Wait until the timer has expired.
        std::thread::sleep(*TIMEOUT);
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "timed");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "standard_first");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "standard_second");
    }

    // A priority event is delivered before an expired timer.
    #[test]
    fn priority_and_timer_events_order() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", *TIMER_TIME);
        queue.sender().send_with_priority("priority");

        // Wait until the timer has expired.
        std::thread::sleep(*TIMEOUT);
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "priority");
        assert_eq!(queue.receive_timeout(*ZERO_MS).unwrap(), "timed");
    }

    // Dropping the receiver before a cloned sender must not panic.
    #[test]
    fn drop_queue_before_sender() {
        let queue = EventReceiver::<()>::default();
        let sender = queue.sender().clone();
        drop(queue);
        drop(sender);
    }

    // `try_receive` counterpart of `standard_events_order`, ending with an empty queue.
    #[test]
    fn standard_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send("first");
        queue.sender().send("second");
        assert_eq!(queue.try_receive().unwrap(), "first");
        assert_eq!(queue.try_receive().unwrap(), "second");
        assert_eq!(queue.try_receive(), None);
    }

    // `try_receive` counterpart of `priority_events_order`, ending with an empty queue.
    #[test]
    fn priority_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send("standard");
        queue.sender().send_with_priority("priority_first");
        queue.sender().send_with_priority("priority_second");
        assert_eq!(queue.try_receive().unwrap(), "priority_first");
        assert_eq!(queue.try_receive().unwrap(), "priority_second");
        assert_eq!(queue.try_receive().unwrap(), "standard");
        assert_eq!(queue.try_receive(), None);
    }

    // `try_receive` returns nothing while timers are pending and yields each
    // timer once its duration has elapsed.
    #[test]
    fn timer_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed_last", *TIMER_TIME * 2);
        queue.sender().send_with_timer("timed_short", *TIMER_TIME);

        assert_eq!(queue.try_receive(), None);
        // After one TIMER_TIME the short timer has expired.
        std::thread::sleep(*TIMER_TIME);
        assert_eq!(queue.try_receive().unwrap(), "timed_short");
        // After a second TIMER_TIME the long timer has expired too.
        std::thread::sleep(*TIMER_TIME);
        assert_eq!(queue.try_receive().unwrap(), "timed_last");
        assert_eq!(queue.try_receive(), None);
    }

    // `try_receive` counterpart of `default_and_timer_events_order`.
    #[test]
    fn default_and_timer_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", *TIMER_TIME);
        queue.sender().send("standard_first");
        queue.sender().send("standard_second");

        // Wait until the timer has expired.
        std::thread::sleep(*TIMEOUT);
        assert_eq!(queue.try_receive().unwrap(), "timed");
        assert_eq!(queue.try_receive().unwrap(), "standard_first");
        assert_eq!(queue.try_receive().unwrap(), "standard_second");
        assert_eq!(queue.try_receive(), None);
    }

    // `try_receive` counterpart of `priority_and_timer_events_order`.
    #[test]
    fn priority_and_timer_events_order_try_receive() {
        let mut queue = EventReceiver::default();
        queue.sender().send_with_timer("timed", *TIMER_TIME);
        queue.sender().send_with_priority("priority");

        // Wait until the timer has expired.
        std::thread::sleep(*TIMEOUT);
        assert_eq!(queue.try_receive().unwrap(), "priority");
        assert_eq!(queue.try_receive().unwrap(), "timed");
        assert_eq!(queue.try_receive(), None);
    }

    // A timer cancelled before it is received is never delivered.
    #[test]
    fn cancel_timers() {
        let mut queue = EventReceiver::default();
        let id = queue.sender().send_with_timer("timed", *TIMER_TIME);
        queue.sender().cancel_timer(id);

        // Wait past the original expiration time of the cancelled timer.
        std::thread::sleep(*TIMEOUT);
        assert_eq!(queue.try_receive(), None);
    }
}