wit_bindgen/rt/async_support.rs
1#![deny(missing_docs)]
2
3use alloc::boxed::Box;
4use alloc::collections::BTreeMap;
5use alloc::sync::Arc;
6use alloc::task::Wake;
7use core::ffi::c_void;
8use core::future::Future;
9use core::mem;
10use core::pin::Pin;
11use core::ptr;
12use core::sync::atomic::{AtomicU32, Ordering};
13use core::task::{Context, Poll, Waker};
14
15macro_rules! rtdebug {
16 ($($f:tt)*) => {
17 // Change this flag to enable debugging, right now we're not using a
18 // crate like `log` or such to reduce runtime deps. Intended to be used
19 // during development for now.
20 if false {
21 #[cfg(feature = "std")]
22 std::eprintln!($($f)*);
23 }
24 }
25}
26
27/// Helper macro to deduplicate foreign definitions of wasm functions.
28///
29/// This automatically imports when on wasm targets and then defines a dummy
30/// panicking shim for native targets to support native compilation but fail at
31/// runtime.
32macro_rules! extern_wasm {
33 (
34 $(#[$extern_attr:meta])*
35 unsafe extern "C" {
36 $(
37 $(#[$func_attr:meta])*
38 $vis:vis fn $func_name:ident ( $($args:tt)* ) $(-> $ret:ty)?;
39 )*
40 }
41 ) => {
42 $(
43 #[cfg(not(target_family = "wasm"))]
44 #[allow(unused, reason = "dummy shim for non-wasm compilation, never invoked")]
45 $vis unsafe fn $func_name($($args)*) $(-> $ret)? {
46 unreachable!();
47 }
48 )*
49
50 #[cfg(target_family = "wasm")]
51 $(#[$extern_attr])*
52 unsafe extern "C" {
53 $(
54 $(#[$func_attr])*
55 $vis fn $func_name($($args)*) $(-> $ret)?;
56 )*
57 }
58 };
59}
60
61mod abi_buffer;
62mod cabi;
63mod error_context;
64mod future_support;
65#[cfg(feature = "futures-stream")]
66mod futures_stream;
67#[cfg(feature = "inter-task-wakeup")]
68mod inter_task_wakeup;
69mod stream_support;
70mod subtask;
71mod try_lock;
72#[cfg(feature = "inter-task-wakeup")]
73mod unit_stream;
74mod waitable;
75mod waitable_set;
76
77#[cfg(not(feature = "inter-task-wakeup"))]
78use inter_task_wakeup_disabled as inter_task_wakeup;
79#[cfg(not(feature = "inter-task-wakeup"))]
80mod inter_task_wakeup_disabled;
81
82use self::waitable_set::WaitableSet;
83pub use abi_buffer::*;
84pub use error_context::*;
85pub use future_support::*;
86#[cfg(feature = "futures-stream")]
87pub use futures_stream::*;
88pub use stream_support::*;
89#[doc(hidden)]
90pub use subtask::Subtask;
91#[cfg(feature = "inter-task-wakeup")]
92pub use unit_stream::*;
93
94type BoxFuture<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;
95
96#[cfg(feature = "async-spawn")]
97mod spawn;
98#[cfg(feature = "async-spawn")]
99pub use spawn::spawn_local;
100#[cfg(not(feature = "async-spawn"))]
101mod spawn_disabled;
102#[cfg(not(feature = "async-spawn"))]
103use spawn_disabled as spawn;
104
105/// Represents a task created by either a call to an async-lifted export or a
106/// future run using `block_on` or `start_task`.
107struct FutureState<'a> {
108 /// Remaining work to do (if any) before this task can be considered "done".
109 ///
110 /// Note that we won't tell the host the task is done until this is drained
111 /// and `waitables` is empty.
112 tasks: spawn::Tasks<'a>,
113
114 /// The waitable set containing waitables created by this task, if any.
115 waitable_set: Option<WaitableSet>,
116
117 /// State of all waitables in `waitable_set`, and the ptr/callback they're
118 /// associated with.
119 //
120 // Note that this is a `BTreeMap` rather than a `HashMap` only because, as
121 // of this writing, initializing the default hasher for `HashMap` requires
122 // calling `wasi_snapshot_preview1:random_get`, which requires initializing
123 // the `wasi_snapshot_preview1` adapter when targeting `wasm32-wasip2` and
124 // later, and that's expensive enough that we'd prefer to avoid it for apps
125 // which otherwise make no use of the adapter.
126 waitables: BTreeMap<u32, (*mut c_void, unsafe extern "C" fn(*mut c_void, u32))>,
127
128 /// Raw structure used to pass to `cabi::wasip3_task_set`
129 wasip3_task: cabi::wasip3_task,
130
131 /// Rust-level state for the waker, notably a bool as to whether this has
132 /// been woken.
133 waker: Arc<FutureWaker>,
134
135 /// Clone of `waker` field, but represented as `std::task::Waker`.
136 waker_clone: Waker,
137
138 /// State related to supporting inter-task wakeup scenarios.
139 inter_task_wakeup: inter_task_wakeup::State,
140}
141
142impl FutureState<'_> {
143 fn new(future: BoxFuture<'_>) -> FutureState<'_> {
144 let waker = Arc::new(FutureWaker::default());
145 FutureState {
146 waker_clone: waker.clone().into(),
147 waker,
148 tasks: spawn::Tasks::new(future),
149 waitable_set: None,
150 waitables: BTreeMap::new(),
151 wasip3_task: cabi::wasip3_task {
152 // This pointer is filled in before calling `wasip3_task_set`.
153 ptr: ptr::null_mut(),
154 version: cabi::WASIP3_TASK_V1,
155 waitable_register,
156 waitable_unregister,
157 },
158 inter_task_wakeup: Default::default(),
159 }
160 }
161
162 fn get_or_create_waitable_set(&mut self) -> &WaitableSet {
163 self.waitable_set.get_or_insert_with(WaitableSet::new)
164 }
165
166 fn add_waitable(&mut self, waitable: u32) {
167 self.get_or_create_waitable_set().join(waitable)
168 }
169
170 fn remove_waitable(&mut self, waitable: u32) {
171 WaitableSet::remove_waitable_from_all_sets(waitable)
172 }
173
174 fn remaining_work(&self) -> bool {
175 !self.waitables.is_empty()
176 }
177
178 /// Handles the `event{0,1,2}` event codes and returns a corresponding
179 /// return code along with a flag whether this future is "done" or not.
180 fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> CallbackCode {
181 match event0 {
182 EVENT_NONE => rtdebug!("EVENT_NONE"),
183 EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"),
184 EVENT_STREAM_READ => rtdebug!("EVENT_STREAM_READ({event1:#x}, {event2:#x})"),
185 EVENT_STREAM_WRITE => rtdebug!("EVENT_STREAM_WRITE({event1:#x}, {event2:#x})"),
186 EVENT_FUTURE_READ => rtdebug!("EVENT_FUTURE_READ({event1:#x}, {event2:#x})"),
187 EVENT_FUTURE_WRITE => rtdebug!("EVENT_FUTURE_WRITE({event1:#x}, {event2:#x})"),
188 EVENT_CANCEL => {
189 rtdebug!("EVENT_CANCEL");
190
191 // Cancellation is mapped to destruction in Rust, so return a
192 // code/bool indicating we're done. The caller will then
193 // appropriately deallocate this `FutureState` which will
194 // transitively run all destructors.
195 return CallbackCode::Exit;
196 }
197 _ => unreachable!(),
198 }
199
200 self.with_p3_task_set(|me| {
201 // Transition our sleep state to ensure that the inter-task stream
202 // isn't used since there's no need to use that here.
203 me.waker
204 .sleep_state
205 .store(SLEEP_STATE_WOKEN, Ordering::Relaxed);
206
207 // With all of our context now configured, deliver the event
208 // notification this callback corresponds to.
209 //
210 // Note that this should happen under the reset of
211 // `waker.sleep_state` above to ensure that if a waker is woken it
212 // won't actually signal our inter-task stream since we're already
213 // in the process of handling the future.
214 if event0 != EVENT_NONE {
215 me.deliver_waitable_event(event1, event2)
216 }
217
218 // If there's still an in-progress read (e.g. `event{1,2}`) wasn't
219 // ourselves getting woken up, then cancel the read since we're
220 // processing the future here anyway.
221 me.cancel_inter_task_stream_read();
222
223 loop {
224 let mut context = Context::from_waker(&me.waker_clone);
225
226 // On each turn of this loop reset the state to "polling"
227 // which clears out any pending wakeup if one was sent. This
228 // in theory helps minimize wakeups from previous iterations
229 // happening in this iteration.
230 me.waker
231 .sleep_state
232 .store(SLEEP_STATE_POLLING, Ordering::Relaxed);
233
234 // Poll our future, seeing if it was able to make progress.
235 let poll = me.tasks.poll_next(&mut context);
236
237 match poll {
238 // The task list is empty, but there might be remaining work
239 // in terms of waitables through the cabi interface. In this
240 // situation wait for all waitables to be resolved before
241 // signaling that our own task is done.
242 Poll::Ready(()) => {
243 assert!(me.tasks.is_empty());
244 if me.remaining_work() {
245 let waitable = me.waitable_set.as_ref().unwrap().as_raw();
246 break CallbackCode::Wait(waitable);
247 } else {
248 break CallbackCode::Exit;
249 }
250 }
251
252 // Some future within `self.tasks` is not ready yet. If our
253 // `waker` was signaled then that means this is a yield
254 // operation, otherwise it means we're blocking on
255 // something.
256 Poll::Pending => {
257 assert!(!me.tasks.is_empty());
258 if me.waker.sleep_state.load(Ordering::Relaxed) == SLEEP_STATE_WOKEN {
259 if me.remaining_work() {
260 let (event0, event1, event2) =
261 me.waitable_set.as_ref().unwrap().poll();
262 if event0 != EVENT_NONE {
263 me.deliver_waitable_event(event1, event2);
264 continue;
265 }
266 }
267 break CallbackCode::Yield;
268 }
269
270 // Transition our state to "sleeping" so wakeup
271 // notifications know that they need to signal the
272 // inter-task stream.
273 me.waker
274 .sleep_state
275 .store(SLEEP_STATE_SLEEPING, Ordering::Relaxed);
276 me.read_inter_task_stream();
277 let waitable = me.waitable_set.as_ref().unwrap().as_raw();
278 break CallbackCode::Wait(waitable);
279 }
280 }
281 }
282 })
283 }
284
285 /// Deliver the `code` event to the `waitable` store within our map. This
286 /// waitable should be present because it's part of the waitable set which
287 /// is kept in-sync with our map.
288 fn deliver_waitable_event(&mut self, waitable: u32, code: u32) {
289 self.remove_waitable(waitable);
290
291 if self
292 .inter_task_wakeup
293 .consume_waitable_event(waitable, code)
294 {
295 return;
296 }
297
298 let (ptr, callback) = self.waitables.remove(&waitable).unwrap();
299 unsafe {
300 callback(ptr, code);
301 }
302 }
303
304 fn with_p3_task_set<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
305 // Finish our `wasip3_task` by initializing its self-referential pointer,
306 // and then register it for the duration of this function with
307 // `wasip3_task_set`. The previous value of `wasip3_task_set` will get
308 // restored when this function returns.
309 struct ResetTask(*mut cabi::wasip3_task);
310 impl Drop for ResetTask {
311 fn drop(&mut self) {
312 unsafe {
313 cabi::wasip3_task_set(self.0);
314 }
315 }
316 }
317 let self_raw = self as *mut FutureState<'_>;
318 self.wasip3_task.ptr = self_raw.cast();
319 let prev = unsafe { cabi::wasip3_task_set(&mut self.wasip3_task) };
320 let _reset = ResetTask(prev);
321
322 f(self)
323 }
324}
325
326impl Drop for FutureState<'_> {
327 fn drop(&mut self) {
328 // If there's an active read of the inter-task stream, go ahead and
329 // cancel it, since we're about to drop the stream anyway.
330 self.cancel_inter_task_stream_read();
331
332 // If this state has active tasks then they need to be dropped which may
333 // execute arbitrary code. This arbitrary code might require the p3 APIs
334 // for managing waitables, notably around removing them. In this
335 // situation we ensure that the p3 task is set while futures are being
336 // destroyed.
337 if !self.tasks.is_empty() {
338 self.with_p3_task_set(|me| {
339 me.tasks = Default::default();
340 })
341 }
342 }
343}
344
345unsafe extern "C" fn waitable_register(
346 ptr: *mut c_void,
347 waitable: u32,
348 callback: unsafe extern "C" fn(*mut c_void, u32),
349 callback_ptr: *mut c_void,
350) -> *mut c_void {
351 let ptr = ptr.cast::<FutureState<'static>>();
352 assert!(!ptr.is_null());
353 unsafe {
354 (*ptr).add_waitable(waitable);
355 match (*ptr).waitables.insert(waitable, (callback_ptr, callback)) {
356 Some((prev, _)) => prev,
357 None => ptr::null_mut(),
358 }
359 }
360}
361
362unsafe extern "C" fn waitable_unregister(ptr: *mut c_void, waitable: u32) -> *mut c_void {
363 let ptr = ptr.cast::<FutureState<'static>>();
364 assert!(!ptr.is_null());
365 unsafe {
366 (*ptr).remove_waitable(waitable);
367 match (*ptr).waitables.remove(&waitable) {
368 Some((prev, _)) => prev,
369 None => ptr::null_mut(),
370 }
371 }
372}
373
374/// Status for "this task is actively being polled"
375const SLEEP_STATE_POLLING: u32 = 0;
376/// Status for "this task has a wakeup scheduled, no more action need be taken".
377const SLEEP_STATE_WOKEN: u32 = 1;
378/// Status for "this task is not being polled and has not been woken"
379///
380/// Wakeups on this status signal the inter-task stream.
381const SLEEP_STATE_SLEEPING: u32 = 2;
382
383#[derive(Default)]
384struct FutureWaker {
385 /// One of `SLEEP_STATE_*` indicating the current status.
386 sleep_state: AtomicU32,
387 inter_task_stream: inter_task_wakeup::WakerState,
388}
389
390impl Wake for FutureWaker {
391 fn wake(self: Arc<Self>) {
392 Self::wake_by_ref(&self)
393 }
394
395 fn wake_by_ref(self: &Arc<Self>) {
396 match self.sleep_state.swap(SLEEP_STATE_WOKEN, Ordering::Relaxed) {
397 // If this future was currently being polled, or if someone else
398 // already woke it up, then there's nothing to do.
399 SLEEP_STATE_POLLING | SLEEP_STATE_WOKEN => {}
400
401 // If this future is sleeping, however, then this is a cross-task
402 // wakeup meaning that we need to write to its wakeup stream.
403 other => {
404 assert_eq!(other, SLEEP_STATE_SLEEPING);
405 self.inter_task_stream.wake();
406 }
407 }
408 }
409}
410
411const EVENT_NONE: u32 = 0;
412const EVENT_SUBTASK: u32 = 1;
413const EVENT_STREAM_READ: u32 = 2;
414const EVENT_STREAM_WRITE: u32 = 3;
415const EVENT_FUTURE_READ: u32 = 4;
416const EVENT_FUTURE_WRITE: u32 = 5;
417const EVENT_CANCEL: u32 = 6;
418
419#[derive(PartialEq, Debug)]
420enum CallbackCode {
421 Exit,
422 Yield,
423 Wait(u32),
424}
425
426impl CallbackCode {
427 fn encode(self) -> u32 {
428 match self {
429 CallbackCode::Exit => 0,
430 CallbackCode::Yield => 1,
431 CallbackCode::Wait(waitable) => 2 | (waitable << 4),
432 }
433 }
434}
435
436const STATUS_STARTING: u32 = 0;
437const STATUS_STARTED: u32 = 1;
438const STATUS_RETURNED: u32 = 2;
439const STATUS_STARTED_CANCELLED: u32 = 3;
440const STATUS_RETURNED_CANCELLED: u32 = 4;
441
442const BLOCKED: u32 = 0xffff_ffff;
443const COMPLETED: u32 = 0x0;
444const DROPPED: u32 = 0x1;
445const CANCELLED: u32 = 0x2;
446
447/// Return code of stream/future operations.
448#[derive(PartialEq, Debug, Copy, Clone)]
449enum ReturnCode {
450 /// The operation is blocked and has not completed.
451 Blocked,
452 /// The operation completed with the specified number of items.
453 Completed(u32),
454 /// The other end is dropped, but before that the specified number of items
455 /// were transferred.
456 Dropped(u32),
457 /// The operation was cancelled, but before that the specified number of
458 /// items were transferred.
459 Cancelled(u32),
460}
461
462impl ReturnCode {
463 fn decode(val: u32) -> ReturnCode {
464 if val == BLOCKED {
465 return ReturnCode::Blocked;
466 }
467 let amt = val >> 4;
468 match val & 0xf {
469 COMPLETED => ReturnCode::Completed(amt),
470 DROPPED => ReturnCode::Dropped(amt),
471 CANCELLED => ReturnCode::Cancelled(amt),
472 _ => panic!("unknown return code {val:#x}"),
473 }
474 }
475}
476
477/// Starts execution of the `task` provided, an asynchronous computation.
478///
479/// This is used for async-lifted exports at their definition site. The
480/// representation of the export is `task` and this function is called from the
481/// entrypoint. The code returned here is the same as the callback associated
482/// with this export, and the callback will be used if this task doesn't exit
483/// immediately with its result.
484#[doc(hidden)]
485pub fn start_task(task: impl Future<Output = ()> + 'static) -> i32 {
486 // Allocate a new `FutureState` which will track all state necessary for
487 // our exported task.
488 let state = Box::into_raw(Box::new(FutureState::new(Box::pin(task))));
489
490 // Store our `FutureState` into our context-local-storage slot and then
491 // pretend we got EVENT_NONE to kick off everything.
492 //
493 // SAFETY: we should own `context.set` as we're the root level exported
494 // task, and then `callback` is only invoked when context-local storage is
495 // valid.
496 unsafe {
497 assert!(context_get().is_null());
498 context_set(state.cast());
499 callback(EVENT_NONE, 0, 0) as i32
500 }
501}
502
503/// Handle a progress notification from the host regarding either a call to an
504/// async-lowered import or a stream/future read/write operation.
505///
506/// # Unsafety
507///
508/// This function assumes that `context_get()` returns a `FutureState`.
509#[doc(hidden)]
510pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
511 // Acquire our context-local state, assert it's not-null, and then reset
512 // the state to null while we're running to help prevent any unintended
513 // usage.
514 let state = context_get().cast::<FutureState<'static>>();
515 assert!(!state.is_null());
516 unsafe {
517 context_set(ptr::null_mut());
518 }
519
520 // Use `state` to run the `callback` function in the context of our event
521 // codes we received. If the callback decides to exit then we're done with
522 // our future so deallocate it. Otherwise put our future back in
523 // context-local storage and forward the code.
524 unsafe {
525 let rc = (*state).callback(event0, event1, event2);
526 if rc == CallbackCode::Exit {
527 drop(Box::from_raw(state));
528 } else {
529 context_set(state.cast());
530 }
531 rtdebug!(" => (cb) {rc:?}");
532 rc.encode()
533 }
534}
535
536/// Run the specified future to completion, returning the result.
537///
538/// This uses `waitable-set.wait` to poll for progress on any in-progress calls
539/// to async-lowered imports as necessary.
540// TODO: refactor so `'static` bounds aren't necessary
541pub fn block_on<T: 'static>(future: impl Future<Output = T>) -> T {
542 let mut result = None;
543 let mut state = FutureState::new(Box::pin(async {
544 result = Some(future.await);
545 }));
546 let mut event = (EVENT_NONE, 0, 0);
547 loop {
548 match state.callback(event.0, event.1, event.2) {
549 CallbackCode::Exit => {
550 drop(state);
551 break result.unwrap();
552 }
553 CallbackCode::Yield => event = state.waitable_set.as_ref().unwrap().poll(),
554 CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(),
555 }
556 }
557}
558
559/// Call the `yield` canonical built-in function.
560///
561/// This yields control to the host temporarily, allowing other tasks to make
562/// progress. It's a good idea to call this inside a busy loop which does not
563/// otherwise ever yield control the host.
564///
565/// Note that this function is a blocking function, not an `async` function.
566/// That means that this is not an async yield which allows other tasks in this
567/// component to progress, but instead this will block the current function
568/// until the host gets back around to returning from this yield. Asynchronous
569/// functions should probably use [`yield_async`] instead.
570///
571/// # Return Value
572///
573/// This function returns a `bool` which indicates whether execution should
574/// continue after this yield point. A return value of `true` means that the
575/// task was not cancelled and execution should continue. A return value of
576/// `false`, however, means that the task was cancelled while it was suspended
577/// at this yield point. The caller should return back and exit from the task
578/// ASAP in this situation.
579pub fn yield_blocking() -> bool {
580 extern_wasm! {
581 #[link(wasm_import_module = "$root")]
582 unsafe extern "C" {
583 #[link_name = "[thread-yield]"]
584 fn yield_() -> bool;
585 }
586 }
587
588 // Note that the return value from the raw intrinsic is inverted, the
589 // canonical ABI returns "did this task get cancelled" while this function
590 // works as "should work continue going".
591 unsafe { !yield_() }
592}
593
594/// The asynchronous counterpart to [`yield_blocking`].
595///
596/// This function does not block the current task but instead gives the
597/// Rust-level executor a chance to yield control back to the host temporarily.
598/// This means that other Rust-level tasks may also be able to progress during
599/// this yield operation.
600///
601/// # Return Value
602///
603/// Unlike [`yield_blocking`] this function does not return anything. If this
604/// component task is cancelled while paused at this yield point then the future
605/// will be dropped and a Rust-level destructor will take over and clean up the
606/// task. It's not necessary to do anything with the return value of this
607/// function other than ensuring that you `.await` the function call.
608pub async fn yield_async() {
609 #[derive(Default)]
610 struct Yield {
611 yielded: bool,
612 }
613
614 impl Future for Yield {
615 type Output = ();
616
617 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<()> {
618 if self.yielded {
619 Poll::Ready(())
620 } else {
621 self.yielded = true;
622 context.waker().wake_by_ref();
623 Poll::Pending
624 }
625 }
626 }
627
628 Yield::default().await;
629}
630
631/// Call the `backpressure.inc` canonical built-in function.
632pub fn backpressure_inc() {
633 extern_wasm! {
634 #[link(wasm_import_module = "$root")]
635 unsafe extern "C" {
636 #[link_name = "[backpressure-inc]"]
637 fn backpressure_inc();
638 }
639 }
640
641 unsafe { backpressure_inc() }
642}
643
644/// Call the `backpressure.dec` canonical built-in function.
645pub fn backpressure_dec() {
646 extern_wasm! {
647 #[link(wasm_import_module = "$root")]
648 unsafe extern "C" {
649 #[link_name = "[backpressure-dec]"]
650 fn backpressure_dec();
651 }
652 }
653
654 unsafe { backpressure_dec() }
655}
656
657fn context_get() -> *mut u8 {
658 extern_wasm! {
659 #[link(wasm_import_module = "$root")]
660 unsafe extern "C" {
661 #[link_name = "[context-get-0]"]
662 fn get() -> *mut u8;
663 }
664 }
665
666 unsafe { get() }
667}
668
669unsafe fn context_set(value: *mut u8) {
670 extern_wasm! {
671 #[link(wasm_import_module = "$root")]
672 unsafe extern "C" {
673 #[link_name = "[context-set-0]"]
674 fn set(value: *mut u8);
675 }
676 }
677
678 unsafe { set(value) }
679}
680
681#[doc(hidden)]
682pub struct TaskCancelOnDrop {
683 _priv: (),
684}
685
686impl TaskCancelOnDrop {
687 #[doc(hidden)]
688 pub fn new() -> TaskCancelOnDrop {
689 TaskCancelOnDrop { _priv: () }
690 }
691
692 #[doc(hidden)]
693 pub fn forget(self) {
694 mem::forget(self);
695 }
696}
697
698impl Drop for TaskCancelOnDrop {
699 fn drop(&mut self) {
700 extern_wasm! {
701 #[link(wasm_import_module = "[export]$root")]
702 unsafe extern "C" {
703 #[link_name = "[task-cancel]"]
704 fn cancel();
705 }
706 }
707
708 unsafe { cancel() }
709 }
710}