sync_async_runner/lib.rs
1//! A not-actually-async async runner (for coroutines, etc).
2//!
3//! Lets you run futures in the current thread, blocking until they await or exit.
4//!
//! Primarily intended as a coroutine alternative, when you want a separate thread of execution
6//! but want to run it synchronously with your main thread. Can also be used for testing, and also
7//! functions as a basic example of how to execute futures, as a stepping stone to writing your
8//! own scheduler.
9//!
10//! It's not very big, and the code has comments. Check the tests for some simple examples.
11//!
12//! ```rust
13//! use sync_async_runner::runner;
14//! use std::task::Poll;
15//! use futures_channel::{
16//! mpsc,
17//! oneshot,
18//! };
19//! use futures_util::{
20//! pin_mut,
21//! stream::StreamExt,
22//! };
23//!
24//! let (mut sender, receiver) = mpsc::channel(5);
25//! sender.try_send(1u32).unwrap();
26//! sender.try_send(2).unwrap();
27//! sender.try_send(3).unwrap();
28//! sender.close_channel();
29//! let coro = runner(async move {
30//! pin_mut!(receiver);
31//! assert_eq!(receiver.next().await, Some(1));
32//! assert_eq!(receiver.next().await, Some(2));
33//! assert_eq!(receiver.next().await, Some(3));
34//! assert_eq!(receiver.next().await, None);
35//! 42
36//! });
37//! pin_mut!(coro);
38//! // Ready immediately since the messages are all there
39//! assert_eq!(coro.as_mut().poll(), Poll::Ready(42));
40//!
41//! ```
42use pin_project::pin_project;
43use std::{
44 future::Future,
45 mem,
46 pin::Pin,
47 sync::{
48 atomic::{
49 AtomicBool,
50 Ordering,
51 },
52 Arc,
53 },
54 task::{
55 Context,
56 Poll,
57 RawWaker,
58 RawWakerVTable,
59 Waker,
60 },
61};
62
63/// Wraps a future in a runner, allowing it to run on the current thread
64pub fn runner<F: Future>(future: F) -> SimpleRunner<F> {
65 // Create the waker data...
66 let waker_data = Arc::new(AtomicBool::new(true));
67 // Then the waker. See the waker module below for an explanation on what a waker is.
68 let waker_obj = waker::create(Arc::clone(&waker_data));
69 SimpleRunner {
70 future,
71 is_awake: waker_data,
72 cached_waker: waker_obj,
73 }
74}
75
76/// The not-actually-async async runner.
77///
78/// Wraps a future and provides a method to run it on the current thread.
79#[pin_project]
80pub struct SimpleRunner<F: Future> {
81 /// The future we are executing. Futures need to be pinned in order to run, so we
82 /// use the pin_project crate to allow us to get a `Pin<&mut F>` from a `Pin<&mut SimpleRunner<F>>`.
83 #[pin]
84 future: F,
85 /// Flag set when a waker says we can run again. See below.
86 is_awake: Arc<AtomicBool>,
87 /// A pre-created waker pointing to `is_awake` that we can use in `poll`.
88 cached_waker: Waker,
89}
90impl<F: Future> SimpleRunner<F> {
91 /// Has a waker been activated for this future?
92 ///
93 /// For example, this will be true if the future was waiting on a channel to have an element for it
94 /// to receive and the channel now has one.
95 ///
96 /// You can run an unawakened future, but it will likely just result in it awaiting without making
97 /// any progress.
98 pub fn is_awake(&self) -> bool {
99 self.is_awake.load(Ordering::Acquire)
100 }
101
102 /// Resumes the future, running it on the current thread until it awaits or returns.
103 ///
104 /// Returns whether the future has completed or not.
105 pub fn poll(self: Pin<&mut Self>) -> Poll<F::Output> {
106 // Project pin, to get access to a `Pin<&mut F>`.
107 let this = self.project();
108 // Clear awake flag now that we have resumed.
109 this.is_awake.store(false, Ordering::Release);
110 // Create context. This is just a holder for a waker reference, which we created in the constructor.
111 // We could also create a new waker here, pointing to the `is_awake` flag.
112 let mut ctx = Context::from_waker(&this.cached_waker);
113 // Execute the future. Here we just return whether or not the future has completed. A real
114 // scheduler would check the return value and either destroy the task if it finished, put it in a
115 // "pending" set if it hasn't yet, or some other bookkeeping.
116 this.future.poll(&mut ctx)
117 }
118}
119
/// Creates a future that reports `Pending` exactly once.
///
/// The first `poll` returns `Pending`; every later `poll` returns `Ready(())`.
///
/// Inside a `SimpleRunner` this acts as a "yield": control returns to the caller,
/// and the future is immediately flagged as ready to be polled again.
pub fn yield_() -> impl Future<Output = ()> {
    YieldFuture { first_poll: true }
}

