1use super::{
2 inner::{ArcAsyncDerivedInner, AsyncDerivedState},
3 AsyncDerivedReadyFuture, ScopedFuture,
4};
5#[cfg(feature = "sandboxed-arenas")]
6use crate::owner::Sandboxed;
7use crate::{
8 channel::channel,
9 computed::suspense::SuspenseContext,
10 diagnostics::SpecialNonReactiveFuture,
11 graph::{
12 AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
13 SubscriberSet, ToAnySource, ToAnySubscriber, WithObserver,
14 },
15 owner::{use_context, Owner},
16 send_wrapper_ext::SendOption,
17 signal::{
18 guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
19 ArcTrigger,
20 },
21 traits::{
22 DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
23 Write,
24 },
25 transition::AsyncTransition,
26};
27use async_lock::RwLock as AsyncRwLock;
28use core::fmt::Debug;
29use futures::{channel::oneshot, FutureExt, StreamExt};
30use or_poisoned::OrPoisoned;
31use std::{
32 future::Future,
33 mem,
34 ops::{Deref, DerefMut},
35 panic::Location,
36 sync::{
37 atomic::{AtomicBool, Ordering},
38 Arc, RwLock, Weak,
39 },
40 task::Waker,
41};
42
43pub struct ArcAsyncDerived<T> {
109 #[cfg(any(debug_assertions, leptos_debuginfo))]
110 pub(crate) defined_at: &'static Location<'static>,
111 pub(crate) value: Arc<AsyncRwLock<SendOption<T>>>,
113 pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
115 pub(crate) inner: Arc<RwLock<ArcAsyncDerivedInner>>,
116 pub(crate) loading: Arc<AtomicBool>,
117}
118
119#[allow(dead_code)]
120pub(crate) trait BlockingLock<T> {
121 fn blocking_read_arc(self: &Arc<Self>)
122 -> async_lock::RwLockReadGuardArc<T>;
123
124 fn blocking_write_arc(
125 self: &Arc<Self>,
126 ) -> async_lock::RwLockWriteGuardArc<T>;
127
128 fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T>;
129
130 fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T>;
131}
132
133impl<T> BlockingLock<T> for AsyncRwLock<T> {
134 fn blocking_read_arc(
135 self: &Arc<Self>,
136 ) -> async_lock::RwLockReadGuardArc<T> {
137 #[cfg(not(target_family = "wasm"))]
138 {
139 self.read_arc_blocking()
140 }
141 #[cfg(target_family = "wasm")]
142 {
143 self.read_arc().now_or_never().unwrap()
144 }
145 }
146
147 fn blocking_write_arc(
148 self: &Arc<Self>,
149 ) -> async_lock::RwLockWriteGuardArc<T> {
150 #[cfg(not(target_family = "wasm"))]
151 {
152 self.write_arc_blocking()
153 }
154 #[cfg(target_family = "wasm")]
155 {
156 self.write_arc().now_or_never().unwrap()
157 }
158 }
159
160 fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T> {
161 #[cfg(not(target_family = "wasm"))]
162 {
163 self.read_blocking()
164 }
165 #[cfg(target_family = "wasm")]
166 {
167 self.read().now_or_never().unwrap()
168 }
169 }
170
171 fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T> {
172 #[cfg(not(target_family = "wasm"))]
173 {
174 self.write_blocking()
175 }
176 #[cfg(target_family = "wasm")]
177 {
178 self.write().now_or_never().unwrap()
179 }
180 }
181}
182
183impl<T> Clone for ArcAsyncDerived<T> {
184 fn clone(&self) -> Self {
185 Self {
186 #[cfg(any(debug_assertions, leptos_debuginfo))]
187 defined_at: self.defined_at,
188 value: Arc::clone(&self.value),
189 wakers: Arc::clone(&self.wakers),
190 inner: Arc::clone(&self.inner),
191 loading: Arc::clone(&self.loading),
192 }
193 }
194}
195
196impl<T> Debug for ArcAsyncDerived<T> {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 let mut f = f.debug_struct("ArcAsyncDerived");
199 #[cfg(any(debug_assertions, leptos_debuginfo))]
200 f.field("defined_at", &self.defined_at);
201 f.finish_non_exhaustive()
202 }
203}
204
205impl<T> DefinedAt for ArcAsyncDerived<T> {
206 #[inline(always)]
207 fn defined_at(&self) -> Option<&'static Location<'static>> {
208 #[cfg(any(debug_assertions, leptos_debuginfo))]
209 {
210 Some(self.defined_at)
211 }
212 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
213 {
214 None
215 }
216 }
217}
218
219macro_rules! spawn_derived {
224 ($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal, $force_spawn:literal, $should_track:literal, $source:expr) => {{
225 let (notifier, mut rx) = channel();
226
227 let is_ready = $initial.is_some() && !$force_spawn;
228
229 let owner = Owner::new();
230 let inner = Arc::new(RwLock::new(ArcAsyncDerivedInner {
231 owner: owner.clone(),
232 notifier,
233 sources: SourceSet::new(),
234 subscribers: SubscriberSet::new(),
235 state: AsyncDerivedState::Clean,
236 version: 0,
237 suspenses: Vec::new(),
238 pending_suspenses: Vec::new()
239 }));
240 let value = Arc::new(AsyncRwLock::new($initial));
241 let wakers = Arc::new(RwLock::new(Vec::new()));
242
243 let this = ArcAsyncDerived {
244 #[cfg(any(debug_assertions, leptos_debuginfo))]
245 defined_at: Location::caller(),
246 value: Arc::clone(&value),
247 wakers,
248 inner: Arc::clone(&inner),
249 loading: Arc::new(AtomicBool::new(!is_ready)),
250 };
251 let any_subscriber = this.to_any_subscriber();
252 let initial_fut = if $should_track {
253 owner.with_cleanup(|| {
254 any_subscriber
255 .with_observer(|| ScopedFuture::new($fun()))
256 })
257 } else {
258 owner.with_cleanup(|| {
259 any_subscriber
260 .with_observer_untracked(|| ScopedFuture::new($fun()))
261 })
262 };
263 #[cfg(feature = "sandboxed-arenas")]
264 let initial_fut = Sandboxed::new(initial_fut);
265 let mut initial_fut = Box::pin(initial_fut);
266
267 let (was_ready, mut initial_fut) = {
268 if is_ready {
269 (true, None)
270 } else {
271 let initial = initial_fut.as_mut().now_or_never();
274 match initial {
275 None => {
276 inner.write().or_poisoned().notifier.notify();
277 (false, Some(initial_fut))
278 }
279 Some(orig_value) => {
280 let mut guard = this.inner.write().or_poisoned();
281
282 guard.state = AsyncDerivedState::Clean;
283 *value.blocking_write() = orig_value;
284 this.loading.store(false, Ordering::Relaxed);
285 (true, None)
286 }
287 }
288 }
289 };
290
291 let mut first_run = {
292 let (ready_tx, ready_rx) = oneshot::channel();
293 if !was_ready {
294 AsyncTransition::register(ready_rx);
295 }
296 Some(ready_tx)
297 };
298
299 if was_ready {
300 first_run.take();
301 }
302
303 if let Some(source) = $source {
304 any_subscriber.with_observer(|| source.track());
305 }
306
307 if $should_spawn {
308 $spawner({
309 let value = Arc::downgrade(&this.value);
310 let inner = Arc::downgrade(&this.inner);
311 let wakers = Arc::downgrade(&this.wakers);
312 let loading = Arc::downgrade(&this.loading);
313 let fut = async move {
314 let already_dirty = inner.upgrade()
318 .as_ref()
319 .and_then(|inner| inner.read().ok())
320 .map(|inner| inner.state == AsyncDerivedState::Dirty)
321 .unwrap_or(false);
322 if already_dirty {
323 initial_fut.take();
324 }
325
326 while rx.next().await.is_some() {
327 let update_if_necessary = !owner.paused() && if $should_track {
328 any_subscriber
329 .with_observer(|| any_subscriber.update_if_necessary())
330 } else {
331 any_subscriber
332 .with_observer_untracked(|| any_subscriber.update_if_necessary())
333 };
334 if update_if_necessary || first_run.is_some() {
335 match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
336 (Some(value), Some(inner), Some(wakers), Some(loading)) => {
337 let owner = inner.read().or_poisoned().owner.clone();
339 let fut = initial_fut.take().unwrap_or_else(|| {
340 let fut = if $should_track {
341 owner.with_cleanup(|| {
342 any_subscriber
343 .with_observer(|| ScopedFuture::new($fun()))
344 })
345 } else {
346 owner.with_cleanup(|| {
347 any_subscriber
348 .with_observer_untracked(|| ScopedFuture::new($fun()))
349 })
350 };
351 #[cfg(feature = "sandboxed-arenas")]
352 let fut = Sandboxed::new(fut);
353 Box::pin(fut)
354 });
355
356 let ready_tx = first_run.take().unwrap_or_else(|| {
358 let (ready_tx, ready_rx) = oneshot::channel();
359 if !was_ready {
360 AsyncTransition::register(ready_rx);
361 }
362 ready_tx
363 });
364
365 loading.store(true, Ordering::Relaxed);
367
368 let this_version = {
369 let mut guard = inner.write().or_poisoned();
370 guard.version += 1;
371 let version = guard.version;
372 let suspense_ids = mem::take(&mut guard.suspenses)
373 .into_iter()
374 .map(|sc| sc.task_id())
375 .collect::<Vec<_>>();
376 guard.pending_suspenses.extend(suspense_ids);
377 version
378 };
379
380 let new_value = fut.await;
381
382 let latest_version = {
383 let mut guard = inner.write().or_poisoned();
384 drop(mem::take(&mut guard.pending_suspenses));
385 guard.version
386 };
387
388 if latest_version == this_version {
389 Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await;
390 }
391 }
392 _ => break,
393 }
394 }
395 }
396 };
397
398 #[cfg(feature = "sandboxed-arenas")]
399 let fut = Sandboxed::new(fut);
400
401 fut
402 });
403 }
404
405 (this, is_ready)
406 }};
407}
408
409impl<T: 'static> ArcAsyncDerived<T> {
410 async fn set_inner_value(
411 new_value: SendOption<T>,
412 value: Arc<AsyncRwLock<SendOption<T>>>,
413 wakers: Arc<RwLock<Vec<Waker>>>,
414 inner: Arc<RwLock<ArcAsyncDerivedInner>>,
415 loading: Arc<AtomicBool>,
416 ready_tx: Option<oneshot::Sender<()>>,
417 ) {
418 *value.write().await.deref_mut() = new_value;
419 Self::notify_subs(&wakers, &inner, &loading, ready_tx);
420 }
421
422 fn notify_subs(
423 wakers: &Arc<RwLock<Vec<Waker>>>,
424 inner: &Arc<RwLock<ArcAsyncDerivedInner>>,
425 loading: &Arc<AtomicBool>,
426 ready_tx: Option<oneshot::Sender<()>>,
427 ) {
428 loading.store(false, Ordering::Relaxed);
429
430 let prev_state = mem::replace(
431 &mut inner.write().or_poisoned().state,
432 AsyncDerivedState::Notifying,
433 );
434
435 if let Some(ready_tx) = ready_tx {
436 _ = ready_tx.send(());
441 }
442
443 for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
445 sub.mark_dirty();
446 }
447
448 for waker in mem::take(&mut *wakers.write().or_poisoned()) {
450 waker.wake();
451 }
452
453 inner.write().or_poisoned().state = prev_state;
457 }
458}
459
460impl<T: 'static> ArcAsyncDerived<T> {
461 #[track_caller]
466 pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
467 where
468 T: Send + Sync + 'static,
469 Fut: Future<Output = T> + Send + 'static,
470 {
471 Self::new_with_initial(None, fun)
472 }
473
474 #[track_caller]
477 pub fn new_with_initial<Fut>(
478 initial_value: Option<T>,
479 fun: impl Fn() -> Fut + Send + Sync + 'static,
480 ) -> Self
481 where
482 T: Send + Sync + 'static,
483 Fut: Future<Output = T> + Send + 'static,
484 {
485 let fun = move || {
486 let fut = fun();
487 let fut = async move { SendOption::new(Some(fut.await)) };
488 #[cfg(feature = "sandboxed-arenas")]
489 let fut = Sandboxed::new(fut);
490 fut
491 };
492 let initial_value = SendOption::new(initial_value);
493 let (this, _) = spawn_derived!(
494 crate::spawn,
495 initial_value,
496 fun,
497 true,
498 true,
499 true,
500 None::<ArcTrigger>
501 );
502 this
503 }
504
505 #[doc(hidden)]
511 #[track_caller]
512 pub fn new_with_manual_dependencies<Fut, S>(
513 initial_value: Option<T>,
514 fun: impl Fn() -> Fut + Send + Sync + 'static,
515 source: &S,
516 ) -> Self
517 where
518 T: Send + Sync + 'static,
519 Fut: Future<Output = T> + Send + 'static,
520 S: Track,
521 {
522 let fun = move || {
523 let fut = fun();
524 let fut = ScopedFuture::new_untracked(async move {
525 SendOption::new(Some(fut.await))
526 });
527 #[cfg(feature = "sandboxed-arenas")]
528 let fut = Sandboxed::new(fut);
529 fut
530 };
531 let initial_value = SendOption::new(initial_value);
532 let (this, _) = spawn_derived!(
533 crate::spawn,
534 initial_value,
535 fun,
536 true,
537 false,
538 false,
539 Some(source)
540 );
541 this
542 }
543
544 #[track_caller]
550 pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
551 where
552 T: 'static,
553 Fut: Future<Output = T> + 'static,
554 {
555 Self::new_unsync_with_initial(None, fun)
556 }
557
558 #[track_caller]
561 pub fn new_unsync_with_initial<Fut>(
562 initial_value: Option<T>,
563 fun: impl Fn() -> Fut + 'static,
564 ) -> Self
565 where
566 T: 'static,
567 Fut: Future<Output = T> + 'static,
568 {
569 let fun = move || {
570 let fut = fun();
571 let fut = async move { SendOption::new_local(Some(fut.await)) };
572 #[cfg(feature = "sandboxed-arenas")]
573 let fut = Sandboxed::new(fut);
574 fut
575 };
576 let initial_value = SendOption::new_local(initial_value);
577 let (this, _) = spawn_derived!(
578 crate::spawn_local,
579 initial_value,
580 fun,
581 true,
582 true,
583 true,
584 None::<ArcTrigger>
585 );
586 this
587 }
588
589 pub fn ready(&self) -> AsyncDerivedReadyFuture {
591 AsyncDerivedReadyFuture::new(
592 self.to_any_source(),
593 &self.loading,
594 &self.wakers,
595 )
596 }
597}
598
599impl<T: 'static> ArcAsyncDerived<T> {
600 #[doc(hidden)]
601 #[track_caller]
602 pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
603 where
604 T: 'static,
605 Fut: Future<Output = T> + 'static,
606 {
607 let initial = SendOption::new_local(None::<T>);
608 let fun = move || {
609 let fut = fun();
610 let fut = async move { SendOption::new_local(Some(fut.await)) };
611 #[cfg(feature = "sandboxed-arenas")]
612 let fut = Sandboxed::new(fut);
613 fut
614 };
615 let (this, _) = spawn_derived!(
616 crate::spawn_local,
617 initial,
618 fun,
619 false,
620 false,
621 true,
622 None::<ArcTrigger>
623 );
624 this
625 }
626}
627
628impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
629 type Value =
630 ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
631
632 fn try_read_untracked(&self) -> Option<Self::Value> {
633 if let Some(suspense_context) = use_context::<SuspenseContext>() {
634 let handle = suspense_context.task_id();
635 let ready = SpecialNonReactiveFuture::new(self.ready());
636 crate::spawn(async move {
637 ready.await;
638 drop(handle);
639 });
640 self.inner
641 .write()
642 .or_poisoned()
643 .suspenses
644 .push(suspense_context);
645 }
646 AsyncPlain::try_new(&self.value).map(|plain| {
647 ReadGuard::new(Mapped::new_with_guard(plain, |v| v.deref()))
648 })
649 }
650}
651
652impl<T: 'static> Notify for ArcAsyncDerived<T> {
653 fn notify(&self) {
654 Self::notify_subs(&self.wakers, &self.inner, &self.loading, None);
655 }
656}
657
658impl<T: 'static> Write for ArcAsyncDerived<T> {
659 type Value = Option<T>;
660
661 fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
662 let mut guard = self.inner.write().or_poisoned();
665 guard.version += 1;
666
667 drop(mem::take(&mut guard.pending_suspenses));
669
670 Some(MappedMut::new(
671 WriteGuard::new(self.clone(), self.value.blocking_write()),
672 |v| v.deref(),
673 |v| v.deref_mut(),
674 ))
675 }
676
677 fn try_write_untracked(
678 &self,
679 ) -> Option<impl DerefMut<Target = Self::Value>> {
680 let mut guard = self.inner.write().or_poisoned();
683 guard.version += 1;
684
685 drop(mem::take(&mut guard.pending_suspenses));
687
688 Some(MappedMut::new(
689 self.value.blocking_write(),
690 |v| v.deref(),
691 |v| v.deref_mut(),
692 ))
693 }
694}
695
696impl<T: 'static> IsDisposed for ArcAsyncDerived<T> {
697 #[inline(always)]
698 fn is_disposed(&self) -> bool {
699 false
700 }
701}
702
703impl<T: 'static> ToAnySource for ArcAsyncDerived<T> {
704 fn to_any_source(&self) -> AnySource {
705 AnySource(
706 Arc::as_ptr(&self.inner) as usize,
707 Arc::downgrade(&self.inner) as Weak<dyn Source + Send + Sync>,
708 #[cfg(any(debug_assertions, leptos_debuginfo))]
709 self.defined_at,
710 )
711 }
712}
713
714impl<T: 'static> ToAnySubscriber for ArcAsyncDerived<T> {
715 fn to_any_subscriber(&self) -> AnySubscriber {
716 AnySubscriber(
717 Arc::as_ptr(&self.inner) as usize,
718 Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
719 )
720 }
721}
722
723impl<T> Source for ArcAsyncDerived<T> {
724 fn add_subscriber(&self, subscriber: AnySubscriber) {
725 self.inner.add_subscriber(subscriber);
726 }
727
728 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
729 self.inner.remove_subscriber(subscriber);
730 }
731
732 fn clear_subscribers(&self) {
733 self.inner.clear_subscribers();
734 }
735}
736
737impl<T> ReactiveNode for ArcAsyncDerived<T> {
738 fn mark_dirty(&self) {
739 self.inner.mark_dirty();
740 }
741
742 fn mark_check(&self) {
743 self.inner.mark_check();
744 }
745
746 fn mark_subscribers_check(&self) {
747 self.inner.mark_subscribers_check();
748 }
749
750 fn update_if_necessary(&self) -> bool {
751 self.inner.update_if_necessary()
752 }
753}
754
755impl<T> Subscriber for ArcAsyncDerived<T> {
756 fn add_source(&self, source: AnySource) {
757 self.inner.add_source(source);
758 }
759
760 fn clear_sources(&self, subscriber: &AnySubscriber) {
761 self.inner.clear_sources(subscriber);
762 }
763}