loole/
lib.rs

1//! An async/sync multi-producer multi-consumer channel. Multiple threads can send and receive
2//! messages on the channel at the same time and each message will be received by only one thread.
3//!
//! Producers can send and consumers can receive messages asynchronously or synchronously.
5//!
6//! There are two types of channels: bounded and unbounded.
7//!
8//! 1. [Bounded][`bounded()`] channel with limited capacity.
9//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
10//!
11//! A channel has two sides: the [`Sender`] side and the [`Receiver`] side. Both sides are cloneable, meaning
12//! that they can be copied and shared among multiple threads. This allows you to have multiple
13//! threads sending and receiving messages on the same channel.
14//!
15//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. This means that no
16//! more messages can be sent, but remaining messages can still be received.
17//!
//! The channel can also be closed manually by calling `Sender::close()` or `Receiver::close()`.
19//!
20//! # Examples
21//!
22//! ```
23//! let (tx, rx) = loole::unbounded();
24//!
25//! std::thread::spawn(move || {
26//!     for i in 0..10 {
27//!         tx.send(i).unwrap();
28//!     }
29//! });
30//!
31//! let mut sum = 0;
32//! while let Ok(i) = rx.recv() {
33//!     sum += i;
34//! }
35//!
36//! assert_eq!(sum, (0..10).sum());
37//! ```
38
39#![forbid(unsafe_code)]
40#![warn(missing_docs)]
41
42mod mutex;
43mod queue;
44mod signal;
45
46use std::fmt::Debug;
47use std::future::Future;
48use std::marker::PhantomData;
49use std::pin::Pin;
50use std::sync::atomic::AtomicUsize;
51use std::sync::atomic::Ordering;
52use std::sync::Arc;
53use std::task::ready;
54use std::task::{Context, Poll};
55use std::time::{Duration, Instant};
56
57use futures_core::Stream;
58use futures_sink::Sink;
59
60use mutex::MutexGuard;
61use queue::Queue;
62
63use crate::mutex::Mutex;
64use crate::signal::{Signal, SyncSignal};
65
/// The error returned by a receive operation once the channel has been closed and no buffered
/// messages remain.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// Every sender has been dropped and the channel holds no more messages, so nothing can
    /// be received anymore.
    Disconnected,
}
74
// `RecvError` provides `Debug` and `Display`; the default `Error` methods are used as-is.
impl std::error::Error for RecvError {}
76
77impl std::fmt::Display for RecvError {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            RecvError::Disconnected => f.write_str("receiving on a closed channel"),
81        }
82    }
83}
84
/// The error returned by a send operation on a closed channel (for example after all receivers
/// have been dropped). It carries the message that could not be sent.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);
88
// No bound on `T` is needed: the `Debug` and `Display` impls of `SendError` never print the payload.
impl<T> std::error::Error for SendError<T> {}
90
91impl<T> SendError<T> {
92    /// Consumes the error, returning the message that failed to send.
93    pub fn into_inner(self) -> T {
94        self.0
95    }
96}
97
98impl<T> std::fmt::Display for SendError<T> {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.write_str("sending on a closed channel")
101    }
102}
103
104impl<T> std::fmt::Debug for SendError<T> {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.write_str("SendError(..)")
107    }
108}
109
/// The error returned by a non-blocking send. It occurs either because:
///
/// * the channel is full, or
/// * all receivers have been dropped.
///
/// Both variants carry the message that could not be sent.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel is full, so the message could not be sent.
    Full(T),
    /// The message can never be received because all receivers have been dropped.
    Disconnected(T),
}
121
122impl<T> TrySendError<T> {
123    /// Consume the error and return the message that failed to send.
124    pub fn into_inner(self) -> T {
125        match self {
126            Self::Full(msg) | Self::Disconnected(msg) => msg,
127        }
128    }
129}
130
131impl<T> std::fmt::Debug for TrySendError<T> {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        match *self {
134            TrySendError::Full(..) => f.write_str("Full(..)"),
135            TrySendError::Disconnected(..) => f.write_str("Disconnected(..)"),
136        }
137    }
138}
139
140impl<T> std::fmt::Display for TrySendError<T> {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        match self {
143            TrySendError::Full(..) => f.write_str("sending on a full channel"),
144            TrySendError::Disconnected(..) => f.write_str("sending on a closed channel"),
145        }
146    }
147}
148
// The hand-written `Debug`/`Display` impls work for every `T`, so no bound is required here.
impl<T> std::error::Error for TrySendError<T> {}
150
151impl<T> From<SendError<T>> for TrySendError<T> {
152    fn from(err: SendError<T>) -> Self {
153        match err {
154            SendError(item) => Self::Disconnected(item),
155        }
156    }
157}
158
/// The error returned by a non-blocking receive when no message could be taken from the channel.
/// If the channel is empty and all senders have been dropped, `Disconnected` is reported
/// instead of `Empty`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel currently holds no message.
    Empty,
    /// All senders have been dropped and no messages are left in the channel, so the channel
    /// is closed.
    Disconnected,
}
170
171impl std::fmt::Display for TryRecvError {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        match self {
174            TryRecvError::Empty => f.write_str("receiving on an empty channel"),
175            TryRecvError::Disconnected => f.write_str("channel is empty and closed"),
176        }
177    }
178}
179
// `TryRecvError` provides `Debug` and `Display`; the default `Error` methods are used as-is.
impl std::error::Error for TryRecvError {}
181
182impl From<RecvError> for TryRecvError {
183    fn from(err: RecvError) -> Self {
184        match err {
185            RecvError::Disconnected => Self::Disconnected,
186        }
187    }
188}
189
/// The error returned by a receive with a timeout: either no value arrived within the timeout
/// period, or all senders have been dropped and the channel holds no more values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// No message was received before the timeout elapsed.
    Timeout,
    /// No further messages can be received: the channel is empty and all senders have been
    /// dropped.
    Disconnected,
}
200
// `RecvTimeoutError` provides `Debug` and `Display`; the default `Error` methods are used as-is.
impl std::error::Error for RecvTimeoutError {}
202
203impl std::fmt::Display for RecvTimeoutError {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        match self {
206            RecvTimeoutError::Timeout => f.write_str("timed out waiting on a channel"),
207            RecvTimeoutError::Disconnected => f.write_str("channel is empty and closed"),
208        }
209    }
210}
211
/// The error returned by a send with a timeout.
///
/// It is produced when either:
///
/// * the timeout elapses before the value could be sent, or
/// * all receivers of the channel are dropped before the value could be sent.
///
/// Both variants carry the message that could not be sent.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
    /// The timeout elapsed before the message could be sent.
    Timeout(T),
    /// All receivers of the channel were dropped, so the message cannot be sent.
    Disconnected(T),
}
225
// The hand-written `Debug`/`Display` impls work for every `T`, so no bound is required here.
impl<T> std::error::Error for SendTimeoutError<T> {}
227
228impl<T> SendTimeoutError<T> {
229    /// Consumes the error, returning the message that failed to send.
230    pub fn into_inner(self) -> T {
231        match self {
232            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
233        }
234    }
235}
236
237impl<T> std::fmt::Debug for SendTimeoutError<T> {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.write_str("SendTimeoutError(..)")
240    }
241}
242
243impl<T> std::fmt::Display for SendTimeoutError<T> {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        match self {
246            SendTimeoutError::Timeout(..) => f.write_str("timed out sending on a full channel"),
247            SendTimeoutError::Disconnected(..) => f.write_str("sending on a closed channel"),
248        }
249    }
250}
251
252impl<T> From<SendError<T>> for SendTimeoutError<T> {
253    fn from(value: SendError<T>) -> Self {
254        SendTimeoutError::Disconnected(value.0)
255    }
256}
257
/// An iterator over the messages received synchronously from a channel.
pub struct Iter<'a, T> {
    // Borrowed receiver the messages are pulled from.
    receiver: &'a Receiver<T>,
}
262
/// A non-blocking iterator over the messages received synchronously from a channel.
pub struct TryIter<'a, T> {
    // Borrowed receiver the messages are pulled from.
    receiver: &'a Receiver<T>,
}
267
/// An owned iterator over the messages received synchronously from a channel.
pub struct IntoIter<T> {
    // Owned receiver; `next` calls `recv` on it.
    receiver: Receiver<T>,
}
272
273impl<T> Iterator for IntoIter<T> {
274    type Item = T;
275
276    fn next(&mut self) -> Option<Self::Item> {
277        self.receiver.recv().ok()
278    }
279}
280
/// The state shared by all senders and receivers of one channel; it is accessed behind a
/// `Mutex`.
#[derive(Debug)]
struct SharedState<T> {
    /// Signals of receivers that found the channel empty and are waiting, keyed by id.
    pending_recvs: Queue<Signal>,
    /// Messages that could not be buffered yet, each with the optional signal of the sender
    /// waiting for it to be taken, keyed by id.
    pending_sends: Queue<(T, Option<Signal>)>,
    /// The buffered messages, keyed by id.
    queue: Queue<T>,
    /// Set once the channel has been closed; never reset.
    closed: bool,
    /// Capacity of a bounded channel, or `None` for an unbounded one.
    cap: Option<usize>,
    /// Next value handed out by `get_next_id`.
    next_id: usize,
}
290
291impl<T> SharedState<T> {
292    fn new(cap: Option<usize>) -> Self {
293        let pending_sends = cap.map_or_else(Queue::new, Queue::with_capacity);
294        Self {
295            pending_recvs: Queue::new(),
296            pending_sends,
297            queue: Queue::new(),
298            closed: false,
299            cap,
300            next_id: 1,
301        }
302    }
303
304    fn len(&self) -> usize {
305        self.queue.len()
306    }
307
308    fn is_full(&self) -> bool {
309        Some(self.len()) == self.cap
310    }
311
312    fn is_empty(&self) -> bool {
313        self.len() == 0
314    }
315
316    fn get_next_id(&mut self) -> usize {
317        let id = self.next_id;
318        self.next_id = self.next_id.wrapping_add(1);
319        id
320    }
321
322    fn close(&mut self) -> bool {
323        let was_closed = self.closed;
324        self.closed = true;
325        for (_, s) in self.pending_recvs.iter() {
326            s.wake_by_ref();
327        }
328        for (_, (_, s)) in self.pending_sends.iter() {
329            if let Some(s) = s {
330                s.wake_by_ref();
331            }
332        }
333        !was_closed
334    }
335
336    fn is_closed(&self) -> bool {
337        self.closed
338    }
339}
340
/// Outcome of the internal `try_send` helper.
enum TrySendResult<'a, T> {
    /// The message was accepted by the channel.
    Ok,
    /// The channel is closed; the message is handed back.
    Disconnected(T),
    /// The message could not be accepted right now. The message and the still-held lock guard
    /// are handed back so the caller can register a pending send without re-locking.
    Full(T, MutexGuard<'a, SharedState<T>>),
}
346
347#[inline(always)]
348fn try_send<T>(m: T, id: usize, mut guard: MutexGuard<'_, SharedState<T>>) -> TrySendResult<T> {
349    if guard.closed {
350        return TrySendResult::Disconnected(m);
351    }
352    if !guard.is_full() {
353        guard.queue.enqueue(id, m);
354
355        let pending_recvs = std::mem::take(&mut guard.pending_recvs);
356        drop(guard);
357        for (_, s) in pending_recvs.iter() {
358            s.wake_by_ref();
359        }
360        
361        return TrySendResult::Ok;
362    } else if guard.cap == Some(0) {
363        if let Some((_, s)) = guard.pending_recvs.dequeue() {
364            guard.pending_sends.enqueue(id, (m, None));
365            drop(guard);
366            s.wake();
367            return TrySendResult::Ok;
368        }
369    }
370    TrySendResult::Full(m, guard)
371}
372
/// Outcome of the internal `try_recv` helper.
enum TryRecvResult<'a, T> {
    /// A message was taken from the channel.
    Ok(T),
    /// No message is available and the channel is closed.
    Disconnected,
    /// No message is available right now. The still-held lock guard is handed back so the
    /// caller can register a pending receive without re-locking.
    Empty(MutexGuard<'a, SharedState<T>>),
}
378
379#[inline(always)]
380fn try_recv<T>(mut guard: MutexGuard<'_, SharedState<T>>) -> TryRecvResult<T> {
381    if let Some((_, m)) = guard.queue.dequeue() {
382        if let Some((id, (m, s))) = guard.pending_sends.dequeue() {
383            guard.queue.enqueue(id, m);
384            if let Some(s) = s {
385                drop(guard);
386                s.wake();
387            }
388        }
389        return TryRecvResult::Ok(m);
390    } else if guard.cap == Some(0) {
391        if let Some((_, (m, s))) = guard.pending_sends.dequeue() {
392            if let Some(s) = s {
393                drop(guard);
394                s.wake();
395            }
396            return TryRecvResult::Ok(m);
397        }
398    }
399    if guard.closed {
400        return TryRecvResult::Disconnected;
401    }
402    TryRecvResult::Empty(guard)
403}
404
/// A future that sends a value into a channel.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct SendFuture<T> {
    // Sender handle owned by the future.
    sender: Sender<T>,
    // The message still to be sent, the id of the pending send once it has been registered,
    // or `Invalid` when there is nothing to send.
    msg: MessageOrId<T>,
}
412
// Channel inspection helpers; each one forwards to the method of the same name on the
// future's `Sender`.
impl<T> SendFuture<T> {
    /// See [`Sender::is_closed`].
    pub fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }

    /// See [`Sender::is_empty`].
    pub fn is_empty(&self) -> bool {
        self.sender.is_empty()
    }

    /// See [`Sender::is_full`].
    pub fn is_full(&self) -> bool {
        self.sender.is_full()
    }

    /// See [`Sender::len`].
    pub fn len(&self) -> usize {
        self.sender.len()
    }

    /// See [`Sender::capacity`].
    pub fn capacity(&self) -> Option<usize> {
        self.sender.capacity()
    }
}
439
/// A sink that allows sending values into a channel.
///
/// Can be created via [`Sender::sink`] or [`Sender::into_sink`].
// Wraps a `SendFuture` whose `msg` slot buffers the item handed to `start_send`.
pub struct SendSink<T>(SendFuture<T>);
444
445impl<T> SendSink<T> {
446    /// Returns a clone of a sending half of the channel of this sink.
447    pub fn sender(&self) -> &Sender<T> {
448        &self.0.sender
449    }
450
451    /// See [`Sender::is_closed`].
452    pub fn is_closed(&self) -> bool {
453        self.0.sender.is_closed()
454    }
455
456    /// See [`Sender::is_empty`].
457    pub fn is_empty(&self) -> bool {
458        self.0.sender.is_empty()
459    }
460
461    /// See [`Sender::is_full`].
462    pub fn is_full(&self) -> bool {
463        self.0.sender.is_full()
464    }
465
466    /// See [`Sender::len`].
467    pub fn len(&self) -> usize {
468        self.0.sender.len()
469    }
470
471    /// See [`Sender::capacity`].
472    pub fn capacity(&self) -> Option<usize> {
473        self.0.sender.capacity()
474    }
475
476    /// Returns whether the SendSinks are belong to the same channel.
477    pub fn same_channel(&self, other: &Self) -> bool {
478        self.0.sender.same_channel(&other.0.sender)
479    }
480}
481
482impl<T> Debug for SendSink<T> {
483    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
484        f.debug_struct("SendSink").finish()
485    }
486}
487
488impl<T> Sink<T> for SendSink<T> {
489    type Error = SendError<T>;
490
491    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
492        Poll::Ready(Ok(()))
493    }
494
495    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
496        self.0.msg = MessageOrId::Message(item);
497        Ok(())
498    }
499
500    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
501        if let MessageOrId::Message(_) = self.0.msg {
502            ready!(Pin::new(&mut self.0).poll(cx))?;
503        }
504        Poll::Ready(Ok(()))
505    }
506
507    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
508        if let MessageOrId::Message(_) = self.0.msg {
509            ready!(Pin::new(&mut self.0).poll(cx))?;
510        }
511        self.0.sender.close();
512        Poll::Ready(Ok(()))
513    }
514}
515
516impl<T> Clone for SendSink<T> {
517    fn clone(&self) -> SendSink<T> {
518        SendSink(SendFuture {
519            sender: self.0.sender.clone(),
520            msg: MessageOrId::Invalid,
521        })
522    }
523}
524
/// State of the message owned by a `SendFuture`.
#[derive(Debug)]
enum MessageOrId<T> {
    /// The message has not been offered to the channel yet.
    Message(T),
    /// The message was registered as a pending send under this id.
    Id(usize),
    /// There is no message: nothing was set yet, or the state was taken by `take`.
    Invalid,
}
531
532impl<T> MessageOrId<T> {
533    fn take(&mut self) -> Self {
534        std::mem::replace(self, Self::Invalid)
535    }
536}
537
// Declared for every `T` (even `T: !Unpin`); `poll` relies on it to access the fields through
// `Pin<&mut Self>`.
impl<T> std::marker::Unpin for SendFuture<T> {}
539
540impl<T> Future for SendFuture<T> {
541    type Output = Result<(), SendError<T>>;
542
543    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544        let m = match self.msg.take() {
545            MessageOrId::Message(m) => m,
546            MessageOrId::Id(id) => {
547                let mut guard = self.sender.inner.shared_state.lock();
548                if guard.closed {
549                    if let Some((_, (m, Some(_)))) = guard.pending_sends.remove(id) {
550                        return Poll::Ready(Err(SendError(m)));
551                    }
552                }
553                // Check if the message has been sent
554                if !guard.pending_sends.contains(id) {
555                    return Poll::Ready(Ok(()));
556                }
557                // Message is still pending, update the waker and return Pending
558                let s = if let Some((_, Some(s))) = guard.pending_sends.get(id) {
559                    Some(s.clone())
560                } else {
561                    None
562                };
563                drop(guard);
564                if let Some(s) = s {
565                    s.wake();
566                }
567                self.msg = MessageOrId::Id(id);
568                return Poll::Pending;
569            }
570            MessageOrId::Invalid => panic!("Future polled after completion"),
571        };
572        let mut guard = self.sender.inner.shared_state.lock();
573        let id = guard.get_next_id();
574        let (m, mut guard) = match try_send(m, id, guard) {
575            TrySendResult::Ok => return Poll::Ready(Ok(())),
576            TrySendResult::Disconnected(m) => return Poll::Ready(Err(SendError(m))),
577            TrySendResult::Full(m, guard) => (m, guard),
578        };
579        guard
580            .pending_sends
581            .enqueue(id, (m, Some(cx.waker().clone().into())));
582        let opt = guard.pending_recvs.dequeue();
583        drop(guard);
584        if let Some((_, s)) = opt {
585            s.wake();
586        }
587        self.msg = MessageOrId::Id(id);
588        Poll::Pending
589    }
590}
591
/// A future that allows asynchronously receiving a message.
/// This future will resolve to a message when a message is available on the channel,
/// or to an error if the channel is closed.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct RecvFuture<T> {
    // Key under which this future registers its waker in `pending_recvs`.
    id: usize,
    // Receiver handle owned by the future.
    receiver: Receiver<T>,
}
601
// Channel inspection helpers; each one forwards to the method of the same name on the
// future's `Receiver`.
impl<T> RecvFuture<T> {
    /// See [`Receiver::is_closed`].
    pub fn is_closed(&self) -> bool {
        self.receiver.is_closed()
    }

