acton_core/actor/managed_agent/idle.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::{Ern};
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{ActonInner, AgentHandle, AgentRuntime,Envelope, FutureBox, OutboundEnvelope, ReactorItem};
28use crate::message::MessageContext;
29use crate::prelude::ActonMessage;
30use crate::traits::AgentHandleInterface;
31
32/// Type-state marker for a [`ManagedAgent`] that has been configured but not yet started.
33///
34/// When a `ManagedAgent` is in the `Idle` state, it can be configured with message handlers
35/// (via [`ManagedAgent::act_on`]) and lifecycle hooks (e.g., [`ManagedAgent::before_start`],
36/// [`ManagedAgent::after_stop`]). Once configuration is complete, the agent can be
37/// transitioned to the [`Started`](super::started::Started) state by calling [`ManagedAgent::start`].
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] // Add common derives
39pub struct Idle;
40
41impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
42 /// Registers an asynchronous message handler for a specific message type `M`.
43 ///
44 /// This method is called during the agent's configuration phase (while in the `Idle` state).
45 /// It associates a specific message type `M` with a closure (`message_processor`) that
46 /// will be executed when the agent receives a message of that type after it has started.
47 ///
48 /// The framework handles the necessary type erasure and downcasting internally. The
49 /// provided `message_processor` receives the agent (in the `Started` state) and a
50 /// [`MessageContext`] containing the concrete message and metadata.
51 ///
52 /// # Type Parameters
53 ///
54 /// * `M`: The concrete message type this handler will process. Must implement
55 /// [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
56 ///
57 /// # Arguments
58 ///
59 /// * `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
60 /// and the message context (`&mut MessageContext<M>`) and returns a `Future`
61 /// (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
62 ///
63 /// # Returns
64 ///
65 /// Returns a mutable reference to `self` to allow for method chaining during configuration.
66 #[instrument(skip(self, message_processor), level = "debug")]
67 pub fn act_on<M>(
68 &mut self,
69 message_processor: impl for<'a> Fn(
70 &'a mut ManagedAgent<Started, State>,
71 &'a mut MessageContext<M>,
72 ) -> FutureBox
73 + Send
74 + Sync
75 + 'static,
76 ) -> &mut Self
77 where
78 M: ActonMessage + Clone + Send + Sync + 'static,
79 {
80 let type_id = TypeId::of::<M>();
81 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding message handler");
82 // Create a boxed handler that performs downcasting and calls the user's processor.
83 let handler_box = Box::new(
84 move |actor: &mut ManagedAgent<Started, State>,
85 envelope: &mut Envelope|
86 -> FutureBox {
87 // Downcast the trait object message back to the concrete type M.
88 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
89 trace!(
90 "Downcast successful for message type: {}",
91 std::any::type_name::<M>()
92 );
93
94 // Prepare the MessageContext for the handler.
95 let mut msg_context = {
96 let origin_envelope = OutboundEnvelope::new_with_recipient(envelope.reply_to.clone(), envelope.recipient.clone());
97 let reply_envelope = OutboundEnvelope::new_with_recipient(envelope.recipient.clone(), envelope.reply_to.clone());
98 MessageContext {
99 message: concrete_msg.clone(),
100 timestamp: envelope.timestamp,
101 origin_envelope,
102 reply_envelope,
103 }
104 };
105
106 // Call the user-provided message processor.
107 message_processor(actor, &mut msg_context) // Return the FutureBox directly
108 } else {
109 // This should ideally not happen if type registration is correct.
110 error!(
111 type_name = std::any::type_name::<M>(),
112 "Message handler called with incompatible message type (downcast failed)"
113 );
114 Box::pin(async {}) // Return an empty future on error.
115 }
116 },
117 );
118
119 // Store the type-erased handler.
120 self.message_handlers.insert(type_id, ReactorItem::FutureReactor(handler_box));
121 self
122 }
123
124
125 /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
126 ///
127 /// This hook is called once, shortly after the agent transitions to the `Started` state
128 /// and its main task begins processing messages. It receives an immutable reference
129 /// to the agent in the `Started` state.
130 ///
131 /// # Arguments
132 ///
133 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
134 ///
135 /// # Returns
136 ///
137 /// Returns a mutable reference to `self` for chaining.
138 pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
139 where
140 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
141 Fut: Future<Output=()> + Send + Sync + 'static,
142 {
143 self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
144 self
145 }
146
147 /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
148 ///
149 /// This hook is called once, just before the agent's main task (`wake`) is spawned
150 /// during the `start` process. It receives an immutable reference to the agent,
151 /// technically still in the `Started` state contextually, though the loop hasn't begun.
152 ///
153 /// # Arguments
154 ///
155 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
156 ///
157 /// # Returns
158 ///
159 /// Returns a mutable reference to `self` for chaining.
160 pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
161 where
162 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
163 Fut: Future<Output=()> + Send + Sync + 'static,
164 {
165 self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
166 self
167 }
168
169 /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
170 ///
171 /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
172 /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
173 /// reference to the agent in the `Started` state context.
174 ///
175 /// # Arguments
176 ///
177 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
178 ///
179 /// # Returns
180 ///
181 /// Returns a mutable reference to `self` for chaining.
182 pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
183 where
184 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
185 Fut: Future<Output=()> + Send + Sync + 'static,
186 {
187 self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
188 self
189 }
190
191 /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
192 ///
193 /// This hook is called once, just before the agent's main loop begins its shutdown sequence
194 /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
195 /// reference to the agent in the `Started` state.
196 ///
197 /// # Arguments
198 ///
199 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
200 ///
201 /// # Returns
202 ///
203 /// Returns a mutable reference to `self` for chaining.
204 pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
205 where
206 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
207 Fut: Future<Output=()> + Send + Sync + 'static,
208 {
209 self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
210 self
211 }
212
213 /// Creates the configuration for a new child agent under this agent's supervision.
214 ///
215 /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
216 /// to be a child of the current agent. It automatically derives a hierarchical
217 /// [`Ern`] for the child based on the parent's ID and the provided `name`.
218 /// The child inherits the parent's broker reference.
219 ///
220 /// The returned agent is in the `Idle` state and still needs to be configured
221 /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
222 /// The parent agent typically calls `handle.supervise(child_handle)` after the child
223 /// is started to register it formally.
224 ///
225 /// # Arguments
226 ///
227 /// * `name`: The name segment for the child agent's [`Ern`].
228 ///
229 /// # Returns
230 ///
231 /// Returns a `Result` containing a new `ManagedAgent` instance for the child
232 /// in the `Idle` state, ready for further configuration.
233 ///
234 /// # Errors
235 ///
236 /// Returns an error if creating the child's `Ern` fails or if creating the
237 /// `AgentConfig` fails (e.g., parsing the parent ID).
238 #[instrument(skip(self))]
239 pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
240 // Configure the child with parent and broker references.
241 let config = AgentConfig::new(
242 Ern::with_root(name)?, // Child's name segment
243 Some(self.handle.clone()), // Parent handle
244 Some(self.runtime.broker().clone()) // Inherited broker handle
245 )?;
246 // Create the Idle agent using the internal constructor.
247 Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
248 }
249
250 // Internal constructor - not part of public API documentation
251 #[instrument]
252 pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
253 let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
254
255 if let Some(app) = runtime {
256 managed_actor.broker = app.0.broker.clone();
257 managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
258 }
259
260 if let Some(config) = &config {
261 managed_actor.handle.id = config.id();
262 managed_actor.parent = config.parent().clone();
263 managed_actor.handle.broker = Box::new(config.get_broker().clone());
264 if let Some(broker) = config.get_broker().clone() {
265 managed_actor.broker = broker;
266 }
267 }
268
269 debug_assert!(
270 !managed_actor.inbox.is_closed(),
271 "Agent mailbox is closed in new"
272 );
273
274 trace!("NEW ACTOR: {}", &managed_actor.handle.id());
275
276 managed_actor.runtime = runtime.clone().unwrap_or_else(|| AgentRuntime(ActonInner {
277 broker: managed_actor.handle.broker.clone().unwrap_or_default(),
278 ..Default::default()
279 }));
280
281 managed_actor.id = managed_actor.handle.id();
282
283 managed_actor
284 }
285
286 /// Starts the agent's processing loop and transitions it to the `Started` state.
287 ///
288 /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
289 /// 1. Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
290 /// 2. Executes the registered `before_start` lifecycle hook.
291 /// 3. Spawns the agent's main asynchronous task (`wake`) which handles message processing.
292 /// 4. Closes the agent's `TaskTracker` to signal that the main task has been spawned.
293 /// 5. Returns the agent's [`AgentHandle`] for external interaction.
294 ///
295 /// After this method returns, the agent is running and ready to process messages sent to its handle.
296 ///
297 /// # Returns
298 ///
299 /// An [`AgentHandle`] that can be used to interact with the now-running agent.
300 #[instrument(skip(self))]
301 pub async fn start(mut self) -> AgentHandle {
302 trace!("Starting agent: {}", self.id());
303 trace!("Model state before start: {:?}", self.model);
304
305 // Take ownership of handlers before converting state.
306 let message_handlers = mem::take(&mut self.message_handlers);
307 let actor_ref = self.handle.clone(); // Clone handle before consuming self.
308
309 // Convert the agent to the Started state.
310 let active_actor: ManagedAgent<Started, State> = self.into();
311 // Leak the agent into a static reference for the spawned task.
312 // The task itself is responsible for managing the agent's lifetime.
313 let actor = Box::leak(Box::new(active_actor));
314
315 trace!("Executing before_start hook for agent: {}", actor.id());
316 (actor.before_start)(actor).await; // Execute before_start hook.
317
318 trace!("Spawning main task (wake) for agent: {}", actor.id());
319 // Spawn the main message processing loop.
320 actor_ref.tracker().spawn(actor.wake(message_handlers));
321 // Close the tracker to indicate the main task is launched.
322 actor_ref.tracker().close();
323
324 trace!("Agent {} started successfully.", actor_ref.id());
325 actor_ref // Return the handle.
326 }
327}
328
329// --- Utility Function ---
330
331/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
332///
333/// This utility function is used internally by the message dispatch mechanism
334/// (specifically within the closure generated by `act_on`) to safely convert
335/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
336///
337/// # Type Parameters
338///
339/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
340/// and implement [`ActonMessage`].
341///
342/// # Arguments
343///
344/// * `msg`: A reference to the `ActonMessage` trait object.
345///
346/// # Returns
347///
348/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
349/// * `None`: If the trait object does not hold a value of type `T`.
350pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
351 // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
352 msg.as_any().downcast_ref::<T>()
353}
354
355// --- Internal Implementations ---
356// (Default, From, default_handler remain internal and undocumented)
357
358impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
359for ManagedAgent<Started, State>
360{
361 fn from(value: ManagedAgent<Idle, State>) -> Self {
362 // Move all fields from Idle state to Started state.
363 ManagedAgent::<Started, State> {
364 handle: value.handle,
365 parent: value.parent,
366 halt_signal: value.halt_signal,
367 id: value.id,
368 runtime: value.runtime,
369 model: value.model,
370 tracker: value.tracker,
371 inbox: value.inbox,
372 before_start: value.before_start,
373 after_start: value.after_start,
374 before_stop: value.before_stop,
375 after_stop: value.after_stop,
376 broker: value.broker,
377 message_handlers: value.message_handlers,
378 _actor_state: Default::default(),
379 }
380 }
381}
382
383impl<State: Default + Send + Debug + 'static> Default
384for ManagedAgent<Idle, State>
385{
386 fn default() -> Self {
387 let (outbox, inbox) = channel(255); // Default channel size
388 let id: Ern = Default::default();
389 let mut handle: AgentHandle = Default::default();
390 handle.id = id.clone();
391 handle.outbox = outbox.clone();
392
393 ManagedAgent::<Idle, State> {
394 handle,
395 id,
396 inbox,
397 // Initialize lifecycle hooks with default no-op handlers.
398 before_start: Box::new(|_| default_handler()),
399 after_start: Box::new(|_| default_handler()),
400 before_stop: Box::new(|_| default_handler()),
401 after_stop: Box::new(|_| default_handler()),
402 model: State::default(),
403 broker: Default::default(),
404 parent: Default::default(),
405 runtime: Default::default(),
406 halt_signal: Default::default(),
407 tracker: Default::default(),
408 message_handlers: Default::default(),
409 _actor_state: Default::default(),
410 }
411 }
412}
413
414// Default no-op async handler for lifecycle events.
415fn default_handler() -> FutureBox {
416 Box::pin(async {})
417}