rx_rust/operators/conditional_boolean/
default_if_empty.rs

1use crate::utils::types::NecessarySend;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Emits a specified item if the source Observable completes without emitting any items.
10/// See <https://reactivex.io/documentation/operators/defaultifempty.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         conditional_boolean::default_if_empty::DefaultIfEmpty,
19///         creating::from_iter::FromIter,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = DefaultIfEmpty::new(FromIter::new(Vec::<i32>::new()), 42);
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![42]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct DefaultIfEmpty<T, OE> {
38    source: OE,
39    default_value: T,
40}
41
42impl<T, OE> DefaultIfEmpty<T, OE> {
43    pub fn new<'or, 'sub, E>(source: OE, item: T) -> Self
44    where
45        OE: Observable<'or, 'sub, T, E>,
46    {
47        Self {
48            source,
49            default_value: item,
50        }
51    }
52}
53
54impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for DefaultIfEmpty<T, OE>
55where
56    OE: Observable<'or, 'sub, T, E>,
57    T: NecessarySend + 'or,
58{
59    fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
60        let observer = DefaultIfEmptyObserver {
61            observer,
62            default_value: Some(self.default_value),
63        };
64        self.source.subscribe(observer)
65    }
66}
67
/// Observer wrapper that carries the pending fallback value alongside the downstream
/// observer it forwards to.
struct DefaultIfEmptyObserver<T, OR> {
    /// Downstream observer receiving the forwarded events.
    observer: OR,
    /// Fallback still waiting to be used; `None` once the source has emitted at least
    /// one value.
    default_value: Option<T>,
}
72
73impl<T, E, OR> Observer<T, E> for DefaultIfEmptyObserver<T, OR>
74where
75    OR: Observer<T, E>,
76{
77    fn on_next(&mut self, value: T) {
78        self.default_value = None;
79        self.observer.on_next(value);
80    }
81
82    fn on_termination(mut self, termination: Termination<E>) {
83        match termination {
84            Termination::Completed => {
85                if let Some(default_value) = self.default_value {
86                    self.observer.on_next(default_value);
87                }
88            }
89            Termination::Error(_) => {}
90        }
91        self.observer.on_termination(termination);
92    }
93}