tachyonix/
lib.rs

1//! A very fast asynchronous, multi-producer, single-consumer (MPSC) bounded
2//! channel.
3//!
4//! This is a no-frills `async` bounded MPSC channel which only claim to fame is
5//! to be extremely fast, without taking any shortcuts on correctness
6//! and implementation quality.
7//!
8//! # Disconnection
9//!
10//! The channel is disconnected automatically once all [`Sender`]s are dropped
11//! or once the [`Receiver`] is dropped. It can also be disconnected manually by
12//! any `Sender` or `Receiver` handle.
13//!
14//! Disconnection is signaled by the `Result` of the sending or receiving
15//! operations. Once a channel is disconnected, all attempts to send a message
16//! will return an error. However, the receiver will still be able to receive
17//! messages already in the channel and will only get a disconnection error once
18//! all messages have been received.
19//!
20//! # Example
21//!
22//! ```
23//! use tachyonix;
24//! use futures_executor::{block_on, ThreadPool};
25//!
26//! let pool = ThreadPool::new().unwrap();
27//!
28//! let (s, mut r) = tachyonix::channel(3);
29//!
30//! block_on( async move {
31//!     pool.spawn_ok( async move {
32//!         assert_eq!(s.send("Hello").await, Ok(()));
33//!     });
34//!     
35//!     assert_eq!(r.recv().await, Ok("Hello"));
36//! });
37//! # std::thread::sleep(std::time::Duration::from_millis(100)); // MIRI bug workaround
38//! ```
39//!
40#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
41
42mod loom_exports;
43mod queue;
44
45use std::error;
46use std::fmt;
47use std::future::Future;
48use std::pin::Pin;
49use std::sync::atomic::{self, AtomicUsize, Ordering};
50use std::sync::Arc;
51use std::task::Context;
52use std::task::Poll;
53
54use async_event::Event;
55use diatomic_waker::primitives::DiatomicWaker;
56use futures_core::Stream;
57use pin_project_lite::pin_project;
58
59use crate::queue::{PopError, PushError, Queue};
60
61/// Shared channel data.
62struct Inner<T> {
63    /// Non-blocking internal queue.
64    queue: Queue<T>,
65    /// Signalling primitive used to notify the receiver.
66    receiver_signal: DiatomicWaker,
67    /// Signalling primitive used to notify one or several senders.
68    sender_signal: Event,
69    /// Current count of live senders.
70    sender_count: AtomicUsize,
71}
72
73impl<T> Inner<T> {
74    fn new(capacity: usize, sender_count: usize) -> Self {
75        Self {
76            queue: Queue::new(capacity),
77            receiver_signal: DiatomicWaker::new(),
78            sender_signal: Event::new(),
79            sender_count: AtomicUsize::new(sender_count),
80        }
81    }
82}
83
84/// The sending side of a channel.
85///
86/// Multiple [`Sender`]s can be created via cloning.
87pub struct Sender<T> {
88    /// Shared data.
89    inner: Arc<Inner<T>>,
90}
91
92impl<T> Sender<T> {
93    /// Attempts to send a message immediately.
94    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
95        match self.inner.queue.push(message) {
96            Ok(()) => {
97                self.inner.receiver_signal.notify();
98                Ok(())
99            }
100            Err(PushError::Full(v)) => Err(TrySendError::Full(v)),
101            Err(PushError::Closed(v)) => Err(TrySendError::Closed(v)),
102        }
103    }
104
105    /// Sends a message asynchronously, if necessary waiting until enough
106    /// capacity becomes available.
107    pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
108        let mut message = Some(message);
109
110        self.inner
111            .sender_signal
112            .wait_until(|| {
113                match self.inner.queue.push(message.take().unwrap()) {
114                    Ok(()) => Some(()),
115                    Err(PushError::Full(m)) => {
116                        // Recycle the message.
117                        message = Some(m);
118
119                        None
120                    }
121                    Err(PushError::Closed(m)) => {
122                        // Keep the message so it can be returned in the error
123                        // field.
124                        message = Some(m);
125
126                        Some(())
127                    }
128                }
129            })
130            .await;
131
132        match message {
133            Some(m) => Err(SendError(m)),
134            None => {
135                self.inner.receiver_signal.notify();
136
137                Ok(())
138            }
139        }
140    }
141
142    /// Sends a message asynchronously, if necessary waiting until enough
143    /// capacity becomes available or until the deadline elapses.
144    ///
145    /// The deadline is specified as a `Future` that is expected to resolves to
146    /// `()` after some duration, such as a `tokio::time::Sleep` future.
147    pub async fn send_timeout<'a, D>(
148        &'a self,
149        message: T,
150        deadline: D,
151    ) -> Result<(), SendTimeoutError<T>>
152    where
153        D: Future<Output = ()> + 'a,
154    {
155        let mut message = Some(message);
156
157        let res = self
158            .inner
159            .sender_signal
160            .wait_until_or_timeout(
161                || {
162                    match self.inner.queue.push(message.take().unwrap()) {
163                        Ok(()) => Some(()),
164                        Err(PushError::Full(m)) => {
165                            // Recycle the message.
166                            message = Some(m);
167
168                            None
169                        }
170                        Err(PushError::Closed(m)) => {
171                            // Keep the message so it can be returned in the error
172                            // field.
173                            message = Some(m);
174
175                            Some(())
176                        }
177                    }
178                },
179                deadline,
180            )
181            .await;
182
183        match (message, res) {
184            (Some(m), Some(())) => Err(SendTimeoutError::Closed(m)),
185            (Some(m), None) => Err(SendTimeoutError::Timeout(m)),
186            _ => {
187                self.inner.receiver_signal.notify();
188
189                Ok(())
190            }
191        }
192    }
193
194    /// Closes the queue.
195    ///
196    /// This prevents any further messages from being sent on the channel.
197    /// Messages that were already sent can still be received.
198    pub fn close(&self) {
199        self.inner.queue.close();
200
201        // Notify the receiver and all blocked senders that the channel is
202        // closed.
203        self.inner.receiver_signal.notify();
204        self.inner.sender_signal.notify_all();
205    }
206
207    /// Checks if the channel is closed.
208    ///
209    /// This can happen either because the [`Receiver`] was dropped or because
210    /// one of the [`Sender::close`] or [`Receiver::close`] method was called.
211    pub fn is_closed(&self) -> bool {
212        self.inner.queue.is_closed()
213    }
214}
215
216impl<T> Clone for Sender<T> {
217    fn clone(&self) -> Self {
218        // Increase the sender reference count.
219        //
220        // Ordering: Relaxed ordering is sufficient here for the same reason it
221        // is sufficient for an `Arc` reference count increment: synchronization
222        // is only necessary when decrementing the counter since all what is
223        // needed is to ensure that all operations until the drop handler is
224        // called are visible once the reference count drops to 0.
225        self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
226
227        Self {
228            inner: self.inner.clone(),
229        }
230    }
231}
232
233impl<T> Drop for Sender<T> {
234    fn drop(&mut self) {
235        // Decrease the sender reference count.
236        //
237        // Ordering: Release ordering is necessary for the same reason it is
238        // necessary for an `Arc` reference count decrement: it ensures that all
239        // operations performed by this sender before it was dropped will be
240        // visible once the sender count drops to 0.
241        if self.inner.sender_count.fetch_sub(1, Ordering::Release) == 1
242            && !self.inner.queue.is_closed()
243        {
244            // Make sure that the notified receiver sees all operations
245            // performed by all dropped senders.
246            //
247            // Ordering: Acquire is necessary to synchronize with the Release
248            // decrement operations. Note that the fence synchronizes with _all_
249            // decrement operations since the chain of counter decrements forms
250            // a Release sequence.
251            atomic::fence(Ordering::Acquire);
252
253            self.inner.queue.close();
254
255            // Notify the receiver that the channel is closed.
256            self.inner.receiver_signal.notify();
257        }
258    }
259}
260
261impl<T> fmt::Debug for Sender<T> {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.debug_struct("Sender").finish_non_exhaustive()
264    }
265}
266
267/// The receiving side of a channel.
268///
269/// The receiver can only be called from a single thread.
270pub struct Receiver<T> {
271    /// Shared data.
272    inner: Arc<Inner<T>>,
273}
274
275impl<T> Receiver<T> {
276    /// Attempts to receive a message immediately.
277    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
278        // Safety: `Queue::pop` cannot be used concurrently from multiple
279        // threads since `Receiver` does not implement `Clone` and requires
280        // exclusive ownership.
281        match unsafe { self.inner.queue.pop() } {
282            Ok(message) => {
283                self.inner.sender_signal.notify_one();
284                Ok(message)
285            }
286            Err(PopError::Empty) => Err(TryRecvError::Empty),
287            Err(PopError::Closed) => Err(TryRecvError::Closed),
288        }
289    }
290
291    /// Receives a message asynchronously, if necessary waiting until one
292    /// becomes available.
293    pub async fn recv(&mut self) -> Result<T, RecvError> {
294        // We could of course return the future directly from a plain method,
295        // but the `async` signature makes the intent more explicit.
296        RecvFuture { receiver: self }.await
297    }
298
299    /// Receives a message asynchronously, if necessary waiting until one
300    /// becomes available or until the deadline elapses.
301    ///
302    /// The deadline is specified as a `Future` that is expected to resolves to
303    /// `()` after some duration, such as a `tokio::time::Sleep` future.
304    pub async fn recv_timeout<D>(&mut self, deadline: D) -> Result<T, RecvTimeoutError>
305    where
306        D: Future<Output = ()>,
307    {
308        // We could of course return the future directly from a plain method,
309        // but the `async` signature makes the intent more explicit.
310        RecvTimeoutFuture {
311            receiver: self,
312            deadline,
313        }
314        .await
315    }
316
317    /// Closes the queue.
318    ///
319    /// This prevents any further messages from being sent on the channel.
320    /// Messages that were already sent can still be received, however, which is
321    /// why a call to this method should typically be followed by a loop
322    /// receiving all remaining messages.
323    ///
324    /// For this reason, no counterpart to [`Sender::is_closed`] is exposed by
325    /// the receiver as such method could easily be misused and lead to lost
326    /// messages. Instead, messages should be received until a [`RecvError`],
327    /// [`RecvTimeoutError::Closed`] or [`TryRecvError::Closed`] error is
328    /// returned.
329    pub fn close(&self) {
330        if !self.inner.queue.is_closed() {
331            self.inner.queue.close();
332
333            // Notify all blocked senders that the channel is closed.
334            self.inner.sender_signal.notify_all();
335        }
336    }
337}
338
339impl<T> Drop for Receiver<T> {
340    fn drop(&mut self) {
341        self.inner.queue.close();
342
343        // Notify all blocked senders that the channel is closed.
344        self.inner.sender_signal.notify_all();
345    }
346}
347
348impl<T> fmt::Debug for Receiver<T> {
349    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350        f.debug_struct("Receiver").finish_non_exhaustive()
351    }
352}
353
354impl<T> Stream for Receiver<T> {
355    type Item = T;
356
357    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
358        // Safety: `Queue::pop`, `DiatomicWaker::register` and
359        // `DiatomicWaker::unregister` cannot be used concurrently from multiple
360        // threads since `Receiver` does not implement `Clone` and requires
361        // exclusive ownership.
362        unsafe {
363            // Happy path: try to pop a message without registering the waker.
364            match self.inner.queue.pop() {
365                Ok(message) => {
366                    // Signal to one awaiting sender that one slot was freed.
367                    self.inner.sender_signal.notify_one();
368
369                    return Poll::Ready(Some(message));
370                }
371                Err(PopError::Closed) => {
372                    return Poll::Ready(None);
373                }
374                Err(PopError::Empty) => {}
375            }
376
377            // Slow path: we must register the waker to be notified when the
378            // queue is populated again. It is thereafter necessary to check
379            // again the predicate in case we raced with a sender.
380            self.inner.receiver_signal.register(cx.waker());
381
382            match self.inner.queue.pop() {
383                Ok(message) => {
384                    // Cancel the request for notification.
385                    self.inner.receiver_signal.unregister();
386
387                    // Signal to one awaiting sender that one slot was freed.
388                    self.inner.sender_signal.notify_one();
389
390                    Poll::Ready(Some(message))
391                }
392                Err(PopError::Closed) => {
393                    // Cancel the request for notification.
394                    self.inner.receiver_signal.unregister();
395
396                    Poll::Ready(None)
397                }
398                Err(PopError::Empty) => Poll::Pending,
399            }
400        }
401    }
402}
403
404/// The future returned by the `Receiver::recv` method.
405///
406/// This is just a thin wrapper over the `Stream::poll_next` implementation.
407struct RecvFuture<'a, T> {
408    receiver: &'a mut Receiver<T>,
409}
410
411impl<'a, T> Future for RecvFuture<'a, T> {
412    type Output = Result<T, RecvError>;
413
414    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415        match Pin::new(&mut self.receiver).poll_next(cx) {
416            Poll::Ready(Some(v)) => Poll::Ready(Ok(v)),
417            Poll::Ready(None) => Poll::Ready(Err(RecvError)),
418            Poll::Pending => Poll::Pending,
419        }
420    }
421}
422
423pin_project! {
424    /// The future returned by the `Receiver::recv_timeout` method.
425    ///
426    /// This is just a thin wrapper over the `Stream::poll_next` implementation
427    /// which abandons if the deadline elapses.
428    struct RecvTimeoutFuture<'a, T, D> where D: Future<Output=()> {
429        receiver: &'a mut Receiver<T>,
430        #[pin]
431        deadline: D,
432    }
433}
434
435impl<'a, T, D> Future for RecvTimeoutFuture<'a, T, D>
436where
437    D: Future<Output = ()>,
438{
439    type Output = Result<T, RecvTimeoutError>;
440
441    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442        let this = self.project();
443        let receiver = this.receiver;
444        let deadline = this.deadline;
445
446        match Pin::new(receiver).poll_next(cx) {
447            Poll::Ready(Some(v)) => Poll::Ready(Ok(v)),
448            Poll::Ready(None) => Poll::Ready(Err(RecvTimeoutError::Closed)),
449            Poll::Pending => match deadline.poll(cx) {
450                Poll::Pending => Poll::Pending,
451                Poll::Ready(()) => Poll::Ready(Err(RecvTimeoutError::Timeout)),
452            },
453        }
454    }
455}
456
457/// Creates a new channel, returning the sending and receiving sides.
458///
459/// # Panic
460///
461/// The function will panic if the requested capacity is 0 or if it is greater
462/// than `usize::MAX/2 + 1`.
463pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
464    let inner = Arc::new(Inner::new(capacity, 1));
465
466    let sender = Sender {
467        inner: inner.clone(),
468    };
469    let receiver = Receiver { inner };
470
471    (sender, receiver)
472}
473
474/// An error returned when an attempt to send a message synchronously is
475/// unsuccessful.
476#[derive(Clone, Copy, Debug, Eq, PartialEq)]
477pub enum TrySendError<T> {
478    /// The queue is full.
479    Full(T),
480    /// The receiver has been dropped.
481    Closed(T),
482}
483
484impl<T: fmt::Debug> error::Error for TrySendError<T> {}
485
486impl<T> fmt::Display for TrySendError<T> {
487    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488        match self {
489            TrySendError::Full(_) => "sending into a full channel".fmt(f),
490            TrySendError::Closed(_) => "sending into a closed channel".fmt(f),
491        }
492    }
493}
494
495/// An error returned when an attempt to send a message asynchronously is
496/// unsuccessful.
497#[derive(Clone, Copy, Eq, PartialEq)]
498pub struct SendError<T>(pub T);
499
500impl<T> error::Error for SendError<T> {}
501
502impl<T> fmt::Debug for SendError<T> {
503    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
504        f.debug_struct("SendError").finish_non_exhaustive()
505    }
506}
507
508impl<T> fmt::Display for SendError<T> {
509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510        "sending into a closed channel".fmt(f)
511    }
512}
513
514/// An error returned when an attempt to send a message asynchronously with a
515/// deadline is unsuccessful.
516#[derive(Clone, Copy, Debug, Eq, PartialEq)]
517pub enum SendTimeoutError<T> {
518    /// The deadline has elapsed.
519    Timeout(T),
520    /// The channel has been closed.
521    Closed(T),
522}
523
524impl<T: fmt::Debug> error::Error for SendTimeoutError<T> {}
525
526impl<T> fmt::Display for SendTimeoutError<T> {
527    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528        match self {
529            SendTimeoutError::Timeout(_) => "the deadline for sending has elapsed".fmt(f),
530            SendTimeoutError::Closed(_) => "sending into a closed channel".fmt(f),
531        }
532    }
533}
534
535/// An error returned when an attempt to receive a message synchronously is
536/// unsuccessful.
537#[derive(Clone, Copy, Debug, Eq, PartialEq)]
538pub enum TryRecvError {
539    /// The queue is empty.
540    Empty,
541    /// All senders have been dropped.
542    Closed,
543}
544
545impl error::Error for TryRecvError {}
546
547impl fmt::Display for TryRecvError {
548    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549        match self {
550            TryRecvError::Empty => "receiving from an empty channel".fmt(f),
551            TryRecvError::Closed => "receiving from a closed channel".fmt(f),
552        }
553    }
554}
555
556/// An error returned when an attempt to receive a message asynchronously is
557/// unsuccessful.
558#[derive(Clone, Copy, Debug, Eq, PartialEq)]
559pub struct RecvError;
560
561impl error::Error for RecvError {}
562
563impl fmt::Display for RecvError {
564    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565        "receiving from a closed channel".fmt(f)
566    }
567}
568
569/// An error returned when an attempt to receive a message asynchronously with a
570/// deadline is unsuccessful.
571#[derive(Clone, Copy, Debug, Eq, PartialEq)]
572pub enum RecvTimeoutError {
573    /// The deadline has elapsed.
574    Timeout,
575    /// All senders have been dropped.
576    Closed,
577}
578
579impl error::Error for RecvTimeoutError {}
580
581impl fmt::Display for RecvTimeoutError {
582    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
583        match self {
584            RecvTimeoutError::Timeout => "the deadline for receiving has elapsed".fmt(f),
585            RecvTimeoutError::Closed => "receiving from a closed channel".fmt(f),
586        }
587    }
588}