acton_core/actor/managed_agent/
idle.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::Ern;
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{
28    ActonInner, AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
29};
30use crate::message::MessageContext;
31use crate::prelude::ActonMessage;
32use crate::traits::AgentHandleInterface;
33
/// Type-state marker for a [`ManagedAgent`] that has been configured but not yet started.
///
/// While a `ManagedAgent` carries the `Idle` marker it exposes the configuration API
/// implemented in this module: message handlers ([`ManagedAgent::act_on`],
/// [`ManagedAgent::act_on_result`]), error handlers ([`ManagedAgent::on_error`]) and
/// lifecycle hooks (e.g., [`ManagedAgent::before_start`], [`ManagedAgent::after_stop`]).
/// Calling [`ManagedAgent::start`] consumes the idle agent and converts it into the
/// [`Started`](super::started::Started) state.
///
/// The marker holds no data; the derives below only make it convenient to copy,
/// compare, hash and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Idle;
42
43use crate::common::ErrorHandler;
44use std::collections::HashMap;
45
46impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
47    /// Registers an asynchronous message handler for a specific message type `M`.
48    ///
49    /// This method is called during the agent's configuration phase (while in the `Idle` state).
50    /// It associates a specific message type `M` with a closure (`message_processor`) that
51    /// will be executed when the agent receives a message of that type after it has started.
52    ///
53    /// The framework handles the necessary type erasure and downcasting internally. The
54    /// provided `message_processor` receives the agent (in the `Started` state) and a
55    /// [`MessageContext`] containing the concrete message and metadata.
56    ///
57    /// # Type Parameters
58    ///
59    /// *   `M`: The concrete message type this handler will process. Must implement
60    ///     [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
61    ///
62    /// # Arguments
63    ///
64    /// *   `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
65    ///     and the message context (`&mut MessageContext<M>`) and returns a `Future`
66    ///     (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
67    ///
68    /// # Returns
69    ///
70    /// Returns a mutable reference to `self` to allow for method chaining during configuration.
71    #[instrument(skip(self, message_processor), level = "debug")]
72    #[deprecated(
73        note = "act_on for handlers returning () will be deprecated in the next version. Use act_on_result for Result-returning handlers."
74    )]
75    pub fn act_on<M>(
76        &mut self,
77        message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
78            + Send
79            + Sync
80            + 'static,
81    ) -> &mut Self
82    where
83        M: ActonMessage + Clone + Send + Sync + 'static,
84    {
85        let type_id = TypeId::of::<M>();
86        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
87        let handler_box = Box::new(
88            move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
89                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
90                    trace!(
91                        "Downcast successful for message type: {}",
92                        std::any::type_name::<M>()
93                    );
94                    let mut msg_context = {
95                        let origin_envelope = OutboundEnvelope::new_with_recipient(
96                            envelope.reply_to.clone(),
97                            envelope.recipient.clone(),
98                            actor.handle.cancellation_token.clone(),
99                        );
100                        let reply_envelope = OutboundEnvelope::new_with_recipient(
101                            envelope.recipient.clone(),
102                            envelope.reply_to.clone(),
103                            actor.handle.cancellation_token.clone(),
104                        );
105                        MessageContext {
106                            message: concrete_msg.clone(),
107                            timestamp: envelope.timestamp,
108                            origin_envelope,
109                            reply_envelope,
110                        }
111                    };
112                    message_processor(actor, &mut msg_context)
113                } else {
114                    error!(
115                        type_name = std::any::type_name::<M>(),
116                        "Message handler called with incompatible message type (downcast failed)"
117                    );
118                    Box::pin(async {})
119                }
120            },
121        );
122        self.message_handlers
123            .insert(type_id, ReactorItem::FutureReactor(handler_box));
124        self
125    }
126
127    /// Registers an asynchronous message handler for a specific message type `M` that returns a Result (new style, preferred).
128    pub fn act_on_result<M, E, Fut>(
129        &mut self,
130        message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> Fut
131            + Send
132            + Sync
133            + 'static,
134    ) -> &mut Self
135    where
136        M: ActonMessage + Clone + Send + Sync + 'static,
137        E: std::error::Error + Send + Sync + 'static,
138        Fut: std::future::Future<Output = Result<(), E>> + Send + Sync + 'static,
139    {
140        let type_id = TypeId::of::<M>();
141        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
142        let handler_box = Box::new(
143            move |actor: &mut ManagedAgent<Started, State>,
144                  envelope: &mut Envelope|
145                  -> crate::common::FutureBoxResult {
146                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
147                    trace!(
148                        "Downcast successful for message type: {}",
149                        std::any::type_name::<M>()
150                    );
151                    let mut msg_context = {
152                        let origin_envelope = OutboundEnvelope::new_with_recipient(
153                            envelope.reply_to.clone(),
154                            envelope.recipient.clone(),
155                            actor.handle.cancellation_token.clone(),
156                        );
157                        let reply_envelope = OutboundEnvelope::new_with_recipient(
158                            envelope.recipient.clone(),
159                            envelope.reply_to.clone(),
160                            actor.handle.cancellation_token.clone(),
161                        );
162                        MessageContext {
163                            message: concrete_msg.clone(),
164                            timestamp: envelope.timestamp,
165                            origin_envelope,
166                            reply_envelope,
167                        }
168                    };
169                    let fut = message_processor(actor, &mut msg_context);
170                    Box::pin(async move {
171                        match fut.await {
172                            Ok(()) => Ok(()),
173                            Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
174                        }
175                    })
176                } else {
177                    error!(
178                        type_name = std::any::type_name::<M>(),
179                        "Result handler called with incompatible message type (downcast failed)"
180                    );
181                    Box::pin(async { Ok(()) })
182                }
183            },
184        );
185        self.message_handlers
186            .insert(type_id, ReactorItem::FutureReactorResult(handler_box));
187        self
188    }
189
190    /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
191    ///
192    /// This hook is called once, shortly after the agent transitions to the `Started` state
193    /// and its main task begins processing messages. It receives an immutable reference
194    /// to the agent in the `Started` state.
195    ///
196    /// # Arguments
197    ///
198    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
199    ///
200    /// # Returns
201    ///
202    /// Returns a mutable reference to `self` for chaining.
203    pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
204    where
205        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
206        Fut: Future<Output = ()> + Send + Sync + 'static,
207    {
208        self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
209        self
210    }
211
212    /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
213    ///
214    /// This hook is called once, just before the agent's main task (`wake`) is spawned
215    /// during the `start` process. It receives an immutable reference to the agent,
216    /// technically still in the `Started` state contextually, though the loop hasn't begun.
217    ///
218    /// # Arguments
219    ///
220    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
221    ///
222    /// # Returns
223    ///
224    /// Returns a mutable reference to `self` for chaining.
225    pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
226    where
227        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
228        Fut: Future<Output = ()> + Send + Sync + 'static,
229    {
230        self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
231        self
232    }
233
234    /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
235    ///
236    /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
237    /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
238    /// reference to the agent in the `Started` state context.
239    ///
240    /// # Arguments
241    ///
242    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
243    ///
244    /// # Returns
245    ///
246    /// Returns a mutable reference to `self` for chaining.
247    pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
248    where
249        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
250        Fut: Future<Output = ()> + Send + Sync + 'static,
251    {
252        self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
253        self
254    }
255
256    /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
257    ///
258    /// This hook is called once, just before the agent's main loop begins its shutdown sequence
259    /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
260    /// reference to the agent in the `Started` state.
261    ///
262    /// # Arguments
263    ///
264    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
265    ///
266    /// # Returns
267    ///
268    /// Returns a mutable reference to `self` for chaining.
269    pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
270    where
271        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
272        Fut: Future<Output = ()> + Send + Sync + 'static,
273    {
274        self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
275        self
276    }
277
278    /// Creates the configuration for a new child agent under this agent's supervision.
279    ///
280    /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
281    /// to be a child of the current agent. It automatically derives a hierarchical
282    /// [`Ern`] for the child based on the parent's ID and the provided `name`.
283    /// The child inherits the parent's broker reference.
284    ///
285    /// The returned agent is in the `Idle` state and still needs to be configured
286    /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
287    /// The parent agent typically calls `handle.supervise(child_handle)` after the child
288    /// is started to register it formally.
289    ///
290    /// # Arguments
291    ///
292    /// * `name`: The name segment for the child agent's [`Ern`].
293    ///
294    /// # Returns
295    ///
296    /// Returns a `Result` containing a new `ManagedAgent` instance for the child
297    /// in the `Idle` state, ready for further configuration.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if creating the child's `Ern` fails or if creating the
302    /// `AgentConfig` fails (e.g., parsing the parent ID).
303    #[instrument(skip(self))]
304    pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
305        // Configure the child with parent and broker references.
306        let config = AgentConfig::new(
307            Ern::with_root(name)?,               // Child's name segment
308            Some(self.handle.clone()),           // Parent handle
309            Some(self.runtime.broker().clone()), // Inherited broker handle
310        )?;
311        // Create the Idle agent using the internal constructor.
312        Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
313    }
314
315    // Internal constructor - not part of public API documentation
316    #[instrument]
317    pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
318        let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
319
320        if let Some(app) = runtime {
321            managed_actor.broker = app.0.broker.clone();
322            managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
323            managed_actor.cancellation_token = Some(app.0.cancellation_token.child_token());
324        }
325
326        if let Some(config) = &config {
327            managed_actor.handle.id = config.id();
328            managed_actor.parent = config.parent().clone();
329            managed_actor.handle.broker = Box::new(config.get_broker().clone());
330            if let Some(broker) = config.get_broker().clone() {
331                managed_actor.broker = broker;
332            }
333        }
334
335        debug_assert!(
336            !managed_actor.inbox.is_closed(),
337            "Agent mailbox is closed in new"
338        );
339
340        trace!("NEW ACTOR: {}", &managed_actor.handle.id());
341
342        // Ensure runtime always exists; creating a new one here is an error.
343        assert!(
344            runtime.is_some(),
345            "AgentRuntime must be provided to ManagedAgent::new"
346        );
347        managed_actor.runtime = runtime.clone().unwrap();
348        managed_actor
349            .runtime
350            .0
351            .roots
352            .insert(managed_actor.handle.id(), managed_actor.handle.clone());
353
354        managed_actor.id = managed_actor.handle.id();
355
356        managed_actor
357    }
358
359    /// Starts the agent's processing loop and transitions it to the `Started` state.
360    ///
361    /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
362    /// 1.  Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
363    /// 2.  Executes the registered `before_start` lifecycle hook.
364    /// 3.  Spawns the agent's main asynchronous task (`wake`) which handles message processing.
365    /// 4.  Closes the agent's `TaskTracker` to signal that the main task has been spawned.
366    /// 5.  Returns the agent's [`AgentHandle`] for external interaction.
367    ///
368    /// After this method returns, the agent is running and ready to process messages sent to its handle.
369    ///
370    /// # Returns
371    ///
372    /// An [`AgentHandle`] that can be used to interact with the now-running agent.
373    #[instrument(skip(self))]
374    pub async fn start(mut self) -> AgentHandle {
375        trace!("Starting agent: {}", self.id());
376        trace!("Model state before start: {:?}", self.model);
377
378        // Take ownership of handlers before converting state.
379        let message_handlers = mem::take(&mut self.message_handlers);
380        let actor_ref = self.handle.clone(); // Clone handle before consuming self.
381
382        // Convert the agent to the Started state.
383        let active_actor: ManagedAgent<Started, State> = self.into();
384        // Leak the agent into a static reference for the spawned task.
385        // The task itself is responsible for managing the agent's lifetime.
386        let actor = Box::leak(Box::new(active_actor));
387
388        trace!("Executing before_start hook for agent: {}", actor.id());
389        (actor.before_start)(actor).await; // Execute before_start hook.
390
391        trace!("Spawning main task (wake) for agent: {}", actor.id());
392        // Spawn the main message processing loop.
393        actor_ref.tracker().spawn(actor.wake(message_handlers));
394        // Close the tracker to indicate the main task is launched.
395        actor_ref.tracker().close();
396
397        trace!("Agent {} started successfully.", actor_ref.id());
398        actor_ref // Return the handle.
399    }
400
401    /// Registers an asynchronous error handler for a specific error type `E`.
402    ///
403    /// This allows the agent to handle errors of type `E` by executing the given closure
404    /// whenever a message handler returns an error of this type.
405    ///
406    /// # Type Parameters
407    ///
408    /// * `E`: The concrete error type to handle. Must implement `std::error::Error` and be `'static`.
409    ///
410    /// # Arguments
411    /// * `error_handler`: The handler closure executed with agent, envelope, and error reference.
412    ///
413    /// # Returns
414    /// A mutable reference to `self` for chaining.
415    pub fn on_error<E>(
416        &mut self,
417        error_handler: impl for<'a, 'b> Fn(
418                &'a mut ManagedAgent<Started, State>,
419                &'b mut crate::message::Envelope,
420                &'b E,
421            ) -> crate::common::FutureBox
422            + Send
423            + Sync
424            + 'static,
425    ) -> &mut Self
426    where
427        E: std::error::Error + 'static,
428    {
429        use std::any::TypeId;
430        // Wrap handler for dynamic dispatch
431        use std::sync::Arc;
432        let handler_box: Arc<Box<crate::common::ErrorHandler<State>>> =
433            Arc::new(Box::new(move |agent, envelope, err| {
434                // Downcast the error to &E
435                if let Some(specific) = err.downcast_ref::<E>() {
436                    error_handler(agent, envelope, specific)
437                } else {
438                    // If type doesn't match, do nothing
439                    Box::pin(async {})
440                }
441            }));
442        self.error_handler_map
443            .insert(TypeId::of::<E>(), handler_box);
444        self
445    }
446}
447
448// --- Utility Function ---
449
450/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
451///
452/// This utility function is used internally by the message dispatch mechanism
453/// (specifically within the closure generated by `act_on`) to safely convert
454/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
455///
456/// # Type Parameters
457///
458/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
459///   and implement [`ActonMessage`].
460///
461/// # Arguments
462///
463/// * `msg`: A reference to the `ActonMessage` trait object.
464///
465/// # Returns
466///
467/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
468/// * `None`: If the trait object does not hold a value of type `T`.
469pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
470    // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
471    msg.as_any().downcast_ref::<T>()
472}
473
474// --- Internal Implementations ---
475// (Default, From, default_handler remain internal and undocumented)
476
477impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
478    for ManagedAgent<Started, State>
479{
480    fn from(value: ManagedAgent<Idle, State>) -> Self {
481        // Ensure cancellation_token is always present when transitioning to Started state
482        assert!(
483            value.cancellation_token.is_some(),
484            "Cannot transition to ManagedAgent<Started, State> without a cancellation_token"
485        );
486        // Move all fields from Idle state to Started state.
487        ManagedAgent::<Started, State> {
488            handle: value.handle,
489            parent: value.parent,
490            halt_signal: value.halt_signal,
491            id: value.id,
492            runtime: value.runtime,
493            model: value.model,
494            tracker: value.tracker,
495            inbox: value.inbox,
496            before_start: value.before_start,
497            after_start: value.after_start,
498            before_stop: value.before_stop,
499            after_stop: value.after_stop,
500            broker: value.broker,
501            message_handlers: value.message_handlers,
502            error_handler_map: value.error_handler_map, // transfer error handlers
503            cancellation_token: value.cancellation_token,
504            _actor_state: Default::default(),
505        }
506    }
507}
508
509impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
510    fn default() -> Self {
511        let (outbox, inbox) = channel(255); // Default channel size
512        let id: Ern = Default::default();
513        let mut handle: crate::common::AgentHandle = Default::default();
514        handle.id = id.clone();
515        handle.outbox = outbox.clone();
516
517        ManagedAgent::<Idle, State> {
518            handle,
519            id,
520            inbox,
521            // Initialize lifecycle hooks with default no-op handlers.
522            before_start: Box::new(|_| default_handler()),
523            after_start: Box::new(|_| default_handler()),
524            before_stop: Box::new(|_| default_handler()),
525            after_stop: Box::new(|_| default_handler()),
526            model: State::default(),
527            broker: Default::default(),
528            error_handler_map: std::collections::HashMap::new(),
529            parent: Default::default(),
530            runtime: Default::default(),
531            halt_signal: Default::default(),
532            tracker: Default::default(),
533            cancellation_token: Default::default(),
534            message_handlers: Default::default(),
535            _actor_state: Default::default(),
536        }
537    }
538}
539
540// Default no-op async handler for lifecycle events.
541fn default_handler() -> FutureBox {
542    Box::pin(async {})
543}