use std::cell::Cell;
use std::fmt::{self, Debug};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
/// Creates a connected `Publisher` / `Waiter` pair that share one, initially
/// empty, slot.
///
/// The publisher can fill the slot once (publishing consumes it); the waiter,
/// and any clones made of it, can then observe the stored value.
pub fn new<T>() -> (Publisher<T>, Waiter<T>) {
    let shared = Arc::new(Value {
        value: Cell::new(None),
        condvar: Condvar::new(),
        mutex: Mutex::new(()),
        published: AtomicBool::new(false),
    });
    // Both halves point at the same allocation.
    let publisher = Publisher(Arc::clone(&shared));
    let waiter = Waiter(shared);
    (publisher, waiter)
}
/// The sending half of the pair returned by `new`.
///
/// Holds a handle to the shared slot and can fill it once via `publish`.
///
/// NOTE(review): nothing visible in this file wakes blocked waiters if a
/// `Publisher` is dropped without publishing; `Waiter::wait_for_value` would
/// then appear to block indefinitely. Confirm this is intended.
#[derive(Debug)]
pub struct Publisher<T>(Arc<Value<T>>);

impl<T> Publisher<T> {
    /// Stores `value` in the shared slot and wakes every thread that is
    /// blocked waiting for it.
    ///
    /// Consumes the publisher, so each pair can publish at most one value.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn publish(self, value: T) {
        let shared: &Value<T> = &self.0;
        // Hold the mutex across the write and the notification so a waiter
        // that inspected the slot under the lock cannot miss the wake-up.
        let _guard = shared.mutex.lock().expect("lock poisoned");
        // This is the only write to the slot; it happens under the lock and
        // before `published` is set. `Cell::set` does it without `unsafe`.
        shared.value.set(Some(value));
        // Release pairs with the Acquire loads of `published` done by
        // readers, making the write above visible to them.
        shared.published.store(true, Ordering::Release);
        shared.condvar.notify_all();
    }
}
/// The receiving half of the pair returned by `new`.
///
/// Cloning only clones the shared handle, so several threads can each hold a
/// `Waiter` on the same slot.
#[derive(Debug)]
pub struct Waiter<T>(Arc<Value<T>>);

impl<T> Clone for Waiter<T> {
    /// Returns another handle to the same shared slot; `T` itself is not cloned.
    fn clone(&self) -> Self {
        Waiter(Arc::clone(&self.0))
    }
}

impl<T> Waiter<T> {
    /// Returns a reference to the published value, blocking the current
    /// thread until one is available.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn wait_for_value(&self) -> &T {
        // Fast path: the value is already there, no locking needed.
        if let Some(ready) = self.0.try_get_value() {
            return ready;
        }
        self._wait_for_value()
    }

    /// Returns the value if it has already been published, without blocking.
    pub fn try_get_value(&self) -> Option<&T> {
        let shared: &Value<T> = &self.0;
        shared.try_get_value()
    }

    /// Tries to take ownership of the published value.
    ///
    /// Succeeds only when this is the last handle to the shared slot (no
    /// other `Waiter` clone or `Publisher` is alive) and a value has been
    /// published. Otherwise a waiter is handed back in `Err`.
    pub fn into_value(self) -> Result<T, Waiter<T>> {
        match Arc::try_unwrap(self.0) {
            // Sole owner: extract the value, or re-wrap the still-empty slot.
            Ok(sole) => sole
                .into_value()
                .map_err(|unpublished| Waiter(Arc::new(unpublished))),
            // Other handles exist: give the caller their waiter back.
            Err(still_shared) => Err(Waiter(still_shared)),
        }
    }

    /// Slow path of `wait_for_value`: blocks on the condition variable until
    /// the slot has been filled.
    fn _wait_for_value(&self) -> &T {
        let shared: &Value<T> = &self.0;
        let already_published = shared.published.load(Ordering::Acquire);
        if !already_published {
            let mut guard = shared.mutex.lock().expect("lock poisoned");
            // Re-check under the lock: the publisher sets the flag while
            // holding this mutex, so a relaxed load is sufficient here.
            if !shared.published.load(Ordering::Relaxed) {
                loop {
                    // SAFETY: the mutex is held, and the only write to the
                    // slot (in `Publisher::publish`) happens under this mutex.
                    if unsafe { shared.get_value() }.is_some() {
                        break;
                    }
                    // Looping guards against spurious wake-ups.
                    guard = shared.condvar.wait(guard).expect("lock poisoned");
                }
            }
        }
        // SAFETY: every path reaching here has observed the publication,
        // either through the Acquire load or under the mutex, and the slot is
        // never written again afterwards.
        unsafe { shared.get_value() }.unwrap()
    }
}
/// State shared between a `Publisher` and its `Waiter`s.
struct Value<T> {
    /// Becomes `true` (stored with `Release`) once `value` has been filled.
    published: AtomicBool,
    /// Serialises the publisher's write with waiters blocking on `condvar`.
    mutex: Mutex<()>,
    /// Notified by the publisher after the value has been stored.
    condvar: Condvar,
    /// The slot itself; `None` until a value is published.
    value: Cell<Option<T>>,
}

