use std::any::Any;
use std::cell::Cell;
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, fence};
use std::sync::atomic::Ordering::*;
use std::sync::mpsc::{TrySendError, TryRecvError, RecvError};
use std::thread::yield_now;
use alloc;
use atomicsignal::LoadedSignal;
use countedindex::{CountedIndex, get_valid_wrap, is_tagged, rm_tag, Index, INITIAL_QUEUE_FLAG};
use memory::{MemoryManager, MemToken};
use wait::*;
use read_cursor::{ReadCursor, Reader};
extern crate futures;
extern crate parking_lot;
extern crate smallvec;
use self::futures::{Async, AsyncSink, Poll, Sink, Stream, StartSend};
use self::futures::task::{park, Task};
/// Strategy trait selecting how elements leave the queue slots: broadcast
/// (clone out, slot refcounted) vs mpmc (move out, no refcounting).
/// Implementors are zero-sized marker types.
pub trait QueueRW<T> {
    /// Pin a slot by bumping its reader refcount (broadcast mode only).
    fn inc_ref(&AtomicUsize);
    /// Release a slot pinned by `inc_ref`.
    fn dec_ref(&AtomicUsize);
    /// True when a writer may safely overwrite the slot.
    fn check_ref(&AtomicUsize) -> bool;
    /// True when the *writer* is responsible for dropping overwritten values.
    fn do_drop() -> bool;
    /// Extract the element from a live slot (clone or bitwise move).
    ///
    /// # Safety
    /// Caller must guarantee the slot currently holds a valid `T`.
    unsafe fn get_val(&mut T) -> T;
    /// Discard a value obtained from `get_val` after losing a race
    /// (must not double-drop in move-out mode).
    fn forget_val(T);
    /// Drop the slot's element in place, if this mode owns it.
    ///
    /// # Safety
    /// Caller must guarantee the slot currently holds a valid `T`.
    unsafe fn drop_in_place(&mut T);
}
/// Marker type selecting broadcast behavior: every reader stream receives a
/// clone of each element.
#[derive(Clone)]
pub struct BCast<T> {
    mk: PhantomData<T>,
}
/// Broadcast mode: readers clone values out, so slots are refcounted and the
/// writer drops overwritten elements itself (`do_drop` == true).
impl<T: Clone> QueueRW<T> for BCast<T> {
    #[inline(always)]
    fn inc_ref(r: &AtomicUsize) {
        // Relaxed suffices: callers pair this with explicit fences.
        r.fetch_add(1, Relaxed);
    }
    #[inline(always)]
    fn dec_ref(r: &AtomicUsize) {
        r.fetch_sub(1, Relaxed);
    }
    #[inline(always)]
    fn check_ref(r: &AtomicUsize) -> bool {
        // Writable only when no reader currently holds the slot.
        r.load(Relaxed) == 0
    }
    #[inline(always)]
    fn do_drop() -> bool {
        true
    }
    #[inline(always)]
    unsafe fn get_val(val: &mut T) -> T {
        // The slot keeps its value; readers take a clone.
        val.clone()
    }
    #[inline(always)]
    fn forget_val(_v: T) {}
    #[inline(always)]
    unsafe fn drop_in_place(_v: &mut T) {}
}
/// Marker type selecting mpmc behavior: each element is moved out exactly
/// once by whichever consumer wins it.
#[derive(Clone)]
pub struct MPMC<T> {
    mk: PhantomData<T>,
}
/// MPMC mode: values are bitwise-moved out of slots, so no refcounting is
/// needed and overwritten slots were already emptied (`do_drop` == false).
impl<T> QueueRW<T> for MPMC<T> {
    #[inline(always)]
    fn inc_ref(_r: &AtomicUsize) {}
    #[inline(always)]
    fn dec_ref(_r: &AtomicUsize) {}
    #[inline(always)]
    fn check_ref(_r: &AtomicUsize) -> bool {
        // Slots are always writable once the cursor has passed them.
        true
    }
    #[inline(always)]
    fn do_drop() -> bool {
        false
    }
    #[inline(always)]
    unsafe fn get_val(val: &mut T) -> T {
        // Bitwise copy out; ownership is resolved by the commit race.
        ptr::read(val)
    }
    #[inline(always)]
    fn forget_val(val: T) {
        // Loser of a read race holds a duplicate bitwise copy; forgetting it
        // avoids a double drop.
        mem::forget(val);
    }
    #[inline(always)]
    unsafe fn drop_in_place(val: &mut T) {
        ptr::drop_in_place(val);
    }
}
/// Tracks whether a writer handle believes it is the sole writer (`Uni`,
/// uncontended fast path) or must use the CAS-based multi-writer path.
#[derive(Clone, Copy)]
enum QueueState {
    Uni,
    Multi,
}
/// One ring-buffer slot: the element plus a wrap counter that tells readers
/// whether the slot holds a fresh value for their current lap.
struct QueueEntry<T> {
    val: T,
    wraps: AtomicUsize,
}
/// Per-slot broadcast reader refcount, padded so adjacent counters do not
/// share a cache line.
struct RefCnt {
    refcnt: AtomicUsize,
    _buffer: [u8; 64],
}
/// The shared queue core. `#[repr(C)]` preserves declared field order so the
/// `d1`..`d4` byte arrays act as cache-line padding separating the
/// writer-side fields, the reader cursor, and the memory manager.
#[repr(C)]
pub struct MultiQueue<RW: QueueRW<T>, T> {
    d1: [u8; 64],
    // Writer-side hot fields.
    head: CountedIndex,
    tail_cache: AtomicUsize, // writers' possibly-stale view of the tail
    writers: AtomicUsize,    // count of live writer handles
    d2: [u8; 64],
    // Reader-side cursor tracking every stream's position.
    tail: ReadCursor,
    data: *mut QueueEntry<T>, // ring buffer of `capacity` slots
    refs: *mut RefCnt,        // parallel array of per-slot refcounts
    capacity: isize,
    pub waiter: Arc<Wait>, // blocking/wakeup strategy
    needs_notify: bool,    // cached `waiter.needs_notify()`
    mk: PhantomData<RW>,
    d3: [u8; 64],
    pub manager: MemoryManager, // epoch-based reclamation for reader structures
    d4: [u8; 64],
}
/// A single writer handle onto the shared queue.
pub struct InnerSend<RW: QueueRW<T>, T> {
    queue: Arc<MultiQueue<RW, T>>,
    token: *const MemToken,  // epoch token registered with the manager
    state: Cell<QueueState>, // Uni/Multi fast-path selector
}
/// A single consumer handle on one stream of the shared queue.
pub struct InnerRecv<RW: QueueRW<T>, T> {
    queue: Arc<MultiQueue<RW, T>>,
    reader: Reader,
    token: *const MemToken, // epoch token registered with the manager
    alive: bool,            // guards against double-unsubscribe in Drop
}
/// Futures-aware writer: pairs a plain writer with the consumer wakeup list
/// (`wait`) and the blocked-producer wakeup list (`prod_wait`).
#[derive(Clone)]
pub struct FutInnerSend<RW: QueueRW<T>, T> {
    writer: InnerSend<RW, T>,
    wait: Arc<FutWait>,
    prod_wait: Arc<FutWait>,
}
/// Futures-aware receiver returning owned values via the `Stream` impl.
#[derive(Clone)]
pub struct FutInnerRecv<RW: QueueRW<T>, T> {
    reader: InnerRecv<RW, T>,
    wait: Arc<FutWait>,
    prod_wait: Arc<FutWait>,
}
/// Futures-aware single-consumer receiver that applies `op` to each element
/// in place instead of moving it out of the queue.
pub struct FutInnerUniRecv<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> {
    reader: InnerRecv<RW, T>,
    wait: Arc<FutWait>,
    prod_wait: Arc<FutWait>,
    pub op: F,
}
/// Wait strategy for the futures API: spin, then yield, then park the
/// current task on a lock-protected queue until a notify.
struct FutWait {
    spins_first: usize, // busy-spin attempts before yielding
    spins_yield: usize, // attempts with thread::yield_now between checks
    parked: parking_lot::Mutex<VecDeque<Task>>,
}
impl<RW: QueueRW<T>, T> MultiQueue<RW, T> {
    /// Creates a queue with the default blocking wait strategy.
    pub fn new(_capacity: Index) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
        MultiQueue::new_with(_capacity, BlockingWait::new())
    }
    /// Creates a queue using the supplied wait strategy.
    pub fn new_with<W: Wait + 'static>(capacity: Index,
                                       wait: W)
                                       -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
        MultiQueue::new_internal(capacity, Arc::new(wait))
    }
