1#![deny(missing_docs)]
2
3extern crate crossbeam_channel;
6extern crate snafu;
7
8mod error;
9
10use crossbeam_channel::{unbounded, Receiver, Sender};
11use snafu::ensure;
12use std::sync::{Arc, RwLock};
13
14pub use error::Error;
15
16#[derive(Clone)]
21pub struct SubscriptionManager<T: Clone> {
22 sender: Sender<Sender<T>>,
23}
24
25#[derive(Clone)]
27pub struct Broadcaster<T: Clone> {
28 senders: Arc<RwLock<Vec<Sender<T>>>>,
29 inbox: (Sender<Sender<T>>, Receiver<Sender<T>>),
30}
31
32impl<T: Clone> Default for Broadcaster<T> {
33 fn default() -> Self {
34 Broadcaster {
35 senders: Arc::new(RwLock::new(Vec::new())),
36 inbox: unbounded(),
37 }
38 }
39}
40
41impl<T: Clone> Broadcaster<T> {
42 pub fn process_pending_subscriptions(&mut self) {
44 self.process_pending_subscriptions_with(&mut |_| {})
45 }
46
47 pub fn process_pending_subscriptions_with<F: FnMut(&mut Sender<T>)>(
50 &mut self,
51 f: &mut F,
52 ) {
53 let mut senders = self.senders.write().unwrap();
54 while let Ok(mut s) = self.inbox.1.try_recv() {
55 f(&mut s);
56 senders.push(s);
57 }
58 }
59
60 pub fn broadcast(&mut self, item: T) {
62 self.process_pending_subscriptions();
63 let mut senders = self.senders.write().unwrap();
64 senders.retain(|sender| sender.send(item.clone()).is_ok());
65 }
66
67 pub fn subscription_manager(&self) -> SubscriptionManager<T> {
69 SubscriptionManager {
70 sender: self.inbox.0.clone(),
71 }
72 }
73}
74
75impl<T: Clone> SubscriptionRegistration<T> for SubscriptionManager<T> {
76 fn subscribe_with<F: FnMut(&mut Sender<T>)>(
77 &self,
78 f: &mut F,
79 ) -> Result<Receiver<T>, Error> {
80 self.sender.subscribe_with(f)
81 }
82}
83
84impl<T: Clone> SubscriptionRegistration<T> for Sender<Sender<T>> {
85 fn subscribe_with<F: FnMut(&mut Sender<T>)>(
86 &self,
87 f: &mut F,
88 ) -> Result<Receiver<T>, Error> {
89 let (mut sender, receiver) = unbounded();
90 ensure!(
91 self.send(sender.clone()).is_ok(),
92 error::BroadcasterNoLongerExists
93 );
94 f(&mut sender);
95 Ok(receiver)
96 }
97}
98
99impl<T: Clone> SubscriptionRegistration<T> for Broadcaster<T> {
100 fn subscribe_with<F: FnMut(&mut Sender<T>)>(
101 &self,
102 f: &mut F,
103 ) -> Result<Receiver<T>, Error> {
104 self.inbox.0.subscribe_with(f)
105 }
106}
107
108pub trait SubscriptionRegistration<T: Clone> {
110 fn subscribe(&self) -> Result<Receiver<T>, Error> {
114 self.subscribe_with(&mut |_| {})
115 }
116
117 fn subscribe_with<F: FnMut(&mut Sender<T>)>(
122 &self,
123 f: &mut F,
124 ) -> Result<Receiver<T>, Error>;
125}