rx_rust/operators/transforming/
concat_map.rs

1use super::map::Map;
2use crate::utils::types::NecessarySendSync;
3use crate::{
4    disposable::subscription::Subscription,
5    observable::{Observable, observable_ext::ObservableExt},
6    observer::Observer,
7    utils::types::MarkerType,
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12/// Projects each source value to an Observable which is merged in a serialized fashion in the output Observable.
13/// See <https://reactivex.io/documentation/operators/flatmap.html>
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18///     observable::observable_ext::ObservableExt,
19///     observer::Termination,
20///     operators::{
21///         creating::from_iter::FromIter,
22///         transforming::concat_map::ConcatMap,
23///     },
24/// };
25///
26/// let mut values = Vec::new();
27/// let mut terminations = Vec::new();
28///
29/// let observable = ConcatMap::new(FromIter::new(vec![1, 2]), |value| {
30///     FromIter::new(vec![value, value + 10])
31/// });
32/// observable.subscribe_with_callback(
33///     |value| values.push(value),
34///     |termination| terminations.push(termination),
35/// );
36///
37/// assert_eq!(values, vec![1, 11, 2, 12]);
38/// assert_eq!(terminations, vec![Termination::Completed]);
39/// ```
40#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct ConcatMap<T0, OE, OE1, F> {
43    source: OE,
44    callback: F,
45    _marker: MarkerType<(T0, OE1)>,
46}
47
48impl<T0, OE, OE1, F> ConcatMap<T0, OE, OE1, F> {
49    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50    where
51        OE: Observable<'or, 'sub, T0, E>,
52        OE1: Observable<'or, 'sub, T, E>,
53        F: FnMut(T0) -> OE1,
54    {
55        Self {
56            source,
57            callback,
58            _marker: PhantomData,
59        }
60    }
61}
62
63impl<'or, 'sub, T0, T, E, OE, OE1, F> Observable<'or, 'sub, T, E> for ConcatMap<T0, OE, OE1, F>
64where
65    T: 'or,
66    E: 'or,
67    OE: Observable<'or, 'sub, T0, E>,
68    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'sub,
69    F: FnMut(T0) -> OE1 + NecessarySendSync + 'or,
70    'sub: 'or,
71{
72    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
73        let observable = Map::new(self.source, self.callback);
74        let observable = observable.concat_all();
75        observable.subscribe(observer)
76    }
77}