rx_rust/operators/transforming/
scan.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
/// Applies a function to each item emitted by an Observable, sequentially, and emits each intermediate accumulated value.
/// See <https://reactivex.io/documentation/operators/scan.html>
///
/// The accumulator starts at `initial_value`. For every upstream item the callback is
/// invoked with the current accumulator and that item; its result becomes the new
/// accumulator and is forwarded downstream. The seed itself is never emitted: the first
/// emission is `callback(initial_value, first_item)`.
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::Termination,
///     operators::{
///         creating::from_iter::FromIter,
///         transforming::scan::Scan,
///     },
/// };
///
/// let mut values = Vec::new();
/// let mut terminations = Vec::new();
///
/// let observable = Scan::new(FromIter::new(vec![1, 2, 3]), 0, |acc, value| acc + value);
/// observable.subscribe_with_callback(
///     |value| values.push(value),
///     |termination| terminations.push(termination),
/// );
///
/// assert_eq!(values, vec![1, 3, 6]);
/// assert_eq!(terminations, vec![Termination::Completed]);
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct Scan<T, T1, OE, F> {
    /// Upstream observable; it is subscribed to when this operator is subscribed to.
    source: OE,
    /// Seed used as the accumulator before the first upstream item arrives.
    initial_value: T,
    /// Folding function: receives the current accumulator and the incoming item and
    /// returns the next accumulator.
    callback: F,
    /// Marker for the upstream item type `T1`, which is not otherwise stored in the
    /// struct (initialized with `PhantomData` in `new`).
    _marker: MarkerType<T1>,
}
45
46impl<T, T1, OE, F> Scan<T, T1, OE, F> {
47    pub fn new<'or, 'sub, E>(source: OE, initial_value: T, callback: F) -> Self
48    where
49        OE: Observable<'or, 'sub, T1, E>,
50        F: FnMut(T, T1) -> T,
51    {
52        Self {
53            source,
54            initial_value,
55            callback,
56            _marker: PhantomData,
57        }
58    }
59}
60
61impl<'or, 'sub, T, T1, E, OE, F> Observable<'or, 'sub, T, E> for Scan<T, T1, OE, F>
62where
63    T: Clone + NecessarySendSync + 'or,
64    OE: Observable<'or, 'sub, T1, E>,
65    F: FnMut(T, T1) -> T + NecessarySendSync + 'or,
66{
67    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
68        let observer = ScanObserver {
69            observer,
70            value: self.initial_value,
71            callback: self.callback,
72        };
73        self.source.subscribe(observer)
74    }
75}
76
/// Observer adapter installed on the source by `Scan::subscribe`.
///
/// It keeps the running accumulator and forwards each updated accumulator value, as
/// well as the termination signal, to the wrapped downstream observer.
struct ScanObserver<T, OR, F> {
    /// Downstream observer that receives the accumulated values.
    observer: OR,
    /// Current accumulator; starts as the operator's initial value and is replaced on
    /// every upstream item.
    value: T,
    /// Folding function applied to `(accumulator, item)`.
    callback: F,
}
82
83impl<T, T1, E, OR, F> Observer<T1, E> for ScanObserver<T, OR, F>
84where
85    T: Clone,
86    OR: Observer<T, E>,
87    F: FnMut(T, T1) -> T,
88{
89    fn on_next(&mut self, value: T1) {
90        self.value = (self.callback)(self.value.clone(), value);
91        self.observer.on_next(self.value.clone())
92    }
93
94    fn on_termination(self, termination: Termination<E>) {
95        self.observer.on_termination(termination)
96    }
97}