rx_rust/subject/
publish_subject.rs

1use super::Subject;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observer::Event;
5use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
6use crate::{
7    observable::Observable,
8    observer::{Observer, Termination, boxed_observer::BoxedObserver},
9};
10use educe::Educe;
11use slotmap::{DefaultKey, DenseSlotMap};
12
13#[derive(Educe)]
14#[educe(Debug, Default)]
15enum State<'or, T, E> {
16    #[educe(Default)]
17    Idle(DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>),
18    Processing {
19        slot_map: DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>,
20        events: Vec<Event<T, E>>,
21    },
22    Terminated(Termination<E>),
23}
24
25/// Basic multicast subject that forwards events to all observers.
26#[derive(Educe)]
27#[educe(Debug, Clone, Default)]
28pub struct PublishSubject<'or, T, E>(Shared<Mutable<State<'or, T, E>>>);
29
30impl<T, E> PublishSubject<'_, T, E> {
31    pub fn new() -> Self {
32        Self(Shared::new(Mutable::new(State::Idle(DenseSlotMap::new()))))
33    }
34}
35
36impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for PublishSubject<'or, T, E>
37where
38    T: NecessarySendSync + 'sub,
39    E: Clone + NecessarySendSync + 'sub,
40    'or: 'sub,
41{
42    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
43        self.0.clone().lock_mut(|mut lock| match &mut *lock {
44            State::Idle(observers) => {
45                let key = observers.insert(Some(BoxedObserver::new(observer)));
46                drop(lock);
47                Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
48            }
49            State::Processing {
50                slot_map,
51                events: _,
52            } => {
53                let key = slot_map.insert(Some(BoxedObserver::new(observer)));
54                drop(lock);
55                Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
56            }
57            State::Terminated(termination) => {
58                let termination = termination.clone();
59                drop(lock);
60                observer.on_termination(termination);
61                Subscription::default()
62            }
63        })
64    }
65}
66
67impl<T, E> Observer<T, E> for PublishSubject<'_, T, E>
68where
69    T: Clone,
70    E: Clone,
71{
72    fn on_next(&mut self, value: T) {
73        self.0.clone().lock_mut(|mut lock| match &mut *lock {
74            State::Idle(_) => {
75                // Get SloptMap
76                let mut dense_slot_map = match std::mem::replace(
77                    &mut *lock,
78                    State::Processing {
79                        slot_map: DenseSlotMap::new(), // Placeholder
80                        events: Vec::new(),
81                    },
82                ) {
83                    State::Idle(dense_slot_map) => dense_slot_map,
84                    State::Processing { .. } => unreachable!(),
85                    State::Terminated(..) => unreachable!(),
86                };
87
88                // Get observers
89                let mut observers: Vec<_> = dense_slot_map
90                    .iter_mut()
91                    .map(|value| (value.0, value.1.take().unwrap()))
92                    .collect();
93
94                // Set SloptMap
95                match &mut *lock {
96                    State::Idle(..) => unreachable!(),
97                    State::Processing {
98                        slot_map,
99                        events: _,
100                    } => *slot_map = dense_slot_map,
101                    State::Terminated(..) => unreachable!(),
102                }
103
104                // Notify
105                drop(lock);
106                observers
107                    .iter_mut()
108                    .for_each(|observer| observer.1.on_next(value.clone()));
109
110                let events = self.0.clone().lock_mut(|mut lock| {
111                    // Get SloptMap and Events
112                    let (mut slot_map, events) = match std::mem::replace(
113                        &mut *lock,
114                        State::Idle(DenseSlotMap::new()), // Placeholder,
115                    ) {
116                        State::Idle(..) => unreachable!(),
117                        State::Processing { slot_map, events } => (slot_map, events),
118                        State::Terminated(..) => unreachable!(),
119                    };
120
121                    // Set Observers
122                    for (key, observer) in observers {
123                        if slot_map.contains_key(key) {
124                            slot_map[key] = Some(observer);
125                        } else {
126                            // already unsubscribed
127                        }
128                    }
129
130                    // Set SloptMap
131                    *lock = State::Idle(slot_map);
132
133                    // Return
134                    events
135                });
136
137                // Handle Events
138                for event in events {
139                    match event {
140                        Event::Next(value) => {
141                            self.on_next(value);
142                        }
143                        Event::Termination(termination) => {
144                            self.clone().on_termination(termination);
145                            break;
146                        }
147                    }
148                }
149            }
150            State::Processing {
151                slot_map: _,
152                events,
153            } => {
154                events.push(Event::Next(value));
155            }
156            State::Terminated(_) => {
157                // It's already terminated. Do nothing.
158            }
159        });
160    }
161
162    fn on_termination(self, termination: Termination<E>) {
163        self.0.lock_mut(|mut lock| {
164            match &mut *lock {
165                State::Idle(_) => {
166                    // Get SloptMap
167                    let dense_slot_map =
168                        match std::mem::replace(&mut *lock, State::Terminated(termination.clone()))
169                        {
170                            State::Idle(dense_slot_map) => dense_slot_map,
171                            State::Processing { .. } => unreachable!(),
172                            State::Terminated(..) => unreachable!(),
173                        };
174                    drop(lock);
175
176                    // Notify
177                    dense_slot_map.into_iter().for_each(|observer| {
178                        observer.1.unwrap().on_termination(termination.clone())
179                    });
180                }
181                State::Processing {
182                    slot_map: _,
183                    events,
184                } => {
185                    events.push(Event::Termination(termination));
186                }
187                State::Terminated(_) => {
188                    // It's already terminated. Do nothing.
189                }
190            }
191        });
192    }
193}
194
195impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for PublishSubject<'or, T, E>
196where
197    T: Clone + NecessarySendSync + 'sub,
198    E: Clone + NecessarySendSync + 'sub,
199    'or: 'sub,
200{
201    fn terminated(&self) -> Option<Termination<E>>
202    where
203        E: Clone,
204    {
205        self.0.lock_ref(|lock| match &*lock {
206            State::Idle(_) => None,
207            State::Processing { .. } => None,
208            State::Terminated(termination) => Some(termination.clone()),
209        })
210    }
211}
212
213struct PublishSubjectDisposal<'or, T, E> {
214    state: Shared<Mutable<State<'or, T, E>>>,
215    key: DefaultKey,
216}
217
218impl<T, E> Disposable for PublishSubjectDisposal<'_, T, E> {
219    fn dispose(self) {
220        self.state.lock_mut(|mut lock| {
221            match &mut *lock {
222                State::Idle(observers) => {
223                    observers.remove(self.key);
224                }
225                State::Processing {
226                    slot_map,
227                    events: _,
228                } => {
229                    slot_map.remove(self.key);
230                }
231                State::Terminated(_) => {
232                    // Do nothing if it was already terminated
233                }
234            }
235        });
236    }
237}