#[cfg(test)]
use quickcheck::{Arbitrary, Gen};
use std::fmt;
use std::ops::Deref;
use std::sync::{Arc, Mutex, RwLock, Weak};
use crate::lift;
use crate::pending::Pending;
use crate::source::{with_weak, CallbackError, Source};
use crate::stream::{self, BoxClone, Stream};
#[cfg(test)]
use crate::testing::ArcFn;
use crate::transaction::{commit, later};
pub struct FuncSignal<A> {
func: Box<dyn Fn() -> A + Send + Sync + 'static>,
cache: Arc<Mutex<Option<A>>>,
}
impl<A> FuncSignal<A> {
pub fn new<F: Fn() -> A + Send + Sync + 'static>(f: F) -> FuncSignal<A> {
FuncSignal {
func: Box::new(f),
cache: Arc::new(Mutex::new(None)),
}
}
}
impl<A: Clone + 'static> FuncSignal<A> {
pub fn call(&self) -> A {
let mut cached = self.cache.lock().unwrap();
match &mut *cached {
&mut Some(ref value) => value.clone(),
cached => {
let cache = self.cache.clone();
later(move || {
let mut live = cache.lock().unwrap();
*live = None;
});
let value = (self.func)();
*cached = Some(value.clone());
value
}
}
}
}
pub enum SignalFn<A> {
Const(A),
Func(FuncSignal<A>),
}
impl<A> SignalFn<A> {
pub fn from_fn<F: Fn() -> A + Send + Sync + 'static>(f: F) -> SignalFn<A> {
SignalFn::Func(FuncSignal::new(f))
}
}
impl<A: Clone + 'static> SignalFn<A> {
pub fn call(&self) -> A {
match *self {
SignalFn::Const(ref a) => a.clone(),
SignalFn::Func(ref f) => f.call(),
}
}
}
pub fn reg_signal<A, B, F>(parent_source: &mut Source<A>, signal: &Signal<B>, handler: F)
where
A: Send + Sync + 'static,
B: Send + Sync + 'static,
F: Fn(A) -> SignalFn<B> + Send + Sync + 'static,
{
let weak_source = Arc::downgrade(&signal.source);
let weak_current = Arc::downgrade(&signal.current);
parent_source.register(move |a| {
weak_current
.upgrade()
.map(|cur| {
later(move || {
let _ = cur.write().map(|mut cur| cur.update());
})
})
.ok_or(CallbackError::Disappeared)
.and(with_weak(&weak_current, |cur| cur.queue(handler(a))))
.and(with_weak(&weak_source, |src| src.send(())))
});
}
pub fn signal_build<A, K>(func: SignalFn<A>, keep_alive: K) -> Signal<A>
where
K: Send + Sync + Clone + 'static,
{
Signal::build(func, keep_alive)
}
pub fn signal_current<A>(signal: &Signal<A>) -> &Arc<RwLock<Pending<SignalFn<A>>>> {
&signal.current
}
pub fn signal_source<A>(signal: &Signal<A>) -> &Arc<RwLock<Source<()>>> {
&signal.source
}
pub fn sample_raw<A: Clone + 'static>(signal: &Signal<A>) -> A {
signal.current.read().unwrap().call()
}
pub struct Signal<A> {
current: Arc<RwLock<Pending<SignalFn<A>>>>,
source: Arc<RwLock<Source<()>>>,
#[allow(dead_code)]
keep_alive: Box<dyn BoxClone>,
}
impl<A> Clone for Signal<A> {
fn clone(&self) -> Signal<A> {
Signal {
current: self.current.clone(),
source: self.source.clone(),
keep_alive: self.keep_alive.box_clone(),
}
}
}
impl<A> Signal<A> {
fn build<K>(func: SignalFn<A>, keep_alive: K) -> Signal<A>
where
K: Send + Sync + Clone + 'static,
{
Signal {
current: Arc::new(RwLock::new(Pending::new(func))),
source: Arc::new(RwLock::new(Source::new())),
keep_alive: Box::new(keep_alive),
}
}
}
impl<A: Clone + 'static> Signal<A> {
pub fn new(a: A) -> Signal<A> {
Signal::build(SignalFn::Const(a), ())
}
pub fn sample(&self) -> A {
commit(|| sample_raw(self))
}
}
impl<A: Clone + Send + Sync + 'static> Signal<A> {
pub fn cyclic<F>(def: F) -> Signal<A>
where
F: FnOnce(&Signal<A>) -> Signal<A>,
{
commit(|| {
let cycle = SignalCycle::new();
let finished = def(&cycle);
cycle.define(finished)
})
}
pub fn snapshot<B, C, F>(&self, stream: &Stream<B>, f: F) -> Stream<C>
where
B: Clone + Send + Sync + 'static,
C: Clone + Send + Sync + 'static,
F: Fn(A, B) -> C + Send + Sync + 'static,
{
stream::snapshot(self, stream, f)
}
pub fn map<B, F>(&self, function: F) -> Signal<B>
where
B: Clone + Send + Sync + 'static,
F: Fn(A) -> B + Send + Sync + 'static,
{
lift::lift1(function, self)
}
}
impl<A: Clone + Send + Sync + 'static> Signal<Signal<A>> {
pub fn switch(&self) -> Signal<A> {
fn make_callback<A>(parent: &Signal<Signal<A>>) -> SignalFn<A>
where
A: Send + Clone + Sync + 'static,
{
let current_signal = parent.current.clone();
SignalFn::from_fn(move || sample_raw(¤t_signal.read().unwrap().call()))
}
commit(|| {
let signal = Signal::build(make_callback(self), ());
let parent = self.clone();
reg_signal(&mut self.source.write().unwrap(), &signal, move |_| {
make_callback(&parent)
});
signal
})
}
}
impl<A: Clone + Send + Sync + 'static> Signal<Stream<A>> {
pub fn switch(&self) -> Stream<A> {
fn reg_callback<A: Send + Sync + Clone + 'static>(
parent: &Signal<Stream<A>>,
weak_src: Weak<RwLock<Source<A>>>,
weak_term: Weak<()>,
) {
let stream_a = sample_raw(parent);
stream::source(&stream_a)
.write()
.unwrap()
.register(move |a| {
weak_term
.upgrade()
.ok_or(CallbackError::Disappeared)
.and_then(|_| with_weak(&weak_src, |src| src.send(a)))
});
}
commit(|| {
let mut terminate = Arc::new(());
let src = Arc::new(RwLock::new(Source::new()));
later({
let parent = self.clone();
let weak_src = Arc::downgrade(&src);
let weak_term = Arc::downgrade(&terminate);
move || reg_callback(&parent, weak_src, weak_term)
});
self.source.write().unwrap().register({
let parent = self.clone();
let weak_src = Arc::downgrade(&src);
move |_| {
let parent = parent.clone();
let weak_src = weak_src.clone();
terminate = Arc::new(());
let weak_term = Arc::downgrade(&terminate);
later(move || reg_callback(&parent, weak_src, weak_term));
Ok(())
}
});
stream::build(src, self)
})
}
}
#[cfg(test)]
impl<A, B> Signal<ArcFn<A, B>>
where
A: Clone + Send + Sync + 'static,
B: Clone + Send + Sync + 'static,
{
fn apply(&self, signal: &Signal<A>) -> Signal<B> {
lift::lift2(|f, a| f(a), self, signal)
}
}
#[cfg(test)]
impl<A: Arbitrary + Send + Sync + Clone + 'static> Arbitrary for Signal<A> {
fn arbitrary(g: &mut Gen) -> Signal<A> {
let values = Vec::<A>::arbitrary(g);
if values.is_empty() {
Signal::new(Arbitrary::arbitrary(g))
} else {
let n = Mutex::new(0);
lift::lift0(move || {
let mut n = n.lock().unwrap();
*n += 1;
if *n >= values.len() {
*n = 0
}
values[*n].clone()
})
}
}
}
impl<A: fmt::Debug + Clone + 'static> fmt::Debug for Signal<A> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
commit(|| match **self.current.read().unwrap() {
SignalFn::Const(ref a) => fmt
.debug_struct("Signal::const")
.field("value", &a)
.finish(),
SignalFn::Func(ref f) => fmt
.debug_struct("Signal::fn")
.field("current", &f.call())
.finish(),
})
}
}
struct SignalCycle<A> {
signal: Signal<A>,
}
impl<A: Send + Sync + Clone + 'static> SignalCycle<A> {
pub fn new() -> SignalCycle<A> {
SignalCycle {
signal: Signal::build(
SignalFn::from_fn(|| panic!("sampled on forward-declaration of signal")),
(),
),
}
}
pub fn define(self, definition: Signal<A>) -> Signal<A> {
fn make_callback<A>(current_def: &Arc<RwLock<Pending<SignalFn<A>>>>) -> SignalFn<A>
where
A: Send + Sync + Clone + 'static,
{
match *current_def.read().unwrap().future() {
SignalFn::Const(ref a) => SignalFn::Const(a.clone()),
SignalFn::Func(_) => SignalFn::from_fn({
let sig = Arc::downgrade(¤t_def);
move || {
let strong = sig.upgrade().unwrap();
let ret = strong.read().unwrap().call();
ret
}
}),
}
}
commit(move || {
*self.signal.current.write().unwrap() =
Pending::new(make_callback(&definition.current));
let weak_parent = Arc::downgrade(&definition.current);
reg_signal(
&mut definition.source.write().unwrap(),
&self.signal,
move |_| make_callback(&weak_parent.upgrade().unwrap()),
);
Signal {
keep_alive: Box::new(definition),
..self.signal
}
})
}
}
impl<A> Deref for SignalCycle<A> {
type Target = Signal<A>;
fn deref(&self) -> &Signal<A> {
&self.signal
}
}
pub fn hold<A>(initial: A, stream: &Stream<A>) -> Signal<A>
where
A: Send + Sync + 'static,
{
commit(|| {
let signal = Signal::build(SignalFn::Const(initial), stream.clone());
reg_signal(
&mut stream::source(&stream).write().unwrap(),
&signal,
SignalFn::Const,
);
signal
})
}
#[cfg(test)]
mod test {
use quickcheck::quickcheck;
use crate::lift::lift1;
use crate::signal::{self, Signal, SignalCycle};
use crate::stream::Sink;
use crate::testing::{id, partial_comp, pure_fn, signal_eq, ArcFn};
#[test]
fn functor_identity() {
fn check(signal: Signal<i32>) -> bool {
let eq = signal_eq(&signal, &lift1(id, &signal));
(0..10).all(|_| eq.sample())
}
quickcheck(check as fn(Signal<i32>) -> bool);
}
#[test]
fn functor_composition() {
fn check(signal: Signal<i32>) -> bool {
fn f(n: i32) -> i32 {
3 * n
}
fn g(n: i32) -> i32 {
n + 2
}
let eq = signal_eq(&lift1(|n| f(g(n)), &signal), &lift1(f, &lift1(g, &signal)));
(0..10).all(|_| eq.sample())
}
quickcheck(check as fn(Signal<i32>) -> bool);
}
#[test]
fn applicative_identity() {
fn check(signal: Signal<i32>) -> bool {
let eq = signal_eq(&pure_fn(id).apply(&signal), &signal);
(0..10).all(|_| eq.sample())
}
quickcheck(check as fn(Signal<i32>) -> bool);
}
#[test]
fn applicative_composition() {
fn check(signal: Signal<i32>) -> bool {
fn f(n: i32) -> i32 {
n * 4
}
fn g(n: i32) -> i32 {
n - 3
}
let u = pure_fn(f);
let v = pure_fn(g);
let eq = signal_eq(
&pure_fn(partial_comp).apply(&u).apply(&v).apply(&signal),
&u.apply(&v.apply(&signal)),
);
(0..10).all(|_| eq.sample())
}
quickcheck(check as fn(Signal<i32>) -> bool);
}
#[test]
fn applicative_homomorphism() {
fn check(x: i32) -> bool {
fn f(x: i32) -> i32 {
x * (-5)
}
let eq = signal_eq(&pure_fn(f).apply(&Signal::new(x)), &Signal::new(f(x)));
(0..10).all(|_| eq.sample())
}
quickcheck(check as fn(i32) -> bool);
}
#[test]
fn applicative_interchange() {
fn check(x: i32) -> bool {
fn f(x: i32) -> i32 {
x * 2 - 7
}
let u = pure_fn(f);
let eq = signal_eq(
&u.apply(&Signal::new(x)),
&pure_fn(move |f: ArcFn<i32, i32>| f(x)).apply(&u),
);
(0..10).all(|_| eq.sample())
}
quickcheck(check as fn(i32) -> bool);
}
#[test]
fn clone() {
let b = Signal::new(3);
assert_eq!(b.clone().sample(), 3);
}
#[test]
fn hold() {
let sink = Sink::new();
let signal = sink.stream().hold(3);
assert_eq!(signal.sample(), 3);
sink.send(4);
assert_eq!(signal.sample(), 4);
}
#[test]
fn hold_implicit_stream() {
let sink = Sink::new();
let signal = signal::hold(0, &sink.stream().map(|n| 2 * n));
assert_eq!(signal.sample(), 0);
sink.send(4);
assert_eq!(signal.sample(), 8);
}
#[test]
fn snapshot() {
let sink1: Sink<i32> = Sink::new();
let sink2: Sink<f64> = Sink::new();
let mut snap_events = sink1
.stream()
.hold(1)
.snapshot(&sink2.stream().map(|x| x + 3.0), |a, b| (a, b))
.events();
sink2.send(4.0);
assert_eq!(snap_events.next(), Some((1, 7.0)));
}
#[test]
fn snapshot_2() {
let ev1 = Sink::new();
let beh1 = ev1.stream().hold(5);
let ev2 = Sink::new();
let snap = beh1.snapshot(&ev2.stream(), |a, b| (a, b));
let mut events = snap.events();
ev2.send(4);
assert_eq!(events.next(), Some((5, 4)));
ev1.send(-2);
ev2.send(6);
assert_eq!(events.next(), Some((-2, 6)));
}
#[test]
fn cyclic_snapshot_accum() {
let sink = Sink::new();
let stream = sink.stream();
let accum = SignalCycle::new();
let def = accum.snapshot(&stream, |a, s| a + s).hold(0);
let accum = accum.define(def);
assert_eq!(accum.sample(), 0);
sink.send(3);
assert_eq!(accum.sample(), 3);
sink.send(7);
assert_eq!(accum.sample(), 10);
sink.send(-21);
assert_eq!(accum.sample(), -11);
}
#[test]
fn snapshot_order_standard() {
let sink = Sink::new();
let signal = sink.stream().hold(0);
let mut events = signal.snapshot(&sink.stream(), |a, b| (a, b)).events();
sink.send(1);
assert_eq!(events.next(), Some((0, 1)));
}
#[test]
fn snapshot_lift_order_standard() {
let sink = Sink::new();
let signal = sink.stream().hold(0);
let mut events = lift1(|x| x, &signal)
.snapshot(&sink.stream(), |a, b| (a, b))
.events();
sink.send(1);
assert_eq!(events.next(), Some((0, 1)));
}
#[test]
fn snapshot_order_alternative() {
let sink = Sink::new();
let first = sink.stream().map(|x| x);
let signal = sink.stream().hold(0);
let mut events = signal.snapshot(&first, |a, b| (a, b)).events();
sink.send(1);
assert_eq!(events.next(), Some((0, 1)));
}
#[test]
fn cyclic_signal_intermediate() {
let sink = Sink::new();
let stream = sink.stream();
let mut snap = None;
let sum = Signal::cyclic(|a| {
let my_snap = a.snapshot(&stream, |a, e| e + a);
snap = Some(my_snap.clone());
my_snap.hold(0)
});
let snap = snap.unwrap();
let mut events = snap.events();
sink.send(3);
assert_eq!(sum.sample(), 3);
assert_eq!(events.next(), Some(3));
}
#[test]
fn switch_signal_stream() {
let control_sink = Sink::new();
let control_stream = control_sink.stream();
let sink1 = Sink::new();
let sink2 = Sink::new();
let mut switched = control_stream
.fold(sink1.stream(), |_, s| s)
.switch()
.events();
sink2.send(1);
sink1.send(2);
assert_eq!(switched.next(), Some(2));
control_sink.send(sink2.stream());
sink1.send(3);
sink2.send(4);
assert_eq!(switched.next(), Some(4));
control_sink.send(sink1.stream());
sink2.send(5);
sink1.send(6);
assert_eq!(switched.next(), Some(6));
}
}