fn new_internal(_capacity: Index, wait: Arc<Wait>) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
let capacity = get_valid_wrap(_capacity);
let queuedat = alloc::allocate(capacity as usize);
let refdat = alloc::allocate(capacity as usize);
unsafe {
for i in 0..capacity as isize {
let elem: &QueueEntry<T> = &*queuedat.offset(i);
elem.wraps.store(INITIAL_QUEUE_FLAG, Relaxed);
let refd: &RefCnt = &*refdat.offset(i);
refd.refcnt.store(0, Relaxed);
}
}
let (cursor, reader) = ReadCursor::new(capacity);
let needs_notify = wait.needs_notify();
let queue = MultiQueue {
d1: unsafe { mem::uninitialized() },
head: CountedIndex::new(capacity),
tail_cache: AtomicUsize::new(0),
writers: AtomicUsize::new(1),
d2: unsafe { mem::uninitialized() },
tail: cursor,
data: queuedat,
refs: refdat,
capacity: capacity as isize,
waiter: wait,
needs_notify: needs_notify,
mk: PhantomData,
d3: unsafe { mem::uninitialized() },
manager: MemoryManager::new(),
d4: unsafe { mem::uninitialized() },
};
let qarc = Arc::new(queue);
let mwriter = InnerSend {
queue: qarc.clone(),
state: Cell::new(QueueState::Uni),
token: qarc.manager.get_token(),
};
let mreader = InnerRecv {
queue: qarc.clone(),
reader: reader,
token: qarc.manager.get_token(),
alive: true,
};
(mwriter, mreader)
}
    /// Attempts to push `val` with multiple concurrent writers.
    ///
    /// Loops on a head transaction: checks the (possibly stale) cached tail
    /// for fullness, verifies no broadcast reader still pins the slot, then
    /// CASes the head forward and publishes the element.
    pub fn try_send_multi(&self, val: T) -> Result<(), TrySendError<T>> {
        let mut transaction = self.head.load_transaction(Relaxed);
        unsafe {
            loop {
                let (chead, wrap_valid_tag) = transaction.get();
                let tail_cache = self.tail_cache.load(Relaxed);
                // Fast fullness check against the cache; reload the real
                // tail only when the cache says we might be full.
                if transaction.matches_previous(tail_cache) {
                    let new_tail = self.reload_tail_multi(tail_cache, wrap_valid_tag);
                    if transaction.matches_previous(new_tail) {
                        return Err(TrySendError::Full(val));
                    }
                }
                let write_cell = &mut *self.data.offset(chead);
                let ref_cell = &*self.refs.offset(chead);
                // A broadcast reader may still reference this slot's value;
                // report Full rather than clobber it.
                if !RW::check_ref(&ref_cell.refcnt) {
                    return Err(TrySendError::Full(val));
                }
                fence(Acquire);
                match transaction.commit(1, Relaxed) {
                    // Lost the race to another writer: retry at the new head.
                    Some(new_transaction) => transaction = new_transaction,
                    None => {
                        // We own the slot. In modes where the writer drops
                        // overwritten values, read the old occupant out (it
                        // drops at end of scope) unless the slot was never
                        // written (tagged).
                        let current_tag = write_cell.wraps.load(Relaxed);
                        let _possible_drop = if RW::do_drop() && !is_tagged(current_tag) {
                            Some(ptr::read(&write_cell.val))
                        } else {
                            None
                        };
                        ptr::write(&mut write_cell.val, val);
                        // Release-publish the wrap tag so readers observe the
                        // completed write.
                        write_cell.wraps.store(wrap_valid_tag, Release);
                        return Ok(());
                    }
                }
            }
        }
    }
    /// Attempts to push `val` on the uncontended single-writer path.
    ///
    /// Same protocol as `try_send_multi`, but the head advance uses
    /// `commit_direct` (no CAS loop) since no other writer can interfere.
    pub fn try_send_single(&self, val: T) -> Result<(), TrySendError<T>> {
        let transaction = self.head.load_transaction(Relaxed);
        let (chead, wrap_valid_tag) = transaction.get();
        unsafe {
            let tail_cache = self.tail_cache.load(Relaxed);
            if transaction.matches_previous(tail_cache) {
                let new_tail = self.reload_tail_single(wrap_valid_tag);
                if transaction.matches_previous(new_tail) {
                    return Err(TrySendError::Full(val));
                }
            }
            let write_cell = &mut *self.data.offset(chead);
            let ref_cell = &*self.refs.offset(chead);
            // Slot still pinned by a broadcast reader: report Full.
            if !RW::check_ref(&ref_cell.refcnt) {
                return Err(TrySendError::Full(val));
            }
            fence(Acquire);
            transaction.commit_direct(1, Relaxed);
            // Drop the previous occupant (if any) in writer-drop modes by
            // reading it out; tagged slots were never written.
            let current_tag = write_cell.wraps.load(Relaxed);
            let _possible_drop = if RW::do_drop() && !is_tagged(current_tag) {
                Some(ptr::read(&write_cell.val))
            } else {
                None
            };
            ptr::write(&mut write_cell.val, val);
            // Release-publish the wrap tag so readers observe the write.
            write_cell.wraps.store(wrap_valid_tag, Release);
            Ok(())
        }
    }
    /// Attempts to take (mpmc) or clone (broadcast) the next element for
    /// `reader`'s stream.
    ///
    /// On failure, returns a pointer to the slot's wrap counter (so callers
    /// can block on it) and whether the queue is empty or disconnected.
    pub fn try_recv(&self, reader: &Reader) -> Result<T, (*const AtomicUsize, TryRecvError)> {
        let mut ctail_attempt = reader.load_attempt(Relaxed);
        unsafe {
            loop {
                let (ctail, wrap_valid_tag) = ctail_attempt.get();
                let read_cell = &mut *self.data.offset(ctail);
                // Slot not yet written for this lap: empty, or — if the last
                // writer is gone (re-checked after an Acquire fence) —
                // disconnected.
                if rm_tag(read_cell.wraps.load(Relaxed)) != wrap_valid_tag {
                    if self.writers.load(Relaxed) == 0 {
                        fence(Acquire);
                        if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                            return Err((ptr::null(), TryRecvError::Disconnected));
                        }
                    }
                    return Err((&read_cell.wraps, TryRecvError::Empty));
                }
                let ref_cell = &*self.refs.offset(ctail);
                let is_single = reader.is_single();
                // With sibling consumers on this stream, pin the slot so a
                // writer cannot overwrite it while we extract the value.
                if !is_single {
                    RW::inc_ref(&ref_cell.refcnt);
                }
                fence(Acquire);
                let rval = RW::get_val(&mut read_cell.val);
                fence(Release);
                if !is_single {
                    RW::dec_ref(&ref_cell.refcnt);
                }
                match ctail_attempt.commit_attempt(1, Relaxed) {
                    // Lost the race to a sibling consumer: discard our copy
                    // (mpmc forgets the duplicate bitwise copy to avoid a
                    // double drop; broadcast simply drops the clone) and
                    // retry at the advanced position.
                    Some(new_attempt) => {
                        ctail_attempt = new_attempt;
                        RW::forget_val(rval);
                    }
                    None => return Ok(rval),
                }
            }
        }
    }
    /// Single-consumer variant: applies `op` to the element in place, drops
    /// it (mpmc mode only), and advances the cursor with an uncontended
    /// `commit_direct`. On failure `op` is returned so it can be retried.
    pub fn try_recv_view<R, F: FnOnce(&T) -> R>
        (&self,
         op: F,
         reader: &Reader)
         -> Result<R, (F, *const AtomicUsize, TryRecvError)> {
        let ctail_attempt = reader.load_attempt(Relaxed);
        unsafe {
            let (ctail, wrap_valid_tag) = ctail_attempt.get();
            let read_cell = &mut *self.data.offset(ctail);
            // Slot not written for this lap: empty, or disconnected if the
            // last writer is gone (re-checked after an Acquire fence).
            if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                if self.writers.load(Relaxed) == 0 {
                    fence(Acquire);
                    if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                        return Err((op, ptr::null(), TryRecvError::Disconnected));
                    }
                }
                return Err((op, &read_cell.wraps, TryRecvError::Empty));
            }
            let rval = op(&read_cell.val);
            RW::drop_in_place(&mut read_cell.val);
            ctail_attempt.commit_direct(1, Release);
            Ok(rval)
        }
    }
    /// Refreshes the writers' shared tail cache from the read cursor.
    ///
    /// Installs the freshly-computed tail with a CAS so a racing writer that
    /// already published a newer value wins; returns whichever value ends up
    /// cached.
    fn reload_tail_multi(&self, tail_cache: usize, count: usize) -> usize {
        if let Some(max_diff_from_head) = self.tail.get_max_diff(count) {
            let current_tail = CountedIndex::get_previous(count, max_diff_from_head);
            if tail_cache == current_tail {
                return current_tail;
            }
            match self.tail_cache.compare_exchange(tail_cache, current_tail, AcqRel, Relaxed) {
                Ok(_) => current_tail,
                Err(val) => val,
            }
        } else {
            // Cursor moved under us; fall back to whatever is cached now.
            self.tail_cache.load(Acquire)
        }
    }
    /// Single-writer tail refresh: a plain store suffices since only this
    /// writer updates the cache. Panics if consumers overran the head, which
    /// would indicate queue-internal corruption.
    fn reload_tail_single(&self, count: usize) -> usize {
        let max_diff_from_head = self.tail
            .get_max_diff(count)
            .expect("The write head got ran over by consumers in single writer mode. This \
                     process is borked!");
        let current_tail = CountedIndex::get_previous(count, max_diff_from_head);
        self.tail_cache.store(current_tail, Relaxed);
        current_tail
    }
}
impl<RW: QueueRW<T>, T> InnerSend<RW, T> {
    /// Attempts to push `val`, first draining any pending manager signals.
    ///
    /// Opportunistically downgrades from the multi-writer path to the cheaper
    /// single-writer path when this handle observes it is the last writer.
    #[inline(always)]
    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
        let signal = self.queue.manager.signal.load(Relaxed);
        if signal.has_action() {
            let disconnected = self.handle_signals(signal);
            if disconnected {
                // All readers gone; surface as Full so the caller keeps `val`.
                return Err(TrySendError::Full(val));
            }
        }
        let val = match self.state.get() {
            QueueState::Uni => self.queue.try_send_single(val),
            QueueState::Multi => {
                if self.queue.writers.load(Relaxed) == 1 {
                    // Sole remaining writer: switch to the uncontended path.
                    fence(Acquire);
                    self.state.set(QueueState::Uni);
                    self.queue.try_send_single(val)
                } else {
                    self.queue.try_send_multi(val)
                }
            }
        };
        // Wake blocked consumers only when the wait strategy needs it
        // (collapsed from a nested `if` — clippy::collapsible_if).
        if val.is_ok() && self.queue.needs_notify {
            self.queue.waiter.notify();
        }
        val
    }

    /// Consumes the handle; writer bookkeeping happens in `Drop`.
    pub fn unsubscribe(self) {}

    /// Slow path for manager signals: refreshes the epoch token and reports
    /// whether all readers have disconnected.
    #[cold]
    fn handle_signals(&self, signal: LoadedSignal) -> bool {
        if signal.get_epoch() {
            self.queue.manager.update_token(self.token);
        }
        signal.get_reader()
    }
}
impl<RW: QueueRW<T>, T> InnerRecv<RW, T> {
    /// Non-blocking pop; drops the internal wait-pointer from the error.
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.examine_signals();
        match self.queue.try_recv(&self.reader) {
            Ok(v) => Ok(v),
            Err((_, e)) => Err(e),
        }
    }

    /// Blocking pop: waits on the empty slot's wrap counter until a value
    /// arrives or every writer disconnects.
    pub fn recv(&self) -> Result<T, RecvError> {
        self.examine_signals();
        loop {
            match self.queue.try_recv(&self.reader) {
                Ok(v) => return Ok(v),
                Err((_, TryRecvError::Disconnected)) => return Err(RecvError),
                Err((pt, TryRecvError::Empty)) => {
                    let count = self.reader.load_count(Relaxed);
                    // `pt` points at the wrap counter of the slot we are
                    // waiting on; the waiter re-checks it against `count`.
                    unsafe {
                        self.queue.waiter.wait(count, &*pt, &self.queue.writers);
                    }
                }
            }
        }
    }

    /// True when this handle is the only consumer on its stream.
    pub fn is_single(&self) -> bool {
        self.reader.get_consumers() == 1
    }

    /// Non-blocking in-place view; hands `op` back on failure for retry.
    #[inline(always)]
    pub fn try_recv_view<R, F: FnOnce(&T) -> R>(&self, op: F) -> Result<R, (F, TryRecvError)> {
        self.examine_signals();
        match self.queue.try_recv_view(op, &self.reader) {
            Ok(v) => Ok(v),
            Err((op, _, e)) => Err((op, e)),
        }
    }

    /// Blocking in-place view; threads `op` back through every retry.
    pub fn recv_view<R, F: FnOnce(&T) -> R>(&self, mut op: F) -> Result<R, (F, RecvError)> {
        self.examine_signals();
        loop {
            match self.queue.try_recv_view(op, &self.reader) {
                Ok(v) => return Ok(v),
                Err((o, _, TryRecvError::Disconnected)) => return Err((o, RecvError)),
                Err((o, pt, TryRecvError::Empty)) => {
                    op = o;
                    let count = self.reader.load_count(Relaxed);
                    unsafe {
                        self.queue.waiter.wait(count, &*pt, &self.queue.writers);
                    }
                }
            }
        }
    }

    /// Registers a new independent stream starting at this reader's position.
    pub fn add_stream(&self) -> InnerRecv<RW, T> {
        InnerRecv {
            queue: self.queue.clone(),
            reader: self.queue.tail.add_stream(&self.reader, &self.queue.manager),
            token: self.queue.manager.get_token(),
            alive: true,
        }
    }

    /// Cheap check for pending manager signals before touching the queue.
    #[inline(always)]
    fn examine_signals(&self) {
        let signal = self.queue.manager.signal.load(Relaxed);
        if signal.has_action() {
            self.handle_signals(signal);
        }
    }

    /// Slow path: refresh this handle's epoch token.
    #[cold]
    fn handle_signals(&self, signal: LoadedSignal) {
        if signal.get_epoch() {
            self.queue.manager.update_token(self.token);
        }
    }

    /// Consumes the handle, reporting whether it was the stream's last
    /// consumer; actual teardown runs in `Drop`.
    pub fn unsubscribe(self) -> bool {
        self.reader.get_consumers() == 1
    }

    /// Idempotent teardown: deregisters this consumer — and the whole stream
    /// if it was the last one — then runs `f` after a SeqCst fence.
    unsafe fn do_unsubscribe_with<F: FnOnce()>(&mut self, f: F) {
        if self.alive {
            self.alive = false;
            if self.reader.remove_consumer() == 1 {
                if self.queue.tail.remove_reader(&self.reader, &self.queue.manager) {
                    self.queue.manager.signal.set_reader(SeqCst);
                }
                self.queue.manager.remove_token(self.token);
            }
            fence(SeqCst);
            f()
        }
    }
}
impl<RW: QueueRW<T>, T> FutInnerSend<RW, T> {
    /// Non-blocking send, delegating to the inner writer.
    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
        self.writer.try_send(val)
    }
    /// Consumes the handle; writer bookkeeping happens in the inner Drop.
    pub fn unsubscribe(self) {
        self.writer.unsubscribe()
    }
}
impl<RW: QueueRW<T>, T> FutInnerRecv<RW, T> {
    /// Non-blocking pop.
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.reader.try_recv()
    }
    /// Blocking pop.
    ///
    /// NOTE(review): this goes through the inner reader's blocking wait, and
    /// the `Wait` impl for `FutWait` panics in `wait` — confirm this method
    /// is only reachable with non-futures wait strategies.
    #[inline(always)]
    pub fn recv(&self) -> Result<T, RecvError> {
        self.reader.recv()
    }
    /// Adds a new stream to the queue and returns a receiver for it.
    pub fn add_stream(&self) -> FutInnerRecv<RW, T> {
        let rx = self.reader.add_stream();
        FutInnerRecv {
            reader: rx,
            wait: self.wait.clone(),
            prod_wait: self.prod_wait.clone(),
        }
    }
    /// Attempts to convert into a single-consumer in-place receiver.
    ///
    /// Clones the inner reader and then drops `self` (whose Drop decrements
    /// the consumer count), so the subsequent `is_single` check reflects
    /// only the surviving clone. On failure, `op` and an equivalent
    /// multi-consumer receiver are handed back.
    pub fn into_single<R, F: FnMut(&T) -> R>
        (self,
         op: F)
         -> Result<FutInnerUniRecv<RW, R, F, T>, (F, FutInnerRecv<RW, T>)> {
        let new_mreader;
        let new_pwait = self.prod_wait.clone();
        let new_wait = self.wait.clone();
        {
            // Clone first, then drop self, keeping the consumer count stable.
            new_mreader = self.reader.clone();
            drop(self);
        }
        if new_mreader.is_single() {
            Ok(FutInnerUniRecv {
                reader: new_mreader,
                wait: new_wait,
                prod_wait: new_pwait,
                op: op,
            })
        } else {
            Err((op,
                 FutInnerRecv {
                     reader: new_mreader,
                     wait: new_wait,
                     prod_wait: new_pwait,
                 }))
        }
    }
    /// Consumes the handle, reporting whether it was the stream's last
    /// consumer; teardown runs in `Drop`.
    pub fn unsubscribe(self) -> bool {
        self.reader.reader.get_consumers() == 1
    }
}
impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> FutInnerUniRecv<RW, R, F, T> {
    /// Non-blocking receive: applies the stored `op` to the next element in
    /// place, then wakes one blocked producer (a slot may have freed up).
    #[inline(always)]
    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
        let opref = &mut self.op;
        let rval = self.reader.try_recv_view(|tr| opref(tr));
        self.prod_wait.notify_one();
        rval.map_err(|x| x.1)
    }
    /// Blocking receive applying the stored `op` in place.
    #[inline(always)]
    pub fn recv(&mut self) -> Result<R, RecvError> {
        let opref = &mut self.op;
        let rval = self.reader.recv_view(|tr| opref(tr));
        self.prod_wait.notify_one();
        rval.map_err(|x| x.1)
    }
    /// Adds a new stream with its own (possibly differently-typed) operation.
    pub fn add_stream_with<Q, FQ: FnMut(&T) -> Q>(&self, op: FQ) -> FutInnerUniRecv<RW, Q, FQ, T> {
        let rx = self.reader.add_stream();
        FutInnerUniRecv {
            reader: rx,
            wait: self.wait.clone(),
            prod_wait: self.prod_wait.clone(),
            op: op,
        }
    }
    /// Consumes the handle, reporting whether it was the stream's last
    /// consumer; teardown runs in `Drop`.
    pub fn unsubscribe(self) -> bool {
        self.reader.reader.get_consumers() == 1
    }
    /// Converts back to a value-returning receiver by adding a fresh stream;
    /// this handle's own stream is torn down when `self` drops.
    pub fn into_multi(self) -> FutInnerRecv<RW, T> {
        let new_reader = self.reader.add_stream();
        FutInnerRecv {
            reader: new_reader,
            wait: self.wait.clone(),
            prod_wait: self.prod_wait.clone(),
        }
    }
}
/// Error returned when a send fails because every receiver is gone; carries
/// the unsent value so the caller can recover it via `into_inner`.
pub struct SendError<T>(T);
/// Manual Debug: prints `SendError("...")` without requiring `T: Debug`.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // The payload is elided — `T` may not implement Debug.
        let mut tuple = fmt.debug_tuple("SendError");
        tuple.field(&"...");
        tuple.finish()
    }
}
/// Human-readable message for the user-facing error path.
impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("send failed because receiver is gone")
    }
}
// `T: Any` gives the blanket requirements for boxing as a trait object.
impl<T> Error for SendError<T>
    where T: Any
{
    fn description(&self) -> &str {
        "send failed because receiver is gone"
    }
}
impl<T> SendError<T> {
    /// Recovers the value that failed to send.
    pub fn into_inner(self) -> T {
        self.0
    }
}
impl<RW: QueueRW<T>, T> Sink for FutInnerSend<RW, T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Attempts to enqueue `msg`; `send_or_park` parks the current task when
    /// the queue stays full, so the sink is re-polled after a consumer frees
    /// a slot and `NotReady(msg)` is returned meanwhile.
    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
        match self.prod_wait.send_or_park(|m| self.writer.try_send(m), msg) {
            Ok(_) => {
                // NOTE(review): InnerSend::try_send already notifies on
                // success, so this second notify looks redundant — confirm
                // whether it is intentional belt-and-braces.
                if self.writer.queue.needs_notify {
                    self.writer.queue.waiter.notify();
                }
                Ok(AsyncSink::Ready)
            }
            Err(TrySendError::Full(msg)) => Ok(AsyncSink::NotReady(msg)),
            Err(TrySendError::Disconnected(msg)) => Err(SendError(msg)),
        }
    }

    /// Sends complete immediately; the sink buffers nothing itself.
    #[inline(always)]
    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
        Ok(Async::Ready(()))
    }
}
impl<RW: QueueRW<T>, T> Stream for FutInnerRecv<RW, T> {
    type Item = T;
    type Error = ();

