multiqueue/
multiqueue.rs

1
2use std::any::Any;
3use std::cell::Cell;
4use std::collections::VecDeque;
5use std::error::Error;
6use std::fmt;
7use std::marker::PhantomData;
8use std::mem;
9use std::ptr;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, fence};
12use std::sync::atomic::Ordering::*;
13use std::sync::mpsc::{TrySendError, TryRecvError, RecvError};
14use std::thread::yield_now;
15
16use alloc;
17use atomicsignal::LoadedSignal;
18use countedindex::{CountedIndex, get_valid_wrap, is_tagged, rm_tag, Index, INITIAL_QUEUE_FLAG};
19use memory::{MemoryManager, MemToken};
20use wait::*;
21
22use read_cursor::{ReadCursor, Reader};
23
24extern crate futures;
25extern crate parking_lot;
26extern crate smallvec;
27
28use self::futures::{Async, AsyncSink, Poll, Sink, Stream, StartSend};
29use self::futures::task::{park, Task};
30
/// This is basically acting as a static bool
/// so the queue can act as a normal mpmc in other circumstances.
///
/// Every method is an associated function (there is no `self`), so the
/// policy is chosen purely through the type parameter and resolved at
/// compile time.
pub trait QueueRW<T> {
    /// Records that a reader is about to access the slot guarded by `r`.
    fn inc_ref(r: &AtomicUsize);

    /// Records that a reader has finished accessing the slot guarded by `r`.
    fn dec_ref(r: &AtomicUsize);

    /// Returns `true` when the slot guarded by `r` may be overwritten by a writer.
    fn check_ref(r: &AtomicUsize) -> bool;

    /// Returns `true` when a writer has to drop the previous occupant of a
    /// slot before storing a new value in it.
    fn do_drop() -> bool;

    /// Produces an owned value from the slot `val`.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `val` must refer to a fully written element
    /// and the caller must dispose of the result through `forget_val` when
    /// the read does not commit - confirm against the implementors.
    unsafe fn get_val(val: &mut T) -> T;

    /// Disposes of a value obtained from `get_val` whose read was not committed.
    fn forget_val(val: T);

    /// Destroys the element stored in `val` after it has been viewed in place.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `val` must not be read again afterwards - confirm.
    unsafe fn drop_in_place(val: &mut T);
}
42
/// Marker type selecting the broadcast policy: its `QueueRW` implementation
/// clones elements out of the queue and maintains the per-slot refcount.
#[derive(Clone)]
pub struct BCast<T> {
    // Zero-sized marker tying the policy to the element type `T`.
    mk: PhantomData<T>,
}
47
48impl<T: Clone> QueueRW<T> for BCast<T> {
49    // TODO: Skip refcount when type is copyable or clone is safe on junk data
50    #[inline(always)]
51    fn inc_ref(r: &AtomicUsize) {
52        r.fetch_add(1, Relaxed);
53    }
54
55    // TODO: Skip refcount when type is copyable or clone is safe on junk data
56    #[inline(always)]
57    fn dec_ref(r: &AtomicUsize) {
58        r.fetch_sub(1, Relaxed);
59    }
60
61    #[inline(always)]
62    fn check_ref(r: &AtomicUsize) -> bool {
63        r.load(Relaxed) == 0
64    }
65
66    #[inline(always)]
67    fn do_drop() -> bool {
68        true
69    }
70
71    #[inline(always)]
72    unsafe fn get_val(val: &mut T) -> T {
73        val.clone()
74    }
75
76    #[inline(always)]
77    fn forget_val(_v: T) {}
78
79    #[inline(always)]
80    unsafe fn drop_in_place(_v: &mut T) {}
81}
82
/// Marker type selecting the plain mpmc policy: its `QueueRW` implementation
/// moves elements out of the queue and performs no reference counting.
#[derive(Clone)]
pub struct MPMC<T> {
    // Zero-sized marker tying the policy to the element type `T`.
    mk: PhantomData<T>,
}
87
88impl<T> QueueRW<T> for MPMC<T> {
89    #[inline(always)]
90    fn inc_ref(_r: &AtomicUsize) {}
91
92    #[inline(always)]
93    fn dec_ref(_r: &AtomicUsize) {}
94
95    #[inline(always)]
96    fn check_ref(_r: &AtomicUsize) -> bool {
97        true
98    }
99
100    #[inline(always)]
101    fn do_drop() -> bool {
102        false
103    }
104
105    #[inline(always)]
106    unsafe fn get_val(val: &mut T) -> T {
107        ptr::read(val)
108    }
109
110    #[inline(always)]
111    fn forget_val(val: T) {
112        mem::forget(val);
113    }
114
115    #[inline(always)]
116    unsafe fn drop_in_place(val: &mut T) {
117        ptr::drop_in_place(val);
118    }
119}
120
/// Sending mode cached by each `InnerSend`: `Uni` selects the single-writer
/// send path, `Multi` the multi-writer one.
#[derive(Clone, Copy)]
enum QueueState {
    // Use `try_send_single`.
    Uni,
    // Other writers may exist; `try_send` re-checks the writer count.
    Multi,
}
126
/// This holds entries in the queue
struct QueueEntry<T> {
    // The stored element.
    val: T,
    // Tag published by writers (Release store) and compared by readers
    // against the wrap tag they expect for this slot.
    wraps: AtomicUsize,
}
132
/// This holds the refcount object
struct RefCnt {
    // Per-slot count manipulated through `QueueRW::{inc_ref, dec_ref, check_ref}`.
    refcnt: AtomicUsize,
    // NOTE(review): never accessed in this file; presumably padding that
    // keeps neighbouring refcounts on separate cache lines - confirm.
    _buffer: [u8; 64],
}
138
/// A bounded queue that supports multiple reader and writers
/// and supports efficient methods for single consumers and producers
#[repr(C)]
pub struct MultiQueue<RW: QueueRW<T>, T> {
    // NOTE(review): d1..d4 are 64-byte filler arrays that are never read;
    // presumably cache-line padding between the field groups - confirm.
    d1: [u8; 64],

