sync_async_runner/
lib.rs

1//! A not-actually-async async runner (for coroutines, etc).
2//!
3//! Lets you run futures in the current thread, blocking until they await or exit.
4//!
//! Primarily intended as a coroutine alternative, when you want a separate thread of execution
6//! but want to run it synchronously with your main thread. Can also be used for testing, and also
7//! functions as a basic example of how to execute futures, as a stepping stone to writing your
8//! own scheduler.
9//!
10//! It's not very big, and the code has comments. Check the tests for some simple examples.
11//!
12//! ```rust
13//! use sync_async_runner::runner;
14//! use std::task::Poll;
15//! use futures_channel::{
16//! 	mpsc,
17//! 	oneshot,
18//! };
19//! use futures_util::{
20//! 	pin_mut,
21//! 	stream::StreamExt,
22//! };
23//!
24//! let (mut sender, receiver) = mpsc::channel(5);
25//! sender.try_send(1u32).unwrap();
26//! sender.try_send(2).unwrap();
27//! sender.try_send(3).unwrap();
28//! sender.close_channel();
29//! let coro = runner(async move {
30//! 	pin_mut!(receiver);
31//! 	assert_eq!(receiver.next().await, Some(1));
32//! 	assert_eq!(receiver.next().await, Some(2));
33//! 	assert_eq!(receiver.next().await, Some(3));
34//! 	assert_eq!(receiver.next().await, None);
35//! 	42
36//! });
37//! pin_mut!(coro);
38//! // Ready immediately since the messages are all there
39//! assert_eq!(coro.as_mut().poll(), Poll::Ready(42));
40//!
41//! ```
42use pin_project::pin_project;
43use std::{
44	future::Future,
45	mem,
46	pin::Pin,
47	sync::{
48		atomic::{
49			AtomicBool,
50			Ordering,
51		},
52		Arc,
53	},
54	task::{
55		Context,
56		Poll,
57		RawWaker,
58		RawWakerVTable,
59		Waker,
60	},
61};
62
63/// Wraps a future in a runner, allowing it to run on the current thread
64pub fn runner<F: Future>(future: F) -> SimpleRunner<F> {
65	// Create the waker data...
66	let waker_data = Arc::new(AtomicBool::new(true));
67	// Then the waker. See the waker module below for an explanation on what a waker is.
68	let waker_obj = waker::create(Arc::clone(&waker_data));
69	SimpleRunner {
70		future,
71		is_awake: waker_data,
72		cached_waker: waker_obj,
73	}
74}
75
76/// The not-actually-async async runner.
77///
78/// Wraps a future and provides a method to run it on the current thread.
79#[pin_project]
80pub struct SimpleRunner<F: Future> {
81	/// The future we are executing. Futures need to be pinned in order to run, so we
82	/// use the pin_project crate to allow us to get a `Pin<&mut F>` from a `Pin<&mut SimpleRunner<F>>`.
83	#[pin]
84	future: F,
85	/// Flag set when a waker says we can run again. See below.
86	is_awake: Arc<AtomicBool>,
87	/// A pre-created waker pointing to `is_awake` that we can use in `poll`.
88	cached_waker: Waker,
89}
90impl<F: Future> SimpleRunner<F> {
91	/// Has a waker been activated for this future?
92	///
93	/// For example, this will be true if the future was waiting on a channel to have an element for it
94	/// to receive and the channel now has one.
95	///
96	/// You can run an unawakened future, but it will likely just result in it awaiting without making
97	/// any progress.
98	pub fn is_awake(&self) -> bool {
99		self.is_awake.load(Ordering::Acquire)
100	}
101
102	/// Resumes the future, running it on the current thread until it awaits or returns.
103	///
104	/// Returns whether the future has completed or not.
105	pub fn poll(self: Pin<&mut Self>) -> Poll<F::Output> {
106		// Project pin, to get access to a `Pin<&mut F>`.
107		let this = self.project();
108		// Clear awake flag now that we have resumed.
109		this.is_awake.store(false, Ordering::Release);
110		// Create context. This is just a holder for a waker reference, which we created in the constructor.
111		// We could also create a new waker here, pointing to the `is_awake` flag.
112		let mut ctx = Context::from_waker(&this.cached_waker);
113		// Execute the future. Here we just return whether or not the future has completed. A real
114		// scheduler would check the return value and either destroy the task if it finished, put it in a
115		// "pending" set if it hasn't yet, or some other bookkeeping.
116		this.future.poll(&mut ctx)
117	}
118}
119
120/// Returns a future whose first call to `poll` will return `Pending`.
121///
122/// All other polls will result in `Ready(())`.
123///
124/// Can be used with `SimpleRunner` to "yield" from the future while being
125/// immediately ready to poll again.
126pub fn yield_() -> impl Future<Output = ()> {
127	YieldFuture(true)
128}
/// One-shot yielding future; the flag is `true` until the first poll has happened.
struct YieldFuture(bool);
impl Future for YieldFuture {
	type Output = ();

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// Read the flag and clear it in one step; after the first poll it stays `false`.
		let first_poll = mem::replace(&mut self.0, false);
		if !first_poll {
			return Poll::Ready(());
		}
		// Nothing is actually being waited on, so request a re-poll right away.
		cx.waker().wake_by_ref();
		Poll::Pending
	}
}
146
/// The "waker" implementation.
///
/// When a future returns `Poll::Pending`, it means it's waiting on an async operation. However,
/// for efficient processing, the operation needs to be able to tell the future scheduler when the
/// operation has finished, so that the scheduler can know that it should poll the future again.
///
/// The waker is that mechanism. It gets passed, through a `Context`, into the future's `poll` function.
/// The async operation can clone the waker, store it, and call its wake method when it is finished.
/// For example, when waiting on an empty asynchronous queue, the queue can store the waker in it, so
/// that the queue's enqueue method can wake the pending future when a new item is enqueued and it can
/// start again.
///
/// Implementing a waker is a bit tricky, because it does not use the standard traits paradigm that Rust uses
/// for other interfaces. Rather, waker "methods" are unsafe functions operating on data pointers. The functions
/// are stored in a "vtable", a reference to which is stored alongside the data pointer (fun fact: this is similar
/// to how `dyn Trait` objects are implemented). The `RawWaker` struct is a dumb tuple of the data pointer and the
/// vtable, and the `Waker` is a container for the `RawWaker` that provides higher-level methods that call the
/// vtable functions under-the-hood.
///
/// Since our runner doesn't actually do any scheduling, our implementation is very simple: the data pointer is
/// an `Arc`'s data pointer, which just contains an `AtomicBool` which is set upon awaking. The `SimpleRunner`
/// also has a reference to the `AtomicBool` and can read it. A real scheduler would want to, say, transfer the
/// future to an "active" set to be run next, or do some other bookkeeping.
///
/// Worth pointing out two things:
///
/// * The waker must be thread-safe. Some async operations may be performed on a separate worker thread, which will then
///   call the wake function. Ours is safe due to our use of `Arc` and atomic operations.
/// * As a corollary to the above: the waker may be awoken before the future even returns `Poll::Pending`, so any bookkeeping
///   you do has to work when targeting a future that's currently running.
mod waker {
	use super::*;

