1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use snowflake::ProcessUniqueId;
#[derive(Debug,Clone,Copy,Eq,PartialEq,Hash)]
pub struct Subscription(ProcessUniqueId);
impl Subscription {
fn new() -> Subscription {
Subscription(ProcessUniqueId::new())
}
}
type ListenerMap<T> = HashMap<Subscription, Box<dyn Fn(T) + Send + 'static>>;
#[derive(Clone)]
pub struct ListenerSet<T>
where T: Send
{
listeners: Arc<Mutex<ListenerMap<T>>>,
}
impl<T> ListenerSet<T> where T: Send + Clone
{
pub fn new() -> Self {
ListenerSet { listeners: Arc::new(Mutex::new(ListenerMap::new())) }
}
pub fn subscribe<Listener: Fn(T) + Send + 'static>(&self, listener: Listener) -> Subscription {
let mut acquired_listeners = self.listeners.lock().unwrap();
let subscription = Subscription::new();
acquired_listeners.insert(subscription, Box::new(listener));
subscription
}
pub fn unsubscribe(&self, sub: Subscription) {
let mut acquired_listeners = self.listeners.lock().unwrap();
acquired_listeners.remove(&sub);
}
pub fn notify(&self, payload: &T) {
let listeners = self.listeners.lock().unwrap();
for listener in listeners.values() {
listener(payload.clone())
}
}
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.listeners.lock().unwrap().len()
}
}
#[cfg(test)]
mod tests {
use super::{ListenerSet};
use std::sync::mpsc;
#[test]
fn test_new_listener_set() {
let ls = ListenerSet::<()>::new();
assert_eq!(ls.len(), 0);
}
#[test]
fn test_new_listener_for_chan() {
let ls = ListenerSet::<bool>::new();
ls.subscribe(|_e| {});
assert_eq!(ls.len(), 1);
}
#[test]
fn test_add_listener_to_set() {
let (tx, rx) = mpsc::channel();
let ls = ListenerSet::<bool>::new();
ls.subscribe(move |e| tx.send(e).unwrap());
assert_eq!(ls.len(), 1);
ls.notify(&true);
assert_eq!(rx.recv().is_ok(), true);
}
#[test]
fn test_remove_listener_from_set() {
let (tx, rx) = mpsc::channel();
let ls = ListenerSet::<bool>::new();
let sub = ls.subscribe(move |e| tx.send(e).unwrap());
ls.unsubscribe(sub);
assert_eq!(ls.len(), 0);
ls.notify(&true);
assert_eq!(rx.recv().is_err(), true);
}
}