use std::any::TypeId;
use std::future::Future;
use async_trait::async_trait;
use tracing::*;
use crate::message::{SubscribeBroker, UnsubscribeBroker};
use crate::traits::{ActonMessage, AgentHandleInterface};
use crate::traits::subscriber::Subscriber;
#[async_trait]
pub trait Subscribable: Send + Sync + 'static { fn subscribe<M: ActonMessage + Send + Sync + 'static>(
&self,
) -> impl Future<Output = ()> + Send + Sync + '_
where
Self: AgentHandleInterface + Subscriber;
fn unsubscribe<M: ActonMessage>(&self)
where
Self: AgentHandleInterface + Subscriber;
}
#[async_trait]
impl<T> Subscribable for T
where
T: AgentHandleInterface + Subscriber + Send + Sync + 'static,
{
#[instrument(skip(self), fields(message_type = std::any::type_name::<M>(), subscriber = %self.id()))]
fn subscribe<M: ActonMessage + Send + Sync + 'static>(
&self,
) -> impl Future<Output = ()> + Send + Sync + '_
{
let subscriber_id = self.id();
let message_type_id = TypeId::of::<M>();
let message_type_name = std::any::type_name::<M>().to_string();
let subscription = SubscribeBroker {
subscriber_id: subscriber_id.clone(), message_type_id,
subscriber_context: self.clone_ref(), };
let broker_option = self.get_broker();
async move {
trace!( type_id=?message_type_id, subscriber = %subscriber_id, "Sending subscription request");
if let Some(broker_handle) = broker_option {
trace!(broker = %broker_handle.id(), "Sending SubscribeBroker message");
broker_handle.send(subscription).await;
} else {
error!(subscriber = %subscriber_id, message_type = %message_type_name, "Cannot subscribe: No broker found.");
}
}
}
#[instrument(skip(self), fields(message_type = std::any::type_name::<M>(), subscriber = %self.id()))]
fn unsubscribe<M: ActonMessage>(&self)
{
let type_id = TypeId::of::<M>();
let type_name = std::any::type_name::<M>();
let subscriber_id = self.id(); let broker_option = self.get_broker();
trace!(type_id = ?type_id, subscriber = %subscriber_id, "Initiating unsubscribe request for type {}", type_name);
if let Some(broker_handle) = broker_option {
let unsubscription = UnsubscribeBroker {
};
tokio::spawn(async move {
trace!(broker = %broker_handle.id(), type_id = ?type_id, "Sending UnsubscribeBroker message");
broker_handle.send(unsubscription).await;
});
} else {
error!(subscriber = %subscriber_id, message_type = %type_name, "Cannot unsubscribe: No broker found.");
}
}
}