another_rxrust/operators/
tap.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Tap<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  tap_observer: Observer<'a, Item>,
10}
11
12impl<'a, Item> Tap<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new<Next, Error, Complete>(
17    next: Next,
18    error: Error,
19    complete: Complete,
20  ) -> Tap<'a, Item>
21  where
22    Next: Fn(Item) + Send + Sync + 'a,
23    Error: Fn(RxError) + Send + Sync + 'a,
24    Complete: Fn() + Send + Sync + 'a,
25  {
26    Tap {
27      tap_observer: Observer::new(next, error, complete),
28    }
29  }
30
31  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
32    let tap_observer = self.tap_observer.clone();
33    Observable::create(move |s| {
34      let sctl = StreamController::new(s);
35      let source_next = source.clone();
36
37      let sctl_next = sctl.clone();
38      let sctl_error = sctl.clone();
39      let sctl_complete = sctl.clone();
40
41      let tap_observer_next = tap_observer.clone();
42      let tap_observer_error = tap_observer.clone();
43      let tap_observer_complete = tap_observer.clone();
44      source_next.inner_subscribe(sctl.new_observer(
45        move |_, x: Item| {
46          tap_observer_next.next(x.clone());
47          sctl_next.sink_next(x);
48        },
49        move |_, e| {
50          tap_observer_error.error(e.clone());
51          sctl_error.sink_error(e);
52        },
53        move |serial| {
54          tap_observer_complete.complete();
55          sctl_complete.sink_complete(&serial)
56        },
57      ));
58    })
59  }
60}
61
62impl<'a, Item> Observable<'a, Item>
63where
64  Item: Clone + Send + Sync,
65{
66  pub fn tap<Next, Error, Complete>(
67    &self,
68    next: Next,
69    error: Error,
70    complete: Complete,
71  ) -> Observable<'a, Item>
72  where
73    Next: Fn(Item) + Send + Sync + 'a,
74    Error: Fn(RxError) + Send + Sync + 'a,
75    Complete: Fn() + Send + Sync + 'a,
76  {
77    Tap::new(next, error, complete).execute(self.clone())
78  }
79}
80
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Taps a source that emits 0..5 and then completes.
  /// The test only drives the pipeline; it makes no assertions.
  #[test]
  fn basic() {
    let numbers = Observable::create(|subscriber| {
      for value in 0..5 {
        subscriber.next(value);
      }
      subscriber.complete();
    });

    // The tap sits before `map`, so it receives the unmapped values.
    let tapped = numbers.tap(
      |item| println!("tap next {}", item),
      |err| println!("tap error {:?}", err),
      || println!("tap complete"),
    );

    tapped.map(|item| item + 100).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Taps a source that emits 0..5 and then fails with an error.
  /// The test only drives the pipeline; it makes no assertions.
  #[test]
  fn error() {
    let numbers = Observable::create(|subscriber| {
      for value in 0..5 {
        subscriber.next(value);
      }
      subscriber.error(RxError::from_error("ERR!"));
    });

    let tapped = numbers.tap(
      |item| println!("tap next {}", item),
      |err| println!("tap error {:?}", err),
      || println!("tap complete"),
    );

    tapped.map(|item| item + 100).subscribe(
      print_next_fmt!("{}"),
      print_error_as!(&str),
      print_complete!(),
    );
  }
}