batch_channel/
lib.rs

#![doc = include_str!("../README.md")]
#![doc = include_str!("example.md")]

use futures_core::future::BoxFuture;
use mutex::PinnedCondvar as Condvar;
use mutex::PinnedMutex as Mutex;
use mutex::PinnedMutexGuard as MutexGuard;
use pin_project::pin_project;
use pin_project::pinned_drop;
#[cfg(feature = "parking_lot")]
use pinned_mutex::parking_lot as mutex;
#[cfg(not(feature = "parking_lot"))]
use pinned_mutex::std as mutex;
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::iter::Peekable;
use std::pin::Pin;
use std::sync::OnceLock;
use std::task::Context;
use std::task::Poll;
use wakerset::WakerList;
use wakerset::WakerSlot;

const UNBOUNDED_CAPACITY: usize = usize::MAX;

macro_rules! derive_clone {
    ($t:ident) => {
        impl<T> Clone for $t<T> {
            fn clone(&self) -> Self {
                Self {
                    core: self.core.clone(),
                }
            }
        }
    };
}

#[derive(Debug)]
#[pin_project]
struct StateBase {
    capacity: usize,
    closed: bool,
    #[pin]
    tx_wakers: WakerList,
    #[pin]
    rx_wakers: WakerList,
}

impl StateBase {
    fn target_capacity(&self) -> usize {
        // TODO: We could offer an option to use queue.capacity
        // instead.
        self.capacity
    }

    fn pending_tx<T>(
        self: Pin<&mut StateBase>,
        slot: Pin<&mut WakerSlot>,
        cx: &mut Context,
    ) -> Poll<T> {
        // This may allocate, but only when the sender is about to
        // block, which is already expensive.
        self.project().tx_wakers.link(slot, cx.waker().clone());
        Poll::Pending
    }

    fn pending_rx<T>(
        self: Pin<&mut StateBase>,
        slot: Pin<&mut WakerSlot>,
        cx: &mut Context,
    ) -> Poll<T> {
        // This may allocate, but only when the receiver is about to
        // block, which is already expensive.
        self.project().rx_wakers.link(slot, cx.waker().clone());
        Poll::Pending
    }
}

#[derive(Debug)]
#[pin_project]
struct State<T> {
    #[pin]
    base: StateBase,
    queue: VecDeque<T>,
}

impl<T> State<T> {
    fn has_capacity(&self) -> bool {
        self.queue.len() < self.target_capacity()
    }

    fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
        self.project().base
    }
}

impl<T> std::ops::Deref for State<T> {
    type Target = StateBase;

    fn deref(&self) -> &Self::Target {
        &self.base
    }
}

impl<T> std::ops::DerefMut for State<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.base
    }
}

#[derive(Debug)]
#[pin_project]
struct Core<T> {
    #[pin]
    state: Mutex<State<T>>,
    // OnceLock ensures Core is Sync and Arc<Core> is Send. But it is
    // not strictly necessary, as these condition variables are only
    // accessed while the lock is held. Alas, Rust does not allow
    // Condvar to be stored under the Mutex.
    not_empty: OnceLock<Condvar>,
    not_full: OnceLock<Condvar>,
}

impl<T> Core<T> {
    /// Returns when a value is available, or when all senders are
    /// dropped and no values remain.
    fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && s.queue.is_empty()
        }

        let mut state = self.project_ref().state.lock();
        if !condition(state.as_mut()) {
            return state;
        }
        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_empty = self.not_empty.get_or_init(Default::default);
        not_empty.wait_while(state, condition)
    }

    /// Returns when there is either room in the queue or all receivers
    /// are dropped.
    fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && !s.has_capacity()
        }

        let mut state = self.project_ref().state.lock();
        if !condition(state.as_mut()) {
            return state;
        }
        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, condition)
    }

    /// Wakes pending receivers, then returns when there is either
    /// room in the queue or all receivers are dropped.
    fn wake_rx_and_block_while_full<'a>(
        self: Pin<&'a Self>,
        mut state: MutexGuard<'a, State<T>>,
    ) -> MutexGuard<'a, State<T>> {
        // The lock is held. Therefore, we know whether a Condvar must
        // be notified or not.
        let cvar = self.not_empty.get();
        // We should not wake Wakers while a lock is held. Therefore,
        // we must release the lock and reacquire it to wait.
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .extract_some_wakers();

        // TODO: Avoid unlocking and locking again when there's no
        // waker or condition variable.

        drop(state);
        if let Some(cvar) = cvar {
            // TODO: There are situations where we may know that we
            // can get away with notify_one().
            cvar.notify_all();
        }
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().base().project().rx_wakers);
        }

        // Lock again to block while full.
        let state = self.project_ref().state.lock();

        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
    }

    fn wake_all_tx(self: Pin<&Self>, mut state: MutexGuard<State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_full.get();
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .tx_wakers
            .extract_some_wakers();
        drop(state);
        if let Some(cvar) = cvar {
            // TODO: There are situations where we may know that we
            // can get away with notify_one().
            cvar.notify_all();
        }
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().project().base.project().tx_wakers);
        }
    }

    fn wake_one_rx(self: Pin<&Self>, mut state: MutexGuard<State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_empty.get();
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .extract_some_wakers();
        drop(state);
        if let Some(cvar) = cvar {
            cvar.notify_one();
        }
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().project().base.project().rx_wakers);
        }
    }

    fn wake_all_rx(self: Pin<&Self>, mut state: MutexGuard<State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_empty.get();
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .extract_some_wakers();
        drop(state);
        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().project().base.project().rx_wakers);
        }
    }
}

impl<T> splitrc::Notify for Core<T> {
    fn last_tx_did_drop_pinned(self: Pin<&Self>) {
        let mut state = self.project_ref().state.lock();
        *state.as_mut().base().project().closed = true;
        // We cannot deallocate the queue, as remaining receivers can
        // drain it.
        self.wake_all_rx(state);
    }

    fn last_rx_did_drop_pinned(self: Pin<&Self>) {
        let mut state = self.project_ref().state.lock();
        *state.as_mut().base().project().closed = true;
        // TODO: deallocate
        state.as_mut().project().queue.clear();
        self.wake_all_tx(state);
    }
}

// SendError

/// An error returned from [Sender::send] when all [Receiver]s are
/// dropped.
///
/// The unsent value is returned.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to send value on channel")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}

// SyncSender

/// The sending half of a channel.
#[derive(Debug)]
pub struct SyncSender<T> {
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(SyncSender);

impl<T> SyncSender<T> {
    /// Converts `SyncSender` to asynchronous `Sender`.
    pub fn into_async(self) -> Sender<T> {
        Sender { core: self.core }
    }

    /// Send a single value.
    ///
    /// Returns [SendError] if all receivers are dropped.
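    ///
    /// A minimal usage sketch (sync handles on one thread; the
    /// unbounded channel never blocks):
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// tx.send(10).unwrap();
    /// drop(tx);
    /// assert_eq!(Some(10), rx.recv());
    /// assert_eq!(None, rx.recv());
    /// ```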
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let mut state = self.core.as_ref().block_until_not_full();
        if state.closed {
            assert!(state.as_ref().project_ref().queue.is_empty());
            return Err(SendError(value));
        }

        state.as_mut().project().queue.push_back(value);

        self.core.as_ref().wake_one_rx(state);
        Ok(())
    }

    /// Send multiple values.
    ///
    /// If all receivers are dropped, [SendError] is returned. The
    /// values cannot be returned, as they may have been partially
    /// sent when the channel is closed.
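    ///
    /// A small sketch:
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// tx.send_iter(1..=3).unwrap();
    /// assert_eq!(Some(1), rx.recv());
    /// ```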
    pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
    where
        I: IntoIterator<Item = T>,
    {
        let mut values = values.into_iter();

        // If the iterator is empty, we can avoid acquiring the lock.
        let Some(mut value) = values.next() else {
            return Ok(());
        };

        let mut sent_count = 0usize;

        let mut state = self.core.as_ref().block_until_not_full();
        'outer: loop {
            if state.closed {
                // We may have sent some values, but the receivers are
                // all dropped, and that cleared the queue.
                assert!(state.queue.is_empty());
                return Err(SendError(()));
            }

            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(value);
            sent_count += 1;
            loop {
                match values.next() {
                    Some(v) => {
                        if state.has_capacity() {
                            state.as_mut().project().queue.push_back(v);
                            sent_count += 1;
                        } else {
                            value = v;
                            // We're about to block, but we know we
                            // sent at least one value, so wake any
                            // waiters.
                            state = self.core.as_ref().wake_rx_and_block_while_full(state);
                            continue 'outer;
                        }
                    }
                    None => {
                        // Done pulling from the iterator, and we know
                        // we sent at least one value.
                        if sent_count == 1 {
                            self.core.as_ref().wake_one_rx(state);
                        } else {
                            self.core.as_ref().wake_all_rx(state);
                        }
                        return Ok(());
                    }
                }
            }
        }
    }

    /// Drain a [Vec] into the channel without deallocating it.
    ///
    /// This is a convenience method for allocation-free batched
    /// sends. The `values` vector is drained, and then returned with
    /// the same capacity it had.
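    ///
    /// A small sketch of the reuse pattern:
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// let mut buf = Vec::with_capacity(2);
    /// buf.push("a");
    /// buf.push("b");
    /// // On success, the drained Vec comes back, capacity intact.
    /// buf = tx.send_vec(buf).unwrap();
    /// assert!(buf.is_empty());
    /// assert_eq!(Some("a"), rx.recv());
    /// ```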
    pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
        match self.send_iter(values.drain(..)) {
            Ok(_) => Ok(values),
            Err(_) => Err(SendError(values)),
        }
    }

    /// Automatically accumulate sends into a buffer of size `batch_limit`
    /// and send when full.
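    ///
    /// A minimal sketch:
    ///
    /// ```
    /// let (mut tx, rx) = batch_channel::unbounded_sync();
    /// tx.autobatch(2, |tx| {
    ///     tx.send(1)?;
    ///     tx.send(2)?;
    ///     Ok(())
    /// })
    /// .unwrap();
    /// assert_eq!(Some(1), rx.recv());
    /// ```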
    pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
    where
        F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
    {
        let mut tx = SyncBatchSender {
            sender: self,
            capacity: batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx)?;
        tx.drain()?;
        Ok(r)
    }
}

// SyncBatchSender

/// Automatically batches up values and sends them when a batch is full.
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
    sender: &'a mut SyncSender<T>,
    capacity: usize,
    buffer: Vec<T>,
}

impl<T> SyncBatchSender<'_, T> {
    /// Buffers a single value to be sent on the channel.
    ///
    /// Sends the batch if the buffer is full.
    pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
        self.buffer.push(value);
        // TODO: consider using the full capacity if Vec overallocated.
        if self.buffer.len() == self.capacity {
            self.drain()
        } else {
            Ok(())
        }
    }

    /// Buffers multiple values, sending batches as the internal
    /// buffer reaches capacity.
    pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
        // TODO: We could return the remainder of I under cancellation.
        for value in values.into_iter() {
            self.send(value)?;
        }
        Ok(())
    }

    /// Sends any buffered values, clearing the current batch.
    pub fn drain(&mut self) -> Result<(), SendError<()>> {
        // TODO: send_iter
        match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
            Ok(drained_vec) => {
                self.buffer = drained_vec;
                Ok(())
            }
            Err(_) => Err(SendError(())),
        }
    }
}

// Sender

/// The asynchronous sending half of a channel.
#[derive(Debug)]
pub struct Sender<T> {
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(Sender);

impl<T> Sender<T> {
    /// Converts asynchronous `Sender` to `SyncSender`.
    pub fn into_sync(self) -> SyncSender<T> {
        SyncSender { core: self.core }
    }

    /// Send a single value.
    ///
    /// Returns [SendError] if all receivers are dropped.
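    ///
    /// A minimal sketch (any async runtime can drive the returned
    /// future):
    ///
    /// ```
    /// # async fn demo() {
    /// let (tx, rx) = batch_channel::unbounded();
    /// tx.send(10).await.unwrap();
    /// assert_eq!(Some(10), rx.recv().await);
    /// # }
    /// ```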
    pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
        Send {
            sender: self,
            value: Some(value),
            waker: WakerSlot::new(),
        }
    }

    /// Send multiple values.
    ///
    /// If all receivers are dropped, [SendError] is returned and unsent
    /// values are dropped.
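    ///
    /// A small sketch:
    ///
    /// ```
    /// # async fn demo() {
    /// let (tx, rx) = batch_channel::unbounded();
    /// tx.send_iter([1, 2, 3]).await.unwrap();
    /// assert_eq!(Some(1), rx.recv().await);
    /// # }
    /// ```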
    pub fn send_iter<'a, I>(
        &'a self,
        values: I,
    ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
    where
        I: IntoIterator<Item = T> + 'a,
    {
        SendIter {
            sender: self,
            values: Some(values.into_iter().peekable()),
            waker: WakerSlot::new(),
        }
    }

    /// Automatically accumulate sends into a buffer of size `batch_limit`
    /// and send when full.
    ///
    /// The callback's future must be boxed to work around [type
    /// system limitations in
    /// Rust](https://smallcultfollowing.com/babysteps/blog/2023/03/29/thoughts-on-async-closures/).
    ///
    /// There is [a git
    /// branch](https://github.com/chadaustin/batch-channel/tree/async-closure-autobatch)
    /// that shows what an API based on async closures would look
    /// like.
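    ///
    /// A sketch of the boxing this requires (`BoxFuture` comes from
    /// `futures_core`; `Box::pin` produces one):
    ///
    /// ```
    /// # async fn demo() -> Result<(), batch_channel::SendError<()>> {
    /// let (tx, rx) = batch_channel::unbounded();
    /// tx.autobatch(2, |tx| {
    ///     Box::pin(async move {
    ///         tx.send(1).await?;
    ///         tx.send(2).await?;
    ///         Ok(())
    ///     })
    /// })
    /// .await?;
    /// assert_eq!(Some(1), rx.recv().await);
    /// # Ok(())
    /// # }
    /// ```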
    pub async fn autobatch<F, R>(self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
    where
        for<'a> F: (FnOnce(&'a mut BatchSender<T>) -> BoxFuture<'a, Result<R, SendError<()>>>),
    {
        let mut tx = BatchSender {
            sender: self,
            batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx).await?;
        tx.drain().await?;
        Ok(r)
    }

    /// Same as [Sender::autobatch] except that it immediately returns
    /// `()` when `f` returns [SendError]. This is a convenience
    /// wrapper for the common case that the future is passed to a
    /// spawn function and the receiver being dropped (i.e.
    /// [SendError]) is considered a clean cancellation.
    pub async fn autobatch_or_cancel<F>(self, batch_limit: usize, f: F)
    where
        for<'a> F: (FnOnce(&'a mut BatchSender<T>) -> BoxFuture<'a, Result<(), SendError<()>>>),
    {
        self.autobatch(batch_limit, f).await.unwrap_or(())
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
    sender: &'a Sender<T>,
    value: Option<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for Send<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.sender.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .tx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for Send<'_, T> {
    type Output = Result<(), SendError<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.sender.core.as_ref().project_ref().state.lock();
        if state.closed {
            return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
        }
        if state.has_capacity() {
            state
                .as_mut()
                .project()
                .queue
                .push_back(self.as_mut().project().value.take().unwrap());
            self.project().sender.core.as_ref().wake_one_rx(state);
            Poll::Ready(Ok(()))
        } else {
            state.as_mut().base().pending_tx(self.project().waker, cx)
        }
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
    sender: &'a Sender<T>,
    values: Option<Peekable<I>>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.sender.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .tx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
    type Output = Result<(), SendError<()>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Optimize the case that send_iter was called with an empty
        // iterator, and don't even acquire the lock.
        {
            let pi = self.as_mut().project().values.as_mut().unwrap();
            if pi.peek().is_none() {
                return Poll::Ready(Ok(()));
            }
            // Satisfy borrow checker: we cannot hold a mut reference to
            // self through pi before acquiring the lock below.
        }

        let mut state = self.sender.core.as_ref().project_ref().state.lock();

        // There is an awkward set of constraints here.
        // 1. To check whether an iterator contains an item, one must be popped.
        // 2. If the receivers are cancelled, we'd like to return the iterator whole.
        // 3. If we don't know whether there are any remaining items, we must block
        //    if the queue is at capacity.
        // We relax constraint #2 because #3 is preferable.
        // TODO: We could return Peekable<I> instead.

        let pi = self.as_mut().project().values.as_mut().unwrap();
        // We already checked above.
        debug_assert!(pi.peek().is_some());
        if state.closed {
            Poll::Ready(Err(SendError(())))
        } else if !state.has_capacity() {
            // We know we have a value to send, but there is no room.
            state.as_mut().base().pending_tx(self.project().waker, cx)
        } else {
            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(pi.next().unwrap());
            while state.has_capacity() {
                match pi.next() {
                    Some(value) => {
                        state.as_mut().project().queue.push_back(value);
                    }
                    None => {
                        // Done pulling from the iterator and still
                        // have capacity, so we're done.
                        // TODO: wake_one_rx if we only queued one.
                        self.sender.core.as_ref().wake_all_rx(state);
                        return Poll::Ready(Ok(()));
                    }
                }
            }
            // We're out of capacity, and might still have items to
            // send. To avoid a round-trip through the scheduler, peek
            // ahead.
            if pi.peek().is_none() {
                self.sender.core.as_ref().wake_all_rx(state);
                return Poll::Ready(Ok(()));
            }

            // Unconditionally returns Poll::Pending.
            let pending = state
                .as_mut()
                .base()
                .pending_tx(self.as_mut().project().waker, cx);
            self.sender.core.as_ref().wake_all_rx(state);
            pending
        }
    }
}

// BatchSender

/// The internal send handle used by [Sender::autobatch].
/// Builds a buffer of size `batch_limit` and flushes when it's full.
pub struct BatchSender<T> {
    sender: Sender<T>,
    batch_limit: usize,
    buffer: Vec<T>,
}

impl<T> BatchSender<T> {
    /// Adds a value to the internal buffer and flushes it into the
    /// queue when the buffer fills.
    pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
        self.buffer.push(value);
        if self.buffer.len() == self.batch_limit {
            self.drain().await?;
        }
        Ok(())
    }

    async fn drain(&mut self) -> Result<(), SendError<()>> {
        self.sender.send_iter(self.buffer.drain(..)).await?;
        assert!(self.buffer.is_empty());
        Ok(())
    }
}

// Receiver

/// The receiving half of a channel. Reads are asynchronous.
#[derive(Debug)]
pub struct Receiver<T> {
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(Receiver);

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
    receiver: &'a Receiver<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for Recv<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for Recv<'_, T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                self.receiver.core.as_ref().wake_all_tx(state);
                Poll::Ready(Some(value))
            }
            None => {
                if state.closed {
                    Poll::Ready(None)
                } else {
                    state.as_mut().base().pending_rx(self.project().waker, cx)
                }
            }
        }
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
    receiver: &'a Receiver<T>,
    element_limit: usize,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for RecvBatch<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for RecvBatch<'_, T> {
    type Output = Vec<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                return Poll::Ready(Vec::new());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        self.receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(v)
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
    receiver: &'a Receiver<T>,
    element_limit: usize,
    vec: &'a mut Vec<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for RecvVec<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for RecvVec<'_, T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                assert!(self.vec.is_empty());
                return Poll::Ready(());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        self.as_mut().project().vec.extend(q.drain(..capacity));
        self.project().receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(())
    }
}

impl<T> Receiver<T> {
    /// Converts asynchronous `Receiver` to `SyncReceiver`.
    pub fn into_sync(self) -> SyncReceiver<T> {
        SyncReceiver { core: self.core }
    }

    /// Wait for a single value from the channel.
    ///
    /// Returns [None] if all [Sender]s are dropped.
    pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
        Recv {
            receiver: self,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv

    /// Wait for up to `element_limit` values from the channel.
    ///
    /// Up to `element_limit` values are returned if they're already
    /// available. Otherwise, waits for any values to be available.
    ///
    /// Returns an empty [Vec] if all [Sender]s are dropped.
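    ///
    /// A small sketch:
    ///
    /// ```
    /// # async fn demo() {
    /// let (tx, rx) = batch_channel::unbounded();
    /// tx.send_iter([1, 2, 3]).await.unwrap();
    /// // Three values are queued, so a limit of 2 returns two.
    /// assert_eq!(vec![1, 2], rx.recv_batch(2).await);
    /// # }
    /// ```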
    pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
        RecvBatch {
            receiver: self,
            element_limit,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv_batch

    /// Wait for up to `element_limit` values from the channel and
    /// store them in `vec`.
    ///
    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
    /// will clear it before adding values. The intent of `recv_vec`
    /// is that batches can be repeatedly read by workers without new
    /// allocations.
    ///
    /// It's not required, but `vec`'s capacity should be greater than
    /// or equal to `element_limit` to avoid reallocation.
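    ///
    /// A sketch of the reuse pattern:
    ///
    /// ```
    /// # async fn demo() {
    /// let (tx, rx) = batch_channel::unbounded();
    /// tx.send_iter([1, 2, 3]).await.unwrap();
    /// drop(tx);
    /// let mut batch = Vec::with_capacity(2);
    /// loop {
    ///     rx.recv_vec(2, &mut batch).await;
    ///     if batch.is_empty() {
    ///         // All senders dropped and the queue is drained.
    ///         break;
    ///     }
    ///     // Process `batch` here; its allocation is reused.
    /// }
    /// # }
    /// ```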
    pub fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future<Output = ()> + 'a {
        vec.clear();
        RecvVec {
            receiver: self,
            element_limit,
            vec,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv_vec
}

// SyncReceiver

/// The synchronous receiving half of a channel.
#[derive(Debug)]
pub struct SyncReceiver<T> {
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(SyncReceiver);

impl<T> SyncReceiver<T> {
    /// Converts `SyncReceiver` to asynchronous `Receiver`.
    pub fn into_async(self) -> Receiver<T> {
        Receiver { core: self.core }
    }

    /// Block waiting for a single value from the channel.
    ///
    /// Returns [None] if all [Sender]s are dropped.
    pub fn recv(&self) -> Option<T> {
        let mut state = self.core.as_ref().block_until_not_empty();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                self.core.as_ref().wake_all_tx(state);
                Some(value)
            }
            None => {
                assert!(state.closed);
                None
            }
        }
    }

    /// Block waiting for values from the channel.
    ///
    /// Up to `element_limit` values are returned if they're already
    /// available. Otherwise, waits for any values to be available.
    ///
    /// Returns an empty [Vec] if all [Sender]s are dropped.
    pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
        let mut state = self.core.as_ref().block_until_not_empty();

        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            assert!(state.closed);
            return Vec::new();
        }

        let capacity = min(q_len, element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        self.core.as_ref().wake_all_tx(state);
        v
    }

    /// Wait for up to `element_limit` values from the channel and
    /// store them in `vec`.
    ///
    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
    /// will clear it before adding values. The intent of `recv_vec`
    /// is that batches can be repeatedly read by workers without new
    /// allocations.
    ///
    /// It's not required, but `vec`'s capacity should be greater than
    /// or equal to `element_limit` to avoid reallocation.
    pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
        vec.clear();

        let mut state = self.core.as_ref().block_until_not_empty();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            assert!(state.closed);
            // The result vector is already cleared.
            return;
        }

        let capacity = min(q_len, element_limit);
        vec.extend(q.drain(..capacity));
        self.core.as_ref().wake_all_tx(state);
    }
}

// Constructors

/// Allocates a bounded channel and returns the sender, receiver
/// pair.
///
/// Rust async is polling, so unbuffered channels are not supported.
/// Therefore, a capacity of 0 is rounded up to 1.
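///
/// A minimal sketch of the backpressure this provides:
///
/// ```
/// # async fn demo() {
/// let (tx, rx) = batch_channel::bounded(2);
/// tx.send(1).await.unwrap();
/// tx.send(2).await.unwrap();
/// // A third send would not complete until `rx` receives a value.
/// assert_eq!(Some(1), rx.recv().await);
/// # }
/// ```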
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let capacity = capacity.max(1);
    let core = Core {
        state: Mutex::new(State {
            base: StateBase {
                capacity,
                closed: false,
                tx_wakers: WakerList::new(),
                rx_wakers: WakerList::new(),
            },
            queue: VecDeque::new(),
        }),
        not_empty: OnceLock::new(),
        not_full: OnceLock::new(),
    };
    let (core_tx, core_rx) = splitrc::pin(core);
    (Sender { core: core_tx }, Receiver { core: core_rx })
}

/// Allocates a bounded channel and returns the synchronous handles as
/// a sender, receiver pair.
///
/// Because handles can be converted freely between sync and async,
/// and Rust async is polling, unbuffered channels are not
/// supported. A capacity of 0 is rounded up to 1.
pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
    let (tx, rx) = bounded(capacity);
    (tx.into_sync(), rx.into_sync())
}

/// Allocates an unbounded channel and returns the sender,
/// receiver pair.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let core = Core {
        state: Mutex::new(State {
            base: StateBase {
                capacity: UNBOUNDED_CAPACITY,
                closed: false,
                tx_wakers: WakerList::new(),
                rx_wakers: WakerList::new(),
            },
            queue: VecDeque::new(),
        }),
        not_empty: OnceLock::new(),
        not_full: OnceLock::new(),
    };
    let (core_tx, core_rx) = splitrc::pin(core);
    (Sender { core: core_tx }, Receiver { core: core_rx })
}

/// Allocates an unbounded channel and returns the synchronous handles
/// as a sender, receiver pair.
pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
    let (tx, rx) = unbounded();
    (tx.into_sync(), rx.into_sync())
}