	/// Creates a waker, referencing the passed in awake flag.
	///
	/// The waker takes over the strong reference held by `data`; it is released again by
	/// `v_wake` or `v_drop`.
	pub fn create(data: Arc<AtomicBool>) -> Waker {
		let raw_waker = RawWaker::new(Arc::into_raw(data) as *const (), &VTABLE);
		// SAFETY: the data pointer comes from `Arc::<AtomicBool>::into_raw`, which is exactly what
		// every function in `VTABLE` expects, and those functions only use `Arc` and atomic
		// operations, so they are safe to call from any thread.
		unsafe { Waker::from_raw(raw_waker) }
	}

	/// Clones the waker, creating another waker that will wake up the same future.
	///
	/// # Safety
	///
	/// `p` must come from `Arc::<AtomicBool>::into_raw` and still own a strong reference.
	unsafe fn v_clone(p: *const ()) -> RawWaker {
		// SAFETY: guaranteed by the caller (see above). Wrapping the reconstructed `Arc` in
		// `ManuallyDrop` borrows the caller's reference instead of consuming it.
		let borrowed = mem::ManuallyDrop::new(unsafe { Arc::from_raw(p as *const AtomicBool) });
		let new_ref: Arc<AtomicBool> = Arc::clone(&*borrowed);
		RawWaker::new(Arc::into_raw(new_ref) as *const (), &VTABLE)
	}
	/// Flags the waker that the future can be resumed, then deallocates it.
	///
	/// # Safety
	///
	/// `p` must come from `Arc::<AtomicBool>::into_raw` and still own a strong reference,
	/// which this call consumes.
	unsafe fn v_wake(p: *const ()) {
		// SAFETY: guaranteed by the caller (see above).
		let rc: Arc<AtomicBool> = unsafe { Arc::from_raw(p as *const AtomicBool) };
		rc.store(true, Ordering::Release);
		// `rc` is dropped here, releasing the waker's reference.
	}
	/// Flags the waker that the future can be resumed, without deallocating it.
	///
	/// # Safety
	///
	/// `p` must come from `Arc::<AtomicBool>::into_raw` and still own a strong reference.
	unsafe fn v_wake_by_ref(p: *const ()) {
		// SAFETY: guaranteed by the caller (see above). `Arc::into_raw` yields a pointer to the
		// `AtomicBool` itself, which stays alive for as long as the waker holds its reference,
		// so we can borrow it directly without touching the reference count.
		let flag: &AtomicBool = unsafe { &*(p as *const AtomicBool) };
		flag.store(true, Ordering::Release);
	}
	/// Deallocates the waker without waking.
	///
	/// # Safety
	///
	/// `p` must come from `Arc::<AtomicBool>::into_raw` and still own a strong reference,
	/// which this call consumes.
	unsafe fn v_drop(p: *const ()) {
		// SAFETY: guaranteed by the caller (see above).
		mem::drop(unsafe { Arc::from_raw(p as *const AtomicBool) })
	}
	/// The "virtual table" that holds all the methods. This must live for the 'static lifetime.
	///
	/// It is a `static` rather than a `const` so that every waker created here points at one
	/// and the same vtable, which keeps `Waker::will_wake` comparisons between them reliable.
	static VTABLE: RawWakerVTable = RawWakerVTable::new(v_clone, v_wake, v_wake_by_ref, v_drop);
}
212
#[cfg(test)]
mod tests {
	use super::*;
	use futures_channel::{
		mpsc,
		oneshot,
	};
	use futures_util::{
		pin_mut,
		stream::StreamExt,
	};

