1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use crate::internals::stream_controller::*;
use crate::prelude::*;

/// Side-effect operator: stores an `Observer` built from caller-supplied
/// callbacks so they can be invoked for the events of a source stream.
#[derive(Clone)]
pub struct Tap<'a, Item: Clone + Send + Sync> {
  /// Observer wrapping the `next` / `error` / `complete` callbacks given to
  /// `Tap::new`.
  tap_observer: Observer<'a, Item>,
}

impl<'a, Item> Tap<'a, Item>
where
  Item: Clone + Send + Sync,
{
  pub fn new<Next, Error, Complete>(
    next: Next,
    error: Error,
    complete: Complete,
  ) -> Tap<'a, Item>
  where
    Next: Fn(Item) + Send + Sync + 'a,
    Error: Fn(RxError) + Send + Sync + 'a,
    Complete: Fn() + Send + Sync + 'a,
  {
    Tap {
      tap_observer: Observer::new(next, error, complete),
    }
  }

  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
    let tap_observer = self.tap_observer.clone();
    Observable::create(move |s| {
      let sctl = StreamController::new(s);
      let source_next = source.clone();

      let sctl_next = sctl.clone();
      let sctl_error = sctl.clone();
      let sctl_complete = sctl.clone();

      let tap_observer_next = tap_observer.clone();
      let tap_observer_error = tap_observer.clone();
      let tap_observer_complete = tap_observer.clone();
      source_next.inner_subscribe(sctl.new_observer(
        move |_, x: Item| {
          tap_observer_next.next(x.clone());
          sctl_next.sink_next(x);
        },
        move |_, e| {
          tap_observer_error.error(e.clone());
          sctl_error.sink_error(e);
        },
        move |serial| {
          tap_observer_complete.complete();
          sctl_complete.sink_complete(&serial)
        },
      ));
    })
  }
}

impl<'a, Item> Observable<'a, Item>
where
  Item: Clone + Send + Sync,
{
  pub fn tap<Next, Error, Complete>(
    &self,
    next: Next,
    error: Error,
    complete: Complete,
  ) -> Observable<'a, Item>
  where
    Next: Fn(Item) + Send + Sync + 'a,
    Error: Fn(RxError) + Send + Sync + 'a,
    Complete: Fn() + Send + Sync + 'a,
  {
    Tap::new(next, error, complete).execute(self.clone())
  }
}

#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Taps a source that emits 0..5 and then completes; the tapped stream is
  /// mapped (+100) and printed by the subscriber.
  #[test]
  fn basic() {
    let source = Observable::create(|subscriber| {
      for value in 0..5 {
        subscriber.next(value);
      }
      subscriber.complete();
    });

    let tapped = source.tap(
      |x| println!("tap next {}", x),
      |e| println!("tap error {:?}", e),
      || println!("tap complete"),
    );
    let shifted = tapped.map(|x| x + 100);
    shifted.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
  }

  /// Taps a source that emits 0..5 and then signals an error instead of
  /// completing; the tapped stream is mapped (+100) and printed.
  #[test]
  fn error() {
    let source = Observable::create(|subscriber| {
      for value in 0..5 {
        subscriber.next(value);
      }
      subscriber.error(RxError::from_error("ERR!"));
    });

    let tapped = source.tap(
      |x| println!("tap next {}", x),
      |e| println!("tap error {:?}", e),
      || println!("tap complete"),
    );
    let shifted = tapped.map(|x| x + 100);
    shifted.subscribe(print_next_fmt!("{}"), print_error_as!(&str), print_complete!());
  }
}