1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
use actix::prelude::*; use fnv::FnvHasher; use std::any::{Any, TypeId}; use std::collections::HashMap; use std::hash::BuildHasherDefault; use std::mem; use msgs::*; type TypeMap<A> = HashMap<TypeId, A, BuildHasherDefault<FnvHasher>>; #[derive(Default)] pub struct Broker { sub_map: TypeMap<Vec<Box<Any>>>, msg_map: TypeMap<Box<Any>>, } impl Broker { fn take_subs<M: BrokerMsg>(&mut self) -> Option<Vec<Recipient<M>>> where <M as Message>::Result: Send, { let id = TypeId::of::<M>(); let subs = self.sub_map.get_mut(&id)?; trace!("Broker: Found subscription list for {:?}.", id); let mut subs = mem::replace(subs, Vec::new()); let subs = subs.drain(..) .filter_map(|s| s.downcast::<Recipient<M>>().ok()) .map(|s| *s) .collect(); Some(subs) } fn add_sub<M: BrokerMsg>(&mut self, sub: Recipient<M>) where <M as Message>::Result: Send, { let id = TypeId::of::<M>(); let boxed = Box::new(sub); if let Some(subs) = self.sub_map.get_mut(&id) { trace!("Broker: Adding to {:?} subscription list.", id); subs.push(boxed); return; } trace!("Broker: Creating {:?} subscription list.", id); self.sub_map.insert(id, vec![boxed]); } fn get_previous_msg<M: BrokerMsg>(&self) -> Option<M> { let id = TypeId::of::<M>(); let msg = self.msg_map.get(&id)?; trace!("Broker: Previous message found for {:?}", id); let msg = msg.downcast_ref::<M>()?; Some(msg.clone()) } fn set_msg<M: BrokerMsg>(&mut self, msg: M) { let id = TypeId::of::<M>(); let boxed = Box::new(msg); if let Some(pm) = self.msg_map.get_mut(&id) { trace!("Broker: Setting new message value for {:?}", id); mem::replace(pm, boxed); return; } trace!("Broker: Adding first message value for {:?}", id); self.msg_map.insert(id, boxed); } } impl<M: BrokerMsg> Handler<SubscribeAsync<M>> for Broker where <M as Message>::Result: Send, { type Result = (); fn handle(&mut self, msg: SubscribeAsync<M>, _ctx: &mut Context<Self>) { trace!("Broker: Received SubscribeAsync"); self.add_sub::<M>(msg.0); } } impl<M: BrokerMsg> Handler<SubscribeSync<M>> for Broker where <M as Message>::Result: Send, { type Result = Option<M>; fn handle(&mut self, msg: SubscribeSync<M>, _ctx: &mut Context<Self>) -> Self::Result { trace!("Broker: Received SubscribeSync"); self.add_sub::<M>(msg.0); self.get_previous_msg::<M>() } } impl<M: BrokerMsg> Handler<IssueAsync<M>> for Broker where <M as Message>::Result: Send, { type Result = (); fn handle(&mut self, msg: IssueAsync<M>, _ctx: &mut Context<Self>) { trace!("Broker: Received IssueAsync"); if let Some(mut subs) = self.take_subs::<M>() { subs.drain(..) .filter_map(|s| { if s.do_send(msg.0.clone()).is_ok() { Some(s) } else { None } }) .for_each(|s| self.add_sub::<M>(s)); } self.set_msg::<M>(msg.0); } } impl<M: BrokerMsg> Handler<IssueSync<M>> for Broker where <M as Message>::Result: Send, { type Result = (); fn handle(&mut self, msg: IssueSync<M>, ctx: &mut Context<Self>) { trace!("Broker: Received IssueSync"); if let Some(mut subs) = self.take_subs::<M>() { subs.drain(..).for_each(|s| { s.send(msg.0.clone()) .into_actor(self) .map_err(|_, _, _| ()) .map(move |_, act, _| act.add_sub::<M>(s)) .wait(ctx); }); } self.set_msg::<M>(msg.0); } } impl Actor for Broker { type Context = Context<Self>; } impl SystemService for Broker {} impl Supervised for Broker {}