Skip to main content

cranpose_core/
launched_effect.rs

1use crate::{hash_key, with_current_composer, Key, RuntimeHandle, TaskHandle};
2#[cfg(not(target_arch = "wasm32"))]
3use std::cell::{Cell, RefCell};
4use std::future::Future;
5use std::hash::Hash;
6use std::pin::Pin;
7#[cfg(not(target_arch = "wasm32"))]
8use std::rc::Rc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12#[derive(Default)]
13struct LaunchedEffectState {
14    key: Option<Key>,
15    cancel: Option<LaunchedEffectCancellation>,
16}
17
18struct LaunchedEffectCancellation {
19    #[cfg(not(target_arch = "wasm32"))]
20    runtime: RuntimeHandle,
21    active: Arc<AtomicBool>,
22    #[cfg(not(target_arch = "wasm32"))]
23    continuations: Rc<RefCell<Vec<u64>>>,
24}
25
26#[derive(Default)]
27struct LaunchedEffectAsyncState {
28    key: Option<Key>,
29    cancel: Option<LaunchedEffectCancellation>,
30    task: Option<TaskHandle>,
31}
32
33impl LaunchedEffectState {
34    fn should_run(&self, key: Key) -> bool {
35        match self.key {
36            Some(current) => current != key,
37            None => true,
38        }
39    }
40
41    fn set_key(&mut self, key: Key) {
42        self.key = Some(key);
43    }
44
45    fn launch(
46        &mut self,
47        runtime: RuntimeHandle,
48        effect: impl FnOnce(LaunchedEffectScope) + 'static,
49    ) {
50        self.cancel_current();
51        let active = Arc::new(AtomicBool::new(true));
52        #[cfg(not(target_arch = "wasm32"))]
53        let continuations = Rc::new(RefCell::new(Vec::new()));
54        self.cancel = Some(LaunchedEffectCancellation {
55            #[cfg(not(target_arch = "wasm32"))]
56            runtime: runtime.clone(),
57            active: Arc::clone(&active),
58            #[cfg(not(target_arch = "wasm32"))]
59            continuations: Rc::clone(&continuations),
60        });
61        let scope = LaunchedEffectScope {
62            active: Arc::clone(&active),
63            runtime: runtime.clone(),
64            #[cfg(not(target_arch = "wasm32"))]
65            continuations,
66        };
67        runtime.enqueue_ui_task(Box::new(move || effect(scope)));
68    }
69
70    fn cancel_current(&mut self) {
71        if let Some(cancel) = self.cancel.take() {
72            cancel.cancel();
73        }
74    }
75}
76
77impl LaunchedEffectCancellation {
78    fn cancel(&self) {
79        self.active.store(false, Ordering::SeqCst);
80        #[cfg(not(target_arch = "wasm32"))]
81        {
82            let mut pending = self.continuations.borrow_mut();
83            for id in pending.drain(..) {
84                self.runtime.cancel_ui_cont(id);
85            }
86        }
87    }
88}
89
90impl LaunchedEffectAsyncState {
91    fn should_run(&self, key: Key) -> bool {
92        match self.key {
93            Some(current) => current != key,
94            None => true,
95        }
96    }
97
98    fn set_key(&mut self, key: Key) {
99        self.key = Some(key);
100    }
101
102    fn launch(
103        &mut self,
104        runtime: RuntimeHandle,
105        mk_future: impl FnOnce(LaunchedEffectScope) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
106    ) {
107        self.cancel_current();
108        let active = Arc::new(AtomicBool::new(true));
109        #[cfg(not(target_arch = "wasm32"))]
110        let continuations = Rc::new(RefCell::new(Vec::new()));
111        self.cancel = Some(LaunchedEffectCancellation {
112            #[cfg(not(target_arch = "wasm32"))]
113            runtime: runtime.clone(),
114            active: Arc::clone(&active),
115            #[cfg(not(target_arch = "wasm32"))]
116            continuations: Rc::clone(&continuations),
117        });
118        let scope = LaunchedEffectScope {
119            active: Arc::clone(&active),
120            runtime: runtime.clone(),
121            #[cfg(not(target_arch = "wasm32"))]
122            continuations,
123        };
124        let future = mk_future(scope.clone());
125        let active_flag = Arc::clone(&scope.active);
126        match runtime.spawn_ui(async move {
127            future.await;
128            active_flag.store(false, Ordering::SeqCst);
129        }) {
130            Some(handle) => {
131                self.task = Some(handle);
132            }
133            None => {
134                active.store(false, Ordering::SeqCst);
135                self.cancel = None;
136            }
137        }
138    }
139
140    fn cancel_current(&mut self) {
141        if let Some(handle) = self.task.take() {
142            handle.cancel();
143        }
144        if let Some(cancel) = self.cancel.take() {
145            cancel.cancel();
146        }
147    }
148}
149
150impl Drop for LaunchedEffectState {
151    fn drop(&mut self) {
152        self.cancel_current();
153    }
154}
155
156impl Drop for LaunchedEffectAsyncState {
157    fn drop(&mut self) {
158        self.cancel_current();
159    }
160}
161
162#[derive(Clone)]
163pub struct LaunchedEffectScope {
164    active: Arc<AtomicBool>,
165    runtime: RuntimeHandle,
166    #[cfg(not(target_arch = "wasm32"))]
167    continuations: Rc<RefCell<Vec<u64>>>,
168}
169
170impl LaunchedEffectScope {
171    #[cfg(not(target_arch = "wasm32"))]
172    fn track_continuation(&self, id: u64) {
173        self.continuations.borrow_mut().push(id);
174    }
175
176    #[cfg(not(target_arch = "wasm32"))]
177    fn release_continuation(&self, id: u64) {
178        let mut continuations = self.continuations.borrow_mut();
179        if let Some(index) = continuations.iter().position(|entry| *entry == id) {
180            continuations.remove(index);
181        }
182    }
183
184    pub fn is_active(&self) -> bool {
185        self.active.load(Ordering::SeqCst)
186    }
187
188    pub fn runtime(&self) -> RuntimeHandle {
189        self.runtime.clone()
190    }
191
192    /// Runs a follow-up `LaunchedEffect` task on the UI thread.
193    ///
194    /// The provided closure executes on the runtime thread and may freely
195    /// capture `Rc`/`RefCell` state. This must only be called from the UI
196    /// thread, typically inside another effect callback.
197    pub fn launch(&self, task: impl FnOnce(LaunchedEffectScope) + 'static) {
198        if !self.is_active() {
199            return;
200        }
201        let scope = self.clone();
202        self.runtime.enqueue_ui_task(Box::new(move || {
203            if scope.is_active() {
204                task(scope);
205            }
206        }));
207    }
208
209    /// Posts UI-only work that will execute on the runtime thread.
210    ///
211    /// The closure never crosses threads, so it may capture non-`Send` values.
212    /// Callers must invoke this from the UI thread.
213    pub fn post_ui(&self, task: impl FnOnce() + 'static) {
214        if !self.is_active() {
215            return;
216        }
217        let active = Arc::clone(&self.active);
218        self.runtime.enqueue_ui_task(Box::new(move || {
219            if active.load(Ordering::SeqCst) {
220                task();
221            }
222        }));
223    }
224
225    /// Posts work from any thread to run on the UI thread.
226    ///
227    /// The closure must be `Send` because it may be sent across threads before
228    /// running on the runtime thread. Use this helper when posting from
229    /// background threads that need to interact with UI state.
230    pub fn post_ui_send(&self, task: impl FnOnce() + Send + 'static) {
231        if !self.is_active() {
232            return;
233        }
234        let active = Arc::clone(&self.active);
235        self.runtime.post_ui(move || {
236            if active.load(Ordering::SeqCst) {
237                task();
238            }
239        });
240    }
241
242    /// Runs background work and delivers results to the UI.
243    ///
244    /// On native targets, `work` runs on a worker thread and its future is
245    /// driven to completion there. On WASM, `work` runs as a task on the
246    /// browser event loop. The `on_ui` continuation always runs on the runtime
247    /// thread, so it may capture `Rc`/`RefCell` state safely.
248    #[cfg(not(target_arch = "wasm32"))]
249    pub fn launch_background<T, Work, Ui, Fut>(&self, work: Work, on_ui: Ui)
250    where
251        T: Send + 'static,
252        Work: FnOnce(CancelToken) -> Fut + Send + 'static,
253        Fut: Future<Output = T> + Send + 'static,
254        Ui: FnOnce(T) + 'static,
255    {
256        if !self.is_active() {
257            return;
258        }
259        let dispatcher = self.runtime.dispatcher();
260        let active_for_thread = Arc::clone(&self.active);
261        let continuation_scope = self.clone();
262        let continuation_active = Arc::clone(&self.active);
263        let id_cell = Rc::new(Cell::new(0));
264        let id_for_closure = Rc::clone(&id_cell);
265        let continuation = move |value: T| {
266            let id = id_for_closure.get();
267            continuation_scope.release_continuation(id);
268            if continuation_active.load(Ordering::SeqCst) {
269                on_ui(value);
270            }
271        };
272
273        let Some(cont_id) = self.runtime.register_ui_cont(continuation) else {
274            return;
275        };
276        id_cell.set(cont_id);
277        self.track_continuation(cont_id);
278
279        std::thread::spawn(move || {
280            let token = CancelToken::new(Arc::clone(&active_for_thread));
281            let value = pollster::block_on(work(token.clone()));
282            if token.is_cancelled() {
283                return;
284            }
285            dispatcher.post_invoke(cont_id, value);
286        });
287    }
288
289    /// Runs background work and delivers results to the UI.
290    ///
291    /// On native targets, `work` runs on a worker thread and its future is
292    /// driven to completion there. On WASM, `work` runs as a task on the
293    /// browser event loop. The `on_ui` continuation always runs on the runtime
294    /// thread, so it may capture `Rc`/`RefCell` state safely.
295    #[cfg(target_arch = "wasm32")]
296    pub fn launch_background<T, Work, Ui, Fut>(&self, work: Work, on_ui: Ui)
297    where
298        T: 'static,
299        Work: FnOnce(CancelToken) -> Fut + 'static,
300        Fut: Future<Output = T> + 'static,
301        Ui: FnOnce(T) + 'static,
302    {
303        if !self.is_active() {
304            return;
305        }
306        let active_for_task = Arc::clone(&self.active);
307        let scope = self.clone();
308        wasm_bindgen_futures::spawn_local(async move {
309            let token = CancelToken::new(Arc::clone(&active_for_task));
310            let value = work(token.clone()).await;
311            if token.is_cancelled() {
312                return;
313            }
314            scope.post_ui(move || {
315                if token.is_active() {
316                    on_ui(value);
317                }
318            });
319        });
320    }
321}
322
/// Cooperative cancellation token handed to background `LaunchedEffect` work.
///
/// The token reports "cancelled" as soon as the activity flag it shares with
/// the owning scope has been cleared. Long-running operations should poll
/// [`CancelToken::is_cancelled`] from time to time and bail out early;
/// blocking I/O is not interrupted automatically.
#[derive(Clone)]
pub struct CancelToken {
    /// Activity flag shared with the scope that created this token.
    active: Arc<AtomicBool>,
}

