acton_core/common/
agent_handle.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16use std::fmt::Debug;
17use std::future::Future;
18use std::hash::{Hash, Hasher};
19
20use acton_ern::Ern;
21use async_trait::async_trait;
22use dashmap::DashMap;
23use tokio::sync::mpsc;
24use tokio_util::task::TaskTracker;
25use tracing::{error, instrument, trace, warn};
26
27use crate::actor::{Idle, ManagedAgent};
28use crate::common::{BrokerRef, OutboundEnvelope, Outbox, ParentRef};
29use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
30use crate::prelude::ActonMessage;
31use crate::traits::{Actor, Broker, Subscriber};
32
33/// Represents the context in which an actor operates.
34#[derive(Debug, Clone)]
35pub struct AgentHandle {
36    /// The unique identifier (ARN) for the context.
37    pub(crate) id: Ern,
38    /// The outbound channel for sending messages.
39    pub(crate) outbox: Outbox,
40    /// The task tracker for the actor.
41    tracker: TaskTracker,
42    /// The actor's optional parent context.
43    pub parent: Option<Box<ParentRef>>,
44    /// The system broker for the actor.
45    pub broker: Box<Option<BrokerRef>>,
46    children: DashMap<String, AgentHandle>,
47}
48
49impl Default for AgentHandle {
50    fn default() -> Self {
51        let (outbox, _) = mpsc::channel(1);
52        AgentHandle {
53            id: Ern::default(),
54            outbox,
55            tracker: TaskTracker::new(),
56            parent: None,
57            broker: Box::new(None),
58            children: DashMap::new(),
59        }
60    }
61}
62
63impl Subscriber for AgentHandle {
64    fn get_broker(&self) -> Option<BrokerRef> {
65        *self.broker.clone()
66    }
67}
68
69impl PartialEq for AgentHandle {
70    fn eq(&self, other: &Self) -> bool {
71        self.id == other.id
72    }
73}
74
75impl Eq for AgentHandle {}
76
77impl Hash for AgentHandle {
78    fn hash<H: Hasher>(&self, state: &mut H) {
79        self.id.hash(state);
80    }
81}
82
83impl AgentHandle {
84    /// Supervises a child actor by activating it and tracking its context.
85    ///
86    /// This asynchronous method adds a child actor to the supervision hierarchy managed by this
87    /// `ActorRef`. It performs the following steps:
88    ///
89    /// 1. Logs the addition of the child actor with its unique identifier (`ern`).
90    /// 2. Activates the child actor by calling its `activate` method asynchronously.
91    /// 3. Retrieves the `ern` (unique identifier) from the child’s context.
92    /// 4. Inserts the child's context into the `children` map of the supervising actor,
93    ///    using the `ern` as the key.
94    ///
95    /// # Type Parameters
96    ///
97    /// - `State`: Represents the state type associated with the child actor. It must implement
98    ///   the [`Default`], [`Send`], and [`Debug`] traits.
99    ///
100    /// # Parameters
101    ///
102    /// - `child`: A [`ManagedAgent`] instance representing the child actor to be supervised.
103    ///
104    /// # Returns
105    ///
106    /// A [`Result`] which is:
107    /// - `Ok(())` if the child actor was successfully supervised and added to the supervision
108    ///   hierarchy.
109    /// - An error of type [`anyhow::Error`] if any step of the supervision process fails.
110    ///
111    /// # Errors
112    ///
113    /// This method will return an error if:
114    /// - The child actor fails to activate.
115    /// - Inserting the child context into the `children` map fails.
116    #[instrument(skip(self))]
117    pub async fn supervise<State: Default + Send + Debug>(
118        &self,
119        child: ManagedAgent<Idle, State>,
120    ) -> anyhow::Result<AgentHandle> {
121        trace!("Adding child actor with id: {}", child.id);
122        let handle = child.start().await;
123        let id = handle.id.clone();
124        trace!("Now have child id in context: {}", id);
125        self.children.insert(id.to_string(), handle.clone());
126
127        Ok(handle)
128    }
129}
130
131impl Broker for AgentHandle {
132    #[instrument(skip(self), name = "broadcast")]
133    fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output = ()> + Send + Sync + '_ {
134        trace!("Looking for a broker to broadcast message.");
135        async move {
136            if let Some(broker) = self.broker.as_ref() {
137                broker.send(BrokerRequest::new(message)).await;
138            } else {
139                error!("No broker found to broadcast message.");
140            }
141        }
142    }
143}
144
145#[async_trait]
146impl Actor for AgentHandle {
147    /// Returns the message address for this agent.
148    fn reply_address(&self) -> MessageAddress {
149        MessageAddress::new(self.outbox.clone(), self.id.clone())
150    }
151
152    /// Returns an envelope for the specified recipient and message, ready to send.
153    #[instrument(skip(self))]
154    fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
155        trace!("self id is {}", self.id);
156        let return_address = self.reply_address();
157        trace!("return_address is {}", return_address.sender.root);
158        if let Some(recipient) = recipient_address {
159            OutboundEnvelope::new_with_recipient(return_address, recipient)
160        } else {
161            OutboundEnvelope::new(return_address)
162        }
163    }
164
165    fn children(&self) -> DashMap<String, AgentHandle> {
166        self.children.clone()
167    }
168
169    #[instrument(skip(self))]
170    fn find_child(&self, arn: &Ern) -> Option<AgentHandle> {
171        trace!("Searching for child with ARN: {}", arn);
172        self.children
173            .get(&arn.to_string())
174            .map(|item| item.value().clone())
175    }
176
177    /// Returns the task tracker for the actor.
178    fn tracker(&self) -> TaskTracker {
179        self.tracker.clone()
180    }
181    fn id(&self) -> Ern {
182        self.id.clone()
183    }
184
185    fn name(&self) -> String {
186        self.id.root.to_string()
187    }
188
189    fn clone_ref(&self) -> AgentHandle {
190        self.clone()
191    }
192
193    #[allow(clippy::manual_async_fn)]
194    #[instrument(skip(self))]
195    /// Suspends the actor.
196    fn stop(&self) -> impl Future<Output = anyhow::Result<()>> + Send + Sync + '_ {
197        async move {
198            let tracker = self.tracker();
199
200            let actor = self.create_envelope(None).clone();
201
202            // Event: Sending Terminate Signal
203            // Description: Sending a terminate signal to the actor.
204            // Context: Target actor key.
205            trace!(actor = self.id.to_string(), "Sending Terminate to");
206            actor.reply(SystemSignal::Terminate)?;
207
208            // Event: Waiting for Actor Tasks
209            // Description: Waiting for all actor tasks to complete.
210            // Context: None
211            trace!("Waiting for all actor tasks to complete.");
212            tracker.wait().await;
213
214            // Event: Actor Terminated
215            // Description: The actor and its subordinates have been terminated.
216            // Context: None
217            trace!(
218                actor = self.id.to_string(),
219                "The actor and its subordinates have been terminated."
220            );
221            Ok(())
222        }
223    }
224}