rx_rust/operators/transforming/
scan.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Scan<T, T1, OE, F> {
40 source: OE,
41 initial_value: T,
42 callback: F,
43 _marker: MarkerType<T1>,
44}
45
46impl<T, T1, OE, F> Scan<T, T1, OE, F> {
47 pub fn new<'or, 'sub, E>(source: OE, initial_value: T, callback: F) -> Self
48 where
49 OE: Observable<'or, 'sub, T1, E>,
50 F: FnMut(T, T1) -> T,
51 {
52 Self {
53 source,
54 initial_value,
55 callback,
56 _marker: PhantomData,
57 }
58 }
59}
60
61impl<'or, 'sub, T, T1, E, OE, F> Observable<'or, 'sub, T, E> for Scan<T, T1, OE, F>
62where
63 T: Clone + NecessarySendSync + 'or,
64 OE: Observable<'or, 'sub, T1, E>,
65 F: FnMut(T, T1) -> T + NecessarySendSync + 'or,
66{
67 fn subscribe(
68 self,
69 observer: impl Observer<T, E> + NecessarySendSync + 'or,
70 ) -> Subscription<'sub> {
71 let observer = ScanObserver {
72 observer,
73 value: self.initial_value,
74 callback: self.callback,
75 };
76 self.source.subscribe(observer)
77 }
78}
79
80struct ScanObserver<T, OR, F> {
81 observer: OR,
82 value: T,
83 callback: F,
84}
85
86impl<T, T1, E, OR, F> Observer<T1, E> for ScanObserver<T, OR, F>
87where
88 T: Clone,
89 OR: Observer<T, E>,
90 F: FnMut(T, T1) -> T,
91{
92 fn on_next(&mut self, value: T1) {
93 self.value = (self.callback)(self.value.clone(), value);
94 self.observer.on_next(self.value.clone())
95 }
96
97 fn on_termination(self, termination: Termination<E>) {
98 self.observer.on_termination(termination)
99 }
100}