rx_rust/operators/mathematical_aggregate/
reduce.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Reduce<T, T1, OE, F> {
40 source: OE,
41 initial_value: T,
42 callback: F,
43 _marker: MarkerType<T1>,
44}
45
46impl<T, T1, OE, F> Reduce<T, T1, OE, F> {
47 pub fn new<'or, 'sub, E>(source: OE, initial_value: T, callback: F) -> Self
48 where
49 OE: Observable<'or, 'sub, T1, E>,
50 F: FnMut(T, T1) -> T,
51 {
52 Self {
53 source,
54 initial_value,
55 callback,
56 _marker: PhantomData,
57 }
58 }
59}
60
61impl<'or, 'sub, T, T1, E, OE, F> Observable<'or, 'sub, T, E> for Reduce<T, T1, OE, F>
62where
63 T: NecessarySendSync + 'or,
64 OE: Observable<'or, 'sub, T1, E>,
65 F: FnMut(T, T1) -> T + NecessarySendSync + 'or,
66{
67 fn subscribe(
68 self,
69 observer: impl Observer<T, E> + NecessarySendSync + 'or,
70 ) -> Subscription<'sub> {
71 let observer = ReduceObserver {
72 observer,
73 value: Some(self.initial_value),
74 callback: self.callback,
75 };
76 self.source.subscribe(observer)
77 }
78}
79
/// Observer that sits between the source and the downstream observer,
/// accumulating items instead of forwarding them one by one.
struct ReduceObserver<Acc, Downstream, Func> {
    /// Receiver of the final accumulated value and of the termination.
    observer: Downstream,
    /// Current accumulator. Wrapped in `Option` so it can be moved out and
    /// passed to `callback` by value, then stored back.
    value: Option<Acc>,
    /// Folding function applied to the accumulator and each incoming item.
    callback: Func,
}
85
86impl<T, T1, E, OR, F> Observer<T1, E> for ReduceObserver<T, OR, F>
87where
88 OR: Observer<T, E>,
89 F: FnMut(T, T1) -> T,
90{
91 fn on_next(&mut self, value: T1) {
92 self.value = Some((self.callback)(self.value.take().unwrap(), value));
93 }
94
95 fn on_termination(mut self, termination: Termination<E>) {
96 match termination {
97 Termination::Completed => {
98 self.observer.on_next(self.value.unwrap());
99 }
100 Termination::Error(_) => {}
101 }
102 self.observer.on_termination(termination)
103 }
104}