1use crate::{hash_key, with_current_composer, Key, RuntimeHandle, TaskHandle};
2#[cfg(not(target_arch = "wasm32"))]
3use std::cell::{Cell, RefCell};
4use std::future::Future;
5use std::hash::Hash;
6use std::pin::Pin;
7#[cfg(not(target_arch = "wasm32"))]
8use std::rc::Rc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12#[derive(Default)]
13struct LaunchedEffectState {
14 key: Option<Key>,
15 cancel: Option<LaunchedEffectCancellation>,
16}
17
18struct LaunchedEffectCancellation {
19 #[cfg(not(target_arch = "wasm32"))]
20 runtime: RuntimeHandle,
21 active: Arc<AtomicBool>,
22 #[cfg(not(target_arch = "wasm32"))]
23 continuations: Rc<RefCell<Vec<u64>>>,
24}
25
26#[derive(Default)]
27struct LaunchedEffectAsyncState {
28 key: Option<Key>,
29 cancel: Option<LaunchedEffectCancellation>,
30 task: Option<TaskHandle>,
31}
32
33impl LaunchedEffectState {
34 fn should_run(&self, key: Key) -> bool {
35 match self.key {
36 Some(current) => current != key,
37 None => true,
38 }
39 }
40
41 fn set_key(&mut self, key: Key) {
42 self.key = Some(key);
43 }
44
45 fn launch(
46 &mut self,
47 runtime: RuntimeHandle,
48 effect: impl FnOnce(LaunchedEffectScope) + 'static,
49 ) {
50 self.cancel_current();
51 let active = Arc::new(AtomicBool::new(true));
52 #[cfg(not(target_arch = "wasm32"))]
53 let continuations = Rc::new(RefCell::new(Vec::new()));
54 self.cancel = Some(LaunchedEffectCancellation {
55 #[cfg(not(target_arch = "wasm32"))]
56 runtime: runtime.clone(),
57 active: Arc::clone(&active),
58 #[cfg(not(target_arch = "wasm32"))]
59 continuations: Rc::clone(&continuations),
60 });
61 let scope = LaunchedEffectScope {
62 active: Arc::clone(&active),
63 runtime: runtime.clone(),
64 #[cfg(not(target_arch = "wasm32"))]
65 continuations,
66 };
67 runtime.enqueue_ui_task(Box::new(move || effect(scope)));
68 }
69
70 fn cancel_current(&mut self) {
71 if let Some(cancel) = self.cancel.take() {
72 cancel.cancel();
73 }
74 }
75}
76
77impl LaunchedEffectCancellation {
78 fn cancel(&self) {
79 self.active.store(false, Ordering::SeqCst);
80 #[cfg(not(target_arch = "wasm32"))]
81 {
82 let mut pending = self.continuations.borrow_mut();
83 for id in pending.drain(..) {
84 self.runtime.cancel_ui_cont(id);
85 }
86 }
87 }
88}
89
90impl LaunchedEffectAsyncState {
91 fn should_run(&self, key: Key) -> bool {
92 match self.key {
93 Some(current) => current != key,
94 None => true,
95 }
96 }
97
98 fn set_key(&mut self, key: Key) {
99 self.key = Some(key);
100 }
101
102 fn launch(
103 &mut self,
104 runtime: RuntimeHandle,
105 mk_future: impl FnOnce(LaunchedEffectScope) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
106 ) {
107 self.cancel_current();
108 let active = Arc::new(AtomicBool::new(true));
109 #[cfg(not(target_arch = "wasm32"))]
110 let continuations = Rc::new(RefCell::new(Vec::new()));
111 self.cancel = Some(LaunchedEffectCancellation {
112 #[cfg(not(target_arch = "wasm32"))]
113 runtime: runtime.clone(),
114 active: Arc::clone(&active),
115 #[cfg(not(target_arch = "wasm32"))]
116 continuations: Rc::clone(&continuations),
117 });
118 let scope = LaunchedEffectScope {
119 active: Arc::clone(&active),
120 runtime: runtime.clone(),
121 #[cfg(not(target_arch = "wasm32"))]
122 continuations,
123 };
124 let future = mk_future(scope.clone());
125 let active_flag = Arc::clone(&scope.active);
126 match runtime.spawn_ui(async move {
127 future.await;
128 active_flag.store(false, Ordering::SeqCst);
129 }) {
130 Some(handle) => {
131 self.task = Some(handle);
132 }
133 None => {
134 active.store(false, Ordering::SeqCst);
135 self.cancel = None;
136 }
137 }
138 }
139
140 fn cancel_current(&mut self) {
141 if let Some(handle) = self.task.take() {
142 handle.cancel();
143 }
144 if let Some(cancel) = self.cancel.take() {
145 cancel.cancel();
146 }
147 }
148}
149
150impl Drop for LaunchedEffectState {
151 fn drop(&mut self) {
152 self.cancel_current();
153 }
154}
155
156impl Drop for LaunchedEffectAsyncState {
157 fn drop(&mut self) {
158 self.cancel_current();
159 }
160}
161
162#[derive(Clone)]
163pub struct LaunchedEffectScope {
164 active: Arc<AtomicBool>,
165 runtime: RuntimeHandle,
166 #[cfg(not(target_arch = "wasm32"))]
167 continuations: Rc<RefCell<Vec<u64>>>,
168}
169
170impl LaunchedEffectScope {
171 #[cfg(not(target_arch = "wasm32"))]
172 fn track_continuation(&self, id: u64) {
173 self.continuations.borrow_mut().push(id);
174 }
175
176 #[cfg(not(target_arch = "wasm32"))]
177 fn release_continuation(&self, id: u64) {
178 let mut continuations = self.continuations.borrow_mut();
179 if let Some(index) = continuations.iter().position(|entry| *entry == id) {
180 continuations.remove(index);
181 }
182 }
183
184 pub fn is_active(&self) -> bool {
185 self.active.load(Ordering::SeqCst)
186 }
187
188 pub fn runtime(&self) -> RuntimeHandle {
189 self.runtime.clone()
190 }
191
192 pub fn launch(&self, task: impl FnOnce(LaunchedEffectScope) + 'static) {
198 if !self.is_active() {
199 return;
200 }
201 let scope = self.clone();
202 self.runtime.enqueue_ui_task(Box::new(move || {
203 if scope.is_active() {
204 task(scope);
205 }
206 }));
207 }
208
209 pub fn post_ui(&self, task: impl FnOnce() + 'static) {
214 if !self.is_active() {
215 return;
216 }
217 let active = Arc::clone(&self.active);
218 self.runtime.enqueue_ui_task(Box::new(move || {
219 if active.load(Ordering::SeqCst) {
220 task();
221 }
222 }));
223 }
224
225 pub fn post_ui_send(&self, task: impl FnOnce() + Send + 'static) {
231 if !self.is_active() {
232 return;
233 }
234 let active = Arc::clone(&self.active);
235 self.runtime.post_ui(move || {
236 if active.load(Ordering::SeqCst) {
237 task();
238 }
239 });
240 }
241
242 #[cfg(not(target_arch = "wasm32"))]
249 pub fn launch_background<T, Work, Ui, Fut>(&self, work: Work, on_ui: Ui)
250 where
251 T: Send + 'static,
252 Work: FnOnce(CancelToken) -> Fut + Send + 'static,
253 Fut: Future<Output = T> + Send + 'static,
254 Ui: FnOnce(T) + 'static,
255 {
256 if !self.is_active() {
257 return;
258 }
259 let dispatcher = self.runtime.dispatcher();
260 let active_for_thread = Arc::clone(&self.active);
261 let continuation_scope = self.clone();
262 let continuation_active = Arc::clone(&self.active);
263 let id_cell = Rc::new(Cell::new(0));
264 let id_for_closure = Rc::clone(&id_cell);
265 let continuation = move |value: T| {
266 let id = id_for_closure.get();
267 continuation_scope.release_continuation(id);
268 if continuation_active.load(Ordering::SeqCst) {
269 on_ui(value);
270 }
271 };
272
273 let Some(cont_id) = self.runtime.register_ui_cont(continuation) else {
274 return;
275 };
276 id_cell.set(cont_id);
277 self.track_continuation(cont_id);
278
279 std::thread::spawn(move || {
280 let token = CancelToken::new(Arc::clone(&active_for_thread));
281 let value = pollster::block_on(work(token.clone()));
282 if token.is_cancelled() {
283 return;
284 }
285 dispatcher.post_invoke(cont_id, value);
286 });
287 }
288
289 #[cfg(target_arch = "wasm32")]
296 pub fn launch_background<T, Work, Ui, Fut>(&self, work: Work, on_ui: Ui)
297 where
298 T: 'static,
299 Work: FnOnce(CancelToken) -> Fut + 'static,
300 Fut: Future<Output = T> + 'static,
301 Ui: FnOnce(T) + 'static,
302 {
303 if !self.is_active() {
304 return;
305 }
306 let active_for_task = Arc::clone(&self.active);
307 let scope = self.clone();
308 wasm_bindgen_futures::spawn_local(async move {
309 let token = CancelToken::new(Arc::clone(&active_for_task));
310 let value = work(token.clone()).await;
311 if token.is_cancelled() {
312 return;
313 }
314 scope.post_ui(move || {
315 if token.is_active() {
316 on_ui(value);
317 }
318 });
319 });
320 }
321}
322
/// Cloneable view of an effect's activity flag that is handed to background work.
#[derive(Clone)]
pub struct CancelToken {
    /// Shared flag; `false` means the owning effect is no longer active.
    active: Arc<AtomicBool>,
}
332
333impl CancelToken {
334 fn new(active: Arc<AtomicBool>) -> Self {
335 Self { active }
336 }
337
338 pub fn is_cancelled(&self) -> bool {
340 !self.active.load(Ordering::SeqCst)
341 }
342
343 pub fn is_active(&self) -> bool {
345 self.active.load(Ordering::SeqCst)
346 }
347}
348
349pub fn __launched_effect_impl<K, F>(group_key: Key, keys: K, effect: F)
350where
351 K: Hash,
352 F: FnOnce(LaunchedEffectScope) + 'static,
353{
354 with_current_composer(|composer| {
357 composer.with_group(group_key, |composer| {
358 let key_hash = hash_key(&keys);
359 let state = composer.remember(LaunchedEffectState::default);
360 if state.with(|state| state.should_run(key_hash)) {
361 state.update(|state| state.set_key(key_hash));
362 let runtime = composer.runtime_handle();
363 let state_for_effect = state.clone();
364 let mut effect_opt = Some(effect);
365 composer.register_side_effect(move || {
366 if let Some(effect) = effect_opt.take() {
367 state_for_effect.update(|state| state.launch(runtime.clone(), effect));
368 }
369 });
370 }
371 });
372 });
373}
374
/// Launches a callback-style effect keyed on `$keys`.
///
/// Expands to a call of `__launched_effect_impl`, using a group key derived from the
/// file, line and column of the macro invocation. A trailing comma after the effect
/// expression is accepted.
#[macro_export]
macro_rules! LaunchedEffect {
    ($keys:expr, $effect:expr $(,)?) => {
        $crate::__launched_effect_impl(
            $crate::location_key(file!(), line!(), column!()),
            $keys,
            $effect,
        )
    };
}
385
386pub fn __launched_effect_async_impl<K, F>(group_key: Key, keys: K, mk_future: F)
387where
388 K: Hash,
389 F: FnOnce(LaunchedEffectScope) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
390{
391 with_current_composer(|composer| {
392 composer.with_group(group_key, |composer| {
393 let key_hash = hash_key(&keys);
394 let state = composer.remember(LaunchedEffectAsyncState::default);
395 if state.with(|state| state.should_run(key_hash)) {
396 state.update(|state| state.set_key(key_hash));
397 let runtime = composer.runtime_handle();
398 let state_for_effect = state.clone();
399 let mut mk_future_opt = Some(mk_future);
400 composer.register_side_effect(move || {
401 if let Some(mk_future) = mk_future_opt.take() {
402 state_for_effect.update(|state| {
403 state.launch(runtime.clone(), mk_future);
404 });
405 }
406 });
407 }
408 });
409 });
410}
411
/// Launches a future-based effect keyed on `$keys`.
///
/// Expands to a call of `__launched_effect_async_impl`, using a group key derived from
/// the file, line and column of the macro invocation. `$future` is the expression
/// passed as the future factory. A trailing comma after it is accepted.
#[macro_export]
macro_rules! LaunchedEffectAsync {
    ($keys:expr, $future:expr $(,)?) => {
        $crate::__launched_effect_async_impl(
            $crate::location_key(file!(), line!(), column!()),
            $keys,
            $future,
        )
    };
}