acton_core/common/agent_runtime.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tracing::trace;
25
26use crate::actor::{AgentConfig, Idle, ManagedAgent};
27use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
28use crate::common::acton_inner::ActonInner;
29use crate::traits::Actor;
30
31/// Represents a ready state of the Acton system.
32///
33/// This struct encapsulates the internal state of the Acton system when it's ready for use.
34/// It provides methods for creating and managing actors within the system.
35#[derive(Debug, Clone, Default)]
36pub struct AgentRuntime(pub(crate) ActonInner);
37
38impl AgentRuntime {
39 /// Creates a new actor with the provided id root name.
40 ///
41 /// # Type Parameters
42 ///
43 /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
44 ///
45 /// # Returns
46 ///
47 /// A `ManagedActor` in the `Idle` state with the specified `State`.
48 pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
49 where
50 State: Default + Send + Debug + 'static,
51 {
52 let actor_config = AgentConfig::new(
53 Ern::with_root(name).unwrap(),
54 None,
55 Some(self.0.broker.clone()),
56 ).expect("Failed to create actor config");
57
58 // let broker = self.0.broker.clone();
59 let runtime = self.clone();
60 let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
61 self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
62 new_actor
63 }
64
65 /// Creates a new actor with default configuration.
66 ///
67 /// # Type Parameters
68 ///
69 /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
70 ///
71 /// # Returns
72 ///
73 /// A `ManagedActor` in the `Idle` state with the specified `State`.
74 pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
75 where
76 State: Default + Send + Debug + 'static,
77 {
78 let actor_config = AgentConfig::new(
79 Ern::with_root("agent").unwrap(),
80 None,
81 Some(self.0.broker.clone()),
82 ).expect("Failed to create actor config");
83
84 // let broker = self.0.broker.clone();
85 let runtime = self.clone();
86 let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
87 self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
88 new_actor
89 }
90
91 /// Retrieves the number of actors currently running in the system.
92 pub fn agent_count(&self) -> usize {
93 self.0.roots.len()
94 }
95
96 /// Creates a new actor with a specified configuration.
97 ///
98 /// # Type Parameters
99 ///
100 /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
101 ///
102 /// # Arguments
103 ///
104 /// * `config` - The `ActorConfig` to use for creating the actor.
105 ///
106 /// # Returns
107 ///
108 /// A `ManagedActor` in the `Idle` state with the specified `State` and configuration.
109 pub async fn create_actor_with_config<State>(
110 &mut self,
111 mut config: AgentConfig,
112 ) -> ManagedAgent<Idle, State>
113 where
114 State: Default + Send + Debug + 'static,
115 {
116 let acton_ready = self.clone();
117 // we should make sure the config has a broker, if it doesn't, we should provide it from self.0.broker
118 if config.broker.is_none() {
119 config.broker = Some(self.0.broker.clone());
120 }
121 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
122 trace!("Created new actor with id {}", new_agent.id);
123 self.0.roots.insert(new_agent.id.clone(), new_agent.handle.clone());
124 new_agent
125 }
126
127 /// Retrieves the broker reference for the system.
128 ///
129 /// # Returns
130 ///
131 /// A clone of the `BrokerRef` associated with this `SystemReady` instance.
132 pub fn broker(&self) -> BrokerRef {
133 self.0.broker.clone()
134 }
135
136 /// Spawns an actor with a custom setup function and configuration.
137 ///
138 /// # Type Parameters
139 ///
140 /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
141 ///
142 /// # Arguments
143 ///
144 /// * `config` - The `ActorConfig` to use for creating the actor.
145 /// * `setup_fn` - A function that takes a `ManagedActor` and returns a `Future` resolving to an `ActorRef`.
146 ///
147 /// # Returns
148 ///
149 /// A `Result` containing the `ActorRef` of the spawned actor, or an error if the spawn failed.
150 pub async fn spawn_agent_with_setup_fn<State>(
151 &mut self,
152 mut config: AgentConfig,
153 setup_fn: impl FnOnce(
154 ManagedAgent<Idle, State>,
155 ) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
156 ) -> anyhow::Result<AgentHandle>
157 where
158 State: Default + Send + Debug + 'static,
159 {
160 let acton_ready = self.clone();
161 if config.broker.is_none() {
162 config.broker = Some(self.0.broker.clone());
163 }
164
165 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
166 let handle = setup_fn(new_agent).await;
167 self.0.roots.insert(handle.id.clone(), handle.clone());
168 Ok(handle)
169 }
170
171 /// Shuts down the Acton system, stopping all actors and their children.
172 pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
173 // Collect all suspend futures into a vector
174 let suspend_futures = self.0.roots.iter().map(|item| {
175 let root_actor = item.value().clone(); // Clone to take ownership
176 async move {
177 root_actor.stop().await
178 }
179 });
180
181 // Wait for all actors to suspend concurrently
182 let results: Vec<anyhow::Result<()>> = join_all(suspend_futures).await;
183
184 // Check for any errors
185 for result in results {
186 result?;
187 }
188 self.0.broker.stop().await?;
189
190 Ok(())
191 }
192
193 /// Spawns an actor with a custom setup function and default configuration.
194 ///
195 /// # Type Parameters
196 ///
197 /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
198 ///
199 /// # Arguments
200 ///
201 /// * `setup_fn` - A function that takes a `ManagedActor` and returns a `Future` resolving to an `ActorRef`.
202 ///
203 /// # Returns
204 ///
205 /// A `Result` containing the `ActorRef` of the spawned actor, or an error if the spawn failed.
206 pub async fn spawn_actor<State>(
207 &mut self,
208 setup_fn: impl FnOnce(
209 ManagedAgent<Idle, State>,
210 ) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
211 ) -> anyhow::Result<AgentHandle>
212 where
213 State: Default + Send + Debug + 'static,
214 {
215 let broker = self.broker();
216 let mut config = AgentConfig::new(Ern::default(), None, Some(broker.clone()))?;
217 if config.broker.is_none() {
218 config.broker = Some(self.0.broker.clone());
219 }
220 let runtime = self.clone();
221 let new_agent = ManagedAgent::new(&Some(runtime), Some(config)).await;
222 let handle = setup_fn(new_agent).await;
223 self.0.roots.insert(handle.id.clone(), handle.clone());
224 Ok(handle)
225 }
226}
227
228impl From<ActonApp> for AgentRuntime {
229 fn from(_acton: ActonApp) -> Self {
230 let (sender, receiver) = oneshot::channel();
231
232 tokio::spawn(async move {
233 let broker = AgentBroker::initialize().await;
234 let _ = sender.send(broker);
235 });
236
237 let broker = tokio::task::block_in_place(|| {
238 tokio::runtime::Handle::current()
239 .block_on(async { receiver.await.expect("Broker initialization failed") })
240 });
241
242 AgentRuntime(ActonInner { broker: broker.clone(), ..Default::default() })
243 }
244}