reactive_graph/effect/effect.rs
1use crate::{
2 channel::{channel, Receiver},
3 effect::{inner::EffectInner, EffectFunction},
4 graph::{
5 AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6 WithObserver,
7 },
8 owner::{ArenaItem, LocalStorage, Owner, Storage, SyncStorage},
9 traits::Dispose,
10};
11use any_spawner::Executor;
12use futures::StreamExt;
13use or_poisoned::OrPoisoned;
14use std::{
15 mem,
16 sync::{atomic::AtomicBool, Arc, RwLock},
17};
18
19/// Effects run a certain chunk of code whenever the signals they depend on change.
20///
21/// Creating an effect runs the given function once after any current synchronous work is done.
/// This tracks the reactive values that are read within it, and reruns the function whenever
/// the value of a dependency changes.
24///
25/// Effects are intended to run *side-effects* of the system, not to synchronize state
/// *within* the system. In other words: in most cases, you should not write to
27/// signals inside effects. (If you need to define a signal that depends on the value of
28/// other signals, use a derived signal or a [`Memo`](crate::computed::Memo)).
29///
30/// You can provide an effect function without parameters or one with one parameter.
31/// If you provide such a parameter, the effect function is called with an argument containing
32/// whatever value it returned the last time it ran. On the initial run, this is `None`.
33///
34/// Effects stop running when their reactive [`Owner`] is disposed.
35///
36///
37/// ## Example
38///
39/// ```
40/// # use reactive_graph::computed::*;
41/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
42/// # use reactive_graph::prelude::*;
43/// # use reactive_graph::effect::Effect;
44/// # use reactive_graph::owner::ArenaItem;
45/// # tokio_test::block_on(async move {
46/// # tokio::task::LocalSet::new().run_until(async move {
47/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
48/// let a = RwSignal::new(0);
49/// let b = RwSignal::new(0);
50///
51/// // ✅ use effects to interact between reactive state and the outside world
52/// Effect::new(move || {
53/// // on the next “tick” prints "Value: 0" and subscribes to `a`
54/// println!("Value: {}", a.get());
55/// });
56///
57/// # assert_eq!(a.get(), 0);
58/// a.set(1);
59/// # assert_eq!(a.get(), 1);
60/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
61///
62/// // ❌ don't use effects to synchronize state within the reactive system
63/// Effect::new(move || {
64/// // this technically works but can cause unnecessary re-renders
65/// // and easily lead to problems like infinite loops
66/// b.set(a.get() + 1);
67/// });
68/// # }).await;
69/// # });
70/// ```
71/// ## Web-Specific Notes
72///
73/// 1. **Scheduling**: Effects run after synchronous work, on the next “tick” of the reactive
74/// system. This makes them suitable for “on mount” actions: they will fire immediately after
75/// DOM rendering.
/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
///    this with a web framework, this generally means that effects **do not run on the server**,
///    and you can call browser-specific APIs within the effect function without causing issues.
///    If you need an effect to run on the server, use [`Effect::new_isomorphic`].
80#[derive(Debug, Clone, Copy)]
81pub struct Effect<S> {
82 inner: Option<ArenaItem<StoredEffect, S>>,
83}
84
85type StoredEffect = Option<Arc<RwLock<EffectInner>>>;
86
87impl<S> Dispose for Effect<S> {
88 fn dispose(self) {
89 if let Some(inner) = self.inner {
90 inner.dispose()
91 }
92 }
93}
94
95fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
96 let (mut observer, rx) = channel();
97
98 // spawn the effect asynchronously
99 // we'll notify once so it runs on the next tick,
100 // to register observed values
101 observer.notify();
102
103 let owner = Owner::new();
104 let inner = Arc::new(RwLock::new(EffectInner {
105 dirty: true,
106 observer,
107 sources: SourceSet::new(),
108 }));
109
110 (rx, owner, inner)
111}
112
thread_local! {
    // Per-thread flag: `true` while `run_in_effect_scope` is executing a function
    // on this thread.
    static EFFECT_SCOPE_ACTIVE: AtomicBool = const { AtomicBool::new(false) };
}

/// Returns whether the current thread is currently running an effect.
pub fn in_effect_scope() -> bool {
    EFFECT_SCOPE_ACTIVE
        .with(|scope| scope.load(std::sync::atomic::Ordering::Relaxed))
}

