rx_rust/operators/mathematical_aggregate/
max.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Emits the maximum item emitted by an Observable.
10/// See <https://reactivex.io/documentation/operators/max.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         creating::from_iter::FromIter,
19///         mathematical_aggregate::max::Max,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = Max::new(FromIter::new(vec![3, 5, 4]));
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![5]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Max<OE> {
38    source: OE,
39}
40
41impl<OE> Max<OE> {
42    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43    where
44        OE: Observable<'or, 'sub, T, E>,
45    {
46        Self { source }
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Max<OE>
51where
52    T: PartialOrd + NecessarySendSync + 'or,
53    OE: Observable<'or, 'sub, T, E>,
54{
55    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
56        let observer = MaxObserver {
57            observer,
58            max: None,
59        };
60        self.source.subscribe(observer)
61    }
62}
63
/// Observer state used by the `Max` operator: the downstream observer plus
/// the largest value received so far.
struct MaxObserver<T, OR> {
    /// Downstream observer that eventually receives the result.
    observer: OR,
    /// Largest value seen so far; `None` until the first value arrives.
    max: Option<T>,
}
68
69impl<T, E, OR> Observer<T, E> for MaxObserver<T, OR>
70where
71    T: PartialOrd,
72    OR: Observer<T, E>,
73{
74    fn on_next(&mut self, value: T) {
75        if let Some(max) = &mut self.max {
76            if value > *max {
77                *max = value;
78            }
79        } else {
80            self.max = Some(value);
81        }
82    }
83
84    fn on_termination(mut self, termination: Termination<E>) {
85        match termination {
86            Termination::Completed => {
87                if let Some(max) = self.max {
88                    self.observer.on_next(max);
89                }
90            }
91            Termination::Error(_) => {}
92        }
93        self.observer.on_termination(termination)
94    }
95}