1use crate::{
2 internal::{acquire_internal, Internal},
3 signal::AsyncSignal,
4 AsyncReceiver, ReceiveError, SendError,
5};
6use core::{
7 cell::UnsafeCell,
8 fmt::Debug,
9 marker::PhantomPinned,
10 mem::{transmute, ManuallyDrop},
11 pin::Pin,
12 task::Poll,
13};
14
15use branches::{likely, unlikely};
16use futures_core::{FusedStream, Future, Stream};
17
/// Lifecycle state of the signal owned by a channel future.
///
/// The value is represented as a single byte (`repr(u8)`).
#[repr(u8)]
#[derive(Clone, Copy, PartialEq)]
pub(crate) enum FutureState {
    /// The future has not been registered in the channel wait list yet; the
    /// first `poll` tries to complete immediately or registers the signal.
    Unregistered,
    /// The signal has been pushed to the channel wait list and the future is
    /// waiting for the other side.
    Pending,
    /// The operation completed successfully; the next `poll` returns `Ok`.
    Success,
    /// The operation failed; the next `poll` returns `Err`.
    Failure,
    /// The result has already been handed to the caller.
    Done,
}
27
/// Empty `#[cold]` function. The poll implementations in this file call it at
/// the top of match arms that are expected to be rare, so that the optimizer
/// treats the calling branch as unlikely.
#[cold]
fn mark_branch_unlikely() {}
30
31#[allow(unused)]
32impl FutureState {
33 #[inline(always)]
34 fn is_pending(&self) -> bool {
35 *self == FutureState::Pending
36 }
37
38 #[inline(always)]
39 fn is_done(&self) -> bool {
40 *self == FutureState::Done
41 }
42
43 #[inline(always)]
44 fn is_unregistered(&self) -> bool {
45 *self == FutureState::Unregistered
46 }
47
48 #[inline(always)]
49 fn is_success(&self) -> bool {
50 *self == FutureState::Success
51 }
52
53 #[inline(always)]
54 fn is_failure(&self) -> bool {
55 *self == FutureState::Failure
56 }
57}
58
/// Future that sends a single value into the channel.
///
/// Resolves to `Ok(())` once the value has been handed over, or to
/// `Err(SendError(value))` carrying the value back when it could not be sent.
#[must_use = "futures do nothing unless you .await or poll them"]
pub struct SendFuture<'a, T> {
    /// Shared channel state the future operates on.
    internal: &'a Internal<T>,
    /// Signal that carries the value to send together with the future state
    /// and waker.
    sig: AsyncSignal<T>,
    /// Makes the future `!Unpin`; `poll` registers a pointer to `sig` in the
    /// channel wait list (`push_signal(sig.dynamic_ptr())`).
    _pinned: PhantomPinned,
}
67
68impl<T> Debug for SendFuture<'_, T> {
69 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
70 write!(f, "SendFuture {{ .. }}")
71 }
72}
73
74impl<T> Drop for SendFuture<'_, T> {
75 fn drop(&mut self) {
76 let state = self.sig.state();
77 if unlikely(!state.is_done()) {
78 let mut need_drop = true;
83
84 if state.is_pending()
85 && acquire_internal(self.internal).cancel_send_signal(self.sig.as_tagged_ptr())
86 {
87 need_drop = true;
89 } else if !state.is_unregistered() && self.sig.blocking_wait() {
90 need_drop = false;
92 }
93
94 if need_drop {
95 unsafe { self.sig.drop_data() };
97 }
98 }
99 }
100}
101
102impl<'a, T> SendFuture<'a, T> {
103 #[inline(always)]
105 pub(crate) fn new(internal: &'a Internal<T>, data: T) -> Self {
106 SendFuture {
107 internal,
108 sig: AsyncSignal::new_send(data),
109 _pinned: PhantomPinned,
110 }
111 }
112 #[inline(always)]
113 pub(crate) fn new_finished(internal: &'a Internal<T>) -> Self {
114 SendFuture {
115 internal,
116 sig: AsyncSignal::new_send_finished(),
117 _pinned: PhantomPinned,
118 }
119 }
120}
121
impl<T> Future for SendFuture<'_, T> {
    type Output = Result<(), SendError<T>>;

    /// Drives the send according to the current signal state.
    ///
    /// * `Unregistered`: fails if `recv_count == 0`; otherwise hands the value
    ///   to a waiting receiver, or pushes it into the queue when there is
    ///   room, or registers the signal in the wait list and returns `Pending`.
    /// * `Pending`: refreshes the stored waker if it changed, or resolves the
    ///   future when the signal is no longer in the wait list.
    /// * `Success` / `Failure`: returns the final result.
    ///
    /// # Panics
    ///
    /// Panics when polled again after a result has been returned (`Done`).
    #[inline(always)]
    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
        // NOTE(review): presumably sound because nothing below moves out of
        // the pinned future — confirm.
        let this = unsafe { self.get_unchecked_mut() };

        match this.sig.state() {
            FutureState::Unregistered => {
                let cap = this.internal.capacity();
                let mut internal = acquire_internal(this.internal);
                if unlikely(internal.recv_count == 0) {
                    drop(internal);
                    this.sig.set_state_relaxed(FutureState::Done);
                    // The value was never sent; give it back to the caller.
                    unsafe {
                        return Poll::Ready(Err(SendError(this.sig.assume_init())));
                    }
                }
                // A receiver is already waiting: pass the value to it directly.
                if let Some(first) = internal.next_recv() {
                    drop(internal);
                    this.sig.set_state_relaxed(FutureState::Done);
                    unsafe { first.send(this.sig.assume_init()) }
                    return Poll::Ready(Ok(()));
                }
                // Buffered channel with free space: enqueue the value.
                if cap > 0 && internal.queue.len() < cap {
                    this.sig.set_state_relaxed(FutureState::Done);
                    internal.queue.push_back(unsafe { this.sig.assume_init() });
                    drop(internal);
                    return Poll::Ready(Ok(()));
                }
                // Nothing can take the value right now: store the waker and
                // register the signal while the channel lock is still held.
                this.sig.set_state(FutureState::Pending);
                unsafe {
                    this.sig.update_waker(cx.waker());
                }
                internal.push_signal(this.sig.dynamic_ptr());
                drop(internal);
                Poll::Pending
            }
            FutureState::Success => {
                this.sig.set_state_relaxed(FutureState::Done);
                Poll::Ready(Ok(()))
            }
            FutureState::Pending => {
                mark_branch_unlikely();
                let waker = cx.waker();
                if unlikely(unsafe { !this.sig.will_wake(waker) }) {
                    // The task's waker changed since registration.
                    let internal = acquire_internal(this.internal);
                    if internal.send_signal_exists(this.sig.as_tagged_ptr()) {
                        // Still in the wait list: replace the waker under the
                        // channel lock.
                        unsafe {
                            this.sig.update_waker(waker);
                        }
                        drop(internal);
                        return Poll::Pending;
                    }
                    drop(internal);
                    // The signal already left the wait list, so the waker is
                    // not updated; wait for the outcome synchronously instead.
                    this.sig.set_state(FutureState::Done);
                    if likely(this.sig.blocking_wait()) {
                        return Poll::Ready(Ok(()));
                    }
                    Poll::Ready(Err(SendError(unsafe { this.sig.assume_init() })))
                } else {
                    Poll::Pending
                }
            }
            FutureState::Failure => {
                mark_branch_unlikely();
                this.sig.set_state_relaxed(FutureState::Done);
                // Return the unsent value inside the error.
                Poll::Ready(Err(SendError(unsafe { this.sig.assume_init() })))
            }
            FutureState::Done => {
                mark_branch_unlikely();
                panic!("polled after result is already returned")
            }
        }
    }
}
214
/// Future that receives a single value from the channel.
///
/// Resolves to `Ok(value)` or to `Err(ReceiveError)` when no value can be
/// received.
#[must_use = "futures do nothing unless you .await or poll them"]
pub struct ReceiveFuture<'a, T> {
    /// When `true`, polling a finished future starts a new receive instead of
    /// panicking; set by the stream wrappers in this file.
    is_stream: bool,
    /// Shared channel state the future operates on.
    internal: &'a Internal<T>,
    /// Signal that receives the value and holds the future state and waker.
    sig: AsyncSignal<T>,
    /// Makes the future `!Unpin`; `poll` registers a pointer to `sig` in the
    /// channel wait list (`push_signal(sig.dynamic_ptr())`).
    _pinned: PhantomPinned,
}
224
225impl<T> Drop for ReceiveFuture<'_, T> {
226 fn drop(&mut self) {
227 let state = self.sig.state();
228 if unlikely(!state.is_done()) {
229 if state.is_pending()
231 && acquire_internal(self.internal).cancel_recv_signal(self.sig.as_tagged_ptr())
232 {
233 return;
235 }
236 if !state.is_unregistered() && self.sig.blocking_wait() {
240 unsafe {
246 if self.internal.capacity() == 0 {
247 #[cfg(debug_assertions)]
248 println!(
249 "warning: ReceiveFuture dropped while send operation is in progress"
250 );
251 self.sig.drop_data();
252 } else {
253 acquire_internal(self.internal)
255 .queue
256 .push_front(self.sig.assume_init())
257 }
258 }
259 }
260 }
261 }
262}
263
264impl<T> Debug for ReceiveFuture<'_, T> {
265 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
266 write!(f, "ReceiveFuture {{ .. }}")
267 }
268}
269
270impl<'a, T> ReceiveFuture<'a, T> {
271 #[inline(always)]
272 pub(crate) fn new_ref(internal: &'a Internal<T>) -> Self {
273 Self {
274 sig: AsyncSignal::new_recv(),
275 internal,
276 is_stream: false,
277 _pinned: PhantomPinned,
278 }
279 }
280}
281
impl<T> Future for ReceiveFuture<'_, T> {
    type Output = Result<T, ReceiveError>;

    /// Drives the receive according to the current signal state.
    ///
    /// * `Unregistered`: fails if `recv_count == 0`; otherwise takes a value
    ///   from the queue or from a waiting sender, fails if `send_count == 0`,
    ///   or registers the signal in the wait list and returns `Pending`.
    /// * `Pending`: refreshes the stored waker if it changed, or resolves the
    ///   future when the signal is no longer in the wait list.
    /// * `Success` / `Failure`: returns the final result.
    /// * `Done`: restarts from `Unregistered` when `is_stream` is set.
    ///
    /// # Panics
    ///
    /// Panics when polled after completion and `is_stream` is `false`.
    #[inline(always)]
    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
        // NOTE(review): presumably sound because nothing below moves out of
        // the pinned future — confirm.
        let this = unsafe { self.get_unchecked_mut() };

        // The loop only repeats for the stream restart in the `Done` arm.
        loop {
            return match this.sig.state() {
                FutureState::Unregistered => {
                    let cap = this.internal.capacity();
                    let mut internal = acquire_internal(this.internal);
                    if unlikely(internal.recv_count == 0) {
                        drop(internal);
                        this.sig.set_state_relaxed(FutureState::Done);
                        return Poll::Ready(Err(ReceiveError()));
                    }
                    if cap > 0 {
                        if let Some(v) = internal.queue.pop_front() {
                            // A slot was freed: move the value of a waiting
                            // sender, if any, into the queue.
                            if let Some(t) = internal.next_send() {
                                unsafe { internal.queue.push_back(t.recv()) }
                            }
                            drop(internal);
                            this.sig.set_state_relaxed(FutureState::Done);
                            return Poll::Ready(Ok(v));
                        }
                    }
                    // Queue empty (or unbuffered): take directly from a
                    // waiting sender.
                    if let Some(t) = internal.next_send() {
                        drop(internal);
                        this.sig.set_state_relaxed(FutureState::Done);
                        return Poll::Ready(Ok(unsafe { t.recv() }));
                    }
                    // No value available and no sender left to provide one.
                    if unlikely(internal.send_count == 0) {
                        this.sig.set_state_relaxed(FutureState::Done);
                        return Poll::Ready(Err(ReceiveError()));
                    }
                    // Store the waker and register the signal while the
                    // channel lock is still held.
                    this.sig.set_state(FutureState::Pending);
                    unsafe {
                        this.sig.update_waker(cx.waker());
                    }
                    internal.push_signal(this.sig.dynamic_ptr());
                    drop(internal);
                    Poll::Pending
                }
                FutureState::Success => {
                    this.sig.set_state_relaxed(FutureState::Done);
                    Poll::Ready(Ok(unsafe { this.sig.assume_init() }))
                }
                FutureState::Pending => {
                    let waker = cx.waker();
                    if unsafe { !this.sig.will_wake(waker) } {
                        // The task's waker changed since registration.
                        let internal = acquire_internal(this.internal);
                        if internal.recv_signal_exists(this.sig.as_tagged_ptr()) {
                            // Still in the wait list: replace the waker under
                            // the channel lock.
                            unsafe {
                                this.sig.update_waker(waker);
                            }
                            drop(internal);
                            Poll::Pending
                        } else {
                            drop(internal);
                            // The signal already left the wait list; wait for
                            // the outcome synchronously.
                            this.sig.set_state_relaxed(FutureState::Done);
                            if likely(this.sig.blocking_wait()) {
                                Poll::Ready(Ok(unsafe { this.sig.assume_init() }))
                            } else {
                                Poll::Ready(Err(ReceiveError()))
                            }
                        }
                    } else {
                        Poll::Pending
                    }
                }
                FutureState::Failure => {
                    mark_branch_unlikely();
                    this.sig.set_state_relaxed(FutureState::Done);
                    Poll::Ready(Err(ReceiveError()))
                }
                FutureState::Done => {
                    mark_branch_unlikely();
                    if this.is_stream {
                        // Stream mode: reuse the future for the next item.
                        this.sig.set_state_relaxed(FutureState::Unregistered);
                        continue;
                    }
                    panic!("polled after result is already returned")
                }
            };
        }
    }
}
385
/// Stream of values received through a borrowed [`AsyncReceiver`].
pub struct ReceiveStream<'a, T: 'a> {
    /// Boxed, pinned receive future that is polled once per `poll_next` call.
    future: Pin<Box<ReceiveFuture<'a, T>>>,
    /// Set after the inner future returned an error; the stream then only
    /// yields `None`.
    terminated: bool,
    /// Receiver the stream was created from.
    receiver: &'a AsyncReceiver<T>,
}
393
394impl<T> Debug for ReceiveStream<'_, T> {
395 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
396 write!(f, "ReceiveStream {{ .. }}")
397 }
398}
399
400impl<T> Stream for ReceiveStream<'_, T> {
401 type Item = T;
402
403 fn poll_next(
404 mut self: Pin<&mut Self>,
405 cx: &mut core::task::Context<'_>,
406 ) -> Poll<Option<Self::Item>> {
407 if unlikely(self.terminated) {
408 return Poll::Ready(None);
409 }
410 match self.future.as_mut().poll(cx) {
412 Poll::Ready(res) => match res {
413 Ok(d) => Poll::Ready(Some(d)),
414 Err(_) => {
415 mark_branch_unlikely();
416 self.terminated = true;
417 Poll::Ready(None)
418 }
419 },
420 Poll::Pending => Poll::Pending,
421 }
422 }
423}
424
425impl<T> FusedStream for ReceiveStream<'_, T> {
426 fn is_terminated(&self) -> bool {
427 self.receiver.is_terminated()
428 }
429}
430
431impl<'a, T> ReceiveStream<'a, T> {
432 pub(crate) fn new_borrowed(receiver: &'a AsyncReceiver<T>) -> Self {
433 let mut future = receiver.recv();
434 future.is_stream = true;
435 ReceiveStream {
436 future: Box::pin(future),
437 terminated: false,
438 receiver,
439 }
440 }
441}
442
/// Stream of values that owns its [`AsyncReceiver`].
///
/// Both fields are wrapped in `ManuallyDrop`; the `Drop` implementation of
/// this type drops `future` first and `receiver` second.
pub struct ReceiveStreamOwned<T: 'static> {
    /// Receive future borrowing from `receiver`; its lifetime is extended to
    /// `'static` in `new`.
    future: ManuallyDrop<Pin<Box<ReceiveFuture<'static, T>>>>,
    /// Set after the inner future returned an error; the stream then only
    /// yields `None`.
    terminated: bool,
    /// The owned receiver, boxed and pinned.
    receiver: ManuallyDrop<Pin<Box<AsyncReceiver<T>>>>,
}
450
451impl<T: 'static> Debug for ReceiveStreamOwned<T> {
452 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
453 write!(f, "ReceiveStreamOwned {{ .. }}")
454 }
455}
456
457impl<T: 'static> Stream for ReceiveStreamOwned<T> {
458 type Item = T;
459
460 fn poll_next(
461 mut self: Pin<&mut Self>,
462 cx: &mut core::task::Context<'_>,
463 ) -> Poll<Option<Self::Item>> {
464 if unlikely(self.terminated) {
465 return Poll::Ready(None);
466 }
467 let future = &mut *self.future;
470 match future.as_mut().poll(cx) {
471 Poll::Ready(res) => match res {
472 Ok(d) => Poll::Ready(Some(d)),
473 Err(_) => {
474 mark_branch_unlikely();
475 self.terminated = true;
476 Poll::Ready(None)
477 }
478 },
479 Poll::Pending => Poll::Pending,
480 }
481 }
482}
483
484impl<T: 'static> FusedStream for ReceiveStreamOwned<T> {
485 fn is_terminated(&self) -> bool {
486 (&*self.receiver).is_terminated()
488 }
489}
490
impl<T: 'static> ReceiveStreamOwned<T> {
    /// Builds a stream that takes ownership of `receiver`.
    ///
    /// The receiver is moved into a pinned box, and a stream-mode receive
    /// future borrowing the receiver's `internal` is stored next to it.
    pub(crate) fn new(receiver: AsyncReceiver<T>) -> Self {
        let receiver = Box::pin(receiver);
        let mut future = ReceiveFuture::new_ref(&receiver.internal);
        future.is_stream = true;
        // SAFETY (review): the transmute only changes the future's lifetime
        // parameter to `'static`. The borrow points into the boxed receiver
        // stored in the same struct, and this type's `Drop` drops the future
        // before the receiver — confirm nothing else can outlive the box.
        let future = unsafe {
            transmute::<Pin<Box<ReceiveFuture<'_, T>>>, Pin<Box<ReceiveFuture<'static, T>>>>(
                Box::pin(future),
            )
        };
        ReceiveStreamOwned {
            future: ManuallyDrop::new(future),
            terminated: false,
            receiver: ManuallyDrop::new(receiver),
        }
    }
}
510
impl<T: 'static> Drop for ReceiveStreamOwned<T> {
    /// Drops the future before the receiver it borrows from.
    fn drop(&mut self) {
        // SAFETY: each `ManuallyDrop` field is dropped exactly once here and
        // is not touched afterwards. The order matters: `future` holds a
        // reference into `receiver` (see `new`), so it must go first.
        unsafe {
            ManuallyDrop::drop(&mut self.future);
            ManuallyDrop::drop(&mut self.receiver);
        }
    }
}
520
/// Future that sends every element of a `VecDeque` into the channel, taking
/// the elements from the front.
#[must_use = "futures do nothing unless you .await or poll them"]
#[derive(Debug)]
pub struct SendManyFuture<'a, 'b, T> {
    /// Shared channel state the future operates on.
    internal: &'a Internal<T>,
    /// Reusable single-element send future, used when an element has to wait
    /// in the channel wait list.
    fut: UnsafeCell<SendFuture<'a, T>>,
    /// Elements that have not been sent yet.
    elements: &'b mut std::collections::VecDeque<T>,
    /// Set once a result has been returned; polling afterwards panics.
    finished: bool,
    /// `true` while `fut` has an element registered in the wait list.
    in_wait_queue: bool,
    /// Makes the future `!Unpin`; a pointer to `fut`'s signal is registered
    /// in the channel wait list.
    _pinned: PhantomPinned,
}
537
538impl<'a, 'b, T> SendManyFuture<'a, 'b, T> {
539 #[inline(always)]
540 pub(crate) fn new(
541 internal: &'a Internal<T>,
542 elements: &'b mut std::collections::VecDeque<T>,
543 ) -> Self {
544 SendManyFuture {
545 internal,
546 fut: UnsafeCell::new(SendFuture::new_finished(internal)),
547 elements,
548 finished: false,
549 in_wait_queue: false,
550 _pinned: PhantomPinned,
551 }
552 }
553}
554
555impl<'a, 'b, T> Future for SendManyFuture<'a, 'b, T> {
556 type Output = Result<(), SendError<T>>;
557
558 #[inline(always)]
559 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
560 let this = unsafe { self.get_unchecked_mut() };
561
562 loop {
563 if unlikely(this.finished) {
564 panic!("polled after completion");
565 }
566
567 if this.in_wait_queue {
568 let fut = unsafe { Pin::new_unchecked(this.fut.get_mut()) };
570
571 match fut.poll(cx) {
572 Poll::Ready(res) => {
573 if this.elements.is_empty() {
574 this.finished = true;
575 return Poll::Ready(res);
576 }
577 }
578 Poll::Pending => return Poll::Pending,
579 }
580 this.in_wait_queue = false;
581 }
582
583 if unlikely(this.elements.is_empty()) {
585 this.finished = true;
586 return Poll::Ready(Ok(()));
587 }
588
589 let cap = this.internal.capacity();
591
592 let mut internal = acquire_internal(this.internal);
594
595 if unlikely(internal.recv_count == 0) {
597 let first = this.elements.pop_front().unwrap();
599 drop(internal);
600 this.finished = true;
601 return Poll::Ready(Err(SendError(first)));
602 }
603
604 while let Some(waiter) = internal.next_recv() {
608 let v = this.elements.pop_front().unwrap();
609 unsafe {
611 waiter.send(v);
612 }
613 if unlikely(this.elements.is_empty()) {
614 drop(internal);
616 this.finished = true;
617 return Poll::Ready(Ok(()));
618 }
619 }
620
621 if unlikely(this.elements.is_empty()) {
622 drop(internal);
624 this.finished = true;
625 return Poll::Ready(Ok(()));
626 }
627
628 if cap > 0 {
632 while internal.queue.len() < cap {
633 if let Some(v) = this.elements.pop_front() {
634 internal.queue.push_back(v);
635 } else {
636 drop(internal);
638 this.finished = true;
639 return Poll::Ready(Ok(()));
640 }
641 }
642 }
643
644 if let Some(v) = this.elements.pop_front() {
648 unsafe {
651 this.fut.get_mut().sig.reset_send(v);
652 }
653 internal.push_signal(this.fut.get_mut().sig.dynamic_ptr());
655
656 this.in_wait_queue = true;
657 drop(internal);
658 continue;
661 } else {
662 this.finished = true;
664 return Poll::Ready(Ok(()));
665 }
666 }
667 }
668}
669
/// Future that moves all currently available values into a `Vec`, waiting for
/// at least one value when none is available.
///
/// Resolves to the number of values appended, or to `Err(ReceiveError)`.
#[must_use = "futures do nothing unless you .await or poll them"]
pub struct DrainIntoBlockingFuture<'a, 'b, T> {
    /// Shared channel state the future operates on.
    internal: &'a Internal<T>,
    /// Signal used to wait for a value when nothing could be drained.
    sig: AsyncSignal<T>,
    /// Destination vector; received values are appended to it.
    vec: &'b mut Vec<T>,
    /// Makes the future `!Unpin`; `poll` registers a pointer to `sig` in the
    /// channel wait list (`push_signal(sig.dynamic_ptr())`).
    _pinned: PhantomPinned,
}
679
680impl<T> Drop for DrainIntoBlockingFuture<'_, '_, T> {
681 fn drop(&mut self) {
682 let state = self.sig.state();
683 if unlikely(!state.is_done()) {
684 if state.is_pending()
686 && acquire_internal(self.internal).cancel_recv_signal(self.sig.as_tagged_ptr())
687 {
688 return;
690 }
691 if !state.is_unregistered() && self.sig.blocking_wait() {
695 unsafe {
698 if self.internal.capacity() == 0 {
699 self.sig.drop_data();
700 } else {
701 acquire_internal(self.internal)
703 .queue
704 .push_front(self.sig.assume_init())
705 }
706 }
707 }
708 }
709 }
710}
711
712impl<T> Debug for DrainIntoBlockingFuture<'_, '_, T> {
713 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
714 write!(f, "DrainIntoBlockingFuture {{ .. }}")
715 }
716}
717
718impl<'a, 'b, T> DrainIntoBlockingFuture<'a, 'b, T> {
719 #[inline(always)]
720 pub(crate) fn new(internal: &'a Internal<T>, vec: &'b mut Vec<T>) -> Self {
721 Self {
722 sig: AsyncSignal::new_recv(),
723 internal,
724 vec,
725 _pinned: PhantomPinned,
726 }
727 }
728}
729
730impl<T> Future for DrainIntoBlockingFuture<'_, '_, T> {
731 type Output = Result<usize, ReceiveError>;
732
733 #[inline(always)]
734 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
735 let this = unsafe { self.get_unchecked_mut() };
736
737 match this.sig.state() {
738 FutureState::Unregistered => {
739 let vec_initial_length = this.vec.len();
740 let mut internal = acquire_internal(this.internal);
741
742 if unlikely(internal.recv_count == 0) {
744 this.sig.set_state_relaxed(FutureState::Done);
745 return Poll::Ready(Err(ReceiveError()));
746 }
747
748 let required_cap = internal.queue.len() + {
750 if internal.recv_blocking {
751 0
752 } else {
753 internal.wait_list.len()
754 }
755 };
756 let remaining_cap = this.vec.capacity() - vec_initial_length;
757 if required_cap > remaining_cap {
758 this.vec
759 .reserve(vec_initial_length + required_cap - remaining_cap);
760 }
761
762 this.vec.extend(internal.queue.drain(..));
764
765 while let Some(p) = internal.next_send() {
767 unsafe { this.vec.push(p.recv()) }
769 }
770
771 let count = this.vec.len() - vec_initial_length;
773 if count > 0 {
774 this.sig.set_state_relaxed(FutureState::Done);
775 return Poll::Ready(Ok(count));
776 }
777
778 if unlikely(internal.send_count == 0) {
780 this.sig.set_state_relaxed(FutureState::Done);
781 return Poll::Ready(Err(ReceiveError()));
782 }
783
784 this.sig.set_state(FutureState::Pending);
786 unsafe {
788 this.sig.update_waker(cx.waker());
789 }
790 internal.push_signal(this.sig.dynamic_ptr());
791 drop(internal);
792 Poll::Pending
793 }
794 FutureState::Success => {
795 this.sig.set_state_relaxed(FutureState::Done);
796 this.vec.push(unsafe { this.sig.assume_init() });
798 Poll::Ready(Ok(1))
799 }
800 FutureState::Pending => {
801 let waker = cx.waker();
802 if unsafe { !this.sig.will_wake(waker) } {
804 let internal = acquire_internal(this.internal);
806 if internal.recv_signal_exists(this.sig.as_tagged_ptr()) {
807 unsafe {
810 this.sig.update_waker(waker);
811 }
812 drop(internal);
813 Poll::Pending
814 } else {
815 drop(internal);
816 this.sig.set_state_relaxed(FutureState::Done);
819 if likely(this.sig.blocking_wait()) {
820 this.vec.push(unsafe { this.sig.assume_init() });
822 Poll::Ready(Ok(1))
823 } else {
824 Poll::Ready(Err(ReceiveError()))
825 }
826 }
827 } else {
828 Poll::Pending
829 }
830 }
831 FutureState::Failure => {
832 mark_branch_unlikely();
833 this.sig.set_state_relaxed(FutureState::Done);
834 Poll::Ready(Err(ReceiveError()))
835 }
836 FutureState::Done => {
837 mark_branch_unlikely();
838 panic!("polled after result is already returned")
839 }
840 }
841 }
842}