1#![doc = include_str!("../README.md")]
2#![doc = include_str!("example.md")]
3
4use mutex::PinnedCondvar as Condvar;
5use mutex::PinnedMutex as Mutex;
6use mutex::PinnedMutexGuard as MutexGuard;
7use pin_project::pin_project;
8use pin_project::pinned_drop;
9#[cfg(feature = "parking_lot")]
10use pinned_mutex::parking_lot as mutex;
11#[cfg(not(feature = "parking_lot"))]
12use pinned_mutex::std as mutex;
13use std::cmp::min;
14use std::collections::VecDeque;
15use std::fmt;
16use std::future::Future;
17use std::iter::Peekable;
18use std::ops::AsyncFnOnce;
19use std::pin::Pin;
20use std::sync::OnceLock;
21use std::task::Context;
22use std::task::Poll;
23use wakerset::ExtractedWakers;
24use wakerset::WakerList;
25use wakerset::WakerSlot;
26
/// Capacity stored in `StateBase::capacity` when a channel is built without
/// a bound (see `Builder::build_async`).
const UNBOUNDED_CAPACITY: usize = usize::MAX;
28
/// Implements `Clone` for a handle type `$handle<T>` by cloning its `core`
/// field.
///
/// The generated impl places no `T: Clone` bound on the handle, which
/// `#[derive(Clone)]` would add.
macro_rules! derive_clone {
    ($handle:ident) => {
        impl<T> Clone for $handle<T> {
            fn clone(&self) -> Self {
                let cloned = self.core.clone();
                Self { core: cloned }
            }
        }
    };
}
40
/// Channel bookkeeping that does not depend on the element type.
#[derive(Debug)]
#[pin_project]
struct StateBase {
    /// Upper bound on the queue length; `UNBOUNDED_CAPACITY` when the channel
    /// was built without a bound.
    capacity: usize,
    /// Set once the last sender or the last receiver has been dropped.
    closed: bool,
    /// Wakers of async senders that returned `Poll::Pending`.
    #[pin]
    tx_wakers: WakerList,
    /// Wakers of async receivers that returned `Poll::Pending`.
    #[pin]
    rx_wakers: WakerList,
}
51
52impl StateBase {
53 fn target_capacity(&self) -> usize {
54 self.capacity
57 }
58
59 fn pending_tx<T>(
60 self: Pin<&mut StateBase>,
61 slot: Pin<&mut WakerSlot>,
62 cx: &mut Context,
63 ) -> Poll<T> {
64 self.project().tx_wakers.link(slot, cx.waker().clone());
67 Poll::Pending
68 }
69
70 fn pending_rx<T>(
71 self: Pin<&mut StateBase>,
72 slot: Pin<&mut WakerSlot>,
73 cx: &mut Context,
74 ) -> Poll<T> {
75 self.project().rx_wakers.link(slot, cx.waker().clone());
78 Poll::Pending
79 }
80}
81
/// Everything guarded by the channel mutex: the type-independent bookkeeping
/// plus the queue of buffered values.
#[derive(Debug)]
#[pin_project]
struct State<T> {
    /// Capacity, closed flag and waker lists.
    #[pin]
    base: StateBase,
    /// Values that have been sent but not yet received, in FIFO order.
    queue: VecDeque<T>,
}
89
90impl<T> State<T> {
91 fn has_capacity(&self) -> bool {
92 self.queue.len() < self.target_capacity()
93 }
94
95 fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
96 self.project().base
97 }
98}
99
/// Lets `StateBase` fields and methods (`closed`, `target_capacity`, ...) be
/// read directly on a `State<T>`.
impl<T> std::ops::Deref for State<T> {
    type Target = StateBase;

