use crate::{
channel::{channel, Receiver},
effect::{inner::EffectInner, EffectFunction},
graph::{
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
WithObserver,
},
owner::{ArenaItem, LocalStorage, Owner, Storage, SyncStorage},
traits::Dispose,
};
use any_spawner::Executor;
use futures::StreamExt;
use or_poisoned::OrPoisoned;
use std::{
mem,
sync::{atomic::AtomicBool, Arc, RwLock},
};
#[derive(Debug, Clone, Copy)]
pub struct Effect<S> {
inner: Option<ArenaItem<StoredEffect, S>>,
}
type StoredEffect = Option<Arc<RwLock<EffectInner>>>;
impl<S> Dispose for Effect<S> {
fn dispose(self) {
if let Some(inner) = self.inner {
inner.dispose()
}
}
}
fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
let (mut observer, rx) = channel();
observer.notify();
let owner = Owner::new();
let inner = Arc::new(RwLock::new(EffectInner {
dirty: true,
observer,
sources: SourceSet::new(),
}));
(rx, owner, inner)
}
thread_local! {
static EFFECT_SCOPE_ACTIVE: AtomicBool = const { AtomicBool::new(false) };
}
pub fn in_effect_scope() -> bool {
EFFECT_SCOPE_ACTIVE
.with(|scope| scope.load(std::sync::atomic::Ordering::Relaxed))
}
fn run_in_effect_scope<T>(fun: impl FnOnce() -> T) -> T {
let initial = EFFECT_SCOPE_ACTIVE
.with(|scope| scope.swap(true, std::sync::atomic::Ordering::Relaxed));
let result = fun();
EFFECT_SCOPE_ACTIVE.with(|scope| {
scope.store(initial, std::sync::atomic::Ordering::Relaxed)
});
result
}
impl<S> Effect<S>
where
S: Storage<StoredEffect>,
{
pub fn stop(self) {
if let Some(inner) = self
.inner
.and_then(|this| this.try_update_value(|inner| inner.take()))
{
drop(inner);
}
}
}
impl Effect<LocalStorage> {
pub fn new<T, M>(mut fun: impl EffectFunction<T, M> + 'static) -> Self
where
T: 'static,
{
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let value = Arc::new(RwLock::new(None::<T>));
let mut first_run = true;
Executor::spawn_local({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& (subscriber.with_observer(|| {
subscriber.update_if_necessary()
}) || first_run)
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| {
run_in_effect_scope(|| fun.run(old_value))
})
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
ArenaItem::new_with_storage(Some(inner))
});
Self { inner }
}
pub fn watch<D, T>(
mut dependency_fn: impl FnMut() -> D + 'static,
mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T + 'static,
immediate: bool,
) -> Self
where
D: 'static,
T: 'static,
{
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
Executor::spawn_local({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& (subscriber.with_observer(|| {
subscriber.update_if_necessary()
}) || first_run)
{
subscriber.clear_sources(&subscriber);
let old_dep_value = mem::take(
&mut *dep_value.write().or_poisoned(),
);
let new_dep_value = owner.with_cleanup(|| {
subscriber.with_observer(&mut dependency_fn)
});
let old_watch_value = mem::take(
&mut *watch_value.write().or_poisoned(),
);
if immediate || !first_run {
let new_watch_value = handler(
&new_dep_value,
old_dep_value.as_ref(),
old_watch_value,
);
*watch_value.write().or_poisoned() =
Some(new_watch_value);
}
*dep_value.write().or_poisoned() =
Some(new_dep_value);
first_run = false;
}
}
}
});
ArenaItem::new_with_storage(Some(inner))
});
Self { inner }
}
}
impl Effect<SyncStorage> {
pub fn new_sync<T, M>(
fun: impl EffectFunction<T, M> + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
if !cfg!(feature = "effects") {
return Self { inner: None };
}
Self::new_isomorphic(fun)
}
pub fn new_isomorphic<T, M>(
mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));
let task = {
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& (subscriber
.with_observer(|| subscriber.update_if_necessary())
|| first_run)
{
first_run = false;
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| {
run_in_effect_scope(|| fun.run(old_value))
})
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
};
crate::spawn(task);
Self {
inner: Some(ArenaItem::new_with_storage(Some(inner))),
}
}
pub fn watch_sync<D, T>(
mut dependency_fn: impl FnMut() -> D + Send + Sync + 'static,
mut handler: impl FnMut(&D, Option<&D>, Option<T>) -> T
+ Send
+ Sync
+ 'static,
immediate: bool,
) -> Self
where
D: Send + Sync + 'static,
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
let inner = cfg!(feature = "effects").then(|| {
crate::spawn({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& (subscriber.with_observer(|| {
subscriber.update_if_necessary()
}) || first_run)
{
subscriber.clear_sources(&subscriber);
let old_dep_value = mem::take(
&mut *dep_value.write().or_poisoned(),
);
let new_dep_value = owner.with_cleanup(|| {
subscriber.with_observer(&mut dependency_fn)
});
let old_watch_value = mem::take(
&mut *watch_value.write().or_poisoned(),
);
if immediate || !first_run {
let new_watch_value = handler(
&new_dep_value,
old_dep_value.as_ref(),
old_watch_value,
);
*watch_value.write().or_poisoned() =
Some(new_watch_value);
}
*dep_value.write().or_poisoned() =
Some(new_dep_value);
first_run = false;
}
}
}
});
ArenaItem::new_with_storage(Some(inner))
});
Self { inner }
}
}
impl<S> ToAnySubscriber for Effect<S>
where
S: Storage<StoredEffect>,
{
fn to_any_subscriber(&self) -> AnySubscriber {
self.inner
.and_then(|inner| {
inner
.try_with_value(|inner| {
inner.as_ref().map(|inner| inner.to_any_subscriber())
})
.flatten()
})
.expect("tried to set effect that has been stopped")
}
}
#[inline(always)]
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust idioms. \
Please use `Effect::new()` instead."]
pub fn create_effect<T>(
fun: impl FnMut(Option<T>) -> T + 'static,
) -> Effect<LocalStorage>
where
T: 'static,
{
Effect::new(fun)
}
#[inline(always)]
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust idioms. \
Please use `Effect::watch()` instead."]
pub fn watch<W, T>(
deps: impl Fn() -> W + 'static,
callback: impl Fn(&W, Option<&W>, Option<T>) -> T + Clone + 'static,
immediate: bool,
) -> impl Fn() + Clone
where
W: Clone + 'static,
T: 'static,
{
let watch = Effect::watch(deps, callback, immediate);
move || watch.stop()
}