another_rxrust/
observable.rs

1use crate::internals::function_wrapper::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Observable<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  source: FunctionWrapper<'a, Observer<'a, Item>, ()>,
10}
11
12impl<'a, Item> Observable<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn create<Source>(source: Source) -> Observable<'a, Item>
17  where
18    Source: Fn(Observer<'a, Item>) + Send + Sync + 'a,
19  {
20    Observable { source: FunctionWrapper::new(source) }
21  }
22
23  pub(crate) fn inner_subscribe(
24    &self,
25    observer: Observer<'a, Item>,
26  ) -> Subscription<'a> {
27    let unsub_observer = observer.clone();
28    let issub_observer = observer.clone();
29    self.source.call(observer.clone());
30    Subscription::new(
31      move || {
32        unsub_observer.unsubscribe();
33      },
34      move || issub_observer.is_subscribed(),
35    )
36  }
37
38  pub fn subscribe<Next, Error, Complete>(
39    &self,
40    next: Next,
41    error: Error,
42    complete: Complete,
43  ) -> Subscription<'a>
44  where
45    Next: Fn(Item) + Send + Sync + 'a,
46    Error: Fn(RxError) + Send + Sync + 'a,
47    Complete: Fn() + Send + Sync + 'a,
48  {
49    self.inner_subscribe(Observer::new(next, error, complete))
50  }
51}
52
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Emits 0..10 and completes from inside the source function, and
  /// subscribes twice to the same observable.
  #[test]
  fn basic() {
    let o = Observable::create(|s| {
      for n in 0..10 {
        s.next(n);
      }
      s.complete();
    });

    o.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    o.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Emits from a spawned thread, checking `is_subscribed()` before each
  /// emission and before completing.
  #[test]
  fn thread() {
    let o = Observable::create(|s| {
      thread::spawn(move || {
        for n in 0..100 {
          if !s.is_subscribed() {
            break;
          }
          s.next(n);
        }
        if s.is_subscribed() {
          s.complete();
        }
      });
    });

    o.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    println!("started");
    // The worker's join handle is dropped, so the thread is detached. Give
    // it time to emit before the test returns; otherwise the test can end
    // before anything has been observed.
    thread::sleep(time::Duration::from_millis(500));
  }

  /// Emits one value every 100 ms from a spawned thread and unsubscribes
  /// after one second.
  #[test]
  fn unsubscribe() {
    let o = Observable::create(|s| {
      thread::spawn(move || {
        for n in 0..100 {
          if !s.is_subscribed() {
            println!("break!");
            break;
          }
          s.next(n);
          thread::sleep(time::Duration::from_millis(100));
        }
        if s.is_subscribed() {
          s.complete();
        }
      });
    });

    let sbsc = o.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    println!("started");
    thread::sleep(time::Duration::from_millis(1000));
    sbsc.unsubscribe();
    // Keep the test alive long enough for the worker to reach its next
    // `is_subscribed()` check and print "break!".
    thread::sleep(time::Duration::from_millis(1000));
  }

  /// Shows how to use an observable inside nested `move` closures: clone it
  /// before each `move`, and clone the captured copy on every use.
  #[test]
  fn move_to_closure() {
    let o = Observable::create(|s| {
      s.next(1);
      s.complete();
    });
    let oo = o.clone(); // clone to be moved into the closure below
    o.flat_map(move |_| {
      // Always clone the captured `oo` rather than consuming it.
      let ooo = oo.clone(); // clone to be moved into the inner closure

      // `oo` must be cloned here, or a compile error will occur.
      oo.clone().flat_map(move |_| {
        // Always clone the captured `ooo` rather than consuming it.
        ooo.clone()
      })
    })
    .subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }
}