reactive_graph/effect/effect.rs
1use crate::{
2 channel::{channel, Receiver},
3 effect::{inner::EffectInner, EffectFunction},
4 graph::{
5 AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6 WithObserver,
7 },
8 owner::{ArenaItem, LocalStorage, Owner, Storage, SyncStorage},
9 traits::Dispose,
10};
11use any_spawner::Executor;
12use futures::StreamExt;
13use or_poisoned::OrPoisoned;
14use std::{
15 mem,
16 sync::{atomic::AtomicBool, Arc, RwLock},
17};
18
19/// Effects run a certain chunk of code whenever the signals they depend on change.
20///
21/// Creating an effect runs the given function once after any current synchronous work is done.
22/// This tracks its reactive values read within it, and reruns the function whenever the value
23/// of a dependency changes.
24///
25/// Effects are intended to run *side-effects* of the system, not to synchronize state
26/// *within* the system. In other words: In most cases, you usually should not write to
27/// signals inside effects. (If you need to define a signal that depends on the value of
28/// other signals, use a derived signal or a [`Memo`](crate::computed::Memo)).
29///
30/// You can provide an effect function without parameters or one with one parameter.
31/// If you provide such a parameter, the effect function is called with an argument containing
32/// whatever value it returned the last time it ran. On the initial run, this is `None`.
33///
34/// Effects stop running when their reactive [`Owner`] is disposed.
35///
36///
37/// ## Example
38///
39/// ```
40/// # use reactive_graph::computed::*;
41/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
42/// # use reactive_graph::prelude::*;
43/// # use reactive_graph::effect::Effect;
44/// # use reactive_graph::owner::ArenaItem;
45/// # tokio_test::block_on(async move {
46/// # tokio::task::LocalSet::new().run_until(async move {
47/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
48/// let a = RwSignal::new(0);
49/// let b = RwSignal::new(0);
50///
51/// // ✅ use effects to interact between reactive state and the outside world
52/// Effect::new(move || {
53/// // on the next “tick” prints "Value: 0" and subscribes to `a`
54/// println!("Value: {}", a.get());
55/// });
56///
57/// # assert_eq!(a.get(), 0);
58/// a.set(1);
59/// # assert_eq!(a.get(), 1);
60/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
61///
62/// // ❌ don't use effects to synchronize state within the reactive system
63/// Effect::new(move || {
64/// // this technically works but can cause unnecessary re-renders
65/// // and easily lead to problems like infinite loops
66/// b.set(a.get() + 1);
67/// });
68/// # }).await;
69/// # });
70/// ```
71/// ## Web-Specific Notes
72///
73/// 1. **Scheduling**: Effects run after synchronous work, on the next “tick” of the reactive
74/// system. This makes them suitable for “on mount” actions: they will fire immediately after
75/// DOM rendering.
76/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
77/// this with a web framework, this generally means that effects **do not run on the server**.
78/// and you can call browser-specific APIs within the effect function without causing issues.
79/// If you need an effect to run on the server, use [`Effect::new_isomorphic`].
80#[derive(Debug, Clone, Copy)]
81pub struct Effect<S> {
82 inner: Option<ArenaItem<StoredEffect, S>>,
83}
84
85type StoredEffect = Option<Arc<RwLock<EffectInner>>>;
86
87impl<S> Dispose for Effect<S> {
88 fn dispose(self) {
89 if let Some(inner) = self.inner {
90 inner.dispose()
91 }
92 }
93}
94
95fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
96 let (mut observer, rx) = channel();
97
98 // spawn the effect asynchronously
99 // we'll notify once so it runs on the next tick,
100 // to register observed values
101 observer.notify();
102
103 let owner = Owner::new();
104 let inner = Arc::new(RwLock::new(EffectInner {
105 dirty: true,
106 observer,
107 sources: SourceSet::new(),
108 }));
109
110 (rx, owner, inner)
111}
112
113#[cfg(debug_assertions)]
114thread_local! {
115 static EFFECT_SCOPE_ACTIVE: AtomicBool = const { AtomicBool::new(false) };
116}
117
118#[cfg(debug_assertions)]
119/// Returns whether the current thread is currently running an effect.
120pub fn in_effect_scope() -> bool {
121 EFFECT_SCOPE_ACTIVE
122 .with(|scope| scope.load(std::sync::atomic::Ordering::Relaxed))
123}
124
125/// Set a static to true whilst running the given function.
126/// [`is_in_effect_scope`] will return true whilst the function is running.
127fn run_in_effect_scope<T>(fun: impl FnOnce() -> T) -> T {
128 #[cfg(debug_assertions)]
129 {
130 // For the theoretical nested case, set back to initial value rather than false:
131 let initial = EFFECT_SCOPE_ACTIVE.with(|scope| {
132 scope.swap(true, std::sync::atomic::Ordering::Relaxed)
133 });
134 let result = fun();
135 EFFECT_SCOPE_ACTIVE.with(|scope| {
136 scope.store(initial, std::sync::atomic::Ordering::Relaxed)
137 });
138 result
139 }
140 #[cfg(not(debug_assertions))]
141 {
142 fun()
143 }
144}
145
146impl<S> Effect<S>
147where
148 S: Storage<StoredEffect>,
149{
150 /// Stops this effect before it is disposed.
151 pub fn stop(self) {
152 if let Some(inner) = self
153 .inner
154 .and_then(|this| this.try_update_value(|inner| inner.take()))
155 {
156 drop(inner);
157 }
158 }
159}
160
161impl Effect<LocalStorage> {
162 /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
163 /// that are read inside it change.
164 ///
165 /// This spawns a task on the local thread using
166 /// [`spawn_local`](any_spawner::Executor::spawn_local). For an effect that can be spawned on
167 /// any thread, use [`new_sync`](Effect::new_sync).
168 pub fn new<T, M>(mut fun: impl EffectFunction<T, M> + 'static) -> Self
169 where
170 T: 'static,
171 {
172 let inner = cfg!(feature = "effects").then(|| {
173 let (mut rx, owner, inner) = effect_base();
174 let value = Arc::new(RwLock::new(None::<T>));
175 let mut first_run = true;
176
177 Executor::spawn_local({
178 let value = Arc::clone(&value);
179 let subscriber = inner.to_any_subscriber();
180
181 async move {
182 while rx.next().await.is_some() {
183 if !owner.paused()
184 && (subscriber.with_observer(|| {
185 subscriber.update_if_necessary()
186 }) || first_run)
187 {
188 first_run = false;
189 subscriber.clear_sources(&subscriber);
190
191 let old_value =
192 mem::take(&mut *value.write().or_poisoned());
193 let new_value = owner.with_cleanup(|| {
194 subscriber.with_observer(|| {
195 run_in_effect_scope(|| fun.run(old_value))
196 })
197 });
198 *value.write().or_poisoned() = Some(new_value);
199 }
200 }
201 }
202 });
203
204 ArenaItem::new_with_storage(Some(inner))
205 });
206
207 Self { inner }
208 }
209
210 /// A version of [`Effect::new`] that only listens to any dependency
211 /// that is accessed inside `dependency_fn`.
212 ///
213 /// The return value of `dependency_fn` is passed into `handler` as an argument together with the previous value.
214 /// Additionally, the last return value of `handler` is provided as a third argument, as is done in [`Effect::new`].
215 ///
216 /// ## Usage
217 ///
218 /// ```
219 /// # use reactive_graph::effect::Effect;
220 /// # use reactive_graph::traits::*;
221 /// # use reactive_graph::signal::signal;
222 /// # tokio_test::block_on(async move {
223 /// # tokio::task::LocalSet::new().run_until(async move {
224 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
225 /// #
226 /// let (num, set_num) = signal(0);
227 ///
228 /// let effect = Effect::watch(
229 /// move || num.get(),
230 /// move |num, prev_num, _| {
231 /// // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
232 /// },
233 /// false,
234 /// );
235 /// # assert_eq!(num.get(), 0);
236 ///
237 /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
238 /// # assert_eq!(num.get(), 1);
239 ///
240 /// effect.stop(); // stop watching
241 ///
242 /// set_num.set(2); // (nothing happens)
243 /// # assert_eq!(num.get(), 2);
244 /// # }).await;
245 /// # });
246 /// ```
247 ///
248 /// The callback itself doesn't track any signal that is accessed within it.
249 ///
250 /// ```
251 /// # use reactive_graph::effect::Effect;
252 /// # use reactive_graph::traits::*;
253 /// # use reactive_graph::signal::signal;
254 /// # tokio_test::block_on(async move {
255 /// # tokio::task::LocalSet::new().run_until(async move {
256 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
257 /// #
258 /// let (num, set_num) = signal(0);
259 /// let (cb_num, set_cb_num) = signal(0);
260 ///
261 /// Effect::watch(
262 /// move || num.get(),
263 /// move |num, _, _| {
264 /// // log::debug!("Number: {}; Cb: {}", num, cb_num.get());
265 /// },
266 /// false,
267 /// );
268 ///
269 /// # assert_eq!(num.get(), 0);
270 /// set_num.set(1); // > "Number: 1; Cb: 0"
271 /// # assert_eq!(num.get(), 1);
272 ///
273 /// # assert_eq!(cb_num.get(), 0);
274 /// set_cb_num.set(1); // (nothing happens)
275 /// # assert_eq!(cb_num.get(), 1);
276 ///
277 /// set_num.set(2); // > "Number: 2; Cb: 1"
278 /// # assert_eq!(num.get(), 2);
279 /// # }).await;
280 /// # });
281 /// ```
282 ///
283 /// ## Immediate
284 ///
285 /// If the final parameter `immediate` is true, the `handler` will run immediately.
286 /// If it's `false`, the `handler` will run only after
287 /// the first change is detected of any signal that is accessed in `dependency_fn`.
288 ///
289 /// ```
290 /// # use reactive_graph::effect::Effect;
291 /// # use reactive_graph::traits::*;
292 /// # use reactive_graph::signal::signal;
293 /// # tokio_test::block_on(async move {
294 /// # tokio::task::LocalSet::new().run_until(async move {
295 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
296 /// #
297 /// let (num, set_num) = signal(0);
298 ///
299 /// Effect::watch(
300 /// move || num.get(),
301 /// move |num, prev_num, _| {
302 /// // log::debug!("Number: {}; Prev: {:?}", num, prev_num);
303 /// },
304 /// true,
305 /// ); // > "Number: 0; Prev: None"
306 ///
307 /// # assert_eq!(num.get(), 0);
308 /// set_num.set(1); // > "Number: 1; Prev: Some(0)"
309 /// # assert_eq!(num.get(), 1);
310 /// # }).await;
311 /// # });
312 /// ```
313 pub fn watch<D, T>(
314 mut dependency_fn: impl FnMut() -> D + 'static,
315 mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T + 'static,
316 immediate: bool,
317 ) -> Self
318 where
319 D: 'static,
320 T: 'static,
321 {
322 let inner = cfg!(feature = "effects").then(|| {
323 let (mut rx, owner, inner) = effect_base();
324 let mut first_run = true;
325 let dep_value = Arc::new(RwLock::new(None::<D>));
326 let watch_value = Arc::new(RwLock::new(None::<T>));
327
328 Executor::spawn_local({
329 let dep_value = Arc::clone(&dep_value);
330 let watch_value = Arc::clone(&watch_value);
331 let subscriber = inner.to_any_subscriber();
332
333 async move {
334 while rx.next().await.is_some() {
335 if !owner.paused()
336 && (subscriber.with_observer(|| {
337 subscriber.update_if_necessary()
338 }) || first_run)
339 {
340 subscriber.clear_sources(&subscriber);
341
342 let old_dep_value = mem::take(
343 &mut *dep_value.write().or_poisoned(),
344 );
345 let new_dep_value = owner.with_cleanup(|| {
346 subscriber.with_observer(&mut dependency_fn)
347 });
348
349 let old_watch_value = mem::take(
350 &mut *watch_value.write().or_poisoned(),
351 );
352
353 if immediate || !first_run {
354 let new_watch_value = handler(
355 &new_dep_value,
356 old_dep_value.as_ref(),
357 old_watch_value,
358 );
359
360 *watch_value.write().or_poisoned() =
361 Some(new_watch_value);
362 }
363
364 *dep_value.write().or_poisoned() =
365 Some(new_dep_value);
366
367 first_run = false;
368 }
369 }
370 }
371 });
372
373 ArenaItem::new_with_storage(Some(inner))
374 });
375
376 Self { inner }
377 }
378}
379
380impl Effect<SyncStorage> {
381 /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
382 /// that are read inside it change.
383 ///
384 /// This spawns a task that can be run on any thread. For an effect that will be spawned on
385 /// the current thread, use [`new`](Effect::new).
386 pub fn new_sync<T, M>(
387 fun: impl EffectFunction<T, M> + Send + Sync + 'static,
388 ) -> Self
389 where
390 T: Send + Sync + 'static,
391 {
392 if !cfg!(feature = "effects") {
393 return Self { inner: None };
394 }
395
396 Self::new_isomorphic(fun)
397 }
398
399 /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
400 /// that are read inside it change.
401 ///
402 /// This will run whether the `effects` feature is enabled or not.
403 pub fn new_isomorphic<T, M>(
404 mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
405 ) -> Self
406 where
407 T: Send + Sync + 'static,
408 {
409 let (mut rx, owner, inner) = effect_base();
410 let mut first_run = true;
411 let value = Arc::new(RwLock::new(None::<T>));
412
413 let task = {
414 let value = Arc::clone(&value);
415 let subscriber = inner.to_any_subscriber();
416
417 async move {
418 while rx.next().await.is_some() {
419 if !owner.paused()
420 && (subscriber
421 .with_observer(|| subscriber.update_if_necessary())
422 || first_run)
423 {
424 first_run = false;
425 subscriber.clear_sources(&subscriber);
426
427 let old_value =
428 mem::take(&mut *value.write().or_poisoned());
429 let new_value = owner.with_cleanup(|| {
430 subscriber.with_observer(|| {
431 run_in_effect_scope(|| fun.run(old_value))
432 })
433 });
434 *value.write().or_poisoned() = Some(new_value);
435 }
436 }
437 }
438 };
439
440 crate::spawn(task);
441
442 Self {
443 inner: Some(ArenaItem::new_with_storage(Some(inner))),
444 }
445 }
446
447 /// This is to [`Effect::watch`] what [`Effect::new_sync`] is to [`Effect::new`].
448 pub fn watch_sync<D, T>(
449 mut dependency_fn: impl FnMut() -> D + Send + Sync + 'static,
450 mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T
451 + Send
452 + Sync
453 + 'static,
454 immediate: bool,
455 ) -> Self
456 where
457 D: Send + Sync + 'static,
458 T: Send + Sync + 'static,
459 {
460 let (mut rx, owner, inner) = effect_base();
461 let mut first_run = true;
462 let dep_value = Arc::new(RwLock::new(None::<D>));
463 let watch_value = Arc::new(RwLock::new(None::<T>));
464
465 let inner = cfg!(feature = "effects").then(|| {
466 crate::spawn({
467 let dep_value = Arc::clone(&dep_value);
468 let watch_value = Arc::clone(&watch_value);
469 let subscriber = inner.to_any_subscriber();
470
471 async move {
472 while rx.next().await.is_some() {
473 if !owner.paused()
474 && (subscriber.with_observer(|| {
475 subscriber.update_if_necessary()
476 }) || first_run)
477 {
478 subscriber.clear_sources(&subscriber);
479
480 let old_dep_value = mem::take(
481 &mut *dep_value.write().or_poisoned(),
482 );
483 let new_dep_value = owner.with_cleanup(|| {
484 subscriber.with_observer(&mut dependency_fn)
485 });
486
487 let old_watch_value = mem::take(
488 &mut *watch_value.write().or_poisoned(),
489 );
490
491 if immediate || !first_run {
492 let new_watch_value = handler(
493 &new_dep_value,
494 old_dep_value.as_ref(),
495 old_watch_value,
496 );
497
498 *watch_value.write().or_poisoned() =
499 Some(new_watch_value);
500 }
501
502 *dep_value.write().or_poisoned() =
503 Some(new_dep_value);
504
505 first_run = false;
506 }
507 }
508 }
509 });
510
511 ArenaItem::new_with_storage(Some(inner))
512 });
513
514 Self { inner }
515 }
516}
517
518impl<S> ToAnySubscriber for Effect<S>
519where
520 S: Storage<StoredEffect>,
521{
522 fn to_any_subscriber(&self) -> AnySubscriber {
523 self.inner
524 .and_then(|inner| {
525 inner
526 .try_with_value(|inner| {
527 inner.as_ref().map(|inner| inner.to_any_subscriber())
528 })
529 .flatten()
530 })
531 .expect("tried to set effect that has been stopped")
532 }
533}
534
535/// Creates an [`Effect`].
536#[inline(always)]
537#[track_caller]
538#[deprecated = "This function is being removed to conform to Rust idioms. \
539 Please use `Effect::new()` instead."]
540pub fn create_effect<T>(
541 fun: impl FnMut(Option<T>) -> T + 'static,
542) -> Effect<LocalStorage>
543where
544 T: 'static,
545{
546 Effect::new(fun)
547}
548
549/// Creates an [`Effect`], equivalent to [Effect::watch].
550#[inline(always)]
551#[track_caller]
552#[deprecated = "This function is being removed to conform to Rust idioms. \
553 Please use `Effect::watch()` instead."]
554pub fn watch<W, T>(
555 deps: impl Fn() -> W + 'static,
556 callback: impl Fn(&W, Option<&W>, Option<T>) -> T + Clone + 'static,
557 immediate: bool,
558) -> impl Fn() + Clone
559where
560 W: Clone + 'static,
561 T: 'static,
562{
563 let watch = Effect::watch(deps, callback, immediate);
564
565 move || watch.stop()
566}