use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Creates a connected [`Requester`] / [`Responder`] pair.
///
/// Both handles share a single reference-counted channel state. The channel
/// starts idle: neither lock is held, no request is flagged, and no datum is
/// stored.
pub fn channel<T>() -> (Requester<T>, Responder<T>) {
    let inner = Arc::new(Inner {
        has_request_lock: AtomicBool::new(false),
        has_response_lock: AtomicBool::new(false),
        has_request: AtomicBool::new(false),
        has_datum: AtomicBool::new(false),
        datum: UnsafeCell::new(None),
    });

    let requester = Requester {
        inner: Arc::clone(&inner),
    };
    // The last handle takes ownership of the original `Arc`; cloning it a
    // second time would only bump the reference count and drop it again.
    let responder = Responder { inner };

    (requester, responder)
}
/// The requesting end of the channel.
pub struct Requester<T> {
    inner: Arc<Inner<T>>,
}

impl<T> Requester<T> {
    /// Tries to open a new request.
    ///
    /// On success the request lock is taken, the request flag is raised, and
    /// a [`RequestContract`] sharing this channel's state is returned.
    ///
    /// # Errors
    ///
    /// Returns [`TryRequestError::Locked`] when the request lock is already
    /// held; the request flag is left untouched in that case.
    pub fn try_request(&self) -> Result<RequestContract<T>, TryRequestError> {
        let channel = &self.inner;

        if channel.try_lock_request() {
            channel.flag_request();
            let contract = RequestContract {
                inner: channel.clone(),
                done: false,
            };
            Ok(contract)
        } else {
            Err(TryRequestError::Locked)
        }
    }
}
/// The requester's handle on one outstanding request.
///
/// A contract is finished (`done`) once its datum has been received or the
/// request has been cancelled. Dropping an unfinished contract panics.
pub struct RequestContract<T> {
    inner: Arc<Inner<T>>,
    done: bool,
}

impl<T> RequestContract<T> {
    /// Tries to take the datum sent in response to this request.
    ///
    /// On success the contract is marked finished and the datum is returned.
    ///
    /// # Errors
    ///
    /// * [`TryReceiveError::Done`] if the contract is already finished.
    /// * [`TryReceiveError::Empty`] if no datum is currently available.
    pub fn try_receive(&mut self) -> Result<T, TryReceiveError> {
        if self.done {
            return Err(TryReceiveError::Done);
        }

        let datum = self.inner.try_get_datum()?;
        self.done = true;
        Ok(datum)
    }

    /// Tries to withdraw the request.
    ///
    /// Returns `true` — and marks the contract finished — only if the request
    /// flag was still set and has now been cleared. Returns `false` if the
    /// contract is already finished or the flag had already been cleared.
    pub fn try_cancel(&mut self) -> bool {
        if self.done {
            return false;
        }

        let cancelled = self.inner.try_unflag_request();
        if cancelled {
            self.done = true;
        }
        cancelled
    }
}

impl<T> Drop for RequestContract<T> {
    /// Releases the request lock of a finished contract.
    ///
    /// # Panics
    ///
    /// Panics if the contract is not finished, unless the thread is already
    /// unwinding from another panic. In both unfinished cases the request
    /// lock is left held.
    fn drop(&mut self) {
        if self.done {
            self.inner.unlock_request();
            return;
        }

        // A panic raised while the thread is already unwinding aborts the
        // whole process and hides the original failure, so only report the
        // contract violation when this would be the first panic.
        if !::std::thread::panicking() {
            panic!("Dropping RequestContract without receiving data!");
        }
    }
}
/// The responding end of the channel.
///
/// Responders can be cloned; every clone shares the same channel state.
pub struct Responder<T> {
    inner: Arc<Inner<T>>,
}

impl<T> Responder<T> {
    /// Tries to pick up a pending request.
    ///
    /// Takes the response lock and then consumes the request flag. On success
    /// a [`ResponseContract`] sharing this channel's state is returned and the
    /// response lock stays held.
    ///
    /// # Errors
    ///
    /// * [`TryRespondError::Locked`] if the response lock is already held.
    /// * [`TryRespondError::NoRequest`] if no request is flagged; the response
    ///   lock is released again before returning.
    pub fn try_respond(&self) -> Result<ResponseContract<T>, TryRespondError> {
        let channel = &self.inner;

        if !channel.try_lock_response() {
            return Err(TryRespondError::Locked);
        }

        if channel.try_unflag_request() {
            Ok(ResponseContract {
                inner: Arc::clone(channel),
                done: false,
            })
        } else {
            channel.unlock_response();
            Err(TryRespondError::NoRequest)
        }
    }
}