    fn deref(&self) -> &Self::Target {
        &self.base
    }
}
107
/// Mutable counterpart of the `Deref` impl: exposes the embedded `StateBase`
/// through `&mut State<T>`.
impl<T> std::ops::DerefMut for State<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.base
    }
}
113
/// Shared channel core: the locked state plus lazily created condition
/// variables used by the blocking (sync) handles.
#[derive(Debug)]
#[pin_project]
struct Core<T> {
    /// Queue and bookkeeping, guarded by a pinned mutex.
    #[pin]
    state: Mutex<State<T>>,
    /// Created on first use by `block_until_not_empty`; the receiver wake-up
    /// paths notify it only if it exists.
    not_empty: OnceLock<Condvar>,
    /// Created on first use by `block_until_not_full` and
    /// `wake_rx_and_block_while_full`; `wake_all_tx` notifies it only if it
    /// exists.
    not_full: OnceLock<Condvar>,
}
126
127impl<T> Core<T> {
128 fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
131 fn condition<T>(s: Pin<&mut State<T>>) -> bool {
132 !s.closed && s.queue.is_empty()
133 }
134
135 let mut state = self.project_ref().state.lock();
136 if !condition(state.as_mut()) {
137 return state;
138 }
139 let not_empty = self.not_empty.get_or_init(Default::default);
143 not_empty.wait_while(state, condition)
144 }
145
146 fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
149 fn condition<T>(s: Pin<&mut State<T>>) -> bool {
150 !s.closed && !s.has_capacity()
151 }
152
153 let mut state = self.project_ref().state.lock();
154 if !condition(state.as_mut()) {
155 return state;
156 }
157 let not_full = self.not_full.get_or_init(Default::default);
161 not_full.wait_while(state, condition)
162 }
163
164 fn wake_rx_and_block_while_full<'a>(
167 self: Pin<&'a Self>,
168 mut state: MutexGuard<'a, State<T>>,
169 ) -> MutexGuard<'a, State<T>> {
170 let cvar = self.not_empty.get();
173
174 let round = state
177 .as_mut()
178 .project()
179 .base
180 .project()
181 .rx_wakers
182 .begin_extraction();
183 let mut wakers = ExtractedWakers::new();
184 loop {
188 let more = state
189 .as_mut()
190 .project()
191 .base
192 .project()
193 .rx_wakers
194 .extract_some_wakers(round, &mut wakers);
195 drop(state);
196 wakers.wake_all();
197 if !more {
198 break;
199 }
200 state = self.project_ref().state.lock();
201 }
202
203 if let Some(cvar) = cvar {
207 cvar.notify_all();
210 }
211
212 state = self.project_ref().state.lock();
213
214 let not_full = self.not_full.get_or_init(Default::default);
218 not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
219 }
220
221 fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
222 let cvar = self.not_full.get();
224
225 let round = state
226 .as_mut()
227 .project()
228 .base
229 .project()
230 .tx_wakers
231 .begin_extraction();
232 let mut wakers = ExtractedWakers::new();
233 loop {
234 let more = state
235 .as_mut()
236 .project()
237 .base
238 .project()
239 .tx_wakers
240 .extract_some_wakers(round, &mut wakers);
241 drop(state);
242 wakers.wake_all();
243 if !more {
244 break;
245 }
246 state = self.project_ref().state.lock();
247 }
248
249 if let Some(cvar) = cvar {
253 cvar.notify_all();
256 }
257 }
258
259 fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
260 let cvar = self.not_empty.get();
262 let round = state
263 .as_mut()
264 .project()
265 .base
266 .project()
267 .rx_wakers
268 .begin_extraction();
269 let mut wakers = ExtractedWakers::new();
270 loop {
274 let more = state
275 .as_mut()
276 .project()
277 .base
278 .project()
279 .rx_wakers
280 .extract_some_wakers(round, &mut wakers);
281 drop(state);
282 wakers.wake_all();
283 if !more {
284 break;
285 }
286 state = self.project_ref().state.lock();
287 }
288
289 if let Some(cvar) = cvar {
290 cvar.notify_one();
291 }
292 }
293
294 fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
295 let cvar = self.not_empty.get();
297 let round = state
298 .as_mut()
299 .project()
300 .base
301 .project()
302 .rx_wakers
303 .begin_extraction();
304 let mut wakers = ExtractedWakers::new();
305 loop {
309 let more = state
310 .as_mut()
311 .project()
312 .base
313 .project()
314 .rx_wakers
315 .extract_some_wakers(round, &mut wakers);
316 drop(state);
317 wakers.wake_all();
318 if !more {
319 break;
320 }
321 state = self.project_ref().state.lock();
322 }
323
324 if let Some(cvar) = cvar {
325 cvar.notify_all();
326 }
327 }
328}
329
330impl<T> splitrc::Notify for Core<T> {
331 fn last_tx_did_drop_pinned(self: Pin<&Self>) {
332 let mut state = self.project_ref().state.lock();
333 *state.as_mut().base().project().closed = true;
334 self.wake_all_rx(state);
337 }
338
339 fn last_rx_did_drop_pinned(self: Pin<&Self>) {
340 let mut state = self.project_ref().state.lock();
341 *state.as_mut().base().project().closed = true;
342 state.as_mut().project().queue.clear();
344 self.wake_all_tx(state);
345 }
346}
347
/// Error produced when a send fails; field 0 carries whatever payload the
/// failing operation hands back to the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed message; the payload is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("failed to send value on channel")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
364
/// Blocking sending half of a channel.
///
/// Cloning yields another handle to the same channel core.
#[derive(Debug)]
pub struct SyncSender<T> {
    /// Sender-side reference to the shared, pinned channel core.
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(SyncSender);
374
375impl<T> SyncSender<T> {
376 pub fn into_async(self) -> Sender<T> {
378 Sender { core: self.core }
379 }
380
381 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
385 let mut state = self.core.as_ref().block_until_not_full();
386 if state.closed {
387 assert!(state.as_ref().project_ref().queue.is_empty());
388 return Err(SendError(value));
389 }
390
391 state.as_mut().project().queue.push_back(value);
392
393 self.core.as_ref().wake_one_rx(state);
394 Ok(())
395 }
396
397 pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
403 where
404 I: IntoIterator<Item = T>,
405 {
406 let mut values = values.into_iter();
407
408 let Some(mut value) = values.next() else {
410 return Ok(());
411 };
412
413 let mut sent_count = 0usize;
414
415 let mut state = self.core.as_ref().block_until_not_full();
416 'outer: loop {
417 if state.closed {
418 assert!(state.queue.is_empty());
421 return Err(SendError(()));
422 }
423
424 debug_assert!(state.has_capacity());
425 state.as_mut().project().queue.push_back(value);
426 sent_count += 1;
427 loop {
428 match values.next() {
429 Some(v) => {
430 if state.has_capacity() {
431 state.as_mut().project().queue.push_back(v);
432 sent_count += 1;
433 } else {
434 value = v;
435 state = self.core.as_ref().wake_rx_and_block_while_full(state);
439 continue 'outer;
440 }
441 }
442 None => {
443 if sent_count == 1 {
446 self.core.as_ref().wake_one_rx(state);
447 } else {
448 self.core.as_ref().wake_all_rx(state);
449 }
450 return Ok(());
451 }
452 }
453 }
454 }
455 }
456
457 pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
463 match self.send_iter(values.drain(..)) {
464 Ok(_) => Ok(values),
465 Err(_) => Err(SendError(values)),
466 }
467 }
468
469 pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
472 where
473 F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
474 {
475 let mut tx = SyncBatchSender {
476 sender: self,
477 capacity: batch_limit,
478 buffer: Vec::with_capacity(batch_limit),
479 };
480 let r = f(&mut tx)?;
481 tx.drain()?;
482 Ok(r)
483 }
484}
485
/// Buffering send handle passed to the closure given to
/// `SyncSender::autobatch`.
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
    /// Sender the buffered values are flushed through.
    sender: &'a mut SyncSender<T>,
    /// Batch limit: the buffer length that triggers a flush in `send`.
    capacity: usize,
    /// Values accepted by `send` but not yet flushed to the channel.
    buffer: Vec<T>,
}
495
496impl<T> SyncBatchSender<'_, T> {
497 pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
501 self.buffer.push(value);
502 if self.buffer.len() == self.capacity {
504 self.drain()
505 } else {
506 Ok(())
507 }
508 }
509
510 pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
513 for value in values.into_iter() {
515 self.send(value)?;
516 }
517 Ok(())
518 }
519
520 pub fn drain(&mut self) -> Result<(), SendError<()>> {
522 match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
524 Ok(drained_vec) => {
525 self.buffer = drained_vec;
526 Ok(())
527 }
528 Err(_) => Err(SendError(())),
529 }
530 }
531}
532
/// Async sending half of a channel.
///
/// Cloning yields another handle to the same channel core.
#[derive(Debug)]
pub struct Sender<T> {
    /// Sender-side reference to the shared, pinned channel core.
    core: Pin<splitrc::Tx<Core<T>>>,
}

