batch_channel/
lib.rs

1#![doc = include_str!("../README.md")]
2#![doc = include_str!("example.md")]
3
4use mutex::PinnedCondvar as Condvar;
5use mutex::PinnedMutex as Mutex;
6use mutex::PinnedMutexGuard as MutexGuard;
7use pin_project::pin_project;
8use pin_project::pinned_drop;
9#[cfg(feature = "parking_lot")]
10use pinned_mutex::parking_lot as mutex;
11#[cfg(not(feature = "parking_lot"))]
12use pinned_mutex::std as mutex;
13use std::cmp::min;
14use std::collections::VecDeque;
15use std::fmt;
16use std::future::Future;
17use std::iter::Peekable;
18use std::ops::AsyncFnOnce;
19use std::pin::Pin;
20use std::sync::OnceLock;
21use std::task::Context;
22use std::task::Poll;
23use wakerset::ExtractedWakers;
24use wakerset::WakerList;
25use wakerset::WakerSlot;
26
27const UNBOUNDED_CAPACITY: usize = usize::MAX;
28
29macro_rules! derive_clone {
30    ($t:ident) => {
31        impl<T> Clone for $t<T> {
32            fn clone(&self) -> Self {
33                Self {
34                    core: self.core.clone(),
35                }
36            }
37        }
38    };
39}
40
41#[derive(Debug)]
42#[pin_project]
43struct StateBase {
44    capacity: usize,
45    closed: bool,
46    #[pin]
47    tx_wakers: WakerList,
48    #[pin]
49    rx_wakers: WakerList,
50}
51
52impl StateBase {
53    fn target_capacity(&self) -> usize {
54        // TODO: We could offer an option to use queue.capacity
55        // instead.
56        self.capacity
57    }
58
59    fn pending_tx<T>(
60        self: Pin<&mut StateBase>,
61        slot: Pin<&mut WakerSlot>,
62        cx: &mut Context,
63    ) -> Poll<T> {
64        // This may allocate, but only when the sender is about to
65        // block, which is already expensive.
66        self.project().tx_wakers.link(slot, cx.waker().clone());
67        Poll::Pending
68    }
69
70    fn pending_rx<T>(
71        self: Pin<&mut StateBase>,
72        slot: Pin<&mut WakerSlot>,
73        cx: &mut Context,
74    ) -> Poll<T> {
75        // This may allocate, but only when the receiver is about to
76        // block, which is already expensive.
77        self.project().rx_wakers.link(slot, cx.waker().clone());
78        Poll::Pending
79    }
80}
81
82#[derive(Debug)]
83#[pin_project]
84struct State<T> {
85    #[pin]
86    base: StateBase,
87    queue: VecDeque<T>,
88}
89
90impl<T> State<T> {
91    fn has_capacity(&self) -> bool {
92        self.queue.len() < self.target_capacity()
93    }
94
95    fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
96        self.project().base
97    }
98}
99
100impl<T> std::ops::Deref for State<T> {
101    type Target = StateBase;
102
103    fn deref(&self) -> &Self::Target {
104        &self.base
105    }
106}
107
108impl<T> std::ops::DerefMut for State<T> {
109    fn deref_mut(&mut self) -> &mut Self::Target {
110        &mut self.base
111    }
112}
113
114#[derive(Debug)]
115#[pin_project]
116struct Core<T> {
117    #[pin]
118    state: Mutex<State<T>>,
119    // OnceLock ensures Core is Sync and Arc<Core> is Send. But it is
120    // not strictly necessary, as these condition variables are only
121    // accessed while the lock is held. Alas, Rust does not allow
122    // Condvar to be stored under the Mutex.
123    not_empty: OnceLock<Condvar>,
124    not_full: OnceLock<Condvar>,
125}
126
127impl<T> Core<T> {
128    /// Returns when there is a value or there are no values and all
129    /// senders are dropped.
130    fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
131        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
132            !s.closed && s.queue.is_empty()
133        }
134
135        let mut state = self.project_ref().state.lock();
136        if !condition(state.as_mut()) {
137            return state;
138        }
139        // Initialize the condvar while the lock is held. Thus, the
140        // caller can, while the lock is held, check whether the
141        // condvar must be notified.
142        let not_empty = self.not_empty.get_or_init(Default::default);
143        not_empty.wait_while(state, condition)
144    }
145
146    /// Returns when there is either room in the queue or all receivers
147    /// are dropped.
148    fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
149        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
150            !s.closed && !s.has_capacity()
151        }
152
153        let mut state = self.project_ref().state.lock();
154        if !condition(state.as_mut()) {
155            return state;
156        }
157        // Initialize the condvar while the lock is held. Thus, the
158        // caller can, while the lock is held, check whether the
159        // condvar must be notified.
160        let not_full = self.not_full.get_or_init(Default::default);
161        not_full.wait_while(state, condition)
162    }
163
164    /// Returns when there is either room in the queue or all receivers
165    /// are dropped.
166    fn wake_rx_and_block_while_full<'a>(
167        self: Pin<&'a Self>,
168        mut state: MutexGuard<'a, State<T>>,
169    ) -> MutexGuard<'a, State<T>> {
170        // The lock is held. Therefore, we know whether a Condvar must
171        // be notified or not.
172        let cvar = self.not_empty.get();
173
174        // We should not wake Wakers while a lock is held. Therefore,
175        // we must release the lock and reacquire it to wait.
176        let round = state
177            .as_mut()
178            .project()
179            .base
180            .project()
181            .rx_wakers
182            .begin_extraction();
183        let mut wakers = ExtractedWakers::new();
184        // There is no guarantee that the highest-priority waker will
185        // actually call poll() again. Therefore, the best we can do
186        // is wake everyone.
187        loop {
188            let more = state
189                .as_mut()
190                .project()
191                .base
192                .project()
193                .rx_wakers
194                .extract_some_wakers(round, &mut wakers);
195            drop(state);
196            wakers.wake_all();
197            if !more {
198                break;
199            }
200            state = self.project_ref().state.lock();
201        }
202
203        // TODO: Avoid unlocking and locking again when there's no
204        // waker or condition variable.
205
206        if let Some(cvar) = cvar {
207            // TODO: There are situations where we may know that we
208            // can get away with notify_one().
209            cvar.notify_all();
210        }
211
212        state = self.project_ref().state.lock();
213
214        // Initialize the condvar while the lock is held. Thus, the
215        // caller can, while the lock is held, check whether the
216        // condvar must be notified.
217        let not_full = self.not_full.get_or_init(Default::default);
218        not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
219    }
220
221    fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
222        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
223        let cvar = self.not_full.get();
224
225        if state.as_mut().project().base.project().tx_wakers.is_empty() {
226            drop(state);
227        } else {
228            let round = state
229                .as_mut()
230                .project()
231                .base
232                .project()
233                .tx_wakers
234                .begin_extraction();
235            let mut wakers = ExtractedWakers::new();
236            loop {
237                let more = state
238                    .as_mut()
239                    .project()
240                    .base
241                    .project()
242                    .tx_wakers
243                    .extract_some_wakers(round, &mut wakers);
244                drop(state);
245                wakers.wake_all();
246                if !more {
247                    break;
248                }
249                state = self.project_ref().state.lock();
250            }
251        }
252
253        if let Some(cvar) = cvar {
254            // TODO: There are situations where we may know that we
255            // can get away with notify_one().
256            cvar.notify_all();
257        }
258    }
259
260    fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
261        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
262        let cvar = self.not_empty.get();
263        if state.as_mut().project().base.project().rx_wakers.is_empty() {
264            drop(state);
265        } else {
266            let round = state
267                .as_mut()
268                .project()
269                .base
270                .project()
271                .rx_wakers
272                .begin_extraction();
273            let mut wakers = ExtractedWakers::new();
274            // There is no guarantee that the highest-priority waker will
275            // actually call poll() again. Therefore, the best we can do
276            // is wake everyone.
277            loop {
278                let more = state
279                    .as_mut()
280                    .project()
281                    .base
282                    .project()
283                    .rx_wakers
284                    .extract_some_wakers(round, &mut wakers);
285                drop(state);
286                wakers.wake_all();
287                if !more {
288                    break;
289                }
290                state = self.project_ref().state.lock();
291            }
292        }
293
294        if let Some(cvar) = cvar {
295            cvar.notify_one();
296        }
297    }
298
299    fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
300        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
301        let cvar = self.not_empty.get();
302        let round = state
303            .as_mut()
304            .project()
305            .base
306            .project()
307            .rx_wakers
308            .begin_extraction();
309        let mut wakers = ExtractedWakers::new();
310        // There is no guarantee that the highest-priority waker will
311        // actually call poll() again. Therefore, the best we can do
312        // is wake everyone.
313        loop {
314            let more = state
315                .as_mut()
316                .project()
317                .base
318                .project()
319                .rx_wakers
320                .extract_some_wakers(round, &mut wakers);
321            drop(state);
322            wakers.wake_all();
323            if !more {
324                break;
325            }
326            state = self.project_ref().state.lock();
327        }
328
329        if let Some(cvar) = cvar {
330            cvar.notify_all();
331        }
332    }
333}
334
335impl<T> splitrc::Notify for Core<T> {
336    fn last_tx_did_drop_pinned(self: Pin<&Self>) {
337        let mut state = self.project_ref().state.lock();
338        *state.as_mut().base().project().closed = true;
339        // We cannot deallocate the queue, as remaining receivers can
340        // drain it.
341        self.wake_all_rx(state);
342    }
343
344    fn last_rx_did_drop_pinned(self: Pin<&Self>) {
345        let mut state = self.project_ref().state.lock();
346        *state.as_mut().base().project().closed = true;
347        // TODO: deallocate
348        state.as_mut().project().queue.clear();
349        self.wake_all_tx(state);
350    }
351}
352
353// SendError
354
355/// An error returned from [Sender::send] when all [Receiver]s are
356/// dropped.
357///
358/// The unsent value is returned.
359#[derive(Clone, Copy, Debug, Eq, PartialEq)]
360pub struct SendError<T>(pub T);
361
362impl<T> fmt::Display for SendError<T> {
363    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364        write!(f, "failed to send value on channel")
365    }
366}
367
368impl<T: fmt::Debug> std::error::Error for SendError<T> {}
369
370// SyncSender
371
372/// The sending half of a channel.
373#[derive(Debug)]
374pub struct SyncSender<T> {
375    core: Pin<splitrc::Tx<Core<T>>>,
376}
377
378derive_clone!(SyncSender);
379
380impl<T> SyncSender<T> {
381    /// Converts `SyncSender` to asynchronous `Sender`.
382    pub fn into_async(self) -> Sender<T> {
383        Sender { core: self.core }
384    }
385
386    /// Send a single value.
387    ///
388    /// Returns [SendError] if all receivers are dropped.
389    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
390        let mut state = self.core.as_ref().block_until_not_full();
391        if state.closed {
392            assert!(state.as_ref().project_ref().queue.is_empty());
393            return Err(SendError(value));
394        }
395
396        state.as_mut().project().queue.push_back(value);
397
398        self.core.as_ref().wake_one_rx(state);
399        Ok(())
400    }
401
402    /// Send multiple values.
403    ///
404    /// If all receivers are dropped, SendError is returned. The
405    /// values cannot be returned, as they may have been partially
406    /// sent when the channel is closed.
407    pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
408    where
409        I: IntoIterator<Item = T>,
410    {
411        let mut values = values.into_iter();
412
413        // If the iterator is empty, we can avoid acquiring the lock.
414        let Some(mut value) = values.next() else {
415            return Ok(());
416        };
417
418        let mut sent_count = 0usize;
419
420        let mut state = self.core.as_ref().block_until_not_full();
421        'outer: loop {
422            if state.closed {
423                // We may have sent some values, but the receivers are
424                // all dropped, and that cleared the queue.
425                assert!(state.queue.is_empty());
426                return Err(SendError(()));
427            }
428
429            debug_assert!(state.has_capacity());
430            state.as_mut().project().queue.push_back(value);
431            sent_count += 1;
432            loop {
433                match values.next() {
434                    Some(v) => {
435                        if state.has_capacity() {
436                            state.as_mut().project().queue.push_back(v);
437                            sent_count += 1;
438                        } else {
439                            value = v;
440                            // We're about to block, but we know we
441                            // sent at least one value, so wake any
442                            // waiters.
443                            state = self.core.as_ref().wake_rx_and_block_while_full(state);
444                            continue 'outer;
445                        }
446                    }
447                    None => {
448                        // Done pulling from the iterator and know we
449                        // sent at least one value.
450                        if sent_count == 1 {
451                            self.core.as_ref().wake_one_rx(state);
452                        } else {
453                            self.core.as_ref().wake_all_rx(state);
454                        }
455                        return Ok(());
456                    }
457                }
458            }
459        }
460    }
461
462    /// Drain a [Vec] into the channel without deallocating it.
463    ///
464    /// This is a convenience method for allocation-free batched
465    /// sends. The `values` vector is drained, and then returned with
466    /// the same capacity it had.
467    pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
468        match self.send_iter(values.drain(..)) {
469            Ok(_) => Ok(values),
470            Err(_) => Err(SendError(values)),
471        }
472    }
473
474    /// Automatically accumulate sends into a buffer of size `batch_limit`
475    /// and send when full.
476    pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
477    where
478        F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
479    {
480        let mut tx = SyncBatchSender {
481            sender: self,
482            capacity: batch_limit,
483            buffer: Vec::with_capacity(batch_limit),
484        };
485        let r = f(&mut tx)?;
486        tx.drain()?;
487        Ok(r)
488    }
489}
490
491// SyncBatchSender
492
493/// Automatically batches up values and sends them when a batch is full.
494#[derive(Debug)]
495pub struct SyncBatchSender<'a, T> {
496    sender: &'a mut SyncSender<T>,
497    capacity: usize,
498    buffer: Vec<T>,
499}
500
501impl<T> SyncBatchSender<'_, T> {
502    /// Buffers a single value to be sent on the channel.
503    ///
504    /// Sends the batch if the buffer is full.
505    pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
506        self.buffer.push(value);
507        // TODO: consider using the full capacity if Vec overallocated.
508        if self.buffer.len() == self.capacity {
509            self.drain()
510        } else {
511            Ok(())
512        }
513    }
514
515    /// Buffers multiple values, sending batches as the internal
516    /// buffer reaches capacity.
517    pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
518        // TODO: We could return the remainder of I under cancellation.
519        for value in values.into_iter() {
520            self.send(value)?;
521        }
522        Ok(())
523    }
524
525    /// Sends any buffered values, clearing the current batch.
526    pub fn drain(&mut self) -> Result<(), SendError<()>> {
527        // TODO: send_iter
528        match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
529            Ok(drained_vec) => {
530                self.buffer = drained_vec;
531                Ok(())
532            }
533            Err(_) => Err(SendError(())),
534        }
535    }
536}
537
538// Sender
539
540/// The asynchronous sending half of a channel.
541#[derive(Debug)]
542pub struct Sender<T> {
543    core: Pin<splitrc::Tx<Core<T>>>,
544}
545
546derive_clone!(Sender);
547
548impl<T> Sender<T> {
549    /// Converts asynchronous `Sender` to `SyncSender`.
550    pub fn into_sync(self) -> SyncSender<T> {
551        SyncSender { core: self.core }
552    }
553
554    /// Send a single value.
555    ///
556    /// Returns [SendError] if all receivers are dropped.
557    pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
558        Send {
559            sender: self,
560            value: Some(value),
561            waker: WakerSlot::new(),
562        }
563    }
564
565    /// Send multiple values.
566    ///
567    /// If all receivers are dropped, SendError is returned and unsent
568    /// values are dropped.
569    pub fn send_iter<'a, I>(
570        &'a self,
571        values: I,
572    ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
573    where
574        I: IntoIterator<Item = T> + 'a,
575    {
576        SendIter {
577            sender: self,
578            values: Some(values.into_iter().peekable()),
579            waker: WakerSlot::new(),
580        }
581    }
582
583    /// Automatically accumulate sends into a buffer of size `batch_limit`
584    /// and send when full.
585    pub async fn autobatch<R>(
586        self,
587        batch_limit: usize,
588        f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
589    ) -> Result<R, SendError<()>> {
590        let mut tx = BatchSender {
591            sender: self,
592            batch_limit,
593            buffer: Vec::with_capacity(batch_limit),
594        };
595        let r = f(&mut tx).await?;
596        tx.drain().await?;
597        Ok(r)
598    }
599
600    /// Same as [Sender::autobatch] except that it immediately returns
601    /// `()` when `f` returns [SendError]. This is a convenience
602    /// wrapper for the common case that the future is passed to a
603    /// spawn function and the receiver being dropped (i.e.
604    /// [SendError]) is considered a clean cancellation.
605    pub async fn autobatch_or_cancel(
606        self,
607        capacity: usize,
608        f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
609    ) {
610        self.autobatch(capacity, f).await.unwrap_or(())
611    }
612}
613
614#[must_use = "futures do nothing unless you `.await` or poll them"]
615#[pin_project(PinnedDrop)]
616struct Send<'a, T> {
617    sender: &'a Sender<T>,
618    value: Option<T>,
619    #[pin]
620    waker: WakerSlot,
621}
622
623#[pinned_drop]
624impl<T> PinnedDrop for Send<'_, T> {
625    fn drop(mut self: Pin<&mut Self>) {
626        if self.waker.is_linked() {
627            let mut state = self.sender.core.as_ref().project_ref().state.lock();
628            state
629                .as_mut()
630                .base()
631                .project()
632                .tx_wakers
633                .unlink(self.project().waker);
634        }
635    }
636}
637
638impl<T> Future for Send<'_, T> {
639    type Output = Result<(), SendError<T>>;
640
641    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
642        let mut state = self.sender.core.as_ref().project_ref().state.lock();
643        if state.closed {
644            return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
645        }
646        if state.has_capacity() {
647            state
648                .as_mut()
649                .project()
650                .queue
651                .push_back(self.as_mut().project().value.take().unwrap());
652            self.project().sender.core.as_ref().wake_one_rx(state);
653            Poll::Ready(Ok(()))
654        } else {
655            state.as_mut().base().pending_tx(self.project().waker, cx)
656        }
657    }
658}
659
660#[must_use = "futures do nothing unless you `.await` or poll them"]
661#[pin_project(PinnedDrop)]
662struct SendIter<'a, T, I: Iterator<Item = T>> {
663    sender: &'a Sender<T>,
664    values: Option<Peekable<I>>,
665    #[pin]
666    waker: WakerSlot,
667}
668
669#[pinned_drop]
670impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
671    fn drop(mut self: Pin<&mut Self>) {
672        if self.waker.is_linked() {
673            let mut state = self.sender.core.as_ref().project_ref().state.lock();
674            state
675                .as_mut()
676                .base()
677                .project()
678                .tx_wakers
679                .unlink(self.project().waker);
680        }
681    }
682}
683
684impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
685    type Output = Result<(), SendError<()>>;
686
687    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
688        // Optimize the case that send_iter was called with an empty
689        // iterator, and don't even acquire the lock.
690        {
691            let pi = self.as_mut().project().values.as_mut().unwrap();
692            if pi.peek().is_none() {
693                return Poll::Ready(Ok(()));
694            }
695            // Satisfy borrow checker: we cannot hold a mut reference to
696            // self through pi before acquiring the lock below.
697        }
698
699        let mut state = self.sender.core.as_ref().project_ref().state.lock();
700
701        // There is an awkward set of constraints here.
702        // 1. To check whether an iterator contains an item, one must be popped.
703        // 2. If the receivers are cancelled, we'd like to return the iterator whole.
704        // 3. If we don't know whether there are any remaining items, we must block
705        //    if the queue is at capacity.
706        // We relax constraint #2 because #3 is preferable.
707        // TODO: We could return Peekable<I> instead.
708
709        let pi = self.as_mut().project().values.as_mut().unwrap();
710        // We already checked above.
711        debug_assert!(pi.peek().is_some());
712        if state.closed {
713            Poll::Ready(Err(SendError(())))
714        } else if !state.has_capacity() {
715            // We know we have a value to send, but there is no room.
716            state.as_mut().base().pending_tx(self.project().waker, cx)
717        } else {
718            debug_assert!(state.has_capacity());
719            state.as_mut().project().queue.push_back(pi.next().unwrap());
720            while state.has_capacity() {
721                match pi.next() {
722                    Some(value) => {
723                        state.as_mut().project().queue.push_back(value);
724                    }
725                    None => {
726                        // Done pulling from the iterator and still
727                        // have capacity, so we're done.
728                        // TODO: wake_one_rx if we only queued one.
729                        self.sender.core.as_ref().wake_all_rx(state);
730                        return Poll::Ready(Ok(()));
731                    }
732                }
733            }
734            // We're out of capacity, and might still have items to
735            // send. To avoid a round-trip through the scheduler, peek
736            // ahead.
737            if pi.peek().is_none() {
738                self.sender.core.as_ref().wake_all_rx(state);
739                return Poll::Ready(Ok(()));
740            }
741
742            // Unconditionally returns Poll::Pending
743            let pending = state
744                .as_mut()
745                .base()
746                .pending_tx(self.as_mut().project().waker, cx);
747            self.sender.core.as_ref().wake_all_rx(state);
748            pending
749        }
750    }
751}
752
753// BatchSender
754
755/// The internal send handle used by [Sender::autobatch].
756/// Builds a buffer of size `batch_limit` and flushes when it's full.
757pub struct BatchSender<T> {
758    sender: Sender<T>,
759    batch_limit: usize,
760    buffer: Vec<T>,
761}
762
763impl<T> BatchSender<T> {
764    /// Adds a value to the internal buffer and flushes it into the
765    /// queue when the buffer fills.
766    pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
767        self.buffer.push(value);
768        if self.buffer.len() == self.batch_limit {
769            self.drain().await?;
770        }
771        Ok(())
772    }
773
774    async fn drain(&mut self) -> Result<(), SendError<()>> {
775        self.sender.send_iter(self.buffer.drain(..)).await?;
776        assert!(self.buffer.is_empty());
777        Ok(())
778    }
779}
780
781// Receiver
782
783/// The receiving half of a channel. Reads are asynchronous.
784#[derive(Debug)]
785pub struct Receiver<T> {
786    core: Pin<splitrc::Rx<Core<T>>>,
787}
788
789derive_clone!(Receiver);
790
791#[must_use = "futures do nothing unless you `.await` or poll them"]
792#[pin_project(PinnedDrop)]
793struct Recv<'a, T> {
794    receiver: &'a Receiver<T>,
795    #[pin]
796    waker: WakerSlot,
797}
798
799#[pinned_drop]
800impl<T> PinnedDrop for Recv<'_, T> {
801    fn drop(mut self: Pin<&mut Self>) {
802        if self.waker.is_linked() {
803            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
804            state
805                .as_mut()
806                .base()
807                .project()
808                .rx_wakers
809                .unlink(self.project().waker);
810        }
811    }
812}
813
814impl<T> Future for Recv<'_, T> {
815    type Output = Option<T>;
816
817    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
818        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
819        match state.as_mut().project().queue.pop_front() {
820            Some(value) => {
821                self.receiver.core.as_ref().wake_all_tx(state);
822                Poll::Ready(Some(value))
823            }
824            None => {
825                if state.closed {
826                    Poll::Ready(None)
827                } else {
828                    state.as_mut().base().pending_rx(self.project().waker, cx)
829                }
830            }
831        }
832    }
833}
834
835#[must_use = "futures do nothing unless you .await or poll them"]
836#[pin_project(PinnedDrop)]
837struct RecvBatch<'a, T> {
838    receiver: &'a Receiver<T>,
839    element_limit: usize,
840    #[pin]
841    waker: WakerSlot,
842}
843
844#[pinned_drop]
845impl<T> PinnedDrop for RecvBatch<'_, T> {
846    fn drop(mut self: Pin<&mut Self>) {
847        if self.waker.is_linked() {
848            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
849            state
850                .as_mut()
851                .base()
852                .project()
853                .rx_wakers
854                .unlink(self.project().waker);
855        }
856    }
857}
858
859impl<T> Future for RecvBatch<'_, T> {
860    type Output = Vec<T>;
861
862    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
863        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
864        let q = &mut state.as_mut().project().queue;
865        let q_len = q.len();
866        if q_len == 0 {
867            if state.closed {
868                return Poll::Ready(Vec::new());
869            } else {
870                return state.as_mut().base().pending_rx(self.project().waker, cx);
871            }
872        }
873
874        let capacity = min(q_len, self.element_limit);
875        let v = Vec::from_iter(q.drain(..capacity));
876        self.receiver.core.as_ref().wake_all_tx(state);
877        Poll::Ready(v)
878    }
879}
880
881#[must_use = "futures do nothing unless you .await or poll them"]
882#[pin_project(PinnedDrop)]
883struct RecvVec<'a, T> {
884    receiver: &'a Receiver<T>,
885    element_limit: usize,
886    vec: &'a mut Vec<T>,
887    #[pin]
888    waker: WakerSlot,
889}
890
891#[pinned_drop]
892impl<T> PinnedDrop for RecvVec<'_, T> {
893    fn drop(mut self: Pin<&mut Self>) {
894        if self.waker.is_linked() {
895            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
896            state
897                .as_mut()
898                .base()
899                .project()
900                .rx_wakers
901                .unlink(self.project().waker);
902        }
903    }
904}
905
906impl<T> Future for RecvVec<'_, T> {
907    type Output = ();
908
909    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
910        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
911        let q = &mut state.as_mut().project().queue;
912        let q_len = q.len();
913        if q_len == 0 {
914            if state.closed {
915                assert!(self.vec.is_empty());
916                return Poll::Ready(());
917            } else {
918                return state.as_mut().base().pending_rx(self.project().waker, cx);
919            }
920        }
921
922        let capacity = min(q_len, self.element_limit);
923        self.as_mut().project().vec.extend(q.drain(..capacity));
924        self.project().receiver.core.as_ref().wake_all_tx(state);
925        Poll::Ready(())
926    }
927}
928
929impl<T> Receiver<T> {
930    /// Converts asynchronous `Receiver` to `SyncReceiver`.
931    pub fn into_sync(self) -> SyncReceiver<T> {
932        SyncReceiver { core: self.core }
933    }
934
935    /// Wait for a single value from the channel.
936    ///
937    /// Returns [None] if all [Sender]s are dropped.
938    pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
939        Recv {
940            receiver: self,
941            waker: WakerSlot::new(),
942        }
943    }
944
945    // TODO: try_recv
946
947    /// Wait for up to `element_limit` values from the channel.
948    ///
949    /// Up to `element_limit` values are returned if they're already
950    /// available. Otherwise, waits for any values to be available.
951    ///
952    /// Returns an empty [Vec] if all [Sender]s are dropped.
953    pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
954        RecvBatch {
955            receiver: self,
956            element_limit,
957            waker: WakerSlot::new(),
958        }
959    }
960
961    // TODO: try_recv_batch
962
963    /// Wait for up to `element_limit` values from the channel and
964    /// store them in `vec`.
965    ///
966    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
967    /// will clear it before adding values. The intent of `recv_vec`
968    /// is that batches can be repeatedly read by workers without new
969    /// allocations.
970    ///
971    /// It's not required, but `vec`'s capacity should be greater than
972    /// or equal to element_limit to avoid reallocation.
973    pub fn recv_vec<'a>(
974        &'a self,
975        element_limit: usize,
976        vec: &'a mut Vec<T>,
977    ) -> impl Future<Output = ()> + 'a {
978        vec.clear();
979        RecvVec {
980            receiver: self,
981            element_limit,
982            vec,
983            waker: WakerSlot::new(),
984        }
985    }
986
987    // TODO: try_recv_vec
988
989    /// Return a [futures_core::Stream] implementation that repeatedly
990    /// calls `recv`. It returns None when the channel is closed.
991    ///
992    /// `Stream` must be pinned before use:
993    ///
994    /// ```rust,ignore
995    /// let mut rx = pin!(rx.stream());
996    /// let elt1 = rx.next().await;
997    /// let elt2 = rx.next().await;
998    /// ```
999    #[cfg(feature = "futures-core")]
1000    pub fn stream<'a>(&'a self) -> Stream<'a, T> {
1001        let recv = Recv {
1002            receiver: self,
1003            waker: WakerSlot::new(),
1004        };
1005        Stream { recv }
1006    }
1007}
1008
1009/// A [futures::core::Stream] implementation that returns elements
1010/// from the stream, one by one, until it's closed.
1011#[cfg(feature = "futures-core")]
1012#[must_use = "streams do nothing unless you `.await` or poll them"]
1013#[pin_project]
1014pub struct Stream<'a, T> {
1015    #[pin]
1016    recv: Recv<'a, T>,
1017}
1018
1019#[cfg(feature = "futures-core")]
1020impl<'a, T> futures_core::Stream for Stream<'a, T> {
1021    type Item = T;
1022
1023    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1024        self.project().recv.poll(cx)
1025    }
1026}
1027
1028// SyncReceiver
1029
1030/// The synchronous receiving half of a channel.
1031#[derive(Debug)]
1032pub struct SyncReceiver<T> {
1033    core: Pin<splitrc::Rx<Core<T>>>,
1034}
1035
1036derive_clone!(SyncReceiver);
1037
1038impl<T> SyncReceiver<T> {
1039    /// Converts `SyncReceiver` to asynchronous `Receiver`.
1040    pub fn into_async(self) -> Receiver<T> {
1041        Receiver { core: self.core }
1042    }
1043
1044    /// Block waiting for a single value from the channel.
1045    ///
1046    /// Returns [None] if all [Sender]s are dropped.
1047    pub fn recv(&self) -> Option<T> {
1048        let mut state = self.core.as_ref().block_until_not_empty();
1049        match state.as_mut().project().queue.pop_front() {
1050            Some(value) => {
1051                self.core.as_ref().wake_all_tx(state);
1052                Some(value)
1053            }
1054            None => {
1055                assert!(state.closed);
1056                None
1057            }
1058        }
1059    }
1060
1061    /// Block waiting for values from the channel.
1062    ///
1063    /// Up to `element_limit` values are returned if they're already
1064    /// available. Otherwise, waits for any values to be available.
1065    ///
1066    /// Returns an empty [Vec] if all [Sender]s are dropped.
1067    pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
1068        let mut state = self.core.as_ref().block_until_not_empty();
1069
1070        let q = &mut state.as_mut().project().queue;
1071        let q_len = q.len();
1072        if q_len == 0 {
1073            assert!(state.closed);
1074            return Vec::new();
1075        }
1076
1077        let capacity = min(q_len, element_limit);
1078        let v = Vec::from_iter(q.drain(..capacity));
1079        self.core.as_ref().wake_all_tx(state);
1080        v
1081    }
1082
1083    /// Wait for up to `element_limit` values from the channel and
1084    /// store them in `vec`.
1085    ///
1086    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
1087    /// will clear it before adding values. The intent of `recv_vec`
1088    /// is that batches can be repeatedly read by workers without new
1089    /// allocations.
1090    ///
1091    /// It's not required, but `vec`'s capacity should be greater than
1092    /// or equal to element_limit to avoid reallocation.
1093    pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
1094        vec.clear();
1095
1096        let mut state = self.core.as_ref().block_until_not_empty();
1097        let q = &mut state.as_mut().project().queue;
1098        let q_len = q.len();
1099        if q_len == 0 {
1100            assert!(state.closed);
1101            // The result vector is already cleared.
1102            return;
1103        }
1104
1105        let capacity = min(q_len, element_limit);
1106        vec.extend(q.drain(..capacity));
1107        self.core.as_ref().wake_all_tx(state);
1108    }
1109}
1110
1111impl<T> Iterator for SyncReceiver<T> {
1112    type Item = T;
1113
1114    fn next(&mut self) -> Option<Self::Item> {
1115        self.recv()
1116    }
1117}
1118
1119// Constructors
1120
1121/// Allocates a bounded channel and returns the sender, receiver
1122/// pair.
1123///
1124/// Rust async is polling, so unbuffered channels are not supported.
1125/// Therefore, a capacity of 0 is rounded up to 1.
1126pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1127    Builder::new().bounded(capacity).build_async()
1128}
1129
1130/// Allocates a bounded channel and returns the synchronous handles as
1131/// a sender, receiver pair.
1132///
1133/// Because handles can be converted freely between sync and async,
1134/// and Rust async is polling, unbuffered channels are not
1135/// supported. A capacity of 0 is rounded up to 1.
1136pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
1137    let (tx, rx) = bounded(capacity);
1138    (tx.into_sync(), rx.into_sync())
1139}
1140
1141/// Allocates an unbounded channel and returns the sender,
1142/// receiver pair.
1143pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1144    Builder::new().build_async()
1145}
1146
1147/// Allocates an unbounded channel and returns the synchronous handles
1148/// as a sender, receiver pair.
1149pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
1150    let (tx, rx) = unbounded();
1151    (tx.into_sync(), rx.into_sync())
1152}
1153
1154/// Customized channel construction.
1155#[derive(Debug, Default)]
1156pub struct Builder {
1157    capacity: Option<usize>,
1158    preallocate: bool,
1159}
1160
1161impl Builder {
1162    /// Defaults to unbounded.
1163    pub fn new() -> Self {
1164        Default::default()
1165    }
1166
1167    /// Define this channel as bounded with the given capacity.
1168    ///
1169    /// Because handles can be converted freely between sync and
1170    /// async, and Rust async is polling, unbuffered channels are not
1171    /// supported. A capacity of 0 is rounded up to 1.
1172    pub fn bounded(&mut self, capacity: usize) -> &mut Self {
1173        self.capacity = Some(capacity);
1174        self
1175    }
1176
1177    /// Preallocate bounded channels with their full capacity. This
1178    /// avoids reallocation at runtime.
1179    ///
1180    /// Has no effect on unbounded channels.
1181    pub fn preallocate(&mut self) -> &mut Self {
1182        self.preallocate = true;
1183        self
1184    }
1185
1186    /// Allocates a channel and returns async sender and receiver
1187    /// handles.
1188    pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
1189        let capacity;
1190        let queue;
1191        match self.capacity {
1192            Some(c) => {
1193                capacity = c;
1194                queue = if self.preallocate {
1195                    VecDeque::with_capacity(capacity)
1196                } else {
1197                    VecDeque::new()
1198                };
1199            }
1200            None => {
1201                capacity = UNBOUNDED_CAPACITY;
1202                queue = VecDeque::new();
1203            }
1204        };
1205
1206        let core = Core {
1207            state: Mutex::new(State {
1208                base: StateBase {
1209                    capacity,
1210                    closed: false,
1211                    tx_wakers: WakerList::new(),
1212                    rx_wakers: WakerList::new(),
1213                },
1214                queue,
1215            }),
1216            not_empty: OnceLock::new(),
1217            not_full: OnceLock::new(),
1218        };
1219        let (core_tx, core_rx) = splitrc::pin(core);
1220        (Sender { core: core_tx }, Receiver { core: core_rx })
1221    }
1222
1223    /// Allocates a channel and returns the synchronous sender and
1224    /// receiver handles.
1225    pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
1226        let (tx, rx) = self.build_async();
1227        (tx.into_sync(), rx.into_sync())
1228    }
1229}