stochastic_queue/
mpmc_sync.rs1use crate::queue::StochasticQueue;
2use std::error::Error;
3use std::fmt;
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::sync::atomic;
7use std::sync::atomic::AtomicUsize;
8use std::sync::Arc;
9use std::sync::Condvar;
10use std::sync::Mutex;
11use std::time::Duration;
12
13struct MpmcState<T> {
14 queue: Mutex<StochasticQueue<T>>,
15 condvar: Condvar,
16 senders: AtomicUsize,
18 receivers: AtomicUsize,
19}
20
21pub struct StochasticMpmcSender<T>(Arc<MpmcState<T>>);
23
24impl<T> Clone for StochasticMpmcSender<T> {
25 fn clone(&self) -> Self {
26 let state = self.0.clone();
27 state.senders.fetch_add(1, atomic::Ordering::Relaxed);
28 Self(state)
29 }
30}
31
32impl<T> Drop for StochasticMpmcSender<T> {
33 fn drop(&mut self) {
34 let old = self.0.senders.fetch_sub(1, atomic::Ordering::Relaxed);
35 if old == 1 {
36 self.0.condvar.notify_all();
38 };
39 }
40}
41
/// Reason a `send` call was rejected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StochasticMpmcSendError {
    /// The receiver count was zero when the send was attempted.
    NoReceivers,
}

impl Display for StochasticMpmcSendError {
    /// Writes the variant name, i.e. the same text as the `Debug` output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::NoReceivers => "NoReceivers",
        };
        f.write_str(name)
    }
}

impl Error for StochasticMpmcSendError {}
55
56impl<T> StochasticMpmcSender<T> {
57 pub fn send(&self, val: T) -> Result<(), StochasticMpmcSendError> {
59 if self.0.receivers.load(atomic::Ordering::Relaxed) == 0 {
60 return Err(StochasticMpmcSendError::NoReceivers);
61 };
62 self.0.queue.lock().unwrap().push(val);
63 self.0.condvar.notify_one();
64 Ok(())
65 }
66}
67
68pub struct StochasticMpmcReceiver<T>(Arc<MpmcState<T>>);
70
71impl<T> Clone for StochasticMpmcReceiver<T> {
72 fn clone(&self) -> Self {
73 let state = self.0.clone();
74 state.receivers.fetch_add(1, atomic::Ordering::Relaxed);
75 Self(state)
76 }
77}
78
79impl<T> Drop for StochasticMpmcReceiver<T> {
80 fn drop(&mut self) {
81 self.0.receivers.fetch_sub(1, atomic::Ordering::Relaxed);
82 }
83}
84
/// Reason a `try_recv` or `recv` call could not produce an item.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StochasticMpmcRecvError {
    /// The sender count was zero, so no further items can arrive.
    NoSenders,
}

impl Display for StochasticMpmcRecvError {
    /// Writes the variant name, i.e. the same text as the `Debug` output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::NoSenders => "NoSenders",
        };
        f.write_str(name)
    }
}

impl Error for StochasticMpmcRecvError {}
98
/// Reason a `recv_timeout` call could not produce an item.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StochasticMpmcRecvTimeoutError {
    /// The sender count was zero, so no further items can arrive.
    NoSenders,
    /// The wait ended by timing out.
    Timeout,
}

impl Display for StochasticMpmcRecvTimeoutError {
    /// Writes the variant name, i.e. the same text as the `Debug` output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::NoSenders => "NoSenders",
            Self::Timeout => "Timeout",
        };
        f.write_str(name)
    }
}

impl Error for StochasticMpmcRecvTimeoutError {}
113
114impl<T> StochasticMpmcReceiver<T> {
115 pub fn try_recv(&self) -> Result<Option<T>, StochasticMpmcRecvError> {
117 if self.0.senders.load(atomic::Ordering::Relaxed) == 0 {
118 return Err(StochasticMpmcRecvError::NoSenders);
119 };
120 let mut queue = self.0.queue.lock().unwrap();
121 Ok(queue.pop())
122 }
123
124 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, StochasticMpmcRecvTimeoutError> {
126 let mut queue = self.0.queue.lock().unwrap();
127 while queue.is_empty() {
128 if self.0.senders.load(atomic::Ordering::Relaxed) == 0 {
129 return Err(StochasticMpmcRecvTimeoutError::NoSenders);
130 };
131 let res = self.0.condvar.wait_timeout(queue, timeout).unwrap();
132 if res.1.timed_out() {
133 return Err(StochasticMpmcRecvTimeoutError::Timeout);
134 };
135 queue = res.0;
136 }
137 Ok(queue.pop().unwrap())
138 }
139
140 pub fn recv(&self) -> Result<T, StochasticMpmcRecvError> {
142 let mut queue = self.0.queue.lock().unwrap();
143 while queue.is_empty() {
144 if self.0.senders.load(atomic::Ordering::Relaxed) == 0 {
145 return Err(StochasticMpmcRecvError::NoSenders);
146 };
147 queue = self.0.condvar.wait(queue).unwrap();
148 }
149 Ok(queue.pop().unwrap())
150 }
151}
152
153impl<T> IntoIterator for StochasticMpmcReceiver<T> {
154 type IntoIter = StochasticMpmcReceiverIterator<T>;
155 type Item = T;
156
157 fn into_iter(self) -> Self::IntoIter {
158 StochasticMpmcReceiverIterator { receiver: self }
159 }
160}
161
162pub struct StochasticMpmcReceiverIterator<T> {
164 receiver: StochasticMpmcReceiver<T>,
165}
166
167impl<T> Iterator for StochasticMpmcReceiverIterator<T> {
168 type Item = T;
169
170 fn next(&mut self) -> Option<Self::Item> {
171 match self.receiver.recv() {
172 Ok(v) => Some(v),
173 Err(StochasticMpmcRecvError::NoSenders) => None,
174 }
175 }
176}
177
178pub fn stochastic_channel<T>() -> (StochasticMpmcSender<T>, StochasticMpmcReceiver<T>) {
219 let inner = Arc::new(MpmcState {
220 condvar: Condvar::new(),
221 queue: Mutex::new(StochasticQueue::new()),
222 receivers: AtomicUsize::new(1),
223 senders: AtomicUsize::new(1),
224 });
225 (
226 StochasticMpmcSender(inner.clone()),
227 StochasticMpmcReceiver(inner.clone()),
228 )
229}