reactive_graph/effect/
immediate.rs

1use crate::{
2    graph::{AnySubscriber, ReactiveNode, ToAnySubscriber},
3    owner::on_cleanup,
4    traits::{DefinedAt, Dispose},
5};
6use or_poisoned::OrPoisoned;
7use std::{
8    panic::Location,
9    sync::{Arc, Mutex, RwLock},
10};
11
12/// Effects run a certain chunk of code whenever the signals they depend on change.
13///
14/// The effect runs on creation and again as soon as any tracked signal changes.
15///
16/// NOTE: you probably want use [`Effect`](super::Effect) instead.
17/// This is for the few cases where it's important to execute effects immediately and in order.
18///
19/// [ImmediateEffect]s stop running when dropped.
20///
21/// NOTE: since effects are executed immediately, they might recurse.
22/// Under recursion or parallelism only the last run to start is tracked.
23///
24/// ## Example
25///
26/// ```
27/// # use reactive_graph::computed::*;
28/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
29/// # use reactive_graph::prelude::*;
30/// # use reactive_graph::effect::ImmediateEffect;
31/// # use reactive_graph::owner::ArenaItem;
32/// # let owner = reactive_graph::owner::Owner::new(); owner.set();
33/// let a = RwSignal::new(0);
34/// let b = RwSignal::new(0);
35///
36/// // ✅ use effects to interact between reactive state and the outside world
37/// let _drop_guard = ImmediateEffect::new(move || {
38///   // on the next “tick” prints "Value: 0" and subscribes to `a`
39///   println!("Value: {}", a.get());
40/// });
41///
42/// // The effect runs immediately and subscribes to `a`, in the process it prints "Value: 0"
43/// # assert_eq!(a.get(), 0);
44/// a.set(1);
45/// # assert_eq!(a.get(), 1);
46/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
47/// ```
48/// ## Notes
49///
50/// 1. **Scheduling**: Effects run immediately, as soon as any tracked signal changes.
51/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
52///    this with a web framework, this generally means that effects **do not run on the server**.
53///    and you can call browser-specific APIs within the effect function without causing issues.
54///    If you need an effect to run on the server, use [`ImmediateEffect::new_isomorphic`].
55#[derive(Debug, Clone)]
56pub struct ImmediateEffect {
57    inner: StoredEffect,
58}
59
60type StoredEffect = Option<Arc<RwLock<inner::EffectInner>>>;
61
62impl Dispose for ImmediateEffect {
63    fn dispose(self) {}
64}
65
66impl ImmediateEffect {
67    /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
68    /// (Unless [batch] is used.)
69    ///
70    /// NOTE: this requires a `Fn` function because it might recurse.
71    /// Use [Self::new_mut] to pass a `FnMut` function, it'll panic on recursion.
72    #[track_caller]
73    #[must_use]
74    pub fn new(fun: impl Fn() + Send + Sync + 'static) -> Self {
75        if !cfg!(feature = "effects") {
76            return Self { inner: None };
77        }
78
79        let inner = inner::EffectInner::new(fun);
80
81        inner.update_if_necessary();
82
83        Self { inner: Some(inner) }
84    }
85    /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
86    /// (Unless [batch] is used.)
87    ///
88    /// # Panics
89    /// Panics on recursion or if triggered in parallel. Also see [Self::new]
90    #[track_caller]
91    #[must_use]
92    pub fn new_mut(fun: impl FnMut() + Send + Sync + 'static) -> Self {
93        const MSG: &str = "The effect recursed or its function panicked.";
94        let fun = Mutex::new(fun);
95        Self::new(move || fun.try_lock().expect(MSG)())
96    }
97    /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
98    /// (Unless [batch] is used.)
99    ///
100    /// NOTE: this requires a `Fn` function because it might recurse.
101    /// Use [Self::new_mut_scoped] to pass a `FnMut` function, it'll panic on recursion.
102    /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
103    #[track_caller]
104    pub fn new_scoped(fun: impl Fn() + Send + Sync + 'static) {
105        let effect = Self::new(fun);
106
107        on_cleanup(move || effect.dispose());
108    }
109    /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
110    /// (Unless [batch] is used.)
111    ///
112    /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
113    ///
114    /// # Panics
115    /// Panics on recursion or if triggered in parallel. Also see [Self::new_scoped]
116    #[track_caller]
117    pub fn new_mut_scoped(fun: impl FnMut() + Send + Sync + 'static) {
118        let effect = Self::new_mut(fun);
119
120        on_cleanup(move || effect.dispose());
121    }
122
123    /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
124    ///
125    /// This will run whether the `effects` feature is enabled or not.
126    #[track_caller]
127    #[must_use]
128    pub fn new_isomorphic(fun: impl Fn() + Send + Sync + 'static) -> Self {
129        let inner = inner::EffectInner::new(fun);
130
131        inner.update_if_necessary();
132
133        Self { inner: Some(inner) }
134    }
135}
136
137impl ToAnySubscriber for ImmediateEffect {
138    fn to_any_subscriber(&self) -> AnySubscriber {
139        const MSG: &str = "tried to set effect that has been stopped";
140        self.inner.as_ref().expect(MSG).to_any_subscriber()
141    }
142}
143
144impl DefinedAt for ImmediateEffect {
145    fn defined_at(&self) -> Option<&'static Location<'static>> {
146        self.inner.as_ref()?.read().or_poisoned().defined_at()
147    }
148}
149
150/// Defers any [ImmediateEffect]s from running until the end of the function.
151///
152/// NOTE: this affects only [ImmediateEffect]s, not other effects.
153///
154/// NOTE: this is rarely needed, but it is useful for example when multiple signals
155/// need to be updated atomically (for example a double-bound signal tree).
156pub fn batch<T>(f: impl FnOnce() -> T) -> T {
157    struct ExecuteOnDrop;
158    impl Drop for ExecuteOnDrop {
159        fn drop(&mut self) {
160            let effects = {
161                let mut batch = inner::BATCH.write().or_poisoned();
162                batch.take().unwrap().into_inner().expect("lock poisoned")
163            };
164            // TODO: Should we skip the effects if it's panicking?
165            for effect in effects {
166                effect.update_if_necessary();
167            }
168        }
169    }
170    let mut execute_on_drop = None;
171    {
172        let mut batch = inner::BATCH.write().or_poisoned();
173        if batch.is_none() {
174            execute_on_drop = Some(ExecuteOnDrop);
175        } else {
176            // Nested batching has no effect.
177        }
178        *batch = Some(batch.take().unwrap_or_default());
179    }
180    let ret = f();
181    drop(execute_on_drop);
182    ret
183}
184
185mod inner {
186    use crate::{
187        graph::{
188            AnySource, AnySubscriber, ReactiveNode, ReactiveNodeState,
189            SourceSet, Subscriber, ToAnySubscriber, WithObserver,
190        },
191        log_warning,
192        owner::Owner,
193        traits::DefinedAt,
194    };
195    use indexmap::IndexSet;
196    use or_poisoned::OrPoisoned;
197    use std::{
198        panic::Location,
199        sync::{Arc, RwLock, Weak},
200        thread::{self, ThreadId},
201    };
202
203    /// Only the [super::batch] function ever writes to the outer RwLock.
204    /// While the effects will write to the inner one.
205    pub(super) static BATCH: RwLock<Option<RwLock<IndexSet<AnySubscriber>>>> =
206        RwLock::new(None);
207
208    /// Handles subscription logic for effects.
209    ///
210    /// To handle parallelism and recursion we assign ordered (1..) ids to each run.
211    /// We only keep the sources tracked by the run with the highest id (the last one).
212    ///
213    /// We do this by:
214    /// - Clearing the sources before every run, so the last one clears anything before it.
215    /// - We stop tracking sources after the last run has completed.
216    ///   (A parent run will start before and end after a recursive child run.)
217    /// - To handle parallelism with the last run, we only allow sources to be added by its thread.
218    pub(super) struct EffectInner {
219        #[cfg(any(debug_assertions, leptos_debuginfo))]
220        defined_at: &'static Location<'static>,
221        owner: Owner,
222        state: ReactiveNodeState,
223        /// The number of effect runs in this 'batch'.
224        /// Cleared when no runs are *ongoing* anymore.
225        /// Used to assign ordered ids to each run, and to know when we can clear these values.
226        run_count_start: usize,
227        /// The number of effect runs that have completed in the current 'batch'.
228        /// Cleared when no runs are *ongoing* anymore.
229        /// Used to know when we can clear these values.
230        run_done_count: usize,
231        /// Given ordered ids (1..), the run with the highest id that has completed in this 'batch'.
232        /// Cleared when no runs are *ongoing* anymore.
233        /// Used to know whether the current run is the latest one.
234        run_done_max: usize,
235        /// The [ThreadId] of the run with the highest id.
236        /// Used to prevent over-subscribing during parallel execution with the last run.
237        ///
238        /// ```text
239        /// Thread 1:
240        /// -------------------------
241        ///   ---   ---    =======
242        ///
243        /// Thread 2:
244        /// -------------------------
245        ///             -----------
246        /// ```
247        ///
248        /// In the parallel example above, we can see why we need this.
249        /// The last run is marked using `=`, but another run in the other thread might
250        /// also be gathering sources. So we only allow the run from the correct [ThreadId] to push sources.
251        last_run_thread_id: ThreadId,
252        fun: Arc<dyn Fn() + Send + Sync>,
253        sources: SourceSet,
254        any_subscriber: AnySubscriber,
255    }
256
257    impl EffectInner {
258        #[track_caller]
259        pub fn new(
260            fun: impl Fn() + Send + Sync + 'static,
261        ) -> Arc<RwLock<EffectInner>> {
262            let owner = Owner::new();
263            #[cfg(any(debug_assertions, leptos_debuginfo))]
264            let defined_at = Location::caller();
265
266            Arc::new_cyclic(|weak| {
267                let any_subscriber = AnySubscriber(
268                    weak.as_ptr() as usize,
269                    Weak::clone(weak) as Weak<dyn Subscriber + Send + Sync>,
270                );
271
272                RwLock::new(EffectInner {
273                    #[cfg(any(debug_assertions, leptos_debuginfo))]
274                    defined_at,
275                    owner,
276                    state: ReactiveNodeState::Dirty,
277                    run_count_start: 0,
278                    run_done_count: 0,
279                    run_done_max: 0,
280                    last_run_thread_id: thread::current().id(),
281                    fun: Arc::new(fun),
282                    sources: SourceSet::new(),
283                    any_subscriber,
284                })
285            })
286        }
287    }
288
289    impl ToAnySubscriber for Arc<RwLock<EffectInner>> {
290        fn to_any_subscriber(&self) -> AnySubscriber {
291            AnySubscriber(
292                Arc::as_ptr(self) as usize,
293                Arc::downgrade(self) as Weak<dyn Subscriber + Send + Sync>,
294            )
295        }
296    }
297
298    impl ReactiveNode for RwLock<EffectInner> {
299        fn mark_subscribers_check(&self) {}
300
301        fn update_if_necessary(&self) -> bool {
302            let state = {
303                let guard = self.read().or_poisoned();
304
305                if guard.owner.paused() {
306                    return false;
307                }
308
309                guard.state
310            };
311
312            let needs_update = match state {
313                ReactiveNodeState::Clean => false,
314                ReactiveNodeState::Check => {
315                    let sources = self.read().or_poisoned().sources.clone();
316                    sources
317                        .into_iter()
318                        .any(|source| source.update_if_necessary())
319                }
320                ReactiveNodeState::Dirty => true,
321            };
322
323            {
324                if let Some(batch) = &*BATCH.read().or_poisoned() {
325                    let mut batch = batch.write().or_poisoned();
326                    let subscriber =
327                        self.read().or_poisoned().any_subscriber.clone();
328
329                    batch.insert(subscriber);
330                    return needs_update;
331                }
332            }
333
334            if needs_update {
335                let mut guard = self.write().or_poisoned();
336
337                let owner = guard.owner.clone();
338                let any_subscriber = guard.any_subscriber.clone();
339                let fun = guard.fun.clone();
340
341                // New run has started.
342                guard.run_count_start += 1;
343                // We get a value for this run, the highest value will be what we keep the sources from.
344                let recursion_count = guard.run_count_start;
345                // We clear the sources before running the effect.
346                // Note that this is tied to the ordering of the initial write lock acquisition
347                // to ensure the last run is also the last to clear them.
348                guard.sources.clear_sources(&any_subscriber);
349                // Only this thread will be able to subscribe.
350                guard.last_run_thread_id = thread::current().id();
351
352                if recursion_count > 2 {
353                    warn_excessive_recursion(&guard);
354                }
355
356                drop(guard);
357
358                // We execute the effect.
359                // Note that *this could happen in parallel across threads*.
360                owner.with_cleanup(|| any_subscriber.with_observer(|| fun()));
361
362                let mut guard = self.write().or_poisoned();
363
364                // This run has completed.
365                guard.run_done_count += 1;
366
367                // We update the done count.
368                // Sources will only be added if recursion_done_max < recursion_count_start.
369                // (Meaning the last run is not done yet.)
370                guard.run_done_max =
371                    Ord::max(recursion_count, guard.run_done_max);
372
373                // The same amount of runs has started and completed,
374                // so we can clear everything up for next time.
375                if guard.run_count_start == guard.run_done_count {
376                    guard.run_count_start = 0;
377                    guard.run_done_count = 0;
378                    guard.run_done_max = 0;
379                    // Can be left unchanged, it'll be set again next time.
380                    // guard.last_run_thread_id = thread::current().id();
381                }
382
383                guard.state = ReactiveNodeState::Clean;
384            }
385
386            needs_update
387        }
388
389        fn mark_check(&self) {
390            self.write().or_poisoned().state = ReactiveNodeState::Check;
391            self.update_if_necessary();
392        }
393
394        fn mark_dirty(&self) {
395            self.write().or_poisoned().state = ReactiveNodeState::Dirty;
396            self.update_if_necessary();
397        }
398    }
399
400    impl Subscriber for RwLock<EffectInner> {
401        fn add_source(&self, source: AnySource) {
402            let mut guard = self.write().or_poisoned();
403            if guard.run_done_max < guard.run_count_start
404                && guard.last_run_thread_id == thread::current().id()
405            {
406                guard.sources.insert(source);
407            }
408        }
409
410        fn clear_sources(&self, subscriber: &AnySubscriber) {
411            self.write().or_poisoned().sources.clear_sources(subscriber);
412        }
413    }
414
415    impl DefinedAt for EffectInner {
416        fn defined_at(&self) -> Option<&'static Location<'static>> {
417            #[cfg(any(debug_assertions, leptos_debuginfo))]
418            {
419                Some(self.defined_at)
420            }
421            #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
422            {
423                None
424            }
425        }
426    }
427
428    impl std::fmt::Debug for EffectInner {
429        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430            f.debug_struct("EffectInner")
431                .field("owner", &self.owner)
432                .field("state", &self.state)
433                .field("sources", &self.sources)
434                .field("any_subscriber", &self.any_subscriber)
435                .finish()
436        }
437    }
438
439    fn warn_excessive_recursion(effect: &EffectInner) {
440        const MSG: &str = "ImmediateEffect recursed more than once.";
441        match effect.defined_at() {
442            Some(defined_at) => {
443                log_warning(format_args!("{MSG} Defined at: {defined_at}"));
444            }
445            None => {
446                log_warning(format_args!("{MSG}"));
447            }
448        }
449    }
450}