1use std::fmt;
2use std::ops::Deref;
3use std::sync::atomic::*;
4use std::sync::{Arc, Condvar, Mutex};
5
6mod queue;
7mod stack;
8
9pub fn queue<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
10 let inner = Arc::new(Inner {
11 data: Box::new(queue::Queue::new()),
12 guard: Mutex::new(false),
13 waker: Condvar::new(),
14 connected: AtomicBool::new(true),
15 sleepers: AtomicUsize::new(0),
16 });
17 (Sender::new(inner.clone()), Receiver::new(inner.clone()))
18}
19
20pub fn stack<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
21 let inner = Arc::new(Inner {
22 data: Box::new(stack::Stack::new()),
23 guard: Mutex::new(false),
24 waker: Condvar::new(),
25 connected: AtomicBool::new(true),
26 sleepers: AtomicUsize::new(0),
27 });
28 (Sender::new(inner.clone()), Receiver::new(inner.clone()))
29}
30
/// Storage backend used by the channel halves.
///
/// Every method takes `&self`, so an implementation is expected to manage its
/// own interior mutability. The order in which items come back out is decided
/// by the implementation.
pub trait LockFree<T> {
    /// Stores `item` in the container.
    fn push(&self, item: T);

    /// Removes and returns one item, or `None` when there is nothing to return.
    fn pop(&self) -> Option<T>;

    /// Reports how many items the container currently holds.
    fn len(&self) -> usize;
}
36
37struct Inner<T: Send> {
38 data: Box<LockFree<T>>,
39 connected: AtomicBool,
40 guard: Mutex<bool>,
41 waker: Condvar,
42 sleepers: AtomicUsize,
43}
44
45unsafe impl<T: Send> Send for Sender<T> {}
46unsafe impl<T: Send> Sync for Sender<T> {}
47unsafe impl<T: Send> Send for Receiver<T> {}
48unsafe impl<T: Send> Sync for Receiver<T> {}
49
50pub struct Sender<T: Send> {
51 inner: Arc<SendInner<T>>,
52}
53
54pub struct Receiver<T: Send> {
55 inner: Arc<RecvInner<T>>,
56}
57
58struct SendInner<T: Send> {
59 inner: Arc<Inner<T>>,
60}
61
62struct RecvInner<T: Send> {
63 inner: Arc<Inner<T>>,
64}
65
66impl<T: Send> Deref for RecvInner<T> {
67 type Target = Arc<Inner<T>>;
68 fn deref(&self) -> &Arc<Inner<T>> {
69 &self.inner
70 }
71}
72
73impl<T: Send> Deref for SendInner<T> {
74 type Target = Arc<Inner<T>>;
75 fn deref(&self) -> &Arc<Inner<T>> {
76 &self.inner
77 }
78}
79
80impl<T: Send> Drop for RecvInner<T> {
81 fn drop(&mut self) {
82 self.inner.connected.store(false, Ordering::Release);
83 }
84}
85
86impl<T: Send> Drop for SendInner<T> {
87 fn drop(&mut self) {
88 self.inner.connected.store(false, Ordering::Release);
90 if self.inner.sleepers.load(Ordering::Acquire) > 0 {
92 *self.inner.guard.lock().unwrap() = true;
93 self.inner.waker.notify_all();
94 }
95 }
96}
97
98impl<T: Send> Sender<T> {
99 fn new(inner: Arc<Inner<T>>) -> Sender<T> {
100 Sender {
101 inner: Arc::new(SendInner { inner }),
102 }
103 }
104
105 pub fn send(&self, data: T) -> Result<(), T> {
106 if self.inner.connected.load(Ordering::Acquire) {
109 self.inner.data.push(data);
110 if self.inner.sleepers.load(Ordering::Acquire) > 0 {
111 *self.inner.guard.lock().unwrap() = true;
112 self.inner.waker.notify_one();
113 }
114 Ok(())
115 } else {
116 Err(data)
118 }
119 }
120
121 pub fn size_hint(&self) -> usize {
122 self.inner.data.len()
123 }
124
125 pub fn close(self) {}
127}
128
129impl<T: Send> Clone for Sender<T> {
130 fn clone(&self) -> Sender<T> {
131 Sender {
132 inner: self.inner.clone(),
133 }
134 }
135}
136
/// Reasons a receive operation can fail.
///
/// The type is a plain two-variant enum, so it is `Copy` and fully comparable.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// No item was available, but the channel is still connected.
    Empty,
    /// No item was available and the channel has been disconnected.
    Disconnected,
}
142
143impl<T: Send> Clone for Receiver<T> {
144 fn clone(&self) -> Receiver<T> {
145 Receiver {
146 inner: self.inner.clone(),
147 }
148 }
149}
150
151impl<T: Send> Receiver<T> {
152 fn new(inner: Arc<Inner<T>>) -> Receiver<T> {
153 Receiver {
154 inner: Arc::new(RecvInner { inner }),
155 }
156 }
157
158 pub fn try_recv(&self) -> Result<T, Error> {
160 match self.inner.data.pop() {
161 Some(data) => Ok(data),
162 None => {
163 if self.inner.connected.load(Ordering::Acquire) {
164 Err(Error::Empty)
165 } else {
166 Err(Error::Disconnected)
167 }
168 }
169 }
170 }
171
172 pub fn recv(&self) -> Result<T, Error> {
174 match self.try_recv() {
175 Ok(data) => return Ok(data),
176 Err(Error::Disconnected) => return Err(Error::Disconnected),
177 Err(Error::Empty) => (),
178 };
179
180 let ret;
181 let mut guard = self.inner.guard.lock().unwrap();
182 self.inner.sleepers.fetch_add(1, Ordering::Relaxed);
183 loop {
184 match self.try_recv() {
185 Ok(data) => {
186 ret = Ok(data);
187 break;
188 }
189 Err(Error::Disconnected) => {
190 ret = Err(Error::Disconnected);
191 break;
192 }
193 Err(Error::Empty) => {}
194 };
195 guard = self.inner.waker.wait(guard).unwrap();
196 }
197 self.inner.sleepers.fetch_sub(1, Ordering::Relaxed);
198 ret
199 }
200}
201
202impl fmt::Display for Error {
203 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
204 match self {
205 Error::Disconnected => write!(f, "Receiver Error: channel is disconnected"),
206 Error::Empty => write!(f, "Receiver Error: channel is empty"),
207 }
208 }
209}
210
211impl fmt::Debug for Error {
212 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
213 match self {
214 Error::Disconnected => write!(f, "Receiver Error: channel is disconnected"),
215 Error::Empty => write!(f, "Receiver Error: channel is empty"),
216 }
217 }
218}