use super::{
inner::{ArcAsyncDerivedInner, AsyncDerivedState},
AsyncDerivedReadyFuture, ScopedFuture,
};
#[cfg(feature = "sandboxed-arenas")]
use crate::owner::Sandboxed;
use crate::{
channel::channel,
computed::suspense::SuspenseContext,
diagnostics::SpecialNonReactiveFuture,
graph::{
AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
SubscriberSet, ToAnySource, ToAnySubscriber, WithObserver,
},
owner::{use_context, Owner},
send_wrapper_ext::SendOption,
signal::{
guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
ArcTrigger,
},
traits::{
DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
Write,
},
transition::AsyncTransition,
};
use async_lock::RwLock as AsyncRwLock;
use core::fmt::Debug;
use futures::{channel::oneshot, FutureExt, StreamExt};
use or_poisoned::OrPoisoned;
use std::{
future::Future,
mem,
ops::{Deref, DerefMut},
panic::Location,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock, Weak,
},
task::Waker,
};
/// A reference-counted handle to an asynchronously derived reactive value.
///
/// Every field is held behind an [`Arc`], so cloning the handle shares the
/// same underlying state instead of copying it.
pub struct ArcAsyncDerived<T> {
/// Source location captured when the value was constructed. Only present in
/// debug builds or when `leptos_debuginfo` is set.
#[cfg(any(debug_assertions, leptos_debuginfo))]
pub(crate) defined_at: &'static Location<'static>,
/// The most recently stored value, guarded by an async-aware read/write lock.
pub(crate) value: Arc<AsyncRwLock<SendOption<T>>>,
/// Wakers that are drained and woken each time subscribers are notified.
pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
/// Shared reactive-graph state (owner, notifier, sources, subscribers,
/// state, version and suspense bookkeeping).
pub(crate) inner: Arc<RwLock<ArcAsyncDerivedInner>>,
/// Set to `true` before a new value is awaited and reset to `false` when
/// subscribers are notified.
pub(crate) loading: Arc<AtomicBool>,
}
/// Synchronous acquisition of an async read/write lock.
///
/// Implemented in this file for [`AsyncRwLock`]; see that impl for how the
/// guard is obtained on wasm versus other targets.
// Not every method is used on every target/feature combination, hence the
// `dead_code` allowance.
#[allow(dead_code)]
pub(crate) trait BlockingLock<T> {
/// Acquires a read guard that owns a clone of the `Arc` holding the lock.
fn blocking_read_arc(self: &Arc<Self>)
-> async_lock::RwLockReadGuardArc<T>;
/// Acquires a write guard that owns a clone of the `Arc` holding the lock.
fn blocking_write_arc(
self: &Arc<Self>,
) -> async_lock::RwLockWriteGuardArc<T>;
/// Acquires a read guard borrowing the lock.
fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T>;
/// Acquires a write guard borrowing the lock.
fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T>;
}
/// Synchronous lock acquisition for [`AsyncRwLock`].
///
/// On non-wasm targets each method delegates to the lock's own `*_blocking`
/// call. On wasm the async acquisition future is polled once via
/// `now_or_never()` and the result is unwrapped, so the call panics if the
/// lock cannot be taken immediately.
impl<T> BlockingLock<T> for AsyncRwLock<T> {
    fn blocking_read_arc(
        self: &Arc<Self>,
    ) -> async_lock::RwLockReadGuardArc<T> {
        #[cfg(not(target_family = "wasm"))]
        let guard = self.read_arc_blocking();
        #[cfg(target_family = "wasm")]
        let guard = self.read_arc().now_or_never().unwrap();
        guard
    }

    fn blocking_write_arc(
        self: &Arc<Self>,
    ) -> async_lock::RwLockWriteGuardArc<T> {
        #[cfg(not(target_family = "wasm"))]
        let guard = self.write_arc_blocking();
        #[cfg(target_family = "wasm")]
        let guard = self.write_arc().now_or_never().unwrap();
        guard
    }

    fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T> {
        #[cfg(not(target_family = "wasm"))]
        let guard = self.read_blocking();
        #[cfg(target_family = "wasm")]
        let guard = self.read().now_or_never().unwrap();
        guard
    }

    fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T> {
        #[cfg(not(target_family = "wasm"))]
        let guard = self.write_blocking();
        #[cfg(target_family = "wasm")]
        let guard = self.write().now_or_never().unwrap();
        guard
    }
}
/// Cloning produces another handle to the same shared state: each `Arc`
/// field is cloned, so no `T: Clone` bound is required.
impl<T> Clone for ArcAsyncDerived<T> {
    fn clone(&self) -> Self {
        // Destructure so that a newly added field cannot be silently left out.
        let Self {
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at,
            value,
            wakers,
            inner,
            loading,
        } = self;

        Self {
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: *defined_at,
            value: Arc::clone(value),
            wakers: Arc::clone(wakers),
            inner: Arc::clone(inner),
            loading: Arc::clone(loading),
        }
    }
}
/// Debug output names the type and, when debug info is compiled in, the
/// location at which the value was defined. All other fields are elided.
impl<T> Debug for ArcAsyncDerived<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ArcAsyncDerived");
        #[cfg(any(debug_assertions, leptos_debuginfo))]
        {
            builder.field("defined_at", &self.defined_at);
        }
        builder.finish_non_exhaustive()
    }
}
impl<T> DefinedAt for ArcAsyncDerived<T> {
    /// Returns the recorded creation site, or `None` when the build carries
    /// no debug info (neither `debug_assertions` nor `leptos_debuginfo`).
    #[inline(always)]
    fn defined_at(&self) -> Option<&'static Location<'static>> {
        #[cfg(any(debug_assertions, leptos_debuginfo))]
        let location = Some(self.defined_at);
        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
        let location = None;
        location
    }
}
// Builds an `ArcAsyncDerived` and, optionally, the task that keeps it up to
// date. Evaluates to the tuple `(this, is_ready)`.
//
// Arguments:
// - `$spawner`: callable that receives the update-loop future (only used when
//   `$should_spawn` is true).
// - `$initial`: identifier bound to the initial `SendOption` value.
// - `$fun`: identifier bound to a closure that returns the future to run.
// - `$should_spawn`: whether the update loop is handed to `$spawner` at all.
// - `$force_spawn`: when true, the value is never treated as ready up front,
//   even if `$initial` already holds a value.
// - `$should_track`: selects `with_observer` (true) or
//   `with_observer_untracked` (false) around `$fun()` and
//   `update_if_necessary()`.
// - `$source`: an `Option`; when `Some`, its `track()` is called with this
//   node as the observer.
macro_rules! spawn_derived {
($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal, $force_spawn:literal, $should_track:literal, $source:expr) => {{
// The sending half is stored in `inner`; the receiving half drives the loop below.
let (notifier, mut rx) = channel();
// Ready up front only if an initial value exists and a run is not forced.
let is_ready = $initial.is_some() && !$force_spawn;
let owner = Owner::new();
let inner = Arc::new(RwLock::new(ArcAsyncDerivedInner {
owner: owner.clone(),
notifier,
sources: SourceSet::new(),
subscribers: SubscriberSet::new(),
state: AsyncDerivedState::Clean,
version: 0,
suspenses: Vec::new(),
pending_suspenses: Vec::new()
}));
let value = Arc::new(AsyncRwLock::new($initial));
let wakers = Arc::new(RwLock::new(Vec::new()));
let this = ArcAsyncDerived {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
value: Arc::clone(&value),
wakers,
inner: Arc::clone(&inner),
loading: Arc::new(AtomicBool::new(!is_ready)),
};
let any_subscriber = this.to_any_subscriber();
// Call `$fun` once, with this node as the observer, to create the first future.
// This happens even when `is_ready` is true; in that case the future is never polled here.
let initial_fut = if $should_track {
owner.with_cleanup(|| {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
})
} else {
owner.with_cleanup(|| {
any_subscriber
.with_observer_untracked(|| ScopedFuture::new($fun()))
})
};
#[cfg(feature = "sandboxed-arenas")]
let initial_fut = Sandboxed::new(initial_fut);
let mut initial_fut = Box::pin(initial_fut);
// Unless already ready, poll the first future exactly once so that a value
// available synchronously is stored before this macro returns.
let (was_ready, mut initial_fut) = {
if is_ready {
(true, None)
} else {
let initial = initial_fut.as_mut().now_or_never();
match initial {
None => {
// Still pending: send a notification and keep the future for the update loop.
inner.write().or_poisoned().notifier.notify();
(false, Some(initial_fut))
}
Some(orig_value) => {
// Resolved immediately: store the value and clear the loading flag.
let mut guard = this.inner.write().or_poisoned();
guard.state = AsyncDerivedState::Clean;
*value.blocking_write() = orig_value;
this.loading.store(false, Ordering::Relaxed);
(true, None)
}
}
}
};
// Sender used to signal completion of the first run. Its receiver is
// registered with `AsyncTransition` only when the value was not ready.
let mut first_run = {
let (ready_tx, ready_rx) = oneshot::channel();
if !was_ready {
AsyncTransition::register(ready_rx);
}
Some(ready_tx)
};
// Already ready: there is no first run to signal, so drop the sender.
if was_ready {
first_run.take();
}
// Explicitly track the manually supplied source, if any.
if let Some(source) = $source {
any_subscriber.with_observer(|| source.track());
}
if $should_spawn {
$spawner({
// The task holds only weak references, so it does not keep the
// derived value's state alive by itself.
let value = Arc::downgrade(&this.value);
let inner = Arc::downgrade(&this.inner);
let wakers = Arc::downgrade(&this.wakers);
let loading = Arc::downgrade(&this.loading);
let fut = async move {
// If the node is already `Dirty` when the task starts, discard the
// future created above; the loop then builds a fresh one from `$fun`.
let already_dirty = inner.upgrade()
.as_ref()
.and_then(|inner| inner.read().ok())
.map(|inner| inner.state == AsyncDerivedState::Dirty)
.unwrap_or(false);
if already_dirty {
initial_fut.take();
}
// One iteration per notification received on the channel.
while rx.next().await.is_some() {
// The graph check is skipped entirely while the owner is paused.
let update_if_necessary = !owner.paused() && if $should_track {
any_subscriber
.with_observer(|| any_subscriber.update_if_necessary())
} else {
any_subscriber
.with_observer_untracked(|| any_subscriber.update_if_necessary())
};
// Run when the graph reports an update, or when the first run is still outstanding.
if update_if_necessary || first_run.is_some() {
// Upgrade the weak references; if any of them is gone, leave the loop.
match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
(Some(value), Some(inner), Some(wakers), Some(loading)) => {
let owner = inner.read().or_poisoned().owner.clone();
// Reuse the initial future if it is still present; otherwise create a new one.
let fut = initial_fut.take().unwrap_or_else(|| {
let fut = if $should_track {
owner.with_cleanup(|| {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
})
} else {
owner.with_cleanup(|| {
any_subscriber
.with_observer_untracked(|| ScopedFuture::new($fun()))
})
};
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
Box::pin(fut)
});
// Reuse the first-run sender if it is still present; otherwise
// create a new completion channel for this run.
let ready_tx = first_run.take().unwrap_or_else(|| {
let (ready_tx, ready_rx) = oneshot::channel();
if !was_ready {
AsyncTransition::register(ready_rx);
}
ready_tx
});
loading.store(true, Ordering::Relaxed);
// Bump the version and turn the registered suspense contexts into
// pending task ids for the duration of this run.
let this_version = {
let mut guard = inner.write().or_poisoned();
guard.version += 1;
let version = guard.version;
let suspense_ids = mem::take(&mut guard.suspenses)
.into_iter()
.map(|sc| sc.task_id())
.collect::<Vec<_>>();
guard.pending_suspenses.extend(suspense_ids);
version
};
let new_value = fut.await;
// Drop the pending suspense ids and read the version as it is now.
let latest_version = {
let mut guard = inner.write().or_poisoned();
drop(mem::take(&mut guard.pending_suspenses));
guard.version
};
// Store the result only if the version has not changed since this run
// began; otherwise the result is discarded.
if latest_version == this_version {
Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await;
}
}
_ => break,
}
}
}
};
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
fut
});
}
(this, is_ready)
}};
}
impl<T: 'static> ArcAsyncDerived<T> {
    /// Stores `new_value` and then notifies everything waiting on it.
    ///
    /// The async write lock on `value` is released at the end of the
    /// assignment statement, before any subscriber or waker is notified.
    /// `ready_tx`, if present, is forwarded to [`Self::notify_subs`].
    async fn set_inner_value(
        new_value: SendOption<T>,
        value: Arc<AsyncRwLock<SendOption<T>>>,
        wakers: Arc<RwLock<Vec<Waker>>>,
        inner: Arc<RwLock<ArcAsyncDerivedInner>>,
        loading: Arc<AtomicBool>,
        ready_tx: Option<oneshot::Sender<()>>,
    ) {
        *value.write().await.deref_mut() = new_value;
        Self::notify_subs(&wakers, &inner, &loading, ready_tx);
    }

    /// Clears the loading flag and notifies reactive subscribers and
    /// registered wakers.
    ///
    /// While notifying, `inner.state` is temporarily set to
    /// `AsyncDerivedState::Notifying`; the previous state is restored before
    /// returning. If `ready_tx` is `Some`, a unit message is sent on it.
    fn notify_subs(
        wakers: &Arc<RwLock<Vec<Waker>>>,
        inner: &Arc<RwLock<ArcAsyncDerivedInner>>,
        loading: &Arc<AtomicBool>,
        ready_tx: Option<oneshot::Sender<()>>,
    ) {
        loading.store(false, Ordering::Relaxed);

        let prev_state = mem::replace(
            &mut inner.write().or_poisoned().state,
            AsyncDerivedState::Notifying,
        );

        if let Some(ready_tx) = ready_tx {
            // A send error only means the receiver is gone; nothing to do.
            _ = ready_tx.send(());
        }

        for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
            sub.mark_dirty();
        }

        // Take the waker list in its own statement so the write lock on
        // `wakers` is released before any waker runs. Written inline in the
        // `for` header, the temporary guard would live for the whole loop,
        // and a waker that re-entered and touched `wakers` would deadlock.
        let pending_wakers = mem::take(&mut *wakers.write().or_poisoned());
        for waker in pending_wakers {
            waker.wake();
        }

        inner.write().or_poisoned().state = prev_state;
    }
}
impl<T: 'static> ArcAsyncDerived<T> {
    /// Creates an async derived value with no initial value.
    ///
    /// Shorthand for [`Self::new_with_initial`] with `None`.
    #[track_caller]
    pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
    where
        T: Send + Sync + 'static,
        Fut: Future<Output = T> + Send + 'static,
    {
        Self::new_with_initial(None, fun)
    }

    /// Creates an async derived value that starts out holding
    /// `initial_value`.
    ///
    /// `fun` is called during construction and its future is polled once
    /// right away, whether or not an initial value was supplied; the update
    /// task is spawned with `crate::spawn`.
    #[track_caller]
    pub fn new_with_initial<Fut>(
        initial_value: Option<T>,
        fun: impl Fn() -> Fut + Send + Sync + 'static,
    ) -> Self
    where
        T: Send + Sync + 'static,
        Fut: Future<Output = T> + Send + 'static,
    {
        let initial = SendOption::new(initial_value);
        // `fun` itself runs when the closure is called; only awaiting its
        // output is deferred into the async block.
        let make_future = move || {
            let user_fut = fun();
            let wrapped = async move { SendOption::new(Some(user_fut.await)) };
            #[cfg(feature = "sandboxed-arenas")]
            let wrapped = Sandboxed::new(wrapped);
            wrapped
        };
        let (derived, _is_ready) = spawn_derived!(
            crate::spawn,
            initial,
            make_future,
            true,
            true,
            true,
            None::<ArcTrigger>
        );
        derived
    }

    /// Creates an async derived value whose only explicitly tracked
    /// dependency is `source`.
    ///
    /// The future returned by `fun` is wrapped with
    /// `ScopedFuture::new_untracked`, and `source.track()` is called with the
    /// new node as observer. When `initial_value` is `Some`, the first future
    /// is not polled during construction.
    #[doc(hidden)]
    #[track_caller]
    pub fn new_with_manual_dependencies<Fut, S>(
        initial_value: Option<T>,
        fun: impl Fn() -> Fut + Send + Sync + 'static,
        source: &S,
    ) -> Self
    where
        T: Send + Sync + 'static,
        Fut: Future<Output = T> + Send + 'static,
        S: Track,
    {
        let initial = SendOption::new(initial_value);
        let make_future = move || {
            let user_fut = fun();
            let wrapped = ScopedFuture::new_untracked(async move {
                SendOption::new(Some(user_fut.await))
            });
            #[cfg(feature = "sandboxed-arenas")]
            let wrapped = Sandboxed::new(wrapped);
            wrapped
        };
        let (derived, _is_ready) = spawn_derived!(
            crate::spawn,
            initial,
            make_future,
            true,
            false,
            false,
            Some(source)
        );
        derived
    }

    /// Like [`Self::new`], but neither `fun` nor its future needs to be
    /// `Send`.
    ///
    /// Shorthand for [`Self::new_unsync_with_initial`] with `None`.
    #[track_caller]
    pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
    where
        T: 'static,
        Fut: Future<Output = T> + 'static,
    {
        Self::new_unsync_with_initial(None, fun)
    }

    /// Like [`Self::new_with_initial`], but without `Send`/`Sync` bounds.
    ///
    /// Values are wrapped with `SendOption::new_local` and the update task is
    /// spawned with `crate::spawn_local`.
    #[track_caller]
    pub fn new_unsync_with_initial<Fut>(
        initial_value: Option<T>,
        fun: impl Fn() -> Fut + 'static,
    ) -> Self
    where
        T: 'static,
        Fut: Future<Output = T> + 'static,
    {
        let initial = SendOption::new_local(initial_value);
        let make_future = move || {
            let user_fut = fun();
            let wrapped =
                async move { SendOption::new_local(Some(user_fut.await)) };
            #[cfg(feature = "sandboxed-arenas")]
            let wrapped = Sandboxed::new(wrapped);
            wrapped
        };
        let (derived, _is_ready) = spawn_derived!(
            crate::spawn_local,
            initial,
            make_future,
            true,
            true,
            true,
            None::<ArcTrigger>
        );
        derived
    }

    /// Returns an [`AsyncDerivedReadyFuture`] built from this value's source
    /// handle, its `loading` flag and its waker list.
    pub fn ready(&self) -> AsyncDerivedReadyFuture {
        let source = self.to_any_source();
        AsyncDerivedReadyFuture::new(source, &self.loading, &self.wakers)
    }
}
impl<T: 'static> ArcAsyncDerived<T> {
    /// Creates an async derived value without spawning an update task.
    ///
    /// `fun` is called once and its future is polled a single time during
    /// construction. Because nothing is spawned, later changes do not re-run
    /// it from here.
    #[doc(hidden)]
    #[track_caller]
    pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
    where
        T: 'static,
        Fut: Future<Output = T> + 'static,
    {
        let make_future = move || {
            let user_fut = fun();
            let wrapped =
                async move { SendOption::new_local(Some(user_fut.await)) };
            #[cfg(feature = "sandboxed-arenas")]
            let wrapped = Sandboxed::new(wrapped);
            wrapped
        };
        let empty = SendOption::new_local(None::<T>);
        let (derived, _is_ready) = spawn_derived!(
            crate::spawn_local,
            empty,
            make_future,
            false,
            false,
            true,
            None::<ArcTrigger>
        );
        derived
    }
}
impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
    type Value =
        ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;

    /// Reads the current value without tracking it.
    ///
    /// When a [`SuspenseContext`] is available from context, a task id is
    /// taken from it and dropped once [`ArcAsyncDerived::ready`] resolves.
    /// The context is also recorded in `inner.suspenses`.
    ///
    /// Returns `None` if `AsyncPlain::try_new` cannot produce a guard.
    fn try_read_untracked(&self) -> Option<Self::Value> {
        if let Some(suspense) = use_context::<SuspenseContext>() {
            let task = suspense.task_id();
            let until_ready = SpecialNonReactiveFuture::new(self.ready());
            crate::spawn(async move {
                until_ready.await;
                drop(task);
            });

            let mut state = self.inner.write().or_poisoned();
            state.suspenses.push(suspense);
        }

        let plain = AsyncPlain::try_new(&self.value)?;
        Some(ReadGuard::new(Mapped::new_with_guard(plain, |v| v.deref())))
    }
}
impl<T: 'static> Notify for ArcAsyncDerived<T> {
    /// Notifies subscribers and wakers without a readiness sender.
    fn notify(&self) {
        let Self {
            wakers,
            inner,
            loading,
            ..
        } = self;
        Self::notify_subs(wakers, inner, loading, None);
    }
}
impl<T: 'static> Write for ArcAsyncDerived<T> {
    type Value = Option<T>;

    /// Returns a write guard over the value.
    ///
    /// The version counter is incremented first, so an update-loop run that
    /// was already in flight sees a version mismatch and discards its result.
    /// Pending suspense ids are dropped at the same time.
    fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
        // This guard stays alive until the function returns, i.e. across the
        // blocking acquisition of the value lock below.
        let mut state = self.inner.write().or_poisoned();
        state.version += 1;
        let stale = mem::take(&mut state.pending_suspenses);
        drop(stale);

        let guard = WriteGuard::new(self.clone(), self.value.blocking_write());
        Some(MappedMut::new(guard, |v| v.deref(), |v| v.deref_mut()))
    }

    /// Same as [`Write::try_write`], but the returned guard is the bare
    /// value-lock guard rather than one wrapped in a `WriteGuard`.
    fn try_write_untracked(
        &self,
    ) -> Option<impl DerefMut<Target = Self::Value>> {
        let mut state = self.inner.write().or_poisoned();
        state.version += 1;
        let stale = mem::take(&mut state.pending_suspenses);
        drop(stale);

        let guard = self.value.blocking_write();
        Some(MappedMut::new(guard, |v| v.deref(), |v| v.deref_mut()))
    }
}
impl<T: 'static> IsDisposed for ArcAsyncDerived<T> {
/// Always returns `false`.
// NOTE(review): presumably because a reference-counted handle keeps its
// state alive for as long as the handle exists — confirm.
#[inline(always)]
fn is_disposed(&self) -> bool {
false
}
}
impl<T: 'static> ToAnySource for ArcAsyncDerived<T> {
    /// Builds a type-erased source handle.
    ///
    /// The handle carries the address of the shared `inner` allocation as an
    /// integer plus a weak reference to it, so it does not keep the state
    /// alive.
    fn to_any_source(&self) -> AnySource {
        let shared = &self.inner;
        let address = Arc::as_ptr(shared) as usize;
        let erased = Arc::downgrade(shared) as Weak<dyn Source + Send + Sync>;
        AnySource(
            address,
            erased,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            self.defined_at,
        )
    }
}
impl<T: 'static> ToAnySubscriber for ArcAsyncDerived<T> {
    /// Builds a type-erased subscriber handle.
    ///
    /// The handle carries the address of the shared `inner` allocation as an
    /// integer plus a weak reference to it, so it does not keep the state
    /// alive.
    fn to_any_subscriber(&self) -> AnySubscriber {
        let shared = &self.inner;
        let address = Arc::as_ptr(shared) as usize;
        let erased =
            Arc::downgrade(shared) as Weak<dyn Subscriber + Send + Sync>;
        AnySubscriber(address, erased)
    }
}
/// Every [`Source`] operation is forwarded to the shared `inner` state.
impl<T> Source for ArcAsyncDerived<T> {
// Forwards to `inner`.
fn add_subscriber(&self, subscriber: AnySubscriber) {
self.inner.add_subscriber(subscriber);
}
// Forwards to `inner`.
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
self.inner.remove_subscriber(subscriber);
}
// Forwards to `inner`.
fn clear_subscribers(&self) {
self.inner.clear_subscribers();
}
}
/// Every [`ReactiveNode`] operation is forwarded to the shared `inner` state.
impl<T> ReactiveNode for ArcAsyncDerived<T> {
// Forwards to `inner`.
fn mark_dirty(&self) {
self.inner.mark_dirty();
}
// Forwards to `inner`.
fn mark_check(&self) {
self.inner.mark_check();
}
// Forwards to `inner`.
fn mark_subscribers_check(&self) {
self.inner.mark_subscribers_check();
}
// Forwards to `inner` and returns its result unchanged.
fn update_if_necessary(&self) -> bool {
self.inner.update_if_necessary()
}
}
/// Every [`Subscriber`] operation is forwarded to the shared `inner` state.
impl<T> Subscriber for ArcAsyncDerived<T> {
// Forwards to `inner`.
fn add_source(&self, source: AnySource) {
self.inner.add_source(source);
}
// Forwards to `inner`.
fn clear_sources(&self, subscriber: &AnySubscriber) {
self.inner.clear_sources(subscriber);
}
}