Skip to main content

reactive_graph/effect/
effect.rs

1use crate::{
2    channel::{channel, Receiver},
3    effect::{inner::EffectInner, EffectFunction},
4    graph::{
5        AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6        WithObserver,
7    },
8    owner::{ArenaItem, LocalStorage, Owner, Storage, SyncStorage},
9    traits::Dispose,
10};
11use any_spawner::Executor;
12use futures::StreamExt;
13use or_poisoned::OrPoisoned;
14use std::{
15    mem,
16    sync::{atomic::AtomicBool, Arc, RwLock},
17};
18
/// Effects run a certain chunk of code whenever the signals they depend on change.
///
/// Creating an effect runs the given function once after any current synchronous work is done.
/// This tracks its reactive values read within it, and reruns the function whenever the value
/// of a dependency changes.
///
/// Effects are intended to run *side-effects* of the system, not to synchronize state
/// *within* the system. In other words: in most cases, you should not write to
/// signals inside effects. (If you need to define a signal that depends on the value of
/// other signals, use a derived signal or a [`Memo`](crate::computed::Memo)).
///
/// You can provide an effect function without parameters or one with one parameter.
/// If you provide such a parameter, the effect function is called with an argument containing
/// whatever value it returned the last time it ran. On the initial run, this is `None`.
///
/// Effects stop running when their reactive [`Owner`] is disposed.
///
/// ## Example
///
/// ```
/// # use reactive_graph::computed::*;
/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
/// # use reactive_graph::prelude::*;
/// # use reactive_graph::effect::Effect;
/// # use reactive_graph::owner::ArenaItem;
/// # tokio_test::block_on(async move {
/// # tokio::task::LocalSet::new().run_until(async move {
/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
/// let a = RwSignal::new(0);
/// let b = RwSignal::new(0);
///
/// // ✅ use effects to interact between reactive state and the outside world
/// Effect::new(move || {
///   // on the next “tick” prints "Value: 0" and subscribes to `a`
///   println!("Value: {}", a.get());
/// });
///
/// # assert_eq!(a.get(), 0);
/// a.set(1);
/// # assert_eq!(a.get(), 1);
/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
///
/// // ❌ don't use effects to synchronize state within the reactive system
/// Effect::new(move || {
///   // this technically works but can cause unnecessary re-renders
///   // and easily lead to problems like infinite loops
///   b.set(a.get() + 1);
/// });
/// # }).await;
/// # });
/// ```
/// ## Web-Specific Notes
///
/// 1. **Scheduling**: Effects run after synchronous work, on the next “tick” of the reactive
///    system. This makes them suitable for “on mount” actions: they will fire immediately after
///    DOM rendering.
/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
///    this with a web framework, this generally means that effects **do not run on the server**,
///    and you can call browser-specific APIs within the effect function without causing issues.
///    If you need an effect to run on the server, use [`Effect::new_isomorphic`].
#[derive(Debug, Clone, Copy)]
pub struct Effect<S> {
    // Arena handle to the effect's shared state. This is `None` when a constructor
    // gated on the `effects` feature is called while that feature is disabled.
    inner: Option<ArenaItem<StoredEffect, S>>,
}
84
/// The value kept in the arena for each effect.
///
/// It is wrapped in an `Option` so that [`Effect::stop`] can take the shared state out
/// (leaving `None` behind) while the arena slot itself stays in place.
type StoredEffect = Option<Arc<RwLock<EffectInner>>>;
86
87impl<S> Dispose for Effect<S> {
88    fn dispose(self) {
89        if let Some(inner) = self.inner {
90            inner.dispose()
91        }
92    }
93}
94
95fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
96    let (mut observer, rx) = channel();
97
98    // spawn the effect asynchronously
99    // we'll notify once so it runs on the next tick,
100    // to register observed values
101    observer.notify();
102
103    let owner = Owner::new();
104    let inner = Arc::new(RwLock::new(EffectInner {
105        dirty: true,
106        observer,
107        sources: SourceSet::new(),
108    }));
109
110    (rx, owner, inner)
111}
112
#[cfg(debug_assertions)]
thread_local! {
    // `true` while `run_in_effect_scope` is executing its closure on this thread.
    static EFFECT_SCOPE_ACTIVE: AtomicBool = const { AtomicBool::new(false) };
}

