1
2use std::any::Any;
3use std::cell::Cell;
4use std::collections::VecDeque;
5use std::error::Error;
6use std::fmt;
7use std::marker::PhantomData;
8use std::mem;
9use std::ptr;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, fence};
12use std::sync::atomic::Ordering::*;
13use std::sync::mpsc::{TrySendError, TryRecvError, RecvError};
14use std::thread::yield_now;
15
16use alloc;
17use atomicsignal::LoadedSignal;
18use countedindex::{CountedIndex, get_valid_wrap, is_tagged, rm_tag, Index, INITIAL_QUEUE_FLAG};
19use memory::{MemoryManager, MemToken};
20use wait::*;
21
22use read_cursor::{ReadCursor, Reader};
23
24extern crate futures;
25extern crate parking_lot;
26extern crate smallvec;
27
28use self::futures::{Async, AsyncSink, Poll, Sink, Stream, StartSend};
29use self::futures::task::{park, Task};
30
/// Strategy trait that selects how values move out of queue slots:
/// broadcast (`BCast`, clone-on-read with per-slot refcounts) versus
/// mpmc (`MPMC`, move-on-read with no refcounting).
pub trait QueueRW<T> {
    /// Pin a slot for reading by bumping its refcount (broadcast only).
    fn inc_ref(&AtomicUsize);
    /// Release a slot previously pinned with `inc_ref`.
    fn dec_ref(&AtomicUsize);
    /// True when a writer may overwrite the slot (no active readers).
    fn check_ref(&AtomicUsize) -> bool;
    /// True when the queue owns slot values and must drop overwritten ones.
    fn do_drop() -> bool;
    /// Extract the value from a slot (clone or bitwise move, per strategy).
    unsafe fn get_val(&mut T) -> T;
    /// Dispose of a value obtained via `get_val` that turned out to be stale
    /// (no-op for clones, `mem::forget` for moved values to avoid double-drop).
    fn forget_val(T);
    /// Drop a value in its slot without moving it (mpmc only).
    unsafe fn drop_in_place(&mut T);
}
42
/// Marker type selecting broadcast semantics: every reader stream receives
/// a clone of each value. Carries no data; behavior lives in its `QueueRW` impl.
#[derive(Clone)]
pub struct BCast<T> {
    mk: PhantomData<T>,
}
47
48impl<T: Clone> QueueRW<T> for BCast<T> {
49 #[inline(always)]
51 fn inc_ref(r: &AtomicUsize) {
52 r.fetch_add(1, Relaxed);
53 }
54
55 #[inline(always)]
57 fn dec_ref(r: &AtomicUsize) {
58 r.fetch_sub(1, Relaxed);
59 }
60
61 #[inline(always)]
62 fn check_ref(r: &AtomicUsize) -> bool {
63 r.load(Relaxed) == 0
64 }
65
66 #[inline(always)]
67 fn do_drop() -> bool {
68 true
69 }
70
71 #[inline(always)]
72 unsafe fn get_val(val: &mut T) -> T {
73 val.clone()
74 }
75
76 #[inline(always)]
77 fn forget_val(_v: T) {}
78
79 #[inline(always)]
80 unsafe fn drop_in_place(_v: &mut T) {}
81}
82
/// Marker type selecting mpmc semantics: each value is consumed by exactly
/// one reader (bitwise move out of the slot). Carries no data.
#[derive(Clone)]
pub struct MPMC<T> {
    mk: PhantomData<T>,
}
87
88impl<T> QueueRW<T> for MPMC<T> {
89 #[inline(always)]
90 fn inc_ref(_r: &AtomicUsize) {}
91
92 #[inline(always)]
93 fn dec_ref(_r: &AtomicUsize) {}
94
95 #[inline(always)]
96 fn check_ref(_r: &AtomicUsize) -> bool {
97 true
98 }
99
100 #[inline(always)]
101 fn do_drop() -> bool {
102 false
103 }
104
105 #[inline(always)]
106 unsafe fn get_val(val: &mut T) -> T {
107 ptr::read(val)
108 }
109
110 #[inline(always)]
111 fn forget_val(val: T) {
112 mem::forget(val);
113 }
114
115 #[inline(always)]
116 unsafe fn drop_in_place(val: &mut T) {
117 ptr::drop_in_place(val);
118 }
119}
120
/// Writer-side mode flag: `Uni` takes the uncontended single-writer fast path,
/// `Multi` uses the CAS-based path that tolerates concurrent writers.
#[derive(Clone, Copy)]
enum QueueState {
    Uni,
    Multi,
}
126
/// One ring-buffer slot: the stored value plus an atomic tag word holding the
/// wrap count, which readers compare against to detect freshly published data.
struct QueueEntry<T> {
    val: T,
    wraps: AtomicUsize,
}
132
/// Per-slot reader refcount (used only in broadcast mode), padded out so
/// adjacent counters do not share a cache line.
struct RefCnt {
    refcnt: AtomicUsize,
    _buffer: [u8; 64],
}
138
/// The core lock-free queue shared (via `Arc`) by all senders and receivers.
///
/// `#[repr(C)]` plus the `d1`..`d4` 64-byte pads keep the writer-side indices,
/// the reader cursor/buffers, and the memory manager on separate cache lines
/// to limit false sharing between producer and consumer threads.
#[repr(C)]
pub struct MultiQueue<RW: QueueRW<T>, T> {
    d1: [u8; 64],

