rx_rust/operators/mathematical_aggregate/
min.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Min<OE> {
38 source: OE,
39}
40
41impl<OE> Min<OE> {
42 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43 where
44 OE: Observable<'or, 'sub, T, E>,
45 {
46 Self { source }
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Min<OE>
51where
52 T: PartialOrd + NecessarySendSync + 'or,
53 OE: Observable<'or, 'sub, T, E>,
54{
55 fn subscribe(
56 self,
57 observer: impl Observer<T, E> + NecessarySendSync + 'or,
58 ) -> Subscription<'sub> {
59 let observer = MinObserver {
60 observer,
61 min: None,
62 };
63 self.source.subscribe(observer)
64 }
65}
66
/// Observer that remembers the smallest item received so far and hands it to
/// the wrapped downstream observer when the stream completes.
struct MinObserver<T, OR> {
    /// Downstream observer that receives the final result and the termination.
    observer: OR,
    /// Smallest item seen so far; `None` until the first item arrives.
    min: Option<T>,
}
71
72impl<T, E, OR> Observer<T, E> for MinObserver<T, OR>
73where
74 T: PartialOrd,
75 OR: Observer<T, E>,
76{
77 fn on_next(&mut self, value: T) {
78 if let Some(min) = &mut self.min {
79 if value < *min {
80 *min = value;
81 }
82 } else {
83 self.min = Some(value);
84 }
85 }
86
87 fn on_termination(mut self, termination: Termination<E>) {
88 match termination {
89 Termination::Completed => {
90 if let Some(min) = self.min {
91 self.observer.on_next(min);
92 }
93 }
94 Termination::Error(_) => {}
95 }
96 self.observer.on_termination(termination)
97 }
98}