use async_trait::async_trait;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use super::subscription_handler::{
SubscriptionFilter,
SubscriptionHandler,
SubscriptionResponse,
SubscriptionResponseTx,
SubscriptionBroadcast,
SubscriptionBroadcastTx,
SubscriptionBroadcastRx
};
/// One registered subscriber: the id handed back to the caller, the channel
/// its broadcasts are sent on, and the filter applied before each send.
struct SubscriptionItem<R> {
/// Identifier assigned by the manager at subscribe time; `unsubscribe`
/// removes entries whose `id` equals the requested one.
id : usize,
/// Sending half of the subscriber's channel; `broadcast` pushes matching
/// messages through it.
broadcast_tx: SubscriptionBroadcastTx<R>,
/// Predicate consulted by `broadcast`; a message is sent only when
/// `matches` returns `true` for it.
filter: Box<dyn SubscriptionFilter<R> + Send + Sync>,
}
/// Filter that accepts every message.
///
/// Plain `subscribe` calls install this filter, so such subscribers receive
/// everything published on their topic.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultSubscriptionFilter;

impl<R> SubscriptionFilter<R> for DefaultSubscriptionFilter {
    /// Always returns `true`; the message content is never inspected.
    fn matches(&self, _message: &R) -> bool {
        true
    }
}
/// Receiver-side callback for messages delivered through a subscription.
///
/// `SubscriptionManagerV2::spawn_message_processor` calls
/// `subscription_message` once for every item it pulls off the receiver.
#[async_trait]
pub trait SubscriptionNode<R> {
/// Handles one delivered message.
///
/// # Errors
/// Returning an error reports a failed handling attempt; the processor task
/// started by `spawn_message_processor` logs it and continues with the next
/// message.
async fn subscription_message(&mut self, message: R) -> Result<(), anyhow::Error>;
}
/// Topic-keyed subscription registry that fans messages out to subscriber
/// channels.
///
/// `R` is the message payload type; `broadcast` clones it once per subscriber
/// that receives the message.
pub struct SubscriptionManagerV2<R>
where
R: Clone + Send + Sync + 'static,
{
/// Subscribers grouped by topic name, guarded by an async mutex.
subscriptions: Arc<Mutex<HashMap<String, Vec<SubscriptionItem<R>>>>>,
/// Last id handed out. A new subscriber receives `id_count + 1`, so ids
/// start at 1.
id_count : usize,
}
impl<R> SubscriptionManagerV2<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Creates a manager with no subscriptions.
    ///
    /// The id counter starts at 0, so the first subscriber is assigned id 1.
    pub fn new() -> Self {
        Self {
            subscriptions: Arc::new(Mutex::new(HashMap::new())),
            id_count: 0,
        }
    }

    /// Spawns a detached Tokio task that feeds every item from `receiver`
    /// to `handler`.
    ///
    /// The task runs until `receiver.recv()` yields `None`. An error returned
    /// by the handler is logged and does not stop the loop. The task's join
    /// handle is dropped here, so callers cannot await or cancel the task
    /// through this method.
    ///
    /// # Panics
    /// `tokio::spawn` panics when called outside a Tokio runtime.
    pub fn spawn_message_processor<H>(
        &self,
        mut receiver: SubscriptionBroadcastRx<R>,
        mut handler: H,
    ) where
        H: SubscriptionNode<R> + Send + 'static,
    {
        tokio::spawn(async move {
            while let Some(broadcast) = receiver.recv().await {
                // Only the payload is forwarded; the rest of the broadcast
                // envelope is not passed to the handler.
                if let Err(err) = handler.subscription_message(broadcast.message).await {
                    log::error!("Error in message handler: {}", err);
                }
            }
        });
    }
}

/// `Default` builds the same empty manager as [`SubscriptionManagerV2::new`].
impl<R> Default for SubscriptionManagerV2<R>
where
    R: Clone + Send + Sync + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl<R> SubscriptionHandler<R> for SubscriptionManagerV2<R>