/// The future behind [`yield_`].
struct YieldFuture {
    /// `true` until the first poll has been observed.
    first_poll: bool,
}

impl Future for YieldFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Read the flag and clear it in one step.
        let is_first = mem::replace(&mut self.first_poll, false);
        if !is_first {
            return Poll::Ready(());
        }
        // Nothing is actually being waited on, so request a re-poll right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
146
/// The "waker" implementation.
///
/// When a future returns `Poll::Pending`, it means it's waiting on an async operation. However,
/// for efficient processing, the operation needs to be able to tell the future scheduler when the
/// operation has finished, so that the scheduler can know that it should poll the future again.
///
/// The waker is that mechanism. It gets passed, through a `Context`, into the future's `poll` function.
/// The async operation can clone the waker, store it, and call its wake method when it is finished.
/// For example, when waiting on an empty asynchronous queue, the queue can store the waker in it, so
/// that the queue's enqueue method can wake the pending future when a new item is enqueued and it can
/// start again.
///
/// Implementing a waker is a bit tricky, because it does not use the standard traits paradigm that Rust uses
/// for other interfaces. Rather, waker "methods" are unsafe functions operating on data pointers. The functions
/// are stored in a "vtable", a reference to which is stored with the data pointer (fun fact: this is similar
/// to how `dyn Trait` objects are implemented). The `RawWaker` struct is a dumb tuple of the data pointer and the
/// vtable, and the `Waker` is a container for the `RawWaker` that provides higher-level methods that call the
/// vtable functions under the hood.
///
/// Since our runner doesn't actually do any scheduling, our implementation is very simple: the data pointer is
/// an `Arc`'s data pointer, which just contains an `AtomicBool` which is set upon awaking. The `SimpleRunner`
/// also has a reference to the `AtomicBool` and can read it. A real scheduler would want to, say, transfer the
/// future to an "active" set to be run next, or do some other bookkeeping.
///
/// Worth pointing out two things:
///
/// * The waker must be thread-safe. Some async operations may be performed on a separate worker thread, which will then
///   call the wake function. Ours is safe due to our use of `Arc` and atomic operations.
/// * As a corollary to the above: the waker may be awoken before the future even returns `Poll::Pending`, so any bookkeeping
///   you do has to work when targeting a future that's currently running.
mod waker {
    use super::*;

    /// Creates a waker, referencing the passed in awake flag.
    ///
    /// The waker takes over the `Arc` reference passed in; that reference is released
    /// again when the waker is dropped or consumed by `wake`.
    pub fn create(data: Arc<AtomicBool>) -> Waker {
        // SAFETY: the data pointer comes from `Arc::<AtomicBool>::into_raw`, which is exactly
        // what every function in `VTABLE` expects, and those functions keep the reference
        // count balanced (clone adds one, wake/drop release one, wake_by_ref is neutral).
        unsafe { Waker::from_raw(into_raw_waker(data)) }
    }

    /// Leaks `flag` into a `RawWaker`; the leaked reference is owned by the resulting waker.
    fn into_raw_waker(flag: Arc<AtomicBool>) -> RawWaker {
        RawWaker::new(Arc::into_raw(flag).cast::<()>(), &VTABLE)
    }

    /// Views the `Arc` behind a waker data pointer without taking over its reference.
    ///
    /// The `ManuallyDrop` wrapper guarantees the borrowed reference is never released,
    /// even on an early exit, so no explicit "re-leak" is needed afterwards.
    ///
    /// # Safety
    ///
    /// `p` must originate from `Arc::<AtomicBool>::into_raw`, and the reference it stands
    /// for must still be alive.
    unsafe fn borrow_flag(p: *const ()) -> mem::ManuallyDrop<Arc<AtomicBool>> {
        // SAFETY: guaranteed by this function's contract.
        mem::ManuallyDrop::new(unsafe { Arc::from_raw(p.cast::<AtomicBool>()) })
    }

    /// Clones the waker, creating another waker that will wake up the same future.
    unsafe fn v_clone(p: *const ()) -> RawWaker {
        // SAFETY: `p` is the data pointer of a live waker built by this module.
        let flag = unsafe { borrow_flag(p) };
        // The new waker needs a reference of its own; the caller's one stays untouched.
        let extra: Arc<AtomicBool> = Arc::clone(&*flag);
        into_raw_waker(extra)
    }

