1#![doc = include_str!("../README.md")]
2#![doc = include_str!("example.md")]
3
4use futures_core::future::BoxFuture;
5use mutex::PinnedCondvar as Condvar;
6use mutex::PinnedMutex as Mutex;
7use mutex::PinnedMutexGuard as MutexGuard;
8use pin_project::pin_project;
9use pin_project::pinned_drop;
10#[cfg(feature = "parking_lot")]
11use pinned_mutex::parking_lot as mutex;
12#[cfg(not(feature = "parking_lot"))]
13use pinned_mutex::std as mutex;
14use std::cmp::min;
15use std::collections::VecDeque;
16use std::fmt;
17use std::future::Future;
18use std::iter::Peekable;
19use std::pin::Pin;
20use std::sync::OnceLock;
21use std::task::Context;
22use std::task::Poll;
23use wakerset::WakerList;
24use wakerset::WakerSlot;
25
/// Capacity stored in the channel state by `unbounded`: the largest `usize`,
/// which `State::has_capacity` compares the queue length against.
const UNBOUNDED_CAPACITY: usize = usize::MAX;
27
// Implements `Clone` for a single-type-parameter handle by cloning its `core`
// field. Written as a macro instead of `#[derive(Clone)]` so that no
// `T: Clone` bound is placed on the element type.
macro_rules! derive_clone {
    ($handle:ident) => {
        impl<T> Clone for $handle<T> {
            fn clone(&self) -> Self {
                let core = self.core.clone();
                Self { core }
            }
        }
    };
}
39
/// The part of the channel state that does not depend on the element type:
/// the capacity, the closed flag, and the waker lists for both sides.
#[derive(Debug)]
#[pin_project]
struct StateBase {
    /// Queue length limit; `State::has_capacity` compares against it.
    capacity: usize,
    /// Set to `true` when the last sender or the last receiver is dropped.
    closed: bool,
    /// Wakers registered by send futures that returned `Poll::Pending`.
    #[pin]
    tx_wakers: WakerList,
    /// Wakers registered by receive futures that returned `Poll::Pending`.
    #[pin]
    rx_wakers: WakerList,
}
50
51impl StateBase {
52 fn target_capacity(&self) -> usize {
53 self.capacity
56 }
57
58 fn pending_tx<T>(
59 self: Pin<&mut StateBase>,
60 slot: Pin<&mut WakerSlot>,
61 cx: &mut Context,
62 ) -> Poll<T> {
63 self.project().tx_wakers.link(slot, cx.waker().clone());
66 Poll::Pending
67 }
68
69 fn pending_rx<T>(
70 self: Pin<&mut StateBase>,
71 slot: Pin<&mut WakerSlot>,
72 cx: &mut Context,
73 ) -> Poll<T> {
74 self.project().rx_wakers.link(slot, cx.waker().clone());
77 Poll::Pending
78 }
79}
80
/// Full channel state guarded by the mutex in `Core`: the element-independent
/// `StateBase` plus the queue of buffered values.
#[derive(Debug)]
#[pin_project]
struct State<T> {
    /// Capacity, closed flag and waker lists (also reachable through `Deref`).
    #[pin]
    base: StateBase,
    /// Values that have been sent but not yet received, in FIFO order
    /// (senders `push_back`, receivers `pop_front`/`drain` from the front).
    queue: VecDeque<T>,
}
88
89impl<T> State<T> {
90 fn has_capacity(&self) -> bool {
91 self.queue.len() < self.target_capacity()
92 }
93
94 fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
95 self.project().base
96 }
97}
98
99impl<T> std::ops::Deref for State<T> {
100 type Target = StateBase;
101
102 fn deref(&self) -> &Self::Target {
103 &self.base
104 }
105}
106
107impl<T> std::ops::DerefMut for State<T> {
108 fn deref_mut(&mut self) -> &mut Self::Target {
109 &mut self.base
110 }
111}
112
/// Shared channel core referenced by every sender and receiver handle.
#[derive(Debug)]
#[pin_project]
struct Core<T> {
    /// The mutex-protected queue and bookkeeping.
    #[pin]
    state: Mutex<State<T>>,
    /// Condition variable for blocking receivers. Only created on first use
    /// (`get_or_init`) by a caller that actually has to block; notifiers use
    /// `get` and skip notification when it was never created.
    not_empty: OnceLock<Condvar>,
    /// Condition variable for blocking senders, created lazily in the same way.
    not_full: OnceLock<Condvar>,
}
125
impl<T> Core<T> {
    /// Locks the state and returns the guard once the channel is closed or
    /// the queue is non-empty, blocking the calling thread on the `not_empty`
    /// condition variable while neither is the case.
    fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        // `true` means "keep waiting": still open and nothing to receive.
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && s.queue.is_empty()
        }

        let mut state = self.project_ref().state.lock();
        // Fast path: no need to wait, so the condvar is not even created.
        if !condition(state.as_mut()) {
            return state;
        }
        let not_empty = self.not_empty.get_or_init(Default::default);
        not_empty.wait_while(state, condition)
    }

    /// Locks the state and returns the guard once the channel is closed or
    /// the queue has spare capacity, blocking the calling thread on the
    /// `not_full` condition variable while neither is the case.
    fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        // `true` means "keep waiting": still open and the queue is full.
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && !s.has_capacity()
        }

        let mut state = self.project_ref().state.lock();
        // Fast path: no need to wait, so the condvar is not even created.
        if !condition(state.as_mut()) {
            return state;
        }
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, condition)
    }

    /// Wakes waiting receivers, then blocks until the channel is closed or
    /// has spare capacity again, returning a fresh guard.
    ///
    /// Consumes `state`: the lock is released while receivers are woken and
    /// re-acquired afterwards.
    fn wake_rx_and_block_while_full<'a>(
        self: Pin<&'a Self>,
        mut state: MutexGuard<'a, State<T>>,
    ) -> MutexGuard<'a, State<T>> {
        // `None` if no receiver has ever had to block on the condvar.
        let cvar = self.not_empty.get();
        // Take receiver wakers out of the list while the lock is still held.
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .extract_some_wakers();

        // Release the lock before notifying or waking anyone.
        drop(state);
        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
        // NOTE(review): `wake_all` presumably returns `true` while further
        // wakers may remain in the list — confirm against the wakerset docs.
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().base().project().rx_wakers);
        }

        let state = self.project_ref().state.lock();

        let not_full = self.not_full.get_or_init(Default::default);
        // Wait while the channel is open and still full.
        not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
    }

    /// Wakes waiting senders: notifies every thread blocked on `not_full`
    /// (if that condvar exists) and wakes the async wakers taken from
    /// `tx_wakers`. Consumes and releases the lock guard.
    fn wake_all_tx(self: Pin<&Self>, mut state: MutexGuard<State<T>>) {
        let cvar = self.not_full.get();
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .tx_wakers
            .extract_some_wakers();
        // Release the lock before notifying or waking anyone.
        drop(state);
        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
        // The lock is re-taken only to fetch further wakers from the list.
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().project().base.project().tx_wakers);
        }
    }

    /// Wakes waiting receivers after a single value was queued: notifies one
    /// thread blocked on `not_empty` (if that condvar exists) and wakes the
    /// async wakers taken from `rx_wakers`. Consumes and releases the guard.
    fn wake_one_rx(self: Pin<&Self>, mut state: MutexGuard<State<T>>) {
        let cvar = self.not_empty.get();
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .extract_some_wakers();
        // Release the lock before notifying or waking anyone.
        drop(state);
        if let Some(cvar) = cvar {
            // Only one blocked thread is notified here, unlike `wake_all_rx`.
            cvar.notify_one();
        }
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().project().base.project().rx_wakers);
        }
    }

    /// Wakes waiting receivers: notifies every thread blocked on `not_empty`
    /// (if that condvar exists) and wakes the async wakers taken from
    /// `rx_wakers`. Consumes and releases the lock guard.
    fn wake_all_rx(self: Pin<&Self>, mut state: MutexGuard<State<T>>) {
        let cvar = self.not_empty.get();
        let mut wakers = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .extract_some_wakers();
        // Release the lock before notifying or waking anyone.
        drop(state);
        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
        while wakers.wake_all() {
            let mut state = self.project_ref().state.lock();
            wakers.extract_more(state.as_mut().project().base.project().rx_wakers);
        }
    }
}
280
281impl<T> splitrc::Notify for Core<T> {
282 fn last_tx_did_drop_pinned(self: Pin<&Self>) {
283 let mut state = self.project_ref().state.lock();
284 *state.as_mut().base().project().closed = true;
285 self.wake_all_rx(state);
288 }
289
290 fn last_rx_did_drop_pinned(self: Pin<&Self>) {
291 let mut state = self.project_ref().state.lock();
292 *state.as_mut().base().project().closed = true;
293 state.as_mut().project().queue.clear();
295 self.wake_all_tx(state);
296 }
297}
298
/// Error returned by send operations on a closed channel. The payload is
/// whatever the failing call hands back (the unsent value, or `()`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed message; the payload is deliberately not printed, so
    /// `T` needs no `Display` bound.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("failed to send value on channel")
    }
}

