use std::sync::{Arc, Mutex, MutexGuard};
use rx_core_macro_subscription_derive::RxSubscription;
use crate::{
LockWithPoisonBehavior, SubscriptionClosedFlag, SubscriptionData, SubscriptionLike,
SubscriptionLikePushNotificationExtention, SubscriptionNotification, SubscriptionWithTeardown,
TeardownCollectionExtension,
};
pub(crate) const SUBSCRIPTION_MAX_RECURSION_DEPTH: usize = 10;
#[derive(Debug)]
pub(crate) struct SubscriptionUnsubscribeLockError;
#[derive(Debug)]
struct SubscriptionDeferredState {
pub(crate) deferred_notifications_queue: Vec<SubscriptionNotification>,
pub(crate) closed_flag: SubscriptionClosedFlag,
pub(crate) observed_unsubscribe: bool,
}
impl SubscriptionDeferredState {
pub(crate) fn defer_notification(&mut self, notification: SubscriptionNotification) {
let is_first_unsubscribe = matches!(notification, SubscriptionNotification::Unsubscribe)
&& !self.observed_unsubscribe;
if *self.closed_flag && !is_first_unsubscribe {
return;
}
self.deferred_notifications_queue.push(notification);
}
pub(crate) fn drain_notification_queue(&mut self) -> Vec<SubscriptionNotification> {
self.deferred_notifications_queue
.drain(..)
.collect::<Vec<_>>()
}
pub(crate) fn is_dirty(&self) -> bool {
!self.deferred_notifications_queue.is_empty()
}
pub(crate) fn is_unsubscribed(&self) -> bool {
self.observed_unsubscribe
}
pub(crate) fn is_closed(&self) -> bool {
self.is_closed_ignoring_deferred() || self.observed_unsubscribe
}
pub(crate) fn is_closed_ignoring_deferred(&self) -> bool {
*self.closed_flag
}
}
impl Drop for SubscriptionDeferredState {
fn drop(&mut self) {
self.closed_flag.close();
debug_assert!(
!self.is_dirty(),
"SubscriptionDeferredState was dropped dirty!"
);
}
}
impl Default for SubscriptionDeferredState {
fn default() -> Self {
Self {
closed_flag: false.into(),
observed_unsubscribe: false,
deferred_notifications_queue: Vec::default(),
}
}
}
#[derive(RxSubscription, Default, Clone, Debug)]
#[_rx_core_common_crate(crate)]
#[rx_delegate_teardown_collection]
#[rx_skip_unsubscribe_on_drop_impl] pub struct SharedSubscription {
#[destination]
subscription: Arc<Mutex<SubscriptionData>>,
deferred_state: Arc<Mutex<SubscriptionDeferredState>>,
}
impl SharedSubscription {
pub fn new<S>(mut subscription: S) -> Self
where
S: 'static + SubscriptionWithTeardown + Send + Sync,
{
let mut shared_subscription = Self::default();
subscription.add(shared_subscription.clone());
shared_subscription.add(subscription);
shared_subscription
}
fn try_apply_deferred(&mut self) {
if self.deferred_state.lock_ignore_poison().is_dirty()
&& let Ok(mut subscription) = self.subscription.try_lock()
{
SharedSubscription::apply_notification_queue(
self.deferred_state.clone(),
&mut subscription,
);
}
}
fn apply_notification_queue(
state: Arc<Mutex<SubscriptionDeferredState>>,
subscriber: &mut MutexGuard<'_, SubscriptionData>,
) {
for queue_depth in 0..=SUBSCRIPTION_MAX_RECURSION_DEPTH {
let notifications = {
let mut locked_state = state.lock_ignore_poison();
if queue_depth == SUBSCRIPTION_MAX_RECURSION_DEPTH {
panic!(
"Notification queue depth have exceeded {SUBSCRIPTION_MAX_RECURSION_DEPTH}!"
)
}
if locked_state.deferred_notifications_queue.is_empty() {
break;
}
locked_state.drain_notification_queue()
};
for notification in notifications.into_iter() {
let is_unsubscribe = matches!(¬ification, SubscriptionNotification::Unsubscribe);
if !state.lock_clear_poison().is_closed_ignoring_deferred() {
subscriber.push(notification);
}
if is_unsubscribe {
state.lock_ignore_poison().closed_flag.close();
}
}
}
}
pub(crate) fn try_unsubscribe(&mut self) -> Result<(), SubscriptionUnsubscribeLockError> {
match self.subscription.try_lock() {
Ok(mut subscription) => {
Self::apply_notification_queue(self.deferred_state.clone(), &mut subscription);
self.deferred_state.lock_ignore_poison().closed_flag.close();
if !subscription.is_closed() {
subscription.unsubscribe();
}
Ok(())
}
Err(_) => Err(SubscriptionUnsubscribeLockError),
}
}
}
impl SubscriptionLike for SharedSubscription {
fn is_closed(&self) -> bool {
self.deferred_state.lock_ignore_poison().is_closed()
|| self
.subscription
.try_lock()
.map(|s| s.is_closed())
.unwrap_or(false)
}
fn unsubscribe(&mut self) {
self.try_apply_deferred();
let was_unsubscribed = {
let mut state = self.deferred_state.lock_ignore_poison();
let was_unsubscribed = state.is_unsubscribed();
state.observed_unsubscribe = true;
was_unsubscribed
};
if !was_unsubscribed && let Err(_unsubscribe_error) = self.try_unsubscribe() {
self.deferred_state
.lock_ignore_poison()
.defer_notification(SubscriptionNotification::Unsubscribe);
}
self.try_apply_deferred();
}
}
impl Drop for SharedSubscription {
fn drop(&mut self) {
self.try_apply_deferred();
}
}