acton_core/actor/managed_agent/idle.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::Ern;
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{
28 ActonInner, AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
29};
30use crate::message::MessageContext;
31use crate::prelude::ActonMessage;
32use crate::traits::AgentHandleInterface;
33
/// Zero-sized type-state marker for a [`ManagedAgent`] that has not been started yet.
///
/// `ManagedAgent<Idle, State>` is the configuration phase of an agent. While the
/// agent carries this marker, message handlers can be registered (for example with
/// [`ManagedAgent::act_on`]) and lifecycle hooks can be installed
/// ([`ManagedAgent::before_start`], [`ManagedAgent::after_stop`], ...).
/// Calling [`ManagedAgent::start`] consumes the idle agent and converts it to the
/// [`Started`](super::started::Started) state.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Idle;
42
43use crate::common::ErrorHandler;
44use std::collections::HashMap;
45
46impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
47 /// Registers an asynchronous message handler for a specific message type `M`.
48 ///
49 /// This method is called during the agent's configuration phase (while in the `Idle` state).
50 /// It associates a specific message type `M` with a closure (`message_processor`) that
51 /// will be executed when the agent receives a message of that type after it has started.
52 ///
53 /// The framework handles the necessary type erasure and downcasting internally. The
54 /// provided `message_processor` receives the agent (in the `Started` state) and a
55 /// [`MessageContext`] containing the concrete message and metadata.
56 ///
57 /// # Type Parameters
58 ///
59 /// * `M`: The concrete message type this handler will process. Must implement
60 /// [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
61 ///
62 /// # Arguments
63 ///
64 /// * `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
65 /// and the message context (`&mut MessageContext<M>`) and returns a `Future`
66 /// (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
67 ///
68 /// # Returns
69 ///
70 /// Returns a mutable reference to `self` to allow for method chaining during configuration.
71 #[instrument(skip(self, message_processor), level = "debug")]
72 #[deprecated(
73 note = "act_on for handlers returning () will be deprecated in the next version. Use act_on_result for Result-returning handlers."
74 )]
75 pub fn act_on<M>(
76 &mut self,
77 message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
78 + Send
79 + Sync
80 + 'static,
81 ) -> &mut Self
82 where
83 M: ActonMessage + Clone + Send + Sync + 'static,
84 {
85 let type_id = TypeId::of::<M>();
86 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
87 let handler_box = Box::new(
88 move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
89 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
90 trace!(
91 "Downcast successful for message type: {}",
92 std::any::type_name::<M>()
93 );
94 let mut msg_context = {
95 let origin_envelope = OutboundEnvelope::new_with_recipient(
96 envelope.reply_to.clone(),
97 envelope.recipient.clone(),
98 );
99 let reply_envelope = OutboundEnvelope::new_with_recipient(
100 envelope.recipient.clone(),
101 envelope.reply_to.clone(),
102 );
103 MessageContext {
104 message: concrete_msg.clone(),
105 timestamp: envelope.timestamp,
106 origin_envelope,
107 reply_envelope,
108 }
109 };
110 message_processor(actor, &mut msg_context)
111 } else {
112 error!(
113 type_name = std::any::type_name::<M>(),
114 "Message handler called with incompatible message type (downcast failed)"
115 );
116 Box::pin(async {})
117 }
118 },
119 );
120 self.message_handlers
121 .insert(type_id, ReactorItem::FutureReactor(handler_box));
122 self
123 }
124
125 /// Registers an asynchronous message handler for a specific message type `M` that returns a Result (new style, preferred).
126 pub fn act_on_result<M, E, Fut>(
127 &mut self,
128 message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> Fut
129 + Send
130 + Sync
131 + 'static,
132 ) -> &mut Self
133 where
134 M: ActonMessage + Clone + Send + Sync + 'static,
135 E: std::error::Error + Send + Sync + 'static,
136 Fut: std::future::Future<Output = Result<(), E>> + Send + Sync + 'static,
137 {
138 let type_id = TypeId::of::<M>();
139 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
140 let handler_box = Box::new(
141 move |actor: &mut ManagedAgent<Started, State>,
142 envelope: &mut Envelope|
143 -> crate::common::FutureBoxResult {
144 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
145 trace!(
146 "Downcast successful for message type: {}",
147 std::any::type_name::<M>()
148 );
149 let mut msg_context = {
150 let origin_envelope = OutboundEnvelope::new_with_recipient(
151 envelope.reply_to.clone(),
152 envelope.recipient.clone(),
153 );
154 let reply_envelope = OutboundEnvelope::new_with_recipient(
155 envelope.recipient.clone(),
156 envelope.reply_to.clone(),
157 );
158 MessageContext {
159 message: concrete_msg.clone(),
160 timestamp: envelope.timestamp,
161 origin_envelope,
162 reply_envelope,
163 }
164 };
165 let fut = message_processor(actor, &mut msg_context);
166 Box::pin(async move {
167 match fut.await {
168 Ok(()) => Ok(()),
169 Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
170 }
171 })
172 } else {
173 error!(
174 type_name = std::any::type_name::<M>(),
175 "Result handler called with incompatible message type (downcast failed)"
176 );
177 Box::pin(async { Ok(()) })
178 }
179 },
180 );
181 self.message_handlers
182 .insert(type_id, ReactorItem::FutureReactorResult(handler_box));
183 self
184 }
185
186 /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
187 ///
188 /// This hook is called once, shortly after the agent transitions to the `Started` state
189 /// and its main task begins processing messages. It receives an immutable reference
190 /// to the agent in the `Started` state.
191 ///
192 /// # Arguments
193 ///
194 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
195 ///
196 /// # Returns
197 ///
198 /// Returns a mutable reference to `self` for chaining.
199 pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
200 where
201 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
202 Fut: Future<Output = ()> + Send + Sync + 'static,
203 {
204 self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
205 self
206 }
207
208 /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
209 ///
210 /// This hook is called once, just before the agent's main task (`wake`) is spawned
211 /// during the `start` process. It receives an immutable reference to the agent,
212 /// technically still in the `Started` state contextually, though the loop hasn't begun.
213 ///
214 /// # Arguments
215 ///
216 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
217 ///
218 /// # Returns
219 ///
220 /// Returns a mutable reference to `self` for chaining.
221 pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
222 where
223 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
224 Fut: Future<Output = ()> + Send + Sync + 'static,
225 {
226 self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
227 self
228 }
229
230 /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
231 ///
232 /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
233 /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
234 /// reference to the agent in the `Started` state context.
235 ///
236 /// # Arguments
237 ///
238 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
239 ///
240 /// # Returns
241 ///
242 /// Returns a mutable reference to `self` for chaining.
243 pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
244 where
245 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
246 Fut: Future<Output = ()> + Send + Sync + 'static,
247 {
248 self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
249 self
250 }
251
252 /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
253 ///
254 /// This hook is called once, just before the agent's main loop begins its shutdown sequence
255 /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
256 /// reference to the agent in the `Started` state.
257 ///
258 /// # Arguments
259 ///
260 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
261 ///
262 /// # Returns
263 ///
264 /// Returns a mutable reference to `self` for chaining.
265 pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
266 where
267 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
268 Fut: Future<Output = ()> + Send + Sync + 'static,
269 {
270 self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
271 self
272 }
273
274 /// Creates the configuration for a new child agent under this agent's supervision.
275 ///
276 /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
277 /// to be a child of the current agent. It automatically derives a hierarchical
278 /// [`Ern`] for the child based on the parent's ID and the provided `name`.
279 /// The child inherits the parent's broker reference.
280 ///
281 /// The returned agent is in the `Idle` state and still needs to be configured
282 /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
283 /// The parent agent typically calls `handle.supervise(child_handle)` after the child
284 /// is started to register it formally.
285 ///
286 /// # Arguments
287 ///
288 /// * `name`: The name segment for the child agent's [`Ern`].
289 ///
290 /// # Returns
291 ///
292 /// Returns a `Result` containing a new `ManagedAgent` instance for the child
293 /// in the `Idle` state, ready for further configuration.
294 ///
295 /// # Errors
296 ///
297 /// Returns an error if creating the child's `Ern` fails or if creating the
298 /// `AgentConfig` fails (e.g., parsing the parent ID).
299 #[instrument(skip(self))]
300 pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
301 // Configure the child with parent and broker references.
302 let config = AgentConfig::new(
303 Ern::with_root(name)?, // Child's name segment
304 Some(self.handle.clone()), // Parent handle
305 Some(self.runtime.broker().clone()), // Inherited broker handle
306 )?;
307 // Create the Idle agent using the internal constructor.
308 Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
309 }
310
311 // Internal constructor - not part of public API documentation
312 #[instrument]
313 pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
314 let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
315
316 if let Some(app) = runtime {
317 managed_actor.broker = app.0.broker.clone();
318 managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
319 }
320
321 if let Some(config) = &config {
322 managed_actor.handle.id = config.id();
323 managed_actor.parent = config.parent().clone();
324 managed_actor.handle.broker = Box::new(config.get_broker().clone());
325 if let Some(broker) = config.get_broker().clone() {
326 managed_actor.broker = broker;
327 }
328 }
329
330 debug_assert!(
331 !managed_actor.inbox.is_closed(),
332 "Agent mailbox is closed in new"
333 );
334
335 trace!("NEW ACTOR: {}", &managed_actor.handle.id());
336
337 managed_actor.runtime = runtime.clone().unwrap_or_else(|| {
338 AgentRuntime(ActonInner {
339 broker: managed_actor.handle.broker.clone().unwrap_or_default(),
340 ..Default::default()
341 })
342 });
343
344 managed_actor.id = managed_actor.handle.id();
345
346 managed_actor
347 }
348
349 /// Starts the agent's processing loop and transitions it to the `Started` state.
350 ///
351 /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
352 /// 1. Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
353 /// 2. Executes the registered `before_start` lifecycle hook.
354 /// 3. Spawns the agent's main asynchronous task (`wake`) which handles message processing.
355 /// 4. Closes the agent's `TaskTracker` to signal that the main task has been spawned.
356 /// 5. Returns the agent's [`AgentHandle`] for external interaction.
357 ///
358 /// After this method returns, the agent is running and ready to process messages sent to its handle.
359 ///
360 /// # Returns
361 ///
362 /// An [`AgentHandle`] that can be used to interact with the now-running agent.
363 #[instrument(skip(self))]
364 pub async fn start(mut self) -> AgentHandle {
365 trace!("Starting agent: {}", self.id());
366 trace!("Model state before start: {:?}", self.model);
367
368 // Take ownership of handlers before converting state.
369 let message_handlers = mem::take(&mut self.message_handlers);
370 let actor_ref = self.handle.clone(); // Clone handle before consuming self.
371
372 // Convert the agent to the Started state.
373 let active_actor: ManagedAgent<Started, State> = self.into();
374 // Leak the agent into a static reference for the spawned task.
375 // The task itself is responsible for managing the agent's lifetime.
376 let actor = Box::leak(Box::new(active_actor));
377
378 trace!("Executing before_start hook for agent: {}", actor.id());
379 (actor.before_start)(actor).await; // Execute before_start hook.
380
381 trace!("Spawning main task (wake) for agent: {}", actor.id());
382 // Spawn the main message processing loop.
383 actor_ref.tracker().spawn(actor.wake(message_handlers));
384 // Close the tracker to indicate the main task is launched.
385 actor_ref.tracker().close();
386
387 trace!("Agent {} started successfully.", actor_ref.id());
388 actor_ref // Return the handle.
389 }
390
391 /// Registers an asynchronous error handler for a specific error type `E`.
392 ///
393 /// This allows the agent to handle errors of type `E` by executing the given closure
394 /// whenever a message handler returns an error of this type.
395 ///
396 /// # Type Parameters
397 ///
398 /// * `E`: The concrete error type to handle. Must implement `std::error::Error` and be `'static`.
399 ///
400 /// # Arguments
401 /// * `error_handler`: The handler closure executed with agent, envelope, and error reference.
402 ///
403 /// # Returns
404 /// A mutable reference to `self` for chaining.
405 pub fn on_error<E>(
406 &mut self,
407 error_handler: impl for<'a, 'b> Fn(
408 &'a mut ManagedAgent<Started, State>,
409 &'b mut crate::message::Envelope,
410 &'b E,
411 ) -> crate::common::FutureBox
412 + Send
413 + Sync
414 + 'static,
415 ) -> &mut Self
416 where
417 E: std::error::Error + 'static,
418 {
419 use std::any::TypeId;
420 // Wrap handler for dynamic dispatch
421 use std::sync::Arc;
422 let handler_box: Arc<Box<crate::common::ErrorHandler<State>>> =
423 Arc::new(Box::new(move |agent, envelope, err| {
424 // Downcast the error to &E
425 if let Some(specific) = err.downcast_ref::<E>() {
426 error_handler(agent, envelope, specific)
427 } else {
428 // If type doesn't match, do nothing
429 Box::pin(async {})
430 }
431 }));
432 self.error_handler_map
433 .insert(TypeId::of::<E>(), handler_box);
434 self
435 }
436}
437
438// --- Utility Function ---
439
440/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
441///
442/// This utility function is used internally by the message dispatch mechanism
443/// (specifically within the closure generated by `act_on`) to safely convert
444/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
445///
446/// # Type Parameters
447///
448/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
449/// and implement [`ActonMessage`].
450///
451/// # Arguments
452///
453/// * `msg`: A reference to the `ActonMessage` trait object.
454///
455/// # Returns
456///
457/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
458/// * `None`: If the trait object does not hold a value of type `T`.
459pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
460 // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
461 msg.as_any().downcast_ref::<T>()
462}
463
464// --- Internal Implementations ---
465// (Default, From, default_handler remain internal and undocumented)
466
467impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
468 for ManagedAgent<Started, State>
469{
470 fn from(value: ManagedAgent<Idle, State>) -> Self {
471 // Move all fields from Idle state to Started state.
472 ManagedAgent::<Started, State> {
473 handle: value.handle,
474 parent: value.parent,
475 halt_signal: value.halt_signal,
476 id: value.id,
477 runtime: value.runtime,
478 model: value.model,
479 tracker: value.tracker,
480 inbox: value.inbox,
481 before_start: value.before_start,
482 after_start: value.after_start,
483 before_stop: value.before_stop,
484 after_stop: value.after_stop,
485 broker: value.broker,
486 message_handlers: value.message_handlers,
487 error_handler_map: value.error_handler_map, // transfer error handlers
488 _actor_state: Default::default(),
489 }
490 }
491}
492
493impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
494 fn default() -> Self {
495 let (outbox, inbox) = channel(255); // Default channel size
496 let id: Ern = Default::default();
497 let mut handle: crate::common::AgentHandle = Default::default();
498 handle.id = id.clone();
499 handle.outbox = outbox.clone();
500
501 ManagedAgent::<Idle, State> {
502 handle,
503 id,
504 inbox,
505 // Initialize lifecycle hooks with default no-op handlers.
506 before_start: Box::new(|_| default_handler()),
507 after_start: Box::new(|_| default_handler()),
508 before_stop: Box::new(|_| default_handler()),
509 after_stop: Box::new(|_| default_handler()),
510 model: State::default(),
511 broker: Default::default(),
512 error_handler_map: std::collections::HashMap::new(),
513 parent: Default::default(),
514 runtime: Default::default(),
515 halt_signal: Default::default(),
516 tracker: Default::default(),
517 message_handlers: Default::default(),
518 _actor_state: Default::default(),
519 }
520 }
521}
522
523// Default no-op async handler for lifecycle events.
524fn default_handler() -> FutureBox {
525 Box::pin(async {})
526}