Skip to main content

cranpose_core/
launched_effect.rs

1use crate::{hash_key, with_current_composer, Key, RuntimeHandle, TaskHandle};
2use std::cell::{Cell, RefCell};
3use std::future::Future;
4use std::hash::Hash;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10#[derive(Default)]
11struct LaunchedEffectState {
12    key: Option<Key>,
13    cancel: Option<LaunchedEffectCancellation>,
14}
15
16struct LaunchedEffectCancellation {
17    runtime: RuntimeHandle,
18    active: Arc<AtomicBool>,
19    continuations: Rc<RefCell<Vec<u64>>>,
20}
21
22#[derive(Default)]
23struct LaunchedEffectAsyncState {
24    key: Option<Key>,
25    cancel: Option<LaunchedEffectCancellation>,
26    task: Option<TaskHandle>,
27}
28
29impl LaunchedEffectState {
30    fn should_run(&self, key: Key) -> bool {
31        match self.key {
32            Some(current) => current != key,
33            None => true,
34        }
35    }
36
37    fn set_key(&mut self, key: Key) {
38        self.key = Some(key);
39    }
40
41    fn launch(
42        &mut self,
43        runtime: RuntimeHandle,
44        effect: impl FnOnce(LaunchedEffectScope) + 'static,
45    ) {
46        self.cancel_current();
47        let active = Arc::new(AtomicBool::new(true));
48        let continuations = Rc::new(RefCell::new(Vec::new()));
49        self.cancel = Some(LaunchedEffectCancellation {
50            runtime: runtime.clone(),
51            active: Arc::clone(&active),
52            continuations: Rc::clone(&continuations),
53        });
54        let scope = LaunchedEffectScope {
55            active: Arc::clone(&active),
56            runtime: runtime.clone(),
57            continuations,
58        };
59        runtime.enqueue_ui_task(Box::new(move || effect(scope)));
60    }
61
62    fn cancel_current(&mut self) {
63        if let Some(cancel) = self.cancel.take() {
64            cancel.cancel();
65        }
66    }
67}
68
69impl LaunchedEffectCancellation {
70    fn cancel(&self) {
71        self.active.store(false, Ordering::SeqCst);
72        let mut pending = self.continuations.borrow_mut();
73        for id in pending.drain(..) {
74            self.runtime.cancel_ui_cont(id);
75        }
76    }
77}
78
79impl LaunchedEffectAsyncState {
80    fn should_run(&self, key: Key) -> bool {
81        match self.key {
82            Some(current) => current != key,
83            None => true,
84        }
85    }
86
87    fn set_key(&mut self, key: Key) {
88        self.key = Some(key);
89    }
90
91    fn launch(
92        &mut self,
93        runtime: RuntimeHandle,
94        mk_future: impl FnOnce(LaunchedEffectScope) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
95    ) {
96        self.cancel_current();
97        let active = Arc::new(AtomicBool::new(true));
98        let continuations = Rc::new(RefCell::new(Vec::new()));
99        self.cancel = Some(LaunchedEffectCancellation {
100            runtime: runtime.clone(),
101            active: Arc::clone(&active),
102            continuations: Rc::clone(&continuations),
103        });
104        let scope = LaunchedEffectScope {
105            active: Arc::clone(&active),
106            runtime: runtime.clone(),
107            continuations,
108        };
109        let future = mk_future(scope.clone());
110        let active_flag = Arc::clone(&scope.active);
111        match runtime.spawn_ui(async move {
112            future.await;
113            active_flag.store(false, Ordering::SeqCst);
114        }) {
115            Some(handle) => {
116                self.task = Some(handle);
117            }
118            None => {
119                active.store(false, Ordering::SeqCst);
120                self.cancel = None;
121            }
122        }
123    }
124
125    fn cancel_current(&mut self) {
126        if let Some(handle) = self.task.take() {
127            handle.cancel();
128        }
129        if let Some(cancel) = self.cancel.take() {
130            cancel.cancel();
131        }
132    }
133}
134
135impl Drop for LaunchedEffectState {
136    fn drop(&mut self) {
137        self.cancel_current();
138    }
139}
140
141impl Drop for LaunchedEffectAsyncState {
142    fn drop(&mut self) {
143        self.cancel_current();
144    }
145}
146
147#[derive(Clone)]
148pub struct LaunchedEffectScope {
149    active: Arc<AtomicBool>,
150    runtime: RuntimeHandle,
151    continuations: Rc<RefCell<Vec<u64>>>,
152}
153
154impl LaunchedEffectScope {
155    fn track_continuation(&self, id: u64) {
156        self.continuations.borrow_mut().push(id);
157    }
158
159    fn release_continuation(&self, id: u64) {
160        let mut continuations = self.continuations.borrow_mut();
161        if let Some(index) = continuations.iter().position(|entry| *entry == id) {
162            continuations.remove(index);
163        }
164    }
165
166    pub fn is_active(&self) -> bool {
167        self.active.load(Ordering::SeqCst)
168    }
169
170    pub fn runtime(&self) -> RuntimeHandle {
171        self.runtime.clone()
172    }
173
174    /// Runs a follow-up `LaunchedEffect` task on the UI thread.
175    ///
176    /// The provided closure executes on the runtime thread and may freely
177    /// capture `Rc`/`RefCell` state. This must only be called from the UI
178    /// thread, typically inside another effect callback.
179    pub fn launch(&self, task: impl FnOnce(LaunchedEffectScope) + 'static) {
180        if !self.is_active() {
181            return;
182        }
183        let scope = self.clone();
184        self.runtime.enqueue_ui_task(Box::new(move || {
185            if scope.is_active() {
186                task(scope);
187            }
188        }));
189    }
190
191    /// Posts UI-only work that will execute on the runtime thread.
192    ///
193    /// The closure never crosses threads, so it may capture non-`Send` values.
194    /// Callers must invoke this from the UI thread.
195    pub fn post_ui(&self, task: impl FnOnce() + 'static) {
196        if !self.is_active() {
197            return;
198        }
199        let active = Arc::clone(&self.active);
200        self.runtime.enqueue_ui_task(Box::new(move || {
201            if active.load(Ordering::SeqCst) {
202                task();
203            }
204        }));
205    }
206
207    /// Posts work from any thread to run on the UI thread.
208    ///
209    /// The closure must be `Send` because it may be sent across threads before
210    /// running on the runtime thread. Use this helper when posting from
211    /// background threads that need to interact with UI state.
212    pub fn post_ui_send(&self, task: impl FnOnce() + Send + 'static) {
213        if !self.is_active() {
214            return;
215        }
216        let active = Arc::clone(&self.active);
217        self.runtime.post_ui(move || {
218            if active.load(Ordering::SeqCst) {
219                task();
220            }
221        });
222    }
223
224    /// Runs background work on a worker thread and delivers results to the UI.
225    ///
226    /// `work` executes on a background thread, receives a cooperative
227    /// [`CancelToken`], and must produce a `Send` value. The `on_ui` continuation
228    /// runs on the runtime thread, so it may capture `Rc`/`RefCell` state safely.
229    pub fn launch_background<T, Work, Ui>(&self, work: Work, on_ui: Ui)
230    where
231        T: Send + 'static,
232        Work: FnOnce(CancelToken) -> T + Send + 'static,
233        Ui: FnOnce(T) + 'static,
234    {
235        if !self.is_active() {
236            return;
237        }
238        let dispatcher = self.runtime.dispatcher();
239        let active_for_thread = Arc::clone(&self.active);
240        let continuation_scope = self.clone();
241        let continuation_active = Arc::clone(&self.active);
242        let id_cell = Rc::new(Cell::new(0));
243        let id_for_closure = Rc::clone(&id_cell);
244        let continuation = move |value: T| {
245            let id = id_for_closure.get();
246            continuation_scope.release_continuation(id);
247            if continuation_active.load(Ordering::SeqCst) {
248                on_ui(value);
249            }
250        };
251
252        let Some(cont_id) = self.runtime.register_ui_cont(continuation) else {
253            return;
254        };
255        id_cell.set(cont_id);
256        self.track_continuation(cont_id);
257
258        std::thread::spawn(move || {
259            let token = CancelToken::new(Arc::clone(&active_for_thread));
260            let value = work(token.clone());
261            if token.is_cancelled() {
262                return;
263            }
264            dispatcher.post_invoke(cont_id, value);
265        });
266    }
267}
268
/// Cooperative cancellation token handed to background `LaunchedEffect` work.
///
/// The token wraps a shared flag and reports "cancelled" once that flag has
/// been cleared. Long-running work should poll [`CancelToken::is_cancelled`]
/// and bail out early; the token only exposes the flag and does not interrupt
/// anything on its own.
#[derive(Clone)]
pub struct CancelToken {
    /// Shared liveness flag: `true` while active, `false` once cancelled.
    active: Arc<AtomicBool>,
}

