pub_sub/
lib.rs

1// Copyright (c) 2016 creato
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in all
11// copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19// SOFTWARE.
20
21#![warn(missing_docs)]
22
23//! A basic publish/subscribe channel.
24//!
25//! # Usage
26//!
27//! Add to crate dependencies:
28//!
29//! ```toml
30//! [dependencies]
31//! pub-sub = "*"
32//! ```
33//! Import in crate root:
34//!
35//! ```
36//! extern crate pub_sub;
37//! ```
38//!
39//! # Example
40//!
41//! ```
42//! extern crate pub_sub;
43//! extern crate uuid;
44//!
45//! use std::thread;
46//! use uuid::Uuid;
47//!
48//! fn main() {
49//!    let channel = pub_sub::PubSub::new();
50//!
51//!    let mut handles = vec![];
52//!
53//!    for _ in 0..16 {
54//!        let recv = channel.subscribe();
55//!
56//!         handles.push(thread::spawn(move || {
57//!             for _ in 0..16 {
58//!                 println!("recevied {}", recv.recv().unwrap());
59//!             }
60//!         }));
61//!     }
62//!
63//!     for _ in 0..16 {
64//!         let channel = channel.clone();
65//!
66//!         handles.push(thread::spawn(move || {
67//!             let msg_id = Uuid::new_v4();
68//!             println!("    sent {}", msg_id);
69//!             channel.send(msg_id).unwrap();
70//!         }));
71//!     }
72//!
73//!     while let Some(handle) = handles.pop() {
74//!         handle.join().unwrap();
75//!     }
76//! }
77//! ```
78
79
80extern crate uuid;
81
82use std::sync::{mpsc, Arc, Mutex};
83use std::collections::HashMap;
84
85
86/// Pub/sub channel.
87#[derive(Clone)]
88pub struct PubSub<T: Clone> {
89    senders: Arc<Mutex<HashMap<uuid::Uuid, mpsc::Sender<T>>>>,
90}
91
92/// Subscription to a pub/sub channel
93pub struct Subscription<T: Clone> {
94    receiver: mpsc::Receiver<T>,
95    senders: Arc<Mutex<HashMap<uuid::Uuid, mpsc::Sender<T>>>>,
96    id: uuid::Uuid,
97}
98
99impl<T: Clone> PubSub<T> {
100    /// Create a pub/sub channel
101    pub fn new() -> PubSub<T> {
102        PubSub { senders: Arc::new(Mutex::new(HashMap::new())) }
103    }
104
105    /// Attempts to broadcast
106    pub fn send(&self, it: T) -> Result<(), mpsc::SendError<T>> {
107        let senders = self.senders.lock().unwrap();
108
109        for (_, sender) in senders.iter() {
110            match sender.send(it.clone()) {
111                Ok(_) => {}
112                Err(err) => return Err(err),
113            }
114        }
115
116        Ok(())
117    }
118
119    /// Create a new subscription to the channel.
120    pub fn subscribe(&self) -> Subscription<T> {
121        let id = uuid::Uuid::new_v4();
122        let (send, recv) = mpsc::channel();
123
124        {
125            let mut senders = self.senders.lock().unwrap();
126            senders.insert(id, send);
127        }
128
129        Subscription {
130            receiver: recv,
131            senders: self.senders.clone(),
132            id: id,
133        }
134    }
135}
136
137impl<T: Clone> Subscription<T> {
138    /// Receives a single message. Blocks until a message is available.
139    pub fn recv(&self) -> Result<T, mpsc::RecvError> {
140        self.receiver.recv()
141    }
142
143    /// Tries to receive a single message, not blocking if one is not available.
144    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
145        self.receiver.try_recv()
146    }
147
148    /// Creates an iterator that will block waiting for messages.
149    pub fn iter(&self) -> mpsc::Iter<T> {
150        self.receiver.iter()
151    }
152}
153
154impl<T: Clone> Drop for Subscription<T> {
155    /// Remove our sender ID from the sender list.
156    fn drop(&mut self) {
157        let mut senders = self.senders.lock().unwrap();
158        senders.remove(&self.id);
159    }
160}
161
162impl<T: Clone> Clone for Subscription<T> {
163    fn clone(&self) -> Self {
164        PubSub { senders: self.senders.clone() }.subscribe()
165    }
166}
167
168
169#[cfg(test)]
170mod tests {
171    use std;
172
173    use super::*;
174
175    #[test]
176    fn many_senders() {
177        use std::sync::atomic::{AtomicUsize, Ordering};
178
179        let send = PubSub::new();
180        let recv = send.subscribe();
181
182        let threads = 5;
183        let pulses = 50;
184
185        let received = std::sync::Arc::new(AtomicUsize::new(0));
186
187        for _ in 0..threads {
188            let recv = recv.clone();
189            let received = received.clone();
190            std::thread::spawn(move || {
191                while let Ok(_) = recv.recv() {
192                    received.fetch_add(1, Ordering::AcqRel);
193                }
194            });
195        }
196
197        let mut accum = 0;
198
199        for _ in 0..pulses {
200            accum += 1;
201            send.send(accum).unwrap();
202        }
203
204        std::thread::sleep(std::time::Duration::from_millis(75));
205        assert_eq!(received.load(Ordering::Acquire), threads * pulses);
206    }
207}