rx_rust/operators/mathematical_aggregate/
min.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct Min<OE> {
38 source: OE,
39}
40
41impl<OE> Min<OE> {
42 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
43 where
44 OE: Observable<'or, 'sub, T, E>,
45 {
46 Self { source }
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Min<OE>
51where
52 T: PartialOrd + NecessarySendSync + 'or,
53 OE: Observable<'or, 'sub, T, E>,
54{
55 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
56 let observer = MinObserver {
57 observer,
58 min: None,
59 };
60 self.source.subscribe(observer)
61 }
62}
63
64struct MinObserver<T, OR> {
65 observer: OR,
66 min: Option<T>,
67}
68
69impl<T, E, OR> Observer<T, E> for MinObserver<T, OR>
70where
71 T: PartialOrd,
72 OR: Observer<T, E>,
73{
74 fn on_next(&mut self, value: T) {
75 if let Some(min) = &mut self.min {
76 if value < *min {
77 *min = value;
78 }
79 } else {
80 self.min = Some(value);
81 }
82 }
83
84 fn on_termination(mut self, termination: Termination<E>) {
85 match termination {
86 Termination::Completed => {
87 if let Some(min) = self.min {
88 self.observer.on_next(min);
89 }
90 }
91 Termination::Error(_) => {}
92 }
93 self.observer.on_termination(termination)
94 }
95}