/// Returns whether the current thread is currently running an effect.
///
/// This function only exists when debug assertions are enabled.
#[cfg(debug_assertions)]
pub fn in_effect_scope() -> bool {
    EFFECT_SCOPE_ACTIVE
        .with(|scope| scope.load(std::sync::atomic::Ordering::Relaxed))
}

/// Runs `fun` with the per-thread "inside an effect" flag set, and returns its result.
///
/// When debug assertions are enabled, [`in_effect_scope`] returns `true` while `fun` is
/// running. Afterwards the flag is put back to the value it had before the call, so a
/// nested call does not clear the flag of the enclosing one. The flag is restored from a
/// drop guard, which means it is also restored when `fun` panics and the panic unwinds
/// through this function.
///
/// Without debug assertions this simply calls `fun`.
fn run_in_effect_scope<T>(fun: impl FnOnce() -> T) -> T {
    #[cfg(debug_assertions)]
    {
        // Puts the flag back to the stored value when dropped. Doing this in `Drop`
        // rather than after `fun()` keeps the flag correct if `fun` unwinds.
        struct RestoreOnDrop(bool);

        impl Drop for RestoreOnDrop {
            fn drop(&mut self) {
                EFFECT_SCOPE_ACTIVE.with(|scope| {
                    scope.store(self.0, std::sync::atomic::Ordering::Relaxed)
                });
            }
        }

        // For the theoretical nested case, set back to initial value rather than false:
        let initial = EFFECT_SCOPE_ACTIVE.with(|scope| {
            scope.swap(true, std::sync::atomic::Ordering::Relaxed)
        });
        let _restore = RestoreOnDrop(initial);

        fun()
    }
    #[cfg(not(debug_assertions))]
    {
        fun()
    }
}
145
146impl<S> Effect<S>
147where
148    S: Storage<StoredEffect>,
149{
150    /// Stops this effect before it is disposed.
151    pub fn stop(self) {
152        if let Some(inner) = self
153            .inner
154            .and_then(|this| this.try_update_value(|inner| inner.take()))
155        {
156            drop(inner);
157        }
158    }
159}
160
161impl Effect<LocalStorage> {
162    /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
163    /// that are read inside it change.
164    ///
165    /// This spawns a task on the local thread using
166    /// [`spawn_local`](any_spawner::Executor::spawn_local). For an effect that can be spawned on
167    /// any thread, use [`new_sync`](Effect::new_sync).
168    pub fn new<T, M>(mut fun: impl EffectFunction<T, M> + 'static) -> Self
169    where
170        T: 'static,
171    {
172        let inner = cfg!(feature = "effects").then(|| {
173            let (mut rx, owner, inner) = effect_base();
174            let value = Arc::new(RwLock::new(None::<T>));
175            let mut first_run = true;
176
177            Executor::spawn_local({
178                let value = Arc::clone(&value);
179                let subscriber = inner.to_any_subscriber();
180
181                async move {
182                    while rx.next().await.is_some() {
183                        if !owner.paused()
184                            && (subscriber.with_observer(|| {
185                                subscriber.update_if_necessary()
186                            }) || first_run)
187                        {
188                            first_run = false;
189                            subscriber.clear_sources(&subscriber);
190
191                            let old_value =
192                                mem::take(&mut *value.write().or_poisoned());
193                            let new_value = owner.with_cleanup(|| {
194                                subscriber.with_observer(|| {
195                                    run_in_effect_scope(|| fun.run(old_value))
196                                })
197                            });
198                            *value.write().or_poisoned() = Some(new_value);
199                        }
200                    }
201                }
202            });
203
204            ArenaItem::new_with_storage(Some(inner))
205        });
206
207        Self { inner }
208    }
209
210    /// A version of [`Effect::new`] that only listens to any dependency
211    /// that is accessed inside `dependency_fn`.
212    ///
213    /// The return value of `dependency_fn` is passed into `handler` as an argument together with the previous value.
214    /// Additionally, the last return value of `handler` is provided as a third argument, as is done in [`Effect::new`].
215    ///
216    /// ## Usage
217    ///
218    /// ```
219    /// # use reactive_graph::effect::Effect;
220    /// # use reactive_graph::traits::*;
221    /// # use reactive_graph::signal::signal;
222    /// # tokio_test::block_on(async move {
223    /// # tokio::task::LocalSet::new().run_until(async move {
224    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
225    /// #
226    /// let (num, set_num) = signal(0);
227    ///
228    /// let effect = Effect::watch(
229    ///     move || num.get(),
230    ///     move |num, prev_num, _| {
231    ///         // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
232    ///     },
233    ///     false,
234    /// );
235    /// # assert_eq!(num.get(), 0);
236    ///
237    /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
238    /// # assert_eq!(num.get(), 1);
239    ///
240    /// effect.stop(); // stop watching
241    ///
242    /// set_num.set(2); // (nothing happens)
243    /// # assert_eq!(num.get(), 2);
244    /// # }).await;
245    /// # });
246    /// ```
247    ///
248    /// The callback itself doesn't track any signal that is accessed within it.
249    ///
250    /// ```
251    /// # use reactive_graph::effect::Effect;
252    /// # use reactive_graph::traits::*;
253    /// # use reactive_graph::signal::signal;
254    /// # tokio_test::block_on(async move {
255    /// # tokio::task::LocalSet::new().run_until(async move {
256    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
257    /// #
258    /// let (num, set_num) = signal(0);
259    /// let (cb_num, set_cb_num) = signal(0);
260    ///
261    /// Effect::watch(
262    ///     move || num.get(),
263    ///     move |num, _, _| {
264    ///         // log::debug!("Number: {}; Cb: {}", num, cb_num.get());
265    ///     },
266    ///     false,
267    /// );
268    ///
269    /// # assert_eq!(num.get(), 0);
270    /// set_num.set(1); // > "Number: 1; Cb: 0"
271    /// # assert_eq!(num.get(), 1);
272    ///
273    /// # assert_eq!(cb_num.get(), 0);
274    /// set_cb_num.set(1); // (nothing happens)
275    /// # assert_eq!(cb_num.get(), 1);
276    ///
277    /// set_num.set(2); // > "Number: 2; Cb: 1"
278    /// # assert_eq!(num.get(), 2);
279    /// # }).await;
280    /// # });
281    /// ```
282    ///
283    /// ## Immediate
284    ///
285    /// If the final parameter `immediate` is true, the `handler` will run immediately.
286    /// If it's `false`, the `handler` will run only after
287    /// the first change is detected of any signal that is accessed in `dependency_fn`.
288    ///
289    /// ```
290    /// # use reactive_graph::effect::Effect;
291    /// # use reactive_graph::traits::*;
292    /// # use reactive_graph::signal::signal;
293    /// # tokio_test::block_on(async move {
294    /// # tokio::task::LocalSet::new().run_until(async move {
295    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
296    /// #
297    /// let (num, set_num) = signal(0);
298    ///
299    /// Effect::watch(
300    ///     move || num.get(),
301    ///     move |num, prev_num, _| {
302    ///         // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
303    ///     },
304    ///     true,
305    /// ); // > "Number: 0; Prev: None"
306    ///
307    /// # assert_eq!(num.get(), 0);
308    /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
309    /// # assert_eq!(num.get(), 1);
310    /// # }).await;
311    /// # });
312    /// ```
313    pub fn watch<D, T>(
314        mut dependency_fn: impl FnMut() -> D + 'static,
315        mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T + 'static,
316        immediate: bool,
317    ) -> Self
318    where
319        D: 'static,
320        T: 'static,
321    {
322        let inner = cfg!(feature = "effects").then(|| {
323            let (mut rx, owner, inner) = effect_base();
324            let mut first_run = true;
325            let dep_value = Arc::new(RwLock::new(None::<D>));
326            let watch_value = Arc::new(RwLock::new(None::<T>));
327
328            Executor::spawn_local({
329                let dep_value = Arc::clone(&dep_value);
330                let watch_value = Arc::clone(&watch_value);
331                let subscriber = inner.to_any_subscriber();
332
333                async move {
334                    while rx.next().await.is_some() {
335                        if !owner.paused()
336                            && (subscriber.with_observer(|| {
337                                subscriber.update_if_necessary()
338                            }) || first_run)
339                        {
340                            subscriber.clear_sources(&subscriber);
341
342                            let old_dep_value = mem::take(
343                                &mut *dep_value.write().or_poisoned(),
344                            );
345                            let new_dep_value = owner.with_cleanup(|| {
346                                subscriber.with_observer(&mut dependency_fn)
347                            });
348
349                            let old_watch_value = mem::take(
350                                &mut *watch_value.write().or_poisoned(),
351                            );
352
353                            if immediate || !first_run {
354                                let new_watch_value = handler(
355                                    &new_dep_value,
356                                    old_dep_value.as_ref(),
357                                    old_watch_value,
358                                );
359
360                                *watch_value.write().or_poisoned() =
361                                    Some(new_watch_value);
362                            }
363
364                            *dep_value.write().or_poisoned() =
365                                Some(new_dep_value);
366
367                            first_run = false;
368                        }
369                    }
370                }
371            });
372
373            ArenaItem::new_with_storage(Some(inner))
374        });
375
376        Self { inner }
377    }
378}
379
380impl Effect<SyncStorage> {
381    /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
382    /// that are read inside it change.
383    ///
384    /// This spawns a task that can be run on any thread. For an effect that will be spawned on
385    /// the current thread, use [`new`](Effect::new).
386    pub fn new_sync<T, M>(
387        fun: impl EffectFunction<T, M> + Send + Sync + 'static,
388    ) -> Self
389    where
390        T: Send + Sync + 'static,
391    {
392        if !cfg!(feature = "effects") {
393            return Self { inner: None };
394        }
395
396        Self::new_isomorphic(fun)
397    }
398
399    /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
400    /// that are read inside it change.
401    ///
402    /// This will run whether the `effects` feature is enabled or not.
403    pub fn new_isomorphic<T, M>(
404        mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
405    ) -> Self
406    where
407        T: Send + Sync + 'static,
408    {
409        let (mut rx, owner, inner) = effect_base();
410        let mut first_run = true;
411        let value = Arc::new(RwLock::new(None::<T>));
412
413        let task = {
414            let value = Arc::clone(&value);
415            let subscriber = inner.to_any_subscriber();
416
417            async move {
418                while rx.next().await.is_some() {
419                    if !owner.paused()
420                        && (subscriber
421                            .with_observer(|| subscriber.update_if_necessary())
422                            || first_run)
423                    {
424                        first_run = false;
425                        subscriber.clear_sources(&subscriber);
426
427                        let old_value =
428                            mem::take(&mut *value.write().or_poisoned());
429                        let new_value = owner.with_cleanup(|| {
430                            subscriber.with_observer(|| {
431                                run_in_effect_scope(|| fun.run(old_value))
432                            })
433                        });
434                        *value.write().or_poisoned() = Some(new_value);
435                    }
436                }
437            }
438        };
439
440        crate::spawn(task);
441
442        Self {
443            inner: Some(ArenaItem::new_with_storage(Some(inner))),
444        }
445    }
446
447    /// This is to [`Effect::watch`] what [`Effect::new_sync`] is to [`Effect::new`].
448    pub fn watch_sync<D, T>(
449        mut dependency_fn: impl FnMut() -> D + Send + Sync + 'static,
450        mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T
451            + Send
452            + Sync
453            + 'static,
454        immediate: bool,
455    ) -> Self
456    where
457        D: Send + Sync + 'static,
458        T: Send + Sync + 'static,
459    {
460        let (mut rx, owner, inner) = effect_base();
461        let mut first_run = true;
462        let dep_value = Arc::new(RwLock::new(None::<D>));
463        let watch_value = Arc::new(RwLock::new(None::<T>));
464
465        let inner = cfg!(feature = "effects").then(|| {
466            crate::spawn({
467                let dep_value = Arc::clone(&dep_value);
468                let watch_value = Arc::clone(&watch_value);
469                let subscriber = inner.to_any_subscriber();
470
471                async move {
472                    while rx.next().await.is_some() {
473                        if !owner.paused()
474                            && (subscriber.with_observer(|| {
475                                subscriber.update_if_necessary()
476                            }) || first_run)
477                        {
478                            subscriber.clear_sources(&subscriber);
479
480                            let old_dep_value = mem::take(
481                                &mut *dep_value.write().or_poisoned(),
482                            );
483                            let new_dep_value = owner.with_cleanup(|| {
484                                subscriber.with_observer(&mut dependency_fn)
485                            });
486
487                            let old_watch_value = mem::take(
488                                &mut *watch_value.write().or_poisoned(),
489                            );
490
491                            if immediate || !first_run {
492                                let new_watch_value = handler(
493                                    &new_dep_value,
494                                    old_dep_value.as_ref(),
495                                    old_watch_value,
496                                );
497
498                                *watch_value.write().or_poisoned() =
499                                    Some(new_watch_value);
500                            }
501
502                            *dep_value.write().or_poisoned() =
503                                Some(new_dep_value);
504
505                            first_run = false;
506                        }
507                    }
508                }
509            });
510
511            ArenaItem::new_with_storage(Some(inner))
512        });
513
514        Self { inner }
515    }
516}
517
518impl<S> ToAnySubscriber for Effect<S>
519where
520    S: Storage<StoredEffect>,
521{
522    fn to_any_subscriber(&self) -> AnySubscriber {
523        self.inner
524            .and_then(|inner| {
525                inner
526                    .try_with_value(|inner| {
527                        inner.as_ref().map(|inner| inner.to_any_subscriber())
528                    })
529                    .flatten()
530            })
531            .expect("tried to set effect that has been stopped")
532    }
533}
534
/// Creates an [`Effect`].
///
/// This is a deprecated wrapper that forwards `fun` unchanged to [`Effect::new`];
/// see that function for how and when `fun` is run.
#[inline(always)]
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust idioms. \
                Please use `Effect::new()` instead."]
pub fn create_effect<T>(
    fun: impl FnMut(Option<T>) -> T + 'static,
) -> Effect<LocalStorage>
where
    T: 'static,
{
    Effect::new(fun)
}
548
549/// Creates an [`Effect`], equivalent to [Effect::watch].
550#[inline(always)]
551#[track_caller]
552#[deprecated = "This function is being removed to conform to Rust idioms. \
553                Please use `Effect::watch()` instead."]
554pub fn watch<W, T>(
555    deps: impl Fn() -> W + 'static,
556    callback: impl Fn(&W, Option<&W>, Option<T>) -> T + Clone + 'static,
557    immediate: bool,
558) -> impl Fn() + Clone
559where
560    W: Clone + 'static,
561    T: 'static,
562{
563    let watch = Effect::watch(deps, callback, immediate);
564
565    move || watch.stop()
566}