/// Sets the thread-local effect-scope flag to `true` whilst running the given function.
/// [`in_effect_scope`] will return `true` whilst the function is running.
///
/// The value the flag had before the call is put back afterwards. This happens on
/// normal return and also when `fun` panics, so a caught panic cannot leave the
/// thread permanently marked as "inside an effect".
fn run_in_effect_scope<T>(fun: impl FnOnce() -> T) -> T {
    /// Restores the saved flag value when dropped, which covers both the normal
    /// return path and unwinding.
    struct RestoreOnDrop(bool);

    impl Drop for RestoreOnDrop {
        fn drop(&mut self) {
            // `try_with` instead of `with`: this can run during unwinding, where a
            // second panic would abort the process, so an inaccessible thread-local
            // is silently ignored.
            let _ = EFFECT_SCOPE_ACTIVE.try_with(|scope| {
                scope.store(self.0, std::sync::atomic::Ordering::Relaxed)
            });
        }
    }

    // For the theoretical nested case, set back to initial value rather than false:
    let initial = EFFECT_SCOPE_ACTIVE
        .with(|scope| scope.swap(true, std::sync::atomic::Ordering::Relaxed));
    let _restore = RestoreOnDrop(initial);

    fun()
}
135
136impl<S> Effect<S>
137where
138 S: Storage<StoredEffect>,
139{
140 /// Stops this effect before it is disposed.
141 pub fn stop(self) {
142 if let Some(inner) = self
143 .inner
144 .and_then(|this| this.try_update_value(|inner| inner.take()))
145 {
146 drop(inner);
147 }
148 }
149}
150
151impl Effect<LocalStorage> {
152 /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
153 /// that are read inside it change.
154 ///
155 /// This spawns a task on the local thread using
156 /// [`spawn_local`](any_spawner::Executor::spawn_local). For an effect that can be spawned on
157 /// any thread, use [`new_sync`](Effect::new_sync).
158 pub fn new<T, M>(mut fun: impl EffectFunction<T, M> + 'static) -> Self
159 where
160 T: 'static,
161 {
162 let inner = cfg!(feature = "effects").then(|| {
163 let (mut rx, owner, inner) = effect_base();
164 let value = Arc::new(RwLock::new(None::<T>));
165 let mut first_run = true;
166
167 Executor::spawn_local({
168 let value = Arc::clone(&value);
169 let subscriber = inner.to_any_subscriber();
170
171 async move {
172 while rx.next().await.is_some() {
173 if !owner.paused()
174 && (subscriber.with_observer(|| {
175 subscriber.update_if_necessary()
176 }) || first_run)
177 {
178 first_run = false;
179 subscriber.clear_sources(&subscriber);
180
181 let old_value =
182 mem::take(&mut *value.write().or_poisoned());
183 let new_value = owner.with_cleanup(|| {
184 subscriber.with_observer(|| {
185 run_in_effect_scope(|| fun.run(old_value))
186 })
187 });
188 *value.write().or_poisoned() = Some(new_value);
189 }
190 }
191 }
192 });
193
194 ArenaItem::new_with_storage(Some(inner))
195 });
196
197 Self { inner }
198 }
199
200 /// A version of [`Effect::new`] that only listens to any dependency
201 /// that is accessed inside `dependency_fn`.
202 ///
203 /// The return value of `dependency_fn` is passed into `handler` as an argument together with the previous value.
204 /// Additionally, the last return value of `handler` is provided as a third argument, as is done in [`Effect::new`].
205 ///
206 /// ## Usage
207 ///
208 /// ```
209 /// # use reactive_graph::effect::Effect;
210 /// # use reactive_graph::traits::*;
211 /// # use reactive_graph::signal::signal;
212 /// # tokio_test::block_on(async move {
213 /// # tokio::task::LocalSet::new().run_until(async move {
214 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
215 /// #
216 /// let (num, set_num) = signal(0);
217 ///
218 /// let effect = Effect::watch(
219 /// move || num.get(),
220 /// move |num, prev_num, _| {
221 /// // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
222 /// },
223 /// false,
224 /// );
225 /// # assert_eq!(num.get(), 0);
226 ///
227 /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
228 /// # assert_eq!(num.get(), 1);
229 ///
230 /// effect.stop(); // stop watching
231 ///
232 /// set_num.set(2); // (nothing happens)
233 /// # assert_eq!(num.get(), 2);
234 /// # }).await;
235 /// # });
236 /// ```
237 ///
238 /// The callback itself doesn't track any signal that is accessed within it.
239 ///
240 /// ```
241 /// # use reactive_graph::effect::Effect;
242 /// # use reactive_graph::traits::*;
243 /// # use reactive_graph::signal::signal;
244 /// # tokio_test::block_on(async move {
245 /// # tokio::task::LocalSet::new().run_until(async move {
246 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
247 /// #
248 /// let (num, set_num) = signal(0);
249 /// let (cb_num, set_cb_num) = signal(0);
250 ///
251 /// Effect::watch(
252 /// move || num.get(),
253 /// move |num, _, _| {
254 /// // log::debug!("Number: {}; Cb: {}", num, cb_num.get());
255 /// },
256 /// false,
257 /// );
258 ///
259 /// # assert_eq!(num.get(), 0);
260 /// set_num.set(1); // > "Number: 1; Cb: 0"
261 /// # assert_eq!(num.get(), 1);
262 ///
263 /// # assert_eq!(cb_num.get(), 0);
264 /// set_cb_num.set(1); // (nothing happens)
265 /// # assert_eq!(cb_num.get(), 1);
266 ///
267 /// set_num.set(2); // > "Number: 2; Cb: 1"
268 /// # assert_eq!(num.get(), 2);
269 /// # }).await;
270 /// # });
271 /// ```
272 ///
273 /// ## Immediate
274 ///
275 /// If the final parameter `immediate` is true, the `handler` will run immediately.
276 /// If it's `false`, the `handler` will run only after
277 /// the first change is detected of any signal that is accessed in `dependency_fn`.
278 ///
279 /// ```
280 /// # use reactive_graph::effect::Effect;
281 /// # use reactive_graph::traits::*;
282 /// # use reactive_graph::signal::signal;
283 /// # tokio_test::block_on(async move {
284 /// # tokio::task::LocalSet::new().run_until(async move {
285 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
286 /// #
287 /// let (num, set_num) = signal(0);
288 ///
289 /// Effect::watch(
290 /// move || num.get(),
291 /// move |num, prev_num, _| {
292 /// // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
293 /// },
294 /// true,
295 /// ); // > "Number: 0; Prev: None"
296 ///
297 /// # assert_eq!(num.get(), 0);
298 /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
299 /// # assert_eq!(num.get(), 1);
300 /// # }).await;
301 /// # });
302 /// ```
303 pub fn watch<D, T>(
304 mut dependency_fn: impl FnMut() -> D + 'static,
305 mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T + 'static,
306 immediate: bool,
307 ) -> Self
308 where
309 D: 'static,
310 T: 'static,
311 {
312 let inner = cfg!(feature = "effects").then(|| {
313 let (mut rx, owner, inner) = effect_base();
314 let mut first_run = true;
315 let dep_value = Arc::new(RwLock::new(None::<D>));
316 let watch_value = Arc::new(RwLock::new(None::<T>));
317
318 Executor::spawn_local({
319 let dep_value = Arc::clone(&dep_value);
320 let watch_value = Arc::clone(&watch_value);
321 let subscriber = inner.to_any_subscriber();
322
323 async move {
324 while rx.next().await.is_some() {
325 if !owner.paused()
326 && (subscriber.with_observer(|| {
327 subscriber.update_if_necessary()
328 }) || first_run)
329 {
330 subscriber.clear_sources(&subscriber);
331
332 let old_dep_value = mem::take(
333 &mut *dep_value.write().or_poisoned(),
334 );
335 let new_dep_value = owner.with_cleanup(|| {
336 subscriber.with_observer(&mut dependency_fn)
337 });
338
339 let old_watch_value = mem::take(
340 &mut *watch_value.write().or_poisoned(),
341 );
342
343 if immediate || !first_run {
344 let new_watch_value = handler(
345 &new_dep_value,
346 old_dep_value.as_ref(),
347 old_watch_value,
348 );
349
350 *watch_value.write().or_poisoned() =
351 Some(new_watch_value);
352 }
353
354 *dep_value.write().or_poisoned() =
355 Some(new_dep_value);
356
357 first_run = false;
358 }
359 }
360 }
361 });
362
363 ArenaItem::new_with_storage(Some(inner))
364 });
365
366 Self { inner }
367 }
368}
369
370impl Effect<SyncStorage> {
371 /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
372 /// that are read inside it change.
373 ///
374 /// This spawns a task that can be run on any thread. For an effect that will be spawned on
375 /// the current thread, use [`new`](Effect::new).
376 pub fn new_sync<T, M>(
377 fun: impl EffectFunction<T, M> + Send + Sync + 'static,
378 ) -> Self
379 where
380 T: Send + Sync + 'static,
381 {
382 if !cfg!(feature = "effects") {
383 return Self { inner: None };
384 }
385
386 Self::new_isomorphic(fun)
387 }
388
389 /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
390 /// that are read inside it change.
391 ///
392 /// This will run whether the `effects` feature is enabled or not.
393 pub fn new_isomorphic<T, M>(
394 mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
395 ) -> Self
396 where
397 T: Send + Sync + 'static,
398 {
399 let (mut rx, owner, inner) = effect_base();
400 let mut first_run = true;
401 let value = Arc::new(RwLock::new(None::<T>));
402
403 let task = {
404 let value = Arc::clone(&value);
405 let subscriber = inner.to_any_subscriber();
406
407 async move {
408 while rx.next().await.is_some() {
409 if !owner.paused()
410 && (subscriber
411 .with_observer(|| subscriber.update_if_necessary())
412 || first_run)
413 {
414 first_run = false;
415 subscriber.clear_sources(&subscriber);
416
417 let old_value =
418 mem::take(&mut *value.write().or_poisoned());
419 let new_value = owner.with_cleanup(|| {
420 subscriber.with_observer(|| {
421 run_in_effect_scope(|| fun.run(old_value))
422 })
423 });
424 *value.write().or_poisoned() = Some(new_value);
425 }
426 }
427 }
428 };
429
430 crate::spawn(task);
431
432 Self {
433 inner: Some(ArenaItem::new_with_storage(Some(inner))),
434 }
435 }
436
437 /// This is to [`Effect::watch`] what [`Effect::new_sync`] is to [`Effect::new`].
438 pub fn watch_sync<D, T>(
439 mut dependency_fn: impl FnMut() -> D + Send + Sync + 'static,
440 mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T
441 + Send
442 + Sync
443 + 'static,
444 immediate: bool,
445 ) -> Self
446 where
447 D: Send + Sync + 'static,
448 T: Send + Sync + 'static,
449 {
450 let (mut rx, owner, inner) = effect_base();
451 let mut first_run = true;
452 let dep_value = Arc::new(RwLock::new(None::<D>));
453 let watch_value = Arc::new(RwLock::new(None::<T>));
454
455 let inner = cfg!(feature = "effects").then(|| {
456 crate::spawn({
457 let dep_value = Arc::clone(&dep_value);
458 let watch_value = Arc::clone(&watch_value);
459 let subscriber = inner.to_any_subscriber();
460
461 async move {
462 while rx.next().await.is_some() {
463 if !owner.paused()
464 && (subscriber.with_observer(|| {
465 subscriber.update_if_necessary()
466 }) || first_run)
467 {
468 subscriber.clear_sources(&subscriber);
469
470 let old_dep_value = mem::take(
471 &mut *dep_value.write().or_poisoned(),
472 );
473 let new_dep_value = owner.with_cleanup(|| {
474 subscriber.with_observer(&mut dependency_fn)
475 });
476
477 let old_watch_value = mem::take(
478 &mut *watch_value.write().or_poisoned(),
479 );
480
481 if immediate || !first_run {
482 let new_watch_value = handler(
483 &new_dep_value,
484 old_dep_value.as_ref(),
485 old_watch_value,
486 );
487
488 *watch_value.write().or_poisoned() =
489 Some(new_watch_value);
490 }
491
492 *dep_value.write().or_poisoned() =
493 Some(new_dep_value);
494
495 first_run = false;
496 }
497 }
498 }
499 });
500
501 ArenaItem::new_with_storage(Some(inner))
502 });
503
504 Self { inner }
505 }
506}
507
508impl<S> ToAnySubscriber for Effect<S>
509where
510 S: Storage<StoredEffect>,
511{
512 fn to_any_subscriber(&self) -> AnySubscriber {
513 self.inner
514 .and_then(|inner| {
515 inner
516 .try_with_value(|inner| {
517 inner.as_ref().map(|inner| inner.to_any_subscriber())
518 })
519 .flatten()
520 })
521 .expect("tried to set effect that has been stopped")
522 }
523}
524
525/// Creates an [`Effect`].
526#[inline(always)]
527#[track_caller]
528#[deprecated = "This function is being removed to conform to Rust idioms. \
529 Please use `Effect::new()` instead."]
530pub fn create_effect<T>(
531 fun: impl FnMut(Option<T>) -> T + 'static,
532) -> Effect<LocalStorage>
533where
534 T: 'static,
535{
536 Effect::new(fun)
537}
538
539/// Creates an [`Effect`], equivalent to [Effect::watch].
540#[inline(always)]
541#[track_caller]
542#[deprecated = "This function is being removed to conform to Rust idioms. \
543 Please use `Effect::watch()` instead."]
544pub fn watch<W, T>(
545 deps: impl Fn() -> W + 'static,
546 callback: impl Fn(&W, Option<&W>, Option<T>) -> T + Clone + 'static,
547 immediate: bool,
548) -> impl Fn() + Clone
549where
550 W: Clone + 'static,
551 T: 'static,
552{
553 let watch = Effect::watch(deps, callback, immediate);
554
555 move || watch.stop()
556}