rx_rust/operators/conditional_boolean/
take_until.rs

1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7    utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12/// Emits the items emitted by a source Observable until a second Observable emits an item or a notification.
13/// See <https://reactivex.io/documentation/operators/takeuntil.html>
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18///     observable::observable_ext::ObservableExt,
19///     observer::{Observer, Termination},
20///     operators::conditional_boolean::take_until::TakeUntil,
21///     subject::publish_subject::PublishSubject,
22/// };
23/// use std::{convert::Infallible, sync::{Arc, Mutex}};
24///
25/// let values = Arc::new(Mutex::new(Vec::new()));
26/// let terminations = Arc::new(Mutex::new(Vec::new()));
27///
28/// let mut source: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
29/// let mut stop: PublishSubject<'_, (), Infallible> = PublishSubject::default();
30/// let values_observer = Arc::clone(&values);
31/// let terminations_observer = Arc::clone(&terminations);
32///
33/// let subscription = TakeUntil::new(source.clone(), stop.clone()).subscribe_with_callback(
34///     move |value| values_observer.lock().unwrap().push(value),
35///     move |termination| terminations_observer
36///         .lock()
37///         .unwrap()
38///         .push(termination),
39/// );
40///
41/// source.on_next(1);
42/// source.on_next(2);
43/// stop.on_next(());
44/// source.on_next(3);
45/// drop(subscription);
46///
47/// assert_eq!(&*values.lock().unwrap(), &[1, 2]);
48/// assert_eq!(
49///     &*terminations.lock().unwrap(),
50///     &[Termination::Completed]
51/// );
52/// ```
53#[derive(Educe)]
54#[educe(Debug, Clone)]
55pub struct TakeUntil<OE, OE1> {
56    source: OE,
57    stop: OE1,
58}
59
60impl<OE, OE1> TakeUntil<OE, OE1> {
61    pub fn new<'or, 'sub, T, E>(source: OE, stop: OE1) -> Self
62    where
63        OE: Observable<'or, 'sub, T, E>,
64        OE1: Observable<'or, 'sub, (), E>,
65    {
66        Self { source, stop }
67    }
68}
69
70impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for TakeUntil<OE, OE1>
71where
72    T: 'or,
73    OE: Observable<'or, 'sub, T, E>,
74    OE1: Observable<'or, 'sub, (), E>,
75    'sub: 'or,
76{
77    fn subscribe(
78        self,
79        observer: impl Observer<T, E> + NecessarySendSync + 'or,
80    ) -> Subscription<'sub> {
81        subscribe_unsub_after_termination(observer, |observer| {
82            let observer = Shared::new(Mutable::new(Some(observer)));
83            let stop_observer = StopObserver {
84                observer: observer.clone(),
85                _marker: PhantomData,
86            };
87            let subscription_1 = self.stop.subscribe(stop_observer);
88            let observer = TakeUntilObserver(observer.clone());
89            let subscription_2 = self.source.subscribe(observer);
90            subscription_1 + subscription_2
91        })
92    }
93}
94
95struct TakeUntilObserver<OR>(Shared<Mutable<Option<OR>>>);
96
97impl<T, E, OR> Observer<T, E> for TakeUntilObserver<OR>
98where
99    OR: Observer<T, E>,
100{
101    fn on_next(&mut self, value: T) {
102        safe_lock_option_observer!(on_next: self.0, value);
103    }
104
105    fn on_termination(self, termination: Termination<E>) {
106        safe_lock_option_observer!(on_termination: self.0, termination);
107    }
108}
109
110struct StopObserver<T, OR> {
111    observer: Shared<Mutable<Option<OR>>>,
112    _marker: MarkerType<T>,
113}
114
115impl<T, E, OR> Observer<(), E> for StopObserver<T, OR>
116where
117    OR: Observer<T, E>,
118{
119    fn on_next(&mut self, _: ()) {
120        safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
121    }
122
123    fn on_termination(self, termination: Termination<E>) {
124        match termination {
125            Termination::Completed => {}
126            Termination::Error(_) => {
127                safe_lock_option_observer!(on_termination: self.observer, termination);
128            }
129        }
130    }
131}