use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SlotId(u64);
type SlotFn<T> = Box<dyn Fn(&T) + Send + Sync + 'static>;
struct SignalInner<T> {
slots: Vec<(SlotId, SlotFn<T>)>,
next_id: u64,
}
impl<T> SignalInner<T> {
fn new() -> Self {
SignalInner {
slots: Vec::new(),
next_id: 1,
}
}
}
pub struct Signal<T: Clone> {
inner: Arc<Mutex<SignalInner<T>>>,
}
impl<T: Clone + 'static> Signal<T> {
pub fn new() -> Self {
Signal {
inner: Arc::new(Mutex::new(SignalInner::new())),
}
}
pub fn connect(&self, slot: impl Fn(&T) + Send + Sync + 'static) -> SlotId {
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(_) => return SlotId(0),
};
let id = SlotId(guard.next_id);
guard.next_id = guard.next_id.wrapping_add(1);
guard.slots.push((id, Box::new(slot)));
id
}
pub fn disconnect(&self, id: SlotId) -> bool {
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(_) => return false,
};
let before = guard.slots.len();
guard.slots.retain(|(sid, _)| *sid != id);
guard.slots.len() < before
}
pub fn emit(&self, value: &T) {
let callbacks: Vec<SlotFn<T>> = {
if let Ok(guard) = self.inner.lock() {
for (_, cb) in &guard.slots {
cb(value);
}
}
Vec::new() };
drop(callbacks);
}
pub fn slot_count(&self) -> usize {
self.inner.lock().map(|g| g.slots.len()).unwrap_or(0)
}
}
impl<T: Clone + 'static> Clone for Signal<T> {
fn clone(&self) -> Self {
Signal {
inner: Arc::clone(&self.inner),
}
}
}
struct ObservableInner<T: Clone> {
value: T,
signal: Signal<T>,
}
pub struct Observable<T: Clone + 'static> {
inner: Arc<Mutex<ObservableInner<T>>>,
}
impl<T: Clone + 'static> Observable<T> {
pub fn new(value: T) -> Self {
Observable {
inner: Arc::new(Mutex::new(ObservableInner {
value,
signal: Signal::new(),
})),
}
}
pub fn get(&self) -> T {
self.inner
.lock()
.map(|g| g.value.clone())
.unwrap_or_else(|_| panic!("Observable mutex poisoned"))
}
pub fn set(&self, value: T) {
let signal_clone = {
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(_) => return,
};
guard.value = value.clone();
guard.signal.clone()
};
signal_clone.emit(&value);
}
pub fn on_change(&self) -> Signal<T> {
self.inner
.lock()
.map(|g| g.signal.clone())
.unwrap_or_else(|_| Signal::new())
}
pub fn map<U: Clone + Send + 'static>(
&self,
f: impl Fn(&T) -> U + Send + Sync + 'static,
) -> Observable<U> {
let initial = f(&self.get());
let derived = Observable::<U>::new(initial);
let derived_clone = derived.clone();
self.on_change().connect(move |v| {
derived_clone.set(f(v));
});
derived
}
}
impl<T: Clone + 'static> Clone for Observable<T> {
fn clone(&self) -> Self {
Observable {
inner: Arc::clone(&self.inner),
}
}
}
pub trait AnyObservable: Send + Sync {
fn on_any_change(&self, cb: Box<dyn Fn() + Send + Sync + 'static>) -> SlotId;
fn disconnect_any(&self, id: SlotId);
}
impl<T: Clone + Send + Sync + 'static> AnyObservable for Observable<T> {
fn on_any_change(&self, cb: Box<dyn Fn() + Send + Sync + 'static>) -> SlotId {
self.on_change().connect(move |_| cb())
}
fn disconnect_any(&self, id: SlotId) {
self.on_change().disconnect(id);
}
}
pub struct ComputedObservable<T: Clone + 'static> {
inner: Observable<T>,
_subscriptions: Arc<Mutex<Vec<(Box<dyn AnyObservable + 'static>, SlotId)>>>,
}
impl<T: Clone + Send + 'static> ComputedObservable<T> {
pub fn new(
compute_fn: impl Fn() -> T + Send + Sync + 'static,
dependencies: Vec<Box<dyn AnyObservable + 'static>>,
) -> Self {
let compute_fn = Arc::new(compute_fn);
let initial = compute_fn();
let inner = Observable::new(initial);
let mut subscriptions: Vec<(Box<dyn AnyObservable + 'static>, SlotId)> = Vec::new();
for dep in dependencies {
let inner_clone = inner.clone();
let compute_clone = Arc::clone(&compute_fn);
let id = dep.on_any_change(Box::new(move || {
inner_clone.set(compute_clone());
}));
subscriptions.push((dep, id));
}
ComputedObservable {
inner,
_subscriptions: Arc::new(Mutex::new(subscriptions)),
}
}
pub fn get(&self) -> T {
self.inner.get()
}
pub fn on_change(&self) -> Signal<T> {
self.inner.on_change()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
#[test]
fn test_signal_emit() {
let sig = Signal::<i32>::new();
let received = Arc::new(Mutex::new(Vec::new()));
let r2 = Arc::clone(&received);
sig.connect(move |v| {
r2.lock().map(|mut g| g.push(*v)).ok();
});
sig.emit(&10);
sig.emit(&20);
assert_eq!(*received.lock().unwrap(), vec![10, 20]);
}
#[test]
fn test_signal_disconnect() {
let sig = Signal::<i32>::new();
let count = Arc::new(Mutex::new(0usize));
let c2 = Arc::clone(&count);
let id = sig.connect(move |_| {
c2.lock().map(|mut g| *g += 1).ok();
});
sig.emit(&1);
sig.disconnect(id);
sig.emit(&2);
assert_eq!(*count.lock().unwrap(), 1);
}
#[test]
fn test_signal_multiple_slots() {
let sig = Signal::<i32>::new();
let sum = Arc::new(Mutex::new(0i32));
let s2 = Arc::clone(&sum);
let s3 = Arc::clone(&sum);
sig.connect(move |v| {
s2.lock().map(|mut g| *g += *v).ok();
});
sig.connect(move |v| {
s3.lock().map(|mut g| *g += *v * 2).ok();
});
sig.emit(&3);
assert_eq!(*sum.lock().unwrap(), 9);
}
#[test]
fn test_observable_set_get() {
let obs = Observable::new(42i32);
assert_eq!(obs.get(), 42);
obs.set(100);
assert_eq!(obs.get(), 100);
}
#[test]
fn test_observable_on_change() {
let obs = Observable::new(0i32);
let log = Arc::new(Mutex::new(Vec::new()));
let log2 = Arc::clone(&log);
obs.on_change().connect(move |v| {
log2.lock().map(|mut g| g.push(*v)).ok();
});
obs.set(5);
obs.set(10);
assert_eq!(*log.lock().unwrap(), vec![5, 10]);
}
#[test]
fn test_observable_map() {
let obs = Observable::new(3i32);
let doubled = obs.map(|v| *v * 2);
assert_eq!(doubled.get(), 6);
obs.set(7);
assert_eq!(doubled.get(), 14);
}
#[test]
fn test_computed_observable() {
let a = Observable::new(10i32);
let b = Observable::new(20i32);
let a_clone = a.clone();
let b_clone = b.clone();
let sum = ComputedObservable::new(
move || a_clone.get() + b_clone.get(),
vec![
Box::new(a.clone()) as Box<dyn AnyObservable>,
Box::new(b.clone()) as Box<dyn AnyObservable>,
],
);
assert_eq!(sum.get(), 30);
a.set(5);
assert_eq!(sum.get(), 25);
b.set(5);
assert_eq!(sum.get(), 10);
}
}