#[macro_use(lift)]
extern crate carboxyl;
extern crate time;
#[macro_use(lazy_static)]
extern crate lazy_static;
use std::thread;
use carboxyl::{ Stream, Signal, Sink };
use time::{ Duration, Tm };
/// Expands to an expression that yields a clone of a lazily-initialised
/// static `Signal<$value>` constructed with `lift!($source)`.
///
/// The expansion is wrapped in its own block, so the static declared by one
/// invocation does not clash with the static declared by another.
macro_rules! static_signal {
    ($value:ty, $source:expr) => {{
        lazy_static! {
            static ref SHARED_SIGNAL: Signal<$value> = lift!($source);
        }
        // Hand out a clone; the static itself stays in place for later calls.
        SHARED_SIGNAL.clone()
    }};
}
/// Returns a signal whose value is produced by `time::now` (the current time
/// in the local timezone).
///
/// Every call hands back a clone of one lazily-created static signal rather
/// than building a fresh one.
pub fn now() -> Signal<Tm> {
    let local_clock: Signal<Tm> = static_signal!(Tm, time::now);
    local_clock
}
/// Returns a signal whose value is produced by `time::now_utc` (the current
/// time in UTC).
///
/// Every call hands back a clone of one lazily-created static signal rather
/// than building a fresh one.
pub fn now_utc() -> Signal<Tm> {
    let utc_clock: Signal<Tm> = static_signal!(Tm, time::now_utc);
    utc_clock
}
/// Returns a stream that fires roughly once per `interval`.
///
/// A background thread is spawned that repeatedly compares the wall clock
/// against the time of the last accounted tick. When a tick is due it sends
/// the amount of time being accounted for: `interval` itself, or a whole
/// multiple of it if the thread fell behind by more than one interval.
/// Otherwise it sleeps until the next tick is due.
///
/// The loop has no exit condition, so the thread keeps running after the
/// returned stream is dropped.
///
/// # Panics
///
/// The overdue computation divides by `interval.num_milliseconds()`, so an
/// interval shorter than one millisecond makes the background thread panic
/// with a division by zero once the first tick comes due.
pub fn every(interval: Duration) -> Stream<Duration> {
    let sink = Sink::new();
    let stream = sink.stream();
    thread::spawn({
        let mut last = time::now();
        move || loop {
            // Time left until the next tick; negative once the tick is overdue.
            let togo = last + interval - time::now();
            if togo < Duration::zero() {
                // `togo` is negative in this branch, so the quotient is <= 0.
                // Subtracting it adds the number of whole intervals that were
                // missed on top of the one that just came due. (Adding it
                // would yield zero or a negative multiple and stall or rewind
                // `last`.)
                let missed = togo.num_milliseconds() / interval.num_milliseconds();
                let passed = interval * (1 - missed) as i32;
                sink.send(passed);
                last = last + passed;
            } else {
                // `togo` is non-negative here, so the conversion cannot fail.
                thread::sleep(togo.to_std().unwrap());
            }
        }
    });
    stream
}
/// Accumulates a value over time by sampling `a` on a periodic tick.
///
/// A tick stream is created with `every(dt)`. On each tick the current value
/// of `a` is paired with the duration carried by the tick, and `f` is applied
/// to the running accumulator, that sample and that duration. The result is
/// the signal of the accumulator, starting from `initial`.
pub fn integrate<A, B, F>(a: &Signal<A>, initial: B, dt: Duration, f: F) -> Signal<B>
    where A: Clone + Send + Sync + 'static,
          B: Clone + Send + Sync + 'static,
          F: Fn(B, A, Duration) -> B + Send + Sync + 'static,
{
    let ticks = every(dt);
    // Pair every tick's duration with the signal value observed at that tick.
    let samples = a.snapshot(&ticks, |value, step| (value, step));
    samples.fold(initial, move |acc, (value, step)| f(acc, value, step))
}
#[cfg(test)]
mod test {
    use std::thread;
    use std::time;
    use std::fmt::Debug;
    use carboxyl::{ Signal, Sink };
    use time::{ Duration, Tm };
    use super::{ now, now_utc, every };

    /// Checks that two signals obtained from `f` report equal values when
    /// they are read as part of the same event.
    ///
    /// The inner snapshot reads `f()` when the sink fires; the outer snapshot
    /// reads `f()` again on the resulting event and pairs both readings.
    fn samples_equal<A, F: Fn() -> Signal<A>>(f: F)
        where A: PartialEq + Debug + Send + Clone + Sync + 'static,
    {
        let sink = Sink::new();
        let cmp = f().snapshot(
            &f().snapshot(&sink.stream(), |t, ()| t),
            |t0, t1| (t0, t1)
        );
        let mut events = cmp.events();
        sink.send(());
        let (t0, t1) = events.next().unwrap();
        assert_eq!(t0, t1);
    }

    #[test]
    fn now_samples_equal() {
        samples_equal(now);
    }

    #[test]
    fn now_utc_samples_equal() {
        samples_equal(now_utc);
    }

    /// Checks that the time reported by `f` advances by at least as much as
    /// the thread has slept since the first sample.
    fn consistent_with_sleep_ms<F: Fn() -> Signal<Tm>>(f: F) {
        let t0 = f().sample();
        for n in 0..5 {
            thread::sleep(time::Duration::from_millis(n));
            let dt = f().sample() - t0;
            // `thread::sleep` only promises *at least* the requested time, and
            // for `n == 0` two consecutive clock reads may be identical, so
            // the bound must be inclusive.
            assert!(dt >= Duration::milliseconds(n as i64));
        }
    }

    #[test]
    fn now_consistent_with_sleep_ms() {
        consistent_with_sleep_ms(now);
    }

    #[test]
    fn now_utc_consistent_with_sleep_ms() {
        consistent_with_sleep_ms(now_utc);
    }

    /// Checks that two consecutive ticks of a 10 ms `every` stream are
    /// between 9.5 ms and 10.5 ms apart according to `now()`.
    #[test]
    fn every_timing() {
        let dt = Duration::microseconds(10000);
        let ms = now().snapshot(&every(dt), |t, _| t);
        let mut events = ms.events();
        // Discard the first tick before measuring.
        events.next();
        let t1 = events.next().unwrap();
        let t2 = events.next().unwrap();
        let delta = t2 - t1;
        assert!(delta < Duration::microseconds(10500));
        assert!(delta > Duration::microseconds(9500));
    }
}