use std::any::TypeId;
use std::collections::HashSet;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use acton_ern::Ern;
use dashmap::DashMap;
use futures::future::join_all;
use tracing::*;
use crate::actor::{AgentConfig, Idle, ManagedAgent};
use crate::common::{AgentHandle, AgentRuntime, BrokerRef};
use crate::message::{BrokerRequest, BrokerRequestEnvelope, SubscribeBroker};
use crate::traits::AgentHandleInterface;
#[derive(Default, Debug, Clone)]
pub struct AgentBroker {
subscribers: Subscribers,
agent_handle: AgentHandle,
}
type Subscribers = Arc<DashMap<TypeId, HashSet<(Ern, AgentHandle)>>>;
impl Deref for AgentBroker {
type Target = AgentHandle;
#[inline]
fn deref(&self) -> &Self::Target {
&self.agent_handle
}
}
impl DerefMut for AgentBroker {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.agent_handle
}
}
impl AgentBroker {
#[instrument]
pub(crate) async fn initialize(runtime: AgentRuntime) -> BrokerRef {
let actor_config = AgentConfig::new(Ern::with_root("broker_main").unwrap(), None, None)
.expect("Couldn't create initial broker config");
assert!(
!runtime.0.cancellation_token.is_cancelled(),
"ActonInner cancellation_token must be present and active before creating ManagedAgent in AgentBroker::initialize"
);
let mut broker_agent: ManagedAgent<Idle, AgentBroker> =
ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
broker_agent
.mutate_on::<BrokerRequest>(|agent, event| {
trace!(message_type = ?event.message.message_type_id, "Broker received BrokerRequest");
let subscribers = agent.model.subscribers.clone(); let message_to_broadcast = event.message.clone();
Box::pin(async move {
AgentBroker::broadcast(subscribers, message_to_broadcast).await;
})
})
.act_on::<SubscribeBroker>(|agent, event| {
let subscription_msg = event.message.clone();
let type_id = subscription_msg.message_type_id;
let subscriber_handle = subscription_msg.subscriber_context.clone();
let subscriber_id = subscription_msg.subscriber_id.clone();
trace!(subscriber = %subscriber_id, message_type = ?type_id, "Broker received SubscribeBroker");
let subscribers_map = agent.model.subscribers.clone(); Box::pin(async move {
let subscriber_id_for_insert = subscriber_id.clone(); subscribers_map
.entry(type_id)
.or_default() .insert((subscriber_id_for_insert, subscriber_handle)); trace!(subscriber = %subscriber_id, message_type = ?type_id, "Subscription added"); })
});
trace!("Starting the AgentBroker agent...");
let mut broker_handle = broker_agent.start().await;
broker_handle.broker = Box::from(Some(broker_handle.clone()));
trace!("AgentBroker started with handle ID: {}", broker_handle.id());
broker_handle
}
pub async fn broadcast(
subscribers: Subscribers, request: BrokerRequest,
) {
let message_type_id = request.message_type_id; trace!(message_type = ?message_type_id, "Broadcasting message");
if let Some(subscribers_set) = subscribers.get(&message_type_id) {
let num_subscribers = subscribers_set.len();
trace!(count = num_subscribers, message_type = ?message_type_id, "Found subscribers");
let send_futures = subscribers_set.value().iter().map(|(_, subscriber_handle)| {
let handle = subscriber_handle.clone();
let envelope_to_send: BrokerRequestEnvelope = request.clone().into();
async move {
trace!(subscriber = %handle.id(), message_type = ?message_type_id, "Sending broadcast");
let _ = handle.send(envelope_to_send).await;
}
});
join_all(send_futures).await;
trace!(count = num_subscribers, message_type = ?message_type_id, "Broadcast sends completed");
} else {
trace!(message_type = ?message_type_id, "No subscribers found for message type");
}
}
}