    // Writer side: next slot to claim, a cached copy of the readers' tail,
    // and the count of live senders (0 signals disconnection to readers).
    head: CountedIndex,
    tail_cache: AtomicUsize,
    writers: AtomicUsize,
    d2: [u8; 64],

    // Reader cursor plus the backing buffers: `data` holds the entries,
    // `refs` the per-slot reader refcounts; both have `capacity` elements.
    tail: ReadCursor,
    data: *mut QueueEntry<T>,
    refs: *mut RefCnt,
    capacity: isize,
    pub waiter: Arc<Wait>,
    needs_notify: bool,
    mk: PhantomData<RW>,
    d3: [u8; 64],

    pub manager: MemoryManager,
    d4: [u8; 64],
}
168
/// A producer handle onto the shared queue.
pub struct InnerSend<RW: QueueRW<T>, T> {
    queue: Arc<MultiQueue<RW, T>>,
    token: *const MemToken,    // registration with the queue's memory manager
    state: Cell<QueueState>,   // Uni fast path vs Multi CAS path
}
174
/// A consumer handle onto the shared queue; `alive` tracks whether this
/// handle has already unsubscribed (so Drop doesn't do it twice).
pub struct InnerRecv<RW: QueueRW<T>, T> {
    queue: Arc<MultiQueue<RW, T>>,
    reader: Reader,
    token: *const MemToken,
    alive: bool,
}
181
/// Futures-aware sender: wraps `InnerSend` with the consumer wait handle
/// (`wait`, used to wake parked receivers) and the producer wait handle
/// (`prod_wait`, where this sender parks when the queue is full).
#[derive(Clone)]
pub struct FutInnerSend<RW: QueueRW<T>, T> {
    writer: InnerSend<RW, T>,
    wait: Arc<FutWait>,
    prod_wait: Arc<FutWait>,
}
189
/// Futures-aware receiver: parks on `wait` when the queue is empty and
/// notifies `prod_wait` after consuming so parked producers are woken.
#[derive(Clone)]
pub struct FutInnerRecv<RW: QueueRW<T>, T> {
    reader: InnerRecv<RW, T>,
    wait: Arc<FutWait>,
    prod_wait: Arc<FutWait>,
}
197
/// Single-consumer futures receiver that applies `op` to each value in place
/// instead of moving/cloning it out (obtained via `FutInnerRecv::into_single`).
pub struct FutInnerUniRecv<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> {
    reader: InnerRecv<RW, T>,
    wait: Arc<FutWait>,
    prod_wait: Arc<FutWait>,
    pub op: F,
}
204
/// Wait strategy for the futures API: spin, then yield, then park the
/// current task in `parked` until a notify unparks it.
struct FutWait {
    spins_first: usize,  // busy-spin iterations before yielding
    spins_yield: usize,  // yield_now iterations before parking
    parked: parking_lot::Mutex<VecDeque<Task>>,  // tasks awaiting a wakeup
}
210
211impl<RW: QueueRW<T>, T> MultiQueue<RW, T> {
    /// Creates a queue of (at least) `_capacity` slots using the default
    /// blocking wait strategy, returning the initial sender/receiver pair.
    pub fn new(_capacity: Index) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
        MultiQueue::new_with(_capacity, BlockingWait::new())
    }
215
    /// Creates a queue using the caller-supplied wait strategy.
    pub fn new_with<W: Wait + 'static>(capacity: Index,
                                       wait: W)
                                       -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
        MultiQueue::new_internal(capacity, Arc::new(wait))
    }
