another_rxrust/operators/
time_interval.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6 time::{Duration, Instant},
7};
8
/// Operator that turns a stream of `Item`s into a stream of [`Duration`]s
/// describing how much time passed between the source's notifications.
///
/// The operator keeps no data of its own; `Item` is only recorded at the type
/// level so the operator stays tied to the element type of the source it is
/// applied to.
#[derive(Clone)]
pub struct TimeInterval<Item: Clone + Send + Sync> {
    /// Zero-sized marker binding the operator to the source's item type.
    _item: PhantomData<Item>,
}
16
17impl<'a, Item> TimeInterval<Item>
18where
19 Item: Clone + Send + Sync,
20{
21 pub fn new() -> TimeInterval<Item> {
22 TimeInterval { _item: PhantomData }
23 }
24 pub fn execute(
25 &self,
26 source: Observable<'a, Item>,
27 ) -> Observable<'a, Duration> {
28 Observable::create(move |s| {
29 let start_time = Arc::new(RwLock::new(None::<Instant>));
30 let start_time_next = Arc::clone(&start_time);
31 let start_time_complete = Arc::clone(&start_time);
32
33 let sctl = StreamController::new(s);
34 let sctl_next = sctl.clone();
35 let sctl_error = sctl.clone();
36 let sctl_complete = sctl.clone();
37
38 source.inner_subscribe(sctl.new_observer(
39 move |_, _| {
40 if let Some(start_time) = *start_time_next.read().unwrap() {
41 sctl_next.sink_next(start_time.elapsed());
42 }
43 *start_time_next.write().unwrap() = Some(Instant::now());
44 },
45 move |_, e| {
46 sctl_error.sink_error(e);
47 },
48 move |serial| {
49 if let Some(start_time) = *start_time_complete.read().unwrap() {
50 sctl_complete.sink_next(start_time.elapsed());
51 }
52 sctl_complete.sink_complete(&serial);
53 },
54 ));
55 })
56 }
57}
58
59impl<'a, Item> Observable<'a, Item>
60where
61 Item: Clone + Send + Sync,
62{
63 pub fn time_interval(&self) -> Observable<'a, Duration> {
64 TimeInterval::new().execute(self.clone())
65 }
66}
67
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Smoke test: runs `time_interval` over a 100 ms interval source on a
    /// new-thread scheduler, takes 10 values and prints every notification.
    /// Nothing is asserted; the output is meant for manual inspection.
    #[test]
    fn basic() {
        let period = time::Duration::from_millis(100);
        let scheduler = schedulers::new_thread_scheduler();
        // Keep the test thread alive while the source emits on the scheduler.
        let wait = time::Duration::from_millis(1500);

        observables::interval(period, scheduler)
            .time_interval()
            .take(10)
            .subscribe(
                print_next_fmt!("{:?}"),
                print_error!(),
                print_complete!(),
            );

        thread::sleep(wait);
    }
}
88}