another_rxrust/
observable.rs1use crate::internals::function_wrapper::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Observable<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 source: FunctionWrapper<'a, Observer<'a, Item>, ()>,
10}
11
12impl<'a, Item> Observable<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn create<Source>(source: Source) -> Observable<'a, Item>
17 where
18 Source: Fn(Observer<'a, Item>) + Send + Sync + 'a,
19 {
20 Observable { source: FunctionWrapper::new(source) }
21 }
22
23 pub(crate) fn inner_subscribe(
24 &self,
25 observer: Observer<'a, Item>,
26 ) -> Subscription<'a> {
27 let unsub_observer = observer.clone();
28 let issub_observer = observer.clone();
29 self.source.call(observer.clone());
30 Subscription::new(
31 move || {
32 unsub_observer.unsubscribe();
33 },
34 move || issub_observer.is_subscribed(),
35 )
36 }
37
38 pub fn subscribe<Next, Error, Complete>(
39 &self,
40 next: Next,
41 error: Error,
42 complete: Complete,
43 ) -> Subscription<'a>
44 where
45 Next: Fn(Item) + Send + Sync + 'a,
46 Error: Fn(RxError) + Send + Sync + 'a,
47 Complete: Fn() + Send + Sync + 'a,
48 {
49 self.inner_subscribe(Observer::new(next, error, complete))
50 }
51}
52
53#[cfg(test)]
54mod test {
55 use crate::prelude::*;
56 use std::{thread, time};
57
58 #[test]
59 fn basic() {
60 let o = Observable::create(|s| {
61 for n in 0..10 {
62 s.next(n);
63 }
64 s.complete();
65 });
66
67 o.subscribe(
68 print_next_fmt!("{}"),
69 print_error!(),
70 print_complete!(),
71 );
72
73 o.subscribe(
74 print_next_fmt!("{}"),
75 print_error!(),
76 print_complete!(),
77 );
78 }
79
80 #[test]
81 fn thread() {
82 let o = Observable::create(|s| {
83 thread::spawn(move || {
84 for n in 0..100 {
85 if !s.is_subscribed() {
86 break;
87 }
88 s.next(n);
89 }
90 if s.is_subscribed() {
91 s.complete();
92 }
93 });
94 });
95
96 o.subscribe(
97 print_next_fmt!("{}"),
98 print_error!(),
99 print_complete!(),
100 );
101 println!("started");
102 }
103
104 #[test]
105 fn unsubscribe() {
106 let o = Observable::create(|s| {
107 thread::spawn(move || {
108 for n in 0..100 {
109 if !s.is_subscribed() {
110 println!("break!");
111 break;
112 }
113 s.next(n);
114 thread::sleep(time::Duration::from_millis(100));
115 }
116 if s.is_subscribed() {
117 s.complete();
118 }
119 });
120 });
121
122 let sbsc = o.subscribe(
123 print_next_fmt!("{}"),
124 print_error!(),
125 print_complete!(),
126 );
127 println!("started");
128 thread::sleep(time::Duration::from_millis(1000));
129 sbsc.unsubscribe();
130 thread::sleep(time::Duration::from_millis(1000));
131 }
132
133 #[test]
134 fn move_to_closure() {
135 let o = Observable::create(|s| {
136 s.next(1);
137 s.complete();
138 });
139 let oo = o.clone(); o.flat_map(move |_| {
141 let ooo = oo.clone(); oo.clone().flat_map(move |_| {
146 return ooo.clone();
148 })
149 })
150 .subscribe(
151 print_next_fmt!("{}"),
152 print_error!(),
153 print_complete!(),
154 );
155 }
156}