batch_channel/
lib.rs

#![cfg_attr(docsrs, feature(doc_cfg))]

#![doc = include_str!("../README.md")]
#![doc = include_str!("example.md")]

use mutex::PinnedCondvar as Condvar;
use mutex::PinnedMutex as Mutex;
use mutex::PinnedMutexGuard as MutexGuard;
use pin_project::pin_project;
use pin_project::pinned_drop;
#[cfg(feature = "parking_lot")]
use pinned_mutex::parking_lot as mutex;
#[cfg(not(feature = "parking_lot"))]
use pinned_mutex::std as mutex;
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::iter::Peekable;
use std::ops::AsyncFnOnce;
use std::pin::Pin;
use std::sync::OnceLock;
use std::task::Context;
use std::task::Poll;
use wakerset::ExtractedWakers;
use wakerset::WakerList;
use wakerset::WakerSlot;

const UNBOUNDED_CAPACITY: usize = usize::MAX;

macro_rules! derive_clone {
    ($t:ident) => {
        impl<T> Clone for $t<T> {
            fn clone(&self) -> Self {
                Self {
                    core: self.core.clone(),
                }
            }
        }
    };
}

#[derive(Debug)]
#[pin_project]
struct StateBase {
    capacity: usize,
    closed: bool,
    #[pin]
    tx_wakers: WakerList,
    #[pin]
    rx_wakers: WakerList,
}

impl StateBase {
    fn target_capacity(&self) -> usize {
        // TODO: We could offer an option to use queue.capacity
        // instead.
        self.capacity
    }

    fn pending_tx<T>(
        self: Pin<&mut StateBase>,
        slot: Pin<&mut WakerSlot>,
        cx: &mut Context,
    ) -> Poll<T> {
        // This may allocate, but only when the sender is about to
        // block, which is already expensive.
        self.project().tx_wakers.link(slot, cx.waker().clone());
        Poll::Pending
    }

    fn pending_rx<T>(
        self: Pin<&mut StateBase>,
        slot: Pin<&mut WakerSlot>,
        cx: &mut Context,
    ) -> Poll<T> {
        // This may allocate, but only when the receiver is about to
        // block, which is already expensive.
        self.project().rx_wakers.link(slot, cx.waker().clone());
        Poll::Pending
    }
}

#[derive(Debug)]
#[pin_project]
struct State<T> {
    #[pin]
    base: StateBase,
    queue: VecDeque<T>,
}

impl<T> State<T> {
    fn has_capacity(&self) -> bool {
        self.queue.len() < self.target_capacity()
    }

    fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
        self.project().base
    }
}

impl<T> std::ops::Deref for State<T> {
    type Target = StateBase;

    fn deref(&self) -> &Self::Target {
        &self.base
    }
}

impl<T> std::ops::DerefMut for State<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.base
    }
}

#[derive(Debug)]
#[pin_project]
struct Core<T> {
    #[pin]
    state: Mutex<State<T>>,
    // OnceLock ensures Core is Sync and Arc<Core> is Send. But it is
    // not strictly necessary, as these condition variables are only
    // accessed while the lock is held. Alas, Rust does not allow
    // Condvar to be stored under the Mutex.
    not_empty: OnceLock<Condvar>,
    not_full: OnceLock<Condvar>,
}

impl<T> Core<T> {
    /// Blocks until a value is available, or until the queue is
    /// empty and all senders are dropped.
    fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && s.queue.is_empty()
        }

        let mut state = self.project_ref().state.lock();
        if !condition(state.as_mut()) {
            return state;
        }
        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_empty = self.not_empty.get_or_init(Default::default);
        not_empty.wait_while(state, condition)
    }

    /// Blocks until there is either room in the queue or all
    /// receivers are dropped.
    fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && !s.has_capacity()
        }

        let mut state = self.project_ref().state.lock();
        if !condition(state.as_mut()) {
            return state;
        }
        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, condition)
    }

    /// Wakes any waiting receivers, then blocks until there is
    /// either room in the queue or all receivers are dropped.
    fn wake_rx_and_block_while_full<'a>(
        self: Pin<&'a Self>,
        mut state: MutexGuard<'a, State<T>>,
    ) -> MutexGuard<'a, State<T>> {
        // The lock is held. Therefore, we know whether a Condvar must
        // be notified or not.
        let cvar = self.not_empty.get();

        // We should not wake Wakers while a lock is held. Therefore,
        // we must release the lock and reacquire it to wait.
        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        loop {
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        // TODO: Avoid unlocking and locking again when there's no
        // waker or condition variable.

        if let Some(cvar) = cvar {
            // TODO: There are situations where we may know that we
            // can get away with notify_one().
            cvar.notify_all();
        }

        state = self.project_ref().state.lock();

        // Initialize the condvar while the lock is held. Thus, the
        // caller can, while the lock is held, check whether the
        // condvar must be notified.
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
    }

    fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_full.get();

        if state.as_mut().project().base.project().tx_wakers.is_empty() {
            drop(state);
        } else {
            let round = state
                .as_mut()
                .project()
                .base
                .project()
                .tx_wakers
                .begin_extraction();
            let mut wakers = ExtractedWakers::new();
            loop {
                let more = state
                    .as_mut()
                    .project()
                    .base
                    .project()
                    .tx_wakers
                    .extract_some_wakers(round, &mut wakers);
                drop(state);
                wakers.wake_all();
                if !more {
                    break;
                }
                state = self.project_ref().state.lock();
            }
        }

        if let Some(cvar) = cvar {
            // TODO: There are situations where we may know that we
            // can get away with notify_one().
            cvar.notify_all();
        }
    }

    fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_empty.get();
        if state.as_mut().project().base.project().rx_wakers.is_empty() {
            drop(state);
        } else {
            let round = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .begin_extraction();
            let mut wakers = ExtractedWakers::new();
            // There is no guarantee that the highest-priority waker will
            // actually call poll() again. Therefore, the best we can do
            // is wake everyone.
            loop {
                let more = state
                    .as_mut()
                    .project()
                    .base
                    .project()
                    .rx_wakers
                    .extract_some_wakers(round, &mut wakers);
                drop(state);
                wakers.wake_all();
                if !more {
                    break;
                }
                state = self.project_ref().state.lock();
            }
        }

        if let Some(cvar) = cvar {
            cvar.notify_one();
        }
    }

    fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // The lock is held. Therefore, we know whether a Condvar must be notified or not.
        let cvar = self.not_empty.get();
        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        // There is no guarantee that the highest-priority waker will
        // actually call poll() again. Therefore, the best we can do
        // is wake everyone.
        loop {
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
    }
}

impl<T> splitrc::Notify for Core<T> {
    fn last_tx_did_drop_pinned(self: Pin<&Self>) {
        let mut state = self.project_ref().state.lock();
        *state.as_mut().base().project().closed = true;
        // We cannot deallocate the queue, as remaining receivers can
        // drain it.
        self.wake_all_rx(state);
    }

    fn last_rx_did_drop_pinned(self: Pin<&Self>) {
        let mut state = self.project_ref().state.lock();
        *state.as_mut().base().project().closed = true;
        // TODO: deallocate
        state.as_mut().project().queue.clear();
        self.wake_all_tx(state);
    }
}

// SendError

/// An error returned from [Sender::send] when all [Receiver]s are
/// dropped.
///
/// The unsent value is returned.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to send value on channel")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}

// SyncSender

/// The synchronous sending half of a channel.
#[derive(Debug)]
pub struct SyncSender<T> {
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(SyncSender);

impl<T> SyncSender<T> {
    /// Converts `SyncSender` to asynchronous `Sender`.
    pub fn into_async(self) -> Sender<T> {
        Sender { core: self.core }
    }

    /// Send a single value.
    ///
    /// Returns [SendError] if all receivers are dropped.
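    ///
    /// Illustrative sketch, using this crate's [bounded_sync]
    /// constructor:
    ///
    /// ```rust,ignore
    /// let (tx, rx) = batch_channel::bounded_sync(4);
    /// tx.send("hello").expect("receiver was dropped");
    /// assert_eq!(rx.recv(), Some("hello"));
    /// ```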
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let mut state = self.core.as_ref().block_until_not_full();
        if state.closed {
            assert!(state.as_ref().project_ref().queue.is_empty());
            return Err(SendError(value));
        }

        state.as_mut().project().queue.push_back(value);

        self.core.as_ref().wake_one_rx(state);
        Ok(())
    }

    /// Send multiple values.
    ///
    /// If all receivers are dropped, [SendError] is returned. The
    /// values cannot be returned, as they may have been partially
    /// sent when the channel is closed.
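    ///
    /// Illustrative sketch: sending a batch and receiving the first
    /// value back:
    ///
    /// ```rust,ignore
    /// let (tx, rx) = batch_channel::bounded_sync(4);
    /// tx.send_iter([1, 2, 3]).expect("receiver was dropped");
    /// assert_eq!(rx.recv(), Some(1));
    /// ```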
    pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
    where
        I: IntoIterator<Item = T>,
    {
        let mut values = values.into_iter();

        // If the iterator is empty, we can avoid acquiring the lock.
        let Some(mut value) = values.next() else {
            return Ok(());
        };

        let mut sent_count = 0usize;

        let mut state = self.core.as_ref().block_until_not_full();
        'outer: loop {
            if state.closed {
                // We may have sent some values, but the receivers are
                // all dropped, and that cleared the queue.
                assert!(state.queue.is_empty());
                return Err(SendError(()));
            }

            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(value);
            sent_count += 1;
            loop {
                match values.next() {
                    Some(v) => {
                        if state.has_capacity() {
                            state.as_mut().project().queue.push_back(v);
                            sent_count += 1;
                        } else {
                            value = v;
                            // We're about to block, but we know we
                            // sent at least one value, so wake any
                            // waiters.
                            state = self.core.as_ref().wake_rx_and_block_while_full(state);
                            continue 'outer;
                        }
                    }
                    None => {
                        // Done pulling from the iterator, and we know
                        // we sent at least one value.
                        if sent_count == 1 {
                            self.core.as_ref().wake_one_rx(state);
                        } else {
                            self.core.as_ref().wake_all_rx(state);
                        }
                        return Ok(());
                    }
                }
            }
        }
    }

    /// Drain a [Vec] into the channel without deallocating it.
    ///
    /// This is a convenience method for allocation-free batched
    /// sends. The `values` vector is drained, and then returned with
    /// the same capacity it had.
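    ///
    /// Illustrative sketch of a reusable send buffer; `produce_into`
    /// is a hypothetical stand-in for filling the buffer:
    ///
    /// ```rust,ignore
    /// let mut buffer = Vec::with_capacity(64);
    /// loop {
    ///     produce_into(&mut buffer); // hypothetical producer
    ///     // The Vec comes back drained, with its capacity intact.
    ///     buffer = tx.send_vec(buffer).expect("receivers were dropped");
    /// }
    /// ```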
    pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
        match self.send_iter(values.drain(..)) {
            Ok(_) => Ok(values),
            Err(_) => Err(SendError(values)),
        }
    }

    /// Automatically accumulate sends into a buffer of size `batch_limit`
    /// and send when full.
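    ///
    /// Illustrative sketch; `tx` must be `mut`, and each `send` may
    /// flush a full batch into the channel:
    ///
    /// ```rust,ignore
    /// tx.autobatch(16, |tx| {
    ///     for value in 0..100 {
    ///         tx.send(value)?;
    ///     }
    ///     Ok(())
    /// })?;
    /// ```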
    pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
    where
        F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
    {
        let mut tx = SyncBatchSender {
            sender: self,
            capacity: batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx)?;
        tx.drain()?;
        Ok(r)
    }
}

// SyncBatchSender

/// Automatically batches up values and sends them when a batch is full.
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
    sender: &'a mut SyncSender<T>,
    capacity: usize,
    buffer: Vec<T>,
}

impl<T> SyncBatchSender<'_, T> {
    /// Buffers a single value to be sent on the channel.
    ///
    /// Sends the batch if the buffer is full.
    pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
        self.buffer.push(value);
        // TODO: consider using the full capacity if Vec overallocated.
        if self.buffer.len() == self.capacity {
            self.drain()
        } else {
            Ok(())
        }
    }

    /// Buffers multiple values, sending batches as the internal
    /// buffer reaches capacity.
    pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
        // TODO: We could return the remainder of I under cancellation.
        for value in values.into_iter() {
            self.send(value)?;
        }
        Ok(())
    }

    /// Sends any buffered values, clearing the current batch.
    pub fn drain(&mut self) -> Result<(), SendError<()>> {
        // TODO: send_iter
        match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
            Ok(drained_vec) => {
                self.buffer = drained_vec;
                Ok(())
            }
            Err(_) => Err(SendError(())),
        }
    }
}

// Sender

/// The asynchronous sending half of a channel.
#[derive(Debug)]
pub struct Sender<T> {
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(Sender);

impl<T> Sender<T> {
    /// Converts asynchronous `Sender` to `SyncSender`.
    pub fn into_sync(self) -> SyncSender<T> {
        SyncSender { core: self.core }
    }

    /// Send a single value.
    ///
    /// Returns [SendError] if all receivers are dropped.
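    ///
    /// Illustrative sketch, awaited under any async runtime:
    ///
    /// ```rust,ignore
    /// let (tx, rx) = batch_channel::bounded(4);
    /// tx.send(42).await.expect("receiver was dropped");
    /// assert_eq!(rx.recv().await, Some(42));
    /// ```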
    pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
        Send {
            sender: self,
            value: Some(value),
            waker: WakerSlot::new(),
        }
    }

    /// Send multiple values.
    ///
    /// If all receivers are dropped, [SendError] is returned and
    /// unsent values are dropped.
    pub fn send_iter<'a, I>(
        &'a self,
        values: I,
    ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
    where
        I: IntoIterator<Item = T> + 'a,
    {
        SendIter {
            sender: self,
            values: Some(values.into_iter().peekable()),
            waker: WakerSlot::new(),
        }
    }

    /// Automatically accumulate sends into a buffer of size `batch_limit`
    /// and send when full.
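    ///
    /// Illustrative sketch of a producer task. Note that `autobatch`
    /// consumes the sender; the future is typically handed to a
    /// spawn function:
    ///
    /// ```rust,ignore
    /// tx.autobatch(16, async |tx| {
    ///     for value in 0..100 {
    ///         tx.send(value).await?;
    ///     }
    ///     Ok(())
    /// })
    /// .await?;
    /// ```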
    pub async fn autobatch<R>(
        self,
        batch_limit: usize,
        f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
    ) -> Result<R, SendError<()>> {
        let mut tx = BatchSender {
            sender: self,
            batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx).await?;
        tx.drain().await?;
        Ok(r)
    }

    /// Same as [Sender::autobatch] except that it immediately returns
    /// `()` when `f` returns [SendError]. This is a convenience
    /// wrapper for the common case that the future is passed to a
    /// spawn function and the receiver being dropped (i.e.
    /// [SendError]) is considered a clean cancellation.
    pub async fn autobatch_or_cancel(
        self,
        batch_limit: usize,
        f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
    ) {
        self.autobatch(batch_limit, f).await.unwrap_or(())
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
    sender: &'a Sender<T>,
    value: Option<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for Send<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.sender.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .tx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for Send<'_, T> {
    type Output = Result<(), SendError<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.sender.core.as_ref().project_ref().state.lock();
        if state.closed {
            return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
        }
        if state.has_capacity() {
            state
                .as_mut()
                .project()
                .queue
                .push_back(self.as_mut().project().value.take().unwrap());
            self.project().sender.core.as_ref().wake_one_rx(state);
            Poll::Ready(Ok(()))
        } else {
            state.as_mut().base().pending_tx(self.project().waker, cx)
        }
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
    sender: &'a Sender<T>,
    values: Option<Peekable<I>>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.sender.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .tx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
    type Output = Result<(), SendError<()>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Optimize the case that send_iter was called with an empty
        // iterator, and don't even acquire the lock.
        {
            let pi = self.as_mut().project().values.as_mut().unwrap();
            if pi.peek().is_none() {
                return Poll::Ready(Ok(()));
            }
            // Satisfy borrow checker: we cannot hold a mut reference to
            // self through pi before acquiring the lock below.
        }

        let mut state = self.sender.core.as_ref().project_ref().state.lock();

        // There is an awkward set of constraints here.
        // 1. To check whether an iterator contains an item, one must be popped.
        // 2. If the receivers are cancelled, we'd like to return the iterator whole.
        // 3. If we don't know whether there are any remaining items, we must block
        //    if the queue is at capacity.
        // We relax constraint #2 because #3 is preferable.
        // TODO: We could return Peekable<I> instead.

        let pi = self.as_mut().project().values.as_mut().unwrap();
        // We already checked above.
        debug_assert!(pi.peek().is_some());
        if state.closed {
            Poll::Ready(Err(SendError(())))
        } else if !state.has_capacity() {
            // We know we have a value to send, but there is no room.
            state.as_mut().base().pending_tx(self.project().waker, cx)
        } else {
            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(pi.next().unwrap());
            while state.has_capacity() {
                match pi.next() {
                    Some(value) => {
                        state.as_mut().project().queue.push_back(value);
                    }
                    None => {
                        // Done pulling from the iterator and still
                        // have capacity, so we're done.
                        // TODO: wake_one_rx if we only queued one.
                        self.sender.core.as_ref().wake_all_rx(state);
                        return Poll::Ready(Ok(()));
                    }
                }
            }
            // We're out of capacity, and might still have items to
            // send. To avoid a round-trip through the scheduler, peek
            // ahead.
            if pi.peek().is_none() {
                self.sender.core.as_ref().wake_all_rx(state);
                return Poll::Ready(Ok(()));
            }

            // Unconditionally returns Poll::Pending
            let pending = state
                .as_mut()
                .base()
                .pending_tx(self.as_mut().project().waker, cx);
            self.sender.core.as_ref().wake_all_rx(state);
            pending
        }
    }
}

// BatchSender

/// The internal send handle used by [Sender::autobatch].
/// Builds a buffer of size `batch_limit` and flushes when it's full.
pub struct BatchSender<T> {
    sender: Sender<T>,
    batch_limit: usize,
    buffer: Vec<T>,
}

impl<T> BatchSender<T> {
    /// Adds a value to the internal buffer and flushes it into the
    /// queue when the buffer fills.
    pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
        self.buffer.push(value);
        if self.buffer.len() == self.batch_limit {
            self.drain().await?;
        }
        Ok(())
    }

    async fn drain(&mut self) -> Result<(), SendError<()>> {
        self.sender.send_iter(self.buffer.drain(..)).await?;
        assert!(self.buffer.is_empty());
        Ok(())
    }
}

// Receiver

/// The receiving half of a channel. Reads are asynchronous.
#[derive(Debug)]
pub struct Receiver<T> {
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(Receiver);

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
    receiver: &'a Receiver<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for Recv<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for Recv<'_, T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                self.receiver.core.as_ref().wake_all_tx(state);
                Poll::Ready(Some(value))
            }
            None => {
                if state.closed {
                    Poll::Ready(None)
                } else {
                    state.as_mut().base().pending_rx(self.project().waker, cx)
                }
            }
        }
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
    receiver: &'a Receiver<T>,
    element_limit: usize,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for RecvBatch<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for RecvBatch<'_, T> {
    type Output = Vec<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                return Poll::Ready(Vec::new());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        self.receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(v)
    }
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
    receiver: &'a Receiver<T>,
    element_limit: usize,
    vec: &'a mut Vec<T>,
    #[pin]
    waker: WakerSlot,
}

#[pinned_drop]
impl<T> PinnedDrop for RecvVec<'_, T> {
    fn drop(mut self: Pin<&mut Self>) {
        if self.waker.is_linked() {
            let mut state = self.receiver.core.as_ref().project_ref().state.lock();
            state
                .as_mut()
                .base()
                .project()
                .rx_wakers
                .unlink(self.project().waker);
        }
    }
}

impl<T> Future for RecvVec<'_, T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                assert!(self.vec.is_empty());
                return Poll::Ready(());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        self.as_mut().project().vec.extend(q.drain(..capacity));
        self.project().receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(())
    }
}

impl<T> Receiver<T> {
    /// Converts asynchronous `Receiver` to `SyncReceiver`.
    pub fn into_sync(self) -> SyncReceiver<T> {
        SyncReceiver { core: self.core }
    }

    /// Wait for a single value from the channel.
    ///
    /// Returns [None] if all [Sender]s are dropped.
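    ///
    /// Illustrative receive loop; `process` is a hypothetical
    /// consumer:
    ///
    /// ```rust,ignore
    /// while let Some(value) = rx.recv().await {
    ///     process(value);
    /// }
    /// // All senders are dropped and the queue is drained.
    /// ```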
    pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
        Recv {
            receiver: self,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv

    /// Wait for up to `element_limit` values from the channel.
    ///
    /// Up to `element_limit` values are returned if they're already
    /// available. Otherwise, waits for any values to be available.
    ///
    /// Returns an empty [Vec] if all [Sender]s are dropped.
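    ///
    /// Illustrative batched consumer; `process` is a hypothetical
    /// stand-in:
    ///
    /// ```rust,ignore
    /// loop {
    ///     let batch = rx.recv_batch(32).await;
    ///     if batch.is_empty() {
    ///         break; // all senders dropped
    ///     }
    ///     for value in batch {
    ///         process(value);
    ///     }
    /// }
    /// ```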
    pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
        RecvBatch {
            receiver: self,
            element_limit,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv_batch

    /// Wait for up to `element_limit` values from the channel and
    /// store them in `vec`.
    ///
    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
    /// will clear it before adding values. The intent of `recv_vec`
    /// is that batches can be repeatedly read by workers without new
    /// allocations.
    ///
    /// It's not required, but `vec`'s capacity should be greater than
    /// or equal to `element_limit` to avoid reallocation.
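    ///
    /// Illustrative allocation-free worker loop; `process` is a
    /// hypothetical stand-in:
    ///
    /// ```rust,ignore
    /// let mut batch = Vec::with_capacity(32);
    /// loop {
    ///     rx.recv_vec(32, &mut batch).await;
    ///     if batch.is_empty() {
    ///         break; // all senders dropped
    ///     }
    ///     for value in batch.drain(..) {
    ///         process(value);
    ///     }
    /// }
    /// ```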
    pub fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future<Output = ()> + 'a {
        vec.clear();
        RecvVec {
            receiver: self,
            element_limit,
            vec,
            waker: WakerSlot::new(),
        }
    }

    // TODO: try_recv_vec

    /// Return a [futures_core::Stream] implementation that repeatedly
    /// calls [Receiver::recv]. It yields [None] when the channel is
    /// closed.
    ///
    /// `Stream` must be pinned before use:
    ///
    /// ```rust,ignore
    /// let mut rx = pin!(rx.stream());
    /// let elt1 = rx.next().await;
    /// let elt2 = rx.next().await;
    /// ```
    #[cfg(feature = "futures-core")]
    #[cfg_attr(docsrs, doc(cfg(feature = "futures-core")))]
    pub fn stream<'a>(&'a self) -> Stream<'a, T> {
        let recv = Recv {
            receiver: self,
            waker: WakerSlot::new(),
        };
        Stream { recv }
    }
}

/// A [futures_core::Stream] implementation that yields elements
/// from the channel, one by one, until it's closed.
///
/// Returned by [Receiver::stream].
#[cfg(feature = "futures-core")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-core")))]
#[must_use = "streams do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct Stream<'a, T> {
    #[pin]
    recv: Recv<'a, T>,
}

#[cfg(feature = "futures-core")]
impl<'a, T> futures_core::Stream for Stream<'a, T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().recv.poll(cx)
    }
}

// SyncReceiver

/// The synchronous receiving half of a channel.
#[derive(Debug)]
pub struct SyncReceiver<T> {
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(SyncReceiver);

impl<T> SyncReceiver<T> {
    /// Converts `SyncReceiver` to asynchronous `Receiver`.
    pub fn into_async(self) -> Receiver<T> {
        Receiver { core: self.core }
    }

    /// Block waiting for a single value from the channel.
    ///
    /// Returns [None] if all [Sender]s are dropped.
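    ///
    /// Since `SyncReceiver` also implements [Iterator], a blocking
    /// drain loop can be written directly; `process` is a
    /// hypothetical consumer:
    ///
    /// ```rust,ignore
    /// while let Some(value) = rx.recv() {
    ///     process(value);
    /// }
    /// ```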
    pub fn recv(&self) -> Option<T> {
        let mut state = self.core.as_ref().block_until_not_empty();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                self.core.as_ref().wake_all_tx(state);
                Some(value)
            }
            None => {
                assert!(state.closed);
                None
            }
        }
    }

    /// Block waiting for values from the channel.
    ///
    /// Up to `element_limit` values are returned if they're already
    /// available. Otherwise, waits for any values to be available.
    ///
    /// Returns an empty [Vec] if all [Sender]s are dropped.
    pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
        let mut state = self.core.as_ref().block_until_not_empty();

        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            assert!(state.closed);
            return Vec::new();
        }

        let capacity = min(q_len, element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        self.core.as_ref().wake_all_tx(state);
        v
    }

    /// Block waiting for up to `element_limit` values from the
    /// channel and store them in `vec`.
    ///
    /// `vec` should be empty when passed in. Nevertheless, `recv_vec`
    /// will clear it before adding values. The intent of `recv_vec`
    /// is that batches can be repeatedly read by workers without new
    /// allocations.
    ///
    /// It's not required, but `vec`'s capacity should be greater than
    /// or equal to `element_limit` to avoid reallocation.
    pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
        vec.clear();

        let mut state = self.core.as_ref().block_until_not_empty();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            assert!(state.closed);
            // The result vector is already cleared.
            return;
        }

        let capacity = min(q_len, element_limit);
        vec.extend(q.drain(..capacity));
        self.core.as_ref().wake_all_tx(state);
    }
}

impl<T> Iterator for SyncReceiver<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.recv()
    }
}

// Constructors

/// Allocates a bounded channel and returns the sender, receiver
/// pair.
///
/// Rust async is polling, so unbuffered channels are not supported.
/// Therefore, a capacity of 0 is rounded up to 1.
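///
/// Illustrative sketch, converting one handle to its synchronous
/// form (handles convert freely between sync and async):
///
/// ```rust,ignore
/// let (tx, rx) = batch_channel::bounded(8);
/// let tx = tx.into_sync(); // blocking sends from a thread
/// tx.send(1).expect("receiver was dropped");
/// assert_eq!(rx.recv().await, Some(1));
/// ```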
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    Builder::new().bounded(capacity).build_async()
}

/// Allocates a bounded channel and returns the synchronous handles as
/// a sender, receiver pair.
///
/// Because handles can be converted freely between sync and async,
/// and Rust async is polling, unbuffered channels are not
/// supported. A capacity of 0 is rounded up to 1.
pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
    let (tx, rx) = bounded(capacity);
    (tx.into_sync(), rx.into_sync())
}

/// Allocates an unbounded channel and returns the sender,
/// receiver pair.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    Builder::new().build_async()
}

/// Allocates an unbounded channel and returns the synchronous handles
/// as a sender, receiver pair.
pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
    let (tx, rx) = unbounded();
    (tx.into_sync(), rx.into_sync())
}

/// Customized channel construction.
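///
/// Illustrative sketch, mirroring how the [bounded] constructor uses
/// the builder internally:
///
/// ```rust,ignore
/// let (tx, rx) = batch_channel::Builder::new()
///     .bounded(1024)
///     .preallocate()
///     .build_sync();
/// ```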
#[derive(Debug, Default)]
pub struct Builder {
    capacity: Option<usize>,
    preallocate: bool,
}

impl Builder {
    /// Defaults to unbounded.
    pub fn new() -> Self {
        Default::default()
    }

    /// Define this channel as bounded with the given capacity.
    ///
    /// Because handles can be converted freely between sync and
    /// async, and Rust async is polling, unbuffered channels are not
    /// supported. A capacity of 0 is rounded up to 1.
    pub fn bounded(&mut self, capacity: usize) -> &mut Self {
        self.capacity = Some(capacity);
        self
    }

    /// Preallocate bounded channels with their full capacity. This
    /// avoids reallocation at runtime.
    ///
    /// Has no effect on unbounded channels.
    pub fn preallocate(&mut self) -> &mut Self {
        self.preallocate = true;
        self
    }

    /// Allocates a channel and returns async sender and receiver
    /// handles.
    pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
        let capacity;
        let queue;
        match self.capacity {
            Some(c) => {
                // A capacity of 0 would deadlock both halves; round
                // up to 1 as documented.
                capacity = c.max(1);
                queue = if self.preallocate {
                    VecDeque::with_capacity(capacity)
                } else {
                    VecDeque::new()
                };
            }
            None => {
                capacity = UNBOUNDED_CAPACITY;
                queue = VecDeque::new();
            }
        };

        let core = Core {
            state: Mutex::new(State {
                base: StateBase {
                    capacity,
                    closed: false,
                    tx_wakers: WakerList::new(),
                    rx_wakers: WakerList::new(),
                },
                queue,
            }),
            not_empty: OnceLock::new(),
            not_full: OnceLock::new(),
        };
        let (core_tx, core_rx) = splitrc::pin(core);
        (Sender { core: core_tx }, Receiver { core: core_rx })
    }

    /// Allocates a channel and returns the synchronous sender and
    /// receiver handles.
    pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
        let (tx, rx) = self.build_async();
        (tx.into_sync(), rx.into_sync())
    }
}