use crate::stream::Stream;
use crate::sync::Mutex;
use crate::types::{MaybeOwned, Storage};
use std::fmt;
use std::sync::{mpsc, Arc};
#[cfg(feature = "lazycell")]
use lazycell::AtomicLazyCell;
/// A value that is obtained on demand by calling a shared closure.
///
/// The wrapped closure is reference counted, so a `Signal` can be handed to
/// several owners that all invoke the same underlying function. The closure
/// must be `Send + Sync`; the `'static` bound written here is the one the
/// compiler already applies to a boxed trait object with no explicit lifetime.
pub struct Signal<T>(Arc<dyn Fn() -> T + Send + Sync + 'static>);
impl<T> Signal<T> {
#[inline]
pub fn constant(val: T) -> Self
where
T: Clone + Send + Sync + 'static,
{
Signal::from_fn(move || val.clone())
}
/// Creates a signal whose value is produced by calling `f`.
///
/// `f` is invoked once per `sample` call; nothing is cached by this
/// constructor. The closure is placed behind an `Arc` so clones of the signal
/// share it.
#[inline]
pub fn from_fn<F>(f: F) -> Self
where
    F: Fn() -> T + Send + Sync + 'static,
{
    let thunk: Arc<dyn Fn() -> T + Send + Sync> = Arc::new(f);
    Signal(thunk)
}
/// Creates a signal that reads its value from a shared `Storage`.
///
/// `source` is never read. It is referenced inside the closure only so that
/// the closure takes ownership of it, which keeps it alive for as long as the
/// signal's closure exists.
#[inline]
pub(crate) fn from_storage<S>(storage: Arc<Storage<T>>, source: S) -> Self
where
    T: Clone + Send + Sync + 'static,
    S: Send + Sync + 'static,
{
    let read_storage = move || {
        // Touch `source` so the `move` closure captures (and therefore owns) it.
        let _pinned = &source;
        storage.get()
    };
    Signal::from_fn(read_storage)
}
/// Returns the signal's current value by invoking the wrapped closure.
///
/// Every call runs the closure again, so any side effects it has are
/// repeated on each sample.
#[inline]
pub fn sample(&self) -> T {
    (self.0)()
}
/// Derives a new signal by applying `f` to this signal's value.
///
/// The returned signal holds a clone of `self`; each time it is sampled it
/// samples the parent and passes the result through `f`. Nothing is cached,
/// so `f` runs on every sample.
pub fn map<F, R>(&self, f: F) -> Signal<R>
where
    F: Fn(T) -> R + Send + Sync + 'static,
    T: 'static,
{
    let upstream = self.clone();
    Signal::from_fn(move || {
        let current = upstream.sample();
        f(current)
    })
}
/// Derives a signal that accumulates this signal's values across samples.
///
/// The accumulator starts at `initial`. Each time the returned signal is
/// sampled, the parent is sampled once and `f(accumulator, value)` is handed
/// to the storage's `replace_fetch`; whatever that returns is the sample
/// result.
pub fn fold<A, F>(&self, initial: A, f: F) -> Signal<A>
where
    F: Fn(A, T) -> A + Send + Sync + 'static,
    T: 'static,
    A: Clone + Send + Sync + 'static,
{
    let upstream = self.clone();
    let accumulator = Storage::new(initial);
    Signal::from_fn(move || {
        // The parent is sampled before the accumulator is touched, so the
        // parent's closure never runs inside the storage update.
        let input = upstream.sample();
        accumulator.replace_fetch(|previous| f(previous, input))
    })
}
/// Builds a stream that pairs each value of `trigger` with a sample of this signal.
///
/// The result is `trigger.map(..)` with a closure that, for every value the
/// stream passes in, samples a clone of this signal and calls
/// `f(sampled, stream_value)`.
pub fn snapshot<S, F, R>(&self, trigger: &Stream<S>, f: F) -> Stream<R>
where
    F: Fn(T, MaybeOwned<'_, S>) -> R + Send + Sync + 'static,
    T: 'static,
    S: 'static,
    R: 'static,
{
    let observed = self.clone();
    trigger.map(move |event| {
        let current = observed.sample();
        f(current, event)
    })
}
/// Creates a signal that tracks values arriving on a channel.
///
/// This is `fold_channel` with a combining function that discards the
/// previous value and keeps the incoming one; `initial` is the starting value
/// handed to `fold_channel`.
#[inline]
pub fn from_channel(initial: T, rx: mpsc::Receiver<T>) -> Self
where
    T: Clone + Send + Sync + 'static,
{
    Self::fold_channel(initial, rx, |_previous, incoming| incoming)
}
pub fn fold_channel<V, F>(initial: T, rx: mpsc::Receiver<V>, f: F) -> Self
where
F: Fn(T, V) -> T + Send + Sync + 'static,
T: Clone + Send + Sync + 'static,
V: Send + 'static,
{
let storage = Storage::new(initial);
let rx = Mutex::new(rx);
Signal::from_fn(move || {
let source = rx.lock();
if let Ok(first) = source.try_recv() {
storage.replace_fetch(|old| {
let acc = f(old, first);
source.try_iter().fold(acc, &f)
})
} else {
storage.get()
}
})
}
/// Creates a signal whose definition may refer to the signal itself.
///
/// A forward-declared signal is built first and passed to `definition`; the
/// signal that `definition` returns is then stored in a shared cell that the
/// forward declaration reads from. The forward-declared signal is what gets
/// returned.
///
/// # Panics
///
/// Sampling the forward-declared signal before the cell has been filled
/// (for example from inside `definition`) panics with
/// `"sampled forward-declared Signal"`.
///
/// NOTE(review): the returned signal owns the cell and the cell owns the
/// defined signal; if that one captured a clone of the forward declaration
/// they reference each other. Confirm whether that cycle is acceptable.
#[cfg(feature = "lazycell")]
pub fn cyclic<F>(definition: F) -> Self
where
    F: FnOnce(&Signal<T>) -> Signal<T>,
    T: 'static,
{
    let cell = Arc::new(AtomicLazyCell::new());
    let forward = {
        let cell = Arc::clone(&cell);
        Signal::from_fn(move || {
            let defined: &Signal<T> = cell.borrow().expect("sampled forward-declared Signal");
            defined.sample()
        })
    };
    let defined = definition(&forward);
    cell.fill(defined).unwrap();
    forward
}
}
impl<T: 'static> Signal<Signal<T>> {
    /// Flattens a signal of signals into a signal of the inner value.
    ///
    /// Each sample of the result first samples the outer signal to obtain the
    /// current inner signal, then samples that inner signal.
    pub fn switch(&self) -> Signal<T> {
        let outer = self.clone();
        Signal::from_fn(move || {
            let inner = outer.sample();
            inner.sample()
        })
    }
}
impl<T> Clone for Signal<T> {
#[inline]
fn clone(&self) -> Self {
Signal(self.0.clone())
}
}
impl<T: Default + 'static> Default for Signal<T> {
#[inline]
fn default() -> Self {
Signal::from_fn(T::default)
}
}
impl<T: Clone + Send + Sync + 'static> From<T> for Signal<T> {
#[inline]
fn from(val: T) -> Self {
Signal::constant(val)
}
}
impl<T> fmt::Debug for Signal<T> {
    /// Writes `Signal(Fn@<pointer>)` using the inner `Arc`'s pointer formatting.
    ///
    /// The signal is not sampled, so no bound on `T` is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `self.0` (the `Arc` itself, not a reference to it) is passed so that
        // `{:p}` goes through the `Arc`'s own `Pointer` implementation.
        f.write_fmt(format_args!("Signal(Fn@{:p})", self.0))
    }
}
impl<T: fmt::Display> fmt::Display for Signal<T> {
    /// Samples the signal once and formats the resulting value.
    ///
    /// The caller's formatter is forwarded unchanged to the value's own
    /// `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let current = self.sample();
        <T as fmt::Display>::fmt(&current, f)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Instant;

