use std::any::type_name_of_val;
use std::fmt::Debug;
use std::time::Duration;
use futures::future::join_all;
use tokio::time::sleep;
use tracing::{instrument, trace};
use crate::actor::ManagedAgent;
use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
use crate::traits::AgentHandleInterface;
/// Zero-sized type-state marker for an agent that has been started.
///
/// It is used as the state parameter of `ManagedAgent<Started, Agent>`, which
/// is the type the running-agent methods in this file are implemented on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Started;
impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
    /// Builds a new [`OutboundEnvelope`] from this agent's own address.
    ///
    /// The address is a [`MessageAddress`] made from a clone of the agent's
    /// outbox and a clone of its id. The result is always `Some`; the `Option`
    /// wrapper is part of the existing signature and is kept for callers.
    pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
        Some(OutboundEnvelope::new(MessageAddress::new(
            self.handle.outbox.clone(),
            self.id.clone(),
        )))
    }

    /// Builds an [`OutboundEnvelope`] from this agent's address with the
    /// parent's reply address as recipient.
    ///
    /// Returns `None` when this agent has no parent handle.
    pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
        self.parent.as_ref().map(|parent_handle| {
            OutboundEnvelope::new_with_recipient(
                MessageAddress::new(self.handle.outbox.clone(), self.id.clone()),
                parent_handle.reply_address(),
            )
        })
    }

    /// Runs the agent's message loop until termination completes or the inbox
    /// yields no more envelopes.
    ///
    /// Order of operations:
    /// 1. The `after_start` hook is awaited once before any message is read.
    /// 2. Each received envelope is dispatched by the `TypeId` of its message.
    ///    A message that is a `BrokerRequestEnvelope` is unwrapped first, so
    ///    dispatch uses the inner message's type.
    /// 3. If no reactor is registered for that type and the message is
    ///    `SystemSignal::Terminate`, shutdown begins: `before_stop` is awaited,
    ///    the loop sleeps for 10 ms, and the inbox is closed. Envelopes already
    ///    queued are still processed.
    /// 4. Once shutdown has begun and the inbox is both closed and empty,
    ///    children are stopped via `terminate` and the loop ends.
    /// 5. The `after_stop` hook is awaited after the loop.
    ///
    /// A `Terminate` signal received after shutdown has already begun is
    /// ignored, so the loop does not run `before_stop` (or the delay) again.
    ///
    /// # Parameters
    /// * `reactors` - map from message `TypeId` to the handler invoked for
    ///   messages of that type.
    #[instrument(skip(reactors, self))]
    pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
        (self.after_start)(self).await;
        let mut terminate_requested = false;

        while let Some(incoming_envelope) = self.inbox.recv().await {
            trace!(
                "Received envelope from: {}",
                incoming_envelope.reply_to.sender.root
            );
            // NOTE: `type_name_of_val` names the static type of the `message`
            // field; if that field is a trait object this is not the concrete
            // message type.
            trace!(
                "Message type: {}",
                type_name_of_val(&incoming_envelope.message)
            );

            // Resolve the message used for dispatch. Broker requests wrap the
            // real message, so the envelope is rebuilt around the inner message
            // while keeping the original reply-to and recipient addresses.
            let (type_id, mut envelope) = if let Some(broker_request_envelope) = incoming_envelope
                .message
                .as_any()
                .downcast_ref::<BrokerRequestEnvelope>()
            {
                trace!("Processing message via BrokerRequestEnvelope");
                let inner_type_id = broker_request_envelope.message.as_any().type_id();
                let unwrapped = Envelope::new(
                    broker_request_envelope.message.clone(),
                    incoming_envelope.reply_to.clone(),
                    incoming_envelope.recipient.clone(),
                );
                (inner_type_id, unwrapped)
            } else {
                let direct_type_id = incoming_envelope.message.as_any().type_id();
                (direct_type_id, incoming_envelope)
            };

            if let Some(reactor) = reactors.get(&type_id) {
                // Registered reactors take precedence over the built-in
                // handling of system signals below.
                match reactor.value() {
                    ReactorItem::FutureReactor(fut) => fut(self, &mut envelope).await,
                }
            } else if let Some(SystemSignal::Terminate) =
                envelope.message.as_any().downcast_ref::<SystemSignal>()
            {
                if terminate_requested {
                    // Shutdown is already in progress (this signal was queued
                    // before the inbox closed). Re-running the stop hook and
                    // the delay would repeat lifecycle side effects, so skip.
                    trace!(
                        "Duplicate terminate signal ignored for agent: {}",
                        self.id()
                    );
                } else {
                    trace!("Terminate signal received for agent: {}", self.id());
                    terminate_requested = true;
                    (self.before_stop)(self).await;
                    // Short pause before closing the inbox; presumably lets
                    // in-flight sends land first. TODO confirm intent.
                    sleep(Duration::from_millis(10)).await;
                    self.inbox.close();
                    trace!("Inbox closed for agent: {}", self.id());
                }
            } else {
                trace!(
                    "No handler found for message type {:?} for agent {}",
                    type_id,
                    self.id()
                );
            }

            // Checked after every message so that envelopes queued before the
            // inbox was closed are drained before children are stopped.
            if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
                trace!(
                    "Inbox empty and closed after terminate request, initiating termination for agent: {}",
                    self.id()
                );
                self.terminate().await;
                break;
            }
        }

        trace!("Message loop finished for agent: {}", self.id());
        (self.after_stop)(self).await;
        trace!("Agent {} stopped.", self.id());
    }

    /// Sends a stop request to every child of this agent, waits for all of
    /// those requests to complete, then closes this agent's inbox.
    ///
    /// Child handles are cloned up front and the stop calls are awaited
    /// concurrently via `join_all`. The result of each `stop()` call is
    /// discarded, so a failing child does not prevent the others from being
    /// stopped.
    #[instrument(skip(self))]
    async fn terminate(&mut self) {
        trace!("Terminating children for agent: {}", self.id());

        let stop_futures: Vec<_> = self
            .handle
            .children()
            .iter()
            .map(|item| {
                let child_handle = item.value().clone();
                async move {
                    trace!("Sending stop signal to child: {}", child_handle.id());
                    let _ = child_handle.stop().await;
                    trace!("Stop signal sent to child: {}", child_handle.id());
                }
            })
            .collect();
        join_all(stop_futures).await;

        trace!(
            "All children stopped for agent: {}. Closing own inbox.",
            self.id()
        );
        self.inbox.close();
    }
}