rdispatcher/
lib.rs

1use std::collections::HashMap;
2use std::sync::{mpsc, Arc, Mutex};
3use std::thread;
4use std::hash::{Hash, SipHasher, Hasher};
5use std::clone::Clone;
6
7// Aliases for easier refactoring
8pub type SubscribeHandle<T> = mpsc::Sender<DispatchMessage<T>>;
9pub type BroadcastHandle<T> = mpsc::Receiver<DispatchMessage<T>>;
10
/// A single message routed through the dispatcher.
///
/// `dispatch_type` is the routing key and `payload` is the message body.
#[derive(Clone)]
pub struct DispatchMessage<T: Hash + Send + Clone> {
    /// Routing key: the dispatcher hashes this value to pick the subscriber list.
    pub dispatch_type: T,
    /// Message content, passed on to every matching subscriber.
    pub payload: String,
}
16
17pub struct Dispatcher<T> where T: Hash + Send + Clone {
18    subscribers: HashMap<String, Vec<SubscribeHandle<T>>>,
19    broadcasters: Vec<Arc<Mutex<BroadcastHandle<T>>>>
20}
21
22pub trait Broadcast<T: Hash + Send + Clone> {
23   fn broadcast_handle(&mut self) -> BroadcastHandle<T>;
24}
25
26pub trait Subscribe<T: Hash + Send + Clone> {
27   fn subscribe_handle(&self) -> SubscribeHandle<T>;
28}
29
30impl <T: 'static + Hash + Send + Clone>Dispatcher<T> {
31    pub fn new() -> Dispatcher<T> {
32        Dispatcher { subscribers: HashMap::new(), broadcasters: vec![] }
33    }
34
35    pub fn register_broadcaster(&mut self, broadcaster: &mut Broadcast<T>) {
36       let handle = Arc::new(Mutex::new(broadcaster.broadcast_handle()));
37       self.broadcasters.push(handle);
38    }
39
40    pub fn register_subscriber(&mut self, subscriber: &Subscribe<T>, dispatch_type: T) {
41       let sender = subscriber.subscribe_handle();
42       let type_key = type_to_string(&dispatch_type);
43       let new = match self.subscribers.get_mut(&type_key) {
44          Some(others) => {
45             others.push(sender);
46             None
47          },
48          None => {
49             Some(vec![sender])
50          }
51       };
52       // Improve me. Cant chain because double mut borrow not allowed
53       new.and_then(|new_senders| self.subscribers.insert(type_key, new_senders));
54    }
55
56    pub fn start(&self) {
57       // Assuming that broadcasters.clone() copies the vector, but increase ref count on children
58       for broadcaster in self.broadcasters.clone() {
59          let subscribers = self.subscribers.clone();
60          thread::spawn(move || {
61             loop {
62                let message = broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster or channel hung up");
63                match subscribers.get(&type_to_string(&message.dispatch_type)) {
64                  Some(ref subs) => { 
65                      for sub in subs.iter() { sub.send(message.clone()).unwrap(); }
66                  },
67                  None => ()
68                }
69
70             }
71          });
72       }
73    }
74
75    #[allow(dead_code)]
76    fn num_broadcasters(&self) -> usize {
77       self.broadcasters.len()
78    }
79
80    #[allow(dead_code)]
81    fn num_subscribers(&self, dispatch_type: T) -> usize {
82       match self.subscribers.get(&type_to_string(&dispatch_type)) {
83          Some(subscribers) => subscribers.len(),
84          None => 0
85       }
86    }
87}
88
/// Derives the subscriber-map key for a dispatch type: the decimal rendering
/// of the value's 64-bit hash.
///
/// Equal values always produce equal keys within one process. The concrete
/// digits are an implementation detail and must not be stored or compared
/// across builds.
// Open question carried over from the original: require a hashable dispatch
// type and key the map on it directly, instead of stringifying a hash?
fn type_to_string<T: Hash>(t: &T) -> String {
    // `SipHasher` has been deprecated since Rust 1.13. `DefaultHasher::new()`
    // is the supported replacement; every hasher created through `new` starts
    // from the same state, so the key is deterministic.
    let mut hasher = ::std::collections::hash_map::DefaultHasher::new();
    t.hash(&mut hasher);
    hasher.finish().to_string()
}
95
#[cfg(test)]
mod test {
    use std::sync::mpsc;
    use self::DispatchType::*;
    use super::*;

    /// Dispatch types used to exercise routing in the tests below.
    #[derive(PartialEq, Clone, Hash, Debug)]
    pub enum DispatchType {
        OutgoingMessage,
        RawIncomingMessage,
    }

    /// Test double for a broadcaster: it keeps the sending half of the channel
    /// whose receiving half was handed to the dispatcher.
    struct TestBroadcaster {
        sender: Option<SubscribeHandle<DispatchType>>,
    }

    impl TestBroadcaster {
        fn new() -> TestBroadcaster {
            TestBroadcaster { sender: None }
        }

