1use std::collections::HashMap;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::hash::{Hash, SipHasher, Hasher};
5use std::clone::Clone;
6
7pub type SubscribeHandle<T> = mpsc::Sender<DispatchMessage<T>>;
9pub type BroadcastHandle<T> = mpsc::Receiver<DispatchMessage<T>>;
10
/// A single message routed through the dispatcher.
#[derive(Clone)]
pub struct DispatchMessage<T: Hash + Send + Clone> {
    /// Category of the message; the dispatcher uses it to select subscribers.
    pub dispatch_type: T,
    /// Message body, passed through to subscribers unchanged.
    pub payload: String,
}
16
17pub struct Dispatcher<T> where T: Hash + Send + Clone {
18 subscribers: HashMap<String, Vec<SubscribeHandle<T>>>,
19 broadcasters: Vec<Arc<Mutex<BroadcastHandle<T>>>>
20}
21
22pub trait Broadcast<T: Hash + Send + Clone> {
23 fn broadcast_handle(&mut self) -> BroadcastHandle<T>;
24}
25
26pub trait Subscribe<T: Hash + Send + Clone> {
27 fn subscribe_handle(&self) -> SubscribeHandle<T>;
28}
29
30impl <T: 'static + Hash + Send + Clone>Dispatcher<T> {
31 pub fn new() -> Dispatcher<T> {
32 Dispatcher { subscribers: HashMap::new(), broadcasters: vec![] }
33 }
34
35 pub fn register_broadcaster(&mut self, broadcaster: &mut Broadcast<T>) {
36 let handle = Arc::new(Mutex::new(broadcaster.broadcast_handle()));
37 self.broadcasters.push(handle);
38 }
39
40 pub fn register_subscriber(&mut self, subscriber: &Subscribe<T>, dispatch_type: T) {
41 let sender = subscriber.subscribe_handle();
42 let type_key = type_to_string(&dispatch_type);
43 let new = match self.subscribers.get_mut(&type_key) {
44 Some(others) => {
45 others.push(sender);
46 None
47 },
48 None => {
49 Some(vec![sender])
50 }
51 };
52 new.and_then(|new_senders| self.subscribers.insert(type_key, new_senders));
54 }
55
56 pub fn start(&self) {
57 for broadcaster in self.broadcasters.clone() {
59 let subscribers = self.subscribers.clone();
60 thread::spawn(move || {
61 loop {
62 let message = broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster or channel hung up");
63 match subscribers.get(&type_to_string(&message.dispatch_type)) {
64 Some(ref subs) => {
65 for sub in subs.iter() { sub.send(message.clone()).unwrap(); }
66 },
67 None => ()
68 }
69
70 }
71 });
72 }
73 }
74
75 #[allow(dead_code)]
76 fn num_broadcasters(&self) -> usize {
77 self.broadcasters.len()
78 }
79
80 #[allow(dead_code)]
81 fn num_subscribers(&self, dispatch_type: T) -> usize {
82 match self.subscribers.get(&type_to_string(&dispatch_type)) {
83 Some(subscribers) => subscribers.len(),
84 None => 0
85 }
86 }
87}
88
/// Builds a map key for `t`: the decimal rendering of its 64-bit hash.
///
/// Uses `DefaultHasher` rather than the deprecated `SipHasher`. Every hasher
/// created through `DefaultHasher::new` starts from the same state, so equal
/// values always produce the same key within a running program. The hashing
/// algorithm itself is unspecified, so keys must not be stored or compared
/// across builds.
fn type_to_string<T: Hash>(t: &T) -> String {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    t.hash(&mut hasher);
    hasher.finish().to_string()
}
95
#[cfg(test)]
mod test {
    use std::sync::mpsc;
    use self::DispatchType::*;
    use super::*;

    /// Message categories used to exercise the dispatcher.
    #[derive(PartialEq, Clone, Hash, Debug)]
    pub enum DispatchType {
        OutgoingMessage,
        RawIncomingMessage
    }

    /// Fresh dispatcher specialised to the test message categories.
    fn setup_dispatcher() -> Dispatcher<DispatchType> {
        Dispatcher::new()
    }

