librojo/
message_queue.rs

1use std::sync::{Mutex, RwLock};
2
3use futures::channel::oneshot;
4
5/// A message queue with persistent history that can be subscribed to.
6///
7/// Definitely non-optimal. This would ideally be a lockless mpmc queue.
8#[derive(Default)]
9pub struct MessageQueue<T> {
10    messages: RwLock<Vec<T>>,
11    message_listeners: Mutex<Vec<Listener<T>>>,
12}
13
14impl<T: Clone> MessageQueue<T> {
15    pub fn new() -> MessageQueue<T> {
16        MessageQueue {
17            messages: RwLock::new(Vec::new()),
18            message_listeners: Mutex::new(Vec::new()),
19        }
20    }
21
22    pub fn push_messages(&self, new_messages: &[T]) {
23        let mut message_listeners = self.message_listeners.lock().unwrap();
24        let mut messages = self.messages.write().unwrap();
25        messages.extend_from_slice(new_messages);
26
27        let mut remaining_listeners = Vec::new();
28
29        for listener in message_listeners.drain(..) {
30            match fire_listener_if_ready(&messages, listener) {
31                Ok(_) => {}
32                Err(listener) => remaining_listeners.push(listener),
33            }
34        }
35
36        // Without this annotation, Rust gets confused since the first argument
37        // is a MutexGuard, but the second is a Vec.
38        *message_listeners = remaining_listeners;
39    }
40
41    /// Subscribe to any messages occurring after the given message cursor.
42    pub fn subscribe(&self, cursor: u32) -> oneshot::Receiver<(u32, Vec<T>)> {
43        let (sender, receiver) = oneshot::channel();
44
45        let listener = {
46            let listener = Listener { sender, cursor };
47
48            let messages = self.messages.read().unwrap();
49
50            match fire_listener_if_ready(&messages, listener) {
51                Ok(_) => return receiver,
52                Err(listener) => listener,
53            }
54        };
55
56        let mut message_listeners = self.message_listeners.lock().unwrap();
57        message_listeners.push(listener);
58
59        receiver
60    }
61
62    /// Subscribe to any messages being pushed into the queue.
63    ///
64    /// This method is only useful in tests. Non-test code should use subscribe
65    /// instead.
66    #[cfg(test)]
67    #[allow(unused)]
68    pub fn subscribe_any(&self) -> oneshot::Receiver<(u32, Vec<T>)> {
69        let cursor = {
70            let messages = self.messages.read().unwrap();
71            messages.len() as u32
72        };
73
74        self.subscribe(cursor)
75    }
76
77    pub fn cursor(&self) -> u32 {
78        self.messages.read().unwrap().len() as u32
79    }
80}
81
82struct Listener<T> {
83    sender: oneshot::Sender<(u32, Vec<T>)>,
84    cursor: u32,
85}
86
87fn fire_listener_if_ready<T: Clone>(
88    messages: &[T],
89    listener: Listener<T>,
90) -> Result<(), Listener<T>> {
91    let current_cursor = messages.len() as u32;
92
93    if listener.cursor < current_cursor {
94        let new_messages = messages[(listener.cursor as usize)..].to_vec();
95        let _ = listener.sender.send((current_cursor, new_messages));
96        Ok(())
97    } else {
98        Err(listener)
99    }
100}