use super::subscribers::Subscribers;
use crate::{
context::{RcDerefMut, SharedCell},
scheduler::{Scheduler, Task, TaskState},
subscription::Subscription,
};
/// State marker that a `SubjectSubscription` keeps in its shared cell.
///
/// `unsubscribe` moves the cell from `Pending` or `Ready` to `Cancelled` with a
/// compare-exchange; `is_closed` reports whether the cell holds `Cancelled`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionState {
    /// No id is attached; unsubscribing from this state only flips the marker
    /// to `Cancelled` and removes nothing.
    Pending,
    /// Carries the `usize` id that `unsubscribe` hands to `remove` on the
    /// subscriber collection.
    Ready(usize),
    /// Terminal marker: `is_closed` returns `true` and `unsubscribe` returns
    /// immediately.
    Cancelled,
}
/// Subscription handle made of three parts: a handle to the subscriber
/// collection, a shared state cell, and a scheduler.
///
/// The struct itself places no bounds on its parameters; the `Subscription`
/// impl is where `P`, `Sch` and `Cell` are constrained.
pub struct SubjectSubscription<P, Sch, Cell> {
    /// Handle to the subscriber collection that `unsubscribe` removes from.
    pub(crate) observers: P,
    /// Shared cell holding the current `SubscriptionState`.
    pub(crate) state: Cell,
    /// Scheduler used when the removal cannot be performed right away.
    pub(crate) scheduler: Sch,
}

impl<P, Sch, Cell> SubjectSubscription<P, Sch, Cell> {
    /// Bundles the three parts into a subscription without touching any of them.
    pub(crate) fn new(observers: P, state: Cell, scheduler: Sch) -> Self {
        SubjectSubscription {
            observers,
            state,
            scheduler,
        }
    }
}
/// Payload of the removal task that `unsubscribe` hands to the scheduler when
/// it cannot remove the entry right away.
pub(crate) struct RemoveState<P> {
    /// Handle to the subscriber collection the task removes from.
    observers: P,
    /// Id of the entry to remove.
    id: usize,
}
impl<P, O, Sch, Cell> Subscription for SubjectSubscription<P, Sch, Cell>
where
    P: RcDerefMut<Target = Subscribers<O>>,
    Sch: Scheduler<Task<RemoveState<P>>>,
    Cell: SharedCell<SubscriptionState>,
{
    /// Marks the subscription as cancelled and, if it carried an id, removes
    /// the matching entry from the subscriber collection.
    ///
    /// The state is re-read and the transition retried until either the cell
    /// already holds `Cancelled` or one compare-exchange succeeds:
    ///
    /// * `Cancelled` – nothing to do.
    /// * `Pending` – swap to `Cancelled`; there is no id, so nothing is removed.
    /// * `Ready(id)` – swap to `Cancelled`, then remove entry `id`: directly
    ///   when `try_rc_deref_mut` grants access, otherwise through a `Task`
    ///   handed to the scheduler.
    fn unsubscribe(self) {
        loop {
            let observed = self.state.get();

            // Resolve the two id-less states first; only `Ready` falls through.
            let id = match observed {
                SubscriptionState::Cancelled => return,
                SubscriptionState::Pending => {
                    let cancelled = self
                        .state
                        .compare_exchange(SubscriptionState::Pending, SubscriptionState::Cancelled)
                        .is_ok();
                    if cancelled {
                        return;
                    }
                    // The cell changed between the read and the swap: re-read.
                    continue;
                }
                SubscriptionState::Ready(id) => id,
            };

            let claimed = self
                .state
                .compare_exchange(SubscriptionState::Ready(id), SubscriptionState::Cancelled)
                .is_ok();
            if !claimed {
                // The cell changed between the read and the swap: re-read.
                continue;
            }

            // The swap succeeded, so the removal below is issued from this call.
            if let Some(mut subscribers) = self.observers.try_rc_deref_mut() {
                // `_removed` is declared after the guard, so it is dropped
                // first, while the guard is still alive.
                let _removed = subscribers.remove(id);
                return;
            }

            // No immediate access: move the handle into a task and let the
            // scheduler run the removal.
            let payload = RemoveState {
                observers: self.observers,
                id,
            };
            let task = Task::new(payload, |pending| {
                // The guard is a temporary released at the end of this
                // statement; `_removed` lives until the closure returns.
                let _removed = pending.observers.rc_deref_mut().remove(pending.id);
                TaskState::Finished
            });
            // NOTE(review): `None` is presumably "no delay" — confirm against `Scheduler::schedule`.
            let _handle = self.scheduler.schedule(task, None);
            return;
        }
    }

    /// Returns `true` once the state cell holds `SubscriptionState::Cancelled`.
    fn is_closed(&self) -> bool {
        matches!(self.state.get(), SubscriptionState::Cancelled)
    }
}