1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
//! A not-actually-async async runner (for coroutines, etc). //! //! Lets you run futures in the current thread, blocking until they await or exit. //! //! Primairly intended as a coroutine alternative, when you want a separate thread of executiong //! but want to run it synchronously with your main thread. Can also be used for testing, and also //! functions as a basic example of how to execute futures, as a stepping stone to writing your //! own scheduler. //! //! It's not very big, and the code has comments. Check the tests for some simple examples. //! //! ```rust //! use sync_async_runner::runner; //! use std::task::Poll; //! use futures_channel::{ //! mpsc, //! oneshot, //! }; //! use futures_util::{ //! pin_mut, //! stream::StreamExt, //! }; //! //! let (mut sender, receiver) = mpsc::channel(5); //! sender.try_send(1u32).unwrap(); //! sender.try_send(2).unwrap(); //! sender.try_send(3).unwrap(); //! sender.close_channel(); //! let coro = runner(async move { //! pin_mut!(receiver); //! assert_eq!(receiver.next().await, Some(1)); //! assert_eq!(receiver.next().await, Some(2)); //! assert_eq!(receiver.next().await, Some(3)); //! assert_eq!(receiver.next().await, None); //! 42 //! }); //! pin_mut!(coro); //! // Ready immediately since the messages are all there //! assert_eq!(coro.as_mut().poll(), Poll::Ready(42)); //! //! ``` use pin_project::pin_project; use std::{ future::Future, mem, pin::Pin, sync::{ atomic::{ AtomicBool, Ordering, }, Arc, }, task::{ Context, Poll, RawWaker, RawWakerVTable, Waker, }, }; /// Wraps a future in a runner, allowing it to run on the current thread pub fn runner<F: Future>(future: F) -> SimpleRunner<F> { // Create the waker data... let waker_data = Arc::new(AtomicBool::new(true)); // Then the waker. See the waker module below for an explanation on what a waker is. let waker_obj = waker::create(Arc::clone(&waker_data)); SimpleRunner { future, is_awake: waker_data, cached_waker: waker_obj, } } /// The not-actually-async async runner. /// /// Wraps a future and provides a method to run it on the current thread. #[pin_project] pub struct SimpleRunner<F: Future> { /// The future we are executing. Futures need to be pinned in order to run, so we /// use the pin_project crate to allow us to get a `Pin<&mut F>` from a `Pin<&mut SimpleRunner<F>>`. #[pin] future: F, /// Flag set when a waker says we can run again. See below. is_awake: Arc<AtomicBool>, /// A pre-created waker pointing to `is_awake` that we can use in `poll`. cached_waker: Waker, } impl<F: Future> SimpleRunner<F> { /// Has a waker been activated for this future? /// /// For example, this will be true if the future was waiting on a channel to have an element for it /// to receive and the channel now has one. /// /// You can run an unawakened future, but it will likely just result in it awaiting without making /// any progress. pub fn is_awake(&self) -> bool { self.is_awake.load(Ordering::Acquire) } /// Resumes the future, running it on the current thread until it awaits or returns. /// /// Returns whether the future has completed or not. pub fn poll(self: Pin<&mut Self>) -> Poll<F::Output> { // Project pin, to get access to a `Pin<&mut F>`. let this = self.project(); // Clear awake flag now that we have resumed. this.is_awake.store(false, Ordering::Release); // Create context. This is just a holder for a waker reference, which we created in the constructor. // We could also create a new waker here, pointing to the `is_awake` flag. let mut ctx = Context::from_waker(&this.cached_waker); // Execute the future. Here we just return whether or not the future has completed. A real // scheduler would check the return value and either destroy the task if it finished, put it in a // "pending" set if it hasn't yet, or some other bookkeeping. this.future.poll(&mut ctx) } } /// Returns a future whose first call to `poll` will return `Pending`. /// /// All other polls will result in `Ready(())`. /// /// Can be used with `SimpleRunner` to "yield" from the future while being /// immediately ready to poll again. pub fn yield_() -> impl Future<Output = ()> { YieldFuture(true) } struct YieldFuture(bool); impl Future for YieldFuture { type Output = (); fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { let this = self.get_mut(); if this.0 { // First call this.0 = false; // Wake immediately since we don't actually wait on anything ctx.waker().wake_by_ref(); Poll::Pending } else { Poll::Ready(()) } } } /// The "waker" implementation. /// /// When a future returns `Poll::Pending`, it means its waiting on an async operation. However, /// for efficient processing, the operation needs to be able to tell the future scheduler when the /// operation has finished, so that the scheduler can know that it should poll the future again. /// /// The waker is that mechanism. It gets passed, through a `Context`, into the future's `poll` function. /// The async operation can clone the waker, store it, and call its wake method when it is finished. /// For example, when waiting on an empty asynchronous queue, the queue can store the waker in it, so /// that the queue's enqueue method can wake the pending future when a new item is enqueued and it can /// start again. /// /// Implementing a waker is a bit tricky, because it does not use the standard traits paradigm that rust uses /// for other interfaces. Rather, waker "methods" are unsafe function operating on data pointers. The functions /// are stored in a "vtable", which a reference to which is stored with the data pointer (fun fact: this is similar /// to how `dyn Trait` objects are implemented). The `RawWaker` struct is a dumb tuple of the data pointer and the /// vtable, and the `Waker` is a container for the `RawWaker` that provides higher-level methods that call the /// vtable functions under-the-hood. /// /// Since our runner doesn't actually do any scheduling, our implementation is very simple: the data pointer is /// an `Arc`'s data pointer, which just contains an `AtomicBool` which is set upon awaking. The `SimpleRunner` /// also has a reference to the `AtomicBool` and can read it. A real scheduler would want to, say, transfer the /// future to an "active" set to be ran next, or do some other bookkeeping. /// /// Worth pointing out two things: /// /// * The waker must be thread-safe. Some async operations may be performed on a separate worker thread, which will then /// call the wake function. Ours is safe due to our use of Arc and atomic operations. /// * As a collaroy to the above: the waker may be awoken before the future even returns `Poll::Pending`, so any bookkeeping /// you do has to work when targeting a future that's currently running. mod waker { use super::*; /// Creates a waker, referencing the passed in awake flag pub fn create(data: Arc<AtomicBool>) -> Waker { let raw_waker = RawWaker::new(Arc::into_raw(data) as *const _, &VTABLE); unsafe { Waker::from_raw(raw_waker) } } /// Clones the waker, creating another waker that will wake up the same future unsafe fn v_clone(p: *const ()) -> RawWaker { let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _); let new_ref = Arc::clone(&rc); Arc::into_raw(rc); // keep reference we got as an arg alive, by re-leaking it RawWaker::new(Arc::into_raw(new_ref) as *const _, &VTABLE) } /// Flags the waker that the future can be resumed, then deallocates it. unsafe fn v_wake(p: *const ()) { let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _); rc.store(true, Ordering::Release); } /// Flags the waker that the future can be resumed, without deallocating it. unsafe fn v_wake_by_ref(p: *const ()) { let rc: Arc<AtomicBool> = Arc::from_raw(p as *const _); rc.store(true, Ordering::Release); Arc::into_raw(rc); // keep reference we got as an arg alive, by re-leaking it } /// Deallocates the waker without waking. unsafe fn v_drop(p: *const ()) { mem::drop(Arc::from_raw(p as *const _)) } /// The "virtual table" that holds all the methods. This must live for the 'static lifetime, /// though that's not a problem. const VTABLE: RawWakerVTable = RawWakerVTable::new(v_clone, v_wake, v_wake_by_ref, v_drop); } #[cfg(test)] mod tests { use super::*; use futures_channel::{ mpsc, oneshot, }; use futures_util::{ pin_mut, stream::StreamExt, }; #[test] fn channel_take_preset() { let (mut sender, receiver) = mpsc::channel(5); sender.try_send(1u32).unwrap(); sender.try_send(2).unwrap(); sender.try_send(3).unwrap(); sender.close_channel(); let coro = runner(async move { pin_mut!(receiver); assert_eq!(receiver.next().await, Some(1)); assert_eq!(receiver.next().await, Some(2)); assert_eq!(receiver.next().await, Some(3)); assert_eq!(receiver.next().await, None); 42 }); pin_mut!(coro); // Ready immediately since the messages are all there assert_eq!(coro.as_mut().poll(), Poll::Ready(42)); } #[test] fn channel_take_set_during() { let (mut sender, receiver) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(5); let coro = runner(async move { println!("Enter"); pin_mut!(receiver); for i in 0..5u32 { println!("Receiving {}", i); let (n, ok) = receiver.next().await.unwrap(); assert_eq!(n, i); println!("Received"); ok.send(n * 2).unwrap(); } assert!(receiver.next().await.is_none()); 64 }); pin_mut!(coro); // Get to first await assert_eq!(coro.as_mut().poll(), Poll::Pending); for i in 0..5u32 { println!("Sending {}", i); // Should be waiting for a message... assert!(!coro.is_awake()); let (result_sender, mut result_receiver) = oneshot::channel(); // Which we now send... sender.try_send((i, result_sender)).unwrap(); // Now should be awake assert!(coro.is_awake()); // Continue running assert_eq!(coro.as_mut().poll(), Poll::Pending); // Make sure they responded with the right result assert_eq!(result_receiver.try_recv(), Ok(Some(i * 2))); } // Close channel sender.close_channel(); // Now that the channel is closed, the future can run to completion assert_eq!(coro.as_mut().poll(), Poll::Ready(64)); } #[test] fn yield_test() { let coro = runner(async move { yield_().await; yield_().await; yield_().await; return 32; }); pin_mut!(coro); assert!(coro.is_awake()); assert_eq!(coro.as_mut().poll(), Poll::Pending); assert!(coro.is_awake()); assert_eq!(coro.as_mut().poll(), Poll::Pending); assert!(coro.is_awake()); assert_eq!(coro.as_mut().poll(), Poll::Pending); assert!(coro.is_awake()); assert_eq!(coro.as_mut().poll(), Poll::Ready(32)); } }