rx_rust/operators/conditional_boolean/
skip_until.rs1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
4use crate::{
5 disposable::subscription::Subscription,
6 observable::Observable,
7 observer::{Observer, Termination},
8 utils::types::MarkerType,
9};
10use educe::Educe;
11use std::marker::PhantomData;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14#[derive(Educe)]
56#[educe(Debug, Clone)]
57pub struct SkipUntil<OE, OE1> {
58 source: OE,
59 start: OE1,
60}
61
62impl<OE, OE1> SkipUntil<OE, OE1> {
63 pub fn new<'or, 'sub, T, E>(source: OE, start: OE1) -> Self
64 where
65 OE: Observable<'or, 'sub, T, E>,
66 OE1: Observable<'or, 'sub, (), E>,
67 {
68 Self { source, start }
69 }
70}
71
72impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for SkipUntil<OE, OE1>
73where
74 T: 'or,
75 OE: Observable<'or, 'sub, T, E>,
76 OE1: Observable<'or, 'sub, (), E>,
77 'sub: 'or,
78{
79 fn subscribe(
80 self,
81 observer: impl Observer<T, E> + NecessarySendSync + 'or,
82 ) -> Subscription<'sub> {
83 subscribe_unsub_after_termination(observer, |observer| {
84 let observer = Shared::new(Mutable::new(Some(observer)));
85 let started = Shared::new(AtomicBool::new(false));
86 let start_observer = StartObserver {
87 observer: observer.clone(),
88 started: started.clone(),
89 _marker: PhantomData,
90 };
91 let subscription_1 = self.start.subscribe(start_observer);
92 let observer = SkipUntilObserver { observer, started };
93 let subscription_2 = self.source.subscribe(observer);
94 subscription_1 + subscription_2
95 })
96 }
97}
98
99struct SkipUntilObserver<OR> {
100 observer: Shared<Mutable<Option<OR>>>,
101 started: Shared<AtomicBool>,
102}
103
104impl<T, E, OR> Observer<T, E> for SkipUntilObserver<OR>
105where
106 OR: Observer<T, E>,
107{
108 fn on_next(&mut self, value: T) {
109 if self.started.load(Ordering::SeqCst) {
110 safe_lock_option_observer!(on_next: self.observer, value);
111 }
112 }
113
114 fn on_termination(self, termination: Termination<E>) {
115 safe_lock_option_observer!(on_termination: self.observer, termination);
116 }
117}
118
119struct StartObserver<T, OR> {
120 observer: Shared<Mutable<Option<OR>>>,
121 started: Shared<AtomicBool>,
122 _marker: MarkerType<T>,
123}
124
125impl<T, E, OR> Observer<(), E> for StartObserver<T, OR>
126where
127 OR: Observer<T, E>,
128{
129 fn on_next(&mut self, _: ()) {
130 self.started.store(true, Ordering::SeqCst);
131 }
132
133 fn on_termination(self, termination: Termination<E>) {
134 match termination {
135 Termination::Completed => {}
136 Termination::Error(_) => {
137 safe_lock_option_observer!(on_termination: self.observer, termination);
138 }
139 }
140 }
141}