use crate::{
graph::{AnySubscriber, ReactiveNode, ToAnySubscriber},
owner::on_cleanup,
traits::{DefinedAt, Dispose},
};
use or_poisoned::OrPoisoned;
use std::{
panic::Location,
sync::{Arc, Mutex, RwLock},
};
#[derive(Debug, Clone)]
pub struct ImmediateEffect {
inner: StoredEffect,
}
type StoredEffect = Option<Arc<RwLock<inner::EffectInner>>>;
impl Dispose for ImmediateEffect {
fn dispose(self) {}
}
impl ImmediateEffect {
#[track_caller]
#[must_use]
pub fn new(fun: impl Fn() + Send + Sync + 'static) -> Self {
if !cfg!(feature = "effects") {
return Self { inner: None };
}
let inner = inner::EffectInner::new(fun);
inner.update_if_necessary();
Self { inner: Some(inner) }
}
#[track_caller]
#[must_use]
pub fn new_mut(fun: impl FnMut() + Send + Sync + 'static) -> Self {
const MSG: &str = "The effect recursed or its function panicked.";
let fun = Mutex::new(fun);
Self::new(move || fun.try_lock().expect(MSG)())
}
#[track_caller]
pub fn new_scoped(fun: impl Fn() + Send + Sync + 'static) {
let effect = Self::new(fun);
on_cleanup(move || effect.dispose());
}
#[track_caller]
#[must_use]
pub fn new_isomorphic(fun: impl Fn() + Send + Sync + 'static) -> Self {
let inner = inner::EffectInner::new(fun);
inner.update_if_necessary();
Self { inner: Some(inner) }
}
}
impl ToAnySubscriber for ImmediateEffect {
fn to_any_subscriber(&self) -> AnySubscriber {
const MSG: &str = "tried to set effect that has been stopped";
self.inner.as_ref().expect(MSG).to_any_subscriber()
}
}
impl DefinedAt for ImmediateEffect {
fn defined_at(&self) -> Option<&'static Location<'static>> {
self.inner.as_ref()?.read().or_poisoned().defined_at()
}
}
mod inner {
use crate::{
graph::{
AnySource, AnySubscriber, ReactiveNode, ReactiveNodeState,
SourceSet, Subscriber, ToAnySubscriber, WithObserver,
},
log_warning,
owner::Owner,
traits::DefinedAt,
};
use or_poisoned::OrPoisoned;
use std::{
panic::Location,
sync::{Arc, RwLock, Weak},
thread::{self, ThreadId},
};
pub(super) struct EffectInner {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: &'static Location<'static>,
owner: Owner,
state: ReactiveNodeState,
run_count_start: usize,
run_done_count: usize,
run_done_max: usize,
last_run_thread_id: ThreadId,
fun: Arc<dyn Fn() + Send + Sync>,
sources: SourceSet,
any_subscriber: AnySubscriber,
}
impl EffectInner {
#[track_caller]
pub fn new(
fun: impl Fn() + Send + Sync + 'static,
) -> Arc<RwLock<EffectInner>> {
let owner = Owner::new();
Arc::new_cyclic(|weak| {
let any_subscriber = AnySubscriber(
weak.as_ptr() as usize,
Weak::clone(weak) as Weak<dyn Subscriber + Send + Sync>,
);
RwLock::new(EffectInner {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
owner,
state: ReactiveNodeState::Dirty,
run_count_start: 0,
run_done_count: 0,
run_done_max: 0,
last_run_thread_id: thread::current().id(),
fun: Arc::new(fun),
sources: SourceSet::new(),
any_subscriber,
})
})
}
}
impl ToAnySubscriber for Arc<RwLock<EffectInner>> {
fn to_any_subscriber(&self) -> AnySubscriber {
AnySubscriber(
Arc::as_ptr(self) as usize,
Arc::downgrade(self) as Weak<dyn Subscriber + Send + Sync>,
)
}
}
impl ReactiveNode for RwLock<EffectInner> {
fn mark_subscribers_check(&self) {}
fn update_if_necessary(&self) -> bool {
let state = {
let guard = self.read().or_poisoned();
if guard.owner.paused() {
return false;
}
guard.state
};
let needs_update = match state {
ReactiveNodeState::Clean => false,
ReactiveNodeState::Check => {
let sources = self.read().or_poisoned().sources.clone();
sources
.into_iter()
.any(|source| source.update_if_necessary())
}
ReactiveNodeState::Dirty => true,
};
if needs_update {
let mut guard = self.write().or_poisoned();
let owner = guard.owner.clone();
let any_subscriber = guard.any_subscriber.clone();
let fun = guard.fun.clone();
guard.run_count_start += 1;
let recursion_count = guard.run_count_start;
guard.sources.clear_sources(&any_subscriber);
guard.last_run_thread_id = thread::current().id();
if recursion_count > 2 {
warn_excessive_recursion(&guard);
}
drop(guard);
owner.with_cleanup(|| any_subscriber.with_observer(|| fun()));
let mut guard = self.write().or_poisoned();
guard.run_done_count += 1;
guard.run_done_max =
Ord::max(recursion_count, guard.run_done_max);
if guard.run_count_start == guard.run_done_count {
guard.run_count_start = 0;
guard.run_done_count = 0;
guard.run_done_max = 0;
}
guard.state = ReactiveNodeState::Clean;
}
needs_update
}
fn mark_check(&self) {
self.write().or_poisoned().state = ReactiveNodeState::Check;
self.update_if_necessary();
}
fn mark_dirty(&self) {
self.write().or_poisoned().state = ReactiveNodeState::Dirty;
self.update_if_necessary();
}
}
impl Subscriber for RwLock<EffectInner> {
fn add_source(&self, source: AnySource) {
let mut guard = self.write().or_poisoned();
if guard.run_done_max < guard.run_count_start
&& guard.last_run_thread_id == thread::current().id()
{
guard.sources.insert(source);
}
}
fn clear_sources(&self, subscriber: &AnySubscriber) {
self.write().or_poisoned().sources.clear_sources(subscriber);
}
}
impl DefinedAt for EffectInner {
fn defined_at(&self) -> Option<&'static Location<'static>> {
#[cfg(any(debug_assertions, leptos_debuginfo))]
{
Some(self.defined_at)
}
#[cfg(not(any(debug_assertions, leptos_debuginfo)))]
{
None
}
}
}
impl std::fmt::Debug for EffectInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EffectInner")
.field("owner", &self.owner)
.field("state", &self.state)
.field("sources", &self.sources)
.field("any_subscriber", &self.any_subscriber)
.finish()
}
}
fn warn_excessive_recursion(effect: &EffectInner) {
const MSG: &str = "ImmediateEffect recursed more than once.";
match effect.defined_at() {
Some(defined_at) => {
log_warning(format_args!("{MSG} Defined at: {defined_at}"));
}
None => {
log_warning(format_args!("{MSG}"));
}
}
}
}