Skip to main content

whisker_runtime/reactive/
resource.rs

1//! Async-data primitive — runs an `async` fetcher on Whisker's
2//! single-threaded task pool ([`crate::tasks`]) and exposes the
3//! loading / ready / error state through a [`ReadSignal`]-shaped
4//! handle.
5//!
6//! The fetcher runs on the TASM thread under
7//! [`futures_executor::LocalPool`]. For blocking sync IO (`ureq`,
8//! `std::fs`, …) inside the fetcher, wrap the call in
9//! [`crate::tasks::run_blocking`] which offloads to a fresh worker
10//! thread and marshals the result back via [`run_on_main_thread`]:
11//!
12//! ```ignore
13//! use whisker::runtime::tasks::run_blocking;
14//!
15//! let stories = resource(|| async {
16//!     run_blocking(|| {
17//!         ureq::get("https://hn.algolia.com/...")
18//!             .call()
19//!             .map_err(|e| e.to_string())?
20//!             .into_string()
21//!             .map_err(|e| e.to_string())
22//!     })
23//!     .await
24//!     .and_then(|body| parse(&body))
25//! });
26//! ```
27//!
28//! For purely-async fetchers (a non-blocking HTTP client, a
29//! pre-computed value, etc.) you can just write `async move { ... }`
30//! and skip the `run_blocking` step.
31
32use std::cell::Cell;
33use std::future::Future;
34use std::pin::Pin;
35use std::rc::Rc;
36use std::task::{Context, Poll};
37
38use crate::tasks::spawn_local;
39
40use super::runtime::NodeId;
41use super::signal::RwSignal;
42
43/// Three-state machine the [`Resource`] cycles through. `Clone` so
44/// reads inside effects can take owned copies without borrowing the
45/// underlying signal slot.
46#[derive(Clone, Debug, PartialEq, Eq)]
47pub enum ResourceState<T> {
48    /// Worker hasn't returned yet — neither value nor error available.
49    Loading,
50    /// Worker returned `Ok(v)` — `v` is the fetched value.
51    Ready(T),
52    /// Worker returned `Err(msg)`. The string is the user-readable
53    /// reason. (Plain `String` rather than a generic `E` keeps the
54    /// type parameter count low and matches the common pattern of
55    /// stringifying upstream errors with `.map_err(|e| e.to_string())`.)
56    Error(String),
57}
58
59impl<T> ResourceState<T> {
60    pub fn is_loading(&self) -> bool {
61        matches!(self, ResourceState::Loading)
62    }
63    pub fn is_ready(&self) -> bool {
64        matches!(self, ResourceState::Ready(_))
65    }
66    pub fn is_error(&self) -> bool {
67        matches!(self, ResourceState::Error(_))
68    }
69}
70
71/// Copy handle to a deferred value. Wraps an [`RwSignal`] whose slot
72/// the worker thread writes into once the fetch completes; consumer
73/// code reads through the accessors below.
74pub struct Resource<T: Clone + 'static> {
75    state: RwSignal<ResourceState<T>>,
76}
77
78// Hand-written Copy/Clone — `derive(Copy)` would require `T: Copy`
79// which is unnecessarily strict (the resource only holds a u32-ish
80// signal handle, not the T itself).
81impl<T: Clone + 'static> Copy for Resource<T> {}
82impl<T: Clone + 'static> Clone for Resource<T> {
83    fn clone(&self) -> Self {
84        *self
85    }
86}
87
88impl<T: Clone + 'static> Resource<T> {
89    /// Construct a `Resource<T>` backed by an externally-owned
90    /// [`RwSignal`]. The signal becomes the resource's source of
91    /// truth — writes to it surface as state transitions through
92    /// the resource's accessors.
93    ///
94    /// Hidden from rustdoc: regular users go through [`resource`] or
95    /// [`resource_sync`]. This is here so tests + non-standard
96    /// "synthetic resource" cases (e.g. a value derived from a
97    /// context signal, exposed as a Resource) can build one without
98    /// re-spawning a fetcher.
99    #[doc(hidden)]
100    pub fn from_state(state: RwSignal<ResourceState<T>>) -> Self {
101        Self { state }
102    }
103
104    /// Read the current state (reactive — registers a dependency on
105    /// the underlying signal).
106    pub fn state(&self) -> ResourceState<T> {
107        self.state.get()
108    }
109
110    /// Convenience: return `Some(value)` when ready, `None` otherwise.
111    pub fn get(&self) -> Option<T> {
112        match self.state.get() {
113            ResourceState::Ready(v) => Some(v),
114            _ => None,
115        }
116    }
117
118    /// Convenience: `true` while the worker is still running.
119    pub fn loading(&self) -> bool {
120        matches!(self.state.get(), ResourceState::Loading)
121    }
122
123    /// Convenience: return `Some(message)` if the fetch ended in error.
124    pub fn error(&self) -> Option<String> {
125        match self.state.get() {
126            ResourceState::Error(e) => Some(e),
127            _ => None,
128        }
129    }
130}
131
132/// Reactive async fetch. Drives `fetcher` (an `async fn` or
133/// `async move {…}` block) on Whisker's task pool and writes the
134/// resolved [`Result`] into the returned [`Resource`]'s signal — then
135/// **re-runs the fetcher whenever any signal it read changes**.
136///
137/// Reactivity: the fetcher is wrapped in a reactive [`effect`] and the
138/// spawned future re-installs that effect node as the current observer
139/// on every `poll`. As a result, signals read **anywhere** in the
140/// fetcher are tracked as dependencies of the resource — both in the
141/// synchronous prefix (before the first `.await`) and after any
142/// `.await` point. When any tracked signal changes, the fetcher runs
143/// again from scratch and the resource updates.
144///
145/// While a (re)fetch is in flight the resource returns to
146/// [`ResourceState::Loading`]. Only the latest run's result is
147/// committed: a monotonically-increasing generation counter guards the
148/// write, so if a newer run starts before an older in-flight fetch
149/// resolves, the stale result is discarded rather than clobbering the
150/// fresh state. In-flight stale fetches are abandoned *cooperatively*
151/// (the superseded future stops at its next `poll` boundary) — there is
152/// no hard cancellation, and any worker thread spawned via
153/// [`crate::tasks::run_blocking`] runs to completion with its result
154/// dropped.
155///
156/// Dynamic-dependency caveat: dependencies are rebuilt on every run, so
157/// a signal that is only read on *some* code path is only a dependency
158/// on the runs where that path actually executes. A signal read after
159/// an `.await` is only tracked once the future advances past that
160/// suspension point.
161///
162/// For blocking sync work inside the fetcher (e.g. `ureq::get(...)`,
163/// `std::fs::read(...)`), wrap the call in
164/// [`crate::tasks::run_blocking`] which moves it to a worker thread
165/// and resumes the awaiting task on the main thread once the result
166/// is back.
167///
168/// Returns immediately with a `Resource<T>` in
169/// [`ResourceState::Loading`]; the first fetch is spawned during the
170/// effect's synchronous initial run.
171///
172/// Owner discipline: the underlying [`RwSignal`] and the driving effect
173/// are registered with whatever owner is current at call time. If that
174/// owner is disposed, the effect stops re-running and any eventual
175/// write is a no-op (the signal node is gone), so no stale write hits a
176/// re-mounted owner.
177///
178/// For tests / already-in-memory values, prefer [`resource_sync`] — it
179/// runs the fetcher inline once, untracked, and doesn't depend on the
180/// executor having been ticked.
181pub fn resource<T, F, Fut>(fetcher: F) -> Resource<T>
182where
183    T: Clone + 'static,
184    F: Fn() -> Fut + 'static,
185    Fut: Future<Output = Result<T, String>> + 'static,
186{
187    let state = RwSignal::new(ResourceState::Loading);
188    // Monotonic run counter. Each effect run bumps it and stamps its
189    // spawned future; the future only commits its result if the
190    // counter still matches at completion time (generation guard).
191    let generation = Rc::new(Cell::new(0u64));
192    let fetcher = Rc::new(fetcher);
193
194    super::effect::effect(move || {
195        // Inside an effect run the runtime's `current_tracker` IS this
196        // effect's node. Capture it so the spawned future can re-install
197        // it as the observer around each poll (so post-`.await` reads
198        // register as deps of this node too).
199        let node = super::current_tracker().expect("resource effect must run under a tracker");
200
201        let my_gen = generation.get().wrapping_add(1);
202        generation.set(my_gen);
203
204        // Build the future. The fetcher's SYNCHRONOUS prefix runs here,
205        // and because we're inside the effect run, its signal reads
206        // register as dependencies of `node`.
207        let fut = (fetcher)();
208
209        // Return to Loading on every (re)fetch. Write untracked so this
210        // never creates a dependency edge — the effect must not depend
211        // on `state` (the signal it writes) or it would re-trigger
212        // itself in an infinite loop.
213        state.update_untracked(|s| *s = ResourceState::Loading);
214
215        spawn_local(ScopedFetch {
216            node,
217            my_gen,
218            generation: generation.clone(),
219            state,
220            fut: Box::pin(fut),
221        });
222    });
223
224    Resource { state }
225}
226
227/// A spawned fetch future that re-installs its resource's effect node
228/// as the current reactive observer on every `poll`, so signal reads
229/// after `.await` points are tracked as dependencies of the resource.
230/// A generation stamp lets a superseded run abandon itself
231/// cooperatively without clobbering a fresher result.
232struct ScopedFetch<T: Clone + 'static> {
233    /// The driving effect's node — re-installed as the observer per poll.
234    node: NodeId,
235    /// This run's generation stamp.
236    my_gen: u64,
237    /// Shared run counter; if it has moved past `my_gen` we're stale.
238    generation: Rc<Cell<u64>>,
239    /// Resource state slot to commit into.
240    state: RwSignal<ResourceState<T>>,
241    /// The fetcher's future (single-threaded / `!Send` is fine here).
242    fut: Pin<Box<dyn Future<Output = Result<T, String>>>>,
243}
244
245impl<T: Clone + 'static> Future for ScopedFetch<T> {
246    type Output = ();
247
248    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
249        // Destructure through the pin so the borrow of `fut` inside the
250        // `with_observer` closure doesn't collide with reads of the
251        // other fields.
252        let this = self.get_mut();
253
254        // A newer run superseded us: abandon WITHOUT installing the
255        // observer (add no stale dependency edges) and WITHOUT writing
256        // state. The inner future is dropped with this `ScopedFetch`.
257        if this.generation.get() != this.my_gen {
258            return Poll::Ready(());
259        }
260
261        let node = this.node;
262        let fut = &mut this.fut;
263        // Re-install the resource's effect node as the current observer
264        // for THIS poll so reads after `.await` register as deps of it.
265        let poll = super::with_observer(node, || fut.as_mut().poll(cx));
266
267        match poll {
268            Poll::Pending => Poll::Pending,
269            Poll::Ready(result) => {
270                // Commit only if we're still the latest run.
271                if this.generation.get() == this.my_gen {
272                    this.state.set(match result {
273                        Ok(v) => ResourceState::Ready(v),
274                        Err(e) => ResourceState::Error(e),
275                    });
276                }
277                Poll::Ready(())
278            }
279        }
280    }
281}
282
283/// Synchronous-fetch variant. Runs `fetcher` inline on the calling
284/// thread and writes the result directly into the resource's signal.
285/// No worker thread, no main-thread dispatcher needed — useful for
286/// tests, for cases where the value is already in memory, and for
287/// computed pseudo-resources (e.g. derive from a context value).
288///
289/// The returned `Resource` is in [`ResourceState::Ready`] or
290/// [`ResourceState::Error`] *immediately* — never `Loading`.
291pub fn resource_sync<T, F>(fetcher: F) -> Resource<T>
292where
293    T: Clone + 'static,
294    F: FnOnce() -> Result<T, String>,
295{
296    // `fetcher` is a one-shot seed for the resource's RwSignal —
297    // its signal reads are meant to compute an initial value, not to
298    // re-fire the resource when those signals change. Run it under
299    // `untrack` so the reads don't leak into whatever outer effect
300    // / computed / component body happens to be calling
301    // `resource_sync`. Same principle as the computed seed guard.
302    let state = RwSignal::new(match super::untrack(fetcher) {
303        Ok(v) => ResourceState::Ready(v),
304        Err(e) => ResourceState::Error(e),
305    });
306    Resource { state }
307}