use std::any::type_name_of_val;
use std::fmt::Debug;
use futures::future::join_all;
use tracing::{instrument, trace};
use crate::actor::ManagedAgent;
use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
use crate::traits::AgentHandleInterface;
/// Type-state marker used as the state parameter of `ManagedAgent<Started, _>`.
///
/// Carries no data; it only selects the `impl` block that is available on a
/// `ManagedAgent` in this state.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Started;
impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
/// Builds an [`OutboundEnvelope`] whose return address is this agent's own
/// outbox and id.
///
/// Returns `None` when the agent has no cancellation token.
pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
    let token = self.cancellation_token.clone()?;
    let return_address = MessageAddress::new(self.handle.outbox.clone(), self.id.clone());
    Some(OutboundEnvelope::new(return_address, token))
}
/// Builds an [`OutboundEnvelope`] addressed to this agent's parent, with this
/// agent's own outbox and id as the return address.
///
/// Returns `None` when the agent has no cancellation token or has no parent.
pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
    // The token is checked first, so a missing token short-circuits before the
    // parent is inspected.
    let token = self.cancellation_token.clone()?;
    let parent_handle = self.parent.as_ref()?;

    let return_address = MessageAddress::new(self.handle.outbox.clone(), self.id.clone());
    let recipient = parent_handle.reply_address();
    Some(OutboundEnvelope::new_with_recipient(
        return_address,
        recipient,
        token,
    ))
}
/// Runs the agent's message loop until it is cancelled or its inbox yields no
/// more envelopes, then shuts the agent down.
///
/// Sequence:
/// 1. Awaits the `after_start` hook.
/// 2. Loops, selecting between the cancellation token and `inbox.recv()`:
///    * cancellation breaks the loop immediately;
///    * a received envelope is unwrapped if it carries a
///      [`BrokerRequestEnvelope`], then dispatched to the reactor registered
///      for the message's `TypeId` in `reactors`;
///    * if no reactor is registered and the message is
///      [`SystemSignal::Terminate`], the `before_stop` hook is awaited and the
///      inbox is closed; the loop keeps running until `recv()` returns `None`;
///    * any other message without a reactor is only traced.
/// 3. After the loop, awaits [`Self::terminate`] and then the `after_stop` hook.
///
/// Note that `before_stop` is invoked only on the `Terminate` path; a loop that
/// ends through cancellation or a closed inbox goes straight to step 3.
///
/// # Panics
///
/// Panics if `cancellation_token` is `None` (checked after `after_start` has
/// run).
#[instrument(skip(reactors, self))]
pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
    (self.after_start)(self).await;

    // Invariant of the `Started` state; a missing token is a programming error.
    let cancel_token = self
        .cancellation_token
        .clone()
        .expect("ManagedAgent in Started state must always have a cancellation_token");
    // Pinned once outside the loop so the same future is polled on every iteration.
    let mut cancel = Box::pin(cancel_token.cancelled());

    loop {
        tokio::select! {
            _ = &mut cancel => {
                trace!("Forceful cancellation triggered for agent: {}", self.id());
                break;
            }
            incoming_opt = self.inbox.recv() => {
                // `None` means the inbox has nothing further to deliver.
                let Some(incoming_envelope) = incoming_opt else { break; };

                trace!(
                    "Received envelope from: {}",
                    incoming_envelope.reply_to.sender.root
                );
                // NOTE(review): `type_name_of_val` reports the static type of the
                // `message` field; if that field is a trait-object pointer this
                // will not name the concrete message type — confirm.
                trace!(
                    "Message type: {}",
                    type_name_of_val(&incoming_envelope.message)
                );

                // Resolve the envelope to dispatch and the `TypeId` used for the
                // reactor lookup. Broker requests are unwrapped so that the
                // handler sees the inner message.
                let (type_id, mut envelope) = if let Some(broker_request_envelope) =
                    incoming_envelope
                        .message
                        .as_any()
                        .downcast_ref::<BrokerRequestEnvelope>()
                {
                    trace!("Processing message via BrokerRequestEnvelope");
                    let unwrapped = Envelope::new(
                        broker_request_envelope.message.clone(),
                        incoming_envelope.reply_to.clone(),
                        incoming_envelope.recipient.clone(),
                    );
                    (
                        broker_request_envelope.message.as_any().type_id(),
                        unwrapped,
                    )
                } else {
                    let direct_type_id = incoming_envelope.message.as_any().type_id();
                    (direct_type_id, incoming_envelope)
                };

                if let Some(reactor) = reactors.get(&type_id) {
                    match reactor.value() {
                        ReactorItem::FutureReactor(fut) => {
                            fut(self, &mut envelope).await;
                        }
                        ReactorItem::FutureReactorResult(fut) => {
                            let result = fut(self, &mut envelope).await;
                            if let Err((err, error_type_id)) = result {
                                let message_type_id = envelope.message.as_any().type_id();
                                // The handler is taken out of the map while it runs
                                // because it needs `&mut self`, and is put back
                                // afterwards.
                                if let Some(handler) = self
                                    .error_handler_map
                                    .remove(&(message_type_id, error_type_id))
                                {
                                    let fut = handler(self, &mut envelope, err.as_ref());
                                    fut.await;
                                    self.error_handler_map
                                        .insert((message_type_id, error_type_id), handler);
                                } else {
                                    tracing::error!(
                                        "Unhandled error from message handler in agent {}: {:?}",
                                        self.id(),
                                        err
                                    );
                                }
                            }
                        }
                    }
                } else if let Some(SystemSignal::Terminate) =
                    envelope.message.as_any().downcast_ref::<SystemSignal>()
                {
                    trace!("Terminate signal received for agent: {}. Closing inbox.", self.id());
                    (self.before_stop)(self).await;
                    // The loop is not broken here; it ends once `recv()` returns `None`.
                    self.inbox.close();
                } else {
                    trace!(
                        "No handler found for message type {:?} for agent {}",
                        type_id,
                        self.id()
                    );
                }
            }
        }
    }

    trace!(
        "Message loop finished for agent: {}. Initiating final termination.",
        self.id()
    );
    self.terminate().await;
    (self.after_stop)(self).await;
    trace!("Agent {} stopped.", self.id());
}
/// Asks every child of this agent to stop and waits for all of them to finish.
///
/// All children are stopped concurrently. Each child's `stop()` call is bounded
/// by a timeout taken from the `ACTON_AGENT_SHUTDOWN_TIMEOUT_MS` environment
/// variable (milliseconds); when the variable is unset or does not parse as a
/// `u64`, 10 000 ms is used. Stop errors and timeouts are logged and otherwise
/// ignored, so one failing child does not prevent the others from being awaited.
#[instrument(skip(self))]
async fn terminate(&mut self) {
    trace!("Terminating children for agent: {}", self.id());

    let timeout_ms = std::env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(10_000);
    let per_child_limit = std::time::Duration::from_millis(timeout_ms);

    // Build one future per child first; nothing runs until `join_all` polls them.
    let stop_futures: Vec<_> = self
        .handle
        .children()
        .iter()
        .map(|entry| {
            let child = entry.value().clone();
            async move {
                trace!("Sending stop signal to child: {}", child.id());
                let outcome = tokio::time::timeout(per_child_limit, child.stop()).await;
                match outcome {
                    // Outer `Err`: the timeout elapsed before `stop()` completed.
                    Err(_) => {
                        tracing::error!(
                            "Shutdown timeout for child {} after {} ms",
                            child.id(),
                            timeout_ms
                        );
                    }
                    // Inner `Err`: `stop()` completed in time but reported a failure.
                    Ok(Err(e)) => {
                        tracing::error!(
                            "Stop signal to child {} returned error: {:?}",
                            child.id(),
                            e
                        );
                    }
                    Ok(Ok(())) => {
                        trace!(
                            "Stop signal sent to and child {} shut down successfully.",
                            child.id()
                        );
                    }
                }
            }
        })
        .collect();

    join_all(stop_futures).await;
    trace!("All children stopped for agent: {}.", self.id());
}
}