//! A not-actually-async async runner (for coroutines, etc).
//!
//! Lets you run futures in the current thread, blocking until they await or exit.
//!
//! Primarily intended as a coroutine alternative, for when you want a separate thread of execution
//! but want to run it synchronously with your main thread. It can also be used for testing, and it
//! functions as a basic example of how to execute futures, as a stepping stone to writing your
//! own scheduler.
//!
//! It's not very big, and the code has comments. Check the tests for some simple examples.
//!
//! ```rust
//! use sync_async_runner::runner;
//! use std::task::Poll;
//! use futures_channel::mpsc;
//! use futures_util::{
//! 	pin_mut,
//! 	stream::StreamExt,
//! };
//!
//! let (mut sender, receiver) = mpsc::channel(5);
//! sender.try_send(1u32).unwrap();
//! sender.try_send(2).unwrap();
//! sender.try_send(3).unwrap();
//! sender.close_channel();
//! let coro = runner(async move {
//! 	pin_mut!(receiver);
//! 	assert_eq!(receiver.next().await, Some(1));
//! 	assert_eq!(receiver.next().await, Some(2));
//! 	assert_eq!(receiver.next().await, Some(3));
//! 	assert_eq!(receiver.next().await, None);
//! 	42
//! });
//! pin_mut!(coro);
//! // Ready immediately since the messages are all there
//! assert_eq!(coro.as_mut().poll(), Poll::Ready(42));
//! ```
use pin_project::pin_project;
use std::{
	future::Future,
	mem,
	pin::Pin,
	sync::{
		atomic::{
			AtomicBool,
			Ordering,
		},
		Arc,
	},
	task::{
		Context,
		Poll,
		RawWaker,
		RawWakerVTable,
		Waker,
	},
};

/// Wraps a future in a runner, allowing it to run on the current thread
pub fn runner<F: Future>(future: F) -> SimpleRunner<F> {
	// Create the waker data...
	let waker_data = Arc::new(AtomicBool::new(true));
	// Then the waker. See the waker module below for an explanation on what a waker is.
	let waker_obj = waker::create(Arc::clone(&waker_data));
	SimpleRunner {
		future,
		is_awake: waker_data,
		cached_waker: waker_obj,
	}
}

/// The not-actually-async async runner.
///
/// Wraps a future and provides a method to run it on the current thread.
#[pin_project]
pub struct SimpleRunner<F: Future> {
	/// The future we are executing. Futures need to be pinned in order to run, so we
	/// use the pin_project crate to allow us to get a `Pin<&mut F>` from a `Pin<&mut SimpleRunner<F>>`.
	#[pin]
	future: F,
	/// Flag set when a waker says we can run again. See below.
	is_awake: Arc<AtomicBool>,
	/// A pre-created waker pointing to `is_awake` that we can use in `poll`.
	cached_waker: Waker,
}
impl<F: Future> SimpleRunner<F> {
	/// Has a waker been activated for this future?
	///
	/// For example, this will be true if the future was waiting on a channel to have an element for it
	/// to receive and the channel now has one.
	///
	/// You can poll a future that has not been woken, but it will most likely just hit the same
	/// await point again without making any progress.
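	///
	/// A minimal sketch of the flag in action, mirroring the channel tests at the bottom of this
	/// file (it assumes `futures_channel` and `futures_util` are available to doctests, as in the
	/// crate-level example):
	///
	/// ```rust
	/// use sync_async_runner::runner;
	/// use std::task::Poll;
	/// use futures_channel::mpsc;
	/// use futures_util::{pin_mut, stream::StreamExt};
	///
	/// let (mut sender, receiver) = mpsc::channel(1);
	/// let coro = runner(async move {
	/// 	pin_mut!(receiver);
	/// 	receiver.next().await
	/// });
	/// pin_mut!(coro);
	/// // Parked on the empty channel; nothing has woken the future yet.
	/// assert_eq!(coro.as_mut().poll(), Poll::Pending);
	/// assert!(!coro.is_awake());
	/// // Sending an item invokes the waker, which sets the flag.
	/// sender.try_send(7u32).unwrap();
	/// assert!(coro.is_awake());
	/// assert_eq!(coro.as_mut().poll(), Poll::Ready(Some(7)));
	/// ```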
	pub fn is_awake(&self) -> bool {
		self.is_awake.load(Ordering::Acquire)
	}

	/// Resumes the future, running it on the current thread until it awaits or returns.
	///
	/// Returns `Poll::Ready` with the future's output once it has completed, or `Poll::Pending`
	/// if it is still waiting on something.
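	///
	/// A sketch of one way to drive a future to completion with this method, busy-waiting whenever
	/// the future is not awake (a real runtime would block instead of spinning):
	///
	/// ```rust
	/// use sync_async_runner::{runner, yield_};
	/// use std::task::Poll;
	/// use futures_util::pin_mut;
	///
	/// let coro = runner(async {
	/// 	yield_().await;
	/// 	yield_().await;
	/// 	"done"
	/// });
	/// pin_mut!(coro);
	/// let result = loop {
	/// 	if coro.is_awake() {
	/// 		if let Poll::Ready(value) = coro.as_mut().poll() {
	/// 			break value;
	/// 		}
	/// 	} else {
	/// 		// Nothing has woken the future yet; yield the OS thread rather than spin hot.
	/// 		std::thread::yield_now();
	/// 	}
	/// };
	/// assert_eq!(result, "done");
	/// ```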
	pub fn poll(self: Pin<&mut Self>) -> Poll<F::Output> {
		// Project pin, to get access to a `Pin<&mut F>`.
		let this = self.project();
		// Clear awake flag now that we have resumed.
		this.is_awake.store(false, Ordering::Release);
		// Create context. This is just a holder for a waker reference, which we created in the constructor.
		// We could also create a new waker here, pointing to the `is_awake` flag.
		let mut ctx = Context::from_waker(&this.cached_waker);
		// Execute the future. Here we just return whether or not the future has completed. A real
		// scheduler would check the return value and either destroy the task if it finished, put it in
		// a "pending" set if it hasn't yet, or do some other bookkeeping.
		this.future.poll(&mut ctx)
	}
}

/// Returns a future whose first call to `poll` will return `Pending`.
///
/// All other polls will result in `Ready(())`.
///
/// Can be used with `SimpleRunner` to "yield" from the future while being
/// immediately ready to poll again.
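///
/// A minimal usage sketch:
///
/// ```rust
/// use sync_async_runner::{runner, yield_};
/// use std::task::Poll;
/// use futures_util::pin_mut;
///
/// let coro = runner(async {
/// 	// Hand control back to the caller once, then finish.
/// 	yield_().await;
/// 	7
/// });
/// pin_mut!(coro);
/// // The first poll stops at the yield point...
/// assert_eq!(coro.as_mut().poll(), Poll::Pending);
/// // ...but the waker has already fired, so the future is immediately pollable again.
/// assert!(coro.is_awake());
/// assert_eq!(coro.as_mut().poll(), Poll::Ready(7));
/// ```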
pub fn yield_() -> impl Future<Output = ()> {
	YieldFuture(true)
}
struct YieldFuture(bool);
impl Future for YieldFuture {
	type Output = ();

	fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
		let this = self.get_mut();
		if this.0 {
			// First call
			this.0 = false;
			// Wake immediately since we don't actually wait on anything
			ctx.waker().wake_by_ref();
			Poll::Pending
		} else {
			Poll::Ready(())
		}
	}
}

/// The "waker" implementation.
///
/// When a future returns `Poll::Pending`, it means it's waiting on an async operation. For
/// efficient processing, however, that operation needs a way to tell the scheduler when it has
/// finished, so that the scheduler knows it should poll the future again.
///
/// The waker is that mechanism. It gets passed, through a `Context`, into the future's `poll` function.
/// The async operation can clone the waker, store it, and call its wake method when it is finished.
/// For example, a future waiting on an empty asynchronous queue can leave a clone of its waker with
/// the queue, so that the queue's enqueue method can wake the pending future when a new item arrives
/// and it can resume.
///
/// Implementing a waker is a bit tricky, because it does not use the standard trait paradigm that Rust
/// uses for other interfaces. Rather, waker "methods" are unsafe functions operating on a raw data
/// pointer. The functions are stored in a "vtable", a reference to which is kept alongside the data
/// pointer (fun fact: this is similar to how `dyn Trait` objects are implemented). The `RawWaker` struct
/// is a plain pairing of the data pointer and the vtable, and `Waker` is a wrapper around a `RawWaker`
/// that provides higher-level methods which call the vtable functions under the hood.
///
/// Since our runner doesn't actually do any scheduling, our implementation is very simple: the data
/// pointer is the raw pointer of an `Arc<AtomicBool>`, and the flag is set when the waker fires. The
/// `SimpleRunner` holds another reference to the same `AtomicBool` and can read it. A real scheduler
/// would instead, say, move the future into an "active" set to be run next, or do some other bookkeeping.
///
/// Two things are worth pointing out:
///
/// * The waker must be thread-safe. Some async operations may be performed on a separate worker thread,
///   which will then call the wake function. Ours is safe thanks to the `Arc` and the atomic operations
///   (a test at the bottom of this file exercises this).
/// * As a corollary to the above: the waker may be invoked before the future has even returned
///   `Poll::Pending`, so any bookkeeping you do has to work when targeting a future that is currently running.
mod waker {
	use super::*;

	/// Creates a waker referencing the passed-in awake flag.
	pub fn create(data: Arc<AtomicBool>) -> Waker {
		let raw_waker = RawWaker::new(Arc::into_raw(data) as *const _, &VTABLE);
		unsafe { Waker::from_raw(raw_waker) }
	}

	/// Clones the waker, creating another waker that will wake up the same future
	unsafe fn v_clone(p: *const ()) -> RawWaker {
		let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _);
		let new_ref = Arc::clone(&rc);
		// Re-leak the reference we received as an argument, so the caller's waker stays alive.
		mem::forget(rc);
		RawWaker::new(Arc::into_raw(new_ref) as *const _, &VTABLE)
	}
	/// Flags that the future can be resumed, then deallocates this waker.
	unsafe fn v_wake(p: *const ()) {
		let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _);
		rc.store(true, Ordering::Release);
	}
	/// Flags that the future can be resumed, without deallocating this waker.
	unsafe fn v_wake_by_ref(p: *const ()) {
		let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _);
		rc.store(true, Ordering::Release);
		// Re-leak the reference we received as an argument, so the waker stays alive.
		mem::forget(rc);
	}
	/// Deallocates the waker without waking.
	unsafe fn v_drop(p: *const ()) {
		mem::drop(Arc::from_raw(p as *const _))
	}
	/// The "virtual table" that holds all the methods. This must live for the 'static lifetime,
	/// though that's not a problem.
	const VTABLE: RawWakerVTable = RawWakerVTable::new(v_clone, v_wake, v_wake_by_ref, v_drop);
}

#[cfg(test)]
mod tests {
	use super::*;
	use futures_channel::{
		mpsc,
		oneshot,
	};
	use futures_util::{
		pin_mut,
		stream::StreamExt,
	};

	#[test]
	fn channel_take_preset() {
		let (mut sender, receiver) = mpsc::channel(5);
		sender.try_send(1u32).unwrap();
		sender.try_send(2).unwrap();
		sender.try_send(3).unwrap();
		sender.close_channel();
		let coro = runner(async move {
			pin_mut!(receiver);
			assert_eq!(receiver.next().await, Some(1));
			assert_eq!(receiver.next().await, Some(2));
			assert_eq!(receiver.next().await, Some(3));
			assert_eq!(receiver.next().await, None);
			42
		});
		pin_mut!(coro);
		// Ready immediately since the messages are all there
		assert_eq!(coro.as_mut().poll(), Poll::Ready(42));
	}

	#[test]
	fn channel_take_set_during() {
		let (mut sender, receiver) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(5);
		let coro = runner(async move {
			println!("Enter");
			pin_mut!(receiver);
			for i in 0..5u32 {
				println!("Receiving {}", i);
				let (n, ok) = receiver.next().await.unwrap();
				assert_eq!(n, i);
				println!("Received");
				ok.send(n * 2).unwrap();
			}
			assert!(receiver.next().await.is_none());
			64
		});
		pin_mut!(coro);
		// Get to first await
		assert_eq!(coro.as_mut().poll(), Poll::Pending);

		for i in 0..5u32 {
			println!("Sending {}", i);
			// Should be waiting for a message...
			assert!(!coro.is_awake());

			let (result_sender, mut result_receiver) = oneshot::channel();
			// Which we now send...
			sender.try_send((i, result_sender)).unwrap();
			// Now should be awake
			assert!(coro.is_awake());
			// Continue running
			assert_eq!(coro.as_mut().poll(), Poll::Pending);
			// Make sure they responded with the right result
			assert_eq!(result_receiver.try_recv(), Ok(Some(i * 2)));
		}
		// Close channel
		sender.close_channel();
		// Now that the channel is closed, the future can run to completion
		assert_eq!(coro.as_mut().poll(), Poll::Ready(64));
	}

	#[test]
	fn yield_test() {
		let coro = runner(async move {
			yield_().await;
			yield_().await;
			yield_().await;
			return 32;
		});
		pin_mut!(coro);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Ready(32));
	}
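
	// A sketch exercising the thread-safety note in the waker docs: an async operation that
	// completes on a worker thread can wake the runner, since the waker only touches an
	// `Arc<AtomicBool>`.
	#[test]
	fn wake_from_other_thread() {
		let (mut sender, receiver) = mpsc::channel::<u32>(1);
		let coro = runner(async move {
			pin_mut!(receiver);
			receiver.next().await.unwrap()
		});
		pin_mut!(coro);
		// Park at the first await.
		assert_eq!(coro.as_mut().poll(), Poll::Pending);
		assert!(!coro.is_awake());
		// Send from a worker thread; the channel invokes the waker for us.
		std::thread::spawn(move || sender.try_send(9).unwrap())
			.join()
			.unwrap();
		assert!(coro.is_awake());
		assert_eq!(coro.as_mut().poll(), Poll::Ready(9));
	}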
}