stochastic_queue/
mpmc_sync.rs

use crate::queue::StochasticQueue;
use std::error::Error;
use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
12
13struct MpmcState<T> {
14  queue: Mutex<StochasticQueue<T>>,
15  condvar: Condvar,
16  // If either of these become zero, it's impossible for that value to ever increase again.
17  senders: AtomicUsize,
18  receivers: AtomicUsize,
19}
20
21/// Sender for a stochastic MPMC channel. Create one using `stochastic_channel()`. This sender can be cheaply cloned, which will increase the sender count. Dropping a sender will decrease the sender count.
22pub struct StochasticMpmcSender<T>(Arc<MpmcState<T>>);
23
24impl<T> Clone for StochasticMpmcSender<T> {
25  fn clone(&self) -> Self {
26    let state = self.0.clone();
27    state.senders.fetch_add(1, atomic::Ordering::Relaxed);
28    Self(state)
29  }
30}
31
32impl<T> Drop for StochasticMpmcSender<T> {
33  fn drop(&mut self) {
34    let old = self.0.senders.fetch_sub(1, atomic::Ordering::Relaxed);
35    if old == 1 {
36      // Any receiver blocked on recv() must now abort.
37      self.0.condvar.notify_all();
38    };
39  }
40}
41
/// Reasons why sending an item into a stochastic channel can fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StochasticMpmcSendError {
  /// Every receiver has been dropped.
  NoReceivers,
}

impl Display for StochasticMpmcSendError {
  /// The user-facing text is the variant name, i.e. identical to the `Debug` output.
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    fmt::Debug::fmt(self, f)
  }
}

impl Error for StochasticMpmcSendError {}
55
56impl<T> StochasticMpmcSender<T> {
57  /// Send an item. If there are no receivers, an error is returned.
58  pub fn send(&self, val: T) -> Result<(), StochasticMpmcSendError> {
59    if self.0.receivers.load(atomic::Ordering::Relaxed) == 0 {
60      return Err(StochasticMpmcSendError::NoReceivers);
61    };
62    self.0.queue.lock().unwrap().push(val);
63    self.0.condvar.notify_one();
64    Ok(())
65  }
66}
67
68/// Receiver for a stochastic MPMC channel. Create one using `stochastic_channel()`. This receiver can be cheaply cloned, which will increase the receiver count. Dropping a receiver will decrease the receiver count.
69pub struct StochasticMpmcReceiver<T>(Arc<MpmcState<T>>);
70
71impl<T> Clone for StochasticMpmcReceiver<T> {
72  fn clone(&self) -> Self {
73    let state = self.0.clone();
74    state.receivers.fetch_add(1, atomic::Ordering::Relaxed);
75    Self(state)
76  }
77}
78
79impl<T> Drop for StochasticMpmcReceiver<T> {
80  fn drop(&mut self) {
81    self.0.receivers.fetch_sub(1, atomic::Ordering::Relaxed);
82  }
83}
84
/// Reasons why receiving an item from a stochastic channel can fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StochasticMpmcRecvError {
  /// Every sender has been dropped.
  NoSenders,
}

impl Display for StochasticMpmcRecvError {
  /// The user-facing text is the variant name, i.e. identical to the `Debug` output.
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    fmt::Debug::fmt(self, f)
  }
}

impl Error for StochasticMpmcRecvError {}
98
/// Reasons why receiving an item from a stochastic channel with a timeout can fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StochasticMpmcRecvTimeoutError {
  /// Every sender has been dropped.
  NoSenders,
  /// The timeout elapsed before an item could be received.
  Timeout,
}

impl Display for StochasticMpmcRecvTimeoutError {
  /// The user-facing text is the variant name, i.e. identical to the `Debug` output.
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    fmt::Debug::fmt(self, f)
  }
}

