rx_rust/operators/conditional_boolean/
take_until.rs1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12#[derive(Educe)]
54#[educe(Debug, Clone)]
55pub struct TakeUntil<OE, OE1> {
56 source: OE,
57 stop: OE1,
58}
59
60impl<OE, OE1> TakeUntil<OE, OE1> {
61 pub fn new<'or, 'sub, T, E>(source: OE, stop: OE1) -> Self
62 where
63 OE: Observable<'or, 'sub, T, E>,
64 OE1: Observable<'or, 'sub, (), E>,
65 {
66 Self { source, stop }
67 }
68}
69
70impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for TakeUntil<OE, OE1>
71where
72 T: 'or,
73 OE: Observable<'or, 'sub, T, E>,
74 OE1: Observable<'or, 'sub, (), E>,
75 'sub: 'or,
76{
77 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
78 subscribe_unsub_after_termination(observer, |observer| {
79 let observer = Shared::new(Mutable::new(Some(observer)));
80 let stop_observer = StopObserver {
81 observer: observer.clone(),
82 _marker: PhantomData,
83 };
84 let subscription_1 = self.stop.subscribe(stop_observer);
85 let observer = TakeUntilObserver(observer.clone());
86 let subscription_2 = self.source.subscribe(observer);
87 subscription_1 + subscription_2
88 })
89 }
90}
91
92struct TakeUntilObserver<OR>(Shared<Mutable<Option<OR>>>);
93
94impl<T, E, OR> Observer<T, E> for TakeUntilObserver<OR>
95where
96 OR: Observer<T, E>,
97{
98 fn on_next(&mut self, value: T) {
99 safe_lock_option_observer!(on_next: self.0, value);
100 }
101
102 fn on_termination(self, termination: Termination<E>) {
103 safe_lock_option_observer!(on_termination: self.0, termination);
104 }
105}
106
107struct StopObserver<T, OR> {
108 observer: Shared<Mutable<Option<OR>>>,
109 _marker: MarkerType<T>,
110}
111
112impl<T, E, OR> Observer<(), E> for StopObserver<T, OR>
113where
114 OR: Observer<T, E>,
115{
116 fn on_next(&mut self, _: ()) {
117 safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
118 }
119
120 fn on_termination(self, termination: Termination<E>) {
121 match termination {
122 Termination::Completed => {}
123 Termination::Error(_) => {
124 safe_lock_option_observer!(on_termination: self.observer, termination);
125 }
126 }
127 }
128}