acton_core/actor/managed_agent/idle.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18
19use crate::traits::ActonMessageReply;
20use std::fmt::Debug;
21use std::future::Future;
22use std::mem;
23
24use acton_ern::Ern;
25use tokio::sync::mpsc::channel;
26use tracing::*;
27
28use crate::actor::{AgentConfig, ManagedAgent, Started};
29use crate::common::{
30 AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
31};
32use crate::message::MessageContext;
33use crate::prelude::ActonMessage;
34use crate::traits::AgentHandleInterface;
35
/// Marker type for the configuration phase of a [`ManagedAgent`].
///
/// A `ManagedAgent<Idle, _>` has been constructed but not yet started. While in
/// this state, message handlers can be registered with [`ManagedAgent::act_on`]
/// and lifecycle hooks with methods such as [`ManagedAgent::before_start`] and
/// [`ManagedAgent::after_stop`]. Calling [`ManagedAgent::start`] consumes the
/// idle agent and moves it to the [`Started`](super::started::Started) state.
///
/// `Idle` is a unit struct: it carries no data and exists purely at the type level.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Idle;
44
45
46impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
47 /// Registers an asynchronous message handler for a specific message type `M`.
48 ///
49 /// This method is called during the agent's configuration phase (while in the `Idle` state).
50 /// It associates a specific message type `M` with a closure (`message_processor`) that
51 /// will be executed when the agent receives a message of that type after it has started.
52 ///
53 /// The framework handles the necessary type erasure and downcasting internally. The
54 /// provided `message_processor` receives the agent (in the `Started` state) and a
55 /// [`MessageContext`] containing the concrete message and metadata.
56 ///
57 /// # Type Parameters
58 ///
59 /// * `M`: The concrete message type this handler will process. Must implement
60 /// [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
61 ///
62 /// # Arguments
63 ///
64 /// * `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
65 /// and the message context (`&mut MessageContext<M>`) and returns a `Future`
66 /// (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
67 ///
68 /// # Returns
69 ///
70 /// Returns a mutable reference to `self` to allow for method chaining during configuration.
71 #[instrument(skip(self, message_processor), level = "debug")]
72 pub fn act_on<M>(
73 &mut self,
74 message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
75 + Send
76 + Sync
77 + 'static,
78 ) -> &mut Self
79 where
80 M: ActonMessage + Clone + Send + Sync + 'static,
81 {
82 let type_id = TypeId::of::<M>();
83 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
84 let handler_box = Box::new(
85 move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
86 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
87 trace!(
88 "Downcast successful for message type: {}",
89 std::any::type_name::<M>()
90 );
91 let mut msg_context = {
92 let origin_envelope = OutboundEnvelope::new_with_recipient(
93 envelope.reply_to.clone(),
94 envelope.recipient.clone(),
95 actor.handle.cancellation_token.clone(),
96 );
97 let reply_envelope = OutboundEnvelope::new_with_recipient(
98 envelope.recipient.clone(),
99 envelope.reply_to.clone(),
100 actor.handle.cancellation_token.clone(),
101 );
102 MessageContext {
103 message: concrete_msg.clone(),
104 timestamp: envelope.timestamp,
105 origin_envelope,
106 reply_envelope,
107 }
108 };
109 message_processor(actor, &mut msg_context)
110 } else {
111 error!(
112 type_name = std::any::type_name::<M>(),
113 "Message handler called with incompatible message type (downcast failed)"
114 );
115 Box::pin(async {})
116 }
117 },
118 );
119 self.message_handlers
120 .insert(type_id, ReactorItem::FutureReactor(handler_box));
121 self
122 }
123
124 /// Registers an asynchronous error handler for a specific error type `E`.
125 ///
126 /// This allows the agent to handle errors of type `E` by executing the given closure
127 /// whenever a message handler returns an error of this type.
128 ///
129 /// # Type Parameters
130 ///
131 /// * `E`: The concrete error type to handle. Must implement `std::error::Error` and be `'static`.
132 ///
133 /// # Arguments
134 /// * `error_handler`: The handler closure executed with agent, envelope, and error reference.
135 ///
136 /// # Returns
137 /// A mutable reference to `self` for chaining.
138 pub fn on_error<M, E>(
139 &mut self,
140 error_handler: impl for<'a, 'b> Fn(
141 &'a mut ManagedAgent<Started, State>,
142 &'b mut MessageContext<M>,
143 &'b E,
144 ) -> crate::common::FutureBox
145 + Send
146 + Sync
147 + 'static,
148 ) -> &mut Self
149 where
150 M: ActonMessage + Clone + Send + Sync + 'static,
151 E: std::error::Error + 'static,
152 {
153 use std::any::TypeId;
154 let message_type_id = TypeId::of::<M>();
155 let error_type_id = TypeId::of::<E>();
156
157 // Wrap handler for dynamic dispatch
158 let handler_box: Box<crate::common::ErrorHandler<State>> =
159 Box::new(move |agent, envelope, err| {
160 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
161 // Downcast the error to &E
162 if let Some(specific_err) = err.downcast_ref::<E>() {
163 let mut msg_context = {
164 let origin_envelope = OutboundEnvelope::new_with_recipient(
165 envelope.reply_to.clone(),
166 envelope.recipient.clone(),
167 agent.handle.cancellation_token.clone(),
168 );
169 let reply_envelope = OutboundEnvelope::new_with_recipient(
170 envelope.recipient.clone(),
171 envelope.reply_to.clone(),
172 agent.handle.cancellation_token.clone(),
173 );
174 MessageContext {
175 message: concrete_msg.clone(),
176 timestamp: envelope.timestamp,
177 origin_envelope,
178 reply_envelope,
179 }
180 };
181 error_handler(agent, &mut msg_context, specific_err)
182 } else {
183 // If type doesn't match, do nothing
184 Box::pin(async {})
185 }
186 } else {
187 // If type doesn't match, do nothing
188 Box::pin(async {})
189 }
190 });
191 self.error_handler_map
192 .insert((message_type_id, error_type_id), handler_box);
193 self
194 }
195 /// Registers an asynchronous message handler for a specific message type `M` that returns a Result (new style, preferred).
196 pub fn act_on_fallible<M, T, E>(
197 &mut self,
198 message_processor: impl for<'a> Fn(
199 &'a mut ManagedAgent<Started, State>,
200 &'a mut MessageContext<M>,
201 ) -> std::pin::Pin<
202 Box<dyn std::future::Future<Output = Result<T, E>> + Send + Sync + 'static>,
203 > + Send
204 + Sync
205 + 'static,
206 ) -> &mut Self
207 where
208 M: ActonMessage + Clone + Send + Sync + 'static,
209 T: ActonMessageReply + 'static,
210 E: std::error::Error + Send + Sync + 'static,
211 {
212 let type_id = TypeId::of::<M>();
213 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
214 let handler_box = Box::new(
215 move |actor: &mut ManagedAgent<Started, State>,
216 envelope: &mut Envelope|
217 -> crate::common::FutureBoxResult {
218 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
219 trace!(
220 "Downcast successful for message type: {}",
221 std::any::type_name::<M>()
222 );
223 let mut msg_context = {
224 let origin_envelope = OutboundEnvelope::new_with_recipient(
225 envelope.reply_to.clone(),
226 envelope.recipient.clone(),
227 actor.handle.cancellation_token.clone(),
228 );
229 let reply_envelope = OutboundEnvelope::new_with_recipient(
230 envelope.recipient.clone(),
231 envelope.reply_to.clone(),
232 actor.handle.cancellation_token.clone(),
233 );
234 MessageContext {
235 message: concrete_msg.clone(),
236 timestamp: envelope.timestamp,
237 origin_envelope,
238 reply_envelope,
239 }
240 };
241 let fut = message_processor(actor, &mut msg_context);
242 Box::pin(async move {
243 match fut.await {
244 Ok(val) => Ok(Box::new(val) as Box<dyn ActonMessageReply + Send>),
245 Err(e) => {
246 let error_type_id = TypeId::of::<E>();
247 Err((
248 Box::new(e) as Box<dyn std::error::Error + Send + Sync>,
249 error_type_id,
250 ))
251 }
252 }
253 })
254 } else {
255 error!(
256 type_name = std::any::type_name::<M>(),
257 "Result handler called with incompatible message type (downcast failed)"
258 );
259 Box::pin(async { Ok(Box::new(()) as Box<dyn ActonMessageReply + Send>) })
260 }
261 },
262 );
263 self.message_handlers
264 .insert(type_id, ReactorItem::FutureReactorResult(handler_box));
265 self
266 }
267
268 /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
269 ///
270 /// This hook is called once, shortly after the agent transitions to the `Started` state
271 /// and its main task begins processing messages. It receives an immutable reference
272 /// to the agent in the `Started` state.
273 ///
274 /// # Arguments
275 ///
276 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
277 ///
278 /// # Returns
279 ///
280 /// Returns a mutable reference to `self` for chaining.
281 pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
282 where
283 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
284 Fut: Future<Output = ()> + Send + Sync + 'static,
285 {
286 self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
287 self
288 }
289
290 /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
291 ///
292 /// This hook is called once, just before the agent's main task (`wake`) is spawned
293 /// during the `start` process. It receives an immutable reference to the agent,
294 /// technically still in the `Started` state contextually, though the loop hasn't begun.
295 ///
296 /// # Arguments
297 ///
298 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
299 ///
300 /// # Returns
301 ///
302 /// Returns a mutable reference to `self` for chaining.
303 pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
304 where
305 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
306 Fut: Future<Output = ()> + Send + Sync + 'static,
307 {
308 self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
309 self
310 }
311
312 /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
313 ///
314 /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
315 /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
316 /// reference to the agent in the `Started` state context.
317 ///
318 /// # Arguments
319 ///
320 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
321 ///
322 /// # Returns
323 ///
324 /// Returns a mutable reference to `self` for chaining.
325 pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
326 where
327 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
328 Fut: Future<Output = ()> + Send + Sync + 'static,
329 {
330 self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
331 self
332 }
333
334 /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
335 ///
336 /// This hook is called once, just before the agent's main loop begins its shutdown sequence
337 /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
338 /// reference to the agent in the `Started` state.
339 ///
340 /// # Arguments
341 ///
342 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
343 ///
344 /// # Returns
345 ///
346 /// Returns a mutable reference to `self` for chaining.
347 pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
348 where
349 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
350 Fut: Future<Output = ()> + Send + Sync + 'static,
351 {
352 self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
353 self
354 }
355
356 /// Creates the configuration for a new child agent under this agent's supervision.
357 ///
358 /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
359 /// to be a child of the current agent. It automatically derives a hierarchical
360 /// [`Ern`] for the child based on the parent's ID and the provided `name`.
361 /// The child inherits the parent's broker reference.
362 ///
363 /// The returned agent is in the `Idle` state and still needs to be configured
364 /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
365 /// The parent agent typically calls `handle.supervise(child_handle)` after the child
366 /// is started to register it formally.
367 ///
368 /// # Arguments
369 ///
370 /// * `name`: The name segment for the child agent's [`Ern`].
371 ///
372 /// # Returns
373 ///
374 /// Returns a `Result` containing a new `ManagedAgent` instance for the child
375 /// in the `Idle` state, ready for further configuration.
376 ///
377 /// # Errors
378 ///
379 /// Returns an error if creating the child's `Ern` fails or if creating the
380 /// `AgentConfig` fails (e.g., parsing the parent ID).
381 #[instrument(skip(self))]
382 pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
383 // Configure the child with parent and broker references.
384 let config = AgentConfig::new(
385 Ern::with_root(name)?, // Child's name segment
386 Some(self.handle.clone()), // Parent handle
387 Some(self.runtime.broker().clone()), // Inherited broker handle
388 )?;
389 // Create the Idle agent using the internal constructor.
390 Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
391 }
392
393 // Internal constructor - not part of public API documentation
394 #[instrument]
395 pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
396 let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
397
398 if let Some(app) = runtime {
399 managed_actor.broker = app.0.broker.clone();
400 managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
401 managed_actor.cancellation_token = Some(app.0.cancellation_token.child_token());
402 }
403
404 if let Some(config) = &config {
405 managed_actor.handle.id = config.id();
406 managed_actor.parent = config.parent().clone();
407 managed_actor.handle.broker = Box::new(config.get_broker().clone());
408 if let Some(broker) = config.get_broker().clone() {
409 managed_actor.broker = broker;
410 }
411 }
412
413 debug_assert!(
414 !managed_actor.inbox.is_closed(),
415 "Agent mailbox is closed in new"
416 );
417
418 trace!("NEW ACTOR: {}", &managed_actor.handle.id());
419
420 // Ensure runtime always exists; creating a new one here is an error.
421 assert!(
422 runtime.is_some(),
423 "AgentRuntime must be provided to ManagedAgent::new"
424 );
425 managed_actor.runtime = runtime.clone().unwrap();
426 managed_actor
427 .runtime
428 .0
429 .roots
430 .insert(managed_actor.handle.id(), managed_actor.handle.clone());
431
432 managed_actor.id = managed_actor.handle.id();
433
434 managed_actor
435 }
436
437 /// Starts the agent's processing loop and transitions it to the `Started` state.
438 ///
439 /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
440 /// 1. Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
441 /// 2. Executes the registered `before_start` lifecycle hook.
442 /// 3. Spawns the agent's main asynchronous task (`wake`) which handles message processing.
443 /// 4. Closes the agent's `TaskTracker` to signal that the main task has been spawned.
444 /// 5. Returns the agent's [`AgentHandle`] for external interaction.
445 ///
446 /// After this method returns, the agent is running and ready to process messages sent to its handle.
447 ///
448 /// # Returns
449 ///
450 /// An [`AgentHandle`] that can be used to interact with the now-running agent.
451 #[instrument(skip(self))]
452 pub async fn start(mut self) -> AgentHandle {
453 trace!("Starting agent: {}", self.id());
454 trace!("Model state before start: {:?}", self.model);
455
456 // Take ownership of handlers before converting state.
457 let message_handlers = mem::take(&mut self.message_handlers);
458 let actor_ref = self.handle.clone(); // Clone handle before consuming self.
459
460 // Convert the agent to the Started state.
461 let active_actor: ManagedAgent<Started, State> = self.into();
462 // Leak the agent into a static reference for the spawned task.
463 // The task itself is responsible for managing the agent's lifetime.
464 let actor = Box::leak(Box::new(active_actor));
465
466 trace!("Executing before_start hook for agent: {}", actor.id());
467 (actor.before_start)(actor).await; // Execute before_start hook.
468
469 trace!("Spawning main task (wake) for agent: {}", actor.id());
470 // Spawn the main message processing loop.
471 actor_ref.tracker().spawn(actor.wake(message_handlers));
472 // Close the tracker to indicate the main task is launched.
473 actor_ref.tracker().close();
474
475 trace!("Agent {} started successfully.", actor_ref.id());
476 actor_ref // Return the handle.
477 }
478}
479
480// --- Utility Function ---
481
482/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
483///
484/// This utility function is used internally by the message dispatch mechanism
485/// (specifically within the closure generated by `act_on`) to safely convert
486/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
487///
488/// # Type Parameters
489///
490/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
491/// and implement [`ActonMessage`].
492///
493/// # Arguments
494///
495/// * `msg`: A reference to the `ActonMessage` trait object.
496///
497/// # Returns
498///
499/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
500/// * `None`: If the trait object does not hold a value of type `T`.
501pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
502 // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
503 msg.as_any().downcast_ref::<T>()
504}
505
506// --- Internal Implementations ---
507// (Default, From, default_handler remain internal and undocumented)
508
509impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
510 for ManagedAgent<Started, State>
511{
512 fn from(value: ManagedAgent<Idle, State>) -> Self {
513 // Ensure cancellation_token is always present when transitioning to Started state
514 assert!(
515 value.cancellation_token.is_some(),
516 "Cannot transition to ManagedAgent<Started, State> without a cancellation_token"
517 );
518 // Move all fields from Idle state to Started state.
519 ManagedAgent::<Started, State> {
520 handle: value.handle,
521 parent: value.parent,
522 halt_signal: value.halt_signal,
523 id: value.id,
524 runtime: value.runtime,
525 model: value.model,
526 tracker: value.tracker,
527 inbox: value.inbox,
528 before_start: value.before_start,
529 after_start: value.after_start,
530 before_stop: value.before_stop,
531 after_stop: value.after_stop,
532 broker: value.broker,
533 message_handlers: value.message_handlers,
534 error_handler_map: value.error_handler_map, // transfer error handlers
535 cancellation_token: value.cancellation_token,
536 _actor_state: Default::default(),
537 }
538 }
539}
540
541impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
542 fn default() -> Self {
543 let (outbox, inbox) = channel(255); // Default channel size
544 let id: Ern = Default::default();
545 let mut handle: crate::common::AgentHandle = Default::default();
546 handle.id = id.clone();
547 handle.outbox = outbox.clone();
548
549 ManagedAgent::<Idle, State> {
550 handle,
551 id,
552 inbox,
553 // Initialize lifecycle hooks with default no-op handlers.
554 before_start: Box::new(|_| default_handler()),
555 after_start: Box::new(|_| default_handler()),
556 before_stop: Box::new(|_| default_handler()),
557 after_stop: Box::new(|_| default_handler()),
558 model: State::default(),
559 broker: Default::default(),
560 error_handler_map: std::collections::HashMap::new(),
561 parent: Default::default(),
562 runtime: Default::default(),
563 halt_signal: Default::default(),
564 tracker: Default::default(),
565 cancellation_token: Default::default(),
566 message_handlers: Default::default(),
567 _actor_state: Default::default(),
568 }
569 }
570}
571
572// Default no-op async handler for lifecycle events.
573fn default_handler() -> FutureBox {
574 Box::pin(async {})
575}