rx_rust/operators/conditional_boolean/
contains.rs

1use crate::utils::types::NecessarySendSync;
2use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9
10/// Emits a single boolean value that indicates whether a source Observable emits a specified item.
11/// See <https://reactivex.io/documentation/operators/contains.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         conditional_boolean::contains::Contains,
20///         creating::from_iter::FromIter,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// let observable = Contains::new(FromIter::new(vec![1, 2, 3]), 2);
28/// observable.subscribe_with_callback(
29///     |value| values.push(value),
30///     |termination| terminations.push(termination),
31/// );
32///
33/// assert_eq!(values, vec![true]);
34/// assert_eq!(terminations, vec![Termination::Completed]);
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Contains<T, OE> {
39    source: OE,
40    item: T,
41}
42
43impl<T, OE> Contains<T, OE> {
44    pub fn new<'or, 'sub, E>(source: OE, item: T) -> Self
45    where
46        OE: Observable<'or, 'sub, T, E>,
47    {
48        Self { source, item }
49    }
50}
51
52impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, bool, E> for Contains<T, OE>
53where
54    OE: Observable<'or, 'sub, T, E>,
55    T: PartialEq + NecessarySendSync + 'or,
56    'sub: 'or,
57{
58    fn subscribe(
59        self,
60        observer: impl Observer<bool, E> + NecessarySendSync + 'or,
61    ) -> Subscription<'sub> {
62        subscribe_unsub_after_termination(observer, |observer| {
63            let observer = ContainsObserver {
64                observer: Some(observer),
65                item: self.item,
66            };
67            self.source.subscribe(observer)
68        })
69    }
70}
71
/// Observer placed between the source and the downstream observer of `Contains`.
///
/// It holds the item being searched for and the downstream observer, which is taken out
/// of the `Option` once a result has been delivered.
struct ContainsObserver<T, OR> {
    /// Downstream observer; `None` after it has been terminated.
    observer: Option<OR>,
    /// Value that incoming emissions are compared against.
    item: T,
}
76
77impl<T, E, OR> Observer<T, E> for ContainsObserver<T, OR>
78where
79    OR: Observer<bool, E>,
80    T: PartialEq,
81{
82    fn on_next(&mut self, value: T) {
83        if self.item == value
84            && let Some(mut observer) = self.observer.take()
85        {
86            observer.on_next(true);
87            observer.on_termination(Termination::Completed);
88        }
89    }
90
91    fn on_termination(mut self, termination: Termination<E>) {
92        if let Some(mut observer) = self.observer.take() {
93            match termination {
94                Termination::Completed => observer.on_next(false),
95                Termination::Error(_) => {}
96            }
97            observer.on_termination(termination);
98        }
99    }
100}