derive_clone!(Sender);
542
543impl<T> Sender<T> {
544 pub fn into_sync(self) -> SyncSender<T> {
546 SyncSender { core: self.core }
547 }
548
549 pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
553 Send {
554 sender: self,
555 value: Some(value),
556 waker: WakerSlot::new(),
557 }
558 }
559
560 pub fn send_iter<'a, I>(
565 &'a self,
566 values: I,
567 ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
568 where
569 I: IntoIterator<Item = T> + 'a,
570 {
571 SendIter {
572 sender: self,
573 values: Some(values.into_iter().peekable()),
574 waker: WakerSlot::new(),
575 }
576 }
577
578 pub async fn autobatch<R>(
581 self,
582 batch_limit: usize,
583 f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
584 ) -> Result<R, SendError<()>> {
585 let mut tx = BatchSender {
586 sender: self,
587 batch_limit,
588 buffer: Vec::with_capacity(batch_limit),
589 };
590 let r = f(&mut tx).await?;
591 tx.drain().await?;
592 Ok(r)
593 }
594
595 pub async fn autobatch_or_cancel(
601 self,
602 capacity: usize,
603 f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
604 ) {
605 self.autobatch(capacity, f).await.unwrap_or(())
606 }
607}
608
/// Future returned by `Sender::send`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
    /// Handle whose channel receives the value.
    sender: &'a Sender<T>,
    /// The value to send; taken (left `None`) when the future completes.
    value: Option<T>,
    /// Slot linked into the channel's `tx_wakers` list while the future is
    /// pending.
    #[pin]
    waker: WakerSlot,
}
617
618#[pinned_drop]
619impl<T> PinnedDrop for Send<'_, T> {
620 fn drop(mut self: Pin<&mut Self>) {
621 if self.waker.is_linked() {
622 let mut state = self.sender.core.as_ref().project_ref().state.lock();
623 state
624 .as_mut()
625 .base()
626 .project()
627 .tx_wakers
628 .unlink(self.project().waker);
629 }
630 }
631}
632
633impl<T> Future for Send<'_, T> {
634 type Output = Result<(), SendError<T>>;
635
636 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637 let mut state = self.sender.core.as_ref().project_ref().state.lock();
638 if state.closed {
639 return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
640 }
641 if state.has_capacity() {
642 state
643 .as_mut()
644 .project()
645 .queue
646 .push_back(self.as_mut().project().value.take().unwrap());
647 self.project().sender.core.as_ref().wake_one_rx(state);
648 Poll::Ready(Ok(()))
649 } else {
650 state.as_mut().base().pending_tx(self.project().waker, cx)
651 }
652 }
653}
654
/// Future returned by `Sender::send_iter`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
    /// Handle whose channel receives the values.
    sender: &'a Sender<T>,
    /// Remaining values; peekable so `poll` can tell whether anything is left
    /// without consuming it. Constructed as `Some` by `Sender::send_iter`.
    values: Option<Peekable<I>>,
    /// Slot linked into the channel's `tx_wakers` list while the future is
    /// pending.
    #[pin]
    waker: WakerSlot,
}
663
664#[pinned_drop]
665impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
666 fn drop(mut self: Pin<&mut Self>) {
667 if self.waker.is_linked() {
668 let mut state = self.sender.core.as_ref().project_ref().state.lock();
669 state
670 .as_mut()
671 .base()
672 .project()
673 .tx_wakers
674 .unlink(self.project().waker);
675 }
676 }
677}
678
impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
    type Output = Result<(), SendError<()>>;

    /// Moves as many values as currently fit from the iterator into the
    /// queue.
    ///
    /// Resolves to `Ok(())` once the iterator is exhausted, to
    /// `Err(SendError(()))` if the channel is closed while values remain, and
    /// stays pending (with the waker registered) while values remain but the
    /// queue is full.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Fast path: nothing left to send, so finish without taking the lock.
        // The inner scope ends the mutable borrow of `self` before locking.
        {
            let pi = self.as_mut().project().values.as_mut().unwrap();
            if pi.peek().is_none() {
                return Poll::Ready(Ok(()));
            }
        }

        let mut state = self.sender.core.as_ref().project_ref().state.lock();

        let pi = self.as_mut().project().values.as_mut().unwrap();
        // Established by the fast path above.
        debug_assert!(pi.peek().is_some());
        if state.closed {
            // The remaining values are not handed back to the caller.
            Poll::Ready(Err(SendError(())))
        } else if !state.has_capacity() {
            // A value is waiting but there is no room for it.
            state.as_mut().base().pending_tx(self.project().waker, cx)
        } else {
            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(pi.next().unwrap());
            while state.has_capacity() {
                match pi.next() {
                    Some(value) => {
                        state.as_mut().project().queue.push_back(value);
                    }
                    None => {
                        // Iterator exhausted with room to spare: done.
                        self.sender.core.as_ref().wake_all_rx(state);
                        return Poll::Ready(Ok(()));
                    }
                }
            }
            // The queue is full. Peek so an iterator that ended exactly at
            // capacity completes now instead of pending once more.
            if pi.peek().is_none() {
                self.sender.core.as_ref().wake_all_rx(state);
                return Poll::Ready(Ok(()));
            }

            // Register the waker while the lock is still held, then wake
            // receivers (which releases the lock). `pending` is always
            // `Poll::Pending`.
            let pending = state
                .as_mut()
                .base()
                .pending_tx(self.as_mut().project().waker, cx);
            self.sender.core.as_ref().wake_all_rx(state);
            pending
        }
    }
}
747
748pub struct BatchSender<T> {
753 sender: Sender<T>,
754 batch_limit: usize,
755 buffer: Vec<T>,
756}
757
758impl<T> BatchSender<T> {
759 pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
762 self.buffer.push(value);
763 if self.buffer.len() == self.batch_limit {
764 self.drain().await?;
765 }
766 Ok(())
767 }
768
769 async fn drain(&mut self) -> Result<(), SendError<()>> {
770 self.sender.send_iter(self.buffer.drain(..)).await?;
771 assert!(self.buffer.is_empty());
772 Ok(())
773 }
774}
775
/// Async receiving half of a channel.
///
/// Cloning yields another handle to the same channel core.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Receiver-side reference to the shared, pinned channel core.
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(Receiver);
785
/// Future returned by `Receiver::recv`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
    /// Handle whose channel is read.
    receiver: &'a Receiver<T>,
    /// Slot linked into the channel's `rx_wakers` list while the future is
    /// pending.
    #[pin]
    waker: WakerSlot,
}
793
794#[pinned_drop]
795impl<T> PinnedDrop for Recv<'_, T> {
796 fn drop(mut self: Pin<&mut Self>) {
797 if self.waker.is_linked() {
798 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
799 state
800 .as_mut()
801 .base()
802 .project()
803 .rx_wakers
804 .unlink(self.project().waker);
805 }
806 }
807}
808
809impl<T> Future for Recv<'_, T> {
810 type Output = Option<T>;
811
812 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
813 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
814 match state.as_mut().project().queue.pop_front() {
815 Some(value) => {
816 self.receiver.core.as_ref().wake_all_tx(state);
817 Poll::Ready(Some(value))
818 }
819 None => {
820 if state.closed {
821 Poll::Ready(None)
822 } else {
823 state.as_mut().base().pending_rx(self.project().waker, cx)
824 }
825 }
826 }
827 }
828}
829
/// Future returned by `Receiver::recv_batch`.
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
    /// Handle whose channel is read.
    receiver: &'a Receiver<T>,
    /// Maximum number of values returned by one completed poll.
    element_limit: usize,
    /// Slot linked into the channel's `rx_wakers` list while the future is
    /// pending.
    #[pin]
    waker: WakerSlot,
}
838
839#[pinned_drop]
840impl<T> PinnedDrop for RecvBatch<'_, T> {
841 fn drop(mut self: Pin<&mut Self>) {
842 if self.waker.is_linked() {
843 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
844 state
845 .as_mut()
846 .base()
847 .project()
848 .rx_wakers
849 .unlink(self.project().waker);
850 }
851 }
852}
853
854impl<T> Future for RecvBatch<'_, T> {
855 type Output = Vec<T>;
856
857 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
858 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
859 let q = &mut state.as_mut().project().queue;
860 let q_len = q.len();
861 if q_len == 0 {
862 if state.closed {
863 return Poll::Ready(Vec::new());
864 } else {
865 return state.as_mut().base().pending_rx(self.project().waker, cx);
866 }
867 }
868
869 let capacity = min(q_len, self.element_limit);
870 let v = Vec::from_iter(q.drain(..capacity));
871 self.receiver.core.as_ref().wake_all_tx(state);
872 Poll::Ready(v)
873 }
874}
875
/// Future returned by `Receiver::recv_vec`.
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
    /// Handle whose channel is read.
    receiver: &'a Receiver<T>,
    /// Maximum number of values appended by one completed poll.
    element_limit: usize,
    /// Caller-provided destination; `Receiver::recv_vec` clears it before
    /// constructing this future.
    vec: &'a mut Vec<T>,
    /// Slot linked into the channel's `rx_wakers` list while the future is
    /// pending.
    #[pin]
    waker: WakerSlot,
}
885
886#[pinned_drop]
887impl<T> PinnedDrop for RecvVec<'_, T> {
888 fn drop(mut self: Pin<&mut Self>) {
889 if self.waker.is_linked() {
890 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
891 state
892 .as_mut()
893 .base()
894 .project()
895 .rx_wakers
896 .unlink(self.project().waker);
897 }
898 }
899}
900
901impl<T> Future for RecvVec<'_, T> {
902 type Output = ();
903
904 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
905 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
906 let q = &mut state.as_mut().project().queue;
907 let q_len = q.len();
908 if q_len == 0 {
909 if state.closed {
910 assert!(self.vec.is_empty());
911 return Poll::Ready(());
912 } else {
913 return state.as_mut().base().pending_rx(self.project().waker, cx);
914 }
915 }
916
917 let capacity = min(q_len, self.element_limit);
918 self.as_mut().project().vec.extend(q.drain(..capacity));
919 self.project().receiver.core.as_ref().wake_all_tx(state);
920 Poll::Ready(())
921 }
922}
923
924impl<T> Receiver<T> {
925 pub fn into_sync(self) -> SyncReceiver<T> {
927 SyncReceiver { core: self.core }
928 }
929
930 pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
934 Recv {
935 receiver: self,
936 waker: WakerSlot::new(),
937 }
938 }
939
940 pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
949 RecvBatch {
950 receiver: self,
951 element_limit,
952 waker: WakerSlot::new(),
953 }
954 }
955
956 pub fn recv_vec<'a>(
969 &'a self,
970 element_limit: usize,
971 vec: &'a mut Vec<T>,
972 ) -> impl Future<Output = ()> + 'a {
973 vec.clear();
974 RecvVec {
975 receiver: self,
976 element_limit,
977 vec,
978 waker: WakerSlot::new(),
979 }
980 }
981
982 }
984
/// Blocking receiving half of a channel.
///
/// Cloning yields another handle to the same channel core.
#[derive(Debug)]
pub struct SyncReceiver<T> {
    /// Receiver-side reference to the shared, pinned channel core.
    core: Pin<splitrc::Rx<Core<T>>>,
}

