1#![doc = include_str!("../README.md")]
2#![doc = include_str!("example.md")]
3
4use mutex::PinnedCondvar as Condvar;
5use mutex::PinnedMutex as Mutex;
6use mutex::PinnedMutexGuard as MutexGuard;
7use pin_project::pin_project;
8use pin_project::pinned_drop;
9#[cfg(feature = "parking_lot")]
10use pinned_mutex::parking_lot as mutex;
11#[cfg(not(feature = "parking_lot"))]
12use pinned_mutex::std as mutex;
13use std::cmp::min;
14use std::collections::VecDeque;
15use std::fmt;
16use std::future::Future;
17use std::iter::Peekable;
18use std::ops::AsyncFnOnce;
19use std::pin::Pin;
20use std::sync::OnceLock;
21use std::task::Context;
22use std::task::Poll;
23use wakerset::ExtractedWakers;
24use wakerset::WakerList;
25use wakerset::WakerSlot;
26
/// Capacity recorded for channels built without an explicit bound
/// (see `Builder::build_async`, which uses it when no capacity was set).
const UNBOUNDED_CAPACITY: usize = usize::MAX;
28
/// Implements `Clone` for a generic handle type `$t<T>` whose only field is
/// named `core`.
///
/// The generated impl places no `Clone` bound on `T`; it only requires that
/// the `core` field itself can be cloned.
macro_rules! derive_clone {
    ($t:ident) => {
        impl<T> Clone for $t<T> {
            fn clone(&self) -> Self {
                let core = self.core.clone();
                Self { core }
            }
        }
    };
}
40
/// Channel bookkeeping that does not depend on the element type.
#[derive(Debug)]
#[pin_project]
struct StateBase {
    /// Queue length at which the channel counts as full
    /// (compared against by `State::has_capacity`).
    capacity: usize,
    /// Set to `true` when the last sender or the last receiver is dropped.
    closed: bool,
    /// Wakers registered by send futures that found the queue full.
    #[pin]
    tx_wakers: WakerList,
    /// Wakers registered by receive futures that found the queue empty.
    #[pin]
    rx_wakers: WakerList,
}
51
52impl StateBase {
53 fn target_capacity(&self) -> usize {
54 self.capacity
57 }
58
59 fn pending_tx<T>(
60 self: Pin<&mut StateBase>,
61 slot: Pin<&mut WakerSlot>,
62 cx: &mut Context,
63 ) -> Poll<T> {
64 self.project().tx_wakers.link(slot, cx.waker().clone());
67 Poll::Pending
68 }
69
70 fn pending_rx<T>(
71 self: Pin<&mut StateBase>,
72 slot: Pin<&mut WakerSlot>,
73 cx: &mut Context,
74 ) -> Poll<T> {
75 self.project().rx_wakers.link(slot, cx.waker().clone());
78 Poll::Pending
79 }
80}
81
/// Complete channel state guarded by the mutex in `Core`: the type-independent
/// bookkeeping plus the queue of buffered values.
#[derive(Debug)]
#[pin_project]
struct State<T> {
    /// Capacity, closed flag and waker lists.
    #[pin]
    base: StateBase,
    /// Values that have been sent but not yet received.
    queue: VecDeque<T>,
}
89
90impl<T> State<T> {
91 fn has_capacity(&self) -> bool {
92 self.queue.len() < self.target_capacity()
93 }
94
95 fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
96 self.project().base
97 }
98}
99
100impl<T> std::ops::Deref for State<T> {
101 type Target = StateBase;
102
103 fn deref(&self) -> &Self::Target {
104 &self.base
105 }
106}
107
108impl<T> std::ops::DerefMut for State<T> {
109 fn deref_mut(&mut self) -> &mut Self::Target {
110 &mut self.base
111 }
112}
113
/// Shared heart of a channel: the mutex-protected state plus the condition
/// variables used by the blocking (sync) API.
#[derive(Debug)]
#[pin_project]
struct Core<T> {
    /// Queue, closed flag and async waker lists.
    #[pin]
    state: Mutex<State<T>>,
    /// Condition variable blocking receivers wait on. Created lazily the
    /// first time a receiver actually has to block; wake paths only notify
    /// it if it has already been created.
    not_empty: OnceLock<Condvar>,
    /// Condition variable blocking senders wait on. Created lazily the first
    /// time a sender actually has to block; wake paths only notify it if it
    /// has already been created.
    not_full: OnceLock<Condvar>,
}
126
impl<T> Core<T> {
    /// Locks the state and blocks the calling thread while the channel is
    /// open and the queue is empty.
    ///
    /// Returns the guard once the queue holds at least one value or the
    /// channel has been closed; the caller must check which.
    fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        // Wait predicate: keep waiting while the channel is open and empty.
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && s.queue.is_empty()
        }

        let mut state = self.project_ref().state.lock();
        // Fast path: no wait needed, so the condition variable is not created.
        if !condition(state.as_mut()) {
            return state;
        }
        let not_empty = self.not_empty.get_or_init(Default::default);
        not_empty.wait_while(state, condition)
    }

    /// Locks the state and blocks the calling thread while the channel is
    /// open and the queue is at capacity.
    ///
    /// Returns the guard once there is room for a value or the channel has
    /// been closed; the caller must check which.
    fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
        // Wait predicate: keep waiting while the channel is open and full.
        fn condition<T>(s: Pin<&mut State<T>>) -> bool {
            !s.closed && !s.has_capacity()
        }

        let mut state = self.project_ref().state.lock();
        // Fast path: no wait needed, so the condition variable is not created.
        if !condition(state.as_mut()) {
            return state;
        }
        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, condition)
    }

    /// Wakes receivers, then blocks the calling thread until the queue has
    /// capacity again or the channel is closed.
    ///
    /// Consumes the caller's guard (the lock is released while wakers run)
    /// and returns a freshly acquired guard.
    fn wake_rx_and_block_while_full<'a>(
        self: Pin<&'a Self>,
        mut state: MutexGuard<'a, State<T>>,
    ) -> MutexGuard<'a, State<T>> {
        // Read while the lock is still held; `None` means no receiver has
        // had to block on the condition variable so far.
        let cvar = self.not_empty.get();

        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        loop {
            // Pull a batch of wakers under the lock, then invoke them with
            // the lock released; re-lock only if the list reports more.
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        // Also wake threads blocked in the sync receive path, if any exist.
        if let Some(cvar) = cvar {
            cvar.notify_all();
        }

        state = self.project_ref().state.lock();

        let not_full = self.not_full.get_or_init(Default::default);
        not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
    }

    /// Wakes every waiting sender: async wakers first (with the lock
    /// released while they run), then all threads blocked on `not_full`.
    ///
    /// Consumes the guard; the lock is not held when this returns.
    fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // Read while the lock is still held.
        let cvar = self.not_full.get();

        if state.as_mut().project().base.project().tx_wakers.is_empty() {
            // No async senders are waiting; just release the lock.
            drop(state);
        } else {
            let round = state
                .as_mut()
                .project()
                .base
                .project()
                .tx_wakers
                .begin_extraction();
            let mut wakers = ExtractedWakers::new();
            loop {
                // Extract under the lock, wake with the lock released.
                let more = state
                    .as_mut()
                    .project()
                    .base
                    .project()
                    .tx_wakers
                    .extract_some_wakers(round, &mut wakers);
                drop(state);
                wakers.wake_all();
                if !more {
                    break;
                }
                state = self.project_ref().state.lock();
            }
        }

        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
    }

    /// Signals that one value became available.
    ///
    /// Note the asymmetry visible in the body: the async side runs the same
    /// extract-and-wake loop as `wake_all_rx`, while the blocking side
    /// notifies only a single thread (`notify_one`).
    ///
    /// Consumes the guard; the lock is not held when this returns.
    fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // Read while the lock is still held.
        let cvar = self.not_empty.get();
        if state.as_mut().project().base.project().rx_wakers.is_empty() {
            // No async receivers are waiting; just release the lock.
            drop(state);
        } else {
            let round = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .begin_extraction();
            let mut wakers = ExtractedWakers::new();
            loop {
                // Extract under the lock, wake with the lock released.
                let more = state
                    .as_mut()
                    .project()
                    .base
                    .project()
                    .rx_wakers
                    .extract_some_wakers(round, &mut wakers);
                drop(state);
                wakers.wake_all();
                if !more {
                    break;
                }
                state = self.project_ref().state.lock();
            }
        }

        if let Some(cvar) = cvar {
            cvar.notify_one();
        }
    }

    /// Wakes every waiting receiver: async wakers first (with the lock
    /// released while they run), then all threads blocked on `not_empty`.
    ///
    /// Consumes the guard; the lock is not held when this returns.
    fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
        // Read while the lock is still held.
        let cvar = self.not_empty.get();
        let round = state
            .as_mut()
            .project()
            .base
            .project()
            .rx_wakers
            .begin_extraction();
        let mut wakers = ExtractedWakers::new();
        loop {
            // Extract under the lock, wake with the lock released.
            let more = state
                .as_mut()
                .project()
                .base
                .project()
                .rx_wakers
                .extract_some_wakers(round, &mut wakers);
            drop(state);
            wakers.wake_all();
            if !more {
                break;
            }
            state = self.project_ref().state.lock();
        }

        if let Some(cvar) = cvar {
            cvar.notify_all();
        }
    }
}
334
335impl<T> splitrc::Notify for Core<T> {
336 fn last_tx_did_drop_pinned(self: Pin<&Self>) {
337 let mut state = self.project_ref().state.lock();
338 *state.as_mut().base().project().closed = true;
339 self.wake_all_rx(state);
342 }
343
344 fn last_rx_did_drop_pinned(self: Pin<&Self>) {
345 let mut state = self.project_ref().state.lock();
346 *state.as_mut().base().project().closed = true;
347 state.as_mut().project().queue.clear();
349 self.wake_all_tx(state);
350 }
351}
352
/// Error returned by send operations when the channel is closed.
///
/// The payload is whatever the failing operation hands back: the unsent
/// value for single-value sends, `()` for iterator sends, or the (drained)
/// vector for `SyncSender::send_vec`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);
361
362impl<T> fmt::Display for SendError<T> {
363 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364 write!(f, "failed to send value on channel")
365 }
366}
367
/// `SendError` is a standard error type whenever its payload is `Debug`;
/// all trait methods keep their default implementations.
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
369
/// Blocking (thread-based) sending half of a channel.
#[derive(Debug)]
pub struct SyncSender<T> {
    /// Sender-side handle to the shared channel core.
    core: Pin<splitrc::Tx<Core<T>>>,
}

