1use std::sync::{Mutex, RwLock};
2
3use futures::channel::oneshot;
4
5#[derive(Default)]
9pub struct MessageQueue<T> {
10 messages: RwLock<Vec<T>>,
11 message_listeners: Mutex<Vec<Listener<T>>>,
12}
13
14impl<T: Clone> MessageQueue<T> {
15 pub fn new() -> MessageQueue<T> {
16 MessageQueue {
17 messages: RwLock::new(Vec::new()),
18 message_listeners: Mutex::new(Vec::new()),
19 }
20 }
21
22 pub fn push_messages(&self, new_messages: &[T]) {
23 let mut message_listeners = self.message_listeners.lock().unwrap();
24 let mut messages = self.messages.write().unwrap();
25 messages.extend_from_slice(new_messages);
26
27 let mut remaining_listeners = Vec::new();
28
29 for listener in message_listeners.drain(..) {
30 match fire_listener_if_ready(&messages, listener) {
31 Ok(_) => {}
32 Err(listener) => remaining_listeners.push(listener),
33 }
34 }
35
36 *message_listeners = remaining_listeners;
39 }
40
41 pub fn subscribe(&self, cursor: u32) -> oneshot::Receiver<(u32, Vec<T>)> {
43 let (sender, receiver) = oneshot::channel();
44
45 let listener = {
46 let listener = Listener { sender, cursor };
47
48 let messages = self.messages.read().unwrap();
49
50 match fire_listener_if_ready(&messages, listener) {
51 Ok(_) => return receiver,
52 Err(listener) => listener,
53 }
54 };
55
56 let mut message_listeners = self.message_listeners.lock().unwrap();
57 message_listeners.push(listener);
58
59 receiver
60 }
61
62 #[cfg(test)]
67 #[allow(unused)]
68 pub fn subscribe_any(&self) -> oneshot::Receiver<(u32, Vec<T>)> {
69 let cursor = {
70 let messages = self.messages.read().unwrap();
71 messages.len() as u32
72 };
73
74 self.subscribe(cursor)
75 }
76
77 pub fn cursor(&self) -> u32 {
78 self.messages.read().unwrap().len() as u32
79 }
80}
81
82struct Listener<T> {
83 sender: oneshot::Sender<(u32, Vec<T>)>,
84 cursor: u32,
85}
86
87fn fire_listener_if_ready<T: Clone>(
88 messages: &[T],
89 listener: Listener<T>,
90) -> Result<(), Listener<T>> {
91 let current_cursor = messages.len() as u32;
92
93 if listener.cursor < current_cursor {
94 let new_messages = messages[(listener.cursor as usize)..].to_vec();
95 let _ = listener.sender.send((current_cursor, new_messages));
96 Ok(())
97 } else {
98 Err(listener)
99 }
100}