silkenweb_task/
lib.rs

1use clonelet::clone;
2use futures::{Future, FutureExt, StreamExt};
3use futures_signals::{
4    signal::{Mutable, ReadOnlyMutable, Signal, SignalExt},
5    signal_vec::{MutableVec, MutableVecLockMut, SignalVec, SignalVecExt, VecDiff},
6};
7use silkenweb_macros::cfg_browser;
8
9#[cfg_browser(false)]
10/// Server only task tools.
11pub mod server {
12    use std::{
13        pin::pin,
14        sync::Arc,
15        task::{Context, Poll, Wake},
16    };
17
18    use crossbeam::sync::{Parker, Unparker};
19    use futures::Future;
20
21    /// Synchronous version of [`run_tasks`][super::run_tasks].
22    ///
23    /// This is only available on the server.
24    pub fn run_tasks_sync() {
25        super::arch::run_tasks_sync()
26    }
27
28    struct ThreadWaker(Unparker);
29
30    impl Wake for ThreadWaker {
31        fn wake(self: Arc<Self>) {
32            self.0.unpark();
33        }
34    }
35
36    /// Run a future to completion on the current thread.
37    ///
38    /// This doesn't use the microtask executor, so it's safe to call
39    /// [run_tasks] from within the future. It's also safe to call `block_on`
40    /// recursively.
41    ///
42    /// [run_tasks]: super::run_tasks
43    pub fn block_on<T>(fut: impl Future<Output = T>) -> T {
44        let mut fut = pin!(fut);
45
46        // Use a `Parker` instance rather than global `thread::park/unpark`, so no one
47        // else can steal our `unpark`s and they don't get confused with recursive
48        // `block_on` `unpark`s.
49        let parker = Parker::new();
50        // Make sure we create a new waker each call, rather than using a global, so
51        // recursive `block_on`s don't use the same waker.
52        let waker = Arc::new(ThreadWaker(parker.unparker().clone())).into();
53        let mut cx = Context::from_waker(&waker);
54
55        // Run the future to completion.
56        loop {
57            match fut.as_mut().poll(&mut cx) {
58                Poll::Ready(res) => return res,
59                Poll::Pending => parker.park(),
60            }
61        }
62    }
63}
64
65#[cfg_browser(false)]
66mod arch {
67    use std::{cell::RefCell, future::Future};
68
69    use futures::{
70        executor::{LocalPool, LocalSpawner},
71        task::LocalSpawnExt,
72    };
73    use tokio::task_local;
74
75    pub struct Runtime {
76        executor: RefCell<LocalPool>,
77        spawner: LocalSpawner,
78    }
79
80    impl Default for Runtime {
81        fn default() -> Self {
82            let executor = RefCell::new(LocalPool::new());
83            let spawner = executor.borrow().spawner();
84
85            Self { executor, spawner }
86        }
87    }
88
89    task_local! {
90        pub static RUNTIME: Runtime;
91    }
92
93    fn with_runtime<R>(f: impl FnOnce(&Runtime) -> R) -> R {
94        match RUNTIME.try_with(f) {
95            Ok(r) => r,
96            Err(_) => panic!("Must be run from within `silkenweb_task::task::scope`"),
97        }
98    }
99
100    pub fn scope<Fut>(f: Fut) -> impl Future<Output = Fut::Output>
101    where
102        Fut: Future,
103    {
104        RUNTIME.scope(Runtime::default(), f)
105    }
106
107    pub fn sync_scope<F, R>(f: F) -> R
108    where
109        F: FnOnce() -> R,
110    {
111        RUNTIME.sync_scope(Runtime::default(), f)
112    }
113
114    pub async fn run_tasks() {
115        run_tasks_sync()
116    }
117
118    pub fn run_tasks_sync() {
119        with_runtime(|runtime| runtime.executor.borrow_mut().run_until_stalled())
120    }
121
122    pub fn spawn_local<F>(future: F)
123    where
124        F: Future<Output = ()> + 'static,
125    {
126        with_runtime(|runtime| runtime.spawner.spawn_local(future).unwrap())
127    }
128}
129
130#[cfg_browser(true)]
131mod arch {
132    use std::future::Future;
133
134    use js_sys::Promise;
135    use wasm_bindgen::{JsValue, UnwrapThrowExt};
136    use wasm_bindgen_futures::JsFuture;
137
138    pub fn scope<Fut>(f: Fut) -> impl Future<Output = Fut::Output>
139    where
140        Fut: Future,
141    {
142        f
143    }
144
145    pub fn sync_scope<F, R>(f: F) -> R
146    where
147        F: FnOnce() -> R,
148    {
149        f()
150    }
151
152    // Microtasks are run in the order they were queued in Javascript, so we just
153    // put a task on the queue and `await` it.
154    pub async fn run_tasks() {
155        let wait_for_microtasks = Promise::resolve(&JsValue::NULL);
156        JsFuture::from(wait_for_microtasks).await.unwrap_throw();
157    }
158
159    pub fn spawn_local<F>(future: F)
160    where
161        F: Future<Output = ()> + 'static,
162    {
163        wasm_bindgen_futures::spawn_local(future)
164    }
165}
166
167/// Run futures on the microtask queue, until no more progress can be
168/// made.
169///
170/// Don't call this from a future already on the microtask queue.
171pub async fn run_tasks() {
172    arch::run_tasks().await
173}
174
/// Run a future with a local task queue.
///
/// On the server, this creates a [`tokio`] task local queue. You can put
/// futures on this queue with [`spawn_local`].
///
/// On the browser, this does nothing and returns the original future.
pub use arch::scope;
/// Synchronous version of [`scope`].
///
/// This takes a closure rather than a future, calls it, and returns its
/// result. On the server, the local task queue is available while the closure
/// runs. On the browser, the closure is just called.
pub use arch::sync_scope;
184
185/// Spawn a future on the microtask queue.
186pub fn spawn_local<F>(future: F)
187where
188    F: Future<Output = ()> + 'static,
189{
190    arch::spawn_local(future)
191}
192
/// [`Signal`] methods that require a task queue.
pub trait TaskSignal: Signal {
    /// Convert `self` to a [`Mutable`].
    ///
    /// This uses the microtask queue to spawn a future that drives the signal.
    /// The resulting `Mutable` can be used to memoize the signal, allowing many
    /// signals to be derived from it.
    ///
    /// The returned value holds the signal's initial value straight away.
    /// Later values only show up once the spawned future has been run.
    ///
    /// # Panics
    ///
    /// The provided implementation panics if the signal doesn't produce its
    /// initial value immediately.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use futures_signals::signal::Mutable;
    /// # use silkenweb_task::{sync_scope, server::run_tasks_sync, TaskSignal};
    /// #
    /// let source = Mutable::new(0);
    /// let signal = source.signal();
    ///
    /// // A scope isn't required on browser platforms
    /// sync_scope(|| {
    ///     let copy = signal.to_mutable();
    ///     assert_eq!(copy.get(), 0);
    ///     source.set(1);
    ///     run_tasks_sync();
    ///     assert_eq!(copy.get(), 1);
    /// });
    /// ```
    fn to_mutable(self) -> ReadOnlyMutable<Self::Item>;

