Skip to main content

whisker_runtime/
main_thread.rs

1//! Post a closure to the Lynx TASM thread (= Whisker's main thread).
2//!
3//! ## When to use this
4//!
5//! Background threads can compute values freely, but **`Signal::set` /
6//! `effect()` / any other reactive primitive must run on the main
7//! thread** — the reactive runtime is thread-local. The typical
8//! pattern for "fetch on a worker, render the result" is:
9//!
10//! ```ignore
11//! use whisker::prelude::*;
12//! use whisker::runtime::main_thread::run_on_main_thread;
13//!
14//! #[component]
15//! fn list_view() -> Element {
16//!     let data = RwSignal::new(None);
17//!
18//!     on_mount(move || {
19//!         std::thread::spawn(move || {
20//!             // worker thread: blocking work, no signal access
21//!             let result = fetch_http_blocking("https://...");
22//!
23//!             // marshal result back to the main thread
24//!             run_on_main_thread(move || {
25//!                 data.set(Some(result));
26//!             });
27//!         });
28//!     });
29//!
30//!     render! { /* ... */ }
31//! }
32//! ```
33//!
34//! ## Why not `spawn_local` / async?
35//!
36//! `spawn_local` (Leptos, wasm-bindgen-futures, Tauri, …) is a
37//! main-thread async executor: it takes a `Future` and polls it on
38//! the UI thread. `run_on_main_thread` is the simpler primitive on
39//! the other side of the boundary — it takes a plain `FnOnce` and
40//! posts it to the main-thread queue. The same idea as Android's
41//! `Activity.runOnUiThread(r)`, iOS's `DispatchQueue.main.async {}`,
42//! Slint's `invoke_from_event_loop`, or gtk-rs's
43//! `MainContext::invoke`.
44//!
45//! Whisker doesn't run an async executor on the main thread (yet),
46//! so we expose only the marshaling primitive. If A4 lands a
47//! single-threaded executor later, `spawn_local` will sit on top of
48//! this same dispatcher.
49//!
50//! ## How it routes
51//!
52//! `whisker-driver`'s bootstrap registers a dispatcher
53//! ([`set_main_thread_dispatcher`]) that ultimately calls Lynx's
54//! `lynx_shell_run_on_tasm_thread` C API. The closure is boxed, the
55//! pointer is handed across the C ABI as opaque `user_data`, and a
56//! trampoline unboxes + invokes it on the TASM thread.
57
58use std::ffi::c_void;
59use std::sync::Mutex;
60
/// Function-pointer signature of the host-provided dispatcher. Matches
/// the C ABI of `whisker_bridge_dispatch` after erasing the engine
/// pointer type to `*mut c_void` (so this crate doesn't depend on
/// `whisker-driver-sys`).
///
/// # Parameters
///
/// - `engine`: the opaque handle that was registered together with the
///   dispatcher via [`set_main_thread_dispatcher`]; it is passed back
///   unchanged on every dispatch.
/// - `callback`: C-ABI function the host is asked to invoke with
///   `user_data` as its only argument.
/// - `user_data`: opaque pointer forwarded to `callback`.
///
/// # Returns
///
/// `true` when the host accepted the dispatch, `false` when it refused.
/// On `false`, [`run_on_main_thread`] frees `user_data` itself, so a
/// dispatcher that returns `false` must not also invoke `callback`.
pub type DispatchFn = extern "C" fn(
    engine: *mut c_void,
    callback: extern "C" fn(*mut c_void),
    user_data: *mut c_void,
) -> bool;
70
/// Snapshot of the registered dispatcher. Stored globally so any
/// thread can call [`run_on_main_thread`] without thread-local
/// access.
///
/// `Copy` so that readers can copy the value out of the mutex and
/// release the lock before calling into the host.
#[derive(Copy, Clone)]
struct Dispatcher {
    /// Host entry point that posts a callback to the main thread.
    func: DispatchFn,
    /// Opaque engine handle handed back to `func` on every dispatch.
    /// Never dereferenced by this module.
    engine: *mut c_void,
}

