rx_rust/subject/
behavior_subject.rs

1use super::{Subject, publish_subject::PublishSubject};
2use crate::safe_lock;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5    disposable::subscription::Subscription,
6    observable::Observable,
7    observer::{Observer, Termination},
8};
9use educe::Educe;
10
11/// Keeps the latest value and emits it immediately to new subscribers.
12#[derive(Educe)]
13#[educe(Debug, Clone)]
14pub struct BehaviorSubject<'or, T, E> {
15    value: Shared<Mutable<T>>,
16    publish_subject: PublishSubject<'or, T, E>,
17}
18
19impl<T, E> BehaviorSubject<'_, T, E> {
20    pub fn new(value: T) -> Self {
21        Self {
22            value: Shared::new(Mutable::new(value)),
23            publish_subject: PublishSubject::default(),
24        }
25    }
26
27    pub fn value(&self) -> T
28    where
29        T: Clone,
30    {
31        safe_lock!(clone: self.value)
32    }
33}
34
35impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for BehaviorSubject<'or, T, E>
36where
37    T: Clone + NecessarySendSync + 'sub,
38    E: Clone + NecessarySendSync + 'sub,
39    'or: 'sub,
40{
41    fn subscribe(
42        self,
43        mut observer: impl Observer<T, E> + NecessarySendSync + 'or,
44    ) -> Subscription<'sub> {
45        if let Some(terminated) = self.terminated() {
46            observer.on_termination(terminated);
47            Subscription::default()
48        } else {
49            observer.on_next(safe_lock!(clone: self.value));
50            self.publish_subject.subscribe(observer)
51        }
52    }
53}
54
55impl<T, E> Observer<T, E> for BehaviorSubject<'_, T, E>
56where
57    T: Clone + NecessarySendSync,
58    E: Clone + NecessarySendSync,
59{
60    fn on_next(&mut self, value: T) {
61        if self.terminated().is_none() {
62            safe_lock!(set: self.value, value.clone());
63            self.publish_subject.on_next(value);
64        }
65    }
66
67    fn on_termination(self, termination: Termination<E>) {
68        self.publish_subject.on_termination(termination);
69    }
70}
71
72impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for BehaviorSubject<'or, T, E>
73where
74    T: Clone + NecessarySendSync + 'sub,
75    E: Clone + NecessarySendSync + 'sub,
76    'or: 'sub,
77{
78    fn terminated(&self) -> Option<Termination<E>>
79    where
80        E: Clone,
81    {
82        self.publish_subject.terminated()
83    }
84}