1use crate::{
2 channel::channel,
3 effect::inner::EffectInner,
4 graph::{
5 AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
6 WithObserver,
7 },
8 owner::Owner,
9};
10use futures::StreamExt;
11use or_poisoned::OrPoisoned;
12#[cfg(feature = "subsecond")]
13use std::sync::Mutex;
14use std::{
15 fmt::Debug,
16 future::{Future, IntoFuture},
17 mem,
18 pin::Pin,
19 sync::{Arc, RwLock, Weak},
20};
21
22#[must_use = "A RenderEffect will be canceled when it is dropped. Creating a \
35 RenderEffect that is not stored in some other data structure or \
36 leaked will drop it immediately, and it will not react to \
37 changes in signals it reads."]
38pub struct RenderEffect<T>
39where
40 T: 'static,
41{
42 value: Arc<RwLock<Option<T>>>,
43 inner: Arc<RwLock<EffectInner>>,
44}
45
46impl<T> Debug for RenderEffect<T> {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("RenderEffect")
49 .field("inner", &Arc::as_ptr(&self.inner))
50 .finish()
51 }
52}
53
/// Boxed, thread-safe getter that reports the current hot-reload function
/// pointer, or `None` when there is none to report.
#[cfg(feature = "subsecond")]
type CurrentHotPtr =
    Box<dyn Fn() -> Option<subsecond::HotFnPtr> + Sync + Send>;
