use crate::event;
use mio;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
/// Sending half of a channel created by [`channel`].
///
/// Wraps a `std::sync::mpsc::SyncSender` and, after every successful send
/// (and when dropped), marks the receiver's read registration as readable.
pub struct Sender<T> {
/// Underlying std sender. Held in an `Option` so that `Drop` can take it out
/// and drop it before signalling the receiver.
sender: Option<mpsc::SyncSender<T>>,
/// Handle used to set `READABLE` readiness on the registration that
/// `channel` hands to the `Receiver` as `read_registration`.
read_set_readiness: event::SetReadiness,
/// Registration exposed through `get_write_registration`; `channel` gives the
/// matching `SetReadiness` to the `Receiver` as `write_set_readiness`.
write_registration: event::Registration,
/// Flag shared with the `Receiver`; `Some` only when the channel was created
/// with `bound == 0`. `try_send` succeeds only if it can flip the flag from
/// `true` to `false`. (Name presumably abbreviates "clear to send".)
cts: Option<Arc<AtomicBool>>,
}
impl<T> Sender<T> {
    /// Reports whether a `try_send` could currently pass the `cts` gate.
    ///
    /// Channels without a `cts` flag (created with a non-zero bound) always
    /// return `true`; for those the underlying queue may still turn out to be
    /// full when `try_send` is actually called.
    pub fn can_send(&self) -> bool {
        self.cts
            .as_ref()
            .map_or(true, |flag| flag.load(Ordering::Relaxed))
    }

    /// Returns the registration on which write readiness for this sender is
    /// reported.
    pub fn get_write_registration(&self) -> &event::Registration {
        &self.write_registration
    }

    /// Attempts to send `t` without blocking.
    ///
    /// When a `cts` flag is present, the flag must be `true`; it is atomically
    /// flipped to `false` to claim the permission, otherwise
    /// `TrySendError::Full(t)` is returned without touching the queue. On a
    /// successful send the receiver side is marked readable.
    ///
    /// # Panics
    ///
    /// Panics if setting the read readiness fails.
    pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
        if let Some(flag) = self.cts.as_ref() {
            let claimed = flag
                .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok();
            if !claimed {
                return Err(mpsc::TrySendError::Full(t));
            }
        }

        let inner = self.sender.as_ref().unwrap();
        inner.try_send(t)?;

        // The value is queued: wake up whoever polls the receiver.
        self.read_set_readiness
            .set_readiness(mio::Interest::READABLE)
            .unwrap();
        Ok(())
    }

    /// Sends `t`, blocking while the underlying queue is full, then marks the
    /// receiver side readable.
    ///
    /// # Panics
    ///
    /// Panics when the channel has a `cts` flag (rendezvous mode), where a
    /// blocking send is not supported, or if setting the read readiness fails.
    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
        if self.cts.is_some() {
            panic!("blocking send with rendezvous channel not supported");
        }

        let inner = self.sender.as_ref().unwrap();
        inner.send(t)?;

        self.read_set_readiness
            .set_readiness(mio::Interest::READABLE)
            .unwrap();
        Ok(())
    }
}
impl<T> Drop for Sender<T> {
    /// Disconnects the channel and then wakes the receiver.
    ///
    /// The inner sender is dropped *before* the readable readiness is set, so
    /// a receiver reacting to the event already sees the disconnected channel.
    fn drop(&mut self) {
        let inner = self.sender.take().unwrap();
        mem::drop(inner);

        let signalled = self
            .read_set_readiness
            .set_readiness(mio::Interest::READABLE);
        signalled.unwrap();
    }
}
/// Receiving half of a channel created by [`channel`].
///
/// Wraps a `std::sync::mpsc::Receiver` and signals write readiness back to the
/// sender side when room for another value becomes available.
pub struct Receiver<T> {
/// Underlying std receiver.
receiver: mpsc::Receiver<T>,
/// Registration exposed through `get_read_registration`; the `Sender` holds
/// the matching `SetReadiness` and marks it readable on send and on drop.
read_registration: event::Registration,
/// Handle used to set `WRITABLE` readiness on the registration that
/// `channel` hands to the `Sender` as `write_registration`.
write_set_readiness: event::SetReadiness,
/// Flag shared with the `Sender`; `Some` only when the channel was created
/// with `bound == 0`. `try_recv` flips it from `false` to `true` when it finds
/// the channel empty. (Name presumably abbreviates "clear to send".)
cts: Option<Arc<AtomicBool>>,
}
impl<T> Receiver<T> {
    /// Returns the registration on which read readiness for this receiver is
    /// reported.
    pub fn get_read_registration(&self) -> &event::Registration {
        &self.read_registration
    }

