use std::fmt::Debug;
use std::future::Future;
use std::hash::{Hash, Hasher};
use acton_ern::Ern;
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio_util::task::TaskTracker;
use tracing::{error, instrument, trace, warn};
use crate::actor::{Idle, ManagedAgent};
use crate::common::{AgentSender, BrokerRef, OutboundEnvelope, ParentRef};
use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
use crate::prelude::ActonMessage;
use crate::traits::{AgentHandleInterface, Broker, Subscriber};
/// A cloneable handle to an agent.
///
/// The handle carries the agent's identifier, the sender used to build
/// message addresses for it, and bookkeeping for its parent, broker and
/// supervised children. Equality and hashing (implemented elsewhere in this
/// file) consider only `id`.
#[derive(Debug, Clone)]
pub struct AgentHandle {
    /// Identifier of the agent this handle refers to.
    pub(crate) id: Ern,
    /// Sender half paired with `id` when building this handle's `MessageAddress`.
    pub(crate) outbox: AgentSender,
    /// Task tracker that `stop` waits on after sending the terminate signal.
    tracker: TaskTracker,
    /// Reference to the parent, if any.
    pub parent: Option<Box<ParentRef>>,
    /// Reference to the broker used for broadcasts, if one is configured.
    pub broker: Box<Option<BrokerRef>>,
    /// Handles of supervised children, keyed by the string form of each child's id.
    children: DashMap<String, AgentHandle>,
    /// Cancellation token cloned into every envelope created from this handle.
    pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
}
impl Default for AgentHandle {
    /// Builds a placeholder handle: default id, no parent, no broker, no
    /// children, a fresh task tracker and a fresh cancellation token.
    fn default() -> Self {
        // Only the sending half is kept; the receiving half is discarded
        // immediately, so nothing can ever be received from this channel.
        let (sender, _) = mpsc::channel(1);

        Self {
            id: Ern::default(),
            outbox: sender,
            tracker: TaskTracker::new(),
            parent: None,
            broker: Box::new(None),
            children: DashMap::new(),
            cancellation_token: tokio_util::sync::CancellationToken::new(),
        }
    }
}
impl Subscriber for AgentHandle {
    /// Returns a clone of the broker reference stored on this handle, or
    /// `None` when no broker is configured.
    fn get_broker(&self) -> Option<BrokerRef> {
        // Clone the `Option` that lives inside the box.
        (*self.broker).clone()
    }
}
impl PartialEq for AgentHandle {
    /// Two handles are equal when their ids are equal; no other field is
    /// taken into account.
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.id, &other.id);
        mine == theirs
    }
}
// Marker impl: declares the id-based `PartialEq` comparison of `AgentHandle`
// to be a full equivalence relation.
impl Eq for AgentHandle {}
impl Hash for AgentHandle {
    /// Feeds only the handle's id into the hasher, matching the id-only
    /// equality used for `AgentHandle`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.id, state);
    }
}
impl AgentHandle {
    /// Starts `child` and records its handle in this handle's children map.
    ///
    /// The child is started first; the resulting handle is then inserted into
    /// the map under the string form of its id, and finally returned.
    ///
    /// # Errors
    ///
    /// The visible body always returns `Ok`; the `Result` wrapper is part of
    /// the signature.
    #[instrument(skip(self, child))]
    pub async fn supervise<State>(
        &self,
        child: ManagedAgent<Idle, State>,
    ) -> anyhow::Result<AgentHandle>
    where
        State: Default + Send + Debug + 'static,
    {
        // Capture the id up front: `start` consumes the agent.
        let child_ern = child.id().clone();
        trace!("Supervising child agent with id: {}", child_ern);

        let started = child.start().await;
        trace!(
            "Child agent {} started, adding to parent {} children map",
            child_ern,
            self.id
        );

        let key = started.id.to_string();
        self.children.insert(key, started.clone());

        Ok(started)
    }
}
impl Broker for AgentHandle {
    /// Wraps `message` in a `BrokerRequest` and sends it to the configured
    /// broker.
    ///
    /// The "attempting" trace is emitted as soon as this method is called;
    /// everything else happens when the returned future is polled. If no
    /// broker is configured, an error is logged and the message is not sent.
    fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output = ()> + Send + Sync + '_ {
        trace!("Attempting broadcast via handle: {}", self.id);

