acton_core/common/
agent_handle.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/*
 * Copyright (c) 2024. Govcraft
 *
 * Licensed under either of
 *   * Apache License, Version 2.0 (the "License");
 *     you may not use this file except in compliance with the License.
 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *   * MIT license: http://opensource.org/licenses/MIT
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the applicable License for the specific language governing permissions and
 * limitations under that License.
 */
use std::fmt::Debug;
use std::future::Future;
use std::hash::{Hash, Hasher};

use acton_ern::Ern;
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio_util::task::TaskTracker;
use tracing::{error, instrument, trace, warn};

use crate::actor::{Idle, ManagedAgent};
use crate::common::{BrokerRef, OutboundEnvelope, Outbox, ParentRef};
use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
use crate::prelude::ActonMessage;
use crate::traits::{Actor, Broker, Subscriber};

/// A cloneable handle to an agent.
///
/// The handle bundles everything needed to address and manage an agent: its
/// identifier, the sending side of its message channel, the tracker for its tasks,
/// optional references to its parent and to a broker, and the handles of the
/// children registered through `supervise`.
///
/// Equality and hashing (see the `PartialEq` and `Hash` impls) consider only `id`.
#[derive(Debug, Clone)]
pub struct AgentHandle {
    /// The unique identifier (`Ern`) of the agent this handle refers to.
    pub(crate) id: Ern,
    /// The outbound channel used to send messages; it is also handed out as part of
    /// the agent's reply address.
    pub(crate) outbox: Outbox,
    /// The task tracker for the agent; `stop` waits on it after sending `Terminate`.
    tracker: TaskTracker,
    /// The agent's optional parent reference (`None` when no parent is set).
    pub parent: Option<Box<ParentRef>>,
    /// The optional broker reference used by `broadcast` (`None` when no broker is set).
    pub broker: Box<Option<BrokerRef>>,
    /// Handles of supervised children, keyed by the string form of each child's `Ern`.
    children: DashMap<String, AgentHandle>,
}

impl Default for AgentHandle {
    /// Builds a placeholder handle: default `Ern`, fresh task tracker, no parent,
    /// no broker, and an empty children map.
    fn default() -> Self {
        // Only the sending half of the capacity-1 channel is kept; the receiving half
        // is dropped as soon as this statement ends, so nothing ever reads from the
        // default handle's outbox.
        let outbox = mpsc::channel(1).0;

        Self {
            id: Ern::default(),
            outbox,
            tracker: TaskTracker::new(),
            children: DashMap::new(),
            parent: None,
            broker: Box::new(None),
        }
    }
}

impl Subscriber for AgentHandle {
    /// Returns a clone of this handle's broker reference, or `None` when no broker
    /// is set.
    fn get_broker(&self) -> Option<BrokerRef> {
        // Clone the `Option` inside the box directly rather than cloning the box
        // and moving out of it; the returned value is the same.
        (*self.broker).clone()
    }
}

impl PartialEq for AgentHandle {
    /// Two handles are equal when their `id`s are equal; no other field is compared.
    fn eq(&self, rhs: &Self) -> bool {
        self.id.eq(&rhs.id)
    }
}

// Marker impl: equality is defined by the `PartialEq` impl, which compares `id` only.
impl Eq for AgentHandle {}

impl Hash for AgentHandle {
    /// Feeds only `id` into the hasher, matching the `PartialEq` impl, which also
    /// compares `id` alone.
    fn hash<H>(&self, hasher: &mut H)
    where
        H: Hasher,
    {
        Hash::hash(&self.id, hasher);
    }
}

