Skip to main content

kanal_plus/
future.rs

1use crate::{
2    internal::{acquire_internal, Internal},
3    signal::AsyncSignal,
4    AsyncReceiver, ReceiveError, SendError,
5};
6use core::{
7    cell::UnsafeCell,
8    fmt::Debug,
9    marker::PhantomPinned,
10    mem::{transmute, ManuallyDrop},
11    pin::Pin,
12    task::Poll,
13};
14
15use branches::{likely, unlikely};
16use futures_core::{FusedStream, Future, Stream};
17
18#[repr(u8)]
19#[derive(PartialEq, Clone, Copy)]
20pub(crate) enum FutureState {
21    Unregistered,
22    Pending,
23    Success,
24    Failure,
25    Done,
26}
27
28#[cold]
29fn mark_branch_unlikely() {}
30
31#[allow(unused)]
32impl FutureState {
33    #[inline(always)]
34    fn is_pending(&self) -> bool {
35        *self == FutureState::Pending
36    }
37
38    #[inline(always)]
39    fn is_done(&self) -> bool {
40        *self == FutureState::Done
41    }
42
43    #[inline(always)]
44    fn is_unregistered(&self) -> bool {
45        *self == FutureState::Unregistered
46    }
47
48    #[inline(always)]
49    fn is_success(&self) -> bool {
50        *self == FutureState::Success
51    }
52
53    #[inline(always)]
54    fn is_failure(&self) -> bool {
55        *self == FutureState::Failure
56    }
57}
58
59/// SendFuture is a future for sending an object to a channel asynchronously.
60/// It must be polled to complete the send operation.
61#[must_use = "futures do nothing unless you .await or poll them"]
62pub struct SendFuture<'a, T> {
63    internal: &'a Internal<T>,
64    sig: AsyncSignal<T>,
65    _pinned: PhantomPinned,
66}
67
68impl<T> Debug for SendFuture<'_, T> {
69    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
70        write!(f, "SendFuture {{ .. }}")
71    }
72}
73
74impl<T> Drop for SendFuture<'_, T> {
75    fn drop(&mut self) {
76        let state = self.sig.state();
77        if unlikely(!state.is_done()) {
78            // If we are still pending, try to cancel the send operation.
79            // Cancellation succeeds → we still own the data and must drop it.
80            // Otherwise the receiver may already own the payload; we wait for it
81            // to finish before dropping the future.
82            let mut need_drop = true;
83
84            if state.is_pending()
85                && acquire_internal(self.internal).cancel_send_signal(self.sig.as_tagged_ptr())
86            {
87                // Cancellation succeeded – we still own the data.
88                need_drop = true;
89            } else if !state.is_unregistered() && self.sig.blocking_wait() {
90                // A receiver has taken ownership; it will drop the data.
91                need_drop = false;
92            }
93
94            if need_drop {
95                // SAFETY: the payload has never been moved out of the signal.
96                unsafe { self.sig.drop_data() };
97            }
98        }
99    }
100}
101
102impl<'a, T> SendFuture<'a, T> {
103    /// Creates a new SendFuture with the given internal channel and data.
104    #[inline(always)]
105    pub(crate) fn new(internal: &'a Internal<T>, data: T) -> Self {
106        SendFuture {
107            internal,
108            sig: AsyncSignal::new_send(data),
109            _pinned: PhantomPinned,
110        }
111    }
112    #[inline(always)]
113    pub(crate) fn new_finished(internal: &'a Internal<T>) -> Self {
114        SendFuture {
115            internal,
116            sig: AsyncSignal::new_send_finished(),
117            _pinned: PhantomPinned,
118        }
119    }
120}
121
122impl<T> Future for SendFuture<'_, T> {
123    type Output = Result<(), SendError<T>>;
124
125    #[inline(always)]
126    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
127        let this = unsafe { self.get_unchecked_mut() };
128
129        match this.sig.state() {
130            FutureState::Unregistered => {
131                let cap = this.internal.capacity();
132                let mut internal = acquire_internal(this.internal);
133                if unlikely(internal.recv_count == 0) {
134                    drop(internal);
135                    this.sig.set_state_relaxed(FutureState::Done);
136                    // SAFETY: the data failed to move, we can safely return it to user
137                    unsafe {
138                        return Poll::Ready(Err(SendError(this.sig.assume_init())));
139                    }
140                }
141                if let Some(first) = internal.next_recv() {
142                    drop(internal);
143                    this.sig.set_state_relaxed(FutureState::Done);
144                    // SAFETY: data is inited and available from constructor
145                    unsafe { first.send(this.sig.assume_init()) }
146                    return Poll::Ready(Ok(()));
147                }
148                if cap > 0 && internal.queue.len() < cap {
149                    this.sig.set_state_relaxed(FutureState::Done);
150                    // SAFETY: data is inited and available from constructor
151                    internal.queue.push_back(unsafe { this.sig.assume_init() });
152                    drop(internal);
153                    return Poll::Ready(Ok(()));
154                }
155                this.sig.set_state(FutureState::Pending);
156                // SAFETY: waker is empty, it is safe to init it here
157                unsafe {
158                    this.sig.update_waker(cx.waker());
159                }
160                // send directly to the waitlist
161                internal.push_signal(this.sig.dynamic_ptr());
162                drop(internal);
163                Poll::Pending
164            }
165            FutureState::Success => {
166                this.sig.set_state_relaxed(FutureState::Done);
167                Poll::Ready(Ok(()))
168            }
169            FutureState::Pending => {
170                mark_branch_unlikely();
171                let waker = cx.waker();
172                // SAFETY: signal waker is valid as we inited it in future pending state
173                if unlikely(unsafe { !this.sig.will_wake(waker) }) {
174                    // Waker is changed and we need to update waker in the waiting list
175                    let internal = acquire_internal(this.internal);
176                    if internal.send_signal_exists(this.sig.as_tagged_ptr()) {
177                        // SAFETY: signal is not shared with other thread yet so it's safe to
178                        // update waker locally
179                        unsafe {
180                            this.sig.update_waker(waker);
181                        }
182                        drop(internal);
183                        return Poll::Pending;
184                    }
185                    drop(internal);
186                    // signal is already shared, and data will be available shortly, so wait
187                    // synchronously and return the result note:
188                    // it's not possible safely to update waker after the signal is shared,
189                    // but we know data will be ready shortly,
190                    //   we can wait synchronously and receive it.
191                    this.sig.set_state(FutureState::Done);
192                    if likely(this.sig.blocking_wait()) {
193                        return Poll::Ready(Ok(()));
194                    }
195                    // the data failed to move, we can safely return it to user
196                    Poll::Ready(Err(SendError(unsafe { this.sig.assume_init() })))
197                } else {
198                    Poll::Pending
199                }
200            }
201            FutureState::Failure => {
202                mark_branch_unlikely();
203                this.sig.set_state_relaxed(FutureState::Done);
204                // SAFETY: the data failed to move, we can safely return it to user
205                Poll::Ready(Err(SendError(unsafe { this.sig.assume_init() })))
206            }
207            FutureState::Done => {
208                mark_branch_unlikely();
209                panic!("polled after result is already returned")
210            }
211        }
212    }
213}
214
215/// ReceiveFuture is a future for receiving an object from a channel
216/// asynchronously. It must be polled to complete the receive operation.
217#[must_use = "futures do nothing unless you .await or poll them"]
218pub struct ReceiveFuture<'a, T> {
219    is_stream: bool,
220    internal: &'a Internal<T>,
221    sig: AsyncSignal<T>,
222    _pinned: PhantomPinned,
223}
224
225impl<T> Drop for ReceiveFuture<'_, T> {
226    fn drop(&mut self) {
227        let state = self.sig.state();
228        if unlikely(!state.is_done()) {
229            // try to cancel the signal if we are still waiting
230            if state.is_pending()
231                && acquire_internal(self.internal).cancel_recv_signal(self.sig.as_tagged_ptr())
232            {
233                // signal canceled
234                return;
235            }
236            // we failed to cancel the signal,
237            // either it is unregistered or a sender got signal ownership, receiver should
238            // wait until the response
239            if !state.is_unregistered() && self.sig.blocking_wait() {
240                // got ownership of data that is not going to be used ever again, so drop it
241                // this is actually a bug in user code but we should handle it gracefully
242                // and we warn user in debug mode
243                // SAFETY: data is not moved it's safe to drop it or put it back to the channel
244                // queue
245                unsafe {
246                    if self.internal.capacity() == 0 {
247                        #[cfg(debug_assertions)]
248                        println!(
249                            "warning: ReceiveFuture dropped while send operation is in progress"
250                        );
251                        self.sig.drop_data();
252                    } else {
253                        // fallback: push it back to the channel queue
254                        acquire_internal(self.internal)
255                            .queue
256                            .push_front(self.sig.assume_init())
257                    }
258                }
259            }
260        }
261    }
262}
263
264impl<T> Debug for ReceiveFuture<'_, T> {
265    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
266        write!(f, "ReceiveFuture {{ .. }}")
267    }
268}
269
270impl<'a, T> ReceiveFuture<'a, T> {
271    #[inline(always)]
272    pub(crate) fn new_ref(internal: &'a Internal<T>) -> Self {
273        Self {
274            sig: AsyncSignal::new_recv(),
275            internal,
276            is_stream: false,
277            _pinned: PhantomPinned,
278        }
279    }
280}
281
282impl<T> Future for ReceiveFuture<'_, T> {
283    type Output = Result<T, ReceiveError>;
284
285    #[inline(always)]
286    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
287        let this = unsafe { self.get_unchecked_mut() };
288
289        loop {
290            return match this.sig.state() {
291                FutureState::Unregistered => {
292                    let cap = this.internal.capacity();
293                    let mut internal = acquire_internal(this.internal);
294                    if unlikely(internal.recv_count == 0) {
295                        drop(internal);
296                        this.sig.set_state_relaxed(FutureState::Done);
297                        return Poll::Ready(Err(ReceiveError()));
298                    }
299                    if cap > 0 {
300                        if let Some(v) = internal.queue.pop_front() {
301                            if let Some(t) = internal.next_send() {
302                                // if there is a sender take its data and push it into the queue
303                                unsafe { internal.queue.push_back(t.recv()) }
304                            }
305                            drop(internal);
306                            this.sig.set_state_relaxed(FutureState::Done);
307                            return Poll::Ready(Ok(v));
308                        }
309                    }
310                    if let Some(t) = internal.next_send() {
311                        drop(internal);
312                        this.sig.set_state_relaxed(FutureState::Done);
313                        return Poll::Ready(Ok(unsafe { t.recv() }));
314                    }
315                    if unlikely(internal.send_count == 0) {
316                        this.sig.set_state_relaxed(FutureState::Done);
317                        return Poll::Ready(Err(ReceiveError()));
318                    }
319                    this.sig.set_state(FutureState::Pending);
320                    // SAFETY: waker is NOOP and not shared yet, it is safe to init it here
321                    unsafe {
322                        this.sig.update_waker(cx.waker());
323                    }
324                    // no active waiter so push to the queue
325                    internal.push_signal(this.sig.dynamic_ptr());
326                    drop(internal);
327                    Poll::Pending
328                }
329                FutureState::Success => {
330                    this.sig.set_state_relaxed(FutureState::Done);
331                    // SAFETY: data is received and safe to read
332                    Poll::Ready(Ok(unsafe { this.sig.assume_init() }))
333                }
334                FutureState::Pending => {
335                    let waker = cx.waker();
336                    // SAFETY: signal waker is valid as we inited it in future pending state
337                    if unsafe { !this.sig.will_wake(waker) } {
338                        // the Waker is changed and we need to update waker in the waiting
339                        // list
340                        let internal = acquire_internal(this.internal);
341                        if internal.recv_signal_exists(this.sig.as_tagged_ptr()) {
342                            // SAFETY: signal is not shared with other thread yet so it's safe to
343                            // update waker locally
344                            unsafe {
345                                this.sig.update_waker(waker);
346                            }
347                            drop(internal);
348                            Poll::Pending
349                        } else {
350                            drop(internal);
351                            // the signal is already shared, and data will be available shortly,
352                            // so wait synchronously and return the result
353                            // note: it's not possible safely to update waker after the signal
354                            // is shared, but we know data will be ready shortly,
355                            //   we can wait synchronously and receive it.
356                            this.sig.set_state_relaxed(FutureState::Done);
357                            if likely(this.sig.blocking_wait()) {
358                                // SAFETY: data is received and safe to read
359                                Poll::Ready(Ok(unsafe { this.sig.assume_init() }))
360                            } else {
361                                Poll::Ready(Err(ReceiveError()))
362                            }
363                        }
364                    } else {
365                        Poll::Pending
366                    }
367                }
368                FutureState::Failure => {
369                    mark_branch_unlikely();
370                    this.sig.set_state_relaxed(FutureState::Done);
371                    Poll::Ready(Err(ReceiveError()))
372                }
373                FutureState::Done => {
374                    mark_branch_unlikely();
375                    if this.is_stream {
376                        this.sig.set_state_relaxed(FutureState::Unregistered);
377                        continue;
378                    }
379                    panic!("polled after result is already returned")
380                }
381            };
382        }
383    }
384}
385
386/// ReceiveStream is a stream for receiving objects from a channel
387/// asynchronously.
388pub struct ReceiveStream<'a, T: 'a> {
389    future: Pin<Box<ReceiveFuture<'a, T>>>,
390    terminated: bool,
391    receiver: &'a AsyncReceiver<T>,
392}
393
394impl<T> Debug for ReceiveStream<'_, T> {
395    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
396        write!(f, "ReceiveStream {{ .. }}")
397    }
398}
399
400impl<T> Stream for ReceiveStream<'_, T> {
401    type Item = T;
402
403    fn poll_next(
404        mut self: Pin<&mut Self>,
405        cx: &mut core::task::Context<'_>,
406    ) -> Poll<Option<Self::Item>> {
407        if unlikely(self.terminated) {
408            return Poll::Ready(None);
409        }
410        // SAFETY: future is pinned as stream is pinned to a location too
411        match self.future.as_mut().poll(cx) {
412            Poll::Ready(res) => match res {
413                Ok(d) => Poll::Ready(Some(d)),
414                Err(_) => {
415                    mark_branch_unlikely();
416                    self.terminated = true;
417                    Poll::Ready(None)
418                }
419            },
420            Poll::Pending => Poll::Pending,
421        }
422    }
423}
424
425impl<T> FusedStream for ReceiveStream<'_, T> {
426    fn is_terminated(&self) -> bool {
427        self.receiver.is_terminated()
428    }
429}
430
431impl<'a, T> ReceiveStream<'a, T> {
432    pub(crate) fn new_borrowed(receiver: &'a AsyncReceiver<T>) -> Self {
433        let mut future = receiver.recv();
434        future.is_stream = true;
435        ReceiveStream {
436            future: Box::pin(future),
437            terminated: false,
438            receiver,
439        }
440    }
441}
442
443/// ReceiveStreamOwned is a stream for receiving objects from a channel
444/// asynchronously and owns the receiver.
445pub struct ReceiveStreamOwned<T: 'static> {
446    future: ManuallyDrop<Pin<Box<ReceiveFuture<'static, T>>>>,
447    terminated: bool,
448    receiver: ManuallyDrop<Pin<Box<AsyncReceiver<T>>>>,
449}
450
451impl<T: 'static> Debug for ReceiveStreamOwned<T> {
452    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
453        write!(f, "ReceiveStreamOwned {{ .. }}")
454    }
455}
456
457impl<T: 'static> Stream for ReceiveStreamOwned<T> {
458    type Item = T;
459
460    fn poll_next(
461        mut self: Pin<&mut Self>,
462        cx: &mut core::task::Context<'_>,
463    ) -> Poll<Option<Self::Item>> {
464        if unlikely(self.terminated) {
465            return Poll::Ready(None);
466        }
467        // SAFETY: future is pinned as stream is pinned to a location too
468        // `future` is initialized and pinned for the lifetime of the stream.
469        let future = &mut *self.future;
470        match future.as_mut().poll(cx) {
471            Poll::Ready(res) => match res {
472                Ok(d) => Poll::Ready(Some(d)),
473                Err(_) => {
474                    mark_branch_unlikely();
475                    self.terminated = true;
476                    Poll::Ready(None)
477                }
478            },
479            Poll::Pending => Poll::Pending,
480        }
481    }
482}
483
484impl<T: 'static> FusedStream for ReceiveStreamOwned<T> {
485    fn is_terminated(&self) -> bool {
486        // Receiver is alive while the stream exists.
487        (&*self.receiver).is_terminated()
488    }
489}
490
491impl<T: 'static> ReceiveStreamOwned<T> {
492    pub(crate) fn new(receiver: AsyncReceiver<T>) -> Self {
493        let receiver = Box::pin(receiver);
494        let mut future = ReceiveFuture::new_ref(&receiver.internal);
495        future.is_stream = true;
496        let future = unsafe {
497            // Safety: `receiver` is pinned and will not be moved for the lifetime
498            // of `future`, so extending the lifetime is safe here.
499            transmute::<Pin<Box<ReceiveFuture<'_, T>>>, Pin<Box<ReceiveFuture<'static, T>>>>(
500                Box::pin(future),
501            )
502        };
503        ReceiveStreamOwned {
504            future: ManuallyDrop::new(future),
505            terminated: false,
506            receiver: ManuallyDrop::new(receiver),
507        }
508    }
509}
510
511impl<T: 'static> Drop for ReceiveStreamOwned<T> {
512    fn drop(&mut self) {
513        // Ensure the future (which borrows the receiver) is dropped before the receiver.
514        unsafe {
515            ManuallyDrop::drop(&mut self.future);
516            ManuallyDrop::drop(&mut self.receiver);
517        }
518    }
519}
520
521/// SendManyFuture is a future for sending multiple objects to a channel
522/// asynchronously. It must be polled to complete the send operation.
523#[must_use = "futures do nothing unless you .await or poll them"]
524#[derive(Debug)]
525pub struct SendManyFuture<'a, 'b, T> {
526    internal: &'a Internal<T>,
527    // This is a UnsafeCell, because it can be shared in WaitQueue of the channel
528    fut: UnsafeCell<SendFuture<'a, T>>,
529    // Elements that we are writing to the channel
530    elements: &'b mut std::collections::VecDeque<T>,
531    // Future is finished and no longer should be polled
532    finished: bool,
533    // If set we are in wait queue and fut shall not be used as mutable
534    in_wait_queue: bool,
535    _pinned: PhantomPinned,
536}
537
538impl<'a, 'b, T> SendManyFuture<'a, 'b, T> {
539    #[inline(always)]
540    pub(crate) fn new(
541        internal: &'a Internal<T>,
542        elements: &'b mut std::collections::VecDeque<T>,
543    ) -> Self {
544        SendManyFuture {
545            internal,
546            fut: UnsafeCell::new(SendFuture::new_finished(internal)),
547            elements,
548            finished: false,
549            in_wait_queue: false,
550            _pinned: PhantomPinned,
551        }
552    }
553}
554
555impl<'a, 'b, T> Future for SendManyFuture<'a, 'b, T> {
556    type Output = Result<(), SendError<T>>;
557
558    #[inline(always)]
559    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
560        let this = unsafe { self.get_unchecked_mut() };
561
562        loop {
563            if unlikely(this.finished) {
564                panic!("polled after completion");
565            }
566
567            if this.in_wait_queue {
568                // SAFETY: this is pinned therefore it's safe to create a pinned reference
569                let fut = unsafe { Pin::new_unchecked(this.fut.get_mut()) };
570
571                match fut.poll(cx) {
572                    Poll::Ready(res) => {
573                        if this.elements.is_empty() {
574                            this.finished = true;
575                            return Poll::Ready(res);
576                        }
577                    }
578                    Poll::Pending => return Poll::Pending,
579                }
580                this.in_wait_queue = false;
581            }
582
583            // If there is nothing to send we are done.
584            if unlikely(this.elements.is_empty()) {
585                this.finished = true;
586                return Poll::Ready(Ok(()));
587            }
588
589            // Channel capacity
590            let cap = this.internal.capacity();
591
592            // Acquire a mutable reference to the internal state.
593            let mut internal = acquire_internal(this.internal);
594
595            // Channel is closed from the other side
596            if unlikely(internal.recv_count == 0) {
597                // Return the first element that could not be sent.
598                let first = this.elements.pop_front().unwrap();
599                drop(internal);
600                this.finished = true;
601                return Poll::Ready(Err(SendError(first)));
602            }
603
604            // -----------------------------------------------------------------
605            // 1) Deliver directly to waiting receivers.
606            // -----------------------------------------------------------------
607            while let Some(waiter) = internal.next_recv() {
608                let v = this.elements.pop_front().unwrap();
609                // SAFETY: it is safe to send an owned waiter once
610                unsafe {
611                    waiter.send(v);
612                }
613                if unlikely(this.elements.is_empty()) {
614                    // No more elements to send.
615                    drop(internal);
616                    this.finished = true;
617                    return Poll::Ready(Ok(()));
618                }
619            }
620
621            if unlikely(this.elements.is_empty()) {
622                // Nothing left to send.
623                drop(internal);
624                this.finished = true;
625                return Poll::Ready(Ok(()));
626            }
627
628            // -----------------------------------------------------------------
629            // 2) Fill the channel’s queue (if it has a bounded/unbounded capacity).
630            // -----------------------------------------------------------------
631            if cap > 0 {
632                while internal.queue.len() < cap {
633                    if let Some(v) = this.elements.pop_front() {
634                        internal.queue.push_back(v);
635                    } else {
636                        // All elements have been queued.
637                        drop(internal);
638                        this.finished = true;
639                        return Poll::Ready(Ok(()));
640                    }
641                }
642            }
643
644            // -----------------------------------------------------------------
645            // 3) If there are still elements, send the next one via a signal
646            // -----------------------------------------------------------------
647            if let Some(v) = this.elements.pop_front() {
648                // SAFETY: we checked above and we are not in any wait queue as we are not
649                // registered in the queue yet.
650                unsafe {
651                    this.fut.get_mut().sig.reset_send(v);
652                }
653                // This is pinned therefore this.fut is also pinned.
654                internal.push_signal(this.fut.get_mut().sig.dynamic_ptr());
655
656                this.in_wait_queue = true;
657                drop(internal);
658                // go poll the future to register the waker or return early if message already
659                // arrived
660                continue;
661            } else {
662                // No more elements left.
663                this.finished = true;
664                return Poll::Ready(Ok(()));
665            }
666        }
667    }
668}
669
670/// DrainIntoBlockingFuture is a future for draining all available messages from a channel
671/// into a vector, blocking until at least one message is received.
672#[must_use = "futures do nothing unless you .await or poll them"]
673pub struct DrainIntoBlockingFuture<'a, 'b, T> {
674    internal: &'a Internal<T>,
675    sig: AsyncSignal<T>,
676    vec: &'b mut Vec<T>,
677    _pinned: PhantomPinned,
678}
679
680impl<T> Drop for DrainIntoBlockingFuture<'_, '_, T> {
681    fn drop(&mut self) {
682        let state = self.sig.state();
683        if unlikely(!state.is_done()) {
684            // try to cancel the signal if we are still waiting
685            if state.is_pending()
686                && acquire_internal(self.internal).cancel_recv_signal(self.sig.as_tagged_ptr())
687            {
688                // signal canceled
689                return;
690            }
691            // we failed to cancel the signal,
692            // either it is unregistered or a sender got signal ownership, receiver should
693            // wait until the response
694            if !state.is_unregistered() && self.sig.blocking_wait() {
695                // got ownership of data that is not going to be used ever again, so drop it
696                // SAFETY: data is not moved it's safe to drop it or put it back to the channel queue
697                unsafe {
698                    if self.internal.capacity() == 0 {
699                        self.sig.drop_data();
700                    } else {
701                        // fallback: push it back to the channel queue
702                        acquire_internal(self.internal)
703                            .queue
704                            .push_front(self.sig.assume_init())
705                    }
706                }
707            }
708        }
709    }
710}
711
712impl<T> Debug for DrainIntoBlockingFuture<'_, '_, T> {
713    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
714        write!(f, "DrainIntoBlockingFuture {{ .. }}")
715    }
716}
717
718impl<'a, 'b, T> DrainIntoBlockingFuture<'a, 'b, T> {
719    #[inline(always)]
720    pub(crate) fn new(internal: &'a Internal<T>, vec: &'b mut Vec<T>) -> Self {
721        Self {
722            sig: AsyncSignal::new_recv(),
723            internal,
724            vec,
725            _pinned: PhantomPinned,
726        }
727    }
728}
729
730impl<T> Future for DrainIntoBlockingFuture<'_, '_, T> {
731    type Output = Result<usize, ReceiveError>;
732
733    #[inline(always)]
734    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
735        let this = unsafe { self.get_unchecked_mut() };
736
737        match this.sig.state() {
738            FutureState::Unregistered => {
739                let vec_initial_length = this.vec.len();
740                let mut internal = acquire_internal(this.internal);
741
742                // Check if channel is closed
743                if unlikely(internal.recv_count == 0) {
744                    this.sig.set_state_relaxed(FutureState::Done);
745                    return Poll::Ready(Err(ReceiveError()));
746                }
747
748                // Calculate required capacity and reserve
749                let required_cap = internal.queue.len() + {
750                    if internal.recv_blocking {
751                        0
752                    } else {
753                        internal.wait_list.len()
754                    }
755                };
756                let remaining_cap = this.vec.capacity() - vec_initial_length;
757                if required_cap > remaining_cap {
758                    this.vec
759                        .reserve(vec_initial_length + required_cap - remaining_cap);
760                }
761
762                // Drain queue
763                this.vec.extend(internal.queue.drain(..));
764
765                // Drain wait_list send signals
766                while let Some(p) = internal.next_send() {
767                    // SAFETY: it's safe to receive from owned signal once
768                    unsafe { this.vec.push(p.recv()) }
769                }
770
771                // If got data, return immediately
772                let count = this.vec.len() - vec_initial_length;
773                if count > 0 {
774                    this.sig.set_state_relaxed(FutureState::Done);
775                    return Poll::Ready(Ok(count));
776                }
777
778                // No data, check if there are still senders
779                if unlikely(internal.send_count == 0) {
780                    this.sig.set_state_relaxed(FutureState::Done);
781                    return Poll::Ready(Err(ReceiveError()));
782                }
783
784                // Register signal and wait
785                this.sig.set_state(FutureState::Pending);
786                // SAFETY: waker is NOOP and not shared yet, it is safe to init it here
787                unsafe {
788                    this.sig.update_waker(cx.waker());
789                }
790                internal.push_signal(this.sig.dynamic_ptr());
791                drop(internal);
792                Poll::Pending
793            }
794            FutureState::Success => {
795                this.sig.set_state_relaxed(FutureState::Done);
796                // SAFETY: data is received and safe to read
797                this.vec.push(unsafe { this.sig.assume_init() });
798                Poll::Ready(Ok(1))
799            }
800            FutureState::Pending => {
801                let waker = cx.waker();
802                // SAFETY: signal waker is valid as we inited it in future pending state
803                if unsafe { !this.sig.will_wake(waker) } {
804                    // the Waker is changed and we need to update waker in the waiting list
805                    let internal = acquire_internal(this.internal);
806                    if internal.recv_signal_exists(this.sig.as_tagged_ptr()) {
807                        // SAFETY: signal is not shared with other thread yet so it's safe to
808                        // update waker locally
809                        unsafe {
810                            this.sig.update_waker(waker);
811                        }
812                        drop(internal);
813                        Poll::Pending
814                    } else {
815                        drop(internal);
816                        // the signal is already shared, and data will be available shortly,
817                        // so wait synchronously and return the result
818                        this.sig.set_state_relaxed(FutureState::Done);
819                        if likely(this.sig.blocking_wait()) {
820                            // SAFETY: data is received and safe to read
821                            this.vec.push(unsafe { this.sig.assume_init() });
822                            Poll::Ready(Ok(1))
823                        } else {
824                            Poll::Ready(Err(ReceiveError()))
825                        }
826                    }
827                } else {
828                    Poll::Pending
829                }
830            }
831            FutureState::Failure => {
832                mark_branch_unlikely();
833                this.sig.set_state_relaxed(FutureState::Done);
834                Poll::Ready(Err(ReceiveError()))
835            }
836            FutureState::Done => {
837                mark_branch_unlikely();
838                panic!("polled after result is already returned")
839            }
840        }
841    }
842}