another_rxrust/operators/
timestamp.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{marker::PhantomData, time::SystemTime};
4
/// Operator type behind `Observable::timestamp`.
///
/// Holds no runtime state: the only field is a `PhantomData` marker that
/// ties the operator to the item type it is applied to.
#[derive(Clone)]
pub struct Timestamp<Item: Clone + Send + Sync> {
  /// Zero-sized marker for the upstream item type.
  _item: PhantomData<Item>,
}
12
13impl<'a, Item> Timestamp<Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new() -> Timestamp<Item> {
18    Timestamp { _item: PhantomData }
19  }
20  pub fn execute(
21    &self,
22    source: Observable<'a, Item>,
23  ) -> Observable<'a, (SystemTime, Item)> {
24    Observable::create(move |s| {
25      let sctl = StreamController::new(s);
26      let sctl_next = sctl.clone();
27      let sctl_error = sctl.clone();
28      let sctl_complete = sctl.clone();
29
30      source.inner_subscribe(sctl.new_observer(
31        move |_, x| {
32          sctl_next.sink_next((SystemTime::now(), x));
33        },
34        move |_, e| {
35          sctl_error.sink_error(e);
36        },
37        move |serial| {
38          sctl_complete.sink_complete(&serial);
39        },
40      ));
41    })
42  }
43}
44
45impl<'a, Item> Observable<'a, Item>
46where
47  Item: Clone + Send + Sync,
48{
49  pub fn timestamp(&self) -> Observable<'a, (SystemTime, Item)> {
50    Timestamp::new().execute(self.clone())
51  }
52}
53
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time::Duration};

  /// Smoke test: timestamps the first ten ticks of a 100 ms interval running
  /// on a new-thread scheduler and prints each notification.
  #[test]
  fn basic() {
    let period = Duration::from_millis(100);
    // Keep the test thread alive long enough for the background ticks.
    let wait = Duration::from_millis(1500);

    observables::interval(period, schedulers::new_thread_scheduler())
      .timestamp()
      .take(10)
      .subscribe(
        print_next_fmt!("{:?}"),
        print_error!(),
        print_complete!(),
      );

    thread::sleep(wait);
  }
}