221
222 fn new_internal(_capacity: Index, wait: Arc<Wait>) -> (InnerSend<RW, T>, InnerRecv<RW, T>) {
223 let capacity = get_valid_wrap(_capacity);
224 let queuedat = alloc::allocate(capacity as usize);
225 let refdat = alloc::allocate(capacity as usize);
226 unsafe {
227 for i in 0..capacity as isize {
228 let elem: &QueueEntry<T> = &*queuedat.offset(i);
229 elem.wraps.store(INITIAL_QUEUE_FLAG, Relaxed);
230
231 let refd: &RefCnt = &*refdat.offset(i);
232 refd.refcnt.store(0, Relaxed);
233 }
234 }
235
236 let (cursor, reader) = ReadCursor::new(capacity);
237 let needs_notify = wait.needs_notify();
238 let queue = MultiQueue {
239 d1: unsafe { mem::uninitialized() },
240
241 head: CountedIndex::new(capacity),
242 tail_cache: AtomicUsize::new(0),
243 writers: AtomicUsize::new(1),
244 d2: unsafe { mem::uninitialized() },
245
246 tail: cursor,
247 data: queuedat,
248 refs: refdat,
249 capacity: capacity as isize,
250 waiter: wait,
251 needs_notify: needs_notify,
252 mk: PhantomData,
253 d3: unsafe { mem::uninitialized() },
254
255 manager: MemoryManager::new(),
256
257 d4: unsafe { mem::uninitialized() },
258 };
259
260 let qarc = Arc::new(queue);
261
262 let mwriter = InnerSend {
263 queue: qarc.clone(),
264 state: Cell::new(QueueState::Uni),
265 token: qarc.manager.get_token(),
266 };
267
268 let mreader = InnerRecv {
269 queue: qarc.clone(),
270 reader: reader,
271 token: qarc.manager.get_token(),
272 alive: true,
273 };
274
275 (mwriter, mreader)
276 }
277
    /// Multi-writer push: competes with other writers for the head slot via a
    /// CAS transaction. Returns `Full` when no slot is free or the target slot
    /// still has active broadcast readers.
    pub fn try_send_multi(&self, val: T) -> Result<(), TrySendError<T>> {
        let mut transaction = self.head.load_transaction(Relaxed);

        unsafe {
            loop {
                let (chead, wrap_valid_tag) = transaction.get();
                let tail_cache = self.tail_cache.load(Relaxed);
                if transaction.matches_previous(tail_cache) {
                    // Head appears to have caught the cached tail; refresh the
                    // cache from the real reader cursor before reporting Full.
                    let new_tail = self.reload_tail_multi(tail_cache, wrap_valid_tag);
                    if transaction.matches_previous(new_tail) {
                        return Err(TrySendError::Full(val));
                    }
                }
                let write_cell = &mut *self.data.offset(chead);
                let ref_cell = &*self.refs.offset(chead);
                if !RW::check_ref(&ref_cell.refcnt) {
                    // Broadcast readers are still copying out of this slot.
                    return Err(TrySendError::Full(val));
                }
                fence(Acquire);

                match transaction.commit(1, Relaxed) {
                    // Another writer claimed the slot first — retry with the
                    // refreshed transaction.
                    Some(new_transaction) => transaction = new_transaction,
                    None => {
                        let current_tag = write_cell.wraps.load(Relaxed);

                        // Broadcast mode: an untagged slot still owns the old
                        // value; read it into a temporary so it drops here.
                        let _possible_drop = if RW::do_drop() && !is_tagged(current_tag) {
                            Some(ptr::read(&write_cell.val))
                        } else {
                            None
                        };
                        ptr::write(&mut write_cell.val, val);
                        // Release-publish the tag so readers observe the value
                        // write before they see the updated wrap count.
                        write_cell.wraps.store(wrap_valid_tag, Release);
                        return Ok(());
                    }
                }
            }
        }
    }
321
    /// Single-writer push: same slot protocol as `try_send_multi`, but the
    /// head advance is uncontended so it commits directly with no CAS loop.
    pub fn try_send_single(&self, val: T) -> Result<(), TrySendError<T>> {
        let transaction = self.head.load_transaction(Relaxed);
        let (chead, wrap_valid_tag) = transaction.get();
        unsafe {
            let tail_cache = self.tail_cache.load(Relaxed);
            if transaction.matches_previous(tail_cache) {
                // Possibly full — refresh the tail cache from the reader cursor.
                let new_tail = self.reload_tail_single(wrap_valid_tag);
                if transaction.matches_previous(new_tail) {
                    return Err(TrySendError::Full(val));
                }
            }
            let write_cell = &mut *self.data.offset(chead);
            let ref_cell = &*self.refs.offset(chead);
            if !RW::check_ref(&ref_cell.refcnt) {
                // Broadcast readers still hold the slot.
                return Err(TrySendError::Full(val));
            }
            fence(Acquire);
            transaction.commit_direct(1, Relaxed);
            let current_tag = write_cell.wraps.load(Relaxed);
            // Broadcast mode: drop the previous occupant of the slot (if any)
            // by reading it into a temporary.
            let _possible_drop = if RW::do_drop() && !is_tagged(current_tag) {
                Some(ptr::read(&write_cell.val))
            } else {
                None
            };
            ptr::write(&mut write_cell.val, val);
            // Release-publish so the value write happens-before the tag update.
            write_cell.wraps.store(wrap_valid_tag, Release);
            Ok(())
        }
    }
351
    /// Receives at the position tracked by `reader`. On failure, returns the
    /// error plus a pointer to the slot's tag word for wait strategies to
    /// watch (null when the channel is disconnected).
    pub fn try_recv(&self, reader: &Reader) -> Result<T, (*const AtomicUsize, TryRecvError)> {
        let mut ctail_attempt = reader.load_attempt(Relaxed);
        unsafe {
            loop {
                let (ctail, wrap_valid_tag) = ctail_attempt.get();
                let read_cell = &mut *self.data.offset(ctail);

                // Tag mismatch: the writer hasn't published this slot yet.
                if rm_tag(read_cell.wraps.load(Relaxed)) != wrap_valid_tag {
                    if self.writers.load(Relaxed) == 0 {
                        fence(Acquire);
                        // Re-check after the fence: still unpublished with
                        // zero writers means the channel is disconnected.
                        if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                            return Err((ptr::null(), TryRecvError::Disconnected));
                        }
                    }
                    return Err((&read_cell.wraps, TryRecvError::Empty));
                }
                let ref_cell = &*self.refs.offset(ctail);
                let is_single = reader.is_single();
                if !is_single {
                    // Pin the slot while copying so a writer can't reuse it
                    // mid-read (relevant in broadcast mode).
                    RW::inc_ref(&ref_cell.refcnt);
                }
                fence(Acquire);
                let rval = RW::get_val(&mut read_cell.val);
                fence(Release);
                if !is_single {
                    RW::dec_ref(&ref_cell.refcnt);
                }
                match ctail_attempt.commit_attempt(1, Relaxed) {
                    // Another consumer on this stream advanced first: discard
                    // our extracted value (no-op clone / forget for moves)
                    // and retry at the new position.
                    Some(new_attempt) => {
                        ctail_attempt = new_attempt;
                        RW::forget_val(rval);
                    }
                    None => return Ok(rval),
                }
            }
        }
    }
