rx_rust/operators/conditional_boolean/
default_if_empty.rs1use crate::utils::types::NecessarySend;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct DefaultIfEmpty<T, OE> {
38 source: OE,
39 default_value: T,
40}
41
42impl<T, OE> DefaultIfEmpty<T, OE> {
43 pub fn new<'or, 'sub, E>(source: OE, item: T) -> Self
44 where
45 OE: Observable<'or, 'sub, T, E>,
46 {
47 Self {
48 source,
49 default_value: item,
50 }
51 }
52}
53
54impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for DefaultIfEmpty<T, OE>
55where
56 OE: Observable<'or, 'sub, T, E>,
57 T: NecessarySend + 'or,
58{
59 fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
60 let observer = DefaultIfEmptyObserver {
61 observer,
62 default_value: Some(self.default_value),
63 };
64 self.source.subscribe(observer)
65 }
66}
67
68struct DefaultIfEmptyObserver<T, OR> {
69 observer: OR,
70 default_value: Option<T>, }
72
73impl<T, E, OR> Observer<T, E> for DefaultIfEmptyObserver<T, OR>
74where
75 OR: Observer<T, E>,
76{
77 fn on_next(&mut self, value: T) {
78 self.default_value = None;
79 self.observer.on_next(value);
80 }
81
82 fn on_termination(mut self, termination: Termination<E>) {
83 match termination {
84 Termination::Completed => {
85 if let Some(default_value) = self.default_value {
86 self.observer.on_next(default_value);
87 }
88 }
89 Termination::Error(_) => {}
90 }
91 self.observer.on_termination(termination);
92 }
93}