another_rxrust/operators/
publish.rs

1use crate::prelude::*;
2use subject::Subject;
3
4#[derive(Clone)]
5pub struct Publish<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  sbj: subjects::Subject<'a, Item>,
10  source: Observable<'a, Item>,
11}
12
13impl<'a, Item> Publish<'a, Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new(source: Observable<'a, Item>) -> Publish<'a, Item> {
18    Publish { sbj: Subject::<Item>::new(), source }
19  }
20
21  pub fn observable(&self) -> Observable<'a, Item> {
22    self.sbj.observable()
23  }
24
25  pub fn connect(&self) -> Subscription<'a> {
26    let sbj_next = self.sbj.clone();
27    let sbj_error = self.sbj.clone();
28    let sbj_complete = self.sbj.clone();
29
30    self.source.subscribe(
31      move |x| {
32        sbj_next.next(x);
33      },
34      move |e| {
35        sbj_error.error(e);
36      },
37      move || {
38        sbj_complete.complete();
39      },
40    )
41  }
42}
43
44impl<'a, Item> Observable<'a, Item>
45where
46  Item: Clone + Send + Sync,
47{
48  pub fn publish(&self) -> publish::Publish<'a, Item> {
49    Publish::new(self.clone())
50  }
51}
52
// NOTE: these tests only print the observed event sequence; they contain no
// assertions, so a run passes as long as nothing panics. Run them with
// `--nocapture` to inspect the output.
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use crate::{print_complete, print_error, print_next_fmt};
  use schedulers::new_thread_scheduler;
  use std::{thread, time};

  /// Blocks the test thread for 500 ms between the steps of a scenario.
  fn pause() {
    thread::sleep(time::Duration::from_millis(500));
  }

  /// Subscribes #1 before `connect`, then adds #2 while the source is running.
  #[test]
  fn basic() {
    let o = observables::interval(
      time::Duration::from_millis(100),
      new_thread_scheduler(),
    )
    .tap(
      print_next_fmt!("tap {}"),
      print_error!(),
      print_complete!(),
    )
    .publish();
    let obs = o.observable();

    println!("start #1");
    let sbsc1 = obs.subscribe(
      print_next_fmt!("#1 {}"),
      print_error!(),
      print_complete!(),
    );

    println!("connect");
    let breaker = o.connect();
    pause();

    // Was mislabelled "start #1" although it announces the second subscriber.
    println!("start #2");
    let sbsc2 = obs.subscribe(
      print_next_fmt!("#2 {}"),
      print_error!(),
      print_complete!(),
    );

    pause();

    println!("end #1");
    sbsc1.unsubscribe();
    pause();

    println!("end #2");
    sbsc2.unsubscribe();
    pause();

    println!("breaker");
    breaker.unsubscribe();
    pause();
  }

  /// Calls `connect` first and only afterwards attaches subscribers #1 and #2.
  #[test]
  fn first_connect() {
    let o = observables::interval(
      time::Duration::from_millis(100),
      new_thread_scheduler(),
    )
    .tap(
      print_next_fmt!("tap {}"),
      print_error!(),
      print_complete!(),
    )
    .publish();
    let obs = o.observable();

    println!("connect");
    let breaker = o.connect();
    pause();

    println!("start #1");
    let sbsc1 = obs.subscribe(
      print_next_fmt!("#1 {}"),
      print_error!(),
      print_complete!(),
    );

    // Was mislabelled "start #1" although it announces the second subscriber.
    println!("start #2");
    let sbsc2 = obs.subscribe(
      print_next_fmt!("#2 {}"),
      print_error!(),
      print_complete!(),
    );

    pause();

    println!("end #1");
    sbsc1.unsubscribe();
    pause();

    println!("end #2");
    sbsc2.unsubscribe();
    pause();

    println!("breaker");
    breaker.unsubscribe();
    pause();
  }
}