rxr/subjects/
async_subject.rs

1use std::{
2    error::Error,
3    sync::{Arc, Mutex},
4};
5
6use crate::{
7    observer::Observer,
8    subscribe::Unsubscribeable,
9    subscription::subscribe::{
10        Subscribeable, Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic,
11    },
12    Observable,
13};
14
15/// A specialized `Subject` variant emits its latest value to observers upon completion.
16///
17/// `AsyncSubject` captures and broadcasts the last emitted value from a source
18/// observable, but this broadcasting occurs only after the source observable
19/// completes. It sends this value to all new subscriptions.
20///
21/// If an error is invoked in the source observable, the `AsyncSubject` will not emit
22/// the latest value to subscriptions. Instead, it propagates the error notification
23/// from the source `Observable` to all subscriptions. This ensures that existing and
24/// new subscriptions are properly informed about the error, maintaining consistent
25/// error handling across observers.
26///
27/// In `rxr`, this type is primarily used for calling its `emitter_receiver` function,
28/// and then you use the returned [`AsyncSubjectEmitter`] to emit values, while
29/// using [`AsyncSubjectReceiver`] to subscribe to those values.
30///
31/// [`AsyncSubjectEmitter`]: struct.AsyncSubjectEmitter.html
32/// [`AsyncSubjectReceiver`]: struct.AsyncSubjectReceiver.html
33///
34/// # Examples
35///
36/// `AsyncSubject` completion
37///
38///```no_run
39/// use std::fmt::Display;
40///
41/// use rxr::{subjects::AsyncSubject, subscribe::Subscriber};
42/// use rxr::{ObservableExt, Observer, Subscribeable};
43///
44/// pub fn create_subscriber<T: Display>(subscriber_id: i32) -> Subscriber<T> {
45///     Subscriber::new(
46///         move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
47///         |_| eprintln!("Error"),
48///         move || println!("Completed {}", subscriber_id),
49///     )
50/// }
51///
52/// // Initialize a `AsyncSubject` and obtain its emitter and receiver.
53/// let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
54///
55/// // Registers `Subscriber` 1.
56/// receiver.subscribe(create_subscriber(1));
57///
58/// emitter.next(101); // Stores 101 ast the latest value.
59/// emitter.next(102); // Latest value is now 102.
60///
61/// // All Observable operators can be applied to the receiver.
62/// // Registers mapped `Subscriber` 2.
63/// receiver
64///     .clone() // Shallow clone: clones only the pointer to the `AsyncSubject`.
65///     .map(|v| format!("mapped {}", v))
66///     .subscribe(create_subscriber(2));
67///
68/// // Registers `Subscriber` 3.
69/// receiver.subscribe(create_subscriber(3));
70///
71/// emitter.next(103); // Latest value is now 103.
72///
73/// // Emits latest value (103) to registered `Subscriber`'s 1, 2 and 3 and calls
74/// // `complete` on each of them.
75/// emitter.complete();
76///
77/// // Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
78/// receiver.subscribe(create_subscriber(4));
79///
80/// emitter.next(104); // Called post-completion, does not emit.
81///```
82///
83/// `AsyncSubject` error
84///
85///```no_run
86/// use std::error::Error;
87/// use std::fmt::Display;
88/// use std::sync::Arc;
89///
90/// use rxr::{subjects::AsyncSubject, subscribe::Subscriber};
91/// use rxr::{ObservableExt, Observer, Subscribeable};
92///
93/// pub fn create_subscriber<T: Display>(subscriber_id: i32) -> Subscriber<T> {
94///     Subscriber::new(
95///         move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
96///         move |e| eprintln!("Error: {} {}", e, subscriber_id),
97///         || println!("Completed"),
98///     )
99/// }
100///
101/// #[derive(Debug)]
102/// struct AsyncSubjectError(String);
103///
104/// impl Display for AsyncSubjectError {
105///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106///         write!(f, "{}", self.0)
107///     }
108/// }
109///
110/// impl Error for AsyncSubjectError {}
111///
112/// // Initialize a `AsyncSubject` and obtain its emitter and receiver.
113/// let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
114///
115/// // Registers `Subscriber` 1.
116/// receiver.subscribe(create_subscriber(1));
117///
118/// emitter.next(101); // Stores 101 ast the latest value.
119/// emitter.next(102); // Latest value is now 102.
120///
121/// // All Observable operators can be applied to the receiver.
122/// // Registers mapped `Subscriber` 2.
123/// receiver
124///     .clone() // Shallow clone: clones only the pointer to the `AsyncSubject`.
125///     .map(|v| format!("mapped {}", v))
126///     .subscribe(create_subscriber(2));
127///
128/// // Registers `Subscriber` 3.
129/// receiver.subscribe(create_subscriber(3));
130///
131/// emitter.next(103); // Latest value is now 103.
132///
133/// // Calls `error` on registered `Subscriber`'s 1, 2 and 3.
134/// emitter.error(Arc::new(AsyncSubjectError(
135///     "AsyncSubject error".to_string(),
136/// )));
137///
138/// // Subscriber 4: subscribed after subject's error call; emits error and
139/// // does not emit further.
140/// receiver.subscribe(create_subscriber(4));
141///
142/// emitter.next(104); // Called post-completion, does not emit.
143///```
144pub struct AsyncSubject<T> {
145    value: Option<T>,
146    observers: Vec<(u64, Subscriber<T>)>,
147    // fused: bool,
148    completed: bool,
149    closed: bool,
150    error: Option<Arc<dyn Error + Send + Sync>>,
151}
152
153impl<T: Send + Sync + 'static> AsyncSubject<T> {
154    /// Initializes an `AsyncSubject` and returns a tuple containing an
155    /// `AsyncSubjectEmitter` for emitting values and an `AsyncSubjectReceiver`
156    /// for subscribing to emitted values.
157    #[must_use]
158    pub fn emitter_receiver() -> (AsyncSubjectEmitter<T>, AsyncSubjectReceiver<T>) {
159        let s = Arc::new(Mutex::new(AsyncSubject {
160            value: None,
161            observers: Vec::with_capacity(16),
162            // fused: false,
163            completed: false,
164            closed: false,
165            error: None,
166        }));
167
168        (
169            AsyncSubjectEmitter(Arc::clone(&s)),
170            AsyncSubjectReceiver(Arc::clone(&s)),
171        )
172    }
173}
174
175/// Subscription handler for `AsyncSubject`.
176///
177/// `AsyncSubjectReceiver` acts as an `Observable`, allowing you to utilize its
178/// `subscribe` method for receiving emissions from the `AsyncSubject`'s multicasting.
179/// You can also employ its `unsubscribe` method to close the `AsyncSubject` and
180/// remove registered observers.
181#[allow(clippy::module_name_repetitions)]
182#[derive(Clone)]
183pub struct AsyncSubjectReceiver<T>(Arc<Mutex<AsyncSubject<T>>>);
184
185// Multicasting emitter for `AsyncSubject`.
186///
187/// `AsyncSubjectEmitter` acts as an `Observer`, allowing you to utilize its `next`,
188/// `error`, and `complete` methods for multicasting emissions to all registered
189/// observers within the `AsyncSubject`.
190#[allow(clippy::module_name_repetitions)]
191#[derive(Clone)]
192pub struct AsyncSubjectEmitter<T>(Arc<Mutex<AsyncSubject<T>>>);
193
194impl<T> AsyncSubjectReceiver<T> {
195    /// Returns the number of registered observers.
196    #[must_use]
197    pub fn len(&self) -> usize {
198        self.0.lock().unwrap().observers.len()
199    }
200
201    /// Returns `true` if no observers are registered, `false` otherwise.
202    #[must_use]
203    pub fn is_empty(&self) -> bool {
204        self.len() == 0
205    }
206
207    // pub(crate) fn fuse(self) -> Self {
208    //     for (_, o) in &mut self.0.lock().unwrap().observers {
209    //         o.set_fused(true);
210    //     }
211    //     self
212    // }
213
214    // pub(crate) fn defuse(self) -> Self {
215    //     for (_, o) in &mut self.0.lock().unwrap().observers {
216    //         o.set_fused(false);
217    //     }
218    //     self
219    // }
220}
221
222impl<T> crate::subscription::subscribe::Fuse for AsyncSubjectReceiver<T> {}
223
224impl<T: Clone + Send + Sync + 'static> Subscribeable for AsyncSubjectReceiver<T> {
225    type ObsType = T;
226
227    fn subscribe(&mut self, mut v: Subscriber<Self::ObsType>) -> Subscription {
228        let key: u64 = super::gen_key().next().unwrap_or(super::random_seed());
229
230        if let Ok(mut src) = self.0.lock() {
231            if src.closed {
232                return Subscription::subject_subscription(
233                    UnsubscribeLogic::Nil,
234                    SubscriptionHandle::Nil,
235                );
236            }
237            // if src.fused {
238            //     v.set_fused(true);
239            // }
240            // If AsyncSubject is completed do not register new Subscriber.
241            if src.completed {
242                if let Some(err) = &src.error {
243                    // AsyncSubject completed with error. Call error() on
244                    // every subsequent Subscriber.
245                    v.error(Arc::clone(err));
246                } else {
247                    // AsyncSubject completed. Emit stored value if there is one and
248                    // call complete() on every subsequent Subscriber.
249                    if let Some(value) = &src.value {
250                        v.next(value.clone());
251                    }
252                    v.complete();
253                }
254                return Subscription::subject_subscription(
255                    UnsubscribeLogic::Nil,
256                    SubscriptionHandle::Nil,
257                );
258            }
259            // Register Subscriber.
260            src.observers.push((key, v));
261        } else {
262            return Subscription::subject_subscription(
263                UnsubscribeLogic::Nil,
264                SubscriptionHandle::Nil,
265            );
266        };
267
268        let source_cloned = Arc::clone(&self.0);
269
270        Subscription::subject_subscription(
271            UnsubscribeLogic::Logic(Box::new(move || {
272                source_cloned
273                    .lock()
274                    .unwrap()
275                    .observers
276                    .retain(move |v| v.0 != key);
277            })),
278            SubscriptionHandle::Nil,
279        )
280    }
281}
282
283impl<T> Unsubscribeable for AsyncSubjectReceiver<T> {
284    fn unsubscribe(self) {
285        if let Ok(mut r) = self.0.lock() {
286            r.closed = true;
287            r.observers.clear();
288        }
289    }
290}
291
292impl<T: Clone> Observer for AsyncSubjectEmitter<T> {
293    type NextFnType = T;
294
295    fn next(&mut self, v: Self::NextFnType) {
296        if let Ok(mut src) = self.0.lock() {
297            if src.completed || src.closed {
298                return;
299            }
300            // Store new value in AsyncSubject.
301            src.value = Some(v);
302        }
303    }
304
305    fn error(&mut self, e: Arc<dyn Error + Send + Sync>) {
306        if let Ok(mut src) = self.0.lock() {
307            if src.completed || src.closed {
308                return;
309            }
310            for (_, o) in &mut src.observers {
311                o.error(e.clone());
312            }
313            src.completed = true;
314            src.error = Some(e);
315            src.observers.clear();
316        }
317    }
318
319    fn complete(&mut self) {
320        if let Ok(mut src) = self.0.lock() {
321            if src.completed || src.closed {
322                return;
323            }
324            src.completed = true;
325            if let Some(value) = &src.value {
326                let v = value.clone();
327                for (_, o) in &mut src.observers {
328                    o.next(v.clone());
329                }
330            }
331            for (_, o) in &mut src.observers {
332                o.complete();
333            }
334            src.observers.clear();
335        }
336    }
337}
338
339impl<T: Clone + Send + 'static> From<AsyncSubjectEmitter<T>> for Subscriber<T> {
340    fn from(mut value: AsyncSubjectEmitter<T>) -> Self {
341        let mut vn = value.clone();
342        let mut ve = value.clone();
343        Subscriber::new(
344            move |v| {
345                vn.next(v);
346            },
347            move |e| ve.error(e),
348            move || value.complete(),
349        )
350    }
351}
352
353impl<T: Clone + Send + Sync + 'static> From<AsyncSubjectReceiver<T>> for Observable<T> {
354    fn from(mut value: AsyncSubjectReceiver<T>) -> Self {
355        Observable::new(move |subscriber| value.subscribe(subscriber))
356    }
357}