impl CancelToken {
    /// Wraps an existing liveness flag in a token.
    fn new(active: Arc<AtomicBool>) -> Self {
        CancelToken { active }
    }

    /// Returns `true` once the shared flag has been cleared.
    pub fn is_cancelled(&self) -> bool {
        !self.is_active()
    }

    /// Returns `true` while the shared flag is still set.
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::SeqCst)
    }
}
294
295pub fn __launched_effect_impl<K, F>(group_key: Key, keys: K, effect: F)
296where
297    K: Hash,
298    F: FnOnce(LaunchedEffectScope) + 'static,
299{
300    // Create a group using the caller's location to ensure each LaunchedEffect
301    // gets its own slot table entry, even in conditional branches
302    with_current_composer(|composer| {
303        composer.with_group(group_key, |composer| {
304            let key_hash = hash_key(&keys);
305            let state = composer.remember(LaunchedEffectState::default);
306            if state.with(|state| state.should_run(key_hash)) {
307                state.update(|state| state.set_key(key_hash));
308                let runtime = composer.runtime_handle();
309                let state_for_effect = state.clone();
310                let mut effect_opt = Some(effect);
311                composer.register_side_effect(move || {
312                    if let Some(effect) = effect_opt.take() {
313                        state_for_effect.update(|state| state.launch(runtime.clone(), effect));
314                    }
315                });
316            }
317        });
318    });
319}
320
/// Launches `$effect` for this call site, keyed by `$keys`.
///
/// Expands to a call to `__launched_effect_impl` whose group key is built by
/// `location_key` from the invocation's `file!()`, `line!()` and `column!()`.
/// A trailing comma after `$effect` is accepted, so multi-line invocations
/// can end each argument with `,`.
#[macro_export]
macro_rules! LaunchedEffect {
    ($keys:expr, $effect:expr $(,)?) => {
        $crate::__launched_effect_impl(
            $crate::location_key(file!(), line!(), column!()),
            $keys,
            $effect,
        )
    };
}
331
332pub fn __launched_effect_async_impl<K, F>(group_key: Key, keys: K, mk_future: F)
333where
334    K: Hash,
335    F: FnOnce(LaunchedEffectScope) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
336{
337    with_current_composer(|composer| {
338        composer.with_group(group_key, |composer| {
339            let key_hash = hash_key(&keys);
340            let state = composer.remember(LaunchedEffectAsyncState::default);
341            if state.with(|state| state.should_run(key_hash)) {
342                state.update(|state| state.set_key(key_hash));
343                let runtime = composer.runtime_handle();
344                let state_for_effect = state.clone();
345                let mut mk_future_opt = Some(mk_future);
346                composer.register_side_effect(move || {
347                    if let Some(mk_future) = mk_future_opt.take() {
348                        state_for_effect.update(|state| {
349                            state.launch(runtime.clone(), mk_future);
350                        });
351                    }
352                });
353            }
354        });
355    });
356}
357
/// Launches the future produced by `$future` for this call site, keyed by `$keys`.
///
/// Expands to a call to `__launched_effect_async_impl` whose group key is
/// built by `location_key` from the invocation's `file!()`, `line!()` and
/// `column!()`. A trailing comma after `$future` is accepted, so multi-line
/// invocations can end each argument with `,`.
#[macro_export]
macro_rules! LaunchedEffectAsync {
    ($keys:expr, $future:expr $(,)?) => {
        $crate::__launched_effect_async_impl(
            $crate::location_key(file!(), line!(), column!()),
            $keys,
            $future,
        )
    };
}