// `Clone` duplicates the `core` handle; no `T: Clone` bound is required.
derive_clone!(SyncSender);
379
impl<T> SyncSender<T> {
    /// Converts this blocking sender into an async `Sender` over the same
    /// channel core.
    pub fn into_async(self) -> Sender<T> {
        Sender { core: self.core }
    }

    /// Sends one value, blocking the calling thread while the channel is full.
    ///
    /// # Errors
    ///
    /// Returns `SendError(value)`, handing the value back, if the channel is
    /// closed.
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let mut state = self.core.as_ref().block_until_not_full();
        if state.closed {
            // A sender still exists (this one), so the close came from the
            // receiver side, which also clears the queue.
            assert!(state.as_ref().project_ref().queue.is_empty());
            return Err(SendError(value));
        }

        state.as_mut().project().queue.push_back(value);

        self.core.as_ref().wake_one_rx(state);
        Ok(())
    }

    /// Sends every value produced by `values`, blocking whenever the channel
    /// is full.
    ///
    /// Values are pushed under a single lock acquisition for as long as there
    /// is capacity. An empty iterator returns `Ok(())` without touching the
    /// lock.
    ///
    /// # Errors
    ///
    /// Returns `SendError(())` if the channel is found closed; values not yet
    /// sent are dropped together with the iterator.
    pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
    where
        I: IntoIterator<Item = T>,
    {
        let mut values = values.into_iter();

        // Pull the first value before locking so an empty iterator is free.
        let Some(mut value) = values.next() else {
            return Ok(());
        };

        // Number of values pushed so far; decides how many receivers to wake.
        let mut sent_count = 0usize;

        let mut state = self.core.as_ref().block_until_not_full();
        'outer: loop {
            // Invariant at the top of each round: `value` is the next value to
            // send, and `state` is either closed or has capacity.
            if state.closed {
                assert!(state.queue.is_empty());
                return Err(SendError(()));
            }

            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(value);
            sent_count += 1;
            loop {
                match values.next() {
                    Some(v) => {
                        if state.has_capacity() {
                            state.as_mut().project().queue.push_back(v);
                            sent_count += 1;
                        } else {
                            // Queue is full: stash the value, let receivers
                            // drain, and restart the outer round once there is
                            // room (or the channel closed).
                            value = v;
                            state = self.core.as_ref().wake_rx_and_block_while_full(state);
                            continue 'outer;
                        }
                    }
                    None => {
                        // A single value needs only one receiver; otherwise
                        // wake them all.
                        if sent_count == 1 {
                            self.core.as_ref().wake_one_rx(state);
                        } else {
                            self.core.as_ref().wake_all_rx(state);
                        }
                        return Ok(());
                    }
                }
            }
        }
    }

    /// Sends all elements of `values` and hands the vector back.
    ///
    /// The vector is drained in place, so on success the returned vector is
    /// empty. On failure the vector inside the error has also been drained:
    /// elements that were not sent are dropped when the drain iterator is
    /// dropped.
    ///
    /// # Errors
    ///
    /// Returns `SendError(values)` if the channel is closed.
    pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
        match self.send_iter(values.drain(..)) {
            Ok(_) => Ok(values),
            Err(_) => Err(SendError(values)),
        }
    }

    /// Runs `f` with a `SyncBatchSender` that buffers values and forwards them
    /// in batches of `batch_limit`, then flushes whatever is still buffered.
    ///
    /// Returns the value produced by `f`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `f` (without flushing in that case)
    /// or by the final flush.
    pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
    where
        F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
    {
        let mut tx = SyncBatchSender {
            sender: self,
            capacity: batch_limit,
            buffer: Vec::with_capacity(batch_limit),
        };
        let r = f(&mut tx)?;
        tx.drain()?;
        Ok(r)
    }
}
490
/// Buffering front-end for a `SyncSender`, created by
/// `SyncSender::autobatch`.
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
    /// Underlying sender the batches are forwarded to.
    sender: &'a mut SyncSender<T>,
    /// Buffer length at which `send` flushes (the `batch_limit`).
    capacity: usize,
    /// Values accepted but not yet forwarded to the channel.
    buffer: Vec<T>,
}
500
501impl<T> SyncBatchSender<'_, T> {
502 pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
506 self.buffer.push(value);
507 if self.buffer.len() == self.capacity {
509 self.drain()
510 } else {
511 Ok(())
512 }
513 }
514
515 pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
518 for value in values.into_iter() {
520 self.send(value)?;
521 }
522 Ok(())
523 }
524
525 pub fn drain(&mut self) -> Result<(), SendError<()>> {
527 match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
529 Ok(drained_vec) => {
530 self.buffer = drained_vec;
531 Ok(())
532 }
533 Err(_) => Err(SendError(())),
534 }
535 }
536}
537
/// Async sending half of a channel.
#[derive(Debug)]
pub struct Sender<T> {
    /// Sender-side handle to the shared channel core.
    core: Pin<splitrc::Tx<Core<T>>>,
}

