another_rxrust/operators/
map.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Map<'a, In, Out>
6where
7  In: Clone + Send + Sync,
8  Out: Clone + Send + Sync,
9{
10  map_f: FunctionWrapper<'a, In, Out>,
11}
12
13impl<'a, In, Out> Map<'a, In, Out>
14where
15  In: Clone + Send + Sync,
16  Out: Clone + Send + Sync,
17{
18  pub fn new<F>(f: F) -> Map<'a, In, Out>
19  where
20    F: Fn(In) -> Out + Send + Sync + 'a,
21  {
22    Map { map_f: FunctionWrapper::new(f) }
23  }
24  pub fn execute(&self, source: Observable<'a, In>) -> Observable<'a, Out> {
25    let f = self.map_f.clone();
26
27    Observable::<Out>::create(move |s| {
28      let f = f.clone();
29
30      let sctl = StreamController::new(s);
31      let sctl_next = sctl.clone();
32      let sctl_error = sctl.clone();
33      let sctl_complete = sctl.clone();
34
35      source.inner_subscribe(sctl.new_observer(
36        move |_, x| {
37          sctl_next.sink_next(f.call(x));
38        },
39        move |_, e| {
40          sctl_error.sink_error(e);
41        },
42        move |serial| sctl_complete.sink_complete(&serial),
43      ));
44    })
45  }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50  Item: Clone + Send + Sync,
51{
52  pub fn map<Out, F>(&self, f: F) -> Observable<'a, Out>
53  where
54    F: Fn(Item) -> Out + Send + Sync + 'a,
55    Out: Clone + Send + Sync,
56  {
57    Map::new(f).execute(self.clone())
58  }
59}
60
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::thread;
  use std::time::Duration;

  /// Pause between two emissions of the threaded source.
  const EMIT_INTERVAL: Duration = Duration::from_millis(100);
  /// How long the test waits before and after unsubscribing.
  const SETTLE_TIME: Duration = Duration::from_millis(500);

  /// Maps synchronously emitted integers 0..10 through `x * 2` and hands the
  /// results to the `print_*` macros.
  #[test]
  fn basic() {
    let numbers = Observable::create(|s| {
      (0..10).for_each(|value| {
        s.next(value);
      });
      s.complete();
    });

    numbers.map(|v| v * 2).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Maps values emitted from a background thread to strings, then
  /// unsubscribes part-way through; the emitting loop checks
  /// `is_subscribed` and breaks out once it reports false.
  #[test]
  fn map_thread() {
    let ticker = Observable::create(|s| {
      thread::spawn(move || {
        for value in 0..100 {
          if !s.is_subscribed() {
            println!("break!");
            break;
          }
          s.next(value);
          thread::sleep(EMIT_INTERVAL);
        }
        // Only signal completion if the subscription is still active.
        if s.is_subscribed() {
          s.complete();
        }
      });
    });

    let mapped = ticker.map(|v| format!("str {}", v));
    let subscription = mapped.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    thread::sleep(SETTLE_TIME);
    subscription.unsubscribe();
    // Give the emitting thread time to run after the unsubscribe.
    thread::sleep(SETTLE_TIME);
  }
}