another_rxrust/operators/
map.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Map<'a, In, Out>
6where
7 In: Clone + Send + Sync,
8 Out: Clone + Send + Sync,
9{
10 map_f: FunctionWrapper<'a, In, Out>,
11}
12
13impl<'a, In, Out> Map<'a, In, Out>
14where
15 In: Clone + Send + Sync,
16 Out: Clone + Send + Sync,
17{
18 pub fn new<F>(f: F) -> Map<'a, In, Out>
19 where
20 F: Fn(In) -> Out + Send + Sync + 'a,
21 {
22 Map { map_f: FunctionWrapper::new(f) }
23 }
24 pub fn execute(&self, source: Observable<'a, In>) -> Observable<'a, Out> {
25 let f = self.map_f.clone();
26
27 Observable::<Out>::create(move |s| {
28 let f = f.clone();
29
30 let sctl = StreamController::new(s);
31 let sctl_next = sctl.clone();
32 let sctl_error = sctl.clone();
33 let sctl_complete = sctl.clone();
34
35 source.inner_subscribe(sctl.new_observer(
36 move |_, x| {
37 sctl_next.sink_next(f.call(x));
38 },
39 move |_, e| {
40 sctl_error.sink_error(e);
41 },
42 move |serial| sctl_complete.sink_complete(&serial),
43 ));
44 })
45 }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50 Item: Clone + Send + Sync,
51{
52 pub fn map<Out, F>(&self, f: F) -> Observable<'a, Out>
53 where
54 F: Fn(Item) -> Out + Send + Sync + 'a,
55 Out: Clone + Send + Sync,
56 {
57 Map::new(f).execute(self.clone())
58 }
59}
60
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Emits 0..10 synchronously, doubles each value with `map`, and prints
    /// the resulting events.
    #[test]
    fn basic() {
        let source = Observable::create(|s| {
            (0..10).for_each(|n| {
                s.next(n);
            });
            s.complete();
        });

        source.map(|x| x * 2).subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );
    }

    /// Emits from a spawned thread (one value per 100 ms, at most 100 values),
    /// maps each value to a string, then unsubscribes after 500 ms and waits
    /// another 500 ms so the emitting thread can observe the unsubscription.
    #[test]
    fn map_thread() {
        let source = Observable::create(|s| {
            thread::spawn(move || {
                let tick = time::Duration::from_millis(100);
                for n in 0..100 {
                    if !s.is_subscribed() {
                        println!("break!");
                        break;
                    }
                    s.next(n);
                    thread::sleep(tick);
                }
                // Only signal completion if nobody unsubscribed in the meantime.
                if s.is_subscribed() {
                    s.complete();
                }
            });
        });

        let mapped = source.map(|x| format!("str {}", x));
        let subscription = mapped.subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );

        let half_second = time::Duration::from_millis(500);
        thread::sleep(half_second);
        subscription.unsubscribe();
        thread::sleep(half_second);
    }
}