reactive_graph/effect/
render_effect.rs

1use crate::{
2    channel::channel,
3    effect::inner::EffectInner,
4    graph::{
5        AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6        WithObserver,
7    },
8    owner::Owner,
9};
10use futures::StreamExt;
11use or_poisoned::OrPoisoned;
12#[cfg(feature = "subsecond")]
13use std::sync::Mutex;
14use std::{
15    fmt::Debug,
16    future::{Future, IntoFuture},
17    mem,
18    pin::Pin,
19    sync::{Arc, RwLock, Weak},
20};
21
22/// A render effect is similar to an [`Effect`](super::Effect), but with two key differences:
23/// 1. Its first run takes place immediately and synchronously: for example, if it is being used to
24///    drive a user interface, it will run during rendering, not on the next tick after rendering.
25///    (Hence “render effect.”)
26/// 2. It is canceled when the `RenderEffect` itself is dropped, rather than being stored in the
27///    reactive system and canceled when the `Owner` cleans up.
28///
29/// Unless you are implementing a rendering framework, or require one of these two characteristics,
30/// it is unlikely you will use render effects directly.
31///
32/// Like an [`Effect`](super::Effect), a render effect runs only with the `effects` feature
33/// enabled.
34#[must_use = "A RenderEffect will be canceled when it is dropped. Creating a \
35              RenderEffect that is not stored in some other data structure or \
36              leaked will drop it immediately, and it will not react to \
37              changes in signals it reads."]
38pub struct RenderEffect<T>
39where
40    T: 'static,
41{
42    value: Arc<RwLock<Option<T>>>,
43    inner: Arc<RwLock<EffectInner>>,
44}
45
46impl<T> Debug for RenderEffect<T> {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("RenderEffect")
49            .field("inner", &Arc::as_ptr(&self.inner))
50            .finish()
51    }
52}
53
/// Boxed, thread-safe callback that returns the current hot-reload function pointer of an
/// effect function, or `None` when there is no pointer to report.
#[cfg(feature = "subsecond")]
type CurrentHotPtr =
    Box<dyn Fn() -> Option<subsecond::HotFnPtr> + Send + Sync>;