// `Clone` duplicates the `core` handle; no `T: Clone` bound is required.
derive_clone!(Sender);
547
548impl<T> Sender<T> {
549 pub fn into_sync(self) -> SyncSender<T> {
551 SyncSender { core: self.core }
552 }
553
554 pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
558 Send {
559 sender: self,
560 value: Some(value),
561 waker: WakerSlot::new(),
562 }
563 }
564
565 pub fn send_iter<'a, I>(
570 &'a self,
571 values: I,
572 ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
573 where
574 I: IntoIterator<Item = T> + 'a,
575 {
576 SendIter {
577 sender: self,
578 values: Some(values.into_iter().peekable()),
579 waker: WakerSlot::new(),
580 }
581 }
582
583 pub async fn autobatch<R>(
586 self,
587 batch_limit: usize,
588 f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
589 ) -> Result<R, SendError<()>> {
590 let mut tx = BatchSender {
591 sender: self,
592 batch_limit,
593 buffer: Vec::with_capacity(batch_limit),
594 };
595 let r = f(&mut tx).await?;
596 tx.drain().await?;
597 Ok(r)
598 }
599
600 pub async fn autobatch_or_cancel(
606 self,
607 capacity: usize,
608 f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
609 ) {
610 self.autobatch(capacity, f).await.unwrap_or(())
611 }
612}
613
/// Future returned by `Sender::send`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
    /// Sender the value is sent through.
    sender: &'a Sender<T>,
    /// Value to send; taken out (leaving `None`) when the future completes.
    value: Option<T>,
    /// Slot linked into the channel's sender waker list while waiting.
    #[pin]
    waker: WakerSlot,
}
622
623#[pinned_drop]
624impl<T> PinnedDrop for Send<'_, T> {
625 fn drop(mut self: Pin<&mut Self>) {
626 if self.waker.is_linked() {
627 let mut state = self.sender.core.as_ref().project_ref().state.lock();
628 state
629 .as_mut()
630 .base()
631 .project()
632 .tx_wakers
633 .unlink(self.project().waker);
634 }
635 }
636}
637
impl<T> Future for Send<'_, T> {
    type Output = Result<(), SendError<T>>;

    /// Tries to push the value under the state lock.
    ///
    /// * closed channel: resolves to `Err(SendError(value))`;
    /// * room in the queue: pushes, wakes a receiver, resolves to `Ok(())`;
    /// * queue full: registers this task's waker and returns `Pending`.
    ///
    /// Panics (via `unwrap`) if polled again after it has completed, because
    /// the value has already been taken.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.sender.core.as_ref().project_ref().state.lock();
        if state.closed {
            // Hand the unsent value back to the caller.
            return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
        }
        if state.has_capacity() {
            state
                .as_mut()
                .project()
                .queue
                .push_back(self.as_mut().project().value.take().unwrap());
            // `wake_one_rx` consumes the guard and releases the lock.
            self.project().sender.core.as_ref().wake_one_rx(state);
            Poll::Ready(Ok(()))
        } else {
            // Link the waker while still holding the lock; the guard is
            // released when `state` goes out of scope.
            state.as_mut().base().pending_tx(self.project().waker, cx)
        }
    }
}
659
/// Future returned by `Sender::send_iter`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
    /// Sender the values are sent through.
    sender: &'a Sender<T>,
    /// Values still to be sent; peekable so emptiness can be checked without
    /// consuming an element. Always `Some` in the code visible here.
    values: Option<Peekable<I>>,
    /// Slot linked into the channel's sender waker list while waiting.
    #[pin]
    waker: WakerSlot,
}
668
669#[pinned_drop]
670impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
671 fn drop(mut self: Pin<&mut Self>) {
672 if self.waker.is_linked() {
673 let mut state = self.sender.core.as_ref().project_ref().state.lock();
674 state
675 .as_mut()
676 .base()
677 .project()
678 .tx_wakers
679 .unlink(self.project().waker);
680 }
681 }
682}
683
impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
    type Output = Result<(), SendError<()>>;

    /// Pushes as many remaining values as fit under one lock acquisition.
    ///
    /// * iterator exhausted: resolves to `Ok(())`;
    /// * closed channel: resolves to `Err(SendError(()))`;
    /// * queue full with values remaining: registers this task's waker and
    ///   returns `Pending` (after waking receivers if anything was pushed).
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        {
            // Peek before locking: this may run the user's iterator, and an
            // exhausted iterator needs no lock at all.
            let pi = self.as_mut().project().values.as_mut().unwrap();
            if pi.peek().is_none() {
                return Poll::Ready(Ok(()));
            }
        }

        let mut state = self.sender.core.as_ref().project_ref().state.lock();

        let pi = self.as_mut().project().values.as_mut().unwrap();
        // The peek above cached an element, so this cannot have changed.
        debug_assert!(pi.peek().is_some());
        if state.closed {
            Poll::Ready(Err(SendError(())))
        } else if !state.has_capacity() {
            // Nothing can be pushed right now; wait for a receiver.
            state.as_mut().base().pending_tx(self.project().waker, cx)
        } else {
            debug_assert!(state.has_capacity());
            state.as_mut().project().queue.push_back(pi.next().unwrap());
            // Keep filling until the queue is full or the iterator ends.
            while state.has_capacity() {
                match pi.next() {
                    Some(value) => {
                        state.as_mut().project().queue.push_back(value);
                    }
                    None => {
                        // Everything was sent; `wake_all_rx` releases the lock.
                        self.sender.core.as_ref().wake_all_rx(state);
                        return Poll::Ready(Ok(()));
                    }
                }
            }
            // The queue is full. If that coincided with the last value, the
            // send is complete.
            if pi.peek().is_none() {
                self.sender.core.as_ref().wake_all_rx(state);
                return Poll::Ready(Ok(()));
            }

            // Values remain: register the waker first, while the lock is
            // still held, then wake receivers so they can make room.
            let pending = state
                .as_mut()
                .base()
                .pending_tx(self.as_mut().project().waker, cx);
            self.sender.core.as_ref().wake_all_rx(state);
            pending
        }
    }
}
752
/// Buffering front-end for an async `Sender`, created by `Sender::autobatch`.
pub struct BatchSender<T> {
    /// Underlying sender the batches are forwarded to.
    sender: Sender<T>,
    /// Buffer length at which `send` flushes.
    batch_limit: usize,
    /// Values accepted but not yet forwarded to the channel.
    buffer: Vec<T>,
}
762
763impl<T> BatchSender<T> {
764 pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
767 self.buffer.push(value);
768 if self.buffer.len() == self.batch_limit {
769 self.drain().await?;
770 }
771 Ok(())
772 }
773
774 async fn drain(&mut self) -> Result<(), SendError<()>> {
775 self.sender.send_iter(self.buffer.drain(..)).await?;
776 assert!(self.buffer.is_empty());
777 Ok(())
778 }
779}
780
/// Async receiving half of a channel.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Receiver-side handle to the shared channel core.
    core: Pin<splitrc::Rx<Core<T>>>,
}

