reactive_graph/effect/
effect.rs

1use crate::{
2    channel::{channel, Receiver},
3    effect::{inner::EffectInner, EffectFunction},
4    graph::{
5        AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6        WithObserver,
7    },
8    owner::{ArenaItem, LocalStorage, Owner, Storage, SyncStorage},
9    traits::Dispose,
10};
11use any_spawner::Executor;
12use futures::StreamExt;
13use or_poisoned::OrPoisoned;
14use std::{
15    mem,
16    sync::{atomic::AtomicBool, Arc, RwLock},
17};
18
19/// Effects run a certain chunk of code whenever the signals they depend on change.
20///
21/// Creating an effect runs the given function once after any current synchronous work is done.
22/// This tracks its reactive values read within it, and reruns the function whenever the value
23/// of a dependency changes.
24///
25/// Effects are intended to run *side-effects* of the system, not to synchronize state
26/// *within* the system. In other words: In most cases, you usually should not write to
27/// signals inside effects. (If you need to define a signal that depends on the value of
28/// other signals, use a derived signal or a [`Memo`](crate::computed::Memo)).
29///
30/// You can provide an effect function without parameters or one with one parameter.
31/// If you provide such a parameter, the effect function is called with an argument containing
32/// whatever value it returned the last time it ran. On the initial run, this is `None`.
33///
34/// Effects stop running when their reactive [`Owner`] is disposed.
35///
36///
37/// ## Example
38///
39/// ```
40/// # use reactive_graph::computed::*;
41/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
42/// # use reactive_graph::prelude::*;
43/// # use reactive_graph::effect::Effect;
44/// # use reactive_graph::owner::ArenaItem;
45/// # tokio_test::block_on(async move {
46/// # tokio::task::LocalSet::new().run_until(async move {
47/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
48/// let a = RwSignal::new(0);
49/// let b = RwSignal::new(0);
50///
51/// // ✅ use effects to interact between reactive state and the outside world
52/// Effect::new(move || {
53///   // on the next “tick” prints "Value: 0" and subscribes to `a`
54///   println!("Value: {}", a.get());
55/// });
56///
57/// # assert_eq!(a.get(), 0);
58/// a.set(1);
59/// # assert_eq!(a.get(), 1);
60/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
61///
62/// // ❌ don't use effects to synchronize state within the reactive system
63/// Effect::new(move || {
64///   // this technically works but can cause unnecessary re-renders
65///   // and easily lead to problems like infinite loops
66///   b.set(a.get() + 1);
67/// });
68/// # }).await;
69/// # });
70/// ```
71/// ## Web-Specific Notes
72///
73/// 1. **Scheduling**: Effects run after synchronous work, on the next “tick” of the reactive
74///    system. This makes them suitable for “on mount” actions: they will fire immediately after
75///    DOM rendering.
76/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
77///    this with a web framework, this generally means that effects **do not run on the server**.
78///    and you can call browser-specific APIs within the effect function without causing issues.
79///    If you need an effect to run on the server, use [`Effect::new_isomorphic`].
80#[derive(Debug, Clone, Copy)]
81pub struct Effect<S> {
82    inner: Option<ArenaItem<StoredEffect, S>>,
83}
84
85type StoredEffect = Option<Arc<RwLock<EffectInner>>>;
86
87impl<S> Dispose for Effect<S> {
88    fn dispose(self) {
89        if let Some(inner) = self.inner {
90            inner.dispose()
91        }
92    }
93}
94
95fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
96    let (mut observer, rx) = channel();
97
98    // spawn the effect asynchronously
99    // we'll notify once so it runs on the next tick,
100    // to register observed values
101    observer.notify();
102
103    let owner = Owner::new();
104    let inner = Arc::new(RwLock::new(EffectInner {
105        dirty: true,
106        observer,
107        sources: SourceSet::new(),
108    }));
109
110    (rx, owner, inner)
111}
112
113thread_local! {
114    static EFFECT_SCOPE_ACTIVE: AtomicBool = const { AtomicBool::new(false) };
115}
116
117/// Returns whether the current thread is currently running an effect.
118pub fn in_effect_scope() -> bool {
119    EFFECT_SCOPE_ACTIVE
120        .with(|scope| scope.load(std::sync::atomic::Ordering::Relaxed))
121}
122
123/// Set a static to true whilst running the given function.
124/// [`is_in_effect_scope`] will return true whilst the function is running.
125fn run_in_effect_scope<T>(fun: impl FnOnce() -> T) -> T {
126    // For the theoretical nested case, set back to initial value rather than false:
127    let initial = EFFECT_SCOPE_ACTIVE
128        .with(|scope| scope.swap(true, std::sync::atomic::Ordering::Relaxed));
129    let result = fun();
130    EFFECT_SCOPE_ACTIVE.with(|scope| {
131        scope.store(initial, std::sync::atomic::Ordering::Relaxed)
132    });
133    result
134}
135
136impl<S> Effect<S>
137where
138    S: Storage<StoredEffect>,
139{
140    /// Stops this effect before it is disposed.
141    pub fn stop(self) {
142        if let Some(inner) = self
143            .inner
144            .and_then(|this| this.try_update_value(|inner| inner.take()))
145        {
146            drop(inner);
147        }
148    }
149}
150
151impl Effect<LocalStorage> {
152    /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
153    /// that are read inside it change.
154    ///
155    /// This spawns a task on the local thread using
156    /// [`spawn_local`](any_spawner::Executor::spawn_local). For an effect that can be spawned on
157    /// any thread, use [`new_sync`](Effect::new_sync).
158    pub fn new<T, M>(mut fun: impl EffectFunction<T, M> + 'static) -> Self
159    where
160        T: 'static,
161    {
162        let inner = cfg!(feature = "effects").then(|| {
163            let (mut rx, owner, inner) = effect_base();
164            let value = Arc::new(RwLock::new(None::<T>));
165            let mut first_run = true;
166
167            Executor::spawn_local({
168                let value = Arc::clone(&value);
169                let subscriber = inner.to_any_subscriber();
170
171                async move {
172                    while rx.next().await.is_some() {
173                        if !owner.paused()
174                            && (subscriber.with_observer(|| {
175                                subscriber.update_if_necessary()
176                            }) || first_run)
177                        {
178                            first_run = false;
179                            subscriber.clear_sources(&subscriber);
180
181                            let old_value =
182                                mem::take(&mut *value.write().or_poisoned());
183                            let new_value = owner.with_cleanup(|| {
184                                subscriber.with_observer(|| {
185                                    run_in_effect_scope(|| fun.run(old_value))
186                                })
187                            });
188                            *value.write().or_poisoned() = Some(new_value);
189                        }
190                    }
191                }
192            });
193
194            ArenaItem::new_with_storage(Some(inner))
195        });
196
197        Self { inner }
198    }
199
200    /// A version of [`Effect::new`] that only listens to any dependency
201    /// that is accessed inside `dependency_fn`.
202    ///
203    /// The return value of `dependency_fn` is passed into `handler` as an argument together with the previous value.
204    /// Additionally, the last return value of `handler` is provided as a third argument, as is done in [`Effect::new`].
205    ///
206    /// ## Usage
207    ///
208    /// ```
209    /// # use reactive_graph::effect::Effect;
210    /// # use reactive_graph::traits::*;
211    /// # use reactive_graph::signal::signal;
212    /// # tokio_test::block_on(async move {
213    /// # tokio::task::LocalSet::new().run_until(async move {
214    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
215    /// #
216    /// let (num, set_num) = signal(0);
217    ///
218    /// let effect = Effect::watch(
219    ///     move || num.get(),
220    ///     move |num, prev_num, _| {
221    ///         // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
222    ///     },
223    ///     false,
224    /// );
225    /// # assert_eq!(num.get(), 0);
226    ///
227    /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
228    /// # assert_eq!(num.get(), 1);
229    ///
230    /// effect.stop(); // stop watching
231    ///
232    /// set_num.set(2); // (nothing happens)
233    /// # assert_eq!(num.get(), 2);
234    /// # }).await;
235    /// # });
236    /// ```
237    ///
238    /// The callback itself doesn't track any signal that is accessed within it.
239    ///
240    /// ```
241    /// # use reactive_graph::effect::Effect;
242    /// # use reactive_graph::traits::*;
243    /// # use reactive_graph::signal::signal;
244    /// # tokio_test::block_on(async move {
245    /// # tokio::task::LocalSet::new().run_until(async move {
246    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
247    /// #
248    /// let (num, set_num) = signal(0);
249    /// let (cb_num, set_cb_num) = signal(0);
250    ///
251    /// Effect::watch(
252    ///     move || num.get(),
253    ///     move |num, _, _| {
254    ///         // log::debug!("Number: {}; Cb: {}", num, cb_num.get());
255    ///     },
256    ///     false,
257    /// );
258    ///
259    /// # assert_eq!(num.get(), 0);
260    /// set_num.set(1); // > "Number: 1; Cb: 0"
261    /// # assert_eq!(num.get(), 1);
262    ///
263    /// # assert_eq!(cb_num.get(), 0);
264    /// set_cb_num.set(1); // (nothing happens)
265    /// # assert_eq!(cb_num.get(), 1);
266    ///
267    /// set_num.set(2); // > "Number: 2; Cb: 1"
268    /// # assert_eq!(num.get(), 2);
269    /// # }).await;
270    /// # });
271    /// ```
272    ///
273    /// ## Immediate
274    ///
275    /// If the final parameter `immediate` is true, the `handler` will run immediately.
276    /// If it's `false`, the `handler` will run only after
277    /// the first change is detected of any signal that is accessed in `dependency_fn`.
278    ///
279    /// ```
280    /// # use reactive_graph::effect::Effect;
281    /// # use reactive_graph::traits::*;
282    /// # use reactive_graph::signal::signal;
283    /// # tokio_test::block_on(async move {
284    /// # tokio::task::LocalSet::new().run_until(async move {
285    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
286    /// #
287    /// let (num, set_num) = signal(0);
288    ///
289    /// Effect::watch(
290    ///     move || num.get(),
291    ///     move |num, prev_num, _| {
292    ///         // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
293    ///     },
294    ///     true,
295    /// ); // > "Number: 0; Prev: None"
296    ///
297    /// # assert_eq!(num.get(), 0);
298    /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
299    /// # assert_eq!(num.get(), 1);
300    /// # }).await;
301    /// # });
302    /// ```
303    pub fn watch<D, T>(
304        mut dependency_fn: impl FnMut() -> D + 'static,
305        mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T + 'static,
306        immediate: bool,
307    ) -> Self
308    where
309        D: 'static,
310        T: 'static,
311    {
312        let inner = cfg!(feature = "effects").then(|| {
313            let (mut rx, owner, inner) = effect_base();
314            let mut first_run = true;
315            let dep_value = Arc::new(RwLock::new(None::<D>));
316            let watch_value = Arc::new(RwLock::new(None::<T>));
317
318            Executor::spawn_local({
319                let dep_value = Arc::clone(&dep_value);
320                let watch_value = Arc::clone(&watch_value);
321                let subscriber = inner.to_any_subscriber();
322
323                async move {
324                    while rx.next().await.is_some() {
325                        if !owner.paused()
326                            && (subscriber.with_observer(|| {
327                                subscriber.update_if_necessary()
328                            }) || first_run)
329                        {
330                            subscriber.clear_sources(&subscriber);
331
332                            let old_dep_value = mem::take(
333                                &mut *dep_value.write().or_poisoned(),
334                            );
335                            let new_dep_value = owner.with_cleanup(|| {
336                                subscriber.with_observer(&mut dependency_fn)
337                            });
338
339                            let old_watch_value = mem::take(
340                                &mut *watch_value.write().or_poisoned(),
341                            );
342
343                            if immediate || !first_run {
344                                let new_watch_value = handler(
345                                    &new_dep_value,
346                                    old_dep_value.as_ref(),
347                                    old_watch_value,
348                                );
349
350                                *watch_value.write().or_poisoned() =
351                                    Some(new_watch_value);
352                            }
353
354                            *dep_value.write().or_poisoned() =
355                                Some(new_dep_value);
356
357                            first_run = false;
358                        }
359                    }
360                }
361            });
362
363            ArenaItem::new_with_storage(Some(inner))
364        });
365
366        Self { inner }
367    }
368}
369
370impl Effect<SyncStorage> {
371    /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
372    /// that are read inside it change.
373    ///
374    /// This spawns a task that can be run on any thread. For an effect that will be spawned on
375    /// the current thread, use [`new`](Effect::new).
376    pub fn new_sync<T, M>(
377        fun: impl EffectFunction<T, M> + Send + Sync + 'static,
378    ) -> Self
379    where
380        T: Send + Sync + 'static,
381    {
382        if !cfg!(feature = "effects") {
383            return Self { inner: None };
384        }
385
386        Self::new_isomorphic(fun)
387    }
388
389    /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
390    /// that are read inside it change.
391    ///
392    /// This will run whether the `effects` feature is enabled or not.
393    pub fn new_isomorphic<T, M>(
394        mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
395    ) -> Self
396    where
397        T: Send + Sync + 'static,
398    {
399        let (mut rx, owner, inner) = effect_base();
400        let mut first_run = true;
401        let value = Arc::new(RwLock::new(None::<T>));
402
403        let task = {
404            let value = Arc::clone(&value);
405            let subscriber = inner.to_any_subscriber();
406
407            async move {
408                while rx.next().await.is_some() {
409                    if !owner.paused()
410                        && (subscriber
411                            .with_observer(|| subscriber.update_if_necessary())
412                            || first_run)
413                    {
414                        first_run = false;
415                        subscriber.clear_sources(&subscriber);
416
417                        let old_value =
418                            mem::take(&mut *value.write().or_poisoned());
419                        let new_value = owner.with_cleanup(|| {
420                            subscriber.with_observer(|| {
421                                run_in_effect_scope(|| fun.run(old_value))
422                            })
423                        });
424                        *value.write().or_poisoned() = Some(new_value);
425                    }
426                }
427            }
428        };
429
430        crate::spawn(task);
431
432        Self {
433            inner: Some(ArenaItem::new_with_storage(Some(inner))),
434        }
435    }
436
437    /// This is to [`Effect::watch`] what [`Effect::new_sync`] is to [`Effect::new`].
438    pub fn watch_sync<D, T>(
439        mut dependency_fn: impl FnMut() -> D + Send + Sync + 'static,
440        mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T
441            + Send
442            + Sync
443            + 'static,
444        immediate: bool,
445    ) -> Self
446    where
447        D: Send + Sync + 'static,
448        T: Send + Sync + 'static,
449    {
450        let (mut rx, owner, inner) = effect_base();
451        let mut first_run = true;
452        let dep_value = Arc::new(RwLock::new(None::<D>));
453        let watch_value = Arc::new(RwLock::new(None::<T>));
454
455        let inner = cfg!(feature = "effects").then(|| {
456            crate::spawn({
457                let dep_value = Arc::clone(&dep_value);
458                let watch_value = Arc::clone(&watch_value);
459                let subscriber = inner.to_any_subscriber();
460
461                async move {
462                    while rx.next().await.is_some() {
463                        if !owner.paused()
464                            && (subscriber.with_observer(|| {
465                                subscriber.update_if_necessary()
466                            }) || first_run)
467                        {
468                            subscriber.clear_sources(&subscriber);
469
470                            let old_dep_value = mem::take(
471                                &mut *dep_value.write().or_poisoned(),
472                            );
473                            let new_dep_value = owner.with_cleanup(|| {
474                                subscriber.with_observer(&mut dependency_fn)
475                            });
476
477                            let old_watch_value = mem::take(
478                                &mut *watch_value.write().or_poisoned(),
479                            );
480
481                            if immediate || !first_run {
482                                let new_watch_value = handler(
483                                    &new_dep_value,
484                                    old_dep_value.as_ref(),
485                                    old_watch_value,
486                                );
487
488                                *watch_value.write().or_poisoned() =
489                                    Some(new_watch_value);
490                            }
491
492                            *dep_value.write().or_poisoned() =
493                                Some(new_dep_value);
494
495                            first_run = false;
496                        }
497                    }
498                }
499            });
500
501            ArenaItem::new_with_storage(Some(inner))
502        });
503
504        Self { inner }
505    }
506}
507
508impl<S> ToAnySubscriber for Effect<S>
509where
510    S: Storage<StoredEffect>,
511{
512    fn to_any_subscriber(&self) -> AnySubscriber {
513        self.inner
514            .and_then(|inner| {
515                inner
516                    .try_with_value(|inner| {
517                        inner.as_ref().map(|inner| inner.to_any_subscriber())
518                    })
519                    .flatten()
520            })
521            .expect("tried to set effect that has been stopped")
522    }
523}
524
525/// Creates an [`Effect`].
526#[inline(always)]
527#[track_caller]
528#[deprecated = "This function is being removed to conform to Rust idioms. \
529                Please use `Effect::new()` instead."]
530pub fn create_effect<T>(
531    fun: impl FnMut(Option<T>) -> T + 'static,
532) -> Effect<LocalStorage>
533where
534    T: 'static,
535{
536    Effect::new(fun)
537}
538
539/// Creates an [`Effect`], equivalent to [Effect::watch].
540#[inline(always)]
541#[track_caller]
542#[deprecated = "This function is being removed to conform to Rust idioms. \
543                Please use `Effect::watch()` instead."]
544pub fn watch<W, T>(
545    deps: impl Fn() -> W + 'static,
546    callback: impl Fn(&W, Option<&W>, Option<T>) -> T + Clone + 'static,
547    immediate: bool,
548) -> impl Fn() + Clone
549where
550    W: Clone + 'static,
551    T: 'static,
552{
553    let watch = Effect::watch(deps, callback, immediate);
554
555    move || watch.stop()
556}