use std::{
collections::VecDeque,
sync::{Arc, Condvar, Mutex},
};
pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
impl<T> Sender<T> {
pub fn send(&mut self, t: T) {
let mut inner = self.shared.inner.lock().unwrap();
inner.queue.push_back(t);
drop(inner);
self.shared.available.notify_one();
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let mut inner = self.shared.inner.lock().unwrap();
inner.senders += 1;
drop(inner);
Sender {
shared: Arc::clone(&self.shared),
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut inner = self.shared.inner.lock().unwrap();
inner.senders -= 1;
let solo = inner.senders == 0;
drop(inner);
if solo {
self.shared.available.notify_one();
}
}
}
pub struct Receiver<T> {
shared: Arc<Shared<T>>,
}
impl<T> Receiver<T> {
pub fn recv(&self) -> Option<T> {
let mut inner = self.shared.inner.lock().unwrap();
loop {
match inner.queue.pop_front() {
Some(t) => return Some(t),
None if inner.senders == 0 => return None,
None => {
inner = self.shared.available.wait(inner).unwrap();
}
}
}
}
}
struct Inner<T> {
queue: VecDeque<T>,
senders: usize,
}
struct Shared<T> {
inner: Mutex<Inner<T>>,
available: Condvar,
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Inner {
queue: VecDeque::new(),
senders: 1,
};
let shared = Shared {
inner: Mutex::new(inner),
available: Condvar::new(),
};
let shared = Arc::new(shared);
(
Sender {
shared: shared.clone(),
},
Receiver {
shared: shared.clone(),
},
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn send_recv() {
let (mut tx, rx) = channel();
let x = 10;
tx.send(x);
assert_eq!(rx.recv(), Some(x));
}
#[test]
fn send_on_close() {
let (tx, rx) = channel::<()>();
drop(tx);
assert_eq!(rx.recv(), None);
}
#[test]
fn recv_on_close() {
let (tx, rx) = channel::<()>();
drop(tx);
assert_eq!(rx.recv(), None);
}
}