// `Clone` duplicates the `core` handle; no `T: Clone` bound is required.
derive_clone!(Receiver);
790
/// Future returned by `Receiver::recv`; also the polling engine behind
/// `Stream`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
    /// Receiver the value is taken from.
    receiver: &'a Receiver<T>,
    /// Slot linked into the channel's receiver waker list while waiting.
    #[pin]
    waker: WakerSlot,
}
798
799#[pinned_drop]
800impl<T> PinnedDrop for Recv<'_, T> {
801 fn drop(mut self: Pin<&mut Self>) {
802 if self.waker.is_linked() {
803 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
804 state
805 .as_mut()
806 .base()
807 .project()
808 .rx_wakers
809 .unlink(self.project().waker);
810 }
811 }
812}
813
impl<T> Future for Recv<'_, T> {
    type Output = Option<T>;

    /// Takes the oldest queued value, if any.
    ///
    /// * value available: wakes waiting senders and resolves to `Some(value)`;
    /// * queue empty and channel closed: resolves to `None`;
    /// * queue empty and channel open: registers this task's waker and
    ///   returns `Pending`.
    ///
    /// Values still queued when the channel closes are delivered before
    /// `None` is reported, because the queue is checked first.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        match state.as_mut().project().queue.pop_front() {
            Some(value) => {
                // Room was freed; `wake_all_tx` consumes the guard.
                self.receiver.core.as_ref().wake_all_tx(state);
                Poll::Ready(Some(value))
            }
            None => {
                if state.closed {
                    Poll::Ready(None)
                } else {
                    state.as_mut().base().pending_rx(self.project().waker, cx)
                }
            }
        }
    }
}
834
/// Future returned by `Receiver::recv_batch`.
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
    /// Receiver the values are taken from.
    receiver: &'a Receiver<T>,
    /// Maximum number of values returned by one completion.
    element_limit: usize,
    /// Slot linked into the channel's receiver waker list while waiting.
    #[pin]
    waker: WakerSlot,
}
843
844#[pinned_drop]
845impl<T> PinnedDrop for RecvBatch<'_, T> {
846 fn drop(mut self: Pin<&mut Self>) {
847 if self.waker.is_linked() {
848 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
849 state
850 .as_mut()
851 .base()
852 .project()
853 .rx_wakers
854 .unlink(self.project().waker);
855 }
856 }
857}
858
impl<T> Future for RecvBatch<'_, T> {
    type Output = Vec<T>;

    /// Takes up to `element_limit` values from the front of the queue.
    ///
    /// * queue non-empty: drains `min(queue length, element_limit)` values,
    ///   wakes waiting senders and resolves to the batch;
    /// * queue empty and channel closed: resolves to an empty vector;
    /// * queue empty and channel open: registers this task's waker and
    ///   returns `Pending`.
    ///
    /// NOTE(review): with `element_limit == 0` and a non-empty queue this
    /// resolves to an empty vector as well, which callers cannot tell apart
    /// from the closed case — confirm callers never pass 0.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                return Poll::Ready(Vec::new());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        let v = Vec::from_iter(q.drain(..capacity));
        // Room was freed; `wake_all_tx` consumes the guard.
        self.receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(v)
    }
}
880
/// Future returned by `Receiver::recv_vec`.
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
    /// Receiver the values are taken from.
    receiver: &'a Receiver<T>,
    /// Maximum number of values moved by one completion.
    element_limit: usize,
    /// Caller-provided vector the received values are appended to
    /// (`Receiver::recv_vec` clears it before creating this future).
    vec: &'a mut Vec<T>,
    /// Slot linked into the channel's receiver waker list while waiting.
    #[pin]
    waker: WakerSlot,
}
890
891#[pinned_drop]
892impl<T> PinnedDrop for RecvVec<'_, T> {
893 fn drop(mut self: Pin<&mut Self>) {
894 if self.waker.is_linked() {
895 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
896 state
897 .as_mut()
898 .base()
899 .project()
900 .rx_wakers
901 .unlink(self.project().waker);
902 }
903 }
904}
905
impl<T> Future for RecvVec<'_, T> {
    type Output = ();

    /// Moves up to `element_limit` values from the front of the queue into
    /// the caller's vector.
    ///
    /// * queue non-empty: appends `min(queue length, element_limit)` values,
    ///   wakes waiting senders and completes;
    /// * queue empty and channel closed: completes, leaving the vector empty;
    /// * queue empty and channel open: registers this task's waker and
    ///   returns `Pending`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.receiver.core.as_ref().project_ref().state.lock();
        let q = &mut state.as_mut().project().queue;
        let q_len = q.len();
        if q_len == 0 {
            if state.closed {
                // The vector was cleared when the future was created and has
                // not been filled since.
                assert!(self.vec.is_empty());
                return Poll::Ready(());
            } else {
                return state.as_mut().base().pending_rx(self.project().waker, cx);
            }
        }

        let capacity = min(q_len, self.element_limit);
        self.as_mut().project().vec.extend(q.drain(..capacity));
        // Room was freed; `wake_all_tx` consumes the guard.
        self.project().receiver.core.as_ref().wake_all_tx(state);
        Poll::Ready(())
    }
}
928
929impl<T> Receiver<T> {
930 pub fn into_sync(self) -> SyncReceiver<T> {
932 SyncReceiver { core: self.core }
933 }
934
935 pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
939 Recv {
940 receiver: self,
941 waker: WakerSlot::new(),
942 }
943 }
944
945 pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
954 RecvBatch {
955 receiver: self,
956 element_limit,
957 waker: WakerSlot::new(),
958 }
959 }
960
961 pub fn recv_vec<'a>(
974 &'a self,
975 element_limit: usize,
976 vec: &'a mut Vec<T>,
977 ) -> impl Future<Output = ()> + 'a {
978 vec.clear();
979 RecvVec {
980 receiver: self,
981 element_limit,
982 vec,
983 waker: WakerSlot::new(),
984 }
985 }
986
987 #[cfg(feature = "futures-core")]
1000 pub fn stream<'a>(&'a self) -> Stream<'a, T> {
1001 let recv = Recv {
1002 receiver: self,
1003 waker: WakerSlot::new(),
1004 };
1005 Stream { recv }
1006 }
1007}
1008
/// Stream of received values, created by `Receiver::stream`.
#[cfg(feature = "futures-core")]
#[must_use = "streams do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct Stream<'a, T> {
    /// Receive future that is polled again for every stream item.
    #[pin]
    recv: Recv<'a, T>,
}
1018
#[cfg(feature = "futures-core")]
impl<'a, T> futures_core::Stream for Stream<'a, T> {
    type Item = T;