56
57impl<T> RenderEffect<T>
58where
59    T: 'static,
60{
61    /// Creates a new render effect, which immediately runs `fun`.
62    pub fn new(fun: impl FnMut(Option<T>) -> T + 'static) -> Self {
63        #[cfg(feature = "subsecond")]
64        let (hot_fn_ptr, fun) = {
65            let fun = Arc::new(Mutex::new(subsecond::HotFn::current(fun)));
66            (
67                {
68                    let fun = Arc::downgrade(&fun);
69                    let wrapped = send_wrapper::SendWrapper::new(move || {
70                        fun.upgrade()
71                            .map(|n| n.lock().or_poisoned().ptr_address())
72                    });
73                    // it's not redundant, it's due to the SendWrapper deref
74                    #[allow(clippy::redundant_closure)]
75                    Box::new(move || wrapped())
76                },
77                move |prev| fun.lock().or_poisoned().call((prev,)),
78            )
79        };
80
81        Self::new_with_value_erased(
82            Box::new(fun),
83            None,
84            #[cfg(feature = "subsecond")]
85            hot_fn_ptr,
86        )
87    }
88
89    /// Creates a new render effect with an initial value.
90    pub fn new_with_value(
91        fun: impl FnMut(Option<T>) -> T + 'static,
92        initial_value: Option<T>,
93    ) -> Self {
94        #[cfg(feature = "subsecond")]
95        let (hot_fn_ptr, fun) = {
96            let fun = Arc::new(Mutex::new(subsecond::HotFn::current(fun)));
97            (
98                {
99                    let fun = Arc::downgrade(&fun);
100                    let wrapped = send_wrapper::SendWrapper::new(move || {
101                        fun.upgrade()
102                            .map(|n| n.lock().or_poisoned().ptr_address())
103                    });
104                    // it's not redundant, it's due to the SendWrapper deref
105                    #[allow(clippy::redundant_closure)]
106                    Box::new(move || wrapped())
107                },
108                move |prev| fun.lock().or_poisoned().call((prev,)),
109            )
110        };
111
112        Self::new_with_value_erased(
113            Box::new(fun),
114            initial_value,
115            #[cfg(feature = "subsecond")]
116            hot_fn_ptr,
117        )
118    }
119
120    /// Creates a new render effect, which immediately runs `fun`.
121    pub async fn new_with_async_value(
122        fun: impl FnMut(Option<T>) -> T + 'static,
123        value: impl IntoFuture<Output = T> + 'static,
124    ) -> Self {
125        #[cfg(feature = "subsecond")]
126        let mut fun = subsecond::HotFn::current(fun);
127        #[cfg(feature = "subsecond")]
128        let fun = move |prev| fun.call((prev,));
129
130        Self::new_with_async_value_erased(
131            Box::new(fun),
132            Box::pin(value.into_future()),
133        )
134        .await
135    }
136
137    fn new_with_value_erased(
138        #[allow(unused_mut)] mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
139        initial_value: Option<T>,
140        // this argument can be used to invalidate individual effects in the future
141        // in present experiments, I have found that it is not actually granular enough to make a difference
142        #[allow(unused)]
143        #[cfg(feature = "subsecond")]
144        hot_fn_ptr: CurrentHotPtr,
145    ) -> Self {
146        // codegen optimisation:
147        fn prep() -> (Owner, Arc<RwLock<EffectInner>>, crate::channel::Receiver)
148        {
149            let (observer, rx) = channel();
150            let owner = Owner::new();
151            let inner = Arc::new(RwLock::new(EffectInner {
152                dirty: false,
153                observer,
154                sources: SourceSet::new(),
155            }));
156            (owner, inner, rx)
157        }
158
159        let (owner, inner, mut rx) = prep();
160
161        let value = Arc::new(RwLock::new(None::<T>));
162
163        #[cfg(not(feature = "effects"))]
164        {
165            let _ = initial_value;
166            let _ = owner;
167            let _ = &mut rx;
168            let _ = fun;
169        }
170
171        #[cfg(feature = "effects")]
172        {
173            let subscriber = inner.to_any_subscriber();
174
175            #[cfg(all(feature = "subsecond", debug_assertions))]
176            let mut fun = {
177                use crate::graph::ReactiveNode;
178                use rustc_hash::FxHashMap;
179                use std::sync::{Arc, LazyLock, Mutex};
180                use subsecond::HotFnPtr;
181
182                static HOT_RELOAD_SUBSCRIBERS: LazyLock<
183                    Mutex<FxHashMap<AnySubscriber, (HotFnPtr, CurrentHotPtr)>>,
184                > = LazyLock::new(|| {
185                    subsecond::register_handler(Arc::new(|| {
186                        HOT_RELOAD_SUBSCRIBERS.lock().or_poisoned().retain(
187                            |subscriber, (prev_ptr, hot_fn_ptr)| {
188                                match hot_fn_ptr() {
189                                    None => false,
190                                    Some(curr_hot_ptr) => {
191                                        if curr_hot_ptr != *prev_ptr {
192                                            crate::log_warning(format_args!(
193                                                "{prev_ptr:?} <> \
194                                                 {curr_hot_ptr:?}",
195                                            ));
196                                            *prev_ptr = curr_hot_ptr;
197
198                                            subscriber.mark_dirty();
199                                        }
200                                        true
201                                    }
202                                }
203                            },
204                        );
205                    }));
206                    Default::default()
207                });
208
209                let mut fun = subsecond::HotFn::current(fun);
210                let initial_ptr = hot_fn_ptr().unwrap();
211                HOT_RELOAD_SUBSCRIBERS
212                    .lock()
213                    .or_poisoned()
214                    .insert(subscriber.clone(), (initial_ptr, hot_fn_ptr));
215                move |prev| fun.call((prev,))
216            };
217
218            *value.write().or_poisoned() = Some(
219                owner.with(|| subscriber.with_observer(|| fun(initial_value))),
220            );
221
222            any_spawner::Executor::spawn_local({
223                let value = Arc::clone(&value);
224
225                async move {
226                    while rx.next().await.is_some() {
227                        if !owner.paused()
228                            && subscriber.with_observer(|| {
229                                subscriber.update_if_necessary()
230                            })
231                        {
232                            subscriber.clear_sources(&subscriber);
233
234                            let old_value =
235                                mem::take(&mut *value.write().or_poisoned());
236                            let new_value = owner.with_cleanup(|| {
237                                subscriber.with_observer(|| fun(old_value))
238                            });
239                            *value.write().or_poisoned() = Some(new_value);
240                        }
241                    }
242                }
243            });
244        }
245
246        RenderEffect { value, inner }
247    }
248
249    async fn new_with_async_value_erased(
250        mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
251        initial_value: Pin<Box<dyn Future<Output = T>>>,
252    ) -> Self {
253        // codegen optimisation:
254        fn prep() -> (Owner, Arc<RwLock<EffectInner>>, crate::channel::Receiver)
255        {
256            let (observer, rx) = channel();
257            let owner = Owner::new();
258            let inner = Arc::new(RwLock::new(EffectInner {
259                dirty: false,
260                observer,
261                sources: SourceSet::new(),
262            }));
263            (owner, inner, rx)
264        }
265
266        let (owner, inner, mut rx) = prep();
267
268        let value = Arc::new(RwLock::new(None::<T>));
269
270        #[cfg(not(feature = "effects"))]
271        {
272            drop(initial_value);
273            let _ = owner;
274            let _ = &mut rx;
275            let _ = &mut fun;
276        }
277
278        #[cfg(feature = "effects")]
279        {
280            use crate::computed::ScopedFuture;
281
282            let subscriber = inner.to_any_subscriber();
283
284            let initial = subscriber
285                .with_observer(|| ScopedFuture::new(initial_value))
286                .await;
287            *value.write().or_poisoned() = Some(initial);
288
289            any_spawner::Executor::spawn_local({
290                let value = Arc::clone(&value);
291
292                async move {
293                    while rx.next().await.is_some() {
294                        if !owner.paused()
295                            && subscriber.with_observer(|| {
296                                subscriber.update_if_necessary()
297                            })
298                        {
299                            subscriber.clear_sources(&subscriber);
300
301                            let old_value =
302                                mem::take(&mut *value.write().or_poisoned());
303                            let new_value = owner.with_cleanup(|| {
304                                subscriber.with_observer(|| fun(old_value))
305                            });
306                            *value.write().or_poisoned() = Some(new_value);
307                        }
308                    }
309                }
310            });
311        }
312
313        RenderEffect { value, inner }
314    }
315
316    /// Mutably accesses the current value.
317    pub fn with_value_mut<U>(
318        &self,
319        fun: impl FnOnce(&mut T) -> U,
320    ) -> Option<U> {
321        self.value.write().or_poisoned().as_mut().map(fun)
322    }
323
324    /// Takes the current value, replacing it with `None`.
325    pub fn take_value(&self) -> Option<T> {
326        self.value.write().or_poisoned().take()
327    }
328}
329
330impl<T> RenderEffect<T>
331where
332    T: Send + Sync + 'static,
333{
334    /// Creates a render effect that will run whether the `effects` feature is enabled or not.
335    pub fn new_isomorphic(
336        fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
337    ) -> Self {
338        #[cfg(feature = "subsecond")]
339        let mut fun = subsecond::HotFn::current(fun);
340        #[cfg(feature = "subsecond")]
341        let fun = move |prev| fun.call((prev,));
342
343        fn erased<T: Send + Sync + 'static>(
344            mut fun: Box<dyn FnMut(Option<T>) -> T + Send + Sync + 'static>,
345        ) -> RenderEffect<T> {
346            let (observer, mut rx) = channel();
347            let value = Arc::new(RwLock::new(None::<T>));
348            let owner = Owner::new();
349            let inner = Arc::new(RwLock::new(EffectInner {
350                dirty: false,
351                observer,
352                sources: SourceSet::new(),
353            }));
354
355            let initial_value = owner
356                .with(|| inner.to_any_subscriber().with_observer(|| fun(None)));
357            *value.write().or_poisoned() = Some(initial_value);
358
359            crate::spawn({
360                let value = Arc::clone(&value);
361                let subscriber = inner.to_any_subscriber();
362
363                async move {
364                    while rx.next().await.is_some() {
365                        if !owner.paused()
366                            && subscriber.with_observer(|| {
367                                subscriber.update_if_necessary()
368                            })
369                        {
370                            subscriber.clear_sources(&subscriber);
371
372                            let old_value =
373                                mem::take(&mut *value.write().or_poisoned());
374                            let new_value = owner.with_cleanup(|| {
375                                subscriber.with_observer(|| fun(old_value))
376                            });
377                            *value.write().or_poisoned() = Some(new_value);
378                        }
379                    }
380                }
381            });
382
383            RenderEffect { value, inner }
384        }
385
386        erased(Box::new(fun))
387    }
388}
389
390impl<T> ToAnySubscriber for RenderEffect<T> {
391    fn to_any_subscriber(&self) -> AnySubscriber {
392        AnySubscriber(
393            Arc::as_ptr(&self.inner) as usize,
394            Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
395        )
396    }
397}