rx_rust/operators/mathematical_aggregate/
min.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Emits the minimum item emitted by an Observable.
10/// See <https://reactivex.io/documentation/operators/min.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         creating::from_iter::FromIter,
19///         mathematical_aggregate::min::Min,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = Min::new(FromIter::new(vec![3, 1, 2]));
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![1]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Min<OE> {
38    source: OE,
39}
40
41impl<OE> Min<OE> {
42    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43    where
44        OE: Observable<'or, 'sub, T, E>,
45    {
46        Self { source }
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Min<OE>
51where
52    T: PartialOrd + NecessarySendSync + 'or,
53    OE: Observable<'or, 'sub, T, E>,
54{
55    fn subscribe(
56        self,
57        observer: impl Observer<T, E> + NecessarySendSync + 'or,
58    ) -> Subscription<'sub> {
59        let observer = MinObserver {
60            observer,
61            min: None,
62        };
63        self.source.subscribe(observer)
64    }
65}
66
/// Observer adapter that remembers the smallest item received so far on behalf
/// of a wrapped downstream observer.
struct MinObserver<T, OR> {
    /// Downstream observer that receives the final result and the termination.
    observer: OR,
    /// Smallest item received so far; `None` until the first item arrives.
    min: Option<T>,
}
71
72impl<T, E, OR> Observer<T, E> for MinObserver<T, OR>
73where
74    T: PartialOrd,
75    OR: Observer<T, E>,
76{
77    fn on_next(&mut self, value: T) {
78        if let Some(min) = &mut self.min {
79            if value < *min {
80                *min = value;
81            }
82        } else {
83            self.min = Some(value);
84        }
85    }
86
87    fn on_termination(mut self, termination: Termination<E>) {
88        match termination {
89            Termination::Completed => {
90                if let Some(min) = self.min {
91                    self.observer.on_next(min);
92                }
93            }
94            Termination::Error(_) => {}
95        }
96        self.observer.on_termination(termination)
97    }
98}