whisker_runtime/main_thread.rs
1//! Post a closure to the Lynx TASM thread (= Whisker's main thread).
2//!
3//! ## When to use this
4//!
5//! Background threads can compute values freely, but **`Signal::set` /
6//! `effect()` / any other reactive primitive must run on the main
7//! thread** — the reactive runtime is thread-local. The typical
8//! pattern for "fetch on a worker, render the result" is:
9//!
10//! ```ignore
11//! use whisker::prelude::*;
12//! use whisker::runtime::main_thread::run_on_main_thread;
13//!
14//! #[component]
15//! fn list_view() -> Element {
16//! let data = RwSignal::new(None);
17//!
18//! on_mount(move || {
19//! std::thread::spawn(move || {
20//! // worker thread: blocking work, no signal access
21//! let result = fetch_http_blocking("https://...");
22//!
23//! // marshal result back to the main thread
24//! run_on_main_thread(move || {
25//! data.set(Some(result));
26//! });
27//! });
28//! });
29//!
30//! render! { /* ... */ }
31//! }
32//! ```
33//!
34//! ## Why not `spawn_local` / async?
35//!
36//! `spawn_local` (Leptos, wasm-bindgen-futures, Tauri, …) is a
37//! main-thread async executor: it takes a `Future` and polls it on
38//! the UI thread. `run_on_main_thread` is the simpler primitive on
39//! the other side of the boundary — it takes a plain `FnOnce` and
40//! posts it to the main-thread queue. The same idea as Android's
41//! `Activity.runOnUiThread(r)`, iOS's `DispatchQueue.main.async {}`,
42//! Slint's `invoke_from_event_loop`, or gtk-rs's
43//! `MainContext::invoke`.
44//!
45//! Whisker doesn't run an async executor on the main thread (yet),
46//! so we expose only the marshaling primitive. If A4 lands a
47//! single-threaded executor later, `spawn_local` will sit on top of
48//! this same dispatcher.
49//!
50//! ## How it routes
51//!
52//! `whisker-driver`'s bootstrap registers a dispatcher
53//! ([`set_main_thread_dispatcher`]) that ultimately calls Lynx's
54//! `lynx_shell_run_on_tasm_thread` C API. The closure is boxed, the
55//! pointer is handed across the C ABI as opaque `user_data`, and a
56//! trampoline unboxes + invokes it on the TASM thread.
57
58use std::ffi::c_void;
59use std::sync::Mutex;
60
61/// Function-pointer signature of the host-provided dispatcher. Matches
62/// the C ABI of `whisker_bridge_dispatch` after erasing the engine
63/// pointer type to `*mut c_void` (so this crate doesn't depend on
64/// `whisker-driver-sys`).
65pub type DispatchFn = extern "C" fn(
66 engine: *mut c_void,
67 callback: extern "C" fn(*mut c_void),
68 user_data: *mut c_void,
69) -> bool;
70
71/// Snapshot of the registered dispatcher. Stored globally so any
72/// thread can call [`run_on_main_thread`] without thread-local
73/// access.
74#[derive(Copy, Clone)]
75struct Dispatcher {
76 func: DispatchFn,
77 engine: *mut c_void,
78}
79
80/// SAFETY: the engine pointer is an opaque handle owned by the host;
81/// the host's contract for `lynx_shell_run_on_tasm_thread` is "safe
82/// to call from any thread". The dispatcher itself is a fn pointer.
83unsafe impl Send for Dispatcher {}
84unsafe impl Sync for Dispatcher {}
85
86static DISPATCHER: Mutex<Option<Dispatcher>> = Mutex::new(None);
87
88/// Register the host's main-thread dispatcher. Called once from
89/// `whisker-driver::bootstrap` during init. Pass `None` for `func`
90/// to clear (used in tests).
91#[doc(hidden)]
92pub fn set_main_thread_dispatcher(func: Option<DispatchFn>, engine: *mut c_void) {
93 let built = func.map(|func| Dispatcher { func, engine });
94 if let Ok(mut guard) = DISPATCHER.lock() {
95 *guard = built;
96 }
97}
98
99/// Schedule `f` to run on the Whisker main thread (= Lynx TASM
100/// thread) as soon as it services its next message. Safe to call
101/// from any thread.
102///
103/// `f` runs asynchronously — this function returns immediately. The
104/// closure is dropped without running if no dispatcher is registered
105/// yet (i.e. before bootstrap completes). In that pre-bootstrap
106/// window the call is a no-op; debug builds log a warning.
107///
108/// Inside `f`, the reactive runtime is fully accessible: signal
109/// writes, effect registrations, context lookups all work as if you
110/// were inside an event handler. Writes that mark new dependencies
111/// dirty will wake the host's render loop automatically (via
112/// `host_wake::wake_runtime` from the scheduler).
113pub fn run_on_main_thread<F>(f: F)
114where
115 F: FnOnce() + Send + 'static,
116{
117 let dispatcher = match DISPATCHER.lock().ok().and_then(|g| *g) {
118 Some(d) => d,
119 None => {
120 #[cfg(debug_assertions)]
121 eprintln!(
122 "whisker-runtime: run_on_main_thread called before dispatcher \
123 registration; closure dropped"
124 );
125 return;
126 }
127 };
128
129 // Double-box: the outer `Box<...>` is what we hand across the C
130 // ABI as a raw pointer; the inner `Box<dyn FnOnce>` is what
131 // makes the closure type-erased and sized (dyn FnOnce is
132 // unsized). The trampoline unboxes both layers and invokes.
133 let boxed: Box<Box<dyn FnOnce() + Send + 'static>> = Box::new(Box::new(f));
134 let user_data = Box::into_raw(boxed) as *mut c_void;
135
136 let ok = (dispatcher.func)(dispatcher.engine, trampoline, user_data);
137 if !ok {
138 // Dispatch refused (typically: engine torn down). Reclaim
139 // the box so we don't leak the closure.
140 let _: Box<Box<dyn FnOnce() + Send + 'static>> =
141 unsafe { Box::from_raw(user_data as *mut Box<dyn FnOnce() + Send + 'static>) };
142 }
143}
144
145/// Static C-ABI fn the dispatcher invokes on the TASM thread.
146extern "C" fn trampoline(user_data: *mut c_void) {
147 if user_data.is_null() {
148 return;
149 }
150 // SAFETY: `run_on_main_thread` is the only producer of
151 // `user_data` and it always boxes a `Box<dyn FnOnce>` here.
152 let boxed: Box<Box<dyn FnOnce() + Send + 'static>> =
153 unsafe { Box::from_raw(user_data as *mut Box<dyn FnOnce() + Send + 'static>) };
154 boxed();
155 // Wake the runtime so the host schedules another tick. Without
156 // this, a worker thread that calls `run_on_main_thread(|| tx.send(v))`
157 // (the `run_blocking` path inside `resource()`) would wake the
158 // awaiting future's `Waker` via `tx.send`, which re-queues the
159 // future in `LocalPool` — but `LocalPool` only re-polls when
160 // `run_until_stalled` is invoked, which the driver only does
161 // from the tick callback, which only fires when CADisplayLink is
162 // unpaused, which only happens via `request_frame`. So unless we
163 // explicitly request a frame here, the awaiting future sleeps
164 // forever and `Resource::state` stays at `Loading` even though
165 // the worker thread finished. This was the hn-reader
166 // "Loading top stories never transitions" bug.
167 crate::host_wake::wake_runtime();
168}
169
170/// (Test only) clear the registered dispatcher.
171#[doc(hidden)]
172pub fn __reset_for_tests() {
173 if let Ok(mut guard) = DISPATCHER.lock() {
174 *guard = None;
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
182 use std::sync::{Arc, Mutex, MutexGuard};
183
184 /// Tests poke a shared global (the registered dispatcher), so they
185 /// must run one at a time. `cargo test` defaults to parallel test
186 /// threads — this lock serialises just the tests in this module.
187 static TEST_LOCK: Mutex<()> = Mutex::new(());
188
189 fn lock<'a>() -> MutexGuard<'a, ()> {
190 // Unwrap on poison: a poisoned lock means a previous test panicked
191 // mid-dispatch — re-running on top of that is fine because we
192 // reset state at the start of every test anyway.
193 TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner())
194 }
195
196 /// Pretend-host dispatcher: invokes the callback synchronously on
197 /// the caller's thread. Good enough to verify the trampoline /
198 /// boxing / unbox cycle without spawning a real second thread.
199 extern "C" fn sync_invoke(
200 _engine: *mut c_void,
201 callback: extern "C" fn(*mut c_void),
202 user_data: *mut c_void,
203 ) -> bool {
204 callback(user_data);
205 true
206 }
207
208 /// Dispatcher that simulates a failure (engine torn down).
209 extern "C" fn refuse(
210 _engine: *mut c_void,
211 _callback: extern "C" fn(*mut c_void),
212 _user_data: *mut c_void,
213 ) -> bool {
214 false
215 }
216
217 fn install(func: DispatchFn) {
218 __reset_for_tests();
219 set_main_thread_dispatcher(Some(func), std::ptr::null_mut());
220 }
221
222 #[test]
223 fn closure_runs_when_dispatcher_installed() {
224 let _guard = lock();
225 install(sync_invoke);
226 let ran = Arc::new(AtomicBool::new(false));
227 let ran_clone = ran.clone();
228 run_on_main_thread(move || {
229 ran_clone.store(true, Ordering::SeqCst);
230 });
231 assert!(ran.load(Ordering::SeqCst), "closure must have run");
232 __reset_for_tests();
233 }
234
235 #[test]
236 fn closure_dropped_when_no_dispatcher() {
237 let _guard = lock();
238 __reset_for_tests();
239 // Closure captures a state we can observe via Drop.
240 struct DropFlag(Arc<AtomicBool>);
241 impl Drop for DropFlag {
242 fn drop(&mut self) {
243 self.0.store(true, Ordering::SeqCst);
244 }
245 }
246 let dropped = Arc::new(AtomicBool::new(false));
247 let flag = DropFlag(dropped.clone());
248 run_on_main_thread(move || {
249 // Move `flag` in; if the closure is dropped without
250 // running, `flag` is also dropped.
251 let _ = &flag;
252 });
253 assert!(
254 dropped.load(Ordering::SeqCst),
255 "closure (and captured state) must be dropped when no dispatcher is set"
256 );
257 }
258
259 #[test]
260 fn closure_dropped_on_dispatch_failure() {
261 let _guard = lock();
262 install(refuse);
263 struct DropFlag(Arc<AtomicBool>);
264 impl Drop for DropFlag {
265 fn drop(&mut self) {
266 self.0.store(true, Ordering::SeqCst);
267 }
268 }
269 let dropped = Arc::new(AtomicBool::new(false));
270 let flag = DropFlag(dropped.clone());
271 run_on_main_thread(move || {
272 let _ = &flag;
273 });
274 assert!(
275 dropped.load(Ordering::SeqCst),
276 "closure must be dropped when dispatcher refuses"
277 );
278 __reset_for_tests();
279 }
280
281 #[test]
282 fn multiple_dispatches_each_run_once() {
283 let _guard = lock();
284 install(sync_invoke);
285 let counter = Arc::new(AtomicUsize::new(0));
286 for _ in 0..5 {
287 let c = counter.clone();
288 run_on_main_thread(move || {
289 c.fetch_add(1, Ordering::SeqCst);
290 });
291 }
292 assert_eq!(counter.load(Ordering::SeqCst), 5);
293 __reset_for_tests();
294 }
295}