    /// See [`Receiver::is_empty`].
    pub fn is_empty(&self) -> bool {
        self.receiver.is_empty()
    }

    /// See [`Receiver::is_full`].
    pub fn is_full(&self) -> bool {
        self.receiver.is_full()
    }

    /// See [`Receiver::len`].
    pub fn len(&self) -> usize {
        self.receiver.len()
    }

    /// See [`Receiver::capacity`].
    pub fn capacity(&self) -> Option<usize> {
        self.receiver.capacity()
    }
}
628
629impl<T> Future for RecvFuture<T> {
630    type Output = Result<T, RecvError>;
631
632    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
633        let mut guard = match try_recv(self.receiver.inner.shared_state.lock()) {
634            TryRecvResult::Ok(r) => return Poll::Ready(Ok(r)),
635            TryRecvResult::Disconnected => return Poll::Ready(Err(RecvError::Disconnected)),
636            TryRecvResult::Empty(guard) => guard,
637        };
638        if guard.closed {
639            return Poll::Ready(Err(RecvError::Disconnected));
640        }
641        if !guard.pending_recvs.contains(self.id) {
642            guard
643                .pending_recvs
644                .enqueue(self.id, cx.waker().clone().into());
645        }
646        Poll::Pending
647    }
648}
649
650impl<T> Drop for RecvFuture<T> {
651    fn drop(&mut self) {
652        let mut guard = self.receiver.inner.shared_state.lock();
653        guard.pending_recvs.remove(self.id);
654    }
655}
656
/// Data shared by all clones of a `Sender`.
struct SenderInner<T> {
    /// The channel state, behind a `Mutex`.
    shared_state: Arc<Mutex<SharedState<T>>>,
    /// Number of live `Sender` handles: incremented on clone, decremented on drop.
    send_count: AtomicUsize,
    /// Source of the ids used by the send operations of this sender.
    next_id: AtomicUsize,
}
662
/// The sending half of a channel.
///
/// Cloning a sender yields another handle to the same channel.
pub struct Sender<T> {
    // Shared between all clones of this sender.
    inner: Arc<SenderInner<T>>,
}
667
668impl<T> std::fmt::Debug for Sender<T> {
669    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670        f.debug_struct("Sender").finish()
671    }
672}
673
674impl<T> Clone for Sender<T> {
675    /// Clones this sender. A [`Sender`] acts as a handle to the sending end of a channel. The remaining
676    /// contents of the channel will only be cleaned up when all senders and the receiver have been dropped.
677    fn clone(&self) -> Self {
678        self.inner.send_count.fetch_add(1, Ordering::Relaxed);
679        Self {
680            inner: Arc::clone(&self.inner),
681        }
682    }
683}
684
685impl<T> Sender<T> {
686    fn new(shared_state: Arc<Mutex<SharedState<T>>>) -> Self {
687        Self {
688            inner: Arc::new(SenderInner {
689                shared_state,
690                send_count: AtomicUsize::new(1),
691                next_id: AtomicUsize::new(1),
692            })
693        }
694    }
695
696    fn get_next_id(&self) -> usize {
697        self.inner.next_id.fetch_add(1, Ordering::Relaxed)
698    }
699
700    /// It returns an error if the channel is bounded and full, or if all receivers have been dropped.
701    /// If the channel is unbounded, the method behaves the same as the [`Sender::send`] method.
702    ///
703    /// This method is useful for avoiding deadlocks. If you are sending a value into a channel and
704    /// you are not sure if the channel is full or if all receivers have been dropped, you can use
705    /// this method instead of the [`Sender::send`] method. If this method returns an error, you can
706    /// take appropriate action, such as retrying the send operation later or buffering the value
707    /// until you can send it successfully.
708    pub fn try_send(&self, m: T) -> Result<(), TrySendError<T>> {
709        match try_send(m, self.get_next_id(), self.inner.shared_state.lock()) {
710            TrySendResult::Ok => Ok(()),
711            TrySendResult::Disconnected(m) => Err(TrySendError::Disconnected(m)),
712            TrySendResult::Full(m, _) => Err(TrySendError::Full(m)),
713        }
714    }
715
716    /// Asynchronously send a value into the channel, it will return a future that completes when the
717    /// value has been successfully sent, or when an error has occurred.
718    ///
719    /// The method returns an error if all receivers on the channel have been dropped.
720    /// If the channel is bounded and is full, the returned future will yield to the async runtime.
721    pub fn send_async(&self, m: T) -> SendFuture<T> {
722        SendFuture {
723            sender: self.clone(),
724            msg: MessageOrId::Message(m),
725        }
726    }
727
728    /// Sends a value into the channel. Returns an error if all receivers have been dropped, or if
729    /// the channel is bounded and is full and no receivers are available.
730    pub fn send(&self, m: T) -> Result<(), SendError<T>> {
731        let id = self.get_next_id();
732        let (m, mut guard) = match try_send(m, id, self.inner.shared_state.lock()) {
733            TrySendResult::Ok => return Ok(()),
734            TrySendResult::Disconnected(m) => return Err(SendError(m)),
735            TrySendResult::Full(m, guard) => (m, guard),
736        };
737        let sync_signal = SyncSignal::new();
738
739        guard
740            .pending_sends
741            .enqueue(id, (m, Some(sync_signal.clone().into())));
742        drop(guard);
743        loop {
744            sync_signal.wait();
745            let mut guard = self.inner.shared_state.lock();
746            if guard.closed {
747                if let Some((_, (m, Some(_)))) = guard.pending_sends.remove(id) {
748                    return Err(SendError(m));
749                }
750            }
751            if !guard.pending_sends.contains(id) {
752                break;
753            }
754        }
755        Ok(())
756    }
757
758    /// Attempts to send a value into the channel.
759    ///
760    /// If all receivers have been dropped or the timeout has expired, this method will return
761    /// an error. If the channel is bounded and is full, this method will block until space is
762    /// available, the timeout has expired, or all receivers have been dropped.
763    pub fn send_timeout(&self, m: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
764        let id = self.get_next_id();
765        let (m, mut guard) = match try_send(m, id, self.inner.shared_state.lock()) {
766            TrySendResult::Ok => return Ok(()),
767            TrySendResult::Disconnected(m) => return Err(SendTimeoutError::Disconnected(m)),
768            TrySendResult::Full(m, guard) => (m, guard),
769        };
770        let sync_signal = SyncSignal::new();
771        guard
772            .pending_sends
773            .enqueue(id, (m, Some(sync_signal.clone().into())));
774        drop(guard);
775        loop {
776            let _ = sync_signal.wait_timeout(timeout);
777            let mut guard = self.inner.shared_state.lock();
778            if let Some((_, (m, Some(_)))) = guard.pending_sends.remove(id) {
779                if guard.closed {
780                    return Err(SendTimeoutError::Disconnected(m));
781                }
782                return Err(SendTimeoutError::Timeout(m));
783            }
784            if !guard.pending_sends.contains(id) {
785                break;
786            }
787        }
788        Ok(())
789    }
790
791    /// Sends a value into the channel, returning an error if the channel is full and the
792    /// deadline has passed, or if all receivers have been dropped.
793    pub fn send_deadline(&self, m: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
794        self.send_timeout(m, deadline.checked_duration_since(Instant::now()).unwrap())
795    }
796
797    /// Returns `true` if the two senders belong to the same channel, and `false` otherwise.
798    pub fn same_channel(&self, other: &Sender<T>) -> bool {
799        Arc::ptr_eq(&self.inner.shared_state, &other.inner.shared_state)
800    }
801
802    /// Returns the number of messages currently in the channel.
803    ///
804    /// This function is useful for determining how many messages are waiting to be processed
805    /// by consumers, or for implementing backpressure mechanisms.
806    pub fn len(&self) -> usize {
807        self.inner.shared_state.lock().len()
808    }
809
810    /// Returns the capacity of the channel, if it is bounded. Otherwise, returns `None`.
811    pub fn capacity(&self) -> Option<usize> {
812        self.inner.shared_state.lock().cap
813    }
814
815    /// Returns true if the channel is empty.
816    ///
817    /// Note: Zero-capacity channels are always empty.
818    pub fn is_empty(&self) -> bool {
819        self.inner.shared_state.lock().is_empty()
820    }
821
822    /// Returns true if the channel is full.
823    ///
824    /// Note: Zero-capacity channels are always full.
825    pub fn is_full(&self) -> bool {
826        self.inner.shared_state.lock().is_full()
827    }
828
829    /// Closes the channel.
830    ///
831    /// Returns true only if this call actively closed the channel, which was previously open.
832    ///
833    /// The remaining messages can still be received.
834    pub fn close(&self) -> bool {
835        self.inner.shared_state.lock().close()
836    }
837
838    /// Returns true if the channel is closed.
839    pub fn is_closed(&self) -> bool {
840        self.inner.shared_state.lock().is_closed()
841    }
842
843    /// Returns a sink that can be used to send values into the channel.
844    pub fn sink(&self) -> SendSink<T> {
845        SendSink(SendFuture {
846            sender: self.clone(),
847            msg: MessageOrId::Invalid,
848        })
849    }
850
851    /// Converts this sender into a sink that can be used to send values into the channel.
852    pub fn into_sink(self) -> SendSink<T> {
853        SendSink(SendFuture {
854            sender: self,
855            msg: MessageOrId::Invalid,
856        })
857    }
858}
859
860impl<T> Drop for Sender<T> {
861    fn drop(&mut self) {
862        let _ = self
863            .inner
864            .send_count
865            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
866                let mut count = c;
867                if count > 0 {
868                    count -= 1;
869                    if count == 0 {
870                        self.inner.shared_state.lock().close();
871                    }
872                }
873                Some(count)
874            });
875    }
876}
877
/// Data shared by all clones of a `Receiver`.
struct ReceiverInner<T> {
    /// The channel state, behind a `Mutex`.
    shared_state: Arc<Mutex<SharedState<T>>>,
    /// Number of live `Receiver` handles; incremented on clone.
    recv_count: AtomicUsize,
    /// Source of the ids used by the receive operations of this receiver.
    next_id: AtomicUsize,
}
883
/// The receiving end of a channel.
///
/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
/// Each message will only be received by a single receiver. This is useful for
/// implementing work stealing for concurrent programs.
pub struct Receiver<T> {
    // Shared between all clones of this receiver.
    inner: Arc<ReceiverInner<T>>,
}
892
893impl<T> std::fmt::Debug for Receiver<T> {
894    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
895        f.debug_struct("Receiver").finish()
896    }
897}
898
899impl<T> Clone for Receiver<T> {
900    /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
901    /// channel contents will only be cleaned up when all senders and the receiver have been
902    /// dropped.
903    ///
904    /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
905    /// Each message will only be received by a single receiver. This is useful for
906    /// implementing work stealing for concurrent programs.
907    fn clone(&self) -> Self {
908        self.inner.recv_count.fetch_add(1, Ordering::Relaxed);
909        Self {
910            inner: Arc::clone(&self.inner),
911        }
912    }
913}
914
915impl<T> Receiver<T> {
916    fn new(shared_state: Arc<Mutex<SharedState<T>>>) -> Self {
917        Self {
918            inner: Arc::new(ReceiverInner {
919                shared_state,
920                recv_count: AtomicUsize::new(1),
921                next_id: AtomicUsize::new(1),
922            }),
923        }
924    }
925
    // Hands out the next id from the counter shared by all clones of this receiver; the ids
    // key this receiver's entries in `pending_recvs`.
    fn get_next_id(&self) -> usize {
        self.inner.next_id.fetch_add(1, Ordering::Relaxed)
    }
929
    /// Attempts to receive a value from the channel associated with this receiver, returning an error if
    /// the channel is empty or if all senders have been dropped.
    ///
    /// This method never waits for a message: if none is available it returns
    /// [`TryRecvError::Empty`], or [`TryRecvError::Disconnected`] when the channel is closed.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        match try_recv(self.inner.shared_state.lock()) {
            TryRecvResult::Ok(m) => Ok(m),
            TryRecvResult::Disconnected => Err(TryRecvError::Disconnected),
            TryRecvResult::Empty(_) => Err(TryRecvError::Empty),
        }
    }