    /// Polls for the next value: on success wakes one blocked producer; on
    /// disconnect ends the stream; when empty, spins then parks the task
    /// (`fut_wait` returning true means parked, so yield `NotReady`).
    #[inline]
    fn poll(&mut self) -> Poll<Option<T>, ()> {
        self.reader.examine_signals();
        loop {
            match self.reader.queue.try_recv(&self.reader.reader) {
                Ok(msg) => {
                    self.prod_wait.notify_one();
                    return Ok(Async::Ready(Some(msg)));
                }
                Err((_, TryRecvError::Disconnected)) => return Ok(Async::Ready(None)),
                Err((pt, _)) => {
                    let count = self.reader.reader.load_count(Relaxed);
                    if unsafe { self.wait.fut_wait(count, &*pt, &self.reader.queue.writers) } {
                        return Ok(Async::NotReady);
                    }
                    // fut_wait returned false: the condition changed during
                    // spinning/locking, so retry immediately.
                }
            }
        }
    }
}
impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Stream for FutInnerUniRecv<RW, R, F, T> {
    type Item = R;
    type Error = ();

    /// Polls for the next element, applying the stored `op` in place; same
    /// park/retry protocol as the `FutInnerRecv` stream.
    #[inline]
    fn poll(&mut self) -> Poll<Option<R>, ()> {
        self.reader.examine_signals();
        loop {
            let opref = &mut self.op;
            match self.reader
                .queue
                .try_recv_view(opref, &self.reader.reader) {
                Ok(msg) => {
                    self.prod_wait.notify_one();
                    return Ok(Async::Ready(Some(msg)));
                }
                Err((_, _, TryRecvError::Disconnected)) => return Ok(Async::Ready(None)),
                Err((_, pt, _)) => {
                    let count = self.reader.reader.load_count(Relaxed);
                    if unsafe { self.wait.fut_wait(count, &*pt, &self.reader.queue.writers) } {
                        return Ok(Async::NotReady);
                    }
                }
            }
        }
    }
}
impl FutWait {
    /// Creates a FutWait with the default spin and yield counts.
    pub fn new() -> FutWait {
        FutWait::with_spins(DEFAULT_TRY_SPINS, DEFAULT_YIELD_SPINS)
    }
    /// Creates a FutWait with explicit busy-spin and yield-spin counts and an
    /// empty parked-task list.
    pub fn with_spins(spins_first: usize, spins_yield: usize) -> FutWait {
        FutWait {
            spins_first: spins_first,
            spins_yield: spins_yield,
            parked: parking_lot::Mutex::new(VecDeque::new()),
        }
    }
    /// Returns true if the current task was parked (caller yields NotReady);
    /// false if the condition became ready while spinning or locking.
    pub fn fut_wait(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        self.spin(seq, at, wc) && self.park(seq, at, wc)
    }
pub fn spin(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
for _ in 0..self.spins_first {
if check(seq, at, wc) {
return false;
}
}
for _ in 0..self.spins_yield {
yield_now();
if check(seq, at, wc) {
return false;
}
}
return true;
}
    /// Final check-then-park: re-checking while holding the parked-list lock
    /// closes the race with notifiers (which also take the lock), preventing
    /// a lost wakeup. Returns true if the task was actually parked.
    pub fn park(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        let mut parked = self.parked.lock();
        if check(seq, at, wc) {
            return false;
        }
        parked.push_back(park());
        return true;
    }
    /// Repeatedly attempts `f(val)` with spin/yield backoff. If the queue is
    /// still full afterwards, re-tries once under the parked-list lock and,
    /// on `Full`, registers the current task for wakeup before returning
    /// `Full` (so the caller can report NotReady). `val` is threaded back
    /// through every failed attempt.
    fn send_or_park<T, F: Fn(T) -> Result<(), TrySendError<T>>>(&self,
                                                                f: F,
                                                                mut val: T)
                                                                -> Result<(), TrySendError<T>> {
        for _ in 0..self.spins_first {
            match f(val) {
                Err(TrySendError::Full(v)) => val = v,
                v => return v,
            }
        }
        for _ in 0..self.spins_yield {
            yield_now();
            match f(val) {
                Err(TrySendError::Full(v)) => val = v,
                v => return v,
            }
        }
        // Attempt under the lock so a concurrent notify cannot slip between
        // the failed send and the park registration.
        let mut parked = self.parked.lock();
        match f(val) {
            Err(TrySendError::Full(v)) => {
                parked.push_back(park());
                return Err(TrySendError::Full(v));
            }
            v => return v,
        }
    }
fn notify_one(&self) {
let mut parked = self.parked.lock();
match parked.pop_front() {
Some(val) => {
drop(parked);
val.unpark();
}
None => (),
}
}
}
impl Wait for FutWait {
    /// Blocking wait is unsupported for the futures strategy; reaching this
    /// is a logic error in queue construction.
    #[cold]
    fn wait(&self, _seq: usize, _w_pos: &AtomicUsize, _wc: &AtomicUsize) {
        // `assert!(false, ..)` is an unconditional panic in disguise
        // (clippy::assertions_on_constants); state it directly.
        panic!("Somehow normal wait got called in futures queue");
    }

