// rxrust/ops/default_if_empty.rs

1use crate::prelude::*;
2use crate::{error_proxy_impl, is_stopped_proxy_impl};
3
4#[derive(Clone)]
5pub struct DefaultIfEmptyOp<S>
6where
7  S: Observable,
8{
9  pub(crate) source: S,
10  pub(crate) is_empty: bool,
11  pub(crate) default_value: S::Item,
12}
13
/// Expands to the `actual_subscribe` method used by the observable impls of
/// `DefaultIfEmptyOp` in this file.
///
/// * `$subscription` - subscription type carried by the incoming `Subscriber`.
/// * `$marker + ... $lf` - extra bounds appended to the downstream observer `O`.
#[doc(hidden)]
macro_rules! observable_impl {
  ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
    fn actual_subscribe<O>(
      self,
      subscriber: Subscriber<O, $subscription>,
    ) -> Self::Unsub
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($marker +)* $lf,
    {
      // Split the incoming subscriber so the downstream observer can be wrapped
      // while the subscription handle is passed along untouched.
      let Subscriber { observer: downstream, subscription } = subscriber;

      let wrapped = DefaultIfEmptyObserver {
        observer: downstream,
        is_empty: self.is_empty,
        default_value: self.default_value,
      };

      self.source.actual_subscribe(Subscriber {
        observer: wrapped,
        subscription,
      })
    }
  };
}
34
35observable_proxy_impl!(DefaultIfEmptyOp, S);
36
37impl<'a, S> LocalObservable<'a> for DefaultIfEmptyOp<S>
38where
39  S: LocalObservable<'a>,
40  S::Item: Clone + 'a,
41{
42  type Unsub = S::Unsub;
43  observable_impl!(LocalSubscription, 'a);
44}
45
46impl<S> SharedObservable for DefaultIfEmptyOp<S>
47where
48  S: SharedObservable,
49  S::Item: Clone + Send + Sync + 'static,
50{
51  type Unsub = S::Unsub;
52  observable_impl!(SharedSubscription, Send + Sync + 'static);
53}
54
/// Observer adapter that sits between a source and a downstream observer,
/// remembering whether any item has passed through it.
pub struct DefaultIfEmptyObserver<O, Item> {
  /// Downstream observer that receives every forwarded notification.
  observer: O,
  /// `true` until the first item has been forwarded.
  is_empty: bool,
  /// Fallback item, cloned and emitted on completion if nothing was forwarded.
  default_value: Item,
}
60
61impl<Item, Err, O> Observer for DefaultIfEmptyObserver<O, Item>
62where
63  O: Observer<Item = Item, Err = Err>,
64  Item: Clone,
65{
66  type Item = Item;
67  type Err = Err;
68  fn next(&mut self, value: Item) {
69    self.observer.next(value);
70    if self.is_empty {
71      self.is_empty = false;
72    }
73  }
74
75  fn complete(&mut self) {
76    if self.is_empty {
77      self.observer.next(self.default_value.clone());
78    }
79    self.observer.complete()
80  }
81
82  error_proxy_impl!(Err, observer);
83
84  is_stopped_proxy_impl!(observer);
85}
86
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use bencher::Bencher;

  /// A source that emits a value must deliver that value, not the default.
  #[test]
  fn base_function() {
    let mut completed = false;
    let mut value = 0;

    observable::of(10)
      .default_if_empty(5)
      .subscribe_complete(|v| value = v, || completed = true);

    assert_eq!(value, 10);
    assert!(completed);
  }

  /// An empty source must deliver the default value before completing.
  #[test]
  fn base_empty_function() {
    let mut completed = false;
    let mut value = 0;

    observable::empty()
      .default_if_empty(5)
      .subscribe_complete(|v| value = v, || completed = true);

    assert_eq!(value, 5);
    assert!(completed);
  }

  /// Compile/run check: the operator can be converted to a shared observable
  /// and subscribed when the source is non-empty.
  #[test]
  fn into_shared() {
    observable::from_iter(0..100)
      .default_if_empty(5)
      .into_shared()
      .subscribe(|_| {});
  }

  /// Same check as `into_shared`, with an empty source.
  #[test]
  fn into_shared_empty() {
    observable::empty()
      .default_if_empty(5)
      .into_shared()
      .subscribe(|_| {});
  }

  // NOTE(review): calling a `benchmark_group!` function appears to only build
  // the benchmark descriptor list rather than run the benchmark — confirm
  // against the bencher documentation.
  #[test]
  fn bench_base() { bench_b(); }

  benchmark_group!(bench_b, bench_base_function);

  fn bench_base_function(b: &mut Bencher) { b.iter(base_function); }

  #[test]
  fn bench_empty() { bench_e(); }

  benchmark_group!(bench_e, bench_empty_function);

  fn bench_empty_function(b: &mut Bencher) { b.iter(base_empty_function); }
}