fp_bindgen_support/guest/async/
task.rs

1// This file is a modified version from wasm-bindgen-futures.
2// See: https://github.com/rustwasm/wasm-bindgen/blob/master/crates/futures/src/task/singlethread.rs
3// Licensed under Apache/MIT
4
5use serde::Serialize;
6use std::cell::{Cell, RefCell};
7use std::future::Future;
8use std::mem::ManuallyDrop;
9use std::pin::Pin;
10use std::rc::Rc;
11use std::task::{Context, RawWaker, RawWakerVTable, Waker};
12
13use crate::common::mem::{to_fat_ptr, FatPtr};
14use crate::common::r#async::AsyncValue;
15use crate::guest::io::export_value_to_host;
16
17use super::host_resolve_async_value;
18
/// The live state of a task whose future has not finished yet.
///
/// Fields are dropped in declaration order, so the future is released before
/// the waker.
struct Inner {
    /// The type-erased future that `Task::run` polls. A boxed trait object
    /// defaults to the `'static` bound, so it is not spelled out here.
    future: Pin<Box<dyn Future<Output = ()>>>,
    /// The waker handed to `future` on every poll. `Task::spawn` builds it
    /// from a clone of the owning task's `Rc`.
    waker: Waker,
}
23
24pub struct Task {
25    // The actual Future that we're executing as part of this task.
26    //
27    // This is an Option so that the Future can be immediately dropped when it's
28    // finished
29    inner: RefCell<Option<Inner>>,
30
31    // This is used to ensure that the Task will only be queued once
32    is_queued: Cell<bool>,
33}
34
35impl Task {
36    pub fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
37        let this = Rc::new(Self {
38            inner: RefCell::new(None),
39            is_queued: Cell::new(false),
40        });
41
42        let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) };
43
44        *this.inner.borrow_mut() = Some(Inner { future, waker });
45
46        Task::wake_by_ref(&this);
47    }
48
49    pub fn alloc_and_spawn<FUT, RET>(future: FUT) -> FatPtr
50    where
51        FUT: Future<Output = RET> + 'static,
52        RET: Serialize,
53    {
54        let layout = std::alloc::Layout::new::<AsyncValue>();
55        let len = layout.size() as u32;
56        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
57        let fat_ptr = to_fat_ptr(ptr, len);
58
59        Task::spawn(Box::pin(async move {
60            let ret = future.await;
61            let result_ptr = export_value_to_host(&ret);
62            host_resolve_async_value(fat_ptr, result_ptr);
63        }));
64
65        fat_ptr
66    }
67
68    fn wake_by_ref(this: &Rc<Self>) {
69        // If we've already been placed on the run queue then there's no need to
70        // requeue ourselves since we're going to run at some point in the
71        // future anyway.
72        if this.is_queued.replace(true) {
73            return;
74        }
75
76        super::queue::push_task(Rc::clone(this));
77    }
78
79    /// Creates a standard library `RawWaker` from an `Rc` of ourselves.
80    ///
81    /// Note that in general this is wildly unsafe because everything with
82    /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm,
83    /// however, everything is guaranteed to be singlethreaded (since we're
84    /// compiled without the `atomics` feature) so we "safely lie" and say our
85    /// `Rc` pointer is good enough.
86    unsafe fn into_raw_waker(this: Rc<Self>) -> RawWaker {
87        unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
88            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
89            Task::into_raw_waker((*ptr).clone())
90        }
91
92        unsafe fn raw_wake(ptr: *const ()) {
93            let ptr = Rc::from_raw(ptr as *const Task);
94            Task::wake_by_ref(&ptr);
95        }
96
97        unsafe fn raw_wake_by_ref(ptr: *const ()) {
98            let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
99            Task::wake_by_ref(&ptr);
100        }
101
102        unsafe fn raw_drop(ptr: *const ()) {
103            drop(Rc::from_raw(ptr as *const Task));
104        }
105
106        const VTABLE: RawWakerVTable =
107            RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
108
109        RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE)
110    }
111
112    pub(crate) fn run(&self) {
113        let mut borrow = self.inner.borrow_mut();
114
115        // Wakeups can come in after a Future has finished and been destroyed,
116        // so handle this gracefully by just ignoring the request to run.
117        let inner = match borrow.as_mut() {
118            Some(inner) => inner,
119            None => return,
120        };
121
122        // Ensure that if poll calls `waker.wake()` we can get enqueued back on
123        // the run queue.
124        self.is_queued.set(false);
125
126        let poll = {
127            let mut cx = Context::from_waker(&inner.waker);
128            inner.future.as_mut().poll(&mut cx)
129        };
130
131        // If a future has finished (`Ready`) then clean up resources associated
132        // with the future ASAP. This ensures that we don't keep anything extra
133        // alive in-memory by accident. Our own struct, `Rc<Task>` won't
134        // actually go away until all wakers referencing us go away, which may
135        // take quite some time, so ensure that the heaviest of resources are
136        // released early.
137        if poll.is_ready() {
138            *borrow = None;
139        }
140    }
141}