rx_rust/operators/conditional_boolean/
take_until.rs1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12#[derive(Educe)]
54#[educe(Debug, Clone)]
55pub struct TakeUntil<OE, OE1> {
56 source: OE,
57 stop: OE1,
58}
59
60impl<OE, OE1> TakeUntil<OE, OE1> {
61 pub fn new<'or, 'sub, T, E>(source: OE, stop: OE1) -> Self
62 where
63 OE: Observable<'or, 'sub, T, E>,
64 OE1: Observable<'or, 'sub, (), E>,
65 {
66 Self { source, stop }
67 }
68}
69
70impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for TakeUntil<OE, OE1>
71where
72 T: 'or,
73 OE: Observable<'or, 'sub, T, E>,
74 OE1: Observable<'or, 'sub, (), E>,
75 'sub: 'or,
76{
77 fn subscribe(
78 self,
79 observer: impl Observer<T, E> + NecessarySendSync + 'or,
80 ) -> Subscription<'sub> {
81 subscribe_unsub_after_termination(observer, |observer| {
82 let observer = Shared::new(Mutable::new(Some(observer)));
83 let stop_observer = StopObserver {
84 observer: observer.clone(),
85 _marker: PhantomData,
86 };
87 let subscription_1 = self.stop.subscribe(stop_observer);
88 let observer = TakeUntilObserver(observer.clone());
89 let subscription_2 = self.source.subscribe(observer);
90 subscription_1 + subscription_2
91 })
92 }
93}
94
95struct TakeUntilObserver<OR>(Shared<Mutable<Option<OR>>>);
96
97impl<T, E, OR> Observer<T, E> for TakeUntilObserver<OR>
98where
99 OR: Observer<T, E>,
100{
101 fn on_next(&mut self, value: T) {
102 safe_lock_option_observer!(on_next: self.0, value);
103 }
104
105 fn on_termination(self, termination: Termination<E>) {
106 safe_lock_option_observer!(on_termination: self.0, termination);
107 }
108}
109
110struct StopObserver<T, OR> {
111 observer: Shared<Mutable<Option<OR>>>,
112 _marker: MarkerType<T>,
113}
114
115impl<T, E, OR> Observer<(), E> for StopObserver<T, OR>
116where
117 OR: Observer<T, E>,
118{
119 fn on_next(&mut self, _: ()) {
120 safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
121 }
122
123 fn on_termination(self, termination: Termination<E>) {
124 match termination {
125 Termination::Completed => {}
126 Termination::Error(_) => {
127 safe_lock_option_observer!(on_termination: self.observer, termination);
128 }
129 }
130 }
131}