pub_sub/lib.rs
1// Copyright (c) 2016 creato
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in all
11// copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19// SOFTWARE.
20
21#![warn(missing_docs)]
22
23//! A basic publish/subscribe channel.
24//!
25//! # Usage
26//!
27//! Add to crate dependencies:
28//!
29//! ```toml
30//! [dependencies]
31//! pub-sub = "*"
32//! ```
33//! Import in crate root:
34//!
35//! ```
36//! extern crate pub_sub;
37//! ```
38//!
39//! # Example
40//!
41//! ```
42//! extern crate pub_sub;
43//! extern crate uuid;
44//!
45//! use std::thread;
46//! use uuid::Uuid;
47//!
48//! fn main() {
49//! let channel = pub_sub::PubSub::new();
50//!
51//! let mut handles = vec![];
52//!
53//! for _ in 0..16 {
54//! let recv = channel.subscribe();
55//!
56//! handles.push(thread::spawn(move || {
57//! for _ in 0..16 {
//! println!("received {}", recv.recv().unwrap());
59//! }
60//! }));
61//! }
62//!
63//! for _ in 0..16 {
64//! let channel = channel.clone();
65//!
66//! handles.push(thread::spawn(move || {
67//! let msg_id = Uuid::new_v4();
68//! println!(" sent {}", msg_id);
69//! channel.send(msg_id).unwrap();
70//! }));
71//! }
72//!
73//! while let Some(handle) = handles.pop() {
74//! handle.join().unwrap();
75//! }
76//! }
77//! ```
78
79
80extern crate uuid;
81
82use std::sync::{mpsc, Arc, Mutex};
83use std::collections::HashMap;
84
85
86/// Pub/sub channel.
87#[derive(Clone)]
88pub struct PubSub<T: Clone> {
89 senders: Arc<Mutex<HashMap<uuid::Uuid, mpsc::Sender<T>>>>,
90}
91
92/// Subscription to a pub/sub channel
93pub struct Subscription<T: Clone> {
94 receiver: mpsc::Receiver<T>,
95 senders: Arc<Mutex<HashMap<uuid::Uuid, mpsc::Sender<T>>>>,
96 id: uuid::Uuid,
97}
98
99impl<T: Clone> PubSub<T> {
100 /// Create a pub/sub channel
101 pub fn new() -> PubSub<T> {
102 PubSub { senders: Arc::new(Mutex::new(HashMap::new())) }
103 }
104
105 /// Attempts to broadcast
106 pub fn send(&self, it: T) -> Result<(), mpsc::SendError<T>> {
107 let senders = self.senders.lock().unwrap();
108
109 for (_, sender) in senders.iter() {
110 match sender.send(it.clone()) {
111 Ok(_) => {}
112 Err(err) => return Err(err),
113 }
114 }
115
116 Ok(())
117 }
118
119 /// Create a new subscription to the channel.
120 pub fn subscribe(&self) -> Subscription<T> {
121 let id = uuid::Uuid::new_v4();
122 let (send, recv) = mpsc::channel();
123
124 {
125 let mut senders = self.senders.lock().unwrap();
126 senders.insert(id, send);
127 }
128
129 Subscription {
130 receiver: recv,
131 senders: self.senders.clone(),
132 id: id,
133 }
134 }
135}
136
137impl<T: Clone> Subscription<T> {
138 /// Receives a single message. Blocks until a message is available.
139 pub fn recv(&self) -> Result<T, mpsc::RecvError> {
140 self.receiver.recv()
141 }
142
143 /// Tries to receive a single message, not blocking if one is not available.
144 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
145 self.receiver.try_recv()
146 }
147
148 /// Creates an iterator that will block waiting for messages.
149 pub fn iter(&self) -> mpsc::Iter<T> {
150 self.receiver.iter()
151 }
152}
153
154impl<T: Clone> Drop for Subscription<T> {
155 /// Remove our sender ID from the sender list.
156 fn drop(&mut self) {
157 let mut senders = self.senders.lock().unwrap();
158 senders.remove(&self.id);
159 }
160}
161
162impl<T: Clone> Clone for Subscription<T> {
163 fn clone(&self) -> Self {
164 PubSub { senders: self.senders.clone() }.subscribe()
165 }
166}
167
168
#[cfg(test)]
mod tests {
    use std;

    use super::*;

    /// One publisher broadcasts to several subscriber threads; every
    /// subscriber must see every message, in the order it was sent.
    ///
    /// Each worker receives exactly `pulses` messages and is then joined, so
    /// the result does not depend on timing. If a message were lost, the
    /// affected worker would block in `recv` instead of the test passing by
    /// accident.
    #[test]
    fn many_senders() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let send = PubSub::new();
        let recv = send.subscribe();

        let threads = 5;
        let pulses = 50;

        let received = std::sync::Arc::new(AtomicUsize::new(0));

        // All subscriptions are registered here, on the publishing thread,
        // before the first message is sent.
        let workers: Vec<_> = (0..threads)
            .map(|_| {
                let recv = recv.clone();
                let received = received.clone();
                std::thread::spawn(move || {
                    for expected in 0..pulses {
                        assert_eq!(recv.recv().unwrap(), expected);
                        received.fetch_add(1, Ordering::SeqCst);
                    }
                    // Nothing beyond the expected messages may be queued.
                    assert!(recv.try_recv().is_err());
                })
            })
            .collect();

        for pulse in 0..pulses {
            send.send(pulse).unwrap();
        }

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(received.load(Ordering::SeqCst), threads * pulses);

        // The subscription the workers were cloned from got the broadcast too.
        let mut queued = 0;
        while let Ok(_) = recv.try_recv() {
            queued += 1;
        }
        assert_eq!(queued, pulses);
    }
}
207}