whisker_runtime/reactive/resource.rs
1//! Async-data primitive — runs an `async` fetcher on Whisker's
2//! single-threaded task pool ([`crate::tasks`]) and exposes the
3//! loading / ready / error state through a [`ReadSignal`]-shaped
4//! handle.
5//!
6//! The fetcher runs on the TASM thread under
7//! [`futures_executor::LocalPool`]. For blocking sync IO (`ureq`,
8//! `std::fs`, …) inside the fetcher, wrap the call in
9//! [`crate::tasks::run_blocking`] which offloads to a fresh worker
10//! thread and marshals the result back via [`run_on_main_thread`]:
11//!
12//! ```ignore
13//! use whisker::runtime::tasks::run_blocking;
14//!
15//! let stories = resource(|| async {
16//! run_blocking(|| {
17//! ureq::get("https://hn.algolia.com/...")
18//! .call()
19//! .map_err(|e| e.to_string())?
20//! .into_string()
21//! .map_err(|e| e.to_string())
22//! })
23//! .await
24//! .and_then(|body| parse(&body))
25//! });
26//! ```
27//!
28//! For purely-async fetchers (a non-blocking HTTP client, a
29//! pre-computed value, etc.) you can just write `async move { ... }`
30//! and skip the `run_blocking` step.
31
32use std::cell::Cell;
33use std::future::Future;
34use std::pin::Pin;
35use std::rc::Rc;
36use std::task::{Context, Poll};
37
38use crate::tasks::spawn_local;
39
40use super::runtime::NodeId;
41use super::signal::RwSignal;
42
/// Lifecycle of the value behind a [`Resource`]: still loading, ready
/// with a value, or failed with a message.
///
/// The type is `Clone` so that code reading it inside an effect can
/// keep an owned snapshot instead of holding a borrow of the signal
/// slot it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResourceState<T> {
    /// The fetch has not produced a result yet; there is neither a
    /// value nor an error to report.
    Loading,
    /// The fetch resolved to `Ok(value)`; the payload is that value.
    Ready(T),
    /// The fetch resolved to `Err(message)`; the payload is the
    /// human-readable reason. Errors are carried as a plain `String`
    /// (not a generic `E`) to keep the type to a single parameter and
    /// to fit the usual `.map_err(|e| e.to_string())` pattern.
    Error(String),
}

impl<T> ResourceState<T> {
    /// Returns `true` only for [`ResourceState::Loading`].
    pub fn is_loading(&self) -> bool {
        match self {
            Self::Loading => true,
            Self::Ready(_) | Self::Error(_) => false,
        }
    }

