acton_core/common/
agent_runtime.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tracing::trace;
25
26use crate::actor::{AgentConfig, Idle, ManagedAgent};
27use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
28use crate::common::acton_inner::ActonInner;
29use crate::traits::Actor;
30
31/// Represents a ready state of the Acton system.
32///
33/// This struct encapsulates the internal state of the Acton system when it's ready for use.
34/// It provides methods for creating and managing actors within the system.
35#[derive(Debug, Clone, Default)]
36pub struct AgentRuntime(pub(crate) ActonInner);
37
38impl AgentRuntime {
39    /// Creates a new actor with the provided id root name.
40    ///
41    /// # Type Parameters
42    ///
43    /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
44    ///
45    /// # Returns
46    ///
47    /// A `ManagedActor` in the `Idle` state with the specified `State`.
48    pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
49    where
50        State: Default + Send + Debug + 'static,
51    {
52        let actor_config = AgentConfig::new(
53            Ern::with_root(name).unwrap(),
54            None,
55            Some(self.0.broker.clone()),
56        ).expect("Failed to create actor config");
57
58        // let broker = self.0.broker.clone();
59        let runtime = self.clone();
60        let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
61        self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
62        new_actor
63    }
64
65    /// Creates a new actor with default configuration.
66    ///
67    /// # Type Parameters
68    ///
69    /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
70    ///
71    /// # Returns
72    ///
73    /// A `ManagedActor` in the `Idle` state with the specified `State`.
74    pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
75    where
76        State: Default + Send + Debug + 'static,
77    {
78        let actor_config = AgentConfig::new(
79            Ern::with_root("agent").unwrap(),
80            None,
81            Some(self.0.broker.clone()),
82        ).expect("Failed to create actor config");
83
84        // let broker = self.0.broker.clone();
85        let runtime = self.clone();
86        let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
87        self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
88        new_actor
89    }
90
91    /// Retrieves the number of actors currently running in the system.
92    pub fn agent_count(&self) -> usize {
93        self.0.roots.len()
94    }
95
96    /// Creates a new actor with a specified configuration.
97    ///
98    /// # Type Parameters
99    ///
100    /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
101    ///
102    /// # Arguments
103    ///
104    /// * `config` - The `ActorConfig` to use for creating the actor.
105    ///
106    /// # Returns
107    ///
108    /// A `ManagedActor` in the `Idle` state with the specified `State` and configuration.
109    pub async fn create_actor_with_config<State>(
110        &mut self,
111        mut config: AgentConfig,
112    ) -> ManagedAgent<Idle, State>
113    where
114        State: Default + Send + Debug + 'static,
115    {
116        let acton_ready = self.clone();
117        // we should make sure the config has a broker, if it doesn't, we should provide it from self.0.broker
118        if config.broker.is_none() {
119            config.broker = Some(self.0.broker.clone());
120        }
121        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
122        trace!("Created new actor with id {}", new_agent.id);
123        self.0.roots.insert(new_agent.id.clone(), new_agent.handle.clone());
124        new_agent
125    }
126
127    /// Retrieves the broker reference for the system.
128    ///
129    /// # Returns
130    ///
131    /// A clone of the `BrokerRef` associated with this `SystemReady` instance.
132    pub fn broker(&self) -> BrokerRef {
133        self.0.broker.clone()
134    }
135
136    /// Spawns an actor with a custom setup function and configuration.
137    ///
138    /// # Type Parameters
139    ///
140    /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
141    ///
142    /// # Arguments
143    ///
144    /// * `config` - The `ActorConfig` to use for creating the actor.
145    /// * `setup_fn` - A function that takes a `ManagedActor` and returns a `Future` resolving to an `ActorRef`.
146    ///
147    /// # Returns
148    ///
149    /// A `Result` containing the `ActorRef` of the spawned actor, or an error if the spawn failed.
150    pub async fn spawn_agent_with_setup_fn<State>(
151        &mut self,
152        mut config: AgentConfig,
153        setup_fn: impl FnOnce(
154            ManagedAgent<Idle, State>,
155        ) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
156    ) -> anyhow::Result<AgentHandle>
157    where
158        State: Default + Send + Debug + 'static,
159    {
160        let acton_ready = self.clone();
161        if config.broker.is_none() {
162            config.broker = Some(self.0.broker.clone());
163        }
164
165        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
166        let handle = setup_fn(new_agent).await;
167        self.0.roots.insert(handle.id.clone(), handle.clone());
168        Ok(handle)
169    }
170
171    /// Shuts down the Acton system, stopping all actors and their children.
172    pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
173        // Collect all suspend futures into a vector
174        let suspend_futures = self.0.roots.iter().map(|item| {
175            let root_actor = item.value().clone(); // Clone to take ownership
176            async move {
177                root_actor.stop().await
178            }
179        });
180
181        // Wait for all actors to suspend concurrently
182        let results: Vec<anyhow::Result<()>> = join_all(suspend_futures).await;
183
184        // Check for any errors
185        for result in results {
186            result?;
187        }
188        self.0.broker.stop().await?;
189
190        Ok(())
191    }
192
193    /// Spawns an actor with a custom setup function and default configuration.
194    ///
195    /// # Type Parameters
196    ///
197    /// * `State` - The state type of the actor, which must implement `Default`, `Send`, `Debug`, and have a static lifetime.
198    ///
199    /// # Arguments
200    ///
201    /// * `setup_fn` - A function that takes a `ManagedActor` and returns a `Future` resolving to an `ActorRef`.
202    ///
203    /// # Returns
204    ///
205    /// A `Result` containing the `ActorRef` of the spawned actor, or an error if the spawn failed.
206    pub async fn spawn_actor<State>(
207        &mut self,
208        setup_fn: impl FnOnce(
209            ManagedAgent<Idle, State>,
210        ) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
211    ) -> anyhow::Result<AgentHandle>
212    where
213        State: Default + Send + Debug + 'static,
214    {
215        let broker = self.broker();
216        let mut config = AgentConfig::new(Ern::default(), None, Some(broker.clone()))?;
217        if config.broker.is_none() {
218            config.broker = Some(self.0.broker.clone());
219        }
220        let runtime = self.clone();
221        let new_agent = ManagedAgent::new(&Some(runtime), Some(config)).await;
222        let handle = setup_fn(new_agent).await;
223        self.0.roots.insert(handle.id.clone(), handle.clone());
224        Ok(handle)
225    }
226}
227
228impl From<ActonApp> for AgentRuntime {
229    fn from(_acton: ActonApp) -> Self {
230        let (sender, receiver) = oneshot::channel();
231
232        tokio::spawn(async move {
233            let broker = AgentBroker::initialize().await;
234            let _ = sender.send(broker);
235        });
236
237        let broker = tokio::task::block_in_place(|| {
238            tokio::runtime::Handle::current()
239                .block_on(async { receiver.await.expect("Broker initialization failed") })
240        });
241
242        AgentRuntime(ActonInner { broker: broker.clone(), ..Default::default() })
243    }
244}