rx_rust/operators/conditional_boolean/
take_until.rs

1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7    utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
/// Emits the items emitted by a source Observable until a second (`stop`) Observable emits an item.
/// See <https://reactivex.io/documentation/operators/takeuntil.html>
///
/// How notifications from `stop` are handled by this implementation:
/// - an item (`()`) from `stop` terminates the downstream observer with `Termination::Completed`;
/// - an error termination from `stop` is passed on to the downstream observer;
/// - a plain completion of `stop` (without it having emitted) is ignored, so the source keeps
///   being forwarded.
///
/// Both observables are stored by value and are consumed when the operator is subscribed.
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::{Observer, Termination},
///     operators::conditional_boolean::take_until::TakeUntil,
///     subject::publish_subject::PublishSubject,
/// };
/// use std::{convert::Infallible, sync::{Arc, Mutex}};
///
/// let values = Arc::new(Mutex::new(Vec::new()));
/// let terminations = Arc::new(Mutex::new(Vec::new()));
///
/// let mut source: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
/// let mut stop: PublishSubject<'_, (), Infallible> = PublishSubject::default();
/// let values_observer = Arc::clone(&values);
/// let terminations_observer = Arc::clone(&terminations);
///
/// let subscription = TakeUntil::new(source.clone(), stop.clone()).subscribe_with_callback(
///     move |value| values_observer.lock().unwrap().push(value),
///     move |termination| terminations_observer
///         .lock()
///         .unwrap()
///         .push(termination),
/// );
///
/// source.on_next(1);
/// source.on_next(2);
/// stop.on_next(());
/// source.on_next(3);
/// drop(subscription);
///
/// assert_eq!(&*values.lock().unwrap(), &[1, 2]);
/// assert_eq!(
///     &*terminations.lock().unwrap(),
///     &[Termination::Completed]
/// );
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct TakeUntil<OE, OE1> {
    /// Observable whose items are forwarded downstream.
    source: OE,
    /// Observable whose first item ends the forwarding.
    stop: OE1,
}
59
60impl<OE, OE1> TakeUntil<OE, OE1> {
61    pub fn new<'or, 'sub, T, E>(source: OE, stop: OE1) -> Self
62    where
63        OE: Observable<'or, 'sub, T, E>,
64        OE1: Observable<'or, 'sub, (), E>,
65    {
66        Self { source, stop }
67    }
68}
69
70impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for TakeUntil<OE, OE1>
71where
72    T: 'or,
73    OE: Observable<'or, 'sub, T, E>,
74    OE1: Observable<'or, 'sub, (), E>,
75    'sub: 'or,
76{
77    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
78        subscribe_unsub_after_termination(observer, |observer| {
79            let observer = Shared::new(Mutable::new(Some(observer)));
80            let stop_observer = StopObserver {
81                observer: observer.clone(),
82                _marker: PhantomData,
83            };
84            let subscription_1 = self.stop.subscribe(stop_observer);
85            let observer = TakeUntilObserver(observer.clone());
86            let subscription_2 = self.source.subscribe(observer);
87            subscription_1 + subscription_2
88        })
89    }
90}
91
/// Observer that `TakeUntil` attaches to the source observable.
///
/// Holds the shared, optional downstream observer; a clone of the same handle is held by the
/// `StopObserver` attached to the stop observable.
struct TakeUntilObserver<OR>(Shared<Mutable<Option<OR>>>);
93
// Forwards source notifications to the shared downstream observer.
// NOTE(review): `safe_lock_option_observer!` is defined elsewhere; presumably it locks the slot
// and does nothing when the slot is already empty — confirm against the macro definition.
impl<T, E, OR> Observer<T, E> for TakeUntilObserver<OR>
where
    OR: Observer<T, E>,
{
    /// Hands a source item to the downstream observer via `safe_lock_option_observer!`.
    fn on_next(&mut self, value: T) {
        safe_lock_option_observer!(on_next: self.0, value);
    }

    /// Hands the source's termination to the downstream observer via `safe_lock_option_observer!`.
    fn on_termination(self, termination: Termination<E>) {
        safe_lock_option_observer!(on_termination: self.0, termination);
    }
}
106
/// Observer that `TakeUntil` attaches to the stop observable.
///
/// `T` is the item type of the downstream observer `OR`; no `T` value is stored, the type is
/// only needed for the `OR: Observer<T, E>` bound on the `Observer` impl.
struct StopObserver<T, OR> {
    /// Shared, optional downstream observer (same slot as the one in `TakeUntilObserver`).
    observer: Shared<Mutable<Option<OR>>>,
    /// Ties the otherwise unused type parameter `T` to the struct.
    _marker: MarkerType<T>,
}
111
112impl<T, E, OR> Observer<(), E> for StopObserver<T, OR>
113where
114    OR: Observer<T, E>,
115{
116    fn on_next(&mut self, _: ()) {
117        safe_lock_option_observer!(on_termination: self.observer, Termination::Completed);
118    }
119
120    fn on_termination(self, termination: Termination<E>) {
121        match termination {
122            Termination::Completed => {}
123            Termination::Error(_) => {
124                safe_lock_option_observer!(on_termination: self.observer, termination);
125            }
126        }
127    }
128}