impl<T> Value<T> {
    /// Returns the stored value if the `published` flag is set, else `None`.
    fn try_get_value(&self) -> Option<&T> {
        if self.published.load(Ordering::Acquire) {
            // SAFETY: the Acquire load observed the publication, so the
            // single write to the slot has completed and is visible.
            unsafe { self.get_value() }
        } else {
            None
        }
    }

    /// Consumes the state, yielding the value when published or giving the
    /// untouched state back otherwise.
    fn into_value(self) -> Result<T, Value<T>> {
        if self.published.load(Ordering::Acquire) {
            // A set flag implies the slot was filled first, so this cannot be `None`.
            Ok(self.value.into_inner().unwrap())
        } else {
            Err(self)
        }
    }

    /// Reads the slot without consulting the `published` flag.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that no write to the slot can race with this
    /// read: either `published` has been observed as `true`, or `mutex` is held.
    unsafe fn get_value(&self) -> Option<&T> {
        let slot: &Option<T> = &*self.value.as_ptr();
        slot.as_ref()
    }
}

// SAFETY: `Cell` is not `Sync` on its own. The slot is written once, under
// `mutex`, before `published` is set with `Release`; readers only dereference
// it after an `Acquire` load of `published` or while holding `mutex`, and they
// only ever obtain `&T`, which is why `T: Sync` is required.
unsafe impl<T: Sync> Sync for Value<T> {}

impl<T: Debug> Debug for Value<T> {
    /// Formats as `Published(<value>)` or `NotPublished`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Some(inner) = self.try_get_value() {
            write!(f, "Published({:?})", inner)
        } else {
            f.write_str("NotPublished")
        }
    }
}
#[cfg(test)]
mod tests {
    // `super::new` is used instead of the crate-root path `::new`, which only
    // resolves under edition 2015 and only when this file is the crate root.

    /// Two threads blocked in `wait_for_value` both observe the published value.
    #[test]
    fn basic() {
        let (publisher, waiter) = super::new();
        let thread1 = ::std::thread::spawn({
            let waiter = waiter.clone();
            move || {
                let value = waiter.wait_for_value();
                format!("thread1 received value {}", value)
            }
        });
        let thread2 = ::std::thread::spawn({
            let waiter = waiter.clone();
            move || {
                let value = waiter.wait_for_value();
                format!("thread2 received value {}", value)
            }
        });
        publisher.publish(42);
        assert_eq!(
            thread1.join().unwrap(),
            "thread1 received value 42".to_string()
        );
        assert_eq!(
            thread2.join().unwrap(),
            "thread2 received value 42".to_string()
        );
    }

    /// `try_get_value` is `None` before publication and `Some` afterwards.
    #[test]
    fn try_get_value() {
        let (publisher, waiter) = super::new();
        assert_eq!(waiter.try_get_value(), None);
        publisher.publish(42);
        assert_eq!(waiter.try_get_value(), Some(&42));
    }

    /// `into_value` fails while unpublished or while other handles exist, and
    /// succeeds for the last remaining waiter.
    #[test]
    fn into_value() {
        let (publisher, waiter) = super::new();
        assert!(waiter.clone().into_value().is_err());
        publisher.publish(42);
        assert!(waiter.clone().into_value().is_err());
        assert_eq!(waiter.into_value().unwrap(), 42);
    }

    /// The API works for payload types that do not implement `Clone`.
    #[test]
    fn not_cloneable() {
        #[derive(Debug, PartialEq)]
        struct NotClone(i64);
        let (publisher, waiter) = super::new();
        publisher.publish(NotClone(42));
        assert_eq!(waiter.wait_for_value(), &NotClone(42));
        assert_eq!(waiter.into_value().unwrap(), NotClone(42));
    }

    /// `Debug` output reflects whether a value has been published.
    #[test]
    fn debug() {
        let (publisher, waiter) = super::new();
        assert_eq!(format!("{:?}", publisher), "Publisher(NotPublished)");
        assert_eq!(format!("{:?}", waiter), "Waiter(NotPublished)");
        publisher.publish(42);
        assert_eq!(format!("{:?}", waiter), "Waiter(Published(42))");
    }
}