another_rxrust/operators/
default_if_empty.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct DefaultIfEmpty<Item>
7where
8 Item: Clone + Send + Sync,
9{
10 default: Item,
11}
12
13impl<'a, Item> DefaultIfEmpty<Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new(default: Item) -> DefaultIfEmpty<Item> {
18 DefaultIfEmpty { default }
19 }
20 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
21 let default = self.default.clone();
22 let emitted = Arc::new(RwLock::new(false));
23
24 Observable::<Item>::create(move |s| {
25 let default_complete = default.clone();
26
27 let emitted_next = Arc::clone(&emitted);
28 let emitted_complete = Arc::clone(&emitted);
29
30 let sctl = StreamController::new(s);
31 let sctl_next = sctl.clone();
32 let sctl_error = sctl.clone();
33 let sctl_complete = sctl.clone();
34 source.inner_subscribe(sctl.new_observer(
35 move |_, x| {
36 *emitted_next.write().unwrap() = true;
37 sctl_next.sink_next(x);
38 },
39 move |_, e| {
40 sctl_error.sink_error(e);
41 },
42 move |serial| {
43 if !*emitted_complete.read().unwrap() {
44 sctl_complete.sink_next(default_complete.clone());
45 }
46 sctl_complete.sink_complete(&serial);
47 },
48 ));
49 })
50 }
51}
52
53impl<'a, Item> Observable<'a, Item>
54where
55 Item: Clone + Send + Sync,
56{
57 pub fn default_if_empty(&self, target: Item) -> Observable<'a, Item> {
58 DefaultIfEmpty::new(target).execute(self.clone())
59 }
60}
61
62#[cfg(test)]
63mod test {
64 use crate::prelude::*;
65
66 #[test]
67 fn basic() {
68 observables::empty().default_if_empty(5).subscribe(
69 |x| {
70 println!("{}", x);
71 assert_eq!(x, 5);
72 },
73 print_error!(),
74 print_complete!(),
75 );
76 }
77
78 #[test]
79 fn not_empty() {
80 observables::from_iter(0..10)
81 .default_if_empty(100)
82 .subscribe(
83 |x| {
84 println!("{}", x);
85 assert_ne!(x, 100);
86 },
87 print_error!(),
88 print_complete!(),
89 );
90 }
91
92 #[test]
93 fn error() {
94 observables::error(RxError::from_error("ERR!"))
95 .default_if_empty(5)
96 .subscribe(
97 |_| assert!(true, "don't come here"),
98 print_error_as!(&str),
99 print_complete!(),
100 );
101 }
102}