where
    R: Clone + Send + Sync + 'static,
{
    /// Registers `broadcast_tx` on `topic` with the accept-everything
    /// [`DefaultSubscriptionFilter`].
    ///
    /// Delegates to `subscribe_with_filter`, so id assignment, logging and
    /// the reply on `respond_to` are identical for both entry points.
    async fn subscribe(
        &mut self,
        topic: String,
        broadcast_tx: SubscriptionBroadcastTx<R>,
        respond_to: SubscriptionResponseTx,
    ) {
        self.subscribe_with_filter(
            topic,
            Box::new(DefaultSubscriptionFilter),
            broadcast_tx,
            respond_to,
        )
        .await;
    }

    /// Registers `broadcast_tx` on `topic`; only messages accepted by
    /// `filter` will be sent to it.
    ///
    /// The subscriber is assigned the next id (`id_count + 1`), which is
    /// reported back through `respond_to`. A failure to deliver that reply is
    /// ignored on purpose: the subscription stays registered either way.
    async fn subscribe_with_filter(
        &mut self,
        topic: String,
        filter: Box<dyn SubscriptionFilter<R> + Send + Sync>,
        broadcast_tx: SubscriptionBroadcastTx<R>,
        respond_to: SubscriptionResponseTx,
    ) {
        let id = self.id_count + 1;
        log::info!("Subcription Manager subscribing to topic: {} with id {}", topic, id);

        // Push through the entry's own reference so the subscriber count is
        // read without a second map lookup; the guard is released at the end
        // of this scope.
        let count = {
            let mut subscriptions = self.subscriptions.lock().await;
            let subscribers = subscriptions.entry(topic.clone()).or_default();
            subscribers.push(SubscriptionItem {
                id,
                broadcast_tx,
                filter,
            });
            subscribers.len()
        };
        log::info!("Subcription Manager topic: {} has {} subscribers", topic, count);

        // Commit the id only after the subscriber is stored.
        self.id_count = id;
        let _ = respond_to.send(SubscriptionResponse { id });
    }

    /// Removes the subscriber with the given `id` from every topic and drops
    /// topics that end up with no subscribers.
    ///
    /// `id` is echoed back on `respond_to` whether or not a matching
    /// subscriber existed; a failure to deliver that reply is ignored.
    async fn unsubscribe(&mut self, id: usize, respond_to: SubscriptionResponseTx) {
        {
            let mut subscriptions = self.subscriptions.lock().await;
            // Single pass: prune the subscriber, report the remaining count,
            // and keep the topic only while it still has subscribers.
            subscriptions.retain(|topic, subscribers| {
                subscribers.retain(|subscriber| subscriber.id != id);
                log::info!(
                    "Subcription Manager topic: {} has {} subscribers",
                    topic,
                    subscribers.len()
                );
                !subscribers.is_empty()
            });
        }

        let _ = respond_to.send(SubscriptionResponse { id });
    }

    /// Sends `message` to every subscriber of `topic` whose filter accepts it.
    ///
    /// Matching subscribers are snapshotted while the registry lock is held
    /// and the lock is released before any send is awaited, so a subscriber
    /// whose channel is slow to accept cannot keep the registry locked.
    /// This relies on `SubscriptionBroadcastTx` being cloneable.
    ///
    /// # Errors
    /// Always returns `Ok(())`. A failed send is logged and the remaining
    /// subscribers are still served.
    // NOTE(review): a subscriber whose send fails stays registered until it
    // is explicitly unsubscribed — confirm whether it should be pruned here.
    async fn broadcast(&self, topic: String, message: R) -> Result<(), anyhow::Error> {
        log::debug!("Subcription Manager checking topic: {}", topic);

        let recipients: Vec<(usize, SubscriptionBroadcastTx<R>)> = {
            let subscriptions = self.subscriptions.lock().await;

            // Diagnostic dump of all topics; skip the walk entirely unless
            // debug logging is actually enabled.
            if log::log_enabled!(log::Level::Debug) {
                for (k, v) in subscriptions.iter() {
                    log::debug!("Subcription Manager topic: {} has {} subscribers", k, v.len());
                    if k == topic.as_str() {
                        log::debug!("k {} == topic {}", k, topic);
                    } else {
                        log::debug!("k {} != topic {}", k, topic);
                    }
                }
            }

            match subscriptions.get(&topic) {
                Some(subscribers) => subscribers
                    .iter()
                    .filter(|subscriber| {
                        log::debug!("Subcription Checking filter on: {}", topic);
                        subscriber.filter.matches(&message)
                    })
                    .map(|subscriber| (subscriber.id, subscriber.broadcast_tx.clone()))
                    .collect(),
                None => {
                    log::debug!("Subcription Manager no subscribers for topic: {}", topic);
                    Vec::new()
                }
            }
        };

        // The registry lock is released; awaiting sends no longer blocks it.
        for (id, broadcast_tx) in recipients {
            log::debug!("Subcription Manager broadcasting topic: {}", topic);
            let outgoing = SubscriptionBroadcast {
                id,
                topic: topic.clone(),
                message: message.clone(),
            };
            if let Err(err) = broadcast_tx.send(outgoing).await {
                log::error!("SubscriptionManager broadcast failed: {}", err);
            }
        }

        Ok(())
    }
}