rxr/subjects/async_subject.rs
1use std::{
2 error::Error,
3 sync::{Arc, Mutex},
4};
5
6use crate::{
7 observer::Observer,
8 subscribe::Unsubscribeable,
9 subscription::subscribe::{
10 Subscribeable, Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic,
11 },
12 Observable,
13};
14
15/// A specialized `Subject` variant emits its latest value to observers upon completion.
16///
17/// `AsyncSubject` captures and broadcasts the last emitted value from a source
18/// observable, but this broadcasting occurs only after the source observable
19/// completes. It sends this value to all new subscriptions.
20///
21/// If an error is invoked in the source observable, the `AsyncSubject` will not emit
22/// the latest value to subscriptions. Instead, it propagates the error notification
23/// from the source `Observable` to all subscriptions. This ensures that existing and
24/// new subscriptions are properly informed about the error, maintaining consistent
25/// error handling across observers.
26///
27/// In `rxr`, this type is primarily used for calling its `emitter_receiver` function,
28/// and then you use the returned [`AsyncSubjectEmitter`] to emit values, while
29/// using [`AsyncSubjectReceiver`] to subscribe to those values.
30///
31/// [`AsyncSubjectEmitter`]: struct.AsyncSubjectEmitter.html
32/// [`AsyncSubjectReceiver`]: struct.AsyncSubjectReceiver.html
33///
34/// # Examples
35///
36/// `AsyncSubject` completion
37///
38///```no_run
39/// use std::fmt::Display;
40///
41/// use rxr::{subjects::AsyncSubject, subscribe::Subscriber};
42/// use rxr::{ObservableExt, Observer, Subscribeable};
43///
44/// pub fn create_subscriber<T: Display>(subscriber_id: i32) -> Subscriber<T> {
45/// Subscriber::new(
46/// move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
47/// |_| eprintln!("Error"),
48/// move || println!("Completed {}", subscriber_id),
49/// )
50/// }
51///
52/// // Initialize a `AsyncSubject` and obtain its emitter and receiver.
53/// let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
54///
55/// // Registers `Subscriber` 1.
56/// receiver.subscribe(create_subscriber(1));
57///
58/// emitter.next(101); // Stores 101 ast the latest value.
59/// emitter.next(102); // Latest value is now 102.
60///
61/// // All Observable operators can be applied to the receiver.
62/// // Registers mapped `Subscriber` 2.
63/// receiver
64/// .clone() // Shallow clone: clones only the pointer to the `AsyncSubject`.
65/// .map(|v| format!("mapped {}", v))
66/// .subscribe(create_subscriber(2));
67///
68/// // Registers `Subscriber` 3.
69/// receiver.subscribe(create_subscriber(3));
70///
71/// emitter.next(103); // Latest value is now 103.
72///
73/// // Emits latest value (103) to registered `Subscriber`'s 1, 2 and 3 and calls
74/// // `complete` on each of them.
75/// emitter.complete();
76///
77/// // Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
78/// receiver.subscribe(create_subscriber(4));
79///
80/// emitter.next(104); // Called post-completion, does not emit.
81///```
82///
83/// `AsyncSubject` error
84///
85///```no_run
86/// use std::error::Error;
87/// use std::fmt::Display;
88/// use std::sync::Arc;
89///
90/// use rxr::{subjects::AsyncSubject, subscribe::Subscriber};
91/// use rxr::{ObservableExt, Observer, Subscribeable};
92///
93/// pub fn create_subscriber<T: Display>(subscriber_id: i32) -> Subscriber<T> {
94/// Subscriber::new(
95/// move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
96/// move |e| eprintln!("Error: {} {}", e, subscriber_id),
97/// || println!("Completed"),
98/// )
99/// }
100///
101/// #[derive(Debug)]
102/// struct AsyncSubjectError(String);
103///
104/// impl Display for AsyncSubjectError {
105/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106/// write!(f, "{}", self.0)
107/// }
108/// }
109///
110/// impl Error for AsyncSubjectError {}
111///
112/// // Initialize a `AsyncSubject` and obtain its emitter and receiver.
113/// let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
114///
115/// // Registers `Subscriber` 1.
116/// receiver.subscribe(create_subscriber(1));
117///
118/// emitter.next(101); // Stores 101 ast the latest value.
119/// emitter.next(102); // Latest value is now 102.
120///
121/// // All Observable operators can be applied to the receiver.
122/// // Registers mapped `Subscriber` 2.
123/// receiver
124/// .clone() // Shallow clone: clones only the pointer to the `AsyncSubject`.
125/// .map(|v| format!("mapped {}", v))
126/// .subscribe(create_subscriber(2));
127///
128/// // Registers `Subscriber` 3.
129/// receiver.subscribe(create_subscriber(3));
130///
131/// emitter.next(103); // Latest value is now 103.
132///
133/// // Calls `error` on registered `Subscriber`'s 1, 2 and 3.
134/// emitter.error(Arc::new(AsyncSubjectError(
135/// "AsyncSubject error".to_string(),
136/// )));
137///
138/// // Subscriber 4: subscribed after subject's error call; emits error and
139/// // does not emit further.
140/// receiver.subscribe(create_subscriber(4));
141///
142/// emitter.next(104); // Called post-completion, does not emit.
143///```
144pub struct AsyncSubject<T> {
145 value: Option<T>,
146 observers: Vec<(u64, Subscriber<T>)>,
147 // fused: bool,
148 completed: bool,
149 closed: bool,
150 error: Option<Arc<dyn Error + Send + Sync>>,
151}
152
153impl<T: Send + Sync + 'static> AsyncSubject<T> {
154 /// Initializes an `AsyncSubject` and returns a tuple containing an
155 /// `AsyncSubjectEmitter` for emitting values and an `AsyncSubjectReceiver`
156 /// for subscribing to emitted values.
157 #[must_use]
158 pub fn emitter_receiver() -> (AsyncSubjectEmitter<T>, AsyncSubjectReceiver<T>) {
159 let s = Arc::new(Mutex::new(AsyncSubject {
160 value: None,
161 observers: Vec::with_capacity(16),
162 // fused: false,
163 completed: false,
164 closed: false,
165 error: None,
166 }));
167
168 (
169 AsyncSubjectEmitter(Arc::clone(&s)),
170 AsyncSubjectReceiver(Arc::clone(&s)),
171 )
172 }
173}
174
175/// Subscription handler for `AsyncSubject`.
176///
177/// `AsyncSubjectReceiver` acts as an `Observable`, allowing you to utilize its
178/// `subscribe` method for receiving emissions from the `AsyncSubject`'s multicasting.
179/// You can also employ its `unsubscribe` method to close the `AsyncSubject` and
180/// remove registered observers.
181#[allow(clippy::module_name_repetitions)]
182#[derive(Clone)]
183pub struct AsyncSubjectReceiver<T>(Arc<Mutex<AsyncSubject<T>>>);
184
185// Multicasting emitter for `AsyncSubject`.
186///
187/// `AsyncSubjectEmitter` acts as an `Observer`, allowing you to utilize its `next`,
188/// `error`, and `complete` methods for multicasting emissions to all registered
189/// observers within the `AsyncSubject`.
190#[allow(clippy::module_name_repetitions)]
191#[derive(Clone)]
192pub struct AsyncSubjectEmitter<T>(Arc<Mutex<AsyncSubject<T>>>);
193
194impl<T> AsyncSubjectReceiver<T> {
195 /// Returns the number of registered observers.
196 #[must_use]
197 pub fn len(&self) -> usize {
198 self.0.lock().unwrap().observers.len()
199 }
200
201 /// Returns `true` if no observers are registered, `false` otherwise.
202 #[must_use]
203 pub fn is_empty(&self) -> bool {
204 self.len() == 0
205 }
206
207 // pub(crate) fn fuse(self) -> Self {
208 // for (_, o) in &mut self.0.lock().unwrap().observers {
209 // o.set_fused(true);
210 // }
211 // self
212 // }
213
214 // pub(crate) fn defuse(self) -> Self {
215 // for (_, o) in &mut self.0.lock().unwrap().observers {
216 // o.set_fused(false);
217 // }
218 // self
219 // }
220}
221
222impl<T> crate::subscription::subscribe::Fuse for AsyncSubjectReceiver<T> {}
223
224impl<T: Clone + Send + Sync + 'static> Subscribeable for AsyncSubjectReceiver<T> {
225 type ObsType = T;
226
227 fn subscribe(&mut self, mut v: Subscriber<Self::ObsType>) -> Subscription {
228 let key: u64 = super::gen_key().next().unwrap_or(super::random_seed());
229
230 if let Ok(mut src) = self.0.lock() {
231 if src.closed {
232 return Subscription::subject_subscription(
233 UnsubscribeLogic::Nil,
234 SubscriptionHandle::Nil,
235 );
236 }
237 // if src.fused {
238 // v.set_fused(true);
239 // }
240 // If AsyncSubject is completed do not register new Subscriber.
241 if src.completed {
242 if let Some(err) = &src.error {
243 // AsyncSubject completed with error. Call error() on
244 // every subsequent Subscriber.
245 v.error(Arc::clone(err));
246 } else {
247 // AsyncSubject completed. Emit stored value if there is one and
248 // call complete() on every subsequent Subscriber.
249 if let Some(value) = &src.value {
250 v.next(value.clone());
251 }
252 v.complete();
253 }
254 return Subscription::subject_subscription(
255 UnsubscribeLogic::Nil,
256 SubscriptionHandle::Nil,
257 );
258 }
259 // Register Subscriber.
260 src.observers.push((key, v));
261 } else {
262 return Subscription::subject_subscription(
263 UnsubscribeLogic::Nil,
264 SubscriptionHandle::Nil,
265 );
266 };
267
268 let source_cloned = Arc::clone(&self.0);
269
270 Subscription::subject_subscription(
271 UnsubscribeLogic::Logic(Box::new(move || {
272 source_cloned
273 .lock()
274 .unwrap()
275 .observers
276 .retain(move |v| v.0 != key);
277 })),
278 SubscriptionHandle::Nil,
279 )
280 }
281}
282
283impl<T> Unsubscribeable for AsyncSubjectReceiver<T> {
284 fn unsubscribe(self) {
285 if let Ok(mut r) = self.0.lock() {
286 r.closed = true;
287 r.observers.clear();
288 }
289 }
290}
291
292impl<T: Clone> Observer for AsyncSubjectEmitter<T> {
293 type NextFnType = T;
294
295 fn next(&mut self, v: Self::NextFnType) {
296 if let Ok(mut src) = self.0.lock() {
297 if src.completed || src.closed {
298 return;
299 }
300 // Store new value in AsyncSubject.
301 src.value = Some(v);
302 }
303 }
304
305 fn error(&mut self, e: Arc<dyn Error + Send + Sync>) {
306 if let Ok(mut src) = self.0.lock() {
307 if src.completed || src.closed {
308 return;
309 }
310 for (_, o) in &mut src.observers {
311 o.error(e.clone());
312 }
313 src.completed = true;
314 src.error = Some(e);
315 src.observers.clear();
316 }
317 }
318
319 fn complete(&mut self) {
320 if let Ok(mut src) = self.0.lock() {
321 if src.completed || src.closed {
322 return;
323 }
324 src.completed = true;
325 if let Some(value) = &src.value {
326 let v = value.clone();
327 for (_, o) in &mut src.observers {
328 o.next(v.clone());
329 }
330 }
331 for (_, o) in &mut src.observers {
332 o.complete();
333 }
334 src.observers.clear();
335 }
336 }
337}
338
339impl<T: Clone + Send + 'static> From<AsyncSubjectEmitter<T>> for Subscriber<T> {
340 fn from(mut value: AsyncSubjectEmitter<T>) -> Self {
341 let mut vn = value.clone();
342 let mut ve = value.clone();
343 Subscriber::new(
344 move |v| {
345 vn.next(v);
346 },
347 move |e| ve.error(e),
348 move || value.complete(),
349 )
350 }
351}
352
353impl<T: Clone + Send + Sync + 'static> From<AsyncSubjectReceiver<T>> for Observable<T> {
354 fn from(mut value: AsyncSubjectReceiver<T>) -> Self {
355 Observable::new(move |subscriber| value.subscribe(subscriber))
356 }
357}