impl Error for StochasticMpmcRecvTimeoutError {}
113
114impl<T> StochasticMpmcReceiver<T> {
115  /// Receive an item, or return None if no items are available right now. If there are no senders, an error is returned.
116  pub fn try_recv(&self) -> Result<Option<T>, StochasticMpmcRecvError> {
117    if self.0.senders.load(atomic::Ordering::Relaxed) == 0 {
118      return Err(StochasticMpmcRecvError::NoSenders);
119    };
120    let mut queue = self.0.queue.lock().unwrap();
121    Ok(queue.pop())
122  }
123
124  /// Block the current thread until an item can be received, time has run out, or there are no more senders. If time has run out or there are no more senders, an error is returned.
125  pub fn recv_timeout(&self, timeout: Duration) -> Result<T, StochasticMpmcRecvTimeoutError> {
126    let mut queue = self.0.queue.lock().unwrap();
127    while queue.is_empty() {
128      if self.0.senders.load(atomic::Ordering::Relaxed) == 0 {
129        return Err(StochasticMpmcRecvTimeoutError::NoSenders);
130      };
131      let res = self.0.condvar.wait_timeout(queue, timeout).unwrap();
132      if res.1.timed_out() {
133        return Err(StochasticMpmcRecvTimeoutError::Timeout);
134      };
135      queue = res.0;
136    }
137    Ok(queue.pop().unwrap())
138  }
139
140  /// Block the current thread until an item can be received or there are no more senders. If there are no more senders, an error is returned.
141  pub fn recv(&self) -> Result<T, StochasticMpmcRecvError> {
142    let mut queue = self.0.queue.lock().unwrap();
143    while queue.is_empty() {
144      if self.0.senders.load(atomic::Ordering::Relaxed) == 0 {
145        return Err(StochasticMpmcRecvError::NoSenders);
146      };
147      queue = self.0.condvar.wait(queue).unwrap();
148    }
149    Ok(queue.pop().unwrap())
150  }
151}
152
153impl<T> IntoIterator for StochasticMpmcReceiver<T> {
154  type IntoIter = StochasticMpmcReceiverIterator<T>;
155  type Item = T;
156
157  fn into_iter(self) -> Self::IntoIter {
158    StochasticMpmcReceiverIterator { receiver: self }
159  }
160}
161
162/// Iterator over items sent in a channel. This will keep producing items until there are no more senders. The current thread will be blocked while waiting for an item or until there are no more senders. Not all items sent will be received by this iterator, as there may be other receivers.
163pub struct StochasticMpmcReceiverIterator<T> {
164  receiver: StochasticMpmcReceiver<T>,
165}
166
167impl<T> Iterator for StochasticMpmcReceiverIterator<T> {
168  type Item = T;
169
170  fn next(&mut self) -> Option<Self::Item> {
171    match self.receiver.recv() {
172      Ok(v) => Some(v),
173      Err(StochasticMpmcRecvError::NoSenders) => None,
174    }
175  }
176}
177
178/// Create a MPMC channel that receives items in a uniformly random order but without any delays. Returns a tuple containing the sender and receiver, both of which can be cheaply cloned. Senders and receivers will continue to work until there are no more senders/receivers on the other side.
179///
180/// # Example
181///
182/// ```rust
183/// use std::thread;
184/// use stochastic_queue::stochastic_channel;
185///
186/// let (sender, receiver) = stochastic_channel();
187/// let r1 = receiver.clone();
188/// let t1 = thread::spawn(move || {
189///   for i in r1 {
190///     println!("Received {i}");
191///   };
192/// });
193/// let r2 = receiver.clone();
194/// let t2 = thread::spawn(move || {
195///   for i in r2 {
196///     println!("Received {i}");
197///   };
198/// });
199/// let s_even = sender.clone();
200/// let t3 = thread::spawn(move || {
201///   for i in (0..10).step_by(2) {
202///     s_even.send(i).unwrap();
203///   };
204/// });
205/// let s_odd = sender.clone();
206/// let t4 = thread::spawn(move || {
207///   for i in (1..10).step_by(2) {
208///     s_odd.send(i).unwrap();
209///   };
210/// });
211/// drop(sender);
212/// drop(receiver);
213/// t1.join().unwrap();
214/// t2.join().unwrap();
215/// t3.join().unwrap();
216/// t4.join().unwrap();
217/// ```
218pub fn stochastic_channel<T>() -> (StochasticMpmcSender<T>, StochasticMpmcReceiver<T>) {
219  let inner = Arc::new(MpmcState {
220    condvar: Condvar::new(),
221    queue: Mutex::new(StochasticQueue::new()),
222    receivers: AtomicUsize::new(1),
223    senders: AtomicUsize::new(1),
224  });
225  (
226    StochasticMpmcSender(inner.clone()),
227    StochasticMpmcReceiver(inner.clone()),
228  )
229}