sync-async-runner 0.1.0

Run async functions synchronously (like coroutines)
Documentation
//! A not-actually-async async runner (for coroutines, etc).
//!
//! Lets you run futures in the current thread, blocking until they await or exit.
//!
//! Primairly intended as a coroutine alternative, when you want a separate thread of executiong
//! but want to run it synchronously with your main thread. Can also be used for testing, and also
//! functions as a basic example of how to execute futures, as a stepping stone to writing your
//! own scheduler.
//!
//! It's not very big, and the code has comments. Check the tests for some simple examples.
//!
//! ```rust
//! use sync_async_runner::runner;
//! use std::task::Poll;
//! use futures_channel::{
//! 	mpsc,
//! 	oneshot,
//! };
//! use futures_util::{
//! 	pin_mut,
//! 	stream::StreamExt,
//! };
//!
//! let (mut sender, receiver) = mpsc::channel(5);
//! sender.try_send(1u32).unwrap();
//! sender.try_send(2).unwrap();
//! sender.try_send(3).unwrap();
//! sender.close_channel();
//! let coro = runner(async move {
//! 	pin_mut!(receiver);
//! 	assert_eq!(receiver.next().await, Some(1));
//! 	assert_eq!(receiver.next().await, Some(2));
//! 	assert_eq!(receiver.next().await, Some(3));
//! 	assert_eq!(receiver.next().await, None);
//! 	42
//! });
//! pin_mut!(coro);
//! // Ready immediately since the messages are all there
//! assert_eq!(coro.as_mut().poll(), Poll::Ready(42));
//!
//! ```
use pin_project::pin_project;
use std::{
	future::Future,
	mem,
	pin::Pin,
	sync::{
		atomic::{
			AtomicBool,
			Ordering,
		},
		Arc,
	},
	task::{
		Context,
		Poll,
		RawWaker,
		RawWakerVTable,
		Waker,
	},
};

/// Wraps a future in a runner, allowing it to run on the current thread
pub fn runner<F: Future>(future: F) -> SimpleRunner<F> {
	// Create the waker data...
	let waker_data = Arc::new(AtomicBool::new(true));
	// Then the waker. See the waker module below for an explanation on what a waker is.
	let waker_obj = waker::create(Arc::clone(&waker_data));
	SimpleRunner {
		future,
		is_awake: waker_data,
		cached_waker: waker_obj,
	}
}

/// The not-actually-async async runner.
///
/// Wraps a future and provides a method to run it on the current thread.
#[pin_project]
pub struct SimpleRunner<F: Future> {
	/// The future we are executing. Futures need to be pinned in order to run, so we
	/// use the pin_project crate to allow us to get a `Pin<&mut F>` from a `Pin<&mut SimpleRunner<F>>`.
	#[pin]
	future: F,
	/// Flag set when a waker says we can run again. See below.
	is_awake: Arc<AtomicBool>,
	/// A pre-created waker pointing to `is_awake` that we can use in `poll`.
	cached_waker: Waker,
}
impl<F: Future> SimpleRunner<F> {
	/// Has a waker been activated for this future?
	///
	/// For example, this will be true if the future was waiting on a channel to have an element for it
	/// to receive and the channel now has one.
	///
	/// You can run an unawakened future, but it will likely just result in it awaiting without making
	/// any progress.
	pub fn is_awake(&self) -> bool {
		self.is_awake.load(Ordering::Acquire)
	}

	/// Resumes the future, running it on the current thread until it awaits or returns.
	///
	/// Returns whether the future has completed or not.
	pub fn poll(self: Pin<&mut Self>) -> Poll<F::Output> {
		// Project pin, to get access to a `Pin<&mut F>`.
		let this = self.project();
		// Clear awake flag now that we have resumed.
		this.is_awake.store(false, Ordering::Release);
		// Create context. This is just a holder for a waker reference, which we created in the constructor.
		// We could also create a new waker here, pointing to the `is_awake` flag.
		let mut ctx = Context::from_waker(&this.cached_waker);
		// Execute the future. Here we just return whether or not the future has completed. A real
		// scheduler would check the return value and either destroy the task if it finished, put it in a
		// "pending" set if it hasn't yet, or some other bookkeeping.
		this.future.poll(&mut ctx)
	}
}

/// Returns a future whose first call to `poll` will return `Pending`.
///
/// All other polls will result in `Ready(())`.
///
/// Can be used with `SimpleRunner` to "yield" from the future while being
/// immediately ready to poll again.
pub fn yield_() -> impl Future<Output = ()> {
	YieldFuture(true)
}
struct YieldFuture(bool);
impl Future for YieldFuture {
	type Output = ();

	fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
		let this = self.get_mut();
		if this.0 {
			// First call
			this.0 = false;
			// Wake immediately since we don't actually wait on anything
			ctx.waker().wake_by_ref();
			Poll::Pending
		} else {
			Poll::Ready(())
		}
	}
}

