// acton_core/actor/managed_agent/started.rs
use std::any::type_name_of_val;
use std::fmt::Debug;
use std::time::Duration;
use futures::future::join_all;
use tokio::time::sleep;
use tracing::{instrument, trace};
use crate::actor::ManagedAgent;
use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
use crate::traits::Actor;
/// Marker type used as the first type parameter of `ManagedAgent`.
///
/// The `impl ManagedAgent<Started, Agent>` block in this file attaches the
/// message loop (`wake`) and the envelope helpers to agents carrying this marker.
pub struct Started;
/// Methods available on a `ManagedAgent` whose first type parameter is [`Started`]:
/// envelope construction helpers and the message loop that drives the agent.
impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
    /// Builds an outbound envelope addressed from this agent.
    ///
    /// The address is made from clones of `self.handle.outbox` and `self.id`.
    /// This always returns `Some`; the `Option` return type is kept so existing
    /// callers continue to compile.
    pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
        let return_address = MessageAddress::new(self.handle.outbox.clone(), self.id.clone());
        Some(OutboundEnvelope::new(return_address))
    }

    /// Builds an envelope by calling `create_envelope(None)` on the parent.
    ///
    /// Returns `None` when this agent has no parent.
    pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
        self.parent
            .as_ref()
            .map(|parent| parent.create_envelope(None).clone())
    }

    /// Runs the agent's message loop until it is told to terminate.
    ///
    /// Steps, in order:
    /// 1. Awaits the `after_start` hook.
    /// 2. Receives envelopes from `self.inbox`. A `BrokerRequestEnvelope` is unwrapped
    ///    into a fresh `Envelope` so that dispatch is keyed on the inner message's type.
    /// 3. If `reactors` holds an entry for the message's `TypeId`, that reactor is awaited.
    /// 4. Otherwise, a `SystemSignal::Terminate` starts shutdown: the `before_stop` hook
    ///    is awaited, the task pauses for 10 ms, and the inbox is closed. This happens
    ///    only for the first such signal.
    /// 5. Once shutdown has been requested and the closed inbox reports empty,
    ///    `terminate` stops the children and the loop ends.
    /// 6. Awaits the `after_stop` hook.
    ///
    /// # Parameters
    /// * `reactors` - map from message `TypeId` to the handler to run for that type.
    #[instrument(skip(reactors, self))]
    pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
        (self.after_start)(self).await;

        let mut terminate_requested = false;
        while let Some(incoming_envelope) = self.inbox.recv().await {
            trace!("envelope sender is {}", incoming_envelope.reply_to.sender.root);
            trace!("{}", type_name_of_val(&incoming_envelope.message));

            // Resolve the envelope to dispatch and the type id used to look up its reactor.
            let (mut envelope, type_id) = if let Some(broker_request_envelope) = incoming_envelope
                .message
                .as_any()
                .downcast_ref::<BrokerRequestEnvelope>()
            {
                // Broker requests wrap the real message; dispatch on the wrapped one.
                let unwrapped = Envelope::new(
                    broker_request_envelope.message.clone(),
                    incoming_envelope.reply_to.clone(),
                    incoming_envelope.recipient.clone(),
                );
                let inner_type_id = broker_request_envelope.message.as_any().type_id();
                (unwrapped, inner_type_id)
            } else {
                let direct_type_id = incoming_envelope.message.as_any().type_id();
                (incoming_envelope, direct_type_id)
            };

            if let Some(reactor) = reactors.get(&type_id) {
                match reactor.value() {
                    ReactorItem::FutureReactor(fut) => fut(self, &mut envelope).await,
                }
            } else if let Some(SystemSignal::Terminate) =
                envelope.message.as_any().downcast_ref::<SystemSignal>()
            {
                // A second Terminate that was already queued must not re-run the
                // `before_stop` hook or repeat the grace period.
                if !terminate_requested {
                    terminate_requested = true;
                    trace!("Termination signal received, waiting for remaining messages...");
                    (self.before_stop)(self).await;
                    sleep(Duration::from_millis(10)).await;
                    self.inbox.close();
                }
            }

            // No extra `close()` is needed here: the guard already requires the inbox to
            // be closed, and `terminate` closes it as its final step.
            if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
                self.terminate().await;
                break;
            }
        }

        (self.after_stop)(self).await;
    }

    /// Stops every child of this agent concurrently, then closes the inbox.
    ///
    /// The result of each child's `stop()` is deliberately discarded, so one child
    /// failing to stop does not prevent the others from being stopped.
    #[instrument(skip(self))]
    async fn terminate(&mut self) {
        // Collect the futures first so the children iterator is not held across the await.
        let stop_futures: Vec<_> = self
            .handle
            .children()
            .iter()
            .map(|entry| {
                let child = entry.value().clone();
                async move {
                    let _ = child.stop().await;
                }
            })
            .collect();
        join_all(stop_futures).await;

        trace!(
            actor = self.id.to_string(),
            "All subordinates terminated. Closing mailbox for"
        );
        self.inbox.close();
    }
}