rx_rust/operators/transforming/
switch_map.rs

1use super::map::Map;
2use crate::utils::types::NecessarySendSync;
3use crate::{
4    disposable::subscription::Subscription,
5    observable::{Observable, observable_ext::ObservableExt},
6    observer::Observer,
7    utils::types::MarkerType,
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12/// Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.
13/// See <https://reactivex.io/documentation/operators/flatmap.html>
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18///     observable::observable_ext::ObservableExt,
19///     observer::Termination,
20///     operators::{
21///         creating::from_iter::FromIter,
22///         transforming::switch_map::SwitchMap,
23///     },
24/// };
25///
26/// let mut values = Vec::new();
27/// let mut terminations = Vec::new();
28///
29/// let observable = SwitchMap::new(FromIter::new(vec![1, 2]), |value| {
30///     FromIter::new(vec![value, value + 10])
31/// });
32/// observable.subscribe_with_callback(
33///     |value| values.push(value),
34///     |termination| terminations.push(termination),
35/// );
36///
37/// assert_eq!(values, vec![1, 11, 2, 12]);
38/// assert_eq!(terminations, vec![Termination::Completed]);
39/// ```
40#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct SwitchMap<T0, OE, OE1, F> {
43    source: OE,
44    callback: F,
45    _marker: MarkerType<(T0, OE1)>,
46}
47
48impl<T0, OE, OE1, F> SwitchMap<T0, OE, OE1, F> {
49    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50    where
51        OE: Observable<'or, 'sub, T0, E>,
52        OE1: Observable<'or, 'sub, T, E>,
53        F: FnMut(T0) -> OE1,
54    {
55        Self {
56            source,
57            callback,
58            _marker: PhantomData,
59        }
60    }
61}
62
63impl<'or, 'sub, T0, T, E, OE, OE1, F> Observable<'or, 'sub, T, E> for SwitchMap<T0, OE, OE1, F>
64where
65    T: 'or,
66    OE: Observable<'or, 'sub, T0, E>,
67    OE1: Observable<'or, 'sub, T, E>,
68    F: FnMut(T0) -> OE1 + NecessarySendSync + 'or,
69    'sub: 'or,
70{
71    fn subscribe(
72        self,
73        observer: impl Observer<T, E> + NecessarySendSync + 'or,
74    ) -> Subscription<'sub> {
75        let observable = Map::new(self.source, self.callback);
76        let observable = observable.switch();
77        observable.subscribe(observer)
78    }
79}