impl AgentHandle {
    /// Starts a child agent and registers its handle under this handle.
    ///
    /// The steps are:
    ///
    /// 1. Log the child's identifier.
    /// 2. Start the child by awaiting its `start` method, which yields the child's
    ///    [`AgentHandle`].
    /// 3. Insert a clone of that handle into this handle's `children` map, keyed by
    ///    the string form of the child's `Ern`. As with any map `insert`, an entry
    ///    already stored under the same key is replaced.
    /// 4. Return the child's handle to the caller.
    ///
    /// # Type Parameters
    ///
    /// - `State`: the state type of the child agent; it must implement [`Default`],
    ///   [`Send`], and [`Debug`].
    ///
    /// # Parameters
    ///
    /// - `child`: the idle [`ManagedAgent`] to start and supervise.
    ///
    /// # Returns
    ///
    /// `Ok(handle)`, where `handle` is the started child's [`AgentHandle`].
    ///
    /// # Errors
    ///
    /// The body as written has no failing path and always returns `Ok`; the
    /// [`anyhow::Result`] return type is part of the method's existing signature.
    #[instrument(skip(self))]
    pub async fn supervise<State: Default + Send + Debug>(
        &self,
        child: ManagedAgent<Idle, State>,
    ) -> anyhow::Result<AgentHandle> {
        trace!("Adding child actor with id: {}", child.id);

        let child_handle = child.start().await;
        trace!("Now have child id in context: {}", child_handle.id);

        // The map is keyed by the textual form of the child's identifier.
        let key = child_handle.id.to_string();
        self.children.insert(key, child_handle.clone());

        Ok(child_handle)
    }
}

impl Broker for AgentHandle {
    /// Sends `message` to the broker, wrapped in a [`BrokerRequest`].
    ///
    /// The "looking for a broker" trace is emitted when `broadcast` is called. The
    /// broker lookup and the send happen only once the returned future is polled.
    /// If this handle has no broker, the future logs an error and completes without
    /// sending anything.
    #[instrument(skip(self), name = "broadcast")]
    fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output = ()> + Send + Sync + '_ {
        trace!("Looking for a broker to broadcast message.");
        async move {
            // Guard clause: without a broker there is nowhere to send the message.
            let Some(broker) = self.broker.as_ref() else {
                error!("No broker found to broadcast message.");
                return;
            };

            let request = BrokerRequest::new(message);
            broker.send(request).await;
        }
    }
}

#[async_trait]
impl Actor for AgentHandle {
    /// Returns the message address for this agent.
    fn reply_address(&self) -> MessageAddress {
        MessageAddress::new(self.outbox.clone(), self.id.clone())
    }

    /// Returns an envelope for the specified recipient and message, ready to send.
    #[instrument(skip(self))]
    fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
        trace!("self id is {}", self.id);
        let return_address = self.reply_address();
        trace!("return_address is {}", return_address.sender.root);
        if let Some(recipient) = recipient_address {
            OutboundEnvelope::new_with_recipient(return_address, recipient)
        } else {
            OutboundEnvelope::new(return_address)
        }
    }

    fn children(&self) -> DashMap<String, AgentHandle> {
        self.children.clone()
    }

    #[instrument(skip(self))]
    fn find_child(&self, arn: &Ern) -> Option<AgentHandle> {
        trace!("Searching for child with ARN: {}", arn);
        self.children
            .get(&arn.to_string())
            .map(|item| item.value().clone())
    }

    /// Returns the task tracker for the actor.
    fn tracker(&self) -> TaskTracker {
        self.tracker.clone()
    }
    fn id(&self) -> Ern {
        self.id.clone()
    }

    fn name(&self) -> String {
        self.id.root.to_string()
    }

    fn clone_ref(&self) -> AgentHandle {
        self.clone()
    }

    #[allow(clippy::manual_async_fn)]
    #[instrument(skip(self))]
    /// Suspends the actor.
    fn stop(&self) -> impl Future<Output = anyhow::Result<()>> + Send + Sync + '_ {
        async move {
            let tracker = self.tracker();

            let actor = self.create_envelope(None).clone();

            // Event: Sending Terminate Signal
            // Description: Sending a terminate signal to the actor.
            // Context: Target actor key.
            trace!(actor = self.id.to_string(), "Sending Terminate to");
            actor.reply(SystemSignal::Terminate)?;

            // Event: Waiting for Actor Tasks
            // Description: Waiting for all actor tasks to complete.
            // Context: None
            trace!("Waiting for all actor tasks to complete.");
            tracker.wait().await;

            // Event: Actor Terminated
            // Description: The actor and its subordinates have been terminated.
            // Context: None
            trace!(
                actor = self.id.to_string(),
                "The actor and its subordinates have been terminated."
            );
            Ok(())
        }
    }
}