/// SAFETY: the engine pointer is an opaque handle owned by the host;
/// the host's contract for `lynx_shell_run_on_tasm_thread` is "safe
/// to call from any thread". The dispatcher itself is a fn pointer.
unsafe impl Send for Dispatcher {}
// SAFETY: same argument as for `Send` above — this module only copies
// the raw pointer around and passes it back to the host.
unsafe impl Sync for Dispatcher {}

/// Process-wide slot holding the currently registered dispatcher, or
/// `None` when nothing is registered. Written by
/// [`set_main_thread_dispatcher`] and [`__reset_for_tests`]; read by
/// [`run_on_main_thread`].
static DISPATCHER: Mutex<Option<Dispatcher>> = Mutex::new(None);
87
/// Process-wide slot for the optional "drive the runtime now" callback.
///
/// Optional "drive the runtime now" callback, registered by
/// `whisker-driver::bootstrap`. When set, the [`trampoline`] invokes
/// it (on the main thread, right after the marshaled closure runs)
/// instead of merely requesting a vsync frame. The callback runs the
/// driver's `tick_frame` — flush + drain the task pool + flush +
/// mounts + renderer flush — so an async completion that was just
/// marshaled onto the main thread is drained and painted immediately,
/// on this main-run-loop post, with the vsync render loop untouched.
///
/// This is the proper fix for the resource hang: the worker's result
/// is delivered via the host's main-thread dispatch (CFRunLoop /
/// Looper), which the OS services even while CADisplayLink /
/// Choreographer is paused, and we DRIVE the consequence here rather
/// than racing an unpause of the paused vsync loop.
///
/// A plain `extern "C" fn()` pointer — no `user_data` needed; the
/// driver's `tick_frame` reads its own thread-locals.
///
/// Written by [`set_drive_callback`] and [`__reset_for_tests`]. The
/// [`trampoline`] copies the pointer out of the mutex before calling
/// it, so the lock is not held while the callback runs. When the slot
/// is `None` the trampoline falls back to `host_wake::wake_runtime`.
static DRIVE: Mutex<Option<extern "C" fn()>> = Mutex::new(None);
106
std::thread_local! {
    /// Re-entrancy depth for main-thread render/tick/drive work.
    ///
    /// The [`trampoline`] runs the driver's `tick_frame` directly on a
    /// main-loop post. That is correct when the host dispatcher genuinely
    /// *posts* the trampoline to a later run-loop turn. But some
    /// dispatchers (Lynx's `run_on_tasm_thread`, iOS `Thread.isMainThread`
    /// fast paths) invoke it **inline** when called from the TASM thread.
    /// If `run_on_main_thread` is called from inside the initial render or
    /// a `tick_frame` (e.g. a module's startup wiring), an inline trampoline
    /// would re-enter `tick_frame` while the renderer/reactive runtime is
    /// already active — a re-entrant borrow that aborts. This depth lets the
    /// trampoline detect that nesting and DEFER (request a vsync frame)
    /// instead of re-entering.
    ///
    /// The counter is per-thread: it is incremented by
    /// [`MainWorkGuard::new`] and decremented when that guard is dropped.
    static MAIN_WORK_DEPTH: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}

/// RAII guard marking that main-thread render/tick/drive work is in
/// progress. The driver wraps `init_callback`'s initial render and every
/// `tick_frame` in one, so a re-entrant `run_on_main_thread` dispatch
/// defers (via the trampoline) instead of nesting.
///
/// Guards nest: each live guard adds one to the current thread's depth
/// and removes it again on drop.
///
/// The guard is deliberately `!Send` and `!Sync`. It counts in a
/// thread-local, so it must be dropped on the thread that created it;
/// if it could migrate, the creating thread's depth would never return
/// to zero and every later trampoline on that thread would defer
/// instead of driving the runtime.
#[must_use = "the guard only marks work as in progress while it is alive; bind it to a variable"]
pub struct MainWorkGuard(
    // A raw pointer is neither `Send` nor `Sync`, so this zero-sized
    // marker opts the guard out of both auto traits.
    std::marker::PhantomData<*const ()>,
);