impl<T> std::error::Error for SendError<T> where T: fmt::Debug {}
315
/// Blocking sending half of the channel: its `send*` methods block the
/// calling thread while the channel is full.
#[derive(Debug)]
pub struct SyncSender<T> {
    /// Sender-side handle to the shared channel core.
    core: Pin<splitrc::Tx<Core<T>>>,
}

// `Clone` by cloning the `core` handle; no `T: Clone` bound is required.
derive_clone!(SyncSender);
325
326impl<T> SyncSender<T> {
327 pub fn into_async(self) -> Sender<T> {
329 Sender { core: self.core }
330 }
331
332 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
336 let mut state = self.core.as_ref().block_until_not_full();
337 if state.closed {
338 assert!(state.as_ref().project_ref().queue.is_empty());
339 return Err(SendError(value));
340 }
341
342 state.as_mut().project().queue.push_back(value);
343
344 self.core.as_ref().wake_one_rx(state);
345 Ok(())
346 }
347
348 pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
354 where
355 I: IntoIterator<Item = T>,
356 {
357 let mut values = values.into_iter();
358
359 let Some(mut value) = values.next() else {
361 return Ok(());
362 };
363
364 let mut sent_count = 0usize;
365
366 let mut state = self.core.as_ref().block_until_not_full();
367 'outer: loop {
368 if state.closed {
369 assert!(state.queue.is_empty());
372 return Err(SendError(()));
373 }
374
375 debug_assert!(state.has_capacity());
376 state.as_mut().project().queue.push_back(value);
377 sent_count += 1;
378 loop {
379 match values.next() {
380 Some(v) => {
381 if state.has_capacity() {
382 state.as_mut().project().queue.push_back(v);
383 sent_count += 1;
384 } else {
385 value = v;
386 state = self.core.as_ref().wake_rx_and_block_while_full(state);
390 continue 'outer;
391 }
392 }
393 None => {
394 if sent_count == 1 {
397 self.core.as_ref().wake_one_rx(state);
398 } else {
399 self.core.as_ref().wake_all_rx(state);
400 }
401 return Ok(());
402 }
403 }
404 }
405 }
406 }
407
408 pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
414 match self.send_iter(values.drain(..)) {
415 Ok(_) => Ok(values),
416 Err(_) => Err(SendError(values)),
417 }
418 }
419
420 pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
423 where
424 F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
425 {
426 let mut tx = SyncBatchSender {
427 sender: self,
428 capacity: batch_limit,
429 buffer: Vec::with_capacity(batch_limit),
430 };
431 let r = f(&mut tx)?;
432 tx.drain()?;
433 Ok(r)
434 }
435}
436
/// Buffering send handle created by `SyncSender::autobatch`.
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
    /// The sender that buffered values are flushed through.
    sender: &'a mut SyncSender<T>,
    /// Buffer length at which `send` flushes (the `batch_limit` given to
    /// `autobatch`).
    capacity: usize,
    /// Values accepted by `send` but not yet flushed to the channel.
    buffer: Vec<T>,
}
446
447impl<T> SyncBatchSender<'_, T> {
448 pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
452 self.buffer.push(value);
453 if self.buffer.len() == self.capacity {
455 self.drain()
456 } else {
457 Ok(())
458 }
459 }
460
461 pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
464 for value in values.into_iter() {
466 self.send(value)?;
467 }
468 Ok(())
469 }
470
471 pub fn drain(&mut self) -> Result<(), SendError<()>> {
473 match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
475 Ok(drained_vec) => {
476 self.buffer = drained_vec;
477 Ok(())
478 }
479 Err(_) => Err(SendError(())),
480 }
481 }
482}
483
/// Asynchronous sending half of the channel: its `send*` methods return
/// futures that stay pending while the channel is full.
#[derive(Debug)]
pub struct Sender<T> {
    /// Sender-side handle to the shared channel core.
    core: Pin<splitrc::Tx<Core<T>>>,
}

