use std::any::type_name_of_val;
use std::fmt::Debug;
use std::time::{Duration, Instant};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use tracing::{instrument, trace};
use crate::actor::ManagedAgent;
use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
use crate::common::config::CONFIG;
use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
use crate::traits::AgentHandleInterface;
/// Type-state marker used as the state parameter of [`ManagedAgent`] for an
/// agent that has been started and runs its message loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Started;
impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
/// Builds an outbound envelope whose return address is this agent.
///
/// Returns `None` when the agent has no cancellation token.
pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
    let token = self.cancellation_token.clone()?;
    // The return address is made from this agent's outbox and its id.
    let return_address = MessageAddress::new(self.handle.outbox.clone(), self.id.clone());
    Some(OutboundEnvelope::new(return_address, token))
}
/// Builds an outbound envelope addressed to this agent's parent, with this
/// agent as the return address.
///
/// Returns `None` when the agent has no cancellation token or no parent.
pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
    // The token is checked first, so a missing token short-circuits before
    // the parent is inspected.
    let token = self.cancellation_token.clone()?;
    let parent_handle = self.parent.as_ref()?;
    let return_address = MessageAddress::new(self.handle.outbox.clone(), self.id.clone());
    Some(OutboundEnvelope::new_with_recipient(
        return_address,
        parent_handle.reply_address(),
        token,
    ))
}
#[instrument(skip(mutable_reactors, read_only_reactors, self))]
pub(crate) async fn wake(
&mut self,
mutable_reactors: ReactorMap<Agent>,
read_only_reactors: ReactorMap<Agent>,
) {
(self.after_start)(self).await;
assert!(
self.cancellation_token.is_some(),
"ManagedAgent in Started state must always have a cancellation_token"
);
let cancel_token = self.cancellation_token.as_ref().cloned().unwrap();
let mut cancel = Box::pin(cancel_token.cancelled());
let mut _terminate_signal_received = false;
use crate::common::config::CONFIG;
let mut read_only_futures = FuturesUnordered::new();
let high_water_mark = CONFIG.limits.concurrent_handlers_high_water_mark;
let max_wait_duration = Duration::from_millis(CONFIG.timeouts.read_only_handler_flush_ms);
let mut last_flush_time = Instant::now();
loop {
tokio::select! {
_ = &mut cancel => {
trace!("Forceful cancellation triggered for agent: {}", self.id());
if !read_only_futures.is_empty() {
trace!("Flushing {} remaining read-only futures on cancellation", read_only_futures.len());
while let Some(_) = read_only_futures.next().await {}
}
break; }
_ = tokio::time::sleep_until((last_flush_time + max_wait_duration).into()), if !read_only_futures.is_empty() => {
trace!("Flushing {} read-only futures due to time limit", read_only_futures.len());
while let Some(_) = read_only_futures.next().await {}
last_flush_time = Instant::now();
}
incoming_opt = self.inbox.recv() => {
let Some(incoming_envelope) = incoming_opt else { break; };
let type_id;
let mut envelope;
trace!(
"Received envelope from: {}",
incoming_envelope.reply_to.sender.root
);
trace!(
"Message type: {}",
type_name_of_val(&incoming_envelope.message)
);
if let Some(broker_request_envelope) = incoming_envelope
.message
.as_any()
.downcast_ref::<BrokerRequestEnvelope>()
{
trace!("Processing message via BrokerRequestEnvelope");
envelope = Envelope::new(
broker_request_envelope.message.clone(), incoming_envelope.reply_to.clone(),
incoming_envelope.recipient.clone(),
);
type_id = broker_request_envelope.message.as_any().type_id(); } else {
envelope = incoming_envelope;
type_id = envelope.message.as_any().type_id();
}
if let Some(reactor) = mutable_reactors.get(&type_id) {
if !read_only_futures.is_empty() {
trace!("Flushing {} read-only futures before mutable handler", read_only_futures.len());
while let Some(_) = read_only_futures.next().await {}
last_flush_time = Instant::now();
}
match reactor.value() {
ReactorItem::FutureReactor(fut) => {
fut(self, &mut envelope).await;
}
ReactorItem::FutureReactorResult(fut) => {
let result = fut(self, &mut envelope).await;
if let Err((err, error_type_id)) = result {
let message_type_id = envelope.message.as_any().type_id();
if let Some(handler) =
self.error_handler_map.remove(&(message_type_id, error_type_id))
{
let fut = handler(self, &mut envelope, err.as_ref());
fut.await;
self.error_handler_map.insert((message_type_id, error_type_id), handler);
} else {
tracing::error!(
"Unhandled error from message handler in agent {}: {:?}",
self.id(),
err
);
}
}
}
ReactorItem::FutureReactorReadOnly(_) => {
tracing::warn!("Found read-only handler in mutable_reactors map");
}
ReactorItem::FutureReactorReadOnlyResult(_) => {
tracing::warn!("Found read-only Result handler in mutable_reactors map");
}
}
} else if let Some(reactor) = read_only_reactors.get(&type_id) {
match reactor.value() {
ReactorItem::FutureReactorReadOnly(fut) => {
let fut = fut(self, &mut envelope);
read_only_futures.push(tokio::spawn(async move {
fut.await;
}));
if read_only_futures.len() >= high_water_mark {
trace!("Flushing {} read-only futures due to high water mark", read_only_futures.len());
while let Some(_) = read_only_futures.next().await {}
last_flush_time = Instant::now();
}
}
ReactorItem::FutureReactorReadOnlyResult(fut) => {
let fut = fut(self, &mut envelope);
read_only_futures.push(tokio::spawn(async move {
if let Err((err, _error_type_id)) = fut.await {
tracing::error!(
"Unhandled error from read-only message handler: {:?}",
err
);
}
}));
if read_only_futures.len() >= high_water_mark {
trace!("Flushing {} read-only Result futures due to high water mark", read_only_futures.len());
while let Some(_) = read_only_futures.next().await {}
last_flush_time = Instant::now();
}
}
_ => {
tracing::warn!("Found mutable handler in read_only_reactors map");
}
}
} else if let Some(SystemSignal::Terminate) =
envelope.message.as_any().downcast_ref::<SystemSignal>()
{
if !read_only_futures.is_empty() {
trace!("Flushing {} read-only futures before terminate", read_only_futures.len());
while let Some(_) = read_only_futures.next().await {}
}
trace!("Terminate signal received for agent: {}. Closing inbox.", self.id());
_terminate_signal_received = true; (self.before_stop)(self).await; self.inbox.close(); } else {
trace!(
"No handler found for message type {:?} for agent {}",
type_id,
self.id()
);
}
}
}
}
trace!(
"Message loop finished for agent: {}. Initiating final termination.",
self.id()
);
if !read_only_futures.is_empty() {
trace!(
"Flushing {} remaining read-only futures before final termination",
read_only_futures.len()
);
while let Some(_) = read_only_futures.next().await {}
}
self.terminate().await; (self.after_stop)(self).await;
trace!("Agent {} stopped.", self.id());
}
#[instrument(skip(self))]
async fn terminate(&mut self) {
trace!("Terminating children for agent: {}", self.id());
use std::time::Duration;
use tokio::time::timeout as tokio_timeout;
let timeout_ms = CONFIG.timeouts.agent_shutdown_timeout_ms as u64;
let stop_futures: Vec<_> = self
.handle
.children()
.iter()
.map(|item| {
let child_handle = item.value().clone();
async move {
trace!("Sending stop signal to child: {}", child_handle.id());
let stop_res =
tokio_timeout(Duration::from_millis(timeout_ms), child_handle.stop()).await;
match stop_res {
Ok(Ok(())) => {
trace!(
"Stop signal sent to and child {} shut down successfully.",
child_handle.id()
);
}
Ok(Err(e)) => {
tracing::error!(
"Stop signal to child {} returned error: {:?}",
child_handle.id(),
e
);
}
Err(_) => {
tracing::error!(
"Shutdown timeout for child {} after {} ms",
child_handle.id(),
timeout_ms
);
}
}
}
})
.collect();
join_all(stop_futures).await;
trace!("All children stopped for agent: {}.", self.id());
}
}