use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use super::pubsub::{SubReceiver, TopicTree};
use super::{Error, Spec};
pub trait SubscriptionRequest {
type Topic;
type SubscriptionId;
fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;
fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
}
#[allow(missing_debug_implementations)]
pub struct ActiveSubscription<S>
where
S: Spec + 'static,
{
id: usize,
name: Arc<S::SubscriptionId>,
active_subscribers: Arc<AtomicUsize>,
topics: TopicTree<S>,
subscribed_to: Vec<S::Topic>,
receiver: Option<SubReceiver<S>>,
}
impl<S> ActiveSubscription<S>
where
S: Spec + 'static,
{
pub fn new(
id: usize,
name: Arc<S::SubscriptionId>,
active_subscribers: Arc<AtomicUsize>,
topics: TopicTree<S>,
subscribed_to: Vec<S::Topic>,
receiver: Option<SubReceiver<S>>,
) -> Self {
Self {
id,
name,
active_subscribers,
subscribed_to,
topics,
receiver,
}
}
pub async fn recv(&mut self) -> Option<S::Event> {
self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
}
pub fn try_recv(&mut self) -> Option<S::Event> {
self.receiver
.as_mut()?
.try_recv()
.ok()
.map(|(_, event)| event)
}
pub fn name(&self) -> &S::SubscriptionId {
&self.name
}
}
impl<S> Drop for ActiveSubscription<S>
where
S: Spec + 'static,
{
fn drop(&mut self) {
let mut topics = self.topics.write();
for index in self.subscribed_to.drain(..) {
topics.remove(&(index, self.id));
}
self.active_subscribers
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
}
#[derive(Debug)]
pub struct Subscriber<S>
where
S: Spec + 'static,
{
subscription: Arc<S::SubscriptionId>,
inner: mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
latest: Arc<Mutex<Option<S::Event>>>,
}
impl<S> Clone for Subscriber<S>
where
S: Spec + 'static,
{
fn clone(&self) -> Self {
Self {
subscription: self.subscription.clone(),
inner: self.inner.clone(),
latest: self.latest.clone(),
}
}
}
impl<S> Subscriber<S>
where
S: Spec + 'static,
{
pub fn new(
subscription: Arc<S::SubscriptionId>,
inner: &mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
) -> Self {
Self {
inner: inner.clone(),
subscription,
latest: Arc::new(Mutex::new(None)),
}
}
pub fn send(&self, event: S::Event) {
let mut latest = if let Ok(reader) = self.latest.lock() {
reader
} else {
let _ = self.inner.try_send((self.subscription.to_owned(), event));
return;
};
if let Some(last_event) = latest.replace(event.clone()) {
if last_event == event {
return;
}
}
let _ = self.inner.try_send((self.subscription.to_owned(), event));
}
}