another_rxrust/operators/
flat_map.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct FlatMap<'a, In, Out>
6where
7 In: Clone + Send + Sync,
8 Out: Clone + Send + Sync,
9{
10 flatmap_f: FunctionWrapper<'a, In, Observable<'a, Out>>,
11}
12
13impl<'a, In, Out> FlatMap<'a, In, Out>
14where
15 In: Clone + Send + Sync + 'a,
16 Out: Clone + Send + Sync + 'a,
17{
18 pub fn new<F>(f: F) -> FlatMap<'a, In, Out>
19 where
20 F: Fn(In) -> Observable<'a, Out> + Send + Sync + 'a,
21 {
22 FlatMap { flatmap_f: FunctionWrapper::new(f) }
23 }
24 pub fn execute(&self, source: Observable<'a, In>) -> Observable<'a, Out> {
25 let f = self.flatmap_f.clone();
26
27 Observable::create(move |s| {
28 let f = f.clone();
29
30 let sctl = StreamController::new(s);
31 let sctl_next = sctl.clone();
32 let sctl_error = sctl.clone();
33 let sctl_complete = sctl.clone();
34
35 source.inner_subscribe(sctl.new_observer(
36 move |_, x| {
37 let sctl_next_next = sctl_next.clone();
38 let sctl_next_error = sctl_next.clone();
39 let sctl_next_complete = sctl_next.clone();
40
41 f.call(x).inner_subscribe(sctl_next.new_observer(
42 move |_, xx| {
43 sctl_next_next.sink_next(xx);
44 },
45 move |_, ee| {
46 sctl_next_error.sink_error(ee);
47 },
48 move |serial| {
49 sctl_next_complete.sink_complete(&serial);
50 },
51 ));
52 },
53 move |_, e| {
54 sctl_error.sink_error(e);
55 },
56 move |serial| {
57 sctl_complete.sink_complete(&serial);
58 },
59 ));
60 })
61 }
62}
63
64impl<'a, Item> Observable<'a, Item>
65where
66 Item: Clone + Send + Sync,
67{
68 pub fn flat_map<Out, F>(&self, f: F) -> Observable<'a, Out>
69 where
70 F: Fn(Item) -> Observable<'a, Out> + Send + Sync + 'a,
71 Out: Clone + Send + Sync,
72 {
73 FlatMap::new(f).execute(self.clone())
74 }
75}
76
77#[cfg(test)]
78mod test {
79 use crate::prelude::*;
80 use std::{thread, time};
81
82 #[test]
83 fn basic() {
84 let o = Observable::create(|s| {
85 for n in 0..10 {
86 s.next(n);
87 }
88 s.complete();
89 });
90
91 o.flat_map(|x| observables::just(x * 2)).subscribe(
92 print_next_fmt!("{}"),
93 print_error!(),
94 print_complete!(),
95 );
96 }
97
98 #[test]
99 fn thread() {
100 let o = Observable::create(|s| {
101 thread::spawn(move || {
102 for n in 0..100 {
103 if !s.is_subscribed() {
104 println!("break!");
105 break;
106 }
107 s.next(n);
108 thread::sleep(time::Duration::from_millis(100));
109 }
110 if s.is_subscribed() {
111 s.complete();
112 }
113 });
114 });
115
116 let binding = o.flat_map(|x| observables::just(format!("str {}", x)));
117 let sbsc = binding.subscribe(
118 print_next_fmt!("{}"),
119 print_error!(),
120 print_complete!(),
121 );
122 thread::sleep(time::Duration::from_millis(500));
123 sbsc.unsubscribe();
124 thread::sleep(time::Duration::from_millis(500));
125 }
126
127 #[test]
128 fn composite() {
129 fn o() -> Observable<'static, i32> {
130 Observable::create(|s| {
131 thread::spawn(move || {
132 for n in 0..100 {
133 if !s.is_subscribed() {
134 println!("break!");
135 break;
136 }
137 s.next(n);
138 thread::sleep(time::Duration::from_millis(100));
139 }
140 if s.is_subscribed() {
141 s.complete();
142 }
143 });
144 })
145 }
146
147 let binding = o().flat_map(move |_x| o());
148 let sbsc = binding.subscribe(
149 print_next_fmt!("{}"),
150 print_error!(),
151 print_complete!(),
152 );
153 thread::sleep(time::Duration::from_millis(500));
154 sbsc.unsubscribe();
155 thread::sleep(time::Duration::from_millis(500));
156 }
157}