use super::{ArcAsyncDerived, AsyncDerivedReadyFuture, BlockingLock};
use crate::{
graph::{
AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
ToAnySource, ToAnySubscriber,
},
owner::{ArenaItem, FromLocal, LocalStorage, Storage, SyncStorage},
send_wrapper_ext::SendOption,
signal::guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
traits::{
DefinedAt, Dispose, IsDisposed, Notify, ReadUntracked,
UntrackableGuard, Write,
},
unwrap_signal,
};
use core::fmt::Debug;
use or_poisoned::OrPoisoned;
use std::{
future::Future,
mem,
ops::{Deref, DerefMut},
panic::Location,
};
pub struct AsyncDerived<T, S = SyncStorage> {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: &'static Location<'static>,
pub(crate) inner: ArenaItem<ArcAsyncDerived<T>, S>,
}
impl<T, S> Dispose for AsyncDerived<T, S> {
fn dispose(self) {
self.inner.dispose()
}
}
impl<T, S> From<ArcAsyncDerived<T>> for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn from(value: ArcAsyncDerived<T>) -> Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
let defined_at = value.defined_at;
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at,
inner: ArenaItem::new_with_storage(value),
}
}
}
impl<T, S> From<AsyncDerived<T, S>> for ArcAsyncDerived<T>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
#[track_caller]
fn from(value: AsyncDerived<T, S>) -> Self {
value
.inner
.try_get_value()
.unwrap_or_else(unwrap_signal!(value))
}
}
impl<T> FromLocal<ArcAsyncDerived<T>> for AsyncDerived<T, LocalStorage>
where
T: 'static,
{
fn from_local(value: ArcAsyncDerived<T>) -> Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
let defined_at = value.defined_at;
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at,
inner: ArenaItem::new_with_storage(value),
}
}
}
impl<T> AsyncDerived<T>
where
T: 'static,
{
#[track_caller]
pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
inner: ArenaItem::new_with_storage(ArcAsyncDerived::new(fun)),
}
}
pub fn new_with_initial<Fut>(
initial_value: Option<T>,
fun: impl Fn() -> Fut + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
inner: ArenaItem::new_with_storage(
ArcAsyncDerived::new_with_initial(initial_value, fun),
),
}
}
}
impl<T> AsyncDerived<T> {
#[doc(hidden)]
pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_mock(fun)),
}
}
pub fn new_unsync_threadsafe_storage<Fut>(
fun: impl Fn() -> Fut + 'static,
) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
fun,
)),
}
}
}
impl<T> AsyncDerived<T, LocalStorage>
where
T: 'static,
{
pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
fun,
)),
}
}
pub fn new_unsync_with_initial<Fut>(
initial_value: Option<T>,
fun: impl Fn() -> Fut + 'static,
) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
Self {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
inner: ArenaItem::new_with_storage(
ArcAsyncDerived::new_unsync_with_initial(initial_value, fun),
),
}
}
}
impl<T, S> AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
#[track_caller]
pub fn ready(&self) -> AsyncDerivedReadyFuture {
let this = self
.inner
.try_get_value()
.unwrap_or_else(unwrap_signal!(self));
this.ready()
}
}
impl<T, S> Copy for AsyncDerived<T, S> {}
impl<T, S> Clone for AsyncDerived<T, S> {
fn clone(&self) -> Self {
*self
}
}
impl<T, S> Debug for AsyncDerived<T, S>
where
S: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncDerived")
.field("type", &std::any::type_name::<T>())
.field("store", &self.inner)
.finish()
}
}
impl<T, S> DefinedAt for AsyncDerived<T, S> {
#[inline(always)]
fn defined_at(&self) -> Option<&'static Location<'static>> {
#[cfg(any(debug_assertions, leptos_debuginfo))]
{
Some(self.defined_at)
}
#[cfg(not(any(debug_assertions, leptos_debuginfo)))]
{
None
}
}
}
impl<T, S> ReadUntracked for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
type Value =
ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
self.inner
.try_get_value()
.map(|inner| inner.read_untracked())
}
}
impl<T, S> Notify for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn notify(&self) {
self.inner.try_with_value(|inner| inner.notify());
}
}
impl<T, S> Write for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
let guard = self
.inner
.try_with_value(|n| n.value.blocking_write_arc())?;
self.inner.try_with_value(|n| {
let mut guard = n.inner.write().or_poisoned();
guard.version += 1;
drop(mem::take(&mut guard.pending_suspenses));
});
Some(MappedMut::new(
WriteGuard::new(*self, guard),
|v| v.deref(),
|v| v.deref_mut(),
))
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.inner.try_with_value(|n| {
let mut guard = n.inner.write().or_poisoned();
guard.version += 1;
drop(mem::take(&mut guard.pending_suspenses));
});
self.inner
.try_with_value(|n| n.value.blocking_write_arc())
.map(|inner| {
MappedMut::new(inner, |v| v.deref(), |v| v.deref_mut())
})
}
}
impl<T, S> IsDisposed for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn is_disposed(&self) -> bool {
self.inner.is_disposed()
}
}
impl<T, S> ToAnySource for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn to_any_source(&self) -> AnySource {
self.inner
.try_get_value()
.map(|inner| inner.to_any_source())
.unwrap_or_else(unwrap_signal!(self))
}
}
impl<T, S> ToAnySubscriber for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn to_any_subscriber(&self) -> AnySubscriber {
self.inner
.try_get_value()
.map(|inner| inner.to_any_subscriber())
.unwrap_or_else(unwrap_signal!(self))
}
}
impl<T, S> Source for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn add_subscriber(&self, subscriber: AnySubscriber) {
if let Some(inner) = self.inner.try_get_value() {
inner.add_subscriber(subscriber);
}
}
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
if let Some(inner) = self.inner.try_get_value() {
inner.remove_subscriber(subscriber);
}
}
fn clear_subscribers(&self) {
if let Some(inner) = self.inner.try_get_value() {
inner.clear_subscribers();
}
}
}
impl<T, S> ReactiveNode for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn mark_dirty(&self) {
if let Some(inner) = self.inner.try_get_value() {
inner.mark_dirty();
}
}
fn mark_check(&self) {
if let Some(inner) = self.inner.try_get_value() {
inner.mark_check();
}
}
fn mark_subscribers_check(&self) {
if let Some(inner) = self.inner.try_get_value() {
inner.mark_subscribers_check();
}
}
fn update_if_necessary(&self) -> bool {
if let Some(inner) = self.inner.try_get_value() {
inner.update_if_necessary()
} else {
false
}
}
}
impl<T, S> Subscriber for AsyncDerived<T, S>
where
T: 'static,
S: Storage<ArcAsyncDerived<T>>,
{
fn add_source(&self, source: AnySource) {
if let Some(inner) = self.inner.try_get_value() {
inner.add_source(source);
}
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
if let Some(inner) = self.inner.try_get_value() {
inner.clear_sources(subscriber);
}
}
}