impl MainWorkGuard {
    /// Marks main-thread work as in progress on the current thread until
    /// the returned guard is dropped.
    pub fn new() -> Self {
        MAIN_WORK_DEPTH.with(|d| d.set(d.get() + 1));
        MainWorkGuard(std::marker::PhantomData)
    }
}

impl Default for MainWorkGuard {
    /// Equivalent to [`MainWorkGuard::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for MainWorkGuard {
    fn drop(&mut self) {
        // Saturating so an unbalanced drop can never wrap the counter
        // around to a huge "in progress" value.
        MAIN_WORK_DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
    }
}

/// True while a [`MainWorkGuard`] is alive on this thread — i.e. we are
/// already inside whisker render/tick/drive work and must not re-enter it.
fn main_work_in_progress() -> bool {
    MAIN_WORK_DEPTH.with(|d| d.get()) > 0
}
154
155/// Register the host's main-thread dispatcher. Called once from
156/// `whisker-driver::bootstrap` during init. Pass `None` for `func`
157/// to clear (used in tests).
158#[doc(hidden)]
159pub fn set_main_thread_dispatcher(func: Option<DispatchFn>, engine: *mut c_void) {
160    let built = func.map(|func| Dispatcher { func, engine });
161    if let Ok(mut guard) = DISPATCHER.lock() {
162        *guard = built;
163    }
164}
165
166/// Register the "drive the runtime now" callback (see [`DRIVE`]).
167/// Called once from `whisker-driver::bootstrap` during init. Pass
168/// `None` to clear (used in tests).
169#[doc(hidden)]
170pub fn set_drive_callback(cb: Option<extern "C" fn()>) {
171    if let Ok(mut guard) = DRIVE.lock() {
172        *guard = cb;
173    }
174}
175
176/// Schedule `f` to run on the Whisker main thread (= Lynx TASM
177/// thread) as soon as it services its next message. Safe to call
178/// from any thread.
179///
180/// `f` runs asynchronously — this function returns immediately. The
181/// closure is dropped without running if no dispatcher is registered
182/// yet (i.e. before bootstrap completes). In that pre-bootstrap
183/// window the call is a no-op; debug builds log a warning.
184///
185/// Inside `f`, the reactive runtime is fully accessible: signal
186/// writes, effect registrations, context lookups all work as if you
187/// were inside an event handler. Writes that mark new dependencies
188/// dirty will wake the host's render loop automatically (via
189/// `host_wake::wake_runtime` from the scheduler).
190///
191/// After `f` runs, the [`trampoline`] DRIVES the runtime directly (via
192/// the registered [`set_drive_callback`]) on this same main-thread
193/// post, so an async result marshaled here (the `run_blocking` /
194/// `resource()` path) is drained and painted immediately — see
195/// [`trampoline`]'s comment for why this beats requesting a vsync
196/// frame.
197pub fn run_on_main_thread<F>(f: F)
198where
199    F: FnOnce() + Send + 'static,
200{
201    let dispatcher = match DISPATCHER.lock().ok().and_then(|g| *g) {
202        Some(d) => d,
203        None => {
204            #[cfg(debug_assertions)]
205            eprintln!(
206                "whisker-runtime: run_on_main_thread called before dispatcher \
207                 registration; closure dropped"
208            );
209            return;
210        }
211    };
212
213    // Double-box: the outer `Box<...>` is what we hand across the C
214    // ABI as a raw pointer; the inner `Box<dyn FnOnce>` is what
215    // makes the closure type-erased and sized (dyn FnOnce is
216    // unsized). The trampoline unboxes both layers and invokes.
217    let boxed: Box<Box<dyn FnOnce() + Send + 'static>> = Box::new(Box::new(f));
218    let user_data = Box::into_raw(boxed) as *mut c_void;
219
220    let ok = (dispatcher.func)(dispatcher.engine, trampoline, user_data);
221    if !ok {
222        // Dispatch refused (typically: engine torn down). Reclaim
223        // the box so we don't leak the closure.
224        let _: Box<Box<dyn FnOnce() + Send + 'static>> =
225            unsafe { Box::from_raw(user_data as *mut Box<dyn FnOnce() + Send + 'static>) };
226    }
227}
228
229/// Static C-ABI fn the dispatcher invokes on the TASM thread.
230extern "C" fn trampoline(user_data: *mut c_void) {
231    if user_data.is_null() {
232        return;
233    }
234    // SAFETY: `run_on_main_thread` is the only producer of
235    // `user_data` and it always boxes a `Box<dyn FnOnce>` here.
236    let boxed: Box<Box<dyn FnOnce() + Send + 'static>> =
237        unsafe { Box::from_raw(user_data as *mut Box<dyn FnOnce() + Send + 'static>) };
238    boxed();
239    // We're now on the MAIN thread (this trampoline ran via the host's
240    // main-thread dispatch — a real CFRunLoop / Looper post, which the
241    // OS services even while the vsync render loop is paused). The
242    // closure we just ran is typically `tx.send(value)` from a
243    // `run_blocking` worker (the `resource()` path): it woke the
244    // awaiting future's `Waker`, re-queuing it in `LocalPool`. But
245    // `LocalPool` only re-polls when `run_until_stalled` runs, which
246    // happens inside the driver's `tick_frame`.
247    //
248    // If a drive callback is registered (production + the
249    // `cross_thread_wake` tests), invoke it: it runs `tick_frame` HERE,
250    // on this main-loop post — draining the pool, flushing, and
251    // painting the fetch's consequences immediately. The vsync loop is
252    // untouched, so there is NO race against an end-of-frame pause and
253    // NO need to busy-tick. This is the proper fix for the resource
254    // hang (was: request a vsync frame, which races the paused
255    // CADisplayLink/Choreographer and is silently clobbered).
256    //
257    // Fall back to `wake_runtime()` (request a vsync frame) when no
258    // drive callback is wired — e.g. tests that don't model the driver.
259    //
260    // RE-ENTRANCY: if this trampoline was invoked INLINE by the host
261    // dispatcher while we're already inside the initial render or a
262    // `tick_frame` (some dispatchers run same-thread posts synchronously),
263    // running `tick_frame` again would re-enter the renderer/reactive
264    // runtime and abort. In that case, defer via a vsync frame request
265    // instead — the deferred tick drains the pool on the next frame.
266    if main_work_in_progress() {
267        crate::host_wake::wake_runtime();
268        return;
269    }
270    let drive = DRIVE.lock().ok().and_then(|g| *g);
271    match drive {
272        Some(cb) => cb(),
273        None => crate::host_wake::wake_runtime(),
274    }
275}
276
277/// (Test only) clear the registered dispatcher and drive callback.
278#[doc(hidden)]
279pub fn __reset_for_tests() {
280    if let Ok(mut guard) = DISPATCHER.lock() {
281        *guard = None;
282    }
283    if let Ok(mut guard) = DRIVE.lock() {
284        *guard = None;
285    }
286}
287
/// One lock shared by every test that touches the process-global host
/// wiring (the main-thread dispatcher in this module and the
/// frame-request callback in [`crate::host_wake`]).
///
/// Tests in SEVERAL modules (`main_thread`, `tasks`,
/// `reactive::tests_resource`) install and reset those globals. Separate
/// per-module locks would not stop them from racing — module A could
/// clear the dispatcher mid-fetch in module B, dropping the marshaled
/// result — so all of them serialise on THIS lock.
///
/// A poisoned lock (an earlier test panicked while holding it) is
/// recovered rather than propagated, so one failing test does not take
/// the rest down with it.
#[cfg(test)]
pub(crate) fn host_test_lock<'a>() -> std::sync::MutexGuard<'a, ()> {
    static HOST_TEST_LOCK: Mutex<()> = Mutex::new(());
    match HOST_TEST_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
302
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{Arc, MutexGuard};

    /// These tests mutate process-global host state (the registered
    /// dispatcher). They therefore run one at a time, and also must not
    /// overlap with tests in sibling modules that touch the same
    /// globals — which is why they take the shared
    /// [`super::host_test_lock`] rather than a module-private lock.
    fn lock<'a>() -> MutexGuard<'a, ()> {
        super::host_test_lock()
    }

    /// Stand-in host dispatcher that calls the callback right away on
    /// the calling thread. Enough to exercise the box → trampoline →
    /// unbox round trip without a real second thread.
    extern "C" fn sync_invoke(
        _engine: *mut c_void,
        callback: extern "C" fn(*mut c_void),
        user_data: *mut c_void,
    ) -> bool {
        callback(user_data);
        true
    }

    /// Stand-in host dispatcher that always refuses, as if the engine
    /// had been torn down.
    extern "C" fn refuse(
        _engine: *mut c_void,
        _callback: extern "C" fn(*mut c_void),
        _user_data: *mut c_void,
    ) -> bool {
        false
    }

    /// Start from a clean slate, then register `func` with a null engine.
    fn install(func: DispatchFn) {
        __reset_for_tests();
        set_main_thread_dispatcher(Some(func), std::ptr::null_mut());
    }

    /// Raises its shared flag when dropped, so a test can observe that a
    /// closure owning it was destroyed.
    struct DropFlag(Arc<AtomicBool>);

    impl Drop for DropFlag {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    /// Posts a closure that owns a [`DropFlag`] and reports whether the
    /// flag had been dropped by the time `run_on_main_thread` returned.
    fn posted_closure_was_dropped() -> bool {
        let dropped = Arc::new(AtomicBool::new(false));
        let flag = DropFlag(Arc::clone(&dropped));
        run_on_main_thread(move || {
            // Referencing `flag` moves it into the closure; if the
            // closure is dropped without running, `flag` goes with it.
            let _ = &flag;
        });
        dropped.load(Ordering::SeqCst)
    }

    #[test]
    fn closure_runs_when_dispatcher_installed() {
        let _guard = lock();
        install(sync_invoke);

        let ran = Arc::new(AtomicBool::new(false));
        let observer = Arc::clone(&ran);
        run_on_main_thread(move || observer.store(true, Ordering::SeqCst));

        assert!(ran.load(Ordering::SeqCst), "closure must have run");
        __reset_for_tests();
    }

    #[test]
    fn closure_dropped_when_no_dispatcher() {
        let _guard = lock();
        __reset_for_tests();

        assert!(
            posted_closure_was_dropped(),
            "closure (and captured state) must be dropped when no dispatcher is set"
        );
    }

    #[test]
    fn closure_dropped_on_dispatch_failure() {
        let _guard = lock();
        install(refuse);

        assert!(
            posted_closure_was_dropped(),
            "closure must be dropped when dispatcher refuses"
        );
        __reset_for_tests();
    }

    #[test]
    fn multiple_dispatches_each_run_once() {
        let _guard = lock();
        install(sync_invoke);

        let counter = Arc::new(AtomicUsize::new(0));
        (0..5).for_each(|_| {
            let tally = Arc::clone(&counter);
            run_on_main_thread(move || {
                tally.fetch_add(1, Ordering::SeqCst);
            });
        });

        assert_eq!(counter.load(Ordering::SeqCst), 5);
        __reset_for_tests();
    }
}
416}