uchan/lib.rs
1//! Multi-producer, single-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! A [`Sender`] is used to send data to a [`Receiver`]. Senders are clone-able (multi-producer)
10//! such that many threads can send simultaneously to one receiver (single-consumer).
11//!
12//! There is currently one flavour available: An asynchronous, infinitely buffered channel.
13//! The [`channel`] function will return a `(Sender, Receiver)` tuple where all sends will be
14//! **asynchronous** (they never block). The channel conceptually has an infinite buffer.
15//!
16//! ## `no_std` Usage
17//!
18//! Channels can be used in `no_std` settings due to the thread blocking facilities being made generic.
//! Implement the [`Event`] trait to provide thread parking, then create custom [`RawSender`]s and [`RawReceiver`]s with [`raw_channel`].
20//! The default [`Sender`] and [`Receiver`] use `StdEvent` which implements thread parking using `std::thread::park`.
21//!
22//! ## Disconnection
23//!
24//! The send and receive operations on channels will all return a [`Result`]
25//! indicating whether the operation succeeded or not. An unsuccessful operation
26//! is normally indicative of the other half of a channel having "hung up" by
27//! being dropped in its corresponding thread.
28//!
29//! Once half of a channel has been deallocated, most operations can no longer
30//! continue to make progress, so [`Err`] will be returned. Many applications
31//! will continue to [`unwrap`] the results returned from this module,
32//! instigating a propagation of failure among threads if one unexpectedly dies.
33//!
34//! [`unwrap`]: Result::unwrap
35//!
36//! # Examples
37//!
38//! Simple usage:
39//!
40//! ```
41//! use std::thread;
42//! use uchan::channel;
43//!
44//! // Create a simple streaming channel
45//! let (tx, rx) = channel();
46//! thread::spawn(move|| {
47//! tx.send(10).unwrap();
48//! });
49//! assert_eq!(rx.recv().unwrap(), 10);
50//! ```
51//!
52//! Shared usage:
53//!
54//! ```
55//! use std::thread;
56//! use uchan::channel;
57//!
58//! // Create a shared channel that can be sent along from many threads
59//! // where tx is the sending half (tx for transmission), and rx is the receiving
60//! // half (rx for receiving).
61//! let (tx, rx) = channel();
62//! for i in 0..10 {
63//! let tx = tx.clone();
64//! thread::spawn(move|| {
65//! tx.send(i).unwrap();
66//! });
67//! }
68//!
69//! for _ in 0..10 {
70//! let j = rx.recv().unwrap();
71//! assert!(0 <= j && j < 10);
72//! }
73//! ```
74//!
75//! Propagating panics:
76//!
77//! ```
78//! use uchan::channel;
79//!
80//! // The call to recv() will return an error because the channel has already
81//! // hung up (or been deallocated)
82//! let (tx, rx) = channel::<i32>();
83//! drop(tx);
84//! assert!(rx.recv().is_err());
85//! ```
86
87#![cfg_attr(not(feature = "std"), no_std)]
88#![allow(unstable_name_collisions)]
89#![warn(
90 rust_2018_idioms,
91 unreachable_pub,
92 missing_docs,
93 missing_debug_implementations
94)]
95
96extern crate alloc;
97
98mod backoff;
99mod event;
100mod parker;
101mod queue;
102
103use alloc::sync::Arc;
104use core::{fmt, marker::PhantomData};
105use queue::Queue;
106
107pub use event::{Event, TimedEvent};
108
109#[cfg(feature = "std")]
110pub use if_std::*;
111
#[cfg(feature = "std")]
mod if_std {
    use super::{raw_channel, RawReceiver, RawSender};

    pub use super::event::StdEvent;

    /// The sending half of an unbounded channel created by [`channel`].
    ///
    /// This is an alias of [`RawSender`]; see that type for the full API.
    ///
    /// [`RawSender`]: super::RawSender
    pub type Sender<T> = RawSender<T>;

