use crate::prelude::*;
use std::time::{Duration, Instant};
pub trait Delay {
fn delay(self, dur: Duration) -> DelayOp<Self>
where
Self: Sized,
{
DelayOp {
source: self,
delay: dur,
}
}
fn delay_at(self, at: Instant) -> DelayOp<Self>
where
Self: Sized,
{
DelayOp {
source: self,
delay: at.elapsed(),
}
}
}
impl<S> Delay for S {}
pub struct DelayOp<S> {
source: S,
delay: Duration,
}
impl<S> IntoShared for DelayOp<S>
where
S: Send + Sync + 'static,
{
type Shared = Self;
#[inline(always)]
fn to_shared(self) -> Self::Shared { self }
}
impl<Sub, S> RawSubscribable<Sub> for DelayOp<S>
where
S: IntoShared,
S::Shared: RawSubscribable<Sub::Shared>,
Sub: IntoShared,
<S::Shared as RawSubscribable<Sub::Shared>>::Unsub: IntoShared,
<<S::Shared as RawSubscribable<Sub::Shared>>::Unsub as IntoShared>::Shared:
SubscriptionLike,
{
type Unsub = SharedSubscription;
fn raw_subscribe(self, subscriber: Sub) -> Self::Unsub {
let Self { delay, source } = self;
let source = source.to_shared();
let subscriber = subscriber.to_shared();
Schedulers::ThreadPool.schedule(
move |mut subscription, _| {
subscription.add(source.raw_subscribe(subscriber).to_shared());
},
Some(delay),
(),
)
}
}
#[test]
fn smoke() {
use std::sync::{Arc, Mutex};
let value = Arc::new(Mutex::new(0));
let c_value = value.clone();
observable::of(1)
.delay(Duration::from_millis(50))
.subscribe(move |v| {
*value.lock().unwrap() = v;
});
assert_eq!(*c_value.lock().unwrap(), 0);
std::thread::sleep(Duration::from_millis(51));
assert_eq!(*c_value.lock().unwrap(), 1);
}