another_rxrust/operators/
publish.rs1use crate::prelude::*;
2use subject::Subject;
3
4#[derive(Clone)]
5pub struct Publish<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 sbj: subjects::Subject<'a, Item>,
10 source: Observable<'a, Item>,
11}
12
13impl<'a, Item> Publish<'a, Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new(source: Observable<'a, Item>) -> Publish<'a, Item> {
18 Publish { sbj: Subject::<Item>::new(), source }
19 }
20
21 pub fn observable(&self) -> Observable<'a, Item> {
22 self.sbj.observable()
23 }
24
25 pub fn connect(&self) -> Subscription<'a> {
26 let sbj_next = self.sbj.clone();
27 let sbj_error = self.sbj.clone();
28 let sbj_complete = self.sbj.clone();
29
30 self.source.subscribe(
31 move |x| {
32 sbj_next.next(x);
33 },
34 move |e| {
35 sbj_error.error(e);
36 },
37 move || {
38 sbj_complete.complete();
39 },
40 )
41 }
42}
43
44impl<'a, Item> Observable<'a, Item>
45where
46 Item: Clone + Send + Sync,
47{
48 pub fn publish(&self) -> publish::Publish<'a, Item> {
49 Publish::new(self.clone())
50 }
51}
52
53#[cfg(all(test, not(feature = "web")))]
54mod test {
55 use crate::prelude::*;
56 use crate::{print_complete, print_error, print_next_fmt};
57 use schedulers::new_thread_scheduler;
58 use std::{thread, time};
59
60 #[test]
61 fn basic() {
62 let o = observables::interval(
63 time::Duration::from_millis(100),
64 new_thread_scheduler(),
65 )
66 .tap(
67 print_next_fmt!("tap {}"),
68 print_error!(),
69 print_complete!(),
70 )
71 .publish();
72 let obs = o.observable();
73
74 println!("start #1");
75 let sbsc1 = obs.subscribe(
76 print_next_fmt!("#1 {}"),
77 print_error!(),
78 print_complete!(),
79 );
80
81 println!("connect");
82 let breaker = o.connect();
83 thread::sleep(time::Duration::from_millis(500));
84
85 println!("start #1");
86 let sbsc2 = obs.subscribe(
87 print_next_fmt!("#2 {}"),
88 print_error!(),
89 print_complete!(),
90 );
91
92 thread::sleep(time::Duration::from_millis(500));
93
94 println!("end #1");
95 sbsc1.unsubscribe();
96 thread::sleep(time::Duration::from_millis(500));
97
98 println!("end #2");
99 sbsc2.unsubscribe();
100 thread::sleep(time::Duration::from_millis(500));
101
102 println!("braker");
103 breaker.unsubscribe();
104 thread::sleep(time::Duration::from_millis(500));
105 }
106
107 #[test]
108 fn first_connect() {
109 let o = observables::interval(
110 time::Duration::from_millis(100),
111 new_thread_scheduler(),
112 )
113 .tap(
114 print_next_fmt!("tap {}"),
115 print_error!(),
116 print_complete!(),
117 )
118 .publish();
119 let obs = o.observable();
120
121 println!("connect");
122 let breaker = o.connect();
123 thread::sleep(time::Duration::from_millis(500));
124
125 println!("start #1");
126 let sbsc1 = obs.subscribe(
127 print_next_fmt!("#1 {}"),
128 print_error!(),
129 print_complete!(),
130 );
131
132 println!("start #1");
133 let sbsc2 = obs.subscribe(
134 print_next_fmt!("#2 {}"),
135 print_error!(),
136 print_complete!(),
137 );
138
139 thread::sleep(time::Duration::from_millis(500));
140
141 println!("end #1");
142 sbsc1.unsubscribe();
143 thread::sleep(time::Duration::from_millis(500));
144
145 println!("end #2");
146 sbsc2.unsubscribe();
147 thread::sleep(time::Duration::from_millis(500));
148
149 println!("braker");
150 breaker.unsubscribe();
151 thread::sleep(time::Duration::from_millis(500));
152 }
153}