942
943    /// Asynchronously receive a value from the channel, returning an error if all senders have been dropped.
944    /// If the channel is empty, the returned future will yield to the async runtime.
945    ///
946    /// This method returns a future that will be resolved with the value received from the channel,
947    /// or with an error if the channel is closed.
948    pub fn recv_async(&self) -> RecvFuture<T> {
949        RecvFuture {
950            id: self.get_next_id(),
951            receiver: self.clone(),
952        }
953    }
954
955    /// Wait for an incoming value from the channel associated with this receiver. If all senders have been
956    /// dropped and there are no more messages in the channel, this method will return an error.
957    pub fn recv(&self) -> Result<T, RecvError> {
958        loop {
959            let mut guard = match try_recv(self.inner.shared_state.lock()) {
960                TryRecvResult::Ok(r) => return Ok(r),
961                TryRecvResult::Disconnected => return Err(RecvError::Disconnected),
962                TryRecvResult::Empty(guard) => guard,
963            };
964            let id = self.get_next_id();
965            let sync_signal = SyncSignal::new();
966            guard.pending_recvs.enqueue(id, sync_signal.clone().into());
967            drop(guard);
968            sync_signal.wait();
969        }
970    }
971
972    /// Receives a value from the channel associated with this receiver, blocking the current thread
973    /// until a value is available or the timeout expires.
974    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
975        let start_time = Instant::now();
976        let mut timeout_remaining = timeout;
977        loop {
978            let mut guard = match try_recv(self.inner.shared_state.lock()) {
979                TryRecvResult::Ok(r) => return Ok(r),
980                TryRecvResult::Disconnected => return Err(RecvTimeoutError::Disconnected),
981                TryRecvResult::Empty(guard) => guard,
982            };
983            if guard.closed {
984                return Err(RecvTimeoutError::Disconnected);
985            }
986            let id = self.get_next_id();
987            let sync_signal = SyncSignal::new();
988            guard.pending_recvs.enqueue(id, sync_signal.clone().into());
989            drop(guard);
990            let _ = sync_signal.wait_timeout(timeout_remaining);
991            let elapsed = start_time.elapsed();
992            if elapsed >= timeout {
993                let mut guard = self.inner.shared_state.lock();
994                guard.pending_recvs.remove(id);
995                drop(guard);
996                return Err(RecvTimeoutError::Timeout);
997            }
998            timeout_remaining = timeout - elapsed;
999        }
1000    }
1001
1002    /// Receives a value from the channel associated with this receiver, blocking the current thread
1003    /// until a value is available or the deadline has passed.
1004    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1005        self.recv_timeout(deadline.checked_duration_since(Instant::now()).unwrap())
1006    }
1007
1008    /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
1009    /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
1010    /// the function has been called.
1011    pub fn drain(&self) -> Drain<T> {
1012        let mut guard = self.inner.shared_state.lock();
1013        let queue = std::mem::take(&mut guard.queue);
1014        let n = guard
1015            .cap
1016            .map_or(0, |cap| cap.min(guard.pending_sends.len()));
1017        for _ in 0..n {
1018            if let Some((id, (m, mut s))) = guard.pending_sends.dequeue() {
1019                guard.queue.enqueue(id, m);
1020                if let Some(s) = s.take() {
1021                    s.wake();
1022                }
1023            }
1024        }
1025        Drain {
1026            queue,
1027            _phantom: PhantomData,
1028        }
1029    }
1030
1031    /// Returns a blocking iterator over the values received on the channel. The iterator will finish
1032    /// iteration when all senders have been dropped.
1033    pub fn iter(&self) -> Iter<T> {
1034        Iter { receiver: self }
1035    }
1036
1037    /// An iterator over the values received on the channel that finishes iteration when all senders
1038    /// have been dropped or the channel is empty.
1039    ///
1040    /// This iterator is non-blocking, meaning that it will not wait for the next value to be available
1041    /// if there is not one already. If there is no value available, the iterator will return `None`.
1042    pub fn try_iter(&self) -> TryIter<T> {
1043        TryIter { receiver: self }
1044    }
1045
1046    /// Returns `true` if the two receivers belong to the same channel, and `false` otherwise.
1047    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1048        Arc::ptr_eq(&self.inner.shared_state, &other.inner.shared_state)
1049    }
1050
1051    /// Returns the number of messages currently in the channel.
1052    ///
1053    /// This function is useful for determining how many messages are waiting to be processed
1054    /// by consumers, or for implementing backpressure mechanisms.
1055    pub fn len(&self) -> usize {
1056        self.inner.shared_state.lock().len()
1057    }
1058
1059    /// Returns the capacity of the channel, if it is bounded. Otherwise, returns `None`.
1060    pub fn capacity(&self) -> Option<usize> {
1061        self.inner.shared_state.lock().cap
1062    }
1063
1064    /// Returns true if the channel is empty.
1065    ///
1066    /// Note: Zero-capacity channels are always empty.
1067    pub fn is_empty(&self) -> bool {
1068        self.inner.shared_state.lock().is_empty()
1069    }
1070
1071    /// Returns true if the channel is full.
1072    ///
1073    /// Note: Zero-capacity channels are always full.
1074    pub fn is_full(&self) -> bool {
1075        self.inner.shared_state.lock().is_full()
1076    }
1077
1078    /// Closes the channel.
1079    ///
1080    /// Returns true only if this call actively closed the channel, which was previously open.
1081    ///
1082    /// The remaining messages can still be received.
1083    pub fn close(&self) -> bool {
1084        self.inner.shared_state.lock().close()
1085    }
1086
1087    /// Returns true if the channel is closed.
1088    pub fn is_closed(&self) -> bool {
1089        self.inner.shared_state.lock().is_closed()
1090    }
1091
1092    /// Returns a stream of messages from the channel.
1093    pub fn stream(&self) -> RecvStream<T> {
1094        RecvStream(RecvFuture {
1095            id: self.get_next_id(),
1096            receiver: self.clone(),
1097        })
1098    }
1099
1100    /// Convert this receiver into a stream that allows asynchronously receiving messages from the channel.
1101    pub fn into_stream(self) -> RecvStream<T> {
1102        RecvStream(RecvFuture {
1103            id: self.get_next_id(),
1104            receiver: self,
1105        })
1106    }
1107}
1108
1109impl<T> Drop for Receiver<T> {
1110    fn drop(&mut self) {
1111        let _ = self
1112            .inner
1113            .recv_count
1114            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
1115                let mut count = c;
1116                if count > 0 {
1117                    count -= 1;
1118                    if count == 0 {
1119                        self.inner.shared_state.lock().close();
1120                    }
1121                }
1122                Some(count)
1123            });
1124    }
1125}
1126
1127/// An fixed-sized iterator over the msgs drained from a channel.
1128#[derive(Debug)]
1129pub struct Drain<'a, T> {
1130    queue: Queue<T>,
1131    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
1132    /// implementation may change and we don't want to unintentionally constrain it. Removing this
1133    /// lifetime later is a possibility.
1134    _phantom: PhantomData<&'a ()>,
1135}
1136
1137impl<'a, T> Iterator for Drain<'a, T> {
1138    type Item = T;
1139
1140    fn next(&mut self) -> Option<Self::Item> {
1141        self.queue.dequeue().map(|(_, x)| x)
1142    }
1143}
1144
1145impl<'a, T> ExactSizeIterator for Drain<'a, T> {
1146    fn len(&self) -> usize {
1147        self.queue.len()
1148    }
1149}
1150
1151impl<'a, T> Iterator for Iter<'a, T> {
1152    type Item = T;
1153
1154    fn next(&mut self) -> Option<Self::Item> {
1155        self.receiver.recv().ok()
1156    }
1157}
1158
1159impl<'a, T> Iterator for TryIter<'a, T> {
1160    type Item = T;
1161
1162    fn next(&mut self) -> Option<Self::Item> {
1163        self.receiver.try_recv().ok()
1164    }
1165}
1166
1167impl<'a, T> IntoIterator for &'a Receiver<T> {
1168    type Item = T;
1169    type IntoIter = Iter<'a, T>;
1170
1171    fn into_iter(self) -> Self::IntoIter {
1172        Iter { receiver: self }
1173    }
1174}
1175
1176impl<T> IntoIterator for Receiver<T> {
1177    type Item = T;
1178    type IntoIter = IntoIter<T>;
1179
1180    fn into_iter(self) -> Self::IntoIter {
1181        IntoIter { receiver: self }
1182    }
1183}
1184
1185/// A stream which allows asynchronously receiving messages.
1186///
1187/// Can be created via [`Receiver::stream`] or [`Receiver::into_stream`].
1188pub struct RecvStream<T>(RecvFuture<T>);
1189
1190impl<T> RecvStream<T> {
1191    /// See [`Receiver::is_closed`].
1192    pub fn is_closed(&self) -> bool {
1193        self.0.is_closed()
1194    }
1195
1196    /// See [`Receiver::is_empty`].
1197    pub fn is_empty(&self) -> bool {
1198        self.0.is_empty()
1199    }
1200
1201    /// See [`Receiver::is_full`].
1202    pub fn is_full(&self) -> bool {
1203        self.0.is_full()
1204    }
1205
1206    /// See [`Receiver::len`].
1207    pub fn len(&self) -> usize {
1208        self.0.len()
1209    }
1210
1211    /// See [`Receiver::capacity`].
1212    pub fn capacity(&self) -> Option<usize> {
1213        self.0.capacity()
1214    }
1215
1216    /// Returns whether the SendSinks are belong to the same channel.
1217    pub fn same_channel(&self, other: &Self) -> bool {
1218        self.0.receiver.same_channel(&other.0.receiver)
1219    }
1220}
1221
1222impl<T> std::fmt::Debug for RecvStream<T> {
1223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1224        f.debug_struct("RecvStream").finish()
1225    }
1226}
1227
1228impl<T> Clone for RecvStream<T> {
1229    fn clone(&self) -> RecvStream<T> {
1230        RecvStream(RecvFuture {
1231            id: self.0.receiver.get_next_id(),
1232            receiver: self.0.receiver.clone(),
1233        })
1234    }
1235}
1236
1237impl<T> Stream for RecvStream<T> {
1238    type Item = T;
1239
1240    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1241        match Pin::new(&mut self.0).poll(cx) {
1242            Poll::Pending => Poll::Pending,
1243            Poll::Ready(item) => Poll::Ready(item.ok()),
1244        }
1245    }
1246}
1247
1248fn channel<T>(cap: Option<usize>) -> (Sender<T>, Receiver<T>) {
1249    let shared_state = Arc::new(Mutex::new(SharedState::new(cap)));
1250    let sender = Sender::new(Arc::clone(&shared_state));
1251    let receiver = Receiver::new(shared_state);
1252    (sender, receiver)
1253}
1254
1255/// Create a bounded channel with a limited maximum capacity.
1256///
1257/// Returns a tuple of a [`Sender`] and a [`Receiver`], which can be used to send and receive messages, respectively.
1258/// The channel has a limited capacity, which means that it can only hold a certain number of messages at a time.
1259/// If the channel is full, calls to [`Sender::send`] will block until there is space available.
1260///
1261/// Bounded channels are useful for controlling the flow of data between threads. For example, you can use a bounded
1262/// channel to prevent a producer thread from overwhelming a consumer thread.
1263///
1264/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
1265/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
1266/// is not desired, [`Sender::try_send`] may be used.
1267///
1268/// Also 'rendezvous' channels are supported by loole. A bounded queue with a limited maximum capacity of zero will
1269/// block senders until a receiver is available to take the value.
1270///
1271/// Producers can send and consumers can receive messages asynchronously or synchronously.
1272///
1273/// # Examples
1274/// ```
1275/// let (tx, rx) = loole::bounded(3);
1276///
1277/// tx.send(1).unwrap();
1278/// tx.send(2).unwrap();
1279/// tx.send(3).unwrap();
1280///
1281/// assert!(tx.try_send(4).is_err());
1282///
1283/// let mut sum = 0;
1284/// sum += rx.recv().unwrap();
1285/// sum += rx.recv().unwrap();
1286/// sum += rx.recv().unwrap();
1287///
1288/// assert!(rx.try_recv().is_err());
1289///
1290/// assert_eq!(sum, 1 + 2 + 3);
1291/// ```
1292pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1293    channel(Some(cap))
1294}
1295
1296/// Creates an unbounded channel, which has unlimited capacity.
1297///
1298/// This function creates a pair of sender and receiver halves of a channel. Values sent on the sender half will be
1299/// received on the receiver half, in the same order in which they were sent. The channel is thread-safe, and both
1300/// sender and receiver halves can be sent to or shared between threads as needed. Additionally, both sender and
1301/// receiver halves can be cloned.
1302///
1303/// Producers can send and consumers can receive messages asynchronously or synchronously.
1304///
1305/// # Examples
1306/// ```
1307/// let (tx, rx) = loole::unbounded();
1308///
1309/// tx.send(10).unwrap();
1310/// assert_eq!(rx.recv().unwrap(), 10);
1311/// ```
1312pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1313    channel(None)
1314}