1#![forbid(unsafe_code)]
40#![warn(missing_docs)]
41
42mod mutex;
43mod queue;
44mod signal;
45
46use std::fmt::Debug;
47use std::future::Future;
48use std::marker::PhantomData;
49use std::pin::Pin;
50use std::sync::atomic::AtomicUsize;
51use std::sync::atomic::Ordering;
52use std::sync::Arc;
53use std::task::ready;
54use std::task::{Context, Poll};
55use std::time::{Duration, Instant};
56
57use futures_core::Stream;
58use futures_sink::Sink;
59
60use mutex::MutexGuard;
61use queue::Queue;
62
63use crate::mutex::Mutex;
64use crate::signal::{Signal, SyncSignal};
65
/// Error returned by the blocking and asynchronous receive operations.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
    /// The channel is closed and holds no message that could still be received.
    Disconnected,
}

impl std::error::Error for RecvError {}

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the text first so every variant funnels through one write.
        let description = match *self {
            Self::Disconnected => "receiving on a closed channel",
        };
        f.write_str(description)
    }
}
84
/// Error returned when a message cannot be sent because the channel is closed.
///
/// The message that failed to send is carried in the public field and can be recovered with
/// [`SendError::into_inner`].
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> std::error::Error for SendError<T> {}

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that was not sent.
    pub fn into_inner(self) -> T {
        let Self(unsent) = self;
        unsent
    }
}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const DESCRIPTION: &str = "sending on a closed channel";
        f.write_str(DESCRIPTION)
    }
}

// Implemented by hand (instead of derived) so that `T` is not required to be `Debug`; the payload
// is elided from the output.
impl<T> std::fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REPR: &str = "SendError(..)";
        f.write_str(REPR)
    }
}
109
/// Error returned by the non-blocking send operation.
///
/// Both variants carry the message that was not sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel had no room for the message at the time of the call.
    Full(T),
    /// The channel is closed.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that was not sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(unsent) => unsent,
            Self::Disconnected(unsent) => unsent,
        }
    }
}

// Implemented by hand so that `T` is not required to be `Debug`; the payload is elided.
impl<T> std::fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let repr = match self {
            Self::Full(_) => "Full(..)",
            Self::Disconnected(_) => "Disconnected(..)",
        };
        f.write_str(repr)
    }
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Full(_) => "sending on a full channel",
            Self::Disconnected(_) => "sending on a closed channel",
        };
        f.write_str(description)
    }
}

impl<T> std::error::Error for TrySendError<T> {}
150
151impl<T> From<SendError<T>> for TrySendError<T> {
152 fn from(err: SendError<T>) -> Self {
153 match err {
154 SendError(item) => Self::Disconnected(item),
155 }
156 }
157}
158
/// Error returned by the non-blocking receive operation.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// No message was available at the time of the call, but the channel is still open.
    Empty,
    /// The channel is closed and holds no message that could still be received.
    Disconnected,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match *self {
            Self::Empty => "receiving on an empty channel",
            Self::Disconnected => "channel is empty and closed",
        };
        f.write_str(description)
    }
}

impl std::error::Error for TryRecvError {}
181
182impl From<RecvError> for TryRecvError {
183 fn from(err: RecvError) -> Self {
184 match err {
185 RecvError::Disconnected => Self::Disconnected,
186 }
187 }
188}
189
/// Error returned by the receive operations that are bounded by a timeout or deadline.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvTimeoutError {
    /// No message arrived before the timeout elapsed.
    Timeout,
    /// The channel is closed and holds no message that could still be received.
    Disconnected,
}

impl std::error::Error for RecvTimeoutError {}

impl std::fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match *self {
            Self::Timeout => "timed out waiting on a channel",
            Self::Disconnected => "channel is empty and closed",
        };
        f.write_str(description)
    }
}
211
/// Error returned by the send operations that are bounded by a timeout or deadline.
///
/// Both variants carry the message that was not sent.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
    /// The message could not be sent before the timeout elapsed.
    Timeout(T),
    /// The channel is closed.
    Disconnected(T),
}

