rx_rust/operators/mathematical_aggregate/
max.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Emits the maximum item emitted by an Observable.
10/// See <https://reactivex.io/documentation/operators/max.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         creating::from_iter::FromIter,
19///         mathematical_aggregate::max::Max,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = Max::new(FromIter::new(vec![3, 5, 4]));
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![5]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Max<OE> {
38    source: OE,
39}
40
41impl<OE> Max<OE> {
42    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43    where
44        OE: Observable<'or, 'sub, T, E>,
45    {
46        Self { source }
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Max<OE>
51where
52    T: PartialOrd + NecessarySendSync + 'or,
53    OE: Observable<'or, 'sub, T, E>,
54{
55    fn subscribe(
56        self,
57        observer: impl Observer<T, E> + NecessarySendSync + 'or,
58    ) -> Subscription<'sub> {
59        let observer = MaxObserver {
60            observer,
61            max: None,
62        };
63        self.source.subscribe(observer)
64    }
65}
66
/// Observer adapter that remembers the largest item seen so far and passes it
/// on to the wrapped observer only when the upstream completes.
struct MaxObserver<T, OR> {
    /// Downstream observer that eventually receives the maximum and the termination.
    observer: OR,
    /// Largest item received so far; `None` until the first item arrives.
    max: Option<T>,
}
71
72impl<T, E, OR> Observer<T, E> for MaxObserver<T, OR>
73where
74    T: PartialOrd,
75    OR: Observer<T, E>,
76{
77    fn on_next(&mut self, value: T) {
78        if let Some(max) = &mut self.max {
79            if value > *max {
80                *max = value;
81            }
82        } else {
83            self.max = Some(value);
84        }
85    }
86
87    fn on_termination(mut self, termination: Termination<E>) {
88        match termination {
89            Termination::Completed => {
90                if let Some(max) = self.max {
91                    self.observer.on_next(max);
92                }
93            }
94            Termination::Error(_) => {}
95        }
96        self.observer.on_termination(termination)
97    }
98}