    /// Wakes every parked task. Small sets are moved into an inline buffer
    /// so `unpark` runs outside the lock; larger sets are drained under the
    /// lock to avoid spilling the SmallVec to the heap.
    fn notify(&self) {
        let mut parked = self.parked.lock();
        if !parked.is_empty() {
            if parked.len() > 8 {
                for val in parked.drain(..) {
                    val.unpark();
                }
            } else {
                // Capacity 9 exceeds the threshold of 8, so this never
                // allocates.
                let mut inline_v = smallvec::SmallVec::<[Task; 9]>::new();
                inline_v.extend(parked.drain(..));
                drop(parked);
                for val in inline_v.drain() {
                    val.unpark();
                }
            }
        }
    }

    /// Parked futures always require an explicit wakeup.
    fn needs_notify(&self) -> bool {
        true
    }
}
impl<RW: QueueRW<T>, T> Clone for InnerSend<RW, T> {
    /// Cloning a writer forces *both* handles onto the multi-writer path
    /// before the writer count is bumped, so neither handle can race down
    /// the single-writer fast path while two writers exist.
    fn clone(&self) -> InnerSend<RW, T> {
        self.state.set(QueueState::Multi);
        let rval = InnerSend {
            queue: self.queue.clone(),
            state: Cell::new(QueueState::Multi),
            token: self.queue.manager.get_token(),
        };
        self.queue.writers.fetch_add(1, SeqCst);
        rval
    }
}
impl<RW: QueueRW<T>, T> Clone for InnerRecv<RW, T> {
    /// Cloning adds another consumer to the *same* stream (contrast with
    /// `add_stream`, which creates a new stream).
    fn clone(&self) -> InnerRecv<RW, T> {
        // Bump the stream's consumer count before handing out the handle.
        self.reader.dup_consumer();
        InnerRecv {
            queue: self.queue.clone(),
            reader: self.reader.clone(),
            token: self.queue.manager.get_token(),
            alive: true,
        }
    }
}
impl Clone for FutWait {
fn clone(&self) -> FutWait {
FutWait::with_spins(self.spins_first, self.spins_yield)
}
}
impl<RW: QueueRW<T>, T> Drop for InnerSend<RW, T> {
    /// Deregisters the writer and wakes consumers so readers blocked on an
    /// empty queue can observe the disconnect.
    fn drop(&mut self) {
        self.queue.writers.fetch_sub(1, SeqCst);
        fence(SeqCst);
        self.queue.manager.remove_token(self.token);
        self.queue.waiter.notify();
    }
}
impl<RW: QueueRW<T>, T> Drop for InnerRecv<RW, T> {
    /// Teardown with no extra callback; `do_unsubscribe_with` is idempotent
    /// via the `alive` flag.
    fn drop(&mut self) {
        unsafe { self.do_unsubscribe_with(|| ()) }
    }
}
impl<RW: QueueRW<T>, T> Drop for MultiQueue<RW, T> {
fn drop(&mut self) {
if RW::do_drop() {
for i in 0..self.capacity {
unsafe {
let cell = &mut *self.data.offset(i);
if !is_tagged(cell.wraps.load(Relaxed)) {
ptr::read(&cell.val);
}
}
}
} else {
let last_read = CountedIndex::from_usize(self.tail.last_pos.get(),
self.capacity as Index);
while last_read.load_count(Relaxed) != self.head.load_count(Relaxed) {
unsafe {
let cur_pos = last_read.load_transaction(Relaxed);
let (cur_ind, _) = cur_pos.get();
ptr::drop_in_place(&mut (*self.data.offset(cur_ind)).val);
cur_pos.commit_direct(1, Relaxed);
}
}
}
}
}
impl<RW: QueueRW<T>, T> Drop for FutInnerRecv<RW, T> {
    /// Tears down the consumer and wakes all blocked producers, since
    /// removing a (possibly lagging) stream can free queue slots.
    fn drop(&mut self) {
        let prod_wait = self.prod_wait.clone();
        unsafe { self.reader.do_unsubscribe_with(|| { prod_wait.notify(); }) }
    }
}
impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Drop for FutInnerUniRecv<RW, R, F, T> {
    /// Same teardown as `FutInnerRecv`: unsubscribe, then wake producers.
    fn drop(&mut self) {
        let prod_wait = self.prod_wait.clone();
        unsafe { self.reader.do_unsubscribe_with(|| { prod_wait.notify(); }) }
    }
}
/// Placeholder Debug so `unwrap`/`expect` on `Result`s carrying this type can
/// print something meaningful.
impl<RW: QueueRW<T>, T> fmt::Debug for InnerRecv<RW, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("MultiQueue error message - you probably tried to unwrap the result of into_single")
    }
}
/// Placeholder Debug matching the `InnerRecv` impl, for the same purpose.
impl<RW: QueueRW<T>, T> fmt::Debug for FutInnerRecv<RW, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("MultiQueue error message - you probably tried to unwrap the result of into_single")
    }
}
// SAFETY(review): the queue shares `T` values across threads through raw
// pointers, so these impls are sound only when `T: Send` (and `T: Sync`
// where values are observed concurrently). The impls below are unconditional
// in `T` — confirm that the public constructors enforce the required bounds,
// otherwise these should carry `T: Send` constraints.
unsafe impl<RW: QueueRW<T>, T> Sync for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for InnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for InnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for FutInnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for FutInnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> Send for FutInnerUniRecv<RW, R, F, T> {}
/// Builds a futures-compatible queue pair: consumers park on `reader_wait`
/// (which is also installed as the queue's wait strategy) and blocked
/// producers park on a separate `writer_wait`.
pub fn futures_multiqueue<RW: QueueRW<T>, T>(capacity: Index)
                                             -> (FutInnerSend<RW, T>, FutInnerRecv<RW, T>) {
    let reader_wait = Arc::new(FutWait::new());
    let writer_wait = Arc::new(FutWait::new());
    let (tx, rx) = MultiQueue::new_internal(capacity, reader_wait.clone());
    let send_half = FutInnerSend {
        writer: tx,
        wait: reader_wait.clone(),
        prod_wait: writer_wait.clone(),
    };
    let recv_half = FutInnerRecv {
        reader: rx,
        wait: reader_wait,
        prod_wait: writer_wait,
    };
    (send_half, recv_half)
}