395
    /// Single-consumer receive that runs `op` on the value in place, then
    /// drops it in the slot (mpmc) and advances the cursor. On failure the
    /// closure is handed back along with the error.
    ///
    /// NOTE(review): no CAS retry loop here — this path presumably requires a
    /// single consumer per stream (see `FutInnerUniRecv`); confirm callers
    /// uphold that.
    pub fn try_recv_view<R, F: FnOnce(&T) -> R>
        (&self,
         op: F,
         reader: &Reader)
         -> Result<R, (F, *const AtomicUsize, TryRecvError)> {
        let ctail_attempt = reader.load_attempt(Relaxed);
        unsafe {
            let (ctail, wrap_valid_tag) = ctail_attempt.get();
            let read_cell = &mut *self.data.offset(ctail);
            if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                if self.writers.load(Relaxed) == 0 {
                    fence(Acquire);
                    // Still unpublished with zero writers: disconnected.
                    if rm_tag(read_cell.wraps.load(Acquire)) != wrap_valid_tag {
                        return Err((op, ptr::null(), TryRecvError::Disconnected));
                    }
                }
                return Err((op, &read_cell.wraps, TryRecvError::Empty));
            }
            let rval = op(&read_cell.val);
            // Dispose of the value in the slot (no-op in broadcast mode).
            RW::drop_in_place(&mut read_cell.val);
            ctail_attempt.commit_direct(1, Release);
            Ok(rval)
        }
    }
420
    /// Recomputes the writers' cached tail from the reader cursor (multi-writer
    /// path). Uses a CAS so concurrent writers agree on the cache value; on a
    /// lost race, the winner's value is returned.
    fn reload_tail_multi(&self, tail_cache: usize, count: usize) -> usize {
        if let Some(max_diff_from_head) = self.tail.get_max_diff(count) {
            let current_tail = CountedIndex::get_previous(count, max_diff_from_head);
            if tail_cache == current_tail {
                return current_tail;
            }
            match self.tail_cache.compare_exchange(tail_cache, current_tail, AcqRel, Relaxed) {
                Ok(_) => current_tail,
                Err(val) => val,
            }
        } else {
            // Reader positions were in flux; fall back to the current cache.
            self.tail_cache.load(Acquire)
        }
    }
435
    /// Recomputes the cached tail on the single-writer path; a plain store
    /// suffices since no other writer can race. A `None` from `get_max_diff`
    /// would mean readers passed the writer, which is impossible here.
    fn reload_tail_single(&self, count: usize) -> usize {
        let max_diff_from_head = self.tail
            .get_max_diff(count)
            .expect("The write head got ran over by consumers in single writer mode. This \
                     process is borked!");
        let current_tail = CountedIndex::get_previous(count, max_diff_from_head);
        self.tail_cache.store(current_tail, Relaxed);
        current_tail
    }