impl CancelToken {
    /// Wraps the shared activity flag in a token.
    fn new(active: Arc<AtomicBool>) -> Self {
        CancelToken { active }
    }

    /// Returns `true` once the associated scope has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        !self.is_active()
    }

    /// Returns whether the scope is still active.
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::SeqCst)
    }
}
348
349pub fn __launched_effect_impl<K, F>(group_key: Key, keys: K, effect: F)
350where
351    K: Hash,
352    F: FnOnce(LaunchedEffectScope) + 'static,
353{
354    // Create a group using the caller's location to ensure each LaunchedEffect
355    // gets its own slot table entry, even in conditional branches
356    with_current_composer(|composer| {
357        composer.with_group(group_key, |composer| {
358            let key_hash = hash_key(&keys);
359            let state = composer.remember(LaunchedEffectState::default);
360            if state.with(|state| state.should_run(key_hash)) {
361                state.update(|state| state.set_key(key_hash));
362                let runtime = composer.runtime_handle();
363                let state_for_effect = state.clone();
364                let mut effect_opt = Some(effect);
365                composer.register_side_effect(move || {
366                    if let Some(effect) = effect_opt.take() {
367                        state_for_effect.update(|state| state.launch(runtime.clone(), effect));
368                    }
369                });
370            }
371        });
372    });
373}
374
/// Launches `$effect` keyed on `$keys`.
///
/// Expands to a call to `$crate::__launched_effect_impl`, passing a group key
/// built by `$crate::location_key` from the invocation's `file!()`, `line!()`
/// and `column!()`. A trailing comma after the effect is accepted.
#[macro_export]
macro_rules! LaunchedEffect {
    ($keys:expr, $effect:expr $(,)?) => {
        $crate::__launched_effect_impl(
            $crate::location_key(file!(), line!(), column!()),
            $keys,
            $effect,
        )
    };
}
385
386pub fn __launched_effect_async_impl<K, F>(group_key: Key, keys: K, mk_future: F)
387where
388    K: Hash,
389    F: FnOnce(LaunchedEffectScope) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
390{
391    with_current_composer(|composer| {
392        composer.with_group(group_key, |composer| {
393            let key_hash = hash_key(&keys);
394            let state = composer.remember(LaunchedEffectAsyncState::default);
395            if state.with(|state| state.should_run(key_hash)) {
396                state.update(|state| state.set_key(key_hash));
397                let runtime = composer.runtime_handle();
398                let state_for_effect = state.clone();
399                let mut mk_future_opt = Some(mk_future);
400                composer.register_side_effect(move || {
401                    if let Some(mk_future) = mk_future_opt.take() {
402                        state_for_effect.update(|state| {
403                            state.launch(runtime.clone(), mk_future);
404                        });
405                    }
406                });
407            }
408        });
409    });
410}
411
/// Launches the future built by `$future` keyed on `$keys`.
///
/// Expands to a call to `$crate::__launched_effect_async_impl`, passing a
/// group key built by `$crate::location_key` from the invocation's `file!()`,
/// `line!()` and `column!()`. A trailing comma after the last argument is
/// accepted.
#[macro_export]
macro_rules! LaunchedEffectAsync {
    ($keys:expr, $future:expr $(,)?) => {
        $crate::__launched_effect_async_impl(
            $crate::location_key(file!(), line!(), column!()),
            $keys,
            $future,
        )
    };
}