use crate::{
channel::channel,
effect::inner::EffectInner,
graph::{
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
WithObserver,
},
owner::Owner,
};
use futures::StreamExt;
use or_poisoned::OrPoisoned;
#[cfg(feature = "subsecond")]
use std::sync::Mutex;
use std::{
fmt::Debug,
future::{Future, IntoFuture},
mem,
pin::Pin,
sync::{Arc, RwLock, Weak},
};
#[must_use = "A RenderEffect will be canceled when it is dropped. Creating a \
RenderEffect that is not stored in some other data structure or \
leaked will drop it immediately, and it will not react to \
changes in signals it reads."]
pub struct RenderEffect<T>
where
T: 'static,
{
value: Arc<RwLock<Option<T>>>,
inner: Arc<RwLock<EffectInner>>,
}
impl<T> Debug for RenderEffect<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RenderEffect")
.field("inner", &Arc::as_ptr(&self.inner))
.finish()
}
}
#[cfg(feature = "subsecond")]
type CurrentHotPtr = Box<dyn Fn() -> Option<subsecond::HotFnPtr> + Send + Sync>;
impl<T> RenderEffect<T>
where
T: 'static,
{
pub fn new(fun: impl FnMut(Option<T>) -> T + 'static) -> Self {
#[cfg(feature = "subsecond")]
let (hot_fn_ptr, fun) = {
let fun = Arc::new(Mutex::new(subsecond::HotFn::current(fun)));
(
{
let fun = Arc::downgrade(&fun);
let wrapped = send_wrapper::SendWrapper::new(move || {
fun.upgrade()
.map(|n| n.lock().or_poisoned().ptr_address())
});
#[allow(clippy::redundant_closure)]
Box::new(move || wrapped())
},
move |prev| fun.lock().or_poisoned().call((prev,)),
)
};
Self::new_with_value_erased(
Box::new(fun),
None,
#[cfg(feature = "subsecond")]
hot_fn_ptr,
)
}
pub fn new_with_value(
fun: impl FnMut(Option<T>) -> T + 'static,
initial_value: Option<T>,
) -> Self {
#[cfg(feature = "subsecond")]
let (hot_fn_ptr, fun) = {
let fun = Arc::new(Mutex::new(subsecond::HotFn::current(fun)));
(
{
let fun = Arc::downgrade(&fun);
let wrapped = send_wrapper::SendWrapper::new(move || {
fun.upgrade()
.map(|n| n.lock().or_poisoned().ptr_address())
});
#[allow(clippy::redundant_closure)]
Box::new(move || wrapped())
},
move |prev| fun.lock().or_poisoned().call((prev,)),
)
};
Self::new_with_value_erased(
Box::new(fun),
initial_value,
#[cfg(feature = "subsecond")]
hot_fn_ptr,
)
}
pub async fn new_with_async_value(
fun: impl FnMut(Option<T>) -> T + 'static,
value: impl IntoFuture<Output = T> + 'static,
) -> Self {
#[cfg(feature = "subsecond")]
let mut fun = subsecond::HotFn::current(fun);
#[cfg(feature = "subsecond")]
let fun = move |prev| fun.call((prev,));
Self::new_with_async_value_erased(
Box::new(fun),
Box::pin(value.into_future()),
)
.await
}
fn new_with_value_erased(
#[allow(unused_mut)] mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
initial_value: Option<T>,
#[allow(unused)]
#[cfg(feature = "subsecond")]
hot_fn_ptr: CurrentHotPtr,
) -> Self {
fn prep() -> (Owner, Arc<RwLock<EffectInner>>, crate::channel::Receiver)
{
let (observer, rx) = channel();
let owner = Owner::new();
let inner = Arc::new(RwLock::new(EffectInner {
dirty: false,
observer,
sources: SourceSet::new(),
}));
(owner, inner, rx)
}
let (owner, inner, mut rx) = prep();
let value = Arc::new(RwLock::new(None::<T>));
#[cfg(not(feature = "effects"))]
{
let _ = initial_value;
let _ = owner;
let _ = &mut rx;
let _ = fun;
}
#[cfg(feature = "effects")]
{
let subscriber = inner.to_any_subscriber();
#[cfg(all(feature = "subsecond", debug_assertions))]
let mut fun = {
use crate::graph::ReactiveNode;
use rustc_hash::FxHashMap;
use std::sync::{Arc, LazyLock, Mutex};
use subsecond::HotFnPtr;
static HOT_RELOAD_SUBSCRIBERS: LazyLock<
Mutex<FxHashMap<AnySubscriber, (HotFnPtr, CurrentHotPtr)>>,
> = LazyLock::new(|| {
subsecond::register_handler(Arc::new(|| {
HOT_RELOAD_SUBSCRIBERS.lock().or_poisoned().retain(
|subscriber, (prev_ptr, hot_fn_ptr)| {
match hot_fn_ptr() {
None => false,
Some(curr_hot_ptr) => {
if curr_hot_ptr != *prev_ptr {
crate::log_warning(format_args!(
"{prev_ptr:?} <> \
{curr_hot_ptr:?}",
));
*prev_ptr = curr_hot_ptr;
subscriber.mark_dirty();
}
true
}
}
},
);
}));
Default::default()
});
let mut fun = subsecond::HotFn::current(fun);
let initial_ptr = hot_fn_ptr().unwrap();
HOT_RELOAD_SUBSCRIBERS
.lock()
.or_poisoned()
.insert(subscriber.clone(), (initial_ptr, hot_fn_ptr));
move |prev| fun.call((prev,))
};
*value.write().or_poisoned() = Some(
owner.with(|| subscriber.with_observer(|| fun(initial_value))),
);
any_spawner::Executor::spawn_local({
let value = Arc::clone(&value);
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
}
RenderEffect { value, inner }
}
async fn new_with_async_value_erased(
mut fun: Box<dyn FnMut(Option<T>) -> T + 'static>,
initial_value: Pin<Box<dyn Future<Output = T>>>,
) -> Self {
fn prep() -> (Owner, Arc<RwLock<EffectInner>>, crate::channel::Receiver)
{
let (observer, rx) = channel();
let owner = Owner::new();
let inner = Arc::new(RwLock::new(EffectInner {
dirty: false,
observer,
sources: SourceSet::new(),
}));
(owner, inner, rx)
}
let (owner, inner, mut rx) = prep();
let value = Arc::new(RwLock::new(None::<T>));
#[cfg(not(feature = "effects"))]
{
drop(initial_value);
let _ = owner;
let _ = &mut rx;
let _ = &mut fun;
}
#[cfg(feature = "effects")]
{
use crate::computed::ScopedFuture;
let subscriber = inner.to_any_subscriber();
let initial = subscriber
.with_observer(|| ScopedFuture::new(initial_value))
.await;
*value.write().or_poisoned() = Some(initial);
any_spawner::Executor::spawn_local({
let value = Arc::clone(&value);
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
}
RenderEffect { value, inner }
}
pub fn with_value_mut<U>(
&self,
fun: impl FnOnce(&mut T) -> U,
) -> Option<U> {
self.value.write().or_poisoned().as_mut().map(fun)
}
pub fn take_value(&self) -> Option<T> {
self.value.write().or_poisoned().take()
}
}
impl<T> RenderEffect<T>
where
T: Send + Sync + 'static,
{
pub fn new_isomorphic(
fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self {
#[cfg(feature = "subsecond")]
let mut fun = subsecond::HotFn::current(fun);
#[cfg(feature = "subsecond")]
let fun = move |prev| fun.call((prev,));
fn erased<T: Send + Sync + 'static>(
mut fun: Box<dyn FnMut(Option<T>) -> T + Send + Sync + 'static>,
) -> RenderEffect<T> {
let (observer, mut rx) = channel();
let value = Arc::new(RwLock::new(None::<T>));
let owner = Owner::new();
let inner = Arc::new(RwLock::new(EffectInner {
dirty: false,
observer,
sources: SourceSet::new(),
}));
let initial_value = owner
.with(|| inner.to_any_subscriber().with_observer(|| fun(None)));
*value.write().or_poisoned() = Some(initial_value);
crate::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
async move {
while rx.next().await.is_some() {
if !owner.paused()
&& subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
subscriber.clear_sources(&subscriber);
let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
RenderEffect { value, inner }
}
erased(Box::new(fun))
}
}
impl<T> ToAnySubscriber for RenderEffect<T> {
fn to_any_subscriber(&self) -> AnySubscriber {
AnySubscriber(
Arc::as_ptr(&self.inner) as usize,
Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
)
}
}