    // Writer data
    // Write index; writers reserve slots through transactions on it.
    head: CountedIndex,
    // Writers' cached copy of the tail position, refreshed by `reload_tail_*`.
    tail_cache: AtomicUsize,
    // Number of live `InnerSend` handles; readers treat 0 as "no writers left".
    writers: AtomicUsize,
    d2: [u8; 64],

    // Shared Data
    // The data and the wraps flag are in the same location
    // to reduce the # of distinct cache lines read when getting an item
    // The tail itself is rarely modified, making it a suitable candidate
    // to be in the shared space
    tail: ReadCursor,
    // Slot array holding `capacity` entries.
    data: *mut QueueEntry<T>,
    // Refcount array, indexed in parallel with `data`.
    refs: *mut RefCnt,
    capacity: isize,
    // Strategy readers block on while the queue is empty.
    pub waiter: Arc<Wait>,
    // Cached result of `waiter.needs_notify()` taken at construction.
    needs_notify: bool,
    mk: PhantomData<RW>,
    d3: [u8; 64],

    pub manager: MemoryManager,
    d4: [u8; 64],
}
168
/// Producer handle for a `MultiQueue`.
pub struct InnerSend<RW: QueueRW<T>, T> {
    // The shared queue this handle writes into.
    queue: Arc<MultiQueue<RW, T>>,
    // Token obtained from the queue's `MemoryManager` for this handle.
    token: *const MemToken,
    // Cached single-/multi-writer mode used to pick the send path.
    state: Cell<QueueState>,
}
174
/// Consumer handle for a `MultiQueue`.
pub struct InnerRecv<RW: QueueRW<T>, T> {
    // The shared queue this handle reads from.
    queue: Arc<MultiQueue<RW, T>>,
    // This handle's view of its read stream.
    reader: Reader,
    // Token obtained from the queue's `MemoryManager` for this handle.
    token: *const MemToken,
    // Cleared by `do_unsubscribe_with` so unsubscription happens only once.
    alive: bool,
}
181
/// This is a sender that can transparently act as a futures sink
#[derive(Clone)]
pub struct FutInnerSend<RW: QueueRW<T>, T> {
    // The underlying producer handle.
    writer: InnerSend<RW, T>,
    // Parking area shared with the receivers (consumer side).
    wait: Arc<FutWait>,
    // Parking area for producers waiting on a full queue.
    prod_wait: Arc<FutWait>,
}
189
/// This is a receiver that can transparently act as a futures stream
#[derive(Clone)]
pub struct FutInnerRecv<RW: QueueRW<T>, T> {
    // The underlying consumer handle.
    reader: InnerRecv<RW, T>,
    // Parking area this receiver parks its task in while the queue is empty.
    wait: Arc<FutWait>,
    // Parking area of the producers; one is woken after each received item.
    prod_wait: Arc<FutWait>,
}
197
/// Receiver that applies a stored function to each element in place instead
/// of handing the element out.
pub struct FutInnerUniRecv<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> {
    // The underlying consumer handle.
    reader: InnerRecv<RW, T>,
    // Parking area this receiver parks its task in while the queue is empty.
    wait: Arc<FutWait>,
    // Parking area of the producers; one is woken after each receive attempt.
    prod_wait: Arc<FutWait>,
    // Function applied to every received element.
    pub op: F,
}
204
/// Spin-then-park waiting strategy used by the futures adapters.
struct FutWait {
    // Number of busy-spin attempts made before yielding.
    spins_first: usize,
    // Number of attempts made with a `yield_now()` before each one.
    spins_yield: usize,
    // Tasks parked on this waiter, woken in FIFO order by `notify_one`.
    parked: parking_lot::Mutex<VecDeque<Task>>,
}
210
211impl<RW: QueueRW<T>, T> MultiQueue<RW, T> {
212    pub fn new(_capacity: Index) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
213        MultiQueue::new_with(_capacity, BlockingWait::new())
214    }
215
216    pub fn new_with<W: Wait + 'static>(capacity: Index,
217                                       wait: W)
218                                       -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
219        MultiQueue::new_internal(capacity, Arc::new(wait))
220    }
221
222    fn new_internal(_capacity: Index, wait: Arc<Wait>) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
223        let capacity = get_valid_wrap(_capacity);
224        let queuedat = alloc::allocate(capacity as usize);
225        let refdat = alloc::allocate(capacity as usize);
226        unsafe {
227            for i in 0..capacity as isize {
228                let elem: &QueueEntry<T> = &*queuedat.offset(i);
229                elem.wraps.store(INITIAL_QUEUE_FLAG, Relaxed);
230
231                let refd: &RefCnt = &*refdat.offset(i);
232                refd.refcnt.store(0, Relaxed);
233            }
234        }
235
236        let (cursor, reader) = ReadCursor::new(capacity);
237        let needs_notify = wait.needs_notify();
238        let queue = MultiQueue {
239            d1: unsafe { mem::uninitialized() },
240
241            head: CountedIndex::new(capacity),
242            tail_cache: AtomicUsize::new(0),
243            writers: AtomicUsize::new(1),
244            d2: unsafe { mem::uninitialized() },
245
246            tail: cursor,
247            data: queuedat,
248            refs: refdat,
249            capacity: capacity as isize,
250            waiter: wait,
251            needs_notify: needs_notify,
252            mk: PhantomData,
253            d3: unsafe { mem::uninitialized() },
254
255            manager: MemoryManager::new(),
256
257            d4: unsafe { mem::uninitialized() },
258        };
259
260        let qarc = Arc::new(queue);
261
262        let mwriter = InnerSend {
263            queue: qarc.clone(),
264            state: Cell::new(QueueState::Uni),
265            token: qarc.manager.get_token(),
266        };
267
268        let mreader = InnerRecv {
269            queue: qarc.clone(),
270            reader: reader,
271            token: qarc.manager.get_token(),
272            alive: true,
273        };
274
275        (mwriter, mreader)
276    }
277
    /// Attempts to enqueue `val` on the path used when several writers may
    /// be active.
    ///
    /// Returns `Err(TrySendError::Full(val))` when the head still matches the
    /// tail after reloading it, or when the target slot's refcount check
    /// fails. Otherwise the head is advanced through a transaction that is
    /// retried until it commits, and the value is then written and published.
    pub fn try_send_multi(&self, val: T) -> Result<(), TrySendError<T>> {
        let mut transaction = self.head.load_transaction(Relaxed);

        unsafe {
            loop {
                let (chead, wrap_valid_tag) = transaction.get();
                let tail_cache = self.tail_cache.load(Relaxed);
                // Only consult the real tail when the cached one says "full".
                if transaction.matches_previous(tail_cache) {
                    let new_tail = self.reload_tail_multi(tail_cache, wrap_valid_tag);
                    if transaction.matches_previous(new_tail) {
                        return Err(TrySendError::Full(val));
                    }
                }
                let write_cell = &mut *self.data.offset(chead);
                let ref_cell = &*self.refs.offset(chead);
                // The policy may refuse the slot (see `QueueRW::check_ref`).
                if !RW::check_ref(&ref_cell.refcnt) {
                    return Err(TrySendError::Full(val));
                }
                fence(Acquire);

                match transaction.commit(1, Relaxed) {
                    // The commit did not go through; retry with the returned transaction.
                    Some(new_transaction) => transaction = new_transaction,
                    None => {
                        let current_tag = write_cell.wraps.load(Relaxed);

                        // This will delay the dropping of the existing item until
                        // after the write is done. This will have a marginal effect on
                        // throughput in most cases but will really help latency.
                        // Hopefully the compiler is smart enough to get rid of this
                        // when there's no drop
                        let _possible_drop = if RW::do_drop() && !is_tagged(current_tag) {
                            Some(ptr::read(&write_cell.val))
                        } else {
                            None
                        };
                        ptr::write(&mut write_cell.val, val);
                        // Publish the slot to readers.
                        write_cell.wraps.store(wrap_valid_tag, Release);
                        return Ok(());
                    }
                }
            }
        }
    }
