1use super::Subject;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observer::Event;
5use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
6use crate::{
7 observable::Observable,
8 observer::{Observer, Termination, boxed_observer::BoxedObserver},
9};
10use educe::Educe;
11use slotmap::{DefaultKey, DenseSlotMap};
12
13#[derive(Educe)]
14#[educe(Debug, Default)]
15enum State<'or, T, E> {
16 #[educe(Default)]
17 Idle(DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>),
18 Processing {
19 slot_map: DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>,
20 events: Vec<Event<T, E>>,
21 },
22 Terminated(Termination<E>),
23}
24
25#[derive(Educe)]
27#[educe(Debug, Clone, Default)]
28pub struct PublishSubject<'or, T, E>(Shared<Mutable<State<'or, T, E>>>);
29
30impl<T, E> PublishSubject<'_, T, E> {
31 pub fn new() -> Self {
32 Self(Shared::new(Mutable::new(State::Idle(DenseSlotMap::new()))))
33 }
34}
35
36impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for PublishSubject<'or, T, E>
37where
38 T: NecessarySendSync + 'sub,
39 E: Clone + NecessarySendSync + 'sub,
40 'or: 'sub,
41{
42 fn subscribe(
43 self,
44 observer: impl Observer<T, E> + NecessarySendSync + 'or,
45 ) -> Subscription<'sub> {
46 self.0.clone().lock_mut(|mut lock| match &mut *lock {
47 State::Idle(observers) => {
48 let key = observers.insert(Some(BoxedObserver::new(observer)));
49 drop(lock);
50 Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
51 }
52 State::Processing {
53 slot_map,
54 events: _,
55 } => {
56 let key = slot_map.insert(Some(BoxedObserver::new(observer)));
57 drop(lock);
58 Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
59 }
60 State::Terminated(termination) => {
61 let termination = termination.clone();
62 drop(lock);
63 observer.on_termination(termination);
64 Subscription::default()
65 }
66 })
67 }
68}
69
70impl<T, E> Observer<T, E> for PublishSubject<'_, T, E>
71where
72 T: Clone,
73 E: Clone,
74{
75 fn on_next(&mut self, value: T) {
76 self.0.clone().lock_mut(|mut lock| match &mut *lock {
77 State::Idle(_) => {
78 let mut dense_slot_map = match std::mem::replace(
80 &mut *lock,
81 State::Processing {
82 slot_map: DenseSlotMap::new(), events: Vec::new(),
84 },
85 ) {
86 State::Idle(dense_slot_map) => dense_slot_map,
87 State::Processing { .. } => unreachable!(),
88 State::Terminated(..) => unreachable!(),
89 };
90
91 let mut observers: Vec<_> = dense_slot_map
93 .iter_mut()
94 .map(|value| (value.0, value.1.take().unwrap()))
95 .collect();
96
97 match &mut *lock {
99 State::Idle(..) => unreachable!(),
100 State::Processing {
101 slot_map,
102 events: _,
103 } => *slot_map = dense_slot_map,
104 State::Terminated(..) => unreachable!(),
105 }
106
107 drop(lock);
109 observers
110 .iter_mut()
111 .for_each(|observer| observer.1.on_next(value.clone()));
112
113 let events = self.0.clone().lock_mut(|mut lock| {
114 let (mut slot_map, events) = match std::mem::replace(
116 &mut *lock,
117 State::Idle(DenseSlotMap::new()), ) {
119 State::Idle(..) => unreachable!(),
120 State::Processing { slot_map, events } => (slot_map, events),
121 State::Terminated(..) => unreachable!(),
122 };
123
124 for (key, observer) in observers {
126 if slot_map.contains_key(key) {
127 slot_map[key] = Some(observer);
128 } else {
129 }
131 }
132
133 *lock = State::Idle(slot_map);
135
136 events
138 });
139
140 for event in events {
142 match event {
143 Event::Next(value) => {
144 self.on_next(value);
145 }
146 Event::Termination(termination) => {
147 self.clone().on_termination(termination);
148 break;
149 }
150 }
151 }
152 }
153 State::Processing {
154 slot_map: _,
155 events,
156 } => {
157 events.push(Event::Next(value));
158 }
159 State::Terminated(_) => {
160 }
162 });
163 }
164
165 fn on_termination(self, termination: Termination<E>) {
166 self.0.lock_mut(|mut lock| {
167 match &mut *lock {
168 State::Idle(_) => {
169 let dense_slot_map =
171 match std::mem::replace(&mut *lock, State::Terminated(termination.clone()))
172 {
173 State::Idle(dense_slot_map) => dense_slot_map,
174 State::Processing { .. } => unreachable!(),
175 State::Terminated(..) => unreachable!(),
176 };
177 drop(lock);
178
179 dense_slot_map.into_iter().for_each(|observer| {
181 observer.1.unwrap().on_termination(termination.clone())
182 });
183 }
184 State::Processing {
185 slot_map: _,
186 events,
187 } => {
188 events.push(Event::Termination(termination));
189 }
190 State::Terminated(_) => {
191 }
193 }
194 });
195 }
196}
197
198impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for PublishSubject<'or, T, E>
199where
200 T: Clone + NecessarySendSync + 'sub,
201 E: Clone + NecessarySendSync + 'sub,
202 'or: 'sub,
203{
204 fn terminated(&self) -> Option<Termination<E>>
205 where
206 E: Clone,
207 {
208 self.0.lock_ref(|lock| match &*lock {
209 State::Idle(_) => None,
210 State::Processing { .. } => None,
211 State::Terminated(termination) => Some(termination.clone()),
212 })
213 }
214}
215
216struct PublishSubjectDisposal<'or, T, E> {
217 state: Shared<Mutable<State<'or, T, E>>>,
218 key: DefaultKey,
219}
220
221impl<T, E> Disposable for PublishSubjectDisposal<'_, T, E> {
222 fn dispose(self) {
223 self.state.lock_mut(|mut lock| {
224 match &mut *lock {
225 State::Idle(observers) => {
226 observers.remove(self.key);
227 }
228 State::Processing {
229 slot_map,
230 events: _,
231 } => {
232 slot_map.remove(self.key);
233 }
234 State::Terminated(_) => {
235 }
237 }
238 });
239 }
240}