acton_core/common/
agent_handle.rs1use std::fmt::Debug;
17use std::future::Future;
18use std::hash::{Hash, Hasher};
19
20use acton_ern::Ern;
21use async_trait::async_trait;
22use dashmap::DashMap;
23use tokio::sync::mpsc;
24use tokio_util::task::TaskTracker;
25use tracing::{error, instrument, trace, warn};
26
27use crate::actor::{Idle, ManagedAgent};
28use crate::common::{BrokerRef, OutboundEnvelope, Outbox, ParentRef};
29use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
30use crate::prelude::ActonMessage;
31use crate::traits::{Actor, Broker, Subscriber};
32
33#[derive(Debug, Clone)]
35pub struct AgentHandle {
36 pub(crate) id: Ern,
38 pub(crate) outbox: Outbox,
40 tracker: TaskTracker,
42 pub parent: Option<Box<ParentRef>>,
44 pub broker: Box<Option<BrokerRef>>,
46 children: DashMap<String, AgentHandle>,
47}
48
49impl Default for AgentHandle {
50 fn default() -> Self {
51 let (outbox, _) = mpsc::channel(1);
52 AgentHandle {
53 id: Ern::default(),
54 outbox,
55 tracker: TaskTracker::new(),
56 parent: None,
57 broker: Box::new(None),
58 children: DashMap::new(),
59 }
60 }
61}
62
63impl Subscriber for AgentHandle {
64 fn get_broker(&self) -> Option<BrokerRef> {
65 *self.broker.clone()
66 }
67}
68
69impl PartialEq for AgentHandle {
70 fn eq(&self, other: &Self) -> bool {
71 self.id == other.id
72 }
73}
74
75impl Eq for AgentHandle {}
76
77impl Hash for AgentHandle {
78 fn hash<H: Hasher>(&self, state: &mut H) {
79 self.id.hash(state);
80 }
81}
82
83impl AgentHandle {
84 #[instrument(skip(self))]
117 pub async fn supervise<State: Default + Send + Debug>(
118 &self,
119 child: ManagedAgent<Idle, State>,
120 ) -> anyhow::Result<AgentHandle> {
121 trace!("Adding child actor with id: {}", child.id);
122 let handle = child.start().await;
123 let id = handle.id.clone();
124 trace!("Now have child id in context: {}", id);
125 self.children.insert(id.to_string(), handle.clone());
126
127 Ok(handle)
128 }
129}
130
131impl Broker for AgentHandle {
132 #[instrument(skip(self), name = "broadcast")]
133 fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output = ()> + Send + Sync + '_ {
134 trace!("Looking for a broker to broadcast message.");
135 async move {
136 if let Some(broker) = self.broker.as_ref() {
137 broker.send(BrokerRequest::new(message)).await;
138 } else {
139 error!("No broker found to broadcast message.");
140 }
141 }
142 }
143}
144
145#[async_trait]
146impl Actor for AgentHandle {
147 fn reply_address(&self) -> MessageAddress {
149 MessageAddress::new(self.outbox.clone(), self.id.clone())
150 }
151
152 #[instrument(skip(self))]
154 fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
155 trace!("self id is {}", self.id);
156 let return_address = self.reply_address();
157 trace!("return_address is {}", return_address.sender.root);
158 if let Some(recipient) = recipient_address {
159 OutboundEnvelope::new_with_recipient(return_address, recipient)
160 } else {
161 OutboundEnvelope::new(return_address)
162 }
163 }
164
165 fn children(&self) -> DashMap<String, AgentHandle> {
166 self.children.clone()
167 }
168
169 #[instrument(skip(self))]
170 fn find_child(&self, arn: &Ern) -> Option<AgentHandle> {
171 trace!("Searching for child with ARN: {}", arn);
172 self.children
173 .get(&arn.to_string())
174 .map(|item| item.value().clone())
175 }
176
177 fn tracker(&self) -> TaskTracker {
179 self.tracker.clone()
180 }
181 fn id(&self) -> Ern {
182 self.id.clone()
183 }
184
185 fn name(&self) -> String {
186 self.id.root.to_string()
187 }
188
189 fn clone_ref(&self) -> AgentHandle {
190 self.clone()
191 }
192
193 #[allow(clippy::manual_async_fn)]
194 #[instrument(skip(self))]
195 fn stop(&self) -> impl Future<Output = anyhow::Result<()>> + Send + Sync + '_ {
197 async move {
198 let tracker = self.tracker();
199
200 let actor = self.create_envelope(None).clone();
201
202 trace!(actor = self.id.to_string(), "Sending Terminate to");
206 actor.reply(SystemSignal::Terminate)?;
207
208 trace!("Waiting for all actor tasks to complete.");
212 tracker.wait().await;
213
214 trace!(
218 actor = self.id.to_string(),
219 "The actor and its subordinates have been terminated."
220 );
221 Ok(())
222 }
223 }
224}