async_local_bounded_channel/
lib.rs

1//! A same-producer, same-consumer channel, bounded to a single async task.
2//!
3//! # Implementation details
4//!
5//! Internally, this uses the `generic-array` crate, which utilizes types from
6//! `typenum` to specify the capacity at compile time, allowing the space for
7//! the queue to be allocated inline. Thus, this channel also requires
8//! specifying the capacity upfront at compile time.
9//!
10//! # Examples
11//!
12//! Used together with `futures::future::select`, this can implement something
13//! like a coroutine, where two asynchronous generators cooperate producing
14//! and consuming values.
15//!
16//! ```
17//! # use futures::pin_mut;
18//! # use futures::future::{Either, select};
19//! # use typenum::U8;
20//! # use async_local_bounded_channel::channel;
21//! futures::executor::block_on(async move {
22//!     // create a new channel with a capacity of 8 items
23//!     let mut channel = channel::<_, U8>();
24//!     let (mut tx, mut rx) = channel.split();
25//!     let producer = async move {
26//!         for i in 0..100 {
27//!             tx.send(i).await.expect("consumer still alive");
28//!         }
29//!     };
30//!     let consumer = async move {
31//!         let mut expected = 0;
32//!         loop {
33//!             if let Ok(v) = rx.receive().await {
34//!                 assert_eq!(v, expected);
35//!                 expected += 1;
36//!             } else {
37//!                 break;
38//!             }
39//!         }
40//!     };
41//!     pin_mut!(producer, consumer);
42//!     let remaining = select(producer, consumer).await.factor_first().1;
43//!     match remaining {
44//!         Either::Left(f) => f.await,
45//!         Either::Right(f) => f.await,
46//!     }
47//! });
48//! ```
49//!
50//! This can be useful, for example, when implementing a server. One task can
51//! handle each client, where the producer waits for incoming requests and
52//! writes responses; and the consumer waits for requests, handles them, and
53//! then generates a response.
54//!
55//! # Usage notes
56//!
57//! Once the transmission endpoints have been acquired via `split()`, the
58//! channel cannot be moved. This is required for safety, since each endpoint
59//! contains a reference back to the channel; thus, if the channel were to move,
60//! those references would become dangling.
61//!
62//! ```compile_fail
63//! # use typenum::U8;
64//! # use async_local_bounded_channel::channel;
65//! let mut channel = channel::<isize, U8>();
66//! let (tx, rx) = channel.split();
67//! std::thread::spawn(move || {
68//!     // nope!
69//!     let channel = channel;
70//!     let tx = tx;
71//!     let rx = rx;
72//! });
73//! ```
74//!
75//! Further, endpoints must remain anchored to a single thread, since access
76//! to the underlying data structures is not thread-safe. Unfortunately, this
77//! _isn't_ enforced by the compiler, and scoped thread libraries can allow
78//! unsafe usage. For example:
79//!
80//! ```
81//! # use typenum::U8;
82//! # use async_local_bounded_channel::channel;
83//! // shouldn't compile, but unfortunately does.
84//! let mut channel = channel::<isize, U8>();
85//! crossbeam::thread::scope(|s| {
86//!     let (tx, rx) = channel.split();
87//!     // don't do this!
88//!     s.spawn(move |_| {
89//!         let tx = tx;
90//!     });
91//!     s.spawn(move |_| {
92//!         let rx = rx;
93//!     });
94//! });
95//! ```
96//!
97//! If there are no open endpoints, though, a channel can be safely moved and
98//! sent. A channel can even be re-used after the endpoints are dropped.
99//!
100//! ```
101//! # use futures::executor::block_on;
102//! # use futures::pin_mut;
103//! # use futures::future::{Either, select};
104//! # use typenum::U8;
105//! # use async_local_bounded_channel::channel;
106//! type C = async_local_bounded_channel::Channel<isize, U8>;
107//!
108//! async fn test_channel(mut channel: C) -> C {
109//!     // run the producer-consumer example above.
110//!     # {
111//!     #     let (mut tx, mut rx) = channel.split();
112//!     #     let producer = async move {
113//!     #         for i in 0..100 {
114//!     #             tx.send(i).await.expect("consumer still alive");
115//!     #         }
116//!     #     };
117//!     #     let consumer = async move {
118//!     #         let mut expected = 0;
119//!     #         loop {
120//!     #             if let Ok(v) = rx.receive().await {
121//!     #                 assert_eq!(v, expected);
122//!     #                 expected += 1;
123//!     #             } else {
124//!     #                 break;
125//!     #             }
126//!     #         }
127//!     #     };
128//!     #     pin_mut!(producer, consumer);
129//!     #     let remaining = select(producer, consumer).await.factor_first().1;
130//!     #     match remaining {
131//!     #         Either::Left(f) => f.await,
132//!     #         Either::Right(f) => f.await,
133//!     #     }
134//!     # }
135//!     channel
136//! }
137//!
138//! let channel = channel();
139//! let t = std::thread::spawn(move || {
140//!     let channel = block_on(async move {
141//!        test_channel(channel).await
142//!     });
143//!     block_on(async move {
144//!         test_channel(channel).await
145//!     });
146//! });
147//! t.join().expect("test to pass");
148//! ```
149#![deny(missing_docs)]
150#![warn(rust_2018_idioms)]
151
152use std::future::Future;
153use std::pin::Pin;
154use std::task::{Context, Poll, Waker};
155
156use generic_array::ArrayLength;
157use queue::Queue;
158
159mod queue;
160
161/// Create a bounded channel for communicating within a task.
162pub fn channel<T, N: ArrayLength<Option<T>>>() -> Channel<T, N> {
163    Channel {
164        queue: Queue::new(),
165        close_count: 0,
166        waiter: None,
167    }
168}
169
170/// A same-producer, same-consumer channel.
171pub struct Channel<T, N: ArrayLength<Option<T>>> {
172    // The underlying queue.
173    queue: Queue<T, N>,
174    // A count of the number of endpoints which have been dropped.
175    close_count: u8,
176    // The waker for whichever endpoint is waiting on the other.
177    waiter: Option<Waker>,
178}
179
180impl<T, N: ArrayLength<Option<T>>> Channel<T, N> {
181    /// Split a channel into a pair of sender and receiver endpoints.
182    ///
183    /// This is safe for reasons analogous to why `split_at_mut` works: each
184    /// endpoint has exclusive access to disjoin regions within the collection.
185    /// Since both endpoints must stay within the same task, they execute at any
186    /// moment within one thread, so mutual exclusivity is maintained.
187    pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
188        let channel: *mut _ = self as *mut _;
189        let sender = Sender {
190            channel: unsafe { &mut *channel },
191        };
192        let receiver = Receiver {
193            channel: unsafe { &mut *channel },
194        };
195        (sender, receiver)
196    }
197
198    // register the drop of an endpoint.
199    fn close(&mut self) {
200        self.close_count += 1;
201        // if both endpoints have closed, reset the count, so that the channel
202        // could be used further.
203        if self.close_count == 2 {
204            self.close_count = 0;
205        }
206    }
207
208    // determine whether at least one endpoint has dropped.
209    fn pair_endpoint_closed(&self) -> bool {
210        self.close_count > 0
211    }
212}
213
214/// The endpoint of a channel for sending values.
215pub struct Sender<'a, T, N: ArrayLength<Option<T>>> {
216    channel: &'a mut Channel<T, N>,
217}
218
219/// The endpoint of a channel for receiving values.
220pub struct Receiver<'a, T, N: ArrayLength<Option<T>>> {
221    channel: &'a mut Channel<T, N>,
222}
223
224impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Sender<'a, T, N> {
225    /// Asynchronously send a value through the channel.
226    ///
227    /// If the channel is already at full capacity, this will wait until the
228    /// Receiver consumes a value, and then notify that the channel is ready.
229    /// If the receiver endpoint has been dropped, this returns `Err(value)`,
230    /// regardless of whether there is enough capacity.
231    pub fn send(&mut self, value: T) -> impl Future<Output = Result<(), T>> + '_ {
232        Sending {
233            channel: self.channel,
234            value: Some(value),
235        }
236    }
237}
238
239impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Receiver<'a, T, N> {
240    /// Asynchronously receive a value through the channel.
241    ///
242    /// If the channel is empty, this will wait until the sender produces a
243    /// value, and then notify that the channel is ready. If the sender has
244    /// been dropped and the channel is empty, however, this returns `Err(())`.
245    pub fn receive(&mut self) -> impl Future<Output = Result<T, ()>> + '_ {
246        Receiving {
247            channel: self.channel,
248        }
249    }
250}
251
252impl<'a, T, N: ArrayLength<Option<T>>> Drop for Sender<'a, T, N> {
253    fn drop(&mut self) {
254        self.channel.close();
255    }
256}
257
258impl<'a, T, N: ArrayLength<Option<T>>> Drop for Receiver<'a, T, N> {
259    fn drop(&mut self) {
260        self.channel.close();
261    }
262}
263
264struct Sending<'a, T, N: ArrayLength<Option<T>>> {
265    channel: &'a mut Channel<T, N>,
266    value: Option<T>,
267}
268
269impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Future for Sending<'_, T, N> {
270    type Output = Result<(), T>;
271
272    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273        let value = self.as_mut().value.take().expect("poll not called again");
274        let channel = &mut self.as_mut().channel;
275        if channel.pair_endpoint_closed() {
276            return Poll::Ready(Err(value));
277        }
278        match channel.queue.enqueue(value) {
279            Ok(()) => {
280                if let Some(receiver) = channel.waiter.take() {
281                    receiver.wake()
282                }
283                Poll::Ready(Ok(()))
284            }
285            Err(value) => {
286                channel.waiter = Some(cx.waker().clone());
287                self.as_mut().value = Some(value);
288                Poll::Pending
289            }
290        }
291    }
292}
293
294struct Receiving<'a, T, N: ArrayLength<Option<T>>> {
295    channel: &'a mut Channel<T, N>,
296}
297
298impl<'a, T: Unpin, N: ArrayLength<Option<T>>> Future for Receiving<'_, T, N> {
299    type Output = Result<T, ()>;
300
301    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
302        let channel = &mut self.as_mut().channel;
303        match channel.queue.dequeue() {
304            Some(value) => {
305                if let Some(sender) = channel.waiter.take() {
306                    sender.wake();
307                }
308                Poll::Ready(Ok(value))
309            }
310            None => {
311                if channel.pair_endpoint_closed() {
312                    Poll::Ready(Err(()))
313                } else {
314                    channel.waiter = Some(cx.waker().clone());
315                    Poll::Pending
316                }
317            }
318        }
319    }
320}