use std::any::TypeId;
use std::collections::HashSet;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use acton_ern::{Ern};
use dashmap::DashMap;
use futures::future::join_all;
use tracing::*;
use crate::actor::{AgentConfig, Idle, ManagedAgent};
use crate::common::{AgentHandle, BrokerRef};
use crate::message::{BrokerRequest, BrokerRequestEnvelope, SubscribeBroker};
use crate::traits::Actor;
/// Broker agent state: a subscription table plus the handle of the agent that owns it.
#[derive(Clone, Debug, Default)]
pub struct AgentBroker {
    /// Subscription table keyed by message `TypeId`; each entry is the set of
    /// `(Ern, AgentHandle)` pairs registered for that message type.
    subscribers: Subscribers,
    /// Handle that `AgentBroker` dereferences to (see the `Deref`/`DerefMut` impls).
    agent_handle: AgentHandle,
}
/// Reference-counted concurrent map from a message's `TypeId` to the set of
/// `(Ern, AgentHandle)` pairs subscribed to that message type.
type Subscribers = Arc<DashMap<TypeId, HashSet<(Ern, AgentHandle)>>>;

/// Exposes the broker's inner `AgentHandle` through shared references.
impl Deref for AgentBroker {
    type Target = AgentHandle;

    /// Returns a shared reference to the wrapped `agent_handle`.
    fn deref(&self) -> &Self::Target {
        let Self { agent_handle, .. } = self;
        agent_handle
    }
}
/// Exposes the broker's inner `AgentHandle` through mutable references.
impl DerefMut for AgentBroker {
    /// Returns a mutable reference to the wrapped `agent_handle`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { agent_handle, .. } = self;
        agent_handle
    }
}
impl AgentBroker {
    /// Creates the broker agent, registers its message handlers, starts it, and
    /// returns the resulting handle.
    ///
    /// Two handlers are registered before the agent is started:
    /// - `BrokerRequest`: passes a clone of the request and of the subscription
    ///   table to [`AgentBroker::broadcast`].
    /// - `SubscribeBroker`: inserts `(subscriber_id, subscriber_context)` into the
    ///   set stored under the message's `message_type_id`, creating the set if it
    ///   does not exist yet.
    ///
    /// After starting, the handle's `broker` field is set to a clone of the handle
    /// itself.
    ///
    /// # Panics
    /// Panics if the `"broker_main"` ERN or the initial agent configuration cannot
    /// be created.
    #[instrument]
    pub(crate) async fn initialize() -> BrokerRef {
        let broker_ern =
            Ern::with_root("broker_main").expect("\"broker_main\" should be a valid ERN root");
        let actor_config = AgentConfig::new(broker_ern, None, None)
            .expect("Couldn't create initial broker config");
        let mut broker: ManagedAgent<Idle, AgentBroker> =
            ManagedAgent::new(&None, Some(actor_config)).await;
        broker
            .act_on::<BrokerRequest>(|actor, event| {
                trace!( "broadcasting request: {:?}", event.message);
                let subscribers = actor.model.subscribers.clone();
                let message = event.message.clone();
                Box::pin(async move {
                    AgentBroker::broadcast(subscribers, message).await;
                })
            })
            .act_on::<SubscribeBroker>(|actor, event| {
                // Clone only the fields that are moved into the future; the
                // returned future cannot borrow from `event`.
                let message_type_id = event.message.message_type_id;
                let subscriber_context = event.message.subscriber_context.clone();
                let subscriber_id = event.message.subscriber_id.clone();
                trace!("subscribe from {} for {}", subscriber_id.root.to_string(), actor.handle.name());
                let subscribers = actor.model.subscribers.clone();
                Box::pin(async move {
                    subscribers
                        .entry(message_type_id)
                        .or_default()
                        .insert((subscriber_id, subscriber_context));
                })
            });
        trace!("Activating the BrokerActor.");
        let mut handle = broker.start().await;
        // The broker's own handle also serves as its broker reference.
        handle.broker = Box::from(Some(handle.clone()));
        handle
    }

    /// Sends `request` to every subscriber registered for the request's message type.
    ///
    /// The subscriber set for the message's `TypeId` is copied out of the map
    /// first, then each subscriber is sent its own `BrokerRequestEnvelope` built
    /// from a clone of `request`. All sends are awaited together via `join_all`.
    /// If nothing is registered for the message type, no message is sent.
    ///
    /// Subscriptions added or removed while the sends are in flight do not affect
    /// the current broadcast.
    pub async fn broadcast(
        subscribers: Subscribers,
        request: BrokerRequest,
    ) {
        let message_type_id = &request.message.as_ref().type_id();

        // Copy the subscriber set and release the map guard immediately. The
        // guard returned by `get` keeps part of the map locked; holding it
        // across the `.await` below would block concurrent writers (such as the
        // subscribe handler's `entry` call) for the whole duration of the sends.
        let snapshot = subscribers
            .get(message_type_id)
            .map(|entry| entry.value().clone());

        trace!(" Subscriber count for message type: {:?} is {:?}", message_type_id, snapshot.as_ref().map(|set| set.len()));

        if let Some(recipients) = snapshot {
            let deliveries = recipients.into_iter().map(|(_, subscriber_context)| {
                let message: BrokerRequestEnvelope = request.clone().into();
                async move {
                    trace!("Broadcasting message to subscriber: {:?}", subscriber_context.name());
                    subscriber_context.send(message).await;
                }
            });
            join_all(deliveries).await;
        }
    }
}