1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use crate::{
  observable::{Observable, ObservableExt},
  observer::Observer,
};

pub struct OnCompleteOp<S, F> {
  pub(crate) source: S,
  pub(crate) func: F,
}

impl<S, F, Item, Err, O> Observable<Item, Err, O> for OnCompleteOp<S, F>
where
  O: Observer<Item, Err>,
  S: Observable<Item, Err, OnCompleteObserver<O, F>>,
  F: FnOnce(),
{
  type Unsub = S::Unsub;

  fn actual_subscribe(self, observer: O) -> Self::Unsub {
    self
      .source
      .actual_subscribe(OnCompleteObserver { observer, func: self.func })
  }
}

impl<S, F, Item, Err> ObservableExt<Item, Err> for OnCompleteOp<S, F> where
  S: ObservableExt<Item, Err>
{
}

pub struct OnCompleteObserver<O, F> {
  observer: O,
  func: F,
}

impl<Item, Err, O, F> Observer<Item, Err> for OnCompleteObserver<O, F>
where
  O: Observer<Item, Err>,
  F: FnOnce(),
{
  #[inline]
  fn next(&mut self, value: Item) {
    self.observer.next(value)
  }

  #[inline]
  fn error(self, err: Err) {
    self.observer.error(err)
  }

  #[inline]
  fn complete(self) {
    (self.func)();
    self.observer.complete();
  }

  #[inline]
  fn is_finished(&self) -> bool {
    self.observer.is_finished()
  }
}