rx_rust/operators/conditional_boolean/
default_if_empty.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Emits a specified item if the source Observable completes without emitting any items.
10/// See <https://reactivex.io/documentation/operators/defaultifempty.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         conditional_boolean::default_if_empty::DefaultIfEmpty,
19///         creating::from_iter::FromIter,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = DefaultIfEmpty::new(FromIter::new(Vec::<i32>::new()), 42);
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![42]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct DefaultIfEmpty<T, OE> {
38    source: OE,
39    default_value: T,
40}
41
42impl<T, OE> DefaultIfEmpty<T, OE> {
43    pub fn new<'or, 'sub, E>(source: OE, item: T) -> Self
44    where
45        OE: Observable<'or, 'sub, T, E>,
46    {
47        Self {
48            source,
49            default_value: item,
50        }
51    }
52}
53
54impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for DefaultIfEmpty<T, OE>
55where
56    OE: Observable<'or, 'sub, T, E>,
57    T: NecessarySendSync + 'or,
58{
59    fn subscribe(
60        self,
61        observer: impl Observer<T, E> + NecessarySendSync + 'or,
62    ) -> Subscription<'sub> {
63        let observer = DefaultIfEmptyObserver {
64            observer,
65            default_value: Some(self.default_value),
66        };
67        self.source.subscribe(observer)
68    }
69}
70
/// Observer wrapper that tracks whether the default value is still pending.
struct DefaultIfEmptyObserver<T, OR> {
    /// Downstream observer that receives forwarded values and the termination.
    observer: OR,
    /// Pending default; `None` means the source has already produced at least one value.
    default_value: Option<T>,
}
75
76impl<T, E, OR> Observer<T, E> for DefaultIfEmptyObserver<T, OR>
77where
78    OR: Observer<T, E>,
79{
80    fn on_next(&mut self, value: T) {
81        self.default_value = None;
82        self.observer.on_next(value);
83    }
84
85    fn on_termination(mut self, termination: Termination<E>) {
86        match termination {
87            Termination::Completed => {
88                if let Some(default_value) = self.default_value {
89                    self.observer.on_next(default_value);
90                }
91            }
92            Termination::Error(_) => {}
93        }
94        self.observer.on_termination(termination);
95    }
96}