impl<T> Clone for Responder<T> {
    /// Returns another handle onto the same channel state.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Responder { inner }
    }
}
/// The responder's handle on one accepted request.
///
/// A contract is finished (`done`) once a datum has been sent. Dropping an
/// unfinished contract panics.
pub struct ResponseContract<T> {
    inner: Arc<Inner<T>>,
    done: bool,
}

impl<T> ResponseContract<T> {
    /// Sends `datum` to the requester and finishes the contract.
    ///
    /// The contract is consumed; the response lock is released when it is
    /// dropped at the end of this call.
    pub fn send(mut self, datum: T) {
        self.inner.set_datum(datum);
        self.done = true;
    }
}

impl<T> Drop for ResponseContract<T> {
    /// Releases the response lock of a finished contract.
    ///
    /// # Panics
    ///
    /// Panics if the contract is not finished, unless the thread is already
    /// unwinding from another panic. In both unfinished cases the response
    /// lock is left held.
    fn drop(&mut self) {
        if self.done {
            self.inner.unlock_response();
            return;
        }

        // A panic raised while the thread is already unwinding aborts the
        // whole process and hides the original failure, so only report the
        // contract violation when this would be the first panic.
        if !::std::thread::panicking() {
            panic!("Dropping ResponseContract without sending data!");
        }
    }
}
#[doc(hidden)]
struct Inner<T> {
has_request_lock: AtomicBool,
has_response_lock: AtomicBool,
has_request: AtomicBool,
has_datum: AtomicBool,
datum: UnsafeCell<Option<T>>,
}
unsafe impl<T> Sync for Inner<T> {}
#[doc(hidden)]
impl<T> Inner<T> {
#[inline]
fn flag_request(&self) {
self.has_request.store(true, Ordering::SeqCst);
}
#[inline]
fn try_unflag_request(&self) -> bool {
let (old, new) = (true, false);
self.has_request.compare_and_swap(old,
new,
Ordering::SeqCst) == old
}
#[inline]
fn set_datum(&self, data: T) {
unsafe {
*self.datum.get() = Some(data);
}
self.has_datum.store(true, Ordering::SeqCst);
}
#[inline]
fn try_get_datum(&self) -> Result<T, TryReceiveError> {
let (old, new) = (true, false);
if self.has_datum.compare_and_swap(old,
new,
Ordering::SeqCst) == old {
unsafe {
Ok((*self.datum.get()).take().unwrap())
}
}
else {
Err(TryReceiveError::Empty)
}
}
#[inline]
fn try_lock_request(&self) -> bool {
let (old, new) = (false, true);
self.has_request_lock.compare_and_swap(old, new, Ordering::SeqCst) == old
}
#[inline]
fn unlock_request(&self) {
self.has_request_lock.store(false, Ordering::SeqCst);
}
#[inline]
fn try_lock_response(&self) -> bool {
let (old, new) = (false, true);
self.has_response_lock.compare_and_swap(old, new, Ordering::SeqCst) == old
}
#[inline]
fn unlock_response(&self) {
self.has_response_lock.store(false, Ordering::SeqCst);
}
}
/// Reasons [`Requester::try_request`] can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRequestError {
    /// The request lock is already held.
    Locked,
}

impl ::std::fmt::Display for TryRequestError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            TryRequestError::Locked => f.write_str("the request lock is already held"),
        }
    }
}

impl ::std::error::Error for TryRequestError {}
/// Reasons [`RequestContract::try_receive`] can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryReceiveError {
    /// No datum is currently available.
    Empty,
    /// The contract is already finished.
    Done,
}

impl ::std::fmt::Display for TryReceiveError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let message = match *self {
            TryReceiveError::Empty => "no datum is available",
            TryReceiveError::Done => "the contract is already finished",
        };
        f.write_str(message)
    }
}

