use std::convert::Infallible;
use crate::context::Context;
use crate::{
observable::{CoreObservable, ObservableType},
observer::Observer,
scheduler::{Duration, Scheduler, Task, TaskState},
};
pub struct Timer<S> {
pub delay: Duration,
pub scheduler: S,
}
fn timer_task<O, Err>((observer, _): &mut (Option<O>, ())) -> TaskState
where
O: Observer<(), Err>,
{
if let Some(mut observer) = observer.take() {
observer.next(());
observer.complete();
}
TaskState::Finished
}
impl<S> ObservableType for Timer<S> {
type Item<'a>
= ()
where
Self: 'a;
type Err = Infallible;
}
impl<S, C> CoreObservable<C> for Timer<S>
where
C: Context,
C::Inner: Observer<(), Infallible>,
S: Scheduler<Task<(Option<C::Inner>, ())>> + Clone,
{
type Unsub = crate::scheduler::TaskHandle;
fn subscribe(self, context: C) -> Self::Unsub {
let observer = context.into_inner();
let task = Task::new((Some(observer), ()), timer_task);
self.scheduler.schedule(task, Some(self.delay))
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use crate::prelude::*;
#[rxrust_macro::test(local)]
async fn test_timer_basic() {
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
let completed = Arc::new(Mutex::new(false));
let completed_c = completed.clone();
Local::timer(Duration::from_millis(10))
.on_complete(move || *completed_c.lock().unwrap() = true)
.subscribe(move |v| *value_c.lock().unwrap() = Some(v));
LocalScheduler
.sleep(Duration::from_millis(20))
.await;
assert_eq!(*value.lock().unwrap(), Some(()));
assert!(*completed.lock().unwrap());
}
#[rxrust_macro::test]
async fn test_timer_shared() {
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
let completed = Arc::new(Mutex::new(false));
let completed_c = completed.clone();
let handle = Shared::timer(Duration::from_millis(10))
.on_complete(move || *completed_c.lock().unwrap() = true)
.subscribe(move |v| *value_c.lock().unwrap() = Some(v));
handle.await;
assert_eq!(*value.lock().unwrap(), Some(()));
assert!(*completed.lock().unwrap());
}
#[rxrust_macro::test(local)]
async fn test_timer_duration() {
let start = Instant::now();
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
Local::timer(Duration::from_millis(50)).subscribe(move |v| *value_c.lock().unwrap() = Some(v));
LocalScheduler
.sleep(Duration::from_millis(60))
.await;
assert!(start.elapsed() >= Duration::from_millis(50));
assert_eq!(*value.lock().unwrap(), Some(()));
}
#[rxrust_macro::test(local)]
async fn test_timer_emit_value() {
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
Local::timer(Duration::from_millis(10))
.map(|_| 123)
.subscribe(move |v| *value_c.lock().unwrap() = Some(v));
LocalScheduler
.sleep(Duration::from_millis(20))
.await;
assert_eq!(*value.lock().unwrap(), Some(123));
}
#[rxrust_macro::test(local)]
async fn test_timer_at_basic() {
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
let now = Instant::now();
let target = now + Duration::from_millis(10);
Local::timer_at(target).subscribe(move |v| *value_c.lock().unwrap() = Some(v));
LocalScheduler
.sleep(Duration::from_millis(20))
.await;
assert_eq!(*value.lock().unwrap(), Some(()));
assert!(Instant::now() >= target);
}
#[rxrust_macro::test(local)]
async fn test_timer_at_emit_value() {
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
let target = Instant::now() + Duration::from_millis(10);
Local::timer_at(target)
.map(|_| "hello")
.subscribe(move |v| *value_c.lock().unwrap() = Some(v));
LocalScheduler
.sleep(Duration::from_millis(20))
.await;
assert_eq!(*value.lock().unwrap(), Some("hello"));
}
#[rxrust_macro::test(local)]
async fn test_timer_at_past_time() {
let value = Arc::new(Mutex::new(None));
let value_c = value.clone();
let completed = Arc::new(Mutex::new(false));
let completed_c = completed.clone();
let target = Instant::now();
Local::timer_at(target)
.on_complete(move || *completed_c.lock().unwrap() = true)
.subscribe(move |v| *value_c.lock().unwrap() = Some(v));
LocalScheduler
.sleep(Duration::from_millis(5))
.await;
assert_eq!(*value.lock().unwrap(), Some(()));
assert!(*completed.lock().unwrap());
}
}