    /// Attempts to receive a value without blocking.
    ///
    /// Readiness signalling depends on the channel mode:
    ///
    /// * Without a `cts` flag (non-zero bound), every successful receive frees
    ///   a slot, so the sender side is marked writable.
    /// * With a `cts` flag (rendezvous mode), finding the channel empty flips
    ///   the flag from `false` to `true`, allowing exactly one `try_send`. The
    ///   sender side is marked writable only on that transition.
    ///
    /// # Panics
    ///
    /// Panics if setting the write readiness fails.
    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
        match self.receiver.try_recv() {
            Ok(t) => {
                if self.cts.is_none() {
                    self.write_set_readiness
                        .set_readiness(mio::Interest::WRITABLE)
                        .unwrap();
                }
                Ok(t)
            }
            Err(mpsc::TryRecvError::Empty) => {
                if let Some(cts) = &self.cts {
                    // Only the false -> true transition is announced, so a
                    // repeated empty poll does not signal writable again.
                    if cts
                        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                        .is_ok()
                    {
                        self.write_set_readiness
                            .set_readiness(mio::Interest::WRITABLE)
                            .unwrap();
                    }
                }
                Err(mpsc::TryRecvError::Empty)
            }
            Err(e) => Err(e),
        }
    }

    /// Receives a value, blocking until one is available or the sender is
    /// dropped.
    ///
    /// In rendezvous mode the sender may only send once the `cts` flag has
    /// been raised. The non-blocking path is therefore tried first: it returns
    /// a value that is already queued, and otherwise raises the flag and
    /// signals writable before this call blocks. Without that step a receiver
    /// that only ever calls `recv` would wait forever, because the sender
    /// would never be permitted to send.
    ///
    /// # Errors
    ///
    /// Returns `mpsc::RecvError` when the sender has been dropped and no value
    /// is queued.
    ///
    /// # Panics
    ///
    /// Panics if setting the write readiness fails.
    pub fn recv(&self) -> Result<T, mpsc::RecvError> {
        if self.cts.is_some() {
            match self.try_recv() {
                Ok(t) => return Ok(t),
                Err(mpsc::TryRecvError::Disconnected) => return Err(mpsc::RecvError),
                // `try_recv` has raised the flag; now wait for the sender.
                Err(mpsc::TryRecvError::Empty) => {}
            }
            // The sender lowers the flag itself before it sends, so nothing
            // needs to be signalled after the value arrives.
            return self.receiver.recv();
        }

        let t = self.receiver.recv()?;
        self.write_set_readiness
            .set_readiness(mio::Interest::WRITABLE)
            .unwrap();
        Ok(t)
    }
}
/// Creates a connected [`Sender`]/[`Receiver`] pair.
///
/// * `bound > 0`: a bounded channel with capacity `bound`. The sender side
///   starts out writable.
/// * `bound == 0`: rendezvous mode. The queue has capacity 1, and a shared
///   flag (initially `false`) gates sending: the sender may only send after
///   the receiver has polled and found the channel empty. The sender side
///   does not start out writable.
///
/// # Panics
///
/// Panics if the initial write readiness cannot be set (non-zero bound only).
pub fn channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
    let (read_reg, read_sr) = event::Registration::new();
    let (write_reg, write_sr) = event::Registration::new();

    let rendezvous = bound == 0;
    // Rendezvous mode still needs one slot to hand the value over.
    let (tx, rx) = mpsc::sync_channel::<T>(bound.max(1));
    let gate = if rendezvous {
        Some(Arc::new(AtomicBool::new(false)))
    } else {
        None
    };

    let sender = Sender {
        sender: Some(tx),
        read_set_readiness: read_sr,
        write_registration: write_reg,
        cts: gate.as_ref().map(Arc::clone),
    };
    let receiver = Receiver {
        receiver: rx,
        read_registration: read_reg,
        write_set_readiness: write_sr,
        cts: gate,
    };

    if !rendezvous {
        // A bounded channel has free slots from the start.
        receiver
            .write_set_readiness
            .set_readiness(mio::Interest::WRITABLE)
            .unwrap();
    }

    (sender, receiver)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time;

    /// Polls once with the given timeout and asserts that exactly one event
    /// was reported, carrying the given token and satisfying the given
    /// readiness predicate (`is_readable` / `is_writable`).
    macro_rules! expect_single_event {
        ($poller:ident, $timeout:expr, $token:expr, $ready:ident) => {{
            $poller.poll($timeout).unwrap();
            let mut events = $poller.iter_events();
            let event = events.next().unwrap();
            assert_eq!(event.token(), mio::Token($token));
            assert!(event.$ready());
            assert_eq!(events.next(), None);
        }};
    }

    /// Rendezvous channel: sending is only possible after the receiver has
    /// polled an empty channel.
    #[test]
    fn test_send_recv_bound0() {
        let (sender, receiver) = channel(0);

        // Nothing may be sent before the receiver has asked for a value.
        assert!(!sender.can_send());
        assert_eq!(
            sender.try_send(42).unwrap_err(),
            mpsc::TrySendError::Full(42)
        );

        // An empty poll grants a single send.
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Empty
        );
        assert!(sender.can_send());
        assert!(sender.try_send(42).is_ok());
        assert!(!sender.can_send());

        assert_eq!(receiver.try_recv().unwrap(), 42);
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Empty
        );

        mem::drop(sender);
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Disconnected
        );
    }

    /// Bounded channel with a single slot.
    #[test]
    fn test_send_recv_bound1() {
        let (sender, receiver) = channel(1);

        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Empty
        );

        // The first send fills the only slot, the second one is rejected.
        assert!(sender.try_send(42).is_ok());
        assert_eq!(
            sender.try_send(42).unwrap_err(),
            mpsc::TrySendError::Full(42)
        );

        assert_eq!(receiver.try_recv().unwrap(), 42);
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Empty
        );

        mem::drop(sender);
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Disconnected
        );
    }

    /// Readiness events for a rendezvous channel.
    #[test]
    fn test_notify_bound0() {
        let (sender, receiver) = channel(0);
        let mut poller = event::Poller::new(2).unwrap();
        poller
            .register_custom(
                sender.get_write_registration(),
                mio::Token(1),
                mio::Interest::WRITABLE,
            )
            .unwrap();
        poller
            .register_custom(
                receiver.get_read_registration(),
                mio::Token(2),
                mio::Interest::READABLE,
            )
            .unwrap();

        // Initially the sender is not writable and no event is pending.
        assert!(!sender.can_send());
        poller.poll(Some(time::Duration::from_millis(0))).unwrap();
        assert_eq!(poller.iter_events().next(), None);

        // Polling the empty channel makes the sender writable.
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Empty
        );
        expect_single_event!(poller, None, 1, is_writable);
        assert!(sender.can_send());

        // Sending makes the receiver readable.
        sender.try_send(42).unwrap();
        expect_single_event!(poller, None, 2, is_readable);
        assert_eq!(receiver.try_recv().unwrap(), 42);

        // Dropping the sender wakes the receiver, which then sees the
        // disconnect.
        mem::drop(sender);
        expect_single_event!(poller, None, 2, is_readable);
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Disconnected
        );
    }

    /// Readiness events for a bounded channel with a single slot.
    #[test]
    fn test_notify_bound1() {
        let (sender, receiver) = channel(1);
        let mut poller = event::Poller::new(2).unwrap();
        poller
            .register_custom(
                sender.get_write_registration(),
                mio::Token(1),
                mio::Interest::WRITABLE,
            )
            .unwrap();
        poller
            .register_custom(
                receiver.get_read_registration(),
                mio::Token(2),
                mio::Interest::READABLE,
            )
            .unwrap();

        // A bounded channel is writable right away.
        expect_single_event!(
            poller,
            Some(time::Duration::from_millis(0)),
            1,
            is_writable
        );

        // Sending makes the receiver readable.
        sender.try_send(42).unwrap();
        expect_single_event!(poller, None, 2, is_readable);
        assert_eq!(receiver.try_recv().unwrap(), 42);

        // Dropping the sender wakes the receiver, which then sees the
        // disconnect.
        mem::drop(sender);
        expect_single_event!(poller, None, 2, is_readable);
        assert_eq!(
            receiver.try_recv().unwrap_err(),
            mpsc::TryRecvError::Disconnected
        );
    }
}