impl ::std::error::Error for TryReceiveError {}
/// Reasons [`Responder::try_respond`] can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRespondError {
    /// No request is currently flagged.
    NoRequest,
    /// The response lock is already held.
    Locked,
}

impl ::std::fmt::Display for TryRespondError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let message = match *self {
            TryRespondError::NoRequest => "no request is pending",
            TryRespondError::Locked => "the response lock is already held",
        };
        f.write_str(message)
    }
}

impl ::std::error::Error for TryRespondError {}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Lets a boxed `FnOnce()` be invoked by value through a trait object.
    trait FnBox {
        fn call_box(self: Box<Self>);
    }

    impl<F: FnOnce()> FnBox for F {
        fn call_box(self: Box<F>) {
            (*self)()
        }
    }

    /// Payload used by the tests: a boxed, sendable, run-once closure.
    type Task = Box<dyn FnBox + Send + 'static>;

    #[test]
    fn test_channel() {
        let (_rqst, _resp) = channel::<Task>();
    }

    #[test]
    fn test_inner_try_lock_request() {
        let (rqst, resp) = channel::<Task>();
        assert!(rqst.inner.try_lock_request());
        assert!(resp.inner.has_request_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_request_multiple() {
        let (rqst, _resp) = channel::<Task>();
        rqst.inner.try_lock_request();
        assert!(!rqst.inner.try_lock_request());
    }

    #[test]
    fn test_inner_try_unlock_request() {
        let (rqst, resp) = channel::<Task>();
        rqst.inner.has_request_lock.store(true, Ordering::SeqCst);
        rqst.inner.unlock_request();
        assert!(!resp.inner.has_request_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_response() {
        let (rqst, resp) = channel::<Task>();
        assert!(rqst.inner.try_lock_response());
        assert!(resp.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_lock_response_multiple() {
        let (rqst, _resp) = channel::<Task>();
        rqst.inner.try_lock_response();
        assert!(!rqst.inner.try_lock_response());
    }

    #[test]
    fn test_inner_try_unlock_response() {
        let (rqst, resp) = channel::<Task>();
        rqst.inner.has_response_lock.store(true, Ordering::SeqCst);
        rqst.inner.unlock_response();
        assert!(!resp.inner.has_response_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_flag_request() {
        let (rqst, resp) = channel::<Task>();
        rqst.inner.flag_request();
        assert!(resp.inner.has_request.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_unflag_request() {
        let (rqst, resp) = channel::<Task>();
        resp.inner.has_request.store(true, Ordering::SeqCst);
        assert!(rqst.inner.try_unflag_request());
        assert!(!resp.inner.has_request.load(Ordering::SeqCst));
        assert!(!rqst.inner.try_unflag_request());
    }

    #[test]
    fn test_inner_try_unflag_request_multiple() {
        let (rqst, resp) = channel::<Task>();
        resp.inner.has_request.store(true, Ordering::SeqCst);
        rqst.inner.try_unflag_request();
        assert!(!rqst.inner.try_unflag_request());
    }

    #[test]
    fn test_inner_set_datum() {
        let (_rqst, resp) = channel::<Task>();
        let task = Box::new(move || { println!("Hello World!"); }) as Task;
        resp.inner.set_datum(task);
        assert!(resp.inner.has_datum.load(Ordering::SeqCst));
    }

    #[test]
    fn test_inner_try_get_datum_with_data() {
        let (rqst, resp) = channel::<Task>();
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_in_task = counter.clone();
        let task = Box::new(move || {
            counter_in_task.fetch_add(1, Ordering::SeqCst);
        }) as Task;

        // Store the datum by hand so that only `try_get_datum` is under test.
        unsafe {
            *resp.inner.datum.get() = Some(task);
        }
        resp.inner.has_datum.store(true, Ordering::SeqCst);

        match rqst.inner.try_get_datum() {
            Ok(task) => {
                task.call_box();
                assert_eq!(counter.load(Ordering::SeqCst), 1);
            }
            Err(err) => panic!("expected the stored task, got {:?}", err),
        }
    }

    #[test]
    fn test_inner_try_get_datum_no_data() {
        let (rqst, _resp) = channel::<Task>();
        match rqst.inner.try_get_datum() {
            Err(TryReceiveError::Empty) => {}
            _ => panic!("expected TryReceiveError::Empty"),
        }
    }

    #[test]
    fn test_requester_try_request() {
        let (rqst, _resp) = channel::<Task>();
        let mut contract = rqst.try_request().unwrap();
        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_requester_try_request_multiple() {
        let (rqst, _resp) = channel::<Task>();
        rqst.inner.try_lock_request();
        match rqst.try_request() {
            Err(TryRequestError::Locked) => {}
            _ => panic!("expected TryRequestError::Locked"),
        }
    }

    #[test]
    fn test_request_contract_try_receive() {
        let (rqst, resp) = channel::<Task>();
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_in_task = counter.clone();
        let task = Box::new(move || {
            counter_in_task.fetch_add(1, Ordering::SeqCst);
        }) as Task;

        let mut contract = rqst.try_request().unwrap();
        resp.inner.set_datum(task);

        match contract.try_receive() {
            Ok(task) => task.call_box(),
            Err(err) => panic!("expected the sent task, got {:?}", err),
        }
        assert!(contract.done);
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_request_contract_try_receive_no_data() {
        let (rqst, _resp) = channel::<Task>();
        let mut contract = rqst.try_request().unwrap();
        match contract.try_receive() {
            Err(TryReceiveError::Empty) => {}
            _ => panic!("expected TryReceiveError::Empty"),
        }
        assert!(!contract.done);
        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_request_contract_try_receive_done() {
        let (rqst, _resp) = channel::<Task>();
        let mut contract = rqst.try_request().unwrap();
        contract.done = true;
        match contract.try_receive() {
            Err(TryReceiveError::Done) => {}
            _ => panic!("expected TryReceiveError::Done"),
        }
    }

    #[test]
    fn test_request_contract_try_cancel() {
        let (rqst, _resp) = channel::<Task>();
        let mut contract = rqst.try_request().unwrap();
        assert!(contract.try_cancel());
    }

    #[test]
    fn test_request_contract_try_cancel_too_late() {
        let (rqst, _resp) = channel::<Task>();
        let mut contract = rqst.try_request().unwrap();
        // Consume the request flag first, as a responder would.
        rqst.inner.try_unflag_request();
        assert!(!contract.try_cancel());
        assert!(!contract.done);
        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_request_contract_try_cancel_done() {
        let (rqst, _resp) = channel::<Task>();
        let mut contract = rqst.try_request().unwrap();
        contract.done = true;
        assert!(!contract.try_cancel());
    }

    #[test]
    #[should_panic(expected = "Dropping RequestContract without receiving data!")]
    fn test_request_contract_drop_without_receiving_data() {
        let (rqst, _resp) = channel::<Task>();
        let _contract = rqst.try_request().unwrap();
    }

    #[test]
    fn test_responder_try_respond() {
        let (rqst, resp) = channel::<Task>();
        rqst.inner.flag_request();
        let mut contract = resp.try_respond().unwrap();
        // Mark the contract finished so that dropping it does not panic.
        contract.done = true;
    }

    #[test]
    fn test_responder_try_respond_no_request() {
        let (_rqst, resp) = channel::<Task>();
        match resp.try_respond() {
            Err(TryRespondError::NoRequest) => {}
            _ => panic!("expected TryRespondError::NoRequest"),
        }
    }

    #[test]
    fn test_responder_try_respond_multiple() {
        let (_rqst, resp) = channel::<Task>();
        resp.inner.try_lock_response();
        match resp.try_respond() {
            Err(TryRespondError::Locked) => {}
            _ => panic!("expected TryRespondError::Locked"),
        }
    }

    #[test]
    fn test_response_contract_send() {
        let (rqst, resp) = channel::<Task>();
        rqst.inner.flag_request();
        let contract = resp.try_respond().unwrap();
        contract.send(Box::new(move || { println!("Hello World!"); }) as Task);
    }

    #[test]
    #[should_panic(expected = "Dropping ResponseContract without sending data!")]
    fn test_response_contract_drop_without_sending_data() {
        let (rqst, resp) = channel::<Task>();
        // A request must be pending, otherwise `try_respond` fails and the
        // test would panic on `unwrap` instead of in the contract's `Drop`.
        rqst.inner.flag_request();
        let _contract = resp.try_respond().unwrap();
    }
}