1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use super::PubSub;
use many_to_many::ManyToMany;
use std::sync::{Arc, Mutex};
use std::hash::Hash;
/// Publish/subscribe hub that tracks client-to-topic subscriptions in a shared
/// many-to-many map and forwards published data to a single callback.
///
/// Both fields are reference-counted, so a cloned `Hive` shares the same
/// callback and the same subscription map as the value it was cloned from.
pub struct Hive<ClientKey: Clone + Eq + Hash, TopicKey: Clone + Eq + Hash, Data> {
    /// Handler invoked by `publish` with the clients returned for the topic
    /// and the published data.
    callback: Arc<Mutex<Box<dyn Fn(Vec<ClientKey>, Data) + Send>>>,
    /// Shared client <-> topic relation (clients on the left, topics on the right).
    pub map: Arc<Mutex<ManyToMany<ClientKey, TopicKey>>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `Data: Clone` bound, but cloning a `Hive` only duplicates two `Arc` handles
// and never clones a `Data` value.
impl<ClientKey: Clone + Eq + Hash, TopicKey: Clone + Eq + Hash, Data> Clone
    for Hive<ClientKey, TopicKey, Data>
{
    /// Returns a new handle that shares the callback and subscription map
    /// with `self`.
    fn clone(&self) -> Self {
        Hive {
            callback: Arc::clone(&self.callback),
            map: Arc::clone(&self.map),
        }
    }
}
impl<ClientKey, TopicKey, Data> Hive<ClientKey, TopicKey, Data>
where
    ClientKey: Clone + Eq + Hash,
    TopicKey: Clone + Eq + Hash,
{
    /// Builds a `Hive` with an empty subscription map.
    ///
    /// `callback` is boxed and stored behind a mutex; `publish` calls it with
    /// the list of clients found for the published topic and the data.
    pub fn new<F>(callback: F) -> Hive<ClientKey, TopicKey, Data>
    where
        F: 'static + Fn(Vec<ClientKey>, Data) + Send,
    {
        // Erase the concrete closure type so it fits the struct field.
        let handler: Box<dyn Fn(Vec<ClientKey>, Data) + Send> = Box::new(callback);
        let subscriptions = ManyToMany::new();

        Self {
            callback: Arc::new(Mutex::new(handler)),
            map: Arc::new(Mutex::new(subscriptions)),
        }
    }
}
impl<ClientKey, TopicKey, Data> PubSub<ClientKey, TopicKey, Data>
    for Hive<ClientKey, TopicKey, Data>
where
    ClientKey: Clone + Eq + Hash,
    TopicKey: Clone + Eq + Hash,
{
    /// Removes the client `id` from the left side of the subscription map.
    ///
    /// Returns `Err` only when the map mutex cannot be locked.
    fn drop_client(&self, id: &ClientKey) -> Result<(), String> {
        let mut subscriptions = self
            .map
            .lock()
            .map_err(|_| String::from("Failed to lock many-to-many map."))?;
        subscriptions.remove_left(id);
        Ok(())
    }

    /// Removes the topic `id` from the right side of the subscription map.
    ///
    /// Returns `Err` only when the map mutex cannot be locked.
    fn drop_topic(&self, id: &TopicKey) -> Result<(), String> {
        let mut subscriptions = self
            .map
            .lock()
            .map_err(|_| String::from("Failed to lock many-to-many map."))?;
        subscriptions.remove_right(id);
        Ok(())
    }

    /// Records a `client` -> `topic` pair in the subscription map.
    ///
    /// Returns `Err` only when the map mutex cannot be locked.
    fn subscribe(&self, client: ClientKey, topic: TopicKey) -> Result<(), String> {
        let mut subscriptions = self
            .map
            .lock()
            .map_err(|_| String::from("Failed to lock many-to-many map."))?;
        subscriptions.insert(client, topic);
        Ok(())
    }

    /// Records a `client` -> topic pair for every entry of `topics`, taking
    /// the map lock once for the whole batch.
    ///
    /// Returns `Err` only when the map mutex cannot be locked.
    fn subscribe_multiple(&self, client: ClientKey, topics: Vec<TopicKey>) -> Result<(), String> {
        let mut subscriptions = self
            .map
            .lock()
            .map_err(|_| String::from("Failed to lock many-to-many map."))?;
        // The map takes ownership of its keys, so each pair needs its own copy
        // of the client key.
        topics.into_iter().for_each(|topic| {
            subscriptions.insert(client.clone(), topic);
        });
        Ok(())
    }

    /// Removes the single `client` -> `topic` pair from the subscription map.
    ///
    /// Returns `Err` only when the map mutex cannot be locked.
    fn unsubscribe(&self, client: &ClientKey, topic: &TopicKey) -> Result<(), String> {
        let mut subscriptions = self
            .map
            .lock()
            .map_err(|_| String::from("Failed to lock many-to-many map."))?;
        subscriptions.remove(client, topic);
        Ok(())
    }

    /// Removes the `client` -> topic pair for every entry of `topics`, taking
    /// the map lock once for the whole batch.
    ///
    /// Returns `Err` only when the map mutex cannot be locked.
    fn unsubscribe_multiple(&self, client: &ClientKey, topics: &Vec<TopicKey>) -> Result<(), String> {
        let mut subscriptions = self
            .map
            .lock()
            .map_err(|_| String::from("Failed to lock many-to-many map."))?;
        topics.iter().for_each(|topic| {
            subscriptions.remove(client, topic);
        });
        Ok(())
    }

    /// Looks up the clients associated with `topic` and, if the lookup yields
    /// a list, passes that list together with `data` to the stored callback.
    ///
    /// When the lookup yields nothing the callback is not called and `data`
    /// is dropped. Returns `Err` when either the map mutex or the callback
    /// mutex cannot be locked.
    fn publish(&self, topic: &TopicKey, data: Data) -> Result<(), String> {
        // Scope the map guard so the map lock is released before the callback
        // lock is taken and the callback runs.
        let recipients = {
            let subscriptions = self
                .map
                .lock()
                .map_err(|_| String::from("Failed to lock many-to-many map."))?;
            subscriptions.get_right(topic)
        };

        match recipients {
            Some(clients) => {
                let handler = self
                    .callback
                    .lock()
                    .map_err(|_| "Failed to lock callback.")?;
                (*handler)(clients, data);
                Ok(())
            }
            None => Ok(()),
        }
    }
}