rx_rust/operators/mathematical_aggregate/
min.rs

1use crate::utils::types::NecessarySend;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Emits the minimum item emitted by an Observable.
10/// See <https://reactivex.io/documentation/operators/min.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         creating::from_iter::FromIter,
19///         mathematical_aggregate::min::Min,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = Min::new(FromIter::new(vec![3, 1, 2]));
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![1]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Min<OE> {
38    source: OE,
39}
40
41impl<OE> Min<OE> {
42    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43    where
44        OE: Observable<'or, 'sub, T, E>,
45    {
46        Self { source }
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Min<OE>
51where
52    T: PartialOrd + NecessarySend + 'or,
53    OE: Observable<'or, 'sub, T, E>,
54{
55    fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
56        let observer = MinObserver {
57            observer,
58            min: None,
59        };
60        self.source.subscribe(observer)
61    }
62}
63
64struct MinObserver<T, OR> {
65    observer: OR,
66    min: Option<T>,
67}
68
69impl<T, E, OR> Observer<T, E> for MinObserver<T, OR>
70where
71    T: PartialOrd,
72    OR: Observer<T, E>,
73{
74    fn on_next(&mut self, value: T) {
75        if let Some(min) = &mut self.min {
76            if value < *min {
77                *min = value;
78            }
79        } else {
80            self.min = Some(value);
81        }
82    }
83
84    fn on_termination(mut self, termination: Termination<E>) {
85        match termination {
86            Termination::Completed => {
87                if let Some(min) = self.min {
88                    self.observer.on_next(min);
89                }
90            }
91            Termination::Error(_) => {}
92        }
93        self.observer.on_termination(termination)
94    }
95}