uchan/
lib.rs

1//! Multi-producer, single-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! A [`Sender`] is used to send data to a [`Receiver`]. Senders are clone-able (multi-producer)
10//! such that many threads can send simultaneously to one receiver (single-consumer).
11//!
12//! There is currently one flavour available: An asynchronous, infinitely buffered channel.
13//! The [`channel`] function will return a `(Sender, Receiver)` tuple where all sends will be
14//! **asynchronous** (they never block). The channel conceptually has an infinite buffer.
15//!
16//! ## `no_std` Usage
17//!
18//! Channels can be used in `no_std` settings due to the thread blocking facilities being made generic.
19//! Use the `Event` trait to implement thread parking and create custom [`RawSender`]s and [`Receiver`]s using `raw_channel`.
20//! The default [`Sender`] and [`Receiver`] use `StdEvent` which implements thread parking using `std::thread::park`.
21//!
22//! ## Disconnection
23//!
24//! The send and receive operations on channels will all return a [`Result`]
25//! indicating whether the operation succeeded or not. An unsuccessful operation
26//! is normally indicative of the other half of a channel having "hung up" by
27//! being dropped in its corresponding thread.
28//!
29//! Once half of a channel has been deallocated, most operations can no longer
30//! continue to make progress, so [`Err`] will be returned. Many applications
31//! will continue to [`unwrap`] the results returned from this module,
32//! instigating a propagation of failure among threads if one unexpectedly dies.
33//!
34//! [`unwrap`]: Result::unwrap
35//!
36//! # Examples
37//!
38//! Simple usage:
39//!
40//! ```
41//! use std::thread;
42//! use uchan::channel;
43//!
44//! // Create a simple streaming channel
45//! let (tx, rx) = channel();
46//! thread::spawn(move|| {
47//!     tx.send(10).unwrap();
48//! });
49//! assert_eq!(rx.recv().unwrap(), 10);
50//! ```
51//!
52//! Shared usage:
53//!
54//! ```
55//! use std::thread;
56//! use uchan::channel;
57//!
58//! // Create a shared channel that can be sent along from many threads
59//! // where tx is the sending half (tx for transmission), and rx is the receiving
60//! // half (rx for receiving).
61//! let (tx, rx) = channel();
62//! for i in 0..10 {
63//!     let tx = tx.clone();
64//!     thread::spawn(move|| {
65//!         tx.send(i).unwrap();
66//!     });
67//! }
68//!
69//! for _ in 0..10 {
70//!     let j = rx.recv().unwrap();
71//!     assert!(0 <= j && j < 10);
72//! }
73//! ```
74//!
75//! Propagating panics:
76//!
77//! ```
78//! use uchan::channel;
79//!
80//! // The call to recv() will return an error because the channel has already
81//! // hung up (or been deallocated)
82//! let (tx, rx) = channel::<i32>();
83//! drop(tx);
84//! assert!(rx.recv().is_err());
85//! ```
86
87#![cfg_attr(not(feature = "std"), no_std)]
88#![allow(unstable_name_collisions)]
89#![warn(
90    rust_2018_idioms,
91    unreachable_pub,
92    missing_docs,
93    missing_debug_implementations
94)]
95
96extern crate alloc;
97
98mod backoff;
99mod event;
100mod parker;
101mod queue;
102
103use alloc::sync::Arc;
104use core::{fmt, marker::PhantomData};
105use queue::Queue;
106
107pub use event::{Event, TimedEvent};
108
109#[cfg(feature = "std")]
110pub use if_std::*;
111
112#[cfg(feature = "std")]
113mod if_std {
114    pub use super::event::StdEvent;
115
116    /// An unbounded channel sender implemented with [`StdEvent`].
117    /// See [`RawSender`] for more details.
118    ///
119    /// [`RawSender`]: super::RawSender
120    pub type Sender<T> = super::RawSender<T>;
121
122    /// An unbounded channel receiver implemented with [`StdEvent`].
123    /// See [`RawReceiver`] for more details.
124    ///
125    /// [`RawReceiver`]: super::RawReceiver
126    pub type Receiver<T> = super::RawReceiver<StdEvent, T>;
127
128    /// Creates an unbounded channel [`Sender`] and [`Receiver`] using the `StdEvent` implementation.
129    /// See [`raw_channel`] for more details.
130    ///
131    /// [`raw_channel`]: super::raw_channel
132    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
133        super::raw_channel::<StdEvent, T>()
134    }
135}
136
137/// An error returned from the [`RawSender::send`] function on **channel**s.
138///
139/// A **send** operation can only fail if the receiving end of a channel is
140/// disconnected, implying that the data could never be received. The error
141/// contains the data being sent as a payload so it can be recovered.
142#[derive(PartialEq, Eq, Clone, Copy)]
143pub struct SendError<T>(pub T);
144
145#[cfg(feature = "std")]
146impl<T: Send> std::error::Error for SendError<T> {}
147
148impl<T> fmt::Debug for SendError<T> {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("SendError").finish_non_exhaustive()
151    }
152}
153
154impl<T> fmt::Display for SendError<T> {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        "sending on a closed channel".fmt(f)
157    }
158}
159
160/// An error returned from the [`recv`] function on a [`RawReceiver`].
161///
162/// The [`recv`] operation can only fail if the sending half of a
163/// [`raw_channel`] is disconnected, implying that no further messages
164/// will ever be received.
165///
166/// [`recv`]: RawReceiver::recv
167#[derive(PartialEq, Eq, Clone, Copy, Debug)]
168pub struct RecvError;
169
170#[cfg(feature = "std")]
171impl std::error::Error for RecvError {}
172
173impl fmt::Display for RecvError {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        "receiving on a closed channel".fmt(f)
176    }
177}
178
179/// This enumeration is the list of the possible reasons that [`try_recv`] could
180/// not return data when called. This can occur with a [`raw_channel`].
181///
182/// [`try_recv`]: RawReceiver::try_recv
183#[derive(PartialEq, Eq, Clone, Copy, Debug)]
184pub enum TryRecvError {
185    /// This **channel** is currently empty, but the **Sender**(s) have not yet
186    /// disconnected, so data may yet become available.
187    Empty,
188
189    /// The **channel**'s sending half has become disconnected, and there will
190    /// never be any more data received on it.
191    Disconnected,
192}
193
194#[cfg(feature = "std")]
195impl std::error::Error for TryRecvError {}
196
197impl fmt::Display for TryRecvError {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        match *self {
200            TryRecvError::Empty => "receiving on an empty channel".fmt(f),
201            TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
202        }
203    }
204}
205
206impl From<RecvError> for TryRecvError {
207    /// Converts a `RecvError` into a `TryRecvError`.
208    ///
209    /// This conversion always returns `TryRecvError::Disconnected`.
210    ///
211    /// No data is allocated on the heap.
212    fn from(err: RecvError) -> TryRecvError {
213        match err {
214            RecvError => TryRecvError::Disconnected,
215        }
216    }
217}
218
219/// This enumeration is the list of possible errors that made [`recv_timeout`]
220/// unable to return data when called. This can occur with a [`raw_channel`].
221///
222/// [`recv_timeout`]: RawReceiver::recv_timeout
223#[derive(PartialEq, Eq, Clone, Copy, Debug)]
224pub enum RecvTimeoutError {
225    /// This **channel** is currently empty, but the **Sender**(s) have not yet
226    /// disconnected, so data may yet become available.
227    Timeout,
228    /// The **channel**'s sending half has become disconnected, and there will
229    /// never be any more data received on it.
230    Disconnected,
231}
232
233#[cfg(feature = "std")]
234impl std::error::Error for RecvTimeoutError {}
235
236impl fmt::Display for RecvTimeoutError {
237    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
238        match *self {
239            RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
240            RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
241        }
242    }
243}
244
245impl From<RecvError> for RecvTimeoutError {
246    /// Converts a `RecvError` into a `RecvTimeoutError`.
247    ///
248    /// This conversion always returns `RecvTimeoutError::Disconnected`.
249    ///
250    /// No data is allocated on the heap.
251    fn from(err: RecvError) -> RecvTimeoutError {
252        match err {
253            RecvError => RecvTimeoutError::Disconnected,
254        }
255    }
256}
257
258/// Creates a new asynchronous channel, returning the sender/receiver halves.
259/// All data sent on the [`RawSender`] will become available on the [`RawReceiver`] in
260/// the same order as it was sent, and no [`send`] will block the calling thread
261/// (this channel has an "infinite buffer"). [`recv`] will block until a message
262/// is available while there is at least one [`Sender`] alive (including clones).
263///
264/// The [`RawSender`] can be cloned to [`send`] to the same channel multiple times, but
265/// only one [`RawReceiver`] is supported.
266///
267/// If the [`RawReceiver`] is disconnected while trying to [`send`] with the
268/// [`RawSender`], the [`send`] method will return a [`SendError`]. Similarly, if the
269/// [`RawSender`] is disconnected while trying to [`recv`], the [`recv`] method will
270/// return a [`RecvError`].
271///
272/// [`send`]: RawSender::send
273/// [`recv`]: RawReceiver::recv
274///
275/// # Examples
276///
277/// ```
278/// use uchan::channel;
279/// use std::thread;
280///
281/// let (sender, receiver) = channel();
282///
283/// // Spawn off an expensive computation
284/// thread::spawn(move|| {
285/// #   fn expensive_computation() {}
286///     sender.send(expensive_computation()).unwrap();
287/// });
288///
289/// // Do some useful work for awhile
290///
291/// // Let's see what that answer was
292/// println!("{:?}", receiver.recv().unwrap());
293/// ```
294pub fn raw_channel<E, T>() -> (RawSender<T>, RawReceiver<E, T>) {
295    let queue = Arc::new(Queue::EMPTY);
296    let sender = RawSender {
297        queue: queue.clone(),
298    };
299    let receiver = RawReceiver {
300        queue,
301        _event: PhantomData,
302    };
303    (sender, receiver)
304}
305
306/// The sending-half of Rust's asynchronous [`raw_channel`] type. This half can only be
307/// owned by one thread, but it can be cloned to send to other threads.
308///
309/// Messages can be sent through this channel with [`send`].
310///
311/// Note: all senders (the original and the clones) need to be dropped for the receiver
312/// to stop blocking to receive messages with [`RawReceiver::recv`].
313///
314/// [`send`]: RawSender::send
315///
316/// # Examples
317///
318/// ```rust
319/// use uchan::channel;
320/// use std::thread;
321///
322/// let (sender, receiver) = channel();
323/// let sender2 = sender.clone();
324///
325/// // First thread owns sender
326/// thread::spawn(move || {
327///     sender.send(1).unwrap();
328/// });
329///
330/// // Second thread owns sender2
331/// thread::spawn(move || {
332///     sender2.send(2).unwrap();
333/// });
334///
335/// let msg = receiver.recv().unwrap();
336/// let msg2 = receiver.recv().unwrap();
337///
338/// assert_eq!(3, msg + msg2);
339/// ```
340pub struct RawSender<T> {
341    queue: Arc<Queue<T>>,
342}
343
344impl<T> RawSender<T> {
345    /// Attempts to send a value on this channel, returning it back if it could
346    /// not be sent.
347    ///
348    /// A successful send occurs when it is determined that the other end of
349    /// the channel has not hung up already. An unsuccessful send would be one
350    /// where the corresponding receiver has already been deallocated. Note
351    /// that a return value of [`Err`] means that the data will never be
352    /// received, but a return value of [`Ok`] does *not* mean that the data
353    /// will be received. It is possible for the corresponding receiver to
354    /// hang up immediately after this function returns [`Ok`].
355    ///
356    /// This method will never block the current thread.
357    ///
358    /// # Examples
359    ///
360    /// ```
361    /// use uchan::channel;
362    ///
363    /// let (tx, rx) = channel();
364    ///
365    /// // This send is always successful
366    /// tx.send(1).unwrap();
367    ///
368    /// // This send will fail because the receiver is gone
369    /// drop(rx);
370    /// assert_eq!(tx.send(1).unwrap_err().0, 1);
371    /// ```
372    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
373        self.queue.send(value).map_err(SendError)
374    }
375}
376
377impl<T> Clone for RawSender<T> {
378    /// Clone a sender to send to other threads.
379    ///
380    /// Note, be aware of the lifetime of the sender because all senders
381    /// (including the original) need to be dropped in order for
382    /// [`RawReceiver::recv`] to stop blocking.
383    fn clone(&self) -> Self {
384        Self {
385            queue: self.queue.clone(),
386        }
387    }
388}
389
390impl<T> fmt::Debug for RawSender<T> {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        f.debug_struct("Sender").finish_non_exhaustive()
393    }
394}
395
396impl<T> Drop for RawSender<T> {
397    fn drop(&mut self) {
398        if Arc::strong_count(&self.queue) == 2 {
399            let is_sender = true;
400            self.queue.disconnect(is_sender);
401        }
402    }
403}
404
405/// The receiving half of Rust's [`raw_channel`] type.
406/// This half can only be owned by one thread.
407///
408/// Messages sent to the channel can be retrieved using [`recv`].
409///
410/// [`recv`]: RawReceiver::recv
411///
412/// # Examples
413///
414/// ```rust
415/// use uchan::channel;
416/// use std::thread;
417/// use std::time::Duration;
418///
419/// let (send, recv) = channel();
420///
421/// thread::spawn(move || {
422///     send.send("Hello world!").unwrap();
423///     thread::sleep(Duration::from_secs(2)); // block for two seconds
424///     send.send("Delayed for 2 seconds").unwrap();
425/// });
426///
427/// println!("{}", recv.recv().unwrap()); // Received immediately
428/// println!("Waiting...");
429/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
430/// ```
431pub struct RawReceiver<E, T> {
432    queue: Arc<Queue<T>>,
433    _event: PhantomData<E>,
434}
435
436impl<E, T> RawReceiver<E, T> {
437    /// Attempts to return a pending value on this receiver without blocking.
438    ///
439    /// This method will never block the caller in order to wait for data to
440    /// become available. Instead, this will always return immediately with a
441    /// possible option of pending data on the channel.
442    ///
443    /// This is useful for a flavor of "optimistic check" before deciding to
444    /// block on a receiver.
445    ///
446    /// Compared with [`recv`], this function has two failure cases instead of one
447    /// (one for disconnection, one for an empty buffer).
448    ///
449    /// [`recv`]: Self::recv
450    ///
451    /// # Examples
452    ///
453    /// ```rust
454    /// use uchan::{Receiver, channel};
455    ///
456    /// let (_, receiver): (_, Receiver<i32>) = channel();
457    ///
458    /// assert!(receiver.try_recv().is_err());
459    /// ```
460    pub fn try_recv(&self) -> Result<T, TryRecvError> {
461        match unsafe { self.queue.try_recv() } {
462            Ok(Some(value)) => Ok(value),
463            Ok(None) => Err(TryRecvError::Empty),
464            Err(()) => Err(TryRecvError::Disconnected),
465        }
466    }
467}
468
469impl<E: Event, T> RawReceiver<E, T> {
470    /// Attempts to wait for a value on this receiver, returning an error if the
471    /// corresponding channel has hung up.
472    ///
473    /// This function will always block the current thread if there is no data
474    /// available and it's possible for more data to be sent (at least one sender
475    /// still exists). Once a message is sent to the corresponding [`RawSender`],
476    /// this receiver will wake up and return that message.
477    ///
478    /// If the corresponding [`RawSender`] has disconnected, or it disconnects while
479    /// this call is blocking, this call will wake up and return [`Err`] to
480    /// indicate that no more messages can ever be received on this channel.
481    /// However, since channels are buffered, messages sent before the disconnect
482    /// will still be properly received.
483    ///
484    /// # Examples
485    ///
486    /// ```
487    /// use uchan::channel;
488    /// use std::thread;
489    ///
490    /// let (send, recv) = channel();
491    /// let handle = thread::spawn(move || {
492    ///     send.send(1u8).unwrap();
493    /// });
494    ///
495    /// handle.join().unwrap();
496    ///
497    /// assert_eq!(Ok(1), recv.recv());
498    /// ```
499    ///
500    /// Buffering behavior:
501    ///
502    /// ```
503    /// use uchan::{channel, RecvError};
504    /// use std::thread;
505    ///
506    /// let (send, recv) = channel();
507    /// let handle = thread::spawn(move || {
508    ///     send.send(1u8).unwrap();
509    ///     send.send(2).unwrap();
510    ///     send.send(3).unwrap();
511    ///     drop(send);
512    /// });
513    ///
514    /// // wait for the thread to join so we ensure the sender is dropped
515    /// handle.join().unwrap();
516    ///
517    /// assert_eq!(Ok(1), recv.recv());
518    /// assert_eq!(Ok(2), recv.recv());
519    /// assert_eq!(Ok(3), recv.recv());
520    /// assert_eq!(Err(RecvError), recv.recv());
521    /// ```
522    pub fn recv(&self) -> Result<T, RecvError> {
523        (unsafe { self.queue.recv::<E>() }).map_err(|_| RecvError)
524    }
525}
526
527impl<E: TimedEvent, T> RawReceiver<E, T> {
528    /// Attempts to wait for a value on this receiver, returning an error if the
529    /// corresponding channel has hung up, or if it waits more than `timeout`.
530    ///
531    /// This function will always block the current thread if there is no data
532    /// available and it's possible for more data to be sent (at least one sender
533    /// still exists). Once a message is sent to the corresponding [`RawSender`]
534    /// this receiver will wake up and return that message.
535    ///
536    /// If the corresponding [`RawSender`] has disconnected, or it disconnects while
537    /// this call is blocking, this call will wake up and return [`Err`] to
538    /// indicate that no more messages can ever be received on this channel.
539    /// However, since channels are buffered, messages sent before the disconnect
540    /// will still be properly received.
541    ///
542    /// # Examples
543    ///
544    /// Successfully receiving value before encountering timeout:
545    ///
546    /// ```no_run
547    /// use std::thread;
548    /// use std::time::Duration;
549    /// use uchan::channel;
550    ///
551    /// let (send, recv) = channel();
552    ///
553    /// thread::spawn(move || {
554    ///     send.send('a').unwrap();
555    /// });
556    ///
557    /// assert_eq!(
558    ///     recv.recv_timeout(Duration::from_millis(400)),
559    ///     Ok('a')
560    /// );
561    /// ```
562    ///
563    /// Receiving an error upon reaching timeout:
564    ///
565    /// ```no_run
566    /// use std::thread;
567    /// use std::time::Duration;
568    /// use uchan::{channel, RecvTimeoutError};
569    ///
570    /// let (send, recv) = channel();
571    ///
572    /// thread::spawn(move || {
573    ///     thread::sleep(Duration::from_millis(800));
574    ///     send.send('a').unwrap();
575    /// });
576    ///
577    /// assert_eq!(
578    ///     recv.recv_timeout(Duration::from_millis(400)),
579    ///     Err(RecvTimeoutError::Timeout)
580    /// );
581    /// ```
582    pub fn recv_timeout(&self, timeout: E::Duration) -> Result<T, RecvTimeoutError> {
583        match unsafe { self.queue.recv_timeout::<E>(timeout) } {
584            Ok(Some(value)) => Ok(value),
585            Ok(None) => Err(RecvTimeoutError::Timeout),
586            Err(()) => Err(RecvTimeoutError::Disconnected),
587        }
588    }
589}
590
591impl<E, T> fmt::Debug for RawReceiver<E, T> {
592    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593        f.debug_struct("Receiver").finish_non_exhaustive()
594    }
595}
596
597impl<E, T> Drop for RawReceiver<E, T> {
598    fn drop(&mut self) {
599        let is_sender = false;
600        self.queue.disconnect(is_sender);
601    }
602}