another_rxrust/observables/
interval.rs

1use crate::prelude::*;
2use scheduler::IScheduler;
3use std::{thread, time::Duration};
4
5pub fn interval<'a, Scheduler, SchedulerCreator>(
6  dur: Duration,
7  scheduler_ctor: SchedulerCreator,
8) -> Observable<'a, u64>
9where
10  Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
11  SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
12{
13  Observable::create(move |s| {
14    let scheduler = scheduler_ctor();
15    let scheduler_in_post = scheduler.clone();
16    scheduler.post(move || {
17      let mut n = 0;
18      loop {
19        thread::sleep(dur);
20        if !s.is_subscribed() {
21          break;
22        }
23        s.next(n);
24        n += 1;
25      }
26      scheduler_in_post.abort();
27    })
28  })
29}
30
31#[cfg(all(test, not(feature = "web")))]
32mod test {
33  use crate::prelude::*;
34  use std::{thread, time};
35
36  #[test]
37  fn basic() {
38    observables::interval(
39      time::Duration::from_millis(100),
40      schedulers::default_scheduler(),
41    )
42    .take(5)
43    .subscribe(
44      print_next_fmt!("{}"),
45      print_error!(),
46      print_complete!(),
47    );
48    println!("marker");
49  }
50
51  #[test]
52  fn thread() {
53    observables::interval(
54      time::Duration::from_millis(100),
55      schedulers::new_thread_scheduler(),
56    )
57    .take(5)
58    .subscribe(
59      print_next_fmt!("{}"),
60      print_error!(),
61      print_complete!(),
62    );
63    println!("marker");
64    thread::sleep(time::Duration::from_millis(1500));
65  }
66}