rx_rust/operators/conditional_boolean/
skip_until.rs1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
4use crate::{
5 disposable::subscription::Subscription,
6 observable::Observable,
7 observer::{Observer, Termination},
8 utils::types::MarkerType,
9};
10use educe::Educe;
11use std::marker::PhantomData;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14#[derive(Educe)]
56#[educe(Debug, Clone)]
57pub struct SkipUntil<OE, OE1> {
58 source: OE,
59 start: OE1,
60}
61
62impl<OE, OE1> SkipUntil<OE, OE1> {
63 pub fn new<'or, 'sub, T, E>(source: OE, start: OE1) -> Self
64 where
65 OE: Observable<'or, 'sub, T, E>,
66 OE1: Observable<'or, 'sub, (), E>,
67 {
68 Self { source, start }
69 }
70}
71
72impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for SkipUntil<OE, OE1>
73where
74 T: 'or,
75 OE: Observable<'or, 'sub, T, E>,
76 OE1: Observable<'or, 'sub, (), E>,
77 'sub: 'or,
78{
79 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
80 subscribe_unsub_after_termination(observer, |observer| {
81 let observer = Shared::new(Mutable::new(Some(observer)));
82 let started = Shared::new(AtomicBool::new(false));
83 let start_observer = StartObserver {
84 observer: observer.clone(),
85 started: started.clone(),
86 _marker: PhantomData,
87 };
88 let subscription_1 = self.start.subscribe(start_observer);
89 let observer = SkipUntilObserver { observer, started };
90 let subscription_2 = self.source.subscribe(observer);
91 subscription_1 + subscription_2
92 })
93 }
94}
95
96struct SkipUntilObserver<OR> {
97 observer: Shared<Mutable<Option<OR>>>,
98 started: Shared<AtomicBool>,
99}
100
101impl<T, E, OR> Observer<T, E> for SkipUntilObserver<OR>
102where
103 OR: Observer<T, E>,
104{
105 fn on_next(&mut self, value: T) {
106 if self.started.load(Ordering::SeqCst) {
107 safe_lock_option_observer!(on_next: self.observer, value);
108 }
109 }
110
111 fn on_termination(self, termination: Termination<E>) {
112 safe_lock_option_observer!(on_termination: self.observer, termination);
113 }
114}
115
116struct StartObserver<T, OR> {
117 observer: Shared<Mutable<Option<OR>>>,
118 started: Shared<AtomicBool>,
119 _marker: MarkerType<T>,
120}
121
122impl<T, E, OR> Observer<(), E> for StartObserver<T, OR>
123where
124 OR: Observer<T, E>,
125{
126 fn on_next(&mut self, _: ()) {
127 self.started.store(true, Ordering::SeqCst);
128 }
129
130 fn on_termination(self, termination: Termination<E>) {
131 match termination {
132 Termination::Completed => {}
133 Termination::Error(_) => {
134 safe_lock_option_observer!(on_termination: self.observer, termination);
135 }
136 }
137 }
138}