rx_rust/operators/transforming/
map.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    utils::types::MarkerType,
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11/// Transforms items emitted by an Observable by applying a function to each item.
12/// See <https://reactivex.io/documentation/operators/map.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17///     observable::observable_ext::ObservableExt,
18///     observer::Termination,
19///     operators::{
20///         creating::from_iter::FromIter,
21///         transforming::map::Map,
22///     },
23/// };
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27///
28/// let observable = Map::new(FromIter::new(vec![1, 2]), |value| value * 10);
29/// observable.subscribe_with_callback(
30///     |value| values.push(value),
31///     |termination| terminations.push(termination),
32/// );
33///
34/// assert_eq!(values, vec![10, 20]);
35/// assert_eq!(terminations, vec![Termination::Completed]);
36/// ```
37#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Map<T0, OE, F> {
40    source: OE,
41    callback: F,
42    _marker: MarkerType<T0>,
43}
44
45impl<T0, OE, F> Map<T0, OE, F> {
46    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
47    where
48        OE: Observable<'or, 'sub, T0, E>,
49        F: FnMut(T0) -> T,
50    {
51        Self {
52            source,
53            callback,
54            _marker: PhantomData,
55        }
56    }
57}
58
59impl<'or, 'sub, T0, T, E, OE, F> Observable<'or, 'sub, T, E> for Map<T0, OE, F>
60where
61    OE: Observable<'or, 'sub, T0, E>,
62    F: FnMut(T0) -> T + NecessarySendSync + 'or,
63{
64    fn subscribe(
65        self,
66        observer: impl Observer<T, E> + NecessarySendSync + 'or,
67    ) -> Subscription<'sub> {
68        let observer = MapObserver {
69            observer,
70            callback: self.callback,
71        };
72        self.source.subscribe(observer)
73    }
74}
75
/// Observer adapter placed between the source and the downstream observer:
/// it holds the downstream observer plus the mapping callback used to
/// convert each incoming item before it is passed on.
struct MapObserver<OR, F> {
    /// Downstream observer that receives the mapped items.
    observer: OR,
    /// Mapping function invoked once per incoming item.
    callback: F,
}
80
81impl<T0, T, E, OR, F> Observer<T0, E> for MapObserver<OR, F>
82where
83    OR: Observer<T, E>,
84    F: FnMut(T0) -> T,
85{
86    fn on_next(&mut self, value: T0) {
87        self.observer.on_next((self.callback)(value))
88    }
89
90    fn on_termination(self, termination: Termination<E>) {
91        self.observer.on_termination(termination)
92    }
93}