rx_rust/operators/mathematical_aggregate/
reduce.rs1use crate::utils::types::NecessarySend;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Reduce<T, T1, OE, F> {
40 source: OE,
41 initial_value: T,
42 callback: F,
43 _marker: MarkerType<T1>,
44}
45
46impl<T, T1, OE, F> Reduce<T, T1, OE, F> {
47 pub fn new<'or, 'sub, E>(source: OE, initial_value: T, callback: F) -> Self
48 where
49 OE: Observable<'or, 'sub, T1, E>,
50 F: FnMut(T, T1) -> T,
51 {
52 Self {
53 source,
54 initial_value,
55 callback,
56 _marker: PhantomData,
57 }
58 }
59}
60
61impl<'or, 'sub, T, T1, E, OE, F> Observable<'or, 'sub, T, E> for Reduce<T, T1, OE, F>
62where
63 T: NecessarySend + 'or,
64 OE: Observable<'or, 'sub, T1, E>,
65 F: FnMut(T, T1) -> T + NecessarySend + 'or,
66{
67 fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
68 let observer = ReduceObserver {
69 observer,
70 value: Some(self.initial_value),
71 callback: self.callback,
72 };
73 self.source.subscribe(observer)
74 }
75}
76
/// Observer placed between the source and the downstream observer by `Reduce`.
///
/// It keeps the running accumulator and only talks to the downstream observer when the
/// source terminates.
struct ReduceObserver<T, OR, F> {
    /// Downstream observer that receives the final accumulator and the termination.
    observer: OR,
    /// Running accumulator. Stored as an `Option` so it can be moved out by value, passed
    /// to the callback, and put back.
    value: Option<T>,
    /// Folding function, invoked as `callback(accumulator, item)`.
    callback: F,
}
82
83impl<T, T1, E, OR, F> Observer<T1, E> for ReduceObserver<T, OR, F>
84where
85 OR: Observer<T, E>,
86 F: FnMut(T, T1) -> T,
87{
88 fn on_next(&mut self, value: T1) {
89 self.value = Some((self.callback)(self.value.take().unwrap(), value));
90 }
91
92 fn on_termination(mut self, termination: Termination<E>) {
93 match termination {
94 Termination::Completed => {
95 self.observer.on_next(self.value.unwrap());
96 }
97 Termination::Error(_) => {}
98 }
99 self.observer.on_termination(termination)
100 }
101}