use reactive_graph::{
computed::{
suspense::LocalResourceNotifier, ArcAsyncDerived, AsyncDerived,
AsyncDerivedFuture,
},
graph::{
AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
ToAnySource, ToAnySubscriber,
},
owner::use_context,
send_wrapper_ext::SendOption,
signal::{
guards::{AsyncPlain, Mapped, ReadGuard},
ArcRwSignal, RwSignal,
},
traits::{
DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
Update, With, Write,
},
};
use std::{
future::{pending, Future, IntoFuture},
ops::{Deref, DerefMut},
panic::Location,
};
pub struct ArcLocalResource<T> {
data: ArcAsyncDerived<T>,
refetch: ArcRwSignal<usize>,
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: &'static Location<'static>,
}
impl<T> Clone for ArcLocalResource<T> {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
refetch: self.refetch.clone(),
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: self.defined_at,
}
}
}
impl<T> Deref for ArcLocalResource<T> {
type Target = ArcAsyncDerived<T>;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl<T> ArcLocalResource<T> {
#[track_caller]
pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
let fetcher = move || {
let fut = fetcher();
async move {
if cfg!(feature = "ssr") {
pending().await
} else {
any_spawner::Executor::tick().await;
fut.await
}
}
};
let refetch = ArcRwSignal::new(0);
Self {
data: if cfg!(feature = "ssr") {
ArcAsyncDerived::new_mock(fetcher)
} else {
let refetch = refetch.clone();
ArcAsyncDerived::new_unsync(move || {
refetch.track();
fetcher()
})
},
refetch,
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
}
}
pub fn refetch(&self) {
*self.refetch.write() += 1;
}
#[track_caller]
pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
where
T: 'static,
{
self.data.try_with(|n| n.as_ref().map(f))?
}
}
impl<T, E> ArcLocalResource<Result<T, E>>
where
T: 'static,
E: Clone + 'static,
{
#[track_caller]
pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
}
}
impl<T> IntoFuture for ArcLocalResource<T>
where
T: Clone + 'static,
{
type Output = T;
type IntoFuture = AsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
} else if cfg!(feature = "ssr") {
panic!(
"Reading from a LocalResource outside Suspense in `ssr` mode \
will cause the response to hang, because LocalResources are \
always pending on the server."
);
}
self.data.into_future()
}
}
impl<T> DefinedAt for ArcLocalResource<T> {
fn defined_at(&self) -> Option<&'static Location<'static>> {
#[cfg(any(debug_assertions, leptos_debuginfo))]
{
Some(self.defined_at)
}
#[cfg(not(any(debug_assertions, leptos_debuginfo)))]
{
None
}
}
}
impl<T> Notify for ArcLocalResource<T>
where
T: 'static,
{
fn notify(&self) {
self.data.notify()
}
}
impl<T> Write for ArcLocalResource<T>
where
T: 'static,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
self.data.try_write()
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.data.try_write_untracked()
}
}
impl<T> ReadUntracked for ArcLocalResource<T>
where
T: 'static,
{
type Value =
ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
}
self.data.try_read_untracked()
}
}
impl<T: 'static> IsDisposed for ArcLocalResource<T> {
#[inline(always)]
fn is_disposed(&self) -> bool {
false
}
}
impl<T: 'static> ToAnySource for ArcLocalResource<T> {
fn to_any_source(&self) -> AnySource {
self.data.to_any_source()
}
}
impl<T: 'static> ToAnySubscriber for ArcLocalResource<T> {
fn to_any_subscriber(&self) -> AnySubscriber {
self.data.to_any_subscriber()
}
}
impl<T> Source for ArcLocalResource<T> {
fn add_subscriber(&self, subscriber: AnySubscriber) {
self.data.add_subscriber(subscriber)
}
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
self.data.remove_subscriber(subscriber);
}
fn clear_subscribers(&self) {
self.data.clear_subscribers();
}
}
impl<T> ReactiveNode for ArcLocalResource<T> {
fn mark_dirty(&self) {
self.data.mark_dirty();
}
fn mark_check(&self) {
self.data.mark_check();
}
fn mark_subscribers_check(&self) {
self.data.mark_subscribers_check();
}
fn update_if_necessary(&self) -> bool {
self.data.update_if_necessary()
}
}
impl<T> Subscriber for ArcLocalResource<T> {
fn add_source(&self, source: AnySource) {
self.data.add_source(source);
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
self.data.clear_sources(subscriber);
}
}
pub struct LocalResource<T> {
data: AsyncDerived<T>,
refetch: RwSignal<usize>,
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: &'static Location<'static>,
}
impl<T> Deref for LocalResource<T> {
type Target = AsyncDerived<T>;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl<T> Clone for LocalResource<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Copy for LocalResource<T> {}
impl<T> LocalResource<T> {
#[track_caller]
pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
where
T: 'static,
Fut: Future<Output = T> + 'static,
{
let fetcher = move || {
let fut = fetcher();
async move {
if cfg!(feature = "ssr") {
pending().await
} else {
any_spawner::Executor::tick().await;
fut.await
}
}
};
let refetch = RwSignal::new(0);
Self {
data: if cfg!(feature = "ssr") {
AsyncDerived::new_mock(fetcher)
} else {
AsyncDerived::new_unsync_threadsafe_storage(move || {
refetch.track();
fetcher()
})
},
refetch,
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
}
}
pub fn refetch(&self) {
self.refetch.try_update(|n| *n += 1);
}
#[track_caller]
pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
where
T: 'static,
{
self.data.try_with(|n| n.as_ref().map(f))?
}
}
impl<T, E> LocalResource<Result<T, E>>
where
T: 'static,
E: Clone + 'static,
{
#[track_caller]
pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
}
}
impl<T> IntoFuture for LocalResource<T>
where
T: Clone + 'static,
{
type Output = T;
type IntoFuture = AsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
} else if cfg!(feature = "ssr") {
panic!(
"Reading from a LocalResource outside Suspense in `ssr` mode \
will cause the response to hang, because LocalResources are \
always pending on the server."
);
}
self.data.into_future()
}
}
impl<T> DefinedAt for LocalResource<T> {
fn defined_at(&self) -> Option<&'static Location<'static>> {
#[cfg(any(debug_assertions, leptos_debuginfo))]
{
Some(self.defined_at)
}
#[cfg(not(any(debug_assertions, leptos_debuginfo)))]
{
None
}
}
}
impl<T> Notify for LocalResource<T>
where
T: 'static,
{
fn notify(&self) {
self.data.notify()
}
}
impl<T> Write for LocalResource<T>
where
T: 'static,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
self.data.try_write()
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.data.try_write_untracked()
}
}
impl<T> ReadUntracked for LocalResource<T>
where
T: 'static,
{
type Value =
ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
notifier.notify();
}
self.data.try_read_untracked()
}
}
impl<T: 'static> IsDisposed for LocalResource<T> {
fn is_disposed(&self) -> bool {
self.data.is_disposed()
}
}
impl<T: 'static> ToAnySource for LocalResource<T>
where
T: 'static,
{
fn to_any_source(&self) -> AnySource {
self.data.to_any_source()
}
}
impl<T: 'static> ToAnySubscriber for LocalResource<T>
where
T: 'static,
{
fn to_any_subscriber(&self) -> AnySubscriber {
self.data.to_any_subscriber()
}
}
impl<T> Source for LocalResource<T>
where
T: 'static,
{
fn add_subscriber(&self, subscriber: AnySubscriber) {
self.data.add_subscriber(subscriber)
}
fn remove_subscriber(&self, subscriber: &AnySubscriber) {
self.data.remove_subscriber(subscriber);
}
fn clear_subscribers(&self) {
self.data.clear_subscribers();
}
}
impl<T> ReactiveNode for LocalResource<T>
where
T: 'static,
{
fn mark_dirty(&self) {
self.data.mark_dirty();
}
fn mark_check(&self) {
self.data.mark_check();
}
fn mark_subscribers_check(&self) {
self.data.mark_subscribers_check();
}
fn update_if_necessary(&self) -> bool {
self.data.update_if_necessary()
}
}
impl<T> Subscriber for LocalResource<T>
where
T: 'static,
{
fn add_source(&self, source: AnySource) {
self.data.add_source(source);
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
self.data.clear_sources(subscriber);
}
}
impl<T: 'static> From<ArcLocalResource<T>> for LocalResource<T> {
fn from(arc: ArcLocalResource<T>) -> Self {
Self {
data: arc.data.into(),
refetch: arc.refetch.into(),
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: arc.defined_at,
}
}
}
impl<T: 'static> From<LocalResource<T>> for ArcLocalResource<T> {
fn from(local: LocalResource<T>) -> Self {
Self {
data: local.data.into(),
refetch: local.refetch.into(),
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: local.defined_at,
}
}
}