impl<T> std::error::Error for SendTimeoutError<T> {}

impl<T> SendTimeoutError<T> {
    /// Consumes the error and hands back the message that was not sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Timeout(unsent) => unsent,
            Self::Disconnected(unsent) => unsent,
        }
    }
}

// Implemented by hand so that `T` is not required to be `Debug`. The output is the same for both
// variants.
impl<T> std::fmt::Debug for SendTimeoutError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REPR: &str = "SendTimeoutError(..)";
        f.write_str(REPR)
    }
}

impl<T> std::fmt::Display for SendTimeoutError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Timeout(_) => "timed out sending on a full channel",
            Self::Disconnected(_) => "sending on a closed channel",
        };
        f.write_str(description)
    }
}
251
252impl<T> From<SendError<T>> for SendTimeoutError<T> {
253 fn from(value: SendError<T>) -> Self {
254 SendTimeoutError::Disconnected(value.0)
255 }
256}
257
/// Borrowing iterator over the messages of a [`Receiver`].
///
/// Each step performs a blocking [`Receiver::recv`]; iteration ends the first time that call
/// returns an error.
pub struct Iter<'a, T> {
    /// Receiver the messages are pulled from.
    receiver: &'a Receiver<T>,
}
262
/// Borrowing, non-blocking iterator over the messages of a [`Receiver`].
///
/// Each step performs a [`Receiver::try_recv`]; iteration ends the first time that call returns
/// an error, i.e. as soon as no message is immediately available.
pub struct TryIter<'a, T> {
    /// Receiver the messages are pulled from.
    receiver: &'a Receiver<T>,
}
267
268pub struct IntoIter<T> {
270 receiver: Receiver<T>,
271}
272
273impl<T> Iterator for IntoIter<T> {
274 type Item = T;
275
276 fn next(&mut self) -> Option<Self::Item> {
277 self.receiver.recv().ok()
278 }
279}
280
281#[derive(Debug)]
282struct SharedState<T> {
283 pending_recvs: Queue<Signal>,
284 pending_sends: Queue<(T, Option<Signal>)>,
285 queue: Queue<T>,
286 closed: bool,
287 cap: Option<usize>,
288 next_id: usize,
289}
290
291impl<T> SharedState<T> {
292 fn new(cap: Option<usize>) -> Self {
293 let pending_sends = cap.map_or_else(Queue::new, Queue::with_capacity);
294 Self {
295 pending_recvs: Queue::new(),
296 pending_sends,
297 queue: Queue::new(),
298 closed: false,
299 cap,
300 next_id: 1,
301 }
302 }
303
304 fn len(&self) -> usize {
305 self.queue.len()
306 }
307
308 fn is_full(&self) -> bool {
309 Some(self.len()) == self.cap
310 }
311
312 fn is_empty(&self) -> bool {
313 self.len() == 0
314 }
315
316 fn get_next_id(&mut self) -> usize {
317 let id = self.next_id;
318 self.next_id = self.next_id.wrapping_add(1);
319 id
320 }
321
322 fn close(&mut self) -> bool {
323 let was_closed = self.closed;
324 self.closed = true;
325 for (_, s) in self.pending_recvs.iter() {
326 s.wake_by_ref();
327 }
328 for (_, (_, s)) in self.pending_sends.iter() {
329 if let Some(s) = s {
330 s.wake_by_ref();
331 }
332 }
333 !was_closed
334 }
335
336 fn is_closed(&self) -> bool {
337 self.closed
338 }
339}
340
341enum TrySendResult<'a, T> {
342 Ok,
343 Disconnected(T),
344 Full(T, MutexGuard<'a, SharedState<T>>),
345}
346
347#[inline(always)]
348fn try_send<T>(m: T, id: usize, mut guard: MutexGuard<'_, SharedState<T>>) -> TrySendResult<T> {
349 if guard.closed {
350 return TrySendResult::Disconnected(m);
351 }
352 if !guard.is_full() {
353 guard.queue.enqueue(id, m);
354
355 let pending_recvs = std::mem::take(&mut guard.pending_recvs);
356 drop(guard);
357 for (_, s) in pending_recvs.iter() {
358 s.wake_by_ref();
359 }
360
361 return TrySendResult::Ok;
362 } else if guard.cap == Some(0) {
363 if let Some((_, s)) = guard.pending_recvs.dequeue() {
364 guard.pending_sends.enqueue(id, (m, None));
365 drop(guard);
366 s.wake();
367 return TrySendResult::Ok;
368 }
369 }
370 TrySendResult::Full(m, guard)
371}
372
373enum TryRecvResult<'a, T> {
374 Ok(T),
375 Disconnected,
376 Empty(MutexGuard<'a, SharedState<T>>),
377}
378
379#[inline(always)]
380fn try_recv<T>(mut guard: MutexGuard<'_, SharedState<T>>) -> TryRecvResult<T> {
381 if let Some((_, m)) = guard.queue.dequeue() {
382 if let Some((id, (m, s))) = guard.pending_sends.dequeue() {
383 guard.queue.enqueue(id, m);
384 if let Some(s) = s {
385 drop(guard);
386 s.wake();
387 }
388 }
389 return TryRecvResult::Ok(m);
390 } else if guard.cap == Some(0) {
391 if let Some((_, (m, s))) = guard.pending_sends.dequeue() {
392 if let Some(s) = s {
393 drop(guard);
394 s.wake();
395 }
396 return TryRecvResult::Ok(m);
397 }
398 }
399 if guard.closed {
400 return TryRecvResult::Disconnected;
401 }
402 TryRecvResult::Empty(guard)
403}
404
405#[must_use = "futures do nothing unless you `.await` or poll them"]
407#[derive(Debug)]
408pub struct SendFuture<T> {
409 sender: Sender<T>,
410 msg: MessageOrId<T>,
411}
412
413impl<T> SendFuture<T> {
414 pub fn is_closed(&self) -> bool {
416 self.sender.is_closed()
417 }
418
419 pub fn is_empty(&self) -> bool {
421 self.sender.is_empty()
422 }
423
424 pub fn is_full(&self) -> bool {
426 self.sender.is_full()
427 }
428
429 pub fn len(&self) -> usize {
431 self.sender.len()
432 }
433
434 pub fn capacity(&self) -> Option<usize> {
436 self.sender.capacity()
437 }
438}
439
440pub struct SendSink<T>(SendFuture<T>);
444
445impl<T> SendSink<T> {
446 pub fn sender(&self) -> &Sender<T> {
448 &self.0.sender
449 }
450
451 pub fn is_closed(&self) -> bool {
453 self.0.sender.is_closed()
454 }
455
456 pub fn is_empty(&self) -> bool {
458 self.0.sender.is_empty()
459 }
460
461 pub fn is_full(&self) -> bool {
463 self.0.sender.is_full()
464 }
465
466 pub fn len(&self) -> usize {
468 self.0.sender.len()
469 }
470
471 pub fn capacity(&self) -> Option<usize> {
473 self.0.sender.capacity()
474 }
475
476 pub fn same_channel(&self, other: &Self) -> bool {
478 self.0.sender.same_channel(&other.0.sender)
479 }
480}
481
482impl<T> Debug for SendSink<T> {
483 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
484 f.debug_struct("SendSink").finish()
485 }
486}
487
488impl<T> Sink<T> for SendSink<T> {
489 type Error = SendError<T>;
490
491 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
492 Poll::Ready(Ok(()))
493 }
494
495 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
496 self.0.msg = MessageOrId::Message(item);
497 Ok(())
498 }
499
500 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
501 if let MessageOrId::Message(_) = self.0.msg {
502 ready!(Pin::new(&mut self.0).poll(cx))?;
503 }
504 Poll::Ready(Ok(()))
505 }
506
507 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
508 if let MessageOrId::Message(_) = self.0.msg {
509 ready!(Pin::new(&mut self.0).poll(cx))?;
510 }
511 self.0.sender.close();
512 Poll::Ready(Ok(()))
513 }
514}
515
516impl<T> Clone for SendSink<T> {
517 fn clone(&self) -> SendSink<T> {
518 SendSink(SendFuture {
519 sender: self.0.sender.clone(),
520 msg: MessageOrId::Invalid,
521 })
522 }
523}
524
/// Progress of a [`SendFuture`].
#[derive(Debug)]
enum MessageOrId<T> {
    /// The message has not been handed to the channel yet.
    Message(T),
    /// The message has been queued as a pending send under this id.
    Id(usize),
    /// Nothing to send (no message was set, or it was already taken).
    Invalid,
}