    /// Flags the waker that the future can be resumed, then deallocates it.
    unsafe fn v_wake(p: *const ()) {
        // SAFETY: `p` is the data pointer of a waker built by this module, and `wake`
        // consumes that waker, so we take ownership of its reference here.
        let flag: Arc<AtomicBool> = unsafe { Arc::from_raw(p.cast::<AtomicBool>()) };
        flag.store(true, Ordering::Release);
        // `flag` is dropped here, releasing the consumed waker's reference.
    }

    /// Flags the waker that the future can be resumed, without deallocating it.
    unsafe fn v_wake_by_ref(p: *const ()) {
        // SAFETY: `p` is the data pointer of a live waker built by this module.
        let flag = unsafe { borrow_flag(p) };
        flag.store(true, Ordering::Release);
    }

    /// Deallocates the waker without waking.
    unsafe fn v_drop(p: *const ()) {
        // SAFETY: `p` is the data pointer of a waker built by this module that is being
        // dropped, so its reference is ours to release.
        drop(unsafe { Arc::from_raw(p.cast::<AtomicBool>()) });
    }

    /// The "virtual table" that holds all the methods. This must live for the 'static lifetime.
    ///
    /// It is a `static` rather than a `const` so that there is exactly one table at one
    /// address. A `const` may be materialized separately at every `&VTABLE` use site, and
    /// wakers that point at different copies of the table may not be recognized by
    /// `Waker::will_wake` as waking the same task.
    static VTABLE: RawWakerVTable = RawWakerVTable::new(v_clone, v_wake, v_wake_by_ref, v_drop);
}
212
#[cfg(test)]
mod tests {
    use super::*;
    use futures_channel::{
        mpsc,
        oneshot,
    };
    use futures_util::{
        pin_mut,
        stream::StreamExt,
    };

    /// Messages queued before the first poll are all consumed by a single poll.
    #[test]
    fn channel_take_preset() {
        let (mut tx, rx) = mpsc::channel(5);
        tx.try_send(1u32).unwrap();
        tx.try_send(2).unwrap();
        tx.try_send(3).unwrap();
        tx.close_channel();
        let task = runner(async move {
            pin_mut!(rx);
            // Three queued values, then end-of-stream because the channel was closed.
            for expected in &[Some(1u32), Some(2), Some(3), None] {
                assert_eq!(rx.next().await, *expected);
            }
            42
        });
        pin_mut!(task);
        // Everything is already available, so the very first poll completes.
        assert_eq!(task.as_mut().poll(), Poll::Ready(42));
    }

    /// Messages sent between polls wake the runner, and each poll handles exactly one.
    #[test]
    fn channel_take_set_during() {
        let (mut tx, rx) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(5);
        let task = runner(async move {
            println!("Enter");
            pin_mut!(rx);
            for expected in 0..5u32 {
                println!("Receiving {}", expected);
                let (value, reply) = rx.next().await.unwrap();
                assert_eq!(value, expected);
                println!("Received");
                reply.send(value * 2).unwrap();
            }
            assert!(rx.next().await.is_none());
            64
        });
        pin_mut!(task);
        // Run up to the first await.
        assert_eq!(task.as_mut().poll(), Poll::Pending);

        for round in 0..5u32 {
            println!("Sending {}", round);
            // Parked on the empty channel, so not awake yet.
            assert!(!task.is_awake());

            let (reply_tx, mut reply_rx) = oneshot::channel();
            // Sending a message fires the waker...
            tx.try_send((round, reply_tx)).unwrap();
            assert!(task.is_awake());
            // ...and the next poll processes it, then parks again.
            assert_eq!(task.as_mut().poll(), Poll::Pending);
            // The task must have answered with the doubled value.
            assert_eq!(reply_rx.try_recv(), Ok(Some(round * 2)));
        }
        // Closing the channel ends the stream, letting the task finish.
        tx.close_channel();
        assert_eq!(task.as_mut().poll(), Poll::Ready(64));
    }

    /// Each `yield_` suspends once while leaving the runner flagged as awake.
    #[test]
    fn yield_test() {
        let task = runner(async move {
            yield_().await;
            yield_().await;
            yield_().await;
            32
        });
        pin_mut!(task);
        // One pending poll per yield; the runner is awake before each of them.
        for _ in 0..3 {
            assert!(task.is_awake());
            assert_eq!(task.as_mut().poll(), Poll::Pending);
        }
        assert!(task.is_awake());
        assert_eq!(task.as_mut().poll(), Poll::Ready(32));
    }
}