tachyonix/lib.rs
1//! A very fast asynchronous, multi-producer, single-consumer (MPSC) bounded
2//! channel.
3//!
4//! This is a no-frills `async` bounded MPSC channel which only claim to fame is
5//! to be extremely fast, without taking any shortcuts on correctness
6//! and implementation quality.
7//!
8//! # Disconnection
9//!
10//! The channel is disconnected automatically once all [`Sender`]s are dropped
11//! or once the [`Receiver`] is dropped. It can also be disconnected manually by
12//! any `Sender` or `Receiver` handle.
13//!
14//! Disconnection is signaled by the `Result` of the sending or receiving
15//! operations. Once a channel is disconnected, all attempts to send a message
16//! will return an error. However, the receiver will still be able to receive
17//! messages already in the channel and will only get a disconnection error once
18//! all messages have been received.
19//!
20//! # Example
21//!
22//! ```
23//! use tachyonix;
24//! use futures_executor::{block_on, ThreadPool};
25//!
26//! let pool = ThreadPool::new().unwrap();
27//!
28//! let (s, mut r) = tachyonix::channel(3);
29//!
30//! block_on( async move {
31//! pool.spawn_ok( async move {
32//! assert_eq!(s.send("Hello").await, Ok(()));
33//! });
34//!
35//! assert_eq!(r.recv().await, Ok("Hello"));
36//! });
37//! # std::thread::sleep(std::time::Duration::from_millis(100)); // MIRI bug workaround
38//! ```
39//!
40#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
41
42mod loom_exports;
43mod queue;
44
45use std::error;
46use std::fmt;
47use std::future::Future;
48use std::pin::Pin;
49use std::sync::atomic::{self, AtomicUsize, Ordering};
50use std::sync::Arc;
51use std::task::Context;
52use std::task::Poll;
53
54use async_event::Event;
55use diatomic_waker::primitives::DiatomicWaker;
56use futures_core::Stream;
57use pin_project_lite::pin_project;
58
59use crate::queue::{PopError, PushError, Queue};
60
61/// Shared channel data.
62struct Inner<T> {
63 /// Non-blocking internal queue.
64 queue: Queue<T>,
65 /// Signalling primitive used to notify the receiver.
66 receiver_signal: DiatomicWaker,
67 /// Signalling primitive used to notify one or several senders.
68 sender_signal: Event,
69 /// Current count of live senders.
70 sender_count: AtomicUsize,
71}
72
73impl<T> Inner<T> {
74 fn new(capacity: usize, sender_count: usize) -> Self {
75 Self {
76 queue: Queue::new(capacity),
77 receiver_signal: DiatomicWaker::new(),
78 sender_signal: Event::new(),
79 sender_count: AtomicUsize::new(sender_count),
80 }
81 }
82}
83
84/// The sending side of a channel.
85///
86/// Multiple [`Sender`]s can be created via cloning.
87pub struct Sender<T> {
88 /// Shared data.
89 inner: Arc<Inner<T>>,
90}
91
92impl<T> Sender<T> {
93 /// Attempts to send a message immediately.
94 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
95 match self.inner.queue.push(message) {
96 Ok(()) => {
97 self.inner.receiver_signal.notify();
98 Ok(())
99 }
100 Err(PushError::Full(v)) => Err(TrySendError::Full(v)),
101 Err(PushError::Closed(v)) => Err(TrySendError::Closed(v)),
102 }
103 }
104
105 /// Sends a message asynchronously, if necessary waiting until enough
106 /// capacity becomes available.
107 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
108 let mut message = Some(message);
109
110 self.inner
111 .sender_signal
112 .wait_until(|| {
113 match self.inner.queue.push(message.take().unwrap()) {
114 Ok(()) => Some(()),
115 Err(PushError::Full(m)) => {
116 // Recycle the message.
117 message = Some(m);
118
119 None
120 }
121 Err(PushError::Closed(m)) => {
122 // Keep the message so it can be returned in the error
123 // field.
124 message = Some(m);
125
126 Some(())
127 }
128 }
129 })
130 .await;
131
132 match message {
133 Some(m) => Err(SendError(m)),
134 None => {
135 self.inner.receiver_signal.notify();
136
137 Ok(())
138 }
139 }
140 }
141
142 /// Sends a message asynchronously, if necessary waiting until enough
143 /// capacity becomes available or until the deadline elapses.
144 ///
145 /// The deadline is specified as a `Future` that is expected to resolves to
146 /// `()` after some duration, such as a `tokio::time::Sleep` future.
147 pub async fn send_timeout<'a, D>(
148 &'a self,
149 message: T,
150 deadline: D,
151 ) -> Result<(), SendTimeoutError<T>>
152 where
153 D: Future<Output = ()> + 'a,
154 {
155 let mut message = Some(message);
156
157 let res = self
158 .inner
159 .sender_signal
160 .wait_until_or_timeout(
161 || {
162 match self.inner.queue.push(message.take().unwrap()) {
163 Ok(()) => Some(()),
164 Err(PushError::Full(m)) => {
165 // Recycle the message.
166 message = Some(m);
167
168 None
169 }
170 Err(PushError::Closed(m)) => {
171 // Keep the message so it can be returned in the error
172 // field.
173 message = Some(m);
174
175 Some(())
176 }
177 }
178 },
179 deadline,
180 )
181 .await;
182
183 match (message, res) {
184 (Some(m), Some(())) => Err(SendTimeoutError::Closed(m)),
185 (Some(m), None) => Err(SendTimeoutError::Timeout(m)),
186 _ => {
187 self.inner.receiver_signal.notify();
188
189 Ok(())
190 }
191 }
192 }
193
194 /// Closes the queue.
195 ///
196 /// This prevents any further messages from being sent on the channel.
197 /// Messages that were already sent can still be received.
198 pub fn close(&self) {
199 self.inner.queue.close();
200
201 // Notify the receiver and all blocked senders that the channel is
202 // closed.
203 self.inner.receiver_signal.notify();
204 self.inner.sender_signal.notify_all();
205 }
206
207 /// Checks if the channel is closed.
208 ///
209 /// This can happen either because the [`Receiver`] was dropped or because
210 /// one of the [`Sender::close`] or [`Receiver::close`] method was called.
211 pub fn is_closed(&self) -> bool {
212 self.inner.queue.is_closed()
213 }
214}
215
216impl<T> Clone for Sender<T> {
217 fn clone(&self) -> Self {
218 // Increase the sender reference count.
219 //
220 // Ordering: Relaxed ordering is sufficient here for the same reason it
221 // is sufficient for an `Arc` reference count increment: synchronization
222 // is only necessary when decrementing the counter since all what is
223 // needed is to ensure that all operations until the drop handler is
224 // called are visible once the reference count drops to 0.
225 self.inner.sender_count.fetch_add(1, Ordering::Relaxed);
226
227 Self {
228 inner: self.inner.clone(),
229 }
230 }
231}
232
233impl<T> Drop for Sender<T> {
234 fn drop(&mut self) {
235 // Decrease the sender reference count.
236 //
237 // Ordering: Release ordering is necessary for the same reason it is
238 // necessary for an `Arc` reference count decrement: it ensures that all
239 // operations performed by this sender before it was dropped will be
240 // visible once the sender count drops to 0.
241 if self.inner.sender_count.fetch_sub(1, Ordering::Release) == 1
242 && !self.inner.queue.is_closed()
243 {
244 // Make sure that the notified receiver sees all operations
245 // performed by all dropped senders.
246 //
247 // Ordering: Acquire is necessary to synchronize with the Release
248 // decrement operations. Note that the fence synchronizes with _all_
249 // decrement operations since the chain of counter decrements forms
250 // a Release sequence.
251 atomic::fence(Ordering::Acquire);
252
253 self.inner.queue.close();
254
255 // Notify the receiver that the channel is closed.
256 self.inner.receiver_signal.notify();
257 }
258 }
259}
260
261impl<T> fmt::Debug for Sender<T> {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.debug_struct("Sender").finish_non_exhaustive()
264 }
265}
266
267/// The receiving side of a channel.
268///
269/// The receiver can only be called from a single thread.
270pub struct Receiver<T> {
271 /// Shared data.
272 inner: Arc<Inner<T>>,
273}
274
275impl<T> Receiver<T> {
276 /// Attempts to receive a message immediately.
277 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
278 // Safety: `Queue::pop` cannot be used concurrently from multiple
279 // threads since `Receiver` does not implement `Clone` and requires
280 // exclusive ownership.
281 match unsafe { self.inner.queue.pop() } {
282 Ok(message) => {
283 self.inner.sender_signal.notify_one();
284 Ok(message)
285 }
286 Err(PopError::Empty) => Err(TryRecvError::Empty),
287 Err(PopError::Closed) => Err(TryRecvError::Closed),
288 }
289 }
290
291 /// Receives a message asynchronously, if necessary waiting until one
292 /// becomes available.
293 pub async fn recv(&mut self) -> Result<T, RecvError> {
294 // We could of course return the future directly from a plain method,
295 // but the `async` signature makes the intent more explicit.
296 RecvFuture { receiver: self }.await
297 }
298
299 /// Receives a message asynchronously, if necessary waiting until one
300 /// becomes available or until the deadline elapses.
301 ///
302 /// The deadline is specified as a `Future` that is expected to resolves to
303 /// `()` after some duration, such as a `tokio::time::Sleep` future.
304 pub async fn recv_timeout<D>(&mut self, deadline: D) -> Result<T, RecvTimeoutError>
305 where
306 D: Future<Output = ()>,
307 {
308 // We could of course return the future directly from a plain method,
309 // but the `async` signature makes the intent more explicit.
310 RecvTimeoutFuture {
311 receiver: self,
312 deadline,
313 }
314 .await
315 }
316
317 /// Closes the queue.
318 ///
319 /// This prevents any further messages from being sent on the channel.
320 /// Messages that were already sent can still be received, however, which is
321 /// why a call to this method should typically be followed by a loop
322 /// receiving all remaining messages.
323 ///
324 /// For this reason, no counterpart to [`Sender::is_closed`] is exposed by
325 /// the receiver as such method could easily be misused and lead to lost
326 /// messages. Instead, messages should be received until a [`RecvError`],
327 /// [`RecvTimeoutError::Closed`] or [`TryRecvError::Closed`] error is
328 /// returned.
329 pub fn close(&self) {
330 if !self.inner.queue.is_closed() {
331 self.inner.queue.close();
332
333 // Notify all blocked senders that the channel is closed.
334 self.inner.sender_signal.notify_all();
335 }
336 }
337}
338
339impl<T> Drop for Receiver<T> {
340 fn drop(&mut self) {
341 self.inner.queue.close();
342
343 // Notify all blocked senders that the channel is closed.
344 self.inner.sender_signal.notify_all();
345 }
346}
347
348impl<T> fmt::Debug for Receiver<T> {
349 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350 f.debug_struct("Receiver").finish_non_exhaustive()
351 }
352}
353
354impl<T> Stream for Receiver<T> {
355 type Item = T;
356
357 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
358 // Safety: `Queue::pop`, `DiatomicWaker::register` and
359 // `DiatomicWaker::unregister` cannot be used concurrently from multiple
360 // threads since `Receiver` does not implement `Clone` and requires
361 // exclusive ownership.
362 unsafe {
363 // Happy path: try to pop a message without registering the waker.
364 match self.inner.queue.pop() {
365 Ok(message) => {
366 // Signal to one awaiting sender that one slot was freed.
367 self.inner.sender_signal.notify_one();
368
369 return Poll::Ready(Some(message));
370 }
371 Err(PopError::Closed) => {
372 return Poll::Ready(None);
373 }
374 Err(PopError::Empty) => {}
375 }
376
377 // Slow path: we must register the waker to be notified when the
378 // queue is populated again. It is thereafter necessary to check
379 // again the predicate in case we raced with a sender.
380 self.inner.receiver_signal.register(cx.waker());
381
382 match self.inner.queue.pop() {
383 Ok(message) => {
384 // Cancel the request for notification.
385 self.inner.receiver_signal.unregister();
386
387 // Signal to one awaiting sender that one slot was freed.
388 self.inner.sender_signal.notify_one();
389
390 Poll::Ready(Some(message))
391 }
392 Err(PopError::Closed) => {
393 // Cancel the request for notification.
394 self.inner.receiver_signal.unregister();
395
396 Poll::Ready(None)
397 }
398 Err(PopError::Empty) => Poll::Pending,
399 }
400 }
401 }
402}
403
404/// The future returned by the `Receiver::recv` method.
405///
406/// This is just a thin wrapper over the `Stream::poll_next` implementation.
407struct RecvFuture<'a, T> {
408 receiver: &'a mut Receiver<T>,
409}
410
411impl<'a, T> Future for RecvFuture<'a, T> {
412 type Output = Result<T, RecvError>;
413
414 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415 match Pin::new(&mut self.receiver).poll_next(cx) {
416 Poll::Ready(Some(v)) => Poll::Ready(Ok(v)),
417 Poll::Ready(None) => Poll::Ready(Err(RecvError)),
418 Poll::Pending => Poll::Pending,
419 }
420 }
421}
422
423pin_project! {
424 /// The future returned by the `Receiver::recv_timeout` method.
425 ///
426 /// This is just a thin wrapper over the `Stream::poll_next` implementation
427 /// which abandons if the deadline elapses.
428 struct RecvTimeoutFuture<'a, T, D> where D: Future<Output=()> {
429 receiver: &'a mut Receiver<T>,
430 #[pin]
431 deadline: D,
432 }
433}
434
435impl<'a, T, D> Future for RecvTimeoutFuture<'a, T, D>
436where
437 D: Future<Output = ()>,
438{
439 type Output = Result<T, RecvTimeoutError>;
440
441 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442 let this = self.project();
443 let receiver = this.receiver;
444 let deadline = this.deadline;
445
446 match Pin::new(receiver).poll_next(cx) {
447 Poll::Ready(Some(v)) => Poll::Ready(Ok(v)),
448 Poll::Ready(None) => Poll::Ready(Err(RecvTimeoutError::Closed)),
449 Poll::Pending => match deadline.poll(cx) {
450 Poll::Pending => Poll::Pending,
451 Poll::Ready(()) => Poll::Ready(Err(RecvTimeoutError::Timeout)),
452 },
453 }
454 }
455}
456
457/// Creates a new channel, returning the sending and receiving sides.
458///
459/// # Panic
460///
461/// The function will panic if the requested capacity is 0 or if it is greater
462/// than `usize::MAX/2 + 1`.
463pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
464 let inner = Arc::new(Inner::new(capacity, 1));
465
466 let sender = Sender {
467 inner: inner.clone(),
468 };
469 let receiver = Receiver { inner };
470
471 (sender, receiver)
472}
473
474/// An error returned when an attempt to send a message synchronously is
475/// unsuccessful.
476#[derive(Clone, Copy, Debug, Eq, PartialEq)]
477pub enum TrySendError<T> {
478 /// The queue is full.
479 Full(T),
480 /// The receiver has been dropped.
481 Closed(T),
482}
483
484impl<T: fmt::Debug> error::Error for TrySendError<T> {}
485
486impl<T> fmt::Display for TrySendError<T> {
487 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488 match self {
489 TrySendError::Full(_) => "sending into a full channel".fmt(f),
490 TrySendError::Closed(_) => "sending into a closed channel".fmt(f),
491 }
492 }
493}
494
495/// An error returned when an attempt to send a message asynchronously is
496/// unsuccessful.
497#[derive(Clone, Copy, Eq, PartialEq)]
498pub struct SendError<T>(pub T);
499
500impl<T> error::Error for SendError<T> {}
501
502impl<T> fmt::Debug for SendError<T> {
503 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
504 f.debug_struct("SendError").finish_non_exhaustive()
505 }
506}
507
508impl<T> fmt::Display for SendError<T> {
509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510 "sending into a closed channel".fmt(f)
511 }
512}
513
514/// An error returned when an attempt to send a message asynchronously with a
515/// deadline is unsuccessful.
516#[derive(Clone, Copy, Debug, Eq, PartialEq)]
517pub enum SendTimeoutError<T> {
518 /// The deadline has elapsed.
519 Timeout(T),
520 /// The channel has been closed.
521 Closed(T),
522}
523
524impl<T: fmt::Debug> error::Error for SendTimeoutError<T> {}
525
526impl<T> fmt::Display for SendTimeoutError<T> {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 match self {
529 SendTimeoutError::Timeout(_) => "the deadline for sending has elapsed".fmt(f),
530 SendTimeoutError::Closed(_) => "sending into a closed channel".fmt(f),
531 }
532 }
533}
534
535/// An error returned when an attempt to receive a message synchronously is
536/// unsuccessful.
537#[derive(Clone, Copy, Debug, Eq, PartialEq)]
538pub enum TryRecvError {
539 /// The queue is empty.
540 Empty,
541 /// All senders have been dropped.
542 Closed,
543}
544
545impl error::Error for TryRecvError {}
546
547impl fmt::Display for TryRecvError {
548 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549 match self {
550 TryRecvError::Empty => "receiving from an empty channel".fmt(f),
551 TryRecvError::Closed => "receiving from a closed channel".fmt(f),
552 }
553 }
554}
555
556/// An error returned when an attempt to receive a message asynchronously is
557/// unsuccessful.
558#[derive(Clone, Copy, Debug, Eq, PartialEq)]
559pub struct RecvError;
560
561impl error::Error for RecvError {}
562
563impl fmt::Display for RecvError {
564 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565 "receiving from a closed channel".fmt(f)
566 }
567}
568
569/// An error returned when an attempt to receive a message asynchronously with a
570/// deadline is unsuccessful.
571#[derive(Clone, Copy, Debug, Eq, PartialEq)]
572pub enum RecvTimeoutError {
573 /// The deadline has elapsed.
574 Timeout,
575 /// All senders have been dropped.
576 Closed,
577}
578
579impl error::Error for RecvTimeoutError {}
580
581impl fmt::Display for RecvTimeoutError {
582 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
583 match self {
584 RecvTimeoutError::Timeout => "the deadline for receiving has elapsed".fmt(f),
585 RecvTimeoutError::Closed => "receiving from a closed channel".fmt(f),
586 }
587 }
588}