reactive_graph/effect/immediate.rs
1use crate::{
2 graph::{AnySubscriber, ReactiveNode, ToAnySubscriber},
3 owner::on_cleanup,
4 traits::{DefinedAt, Dispose},
5};
6use or_poisoned::OrPoisoned;
7use std::{
8 panic::Location,
9 sync::{Arc, Mutex, RwLock},
10};
11
12/// Effects run a certain chunk of code whenever the signals they depend on change.
13///
14/// The effect runs on creation and again as soon as any tracked signal changes.
15///
16/// NOTE: you probably want use [`Effect`](super::Effect) instead.
17/// This is for the few cases where it's important to execute effects immediately and in order.
18///
19/// [ImmediateEffect]s stop running when dropped.
20///
21/// NOTE: since effects are executed immediately, they might recurse.
22/// Under recursion or parallelism only the last run to start is tracked.
23///
24/// ## Example
25///
26/// ```
27/// # use reactive_graph::computed::*;
28/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
29/// # use reactive_graph::prelude::*;
30/// # use reactive_graph::effect::ImmediateEffect;
31/// # use reactive_graph::owner::ArenaItem;
32/// # let owner = reactive_graph::owner::Owner::new(); owner.set();
33/// let a = RwSignal::new(0);
34/// let b = RwSignal::new(0);
35///
36/// // ✅ use effects to interact between reactive state and the outside world
37/// let _drop_guard = ImmediateEffect::new(move || {
38/// // on the next “tick” prints "Value: 0" and subscribes to `a`
39/// println!("Value: {}", a.get());
40/// });
41///
42/// // The effect runs immediately and subscribes to `a`, in the process it prints "Value: 0"
43/// # assert_eq!(a.get(), 0);
44/// a.set(1);
45/// # assert_eq!(a.get(), 1);
46/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
47/// ```
48/// ## Notes
49///
50/// 1. **Scheduling**: Effects run immediately, as soon as any tracked signal changes.
51/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
52/// this with a web framework, this generally means that effects **do not run on the server**.
53/// and you can call browser-specific APIs within the effect function without causing issues.
54/// If you need an effect to run on the server, use [`ImmediateEffect::new_isomorphic`].
55#[derive(Debug, Clone)]
56pub struct ImmediateEffect {
57 inner: StoredEffect,
58}
59
60type StoredEffect = Option<Arc<RwLock<inner::EffectInner>>>;
61
62impl Dispose for ImmediateEffect {
63 fn dispose(self) {}
64}
65
66impl ImmediateEffect {
67 /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
68 /// (Unless [batch] is used.)
69 ///
70 /// NOTE: this requires a `Fn` function because it might recurse.
71 /// Use [Self::new_mut] to pass a `FnMut` function, it'll panic on recursion.
72 #[track_caller]
73 #[must_use]
74 pub fn new(fun: impl Fn() + Send + Sync + 'static) -> Self {
75 if !cfg!(feature = "effects") {
76 return Self { inner: None };
77 }
78
79 let inner = inner::EffectInner::new(fun);
80
81 inner.update_if_necessary();
82
83 Self { inner: Some(inner) }
84 }
85 /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
86 /// (Unless [batch] is used.)
87 ///
88 /// # Panics
89 /// Panics on recursion or if triggered in parallel. Also see [Self::new]
90 #[track_caller]
91 #[must_use]
92 pub fn new_mut(fun: impl FnMut() + Send + Sync + 'static) -> Self {
93 const MSG: &str = "The effect recursed or its function panicked.";
94 let fun = Mutex::new(fun);
95 Self::new(move || fun.try_lock().expect(MSG)())
96 }
97 /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
98 /// (Unless [batch] is used.)
99 ///
100 /// NOTE: this requires a `Fn` function because it might recurse.
101 /// Use [Self::new_mut_scoped] to pass a `FnMut` function, it'll panic on recursion.
102 /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
103 #[track_caller]
104 pub fn new_scoped(fun: impl Fn() + Send + Sync + 'static) {
105 let effect = Self::new(fun);
106
107 on_cleanup(move || effect.dispose());
108 }
109 /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
110 /// (Unless [batch] is used.)
111 ///
112 /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
113 ///
114 /// # Panics
115 /// Panics on recursion or if triggered in parallel. Also see [Self::new_scoped]
116 #[track_caller]
117 pub fn new_mut_scoped(fun: impl FnMut() + Send + Sync + 'static) {
118 let effect = Self::new_mut(fun);
119
120 on_cleanup(move || effect.dispose());
121 }
122
123 /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
124 ///
125 /// This will run whether the `effects` feature is enabled or not.
126 #[track_caller]
127 #[must_use]
128 pub fn new_isomorphic(fun: impl Fn() + Send + Sync + 'static) -> Self {
129 let inner = inner::EffectInner::new(fun);
130
131 inner.update_if_necessary();
132
133 Self { inner: Some(inner) }
134 }
135}
136
137impl ToAnySubscriber for ImmediateEffect {
138 fn to_any_subscriber(&self) -> AnySubscriber {
139 const MSG: &str = "tried to set effect that has been stopped";
140 self.inner.as_ref().expect(MSG).to_any_subscriber()
141 }
142}
143
144impl DefinedAt for ImmediateEffect {
145 fn defined_at(&self) -> Option<&'static Location<'static>> {
146 self.inner.as_ref()?.read().or_poisoned().defined_at()
147 }
148}
149
150/// Defers any [ImmediateEffect]s from running until the end of the function.
151///
152/// NOTE: this affects only [ImmediateEffect]s, not other effects.
153///
154/// NOTE: this is rarely needed, but it is useful for example when multiple signals
155/// need to be updated atomically (for example a double-bound signal tree).
156pub fn batch<T>(f: impl FnOnce() -> T) -> T {
157 struct ExecuteOnDrop;
158 impl Drop for ExecuteOnDrop {
159 fn drop(&mut self) {
160 let effects = {
161 let mut batch = inner::BATCH.write().or_poisoned();
162 batch.take().unwrap().into_inner().expect("lock poisoned")
163 };
164 // TODO: Should we skip the effects if it's panicking?
165 for effect in effects {
166 effect.update_if_necessary();
167 }
168 }
169 }
170 let mut execute_on_drop = None;
171 {
172 let mut batch = inner::BATCH.write().or_poisoned();
173 if batch.is_none() {
174 execute_on_drop = Some(ExecuteOnDrop);
175 } else {
176 // Nested batching has no effect.
177 }
178 *batch = Some(batch.take().unwrap_or_default());
179 }
180 let ret = f();
181 drop(execute_on_drop);
182 ret
183}
184
185mod inner {
186 use crate::{
187 graph::{
188 AnySource, AnySubscriber, ReactiveNode, ReactiveNodeState,
189 SourceSet, Subscriber, ToAnySubscriber, WithObserver,
190 },
191 log_warning,
192 owner::Owner,
193 traits::DefinedAt,
194 };
195 use indexmap::IndexSet;
196 use or_poisoned::OrPoisoned;
197 use std::{
198 panic::Location,
199 sync::{Arc, RwLock, Weak},
200 thread::{self, ThreadId},
201 };
202
203 /// Only the [super::batch] function ever writes to the outer RwLock.
204 /// While the effects will write to the inner one.
205 pub(super) static BATCH: RwLock<Option<RwLock<IndexSet<AnySubscriber>>>> =
206 RwLock::new(None);
207
208 /// Handles subscription logic for effects.
209 ///
210 /// To handle parallelism and recursion we assign ordered (1..) ids to each run.
211 /// We only keep the sources tracked by the run with the highest id (the last one).
212 ///
213 /// We do this by:
214 /// - Clearing the sources before every run, so the last one clears anything before it.
215 /// - We stop tracking sources after the last run has completed.
216 /// (A parent run will start before and end after a recursive child run.)
217 /// - To handle parallelism with the last run, we only allow sources to be added by its thread.
218 pub(super) struct EffectInner {
219 #[cfg(any(debug_assertions, leptos_debuginfo))]
220 defined_at: &'static Location<'static>,
221 owner: Owner,
222 state: ReactiveNodeState,
223 /// The number of effect runs in this 'batch'.
224 /// Cleared when no runs are *ongoing* anymore.
225 /// Used to assign ordered ids to each run, and to know when we can clear these values.
226 run_count_start: usize,
227 /// The number of effect runs that have completed in the current 'batch'.
228 /// Cleared when no runs are *ongoing* anymore.
229 /// Used to know when we can clear these values.
230 run_done_count: usize,
231 /// Given ordered ids (1..), the run with the highest id that has completed in this 'batch'.
232 /// Cleared when no runs are *ongoing* anymore.
233 /// Used to know whether the current run is the latest one.
234 run_done_max: usize,
235 /// The [ThreadId] of the run with the highest id.
236 /// Used to prevent over-subscribing during parallel execution with the last run.
237 ///
238 /// ```text
239 /// Thread 1:
240 /// -------------------------
241 /// --- --- =======
242 ///
243 /// Thread 2:
244 /// -------------------------
245 /// -----------
246 /// ```
247 ///
248 /// In the parallel example above, we can see why we need this.
249 /// The last run is marked using `=`, but another run in the other thread might
250 /// also be gathering sources. So we only allow the run from the correct [ThreadId] to push sources.
251 last_run_thread_id: ThreadId,
252 fun: Arc<dyn Fn() + Send + Sync>,
253 sources: SourceSet,
254 any_subscriber: AnySubscriber,
255 }
256
257 impl EffectInner {
258 #[track_caller]
259 pub fn new(
260 fun: impl Fn() + Send + Sync + 'static,
261 ) -> Arc<RwLock<EffectInner>> {
262 let owner = Owner::new();
263 #[cfg(any(debug_assertions, leptos_debuginfo))]
264 let defined_at = Location::caller();
265
266 Arc::new_cyclic(|weak| {
267 let any_subscriber = AnySubscriber(
268 weak.as_ptr() as usize,
269 Weak::clone(weak) as Weak<dyn Subscriber + Send + Sync>,
270 );
271
272 RwLock::new(EffectInner {
273 #[cfg(any(debug_assertions, leptos_debuginfo))]
274 defined_at,
275 owner,
276 state: ReactiveNodeState::Dirty,
277 run_count_start: 0,
278 run_done_count: 0,
279 run_done_max: 0,
280 last_run_thread_id: thread::current().id(),
281 fun: Arc::new(fun),
282 sources: SourceSet::new(),
283 any_subscriber,
284 })
285 })
286 }
287 }
288
289 impl ToAnySubscriber for Arc<RwLock<EffectInner>> {
290 fn to_any_subscriber(&self) -> AnySubscriber {
291 AnySubscriber(
292 Arc::as_ptr(self) as usize,
293 Arc::downgrade(self) as Weak<dyn Subscriber + Send + Sync>,
294 )
295 }
296 }
297
298 impl ReactiveNode for RwLock<EffectInner> {
299 fn mark_subscribers_check(&self) {}
300
301 fn update_if_necessary(&self) -> bool {
302 let state = {
303 let guard = self.read().or_poisoned();
304
305 if guard.owner.paused() {
306 return false;
307 }
308
309 guard.state
310 };
311
312 let needs_update = match state {
313 ReactiveNodeState::Clean => false,
314 ReactiveNodeState::Check => {
315 let sources = self.read().or_poisoned().sources.clone();
316 sources
317 .into_iter()
318 .any(|source| source.update_if_necessary())
319 }
320 ReactiveNodeState::Dirty => true,
321 };
322
323 {
324 if let Some(batch) = &*BATCH.read().or_poisoned() {
325 let mut batch = batch.write().or_poisoned();
326 let subscriber =
327 self.read().or_poisoned().any_subscriber.clone();
328
329 batch.insert(subscriber);
330 return needs_update;
331 }
332 }
333
334 if needs_update {
335 let mut guard = self.write().or_poisoned();
336
337 let owner = guard.owner.clone();
338 let any_subscriber = guard.any_subscriber.clone();
339 let fun = guard.fun.clone();
340
341 // New run has started.
342 guard.run_count_start += 1;
343 // We get a value for this run, the highest value will be what we keep the sources from.
344 let recursion_count = guard.run_count_start;
345 // We clear the sources before running the effect.
346 // Note that this is tied to the ordering of the initial write lock acquisition
347 // to ensure the last run is also the last to clear them.
348 guard.sources.clear_sources(&any_subscriber);
349 // Only this thread will be able to subscribe.
350 guard.last_run_thread_id = thread::current().id();
351
352 if recursion_count > 2 {
353 warn_excessive_recursion(&guard);
354 }
355
356 drop(guard);
357
358 // We execute the effect.
359 // Note that *this could happen in parallel across threads*.
360 owner.with_cleanup(|| any_subscriber.with_observer(|| fun()));
361
362 let mut guard = self.write().or_poisoned();
363
364 // This run has completed.
365 guard.run_done_count += 1;
366
367 // We update the done count.
368 // Sources will only be added if recursion_done_max < recursion_count_start.
369 // (Meaning the last run is not done yet.)
370 guard.run_done_max =
371 Ord::max(recursion_count, guard.run_done_max);
372
373 // The same amount of runs has started and completed,
374 // so we can clear everything up for next time.
375 if guard.run_count_start == guard.run_done_count {
376 guard.run_count_start = 0;
377 guard.run_done_count = 0;
378 guard.run_done_max = 0;
379 // Can be left unchanged, it'll be set again next time.
380 // guard.last_run_thread_id = thread::current().id();
381 }
382
383 guard.state = ReactiveNodeState::Clean;
384 }
385
386 needs_update
387 }
388
389 fn mark_check(&self) {
390 self.write().or_poisoned().state = ReactiveNodeState::Check;
391 self.update_if_necessary();
392 }
393
394 fn mark_dirty(&self) {
395 self.write().or_poisoned().state = ReactiveNodeState::Dirty;
396 self.update_if_necessary();
397 }
398 }
399
400 impl Subscriber for RwLock<EffectInner> {
401 fn add_source(&self, source: AnySource) {
402 let mut guard = self.write().or_poisoned();
403 if guard.run_done_max < guard.run_count_start
404 && guard.last_run_thread_id == thread::current().id()
405 {
406 guard.sources.insert(source);
407 }
408 }
409
410 fn clear_sources(&self, subscriber: &AnySubscriber) {
411 self.write().or_poisoned().sources.clear_sources(subscriber);
412 }
413 }
414
415 impl DefinedAt for EffectInner {
416 fn defined_at(&self) -> Option<&'static Location<'static>> {
417 #[cfg(any(debug_assertions, leptos_debuginfo))]
418 {
419 Some(self.defined_at)
420 }
421 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
422 {
423 None
424 }
425 }
426 }
427
428 impl std::fmt::Debug for EffectInner {
429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 f.debug_struct("EffectInner")
431 .field("owner", &self.owner)
432 .field("state", &self.state)
433 .field("sources", &self.sources)
434 .field("any_subscriber", &self.any_subscriber)
435 .finish()
436 }
437 }
438
439 fn warn_excessive_recursion(effect: &EffectInner) {
440 const MSG: &str = "ImmediateEffect recursed more than once.";
441 match effect.defined_at() {
442 Some(defined_at) => {
443 log_warning(format_args!("{MSG} Defined at: {defined_at}"));
444 }
445 None => {
446 log_warning(format_args!("{MSG}"));
447 }
448 }
449 }
450}