reactive_graph/effect/
render_effect.rs1use crate::{
2 channel::channel,
3 effect::inner::EffectInner,
4 graph::{
5 AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6 WithObserver,
7 },
8 owner::Owner,
9};
10use any_spawner::Executor;
11use futures::StreamExt;
12use or_poisoned::OrPoisoned;
13use std::{
14 fmt::Debug,
15 mem,
16 sync::{Arc, RwLock, Weak},
17};
18
19#[must_use = "A RenderEffect will be canceled when it is dropped. Creating a \
32 RenderEffect that is not stored in some other data structure or \
33 leaked will drop it immediately, and it will not react to \
34 changes in signals it reads."]
35pub struct RenderEffect<T>
36where
37 T: 'static,
38{
39 value: Arc<RwLock<Option<T>>>,
40 inner: Arc<RwLock<EffectInner>>,
41}
42
43impl<T> Debug for RenderEffect<T> {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("RenderEffect")
46 .field("inner", &Arc::as_ptr(&self.inner))
47 .finish()
48 }
49}
50
51impl<T> RenderEffect<T>
52where
53 T: 'static,
54{
55 pub fn new(fun: impl FnMut(Option<T>) -> T + 'static) -> Self {
57 Self::new_with_value(fun, None)
58 }
59
60 pub fn new_with_value(
62 fun: impl FnMut(Option<T>) -> T + 'static,
63 initial_value: Option<T>,
64 ) -> Self {
65 fn erased<T>(
66 mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
67 initial_value: Option<T>,
68 ) -> RenderEffect<T> {
69 let (observer, mut rx) = channel();
70 let value = Arc::new(RwLock::new(None::<T>));
71 let owner = Owner::new();
72 let inner = Arc::new(RwLock::new(EffectInner {
73 dirty: false,
74 observer,
75 sources: SourceSet::new(),
76 }));
77
78 let initial_value = cfg!(feature = "effects").then(|| {
79 owner.with(|| {
80 inner
81 .to_any_subscriber()
82 .with_observer(|| fun(initial_value))
83 })
84 });
85 *value.write().or_poisoned() = initial_value;
86
87 if cfg!(feature = "effects") {
88 Executor::spawn_local({
89 let value = Arc::clone(&value);
90 let subscriber = inner.to_any_subscriber();
91
92 async move {
93 while rx.next().await.is_some() {
94 if !owner.paused()
95 && subscriber.with_observer(|| {
96 subscriber.update_if_necessary()
97 })
98 {
99 subscriber.clear_sources(&subscriber);
100
101 let old_value = mem::take(
102 &mut *value.write().or_poisoned(),
103 );
104 let new_value = owner.with_cleanup(|| {
105 subscriber.with_observer(|| fun(old_value))
106 });
107 *value.write().or_poisoned() = Some(new_value);
108 }
109 }
110 }
111 });
112 }
113
114 RenderEffect { value, inner }
115 }
116
117 erased(Box::new(fun), initial_value)
118 }
119
120 pub fn with_value_mut<U>(
122 &self,
123 fun: impl FnOnce(&mut T) -> U,
124 ) -> Option<U> {
125 self.value.write().or_poisoned().as_mut().map(fun)
126 }
127
128 pub fn take_value(&self) -> Option<T> {
130 self.value.write().or_poisoned().take()
131 }
132}
133
134impl<T> RenderEffect<T>
135where
136 T: Send + Sync + 'static,
137{
138 pub fn new_isomorphic(
140 fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
141 ) -> Self {
142 fn erased<T: Send + Sync + 'static>(
143 mut fun: Box<dyn FnMut(Option<T>) -> T + Send + Sync + 'static>,
144 ) -> RenderEffect<T> {
145 let (observer, mut rx) = channel();
146 let value = Arc::new(RwLock::new(None::<T>));
147 let owner = Owner::new();
148 let inner = Arc::new(RwLock::new(EffectInner {
149 dirty: false,
150 observer,
151 sources: SourceSet::new(),
152 }));
153
154 let initial_value = owner
155 .with(|| inner.to_any_subscriber().with_observer(|| fun(None)));
156 *value.write().or_poisoned() = Some(initial_value);
157
158 crate::spawn({
159 let value = Arc::clone(&value);
160 let subscriber = inner.to_any_subscriber();
161
162 async move {
163 while rx.next().await.is_some() {
164 if !owner.paused()
165 && subscriber.with_observer(|| {
166 subscriber.update_if_necessary()
167 })
168 {
169 subscriber.clear_sources(&subscriber);
170
171 let old_value =
172 mem::take(&mut *value.write().or_poisoned());
173 let new_value = owner.with_cleanup(|| {
174 subscriber.with_observer(|| fun(old_value))
175 });
176 *value.write().or_poisoned() = Some(new_value);
177 }
178 }
179 }
180 });
181
182 RenderEffect { value, inner }
183 }
184
185 erased(Box::new(fun))
186 }
187}
188
189impl<T> ToAnySubscriber for RenderEffect<T> {
190 fn to_any_subscriber(&self) -> AnySubscriber {
191 AnySubscriber(
192 Arc::as_ptr(&self.inner) as usize,
193 Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
194 )
195 }
196}