derive_clone!(SyncReceiver);
994
995impl<T> SyncReceiver<T> {
996 pub fn into_async(self) -> Receiver<T> {
998 Receiver { core: self.core }
999 }
1000
1001 pub fn recv(&self) -> Option<T> {
1005 let mut state = self.core.as_ref().block_until_not_empty();
1006 match state.as_mut().project().queue.pop_front() {
1007 Some(value) => {
1008 self.core.as_ref().wake_all_tx(state);
1009 Some(value)
1010 }
1011 None => {
1012 assert!(state.closed);
1013 None
1014 }
1015 }
1016 }
1017
1018 pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
1025 let mut state = self.core.as_ref().block_until_not_empty();
1026
1027 let q = &mut state.as_mut().project().queue;
1028 let q_len = q.len();
1029 if q_len == 0 {
1030 assert!(state.closed);
1031 return Vec::new();
1032 }
1033
1034 let capacity = min(q_len, element_limit);
1035 let v = Vec::from_iter(q.drain(..capacity));
1036 self.core.as_ref().wake_all_tx(state);
1037 v
1038 }
1039
1040 pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
1051 vec.clear();
1052
1053 let mut state = self.core.as_ref().block_until_not_empty();
1054 let q = &mut state.as_mut().project().queue;
1055 let q_len = q.len();
1056 if q_len == 0 {
1057 assert!(state.closed);
1058 return;
1060 }
1061
1062 let capacity = min(q_len, element_limit);
1063 vec.extend(q.drain(..capacity));
1064 self.core.as_ref().wake_all_tx(state);
1065 }
1066}
1067
1068pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1076 Builder::new().bounded(capacity).build_async()
1077}
1078
1079pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
1086 let (tx, rx) = bounded(capacity);
1087 (tx.into_sync(), rx.into_sync())
1088}
1089
1090pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1093 Builder::new().build_async()
1094}
1095
1096pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
1099 let (tx, rx) = unbounded();
1100 (tx.into_sync(), rx.into_sync())
1101}
1102
/// Configuration for constructing a channel; the default is unbounded with no
/// preallocation.
#[derive(Debug, Default)]
pub struct Builder {
    /// Requested bound on the queue length; `None` means unbounded.
    capacity: Option<usize>,
    /// Whether a bounded channel allocates its queue storage up front.
    preallocate: bool,
}
1109
1110impl Builder {
1111 pub fn new() -> Self {
1113 Default::default()
1114 }
1115
1116 pub fn bounded(&mut self, capacity: usize) -> &mut Self {
1122 self.capacity = Some(capacity);
1123 self
1124 }
1125
1126 pub fn preallocate(&mut self) -> &mut Self {
1131 self.preallocate = true;
1132 self
1133 }
1134
1135 pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
1138 let capacity;
1139 let queue;
1140 match self.capacity {
1141 Some(c) => {
1142 capacity = c;
1143 queue = if self.preallocate {
1144 VecDeque::with_capacity(capacity)
1145 } else {
1146 VecDeque::new()
1147 };
1148 }
1149 None => {
1150 capacity = UNBOUNDED_CAPACITY;
1151 queue = VecDeque::new();
1152 }
1153 };
1154
1155 let core = Core {
1156 state: Mutex::new(State {
1157 base: StateBase {
1158 capacity,
1159 closed: false,
1160 tx_wakers: WakerList::new(),
1161 rx_wakers: WakerList::new(),
1162 },
1163 queue,
1164 }),
1165 not_empty: OnceLock::new(),
1166 not_full: OnceLock::new(),
1167 };
1168 let (core_tx, core_rx) = splitrc::pin(core);
1169 (Sender { core: core_tx }, Receiver { core: core_rx })
1170 }
1171
1172 pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
1175 let (tx, rx) = self.build_async();
1176 (tx.into_sync(), rx.into_sync())
1177 }
1178}