rx_rust/subject/
publish_subject.rs

1use super::Subject;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observer::Event;
5use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
6use crate::{
7    observable::Observable,
8    observer::{Observer, Termination, boxed_observer::BoxedObserver},
9};
10use educe::Educe;
11use slotmap::{DefaultKey, DenseSlotMap};
12
/// Internal state machine of a `PublishSubject`.
///
/// Observers are stored in a slot map as `Option`s so that `on_next` can move them out
/// of their slots for the duration of a delivery while their keys stay registered.
#[derive(Educe)]
#[educe(Debug, Default)]
enum State<'or, T, E> {
    /// No delivery is in progress; holds the registered observers. This is the default state.
    #[educe(Default)]
    Idle(DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>),
    /// An `on_next` delivery is currently running.
    Processing {
        /// Registered observers; slots whose observer is being notified hold `None`.
        slot_map: DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>,
        /// Events received while the delivery was running, replayed once it finishes.
        events: Vec<Event<T, E>>,
    },
    /// The subject has terminated; the termination is kept so that observers subscribing
    /// later can be notified immediately.
    Terminated(Termination<E>),
}
24
/// Basic multicast subject that forwards events to all observers.
///
/// Subscribing only registers the observer: values emitted before the subscription are
/// not replayed. Once the subject has terminated, a new subscriber is handed the stored
/// termination right away instead of being registered.
///
/// The subject wraps its state in `Shared<Mutable<..>>`; `subscribe` and `on_next` work on
/// clones of that handle, so clones of a subject operate on the same state.
#[derive(Educe)]
#[educe(Debug, Clone, Default)]
pub struct PublishSubject<'or, T, E>(Shared<Mutable<State<'or, T, E>>>);
29
30impl<T, E> PublishSubject<'_, T, E> {
31    pub fn new() -> Self {
32        Self(Shared::new(Mutable::new(State::Idle(DenseSlotMap::new()))))
33    }
34}
35
36impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for PublishSubject<'or, T, E>
37where
38    T: NecessarySendSync + 'sub,
39    E: Clone + NecessarySendSync + 'sub,
40    'or: 'sub,
41{
42    fn subscribe(
43        self,
44        observer: impl Observer<T, E> + NecessarySendSync + 'or,
45    ) -> Subscription<'sub> {
46        self.0.clone().lock_mut(|mut lock| match &mut *lock {
47            State::Idle(observers) => {
48                let key = observers.insert(Some(BoxedObserver::new(observer)));
49                drop(lock);
50                Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
51            }
52            State::Processing {
53                slot_map,
54                events: _,
55            } => {
56                let key = slot_map.insert(Some(BoxedObserver::new(observer)));
57                drop(lock);
58                Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
59            }
60            State::Terminated(termination) => {
61                let termination = termination.clone();
62                drop(lock);
63                observer.on_termination(termination);
64                Subscription::default()
65            }
66        })
67    }
68}
69
70impl<T, E> Observer<T, E> for PublishSubject<'_, T, E>
71where
72    T: Clone,
73    E: Clone,
74{
75    fn on_next(&mut self, value: T) {
76        self.0.clone().lock_mut(|mut lock| match &mut *lock {
77            State::Idle(_) => {
78                // Get SloptMap
79                let mut dense_slot_map = match std::mem::replace(
80                    &mut *lock,
81                    State::Processing {
82                        slot_map: DenseSlotMap::new(), // Placeholder
83                        events: Vec::new(),
84                    },
85                ) {
86                    State::Idle(dense_slot_map) => dense_slot_map,
87                    State::Processing { .. } => unreachable!(),
88                    State::Terminated(..) => unreachable!(),
89                };
90
91                // Get observers
92                let mut observers: Vec<_> = dense_slot_map
93                    .iter_mut()
94                    .map(|value| (value.0, value.1.take().unwrap()))
95                    .collect();
96
97                // Set SloptMap
98                match &mut *lock {
99                    State::Idle(..) => unreachable!(),
100                    State::Processing {
101                        slot_map,
102                        events: _,
103                    } => *slot_map = dense_slot_map,
104                    State::Terminated(..) => unreachable!(),
105                }
106
107                // Notify
108                drop(lock);
109                observers
110                    .iter_mut()
111                    .for_each(|observer| observer.1.on_next(value.clone()));
112
113                let events = self.0.clone().lock_mut(|mut lock| {
114                    // Get SloptMap and Events
115                    let (mut slot_map, events) = match std::mem::replace(
116                        &mut *lock,
117                        State::Idle(DenseSlotMap::new()), // Placeholder,
118                    ) {
119                        State::Idle(..) => unreachable!(),
120                        State::Processing { slot_map, events } => (slot_map, events),
121                        State::Terminated(..) => unreachable!(),
122                    };
123
124                    // Set Observers
125                    for (key, observer) in observers {
126                        if slot_map.contains_key(key) {
127                            slot_map[key] = Some(observer);
128                        } else {
129                            // already unsubscribed
130                        }
131                    }
132
133                    // Set SloptMap
134                    *lock = State::Idle(slot_map);
135
136                    // Return
137                    events
138                });
139
140                // Handle Events
141                for event in events {
142                    match event {
143                        Event::Next(value) => {
144                            self.on_next(value);
145                        }
146                        Event::Termination(termination) => {
147                            self.clone().on_termination(termination);
148                            break;
149                        }
150                    }
151                }
152            }
153            State::Processing {
154                slot_map: _,
155                events,
156            } => {
157                events.push(Event::Next(value));
158            }
159            State::Terminated(_) => {
160                // It's already terminated. Do nothing.
161            }
162        });
163    }
164
165    fn on_termination(self, termination: Termination<E>) {
166        self.0.lock_mut(|mut lock| {
167            match &mut *lock {
168                State::Idle(_) => {
169                    // Get SloptMap
170                    let dense_slot_map =
171                        match std::mem::replace(&mut *lock, State::Terminated(termination.clone()))
172                        {
173                            State::Idle(dense_slot_map) => dense_slot_map,
174                            State::Processing { .. } => unreachable!(),
175                            State::Terminated(..) => unreachable!(),
176                        };
177                    drop(lock);
178
179                    // Notify
180                    dense_slot_map.into_iter().for_each(|observer| {
181                        observer.1.unwrap().on_termination(termination.clone())
182                    });
183                }
184                State::Processing {
185                    slot_map: _,
186                    events,
187                } => {
188                    events.push(Event::Termination(termination));
189                }
190                State::Terminated(_) => {
191                    // It's already terminated. Do nothing.
192                }
193            }
194        });
195    }
196}
197
198impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for PublishSubject<'or, T, E>
199where
200    T: Clone + NecessarySendSync + 'sub,
201    E: Clone + NecessarySendSync + 'sub,
202    'or: 'sub,
203{
204    fn terminated(&self) -> Option<Termination<E>>
205    where
206        E: Clone,
207    {
208        self.0.lock_ref(|lock| match &*lock {
209            State::Idle(_) => None,
210            State::Processing { .. } => None,
211            State::Terminated(termination) => Some(termination.clone()),
212        })
213    }
214}
215
/// Disposal handle created by `subscribe` for a registered observer; disposing it removes
/// that observer from the subject.
struct PublishSubjectDisposal<'or, T, E> {
    /// State handle of the subject the observer was registered on.
    state: Shared<Mutable<State<'or, T, E>>>,
    /// Slot-map key under which the observer was inserted.
    key: DefaultKey,
}
220
221impl<T, E> Disposable for PublishSubjectDisposal<'_, T, E> {
222    fn dispose(self) {
223        self.state.lock_mut(|mut lock| {
224            match &mut *lock {
225                State::Idle(observers) => {
226                    observers.remove(self.key);
227                }
228                State::Processing {
229                    slot_map,
230                    events: _,
231                } => {
232                    slot_map.remove(self.key);
233                }
234                State::Terminated(_) => {
235                    // Do nothing if it was already terminated
236                }
237            }
238        });
239    }
240}