use std::fmt::Debug;
use std::future::Future;
use std::hash::{Hash, Hasher};
use acton_ern::{Ern};
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio_util::task::TaskTracker;
use tracing::{error, instrument, trace, warn};
use crate::actor::{Idle, ManagedAgent};
use crate::common::{BrokerRef, OutboundEnvelope, Outbox, ParentRef};
use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
use crate::prelude::ActonMessage;
use crate::traits::{Actor, Broker, Subscriber};
/// Handle through which an agent is addressed, supervised and stopped.
///
/// Equality and hashing of a handle consider only `id`; every other field is
/// ignored by the `PartialEq` and `Hash` impls in this file.
#[derive(Debug, Clone)]
pub struct AgentHandle {
/// Identifier of the agent; the sole input to `PartialEq` and `Hash`.
pub(crate) id: Ern,
/// Sending half of the agent's channel; cloned into every reply address.
pub(crate) outbox: Outbox,
/// Task tracker that `stop` waits on before returning.
tracker: TaskTracker,
/// Optional reference to a parent; `None` for a default handle.
pub parent: Option<Box<ParentRef>>,
/// Optional broker used by `broadcast` and returned by `get_broker`.
pub broker: Box<Option<BrokerRef>>,
/// Supervised children, keyed by the string form of each child's id.
// NOTE(review): the derived `Clone` clones this map, and `DashMap::clone`
// presumably copies the entries rather than sharing them, so a child added
// through one clone of a handle would not be visible through another.
// Confirm this is intended.
children: DashMap<String, AgentHandle>,
}
impl Default for AgentHandle {
    /// Builds a placeholder handle with a default id, no parent, no broker
    /// and no children.
    ///
    /// The outbox is the sending half of a fresh capacity-1 channel whose
    /// receiving half is discarded right away.
    fn default() -> Self {
        // Keep only the sender; the receiver is never bound and is dropped
        // at the end of this statement.
        let (outbox, _) = mpsc::channel(1);
        let id = Ern::default();
        let tracker = TaskTracker::new();
        let children = DashMap::new();

        Self {
            id,
            outbox,
            tracker,
            parent: None,
            broker: Box::new(None),
            children,
        }
    }
}
impl Subscriber for AgentHandle {
    /// Returns a clone of the broker reference held by this handle, or
    /// `None` when no broker is set.
    fn get_broker(&self) -> Option<BrokerRef> {
        // Clone the inner `Option` directly. Cloning the `Box` first would
        // heap-allocate a second box only to move its contents out again.
        (*self.broker).clone()
    }
}
impl PartialEq for AgentHandle {
    /// Two handles are equal exactly when their ids are equal; no other
    /// field takes part in the comparison.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.id, &other.id);
        lhs == rhs
    }
}
// Marker impl: handle equality is the id comparison defined by `PartialEq`.
impl Eq for AgentHandle {}
impl Hash for AgentHandle {
    /// Feeds only the id into the hasher, matching the id-only equality of
    /// `PartialEq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.id, state);
    }
}
impl AgentHandle {
#[instrument(skip(self))]
pub async fn supervise<State: Default + Send + Debug>(
&self,
child: ManagedAgent<Idle, State>,
) -> anyhow::Result<AgentHandle> {
trace!("Adding child actor with id: {}", child.id);
let handle = child.start().await;
let id = handle.id.clone();
trace!("Now have child id in context: {}", id);
self.children.insert(id.to_string(), handle.clone());
Ok(handle)
}
}
impl Broker for AgentHandle {
    /// Wraps `message` in a `BrokerRequest` and sends it to this handle's
    /// broker.
    ///
    /// When the handle has no broker, an error is logged and the message is
    /// dropped. The initial trace line is emitted when this method is
    /// called, not when the returned future is first polled.
    #[instrument(skip(self), name = "broadcast")]
    fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output = ()> + Send + Sync + '_ {
        trace!("Looking for a broker to broadcast message.");
        async move {
            // Guard clause: without a broker there is nowhere to send to.
            let Some(broker) = self.broker.as_ref() else {
                error!("No broker found to broadcast message.");
                return;
            };
            broker.send(BrokerRequest::new(message)).await;
        }
    }
}
#[async_trait]
impl Actor for AgentHandle {
    /// Builds the address other agents use to reply to this one, from clones
    /// of this handle's outbox and id.
    fn reply_address(&self) -> MessageAddress {
        MessageAddress::new(self.outbox.clone(), self.id.clone())
    }

    /// Creates an outbound envelope whose return address is this agent.
    ///
    /// With `Some(recipient)` the envelope is addressed to that recipient;
    /// with `None` it carries only the return address.
    #[instrument(skip(self))]
    fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
        trace!( "self id is {}", self.id);
        let return_address = self.reply_address();
        trace!( "return_address is {}", return_address.sender.root);
        match recipient_address {
            Some(recipient) => OutboundEnvelope::new_with_recipient(return_address, recipient),
            None => OutboundEnvelope::new(return_address),
        }
    }

    /// Returns a clone of the children map (keys are child ids as strings).
    fn children(&self) -> DashMap<String, AgentHandle> {
        self.children.clone()
    }

    /// Looks up a direct child by id, returning a clone of its handle, or
    /// `None` when no child is registered under that id.
    #[instrument(skip(self))]
    fn find_child(&self, arn: &Ern) -> Option<AgentHandle> {
        trace!("Searching for child with ARN: {}", arn);
        // Children are keyed by the string form of their id.
        self.children
            .get(&arn.to_string())
            .map(|entry| entry.value().clone())
    }

    /// Returns a clone of this handle's task tracker.
    fn tracker(&self) -> TaskTracker {
        self.tracker.clone()
    }

    /// Returns a clone of this agent's id.
    fn id(&self) -> Ern {
        self.id.clone()
    }

    /// Returns the root component of this agent's id as a string.
    fn name(&self) -> String {
        self.id.root.to_string()
    }

    /// Returns a clone of this handle.
    fn clone_ref(&self) -> AgentHandle {
        self.clone()
    }

    /// Sends `SystemSignal::Terminate` to this agent and waits for its task
    /// tracker to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if sending the terminate signal fails; in that case
    /// the tracker is not awaited.
    // The signature spells out `impl Future + Send + Sync + '_`, which an
    // `async fn` cannot express, so the manual form is intentional.
    #[allow(clippy::manual_async_fn)]
    #[instrument(skip(self))]
    fn stop(&self) -> impl Future<Output = anyhow::Result<()>> + Send + Sync + '_ {
        async move {
            let tracker = self.tracker();
            // An envelope with no recipient: replying on it delivers the
            // signal to this agent's own return address. The envelope is
            // freshly created and owned here, so no clone is needed.
            let envelope = self.create_envelope(None);
            trace!(actor = self.id.to_string(), "Sending Terminate to");
            envelope.reply(SystemSignal::Terminate)?;
            trace!("Waiting for all actor tasks to complete.");
            // NOTE(review): `TaskTracker::wait` presumably completes only
            // once the tracker is both closed and empty; nothing here closes
            // it, so the agent is assumed to do so while handling
            // `Terminate`. Confirm.
            tracker.wait().await;
            trace!(
                actor = self.id.to_string(),
                "The actor and its subordinates have been terminated."
            );
            Ok(())
        }
    }
}