another_rxrust/operators/
flat_map.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct FlatMap<'a, In, Out>
6where
7  In: Clone + Send + Sync,
8  Out: Clone + Send + Sync,
9{
10  flatmap_f: FunctionWrapper<'a, In, Observable<'a, Out>>,
11}
12
13impl<'a, In, Out> FlatMap<'a, In, Out>
14where
15  In: Clone + Send + Sync + 'a,
16  Out: Clone + Send + Sync + 'a,
17{
18  pub fn new<F>(f: F) -> FlatMap<'a, In, Out>
19  where
20    F: Fn(In) -> Observable<'a, Out> + Send + Sync + 'a,
21  {
22    FlatMap { flatmap_f: FunctionWrapper::new(f) }
23  }
24  pub fn execute(&self, source: Observable<'a, In>) -> Observable<'a, Out> {
25    let f = self.flatmap_f.clone();
26
27    Observable::create(move |s| {
28      let f = f.clone();
29
30      let sctl = StreamController::new(s);
31      let sctl_next = sctl.clone();
32      let sctl_error = sctl.clone();
33      let sctl_complete = sctl.clone();
34
35      source.inner_subscribe(sctl.new_observer(
36        move |_, x| {
37          let sctl_next_next = sctl_next.clone();
38          let sctl_next_error = sctl_next.clone();
39          let sctl_next_complete = sctl_next.clone();
40
41          f.call(x).inner_subscribe(sctl_next.new_observer(
42            move |_, xx| {
43              sctl_next_next.sink_next(xx);
44            },
45            move |_, ee| {
46              sctl_next_error.sink_error(ee);
47            },
48            move |serial| {
49              sctl_next_complete.sink_complete(&serial);
50            },
51          ));
52        },
53        move |_, e| {
54          sctl_error.sink_error(e);
55        },
56        move |serial| {
57          sctl_complete.sink_complete(&serial);
58        },
59      ));
60    })
61  }
62}
63
64impl<'a, Item> Observable<'a, Item>
65where
66  Item: Clone + Send + Sync,
67{
68  pub fn flat_map<Out, F>(&self, f: F) -> Observable<'a, Out>
69  where
70    F: Fn(Item) -> Observable<'a, Out> + Send + Sync + 'a,
71    Out: Clone + Send + Sync,
72  {
73    FlatMap::new(f).execute(self.clone())
74  }
75}
76
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Blocks the current thread for `ms` milliseconds.
  fn wait_ms(ms: u64) {
    thread::sleep(time::Duration::from_millis(ms));
  }

  /// Observable whose subscription spawns a thread that emits `0..100`,
  /// pausing 100 ms after each item. The thread stops early (printing
  /// "break!") once the subscriber is no longer subscribed, and completes
  /// the stream only if it is still subscribed after the loop.
  fn ticker() -> Observable<'static, i32> {
    Observable::create(|s| {
      thread::spawn(move || {
        let interval = time::Duration::from_millis(100);
        for n in 0..100 {
          if !s.is_subscribed() {
            println!("break!");
            break;
          }
          s.next(n);
          thread::sleep(interval);
        }
        if s.is_subscribed() {
          s.complete();
        }
      });
    })
  }

  /// Synchronous source: each item is mapped to a single-item observable.
  #[test]
  fn basic() {
    let source = Observable::create(|s| {
      (0..10).for_each(|n| {
        s.next(n);
      });
      s.complete();
    });

    let doubled = source.flat_map(|x| observables::just(x * 2));
    doubled.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Threaded source mapped to single-item observables; unsubscribes after
  /// 500 ms and then waits another 500 ms before the test returns.
  #[test]
  fn thread() {
    let labelled = ticker().flat_map(|x| observables::just(format!("str {}", x)));
    let subscription = labelled.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    wait_ms(500);
    subscription.unsubscribe();
    wait_ms(500);
  }

  /// Threaded source where every item is mapped to another threaded source;
  /// unsubscribes after 500 ms and then waits another 500 ms.
  #[test]
  fn composite() {
    let nested = ticker().flat_map(move |_x| ticker());
    let subscription = nested.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    wait_ms(500);
    subscription.unsubscribe();
    wait_ms(500);
  }
}