rx_rust/operators/mathematical_aggregate/
reduce.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11/// Applies a function to each item emitted by an Observable, sequentially, and emits the final accumulated value.
12/// See <https://reactivex.io/documentation/operators/reduce.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17///     observable::observable_ext::ObservableExt,
18///     observer::Termination,
19///     operators::{
20///         creating::from_iter::FromIter,
21///         mathematical_aggregate::reduce::Reduce,
22///     },
23/// };
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27///
28/// let observable = Reduce::new(FromIter::new(vec![1, 2, 3]), 0, |acc, value| acc + value);
29/// observable.subscribe_with_callback(
30///     |value| values.push(value),
31///     |termination| terminations.push(termination),
32/// );
33///
34/// assert_eq!(values, vec![6]);
35/// assert_eq!(terminations, vec![Termination::Completed]);
36/// ```
37#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Reduce<T, T1, OE, F> {
40    source: OE,
41    initial_value: T,
42    callback: F,
43    _marker: MarkerType<T1>,
44}
45
46impl<T, T1, OE, F> Reduce<T, T1, OE, F> {
47    pub fn new<'or, 'sub, E>(source: OE, initial_value: T, callback: F) -> Self
48    where
49        OE: Observable<'or, 'sub, T1, E>,
50        F: FnMut(T, T1) -> T,
51    {
52        Self {
53            source,
54            initial_value,
55            callback,
56            _marker: PhantomData,
57        }
58    }
59}
60
61impl<'or, 'sub, T, T1, E, OE, F> Observable<'or, 'sub, T, E> for Reduce<T, T1, OE, F>
62where
63    T: NecessarySendSync + 'or,
64    OE: Observable<'or, 'sub, T1, E>,
65    F: FnMut(T, T1) -> T + NecessarySendSync + 'or,
66{
67    fn subscribe(
68        self,
69        observer: impl Observer<T, E> + NecessarySendSync + 'or,
70    ) -> Subscription<'sub> {
71        let observer = ReduceObserver {
72            observer,
73            value: Some(self.initial_value),
74            callback: self.callback,
75        };
76        self.source.subscribe(observer)
77    }
78}
79
/// Observer placed between the source and the downstream observer of a `Reduce`.
///
/// It keeps the running accumulator and releases it downstream only on completion.
struct ReduceObserver<T, OR, F> {
    /// Downstream observer that receives the final accumulated value.
    observer: OR,
    /// Current accumulator. Held in an `Option` so it can be moved out through `&mut self`
    /// and handed to `callback` by value.
    value: Option<T>,
    /// Folding function, invoked as `callback(accumulator, item)`.
    callback: F,
}
85
86impl<T, T1, E, OR, F> Observer<T1, E> for ReduceObserver<T, OR, F>
87where
88    OR: Observer<T, E>,
89    F: FnMut(T, T1) -> T,
90{
91    fn on_next(&mut self, value: T1) {
92        self.value = Some((self.callback)(self.value.take().unwrap(), value));
93    }
94
95    fn on_termination(mut self, termination: Termination<E>) {
96        match termination {
97            Termination::Completed => {
98                self.observer.on_next(self.value.unwrap());
99            }
100            Termination::Error(_) => {}
101        }
102        self.observer.on_termination(termination)
103    }
104}