321
    /// Attempts to enqueue `val` on the single-writer path.
    ///
    /// Performs the same checks as `try_send_multi` but advances the head
    /// with `commit_direct` instead of a retry loop. Returns
    /// `Err(TrySendError::Full(val))` when the queue is full or the target
    /// slot's refcount check fails.
    pub fn try_send_single(&self, val: T) -> Result<(), TrySendError<T>> {
        let transaction = self.head.load_transaction(Relaxed);
        let (chead, wrap_valid_tag) = transaction.get();
        unsafe {
            let tail_cache = self.tail_cache.load(Relaxed);
            // Only consult the real tail when the cached one says "full".
            if transaction.matches_previous(tail_cache) {
                let new_tail = self.reload_tail_single(wrap_valid_tag);
                if transaction.matches_previous(new_tail) {
                    return Err(TrySendError::Full(val));
                }
            }
            let write_cell = &mut *self.data.offset(chead);
            let ref_cell = &*self.refs.offset(chead);
            // The policy may refuse the slot (see `QueueRW::check_ref`).
            if !RW::check_ref(&ref_cell.refcnt) {
                return Err(TrySendError::Full(val));
            }
            fence(Acquire);
            transaction.commit_direct(1, Relaxed);
            let current_tag = write_cell.wraps.load(Relaxed);
            // Keep the previous occupant alive until after the new value is
            // written, so that its destructor runs last.
            let _possible_drop = if RW::do_drop() && !is_tagged(current_tag) {
                Some(ptr::read(&write_cell.val))
            } else {
                None
            };
            ptr::write(&mut write_cell.val, val);
            // Publish the slot to readers.
            write_cell.wraps.store(wrap_valid_tag, Release);
            Ok(())
        }
    }
