acton_core/actor/managed_agent/
idle.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::Ern;
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{
28    ActonInner, AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
29};
30use crate::message::MessageContext;
31use crate::prelude::ActonMessage;
32use crate::traits::AgentHandleInterface;
33
/// Type-state marker: the [`ManagedAgent`] exists and can be configured, but is not running yet.
///
/// While an agent carries this marker, message handlers (see [`ManagedAgent::act_on`])
/// and lifecycle hooks (for example [`ManagedAgent::before_start`] and
/// [`ManagedAgent::after_stop`]) can be registered on it. Calling
/// [`ManagedAgent::start`] consumes the idle agent and converts it into the
/// [`Started`](super::started::Started) state.
///
/// The type is a zero-sized unit struct; it carries no data of its own.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Idle;
42
43use crate::common::ErrorHandler;
44use std::collections::HashMap;
45
46impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
47    /// Registers an asynchronous message handler for a specific message type `M`.
48    ///
49    /// This method is called during the agent's configuration phase (while in the `Idle` state).
50    /// It associates a specific message type `M` with a closure (`message_processor`) that
51    /// will be executed when the agent receives a message of that type after it has started.
52    ///
53    /// The framework handles the necessary type erasure and downcasting internally. The
54    /// provided `message_processor` receives the agent (in the `Started` state) and a
55    /// [`MessageContext`] containing the concrete message and metadata.
56    ///
57    /// # Type Parameters
58    ///
59    /// *   `M`: The concrete message type this handler will process. Must implement
60    ///     [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
61    ///
62    /// # Arguments
63    ///
64    /// *   `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
65    ///     and the message context (`&mut MessageContext<M>`) and returns a `Future`
66    ///     (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
67    ///
68    /// # Returns
69    ///
70    /// Returns a mutable reference to `self` to allow for method chaining during configuration.
71    #[instrument(skip(self, message_processor), level = "debug")]
72    #[deprecated(
73        note = "act_on for handlers returning () will be deprecated in the next version. Use act_on_result for Result-returning handlers."
74    )]
75    pub fn act_on<M>(
76        &mut self,
77        message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
78            + Send
79            + Sync
80            + 'static,
81    ) -> &mut Self
82    where
83        M: ActonMessage + Clone + Send + Sync + 'static,
84    {
85        let type_id = TypeId::of::<M>();
86        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
87        let handler_box = Box::new(
88            move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
89                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
90                    trace!(
91                        "Downcast successful for message type: {}",
92                        std::any::type_name::<M>()
93                    );
94                    let mut msg_context = {
95                        let origin_envelope = OutboundEnvelope::new_with_recipient(
96                            envelope.reply_to.clone(),
97                            envelope.recipient.clone(),
98                        );
99                        let reply_envelope = OutboundEnvelope::new_with_recipient(
100                            envelope.recipient.clone(),
101                            envelope.reply_to.clone(),
102                        );
103                        MessageContext {
104                            message: concrete_msg.clone(),
105                            timestamp: envelope.timestamp,
106                            origin_envelope,
107                            reply_envelope,
108                        }
109                    };
110                    message_processor(actor, &mut msg_context)
111                } else {
112                    error!(
113                        type_name = std::any::type_name::<M>(),
114                        "Message handler called with incompatible message type (downcast failed)"
115                    );
116                    Box::pin(async {})
117                }
118            },
119        );
120        self.message_handlers
121            .insert(type_id, ReactorItem::FutureReactor(handler_box));
122        self
123    }
124
125    /// Registers an asynchronous message handler for a specific message type `M` that returns a Result (new style, preferred).
126    pub fn act_on_result<M, E, Fut>(
127        &mut self,
128        message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> Fut
129            + Send
130            + Sync
131            + 'static,
132    ) -> &mut Self
133    where
134        M: ActonMessage + Clone + Send + Sync + 'static,
135        E: std::error::Error + Send + Sync + 'static,
136        Fut: std::future::Future<Output = Result<(), E>> + Send + Sync + 'static,
137    {
138        let type_id = TypeId::of::<M>();
139        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
140        let handler_box = Box::new(
141            move |actor: &mut ManagedAgent<Started, State>,
142                  envelope: &mut Envelope|
143                  -> crate::common::FutureBoxResult {
144                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
145                    trace!(
146                        "Downcast successful for message type: {}",
147                        std::any::type_name::<M>()
148                    );
149                    let mut msg_context = {
150                        let origin_envelope = OutboundEnvelope::new_with_recipient(
151                            envelope.reply_to.clone(),
152                            envelope.recipient.clone(),
153                        );
154                        let reply_envelope = OutboundEnvelope::new_with_recipient(
155                            envelope.recipient.clone(),
156                            envelope.reply_to.clone(),
157                        );
158                        MessageContext {
159                            message: concrete_msg.clone(),
160                            timestamp: envelope.timestamp,
161                            origin_envelope,
162                            reply_envelope,
163                        }
164                    };
165                    let fut = message_processor(actor, &mut msg_context);
166                    Box::pin(async move {
167                        match fut.await {
168                            Ok(()) => Ok(()),
169                            Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
170                        }
171                    })
172                } else {
173                    error!(
174                        type_name = std::any::type_name::<M>(),
175                        "Result handler called with incompatible message type (downcast failed)"
176                    );
177                    Box::pin(async { Ok(()) })
178                }
179            },
180        );
181        self.message_handlers
182            .insert(type_id, ReactorItem::FutureReactorResult(handler_box));
183        self
184    }
185
186    /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
187    ///
188    /// This hook is called once, shortly after the agent transitions to the `Started` state
189    /// and its main task begins processing messages. It receives an immutable reference
190    /// to the agent in the `Started` state.
191    ///
192    /// # Arguments
193    ///
194    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
195    ///
196    /// # Returns
197    ///
198    /// Returns a mutable reference to `self` for chaining.
199    pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
200    where
201        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
202        Fut: Future<Output = ()> + Send + Sync + 'static,
203    {
204        self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
205        self
206    }
207
208    /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
209    ///
210    /// This hook is called once, just before the agent's main task (`wake`) is spawned
211    /// during the `start` process. It receives an immutable reference to the agent,
212    /// technically still in the `Started` state contextually, though the loop hasn't begun.
213    ///
214    /// # Arguments
215    ///
216    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
217    ///
218    /// # Returns
219    ///
220    /// Returns a mutable reference to `self` for chaining.
221    pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
222    where
223        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
224        Fut: Future<Output = ()> + Send + Sync + 'static,
225    {
226        self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
227        self
228    }
229
230    /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
231    ///
232    /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
233    /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
234    /// reference to the agent in the `Started` state context.
235    ///
236    /// # Arguments
237    ///
238    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
239    ///
240    /// # Returns
241    ///
242    /// Returns a mutable reference to `self` for chaining.
243    pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
244    where
245        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
246        Fut: Future<Output = ()> + Send + Sync + 'static,
247    {
248        self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
249        self
250    }
251
252    /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
253    ///
254    /// This hook is called once, just before the agent's main loop begins its shutdown sequence
255    /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
256    /// reference to the agent in the `Started` state.
257    ///
258    /// # Arguments
259    ///
260    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
261    ///
262    /// # Returns
263    ///
264    /// Returns a mutable reference to `self` for chaining.
265    pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
266    where
267        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
268        Fut: Future<Output = ()> + Send + Sync + 'static,
269    {
270        self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
271        self
272    }
273
274    /// Creates the configuration for a new child agent under this agent's supervision.
275    ///
276    /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
277    /// to be a child of the current agent. It automatically derives a hierarchical
278    /// [`Ern`] for the child based on the parent's ID and the provided `name`.
279    /// The child inherits the parent's broker reference.
280    ///
281    /// The returned agent is in the `Idle` state and still needs to be configured
282    /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
283    /// The parent agent typically calls `handle.supervise(child_handle)` after the child
284    /// is started to register it formally.
285    ///
286    /// # Arguments
287    ///
288    /// * `name`: The name segment for the child agent's [`Ern`].
289    ///
290    /// # Returns
291    ///
292    /// Returns a `Result` containing a new `ManagedAgent` instance for the child
293    /// in the `Idle` state, ready for further configuration.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if creating the child's `Ern` fails or if creating the
298    /// `AgentConfig` fails (e.g., parsing the parent ID).
299    #[instrument(skip(self))]
300    pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
301        // Configure the child with parent and broker references.
302        let config = AgentConfig::new(
303            Ern::with_root(name)?,               // Child's name segment
304            Some(self.handle.clone()),           // Parent handle
305            Some(self.runtime.broker().clone()), // Inherited broker handle
306        )?;
307        // Create the Idle agent using the internal constructor.
308        Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
309    }
310
311    // Internal constructor - not part of public API documentation
312    #[instrument]
313    pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
314        let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
315
316        if let Some(app) = runtime {
317            managed_actor.broker = app.0.broker.clone();
318            managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
319        }
320
321        if let Some(config) = &config {
322            managed_actor.handle.id = config.id();
323            managed_actor.parent = config.parent().clone();
324            managed_actor.handle.broker = Box::new(config.get_broker().clone());
325            if let Some(broker) = config.get_broker().clone() {
326                managed_actor.broker = broker;
327            }
328        }
329
330        debug_assert!(
331            !managed_actor.inbox.is_closed(),
332            "Agent mailbox is closed in new"
333        );
334
335        trace!("NEW ACTOR: {}", &managed_actor.handle.id());
336
337        managed_actor.runtime = runtime.clone().unwrap_or_else(|| {
338            AgentRuntime(ActonInner {
339                broker: managed_actor.handle.broker.clone().unwrap_or_default(),
340                ..Default::default()
341            })
342        });
343
344        managed_actor.id = managed_actor.handle.id();
345
346        managed_actor
347    }
348
349    /// Starts the agent's processing loop and transitions it to the `Started` state.
350    ///
351    /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
352    /// 1.  Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
353    /// 2.  Executes the registered `before_start` lifecycle hook.
354    /// 3.  Spawns the agent's main asynchronous task (`wake`) which handles message processing.
355    /// 4.  Closes the agent's `TaskTracker` to signal that the main task has been spawned.
356    /// 5.  Returns the agent's [`AgentHandle`] for external interaction.
357    ///
358    /// After this method returns, the agent is running and ready to process messages sent to its handle.
359    ///
360    /// # Returns
361    ///
362    /// An [`AgentHandle`] that can be used to interact with the now-running agent.
363    #[instrument(skip(self))]
364    pub async fn start(mut self) -> AgentHandle {
365        trace!("Starting agent: {}", self.id());
366        trace!("Model state before start: {:?}", self.model);
367
368        // Take ownership of handlers before converting state.
369        let message_handlers = mem::take(&mut self.message_handlers);
370        let actor_ref = self.handle.clone(); // Clone handle before consuming self.
371
372        // Convert the agent to the Started state.
373        let active_actor: ManagedAgent<Started, State> = self.into();
374        // Leak the agent into a static reference for the spawned task.
375        // The task itself is responsible for managing the agent's lifetime.
376        let actor = Box::leak(Box::new(active_actor));
377
378        trace!("Executing before_start hook for agent: {}", actor.id());
379        (actor.before_start)(actor).await; // Execute before_start hook.
380
381        trace!("Spawning main task (wake) for agent: {}", actor.id());
382        // Spawn the main message processing loop.
383        actor_ref.tracker().spawn(actor.wake(message_handlers));
384        // Close the tracker to indicate the main task is launched.
385        actor_ref.tracker().close();
386
387        trace!("Agent {} started successfully.", actor_ref.id());
388        actor_ref // Return the handle.
389    }
390
391    /// Registers an asynchronous error handler for a specific error type `E`.
392    ///
393    /// This allows the agent to handle errors of type `E` by executing the given closure
394    /// whenever a message handler returns an error of this type.
395    ///
396    /// # Type Parameters
397    ///
398    /// * `E`: The concrete error type to handle. Must implement `std::error::Error` and be `'static`.
399    ///
400    /// # Arguments
401    /// * `error_handler`: The handler closure executed with agent, envelope, and error reference.
402    ///
403    /// # Returns
404    /// A mutable reference to `self` for chaining.
405    pub fn on_error<E>(
406        &mut self,
407        error_handler: impl for<'a, 'b> Fn(
408                &'a mut ManagedAgent<Started, State>,
409                &'b mut crate::message::Envelope,
410                &'b E,
411            ) -> crate::common::FutureBox
412            + Send
413            + Sync
414            + 'static,
415    ) -> &mut Self
416    where
417        E: std::error::Error + 'static,
418    {
419        use std::any::TypeId;
420        // Wrap handler for dynamic dispatch
421        use std::sync::Arc;
422        let handler_box: Arc<Box<crate::common::ErrorHandler<State>>> =
423            Arc::new(Box::new(move |agent, envelope, err| {
424                // Downcast the error to &E
425                if let Some(specific) = err.downcast_ref::<E>() {
426                    error_handler(agent, envelope, specific)
427                } else {
428                    // If type doesn't match, do nothing
429                    Box::pin(async {})
430                }
431            }));
432        self.error_handler_map
433            .insert(TypeId::of::<E>(), handler_box);
434        self
435    }
436}
437
438// --- Utility Function ---
439
440/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
441///
442/// This utility function is used internally by the message dispatch mechanism
443/// (specifically within the closure generated by `act_on`) to safely convert
444/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
445///
446/// # Type Parameters
447///
448/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
449///   and implement [`ActonMessage`].
450///
451/// # Arguments
452///
453/// * `msg`: A reference to the `ActonMessage` trait object.
454///
455/// # Returns
456///
457/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
458/// * `None`: If the trait object does not hold a value of type `T`.
459pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
460    // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
461    msg.as_any().downcast_ref::<T>()
462}
463
464// --- Internal Implementations ---
465// (Default, From, default_handler remain internal and undocumented)
466
467impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
468    for ManagedAgent<Started, State>
469{
470    fn from(value: ManagedAgent<Idle, State>) -> Self {
471        // Move all fields from Idle state to Started state.
472        ManagedAgent::<Started, State> {
473            handle: value.handle,
474            parent: value.parent,
475            halt_signal: value.halt_signal,
476            id: value.id,
477            runtime: value.runtime,
478            model: value.model,
479            tracker: value.tracker,
480            inbox: value.inbox,
481            before_start: value.before_start,
482            after_start: value.after_start,
483            before_stop: value.before_stop,
484            after_stop: value.after_stop,
485            broker: value.broker,
486            message_handlers: value.message_handlers,
487            error_handler_map: value.error_handler_map, // transfer error handlers
488            _actor_state: Default::default(),
489        }
490    }
491}
492
493impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
494    fn default() -> Self {
495        let (outbox, inbox) = channel(255); // Default channel size
496        let id: Ern = Default::default();
497        let mut handle: crate::common::AgentHandle = Default::default();
498        handle.id = id.clone();
499        handle.outbox = outbox.clone();
500
501        ManagedAgent::<Idle, State> {
502            handle,
503            id,
504            inbox,
505            // Initialize lifecycle hooks with default no-op handlers.
506            before_start: Box::new(|_| default_handler()),
507            after_start: Box::new(|_| default_handler()),
508            before_stop: Box::new(|_| default_handler()),
509            after_stop: Box::new(|_| default_handler()),
510            model: State::default(),
511            broker: Default::default(),
512            error_handler_map: std::collections::HashMap::new(),
513            parent: Default::default(),
514            runtime: Default::default(),
515            halt_signal: Default::default(),
516            tracker: Default::default(),
517            message_handlers: Default::default(),
518            _actor_state: Default::default(),
519        }
520    }
521}
522
523// Default no-op async handler for lifecycle events.
524fn default_handler() -> FutureBox {
525    Box::pin(async {})
526}