Skip to main content

slt/context/
async_tasks.rs

1//! In-frame async task API (`Context::spawn` / `Context::poll`), feature-gated
2//! behind `async`. See issue #234.
3//!
4//! BubbleTea's `tea.Cmd` is the inspiration. SLT's existing
5//! [`run_async`](crate::run_async) entry point uses an external `mpsc` channel,
6//! which works but requires the caller to hold a `Sender` outside the closure
7//! and wire messages manually. The in-frame API closes this ergonomics gap for
8//! the common case: "click button -> spawn fetch -> show result next frame".
9//!
10//! This module defines [`TaskHandle`] (the opaque, `#[must_use]` handle
11//! returned by `Context::spawn`) and [`AsyncTasks`] (the per-session registry
12//! round-tripped through [`Context`] exactly like the scheduler timer table).
13
14use std::any::Any;
15use std::marker::PhantomData;
16use std::sync::mpsc::{Receiver, Sender};
17
/// Message a finished task posts back to the [`AsyncTasks`] registry.
///
/// The tuple is `(task id, type-erased output)`. The output is boxed as
/// `dyn Any + Send` so every task, whatever its output type, can share one
/// channel; the registry downcasts it back to the concrete type on `poll`.
type ResultMsg = (u64, Box<dyn Any + Send + 'static>);
21
/// Opaque ticket for a task started with
/// [`Context::spawn`](crate::Context::spawn).
///
/// Keep the handle around and hand it to
/// [`Context::poll`](crate::Context::poll) on later frames to collect the
/// task's output. **The handle owns the task's lifetime: dropping it requests
/// cancellation of the in-flight task** (the registry aborts it through
/// [`tokio::task::JoinHandle::abort`]), so hold on to it for as long as the
/// result matters.
///
/// `T` is the output type of the spawned future; it lets `poll` downcast the
/// stored result to the right type. Routing is by `id`, not by type, so two
/// live `TaskHandle<String>` values still receive their own results.
///
/// Requires the `async` feature.
#[must_use = "dropping a TaskHandle cancels the spawned task; store it to poll the result"]
pub struct TaskHandle<T> {
    /// Registry-assigned id of the task this handle refers to.
    pub(crate) id: u64,
    /// Channel back to the registry, used on drop to send `id` as a
    /// cancellation request. `Some` for every handle built by
    /// [`TaskHandle::new`]; it only becomes `None` once `Drop` has taken it.
    cancel: Option<Sender<u64>>,
    /// Records the output type without storing a `T`.
    _marker: PhantomData<fn() -> T>,
}
44
45impl<T> TaskHandle<T> {
46    pub(crate) fn new(id: u64, cancel: Sender<u64>) -> Self {
47        Self {
48            id,
49            cancel: Some(cancel),
50            _marker: PhantomData,
51        }
52    }
53
54    /// The stable task id this handle refers to. Crate-internal: callers match
55    /// handles by identity, not by reading the raw id.
56    pub(crate) fn id(&self) -> u64 {
57        self.id
58    }
59}
60
61impl<T> std::fmt::Debug for TaskHandle<T> {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("TaskHandle").field("id", &self.id).finish()
64    }
65}
66
67impl<T> Drop for TaskHandle<T> {
68    fn drop(&mut self) {
69        if let Some(cancel) = self.cancel.take() {
70            // Best-effort: if the registry's receiver is already gone (session
71            // ended) the send fails and there is nothing left to cancel.
72            let _ = cancel.send(self.id);
73        }
74    }
75}
76
77/// Per-session async task registry, round-tripped through [`Context`] each
78/// frame (moved out at frame start, moved back at frame end) exactly like the
79/// scheduler timer table.
80///
81/// Holds the ambient Tokio runtime handle (injected by
82/// [`run_async`](crate::run_async) / [`run_async_with`](crate::run_async_with)),
83/// the live `JoinHandle`s for cancellation, and completed results keyed by id.
84///
85/// `Default` produces an inert registry with no runtime — `spawn` on it panics,
86/// matching the documented contract that `spawn` requires an active runtime.
87pub(crate) struct AsyncTasks {
88    /// Ambient Tokio runtime handle. `None` outside `run_async*` (TestBackend,
89    /// the sync `run` loop) — `spawn` panics in that case.
90    runtime: Option<tokio::runtime::Handle>,
91    /// Monotonic id allocator. Never reused within a session so a stale handle
92    /// can never collide with a freshly-spawned task.
93    next_id: u64,
94    /// Live abort handles, keyed by task id. Removed when the result arrives or
95    /// the task is cancelled.
96    joins: std::collections::HashMap<u64, tokio::task::JoinHandle<()>>,
97    /// Completed results awaiting a `poll`, keyed by task id. A result is
98    /// inserted exactly once and removed by the first matching `poll`.
99    results: std::collections::HashMap<u64, Box<dyn Any + Send>>,
100    /// Sender cloned into every spawned future; the future sends its boxed
101    /// result here on completion. `None` until the first `spawn` lazily wires
102    /// the channel.
103    result_tx: Option<Sender<ResultMsg>>,
104    /// Receiver drained at the top of `spawn`/`poll` to move completed results
105    /// into `results`. Paired with `result_tx`.
106    result_rx: Option<Receiver<ResultMsg>>,
107    /// Sender handed to each [`TaskHandle`]; the handle sends its id here on
108    /// drop to request cancellation. Drained alongside the result channel.
109    cancel_tx: Sender<u64>,
110    /// Receiver for handle-drop cancellation requests.
111    cancel_rx: Receiver<u64>,
112}
113
114impl Default for AsyncTasks {
115    fn default() -> Self {
116        let (cancel_tx, cancel_rx) = std::sync::mpsc::channel();
117        Self {
118            runtime: None,
119            next_id: 0,
120            joins: std::collections::HashMap::new(),
121            results: std::collections::HashMap::new(),
122            result_tx: None,
123            result_rx: None,
124            cancel_tx,
125            cancel_rx,
126        }
127    }
128}
129
130impl AsyncTasks {
131    /// Inject the ambient Tokio runtime handle. Called once by `run_async*`
132    /// before the first frame so `spawn` has a runtime to launch onto.
133    pub(crate) fn set_runtime(&mut self, handle: tokio::runtime::Handle) {
134        self.runtime = Some(handle);
135    }
136
137    /// Spawn `fut` onto the ambient runtime, returning a [`TaskHandle`] keyed by
138    /// a fresh id. Panics if no runtime was injected.
139    pub(crate) fn spawn<T: Send + 'static>(
140        &mut self,
141        fut: impl std::future::Future<Output = T> + Send + 'static,
142    ) -> TaskHandle<T> {
143        let runtime = self.runtime.clone().unwrap_or_else(|| {
144            panic!(
145                "Context::spawn requires an active Tokio runtime; call it inside \
146                 run_async() / run_async_with()"
147            )
148        });
149
150        // Lazily wire the result channel on first spawn so the inert
151        // `Default` registry carries no allocation.
152        if self.result_tx.is_none() {
153            let (tx, rx) = std::sync::mpsc::channel();
154            self.result_tx = Some(tx);
155            self.result_rx = Some(rx);
156        }
157        let result_tx = self
158            .result_tx
159            .clone()
160            .expect("result_tx wired immediately above");
161
162        let id = self.next_id;
163        self.next_id = self.next_id.wrapping_add(1);
164
165        let join = runtime.spawn(async move {
166            let out = fut.await;
167            // Best-effort delivery: if the registry (and its receiver) is gone
168            // the result is simply dropped.
169            let _ = result_tx.send((id, Box::new(out) as Box<dyn Any + Send>));
170        });
171        self.joins.insert(id, join);
172
173        TaskHandle::new(id, self.cancel_tx.clone())
174    }
175
176    /// Drain the result and cancellation channels: move completed results into
177    /// `results`, and abort any tasks whose handle was dropped. Called at the
178    /// top of `spawn` and `poll` so the registry stays current without a
179    /// dedicated per-frame tick.
180    fn drain(&mut self) {
181        if let Some(rx) = self.result_rx.as_ref() {
182            while let Ok((id, value)) = rx.try_recv() {
183                // The task finished on its own; its JoinHandle no longer needs
184                // tracking for cancellation.
185                self.joins.remove(&id);
186                self.results.insert(id, value);
187            }
188        }
189        while let Ok(id) = self.cancel_rx.try_recv() {
190            self.cancel(id);
191        }
192    }
193
194    /// Per-frame pump: move in completed results and process handle-drop
195    /// cancellations. Called once per frame from the frame kernel's registry
196    /// round-trip so a frame that calls neither `spawn` nor `poll` still honours
197    /// a [`TaskHandle`] dropped on the previous frame (otherwise the abort would
198    /// only fire on the next frame that happens to spawn or poll).
199    pub(crate) fn maintain(&mut self) {
200        self.drain();
201    }
202
203    /// Take the result for `id` if it has arrived. Returns `Some(T)` exactly
204    /// once, then `None`.
205    pub(crate) fn poll<T: 'static>(&mut self, id: u64) -> Option<T> {
206        self.drain();
207        let boxed = self.results.remove(&id)?;
208        match boxed.downcast::<T>() {
209            Ok(value) => Some(*value),
210            Err(boxed) => {
211                // Type mismatch should be impossible: the id-typed handle pins
212                // the result type. Re-insert defensively rather than lose the
213                // result, and report `None` to this (mistyped) caller.
214                self.results.insert(id, boxed);
215                None
216            }
217        }
218    }
219
220    /// Cancel the task with `id`: abort the future and drop any pending result.
221    fn cancel(&mut self, id: u64) {
222        if let Some(join) = self.joins.remove(&id) {
223            join.abort();
224        }
225        self.results.remove(&id);
226    }
227}