445}
446
impl<RW: QueueRW<T>, T> InnerSend<RW, T> {
    /// Attempts to push `val`, first servicing pending manager signals.
    /// Downgrades itself from Multi to Uni mode when it observes it is the
    /// last writer, switching to the cheaper single-writer path.
    #[inline(always)]
    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
        let signal = self.queue.manager.signal.load(Relaxed);
        if signal.has_action() {
            let disconnected = self.handle_signals(signal);
            if disconnected {
                // NOTE(review): all-readers-gone is reported as `Full`, not
                // `Disconnected` — confirm callers rely on this.
                return Err(TrySendError::Full(val));
            }
        }
        let val = match self.state.get() {
            QueueState::Uni => self.queue.try_send_single(val),
            QueueState::Multi => {
                if self.queue.writers.load(Relaxed) == 1 {
                    // We're the only writer left; take the fast path from now on.
                    fence(Acquire);
                    self.state.set(QueueState::Uni);
                    self.queue.try_send_single(val)
                } else {
                    self.queue.try_send_multi(val)
                }
            }
        };
        if val.is_ok() {
            // Wake blocked/parked consumers when the strategy requires it.
            if self.queue.needs_notify {
                self.queue.waiter.notify();
            }
        }
        val
    }

    /// Consumes the handle; the actual writer-count decrement happens in Drop.
    pub fn unsubscribe(self) {}

    /// Processes manager signals: refreshes the epoch token if requested and
    /// returns whether all readers have disconnected.
    #[cold]
    fn handle_signals(&self, signal: LoadedSignal) -> bool {
        if signal.get_epoch() {
            self.queue.manager.update_token(self.token);
        }
        signal.get_reader()
    }
}
493
impl<RW: QueueRW<T>, T> InnerRecv<RW, T> {
    /// Non-blocking receive; strips the wait-position pointer from the error.
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.examine_signals();
        match self.queue.try_recv(&self.reader) {
            Ok(v) => Ok(v),
            Err((_, e)) => Err(e),
        }
    }

    /// Blocking receive: loops on `try_recv`, handing the slot's tag word to
    /// the queue's wait strategy whenever the queue is empty.
    pub fn recv(&self) -> Result<T, RecvError> {
        self.examine_signals();
        loop {
            match self.queue.try_recv(&self.reader) {
                Ok(v) => return Ok(v),
                Err((_, TryRecvError::Disconnected)) => return Err(RecvError),
                Err((pt, TryRecvError::Empty)) => {
                    let count = self.reader.load_count(Relaxed);
                    unsafe {
                        // pt points into the queue's slot array, kept alive by
                        // the Arc in `self.queue` for the duration of the wait.
                        self.queue.waiter.wait(count, &*pt, &self.queue.writers);
                    }
                }
            }
        }
    }

    /// True when exactly one consumer is attached to this stream.
    pub fn is_single(&self) -> bool {
        self.reader.get_consumers() == 1
    }

    /// Non-blocking in-place receive; on failure the closure is returned so
    /// the caller can retry without rebuilding it.
    #[inline(always)]
    pub fn try_recv_view<R, F: FnOnce(&T) -> R>(&self, op: F) -> Result<R, (F, TryRecvError)> {
        self.examine_signals();
        match self.queue.try_recv_view(op, &self.reader) {
            Ok(v) => Ok(v),
            Err((op, _, e)) => Err((op, e)),
        }
    }

    /// Blocking in-place receive; threads the closure back through each retry.
    pub fn recv_view<R, F: FnOnce(&T) -> R>(&self, mut op: F) -> Result<R, (F, RecvError)> {
        self.examine_signals();
        loop {
            match self.queue.try_recv_view(op, &self.reader) {
                Ok(v) => return Ok(v),
                Err((o, _, TryRecvError::Disconnected)) => return Err((o, RecvError)),
                Err((o, pt, TryRecvError::Empty)) => {
                    op = o;
                    let count = self.reader.load_count(Relaxed);
                    unsafe {
                        self.queue.waiter.wait(count, &*pt, &self.queue.writers);
                    }
                }
            }
        }
    }

    /// Registers a brand-new reader stream starting at this reader's position.
    pub fn add_stream(&self) -> InnerRecv<RW, T> {
        InnerRecv {
            queue: self.queue.clone(),
            reader: self.queue.tail.add_stream(&self.reader, &self.queue.manager),
            token: self.queue.manager.get_token(),
            alive: true,
        }
    }

    /// Cheap check for pending manager signals, with the slow path out of line.
    #[inline(always)]
    fn examine_signals(&self) {
        let signal = self.queue.manager.signal.load(Relaxed);
        if signal.has_action() {
            self.handle_signals(signal);
        }
    }

    /// Refreshes the epoch token when the manager requests it.
    #[cold]
    fn handle_signals(&self, signal: LoadedSignal) {
        if signal.get_epoch() {
            self.queue.manager.update_token(self.token);
        }
    }


    /// Consumes the handle, reporting whether it was the stream's last
    /// consumer; the actual removal happens in Drop.
    pub fn unsubscribe(self) -> bool {
        self.reader.get_consumers() == 1
    }

    /// Detaches this consumer (idempotent via `alive`), removes the reader
    /// stream if it was the last consumer, then runs `f` after a SeqCst fence.
    unsafe fn do_unsubscribe_with<F: FnOnce()>(&mut self, f: F) {
        if self.alive {
            self.alive = false;
            if self.reader.remove_consumer() == 1 {
                if self.queue.tail.remove_reader(&self.reader, &self.queue.manager) {
                    // Tell writers a reader stream disappeared.
                    self.queue.manager.signal.set_reader(SeqCst);
                }
                self.queue.manager.remove_token(self.token);
            }
            fence(SeqCst);
            f()
        }
    }
}
594
595
impl<RW: QueueRW<T>, T> FutInnerSend<RW, T> {
    /// Non-blocking send, delegating to the inner writer.
    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
        self.writer.try_send(val)
    }

    /// Consumes the handle; cleanup runs in the inner writer's Drop.
    pub fn unsubscribe(self) {
        self.writer.unsubscribe()
    }
}
607
impl<RW: QueueRW<T>, T> FutInnerRecv<RW, T> {
    /// Non-blocking receive via the inner reader.
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.reader.try_recv()
    }

    /// Blocking receive via the inner reader (not the futures path).
    #[inline(always)]
    pub fn recv(&self) -> Result<T, RecvError> {
        self.reader.recv()
    }

    /// Creates a new independent stream sharing both wait handles.
    pub fn add_stream(&self) -> FutInnerRecv<RW, T> {
        let rx = self.reader.add_stream();
        FutInnerRecv {
            reader: rx,
            wait: self.wait.clone(),
            prod_wait: self.prod_wait.clone(),
        }
    }

    /// Converts into a single-consumer view applying `op` in place. Fails
    /// (returning `op` plus an equivalent receiver) unless this handle ends
    /// up as the stream's only consumer.
    pub fn into_single<R, F: FnMut(&T) -> R>
        (self,
         op: F)
         -> Result<FutInnerUniRecv<RW, R, F, T>, (F, FutInnerRecv<RW, T>)> {
        let new_mreader;
        let new_pwait = self.prod_wait.clone();
        let new_wait = self.wait.clone();
        {
            // Clone the reader handle first, then drop self so the consumer
            // count reflects only the new handle before the is_single check.
            new_mreader = self.reader.clone();
            drop(self);
        }
        if new_mreader.is_single() {
            Ok(FutInnerUniRecv {
                reader: new_mreader,
                wait: new_wait,
                prod_wait: new_pwait,
                op: op,
            })
        } else {
            Err((op,
                 FutInnerRecv {
                     reader: new_mreader,
                     wait: new_wait,
                     prod_wait: new_pwait,
                 }))
        }
    }

    /// Consumes the handle, reporting whether it was the last consumer;
    /// the actual removal happens in Drop.
    pub fn unsubscribe(self) -> bool {
        self.reader.reader.get_consumers() == 1
    }
}
665
impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> FutInnerUniRecv<RW, R, F, T> {
    /// Non-blocking in-place receive; wakes one parked producer afterwards
    /// since a slot may have been freed.
    #[inline(always)]
    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
        let opref = &mut self.op;
        let rval = self.reader.try_recv_view(|tr| opref(tr));
        self.prod_wait.notify_one();
        rval.map_err(|x| x.1)
    }

    /// Blocking in-place receive; wakes one parked producer afterwards.
    #[inline(always)]
    pub fn recv(&mut self) -> Result<R, RecvError> {
        let opref = &mut self.op;
        let rval = self.reader.recv_view(|tr| opref(tr));
        self.prod_wait.notify_one();
        rval.map_err(|x| x.1)
    }

    /// Creates a new single-consumer stream running a different closure.
    pub fn add_stream_with<Q, FQ: FnMut(&T) -> Q>(&self, op: FQ) -> FutInnerUniRecv<RW, Q, FQ, T> {
        let rx = self.reader.add_stream();
        FutInnerUniRecv {
            reader: rx,
            wait: self.wait.clone(),
            prod_wait: self.prod_wait.clone(),
            op: op,
        }
    }

    /// Consumes the handle, reporting whether it was the last consumer;
    /// removal itself happens in Drop.
    pub fn unsubscribe(self) -> bool {
        self.reader.reader.get_consumers() == 1
    }

    /// Converts back to a multi-consumer receiver by opening a fresh stream
    /// at this reader's position (self is dropped, unsubscribing the old one).
    pub fn into_multi(self) -> FutInnerRecv<RW, T> {
        let new_reader = self.reader.add_stream();
        FutInnerRecv {
            reader: new_reader,
            wait: self.wait.clone(),
            prod_wait: self.prod_wait.clone(),
        }
    }
}
715
/// Error returned by the futures `Sink` when the channel is disconnected;
/// the undeliverable value can be recovered with `into_inner`.
pub struct SendError<T>(T);
724
725impl<T> fmt::Debug for SendError<T> {
726 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
727 fmt.debug_tuple("SendError")
728 .field(&"...")
729 .finish()
730 }
731}
732
733impl<T> fmt::Display for SendError<T> {
734 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
735 write!(fmt, "send failed because receiver is gone")
736 }
737}
738
// `T: Any` mirrors std's mpsc SendError bound so the error is usable as a
// trait object. `description` is the pre-1.27 Error API kept for era
// compatibility; Display carries the same message.
impl<T> Error for SendError<T>
    where T: Any
{
    fn description(&self) -> &str {
        "send failed because receiver is gone"
    }
}
746
impl<T> SendError<T> {
    /// Recovers the value that could not be sent.
    pub fn into_inner(self) -> T {
        self.0
    }
}
753
impl<RW: QueueRW<T>, T> Sink for FutInnerSend<RW, T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Attempts to push `msg`; on a full queue the current task is parked on
    /// `prod_wait` before `NotReady` is returned, so a consumer will wake it.
    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {

        match self.prod_wait.send_or_park(|m| self.writer.try_send(m), msg) {
            Ok(_) => {
                // NOTE(review): `try_send` already notifies on success, so this
                // extra notify looks redundant but harmless — confirm.
                if self.writer.queue.needs_notify {
                    self.writer.queue.waiter.notify();
                }
                Ok(AsyncSink::Ready)
            }
            Err(TrySendError::Full(msg)) => Ok(AsyncSink::NotReady(msg)),
            Err(TrySendError::Disconnected(msg)) => Err(SendError(msg)),
        }
    }

    /// Sends complete immediately; there is no buffered state to flush.
    #[inline(always)]
    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
        Ok(Async::Ready(()))
    }
}
779
impl<RW: QueueRW<T>, T> Stream for FutInnerRecv<RW, T> {
    type Item = T;
    type Error = ();