	/// Messages queued before the first poll are all consumed within a single poll.
	#[test]
	fn channel_take_preset() {
		let (mut tx, rx) = mpsc::channel(5);
		for value in 1..=3u32 {
			tx.try_send(value).unwrap();
		}
		tx.close_channel();
		let task = runner(async move {
			pin_mut!(rx);
			for expected in 1..=3u32 {
				assert_eq!(rx.next().await, Some(expected));
			}
			assert_eq!(rx.next().await, None);
			42
		});
		pin_mut!(task);
		// Everything is already queued, so one poll runs the future to completion
		assert_eq!(task.as_mut().poll(), Poll::Ready(42));
	}

	/// Messages sent between polls wake the runner, and each poll handles exactly one of them.
	#[test]
	fn channel_take_set_during() {
		let (mut tx, rx) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(5);
		let task = runner(async move {
			println!("Enter");
			pin_mut!(rx);
			for expected in 0..5u32 {
				println!("Receiving {}", expected);
				let (value, reply) = rx.next().await.unwrap();
				assert_eq!(value, expected);
				println!("Received");
				reply.send(value * 2).unwrap();
			}
			assert!(rx.next().await.is_none());
			64
		});
		pin_mut!(task);
		// Run up to the first await
		assert_eq!(task.as_mut().poll(), Poll::Pending);

		for round in 0..5u32 {
			println!("Sending {}", round);
			// The future is parked waiting for a message...
			assert!(!task.is_awake());

			let (reply_tx, mut reply_rx) = oneshot::channel();
			// ...so deliver one...
			tx.try_send((round, reply_tx)).unwrap();
			// ...which must have woken it up
			assert!(task.is_awake());
			// Resume; it handles the message and parks again
			assert_eq!(task.as_mut().poll(), Poll::Pending);
			// The reply must carry the doubled value
			assert_eq!(reply_rx.try_recv(), Ok(Some(round * 2)));
		}
		// Closing the channel lets the receive loop end...
		tx.close_channel();
		// ...so the future can now finish
		assert_eq!(task.as_mut().poll(), Poll::Ready(64));
	}

	/// Each `yield_` suspends the future once while leaving the runner awake.
	#[test]
	fn yield_test() {
		let task = runner(async move {
			for _ in 0..3 {
				yield_().await;
			}
			32
		});
		pin_mut!(task);
		for _ in 0..3 {
			assert!(task.is_awake());
			assert_eq!(task.as_mut().poll(), Poll::Pending);
		}
		assert!(task.is_awake());
		assert_eq!(task.as_mut().poll(), Poll::Ready(32));
	}
}