56
57impl<T> RenderEffect<T>
58where
59 T: 'static,
60{
61 pub fn new(fun: impl FnMut(Option<T>) -> T + 'static) -> Self {
63 #[cfg(feature = "subsecond")]
64 let (hot_fn_ptr, fun) = {
65 let fun = Arc::new(Mutex::new(subsecond::HotFn::current(fun)));
66 (
67 {
68 let fun = Arc::downgrade(&fun);
69 let wrapped = send_wrapper::SendWrapper::new(move || {
70 fun.upgrade()
71 .map(|n| n.lock().or_poisoned().ptr_address())
72 });
73 #[allow(clippy::redundant_closure)]
75 Box::new(move || wrapped())
76 },
77 move |prev| fun.lock().or_poisoned().call((prev,)),
78 )
79 };
80
81 Self::new_with_value_erased(
82 Box::new(fun),
83 None,
84 #[cfg(feature = "subsecond")]
85 hot_fn_ptr,
86 )
87 }
88
89 pub fn new_with_value(
91 fun: impl FnMut(Option<T>) -> T + 'static,
92 initial_value: Option<T>,
93 ) -> Self {
94 #[cfg(feature = "subsecond")]
95 let (hot_fn_ptr, fun) = {
96 let fun = Arc::new(Mutex::new(subsecond::HotFn::current(fun)));
97 (
98 {
99 let fun = Arc::downgrade(&fun);
100 let wrapped = send_wrapper::SendWrapper::new(move || {
101 fun.upgrade()
102 .map(|n| n.lock().or_poisoned().ptr_address())
103 });
104 #[allow(clippy::redundant_closure)]
106 Box::new(move || wrapped())
107 },
108 move |prev| fun.lock().or_poisoned().call((prev,)),
109 )
110 };
111
112 Self::new_with_value_erased(
113 Box::new(fun),
114 initial_value,
115 #[cfg(feature = "subsecond")]
116 hot_fn_ptr,
117 )
118 }
119
120 pub async fn new_with_async_value(
122 fun: impl FnMut(Option<T>) -> T + 'static,
123 value: impl IntoFuture<Output = T> + 'static,
124 ) -> Self {
125 #[cfg(feature = "subsecond")]
126 let mut fun = subsecond::HotFn::current(fun);
127 #[cfg(feature = "subsecond")]
128 let fun = move |prev| fun.call((prev,));
129
130 Self::new_with_async_value_erased(
131 Box::new(fun),
132 Box::pin(value.into_future()),
133 )
134 .await
135 }
136
137 fn new_with_value_erased(
138 #[allow(unused_mut)] mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
139 initial_value: Option<T>,
140 #[allow(unused)]
143 #[cfg(feature = "subsecond")]
144 hot_fn_ptr: CurrentHotPtr,
145 ) -> Self {
146 fn prep() -> (Owner, Arc<RwLock<EffectInner>>, crate::channel::Receiver)
148 {
149 let (observer, rx) = channel();
150 let owner = Owner::new();
151 let inner = Arc::new(RwLock::new(EffectInner {
152 dirty: false,
153 observer,
154 sources: SourceSet::new(),
155 }));
156 (owner, inner, rx)
157 }
158
159 let (owner, inner, mut rx) = prep();
160
161 let value = Arc::new(RwLock::new(None::<T>));
162
163 #[cfg(not(feature = "effects"))]
164 {
165 let _ = initial_value;
166 let _ = owner;
167 let _ = &mut rx;
168 let _ = fun;
169 }
170
171 #[cfg(feature = "effects")]
172 {
173 let subscriber = inner.to_any_subscriber();
174
175 #[cfg(all(feature = "subsecond", debug_assertions))]
176 let mut fun = {
177 use crate::graph::ReactiveNode;
178 use rustc_hash::FxHashMap;
179 use std::sync::{Arc, LazyLock, Mutex};
180 use subsecond::HotFnPtr;
181
182 static HOT_RELOAD_SUBSCRIBERS: LazyLock<
183 Mutex<FxHashMap<AnySubscriber, (HotFnPtr, CurrentHotPtr)>>,
184 > = LazyLock::new(|| {
185 subsecond::register_handler(Arc::new(|| {
186 HOT_RELOAD_SUBSCRIBERS.lock().or_poisoned().retain(
187 |subscriber, (prev_ptr, hot_fn_ptr)| {
188 match hot_fn_ptr() {
189 None => false,
190 Some(curr_hot_ptr) => {
191 if curr_hot_ptr != *prev_ptr {
192 crate::log_warning(format_args!(
193 "{prev_ptr:?} <> \
194 {curr_hot_ptr:?}",
195 ));
196 *prev_ptr = curr_hot_ptr;
197
198 subscriber.mark_dirty();
199 }
200 true
201 }
202 }
203 },
204 );
205 }));
206 Default::default()
207 });
208
209 let mut fun = subsecond::HotFn::current(fun);
210 let initial_ptr = hot_fn_ptr().unwrap();
211 HOT_RELOAD_SUBSCRIBERS
212 .lock()
213 .or_poisoned()
214 .insert(subscriber.clone(), (initial_ptr, hot_fn_ptr));
215 move |prev| fun.call((prev,))
216 };
217
218 *value.write().or_poisoned() = Some(
219 owner.with(|| subscriber.with_observer(|| fun(initial_value))),
220 );
221
222 any_spawner::Executor::spawn_local({
223 let value = Arc::clone(&value);
224
225 async move {
226 while rx.next().await.is_some() {
227 if !owner.paused()
228 && subscriber.with_observer(|| {
229 subscriber.update_if_necessary()
230 })
231 {
232 subscriber.clear_sources(&subscriber);
233
234 let old_value =
235 mem::take(&mut *value.write().or_poisoned());
236 let new_value = owner.with_cleanup(|| {
237 subscriber.with_observer(|| fun(old_value))
238 });
239 *value.write().or_poisoned() = Some(new_value);
240 }
241 }
242 }
243 });
244 }
245
246 RenderEffect { value, inner }
247 }
248
249 async fn new_with_async_value_erased(
250 mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
251 initial_value: Pin<Box<dyn Future<Output = T>>>,
252 ) -> Self {
253 fn prep() -> (Owner, Arc<RwLock<EffectInner>>, crate::channel::Receiver)
255 {
256 let (observer, rx) = channel();
257 let owner = Owner::new();
258 let inner = Arc::new(RwLock::new(EffectInner {
259 dirty: false,
260 observer,
261 sources: SourceSet::new(),
262 }));
263 (owner, inner, rx)
264 }
265
266 let (owner, inner, mut rx) = prep();
267
268 let value = Arc::new(RwLock::new(None::<T>));
269
270 #[cfg(not(feature = "effects"))]
271 {
272 drop(initial_value);
273 let _ = owner;
274 let _ = &mut rx;
275 let _ = &mut fun;
276 }
277
278 #[cfg(feature = "effects")]
279 {
280 use crate::computed::ScopedFuture;
281
282 let subscriber = inner.to_any_subscriber();
283
284 let initial = subscriber
285 .with_observer(|| ScopedFuture::new(initial_value))
286 .await;
287 *value.write().or_poisoned() = Some(initial);
288
289 any_spawner::Executor::spawn_local({
290 let value = Arc::clone(&value);
291
292 async move {
293 while rx.next().await.is_some() {
294 if !owner.paused()
295 && subscriber.with_observer(|| {
296 subscriber.update_if_necessary()
297 })
298 {
299 subscriber.clear_sources(&subscriber);
300
301 let old_value =
302 mem::take(&mut *value.write().or_poisoned());
303 let new_value = owner.with_cleanup(|| {
304 subscriber.with_observer(|| fun(old_value))
305 });
306 *value.write().or_poisoned() = Some(new_value);
307 }
308 }
309 }
310 });
311 }
312
313 RenderEffect { value, inner }
314 }
315
316 pub fn with_value_mut<U>(
318 &self,
319 fun: impl FnOnce(&mut T) -> U,
320 ) -> Option<U> {
321 self.value.write().or_poisoned().as_mut().map(fun)
322 }
323
324 pub fn take_value(&self) -> Option<T> {
326 self.value.write().or_poisoned().take()
327 }
328}
329
330impl<T> RenderEffect<T>
331where
332 T: Send + Sync + 'static,
333{
334 pub fn new_isomorphic(
336 fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
337 ) -> Self {
338 #[cfg(feature = "subsecond")]
339 let mut fun = subsecond::HotFn::current(fun);
340 #[cfg(feature = "subsecond")]
341 let fun = move |prev| fun.call((prev,));
342
343 fn erased<T: Send + Sync + 'static>(
344 mut fun: Box<dyn FnMut(Option<T>) -> T + Send + Sync + 'static>,
345 ) -> RenderEffect<T> {
346 let (observer, mut rx) = channel();
347 let value = Arc::new(RwLock::new(None::<T>));
348 let owner = Owner::new();
349 let inner = Arc::new(RwLock::new(EffectInner {
350 dirty: false,
351 observer,
352 sources: SourceSet::new(),
353 }));
354
355 let initial_value = owner
356 .with(|| inner.to_any_subscriber().with_observer(|| fun(None)));
357 *value.write().or_poisoned() = Some(initial_value);
358
359 crate::spawn({
360 let value = Arc::clone(&value);
361 let subscriber = inner.to_any_subscriber();
362
363 async move {
364 while rx.next().await.is_some() {
365 if !owner.paused()
366 && subscriber.with_observer(|| {
367 subscriber.update_if_necessary()
368 })
369 {
370 subscriber.clear_sources(&subscriber);
371
372 let old_value =
373 mem::take(&mut *value.write().or_poisoned());
374 let new_value = owner.with_cleanup(|| {
375 subscriber.with_observer(|| fun(old_value))
376 });
377 *value.write().or_poisoned() = Some(new_value);
378 }
379 }
380 }
381 });
382
383 RenderEffect { value, inner }
384 }
385
386 erased(Box::new(fun))
387 }
388}
389
390impl<T> ToAnySubscriber for RenderEffect<T> {
391 fn to_any_subscriber(&self) -> AnySubscriber {
392 AnySubscriber(
393 Arc::as_ptr(&self.inner) as usize,
394 Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
395 )
396 }
397}