rx_rust/operators/transforming/
scan.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6 utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Scan<T, T1, OE, F> {
40 source: OE,
41 initial_value: T,
42 callback: F,
43 _marker: MarkerType<T1>,
44}
45
46impl<T, T1, OE, F> Scan<T, T1, OE, F> {
47 pub fn new<'or, 'sub, E>(source: OE, initial_value: T, callback: F) -> Self
48 where
49 OE: Observable<'or, 'sub, T1, E>,
50 F: FnMut(T, T1) -> T,
51 {
52 Self {
53 source,
54 initial_value,
55 callback,
56 _marker: PhantomData,
57 }
58 }
59}
60
61impl<'or, 'sub, T, T1, E, OE, F> Observable<'or, 'sub, T, E> for Scan<T, T1, OE, F>
62where
63 T: Clone + NecessarySendSync + 'or,
64 OE: Observable<'or, 'sub, T1, E>,
65 F: FnMut(T, T1) -> T + NecessarySendSync + 'or,
66{
67 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
68 let observer = ScanObserver {
69 observer,
70 value: self.initial_value,
71 callback: self.callback,
72 };
73 self.source.subscribe(observer)
74 }
75}
76
/// Observer placed between the upstream source and the downstream observer.
///
/// It owns the running accumulator and the accumulation callback for a single
/// subscription.
struct ScanObserver<T, OR, F> {
    /// Downstream observer that receives each new accumulator value.
    observer: OR,
    /// Current accumulator; replaced on every upstream item.
    value: T,
    /// Combines the current accumulator with an upstream item.
    callback: F,
}
82
83impl<T, T1, E, OR, F> Observer<T1, E> for ScanObserver<T, OR, F>
84where
85 T: Clone,
86 OR: Observer<T, E>,
87 F: FnMut(T, T1) -> T,
88{
89 fn on_next(&mut self, value: T1) {
90 self.value = (self.callback)(self.value.clone(), value);
91 self.observer.on_next(self.value.clone())
92 }
93
94 fn on_termination(self, termination: Termination<E>) {
95 self.observer.on_termination(termination)
96 }
97}