use std::any::TypeId;
use std::future::Future;
use async_trait::async_trait;
use tracing::*;
use crate::message::{SubscribeBroker, UnsubscribeBroker};
use crate::traits::{ActonMessage, Actor};
use crate::traits::subscriber::Subscriber;
/// Capability of an actor to register and withdraw interest in a message type.
///
/// Both methods are only callable on implementors that are also [`Actor`] and
/// [`Subscriber`]; the message type is selected through the generic parameter `M`
/// (e.g. `actor.subscribe::<MyMessage>()`).
#[async_trait]
pub trait Subscribable {
    /// Returns a future that subscribes `self` to messages of type `M`.
    ///
    /// The returned future is `Send + Sync` and may borrow `self` (`'_`), so it has to
    /// be driven to completion while `self` is still alive.
    fn subscribe<M>(&self) -> impl Future<Output = ()> + Send + Sync + '_
    where
        M: ActonMessage + Send + Sync + 'static,
        Self: Actor + Subscriber;

    /// Withdraws interest in messages of type `M`.
    ///
    /// Unlike [`Subscribable::subscribe`], this is a plain synchronous call that
    /// returns nothing to the caller.
    fn unsubscribe<M>(&self)
    where
        M: ActonMessage,
        Self: Actor + Subscriber + Send + Sync + 'static;
}
#[async_trait]
impl<T> Subscribable for T
where
T: ActonMessage + Send + Sync + 'static,
{
fn subscribe<M: ActonMessage + Send + Sync + 'static>(
&self,
) -> impl Future<Output=()> + Send + Sync + '_
where
Self: Actor + Subscriber + 'static,
{
let subscriber_id = self.id();
let message_type_id = TypeId::of::<M>();
let message_type_name = std::any::type_name::<M>().to_string();
let subscription = SubscribeBroker {
subscriber_id,
message_type_id,
subscriber_context: self.clone_ref(),
};
let broker = self.get_broker();
let ern = self.id().clone();
async move {
trace!( type_id=?message_type_id, subscriber_ern = ern.to_string(), "Subscribing to type_name {}", message_type_name);
if let Some(broadcast_broker) = broker {
let broker_key = broadcast_broker.name();
trace!(
"Subscribing to type_name {} with {}",
message_type_name,
broker_key
);
broadcast_broker.send(subscription).await;
} else {
error!( subscriber_ern = ern.to_string(), "No broker found for type_name {}", message_type_name);
}
}
}
fn unsubscribe<M: ActonMessage>(&self)
where
Self: Actor + Subscriber,
{
let subscription = UnsubscribeBroker {
};
let broker = self.get_broker();
if let Some(broker) = broker {
let broker = broker.clone();
tokio::spawn(async move {
broker.send(subscription).await;
});
}
trace!(
type_id = ?TypeId::of::<M>(),
repository_actor = self.id().to_string(),
"Unsubscribed to {}",
std::any::type_name::<M>()
);
}
}