rx_rust/operators/transforming/
concat_map.rs1use super::map::Map;
2use crate::utils::types::NecessarySendSync;
3use crate::{
4 disposable::subscription::Subscription,
5 observable::{Observable, observable_ext::ObservableExt},
6 observer::Observer,
7 utils::types::MarkerType,
8};
9use educe::Educe;
10use std::marker::PhantomData;
11
12#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct ConcatMap<T0, OE, OE1, F> {
43 source: OE,
44 callback: F,
45 _marker: MarkerType<(T0, OE1)>,
46}
47
48impl<T0, OE, OE1, F> ConcatMap<T0, OE, OE1, F> {
49 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50 where
51 OE: Observable<'or, 'sub, T0, E>,
52 OE1: Observable<'or, 'sub, T, E>,
53 F: FnMut(T0) -> OE1,
54 {
55 Self {
56 source,
57 callback,
58 _marker: PhantomData,
59 }
60 }
61}
62
63impl<'or, 'sub, T0, T, E, OE, OE1, F> Observable<'or, 'sub, T, E> for ConcatMap<T0, OE, OE1, F>
64where
65 T: 'or,
66 E: 'or,
67 OE: Observable<'or, 'sub, T0, E>,
68 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'sub,
69 F: FnMut(T0) -> OE1 + NecessarySendSync + 'or,
70 'sub: 'or,
71{
72 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
73 let observable = Map::new(self.source, self.callback);
74 let observable = observable.concat_all();
75 observable.subscribe(observer)
76 }
77}