async_local_bounded_channel/lib.rs
1//! A same-producer, same-consumer channel, bounded to a single async task.
2//!
3//! # Implementation details
4//!
5//! Internally, this uses the `generic-array` crate, which utilizes types from
6//! `typenum` to specify the capacity at compile time, allowing the space for
7//! the queue to be allocated inline. Thus, this channel also requires
8//! specifying the capacity upfront at compile time.
9//!
10//! # Examples
11//!
12//! Used together with `futures::future::select`, this can implement something
13//! like a coroutine, where two asynchronous generators cooperate producing
14//! and consuming values.
15//!
16//! ```
17//! # use futures::pin_mut;
18//! # use futures::future::{Either, select};
19//! # use typenum::U8;
20//! # use async_local_bounded_channel::channel;
21//! futures::executor::block_on(async move {
22//! // create a new channel with a capacity of 8 items
23//! let mut channel = channel::<_, U8>();
24//! let (mut tx, mut rx) = channel.split();
25//! let producer = async move {
26//! for i in 0..100 {
27//! tx.send(i).await.expect("consumer still alive");
28//! }
29//! };
30//! let consumer = async move {
31//! let mut expected = 0;
32//! loop {
33//! if let Ok(v) = rx.receive().await {
34//! assert_eq!(v, expected);
35//! expected += 1;
36//! } else {
37//! break;
38//! }
39//! }
40//! };
41//! pin_mut!(producer, consumer);
42//! let remaining = select(producer, consumer).await.factor_first().1;
43//! match remaining {
44//! Either::Left(f) => f.await,
45//! Either::Right(f) => f.await,
46//! }
47//! });
48//! ```
49//!
50//! This can be useful, for example, when implementing a server. One task can
51//! handle each client, where the producer waits for incoming requests and
52//! writes responses; and the consumer waits for requests, handles them, and
53//! then generates a response.
54//!
55//! # Usage notes
56//!
57//! Once the transmission endpoints have been acquired via `split()`, the
58//! channel cannot be moved. This is required for safety, since each endpoint
59//! contains a reference back to the channel; thus, if the channel were to move,
60//! those references would become dangling.
61//!
62//! ```compile_fail
63//! # use typenum::U8;
64//! # use async_local_bounded_channel::channel;
65//! let mut channel = channel::<isize, U8>();
66//! let (tx, rx) = channel.split();
67//! std::thread::spawn(move || {
68//! // nope!
69//! let channel = channel;
70//! let tx = tx;
71//! let rx = rx;
72//! });
73//! ```
74//!
75//! Further, endpoints must remain anchored to a single thread, since access
76//! to the underlying data structures is not thread-safe. Unfortunately, this
77//! _isn't_ enforced by the compiler, and scoped thread libraries can allow
78//! unsafe usage. For example:
79//!
80//! ```
81//! # use typenum::U8;
82//! # use async_local_bounded_channel::channel;
83//! // shouldn't compile, but unfortunately does.
84//! let mut channel = channel::<isize, U8>();
85//! crossbeam::thread::scope(|s| {
86//! let (tx, rx) = channel.split();
87//! // don't do this!
88//! s.spawn(move |_| {
89//! let tx = tx;
90//! });
91//! s.spawn(move |_| {
92//! let rx = rx;
93//! });
94//! });
95//! ```
96//!
97//! If there are no open endpoints, though, a channel can be safely moved and
98//! sent. A channel can even be re-used after the endpoints are dropped.
99//!
100//! ```
101//! # use futures::executor::block_on;
102//! # use futures::pin_mut;
103//! # use futures::future::{Either, select};
104//! # use typenum::U8;
105//! # use async_local_bounded_channel::channel;
106//! type C = async_local_bounded_channel::Channel<isize, U8>;
107//!
108//! async fn test_channel(mut channel: C) -> C {
109//! // run the producer-consumer example above.
110//! # {
111//! # let (mut tx, mut rx) = channel.split();
112//! # let producer = async move {
113//! # for i in 0..100 {
114//! # tx.send(i).await.expect("consumer still alive");
115//! # }
116//! # };
117//! # let consumer = async move {
118//! # let mut expected = 0;
119//! # loop {
120//! # if let Ok(v) = rx.receive().await {
121//! # assert_eq!(v, expected);
122//! # expected += 1;
123//! # } else {
124//! # break;
125//! # }
126//! # }
127//! # };
128//! # pin_mut!(producer, consumer);
129//! # let remaining = select(producer, consumer).await.factor_first().1;
130//! # match remaining {
131//! # Either::Left(f) => f.await,
132//! # Either::Right(f) => f.await,
133//! # }
134//! # }
135//! channel
136//! }
137//!
138//! let channel = channel();
139//! let t = std::thread::spawn(move || {
140//! let channel = block_on(async move {
141//! test_channel(channel).await
142//! });
143//! block_on(async move {
144//! test_channel(channel).await
145//! });
146//! });
147//! t.join().expect("test to pass");
148//! ```
149#![deny(missing_docs)]
150#![warn(rust_2018_idioms)]
151
152use std::future::Future;
153use std::pin::Pin;
154use std::task::{Context, Poll, Waker};
155
156use generic_array::ArrayLength;
157use queue::Queue;
158
159mod queue;
160
161/// Create a bounded channel for communicating within a task.
162pub fn channel<T, N: ArrayLength<Option<T>>>() -> Channel<T, N> {
163 Channel {
164 queue: Queue::new(),
165 close_count: 0,
166 waiter: None,
167 }
168}
169
170/// A same-producer, same-consumer channel.
171pub struct Channel<T, N: ArrayLength<Option<T>>> {
172 // The underlying queue.
173 queue: Queue<T, N>,
174 // A count of the number of endpoints which have been dropped.
175 close_count: u8,
176 // The waker for whichever endpoint is waiting on the other.
177 waiter: Option<Waker>,
178}
179
180impl<T, N: ArrayLength<Option<T>>> Channel<T, N> {
181 /// Split a channel into a pair of sender and receiver endpoints.
182 ///
183 /// This is safe for reasons analogous to why `split_at_mut` works: each
184 /// endpoint has exclusive access to disjoin regions within the collection.
185 /// Since both endpoints must stay within the same task, they execute at any
186 /// moment within one thread, so mutual exclusivity is maintained.
187 pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
188 let channel: *mut _ = self as *mut _;
189 let sender = Sender {
190 channel: unsafe { &mut *channel },
191 };
192 let receiver = Receiver {
193 channel: unsafe { &mut *channel },
194 };
195 (sender, receiver)
196 }
197
198 // register the drop of an endpoint.
199 fn close(&mut self) {
200 self.close_count += 1;
201 // if both endpoints have closed, reset the count, so that the channel
202 // could be used further.
203 if self.close_count == 2 {
204 self.close_count = 0;
205 }
206 }
207
208 // determine whether at least one endpoint has dropped.
209 fn pair_endpoint_closed(&self) -> bool {
210 self.close_count > 0
211 }
212}
213
214/// The endpoint of a channel for sending values.
215pub struct Sender<'a, T, N: ArrayLength<Option<T>>> {
216 channel: &'a mut Channel<T, N>,
217}
218
219/// The endpoint of a channel for receiving values.
220pub struct Receiver<'a, T, N: ArrayLength<Option<T>>> {
221 channel: &'a mut Channel<T, N>,
222}
223
224impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Sender<'a, T, N> {
225 /// Asynchronously send a value through the channel.
226 ///
227 /// If the channel is already at full capacity, this will wait until the
228 /// Receiver consumes a value, and then notify that the channel is ready.
229 /// If the receiver endpoint has been dropped, this returns `Err(value)`,
230 /// regardless of whether there is enough capacity.
231 pub fn send(&mut self, value: T) -> impl Future<Output = Result<(), T>> + '_ {
232 Sending {
233 channel: self.channel,
234 value: Some(value),
235 }
236 }
237}
238
239impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Receiver<'a, T, N> {
240 /// Asynchronously receive a value through the channel.
241 ///
242 /// If the channel is empty, this will wait until the sender produces a
243 /// value, and then notify that the channel is ready. If the sender has
244 /// been dropped and the channel is empty, however, this returns `Err(())`.
245 pub fn receive(&mut self) -> impl Future<Output = Result<T, ()>> + '_ {
246 Receiving {
247 channel: self.channel,
248 }
249 }
250}
251
252impl<'a, T, N: ArrayLength<Option<T>>> Drop for Sender<'a, T, N> {
253 fn drop(&mut self) {
254 self.channel.close();
255 }
256}
257
258impl<'a, T, N: ArrayLength<Option<T>>> Drop for Receiver<'a, T, N> {
259 fn drop(&mut self) {
260 self.channel.close();
261 }
262}
263
264struct Sending<'a, T, N: ArrayLength<Option<T>>> {
265 channel: &'a mut Channel<T, N>,
266 value: Option<T>,
267}
268
269impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Future for Sending<'_, T, N> {
270 type Output = Result<(), T>;
271
272 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273 let value = self.as_mut().value.take().expect("poll not called again");
274 let channel = &mut self.as_mut().channel;
275 if channel.pair_endpoint_closed() {
276 return Poll::Ready(Err(value));
277 }
278 match channel.queue.enqueue(value) {
279 Ok(()) => {
280 if let Some(receiver) = channel.waiter.take() {
281 receiver.wake()
282 }
283 Poll::Ready(Ok(()))
284 }
285 Err(value) => {
286 channel.waiter = Some(cx.waker().clone());
287 self.as_mut().value = Some(value);
288 Poll::Pending
289 }
290 }
291 }
292}
293
294struct Receiving<'a, T, N: ArrayLength<Option<T>>> {
295 channel: &'a mut Channel<T, N>,
296}
297
298impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Future for Receiving<'_, T, N> {
299 type Output = Result<T, ()>;
300
301 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
302 let channel = &mut self.as_mut().channel;
303 match channel.queue.dequeue() {
304 Some(value) => {
305 if let Some(sender) = channel.waiter.take() {
306 sender.wake();
307 }
308 Poll::Ready(Ok(value))
309 }
310 None => {
311 if channel.pair_endpoint_closed() {
312 Poll::Ready(Err(()))
313 } else {
314 channel.waiter = Some(cx.waker().clone());
315 Poll::Pending
316 }
317 }
318 }
319 }
320}