use spin::RwLock;
use uuid::Uuid;
use crate::core::{Deserializer, Transport};
use crate::subscribe::traits::EventHandler;
use crate::{
core::{DataStream, PubNubEntity},
dx::pubnub_client::PubNubClientInstance,
lib::{
alloc::{
string::String,
sync::{Arc, Weak},
vec,
vec::Vec,
},
collections::HashMap,
core::{
fmt::{Debug, Formatter, Result},
ops::{Add, AddAssign, Deref, DerefMut, Sub, SubAssign},
},
},
subscribe::{
event_engine::SubscriptionInput, AppContext, EventDispatcher, EventEmitter,
EventSubscriber, File, Message, MessageAction, Presence, Subscriber, Subscription,
SubscriptionCursor, SubscriptionOptions, Update,
},
};
pub struct SubscriptionSet<
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
> {
pub(super) inner: Arc<SubscriptionSetRef<T, D>>,
is_clone: bool,
}
pub struct SubscriptionSetRef<
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
> {
pub(super) instance_id: String,
state: Arc<SubscriptionSetState<T, D>>,
event_dispatcher: EventDispatcher,
}
#[derive(Debug)]
pub struct SubscriptionSetState<
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
> {
pub(super) id: String,
pub(super) client: Weak<PubNubClientInstance<T, D>>,
pub(crate) subscriptions: RwLock<Vec<Subscription<T, D>>>,
pub(super) is_subscribed: Arc<RwLock<bool>>,
pub(crate) subscription_input: RwLock<SubscriptionInput>,
cursor: RwLock<Option<SubscriptionCursor>>,
options: Option<Vec<SubscriptionOptions>>,
clones: RwLock<HashMap<String, Weak<SubscriptionSetRef<T, D>>>>,
}
impl<T, D> SubscriptionSet<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
pub(crate) fn new(
entities: Vec<PubNubEntity<T, D>>,
options: Option<Vec<SubscriptionOptions>>,
) -> Self {
Self {
inner: SubscriptionSetRef::new(entities, options),
is_clone: false,
}
}
pub fn new_with_subscriptions(
subscriptions: Vec<Subscription<T, D>>,
options: Option<Vec<SubscriptionOptions>>,
) -> Self {
Self {
inner: SubscriptionSetRef::new_with_subscriptions(subscriptions, options),
is_clone: false,
}
}
pub fn clone_empty(&self) -> Self {
Self {
inner: self.inner.clone_empty(),
is_clone: false,
}
}
pub fn add_subscriptions(&mut self, subscriptions: Vec<Subscription<T, D>>) {
let unique_subscriptions =
{ Self::unique_subscriptions_from_list(Some(self), subscriptions) };
{
let mut subscription_input = self.subscription_input.write();
*subscription_input += Self::subscription_input_from_list(&unique_subscriptions, true);
self.subscriptions
.write()
.extend(unique_subscriptions.clone());
}
if !self.is_subscribed() || unique_subscriptions.is_empty() {
return;
}
let Some(client) = self.client().upgrade().clone() else {
return;
};
if let Some(manager) = client.subscription_manager(true).write().as_mut() {
unique_subscriptions.iter().for_each(|subscription| {
subscription.entity.increase_subscriptions_count();
});
if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.update(&handler, None);
}
};
}
pub fn sub_subscriptions(&mut self, subscriptions: Vec<Subscription<T, D>>) {
let removed: Vec<Subscription<T, D>> = {
let subscriptions_slot = self.subscriptions.read();
Self::unique_subscriptions_from_list(None, subscriptions)
.into_iter()
.filter(|subscription| subscriptions_slot.contains(subscription))
.collect()
};
{
let mut subscription_input = self.subscription_input.write();
*subscription_input -= Self::subscription_input_from_list(&removed, true);
let mut subscription_slot = self.subscriptions.write();
subscription_slot.retain(|subscription| !removed.contains(subscription));
}
if !self.is_subscribed() || removed.is_empty() {
return;
}
let Some(client) = self.client().upgrade().clone() else {
return;
};
removed.iter().for_each(|subscription| {
subscription.entity.decrease_subscriptions_count();
});
if let Some(manager) = client.subscription_manager(true).write().as_mut() {
if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.update(&handler, Some(&removed));
}
};
}
fn subscription_input_from_list(
subscriptions: &[Subscription<T, D>],
include_inactive: bool,
) -> SubscriptionInput {
let input = subscriptions
.iter()
.map(|subscription| {
if !include_inactive && subscription.entity.subscriptions_count().eq(&0) {
return Default::default();
}
subscription.subscription_input.clone()
})
.sum();
input
}
fn unique_subscriptions_from_list(
set: Option<&Self>,
subscriptions: Vec<Subscription<T, D>>,
) -> Vec<Subscription<T, D>> {
let subscriptions_slot = if let Some(set) = set {
set.subscriptions.read().clone()
} else {
vec![]
};
let mut unique_subscriptions = Vec::with_capacity(subscriptions.len());
subscriptions.into_iter().for_each(|subscription| {
if !unique_subscriptions.contains(&subscription)
&& !subscriptions_slot.contains(&subscription)
{
unique_subscriptions.push(subscription);
}
});
unique_subscriptions
}
}
impl<T, D> Deref for SubscriptionSet<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
type Target = SubscriptionSetRef<T, D>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T, D> DerefMut for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn deref_mut(&mut self) -> &mut Self::Target {
Arc::get_mut(&mut self.inner)
.expect("Multiple mutable references to the SubscriptionSet are not allowed")
}
}
impl<T, D> Clone for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
is_clone: true,
}
}
}
impl<T, D> Drop for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn drop(&mut self) {
if self.is_clone {
return;
}
let Some(client) = self.client().upgrade().clone() else {
return;
};
if let Some(manager) = client.subscription_manager(false).write().as_mut() {
let mut clones = self.clones.write();
if clones.len().gt(&1) {
clones.retain(|instance_id, _| instance_id.ne(&self.instance_id));
} else if let Some((_, handler)) = clones.iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.unregister(&handler);
}
}
}
}
impl<T, D> Debug for SubscriptionSet<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(
f,
"SubscriptionSet {{ id: {}, subscription_input: {:?}, is_subscribed: {}, cursor: {:?}, \
options: {:?}, subscriptions: {:?}}}",
self.id,
self.subscription_input,
self.is_subscribed(),
self.cursor.read().clone(),
self.options,
self.subscriptions
)
}
}
impl<T, D> Add for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
type Output = SubscriptionSet<T, D>;
fn add(self, rhs: Self) -> Self::Output {
let mut subscriptions = {
let other_subscriptions = rhs.subscriptions.read();
SubscriptionSet::unique_subscriptions_from_list(
Some(&self),
other_subscriptions.clone(),
)
};
subscriptions.extend(self.subscriptions.read().clone());
SubscriptionSet::new_with_subscriptions(subscriptions, None)
}
}
impl<T, D> AddAssign for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn add_assign(&mut self, rhs: Self) {
self.add_subscriptions(rhs.subscriptions.read().clone());
}
}
impl<T, D> Sub for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
type Output = SubscriptionSet<T, D>;
fn sub(self, rhs: Self) -> Self::Output {
let removed: Vec<Subscription<T, D>> = {
let other_subscriptions = rhs.subscriptions.read();
let subscriptions_slot = self.subscriptions.read();
Self::unique_subscriptions_from_list(None, other_subscriptions.clone())
.into_iter()
.filter(|subscription| subscriptions_slot.contains(subscription))
.collect()
};
let mut subscriptions = self.subscriptions.read().clone();
subscriptions.retain(|subscription| !removed.contains(subscription));
SubscriptionSet::new_with_subscriptions(subscriptions, None)
}
}
impl<T, D> SubAssign for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn sub_assign(&mut self, rhs: Self) {
self.sub_subscriptions(rhs.subscriptions.read().clone());
}
}
impl<T, D> SubscriptionSetRef<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
pub(crate) fn new(
entities: Vec<PubNubEntity<T, D>>,
options: Option<Vec<SubscriptionOptions>>,
) -> Arc<Self> {
let subscriptions = entities
.into_iter()
.map(|entity| entity.subscription(options.clone()))
.collect::<Vec<Subscription<T, D>>>();
Self::new_with_subscriptions(subscriptions, options)
}
pub(crate) fn new_with_subscriptions(
subscriptions: Vec<Subscription<T, D>>,
options: Option<Vec<SubscriptionOptions>>,
) -> Arc<Self> {
let subscription = subscriptions
.first()
.expect("At least one subscription expected.");
let subscription_state =
SubscriptionSetState::new(subscription.client(), subscriptions, options);
let subscription_set = Arc::new(Self {
instance_id: Uuid::new_v4().to_string(),
state: Arc::new(subscription_state),
event_dispatcher: Default::default(),
});
subscription_set.store_clone(
subscription_set.instance_id.clone(),
Arc::downgrade(&subscription_set),
);
subscription_set
}
pub fn clone_empty(&self) -> Arc<Self> {
let instance_id = Uuid::new_v4().to_string();
let instance = Arc::new(Self {
instance_id: instance_id.clone(),
state: Arc::clone(&self.state),
event_dispatcher: Default::default(),
});
self.store_clone(instance_id, Arc::downgrade(&instance));
instance
}
pub(super) fn current_timetoken(&self) -> usize {
let cursor = self.cursor.read();
cursor
.as_ref()
.and_then(|cursor| cursor.timetoken.parse::<usize>().ok())
.unwrap_or(0)
}
pub(super) fn is_subscribed(&self) -> bool {
*self.is_subscribed.read()
}
fn register_with_cursor(&self, cursor: Option<SubscriptionCursor>) {
let Some(client) = self.client().upgrade().clone() else {
return;
};
{
let manager = client.subscription_manager(true);
if let Some(manager) = manager.write().as_mut() {
self.subscriptions.read().iter().for_each(|subscription| {
subscription.entity.increase_subscriptions_count();
});
if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.register(&handler, cursor);
}
};
}
}
fn filtered_events(&self, events: &[Update]) -> Vec<Update> {
let subscription_input = self.subscription_input(true);
let current_timetoken = self.current_timetoken();
events
.iter()
.filter(|event| {
subscription_input.contains(&event.subscription())
&& event.event_timestamp().ge(¤t_timetoken)
})
.cloned()
.collect::<Vec<Update>>()
}
}
impl<T, D> Deref for SubscriptionSetRef<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
type Target = SubscriptionSetState<T, D>;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl<T, D> DerefMut for SubscriptionSetRef<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn deref_mut(&mut self) -> &mut Self::Target {
Arc::get_mut(&mut self.state)
.expect("Multiple mutable references to the SubscriptionSetRef are not allowed")
}
}
impl<T, D> EventSubscriber for SubscriptionSetRef<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
fn subscribe(&self) {
let mut is_subscribed = self.is_subscribed.write();
if *is_subscribed {
return;
}
*is_subscribed = true;
self.register_with_cursor(self.cursor.read().clone())
}
fn subscribe_with_timetoken<SC>(&self, cursor: SC)
where
SC: Into<SubscriptionCursor>,
{
let mut is_subscribed = self.is_subscribed.write();
if *is_subscribed {
return;
}
*is_subscribed = true;
let user_cursor = cursor.into();
let cursor = user_cursor.is_valid().then_some(user_cursor);
{
if cursor.is_some() {
let mut cursor_slot = self.cursor.write();
if let Some(current_cursor) = cursor_slot.as_ref() {
let catchup_cursor = cursor.clone().unwrap_or_default();
catchup_cursor
.gt(current_cursor)
.then(|| *cursor_slot = Some(catchup_cursor));
} else {
*cursor_slot = cursor.clone();
}
}
}
self.register_with_cursor(cursor);
}
fn unsubscribe(&self) {
{
let mut is_subscribed_slot = self.is_subscribed.write();
if !*is_subscribed_slot {
return;
}
*is_subscribed_slot = false;
let mut cursor = self.cursor.write();
*cursor = None;
}
let Some(client) = self.client().upgrade().clone() else {
return;
};
{
if let Some(manager) = client.subscription_manager(false).write().as_mut() {
self.subscriptions.read().iter().for_each(|subscription| {
subscription.entity.increase_subscriptions_count();
});
if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.unregister(&handler);
}
};
}
}
}
impl<T, D> EventHandler<T, D> for SubscriptionSetRef<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
fn handle_events(&self, cursor: SubscriptionCursor, events: &[Update]) {
if !self.is_subscribed() {
return;
}
let filtered_events = self.filtered_events(events);
let mut cursor_slot = self.cursor.write();
if let Some(current_cursor) = cursor_slot.as_ref() {
cursor
.gt(current_cursor)
.then(|| *cursor_slot = Some(cursor));
} else {
*cursor_slot = Some(cursor);
}
self.clones.write().retain(|_, handler| {
if let Some(handler) = handler.upgrade().clone() {
handler
.event_dispatcher
.handle_events(filtered_events.clone());
return true;
}
false
});
}
fn subscription_input(&self, include_inactive: bool) -> SubscriptionInput {
SubscriptionSet::subscription_input_from_list(&self.subscriptions.read(), include_inactive)
}
fn invalidate(&self) {
{
let mut is_subscribed = self.is_subscribed.write();
if !*is_subscribed {
return;
}
*is_subscribed = false;
}
self.subscriptions
.read()
.iter()
.for_each(|subscription| subscription.invalidate());
self.event_dispatcher.invalidate();
}
fn id(&self) -> &String {
&self.id
}
fn client(&self) -> Weak<PubNubClientInstance<T, D>> {
self.client.clone()
}
}
impl<T, D> EventEmitter for SubscriptionSetRef<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn messages_stream(&self) -> DataStream<Message> {
self.event_dispatcher.messages_stream()
}
fn signals_stream(&self) -> DataStream<Message> {
self.event_dispatcher.signals_stream()
}
fn message_actions_stream(&self) -> DataStream<MessageAction> {
self.event_dispatcher.message_actions_stream()
}
fn files_stream(&self) -> DataStream<File> {
self.event_dispatcher.files_stream()
}
fn app_context_stream(&self) -> DataStream<AppContext> {
self.event_dispatcher.app_context_stream()
}
fn presence_stream(&self) -> DataStream<Presence> {
self.event_dispatcher.presence_stream()
}
fn stream(&self) -> DataStream<Update> {
self.event_dispatcher.stream()
}
}
impl<T, D> SubscriptionSetState<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
fn new(
client: Weak<PubNubClientInstance<T, D>>,
subscriptions: Vec<Subscription<T, D>>,
options: Option<Vec<SubscriptionOptions>>,
) -> SubscriptionSetState<T, D> {
Self {
id: Uuid::new_v4().to_string(),
client,
subscription_input: RwLock::new(SubscriptionSet::subscription_input_from_list(
&subscriptions,
true,
)),
is_subscribed: Default::default(),
cursor: Default::default(),
subscriptions: RwLock::new(SubscriptionSet::unique_subscriptions_from_list(
None,
subscriptions,
)),
options,
clones: Default::default(),
}
}
fn store_clone(&self, instance_id: String, instance: Weak<SubscriptionSetRef<T, D>>) {
let mut clones = self.clones.write();
(!clones.contains_key(&instance_id)).then(|| clones.insert(instance_id, instance));
}
}
#[cfg(test)]
mod it_should {
use super::*;
use crate::{Channel, Keyset, PubNubClient, PubNubClientBuilder};
fn client() -> PubNubClient {
PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
subscribe_key: "demo",
publish_key: Some("demo"),
secret_key: None,
})
.with_user_id("user")
.build()
.unwrap()
}
#[test]
fn create_subscription_set_from_entities() {
let client = Arc::new(client());
let channels = vec!["channel_1", "channel_2"]
.into_iter()
.map(|name| PubNubEntity::Channel(Channel::new(&client, name)))
.collect();
let subscription_set = SubscriptionSet::new(channels, None);
assert!(!subscription_set.is_subscribed());
assert!(subscription_set
.subscription_input
.read()
.contains("channel_1"));
assert!(subscription_set
.subscription_input
.read()
.contains("channel_2"));
}
#[test]
fn preserve_id_between_clones() {
let client = Arc::new(client());
let channels = vec!["channel_1", "channel_2"]
.into_iter()
.map(|name| PubNubEntity::Channel(Channel::new(&client, name)))
.collect();
let subscription_set = SubscriptionSet::new(channels, None);
assert_eq!(
subscription_set.clone().id.clone(),
subscription_set.id.clone()
);
}
#[test]
fn not_preserve_listeners_between_clones() {
let client = Arc::new(client());
let channels = vec!["channel_1", "channel_2"]
.into_iter()
.map(|name| PubNubEntity::Channel(Channel::new(&client, name)))
.collect();
let subscription_set = SubscriptionSet::new(channels, None);
let _ = subscription_set.messages_stream();
assert_eq!(
subscription_set
.clone()
.event_dispatcher
.message_streams
.read()
.as_ref()
.unwrap()
.len(),
1
);
assert!(subscription_set
.clone_empty()
.event_dispatcher
.message_streams
.read()
.as_ref()
.is_none());
}
#[test]
fn concat_subscriptions() {
let client = Arc::new(client());
let channels_1_subscriptions = vec!["channel_1", "channel_2"]
.into_iter()
.map(|name| client.channel(name).subscription(None))
.collect::<Vec<Subscription<_, _>>>();
let channels_2_subscriptions = vec!["channel_3", "channel_4"]
.into_iter()
.map(|name| client.channel(name).subscription(None))
.collect::<Vec<Subscription<_, _>>>();
let channels_3_subscriptions = [
channels_1_subscriptions[0].clone(),
channels_2_subscriptions[1].clone(),
];
let mut subscription_set_1 =
channels_1_subscriptions[0].clone() + channels_1_subscriptions[1].clone();
let subscription_set_2 =
channels_2_subscriptions[0].clone() + channels_2_subscriptions[1].clone();
let subscription_set_3 =
channels_3_subscriptions[0].clone() + channels_3_subscriptions[1].clone();
subscription_set_1 += subscription_set_2;
assert!(subscription_set_1
.subscription_input(true)
.contains_channel("channel_3"));
subscription_set_1 -= subscription_set_3;
assert!(!subscription_set_1
.subscription_input(true)
.contains_channel("channel_1"));
}
}