impl<T> MessageOrId<T> {
    /// Moves the current value out, leaving `Invalid` behind.
    fn take(&mut self) -> Self {
        let mut taken = Self::Invalid;
        std::mem::swap(self, &mut taken);
        taken
    }
}
537
538impl<T> std::marker::Unpin for SendFuture<T> {}
539
540impl<T> Future for SendFuture<T> {
541 type Output = Result<(), SendError<T>>;
542
543 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544 let m = match self.msg.take() {
545 MessageOrId::Message(m) => m,
546 MessageOrId::Id(id) => {
547 let mut guard = self.sender.inner.shared_state.lock();
548 if guard.closed {
549 if let Some((_, (m, Some(_)))) = guard.pending_sends.remove(id) {
550 return Poll::Ready(Err(SendError(m)));
551 }
552 }
553 if !guard.pending_sends.contains(id) {
555 return Poll::Ready(Ok(()));
556 }
557 let s = if let Some((_, Some(s))) = guard.pending_sends.get(id) {
559 Some(s.clone())
560 } else {
561 None
562 };
563 drop(guard);
564 if let Some(s) = s {
565 s.wake();
566 }
567 self.msg = MessageOrId::Id(id);
568 return Poll::Pending;
569 }
570 MessageOrId::Invalid => panic!("Future polled after completion"),
571 };
572 let mut guard = self.sender.inner.shared_state.lock();
573 let id = guard.get_next_id();
574 let (m, mut guard) = match try_send(m, id, guard) {
575 TrySendResult::Ok => return Poll::Ready(Ok(())),
576 TrySendResult::Disconnected(m) => return Poll::Ready(Err(SendError(m))),
577 TrySendResult::Full(m, guard) => (m, guard),
578 };
579 guard
580 .pending_sends
581 .enqueue(id, (m, Some(cx.waker().clone().into())));
582 let opt = guard.pending_recvs.dequeue();
583 drop(guard);
584 if let Some((_, s)) = opt {
585 s.wake();
586 }
587 self.msg = MessageOrId::Id(id);
588 Poll::Pending
589 }
590}
591
592#[must_use = "futures do nothing unless you `.await` or poll them"]
596#[derive(Debug)]
597pub struct RecvFuture<T> {
598 id: usize,
599 receiver: Receiver<T>,
600}
601
602impl<T> RecvFuture<T> {
603 pub fn is_closed(&self) -> bool {
605 self.receiver.is_closed()
606 }
607
608 pub fn is_empty(&self) -> bool {
610 self.receiver.is_empty()
611 }
612
613 pub fn is_full(&self) -> bool {
615 self.receiver.is_full()
616 }
617
618 pub fn len(&self) -> usize {
620 self.receiver.len()
621 }
622
623 pub fn capacity(&self) -> Option<usize> {
625 self.receiver.capacity()
626 }
627}
628
629impl<T> Future for RecvFuture<T> {
630 type Output = Result<T, RecvError>;
631
632 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
633 let mut guard = match try_recv(self.receiver.inner.shared_state.lock()) {
634 TryRecvResult::Ok(r) => return Poll::Ready(Ok(r)),
635 TryRecvResult::Disconnected => return Poll::Ready(Err(RecvError::Disconnected)),
636 TryRecvResult::Empty(guard) => guard,
637 };
638 if guard.closed {
639 return Poll::Ready(Err(RecvError::Disconnected));
640 }
641 if !guard.pending_recvs.contains(self.id) {
642 guard
643 .pending_recvs
644 .enqueue(self.id, cx.waker().clone().into());
645 }
646 Poll::Pending
647 }
648}
649
650impl<T> Drop for RecvFuture<T> {
651 fn drop(&mut self) {
652 let mut guard = self.receiver.inner.shared_state.lock();
653 guard.pending_recvs.remove(self.id);
654 }
655}
656
/// Data shared by all clones of a [`Sender`].
struct SenderInner<T> {
    /// Channel state, shared with the receiving side.
    shared_state: Arc<Mutex<SharedState<T>>>,
    /// Number of live `Sender` handles; the channel is closed when it drops to zero.
    send_count: AtomicUsize,
    /// Counter from which `Sender::get_next_id` draws ids.
    next_id: AtomicUsize,
}
662
663pub struct Sender<T> {
665 inner: Arc<SenderInner<T>>,
666}
667
668impl<T> std::fmt::Debug for Sender<T> {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 f.debug_struct("Sender").finish()
671 }
672}
673
674impl<T> Clone for Sender<T> {
675 fn clone(&self) -> Self {
678 self.inner.send_count.fetch_add(1, Ordering::Relaxed);
679 Self {
680 inner: Arc::clone(&self.inner),
681 }
682 }
683}
684
685impl<T> Sender<T> {
686 fn new(shared_state: Arc<Mutex<SharedState<T>>>) -> Self {
687 Self {
688 inner: Arc::new(SenderInner {
689 shared_state,
690 send_count: AtomicUsize::new(1),
691 next_id: AtomicUsize::new(1),
692 })
693 }
694 }
695
696 fn get_next_id(&self) -> usize {
697 self.inner.next_id.fetch_add(1, Ordering::Relaxed)
698 }
699
700 pub fn try_send(&self, m: T) -> Result<(), TrySendError<T>> {
709 match try_send(m, self.get_next_id(), self.inner.shared_state.lock()) {
710 TrySendResult::Ok => Ok(()),
711 TrySendResult::Disconnected(m) => Err(TrySendError::Disconnected(m)),
712 TrySendResult::Full(m, _) => Err(TrySendError::Full(m)),
713 }
714 }
715
716 pub fn send_async(&self, m: T) -> SendFuture<T> {
722 SendFuture {
723 sender: self.clone(),
724 msg: MessageOrId::Message(m),
725 }
726 }
727
728 pub fn send(&self, m: T) -> Result<(), SendError<T>> {
731 let id = self.get_next_id();
732 let (m, mut guard) = match try_send(m, id, self.inner.shared_state.lock()) {
733 TrySendResult::Ok => return Ok(()),
734 TrySendResult::Disconnected(m) => return Err(SendError(m)),
735 TrySendResult::Full(m, guard) => (m, guard),
736 };
737 let sync_signal = SyncSignal::new();
738
739 guard
740 .pending_sends
741 .enqueue(id, (m, Some(sync_signal.clone().into())));
742 drop(guard);
743 loop {
744 sync_signal.wait();
745 let mut guard = self.inner.shared_state.lock();
746 if guard.closed {
747 if let Some((_, (m, Some(_)))) = guard.pending_sends.remove(id) {
748 return Err(SendError(m));
749 }
750 }
751 if !guard.pending_sends.contains(id) {
752 break;
753 }
754 }
755 Ok(())
756 }
757
758 pub fn send_timeout(&self, m: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
764 let id = self.get_next_id();
765 let (m, mut guard) = match try_send(m, id, self.inner.shared_state.lock()) {
766 TrySendResult::Ok => return Ok(()),
767 TrySendResult::Disconnected(m) => return Err(SendTimeoutError::Disconnected(m)),
768 TrySendResult::Full(m, guard) => (m, guard),
769 };
770 let sync_signal = SyncSignal::new();
771 guard
772 .pending_sends
773 .enqueue(id, (m, Some(sync_signal.clone().into())));
774 drop(guard);
775 loop {
776 let _ = sync_signal.wait_timeout(timeout);
777 let mut guard = self.inner.shared_state.lock();
778 if let Some((_, (m, Some(_)))) = guard.pending_sends.remove(id) {
779 if guard.closed {
780 return Err(SendTimeoutError::Disconnected(m));
781 }
782 return Err(SendTimeoutError::Timeout(m));
783 }
784 if !guard.pending_sends.contains(id) {
785 break;
786 }
787 }
788 Ok(())
789 }
790
791 pub fn send_deadline(&self, m: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
794 self.send_timeout(m, deadline.checked_duration_since(Instant::now()).unwrap())
795 }
796
797 pub fn same_channel(&self, other: &Sender<T>) -> bool {
799 Arc::ptr_eq(&self.inner.shared_state, &other.inner.shared_state)
800 }
801
802 pub fn len(&self) -> usize {
807 self.inner.shared_state.lock().len()
808 }
809
810 pub fn capacity(&self) -> Option<usize> {
812 self.inner.shared_state.lock().cap
813 }
814
815 pub fn is_empty(&self) -> bool {
819 self.inner.shared_state.lock().is_empty()
820 }
821
822 pub fn is_full(&self) -> bool {
826 self.inner.shared_state.lock().is_full()
827 }
828
829 pub fn close(&self) -> bool {
835 self.inner.shared_state.lock().close()
836 }
837
838 pub fn is_closed(&self) -> bool {
840 self.inner.shared_state.lock().is_closed()
841 }
842
843 pub fn sink(&self) -> SendSink<T> {
845 SendSink(SendFuture {
846 sender: self.clone(),
847 msg: MessageOrId::Invalid,
848 })
849 }
850
851 pub fn into_sink(self) -> SendSink<T> {
853 SendSink(SendFuture {
854 sender: self,
855 msg: MessageOrId::Invalid,
856 })
857 }
858}
859
860impl<T> Drop for Sender<T> {
861 fn drop(&mut self) {
862 let _ = self
863 .inner
864 .send_count
865 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
866 let mut count = c;
867 if count > 0 {
868 count -= 1;
869 if count == 0 {
870 self.inner.shared_state.lock().close();
871 }
872 }
873 Some(count)
874 });
875 }
876}
877
/// Data shared by all clones of a [`Receiver`].
struct ReceiverInner<T> {
    /// Channel state, shared with the sending side.
    shared_state: Arc<Mutex<SharedState<T>>>,
    /// Number of live `Receiver` handles; the channel is closed when it drops to zero.
    recv_count: AtomicUsize,
    /// Counter from which `Receiver::get_next_id` draws ids.
    next_id: AtomicUsize,
}
883
884pub struct Receiver<T> {
890 inner: Arc<ReceiverInner<T>>,
891}
892
893impl<T> std::fmt::Debug for Receiver<T> {
894 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
895 f.debug_struct("Receiver").finish()
896 }
897}
898
899impl<T> Clone for Receiver<T> {
900 fn clone(&self) -> Self {
908 self.inner.recv_count.fetch_add(1, Ordering::Relaxed);
909 Self {
910 inner: Arc::clone(&self.inner),
911 }
912 }
913}
914
915impl<T> Receiver<T> {
916 fn new(shared_state: Arc<Mutex<SharedState<T>>>) -> Self {
917 Self {
918 inner: Arc::new(ReceiverInner {
919 shared_state,
920 recv_count: AtomicUsize::new(1),
921 next_id: AtomicUsize::new(1),
922 }),
923 }
924 }
925
926 fn get_next_id(&self) -> usize {
927 self.inner.next_id.fetch_add(1, Ordering::Relaxed)
928 }
929
930 pub fn try_recv(&self) -> Result<T, TryRecvError> {
936 match try_recv(self.inner.shared_state.lock()) {
937 TryRecvResult::Ok(m) => Ok(m),
938 TryRecvResult::Disconnected => Err(TryRecvError::Disconnected),
939 TryRecvResult::Empty(_) => Err(TryRecvError::Empty),
940 }
941 }
942
943 pub fn recv_async(&self) -> RecvFuture<T> {
949 RecvFuture {
950 id: self.get_next_id(),
951 receiver: self.clone(),
952 }
953 }
954
955 pub fn recv(&self) -> Result<T, RecvError> {
958 loop {
959 let mut guard = match try_recv(self.inner.shared_state.lock()) {
960 TryRecvResult::Ok(r) => return Ok(r),
961 TryRecvResult::Disconnected => return Err(RecvError::Disconnected),
962 TryRecvResult::Empty(guard) => guard,
963 };
964 let id = self.get_next_id();
965 let sync_signal = SyncSignal::new();
966 guard.pending_recvs.enqueue(id, sync_signal.clone().into());
967 drop(guard);
968 sync_signal.wait();
969 }
970 }
971
972 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
975 let start_time = Instant::now();
976 let mut timeout_remaining = timeout;
977 loop {
978 let mut guard = match try_recv(self.inner.shared_state.lock()) {
979 TryRecvResult::Ok(r) => return Ok(r),
980 TryRecvResult::Disconnected => return Err(RecvTimeoutError::Disconnected),
981 TryRecvResult::Empty(guard) => guard,
982 };
983 if guard.closed {
984 return Err(RecvTimeoutError::Disconnected);
985 }
986 let id = self.get_next_id();
987 let sync_signal = SyncSignal::new();
988 guard.pending_recvs.enqueue(id, sync_signal.clone().into());
989 drop(guard);
990 let _ = sync_signal.wait_timeout(timeout_remaining);
991 let elapsed = start_time.elapsed();
992 if elapsed >= timeout {
993 let mut guard = self.inner.shared_state.lock();
994 guard.pending_recvs.remove(id);
995 drop(guard);
996 return Err(RecvTimeoutError::Timeout);
997 }
998 timeout_remaining = timeout - elapsed;
999 }
1000 }
1001
1002 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1005 self.recv_timeout(deadline.checked_duration_since(Instant::now()).unwrap())
1006 }
1007
1008 pub fn drain(&self) -> Drain<T> {
1012 let mut guard = self.inner.shared_state.lock();
1013 let queue = std::mem::take(&mut guard.queue);
1014 let n = guard
1015 .cap
1016 .map_or(0, |cap| cap.min(guard.pending_sends.len()));
1017 for _ in 0..n {
1018 if let Some((id, (m, mut s))) = guard.pending_sends.dequeue() {
1019 guard.queue.enqueue(id, m);
1020 if let Some(s) = s.take() {
1021 s.wake();
1022 }
1023 }
1024 }
1025 Drain {
1026 queue,
1027 _phantom: PhantomData,
1028 }
1029 }
1030
1031 pub fn iter(&self) -> Iter<T> {
1034 Iter { receiver: self }
1035 }
1036
1037 pub fn try_iter(&self) -> TryIter<T> {
1043 TryIter { receiver: self }
1044 }
1045
1046 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1048 Arc::ptr_eq(&self.inner.shared_state, &other.inner.shared_state)
1049 }
1050
1051 pub fn len(&self) -> usize {
1056 self.inner.shared_state.lock().len()
1057 }
1058
1059 pub fn capacity(&self) -> Option<usize> {
1061 self.inner.shared_state.lock().cap
1062 }
1063
1064 pub fn is_empty(&self) -> bool {
1068 self.inner.shared_state.lock().is_empty()
1069 }
1070
1071 pub fn is_full(&self) -> bool {
1075 self.inner.shared_state.lock().is_full()
1076 }
1077
1078 pub fn close(&self) -> bool {
1084 self.inner.shared_state.lock().close()
1085 }
1086
1087 pub fn is_closed(&self) -> bool {
1089 self.inner.shared_state.lock().is_closed()
1090 }
1091
1092 pub fn stream(&self) -> RecvStream<T> {
1094 RecvStream(RecvFuture {
1095 id: self.get_next_id(),
1096 receiver: self.clone(),
1097 })
1098 }
1099
1100 pub fn into_stream(self) -> RecvStream<T> {
1102 RecvStream(RecvFuture {
1103 id: self.get_next_id(),
1104 receiver: self,
1105 })
1106 }
1107}
1108
1109impl<T> Drop for Receiver<T> {
1110 fn drop(&mut self) {
1111 let _ = self
1112 .inner
1113 .recv_count
1114 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
1115 let mut count = c;
1116 if count > 0 {
1117 count -= 1;
1118 if count == 0 {
1119 self.inner.shared_state.lock().close();
1120 }
1121 }
1122 Some(count)
1123 });
1124 }
1125}
1126
1127#[derive(Debug)]
1129pub struct Drain<'a, T> {
1130 queue: Queue<T>,
1131 _phantom: PhantomData<&'a ()>,
1135}
1136
1137impl<'a, T> Iterator for Drain<'a, T> {
1138 type Item = T;
1139
1140 fn next(&mut self) -> Option<Self::Item> {
1141 self.queue.dequeue().map(|(_, x)| x)
1142 }
1143}
1144
1145impl<'a, T> ExactSizeIterator for Drain<'a, T> {
1146 fn len(&self) -> usize {
1147 self.queue.len()
1148 }
1149}
1150
1151impl<'a, T> Iterator for Iter<'a, T> {
1152 type Item = T;
1153
1154 fn next(&mut self) -> Option<Self::Item> {
1155 self.receiver.recv().ok()
1156 }
1157}
1158
1159impl<'a, T> Iterator for TryIter<'a, T> {
1160 type Item = T;
1161
1162 fn next(&mut self) -> Option<Self::Item> {
1163 self.receiver.try_recv().ok()
1164 }
1165}
1166
1167impl<'a, T> IntoIterator for &'a Receiver<T> {
1168 type Item = T;
1169 type IntoIter = Iter<'a, T>;
1170
1171 fn into_iter(self) -> Self::IntoIter {
1172 Iter { receiver: self }
1173 }
1174}
1175
1176impl<T> IntoIterator for Receiver<T> {
1177 type Item = T;
1178 type IntoIter = IntoIter<T>;
1179
1180 fn into_iter(self) -> Self::IntoIter {
1181 IntoIter { receiver: self }
1182 }
1183}
1184
1185pub struct RecvStream<T>(RecvFuture<T>);
1189
1190impl<T> RecvStream<T> {
1191 pub fn is_closed(&self) -> bool {
1193 self.0.is_closed()
1194 }
1195
1196 pub fn is_empty(&self) -> bool {
1198 self.0.is_empty()
1199 }
1200
1201 pub fn is_full(&self) -> bool {
1203 self.0.is_full()
1204 }
1205
1206 pub fn len(&self) -> usize {
1208 self.0.len()
1209 }
1210
1211 pub fn capacity(&self) -> Option<usize> {
1213 self.0.capacity()
1214 }
1215
1216 pub fn same_channel(&self, other: &Self) -> bool {
1218 self.0.receiver.same_channel(&other.0.receiver)
1219 }
1220}
1221
1222impl<T> std::fmt::Debug for RecvStream<T> {
1223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1224 f.debug_struct("RecvStream").finish()
1225 }
1226}
1227
1228impl<T> Clone for RecvStream<T> {
1229 fn clone(&self) -> RecvStream<T> {
1230 RecvStream(RecvFuture {
1231 id: self.0.receiver.get_next_id(),
1232 receiver: self.0.receiver.clone(),
1233 })
1234 }
1235}
1236
1237impl<T> Stream for RecvStream<T> {
1238 type Item = T;
1239
1240 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1241 match Pin::new(&mut self.0).poll(cx) {
1242 Poll::Pending => Poll::Pending,
1243 Poll::Ready(item) => Poll::Ready(item.ok()),
1244 }
1245 }
1246}
1247
1248fn channel<T>(cap: Option<usize>) -> (Sender<T>, Receiver<T>) {
1249 let shared_state = Arc::new(Mutex::new(SharedState::new(cap)));
1250 let sender = Sender::new(Arc::clone(&shared_state));
1251 let receiver = Receiver::new(shared_state);
1252 (sender, receiver)
1253}
1254
1255pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1293 channel(Some(cap))
1294}
1295
1296pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1313 channel(None)
1314}