myriad/mpmc/
mod.rs

1use std::fmt;
2use std::ops::Deref;
3use std::sync::atomic::*;
4use std::sync::{Arc, Condvar, Mutex};
5
6mod queue;
7mod stack;
8
9pub fn queue<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
10    let inner = Arc::new(Inner {
11        data: Box::new(queue::Queue::new()),
12        guard: Mutex::new(false),
13        waker: Condvar::new(),
14        connected: AtomicBool::new(true),
15        sleepers: AtomicUsize::new(0),
16    });
17    (Sender::new(inner.clone()), Receiver::new(inner.clone()))
18}
19
20pub fn stack<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
21    let inner = Arc::new(Inner {
22        data: Box::new(stack::Stack::new()),
23        guard: Mutex::new(false),
24        waker: Condvar::new(),
25        connected: AtomicBool::new(true),
26        sleepers: AtomicUsize::new(0),
27    });
28    (Sender::new(inner.clone()), Receiver::new(inner.clone()))
29}
30
31pub trait LockFree<T> {
32    fn push(&self, item: T);
33    fn pop(&self) -> Option<T>;
34    fn len(&self) -> usize;
35}
36
37struct Inner<T: Send> {
38    data: Box<LockFree<T>>,
39    connected: AtomicBool,
40    guard: Mutex<bool>,
41    waker: Condvar,
42    sleepers: AtomicUsize,
43}
44
45unsafe impl<T: Send> Send for Sender<T> {}
46unsafe impl<T: Send> Sync for Sender<T> {}
47unsafe impl<T: Send> Send for Receiver<T> {}
48unsafe impl<T: Send> Sync for Receiver<T> {}
49
50pub struct Sender<T: Send> {
51    inner: Arc<SendInner<T>>,
52}
53
54pub struct Receiver<T: Send> {
55    inner: Arc<RecvInner<T>>,
56}
57
58struct SendInner<T: Send> {
59    inner: Arc<Inner<T>>,
60}
61
62struct RecvInner<T: Send> {
63    inner: Arc<Inner<T>>,
64}
65
66impl<T: Send> Deref for RecvInner<T> {
67    type Target = Arc<Inner<T>>;
68    fn deref(&self) -> &Arc<Inner<T>> {
69        &self.inner
70    }
71}
72
73impl<T: Send> Deref for SendInner<T> {
74    type Target = Arc<Inner<T>>;
75    fn deref(&self) -> &Arc<Inner<T>> {
76        &self.inner
77    }
78}
79
80impl<T: Send> Drop for RecvInner<T> {
81    fn drop(&mut self) {
82        self.inner.connected.store(false, Ordering::Release);
83    }
84}
85
86impl<T: Send> Drop for SendInner<T> {
87    fn drop(&mut self) {
88        // Disconnect
89        self.inner.connected.store(false, Ordering::Release);
90        // Wake sleepers
91        if self.inner.sleepers.load(Ordering::Acquire) > 0 {
92            *self.inner.guard.lock().unwrap() = true;
93            self.inner.waker.notify_all();
94        }
95    }
96}
97
98impl<T: Send> Sender<T> {
99    fn new(inner: Arc<Inner<T>>) -> Sender<T> {
100        Sender {
101            inner: Arc::new(SendInner { inner }),
102        }
103    }
104
105    pub fn send(&self, data: T) -> Result<(), T> {
106        // Use stricter ordering than release, because this value
107        // can be changed by dropping receivers
108        if self.inner.connected.load(Ordering::Acquire) {
109            self.inner.data.push(data);
110            if self.inner.sleepers.load(Ordering::Acquire) > 0 {
111                *self.inner.guard.lock().unwrap() = true;
112                self.inner.waker.notify_one();
113            }
114            Ok(())
115        } else {
116            // Return ownership
117            Err(data)
118        }
119    }
120
121    pub fn size_hint(&self) -> usize {
122        self.inner.data.len()
123    }
124
125    /// Close the channel
126    pub fn close(self) {}
127}
128
129impl<T: Send> Clone for Sender<T> {
130    fn clone(&self) -> Sender<T> {
131        Sender {
132            inner: self.inner.clone(),
133        }
134    }
135}
136
137#[derive(PartialEq)]
138pub enum Error {
139    Empty,
140    Disconnected,
141}
142
143impl<T: Send> Clone for Receiver<T> {
144    fn clone(&self) -> Receiver<T> {
145        Receiver {
146            inner: self.inner.clone(),
147        }
148    }
149}
150
151impl<T: Send> Receiver<T> {
152    fn new(inner: Arc<Inner<T>>) -> Receiver<T> {
153        Receiver {
154            inner: Arc::new(RecvInner { inner }),
155        }
156    }
157
158    /// Non-blocking attempt to receive data from the channel
159    pub fn try_recv(&self) -> Result<T, Error> {
160        match self.inner.data.pop() {
161            Some(data) => Ok(data),
162            None => {
163                if self.inner.connected.load(Ordering::Acquire) {
164                    Err(Error::Empty)
165                } else {
166                    Err(Error::Disconnected)
167                }
168            }
169        }
170    }
171
172    /// Block until data is received from the channel
173    pub fn recv(&self) -> Result<T, Error> {
174        match self.try_recv() {
175            Ok(data) => return Ok(data),
176            Err(Error::Disconnected) => return Err(Error::Disconnected),
177            Err(Error::Empty) => (),
178        };
179
180        let ret;
181        let mut guard = self.inner.guard.lock().unwrap();
182        self.inner.sleepers.fetch_add(1, Ordering::Relaxed);
183        loop {
184            match self.try_recv() {
185                Ok(data) => {
186                    ret = Ok(data);
187                    break;
188                }
189                Err(Error::Disconnected) => {
190                    ret = Err(Error::Disconnected);
191                    break;
192                }
193                Err(Error::Empty) => {}
194            };
195            guard = self.inner.waker.wait(guard).unwrap();
196        }
197        self.inner.sleepers.fetch_sub(1, Ordering::Relaxed);
198        ret
199    }
200}
201
202impl fmt::Display for Error {
203    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
204        match self {
205            Error::Disconnected => write!(f, "Receiver Error: channel is disconnected"),
206            Error::Empty => write!(f, "Receiver Error: channel is empty"),
207        }
208    }
209}
210
211impl fmt::Debug for Error {
212    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
213        match self {
214            Error::Disconnected => write!(f, "Receiver Error: channel is disconnected"),
215            Error::Empty => write!(f, "Receiver Error: channel is empty"),
216        }
217    }
218}