rx_rust/subject/
async_subject.rs

1use super::{Subject, publish_subject::PublishSubject};
2use crate::utils::types::{Mutable, NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use crate::{safe_lock, safe_lock_option};
9use educe::Educe;
10
/// Remembers only the last emission and replays it on completion.
///
/// Values passed to `on_next` are not forwarded immediately; each one
/// overwrites the previously stored value. When the subject completes, the
/// stored value (if any) is emitted once, followed by the completion signal.
/// When the subject terminates with an error, only the error is delivered.
/// Observers that subscribe after termination receive the same replay.
#[derive(Educe)]
#[educe(Debug, Clone, Default)]
pub struct AsyncSubject<'or, T, E> {
    /// Latest value handed to `on_next`; starts out as `None`.
    value: Shared<Mutable<Option<T>>>,
    /// Inner subject that live subscriptions are delegated to and through
    /// which the final value and the termination are delivered.
    publish_subject: PublishSubject<'or, T, E>,
}
18
19impl<T, E> AsyncSubject<'_, T, E> {
20    pub fn new() -> Self {
21        Self {
22            value: Shared::new(Mutable::new(None)),
23            publish_subject: PublishSubject::default(),
24        }
25    }
26}
27
28impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for AsyncSubject<'or, T, E>
29where
30    T: Clone + NecessarySendSync + 'sub,
31    E: Clone + NecessarySendSync + 'sub,
32    'or: 'sub,
33{
34    fn subscribe(
35        self,
36        mut observer: impl Observer<T, E> + NecessarySendSync + 'or,
37    ) -> Subscription<'sub> {
38        if let Some(terminated) = self.terminated() {
39            match &terminated {
40                Termination::Completed => {
41                    if let Some(value) = safe_lock!(clone: self.value) {
42                        observer.on_next(value);
43                    }
44                }
45                Termination::Error(_) => {}
46            }
47            observer.on_termination(terminated);
48            Subscription::default()
49        } else {
50            self.publish_subject.subscribe(observer)
51        }
52    }
53}
54
55impl<T, E> Observer<T, E> for AsyncSubject<'_, T, E>
56where
57    T: Clone + NecessarySendSync,
58    E: Clone + NecessarySendSync,
59{
60    fn on_next(&mut self, value: T) {
61        if self.terminated().is_none() {
62            safe_lock_option!(replace: self.value, value);
63        }
64    }
65
66    fn on_termination(mut self, termination: Termination<E>) {
67        match &termination {
68            Termination::Completed => {
69                if let Some(value) = safe_lock!(clone: self.value) {
70                    self.publish_subject.on_next(value);
71                }
72            }
73            Termination::Error(_) => {}
74        }
75        self.publish_subject.on_termination(termination);
76    }
77}
78
79impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for AsyncSubject<'or, T, E>
80where
81    T: Clone + NecessarySendSync + 'sub,
82    E: Clone + NecessarySendSync + 'sub,
83    'or: 'sub,
84{
85    fn terminated(&self) -> Option<Termination<E>>
86    where
87        E: Clone,
88    {
89        self.publish_subject.terminated()
90    }
91}