use std::any::type_name_of_val;
use std::fmt::Debug;
use std::time::Duration;
use futures::future::join_all;
use tokio::time::sleep;
use tracing::{instrument, trace};
use crate::actor::ManagedAgent;
use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
use crate::traits::AgentHandleInterface;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Started;
impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
Some(OutboundEnvelope::new(MessageAddress::new(
self.handle.outbox.clone(),
self.id.clone(),
)))
}
pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
self.parent.as_ref().map(|parent_handle| {
OutboundEnvelope::new_with_recipient(
MessageAddress::new(self.handle.outbox.clone(), self.id.clone()), parent_handle.reply_address(), )
})
}
#[instrument(skip(reactors, self))]
pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
(self.after_start)(self).await;
let mut terminate_requested = false;
while let Some(incoming_envelope) = self.inbox.recv().await {
let type_id;
let mut envelope;
trace!(
"Received envelope from: {}",
incoming_envelope.reply_to.sender.root
);
trace!(
"Message type: {}",
type_name_of_val(&incoming_envelope.message)
);
if let Some(broker_request_envelope) = incoming_envelope
.message
.as_any()
.downcast_ref::<BrokerRequestEnvelope>()
{
trace!("Processing message via BrokerRequestEnvelope");
envelope = Envelope::new(
broker_request_envelope.message.clone(), incoming_envelope.reply_to.clone(),
incoming_envelope.recipient.clone(),
);
type_id = broker_request_envelope.message.as_any().type_id(); } else {
envelope = incoming_envelope;
type_id = envelope.message.as_any().type_id();
}
if let Some(reactor) = reactors.get(&type_id) {
match reactor.value() {
ReactorItem::FutureReactor(fut) => {
fut(self, &mut envelope).await;
}
ReactorItem::FutureReactorResult(fut) => {
let result = fut(self, &mut envelope).await;
if let Err(err) = result {
let mut handled = false;
let handler_arcs: Vec<_> =
self.error_handler_map.values().cloned().collect();
for handler_arc in handler_arcs {
let fut = handler_arc(self, &mut envelope, err.as_ref());
fut.await;
handled = true; }
if !handled {
tracing::error!(
"Unhandled error from message handler in agent {}: {:?}",
self.id(),
err
);
}
}
}
}
} else if let Some(SystemSignal::Terminate) =
envelope.message.as_any().downcast_ref::<SystemSignal>()
{
trace!("Terminate signal received for agent: {}", self.id());
terminate_requested = true;
(self.before_stop)(self).await; sleep(Duration::from_millis(10)).await;
self.inbox.close(); trace!("Inbox closed for agent: {}", self.id());
} else {
trace!(
"No handler found for message type {:?} for agent {}",
type_id,
self.id()
);
}
if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
trace!("Inbox empty and closed after terminate request, initiating termination for agent: {}", self.id());
self.terminate().await; break; }
}
trace!("Message loop finished for agent: {}", self.id());
(self.after_stop)(self).await; trace!("Agent {} stopped.", self.id());
}
#[instrument(skip(self))]
async fn terminate(&mut self) {
trace!("Terminating children for agent: {}", self.id());
use std::env;
use std::time::Duration;
use tokio::time::timeout as tokio_timeout;
let timeout_ms: u64 = env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(10_000);
let stop_futures: Vec<_> = self
.handle
.children()
.iter()
.map(|item| {
let child_handle = item.value().clone();
async move {
trace!("Sending stop signal to child: {}", child_handle.id());
let stop_res =
tokio_timeout(Duration::from_millis(timeout_ms), child_handle.stop()).await;
match stop_res {
Ok(Ok(())) => {
trace!(
"Stop signal sent to and child {} shut down successfully.",
child_handle.id()
);
}
Ok(Err(e)) => {
tracing::error!(
"Stop signal to child {} returned error: {:?}",
child_handle.id(),
e
);
}
Err(_) => {
tracing::error!(
"Shutdown timeout for child {} after {} ms",
child_handle.id(),
timeout_ms
);
}
}
}
})
.collect();
join_all(stop_futures).await;
trace!(
"All children stopped for agent: {}. Closing own inbox.",
self.id()
);
self.inbox.close();
}
}