nonblocking_channel/
lib.rs

1use std::{
2    mem::MaybeUninit,
3    num::NonZeroUsize,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, Mutex,
7    },
8};
9
10use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
11
12/// The result of trying to send a message.
13#[must_use]
14#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
15pub enum SendResult<T> {
16    Ok,
17    Full(T),
18    Disconnected,
19}
20
21impl<T> SendResult<T> {
22    /// Checks if the receiving end of the channel had been dropped when a message was sent.
23    pub fn is_disconnected(&self) -> bool {
24        matches!(self, Self::Disconnected)
25    }
26
27    /// Checks if the message was sent.
28    pub fn is_ok(&self) -> bool {
29        matches!(self, Self::Ok)
30    }
31}
32impl<T: std::fmt::Debug> SendResult<T> {
33    pub fn unwrap(self) {
34        match self {
35            SendResult::Ok => return,
36            SendResult::Full(message) => panic!(
37                "failed to send message - queue was full when sending {:?}",
38                message
39            ),
40            SendResult::Disconnected => panic!("client was disconnected when sending message"),
41        }
42    }
43}
44
45/// A SPSC sender guaranteed to never block when sending a message. This is a strong constraint, enforced
46/// by WebAssembly on the main thread, so this should only be preferred over other mpsc channels where
47/// non-blocking behaviour is *required*.
48///
49/// Cannot be cloned, so if you want multiple clients to send messages then you need a sender that may block
50/// for very short periods when sending a message - see [`NonBlockingSender::mpsc`].
51pub struct NonBlockingSender<T> {
52    inner: Producer<T, Arc<SharedRb<T, Vec<MaybeUninit<T>>>>>,
53    is_closed: Arc<AtomicBool>,
54}
55impl<T> NonBlockingSender<T> {
56    /// Tries to send a message to the receiving channel without ever blocking, even briefly.
57    ///
58    /// # Result
59    ///
60    /// This method fails if the receiving queue is full, or if the receiver has been dropped.
61    pub fn try_send(&mut self, message: T) -> SendResult<T> {
62        if self.is_closed.load(Ordering::SeqCst) {
63            SendResult::Disconnected
64        } else {
65            let res = self.inner.push(message);
66            if let Err(message) = res {
67                SendResult::Full(message)
68            } else {
69                SendResult::Ok
70            }
71        }
72    }
73
74    /// Makes this channel into a `mpsc` channel, but where the sender is allowed to block for very small durations.
75    pub fn mpsc(self) -> MicroBlockingSender<T> {
76        return MicroBlockingSender {
77            inner: Arc::new(Mutex::new(self)),
78        };
79    }
80}
81
82impl<T> Drop for NonBlockingSender<T> {
83    fn drop(&mut self) {
84        // Cascade closures
85        self.is_closed.store(true, Ordering::SeqCst);
86    }
87}
88
89/// A variant of a [`NonBlockingSender`] which can block but only for very short durations of time, much like the
90/// channel in `std` or `flume`. Holding this variant of sender does not mean that the receiver will ever block
91/// when receiving - the receiver provided by this crate will never block.
92pub struct MicroBlockingSender<T> {
93    inner: Arc<Mutex<NonBlockingSender<T>>>,
94}
95
96impl<T> MicroBlockingSender<T> {
97    /// Adds a state change to the set of changes to enact before the next frame.
98    /// This operation is submitted to the GPU only when `RenderData::submit_changes` is called on the main thread.
99    ///
100    /// This method blocks, but only briefly, and so should not be used on the main thread of a web application.
101    pub fn try_send(&self, message: T) -> SendResult<T> {
102        self.inner.lock().unwrap().try_send(message)
103    }
104}
105
106impl<T> Clone for MicroBlockingSender<T> {
107    fn clone(&self) -> Self {
108        Self {
109            inner: self.inner.clone(),
110        }
111    }
112}
113
114/// The result of trying to receive a message.
115#[must_use]
116#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
117pub enum RecvResult<T> {
118    Ok(T),
119    Empty,
120    Disconnected,
121}
122
123impl<T> RecvResult<T> {
124    /// Checks if the sending end(s) of the channel had been dropped when a message was sent.
125    pub fn is_disconnected(&self) -> bool {
126        matches!(self, Self::Disconnected)
127    }
128
129    /// Checks if either a message was received, or there was nothing in the channel.
130    pub fn is_ok(&self) -> bool {
131        matches!(self, Self::Ok(_) | Self::Empty)
132    }
133}
134impl<T> RecvResult<T> {
135    pub fn unwrap(self) -> Option<T> {
136        match self {
137            RecvResult::Ok(message) => Some(message),
138            RecvResult::Empty => None,
139            RecvResult::Disconnected => panic!("receiver was disconnected when receiving message"),
140        }
141    }
142}
143
144/// A receiver that is guaranteed to never block when receiving messages.
145pub struct NonBlockingReceiver<T> {
146    inner: Consumer<T, Arc<SharedRb<T, Vec<MaybeUninit<T>>>>>,
147    is_closed: Arc<AtomicBool>,
148}
149impl<T> NonBlockingReceiver<T> {
150    /// Tries to send a message to the receiving channel without ever blocking, even briefly.
151    ///
152    /// # Result
153    ///
154    /// This method fails if the receiving queue is full, or if the receiver has been dropped.
155    pub fn try_recv(&mut self) -> RecvResult<T> {
156        if self.is_closed.load(Ordering::SeqCst) {
157            RecvResult::Disconnected
158        } else {
159            let res = self.inner.pop();
160            if let Some(message) = res {
161                RecvResult::Ok(message)
162            } else {
163                RecvResult::Empty
164            }
165        }
166    }
167}
168
169impl<T> Drop for NonBlockingReceiver<T> {
170    fn drop(&mut self) {
171        // Cascade closures
172        self.is_closed.store(true, Ordering::SeqCst);
173    }
174}
175
176pub fn nonblocking_channel<T>(
177    capacity: NonZeroUsize,
178) -> (NonBlockingSender<T>, NonBlockingReceiver<T>) {
179    let (sender, receiver) = HeapRb::<T>::new(capacity.get()).split();
180    let is_closed = Arc::new(AtomicBool::from(false));
181
182    let sender = NonBlockingSender {
183        inner: sender,
184        is_closed: Arc::clone(&is_closed),
185    };
186    let receiver = NonBlockingReceiver {
187        inner: receiver,
188        is_closed,
189    };
190
191    return (sender, receiver);
192}