acton_core/actor/managed_agent/
started.rs1use std::any::type_name_of_val;
18use std::fmt::Debug;
19use std::time::Duration;
20
21use futures::future::join_all;
22use tokio::time::sleep;
23use tracing::{instrument, trace};
24
25use crate::actor::ManagedAgent;
26use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
27use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
28use crate::traits::Actor;
29
/// Type-state marker for a [`ManagedAgent`] whose message loop is running.
///
/// The type carries no data; it is used only as the first type parameter of
/// `ManagedAgent<Started, Agent>` to select the implementation defined in this
/// file (`new_envelope`, `new_parent_envelope`, `wake`).
pub struct Started;
32
33impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
34 pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
39 Option::from(OutboundEnvelope::new(MessageAddress::new(
40 self.handle.outbox.clone(),
41 self.id.clone(),
42 )))
43 }
44
45 pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
50 self.parent.as_ref().map(|parent| parent.create_envelope(None).clone())
51 }
52
53 #[instrument(skip(reactors, self))]
54 pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
55 (self.after_start)(self).await;
56 let mut terminate_requested = false;
57 while let Some(incoming_envelope) = self.inbox.recv().await {
58 let type_id;
59 let mut envelope;
60 trace!("envelope sender is {}", incoming_envelope.reply_to.sender.root);
61 trace!("{}", type_name_of_val(&incoming_envelope.message));
62 if let Some(broker_request_envelope) = incoming_envelope
64 .message
65 .as_any()
66 .downcast_ref::<BrokerRequestEnvelope>()
67 {
68 envelope = Envelope::new(
69 broker_request_envelope.message.clone(),
70 incoming_envelope.reply_to.clone(),
71 incoming_envelope.recipient.clone(),
72 );
73 type_id = broker_request_envelope.message.as_any().type_id();
74 } else {
75 envelope = incoming_envelope;
76 type_id = envelope.message.as_any().type_id();
77 }
78
79 if let Some(reactor) = reactors.get(&type_id) {
80 match reactor.value() {
81 ReactorItem::FutureReactor(fut) => fut(self, &mut envelope).await,
82 }
83 } else if let Some(SystemSignal::Terminate) =
84 envelope.message.as_any().downcast_ref::<SystemSignal>()
85 {
86 terminate_requested = true;
88 trace!("Termination signal received, waiting for remaining messages...");
89 (self.before_stop)(self).await;
90 sleep(Duration::from_millis(10)).await;
92 self.inbox.close();
93 }
94 if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
95 self.inbox.close();
96 self.terminate().await;
97 break;
98 }
99 }
100
101 (self.after_stop)(self).await;
102 }
103 #[instrument(skip(self))]
104 async fn terminate(&mut self) {
105
106 let suspend_futures: Vec<_> = self.handle.children().iter().map(|item| {
108 let child_ref = item.value().clone(); async move {
110 let _ = child_ref.stop().await;
111 }
112 }).collect();
113
114 join_all(suspend_futures).await;
116
117 trace!(
118 actor = self.id.to_string(),
119 "All subordinates terminated. Closing mailbox for"
120 );
121
122 self.inbox.close();
123 }
124}