1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#![doc = include_str!("../README.md")]
4#![doc = include_str!("example.md")]
5
6use mutex::PinnedCondvar as Condvar;
7use mutex::PinnedMutex as Mutex;
8use mutex::PinnedMutexGuard as MutexGuard;
9use pin_project::pin_project;
10use pin_project::pinned_drop;
11#[cfg(feature = "parking_lot")]
12use pinned_mutex::parking_lot as mutex;
13#[cfg(not(feature = "parking_lot"))]
14use pinned_mutex::std as mutex;
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::fmt;
18use std::future::Future;
19use std::iter::Peekable;
20use std::ops::AsyncFnOnce;
21use std::pin::Pin;
22use std::sync::OnceLock;
23use std::task::Context;
24use std::task::Poll;
25use wakerset::ExtractedWakers;
26use wakerset::WakerList;
27use wakerset::WakerSlot;
28
29const UNBOUNDED_CAPACITY: usize = usize::MAX;
30
31macro_rules! derive_clone {
32 ($t:ident) => {
33 impl<T> Clone for $t<T> {
34 fn clone(&self) -> Self {
35 Self {
36 core: self.core.clone(),
37 }
38 }
39 }
40 };
41}
42
43#[derive(Debug)]
44#[pin_project]
45struct StateBase {
46 capacity: usize,
47 closed: bool,
48 #[pin]
49 tx_wakers: WakerList,
50 #[pin]
51 rx_wakers: WakerList,
52}
53
54impl StateBase {
55 fn target_capacity(&self) -> usize {
56 self.capacity
59 }
60
61 fn pending_tx<T>(
62 self: Pin<&mut StateBase>,
63 slot: Pin<&mut WakerSlot>,
64 cx: &mut Context,
65 ) -> Poll<T> {
66 self.project().tx_wakers.link(slot, cx.waker().clone());
69 Poll::Pending
70 }
71
72 fn pending_rx<T>(
73 self: Pin<&mut StateBase>,
74 slot: Pin<&mut WakerSlot>,
75 cx: &mut Context,
76 ) -> Poll<T> {
77 self.project().rx_wakers.link(slot, cx.waker().clone());
80 Poll::Pending
81 }
82}
83
84#[derive(Debug)]
85#[pin_project]
86struct State<T> {
87 #[pin]
88 base: StateBase,
89 queue: VecDeque<T>,
90}
91
92impl<T> State<T> {
93 fn has_capacity(&self) -> bool {
94 self.queue.len() < self.target_capacity()
95 }
96
97 fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
98 self.project().base
99 }
100}
101
102impl<T> std::ops::Deref for State<T> {
103 type Target = StateBase;
104
105 fn deref(&self) -> &Self::Target {
106 &self.base
107 }
108}
109
110impl<T> std::ops::DerefMut for State<T> {
111 fn deref_mut(&mut self) -> &mut Self::Target {
112 &mut self.base
113 }
114}
115
116#[derive(Debug)]
117#[pin_project]
118struct Core<T> {
119 #[pin]
120 state: Mutex<State<T>>,
121 not_empty: OnceLock<Condvar>,
126 not_full: OnceLock<Condvar>,
127}
128
129impl<T> Core<T> {
130 fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
133 fn condition<T>(s: Pin<&mut State<T>>) -> bool {
134 !s.closed && s.queue.is_empty()
135 }
136
137 let mut state = self.project_ref().state.lock();
138 if !condition(state.as_mut()) {
139 return state;
140 }
141 let not_empty = self.not_empty.get_or_init(Default::default);
145 not_empty.wait_while(state, condition)
146 }
147
148 fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
151 fn condition<T>(s: Pin<&mut State<T>>) -> bool {
152 !s.closed && !s.has_capacity()
153 }
154
155 let mut state = self.project_ref().state.lock();
156 if !condition(state.as_mut()) {
157 return state;
158 }
159 let not_full = self.not_full.get_or_init(Default::default);
163 not_full.wait_while(state, condition)
164 }
165
166 fn wake_rx_and_block_while_full<'a>(
169 self: Pin<&'a Self>,
170 mut state: MutexGuard<'a, State<T>>,
171 ) -> MutexGuard<'a, State<T>> {
172 let cvar = self.not_empty.get();
175
176 let round = state
179 .as_mut()
180 .project()
181 .base
182 .project()
183 .rx_wakers
184 .begin_extraction();
185 let mut wakers = ExtractedWakers::new();
186 loop {
190 let more = state
191 .as_mut()
192 .project()
193 .base
194 .project()
195 .rx_wakers
196 .extract_some_wakers(round, &mut wakers);
197 drop(state);
198 wakers.wake_all();
199 if !more {
200 break;
201 }
202 state = self.project_ref().state.lock();
203 }
204
205 if let Some(cvar) = cvar {
209 cvar.notify_all();
212 }
213
214 state = self.project_ref().state.lock();
215
216 let not_full = self.not_full.get_or_init(Default::default);
220 not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
221 }
222
223 fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
224 let cvar = self.not_full.get();
226
227 if state.as_mut().project().base.project().tx_wakers.is_empty() {
228 drop(state);
229 } else {
230 let round = state
231 .as_mut()
232 .project()
233 .base
234 .project()
235 .tx_wakers
236 .begin_extraction();
237 let mut wakers = ExtractedWakers::new();
238 loop {
239 let more = state
240 .as_mut()
241 .project()
242 .base
243 .project()
244 .tx_wakers
245 .extract_some_wakers(round, &mut wakers);
246 drop(state);
247 wakers.wake_all();
248 if !more {
249 break;
250 }
251 state = self.project_ref().state.lock();
252 }
253 }
254
255 if let Some(cvar) = cvar {
256 cvar.notify_all();
259 }
260 }
261
262 fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
263 let cvar = self.not_empty.get();
265 if state.as_mut().project().base.project().rx_wakers.is_empty() {
266 drop(state);
267 } else {
268 let round = state
269 .as_mut()
270 .project()
271 .base
272 .project()
273 .rx_wakers
274 .begin_extraction();
275 let mut wakers = ExtractedWakers::new();
276 loop {
280 let more = state
281 .as_mut()
282 .project()
283 .base
284 .project()
285 .rx_wakers
286 .extract_some_wakers(round, &mut wakers);
287 drop(state);
288 wakers.wake_all();
289 if !more {
290 break;
291 }
292 state = self.project_ref().state.lock();
293 }
294 }
295
296 if let Some(cvar) = cvar {
297 cvar.notify_one();
298 }
299 }
300
301 fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
302 let cvar = self.not_empty.get();
304 let round = state
305 .as_mut()
306 .project()
307 .base
308 .project()
309 .rx_wakers
310 .begin_extraction();
311 let mut wakers = ExtractedWakers::new();
312 loop {
316 let more = state
317 .as_mut()
318 .project()
319 .base
320 .project()
321 .rx_wakers
322 .extract_some_wakers(round, &mut wakers);
323 drop(state);
324 wakers.wake_all();
325 if !more {
326 break;
327 }
328 state = self.project_ref().state.lock();
329 }
330
331 if let Some(cvar) = cvar {
332 cvar.notify_all();
333 }
334 }
335}
336
337impl<T> splitrc::Notify for Core<T> {
338 fn last_tx_did_drop_pinned(self: Pin<&Self>) {
339 let mut state = self.project_ref().state.lock();
340 *state.as_mut().base().project().closed = true;
341 self.wake_all_rx(state);
344 }
345
346 fn last_rx_did_drop_pinned(self: Pin<&Self>) {
347 let mut state = self.project_ref().state.lock();
348 *state.as_mut().base().project().closed = true;
349 state.as_mut().project().queue.clear();
351 self.wake_all_tx(state);
352 }
353}
354
355#[derive(Clone, Copy, Debug, Eq, PartialEq)]
362pub struct SendError<T>(pub T);
363
364impl<T> fmt::Display for SendError<T> {
365 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366 write!(f, "failed to send value on channel")
367 }
368}
369
370impl<T: fmt::Debug> std::error::Error for SendError<T> {}
371
372#[derive(Debug)]
376pub struct SyncSender<T> {
377 core: Pin<splitrc::Tx<Core<T>>>,
378}
379
380derive_clone!(SyncSender);
381
382impl<T> SyncSender<T> {
383 pub fn into_async(self) -> Sender<T> {
385 Sender { core: self.core }
386 }
387
388 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
392 let mut state = self.core.as_ref().block_until_not_full();
393 if state.closed {
394 assert!(state.as_ref().project_ref().queue.is_empty());
395 return Err(SendError(value));
396 }
397
398 state.as_mut().project().queue.push_back(value);
399
400 self.core.as_ref().wake_one_rx(state);
401 Ok(())
402 }
403
404 pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
410 where
411 I: IntoIterator<Item = T>,
412 {
413 let mut values = values.into_iter();
414
415 let Some(mut value) = values.next() else {
417 return Ok(());
418 };
419
420 let mut sent_count = 0usize;
421
422 let mut state = self.core.as_ref().block_until_not_full();
423 'outer: loop {
424 if state.closed {
425 assert!(state.queue.is_empty());
428 return Err(SendError(()));
429 }
430
431 debug_assert!(state.has_capacity());
432 state.as_mut().project().queue.push_back(value);
433 sent_count += 1;
434 loop {
435 match values.next() {
436 Some(v) => {
437 if state.has_capacity() {
438 state.as_mut().project().queue.push_back(v);
439 sent_count += 1;
440 } else {
441 value = v;
442 state = self.core.as_ref().wake_rx_and_block_while_full(state);
446 continue 'outer;
447 }
448 }
449 None => {
450 if sent_count == 1 {
453 self.core.as_ref().wake_one_rx(state);
454 } else {
455 self.core.as_ref().wake_all_rx(state);
456 }
457 return Ok(());
458 }
459 }
460 }
461 }
462 }
463
464 pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
470 match self.send_iter(values.drain(..)) {
471 Ok(_) => Ok(values),
472 Err(_) => Err(SendError(values)),
473 }
474 }
475
476 pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
479 where
480 F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
481 {
482 let mut tx = SyncBatchSender {
483 sender: self,
484 capacity: batch_limit,
485 buffer: Vec::with_capacity(batch_limit),
486 };
487 let r = f(&mut tx)?;
488 tx.drain()?;
489 Ok(r)
490 }
491}
492
493#[derive(Debug)]
497pub struct SyncBatchSender<'a, T> {
498 sender: &'a mut SyncSender<T>,
499 capacity: usize,
500 buffer: Vec<T>,
501}
502
503impl<T> SyncBatchSender<'_, T> {
504 pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
508 self.buffer.push(value);
509 if self.buffer.len() == self.capacity {
511 self.drain()
512 } else {
513 Ok(())
514 }
515 }
516
517 pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
520 for value in values.into_iter() {
522 self.send(value)?;
523 }
524 Ok(())
525 }
526
527 pub fn drain(&mut self) -> Result<(), SendError<()>> {
529 match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
531 Ok(drained_vec) => {
532 self.buffer = drained_vec;
533 Ok(())
534 }
535 Err(_) => Err(SendError(())),
536 }
537 }
538}
539
540#[derive(Debug)]
544pub struct Sender<T> {
545 core: Pin<splitrc::Tx<Core<T>>>,
546}
547
548derive_clone!(Sender);
549
550impl<T> Sender<T> {
551 pub fn into_sync(self) -> SyncSender<T> {
553 SyncSender { core: self.core }
554 }
555
556 pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
560 Send {
561 sender: self,
562 value: Some(value),
563 waker: WakerSlot::new(),
564 }
565 }
566
567 pub fn send_iter<'a, I>(
572 &'a self,
573 values: I,
574 ) -> impl Future<Output = Result<(), SendError<()>>> + 'a
575 where
576 I: IntoIterator<Item = T> + 'a,
577 {
578 SendIter {
579 sender: self,
580 values: Some(values.into_iter().peekable()),
581 waker: WakerSlot::new(),
582 }
583 }
584
585 pub async fn autobatch<R>(
588 self,
589 batch_limit: usize,
590 f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
591 ) -> Result<R, SendError<()>> {
592 let mut tx = BatchSender {
593 sender: self,
594 batch_limit,
595 buffer: Vec::with_capacity(batch_limit),
596 };
597 let r = f(&mut tx).await?;
598 tx.drain().await?;
599 Ok(r)
600 }
601
602 pub async fn autobatch_or_cancel(
608 self,
609 capacity: usize,
610 f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
611 ) {
612 self.autobatch(capacity, f).await.unwrap_or(())
613 }
614}
615
616#[must_use = "futures do nothing unless you `.await` or poll them"]
617#[pin_project(PinnedDrop)]
618struct Send<'a, T> {
619 sender: &'a Sender<T>,
620 value: Option<T>,
621 #[pin]
622 waker: WakerSlot,
623}
624
625#[pinned_drop]
626impl<T> PinnedDrop for Send<'_, T> {
627 fn drop(mut self: Pin<&mut Self>) {
628 if self.waker.is_linked() {
629 let mut state = self.sender.core.as_ref().project_ref().state.lock();
630 state
631 .as_mut()
632 .base()
633 .project()
634 .tx_wakers
635 .unlink(self.project().waker);
636 }
637 }
638}
639
640impl<T> Future for Send<'_, T> {
641 type Output = Result<(), SendError<T>>;
642
643 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
644 let mut state = self.sender.core.as_ref().project_ref().state.lock();
645 if state.closed {
646 return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
647 }
648 if state.has_capacity() {
649 state
650 .as_mut()
651 .project()
652 .queue
653 .push_back(self.as_mut().project().value.take().unwrap());
654 self.project().sender.core.as_ref().wake_one_rx(state);
655 Poll::Ready(Ok(()))
656 } else {
657 state.as_mut().base().pending_tx(self.project().waker, cx)
658 }
659 }
660}
661
662#[must_use = "futures do nothing unless you `.await` or poll them"]
663#[pin_project(PinnedDrop)]
664struct SendIter<'a, T, I: Iterator<Item = T>> {
665 sender: &'a Sender<T>,
666 values: Option<Peekable<I>>,
667 #[pin]
668 waker: WakerSlot,
669}
670
671#[pinned_drop]
672impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
673 fn drop(mut self: Pin<&mut Self>) {
674 if self.waker.is_linked() {
675 let mut state = self.sender.core.as_ref().project_ref().state.lock();
676 state
677 .as_mut()
678 .base()
679 .project()
680 .tx_wakers
681 .unlink(self.project().waker);
682 }
683 }
684}
685
686impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
687 type Output = Result<(), SendError<()>>;
688
689 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
690 {
693 let pi = self.as_mut().project().values.as_mut().unwrap();
694 if pi.peek().is_none() {
695 return Poll::Ready(Ok(()));
696 }
697 }
700
701 let mut state = self.sender.core.as_ref().project_ref().state.lock();
702
703 let pi = self.as_mut().project().values.as_mut().unwrap();
712 debug_assert!(pi.peek().is_some());
714 if state.closed {
715 Poll::Ready(Err(SendError(())))
716 } else if !state.has_capacity() {
717 state.as_mut().base().pending_tx(self.project().waker, cx)
719 } else {
720 debug_assert!(state.has_capacity());
721 state.as_mut().project().queue.push_back(pi.next().unwrap());
722 while state.has_capacity() {
723 match pi.next() {
724 Some(value) => {
725 state.as_mut().project().queue.push_back(value);
726 }
727 None => {
728 self.sender.core.as_ref().wake_all_rx(state);
732 return Poll::Ready(Ok(()));
733 }
734 }
735 }
736 if pi.peek().is_none() {
740 self.sender.core.as_ref().wake_all_rx(state);
741 return Poll::Ready(Ok(()));
742 }
743
744 let pending = state
746 .as_mut()
747 .base()
748 .pending_tx(self.as_mut().project().waker, cx);
749 self.sender.core.as_ref().wake_all_rx(state);
750 pending
751 }
752 }
753}
754
755pub struct BatchSender<T> {
760 sender: Sender<T>,
761 batch_limit: usize,
762 buffer: Vec<T>,
763}
764
765impl<T> BatchSender<T> {
766 pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
769 self.buffer.push(value);
770 if self.buffer.len() == self.batch_limit {
771 self.drain().await?;
772 }
773 Ok(())
774 }
775
776 async fn drain(&mut self) -> Result<(), SendError<()>> {
777 self.sender.send_iter(self.buffer.drain(..)).await?;
778 assert!(self.buffer.is_empty());
779 Ok(())
780 }
781}
782
783#[derive(Debug)]
787pub struct Receiver<T> {
788 core: Pin<splitrc::Rx<Core<T>>>,
789}
790
791derive_clone!(Receiver);
792
793#[must_use = "futures do nothing unless you `.await` or poll them"]
794#[pin_project(PinnedDrop)]
795struct Recv<'a, T> {
796 receiver: &'a Receiver<T>,
797 #[pin]
798 waker: WakerSlot,
799}
800
801#[pinned_drop]
802impl<T> PinnedDrop for Recv<'_, T> {
803 fn drop(mut self: Pin<&mut Self>) {
804 if self.waker.is_linked() {
805 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
806 state
807 .as_mut()
808 .base()
809 .project()
810 .rx_wakers
811 .unlink(self.project().waker);
812 }
813 }
814}
815
816impl<T> Future for Recv<'_, T> {
817 type Output = Option<T>;
818
819 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
820 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
821 match state.as_mut().project().queue.pop_front() {
822 Some(value) => {
823 self.receiver.core.as_ref().wake_all_tx(state);
824 Poll::Ready(Some(value))
825 }
826 None => {
827 if state.closed {
828 Poll::Ready(None)
829 } else {
830 state.as_mut().base().pending_rx(self.project().waker, cx)
831 }
832 }
833 }
834 }
835}
836
837#[must_use = "futures do nothing unless you .await or poll them"]
838#[pin_project(PinnedDrop)]
839struct RecvBatch<'a, T> {
840 receiver: &'a Receiver<T>,
841 element_limit: usize,
842 #[pin]
843 waker: WakerSlot,
844}
845
846#[pinned_drop]
847impl<T> PinnedDrop for RecvBatch<'_, T> {
848 fn drop(mut self: Pin<&mut Self>) {
849 if self.waker.is_linked() {
850 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
851 state
852 .as_mut()
853 .base()
854 .project()
855 .rx_wakers
856 .unlink(self.project().waker);
857 }
858 }
859}
860
861impl<T> Future for RecvBatch<'_, T> {
862 type Output = Vec<T>;
863
864 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
865 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
866 let q = &mut state.as_mut().project().queue;
867 let q_len = q.len();
868 if q_len == 0 {
869 if state.closed {
870 return Poll::Ready(Vec::new());
871 } else {
872 return state.as_mut().base().pending_rx(self.project().waker, cx);
873 }
874 }
875
876 let capacity = min(q_len, self.element_limit);
877 let v = Vec::from_iter(q.drain(..capacity));
878 self.receiver.core.as_ref().wake_all_tx(state);
879 Poll::Ready(v)
880 }
881}
882
883#[must_use = "futures do nothing unless you .await or poll them"]
884#[pin_project(PinnedDrop)]
885struct RecvVec<'a, T> {
886 receiver: &'a Receiver<T>,
887 element_limit: usize,
888 vec: &'a mut Vec<T>,
889 #[pin]
890 waker: WakerSlot,
891}
892
893#[pinned_drop]
894impl<T> PinnedDrop for RecvVec<'_, T> {
895 fn drop(mut self: Pin<&mut Self>) {
896 if self.waker.is_linked() {
897 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
898 state
899 .as_mut()
900 .base()
901 .project()
902 .rx_wakers
903 .unlink(self.project().waker);
904 }
905 }
906}
907
908impl<T> Future for RecvVec<'_, T> {
909 type Output = ();
910
911 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
912 let mut state = self.receiver.core.as_ref().project_ref().state.lock();
913 let q = &mut state.as_mut().project().queue;
914 let q_len = q.len();
915 if q_len == 0 {
916 if state.closed {
917 assert!(self.vec.is_empty());
918 return Poll::Ready(());
919 } else {
920 return state.as_mut().base().pending_rx(self.project().waker, cx);
921 }
922 }
923
924 let capacity = min(q_len, self.element_limit);
925 self.as_mut().project().vec.extend(q.drain(..capacity));
926 self.project().receiver.core.as_ref().wake_all_tx(state);
927 Poll::Ready(())
928 }
929}
930
931impl<T> Receiver<T> {
932 pub fn into_sync(self) -> SyncReceiver<T> {
934 SyncReceiver { core: self.core }
935 }
936
937 pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
941 Recv {
942 receiver: self,
943 waker: WakerSlot::new(),
944 }
945 }
946
947 pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
956 RecvBatch {
957 receiver: self,
958 element_limit,
959 waker: WakerSlot::new(),
960 }
961 }
962
963 pub fn recv_vec<'a>(
976 &'a self,
977 element_limit: usize,
978 vec: &'a mut Vec<T>,
979 ) -> impl Future<Output = ()> + 'a {
980 vec.clear();
981 RecvVec {
982 receiver: self,
983 element_limit,
984 vec,
985 waker: WakerSlot::new(),
986 }
987 }
988
989 #[cfg(feature = "futures-core")]
1003 #[cfg_attr(docsrs, doc(cfg(feature = "futures-core")))]
1004 pub fn stream<'a>(&'a self) -> Stream<'a, T> {
1005 let recv = Recv {
1006 receiver: self,
1007 waker: WakerSlot::new(),
1008 };
1009 Stream { recv }
1010 }
1011}
1012
1013#[cfg(feature = "futures-core")]
1018#[cfg_attr(docsrs, doc(cfg(feature = "futures-core")))]
1019#[must_use = "streams do nothing unless you `.await` or poll them"]
1020#[pin_project]
1021pub struct Stream<'a, T> {
1022 #[pin]
1023 recv: Recv<'a, T>,
1024}
1025
1026#[cfg(feature = "futures-core")]
1027impl<'a, T> futures_core::Stream for Stream<'a, T> {
1028 type Item = T;
1029
1030 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1031 self.project().recv.poll(cx)
1032 }
1033}
1034
1035#[derive(Debug)]
1039pub struct SyncReceiver<T> {
1040 core: Pin<splitrc::Rx<Core<T>>>,
1041}
1042
1043derive_clone!(SyncReceiver);
1044
1045impl<T> SyncReceiver<T> {
1046 pub fn into_async(self) -> Receiver<T> {
1048 Receiver { core: self.core }
1049 }
1050
1051 pub fn recv(&self) -> Option<T> {
1055 let mut state = self.core.as_ref().block_until_not_empty();
1056 match state.as_mut().project().queue.pop_front() {
1057 Some(value) => {
1058 self.core.as_ref().wake_all_tx(state);
1059 Some(value)
1060 }
1061 None => {
1062 assert!(state.closed);
1063 None
1064 }
1065 }
1066 }
1067
1068 pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
1075 let mut state = self.core.as_ref().block_until_not_empty();
1076
1077 let q = &mut state.as_mut().project().queue;
1078 let q_len = q.len();
1079 if q_len == 0 {
1080 assert!(state.closed);
1081 return Vec::new();
1082 }
1083
1084 let capacity = min(q_len, element_limit);
1085 let v = Vec::from_iter(q.drain(..capacity));
1086 self.core.as_ref().wake_all_tx(state);
1087 v
1088 }
1089
1090 pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
1101 vec.clear();
1102
1103 let mut state = self.core.as_ref().block_until_not_empty();
1104 let q = &mut state.as_mut().project().queue;
1105 let q_len = q.len();
1106 if q_len == 0 {
1107 assert!(state.closed);
1108 return;
1110 }
1111
1112 let capacity = min(q_len, element_limit);
1113 vec.extend(q.drain(..capacity));
1114 self.core.as_ref().wake_all_tx(state);
1115 }
1116}
1117
1118impl<T> Iterator for SyncReceiver<T> {
1119 type Item = T;
1120
1121 fn next(&mut self) -> Option<Self::Item> {
1122 self.recv()
1123 }
1124}
1125
1126pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1134 Builder::new().bounded(capacity).build_async()
1135}
1136
1137pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
1144 let (tx, rx) = bounded(capacity);
1145 (tx.into_sync(), rx.into_sync())
1146}
1147
1148pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1151 Builder::new().build_async()
1152}
1153
1154pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
1157 let (tx, rx) = unbounded();
1158 (tx.into_sync(), rx.into_sync())
1159}
1160
1161#[derive(Debug, Default)]
1163pub struct Builder {
1164 capacity: Option<usize>,
1165 preallocate: bool,
1166}
1167
1168impl Builder {
1169 pub fn new() -> Self {
1171 Default::default()
1172 }
1173
1174 pub fn bounded(&mut self, capacity: usize) -> &mut Self {
1180 self.capacity = Some(capacity);
1181 self
1182 }
1183
1184 pub fn preallocate(&mut self) -> &mut Self {
1189 self.preallocate = true;
1190 self
1191 }
1192
1193 pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
1196 let capacity;
1197 let queue;
1198 match self.capacity {
1199 Some(c) => {
1200 capacity = c;
1201 queue = if self.preallocate {
1202 VecDeque::with_capacity(capacity)
1203 } else {
1204 VecDeque::new()
1205 };
1206 }
1207 None => {
1208 capacity = UNBOUNDED_CAPACITY;
1209 queue = VecDeque::new();
1210 }
1211 };
1212
1213 let core = Core {
1214 state: Mutex::new(State {
1215 base: StateBase {
1216 capacity,
1217 closed: false,
1218 tx_wakers: WakerList::new(),
1219 rx_wakers: WakerList::new(),
1220 },
1221 queue,
1222 }),
1223 not_empty: OnceLock::new(),
1224 not_full: OnceLock::new(),
1225 };
1226 let (core_tx, core_rx) = splitrc::pin(core);
1227 (Sender { core: core_tx }, Receiver { core: core_rx })
1228 }
1229
1230 pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
1233 let (tx, rx) = self.build_async();
1234 (tx.into_sync(), rx.into_sync())
1235 }
1236}