use super::Subject;
use crate::disposable::Disposable;
use crate::disposable::subscription::Subscription;
use crate::observer::Event;
use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
use crate::{
observable::Observable,
observer::{Observer, Termination, boxed_observer::BoxedObserver},
};
use educe::Educe;
use slotmap::{DefaultKey, DenseSlotMap};
#[derive(Educe)]
#[educe(Debug, Default)]
enum State<'or, T, E> {
#[educe(Default)]
Idle(DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>),
Processing {
slot_map: DenseSlotMap<DefaultKey, Option<BoxedObserver<'or, T, E>>>,
events: Vec<Event<T, E>>,
},
Terminated(Termination<E>),
}
#[derive(Educe)]
#[educe(Debug, Clone, Default)]
pub struct PublishSubject<'or, T, E>(Shared<Mutable<State<'or, T, E>>>);
impl<T, E> PublishSubject<'_, T, E> {
pub fn new() -> Self {
Self(Shared::new(Mutable::new(State::Idle(DenseSlotMap::new()))))
}
}
impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for PublishSubject<'or, T, E>
where
T: NecessarySendSync + 'sub,
E: Clone + NecessarySendSync + 'sub,
'or: 'sub,
{
fn subscribe(
self,
observer: impl Observer<T, E> + NecessarySendSync + 'or,
) -> Subscription<'sub> {
self.0.clone().lock_mut(|mut lock| match &mut *lock {
State::Idle(observers) => {
let key = observers.insert(Some(BoxedObserver::new(observer)));
drop(lock);
Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
}
State::Processing {
slot_map,
events: _,
} => {
let key = slot_map.insert(Some(BoxedObserver::new(observer)));
drop(lock);
Subscription::new_with_disposal(PublishSubjectDisposal { state: self.0, key })
}
State::Terminated(termination) => {
let termination = termination.clone();
drop(lock);
observer.on_termination(termination);
Subscription::default()
}
})
}
}
impl<T, E> Observer<T, E> for PublishSubject<'_, T, E>
where
T: Clone,
E: Clone,
{
fn on_next(&mut self, value: T) {
self.0.clone().lock_mut(|mut lock| match &mut *lock {
State::Idle(_) => {
let mut dense_slot_map = match std::mem::replace(
&mut *lock,
State::Processing {
slot_map: DenseSlotMap::new(), events: Vec::new(),
},
) {
State::Idle(dense_slot_map) => dense_slot_map,
State::Processing { .. } => unreachable!(),
State::Terminated(..) => unreachable!(),
};
let mut observers: Vec<_> = dense_slot_map
.iter_mut()
.map(|value| (value.0, value.1.take().unwrap()))
.collect();
match &mut *lock {
State::Idle(..) => unreachable!(),
State::Processing {
slot_map,
events: _,
} => *slot_map = dense_slot_map,
State::Terminated(..) => unreachable!(),
}
drop(lock);
observers
.iter_mut()
.for_each(|observer| observer.1.on_next(value.clone()));
let events = self.0.clone().lock_mut(|mut lock| {
let (mut slot_map, events) = match std::mem::replace(
&mut *lock,
State::Idle(DenseSlotMap::new()), ) {
State::Idle(..) => unreachable!(),
State::Processing { slot_map, events } => (slot_map, events),
State::Terminated(..) => unreachable!(),
};
for (key, observer) in observers {
if slot_map.contains_key(key) {
slot_map[key] = Some(observer);
} else {
}
}
*lock = State::Idle(slot_map);
events
});
for event in events {
match event {
Event::Next(value) => {
self.on_next(value);
}
Event::Termination(termination) => {
self.clone().on_termination(termination);
break;
}
}
}
}
State::Processing {
slot_map: _,
events,
} => {
events.push(Event::Next(value));
}
State::Terminated(_) => {
}
});
}
fn on_termination(self, termination: Termination<E>) {
self.0.lock_mut(|mut lock| {
match &mut *lock {
State::Idle(_) => {
let dense_slot_map =
match std::mem::replace(&mut *lock, State::Terminated(termination.clone()))
{
State::Idle(dense_slot_map) => dense_slot_map,
State::Processing { .. } => unreachable!(),
State::Terminated(..) => unreachable!(),
};
drop(lock);
dense_slot_map.into_iter().for_each(|observer| {
observer.1.unwrap().on_termination(termination.clone())
});
}
State::Processing {
slot_map: _,
events,
} => {
events.push(Event::Termination(termination));
}
State::Terminated(_) => {
}
}
});
}
}
impl<'or, 'sub, T, E> Subject<'or, 'sub, T, E> for PublishSubject<'or, T, E>
where
T: Clone + NecessarySendSync + 'sub,
E: Clone + NecessarySendSync + 'sub,
'or: 'sub,
{
fn terminated(&self) -> Option<Termination<E>>
where
E: Clone,
{
self.0.lock_ref(|lock| match &*lock {
State::Idle(_) => None,
State::Processing { .. } => None,
State::Terminated(termination) => Some(termination.clone()),
})
}
}
struct PublishSubjectDisposal<'or, T, E> {
state: Shared<Mutable<State<'or, T, E>>>,
key: DefaultKey,
}
impl<T, E> Disposable for PublishSubjectDisposal<'_, T, E> {
fn dispose(self) {
self.state.lock_mut(|mut lock| {
match &mut *lock {
State::Idle(observers) => {
observers.remove(self.key);
}
State::Processing {
slot_map,
events: _,
} => {
slot_map.remove(self.key);
}
State::Terminated(_) => {
}
}
});
}
}