    // Constant signals always yield the same value; mapped signals re-run
    // their closure on every sample.
    #[test]
    fn signal_constant() {
        let base = Signal::constant(42);
        let doubled = base.map(|v| v * 2);
        let incremented = doubled.map(|v| v + 1);

        assert_eq!(base.sample(), 42);
        assert_eq!(doubled.sample(), 84);
        assert_eq!(incremented.sample(), 85);

        // The counter inside the mapping closure advances once per sample.
        let calls = AtomicUsize::new(0);
        let counting = base.map(move |v| v + calls.fetch_add(1, Ordering::Relaxed));
        assert_eq!(counting.sample(), 42);
        assert_eq!(counting.sample(), 43);
    }

    // Signals built from closures observe changes to the state they read.
    #[test]
    fn signal_dynamic() {
        let started = Instant::now();
        let clock = Signal::from_fn(move || started);
        assert_eq!(clock.sample(), started);

        let cell = Arc::new(AtomicUsize::new(1));
        let reader = Arc::clone(&cell);
        let base = Signal::from_fn(move || reader.load(Ordering::Relaxed));
        let doubled = base.map(|v| v * 2);
        let incremented = doubled.map(|v| v + 1);

        assert_eq!(base.sample(), 1);
        assert_eq!(doubled.sample(), 2);
        assert_eq!(incremented.sample(), 3);

        // Updating the shared cell is visible through the whole chain.
        cell.store(13, Ordering::Relaxed);
        assert_eq!(base.sample(), 13);
        assert_eq!(doubled.sample(), 26);
        assert_eq!(incremented.sample(), 27);
    }

    // Each sample of a folded signal feeds one more parent value into the accumulator.
    #[test]
    fn signal_fold() {
        let from_constant = Signal::constant(1).fold(0, |acc, n| acc + n);
        let from_closure = Signal::from_fn(|| 1).fold(0, |acc, n| acc + n);

        for expected in 1..=2 {
            assert_eq!(from_constant.sample(), expected);
            assert_eq!(from_closure.sample(), expected);
        }
    }

    // The default signal yields the default value of its type.
    #[test]
    fn signal_default() {
        let number = Signal::<i32>::default();
        let text = Signal::<String>::default();

        assert_eq!(number.sample(), 0);
        assert_eq!(text.sample(), "");
    }

    // Both `From` and `Into` produce a constant signal.
    #[test]
    fn signal_from() {
        let via_from = Signal::from(42);
        let via_into: Signal<i32> = 13.into();

        assert_eq!(via_from.sample(), 42);
        assert_eq!(via_into.sample(), 13);
    }

    // `Display` prints the sampled value.
    #[test]
    fn signal_display() {
        let constant = Signal::constant(42);
        let computed = Signal::from_fn(|| 13);

        assert_eq!(constant.to_string(), "42");
        assert_eq!(computed.to_string(), "13");
    }

    // All messages queued before a sample are folded in by that one sample.
    #[test]
    fn signal_channel() {
        let (tx, rx) = mpsc::channel();
        let total = Signal::fold_channel(0, rx, |acc, n| acc + n);

        (0..=10).for_each(|i| tx.send(i).unwrap());

        assert_eq!(total.sample(), 55);
    }
}