    /// The receiving half of an unbounded channel, with [`StdEvent`] as its event type.
    ///
    /// This is an alias of [`RawReceiver`]; see that type for the full API.
    ///
    /// [`RawReceiver`]: super::RawReceiver
    pub type Receiver<T> = RawReceiver<StdEvent, T>;

    /// Builds an unbounded channel whose [`Receiver`] uses the `StdEvent` implementation.
    ///
    /// This simply forwards to [`raw_channel`]; see it for more details.
    ///
    /// [`raw_channel`]: super::raw_channel
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        // The event type is fixed to `StdEvent` by the `Receiver<T>` return type.
        raw_channel()
    }
}
136
/// An error returned from the [`RawSender::send`] function on **channel**s.
///
/// A **send** operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The value
/// that could not be sent is carried in field `0` so the caller can recover it.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

// `Error` only requires `Debug + Display`, and both are implemented below for
// every `T`, so no extra bound on the payload type is needed here.
#[cfg(feature = "std")]
impl<T> std::error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    /// Formats the error without its payload, so `T` does not have to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SendError").finish_non_exhaustive()
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed, payload-independent message, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("sending on a closed channel")
    }
}
159
/// The error produced by [`recv`] on a [`RawReceiver`].
///
/// [`recv`] fails only when the sending half of a [`raw_channel`] has
/// disconnected, which means no further messages will ever arrive.
///
/// [`recv`]: RawReceiver::recv
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

#[cfg(feature = "std")]
impl std::error::Error for RecvError {}

impl fmt::Display for RecvError {
    /// Writes a fixed message, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("receiving on a closed channel")
    }
}
178
/// The reasons why [`try_recv`] may be unable to hand back a value.
/// This can occur with a [`raw_channel`].
///
/// [`try_recv`]: RawReceiver::try_recv
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// The **channel** holds no data right now, but at least one **Sender**
    /// is still connected, so data may yet become available.
    Empty,

    /// The sending half of the **channel** has disconnected, so no more data
    /// will ever be received on it.
    Disconnected,
}

#[cfg(feature = "std")]
impl std::error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    /// Writes the message for the variant, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::Empty => "receiving on an empty channel",
            Self::Disconnected => "receiving on a closed channel",
        };
        f.pad(message)
    }
}
205
206impl From<RecvError> for TryRecvError {
207 /// Converts a `RecvError` into a `TryRecvError`.
208 ///
209 /// This conversion always returns `TryRecvError::Disconnected`.
210 ///
211 /// No data is allocated on the heap.
212 fn from(err: RecvError) -> TryRecvError {
213 match err {
214 RecvError => TryRecvError::Disconnected,
215 }
216 }
217}
218
/// The reasons why [`recv_timeout`] may be unable to hand back a value.
/// This can occur with a [`raw_channel`].
///
/// [`recv_timeout`]: RawReceiver::recv_timeout
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// The **channel** is still empty, but at least one **Sender** is
    /// connected, so data may yet become available.
    Timeout,
    /// The sending half of the **channel** has disconnected, so no more data
    /// will ever be received on it.
    Disconnected,
}

#[cfg(feature = "std")]
impl std::error::Error for RecvTimeoutError {}