    /// Returns `true` only for [`ResourceState::Ready`].
    pub fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Loading | Self::Error(_) => false,
        }
    }

    /// Returns `true` only for [`ResourceState::Error`].
    pub fn is_error(&self) -> bool {
        match self {
            Self::Error(_) => true,
            Self::Loading | Self::Ready(_) => false,
        }
    }
}
70
71/// Copy handle to a deferred value. Wraps an [`RwSignal`] whose slot
72/// the worker thread writes into once the fetch completes; consumer
73/// code reads through the accessors below.
74pub struct Resource<T: Clone + 'static> {
75 state: RwSignal<ResourceState<T>>,
76}
77
78// Hand-written Copy/Clone — `derive(Copy)` would require `T: Copy`
79// which is unnecessarily strict (the resource only holds a u32-ish
80// signal handle, not the T itself).
81impl<T: Clone + 'static> Copy for Resource<T> {}
82impl<T: Clone + 'static> Clone for Resource<T> {
83 fn clone(&self) -> Self {
84 *self
85 }
86}
87
88impl<T: Clone + 'static> Resource<T> {
89 /// Construct a `Resource<T>` backed by an externally-owned
90 /// [`RwSignal`]. The signal becomes the resource's source of
91 /// truth — writes to it surface as state transitions through
92 /// the resource's accessors.
93 ///
94 /// Hidden from rustdoc: regular users go through [`resource`] or
95 /// [`resource_sync`]. This is here so tests + non-standard
96 /// "synthetic resource" cases (e.g. a value derived from a
97 /// context signal, exposed as a Resource) can build one without
98 /// re-spawning a fetcher.
99 #[doc(hidden)]
100 pub fn from_state(state: RwSignal<ResourceState<T>>) -> Self {
101 Self { state }
102 }
103
104 /// Read the current state (reactive — registers a dependency on
105 /// the underlying signal).
106 pub fn state(&self) -> ResourceState<T> {
107 self.state.get()
108 }
109
110 /// Convenience: return `Some(value)` when ready, `None` otherwise.
111 pub fn get(&self) -> Option<T> {
112 match self.state.get() {
113 ResourceState::Ready(v) => Some(v),
114 _ => None,
115 }
116 }
117
118 /// Convenience: `true` while the worker is still running.
119 pub fn loading(&self) -> bool {
120 matches!(self.state.get(), ResourceState::Loading)
121 }
122
123 /// Convenience: return `Some(message)` if the fetch ended in error.
124 pub fn error(&self) -> Option<String> {
125 match self.state.get() {
126 ResourceState::Error(e) => Some(e),
127 _ => None,
128 }
129 }
130}
131
132/// Reactive async fetch. Drives `fetcher` (an `async fn` or
133/// `async move {…}` block) on Whisker's task pool and writes the
134/// resolved [`Result`] into the returned [`Resource`]'s signal — then
135/// **re-runs the fetcher whenever any signal it read changes**.
136///
137/// Reactivity: the fetcher is wrapped in a reactive [`effect`] and the
138/// spawned future re-installs that effect node as the current observer
139/// on every `poll`. As a result, signals read **anywhere** in the
140/// fetcher are tracked as dependencies of the resource — both in the
141/// synchronous prefix (before the first `.await`) and after any
142/// `.await` point. When any tracked signal changes, the fetcher runs
143/// again from scratch and the resource updates.
144///
145/// While a (re)fetch is in flight the resource returns to
146/// [`ResourceState::Loading`]. Only the latest run's result is
147/// committed: a monotonically-increasing generation counter guards the
148/// write, so if a newer run starts before an older in-flight fetch
149/// resolves, the stale result is discarded rather than clobbering the
150/// fresh state. In-flight stale fetches are abandoned *cooperatively*
151/// (the superseded future stops at its next `poll` boundary) — there is
152/// no hard cancellation, and any worker thread spawned via
153/// [`crate::tasks::run_blocking`] runs to completion with its result
154/// dropped.
155///
156/// Dynamic-dependency caveat: dependencies are rebuilt on every run, so
157/// a signal that is only read on *some* code path is only a dependency
158/// on the runs where that path actually executes. A signal read after
159/// an `.await` is only tracked once the future advances past that
160/// suspension point.
161///
162/// For blocking sync work inside the fetcher (e.g. `ureq::get(...)`,
163/// `std::fs::read(...)`), wrap the call in
164/// [`crate::tasks::run_blocking`] which moves it to a worker thread
165/// and resumes the awaiting task on the main thread once the result
166/// is back.
167///
168/// Returns immediately with a `Resource<T>` in
169/// [`ResourceState::Loading`]; the first fetch is spawned during the
170/// effect's synchronous initial run.
171///
172/// Owner discipline: the underlying [`RwSignal`] and the driving effect
173/// are registered with whatever owner is current at call time. If that
174/// owner is disposed, the effect stops re-running and any eventual
175/// write is a no-op (the signal node is gone), so no stale write hits a
176/// re-mounted owner.
177///
178/// For tests / already-in-memory values, prefer [`resource_sync`] — it
179/// runs the fetcher inline once, untracked, and doesn't depend on the
180/// executor having been ticked.
181pub fn resource<T, F, Fut>(fetcher: F) -> Resource<T>
182where
183 T: Clone + 'static,
184 F: Fn() -> Fut + 'static,
185 Fut: Future<Output = Result<T, String>> + 'static,
186{
187 let state = RwSignal::new(ResourceState::Loading);
188 // Monotonic run counter. Each effect run bumps it and stamps its
189 // spawned future; the future only commits its result if the
190 // counter still matches at completion time (generation guard).
191 let generation = Rc::new(Cell::new(0u64));
192 let fetcher = Rc::new(fetcher);
193
194 super::effect::effect(move || {
195 // Inside an effect run the runtime's `current_tracker` IS this
196 // effect's node. Capture it so the spawned future can re-install
197 // it as the observer around each poll (so post-`.await` reads
198 // register as deps of this node too).
199 let node = super::current_tracker().expect("resource effect must run under a tracker");
200
201 let my_gen = generation.get().wrapping_add(1);
202 generation.set(my_gen);
203
204 // Build the future. The fetcher's SYNCHRONOUS prefix runs here,
205 // and because we're inside the effect run, its signal reads
206 // register as dependencies of `node`.
207 let fut = (fetcher)();
208
209 // Return to Loading on every (re)fetch. Write untracked so this
210 // never creates a dependency edge — the effect must not depend
211 // on `state` (the signal it writes) or it would re-trigger
212 // itself in an infinite loop.
213 state.update_untracked(|s| *s = ResourceState::Loading);
214
215 spawn_local(ScopedFetch {
216 node,
217 my_gen,
218 generation: generation.clone(),
219 state,
220 fut: Box::pin(fut),
221 });
222 });
223
224 Resource { state }
225}
226
227/// A spawned fetch future that re-installs its resource's effect node
228/// as the current reactive observer on every `poll`, so signal reads
229/// after `.await` points are tracked as dependencies of the resource.
230/// A generation stamp lets a superseded run abandon itself
231/// cooperatively without clobbering a fresher result.
232struct ScopedFetch<T: Clone + 'static> {
233 /// The driving effect's node — re-installed as the observer per poll.
234 node: NodeId,
235 /// This run's generation stamp.
236 my_gen: u64,
237 /// Shared run counter; if it has moved past `my_gen` we're stale.
238 generation: Rc<Cell<u64>>,
239 /// Resource state slot to commit into.
240 state: RwSignal<ResourceState<T>>,
241 /// The fetcher's future (single-threaded / `!Send` is fine here).
242 fut: Pin<Box<dyn Future<Output = Result<T, String>>>>,
243}
244
245impl<T: Clone + 'static> Future for ScopedFetch<T> {
246 type Output = ();
247
248 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
249 // Destructure through the pin so the borrow of `fut` inside the
250 // `with_observer` closure doesn't collide with reads of the
251 // other fields.
252 let this = self.get_mut();
253
254 // A newer run superseded us: abandon WITHOUT installing the
255 // observer (add no stale dependency edges) and WITHOUT writing
256 // state. The inner future is dropped with this `ScopedFetch`.
257 if this.generation.get() != this.my_gen {
258 return Poll::Ready(());
259 }
260
261 let node = this.node;
262 let fut = &mut this.fut;
263 // Re-install the resource's effect node as the current observer
264 // for THIS poll so reads after `.await` register as deps of it.
265 let poll = super::with_observer(node, || fut.as_mut().poll(cx));
266
267 match poll {
268 Poll::Pending => Poll::Pending,
269 Poll::Ready(result) => {
270 // Commit only if we're still the latest run.
271 if this.generation.get() == this.my_gen {
272 this.state.set(match result {
273 Ok(v) => ResourceState::Ready(v),
274 Err(e) => ResourceState::Error(e),
275 });
276 }
277 Poll::Ready(())
278 }
279 }
280 }
281}
282
283/// Synchronous-fetch variant. Runs `fetcher` inline on the calling
284/// thread and writes the result directly into the resource's signal.
285/// No worker thread, no main-thread dispatcher needed — useful for
286/// tests, for cases where the value is already in memory, and for
287/// computed pseudo-resources (e.g. derive from a context value).
288///
289/// The returned `Resource` is in [`ResourceState::Ready`] or
290/// [`ResourceState::Error`] *immediately* — never `Loading`.
291pub fn resource_sync<T, F>(fetcher: F) -> Resource<T>
292where
293 T: Clone + 'static,
294 F: FnOnce() -> Result<T, String>,
295{
296 // `fetcher` is a one-shot seed for the resource's RwSignal —
297 // its signal reads are meant to compute an initial value, not to
298 // re-fire the resource when those signals change. Run it under
299 // `untrack` so the reads don't leak into whatever outer effect
300 // / computed / component body happens to be calling
301 // `resource_sync`. Same principle as the computed seed guard.
302 let state = RwSignal::new(match super::untrack(fetcher) {
303 Ok(v) => ResourceState::Ready(v),
304 Err(e) => ResourceState::Error(e),
305 });
306 Resource { state }
307}