        /// Sends a message if a handle has been created; otherwise does nothing.
        fn broadcast(&self, dispatch_type: DispatchType, payload: String) {
            if let Some(ref tx) = self.sender {
                let message = DispatchMessage { dispatch_type: dispatch_type, payload: payload };
                tx.send(message).unwrap();
            }
        }
    }

    impl Broadcast<DispatchType> for TestBroadcaster {
        fn broadcast_handle(&mut self) -> BroadcastHandle<DispatchType> {
            let (tx, rx) = mpsc::channel();
            self.sender = Some(tx);
            rx
        }
    }

    /// Test double for a subscriber: it owns both ends of its channel and hands
    /// out clones of the sender.
    struct TestSubscriber {
        receiver: BroadcastHandle<DispatchType>,
        sender: SubscribeHandle<DispatchType>,
    }

    impl TestSubscriber {
        fn new() -> TestSubscriber {
            let (tx, rx) = mpsc::channel();
            TestSubscriber { receiver: rx, sender: tx }
        }
    }

    impl Subscribe<DispatchType> for TestSubscriber {
        fn subscribe_handle(&self) -> SubscribeHandle<DispatchType> {
            self.sender.clone()
        }
    }

    /// Builds an empty dispatcher keyed on `DispatchType`.
    fn setup_dispatcher() -> Dispatcher<DispatchType> {
        Dispatcher::new()
    }

    /// Blocks for the next message on `subscriber` and checks its type and payload.
    fn expect_message(subscriber: &TestSubscriber, dispatch_type: DispatchType, payload: &str) {
        let message = subscriber.receiver.recv().unwrap();
        assert_eq!(message.dispatch_type, dispatch_type);
        assert_eq!(message.payload, payload);
    }

    #[test]
    fn test_register_broadcaster() {
        let mut dsp = setup_dispatcher();
        let mut broadcaster = TestBroadcaster::new();

        assert_eq!(dsp.num_broadcasters(), 0);
        dsp.register_broadcaster(&mut broadcaster);
        assert_eq!(dsp.num_broadcasters(), 1);
    }

    #[test]
    fn test_register_subscriber() {
        let mut dsp = setup_dispatcher();
        let subscriber = TestSubscriber::new();

        assert_eq!(dsp.num_subscribers(OutgoingMessage), 0);
        dsp.register_subscriber(&subscriber, OutgoingMessage);
        assert_eq!(dsp.num_subscribers(OutgoingMessage), 1);
    }

    #[test]
    fn test_register_multiple_subscribers() {
        let mut dsp = setup_dispatcher();
        let first = TestSubscriber::new();
        let second = TestSubscriber::new();

        assert_eq!(dsp.num_subscribers(OutgoingMessage), 0);
        dsp.register_subscriber(&first, OutgoingMessage);
        dsp.register_subscriber(&second, OutgoingMessage);
        assert_eq!(dsp.num_subscribers(OutgoingMessage), 2);
    }

    #[test]
    fn test_broadcast_simple_message() {
        let mut dsp = setup_dispatcher();
        let subscriber = TestSubscriber::new();
        let mut broadcaster = TestBroadcaster::new();
        dsp.register_broadcaster(&mut broadcaster);
        dsp.register_subscriber(&subscriber, OutgoingMessage);

        dsp.start();

        broadcaster.broadcast(OutgoingMessage, "Hello world!".to_string());
        expect_message(&subscriber, OutgoingMessage, "Hello world!");
    }

    #[test]
    fn test_broadcast_multiple_to_one() {
        let mut dsp = setup_dispatcher();
        let subscriber = TestSubscriber::new();
        let mut broadcaster = TestBroadcaster::new();
        dsp.register_broadcaster(&mut broadcaster);
        dsp.register_subscriber(&subscriber, OutgoingMessage);
        dsp.register_subscriber(&subscriber, RawIncomingMessage);

        dsp.start();

        broadcaster.broadcast(OutgoingMessage, "Hello world!".to_string());
        expect_message(&subscriber, OutgoingMessage, "Hello world!");
        broadcaster.broadcast(RawIncomingMessage, "Hello world!".to_string());
        expect_message(&subscriber, RawIncomingMessage, "Hello world!");
    }

    // TODO: this test is pending. Dispatch types whose variants carry data are
    // not supported yet; the feature needs to be implemented or the problem
    // avoided.
    //
    // #[test]
    // fn test_broadcast_simple_message_with_complex_enum() {
    //     let mut dispatcher = setup_dispatcher();
    //     let sub = TestSubscriber::new();
    //     let mut brd = TestBroadcaster::new();
    //     dispatcher.register_broadcaster(&mut brd);
    //     dispatcher.register_subscriber(&sub, SomethingComplex(String::new()));
    //
    //     dispatcher.start();
    //
    //     brd.broadcast(SomethingComplex("abc".to_string()), "Hello world!".to_string());
    //     let message = sub.receiver.recv().unwrap();
    //     assert_eq!(message.dispatch_type, SomethingComplex("abc".to_string()));
    //     assert_eq!(message.payload, "Hello world!");
    // }
}