another_rxrust/operators/
tap.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Tap<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 tap_observer: Observer<'a, Item>,
10}
11
12impl<'a, Item> Tap<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new<Next, Error, Complete>(
17 next: Next,
18 error: Error,
19 complete: Complete,
20 ) -> Tap<'a, Item>
21 where
22 Next: Fn(Item) + Send + Sync + 'a,
23 Error: Fn(RxError) + Send + Sync + 'a,
24 Complete: Fn() + Send + Sync + 'a,
25 {
26 Tap {
27 tap_observer: Observer::new(next, error, complete),
28 }
29 }
30
31 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
32 let tap_observer = self.tap_observer.clone();
33 Observable::create(move |s| {
34 let sctl = StreamController::new(s);
35 let source_next = source.clone();
36
37 let sctl_next = sctl.clone();
38 let sctl_error = sctl.clone();
39 let sctl_complete = sctl.clone();
40
41 let tap_observer_next = tap_observer.clone();
42 let tap_observer_error = tap_observer.clone();
43 let tap_observer_complete = tap_observer.clone();
44 source_next.inner_subscribe(sctl.new_observer(
45 move |_, x: Item| {
46 tap_observer_next.next(x.clone());
47 sctl_next.sink_next(x);
48 },
49 move |_, e| {
50 tap_observer_error.error(e.clone());
51 sctl_error.sink_error(e);
52 },
53 move |serial| {
54 tap_observer_complete.complete();
55 sctl_complete.sink_complete(&serial)
56 },
57 ));
58 })
59 }
60}
61
62impl<'a, Item> Observable<'a, Item>
63where
64 Item: Clone + Send + Sync,
65{
66 pub fn tap<Next, Error, Complete>(
67 &self,
68 next: Next,
69 error: Error,
70 complete: Complete,
71 ) -> Observable<'a, Item>
72 where
73 Next: Fn(Item) + Send + Sync + 'a,
74 Error: Fn(RxError) + Send + Sync + 'a,
75 Complete: Fn() + Send + Sync + 'a,
76 {
77 Tap::new(next, error, complete).execute(self.clone())
78 }
79}
80
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::{Arc, Mutex};

  // NOTE(review): the assertions below assume the emitter passed to
  // `Observable::create` runs synchronously inside `subscribe` (no scheduler
  // is used in these pipelines) — confirm if that ever changes.

  /// A completing stream: the tap must see every item and exactly one
  /// completion, and no error.
  #[test]
  fn basic() {
    let o = Observable::create(|s| {
      for n in 0..5 {
        s.next(n);
      }
      s.complete();
    });

    let seen = Arc::new(Mutex::new(Vec::<i32>::new()));
    let errors = Arc::new(AtomicUsize::new(0));
    let completions = Arc::new(AtomicUsize::new(0));

    let seen_tap = Arc::clone(&seen);
    let errors_tap = Arc::clone(&errors);
    let completions_tap = Arc::clone(&completions);

    o.tap(
      move |x| {
        println!("tap next {}", x);
        seen_tap.lock().unwrap().push(x);
      },
      move |e| {
        println!("tap error {:?}", e);
        errors_tap.fetch_add(1, Ordering::SeqCst);
      },
      move || {
        println!("tap complete");
        completions_tap.fetch_add(1, Ordering::SeqCst);
      },
    )
    .map(|x| x + 100)
    .subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    // The tap sits before `map`, so it must observe the unmapped values.
    assert_eq!(*seen.lock().unwrap(), vec![0, 1, 2, 3, 4]);
    assert_eq!(errors.load(Ordering::SeqCst), 0);
    assert_eq!(completions.load(Ordering::SeqCst), 1);
  }

  /// A failing stream: the tap must see every item emitted before the error
  /// and exactly one error, and no completion.
  #[test]
  fn error() {
    let o = Observable::create(|s| {
      for n in 0..5 {
        s.next(n);
      }
      s.error(RxError::from_error("ERR!"));
    });

    let seen = Arc::new(Mutex::new(Vec::<i32>::new()));
    let errors = Arc::new(AtomicUsize::new(0));
    let completions = Arc::new(AtomicUsize::new(0));

    let seen_tap = Arc::clone(&seen);
    let errors_tap = Arc::clone(&errors);
    let completions_tap = Arc::clone(&completions);

    o.tap(
      move |x| {
        println!("tap next {}", x);
        seen_tap.lock().unwrap().push(x);
      },
      move |e| {
        println!("tap error {:?}", e);
        errors_tap.fetch_add(1, Ordering::SeqCst);
      },
      move || {
        println!("tap complete");
        completions_tap.fetch_add(1, Ordering::SeqCst);
      },
    )
    .map(|x| x + 100)
    .subscribe(
      print_next_fmt!("{}"),
      print_error_as!(&str),
      print_complete!(),
    );

    assert_eq!(*seen.lock().unwrap(), vec![0, 1, 2, 3, 4]);
    assert_eq!(errors.load(Ordering::SeqCst), 1);
    assert_eq!(completions.load(Ordering::SeqCst), 0);
  }
}