    /// Blocks for the next message delivered to `sub` and checks its contents.
    fn assert_next_message(sub: &TestSubscriber, expected_type: DispatchType, expected_payload: &str) {
        let message = sub.receiver.recv().unwrap();
        assert_eq!(message.dispatch_type, expected_type);
        assert_eq!(message.payload, expected_payload);
    }

    #[test]
    fn test_register_broadcaster() {
        let mut dispatcher = setup_dispatcher();
        let mut source = TestBroadcaster::new();

        assert_eq!(dispatcher.num_broadcasters(), 0);
        dispatcher.register_broadcaster(&mut source);
        assert_eq!(dispatcher.num_broadcasters(), 1);
    }

    #[test]
    fn test_register_subscriber() {
        let mut dispatcher = setup_dispatcher();
        let sink = TestSubscriber::new();

        assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 0);
        dispatcher.register_subscriber(&sink, OutgoingMessage);
        assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 1);
    }

    #[test]
    fn test_register_multiple_subscribers() {
        let mut dispatcher = setup_dispatcher();
        let first_sink = TestSubscriber::new();
        let second_sink = TestSubscriber::new();

        assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 0);
        dispatcher.register_subscriber(&first_sink, OutgoingMessage);
        dispatcher.register_subscriber(&second_sink, OutgoingMessage);
        assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 2);
    }

    #[test]
    fn test_broadcast_simple_message() {
        let mut dispatcher = setup_dispatcher();
        let sink = TestSubscriber::new();
        let mut source = TestBroadcaster::new();
        dispatcher.register_broadcaster(&mut source);
        dispatcher.register_subscriber(&sink, OutgoingMessage);

        dispatcher.start();

        source.broadcast(OutgoingMessage, "Hello world!".to_string());
        assert_next_message(&sink, OutgoingMessage, "Hello world!");
    }

    #[test]
    fn test_broadcast_multiple_to_one() {
        let mut dispatcher = setup_dispatcher();
        let sink = TestSubscriber::new();
        let mut source = TestBroadcaster::new();
        dispatcher.register_broadcaster(&mut source);
        dispatcher.register_subscriber(&sink, OutgoingMessage);
        dispatcher.register_subscriber(&sink, RawIncomingMessage);

        dispatcher.start();

        // Send and check one at a time so the expected arrival order is fixed.
        source.broadcast(OutgoingMessage, "Hello world!".to_string());
        assert_next_message(&sink, OutgoingMessage, "Hello world!");
        source.broadcast(RawIncomingMessage, "Hello world!".to_string());
        assert_next_message(&sink, RawIncomingMessage, "Hello world!");
    }

    /// Broadcaster whose sender only exists once a handle has been requested.
    struct TestBroadcaster {
        sender: Option<SubscribeHandle<DispatchType>>
    }

    impl TestBroadcaster {
        fn new() -> TestBroadcaster {
            TestBroadcaster { sender: None }
        }

        /// Sends a message if a handle has been handed out; otherwise does nothing.
        fn broadcast(&self, dispatch_type: DispatchType, payload: String) {
            let message = DispatchMessage { dispatch_type: dispatch_type, payload: payload };
            if let Some(ref outgoing) = self.sender {
                outgoing.send(message).unwrap();
            }
        }
    }

    impl Broadcast<DispatchType> for TestBroadcaster {
        fn broadcast_handle(&mut self) -> BroadcastHandle<DispatchType> {
            let (outgoing, incoming) = mpsc::channel::<DispatchMessage<DispatchType>>();
            self.sender = Some(outgoing);
            incoming
        }
    }

    /// Subscriber that keeps both channel ends so tests can read what was delivered.
    struct TestSubscriber {
        receiver: BroadcastHandle<DispatchType>,
        sender: SubscribeHandle<DispatchType>
    }

    impl TestSubscriber {
        fn new() -> TestSubscriber {
            let (outgoing, incoming) = mpsc::channel::<DispatchMessage<DispatchType>>();
            TestSubscriber { receiver: incoming, sender: outgoing }
        }
    }

    impl Subscribe<DispatchType> for TestSubscriber {
        fn subscribe_handle(&self) -> SubscribeHandle<DispatchType> {
            self.sender.clone()
        }
    }
}