        async move {
            let Some(broker_handle) = self.broker.as_ref() else {
                error!(
                    "No broker configured for agent handle {}, cannot broadcast.",
                    self.id
                );
                return;
            };

            trace!("Broker found for handle {}, sending BrokerRequest", self.id);
            broker_handle.send(BrokerRequest::new(message)).await;
        }
    }
}
#[async_trait]
impl AgentHandleInterface for AgentHandle {
    /// Returns a `MessageAddress` built from this handle's outbox and id.
    #[inline]
    fn reply_address(&self) -> MessageAddress {
        MessageAddress::new(self.outbox.clone(), self.id.clone())
    }

    /// Creates an `OutboundEnvelope` whose return address is this handle.
    ///
    /// When `recipient_address` is `Some`, the envelope is built with that
    /// recipient; otherwise it is built from the return address alone. In
    /// both cases the envelope receives a clone of this handle's
    /// cancellation token.
    #[instrument(skip(self))]
    fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
        let reply_to = self.reply_address();
        trace!(
            sender = %reply_to.sender.root,
            recipient = ?recipient_address.as_ref().map(|r| r.sender.root.as_str()),
            "Creating envelope"
        );

        let token = self.cancellation_token.clone();
        match recipient_address {
            Some(recipient) => OutboundEnvelope::new_with_recipient(reply_to, recipient, token),
            None => OutboundEnvelope::new(reply_to, token),
        }
    }

    /// Returns a clone of the children map.
    #[inline]
    fn children(&self) -> DashMap<String, AgentHandle> {
        self.children.clone()
    }

    /// Looks up a child by the string form of `ern` and returns a clone of
    /// its handle, or `None` when no entry exists under that key.
    #[instrument(skip(self))]
    fn find_child(&self, ern: &Ern) -> Option<AgentHandle> {
        trace!("Searching for child with ERN: {}", ern);
        let key = ern.to_string();
        self.children
            .get(&key)
            .map(|child| child.value().clone())
    }

    /// Returns a clone of this handle's task tracker.
    #[inline]
    fn tracker(&self) -> TaskTracker {
        self.tracker.clone()
    }

    /// Returns a clone of this handle's id.
    #[inline]
    fn id(&self) -> Ern {
        self.id.clone()
    }

    /// Returns the `root` component of the id as a `String`.
    #[inline]
    fn name(&self) -> String {
        self.id.root.to_string()
    }

    /// Returns a clone of this handle.
    #[inline]
    fn clone_ref(&self) -> AgentHandle {
        self.clone()
    }

    /// Sends `SystemSignal::Terminate` to this agent's own address and then
    /// waits on the task tracker.
    ///
    /// # Errors
    ///
    /// The visible body always resolves to `Ok(())`.
    #[allow(clippy::manual_async_fn)]
    #[instrument(skip(self))]
    fn stop(&self) -> impl Future<Output = anyhow::Result<()>> + Send + Sync + '_ {
        async move {
            let task_tracker = self.tracker();

            // Address the envelope to ourselves so the signal lands in this
            // agent's own inbox.
            let envelope_to_self = self.create_envelope(Some(self.reply_address()));
            trace!(actor = %self.id, "Sending Terminate signal");
            envelope_to_self.send(SystemSignal::Terminate).await;

            // NOTE(review): `TaskTracker::wait` only completes once the
            // tracker has been closed and is empty — presumably the agent
            // closes it during shutdown; confirm.
            task_tracker.wait().await;
            trace!(actor = %self.id, "Agent terminated successfully.");
            Ok(())
        }
    }
}