rx_rust/subject/
replay_subject.rs1use super::{Subject, publish_subject::PublishSubject};
2use crate::safe_lock;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 disposable::subscription::Subscription,
6 observable::Observable,
7 observer::{Observer, Termination},
8};
9use educe::Educe;
10use std::collections::VecDeque;
11
12#[derive(Educe)]
14#[educe(Debug, Clone)]
15pub struct ReplaySubject<'or, T, E> {
16 values: Shared<Mutable<VecDeque<T>>>,
17 buffer_size: Option<usize>,
18 publish_subject: PublishSubject<'or, T, E>,
19}
20
21impl<T, E> ReplaySubject<'_, T, E> {
22 pub fn new(buffer_size: Option<usize>) -> Self {
23 let vec = match buffer_size {
24 Some(size) => VecDeque::with_capacity(size),
25 None => VecDeque::new(),
26 };
27 Self {
28 values: Shared::new(Mutable::new(vec)),
29 buffer_size,
30 publish_subject: PublishSubject::default(),
31 }
32 }
33}
34
35impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for ReplaySubject<'or, T, E>
36where
37 T: Clone + NecessarySendSync + 'sub,
38 E: Clone + NecessarySendSync + 'sub,
39 'or: 'sub,
40{
41 fn subscribe(
42 self,
43 mut observer: impl Observer<T, E> + NecessarySendSync + 'or,
44 ) -> Subscription<'sub> {
45 if let Some(terminated) = self.terminated() {
46 match &terminated {
47 Termination::Completed => {
48 let values = safe_lock!(clone: self.values);
49 for value in values {
50 observer.on_next(value);
51 }
52 }
53 Termination::Error(_) => {}
54 }
55 observer.on_termination(terminated);
56 Subscription::default()
57 } else {
58 let values = safe_lock!(clone: self.values);
59 for value in values {
60 observer.on_next(value);
61 }
62 self.publish_subject.subscribe(observer)
63 }
64 }
65}
66
67impl<T, E> Observer<T, E> for ReplaySubject<'_, T, E>
68where
69 T: Clone + NecessarySendSync,
70 E: Clone + NecessarySendSync,
71{
72 fn on_next(&mut self, value: T) {
73 if self.terminated().is_none() {
74 self.values.lock_mut(|mut lock| {
75 if let Some(buffer_size) = self.buffer_size {
76 if lock.len() == buffer_size {
77 if lock.pop_front().is_some() {
78 lock.push_back(value.clone());
80 }
81 } else {
82 lock.push_back(value.clone());
83 }
84 } else {
85 lock.push_back(value.clone());
86 }
87 });
88 self.publish_subject.on_next(value);
89 }
90 }
91
92 fn on_termination(self, termination: Termination<E>) {
93 self.publish_subject.on_termination(termination);
94 }
95}
96
97impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for ReplaySubject<'or, T, E>
98where
99 T: Clone + NecessarySendSync + 'sub,
100 E: Clone + NecessarySendSync + 'sub,
101 'or: 'sub,
102{
103 fn terminated(&self) -> Option<Termination<E>>
104 where
105 E: Clone,
106 {
107 self.publish_subject.terminated()
108 }
109}