351
    /// Attempts to take the next element for `reader`.
    ///
    /// On failure the error carries a pointer to the slot's `wraps` tag
    /// together with `TryRecvError::Empty`, or a null pointer together with
    /// `TryRecvError::Disconnected` when no writers remain and the slot is
    /// still unwritten.
    pub fn try_recv(&self, reader: &Reader) -> Result<T, (*const AtomicUsize, TryRecvError)> {
        let mut ctail_attempt = reader.load_attempt(Relaxed);
        unsafe {
            loop {
                let (ctail, wrap_valid_tag) = ctail_attempt.get();
                let read_cell = &mut *self.data.offset(ctail);

                // For any curious readers, this gnarly if block catches a race between
                // advancing the write index and unsubscribing from the queue. In short,
                // since unsubscribe happens after the read_cell is written, there's a race
                // between the first and second if statements. Hence, a second check is required
                // after the writer load to ensure that the wrap_valid_tag is still wrong so
                // we had actually seen a race. Doing it this way removes fences on the fast path
                if rm_tag(read_cell.wraps.load(Relaxed)) != wrap_valid_tag {
                    if self.writers.load(Relaxed) == 0 {
                        fence(Acquire);
                        if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                            return Err((ptr::null(), TryRecvError::Disconnected));
                        }
                    }
                    return Err((&read_cell.wraps, TryRecvError::Empty));
                }
                let ref_cell = &*self.refs.offset(ctail);
                let is_single = reader.is_single();
                // The refcount is only maintained when the reader is not single.
                if !is_single {
                    RW::inc_ref(&ref_cell.refcnt);
                }
                fence(Acquire);
                let rval = RW::get_val(&mut read_cell.val);
                fence(Release);
                if !is_single {
                    RW::dec_ref(&ref_cell.refcnt);
                }
                match ctail_attempt.commit_attempt(1, Relaxed) {
                    // The commit did not go through: discard the value
                    // according to the policy and retry with the new attempt.
                    Some(new_attempt) => {
                        ctail_attempt = new_attempt;
                        RW::forget_val(rval);
                    }
                    None => return Ok(rval),
                }
            }
        }
    }
395
    /// Attempts to apply `op` to the next element for `reader` in place,
    /// without moving the element out of the queue.
    ///
    /// On failure `op` is handed back unused together with the same
    /// pointer/error pair that `try_recv` reports.
    ///
    /// NOTE(review): the tail is advanced with `commit_direct`, with no
    /// retry; presumably only valid for a single consumer - confirm.
    pub fn try_recv_view<R, F: FnOnce(&T) -> R>
        (&self,
         op: F,
         reader: &Reader)
         -> Result<R, (F, *const AtomicUsize, TryRecvError)> {
        let ctail_attempt = reader.load_attempt(Relaxed);
        unsafe {
            let (ctail, wrap_valid_tag) = ctail_attempt.get();
            let read_cell = &mut *self.data.offset(ctail);
            if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                // Re-check after observing zero writers, as in `try_recv`.
                if self.writers.load(Relaxed) == 0 {
                    fence(Acquire);
                    if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                        return Err((op, ptr::null(), TryRecvError::Disconnected));
                    }
                }
                return Err((op, &read_cell.wraps, TryRecvError::Empty));
            }
            let rval = op(&read_cell.val);
            // Destroy the element according to the policy once it has been viewed.
            RW::drop_in_place(&mut read_cell.val);
            ctail_attempt.commit_direct(1, Release);
            Ok(rval)
        }
    }
