rx_rust/operators/conditional_boolean/
all.rs

1use crate::utils::types::{MarkerType, NecessarySendSync};
2use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11/// Emits a single boolean value that indicates whether all items emitted by a source Observable satisfy a specified condition.
12/// See <https://reactivex.io/documentation/operators/all.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17///     observable::observable_ext::ObservableExt,
18///     observer::Termination,
19///     operators::{
20///         conditional_boolean::all::All,
21///         creating::from_iter::FromIter,
22///     },
23/// };
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27///
28/// let observable = All::new(FromIter::new(vec![1, 2, 3]), |value| value < 5);
29/// observable.subscribe_with_callback(
30///     |value| values.push(value),
31///     |termination| terminations.push(termination),
32/// );
33///
34/// assert_eq!(values, vec![true]);
35/// assert_eq!(terminations, vec![Termination::Completed]);
36/// ```
37#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct All<T, OE, F> {
40    source: OE,
41    callback: F,
42    _marker: MarkerType<T>,
43}
44
45impl<T, OE, F> All<T, OE, F> {
46    pub fn new<'or, 'sub, E>(source: OE, callback: F) -> Self
47    where
48        OE: Observable<'or, 'sub, T, E>,
49        F: FnMut(T) -> bool,
50    {
51        Self {
52            source,
53            callback,
54            _marker: PhantomData,
55        }
56    }
57}
58
59impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, bool, E> for All<T, OE, F>
60where
61    OE: Observable<'or, 'sub, T, E>,
62    F: FnMut(T) -> bool + NecessarySendSync + 'or,
63    'sub: 'or,
64{
65    fn subscribe(
66        self,
67        observer: impl Observer<bool, E> + NecessarySendSync + 'or,
68    ) -> Subscription<'sub> {
69        subscribe_unsub_after_termination(observer, |observer| {
70            let observer = AllObserver {
71                observer: Some(observer),
72                callback: self.callback,
73            };
74            self.source.subscribe(observer)
75        })
76    }
77}
78
/// Observer that `All` subscribes to its source: it tests incoming items with `callback` and
/// reports a single `bool` to the wrapped downstream `observer`.
struct AllObserver<OR, F> {
    /// Downstream observer; taken (left as `None`) when its terminal notification is delivered.
    observer: Option<OR>,
    /// Predicate applied to incoming items.
    callback: F,
}
83
84impl<T, E, OR, F> Observer<T, E> for AllObserver<OR, F>
85where
86    OR: Observer<bool, E>,
87    F: FnMut(T) -> bool,
88{
89    fn on_next(&mut self, value: T) {
90        if !(self.callback)(value)
91            && let Some(mut observer) = self.observer.take()
92        {
93            observer.on_next(false);
94            observer.on_termination(Termination::Completed);
95        }
96    }
97
98    fn on_termination(mut self, termination: Termination<E>) {
99        if let Some(mut observer) = self.observer.take() {
100            match termination {
101                Termination::Completed => observer.on_next(true),
102                Termination::Error(_) => {}
103            }
104            observer.on_termination(termination);
105        }
106    }
107}