another_rxrust/operators/
default_if_empty.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct DefaultIfEmpty<Item>
7where
8  Item: Clone + Send + Sync,
9{
10  default: Item,
11}
12
13impl<'a, Item> DefaultIfEmpty<Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new(default: Item) -> DefaultIfEmpty<Item> {
18    DefaultIfEmpty { default }
19  }
20  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
21    let default = self.default.clone();
22    let emitted = Arc::new(RwLock::new(false));
23
24    Observable::<Item>::create(move |s| {
25      let default_complete = default.clone();
26
27      let emitted_next = Arc::clone(&emitted);
28      let emitted_complete = Arc::clone(&emitted);
29
30      let sctl = StreamController::new(s);
31      let sctl_next = sctl.clone();
32      let sctl_error = sctl.clone();
33      let sctl_complete = sctl.clone();
34      source.inner_subscribe(sctl.new_observer(
35        move |_, x| {
36          *emitted_next.write().unwrap() = true;
37          sctl_next.sink_next(x);
38        },
39        move |_, e| {
40          sctl_error.sink_error(e);
41        },
42        move |serial| {
43          if !*emitted_complete.read().unwrap() {
44            sctl_complete.sink_next(default_complete.clone());
45          }
46          sctl_complete.sink_complete(&serial);
47        },
48      ));
49    })
50  }
51}
52
53impl<'a, Item> Observable<'a, Item>
54where
55  Item: Clone + Send + Sync,
56{
57  pub fn default_if_empty(&self, target: Item) -> Observable<'a, Item> {
58    DefaultIfEmpty::new(target).execute(self.clone())
59  }
60}
61
62#[cfg(test)]
63mod test {
64  use crate::prelude::*;
65
66  #[test]
67  fn basic() {
68    observables::empty().default_if_empty(5).subscribe(
69      |x| {
70        println!("{}", x);
71        assert_eq!(x, 5);
72      },
73      print_error!(),
74      print_complete!(),
75    );
76  }
77
78  #[test]
79  fn not_empty() {
80    observables::from_iter(0..10)
81      .default_if_empty(100)
82      .subscribe(
83        |x| {
84          println!("{}", x);
85          assert_ne!(x, 100);
86        },
87        print_error!(),
88        print_complete!(),
89      );
90  }
91
92  #[test]
93  fn error() {
94    observables::error(RxError::from_error("ERR!"))
95      .default_if_empty(5)
96      .subscribe(
97        |_| assert!(true, "don't come here"),
98        print_error_as!(&str),
99        print_complete!(),
100      );
101  }
102}