rx_rust/operators/mathematical_aggregate/
average.rs

1use crate::utils::types::{MarkerType, NecessarySendSync};
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8use std::marker::PhantomData;
9
10/// Calculates the average of numbers emitted by an Observable and emits this average.
11/// See <https://reactivex.io/documentation/operators/average.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         creating::from_iter::FromIter,
20///         mathematical_aggregate::average::Average,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// let observable = Average::new(FromIter::new(vec![1.0_f64, 3.0, 5.0]));
28/// observable.subscribe_with_callback(
29///     |value| values.push(value),
30///     |termination| terminations.push(termination),
31/// );
32///
33/// assert_eq!(values, vec![3.0]);
34/// assert_eq!(terminations, vec![Termination::Completed]);
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Average<T, OE> {
39    source: OE,
40    _marker: MarkerType<T>,
41}
42
43impl<T, OE> Average<T, OE> {
44    pub fn new<'or, 'sub, E>(source: OE) -> Self
45    where
46        OE: Observable<'or, 'sub, T, E>,
47    {
48        Self {
49            source,
50            _marker: PhantomData,
51        }
52    }
53}
54
55struct AverageObserver<T, OR> {
56    observer: OR,
57    sum: T,
58    count: usize,
59}
60
61macro_rules! average_observer_impl {
62    ($($t:ty)*) => ($(
63
64        impl<'or, 'sub, E, OE> Observable<'or, 'sub, f64, E> for Average<$t, OE>
65        where
66            OE: Observable<'or, 'sub, $t, E>,
67        {
68            fn subscribe(self, observer: impl Observer<f64, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
69                let observer = AverageObserver {
70                    observer,
71                    sum: 0 as $t,
72                    count: 0,
73                };
74                self.source.subscribe(observer)
75            }
76        }
77
78        impl<E, OR> Observer<$t, E> for AverageObserver<$t, OR>
79        where
80            OR: Observer<f64, E>,
81        {
82            fn on_next(&mut self, value: $t) {
83                self.sum += value;
84                self.count += 1;
85            }
86
87            fn on_termination(mut self, termination: Termination<E>) {
88                match termination {
89                    Termination::Completed => {
90                        if self.count != 0 {
91                            self.observer.on_next(self.sum as f64 / self.count as f64);
92                        }
93                    }
94                    Termination::Error(_) => {}
95                }
96                self.observer.on_termination(termination)
97            }
98        }
99
100    )*)
101}
102
103average_observer_impl! { usize u8 u16 u32 u64 u128 isize i8 i16 i32 i64 i128 f32 f64 }