use std::sync::{
Arc, RwLock, Weak,
atomic::{AtomicBool, Ordering},
};
use crate::computed::SignalSubscribable;
/// Lock-protected state behind a `Signal`: the current value plus the dirty
/// flags of everything that subscribed to it.
struct SignalInner<T> {
    /// The value most recently stored.
    value: T,
    /// Weak handles to subscriber dirty flags. A handle whose flag has been
    /// dropped is discarded the next time subscribers are notified.
    subscribers: Vec<Weak<AtomicBool>>,
}

impl<T> SignalInner<T> {
    /// Builds the state for a fresh signal holding `initial`, with no subscribers.
    fn new(initial: T) -> Self {
        let subscribers = Vec::new();
        Self { value: initial, subscribers }
    }

    /// Sets every still-alive subscriber flag to `true` and forgets the
    /// subscribers whose flag has already been dropped.
    fn notify_subscribers(&mut self) {
        self.subscribers.retain(|subscriber| match subscriber.upgrade() {
            Some(dirty) => {
                dirty.store(true, Ordering::Release);
                true
            }
            // Every owning `Arc` is gone, so nobody can observe this flag anymore.
            None => false,
        });
    }
}
pub struct Signal<T: Clone + Send + Sync + 'static> {
inner: Arc<RwLock<SignalInner<T>>>,
}
impl<T: Clone + Send + Sync + 'static> Signal<T> {
pub fn new(initial: T) -> Self {
Self { inner: Arc::new(RwLock::new(SignalInner::new(initial))) }
}
pub fn get(&self) -> T {
self.inner.read().expect("Signal RwLock poisoned").value.clone()
}
pub fn set(&self, value: T) {
let mut guard = self.inner.write().expect("Signal RwLock poisoned");
guard.value = value;
guard.notify_subscribers();
}
pub fn subscribe(&self) -> Arc<AtomicBool> {
let flag = Arc::new(AtomicBool::new(false));
self.inner
.write()
.expect("Signal RwLock poisoned")
.subscribers
.push(Arc::downgrade(&flag));
flag
}
pub(crate) fn subscribe_with_flag(&self, flag: Arc<AtomicBool>) {
self.inner
.write()
.expect("Signal RwLock poisoned")
.subscribers
.push(Arc::downgrade(&flag));
}
}
impl<T: Clone + Send + Sync + 'static> Clone for Signal<T> {
    /// Returns another handle to the same shared state; the stored value
    /// itself is not cloned.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<T: Clone + Send + Sync + 'static> SignalSubscribable for Signal<T> {
    /// Forwards to the inherent `Signal::subscribe_with_flag`.
    fn subscribe_with_flag(&self, flag: Arc<AtomicBool>) {
        // Path-qualified on the concrete type so the call targets the inherent
        // method of the same name.
        Signal::<T>::subscribe_with_flag(self, flag)
    }
}
impl<T: Clone + Send + Sync + std::fmt::Debug + 'static> std::fmt::Debug for Signal<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.inner.try_read() {
Ok(guard) => write!(f, "Signal({:?})", guard.value),
Err(_) => write!(f, "Signal(<locked>)"),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built signal hands back the value it was created with.
    #[test]
    fn get_returns_initial_value() {
        let signal = Signal::new(42i32);
        assert_eq!(signal.get(), 42);
    }

    /// `set` replaces the value observed by later `get` calls.
    #[test]
    fn set_updates_value() {
        let signal = Signal::new(0i32);
        signal.set(99);
        assert_eq!(signal.get(), 99);
    }

    /// A new subscription starts with its dirty flag cleared.
    #[test]
    fn subscribe_flag_starts_clean() {
        let signal = Signal::new(0i32);
        let dirty = signal.subscribe();
        assert!(!dirty.load(Ordering::Acquire));
    }

    /// Writing to the signal raises the dirty flag of a live subscriber.
    #[test]
    fn set_marks_subscriber_dirty() {
        let signal = Signal::new(0i32);
        let dirty = signal.subscribe();
        signal.set(1);
        assert!(dirty.load(Ordering::Acquire));
    }

    /// A subscriber that has gone away must not get in the way of later writes.
    #[test]
    fn dropped_subscriber_does_not_prevent_set() {
        let signal = Signal::new(0i32);
        // Subscribe and release the flag right away, leaving a dead weak handle.
        drop(signal.subscribe());
        signal.set(1);
        assert_eq!(signal.get(), 1);
    }

    /// Clones are handles onto the same state, so a write through one is
    /// visible through the other.
    #[test]
    fn clone_shares_value() {
        let original = Signal::new("hello");
        let alias = original.clone();
        original.set("world");
        assert_eq!(alias.get(), "world");
    }
}