// `Clone` by cloning the `core` handle; no `T: Clone` bound is required.
derive_clone!(Sender);
493
494impl<T> Sender<T> {
495 pub fn into_sync(self) -> SyncSender<T> {
497 SyncSender { core: self.core }
498 }
499
500 pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
504 Send {
505 sender: self,
506 value: Some(value),
507 waker: WakerSlot::new(),
508 }
509 }
510
511 pub fn send_iter<'a, I>(
516 &'a self,
517 values: I,
518 ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
519 where
520 I: IntoIterator<Item = T> + 'a,
521 {
522 SendIter {
523 sender: self,
524 values: Some(values.into_iter().peekable()),
525 waker: WakerSlot::new(),
526 }
527 }
528
529 pub async fn autobatch<F, R>(self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
541 where
542 for<'a> F: (FnOnce(&'a mut BatchSender<T>) -> BoxFuture<'a, Result<R, SendError<()>>>),
543 {
544 let mut tx = BatchSender {
545 sender: self,
546 batch_limit,
547 buffer: Vec::with_capacity(batch_limit),
548 };
549 let r = f(&mut tx).await?;
550 tx.drain().await?;
551 Ok(r)
552 }
553
554 pub async fn autobatch_or_cancel<F>(self, capacity: usize, f: F)
560 where
561 for<'a> F: (FnOnce(&'a mut BatchSender<T>) -> BoxFuture<'a, Result<(), SendError<()>>>),
562 {
563 self.autobatch(capacity, f).await.unwrap_or(())
564 }
565}
566
/// Future returned by `Sender::send`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
    /// The sender this future sends through.
    sender: &'a Sender<T>,
    /// The value to send; taken out (leaving `None`) when the future resolves.
    value: Option<T>,
    /// Slot used to link this future's waker into the channel's `tx_wakers`.
    #[pin]
    waker: WakerSlot,
}
575
576#[pinned_drop]
577impl<T> PinnedDrop for Send<'_, T> {
578 fn drop(mut self: Pin<&mut Self>) {
579 if self.waker.is_linked() {
580 let mut state = self.sender.core.as_ref().project_ref().state.lock();
581 state
582 .as_mut()
583 .base()
584 .project()
585 .tx_wakers
586 .unlink(self.project().waker);
587 }
588 }
589}
590
591impl<T> Future for Send<'_, T> {
592 type Output = Result<(), SendError<T>>;
593
594 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
595 let mut state = self.sender.core.as_ref().project_ref().state.lock();
596 if state.closed {
597 return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
598 }
599 if state.has_capacity() {
600 state
601 .as_mut()
602 .project()
603 .queue
604 .push_back(self.as_mut().project().value.take().unwrap());
605 self.project().sender.core.as_ref().wake_one_rx(state);
606 Poll::Ready(Ok(()))
607 } else {
608 state.as_mut().base().pending_tx(self.project().waker, cx)
609 }
610 }
611}
612
/// Future returned by `Sender::send_iter`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
    /// The sender this future sends through.
    sender: &'a Sender<T>,
    /// Remaining values to send. Always constructed as `Some` by
    /// `Sender::send_iter`; `poll` unwraps it.
    values: Option<Peekable<I>>,
    /// Slot used to link this future's waker into the channel's `tx_wakers`.
    #[pin]
    waker: WakerSlot,
}
621
622#[pinned_drop]
623impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
624 fn drop(mut self: Pin<&mut Self>) {
625 if self.waker.is_linked() {
626 let mut state = self.sender.core.as_ref().project_ref().state.lock();
627 state
628 .as_mut()
629 .base()
630 .project()
631 .tx_wakers
632 .unlink(self.project().waker);
633 }
634 }
635}
636
637impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
638 type Output = Result<(), SendError<()>>;
639
640 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
641 {
644 let pi = self.as_mut().project().values.as_mut().unwrap();
645 if pi.peek().is_none() {
646 return Poll::Ready(Ok(()));
647 }
648 }
651
652 let mut state = self.sender.core.as_ref().project_ref().state.lock();
653
654 let pi = self.as_mut().project().values.as_mut().unwrap();
663 debug_assert!(pi.peek().is_some());
665 if state.closed {
666 Poll::Ready(Err(SendError(())))
667 } else if !state.has_capacity() {
668 state.as_mut().base().pending_tx(self.project().waker, cx)
670 } else {
671 debug_assert!(state.has_capacity());
672 state.as_mut().project().queue.push_back(pi.next().unwrap());
673 while state.has_capacity() {
674 match pi.next() {
675 Some(value) => {
676 state.as_mut().project().queue.push_back(value);
677 }
678 None => {
679 self.sender.core.as_ref().wake_all_rx(state);
683 return Poll::Ready(Ok(()));
684 }
685 }
686 }
687 if pi.peek().is_none() {
691 self.sender.core.as_ref().wake_all_rx(state);
692 return Poll::Ready(Ok(()));
693 }
694
695 let pending = state
697 .as_mut()
698 .base()
699 .pending_tx(self.as_mut().project().waker, cx);
700 self.sender.core.as_ref().wake_all_rx(state);
701 pending
702 }
703 }
704}
705
706pub struct BatchSender<T> {
711 sender: Sender<T>,
712 batch_limit: usize,
713 buffer: Vec<T>,
714}
715
716impl<T> BatchSender<T> {
717 pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
720 self.buffer.push(value);
721 if self.buffer.len() == self.batch_limit {
722 self.drain().await?;
723 }
724 Ok(())
725 }
726
727 async fn drain(&mut self) -> Result<(), SendError<()>> {
728 self.sender.send_iter(self.buffer.drain(..)).await?;
729 assert!(self.buffer.is_empty());
730 Ok(())
731 }
732}
733
/// Asynchronous receiving half of the channel: its `recv*` methods return
/// futures that stay pending while the channel is empty and open.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Receiver-side handle to the shared channel core.
    core: Pin<splitrc::Rx<Core<T>>>,
}