    /// Delegates to the inner receive future: each item is one received
    /// value, and the stream ends when that future resolves to `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = self.project().recv;
        Future::poll(inner, cx)
    }
}
1027
/// Blocking (thread-based) receiving half of a channel.
#[derive(Debug)]
pub struct SyncReceiver<T> {
    /// Receiver-side handle to the shared channel core.
    core: Pin<splitrc::Rx<Core<T>>>,
}

// `Clone` duplicates the `core` handle; no `T: Clone` bound is required.
derive_clone!(SyncReceiver);
1037
1038impl<T> SyncReceiver<T> {
1039 pub fn into_async(self) -> Receiver<T> {
1041 Receiver { core: self.core }
1042 }
1043
1044 pub fn recv(&self) -> Option<T> {
1048 let mut state = self.core.as_ref().block_until_not_empty();
1049 match state.as_mut().project().queue.pop_front() {
1050 Some(value) => {
1051 self.core.as_ref().wake_all_tx(state);
1052 Some(value)
1053 }
1054 None => {
1055 assert!(state.closed);
1056 None
1057 }
1058 }
1059 }
1060
1061 pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
1068 let mut state = self.core.as_ref().block_until_not_empty();
1069
1070 let q = &mut state.as_mut().project().queue;
1071 let q_len = q.len();
1072 if q_len == 0 {
1073 assert!(state.closed);
1074 return Vec::new();
1075 }
1076
1077 let capacity = min(q_len, element_limit);
1078 let v = Vec::from_iter(q.drain(..capacity));
1079 self.core.as_ref().wake_all_tx(state);
1080 v
1081 }
1082
1083 pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
1094 vec.clear();
1095
1096 let mut state = self.core.as_ref().block_until_not_empty();
1097 let q = &mut state.as_mut().project().queue;
1098 let q_len = q.len();
1099 if q_len == 0 {
1100 assert!(state.closed);
1101 return;
1103 }
1104
1105 let capacity = min(q_len, element_limit);
1106 vec.extend(q.drain(..capacity));
1107 self.core.as_ref().wake_all_tx(state);
1108 }
1109}
1110
1111impl<T> Iterator for SyncReceiver<T> {
1112 type Item = T;
1113
1114 fn next(&mut self) -> Option<Self::Item> {
1115 self.recv()
1116 }
1117}
1118
1119pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1127 Builder::new().bounded(capacity).build_async()
1128}
1129
1130pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
1137 let (tx, rx) = bounded(capacity);
1138 (tx.into_sync(), rx.into_sync())
1139}
1140
1141pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1144 Builder::new().build_async()
1145}
1146
1147pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
1150 let (tx, rx) = unbounded();
1151 (tx.into_sync(), rx.into_sync())
1152}
1153
/// Configuration for constructing a channel. The default is an unbounded
/// channel without preallocation.
#[derive(Debug, Default)]
pub struct Builder {
    /// Requested bound on queued values; `None` means unbounded.
    capacity: Option<usize>,
    /// Whether to allocate the queue's storage up front (only consulted when
    /// a bound is set).
    preallocate: bool,
}
1160
1161impl Builder {
1162 pub fn new() -> Self {
1164 Default::default()
1165 }
1166
1167 pub fn bounded(&mut self, capacity: usize) -> &mut Self {
1173 self.capacity = Some(capacity);
1174 self
1175 }
1176
1177 pub fn preallocate(&mut self) -> &mut Self {
1182 self.preallocate = true;
1183 self
1184 }
1185
1186 pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
1189 let capacity;
1190 let queue;
1191 match self.capacity {
1192 Some(c) => {
1193 capacity = c;
1194 queue = if self.preallocate {
1195 VecDeque::with_capacity(capacity)
1196 } else {
1197 VecDeque::new()
1198 };
1199 }
1200 None => {
1201 capacity = UNBOUNDED_CAPACITY;
1202 queue = VecDeque::new();
1203 }
1204 };
1205
1206 let core = Core {
1207 state: Mutex::new(State {
1208 base: StateBase {
1209 capacity,
1210 closed: false,
1211 tx_wakers: WakerList::new(),
1212 rx_wakers: WakerList::new(),
1213 },
1214 queue,
1215 }),
1216 not_empty: OnceLock::new(),
1217 not_full: OnceLock::new(),
1218 };
1219 let (core_tx, core_rx) = splitrc::pin(core);
1220 (Sender { core: core_tx }, Receiver { core: core_rx })
1221 }
1222
1223 pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
1226 let (tx, rx) = self.build_async();
1227 (tx.into_sync(), rx.into_sync())
1228 }
1229}