1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
use crate::prelude::*; use observable::observable_proxy_impl; use std::time::Duration; #[derive(Clone)] pub struct DelayOp<S> { pub(crate) source: S, pub(crate) delay: Duration, } observable_proxy_impl!(DelayOp, S); impl<S> SharedObservable for DelayOp<S> where S: SharedObservable + Send + Sync + 'static, S::Unsub: Send + Sync, { type Unsub = SharedSubscription; fn actual_subscribe< O: Observer<Self::Item, Self::Err> + Sync + Send + 'static, >( self, subscriber: Subscriber<O, SharedSubscription>, ) -> Self::Unsub { let Self { delay, source } = self; Schedulers::ThreadPool.schedule( move |mut subscription, _| { subscription.add(source.actual_subscribe(subscriber)); }, Some(delay), (), ) } } #[test] fn smoke() { use std::sync::{Arc, Mutex}; let value = Arc::new(Mutex::new(0)); let c_value = value.clone(); observable::of(1) .delay(Duration::from_millis(50)) .to_shared() .subscribe(move |v| { *value.lock().unwrap() = v; }); assert_eq!(*c_value.lock().unwrap(), 0); std::thread::sleep(Duration::from_millis(60)); assert_eq!(*c_value.lock().unwrap(), 1); }