rx_rust/subject/
replay_subject.rs

1use super::{Subject, publish_subject::PublishSubject};
2use crate::safe_lock;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5    disposable::subscription::Subscription,
6    observable::Observable,
7    observer::{Observer, Termination},
8};
9use educe::Educe;
10use std::collections::VecDeque;
11
12/// Buffers emissions and replays them to late subscribers.
13#[derive(Educe)]
14#[educe(Debug, Clone)]
15pub struct ReplaySubject<'or, T, E> {
16    values: Shared<Mutable<VecDeque<T>>>,
17    buffer_size: Option<usize>,
18    publish_subject: PublishSubject<'or, T, E>,
19}
20
21impl<T, E> ReplaySubject<'_, T, E> {
22    pub fn new(buffer_size: Option<usize>) -> Self {
23        let vec = match buffer_size {
24            Some(size) => VecDeque::with_capacity(size),
25            None => VecDeque::new(),
26        };
27        Self {
28            values: Shared::new(Mutable::new(vec)),
29            buffer_size,
30            publish_subject: PublishSubject::default(),
31        }
32    }
33}
34
35impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for ReplaySubject<'or, T, E>
36where
37    T: Clone + NecessarySendSync + 'sub,
38    E: Clone + NecessarySendSync + 'sub,
39    'or: 'sub,
40{
41    fn subscribe(
42        self,
43        mut observer: impl Observer<T, E> + NecessarySendSync + 'or,
44    ) -> Subscription<'sub> {
45        if let Some(terminated) = self.terminated() {
46            match &terminated {
47                Termination::Completed => {
48                    let values = safe_lock!(clone: self.values);
49                    for value in values {
50                        observer.on_next(value);
51                    }
52                }
53                Termination::Error(_) => {}
54            }
55            observer.on_termination(terminated);
56            Subscription::default()
57        } else {
58            let values = safe_lock!(clone: self.values);
59            for value in values {
60                observer.on_next(value);
61            }
62            self.publish_subject.subscribe(observer)
63        }
64    }
65}
66
67impl<T, E> Observer<T, E> for ReplaySubject<'_, T, E>
68where
69    T: Clone + NecessarySendSync,
70    E: Clone + NecessarySendSync,
71{
72    fn on_next(&mut self, value: T) {
73        if self.terminated().is_none() {
74            self.values.lock_mut(|mut lock| {
75                if let Some(buffer_size) = self.buffer_size {
76                    if lock.len() == buffer_size {
77                        if lock.pop_front().is_some() {
78                            // only push if the buffer is not 0
79                            lock.push_back(value.clone());
80                        }
81                    } else {
82                        lock.push_back(value.clone());
83                    }
84                } else {
85                    lock.push_back(value.clone());
86                }
87            });
88            self.publish_subject.on_next(value);
89        }
90    }
91
92    fn on_termination(self, termination: Termination<E>) {
93        self.publish_subject.on_termination(termination);
94    }
95}
96
97impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for ReplaySubject<'or, T, E>
98where
99    T: Clone + NecessarySendSync + 'sub,
100    E: Clone + NecessarySendSync + 'sub,
101    'or: 'sub,
102{
103    fn terminated(&self) -> Option<Termination<E>>
104    where
105        E: Clone,
106    {
107        self.publish_subject.terminated()
108    }
109}