rx_rust/operators/mathematical_aggregate/
max.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Max<OE> {
38 source: OE,
39}
40
41impl<OE> Max<OE> {
42 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43 where
44 OE: Observable<'or, 'sub, T, E>,
45 {
46 Self { source }
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Max<OE>
51where
52 T: PartialOrd + NecessarySendSync + 'or,
53 OE: Observable<'or, 'sub, T, E>,
54{
55 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
56 let observer = MaxObserver {
57 observer,
58 max: None,
59 };
60 self.source.subscribe(observer)
61 }
62}
63
64struct MaxObserver<T, OR> {
65 observer: OR,
66 max: Option<T>,
67}
68
69impl<T, E, OR> Observer<T, E> for MaxObserver<T, OR>
70where
71 T: PartialOrd,
72 OR: Observer<T, E>,
73{
74 fn on_next(&mut self, value: T) {
75 if let Some(max) = &mut self.max {
76 if value > *max {
77 *max = value;
78 }
79 } else {
80 self.max = Some(value);
81 }
82 }
83
84 fn on_termination(mut self, termination: Termination<E>) {
85 match termination {
86 Termination::Completed => {
87 if let Some(max) = self.max {
88 self.observer.on_next(max);
89 }
90 }
91 Termination::Error(_) => {}
92 }
93 self.observer.on_termination(termination)
94 }
95}