    /// Polls the queue: yields a value (waking one parked producer), ends the
    /// stream on disconnect, or spins/parks via `fut_wait` when empty —
    /// returning `NotReady` only once the task is parked.
    #[inline]
    fn poll(&mut self) -> Poll<Option<T>, ()> {
        self.reader.examine_signals();
        loop {
            match self.reader.queue.try_recv(&self.reader.reader) {
                Ok(msg) => {
                    self.prod_wait.notify_one();
                    return Ok(Async::Ready(Some(msg)));
                }
                Err((_, TryRecvError::Disconnected)) => return Ok(Async::Ready(None)),
                Err((pt, _)) => {
                    let count = self.reader.reader.load_count(Relaxed);
                    // fut_wait returns true only after parking this task.
                    if unsafe { self.wait.fut_wait(count, &*pt, &self.reader.queue.writers) } {
                        return Ok(Async::NotReady);
                    }
                }
            }
        }
    }
}
805
impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Stream for FutInnerUniRecv<RW, R, F, T> {
    type Item = R;
    type Error = ();

    /// Same polling protocol as `FutInnerRecv`, but applies `self.op` to each
    /// value in place via the view-based receive.
    #[inline]
    fn poll(&mut self) -> Poll<Option<R>, ()> {
        self.reader.examine_signals();
        loop {
            let opref = &mut self.op;
            match self.reader
                .queue
                .try_recv_view(opref, &self.reader.reader) {
                Ok(msg) => {
                    self.prod_wait.notify_one();
                    return Ok(Async::Ready(Some(msg)));
                }
                Err((_, _, TryRecvError::Disconnected)) => return Ok(Async::Ready(None)),
                Err((_, pt, _)) => {
                    let count = self.reader.reader.load_count(Relaxed);
                    // Park on empty; NotReady only after the park succeeded.
                    if unsafe { self.wait.fut_wait(count, &*pt, &self.reader.queue.writers) } {
                        return Ok(Async::NotReady);
                    }
                }
            }
        }
    }
}
833
834
impl FutWait {
    /// Default spin/yield configuration.
    pub fn new() -> FutWait {
        FutWait::with_spins(DEFAULT_TRY_SPINS, DEFAULT_YIELD_SPINS)
    }

