rx_rust/operators/mathematical_aggregate/
average.rs1use crate::utils::types::{MarkerType, NecessarySendSync};
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8use std::marker::PhantomData;
9
10#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Average<T, OE> {
39 source: OE,
40 _marker: MarkerType<T>,
41}
42
43impl<T, OE> Average<T, OE> {
44 pub fn new<'or, 'sub, E>(source: OE) -> Self
45 where
46 OE: Observable<'or, 'sub, T, E>,
47 {
48 Self {
49 source,
50 _marker: PhantomData,
51 }
52 }
53}
54
55struct AverageObserver<T, OR> {
56 observer: OR,
57 sum: T,
58 count: usize,
59}
60
61macro_rules! average_observer_impl {
62 ($($t:ty)*) => ($(
63
64 impl<'or, 'sub, E, OE> Observable<'or, 'sub, f64, E> for Average<$t, OE>
65 where
66 OE: Observable<'or, 'sub, $t, E>,
67 {
68 fn subscribe(self, observer: impl Observer<f64, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
69 let observer = AverageObserver {
70 observer,
71 sum: 0 as $t,
72 count: 0,
73 };
74 self.source.subscribe(observer)
75 }
76 }
77
78 impl<E, OR> Observer<$t, E> for AverageObserver<$t, OR>
79 where
80 OR: Observer<f64, E>,
81 {
82 fn on_next(&mut self, value: $t) {
83 self.sum += value;
84 self.count += 1;
85 }
86
87 fn on_termination(mut self, termination: Termination<E>) {
88 match termination {
89 Termination::Completed => {
90 if self.count != 0 {
91 self.observer.on_next(self.sum as f64 / self.count as f64);
92 }
93 }
94 Termination::Error(_) => {}
95 }
96 self.observer.on_termination(termination)
97 }
98 }
99
100 )*)
101}
102
103average_observer_impl! { usize u8 u16 u32 u64 u128 isize i8 i16 i32 i64 i128 f32 f64 }