impl fmt::Display for RecvTimeoutError {
    /// Writes the message for the variant, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::Timeout => "timed out waiting on channel",
            Self::Disconnected => "channel is empty and sending half is closed",
        };
        f.pad(message)
    }
}
244
245impl From<RecvError> for RecvTimeoutError {
246 /// Converts a `RecvError` into a `RecvTimeoutError`.
247 ///
248 /// This conversion always returns `RecvTimeoutError::Disconnected`.
249 ///
250 /// No data is allocated on the heap.
251 fn from(err: RecvError) -> RecvTimeoutError {
252 match err {
253 RecvError => RecvTimeoutError::Disconnected,
254 }
255 }
256}
257
258/// Creates a new asynchronous channel, returning the sender/receiver halves.
259/// All data sent on the [`RawSender`] will become available on the [`RawReceiver`] in
260/// the same order as it was sent, and no [`send`] will block the calling thread
261/// (this channel has an "infinite buffer"). [`recv`] will block until a message
262/// is available while there is at least one [`Sender`] alive (including clones).
263///
264/// The [`RawSender`] can be cloned to [`send`] to the same channel multiple times, but
265/// only one [`RawReceiver`] is supported.
266///
267/// If the [`RawReceiver`] is disconnected while trying to [`send`] with the
268/// [`RawSender`], the [`send`] method will return a [`SendError`]. Similarly, if the
269/// [`RawSender`] is disconnected while trying to [`recv`], the [`recv`] method will
270/// return a [`RecvError`].
271///
272/// [`send`]: RawSender::send
273/// [`recv`]: RawReceiver::recv
274///
275/// # Examples
276///
277/// ```
278/// use uchan::channel;
279/// use std::thread;
280///
281/// let (sender, receiver) = channel();
282///
283/// // Spawn off an expensive computation
284/// thread::spawn(move|| {
285/// # fn expensive_computation() {}
286/// sender.send(expensive_computation()).unwrap();
287/// });
288///
289/// // Do some useful work for awhile
290///
291/// // Let's see what that answer was
292/// println!("{:?}", receiver.recv().unwrap());
293/// ```
294pub fn raw_channel<E, T>() -> (RawSender<T>, RawReceiver<E, T>) {
295 let queue = Arc::new(Queue::EMPTY);
296 let sender = RawSender {
297 queue: queue.clone(),
298 };
299 let receiver = RawReceiver {
300 queue,
301 _event: PhantomData,
302 };
303 (sender, receiver)
304}
305
306/// The sending-half of Rust's asynchronous [`raw_channel`] type. This half can only be
307/// owned by one thread, but it can be cloned to send to other threads.
308///
309/// Messages can be sent through this channel with [`send`].
310///
311/// Note: all senders (the original and the clones) need to be dropped for the receiver
312/// to stop blocking to receive messages with [`RawReceiver::recv`].
313///
314/// [`send`]: RawSender::send
315///
316/// # Examples
317///
318/// ```rust
319/// use uchan::channel;
320/// use std::thread;
321///
322/// let (sender, receiver) = channel();
323/// let sender2 = sender.clone();
324///
325/// // First thread owns sender
326/// thread::spawn(move || {
327/// sender.send(1).unwrap();
328/// });
329///
330/// // Second thread owns sender2
331/// thread::spawn(move || {
332/// sender2.send(2).unwrap();
333/// });
334///
335/// let msg = receiver.recv().unwrap();
336/// let msg2 = receiver.recv().unwrap();
337///
338/// assert_eq!(3, msg + msg2);
339/// ```
340pub struct RawSender<T> {
341 queue: Arc<Queue<T>>,
342}
343
344impl<T> RawSender<T> {
345 /// Attempts to send a value on this channel, returning it back if it could
346 /// not be sent.
347 ///
348 /// A successful send occurs when it is determined that the other end of
349 /// the channel has not hung up already. An unsuccessful send would be one
350 /// where the corresponding receiver has already been deallocated. Note
351 /// that a return value of [`Err`] means that the data will never be
352 /// received, but a return value of [`Ok`] does *not* mean that the data
353 /// will be received. It is possible for the corresponding receiver to
354 /// hang up immediately after this function returns [`Ok`].
355 ///
356 /// This method will never block the current thread.
357 ///
358 /// # Examples
359 ///
360 /// ```
361 /// use uchan::channel;
362 ///
363 /// let (tx, rx) = channel();
364 ///
365 /// // This send is always successful
366 /// tx.send(1).unwrap();
367 ///
368 /// // This send will fail because the receiver is gone
369 /// drop(rx);
370 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
371 /// ```
372 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
373 self.queue.send(value).map_err(SendError)
374 }
375}
376
377impl<T> Clone for RawSender<T> {
378 /// Clone a sender to send to other threads.
379 ///
380 /// Note, be aware of the lifetime of the sender because all senders
381 /// (including the original) need to be dropped in order for
382 /// [`RawReceiver::recv`] to stop blocking.
383 fn clone(&self) -> Self {
384 Self {
385 queue: self.queue.clone(),
386 }
387 }
388}
389
390impl<T> fmt::Debug for RawSender<T> {
391 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392 f.debug_struct("Sender").finish_non_exhaustive()
393 }
394}
395
396impl<T> Drop for RawSender<T> {
397 fn drop(&mut self) {
398 if Arc::strong_count(&self.queue) == 2 {
399 let is_sender = true;
400 self.queue.disconnect(is_sender);
401 }
402 }
403}
404
/// The receiving half of Rust's [`raw_channel`] type.
/// This half can only be owned by one thread.
///
/// Messages sent to the channel can be retrieved using [`recv`]. The type
/// parameter `E` is the event type handed to the queue's blocking receive
/// operations; `T` is the message type.
///
/// [`recv`]: RawReceiver::recv
///
/// # Examples
///
/// ```rust
/// use uchan::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send("Hello world!").unwrap();
///     thread::sleep(Duration::from_secs(2)); // block for two seconds
///     send.send("Delayed for 2 seconds").unwrap();
/// });
///
/// println!("{}", recv.recv().unwrap()); // Received immediately
/// println!("Waiting...");
/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
/// ```
// NOTE(review): nothing in this file opts this type out of `Sync`. If `Queue<T>`
// is `Sync` and `E` is `Sync`, a `&RawReceiver` could be shared between threads and
// the `&self` receive methods called concurrently. Confirm that the queue's unsafe
// receive operations tolerate that, or add a `!Sync` marker.
pub struct RawReceiver<E, T> {
    // Queue shared with every sender of this channel.
    queue: Arc<Queue<T>>,
    // Zero-sized marker that records the event type `E` without storing a value.
    _event: PhantomData<E>,
}
435
436impl<E, T> RawReceiver<E, T> {
437 /// Attempts to return a pending value on this receiver without blocking.
438 ///
439 /// This method will never block the caller in order to wait for data to
440 /// become available. Instead, this will always return immediately with a
441 /// possible option of pending data on the channel.
442 ///
443 /// This is useful for a flavor of "optimistic check" before deciding to
444 /// block on a receiver.
445 ///
446 /// Compared with [`recv`], this function has two failure cases instead of one
447 /// (one for disconnection, one for an empty buffer).
448 ///
449 /// [`recv`]: Self::recv
450 ///
451 /// # Examples
452 ///
453 /// ```rust
454 /// use uchan::{Receiver, channel};
455 ///
456 /// let (_, receiver): (_, Receiver<i32>) = channel();
457 ///
458 /// assert!(receiver.try_recv().is_err());
459 /// ```
460 pub fn try_recv(&self) -> Result<T, TryRecvError> {
461 match unsafe { self.queue.try_recv() } {
462 Ok(Some(value)) => Ok(value),
463 Ok(None) => Err(TryRecvError::Empty),
464 Err(()) => Err(TryRecvError::Disconnected),
465 }
466 }
467}
468
469impl<E: Event, T> RawReceiver<E, T> {
470 /// Attempts to wait for a value on this receiver, returning an error if the
471 /// corresponding channel has hung up.
472 ///
473 /// This function will always block the current thread if there is no data
474 /// available and it's possible for more data to be sent (at least one sender
475 /// still exists). Once a message is sent to the corresponding [`RawSender`],
476 /// this receiver will wake up and return that message.
477 ///
478 /// If the corresponding [`RawSender`] has disconnected, or it disconnects while
479 /// this call is blocking, this call will wake up and return [`Err`] to
480 /// indicate that no more messages can ever be received on this channel.
481 /// However, since channels are buffered, messages sent before the disconnect
482 /// will still be properly received.
483 ///
484 /// # Examples
485 ///
486 /// ```
487 /// use uchan::channel;
488 /// use std::thread;
489 ///
490 /// let (send, recv) = channel();
491 /// let handle = thread::spawn(move || {
492 /// send.send(1u8).unwrap();
493 /// });
494 ///
495 /// handle.join().unwrap();
496 ///
497 /// assert_eq!(Ok(1), recv.recv());
498 /// ```
499 ///
500 /// Buffering behavior:
501 ///
502 /// ```
503 /// use uchan::{channel, RecvError};
504 /// use std::thread;
505 ///
506 /// let (send, recv) = channel();
507 /// let handle = thread::spawn(move || {
508 /// send.send(1u8).unwrap();
509 /// send.send(2).unwrap();
510 /// send.send(3).unwrap();
511 /// drop(send);
512 /// });
513 ///
514 /// // wait for the thread to join so we ensure the sender is dropped
515 /// handle.join().unwrap();
516 ///
517 /// assert_eq!(Ok(1), recv.recv());
518 /// assert_eq!(Ok(2), recv.recv());
519 /// assert_eq!(Ok(3), recv.recv());
520 /// assert_eq!(Err(RecvError), recv.recv());
521 /// ```
522 pub fn recv(&self) -> Result<T, RecvError> {
523 (unsafe { self.queue.recv::<E>() }).map_err(|_| RecvError)
524 }
525}
526
527impl<E: TimedEvent, T> RawReceiver<E, T> {
528 /// Attempts to wait for a value on this receiver, returning an error if the
529 /// corresponding channel has hung up, or if it waits more than `timeout`.
530 ///
531 /// This function will always block the current thread if there is no data
532 /// available and it's possible for more data to be sent (at least one sender
533 /// still exists). Once a message is sent to the corresponding [`RawSender`]
534 /// this receiver will wake up and return that message.
535 ///
536 /// If the corresponding [`RawSender`] has disconnected, or it disconnects while
537 /// this call is blocking, this call will wake up and return [`Err`] to
538 /// indicate that no more messages can ever be received on this channel.
539 /// However, since channels are buffered, messages sent before the disconnect
540 /// will still be properly received.
541 ///
542 /// # Examples
543 ///
544 /// Successfully receiving value before encountering timeout:
545 ///
546 /// ```no_run
547 /// use std::thread;
548 /// use std::time::Duration;
549 /// use uchan::channel;
550 ///
551 /// let (send, recv) = channel();
552 ///
553 /// thread::spawn(move || {
554 /// send.send('a').unwrap();
555 /// });
556 ///
557 /// assert_eq!(
558 /// recv.recv_timeout(Duration::from_millis(400)),
559 /// Ok('a')
560 /// );
561 /// ```
562 ///
563 /// Receiving an error upon reaching timeout:
564 ///
565 /// ```no_run
566 /// use std::thread;
567 /// use std::time::Duration;
568 /// use uchan::{channel, RecvTimeoutError};
569 ///
570 /// let (send, recv) = channel();
571 ///
572 /// thread::spawn(move || {
573 /// thread::sleep(Duration::from_millis(800));
574 /// send.send('a').unwrap();
575 /// });
576 ///
577 /// assert_eq!(
578 /// recv.recv_timeout(Duration::from_millis(400)),
579 /// Err(RecvTimeoutError::Timeout)
580 /// );
581 /// ```
582 pub fn recv_timeout(&self, timeout: E::Duration) -> Result<T, RecvTimeoutError> {
583 match unsafe { self.queue.recv_timeout::<E>(timeout) } {
584 Ok(Some(value)) => Ok(value),
585 Ok(None) => Err(RecvTimeoutError::Timeout),
586 Err(()) => Err(RecvTimeoutError::Disconnected),
587 }
588 }
589}
590
591impl<E, T> fmt::Debug for RawReceiver<E, T> {
592 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593 f.debug_struct("Receiver").finish_non_exhaustive()
594 }
595}
596
597impl<E, T> Drop for RawReceiver<E, T> {
598 fn drop(&mut self) {
599 let is_sender = false;
600 self.queue.disconnect(is_sender);
601 }
602}