420
421    fn reload_tail_multi(&self, tail_cache: usize, count: usize) -> usize {
422        if let Some(max_diff_from_head) = self.tail.get_max_diff(count) {
423            let current_tail = CountedIndex::get_previous(count, max_diff_from_head);
424            if tail_cache == current_tail {
425                return current_tail;
426            }
427            match self.tail_cache.compare_exchange(tail_cache, current_tail, AcqRel, Relaxed) {
428                Ok(_) => current_tail,
429                Err(val) => val,
430            }
431        } else {
432            self.tail_cache.load(Acquire)
433        }
434    }
435
436    fn reload_tail_single(&self, count: usize) -> usize {
437        let max_diff_from_head = self.tail
438            .get_max_diff(count)
439            .expect("The write head got ran over by consumers in single writer mode. This \
440                     process is borked!");
441        let current_tail = CountedIndex::get_previous(count, max_diff_from_head);
442        self.tail_cache.store(current_tail, Relaxed);
443        current_tail
444    }
445}
446
447impl<RW: QueueRW<T>, T> InnerSend<RW, T> {
448    #[inline(always)]
449    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
450        let signal = self.queue.manager.signal.load(Relaxed);
451        if signal.has_action() {
452            let disconnected = self.handle_signals(signal);
453            if disconnected {
454                return Err(TrySendError::Full(val));
455            }
456        }
457        let val = match self.state.get() {
458            QueueState::Uni => self.queue.try_send_single(val),
459            QueueState::Multi => {
460                if self.queue.writers.load(Relaxed) == 1 {
461                    fence(Acquire);
462                    self.state.set(QueueState::Uni);
463                    self.queue.try_send_single(val)
464                } else {
465                    self.queue.try_send_multi(val)
466                }
467            }
468        };
469        // Putting this in the send functions
470        // greatly confuses the compiler and literally halfs
471        // the performance of the queue. I suspect the compiler
472        // always sets up a stack from regardless of the condition
473        // and that hurts optimizations around it.
474        if val.is_ok() {
475            if self.queue.needs_notify {
476                self.queue.waiter.notify();
477            }
478        }
479        val
480    }
481
482    /// Removes the writer as a producer to the queue
483    pub fn unsubscribe(self) {}
484
485    #[cold]
486    fn handle_signals(&self, signal: LoadedSignal) -> bool {
487        if signal.get_epoch() {
488            self.queue.manager.update_token(self.token);
489        }
490        signal.get_reader()
491    }
492}
493
494impl<RW: QueueRW<T>, T> InnerRecv<RW, T> {
495    #[inline(always)]
496    pub fn try_recv(&self) -> Result<T, TryRecvError> {
497        self.examine_signals();
498        match self.queue.try_recv(&self.reader) {
499            Ok(v) => Ok(v),
500            Err((_, e)) => Err(e),
501        }
502    }
503
504    pub fn recv(&self) -> Result<T, RecvError> {
505        self.examine_signals();
506        loop {
507            match self.queue.try_recv(&self.reader) {
508                Ok(v) => return Ok(v),
509                Err((_, TryRecvError::Disconnected)) => return Err(RecvError),
510                Err((pt, TryRecvError::Empty)) => {
511                    let count = self.reader.load_count(Relaxed);
512                    unsafe {
513                        self.queue.waiter.wait(count, &*pt, &self.queue.writers);
514                    }
515                }
516            }
517        }
518    }
519
520    pub fn is_single(&self) -> bool {
521        self.reader.get_consumers() == 1
522    }
523
524    #[inline(always)]
525    pub fn try_recv_view<R, F: FnOnce(&T) -> R>(&self, op: F) -> Result<R, (F, TryRecvError)> {
526        self.examine_signals();
527        match self.queue.try_recv_view(op, &self.reader) {
528            Ok(v) => Ok(v),
529            Err((op, _, e)) => Err((op, e)),
530        }
531    }
532
533    pub fn recv_view<R, F: FnOnce(&T) -> R>(&self, mut op: F) -> Result<R, (F, RecvError)> {
534        self.examine_signals();
535        loop {
536            match self.queue.try_recv_view(op, &self.reader) {
537                Ok(v) => return Ok(v),
538                Err((o, _, TryRecvError::Disconnected)) => return Err((o, RecvError)),
539                Err((o, pt, TryRecvError::Empty)) => {
540                    op = o;
541                    let count = self.reader.load_count(Relaxed);
542                    unsafe {
543                        self.queue.waiter.wait(count, &*pt, &self.queue.writers);
544                    }
545                }
546            }
547        }
548    }
549
550    pub fn add_stream(&self) -> InnerRecv<RW, T> {
551        InnerRecv {
552            queue: self.queue.clone(),
553            reader: self.queue.tail.add_stream(&self.reader, &self.queue.manager),
554            token: self.queue.manager.get_token(),
555            alive: true,
556        }
557    }
558
559    #[inline(always)]
560    fn examine_signals(&self) {
561        let signal = self.queue.manager.signal.load(Relaxed);
562        if signal.has_action() {
563            self.handle_signals(signal);
564        }
565    }
566
567    #[cold]
568    fn handle_signals(&self, signal: LoadedSignal) {
569        if signal.get_epoch() {
570            self.queue.manager.update_token(self.token);
571        }
572    }
573
574
575    pub fn unsubscribe(self) -> bool {
576        self.reader.get_consumers() == 1
577    }
578
579    /// Runs the passed function after unsubscribing the reader from the queue
580    unsafe fn do_unsubscribe_with<F: FnOnce()>(&mut self, f: F) {
581        if self.alive {
582            self.alive = false;
583            if self.reader.remove_consumer() == 1 {
584                if self.queue.tail.remove_reader(&self.reader, &self.queue.manager) {
585                    self.queue.manager.signal.set_reader(SeqCst);
586                }
587                self.queue.manager.remove_token(self.token);
588            }
589            fence(SeqCst);
590            f()
591        }
592    }
593}
594
595
596impl<RW: QueueRW<T>, T> FutInnerSend<RW, T> {
597    /// Identical to InnerSend::try_send()
598    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
599        self.writer.try_send(val)
600    }
601
602    /// Identical to InnerSend::unsubscribe()
603    pub fn unsubscribe(self) {
604        self.writer.unsubscribe()
605    }
606}
607
608impl<RW: QueueRW<T>, T> FutInnerRecv<RW, T> {
609    /// Identical to InnerRecv::try_recv()
610    #[inline(always)]
611    pub fn try_recv(&self) -> Result<T, TryRecvError> {
612        self.reader.try_recv()
613    }
614
615    #[inline(always)]
616    pub fn recv(&self) -> Result<T, RecvError> {
617        self.reader.recv()
618    }
619
620    /// Creates a new stream and returns a FutInnerRecv on that stream
621    pub fn add_stream(&self) -> FutInnerRecv<RW, T> {
622        let rx = self.reader.add_stream();
623        FutInnerRecv {
624            reader: rx,
625            wait: self.wait.clone(),
626            prod_wait: self.prod_wait.clone(),
627        }
628    }
629
630    /// Attempts to transform this receiver into a FutInnerUniRecv
631    /// calling the passed function on the input data.
632    pub fn into_single<R, F: FnMut(&T) -> R>
633        (self,
634         op: F)
635         -> Result<FutInnerUniRecv<RW, R, F, T>, (F, FutInnerRecv<RW, T>)> {
636        let new_mreader;
637        let new_pwait = self.prod_wait.clone();
638        let new_wait = self.wait.clone();
639        {
640            new_mreader = self.reader.clone();
641            drop(self);
642        }
643        if new_mreader.is_single() {
644            Ok(FutInnerUniRecv {
645                reader: new_mreader,
646                wait: new_wait,
647                prod_wait: new_pwait,
648                op: op,
649            })
650        } else {
651            Err((op,
652                 FutInnerRecv {
653                     reader: new_mreader,
654                     wait: new_wait,
655                     prod_wait: new_pwait,
656                 }))
657        }
658    }
659
660    /// Identical to InnerRecv::unsubscribe()
661    pub fn unsubscribe(self) -> bool {
662        self.reader.reader.get_consumers() == 1
663    }
664}
665
666/// This struct acts as a UniInnerRecv except operating as a futures Stream on incoming data
667///
668/// Since this operates in an iterator-like manner on the data stream, it holds the function
669/// it calls and to use a different function must transform itself into a different
670/// FutInnerUniRecv using transform_operation
671impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> FutInnerUniRecv<RW, R, F, T> {
672    /// Identical to UniInnerRecv::try_recv, uses operation held by FutInnerUniRecv
673    #[inline(always)]
674    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
675        let opref = &mut self.op;
676        let rval = self.reader.try_recv_view(|tr| opref(tr));
677        self.prod_wait.notify_one();
678        rval.map_err(|x| x.1)
679    }
680
681    /// Identical to UniInnerRecv::recv, uses operation held by FutInnerUniRecv
682    #[inline(always)]
683    pub fn recv(&mut self) -> Result<R, RecvError> {
684        let opref = &mut self.op;
685        let rval = self.reader.recv_view(|tr| opref(tr));
686        self.prod_wait.notify_one();
687        rval.map_err(|x| x.1)
688    }
689
690    /// Adds another stream to the queue with a FutInnerUniRecv using the passed function
691    pub fn add_stream_with<Q, FQ: FnMut(&T) -> Q>(&self, op: FQ) -> FutInnerUniRecv<RW, Q, FQ, T> {
692        let rx = self.reader.add_stream();
693        FutInnerUniRecv {
694            reader: rx,
695            wait: self.wait.clone(),
696            prod_wait: self.prod_wait.clone(),
697            op: op,
698        }
699    }
700
701    /// Identical to InnerRecv::unsubscribe()
702    pub fn unsubscribe(self) -> bool {
703        self.reader.reader.get_consumers() == 1
704    }
705
706    pub fn into_multi(self) -> FutInnerRecv<RW, T> {
707        let new_reader = self.reader.add_stream();
708        FutInnerRecv {
709            reader: new_reader,
710            wait: self.wait.clone(),
711            prod_wait: self.prod_wait.clone(),
712        }
713    }
714}
715
716//////// Fut stream/sink implementations
717
718// The mpsc SendError struct can't be constructed according to rustc
719// since it's a struct and the ctor is private. Copied and pasted here
720
/// Error type for sending, used when the receiving end of the channel is
/// dropped
///
/// Wraps the message that could not be sent; retrieve it with `into_inner`.
pub struct SendError<T>(T);
724
725impl<T> fmt::Debug for SendError<T> {
726    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
727        fmt.debug_tuple("SendError")
728            .field(&"...")
729            .finish()
730    }
731}
732
733impl<T> fmt::Display for SendError<T> {
734    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
735        write!(fmt, "send failed because receiver is gone")
736    }
737}
738
739impl<T> Error for SendError<T>
740    where T: Any
741{
742    fn description(&self) -> &str {
743        "send failed because receiver is gone"
744    }
745}
746
747impl<T> SendError<T> {
748    /// Returns the message that was attempted to be sent but failed.
749    pub fn into_inner(self) -> T {
750        self.0
751    }
752}
753
754impl<RW: QueueRW<T>, T> Sink for FutInnerSend<RW, T> {
755    type SinkItem = T;
756    type SinkError = SendError<T>;
757
758    /// Essentially try_send except parks if the queue is full
759    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
760
761        match self.prod_wait.send_or_park(|m| self.writer.try_send(m), msg) {
762            Ok(_) => {
763                // see InnerSend::try_recv for why this isn't in the queue
764                if self.writer.queue.needs_notify {
765                    self.writer.queue.waiter.notify();
766                }
767                Ok(AsyncSink::Ready)
768            }
769            Err(TrySendError::Full(msg)) => Ok(AsyncSink::NotReady(msg)),
770            Err(TrySendError::Disconnected(msg)) => Err(SendError(msg)),
771        }
772    }
773
774    #[inline(always)]
775    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
776        Ok(Async::Ready(()))
777    }
778}
779
780impl<RW: QueueRW<T>, T> Stream for FutInnerRecv<RW, T> {
781    type Item = T;
782    type Error = ();
783
784    /// Essentially the same as recv
785    #[inline]
786    fn poll(&mut self) -> Poll<Option<T>, ()> {
787        self.reader.examine_signals();
788        loop {
789            match self.reader.queue.try_recv(&self.reader.reader) {
790                Ok(msg) => {
791                    self.prod_wait.notify_one();
792                    return Ok(Async::Ready(Some(msg)));
793                }
794                Err((_, TryRecvError::Disconnected)) => return Ok(Async::Ready(None)),
795                Err((pt, _)) => {
796                    let count = self.reader.reader.load_count(Relaxed);
797                    if unsafe { self.wait.fut_wait(count, &*pt, &self.reader.queue.writers) } {
798                        return Ok(Async::NotReady);
799                    }
800                }
801            }
802        }
803    }
804}
805
806impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Stream for FutInnerUniRecv<RW, R, F, T> {
807    type Item = R;
808    type Error = ();
809
810    #[inline]
811    fn poll(&mut self) -> Poll<Option<R>, ()> {
812        self.reader.examine_signals();
813        loop {
814            let opref = &mut self.op;
815            match self.reader
816                .queue
817                .try_recv_view(opref, &self.reader.reader) {
818                Ok(msg) => {
819                    self.prod_wait.notify_one();
820                    return Ok(Async::Ready(Some(msg)));
821                }
822                Err((_, _, TryRecvError::Disconnected)) => return Ok(Async::Ready(None)),
823                Err((_, pt, _)) => {
824                    let count = self.reader.reader.load_count(Relaxed);
825                    if unsafe { self.wait.fut_wait(count, &*pt, &self.reader.queue.writers) } {
826                        return Ok(Async::NotReady);
827                    }
828                }
829            }
830        }
831    }
832}
833
834
835//////// FutWait
836
837impl FutWait {
838    pub fn new() -> FutWait {
839        FutWait::with_spins(DEFAULT_TRY_SPINS, DEFAULT_YIELD_SPINS)
840    }
841
842    pub fn with_spins(spins_first: usize, spins_yield: usize) -> FutWait {
843        FutWait {
844            spins_first: spins_first,
845            spins_yield: spins_yield,
846            parked: parking_lot::Mutex::new(VecDeque::new()),
847        }
848    }
849
850    pub fn fut_wait(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
851        self.spin(seq, at, wc) && self.park(seq, at, wc)
852    }
853
854    pub fn spin(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
855        for _ in 0..self.spins_first {
856            if check(seq, at, wc) {
857                return false;
858            }
859        }
860
861        for _ in 0..self.spins_yield {
862            yield_now();
863            if check(seq, at, wc) {
864                return false;
865            }
866        }
867        return true;
868    }
869
870    pub fn park(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
871        let mut parked = self.parked.lock();
872        if check(seq, at, wc) {
873            return false;
874        }
875        parked.push_back(park());
876        return true;
877    }
878
879    fn send_or_park<T, F: Fn(T) -> Result<(), TrySendError<T>>>(&self,
880                                                                f: F,
881                                                                mut val: T)
882                                                                -> Result<(), TrySendError<T>> {
883        for _ in 0..self.spins_first {
884            match f(val) {
885                Err(TrySendError::Full(v)) => val = v,
886                v => return v,
887            }
888        }
889
890        for _ in 0..self.spins_yield {
891            yield_now();
892            match f(val) {
893                Err(TrySendError::Full(v)) => val = v,
894                v => return v,
895            }
896        }
897
898        let mut parked = self.parked.lock();
899        match f(val) {
900            Err(TrySendError::Full(v)) => {
901                parked.push_back(park());
902                return Err(TrySendError::Full(v));
903            }
904            v => return v,
905        }
906    }
907
908    fn notify_one(&self) {
909        let mut parked = self.parked.lock();
910        match parked.pop_front() {
911            Some(val) => {
912                drop(parked);
913                val.unpark();
914            }
915            None => (),
916        }
917    }
918}
919
920impl Wait for FutWait {
921    #[cold]
922    fn wait(&self, _seq: usize, _w_pos: &AtomicUsize, _wc: &AtomicUsize) {
923        assert!(false, "Somehow normal wait got called in futures queue");
924    }
925
926    fn notify(&self) {
927        let mut parked = self.parked.lock();
928        if parked.len() > 0 {
929            if parked.len() > 8 {
930                for val in parked.drain(..) {
931                    val.unpark();
932                }
933            } else {
934                let mut inline_v = smallvec::SmallVec::<[Task; 9]>::new();
935                inline_v.extend(parked.drain(..));
936                drop(parked);
937                for val in inline_v.drain() {
938                    val.unpark();
939                }
940            }
941        }
942    }
943
944    fn needs_notify(&self) -> bool {
945        true
946    }
947}
948
949//////// Clone implementations
950
951impl<RW: QueueRW<T>, T> Clone for InnerSend<RW, T> {
952    fn clone(&self) -> InnerSend<RW, T> {
953        self.state.set(QueueState::Multi);
954        let rval = InnerSend {
955            queue: self.queue.clone(),
956            state: Cell::new(QueueState::Multi),
957            token: self.queue.manager.get_token(),
958        };
959        self.queue.writers.fetch_add(1, SeqCst);
960        rval
961    }
962}
963
964impl<RW: QueueRW<T>, T> Clone for InnerRecv<RW, T> {
965    fn clone(&self) -> InnerRecv<RW, T> {
966        self.reader.dup_consumer();
967        InnerRecv {
968            queue: self.queue.clone(),
969            reader: self.reader.clone(),
970            token: self.queue.manager.get_token(),
971            alive: true,
972        }
973    }
974}
975
976impl Clone for FutWait {
977    fn clone(&self) -> FutWait {
978        FutWait::with_spins(self.spins_first, self.spins_yield)
979    }
980}
981
982//////// Drop implementations
983
984impl<RW: QueueRW<T>, T> Drop for InnerSend<RW, T> {
985    fn drop(&mut self) {
986        self.queue.writers.fetch_sub(1, SeqCst);
987        fence(SeqCst);
988        self.queue.manager.remove_token(self.token);
989        self.queue.waiter.notify();
990    }
991}
992
993impl<RW: QueueRW<T>, T> Drop for InnerRecv<RW, T> {
994    fn drop(&mut self) {
995        unsafe { self.do_unsubscribe_with(|| ()) }
996    }
997}
998
impl<RW: QueueRW<T>, T> Drop for MultiQueue<RW, T> {
    /// Runs the destructors of the values still stored in the queue's slots.
    ///
    /// Which slots are visited depends on `RW::do_drop()`:
    /// * `true` (as returned by `BCast`): every slot in `0..capacity` is
    ///   inspected and the value is dropped unless the slot's `wraps` word is
    ///   tagged.
    /// * `false`: only the slots between the tail's last position and the head
    ///   are dropped, one at a time.
    fn drop(&mut self) {
        if RW::do_drop() {
            // everything that's tagged shouldn't be dropped
            // otherwise, everything else is valid and waiting to be read
            // or invalid and waiting to be overwritten/dropped
            for i in 0..self.capacity {
                unsafe {
                    let cell = &mut *self.data.offset(i);
                    if !is_tagged(cell.wraps.load(Relaxed)) {
                        // The value returned by `ptr::read` is a temporary that is
                        // dropped immediately, which runs the destructor of `val`.
                        ptr::read(&cell.val);
                    }
                }
            }
        } else {
            // Local cursor that starts at the tail's last recorded position.
            let last_read = CountedIndex::from_usize(self.tail.last_pos.get(),
                                                     self.capacity as Index);
            // Walk forward until the cursor's count matches the head's count.
            while last_read.load_count(Relaxed) != self.head.load_count(Relaxed) {
                unsafe {
                    let cur_pos = last_read.load_transaction(Relaxed);
                    let (cur_ind, _) = cur_pos.get();
                    ptr::drop_in_place(&mut (*self.data.offset(cur_ind)).val);
                    // presumably advances the cursor by one slot - TODO confirm
                    // against `commit_direct` in countedindex
                    cur_pos.commit_direct(1, Relaxed);
                }
            }
        }
    }
}
1027
1028impl<RW: QueueRW<T>, T> Drop for FutInnerRecv<RW, T> {
1029    fn drop(&mut self) {
1030        let prod_wait = self.prod_wait.clone();
1031        unsafe { self.reader.do_unsubscribe_with(|| { prod_wait.notify(); }) }
1032    }
1033}
1034
1035impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Drop for FutInnerUniRecv<RW, R, F, T> {
1036    fn drop(&mut self) {
1037        let prod_wait = self.prod_wait.clone();
1038        unsafe { self.reader.do_unsubscribe_with(|| { prod_wait.notify(); }) }
1039    }
1040}
1041
1042impl<RW: QueueRW<T>, T> fmt::Debug for InnerRecv<RW, T> {
1043    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1044        write!(f,
1045               "MultiQueue error message - you probably tried to unwrap the result of into_single")
1046    }
1047}
1048
1049impl<RW: QueueRW<T>, T> fmt::Debug for FutInnerRecv<RW, T> {
1050    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1051        write!(f,
1052               "MultiQueue error message - you probably tried to unwrap the result of into_single")
1053    }
1054}
1055
// Manual thread-safety markers for the queue and its handle types.
// The queue itself is marked both `Sync` and `Send`; the sender/receiver
// handles are marked `Send` only.
//
// NOTE(review): none of these impls places a `Send`/`Sync` bound on `T` (or on
// `F`/`R` for the uni-receiver). Presumably the public wrapper types add those
// bounds - confirm, since otherwise non-thread-safe payloads could cross
// threads through these types.
unsafe impl<RW: QueueRW<T>, T> Sync for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for InnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for InnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for FutInnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for FutInnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> Send for FutInnerUniRecv<RW, R, F, T> {}
1063
1064pub fn futures_multiqueue<RW: QueueRW<T>, T>(capacity: Index)
1065                                             -> (FutInnerSend<RW, T>, FutInnerRecv<RW, T>) {
1066    let cons_arc = Arc::new(FutWait::new());
1067    let prod_arc = Arc::new(FutWait::new());
1068    let (tx, rx) = MultiQueue::new_internal(capacity, cons_arc.clone());
1069    let ftx = FutInnerSend {
1070        writer: tx,
1071        wait: cons_arc.clone(),
1072        prod_wait: prod_arc.clone(),
1073    };
1074    let rtx = FutInnerRecv {
1075        reader: rx,
1076        wait: cons_arc.clone(),
1077        prod_wait: prod_arc.clone(),
1078    };
1079    (ftx, rtx)
1080}