    /// Custom spin counts with an initially empty parked-task list.
    pub fn with_spins(spins_first: usize, spins_yield: usize) -> FutWait {
        FutWait {
            spins_first: spins_first,
            spins_yield: spins_yield,
            parked: parking_lot::Mutex::new(VecDeque::new()),
        }
    }

    /// Spins waiting for `at` to move past `seq`; if that fails, parks the
    /// current task. Returns true only when the task was actually parked
    /// (caller should then return `NotReady`).
    pub fn fut_wait(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        self.spin(seq, at, wc) && self.park(seq, at, wc)
    }

    /// Busy-spin then yield-spin phase. Returns false as soon as progress is
    /// observed, true when all spins are exhausted.
    pub fn spin(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        for _ in 0..self.spins_first {
            if check(seq, at, wc) {
                return false;
            }
        }

        for _ in 0..self.spins_yield {
            yield_now();
            if check(seq, at, wc) {
                return false;
            }
        }
        return true;
    }

    /// Parks the current task. Re-checks progress under the lock so a notify
    /// racing with the park cannot be lost. Returns true if parked.
    pub fn park(&self, seq: usize, at: &AtomicUsize, wc: &AtomicUsize) -> bool {
        let mut parked = self.parked.lock();
        if check(seq, at, wc) {
            return false;
        }
        parked.push_back(park());
        return true;
    }

    /// Producer-side analogue of `fut_wait`: retries `f(val)` through the
    /// spin phases, then makes one final attempt under the lock and parks the
    /// task if the queue is still full (so a consumer's notify wakes it).
    fn send_or_park<T, F: Fn(T) -> Result<(), TrySendError<T>>>(&self,
                                                               f: F,
                                                               mut val: T)
                                                               -> Result<(), TrySendError<T>> {
        for _ in 0..self.spins_first {
            match f(val) {
                Err(TrySendError::Full(v)) => val = v,
                v => return v,
            }
        }

        for _ in 0..self.spins_yield {
            yield_now();
            match f(val) {
                Err(TrySendError::Full(v)) => val = v,
                v => return v,
            }
        }

        let mut parked = self.parked.lock();
        match f(val) {
            Err(TrySendError::Full(v)) => {
                parked.push_back(park());
                return Err(TrySendError::Full(v));
            }
            v => return v,
        }
    }