    /// Run `callback` on each signal value.
    ///
    /// The future is spawned on the microtask queue. This is equivalent to
    /// `spawn_local(sig.for_each(callback))`.
    fn spawn_for_each<U, F>(self, callback: F)
    where
        U: Future<Output = ()> + 'static,
        F: FnMut(Self::Item) -> U + 'static;
}
230
231impl<Sig> TaskSignal for Sig
232where
233    Sig: Signal + 'static,
234{
235    fn to_mutable(self) -> ReadOnlyMutable<Self::Item> {
236        let mut s = Box::pin(self.to_stream());
237        let first_value = s
238            .next()
239            .now_or_never()
240            .expect("A `Signal`'s initial value must be `Ready` immediately")
241            .expect("`Signal`s must have an initial value");
242        let mutable = Mutable::new(first_value);
243
244        spawn_local({
245            clone!(mutable);
246
247            async move {
248                while let Some(value) = s.next().await {
249                    mutable.set(value);
250                }
251            }
252        });
253
254        mutable.read_only()
255    }
256
257    fn spawn_for_each<U, F>(self, callback: F)
258    where
259        U: Future<Output = ()> + 'static,
260        F: FnMut(Self::Item) -> U + 'static,
261    {
262        spawn_local(self.for_each(callback));
263    }
264}
265
/// [`SignalVec`] methods that require a task queue.
pub trait TaskSignalVec: SignalVec {
    /// Convert `self` to a [`MutableVec`].
    ///
    /// This uses the microtask queue to spawn a future that drives the signal.
    /// The resulting `MutableVec` can be used to memoize the signal, allowing
    /// many signals to be derived from it.
    ///
    /// With the provided implementation, the returned `MutableVec` starts out
    /// empty. The signal's deltas are only applied to it once the spawned
    /// future has been run.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use futures_signals::signal_vec::MutableVec;
    /// # use silkenweb_task::{sync_scope, server::run_tasks_sync, TaskSignalVec};
    /// #
    /// let source = MutableVec::new();
    /// let signal = source.signal_vec();
    ///
    /// // A scope isn't required on browser platforms
    /// sync_scope(|| {
    ///     let copy = signal.to_mutable();
    ///     assert!(copy.lock_ref().is_empty());
    ///     source.lock_mut().push_cloned(1);
    ///     run_tasks_sync();
    ///     assert_eq!(*copy.lock_ref(), [1]);
    /// });
    /// ```
    fn to_mutable(self) -> MutableVec<Self::Item>;

    /// Run `callback` on each signal delta.
    ///
    /// The future is spawned on the microtask queue. This is equivalent to
    /// `spawn_local(sig.for_each(callback))`.
    fn spawn_for_each<U, F>(self, callback: F)
    where
        U: Future<Output = ()> + 'static,
        F: FnMut(VecDiff<Self::Item>) -> U + 'static;
}
303
304impl<Sig> TaskSignalVec for Sig
305where
306    Self::Item: Clone + 'static,
307    Sig: SignalVec + 'static,
308{
309    fn to_mutable(self) -> MutableVec<Self::Item> {
310        let mv = MutableVec::new();
311
312        self.spawn_for_each({
313            clone!(mv);
314
315            move |diff| {
316                MutableVecLockMut::apply_vec_diff(&mut mv.lock_mut(), diff);
317                async {}
318            }
319        });
320
321        mv
322    }
323
324    fn spawn_for_each<U, F>(self, callback: F)
325    where
326        U: Future<Output = ()> + 'static,
327        F: FnMut(VecDiff<Self::Item>) -> U + 'static,
328    {
329        spawn_local(self.for_each(callback));
330    }
331}