/// The "waker" implementation.
///
/// When a future returns `Poll::Pending`, it means its waiting on an async operation. However,
/// for efficient processing, the operation needs to be able to tell the future scheduler when the
/// operation has finished, so that the scheduler can know that it should poll the future again.
///
/// The waker is that mechanism. It gets passed, through a `Context`, into the future's `poll` function.
/// The async operation can clone the waker, store it, and call its wake method when it is finished.
/// For example, when waiting on an empty asynchronous queue, the queue can store the waker in it, so
/// that the queue's enqueue method can wake the pending future when a new item is enqueued and it can
/// start again.
///
/// Implementing a waker is a bit tricky, because it does not use the standard traits paradigm that rust uses
/// for other interfaces. Rather, waker "methods" are unsafe function operating on data pointers. The functions
/// are stored in a "vtable", which a reference to which is stored with the data pointer (fun fact: this is similar
/// to how `dyn Trait` objects are implemented). The `RawWaker` struct is a dumb tuple of the data pointer and the
/// vtable, and the `Waker` is a container for the `RawWaker` that provides higher-level methods that call the
/// vtable functions under-the-hood.
///
/// Since our runner doesn't actually do any scheduling, our implementation is very simple: the data pointer is
/// an `Arc`'s data pointer, which just contains an `AtomicBool` which is set upon awaking. The `SimpleRunner`
/// also has a reference to the `AtomicBool` and can read it. A real scheduler would want to, say, transfer the
/// future to an "active" set to be ran next, or do some other bookkeeping.
///
/// Worth pointing out two things:
///
/// * The waker must be thread-safe. Some async operations may be performed on a separate worker thread, which will then
///   call the wake function. Ours is safe due to our use of Arc and atomic operations.
/// * As a collaroy to the above: the waker may be awoken before the future even returns `Poll::Pending`, so any bookkeeping
///   you do has to work when targeting a future that's currently running.
mod waker {
	use super::*;

	/// Creates a waker, referencing the passed in awake flag
	pub fn create(data: Arc<AtomicBool>) -> Waker {
		let raw_waker = RawWaker::new(Arc::into_raw(data) as *const _, &VTABLE);
		unsafe { Waker::from_raw(raw_waker) }
	}

	/// Clones the waker, creating another waker that will wake up the same future
	unsafe fn v_clone(p: *const ()) -> RawWaker {
		let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _);
		let new_ref = Arc::clone(&rc);
		Arc::into_raw(rc); // keep reference we got as an arg alive, by re-leaking it
		RawWaker::new(Arc::into_raw(new_ref) as *const _, &VTABLE)
	}
	/// Flags the waker that the future can be resumed, then deallocates it.
	unsafe fn v_wake(p: *const ()) {
		let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _);
		rc.store(true, Ordering::Release);
	}
	/// Flags the waker that the future can be resumed, without deallocating it.
	unsafe fn v_wake_by_ref(p: *const ()) {
		let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _);
		rc.store(true, Ordering::Release);
		Arc::into_raw(rc); // keep reference we got as an arg alive, by re-leaking it
	}
	/// Deallocates the waker without waking.
	unsafe fn v_drop(p: *const ()) {
		mem::drop(Arc::from_raw(p as *const _))
	}
	/// The "virtual table" that holds all the methods. This must live for the 'static lifetime,
	/// though that's not a problem.
	const VTABLE: RawWakerVTable = RawWakerVTable::new(v_clone, v_wake, v_wake_by_ref, v_drop);
}

#[cfg(test)]
mod tests {
	use super::*;
	use futures_channel::{
		mpsc,
		oneshot,
	};
	use futures_util::{
		pin_mut,
		stream::StreamExt,
	};

	#[test]
	fn channel_take_preset() {
		let (mut sender, receiver) = mpsc::channel(5);
		sender.try_send(1u32).unwrap();
		sender.try_send(2).unwrap();
		sender.try_send(3).unwrap();
		sender.close_channel();
		let coro = runner(async move {
			pin_mut!(receiver);
			assert_eq!(receiver.next().await, Some(1));
			assert_eq!(receiver.next().await, Some(2));
			assert_eq!(receiver.next().await, Some(3));
			assert_eq!(receiver.next().await, None);
			42
		});
		pin_mut!(coro);
		// Ready immediately since the messages are all there
		assert_eq!(coro.as_mut().poll(), Poll::Ready(42));
	}

	#[test]
	fn channel_take_set_during() {
		let (mut sender, receiver) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(5);
		let coro = runner(async move {
			println!("Enter");
			pin_mut!(receiver);
			for i in 0..5u32 {
				println!("Receiving {}", i);
				let (n, ok) = receiver.next().await.unwrap();
				assert_eq!(n, i);
				println!("Received");
				ok.send(n * 2).unwrap();
			}
			assert!(receiver.next().await.is_none());
			64
		});
		pin_mut!(coro);
		// Get to first await
		assert_eq!(coro.as_mut().poll(), Poll::Pending);

		for i in 0..5u32 {
			println!("Sending {}", i);
			// Should be waiting for a message...
			assert!(!coro.is_awake());

			let (result_sender, mut result_receiver) = oneshot::channel();
			// Which we now send...
			sender.try_send((i, result_sender)).unwrap();
			// Now should be awake
			assert!(coro.is_awake());
			// Continue running
			assert_eq!(coro.as_mut().poll(), Poll::Pending);
			// Make sure they responded with the right result
			assert_eq!(result_receiver.try_recv(), Ok(Some(i * 2)));
		}
		// Close channel
		sender.close_channel();
		// Now that the channel is closed, the future can run to completion
		assert_eq!(coro.as_mut().poll(), Poll::Ready(64));
	}

	#[test]
	fn yield_test() {
		let coro = runner(async move {
			yield_().await;
			yield_().await;
			yield_().await;
			return 32;
		});
		pin_mut!(coro);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Ready(32));
	}
}