ommui_broadcast/
lib.rs

1#![deny(missing_docs)]
2
3//! Broadcast synchronization sending out to all subscribed receivers.
4
5extern crate crossbeam_channel;
6extern crate snafu;
7
8mod error;
9
10use crossbeam_channel::{unbounded, Receiver, Sender};
11use snafu::ensure;
12use std::sync::{Arc, RwLock};
13
14pub use error::Error;
15
16/// A manager for subscriptions.
17/// The manager is bound to a broadcaster and can be used to subscribe to
18/// it's broadcast messages. It can be cloned and sent to other threads
19/// which also want to subscribe to the broadcast.
20#[derive(Clone)]
21pub struct SubscriptionManager<T: Clone> {
22    sender: Sender<Sender<T>>,
23}
24
25/// A broadcaster which sends out broadcasts to all of it's subscribers.
26#[derive(Clone)]
27pub struct Broadcaster<T: Clone> {
28    senders: Arc<RwLock<Vec<Sender<T>>>>,
29    inbox: (Sender<Sender<T>>, Receiver<Sender<T>>),
30}
31
32impl<T: Clone> Default for Broadcaster<T> {
33    fn default() -> Self {
34        Broadcaster {
35            senders: Arc::new(RwLock::new(Vec::new())),
36            inbox: unbounded(),
37        }
38    }
39}
40
41impl<T: Clone> Broadcaster<T> {
42    /// Process all pending subscriptions.
43    pub fn process_pending_subscriptions(&mut self) {
44        self.process_pending_subscriptions_with(&mut |_| {})
45    }
46
47    /// Process all pending subscriptions.
48    /// This method calls a function on each sender before adding it.
49    pub fn process_pending_subscriptions_with<F: FnMut(&mut Sender<T>)>(
50        &mut self,
51        f: &mut F,
52    ) {
53        let mut senders = self.senders.write().unwrap();
54        while let Ok(mut s) = self.inbox.1.try_recv() {
55            f(&mut s);
56            senders.push(s);
57        }
58    }
59
60    /// Broadcast a message to all subscribers.
61    pub fn broadcast(&mut self, item: T) {
62        self.process_pending_subscriptions();
63        let mut senders = self.senders.write().unwrap();
64        senders.retain(|sender| sender.send(item.clone()).is_ok());
65    }
66
67    /// Get a subscription manager which can be cloned or sent.
68    pub fn subscription_manager(&self) -> SubscriptionManager<T> {
69        SubscriptionManager {
70            sender: self.inbox.0.clone(),
71        }
72    }
73}
74
75impl<T: Clone> SubscriptionRegistration<T> for SubscriptionManager<T> {
76    fn subscribe_with<F: FnMut(&mut Sender<T>)>(
77        &self,
78        f: &mut F,
79    ) -> Result<Receiver<T>, Error> {
80        self.sender.subscribe_with(f)
81    }
82}
83
84impl<T: Clone> SubscriptionRegistration<T> for Sender<Sender<T>> {
85    fn subscribe_with<F: FnMut(&mut Sender<T>)>(
86        &self,
87        f: &mut F,
88    ) -> Result<Receiver<T>, Error> {
89        let (mut sender, receiver) = unbounded();
90        ensure!(
91            self.send(sender.clone()).is_ok(),
92            error::BroadcasterNoLongerExists
93        );
94        f(&mut sender);
95        Ok(receiver)
96    }
97}
98
99impl<T: Clone> SubscriptionRegistration<T> for Broadcaster<T> {
100    fn subscribe_with<F: FnMut(&mut Sender<T>)>(
101        &self,
102        f: &mut F,
103    ) -> Result<Receiver<T>, Error> {
104        self.inbox.0.subscribe_with(f)
105    }
106}
107
108/// A trait for subscribing to a broadcaster.
109pub trait SubscriptionRegistration<T: Clone> {
110    /// Subscribe to a broadcaster and return the receiver.
111    /// If the associated broadcaster got dropped in the meantime, an
112    /// error gets returned.
113    fn subscribe(&self) -> Result<Receiver<T>, Error> {
114        self.subscribe_with(&mut |_| {})
115    }
116
117    /// Subscribe to a broadcaster and return the receiver.
118    /// Calls a function on the newly created sender.
119    /// If the associated broadcaster got dropped in the meantime, an
120    /// error gets returned.
121    fn subscribe_with<F: FnMut(&mut Sender<T>)>(
122        &self,
123        f: &mut F,
124    ) -> Result<Receiver<T>, Error>;
125}