rx_rust/operators/mathematical_aggregate/
max.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Max<OE> {
38 source: OE,
39}
40
41impl<OE> Max<OE> {
42 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43 where
44 OE: Observable<'or, 'sub, T, E>,
45 {
46 Self { source }
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Max<OE>
51where
52 T: PartialOrd + NecessarySendSync + 'or,
53 OE: Observable<'or, 'sub, T, E>,
54{
55 fn subscribe(
56 self,
57 observer: impl Observer<T, E> + NecessarySendSync + 'or,
58 ) -> Subscription<'sub> {
59 let observer = MaxObserver {
60 observer,
61 max: None,
62 };
63 self.source.subscribe(observer)
64 }
65}
66
67struct MaxObserver<T, OR> {
68 observer: OR,
69 max: Option<T>,
70}
71
72impl<T, E, OR> Observer<T, E> for MaxObserver<T, OR>
73where
74 T: PartialOrd,
75 OR: Observer<T, E>,
76{
77 fn on_next(&mut self, value: T) {
78 if let Some(max) = &mut self.max {
79 if value > *max {
80 *max = value;
81 }
82 } else {
83 self.max = Some(value);
84 }
85 }
86
87 fn on_termination(mut self, termination: Termination<E>) {
88 match termination {
89 Termination::Completed => {
90 if let Some(max) = self.max {
91 self.observer.on_next(max);
92 }
93 }
94 Termination::Error(_) => {}
95 }
96 self.observer.on_termination(termination)
97 }
98}