1use super::Subject;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observer::Event;
5use crate::utils::types::{Mutable, MutableHelper, NecessarySend, Shared};
6use crate::{
7 observable::Observable,
8 observer::{Observer, Termination, boxed_observer::BoxedObserver},
9};
10use educe::Educe;
11use slotmap::{DefaultKey, DenseSlotMap};
12
13#[derive(Educe)]
14#[educe(Debug, Default)]
15enum State<'or, T, E> {
16 #[educe(Default)]
17 Idle(DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>),
18 Processing {
19 slot_map: DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>,
20 events: Vec<Event<T, E>>,
21 },
22 Terminated(Termination<E>),
23}
24
25#[derive(Educe)]
27#[educe(Debug, Clone, Default)]
28pub struct PublishSubject<'or, T, E>(Shared<Mutable<State<'or, T, E>>>);
29
30impl<T, E> PublishSubject<'_, T, E> {
31 pub fn new() -> Self {
32 Self(Shared::new(Mutable::new(State::Idle(DenseSlotMap::new()))))
33 }
34}
35
36impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for PublishSubject<'or, T, E>
37where
38 T: NecessarySend + 'sub,
39 E: Clone + NecessarySend + 'sub,
40 'or: 'sub,
41{
42 fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
43 self.0.clone().lock_mut(|mut lock| match &mut *lock {
44 State::Idle(observers) => {
45 let key = observers.insert(Some(BoxedObserver::new(observer)));
46 drop(lock);
47 Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
48 }
49 State::Processing {
50 slot_map,
51 events: _,
52 } => {
53 let key = slot_map.insert(Some(BoxedObserver::new(observer)));
54 drop(lock);
55 Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
56 }
57 State::Terminated(termination) => {
58 let termination = termination.clone();
59 drop(lock);
60 observer.on_termination(termination);
61 Subscription::default()
62 }
63 })
64 }
65}
66
67impl<T, E> Observer<T, E> for PublishSubject<'_, T, E>
68where
69 T: Clone,
70 E: Clone,
71{
72 fn on_next(&mut self, value: T) {
73 self.0.clone().lock_mut(|mut lock| match &mut *lock {
74 State::Idle(_) => {
75 let mut dense_slot_map = match std::mem::replace(
77 &mut *lock,
78 State::Processing {
79 slot_map: DenseSlotMap::new(), events: Vec::new(),
81 },
82 ) {
83 State::Idle(dense_slot_map) => dense_slot_map,
84 State::Processing { .. } => unreachable!(),
85 State::Terminated(..) => unreachable!(),
86 };
87
88 let mut observers: Vec<_> = dense_slot_map
90 .iter_mut()
91 .map(|value| (value.0, value.1.take().unwrap()))
92 .collect();
93
94 match &mut *lock {
96 State::Idle(..) => unreachable!(),
97 State::Processing {
98 slot_map,
99 events: _,
100 } => *slot_map = dense_slot_map,
101 State::Terminated(..) => unreachable!(),
102 }
103
104 drop(lock);
106 observers
107 .iter_mut()
108 .for_each(|observer| observer.1.on_next(value.clone()));
109
110 let events = self.0.clone().lock_mut(|mut lock| {
111 let (mut slot_map, events) = match std::mem::replace(
113 &mut *lock,
114 State::Idle(DenseSlotMap::new()), ) {
116 State::Idle(..) => unreachable!(),
117 State::Processing { slot_map, events } => (slot_map, events),
118 State::Terminated(..) => unreachable!(),
119 };
120
121 for (key, observer) in observers {
123 if slot_map.contains_key(key) {
124 slot_map[key] = Some(observer);
125 } else {
126 }
128 }
129
130 *lock = State::Idle(slot_map);
132
133 events
135 });
136
137 for event in events {
139 match event {
140 Event::Next(value) => {
141 self.on_next(value);
142 }
143 Event::Termination(termination) => {
144 self.clone().on_termination(termination);
145 break;
146 }
147 }
148 }
149 }
150 State::Processing {
151 slot_map: _,
152 events,
153 } => {
154 events.push(Event::Next(value));
155 }
156 State::Terminated(_) => {
157 }
159 });
160 }
161
162 fn on_termination(self, termination: Termination<E>) {
163 self.0.lock_mut(|mut lock| {
164 match &mut *lock {
165 State::Idle(_) => {
166 let dense_slot_map =
168 match std::mem::replace(&mut *lock, State::Terminated(termination.clone()))
169 {
170 State::Idle(dense_slot_map) => dense_slot_map,
171 State::Processing { .. } => unreachable!(),
172 State::Terminated(..) => unreachable!(),
173 };
174 drop(lock);
175
176 dense_slot_map.into_iter().for_each(|observer| {
178 observer.1.unwrap().on_termination(termination.clone())
179 });
180 }
181 State::Processing {
182 slot_map: _,
183 events,
184 } => {
185 events.push(Event::Termination(termination));
186 }
187 State::Terminated(_) => {
188 }
190 }
191 });
192 }
193}
194
195impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for PublishSubject<'or, T, E>
196where
197 T: Clone + NecessarySend + 'sub,
198 E: Clone + NecessarySend + 'sub,
199 'or: 'sub,
200{
201 fn terminated(&self) -> Option<Termination<E>>
202 where
203 E: Clone,
204 {
205 self.0.lock_ref(|lock| match &*lock {
206 State::Idle(_) => None,
207 State::Processing { .. } => None,
208 State::Terminated(termination) => Some(termination.clone()),
209 })
210 }
211}
212
213struct PublishSubjectDisposal<'or, T, E> {
214 state: Shared<Mutable<State<'or, T, E>>>,
215 key: DefaultKey,
216}
217
218impl<T, E> Disposable for PublishSubjectDisposal<'_, T, E> {
219 fn dispose(self) {
220 self.state.lock_mut(|mut lock| {
221 match &mut *lock {
222 State::Idle(observers) => {
223 observers.remove(self.key);
224 }
225 State::Processing {
226 slot_map,
227 events: _,
228 } => {
229 slot_map.remove(self.key);
230 }
231 State::Terminated(_) => {
232 }
234 }
235 });
236 }
237}