// batch_channel/lib.rs

#![doc = include_str!("../README.md")]
#![doc = include_str!("example.md")]

use mutex::PinnedCondvar as Condvar;
use mutex::PinnedMutex as Mutex;
use mutex::PinnedMutexGuard as MutexGuard;
use pin_project::pin_project;
use pin_project::pinned_drop;
#[cfg(feature = "parking_lot")]
use pinned_mutex::parking_lot as mutex;
#[cfg(not(feature = "parking_lot"))]
use pinned_mutex::std as mutex;
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::iter::Peekable;
use std::ops::AsyncFnOnce;
use std::pin::Pin;
use std::sync::OnceLock;
use std::task::Context;
use std::task::Poll;
use wakerset::ExtractedWakers;
use wakerset::WakerList;
use wakerset::WakerSlot;

const UNBOUNDED_CAPACITY: usize = usize::MAX;

macro_rules! derive_clone {
    ($t:ident) => {
        impl<T> Clone for $t<T> {
            fn clone(&self) -> Self {
                Self {
                    core: self.core.clone(),
                }
            }
        }
    };
}

#[derive(Debug)]
#[pin_project]
struct StateBase {
    capacity: usize,
    closed: bool,
    #[pin]
    tx_wakers: WakerList,
    #[pin]
    rx_wakers: WakerList,
}

impl StateBase {
    fn target_capacity(&self) -> usize {
        // TODO: We could offer an option to use queue.capacity
        // instead.
        self.capacity
    }

    fn pending_tx<T>(
        self: Pin<&mut StateBase>,
        slot: Pin<&mut WakerSlot>,
        cx: &mut Context,
    ) -> Poll<T> {
        // This may allocate, but only when the sender is about to
        // block, which is already expensive.
        self.project().tx_wakers.link(slot, cx.waker().clone());
        Poll::Pending
    }

    fn pending_rx<T>(
        self: Pin<&mut StateBase>,
        slot: Pin<&mut WakerSlot>,
        cx: &mut Context,
    ) -> Poll<T> {
        // This may allocate, but only when the receiver is about to
        // block, which is already expensive.
        self.project().rx_wakers.link(slot, cx.waker().clone());
        Poll::Pending
    }
}

#[derive(Debug)]
#[pin_project]
struct State<T> {
    #[pin]
    base: StateBase,
    queue: VecDeque<T>,
}

impl<T> State<T> {
    fn has_capacity(&self) -> bool {
        self.queue.len() < self.target_capacity()
    }

    fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
        self.project().base
    }
}

impl<T> std::ops::Deref for State<T> {
    type Target = StateBase;

    fn deref(&self) -> &Self::Target {
        &self.base
    }
}

impl<T> std::ops::DerefMut for State<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.base
    }
}

#[derive(Debug)]
#[pin_project]
struct Core<T> {
    #[pin]
    state: Mutex<State<T>>,
    // OnceLock ensures Core is Sync and Arc<Core> is Send. It is not
    // strictly necessary, as these condition variables are only
    // accessed while the lock is held. Alas, a Condvar cannot be
    // stored inside the Mutex it waits on: waiting requires handing
    // the guard to the Condvar, so it must live outside.
    not_empty: OnceLock<Condvar>,
    not_full: OnceLock<Condvar>,
}

impl<T> Core<T> {
    /// Blocks until either a value is available, or the queue is
    /// empty and all senders are dropped.
    fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && s.queue.is_empty()
        }

        let mut state = self.project_ref().state.lock();
        if !condition(state.as_mut()) {
            return state;
        }
        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_empty = self.not_empty.get_or_init(Default::default);
        not_empty.wait_while(state, condition)
    }

    /// Blocks until there is either room in the queue or all
    /// receivers are dropped.
    fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && !s.has_capacity()
        }

        let mut state = self.project_ref().state.lock();
        if !condition(state.as_mut()) {
            return state;
        }
        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, condition)
    }

    /// Wakes all receivers, then blocks until there is either room
    /// in the queue or all receivers are dropped.
    fn wake_rx_and_block_while_full<'a>(
        self: Pin<&'a Self>,
        mut state: MutexGuard<'a, State<T>>,
    ) -> MutexGuard<'a, State<T>> {
        // The lock is held. Therefore, we know whether a Condvar must
        // be notified or not.
        let cvar = self.not_empty.get();

        // We should not wake Wakers while a lock is held. Therefore,
        // we must release the lock and reacquire it to wait.
        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        loop {
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        // TODO: Avoid unlocking and locking again when there's no
        // waker or condition variable.

        if let Some(cvar) = cvar {
            // TODO: There are situations where we may know that we
            // can get away with notify_one().
            cvar.notify_all();
        }

        state = self.project_ref().state.lock();

        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
    }

    fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_full.get();

        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .tx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        loop {
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .tx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        // TODO: Avoid unlocking and locking again when there's no
        // waker or condition variable.

        if let Some(cvar) = cvar {
            // TODO: There are situations where we may know that we
            // can get away with notify_one().
            cvar.notify_all();
        }
    }

    fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_empty.get();
        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        loop {
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        if let Some(cvar) = cvar {
            cvar.notify_one();
        }
    }

    fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_empty.get();
        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        loop {
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
    }
}

impl<T> splitrc::Notify for Core<T> {
    fn last_tx_did_drop_pinned(self: Pin<&Self>) {
        let mut state = self.project_ref().state.lock();
        *state.as_mut().base().project().closed = true;
        // We cannot deallocate the queue, as remaining receivers can
        // drain it.
        self.wake_all_rx(state);
    }

    fn last_rx_did_drop_pinned(self: Pin<&Self>) {
        let mut state = self.project_ref().state.lock();
        *state.as_mut().base().project().closed = true;
        // TODO: deallocate
        state.as_mut().project().queue.clear();
        self.wake_all_tx(state);
    }
}

// SendError

/// An error returned from [Sender::send] when all [Receiver]s are
/// dropped.
///
/// The unsent value is returned.
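///
/// A minimal sketch of recovering the unsent value (illustrative
/// values; `unbounded_sync` is defined below):
///
/// ```
/// let (tx, rx) = batch_channel::unbounded_sync();
/// drop(rx);
/// assert_eq!(Err(batch_channel::SendError(10)), tx.send(10));
/// ```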
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to send value on channel")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}

// SyncSender

/// The sending half of a channel.
#[derive(Debug)]
pub struct SyncSender<T> {
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(SyncSender);

impl<T> SyncSender<T> {
    /// Converts `SyncSender` to asynchronous `Sender`.
    pub fn into_async(self) -> Sender<T> {
        Sender { core: self.core }
    }

    /// Send a single value.
    ///
    /// Returns [SendError] if all receivers are dropped.
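    ///
    /// A minimal usage sketch (illustrative values):
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// tx.send("hello").unwrap();
    /// assert_eq!(Some("hello"), rx.recv());
    /// ```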
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let mut state = self.core.as_ref().block_until_not_full();
        if state.closed {
            assert!(state.as_ref().project_ref().queue.is_empty());
            return Err(SendError(value));
        }

        state.as_mut().project().queue.push_back(value);

        self.core.as_ref().wake_one_rx(state);
        Ok(())
    }

    /// Send multiple values.
    ///
    /// If all receivers are dropped, [SendError] is returned. The
    /// values themselves cannot be returned, as some may have
    /// already been sent when the channel closed.
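    ///
    /// A minimal usage sketch (illustrative values):
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// tx.send_iter([1, 2, 3]).unwrap();
    /// assert_eq!(vec![1, 2, 3], rx.recv_batch(10));
    /// ```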
    pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
    where
        I: IntoIterator<Item = T>,
    {
        let mut values = values.into_iter();

        // If the iterator is empty, we can avoid acquiring the lock.
        let Some(mut value) = values.next() else {
            return Ok(());
        };

        let mut sent_count = 0usize;

        let mut state = self.core.as_ref().block_until_not_full();
        'outer: loop {
            if state.closed {
                // We may have sent some values, but the receivers are
                // all dropped, and that cleared the queue.
                assert!(state.queue.is_empty());
                return Err(SendError(()));
            }

            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(value);
            sent_count += 1;
            loop {
                match values.next() {
                    Some(v) => {
                        if state.has_capacity() {
                            state.as_mut().project().queue.push_back(v);
                            sent_count += 1;
                        } else {
                            value = v;
                            // We're about to block, but we know we
                            // sent at least one value, so wake any
                            // waiters.
                            state = self.core.as_ref().wake_rx_and_block_while_full(state);
                            continue 'outer;
                        }
                    }
                    None => {
                        // Done pulling from the iterator, and we know
                        // we sent at least one value.
                        if sent_count == 1 {
                            self.core.as_ref().wake_one_rx(state);
                        } else {
                            self.core.as_ref().wake_all_rx(state);
                        }
                        return Ok(());
                    }
                }
            }
        }
    }

    /// Drain a [Vec] into the channel without deallocating it.
    ///
    /// This is a convenience method for allocation-free batched
    /// sends. The `values` vector is drained, and then returned with
    /// the same capacity it had.
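    ///
    /// A minimal sketch of reusing the returned allocation
    /// (illustrative values):
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// let batch = vec![1, 2, 3];
    /// let batch = tx.send_vec(batch).unwrap();
    /// assert!(batch.is_empty());
    /// assert_eq!(vec![1, 2, 3], rx.recv_batch(10));
    /// ```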
    pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
        match self.send_iter(values.drain(..)) {
            Ok(_) => Ok(values),
            Err(_) => Err(SendError(values)),
        }
    }

    /// Automatically accumulate sends into a buffer of size `batch_limit`
    /// and send when full.
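    ///
    /// A minimal sketch (illustrative batch limit):
    ///
    /// ```
    /// let (mut tx, rx) = batch_channel::unbounded_sync();
    /// tx.autobatch(2, |tx| {
    ///     tx.send(1)?;
    ///     tx.send(2)?;
    ///     tx.send(3)?;
    ///     Ok(())
    /// })
    /// .unwrap();
    /// assert_eq!(vec![1, 2, 3], rx.recv_batch(10));
    /// ```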
    pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
    where
        F: FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>,
    {
        let mut tx = SyncBatchSender {
            sender: self,
            capacity: batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx)?;
        tx.drain()?;
        Ok(r)
    }
}

// SyncBatchSender

/// Automatically batches up values and sends them when a batch is full.
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
    sender: &'a mut SyncSender<T>,
    capacity: usize,
    buffer: Vec<T>,
}

impl<T> SyncBatchSender<'_, T> {
    /// Buffers a single value to be sent on the channel.
    ///
    /// Sends the batch if the buffer is full.
    pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
        self.buffer.push(value);
        // TODO: consider using the full capacity if Vec overallocated.
        if self.buffer.len() == self.capacity {
            self.drain()
        } else {
            Ok(())
        }
    }

    /// Buffers multiple values, sending batches as the internal
    /// buffer reaches capacity.
    pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
        // TODO: We could return the remainder of I under cancellation.
        for value in values.into_iter() {
            self.send(value)?;
        }
        Ok(())
    }

    /// Sends any buffered values, clearing the current batch.
    pub fn drain(&mut self) -> Result<(), SendError<()>> {
        // TODO: send_iter
        match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
            Ok(drained_vec) => {
                self.buffer = drained_vec;
                Ok(())
            }
            Err(_) => Err(SendError(())),
        }
    }
}

// Sender

/// The asynchronous sending half of a channel.
#[derive(Debug)]
pub struct Sender<T> {
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(Sender);

impl<T> Sender<T> {
    /// Converts asynchronous `Sender` to `SyncSender`.
    pub fn into_sync(self) -> SyncSender<T> {
        SyncSender { core: self.core }
    }

    /// Send a single value.
    ///
    /// Returns [SendError] if all receivers are dropped.
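    ///
    /// A minimal sketch, assuming some executor's `block_on` (not
    /// provided by this crate):
    ///
    /// ```ignore
    /// // `block_on` comes from your async runtime of choice.
    /// let (tx, rx) = batch_channel::unbounded();
    /// block_on(async move {
    ///     tx.send("hello").await.unwrap();
    ///     assert_eq!(Some("hello"), rx.recv().await);
    /// });
    /// ```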
    pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
        Send {
            sender: self,
            value: Some(value),
            waker: WakerSlot::new(),
        }
    }

    /// Send multiple values.
    ///
    /// If all receivers are dropped, [SendError] is returned and
    /// unsent values are dropped.
    pub fn send_iter<'a, I>(
        &'a self,
        values: I,
    ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
    where
        I: IntoIterator<Item = T> + 'a,
    {
        SendIter {
            sender: self,
            values: Some(values.into_iter().peekable()),
            waker: WakerSlot::new(),
        }
    }

    /// Automatically accumulate sends into a buffer of size `batch_limit`
    /// and send when full.
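    ///
    /// A minimal sketch, assuming some executor's `block_on` (not
    /// provided by this crate):
    ///
    /// ```ignore
    /// // `block_on` comes from your async runtime of choice.
    /// let (tx, rx) = batch_channel::unbounded();
    /// block_on(async move {
    ///     tx.autobatch(2, async |tx| {
    ///         tx.send(1).await?;
    ///         tx.send(2).await?;
    ///         tx.send(3).await?;
    ///         Ok(())
    ///     })
    ///     .await
    ///     .unwrap();
    ///     assert_eq!(vec![1, 2, 3], rx.recv_batch(10).await);
    /// });
    /// ```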
    pub async fn autobatch<R>(
        self,
        batch_limit: usize,
        f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
    ) -> Result<R, SendError<()>> {
        let mut tx = BatchSender {
            sender: self,
            batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx).await?;
        tx.drain().await?;
        Ok(r)
    }

    /// Same as [Sender::autobatch] except that it immediately returns
    /// `()` when `f` returns [SendError]. This is a convenience
    /// wrapper for the common case that the future is passed to a
    /// spawn function, and the receiver being dropped (i.e.
    /// [SendError]) is considered a clean cancellation.
    pub async fn autobatch_or_cancel(
        self,
        batch_limit: usize,
        f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
    ) {
        self.autobatch(batch_limit, f).await.unwrap_or(())
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
    sender: &'a Sender<T>,
    value: Option<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for Send<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.sender.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .tx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for Send<'_, T> {
    type Output = Result<(), SendError<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.sender.core.as_ref().project_ref().state.lock();
        if state.closed {
            return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
        }
        if state.has_capacity() {
            state
                .as_mut()
                .project()
                .queue
                .push_back(self.as_mut().project().value.take().unwrap());
            self.project().sender.core.as_ref().wake_one_rx(state);
            Poll::Ready(Ok(()))
        } else {
            state.as_mut().base().pending_tx(self.project().waker, cx)
        }
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
    sender: &'a Sender<T>,
    values: Option<Peekable<I>>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.sender.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .tx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
    type Output = Result<(), SendError<()>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Optimize the case that send_iter was called with an empty
        // iterator, and don't even acquire the lock.
        {
            let pi = self.as_mut().project().values.as_mut().unwrap();
            if pi.peek().is_none() {
                return Poll::Ready(Ok(()));
            }
            // Satisfy the borrow checker: we cannot hold a mutable
            // reference to self through pi across the lock
            // acquisition below.
        }

        let mut state = self.sender.core.as_ref().project_ref().state.lock();

        // There is an awkward set of constraints here.
        // 1. To check whether an iterator contains an item, one must be popped.
        // 2. If the receivers are cancelled, we'd like to return the iterator whole.
        // 3. If we don't know whether there are any remaining items, we must block
        //    if the queue is at capacity.
        // We relax constraint #2 because #3 is preferable.
        // TODO: We could return Peekable<I> instead.

        let pi = self.as_mut().project().values.as_mut().unwrap();
        // We already checked above.
        debug_assert!(pi.peek().is_some());
        if state.closed {
            Poll::Ready(Err(SendError(())))
        } else if !state.has_capacity() {
            // We know we have a value to send, but there is no room.
            state.as_mut().base().pending_tx(self.project().waker, cx)
        } else {
            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(pi.next().unwrap());
            while state.has_capacity() {
                match pi.next() {
                    Some(value) => {
                        state.as_mut().project().queue.push_back(value);
                    }
                    None => {
                        // Done pulling from the iterator and still
                        // have capacity, so we're done.
                        // TODO: wake_one_rx if we only queued one.
                        self.sender.core.as_ref().wake_all_rx(state);
                        return Poll::Ready(Ok(()));
                    }
                }
            }
            // We're out of capacity, and might still have items to
            // send. To avoid a round-trip through the scheduler, peek
            // ahead.
            if pi.peek().is_none() {
                self.sender.core.as_ref().wake_all_rx(state);
                return Poll::Ready(Ok(()));
            }

            // Unconditionally returns Poll::Pending
            let pending = state
                .as_mut()
                .base()
                .pending_tx(self.as_mut().project().waker, cx);
            self.sender.core.as_ref().wake_all_rx(state);
            pending
        }
    }
}

// BatchSender

/// The internal send handle used by [Sender::autobatch].
/// Builds a buffer of size `batch_limit` and flushes when it's full.
pub struct BatchSender<T> {
    sender: Sender<T>,
    batch_limit: usize,
    buffer: Vec<T>,
}

impl<T> BatchSender<T> {
    /// Adds a value to the internal buffer and flushes it into the
    /// queue when the buffer fills.
    pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
        self.buffer.push(value);
        if self.buffer.len() == self.batch_limit {
            self.drain().await?;
        }
        Ok(())
    }

    async fn drain(&mut self) -> Result<(), SendError<()>> {
        self.sender.send_iter(self.buffer.drain(..)).await?;
        assert!(self.buffer.is_empty());
        Ok(())
    }
}

// Receiver

/// The receiving half of a channel. Reads are asynchronous.
#[derive(Debug)]
pub struct Receiver<T> {
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(Receiver);

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
    receiver: &'a Receiver<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for Recv<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for Recv<'_, T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                self.receiver.core.as_ref().wake_all_tx(state);
                Poll::Ready(Some(value))
            }
            None => {
                if state.closed {
                    Poll::Ready(None)
                } else {
                    state.as_mut().base().pending_rx(self.project().waker, cx)
                }
            }
        }
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
    receiver: &'a Receiver<T>,
    element_limit: usize,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for RecvBatch<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for RecvBatch<'_, T> {
    type Output = Vec<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                return Poll::Ready(Vec::new());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        self.receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(v)
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
    receiver: &'a Receiver<T>,
    element_limit: usize,
    vec: &'a mut Vec<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for RecvVec<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for RecvVec<'_, T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                assert!(self.vec.is_empty());
                return Poll::Ready(());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        self.as_mut().project().vec.extend(q.drain(..capacity));
        self.project().receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(())
    }
}

impl<T> Receiver<T> {
    /// Converts asynchronous `Receiver` to `SyncReceiver`.
    pub fn into_sync(self) -> SyncReceiver<T> {
        SyncReceiver { core: self.core }
    }

    /// Wait for a single value from the channel.
    ///
    /// Returns [None] if all [Sender]s are dropped.
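    ///
    /// A minimal sketch, assuming some executor's `block_on` (not
    /// provided by this crate):
    ///
    /// ```ignore
    /// // `block_on` comes from your async runtime of choice.
    /// let (tx, rx) = batch_channel::unbounded();
    /// block_on(async move {
    ///     tx.send("hello").await.unwrap();
    ///     drop(tx);
    ///     assert_eq!(Some("hello"), rx.recv().await);
    ///     assert_eq!(None, rx.recv().await);
    /// });
    /// ```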
    pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
        Recv {
            receiver: self,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv

    /// Wait for up to `element_limit` values from the channel.
    ///
    /// Up to `element_limit` values are returned if they're already
    /// available. Otherwise, waits for any values to be available.
    ///
    /// Returns an empty [Vec] if all [Sender]s are dropped.
    pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
        RecvBatch {
            receiver: self,
            element_limit,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv_batch

    /// Wait for up to `element_limit` values from the channel and
    /// store them in `vec`.
    ///
    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
    /// will clear it before adding values. The intent of `recv_vec`
    /// is that batches can be repeatedly read by workers without new
    /// allocations.
    ///
    /// It's not required, but `vec`'s capacity should be greater than
    /// or equal to `element_limit` to avoid reallocation.
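    ///
    /// A minimal sketch of buffer reuse, assuming some executor's
    /// `block_on` (not provided by this crate):
    ///
    /// ```ignore
    /// // `block_on` comes from your async runtime of choice.
    /// let (tx, rx) = batch_channel::unbounded();
    /// block_on(async move {
    ///     tx.send_iter(0..4).await.unwrap();
    ///     let mut batch = Vec::with_capacity(2);
    ///     rx.recv_vec(2, &mut batch).await;
    ///     assert_eq!(vec![0, 1], batch);
    ///     rx.recv_vec(2, &mut batch).await;
    ///     assert_eq!(vec![2, 3], batch);
    /// });
    /// ```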
    pub fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future<Output = ()> + 'a {
        vec.clear();
        RecvVec {
            receiver: self,
            element_limit,
            vec,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv_vec
}

// SyncReceiver

/// The synchronous receiving half of a channel.
#[derive(Debug)]
pub struct SyncReceiver<T> {
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(SyncReceiver);

impl<T> SyncReceiver<T> {
    /// Converts `SyncReceiver` to asynchronous `Receiver`.
    pub fn into_async(self) -> Receiver<T> {
        Receiver { core: self.core }
    }

    /// Block waiting for a single value from the channel.
    ///
    /// Returns [None] if all [Sender]s are dropped.
    pub fn recv(&self) -> Option<T> {
        let mut state = self.core.as_ref().block_until_not_empty();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                self.core.as_ref().wake_all_tx(state);
                Some(value)
            }
            None => {
                assert!(state.closed);
                None
            }
        }
    }

    /// Block waiting for values from the channel.
    ///
    /// Up to `element_limit` values are returned if they're already
    /// available. Otherwise, waits for any values to be available.
    ///
    /// Returns an empty [Vec] if all [Sender]s are dropped.
    pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
        let mut state = self.core.as_ref().block_until_not_empty();

        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            assert!(state.closed);
            return Vec::new();
        }

        let capacity = min(q_len, element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        self.core.as_ref().wake_all_tx(state);
        v
    }

    /// Block waiting for up to `element_limit` values from the
    /// channel and store them in `vec`.
    ///
    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
    /// will clear it before adding values. The intent of `recv_vec`
    /// is that batches can be repeatedly read by workers without new
    /// allocations.
    ///
    /// It's not required, but `vec`'s capacity should be greater than
    /// or equal to `element_limit` to avoid reallocation.
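    ///
    /// A minimal sketch of buffer reuse (illustrative values):
    ///
    /// ```
    /// let (tx, rx) = batch_channel::unbounded_sync();
    /// tx.send_iter(0..4).unwrap();
    /// let mut batch = Vec::with_capacity(2);
    /// rx.recv_vec(2, &mut batch);
    /// assert_eq!(vec![0, 1], batch);
    /// rx.recv_vec(2, &mut batch);
    /// assert_eq!(vec![2, 3], batch);
    /// ```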
    pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
        vec.clear();

        let mut state = self.core.as_ref().block_until_not_empty();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            assert!(state.closed);
            // The result vector is already cleared.
            return;
        }

        let capacity = min(q_len, element_limit);
        vec.extend(q.drain(..capacity));
        self.core.as_ref().wake_all_tx(state);
    }
}

// Constructors

/// Allocates a bounded channel and returns the sender, receiver
/// pair.
///
/// Rust async is polling, so unbuffered channels are not supported.
/// Therefore, a capacity of 0 is rounded up to 1.
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    Builder::new().bounded(capacity).build_async()
}

/// Allocates a bounded channel and returns the synchronous handles as
/// a sender, receiver pair.
///
/// Because handles can be converted freely between sync and async,
/// and Rust async is polling, unbuffered channels are not
/// supported. A capacity of 0 is rounded up to 1.
pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
    let (tx, rx) = bounded(capacity);
    (tx.into_sync(), rx.into_sync())
}

/// Allocates an unbounded channel and returns the sender,
/// receiver pair.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    Builder::new().build_async()
}

/// Allocates an unbounded channel and returns the synchronous handles
/// as a sender, receiver pair.
pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
    let (tx, rx) = unbounded();
    (tx.into_sync(), rx.into_sync())
}

/// Customized channel construction.
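///
/// A minimal sketch (illustrative capacity):
///
/// ```
/// let (_tx, _rx) = batch_channel::Builder::new()
///     .bounded(16)
///     .preallocate()
///     .build_sync::<u32>();
/// ```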
#[derive(Debug, Default)]
pub struct Builder {
    capacity: Option<usize>,
    preallocate: bool,
}

impl Builder {
    /// Defaults to unbounded.
    pub fn new() -> Self {
        Default::default()
    }

    /// Define this channel as bounded with the given capacity.
    ///
    /// Because handles can be converted freely between sync and
    /// async, and Rust async is polling, unbuffered channels are not
    /// supported. A capacity of 0 is rounded up to 1.
    pub fn bounded(&mut self, capacity: usize) -> &mut Self {
        // Round a capacity of 0 up to 1, as documented above.
        self.capacity = Some(capacity.max(1));
        self
    }

    /// Preallocate bounded channels with their full capacity. This
    /// avoids reallocation at runtime.
    ///
    /// Has no effect on unbounded channels.
    pub fn preallocate(&mut self) -> &mut Self {
        self.preallocate = true;
        self
    }

    /// Allocates a channel and returns async sender and receiver
    /// handles.
    pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
        let capacity;
        let queue;
        match self.capacity {
            Some(c) => {
                capacity = c;
                queue = if self.preallocate {
                    VecDeque::with_capacity(capacity)
                } else {
                    VecDeque::new()
                };
            }
            None => {
                capacity = UNBOUNDED_CAPACITY;
                queue = VecDeque::new();
            }
        };

        let core = Core {
            state: Mutex::new(State {
                base: StateBase {
                    capacity,
                    closed: false,
                    tx_wakers: WakerList::new(),
                    rx_wakers: WakerList::new(),
                },
                queue,
            }),
            not_empty: OnceLock::new(),
            not_full: OnceLock::new(),
        };
        let (core_tx, core_rx) = splitrc::pin(core);
        (Sender { core: core_tx }, Receiver { core: core_rx })
    }

    /// Allocates a channel and returns the synchronous sender and
    /// receiver handles.
    pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
        let (tx, rx) = self.build_async();
        (tx.into_sync(), rx.into_sync())
    }
}