#![allow(clippy::type_complexity)]
use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::Rc;
use crate::batch::{batch_depth, push_batched_notification};
use crate::observer::OBSERVER;
pub(crate) type SubscriberId = u64;
pub(crate) struct Subscriber {
pub(crate) id: SubscriberId,
pub(crate) alive: Rc<Cell<bool>>,
pub(crate) callback: Rc<dyn Fn()>,
}
pub(crate) struct SignalState<T> {
pub(crate) value: T,
pub(crate) version: u64,
pub(crate) next_subscriber_id: SubscriberId,
pub(crate) subscribers: Vec<Subscriber>,
dirty: bool,
notifying: bool,
}
pub struct Signal<T> {
pub(crate) state: Rc<RefCell<SignalState<T>>>,
}
impl<T> Signal<T> {
pub(crate) fn state_addr(&self) -> usize {
Rc::as_ptr(&self.state) as usize
}
#[must_use]
pub fn new(val: T) -> Self {
Self {
state: Rc::new(RefCell::new(SignalState {
value: val,
version: 0,
next_subscriber_id: 0,
subscribers: Vec::new(),
dirty: false,
notifying: false,
})),
}
}
#[must_use]
pub fn read(&self) -> T
where
T: Clone + 'static,
{
let val = self.state.borrow().value.clone();
track_observer(self);
val
}
pub fn set(&self, val: T)
where
T: 'static,
{
let mut state = self.state.borrow_mut();
state.value = val;
state.version = state.version.wrapping_add(1);
let subs = Self::prepare_notification(&mut state);
drop(state);
if let Some(subs) = subs {
Self::schedule_notification(&self.state, subs);
}
}
fn notify_signal_state(state_ref: &Rc<RefCell<SignalState<T>>>)
where
T: 'static,
{
let subs = {
let mut state = state_ref.borrow_mut();
if !state.dirty {
return;
}
state.notifying = true;
state.dirty = false;
let s: Vec<(Rc<Cell<bool>>, Rc<dyn Fn()>)> = state
.subscribers
.iter()
.filter(|s| s.alive.get())
.map(|s| (Rc::clone(&s.alive), Rc::clone(&s.callback)))
.collect();
state.subscribers.retain(|s| s.alive.get());
s
};
if Self::notify_and_check_follow_up(state_ref, &subs) {
let state_ref2 = Rc::clone(state_ref);
executor_schedule(move || {
Self::notify_signal_state(&state_ref2);
});
}
}
fn notify_and_check_follow_up(
state_ref: &Rc<RefCell<SignalState<T>>>,
subs: &[(Rc<Cell<bool>>, Rc<dyn Fn()>)],
) -> bool {
for (alive, cb) in subs {
if alive.get() {
cb();
}
}
let mut state = state_ref.borrow_mut();
state.notifying = false;
state.subscribers.retain(|s| s.alive.get());
state.dirty
}
fn prepare_notification(
state: &mut SignalState<T>,
) -> Option<Vec<(Rc<Cell<bool>>, Rc<dyn Fn()>)>> {
if state.notifying {
state.dirty = true;
return None;
}
if state.dirty {
return None;
}
if state.subscribers.is_empty() {
return None;
}
state.dirty = true;
let subs: Vec<(Rc<Cell<bool>>, Rc<dyn Fn()>)> = state
.subscribers
.iter()
.filter(|s| s.alive.get())
.map(|s| (Rc::clone(&s.alive), Rc::clone(&s.callback)))
.collect();
Some(subs)
}
fn schedule_notification(
state_ref: &Rc<RefCell<SignalState<T>>>,
subs: Vec<(Rc<Cell<bool>>, Rc<dyn Fn()>)>,
) where
T: 'static,
{
let state_ref = Rc::clone(state_ref);
let notification = move || {
{
let mut state = state_ref.borrow_mut();
state.notifying = true;
state.dirty = false;
state.subscribers.retain(|s| s.alive.get());
}
if Self::notify_and_check_follow_up(&state_ref, &subs) {
let state_ref2 = Rc::clone(&state_ref);
executor_schedule(move || {
Self::notify_signal_state(&state_ref2);
});
}
};
if batch_depth() > 0 {
push_batched_notification(Box::new(notification));
} else {
executor_schedule(notification);
}
}
pub fn set_if_changed(&self, val: T)
where
T: PartialEq + 'static,
{
let changed = self.with(|current| current != &val);
if changed {
self.set(val);
}
}
pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U
where
T: 'static,
{
let result = f(&self.state.borrow().value);
track_observer(self);
result
}
pub fn map<U, F>(&self, f: F) -> SignalMap<T, U, F>
where
F: Fn(&T) -> U,
{
SignalMap {
source: self.clone(),
f,
_phantom: std::marker::PhantomData,
}
}
pub fn changed(&self) -> crate::SignalChangedFuture<T> {
crate::SignalChangedFuture::new(self)
}
pub fn map_changed<U, F>(&self, f: F) -> crate::MapChangedFuture<T, U, F>
where
F: Fn(&T) -> U,
{
crate::MapChangedFuture::new(self, f)
}
pub fn filter_changed<F>(&self, f: F) -> crate::FilterChangedFuture<T, F>
where
F: Fn(&T) -> bool,
{
crate::FilterChangedFuture::new(self, f)
}
#[must_use]
pub fn ptr_eq(&self, other: &Self) -> bool {
Rc::ptr_eq(&self.state, &other.state)
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.state.borrow().subscribers.len()
}
#[must_use]
pub fn version(&self) -> u64 {
self.state.borrow().version
}
#[cfg(test)]
#[must_use]
pub fn debug_count_waiters(&self) -> usize {
self.subscriber_count()
}
pub(crate) fn bump_version(&self)
where
T: 'static,
{
let mut state = self.state.borrow_mut();
state.version = state.version.wrapping_add(1);
let subs = Self::prepare_notification(&mut state);
drop(state);
if let Some(subs) = subs {
Self::schedule_notification(&self.state, subs);
}
}
}
impl<T> Clone for Signal<T> {
fn clone(&self) -> Self {
Self {
state: Rc::clone(&self.state),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Signal<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.borrow();
f.debug_struct("Signal")
.field("value", &state.value)
.field("version", &state.version)
.field("subscribers", &state.subscribers.len())
.finish()
}
}
impl<T: Default> Default for Signal<T> {
fn default() -> Self {
Self::new(T::default())
}
}
pub struct SignalMap<T, U, F> {
source: Signal<T>,
f: F,
_phantom: std::marker::PhantomData<fn() -> U>,
}
impl<T: Clone + 'static, U, F: Fn(&T) -> U> SignalMap<T, U, F> {
#[must_use]
pub fn read(&self) -> U {
self.source.with(|v| (self.f)(v))
}
#[must_use]
pub fn with<R>(&self, g: impl FnOnce(&U) -> R) -> R {
self.source.with(|v| g(&(self.f)(v)))
}
pub async fn changed(&self) -> U {
self.source.changed().await;
self.read()
}
}
impl<T, U, F: Clone> Clone for SignalMap<T, U, F> {
fn clone(&self) -> Self {
Self {
source: self.source.clone(),
f: self.f.clone(),
_phantom: std::marker::PhantomData,
}
}
}
pub(crate) fn borrow_state<T>(sig: &Signal<T>) -> std::cell::Ref<'_, SignalState<T>> {
sig.state.borrow()
}
#[doc(hidden)]
pub fn subscribe<T>(sig: &Signal<T>, callback: Rc<dyn Fn()>) -> SubscriberId {
let mut state = sig.state.borrow_mut();
let id = state.next_subscriber_id;
state.next_subscriber_id = state.next_subscriber_id.wrapping_add(1);
state.subscribers.push(Subscriber {
id,
alive: Rc::new(Cell::new(true)),
callback,
});
id
}
#[doc(hidden)]
pub fn unsubscribe<T>(sig: &Signal<T>, id: SubscriberId) {
let mut state = sig.state.borrow_mut();
if let Some(sub) = state.subscribers.iter().find(|s| s.id == id) {
sub.alive.set(false);
}
state.subscribers.retain(|s| s.id != id);
}
thread_local! {
static SCHEDULE_FN: RefCell<Option<Box<dyn Fn(Box<dyn FnOnce()>)>>> = RefCell::new(None);
}
#[doc(hidden)]
pub fn install_schedule_hook(hook: Box<dyn Fn(Box<dyn FnOnce()>)>) {
SCHEDULE_FN.with(|cell| {
*cell.borrow_mut() = Some(hook);
});
}
#[doc(hidden)]
pub fn remove_schedule_hook() {
SCHEDULE_FN.with(|cell| {
*cell.borrow_mut() = None;
});
}
pub(crate) fn executor_schedule(f: impl FnOnce() + 'static) {
SCHEDULE_FN.with(|cell| {
if let Some(hook) = cell.borrow().as_ref() {
hook(Box::new(f));
} else {
f();
}
});
}
fn track_observer<T: 'static>(sig: &Signal<T>) {
OBSERVER.with(|cell| {
if let Some(ref observer) = *cell.borrow() {
let key = crate::memo::SignalKey::new(sig);
{
let mut seen = observer.seen.borrow_mut();
if !seen.insert(key) {
observer.re_read.borrow_mut().insert(key);
return;
}
}
let signal = sig.clone();
let alive = Rc::new(Cell::new(true));
let alive_clone = Rc::clone(&alive);
let dirty_cb = Rc::clone(&observer.dirty_callback);
let callback: Rc<dyn Fn()> = Rc::new(move || {
if alive_clone.get() {
dirty_cb();
}
});
let id = subscribe(&signal, callback);
let cleanup = Box::new(move || {
alive.set(false);
unsubscribe(&signal, id);
});
(observer.on_subscribe)(key, cleanup);
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn signal_version_increments_on_set() {
let sig = Signal::new(0i32);
assert_eq!(sig.version(), 0);
sig.set(1);
assert_eq!(sig.version(), 1);
sig.set(2);
assert_eq!(sig.version(), 2);
}
#[test]
fn signal_version_unchanged_without_set() {
let sig = Signal::new(42);
let v1 = sig.version();
let _ = sig.read();
assert_eq!(sig.version(), v1);
}
}