// another_rxrust/operators/timestamp.rs
use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{marker::PhantomData, time::SystemTime};
4
/// Operator object behind `Observable::timestamp`.
///
/// Holds no runtime data: the only field is a `PhantomData` marker that ties
/// the operator to the item type it is applied to.
#[derive(Clone)]
pub struct Timestamp<Item: Clone + Send + Sync> {
    /// Zero-sized marker recording the item type of the source stream.
    _item: PhantomData<Item>,
}
12
13impl<'a, Item> Timestamp<Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new() -> Timestamp<Item> {
18 Timestamp { _item: PhantomData }
19 }
20 pub fn execute(
21 &self,
22 source: Observable<'a, Item>,
23 ) -> Observable<'a, (SystemTime, Item)> {
24 Observable::create(move |s| {
25 let sctl = StreamController::new(s);
26 let sctl_next = sctl.clone();
27 let sctl_error = sctl.clone();
28 let sctl_complete = sctl.clone();
29
30 source.inner_subscribe(sctl.new_observer(
31 move |_, x| {
32 sctl_next.sink_next((SystemTime::now(), x));
33 },
34 move |_, e| {
35 sctl_error.sink_error(e);
36 },
37 move |serial| {
38 sctl_complete.sink_complete(&serial);
39 },
40 ));
41 })
42 }
43}
44
45impl<'a, Item> Observable<'a, Item>
46where
47 Item: Clone + Send + Sync,
48{
49 pub fn timestamp(&self) -> Observable<'a, (SystemTime, Item)> {
50 Timestamp::new().execute(self.clone())
51 }
52}
53
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Smoke test: subscribes to a 100 ms interval on a new-thread scheduler,
    /// timestamps the ticks, takes ten of them and prints whatever arrives.
    /// It makes no assertions; the output is only printed.
    #[test]
    fn basic() {
        let tick = time::Duration::from_millis(100);
        // Presumably long enough for the ten ticks to be emitted before the
        // test function returns.
        let wait = time::Duration::from_millis(1500);

        observables::interval(tick, schedulers::new_thread_scheduler())
            .timestamp()
            .take(10)
            .subscribe(
                print_next_fmt!("{:?}"),
                print_error!(),
                print_complete!(),
            );

        thread::sleep(wait);
    }
}