// `Clone` by cloning the `core` handle; no `T: Clone` bound is required.
derive_clone!(Receiver);
743
/// Future returned by `Receiver::recv`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
    /// The receiver this future reads from.
    receiver: &'a Receiver<T>,
    /// Slot used to link this future's waker into the channel's `rx_wakers`.
    #[pin]
    waker: WakerSlot,
}
751
752#[pinned_drop]
753impl<T> PinnedDrop for Recv<'_, T> {
754 fn drop(mut self: Pin<&mut Self>) {
755 if self.waker.is_linked() {
756 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
757 state
758 .as_mut()
759 .base()
760 .project()
761 .rx_wakers
762 .unlink(self.project().waker);
763 }
764 }
765}
766
767impl<T> Future for Recv<'_, T> {
768 type Output = Option<T>;
769
770 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
771 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
772 match state.as_mut().project().queue.pop_front() {
773 Some(value) => {
774 self.receiver.core.as_ref().wake_all_tx(state);
775 Poll::Ready(Some(value))
776 }
777 None => {
778 if state.closed {
779 Poll::Ready(None)
780 } else {
781 state.as_mut().base().pending_rx(self.project().waker, cx)
782 }
783 }
784 }
785 }
786}
787
/// Future returned by `Receiver::recv_batch`.
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
    /// The receiver this future reads from.
    receiver: &'a Receiver<T>,
    /// Upper bound on the number of values returned by one completion.
    element_limit: usize,
    /// Slot used to link this future's waker into the channel's `rx_wakers`.
    #[pin]
    waker: WakerSlot,
}
796
797#[pinned_drop]
798impl<T> PinnedDrop for RecvBatch<'_, T> {
799 fn drop(mut self: Pin<&mut Self>) {
800 if self.waker.is_linked() {
801 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
802 state
803 .as_mut()
804 .base()
805 .project()
806 .rx_wakers
807 .unlink(self.project().waker);
808 }
809 }
810}
811
812impl<T> Future for RecvBatch<'_, T> {
813 type Output = Vec<T>;
814
815 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
816 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
817 let q = &mut state.as_mut().project().queue;
818 let q_len = q.len();
819 if q_len == 0 {
820 if state.closed {
821 return Poll::Ready(Vec::new());
822 } else {
823 return state.as_mut().base().pending_rx(self.project().waker, cx);
824 }
825 }
826
827 let capacity = min(q_len, self.element_limit);
828 let v = Vec::from_iter(q.drain(..capacity));
829 self.receiver.core.as_ref().wake_all_tx(state);
830 Poll::Ready(v)
831 }
832}
833
/// Future returned by `Receiver::recv_vec`.
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
    /// The receiver this future reads from.
    receiver: &'a Receiver<T>,
    /// Upper bound on the number of values moved by one completion.
    element_limit: usize,
    /// Caller-provided output vector; cleared by `Receiver::recv_vec` before
    /// this future is created.
    vec: &'a mut Vec<T>,
    /// Slot used to link this future's waker into the channel's `rx_wakers`.
    #[pin]
    waker: WakerSlot,
}
843
844#[pinned_drop]
845impl<T> PinnedDrop for RecvVec<'_, T> {
846 fn drop(mut self: Pin<&mut Self>) {
847 if self.waker.is_linked() {
848 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
849 state
850 .as_mut()
851 .base()
852 .project()
853 .rx_wakers
854 .unlink(self.project().waker);
855 }
856 }
857}
858
859impl<T> Future for RecvVec<'_, T> {
860 type Output = ();
861
862 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
863 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
864 let q = &mut state.as_mut().project().queue;
865 let q_len = q.len();
866 if q_len == 0 {
867 if state.closed {
868 assert!(self.vec.is_empty());
869 return Poll::Ready(());
870 } else {
871 return state.as_mut().base().pending_rx(self.project().waker, cx);
872 }
873 }
874
875 let capacity = min(q_len, self.element_limit);
876 self.as_mut().project().vec.extend(q.drain(..capacity));
877 self.project().receiver.core.as_ref().wake_all_tx(state);
878 Poll::Ready(())
879 }
880}
881
882impl<T> Receiver<T> {
883 pub fn into_sync(self) -> SyncReceiver<T> {
885 SyncReceiver { core: self.core }
886 }
887
888 pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
892 Recv {
893 receiver: self,
894 waker: WakerSlot::new(),
895 }
896 }
897
898 pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
907 RecvBatch {
908 receiver: self,
909 element_limit,
910 waker: WakerSlot::new(),
911 }
912 }
913
914 pub fn recv_vec<'a>(
927 &'a self,
928 element_limit: usize,
929 vec: &'a mut Vec<T>,
930 ) -> impl Future<Output = ()> + 'a {
931 vec.clear();
932 RecvVec {
933 receiver: self,
934 element_limit,
935 vec,
936 waker: WakerSlot::new(),
937 }
938 }
939
940 }
942
/// Blocking receiving half of the channel: its `recv*` methods block the
/// calling thread while the channel is empty and open.
#[derive(Debug)]
pub struct SyncReceiver<T> {
    /// Receiver-side handle to the shared channel core.
    core: Pin<splitrc::Rx<Core<T>>>,
}

