another_rxrust/observables/
interval.rs1use crate::prelude::*;
2use scheduler::IScheduler;
3use std::{thread, time::Duration};
4
5pub fn interval<'a, Scheduler, SchedulerCreator>(
6 dur: Duration,
7 scheduler_ctor: SchedulerCreator,
8) -> Observable<'a, u64>
9where
10 Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
11 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
12{
13 Observable::create(move |s| {
14 let scheduler = scheduler_ctor();
15 let scheduler_in_post = scheduler.clone();
16 scheduler.post(move || {
17 let mut n = 0;
18 loop {
19 thread::sleep(dur);
20 if !s.is_subscribed() {
21 break;
22 }
23 s.next(n);
24 n += 1;
25 }
26 scheduler_in_post.abort();
27 })
28 })
29}
30
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time::Duration};

    /// Period passed to `interval` in both tests.
    const PERIOD: Duration = Duration::from_millis(100);
    /// How long the `thread` test sleeps after subscribing.
    const GRACE: Duration = Duration::from_millis(1500);

    // Smoke test: subscribes to the first five ticks using the scheduler
    // creator from `schedulers::default_scheduler()` and prints every event,
    // then prints a marker line.
    #[test]
    fn basic() {
        observables::interval(PERIOD, schedulers::default_scheduler())
            .take(5)
            .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
        println!("marker");
    }

    // Same pipeline, but with the scheduler creator from
    // `schedulers::new_thread_scheduler()`.
    #[test]
    fn thread() {
        observables::interval(PERIOD, schedulers::new_thread_scheduler())
            .take(5)
            .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
        println!("marker");
        // NOTE(review): presumably keeps the test alive long enough for the
        // five ticks (5 x 100 ms) to be delivered on the scheduler's thread -
        // confirm against the scheduler implementation.
        thread::sleep(GRACE);
    }
}
66}