// rx_rust/operators/conditional_boolean/default_if_empty.rs
use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct DefaultIfEmpty<T, OE> {
38 source: OE,
39 default_value: T,
40}
41
42impl<T, OE> DefaultIfEmpty<T, OE> {
43 pub fn new<'or, 'sub, E>(source: OE, item: T) -> Self
44 where
45 OE: Observable<'or, 'sub, T, E>,
46 {
47 Self {
48 source,
49 default_value: item,
50 }
51 }
52}
53
54impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for DefaultIfEmpty<T, OE>
55where
56 OE: Observable<'or, 'sub, T, E>,
57 T: NecessarySendSync + 'or,
58{
59 fn subscribe(
60 self,
61 observer: impl Observer<T, E> + NecessarySendSync + 'or,
62 ) -> Subscription<'sub> {
63 let observer = DefaultIfEmptyObserver {
64 observer,
65 default_value: Some(self.default_value),
66 };
67 self.source.subscribe(observer)
68 }
69}
70
/// Observer wrapper that tracks whether the fallback item is still pending.
struct DefaultIfEmptyObserver<T, OR> {
    /// Downstream observer receiving every forwarded notification.
    observer: OR,
    /// Fallback item; reset to `None` whenever an upstream item arrives.
    default_value: Option<T>,
}
75
76impl<T, E, OR> Observer<T, E> for DefaultIfEmptyObserver<T, OR>
77where
78 OR: Observer<T, E>,
79{
80 fn on_next(&mut self, value: T) {
81 self.default_value = None;
82 self.observer.on_next(value);
83 }
84
85 fn on_termination(mut self, termination: Termination<E>) {
86 match termination {
87 Termination::Completed => {
88 if let Some(default_value) = self.default_value {
89 self.observer.on_next(default_value);
90 }
91 }
92 Termination::Error(_) => {}
93 }
94 self.observer.on_termination(termination);
95 }
96}