    /// Wakes a single parked task, releasing the lock before unparking.
    fn notify_one(&self) {
        let mut parked = self.parked.lock();
        match parked.pop_front() {
            Some(val) => {
                drop(parked);
                val.unpark();
            }
            None => (),
        }
    }
}
919
920impl Wait for FutWait {
921 #[cold]
922 fn wait(&self, _seq: usize, _w_pos: &AtomicUsize, _wc: &AtomicUsize) {
923 assert!(false, "Somehow normal wait got called in futures queue");
924 }
925
926 fn notify(&self) {
927 let mut parked = self.parked.lock();
928 if parked.len() > 0 {
929 if parked.len() > 8 {
930 for val in parked.drain(..) {
931 val.unpark();
932 }
933 } else {
934 let mut inline_v = smallvec::SmallVec::<[Task; 9]>::new();
935 inline_v.extend(parked.drain(..));
936 drop(parked);
937 for val in inline_v.drain() {
938 val.unpark();
939 }
940 }
941 }
942 }
943
944 fn needs_notify(&self) -> bool {
945 true
946 }
947}
948
impl<RW: QueueRW<T>, T> Clone for InnerSend<RW, T> {
    /// Cloning a sender flips both handles into Multi (contended) mode,
    /// registers a fresh manager token, and bumps the writer count last.
    fn clone(&self) -> InnerSend<RW, T> {
        self.state.set(QueueState::Multi);
        let rval = InnerSend {
            queue: self.queue.clone(),
            state: Cell::new(QueueState::Multi),
            token: self.queue.manager.get_token(),
        };
        self.queue.writers.fetch_add(1, SeqCst);
        rval
    }
}
963
impl<RW: QueueRW<T>, T> Clone for InnerRecv<RW, T> {
    /// Cloning adds another consumer to the same stream (values are shared,
    /// not duplicated); the consumer count is bumped before the copy exists.
    fn clone(&self) -> InnerRecv<RW, T> {
        self.reader.dup_consumer();
        InnerRecv {
            queue: self.queue.clone(),
            reader: self.reader.clone(),
            token: self.queue.manager.get_token(),
            alive: true,
        }
    }
}
975
impl Clone for FutWait {
    /// Clones only the spin configuration — the parked-task list starts
    /// empty. Queues share a single FutWait through an Arc, so clones are
    /// presumably for building new queues; confirm no caller expects the
    /// parked tasks to carry over.
    fn clone(&self) -> FutWait {
        FutWait::with_spins(self.spins_first, self.spins_yield)
    }
}
981
impl<RW: QueueRW<T>, T> Drop for InnerSend<RW, T> {
    fn drop(&mut self) {
        // When the last writer leaves, `writers` hits zero — the signal
        // receivers use to detect disconnection — so notify blocked
        // receivers to make them re-check.
        self.queue.writers.fetch_sub(1, SeqCst);
        fence(SeqCst);
        self.queue.manager.remove_token(self.token);
        self.queue.waiter.notify();
    }
}
992
impl<RW: QueueRW<T>, T> Drop for InnerRecv<RW, T> {
    /// Detaches the consumer (no-op if `unsubscribe` logic already ran via
    /// an earlier drop of this handle).
    fn drop(&mut self) {
        unsafe { self.do_unsubscribe_with(|| ()) }
    }
}
998
impl<RW: QueueRW<T>, T> Drop for MultiQueue<RW, T> {
    /// Drops any values still alive in the buffer. Broadcast mode owns every
    /// written slot, so all untagged slots are dropped; mpmc mode only owns
    /// the unread range between the last reader position and the head.
    ///
    /// NOTE(review): the buffers from `alloc::allocate` are not freed here —
    /// confirm deallocation happens elsewhere (e.g. in the alloc module).
    fn drop(&mut self) {
        if RW::do_drop() {
            for i in 0..self.capacity {
                unsafe {
                    let cell = &mut *self.data.offset(i);
                    // A tagged wraps word means the slot was never written.
                    if !is_tagged(cell.wraps.load(Relaxed)) {
                        // ptr::read yields an owned T that drops immediately.
                        ptr::read(&cell.val);
                    }
                }
            }
        } else {
            // Walk from the furthest-behind reader position to the head,
            // dropping every unconsumed value in place.
            let last_read = CountedIndex::from_usize(self.tail.last_pos.get(),
                                                     self.capacity as Index);
            while last_read.load_count(Relaxed) != self.head.load_count(Relaxed) {
                unsafe {
                    let cur_pos = last_read.load_transaction(Relaxed);
                    let (cur_ind, _) = cur_pos.get();
                    ptr::drop_in_place(&mut (*self.data.offset(cur_ind)).val);
                    cur_pos.commit_direct(1, Relaxed);
                }
            }
        }
    }
}
1027
impl<RW: QueueRW<T>, T> Drop for FutInnerRecv<RW, T> {
    /// Unsubscribes and wakes all parked producers, since a departing reader
    /// stream can free up queue capacity.
    fn drop(&mut self) {
        let prod_wait = self.prod_wait.clone();
        unsafe { self.reader.do_unsubscribe_with(|| { prod_wait.notify(); }) }
    }
}
1034
impl<RW: QueueRW<T>, R, F: for<'r> FnMut(&T) -> R, T> Drop for FutInnerUniRecv<RW, R, F, T> {
    /// Same cleanup as `FutInnerRecv`: unsubscribe and wake parked producers.
    fn drop(&mut self) {
        let prod_wait = self.prod_wait.clone();
        unsafe { self.reader.do_unsubscribe_with(|| { prod_wait.notify(); }) }
    }
}
1041
1042impl<RW: QueueRW<T>, T> fmt::Debug for InnerRecv<RW, T> {
1043 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1044 write!(f,
1045 "MultiQueue error message - you probably tried to unwrap the result of into_single")
1046 }
1047}
1048
1049impl<RW: QueueRW<T>, T> fmt::Debug for FutInnerRecv<RW, T> {
1050 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1051 write!(f,
1052 "MultiQueue error message - you probably tried to unwrap the result of into_single")
1053 }
1054}
1055
// SAFETY(review): these impls assert cross-thread safety for types holding
// raw pointers into the shared buffers; soundness rests on the atomic slot
// tags and the MemoryManager token protocol maintained elsewhere. Note that
// none of them bound `T: Send` (nor `F: Send` for FutInnerUniRecv) — confirm
// this is intentional, as it permits sending queues of non-Send values.
unsafe impl<RW: QueueRW<T>, T> Sync for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for MultiQueue<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for InnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for InnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for FutInnerSend<RW, T> {}
unsafe impl<RW: QueueRW<T>, T> Send for FutInnerRecv<RW, T> {}
unsafe impl<RW: QueueRW<T>, R, F: FnMut(&T) -> R, T> Send for FutInnerUniRecv<RW, R, F, T> {}
1063
/// Builds a futures-aware channel pair. Consumers park on `cons_arc` (also
/// installed as the queue's wait strategy so senders wake them), while
/// producers park on the separate `prod_arc` and are woken by consumers.
pub fn futures_multiqueue<RW: QueueRW<T>, T>(capacity: Index)
                                             -> (FutInnerSend<RW, T>, FutInnerRecv<RW, T>) {
    let cons_arc = Arc::new(FutWait::new());
    let prod_arc = Arc::new(FutWait::new());
    let (tx, rx) = MultiQueue::new_internal(capacity, cons_arc.clone());
    let ftx = FutInnerSend {
        writer: tx,
        wait: cons_arc.clone(),
        prod_wait: prod_arc.clone(),
    };
    let rtx = FutInnerRecv {
        reader: rx,
        wait: cons_arc.clone(),
        prod_wait: prod_arc.clone(),
    };
    (ftx, rtx)
}