// `Clone` by cloning the `core` handle; no `T: Clone` bound is required.
derive_clone!(SyncReceiver);
952
953impl<T> SyncReceiver<T> {
954 pub fn into_async(self) -> Receiver<T> {
956 Receiver { core: self.core }
957 }
958
959 pub fn recv(&self) -> Option<T> {
963 let mut state = self.core.as_ref().block_until_not_empty();
964 match state.as_mut().project().queue.pop_front() {
965 Some(value) => {
966 self.core.as_ref().wake_all_tx(state);
967 Some(value)
968 }
969 None => {
970 assert!(state.closed);
971 None
972 }
973 }
974 }
975
976 pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
983 let mut state = self.core.as_ref().block_until_not_empty();
984
985 let q = &mut state.as_mut().project().queue;
986 let q_len = q.len();
987 if q_len == 0 {
988 assert!(state.closed);
989 return Vec::new();
990 }
991
992 let capacity = min(q_len, element_limit);
993 let v = Vec::from_iter(q.drain(..capacity));
994 self.core.as_ref().wake_all_tx(state);
995 v
996 }
997
998 pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
1009 vec.clear();
1010
1011 let mut state = self.core.as_ref().block_until_not_empty();
1012 let q = &mut state.as_mut().project().queue;
1013 let q_len = q.len();
1014 if q_len == 0 {
1015 assert!(state.closed);
1016 return;
1018 }
1019
1020 let capacity = min(q_len, element_limit);
1021 vec.extend(q.drain(..capacity));
1022 self.core.as_ref().wake_all_tx(state);
1023 }
1024}
1025
1026pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1034 let capacity = capacity.max(1);
1035 let core = Core {
1036 state: Mutex::new(State {
1037 base: StateBase {
1038 capacity,
1039 closed: false,
1040 tx_wakers: WakerList::new(),
1041 rx_wakers: WakerList::new(),
1042 },
1043 queue: VecDeque::new(),
1044 }),
1045 not_empty: OnceLock::new(),
1046 not_full: OnceLock::new(),
1047 };
1048 let (core_tx, core_rx) = splitrc::pin(core);
1049 (Sender { core: core_tx }, Receiver { core: core_rx })
1050}
1051
1052pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
1059 let (tx, rx) = bounded(capacity);
1060 (tx.into_sync(), rx.into_sync())
1061}
1062
1063pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1066 let core = Core {
1067 state: Mutex::new(State {
1068 base: StateBase {
1069 capacity: UNBOUNDED_CAPACITY,
1070 closed: false,
1071 tx_wakers: WakerList::new(),
1072 rx_wakers: WakerList::new(),
1073 },
1074 queue: VecDeque::new(),
1075 }),
1076 not_empty: OnceLock::new(),
1077 not_full: OnceLock::new(),
1078 };
1079 let (core_tx, core_rx) = splitrc::pin(core);
1080 (Sender { core: core_tx }, Receiver { core: core_rx })
1081}
1082
1083pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
1086 let (tx, rx) = unbounded();
1087 (tx.into_sync(), rx.into_sync())
1088}