use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::Ordering::{Acquire, Release};
use std::thread;
use std::time::Instant;
use parking_lot::Mutex;
use select::CaseId;
use select::handle::{self, Handle};
use utils::Backoff;
/// Waiting strategy in an exchange operation.
enum Wait {
/// Yield once and then time out.
YieldOnce,
/// Wait until an optional deadline.
Until(Option<Instant>),
}
/// Enumeration of possible exchange errors.
pub enum ExchangeError<T> {
/// The exchange operation timed out.
Timeout(T),
/// The exchanger was disconnected.
Disconnected(T),
}
/// Inner representation of an exchanger.
///
/// This data structure is wrapped in a mutex.
struct Inner<T> {
/// There are two wait queues, one per side.
wait_queues: [WaitQueue<T>; 2],
/// `true` if the exchanger is closed.
closed: bool,
}
/// A two-sided exchanger.
///
/// This is a concurrent data structure with two sides: left and right. A thread can offer a
/// message on one side, and if there is another thread waiting on the opposite side at the same
/// time, they exchange messages.
///
/// Instead of *offering* a concrete message for excahnge, a thread can also *promise* a message.
/// If another thread pairs up with the promise on the opposite end, then it will wait until the
/// promise is fulfilled. The thread that promised the message must in the end fulfill it. A
/// promise can also be revoked if nobody is waiting for it.
pub struct Exchanger<T> {
inner: Mutex<Inner<T>>,
}
impl<T> Exchanger<T> {
/// Returns a new exchanger.
#[inline]
pub fn new() -> Self {
Exchanger {
inner: Mutex::new(Inner {
wait_queues: [WaitQueue::new(), WaitQueue::new()],
closed: false,
}),
}
}
/// Returns the left side of the exchanger.
pub fn left(&self) -> Side<T> {
Side {
index: 0,
exchanger: self,
}
}
/// Returns the right side of the exchanger.
pub fn right(&self) -> Side<T> {
Side {
index: 1,
exchanger: self,
}
}
/// Closes the exchanger and wakes up all currently blocked operations on it.
pub fn close(&self) -> bool {
let mut inner = self.inner.lock();
if inner.closed {
false
} else {
inner.closed = true;
inner.wait_queues[0].abort_all();
inner.wait_queues[1].abort_all();
true
}
}
/// Returns `true` if the exchanger is closed.
pub fn is_closed(&self) -> bool {
self.inner.lock().closed
}
}
/// One side of an exchanger.
pub struct Side<'a, T: 'a> {
/// The index is 0 or 1.
index: usize,
/// A reference to the parent exchanger.
exchanger: &'a Exchanger<T>,
}
impl<'a, T> Side<'a, T> {
/// Promises a message for exchange.
pub fn promise(&self, case_id: CaseId) {
self.exchanger.inner.lock().wait_queues[self.index].promise(case_id);
}
/// Revokes the previously made promise.
///
/// This method should be called right after the case is aborted.
pub fn revoke(&self, case_id: CaseId) {
self.exchanger.inner.lock().wait_queues[self.index].remove(case_id);
}
/// Fulfills the previously made promise.
pub fn fulfill(&self, msg: T) -> T {
// Wait until the requesting thread gives us a pointer to its `Request`.
let req = LOCAL.with(|local| {
let mut backoff = Backoff::new();
loop {
let ptr = local.request_ptr.swap(0, Acquire) as *const Request<T>;
if !ptr.is_null() {
break ptr;
}
backoff.step();
}
});
unsafe {
// First, make a clone of the requesting thread's `Handle`.
let handle = (*req).handle.clone();
// Exchange the messages and then notify the requesting thread that it can pick up our
// message.
let m = (*req).packet.exchange(msg);
(*req).handle.try_select(CaseId::abort());
// Wake up the requesting thread.
handle.unpark();
// Return the exchanged message.
m
}
}
/// Exchanges `msg` if there is an offer or promise on the opposite side.
pub fn try_exchange(&self, msg: T, case_id: CaseId) -> Result<T, ExchangeError<T>> {
self.exchange(msg, Wait::YieldOnce, case_id)
}
/// Exchanges `msg`, waiting until the specified `deadline`.
pub fn exchange_until(
&self,
msg: T,
deadline: Option<Instant>,
case_id: CaseId,
) -> Result<T, ExchangeError<T>> {
self.exchange(msg, Wait::Until(deadline), case_id)
}
/// Returns `true` if there is an offer or promise on the opposite side.
pub fn can_notify(&self) -> bool {
self.exchanger.inner.lock().wait_queues[self.index].can_notify()
}
/// Exchanges `msg` with the specified `wait` strategy.
fn exchange(&self, mut msg: T, wait: Wait, case_id: CaseId) -> Result<T, ExchangeError<T>> {
loop {
// Allocate a packet on the stack.
let packet;
{
let mut inner = self.exchanger.inner.lock();
if inner.closed {
return Err(ExchangeError::Disconnected(msg));
}
// If there's someone on the other side, exchange messages with it.
if let Some(case) = inner.wait_queues[self.index ^ 1].pop() {
drop(inner);
return Ok(case.exchange(msg));
}
// Promise a packet with the message.
handle::current_reset();
packet = Packet::new(msg);
inner.wait_queues[self.index].offer(&packet, case_id);
}
// Wait until the timeout and then try aborting the case.
let timed_out = match wait {
Wait::YieldOnce => {
thread::yield_now();
handle::current_try_select(CaseId::abort())
}
Wait::Until(deadline) => !handle::current_wait_until(deadline),
};
// If someone requested the promised message...
if handle::current_selected() != CaseId::abort() {
// Wait until the message is taken and return.
packet.wait();
return Ok(packet.into_inner());
}
// Revoke the promise.
let mut inner = self.exchanger.inner.lock();
inner.wait_queues[self.index].remove(case_id);
msg = packet.into_inner();
// If we timed out, return.
if timed_out {
return Err(ExchangeError::Timeout(msg));
}
// Otherwise, another thread must have woken us up. Let's try again.
}
}
}
/// An entry in a wait queue.
enum Entry<T> {
/// Offers a concrete message.
Offer {
handle: Handle,
packet: *const Packet<T>,
case_id: CaseId,
},
/// Promises a message.
Promise {
local: Arc<Local>,
case_id: CaseId,
},
}
impl<T> Entry<T> {
/// Exchange `msg` with this entry and wake it up.
fn exchange(&self, msg: T) -> T {
match *self {
// This is an offer.
// We can exchange messages immediately.
Entry::Offer {
ref handle, packet, ..
} => {
let m = unsafe { (*packet).exchange(msg) };
handle.unpark();
m
}
// This is a promise.
// We must request the message and then wait until the promise is fulfilled.
Entry::Promise { ref local, .. } => {
// Reset the current thread's selection case.
handle::current_reset();
// Create a request on the stack and register it in the owner of this entry.
let req = Request::new(msg);
local.request_ptr.store(&req as *const _ as usize, Release);
// Wake up the owner of this entry.
local.handle.unpark();
// Wait until our selection case is woken.
handle::current_wait_until(None);
// Extract the received message from the request.
req.packet.into_inner()
}
}
}
/// Returns the handle associated with the owner of this entry.
fn handle(&self) -> &Handle {
match *self {
Entry::Offer { ref handle, .. } => handle,
Entry::Promise { ref local, .. } => &local.handle,
}
}
/// Returns the case ID associated with this entry.
fn case_id(&self) -> CaseId {
match *self {
Entry::Offer { case_id, .. } | Entry::Promise { case_id, .. } => case_id,
}
}
}
/// A wait queue in which blocked selection cases are registered.
struct WaitQueue<T> {
cases: VecDeque<Entry<T>>,
}
impl<T> WaitQueue<T> {
/// Creates a new `WaitQueue`.
fn new() -> Self {
WaitQueue {
cases: VecDeque::new(),
}
}
/// Attempts to fire one case owned by another thread and returns it on success.
fn pop(&mut self) -> Option<Entry<T>> {
let thread_id = current_thread_id();
for i in 0..self.cases.len() {
if self.cases[i].handle().thread_id() != thread_id {
if self.cases[i].handle().try_select(self.cases[i].case_id()) {
let case = self.cases.remove(i).unwrap();
self.maybe_shrink();
return Some(case);
}
}
}
None
}
/// Inserts an *offer* case owned by the current thread with `case_id`.
fn offer(&mut self, packet: *const Packet<T>, case_id: CaseId) {
self.cases.push_back(Entry::Offer {
handle: handle::current(),
packet,
case_id,
});
}
/// Inserts a *promise* case owned by the current thread with `case_id`.
fn promise(&mut self, case_id: CaseId) {
self.cases.push_back(Entry::Promise {
local: LOCAL.with(|l| l.clone()),
case_id,
});
}
/// Removes a case owned by the current thread with `case_id`.
fn remove(&mut self, case_id: CaseId) {
let thread_id = current_thread_id();
if let Some((i, _)) = self.cases.iter().enumerate().find(|&(_, case)| {
case.case_id() == case_id && case.handle().thread_id() == thread_id
}) {
self.cases.remove(i);
self.maybe_shrink();
}
}
/// Returns `true` if there exists a case which isn't owned by the current thread.
fn can_notify(&self) -> bool {
let thread_id = current_thread_id();
for i in 0..self.cases.len() {
if self.cases[i].handle().thread_id() != thread_id {
return true;
}
}
false
}
/// Aborts all cases and unparks threads which own them.
fn abort_all(&mut self) {
for case in self.cases.drain(..) {
case.handle().try_select(CaseId::abort());
case.handle().unpark();
}
self.maybe_shrink();
}
/// Shrinks the internal buffer if its capacity is underused.
fn maybe_shrink(&mut self) {
if self.cases.capacity() > 32 && self.cases.capacity() / 2 > self.cases.len() {
self.cases.shrink_to_fit();
}
}
}
impl<T> Drop for WaitQueue<T> {
fn drop(&mut self) {
debug_assert!(self.cases.is_empty());
}
}
thread_local! {
static THREAD_ID: thread::ThreadId = thread::current().id();
}
/// Returns the current thread ID.
///
/// This is much faster than calling `std::thread::current().id()`.
fn current_thread_id() -> thread::ThreadId {
THREAD_ID.with(|id| *id)
}
thread_local! {
static LOCAL: Arc<Local> = Arc::new(Local {
handle: handle::current(),
request_ptr: AtomicUsize::new(0),
});
}
/// Thread-local structure that contains a slot for the `Request` pointer.
struct Local {
/// The handle associated with this thread.
handle: Handle,
/// A slot into which another thread may store a pointer to its `Request`.
request_ptr: AtomicUsize,
}
/// A request for promised message.
struct Request<T> {
/// The handle associated with the requestor.
handle: Handle,
/// The message for exchange.
packet: Packet<T>,
}
impl<T> Request<T> {
/// Creates a new request owned by the current thread for exchanging `msg`.
fn new(msg: T) -> Self {
Request {
handle: handle::current(),
packet: Packet::new(msg),
}
}
}
/// A one-time only packet for exchanging messages.
///
/// A message can be exchanged for the one inside the packet only once.
struct Packet<T> {
/// The mutex-protected message.
msg: Mutex<Option<T>>,
/// This will become `true` when the message is exchanged.
ready: AtomicBool,
}
impl<T> Packet<T> {
/// Creates a new packet containing `msg`.
fn new(msg: T) -> Self {
Packet {
msg: Mutex::new(Some(msg)),
ready: AtomicBool::new(false),
}
}
/// Exchanges `msg` for the one inside the packet.
fn exchange(&self, msg: T) -> T {
let r = mem::replace(&mut *self.msg.try_lock().unwrap(), Some(msg));
self.ready.store(true, Release);
r.unwrap()
}
/// Extracts the message inside the packet.
fn into_inner(self) -> T {
self.msg.try_lock().unwrap().take().unwrap()
}
/// Spin-waits until the message inside the packet is exchanged.
fn wait(&self) {
let mut backoff = Backoff::new();
while !self.ready.load(Acquire) {
backoff.step();
}
}
}