slt/context/async_tasks.rs
1//! In-frame async task API (`Context::spawn` / `Context::poll`), feature-gated
2//! behind `async`. See issue #234.
3//!
4//! BubbleTea's `tea.Cmd` is the inspiration. SLT's existing
5//! [`run_async`](crate::run_async) entry point uses an external `mpsc` channel,
6//! which works but requires the caller to hold a `Sender` outside the closure
7//! and wire messages manually. The in-frame API closes this ergonomics gap for
8//! the common case: "click button -> spawn fetch -> show result next frame".
9//!
10//! This module defines [`TaskHandle`] (the opaque, `#[must_use]` handle
11//! returned by `Context::spawn`) and [`AsyncTasks`] (the per-session registry
12//! round-tripped through [`Context`] exactly like the scheduler timer table).
13
14use std::any::Any;
15use std::marker::PhantomData;
16use std::sync::mpsc::{Receiver, Sender};
17
18/// A completed task result delivered from a spawned future back to the
19/// [`AsyncTasks`] registry: `(task id, boxed result)`.
20type ResultMsg = (u64, Box<dyn Any + Send>);
21
22/// Opaque handle returned by [`Context::spawn`](crate::Context::spawn).
23///
24/// Store the handle and pass it to [`Context::poll`](crate::Context::poll) on
25/// subsequent frames to retrieve the task's result. **Dropping the handle
26/// cancels the in-flight task** (via [`tokio::task::JoinHandle::abort`]), so
27/// keep it alive for as long as you care about the result.
28///
29/// The type parameter `T` ties the handle to the future's output type so
30/// `poll` can downcast safely. Two handles never collide: each carries a
31/// unique `id`, so even two `TaskHandle<String>` live simultaneously route
32/// their results to the correct caller.
33///
34/// Requires the `async` feature.
35#[must_use = "dropping a TaskHandle cancels the spawned task; store it to poll the result"]
36pub struct TaskHandle<T> {
37 pub(crate) id: u64,
38 /// Sends this handle's `id` to the registry on drop so the task is
39 /// cancelled. `None` only for handles that were already disarmed (never
40 /// happens in normal use; kept for forward flexibility).
41 cancel: Option<Sender<u64>>,
42 _marker: PhantomData<fn() -> T>,
43}
44
45impl<T> TaskHandle<T> {
46 pub(crate) fn new(id: u64, cancel: Sender<u64>) -> Self {
47 Self {
48 id,
49 cancel: Some(cancel),
50 _marker: PhantomData,
51 }
52 }
53
54 /// The stable task id this handle refers to. Crate-internal: callers match
55 /// handles by identity, not by reading the raw id.
56 pub(crate) fn id(&self) -> u64 {
57 self.id
58 }
59}
60
61impl<T> std::fmt::Debug for TaskHandle<T> {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("TaskHandle").field("id", &self.id).finish()
64 }
65}
66
67impl<T> Drop for TaskHandle<T> {
68 fn drop(&mut self) {
69 if let Some(cancel) = self.cancel.take() {
70 // Best-effort: if the registry's receiver is already gone (session
71 // ended) the send fails and there is nothing left to cancel.
72 let _ = cancel.send(self.id);
73 }
74 }
75}
76
77/// Per-session async task registry, round-tripped through [`Context`] each
78/// frame (moved out at frame start, moved back at frame end) exactly like the
79/// scheduler timer table.
80///
81/// Holds the ambient Tokio runtime handle (injected by
82/// [`run_async`](crate::run_async) / [`run_async_with`](crate::run_async_with)),
83/// the live `JoinHandle`s for cancellation, and completed results keyed by id.
84///
85/// `Default` produces an inert registry with no runtime — `spawn` on it panics,
86/// matching the documented contract that `spawn` requires an active runtime.
87pub(crate) struct AsyncTasks {
88 /// Ambient Tokio runtime handle. `None` outside `run_async*` (TestBackend,
89 /// the sync `run` loop) — `spawn` panics in that case.
90 runtime: Option<tokio::runtime::Handle>,
91 /// Monotonic id allocator. Never reused within a session so a stale handle
92 /// can never collide with a freshly-spawned task.
93 next_id: u64,
94 /// Live abort handles, keyed by task id. Removed when the result arrives or
95 /// the task is cancelled.
96 joins: std::collections::HashMap<u64, tokio::task::JoinHandle<()>>,
97 /// Completed results awaiting a `poll`, keyed by task id. A result is
98 /// inserted exactly once and removed by the first matching `poll`.
99 results: std::collections::HashMap<u64, Box<dyn Any + Send>>,
100 /// Sender cloned into every spawned future; the future sends its boxed
101 /// result here on completion. `None` until the first `spawn` lazily wires
102 /// the channel.
103 result_tx: Option<Sender<ResultMsg>>,
104 /// Receiver drained at the top of `spawn`/`poll` to move completed results
105 /// into `results`. Paired with `result_tx`.
106 result_rx: Option<Receiver<ResultMsg>>,
107 /// Sender handed to each [`TaskHandle`]; the handle sends its id here on
108 /// drop to request cancellation. Drained alongside the result channel.
109 cancel_tx: Sender<u64>,
110 /// Receiver for handle-drop cancellation requests.
111 cancel_rx: Receiver<u64>,
112}
113
114impl Default for AsyncTasks {
115 fn default() -> Self {
116 let (cancel_tx, cancel_rx) = std::sync::mpsc::channel();
117 Self {
118 runtime: None,
119 next_id: 0,
120 joins: std::collections::HashMap::new(),
121 results: std::collections::HashMap::new(),
122 result_tx: None,
123 result_rx: None,
124 cancel_tx,
125 cancel_rx,
126 }
127 }
128}
129
130impl AsyncTasks {
131 /// Inject the ambient Tokio runtime handle. Called once by `run_async*`
132 /// before the first frame so `spawn` has a runtime to launch onto.
133 pub(crate) fn set_runtime(&mut self, handle: tokio::runtime::Handle) {
134 self.runtime = Some(handle);
135 }
136
137 /// Spawn `fut` onto the ambient runtime, returning a [`TaskHandle`] keyed by
138 /// a fresh id. Panics if no runtime was injected.
139 pub(crate) fn spawn<T: Send + 'static>(
140 &mut self,
141 fut: impl std::future::Future<Output = T> + Send + 'static,
142 ) -> TaskHandle<T> {
143 let runtime = self.runtime.clone().unwrap_or_else(|| {
144 panic!(
145 "Context::spawn requires an active Tokio runtime; call it inside \
146 run_async() / run_async_with()"
147 )
148 });
149
150 // Lazily wire the result channel on first spawn so the inert
151 // `Default` registry carries no allocation.
152 if self.result_tx.is_none() {
153 let (tx, rx) = std::sync::mpsc::channel();
154 self.result_tx = Some(tx);
155 self.result_rx = Some(rx);
156 }
157 let result_tx = self
158 .result_tx
159 .clone()
160 .expect("result_tx wired immediately above");
161
162 let id = self.next_id;
163 self.next_id = self.next_id.wrapping_add(1);
164
165 let join = runtime.spawn(async move {
166 let out = fut.await;
167 // Best-effort delivery: if the registry (and its receiver) is gone
168 // the result is simply dropped.
169 let _ = result_tx.send((id, Box::new(out) as Box<dyn Any + Send>));
170 });
171 self.joins.insert(id, join);
172
173 TaskHandle::new(id, self.cancel_tx.clone())
174 }
175
176 /// Drain the result and cancellation channels: move completed results into
177 /// `results`, and abort any tasks whose handle was dropped. Called at the
178 /// top of `spawn` and `poll` so the registry stays current without a
179 /// dedicated per-frame tick.
180 fn drain(&mut self) {
181 if let Some(rx) = self.result_rx.as_ref() {
182 while let Ok((id, value)) = rx.try_recv() {
183 // The task finished on its own; its JoinHandle no longer needs
184 // tracking for cancellation.
185 self.joins.remove(&id);
186 self.results.insert(id, value);
187 }
188 }
189 while let Ok(id) = self.cancel_rx.try_recv() {
190 self.cancel(id);
191 }
192 }
193
194 /// Per-frame pump: move in completed results and process handle-drop
195 /// cancellations. Called once per frame from the frame kernel's registry
196 /// round-trip so a frame that calls neither `spawn` nor `poll` still honours
197 /// a [`TaskHandle`] dropped on the previous frame (otherwise the abort would
198 /// only fire on the next frame that happens to spawn or poll).
199 pub(crate) fn maintain(&mut self) {
200 self.drain();
201 }
202
203 /// Take the result for `id` if it has arrived. Returns `Some(T)` exactly
204 /// once, then `None`.
205 pub(crate) fn poll<T: 'static>(&mut self, id: u64) -> Option<T> {
206 self.drain();
207 let boxed = self.results.remove(&id)?;
208 match boxed.downcast::<T>() {
209 Ok(value) => Some(*value),
210 Err(boxed) => {
211 // Type mismatch should be impossible: the id-typed handle pins
212 // the result type. Re-insert defensively rather than lose the
213 // result, and report `None` to this (mistyped) caller.
214 self.results.insert(id, boxed);
215 None
216 }
217 }
218 }
219
220 /// Cancel the task with `id`: abort the future and drop any pending result.
221 fn cancel(&mut self, id: u64) {
222 if let Some(join) = self.joins.remove(&id) {
223 join.abort();
224 }
225 self.results.remove(&id);
226 }
227}