1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::{
collections::HashMap,
sync::{
mpsc,
atomic::{AtomicUsize, Ordering},
RwLock,
Mutex,
},
};
/// Opaque identifier for a listener registered with a message queue.
///
/// Wraps a `usize`; it is `Copy` and hashable so it can be used directly as a
/// map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ListenerId(usize);
/// Hands out a fresh [`ListenerId`] on every call.
///
/// Ids come from a single process-wide atomic counter that starts at zero and
/// is bumped by one per call, so two calls never receive the same value
/// (barring wrap-around of the underlying `usize`).
pub fn get_listener_id() -> ListenerId {
    static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(0);

    // `fetch_add` returns the value held *before* the increment, which is the
    // id we give to this caller.
    let allocated = NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst);
    ListenerId(allocated)
}
/// A queue of messages of type `T` together with a set of listeners that are
/// tracked by [`ListenerId`].
///
/// Both fields are wrapped in locks, so the methods on this type take `&self`.
#[derive(Default)]
pub struct MessageQueue<T> {
/// Every message stored in the queue, kept behind a read/write lock.
messages: RwLock<Vec<T>>,
/// Channel senders for the registered listeners, keyed by listener id.
message_listeners: Mutex<HashMap<ListenerId, mpsc::Sender<()>>>,
}
impl<T: Clone> MessageQueue<T> {
    /// Creates an empty queue with no messages and no subscribed listeners.
    pub fn new() -> MessageQueue<T> {
        MessageQueue {
            messages: RwLock::new(Vec::new()),
            message_listeners: Mutex::new(HashMap::new()),
        }
    }

    /// Appends clones of `new_messages` to the queue, then sends `()` to every
    /// subscribed listener.
    ///
    /// A listener whose receiving end has been dropped can no longer be
    /// notified; it is removed from the listener set instead of aborting the
    /// push. The remaining listeners are still notified.
    ///
    /// # Panics
    ///
    /// Panics if either internal lock has been poisoned.
    pub fn push_messages(&self, new_messages: &[T]) {
        let mut message_listeners = self.message_listeners.lock().unwrap();

        // Scope the write guard so the message lock is released before any
        // listener is notified.
        {
            let mut messages = self.messages.write().unwrap();
            messages.extend_from_slice(new_messages);
        }

        // `send` only fails when the receiver is gone. Unwrapping here would
        // panic while the listener mutex is held, poisoning it and breaking
        // every later call, so dead listeners are pruned instead.
        message_listeners.retain(|_, listener| listener.send(()).is_ok());
    }

    /// Registers `sender` as a listener and returns the id under which it was
    /// stored. `()` is sent on it each time messages are pushed.
    ///
    /// # Panics
    ///
    /// Panics if the listener lock has been poisoned.
    pub fn subscribe(&self, sender: mpsc::Sender<()>) -> ListenerId {
        let id = get_listener_id();

        let mut message_listeners = self.message_listeners.lock().unwrap();
        message_listeners.insert(id, sender);

        id
    }

    /// Removes the listener registered under `id`. Does nothing if no such
    /// listener is registered.
    ///
    /// # Panics
    ///
    /// Panics if the listener lock has been poisoned.
    pub fn unsubscribe(&self, id: ListenerId) {
        let mut message_listeners = self.message_listeners.lock().unwrap();
        message_listeners.remove(&id);
    }

    /// Returns the current cursor: the number of messages stored in the queue,
    /// cast to `u32`.
    ///
    /// # Panics
    ///
    /// Panics if the message lock has been poisoned.
    pub fn get_message_cursor(&self) -> u32 {
        self.messages.read().unwrap().len() as u32
    }

    /// Returns the current cursor together with clones of every message stored
    /// at index `cursor` or later.
    ///
    /// When `cursor` is at or past the current cursor the returned vector is
    /// empty.
    ///
    /// # Panics
    ///
    /// Panics if the message lock has been poisoned.
    pub fn get_messages_since(&self, cursor: u32) -> (u32, Vec<T>) {
        let messages = self.messages.read().unwrap();
        let current_cursor = messages.len() as u32;

        // Guard before slicing: an out-of-range start index would panic.
        let pending = if cursor >= current_cursor {
            Vec::new()
        } else {
            messages[(cursor as usize)..].to_vec()
        };

        (current_cursor, pending)
    }
}