#![cfg_attr(all(feature = "cargo-clippy", feature = "pedantic"), warn(clippy_pedantic))]
#![cfg_attr(feature = "cargo-clippy", warn(use_self))]
#![cfg_attr(feature = "cargo-clippy", allow(missing_docs_in_private_items))]
#![deny(missing_debug_implementations, warnings)]
#![doc(html_root_url = "https://docs.rs/futures-shuttle/0.2.1")]
extern crate futures_core;
extern crate parking_lot;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
use futures_core::task::{AtomicWaker, Context, Waker};
use futures_core::{Async, Future, IntoFuture, Poll};
use parking_lot::{Mutex, MutexGuard};
#[derive(Debug, PartialEq)]
pub enum ShuttleError {
Corrupted,
Stopped,
}
impl fmt::Display for ShuttleError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.description())
}
}
impl Error for ShuttleError {
fn description(&self) -> &str {
use ShuttleError::*;
match *self {
Corrupted => "Shuttle is corrupted",
Stopped => "Shuttle is stopped",
}
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
enum Owner {
Left,
Right,
}
impl From<bool> for Owner {
fn from(owner: bool) -> Self {
if owner {
Owner::Right
} else {
Owner::Left
}
}
}
impl From<Owner> for bool {
fn from(owner: Owner) -> Self {
match owner {
Owner::Left => false,
Owner::Right => true,
}
}
}
impl Owner {
fn other(&self) -> Self {
match *self {
Owner::Left => Owner::Right,
Owner::Right => Owner::Left,
}
}
}
#[derive(Debug)]
pub struct Shuttle<T> {
inner: Arc<Inner<T>>,
whoami: Owner,
}
impl<T> Shuttle<T> {
pub fn is_mine(&self) -> bool {
self.whoami == self.inner.owner().into()
}
pub fn data(&self) -> ShuttleValue<T> {
ShuttleValue {
data: self.inner.data(),
}
}
pub fn send(&self) {
assert!(!self.inner.is_stopped(), "Sending stopped shuttle");
self.send_impl();
}
pub fn try_send(&self) -> Result<(), ShuttleError> {
if self.inner.is_stopped() {
Err(ShuttleError::Stopped)
} else {
self.send_impl();
Ok(())
}
}
#[inline]
fn send_impl(&self) {
let other = self.whoami.other().into();
self.inner.turnover(other);
}
fn left(inner: Arc<Inner<T>>) -> Self {
Self {
inner,
whoami: Owner::Left,
}
}
fn right(inner: Arc<Inner<T>>) -> Self {
Self {
inner,
whoami: Owner::Right,
}
}
fn pair(item: T) -> (Self, Self) {
let left = Arc::new(Inner::new(item, false));
let right = Arc::clone(&left);
let left = Self::left(left);
let right = Self::right(right);
(left, right)
}
fn set_waker(&self, waker: &Waker) {
let idx = self.whoami as usize;
self.inner.set_waker(idx, waker);
}
}
impl<T> Drop for Shuttle<T> {
fn drop(&mut self) {
self.inner.stop();
if self.is_mine() {
self.send_impl();
}
}
}
#[derive(Debug)]
pub struct ShuttleCombined<T> {
inner: Arc<Inner<T>>,
}
impl<T> ShuttleCombined<T> {
pub fn new(item: T) -> Self {
let inner = Arc::new(Inner::new(item, true));
Self { inner }
}
pub fn split(self) -> (Shuttle<T>, Shuttle<T>) {
let left = self.inner;
left.set_combined(false);
let right = Arc::clone(&left);
let left = Shuttle::left(left);
let right = Shuttle::right(right);
(left, right)
}
pub fn left(&self) -> ShuttleWait<T> {
Shuttle::left(Arc::clone(&self.inner)).into_future()
}
pub fn right(&self) -> ShuttleWait<T> {
Shuttle::right(Arc::clone(&self.inner)).into_future()
}
}
#[derive(Debug)]
struct Inner<T> {
data: Mutex<T>,
wakers: [AtomicWaker; 2],
owner: AtomicBool,
stopped: AtomicBool,
combined: AtomicBool,
}
impl<T> Inner<T> {
fn new(item: T, combined: bool) -> Self {
Self {
owner: AtomicBool::new(Owner::Left.into()),
stopped: AtomicBool::new(false),
combined: AtomicBool::new(combined),
data: Mutex::new(item),
wakers: Default::default(),
}
}
#[inline]
fn owner(&self) -> bool {
self.owner.load(SeqCst)
}
#[inline]
fn stop(&self) {
self.stopped.store(true, Release);
}
#[inline]
fn is_stopped(&self) -> bool {
self.stopped.load(Acquire)
}
#[inline]
fn data(&self) -> MutexGuard<T> {
self.data.lock()
}
#[inline]
fn set_waker(&self, idx: usize, waker: &Waker) {
self.wakers[idx].register(waker);
}
#[inline]
fn wake(&self, idx: usize) {
self.wakers[idx].wake();
}
#[inline]
fn set_combined(&self, combined: bool) {
self.combined.store(combined, Release);
}
#[inline]
fn turnover(&self, other: bool) {
let me = !other;
if let Ok(prev) = self.owner.compare_exchange(me, other, SeqCst, Relaxed) {
debug_assert_eq!(prev, me);
let idx = other as usize;
self.wake(idx);
}
}
}
pub struct ShuttleValue<'a, T: 'a> {
data: MutexGuard<'a, T>,
}
impl<'a, T: 'a + fmt::Debug> fmt::Debug for ShuttleValue<'a, T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ShuttleValue")
.field("data", &self.data.deref())
.finish()
}
}
impl<'a, T: 'a> Deref for ShuttleValue<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.data.deref()
}
}
impl<'a, T: 'a> DerefMut for ShuttleValue<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.data.deref_mut()
}
}
impl<'a, T, U> PartialEq<U> for ShuttleValue<'a, T>
where
T: PartialEq<U>,
{
fn eq(&self, other: &U) -> bool {
self.data.eq(other)
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct ShuttleWait<T> {
shuttle: Option<Shuttle<T>>,
}
impl<T> IntoFuture for Shuttle<T> {
type Future = ShuttleWait<T>;
type Item = Self;
type Error = ShuttleError;
fn into_future(self) -> Self::Future {
ShuttleWait {
shuttle: Some(self),
}
}
}
impl<T> Future for ShuttleWait<T> {
type Item = Shuttle<T>;
type Error = ShuttleError;
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
if let Some(shuttle) = self.shuttle.take() {
shuttle.set_waker(cx.waker());
if shuttle.is_mine() {
Ok(Async::Ready(shuttle))
} else {
self.shuttle = Some(shuttle);
Ok(Async::Pending)
}
} else {
Err(ShuttleError::Corrupted)
}
}
}
pub fn shuttle<T>(item: T) -> (Shuttle<T>, Shuttle<T>) {
Shuttle::pair(item)
}
pub fn shuttle_combined<T>(item: T) -> ShuttleCombined<T> {
ShuttleCombined::new(item)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic() {
let (left, right) = shuttle(42);
assert!(left.is_mine());
assert!(!right.is_mine());
}
#[test]
fn back_and_forth() {
let (left, right) = shuttle(42);
assert!(left.is_mine());
assert!(!right.is_mine());
*left.data() = 84;
left.send();
assert!(right.is_mine());
assert!(!left.is_mine());
assert_eq!(right.data(), 84);
*right.data() = 123;
right.send();
assert!(left.is_mine());
assert!(!right.is_mine());
assert_eq!(left.data(), 123);
}
#[test]
fn stopped() {
let (left, right) = shuttle(42);
assert!(left.is_mine());
assert!(!right.is_mine());
assert!(left.try_send().is_ok());
assert!(right.is_mine());
assert!(!left.is_mine());
drop(left);
assert_eq!(right.try_send(), Err(ShuttleError::Stopped));
}
}