acton_core/actor/managed_agent/
idle.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18
19use crate::traits::ActonMessageReply;
20use std::fmt::Debug;
21use std::future::Future;
22use std::mem;
23
24use acton_ern::Ern;
25use tokio::sync::mpsc::channel;
26use tracing::*;
27
28use crate::actor::{AgentConfig, ManagedAgent, Started};
29use crate::common::{
30    AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
31};
32use crate::message::MessageContext;
33use crate::prelude::ActonMessage;
34use crate::traits::AgentHandleInterface;
35
36/// Type-state marker for a [`ManagedAgent`] that has been configured but not yet started.
37///
38/// When a `ManagedAgent` is in the `Idle` state, it can be configured with message handlers
39/// (via [`ManagedAgent::act_on`]) and lifecycle hooks (e.g., [`ManagedAgent::before_start`],
40/// [`ManagedAgent::after_stop`]). Once configuration is complete, the agent can be
41/// transitioned to the [`Started`](super::started::Started) state by calling [`ManagedAgent::start`].
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] // Add common derives
43pub struct Idle;
44
45
46impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
47    /// Registers an asynchronous message handler for a specific message type `M`.
48    ///
49    /// This method is called during the agent's configuration phase (while in the `Idle` state).
50    /// It associates a specific message type `M` with a closure (`message_processor`) that
51    /// will be executed when the agent receives a message of that type after it has started.
52    ///
53    /// The framework handles the necessary type erasure and downcasting internally. The
54    /// provided `message_processor` receives the agent (in the `Started` state) and a
55    /// [`MessageContext`] containing the concrete message and metadata.
56    ///
57    /// # Type Parameters
58    ///
59    /// *   `M`: The concrete message type this handler will process. Must implement
60    ///     [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
61    ///
62    /// # Arguments
63    ///
64    /// *   `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
65    ///     and the message context (`&mut MessageContext<M>`) and returns a `Future`
66    ///     (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
67    ///
68    /// # Returns
69    ///
70    /// Returns a mutable reference to `self` to allow for method chaining during configuration.
71    #[instrument(skip(self, message_processor), level = "debug")]
72    pub fn act_on<M>(
73        &mut self,
74        message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
75            + Send
76            + Sync
77            + 'static,
78    ) -> &mut Self
79    where
80        M: ActonMessage + Clone + Send + Sync + 'static,
81    {
82        let type_id = TypeId::of::<M>();
83        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
84        let handler_box = Box::new(
85            move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
86                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
87                    trace!(
88                        "Downcast successful for message type: {}",
89                        std::any::type_name::<M>()
90                    );
91                    let mut msg_context = {
92                        let origin_envelope = OutboundEnvelope::new_with_recipient(
93                            envelope.reply_to.clone(),
94                            envelope.recipient.clone(),
95                            actor.handle.cancellation_token.clone(),
96                        );
97                        let reply_envelope = OutboundEnvelope::new_with_recipient(
98                            envelope.recipient.clone(),
99                            envelope.reply_to.clone(),
100                            actor.handle.cancellation_token.clone(),
101                        );
102                        MessageContext {
103                            message: concrete_msg.clone(),
104                            timestamp: envelope.timestamp,
105                            origin_envelope,
106                            reply_envelope,
107                        }
108                    };
109                    message_processor(actor, &mut msg_context)
110                } else {
111                    error!(
112                        type_name = std::any::type_name::<M>(),
113                        "Message handler called with incompatible message type (downcast failed)"
114                    );
115                    Box::pin(async {})
116                }
117            },
118        );
119        self.message_handlers
120            .insert(type_id, ReactorItem::FutureReactor(handler_box));
121        self
122    }
123
124    /// Registers an asynchronous error handler for a specific error type `E`.
125    ///
126    /// This allows the agent to handle errors of type `E` by executing the given closure
127    /// whenever a message handler returns an error of this type.
128    ///
129    /// # Type Parameters
130    ///
131    /// * `E`: The concrete error type to handle. Must implement `std::error::Error` and be `'static`.
132    ///
133    /// # Arguments
134    /// * `error_handler`: The handler closure executed with agent, envelope, and error reference.
135    ///
136    /// # Returns
137    /// A mutable reference to `self` for chaining.
138    pub fn on_error<M, E>(
139        &mut self,
140        error_handler: impl for<'a, 'b> Fn(
141                &'a mut ManagedAgent<Started, State>,
142                &'b mut MessageContext<M>,
143                &'b E,
144            ) -> crate::common::FutureBox
145            + Send
146            + Sync
147            + 'static,
148    ) -> &mut Self
149    where
150        M: ActonMessage + Clone + Send + Sync + 'static,
151        E: std::error::Error + 'static,
152    {
153        use std::any::TypeId;
154        let message_type_id = TypeId::of::<M>();
155        let error_type_id = TypeId::of::<E>();
156
157        // Wrap handler for dynamic dispatch
158        let handler_box: Box<crate::common::ErrorHandler<State>> =
159            Box::new(move |agent, envelope, err| {
160                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
161                    // Downcast the error to &E
162                    if let Some(specific_err) = err.downcast_ref::<E>() {
163                        let mut msg_context = {
164                            let origin_envelope = OutboundEnvelope::new_with_recipient(
165                                envelope.reply_to.clone(),
166                                envelope.recipient.clone(),
167                                agent.handle.cancellation_token.clone(),
168                            );
169                            let reply_envelope = OutboundEnvelope::new_with_recipient(
170                                envelope.recipient.clone(),
171                                envelope.reply_to.clone(),
172                                agent.handle.cancellation_token.clone(),
173                            );
174                            MessageContext {
175                                message: concrete_msg.clone(),
176                                timestamp: envelope.timestamp,
177                                origin_envelope,
178                                reply_envelope,
179                            }
180                        };
181                        error_handler(agent, &mut msg_context, specific_err)
182                    } else {
183                        // If type doesn't match, do nothing
184                        Box::pin(async {})
185                    }
186                } else {
187                    // If type doesn't match, do nothing
188                    Box::pin(async {})
189                }
190            });
191        self.error_handler_map
192            .insert((message_type_id, error_type_id), handler_box);
193        self
194    }
195    /// Registers an asynchronous message handler for a specific message type `M` that returns a Result (new style, preferred).
196    pub fn act_on_fallible<M, T, E>(
197        &mut self,
198        message_processor: impl for<'a> Fn(
199                &'a mut ManagedAgent<Started, State>,
200                &'a mut MessageContext<M>,
201            ) -> std::pin::Pin<
202                Box<dyn std::future::Future<Output = Result<T, E>> + Send + Sync + 'static>,
203            > + Send
204            + Sync
205            + 'static,
206    ) -> &mut Self
207    where
208        M: ActonMessage + Clone + Send + Sync + 'static,
209        T: ActonMessageReply + 'static,
210        E: std::error::Error + Send + Sync + 'static,
211    {
212        let type_id = TypeId::of::<M>();
213        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
214        let handler_box = Box::new(
215            move |actor: &mut ManagedAgent<Started, State>,
216                  envelope: &mut Envelope|
217                  -> crate::common::FutureBoxResult {
218                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
219                    trace!(
220                        "Downcast successful for message type: {}",
221                        std::any::type_name::<M>()
222                    );
223                    let mut msg_context = {
224                        let origin_envelope = OutboundEnvelope::new_with_recipient(
225                            envelope.reply_to.clone(),
226                            envelope.recipient.clone(),
227                            actor.handle.cancellation_token.clone(),
228                        );
229                        let reply_envelope = OutboundEnvelope::new_with_recipient(
230                            envelope.recipient.clone(),
231                            envelope.reply_to.clone(),
232                            actor.handle.cancellation_token.clone(),
233                        );
234                        MessageContext {
235                            message: concrete_msg.clone(),
236                            timestamp: envelope.timestamp,
237                            origin_envelope,
238                            reply_envelope,
239                        }
240                    };
241                    let fut = message_processor(actor, &mut msg_context);
242                    Box::pin(async move {
243                        match fut.await {
244                            Ok(val) => Ok(Box::new(val) as Box<dyn ActonMessageReply + Send>),
245                            Err(e) => {
246                                let error_type_id = TypeId::of::<E>();
247                                Err((
248                                    Box::new(e) as Box<dyn std::error::Error + Send + Sync>,
249                                    error_type_id,
250                                ))
251                            }
252                        }
253                    })
254                } else {
255                    error!(
256                        type_name = std::any::type_name::<M>(),
257                        "Result handler called with incompatible message type (downcast failed)"
258                    );
259                    Box::pin(async { Ok(Box::new(()) as Box<dyn ActonMessageReply + Send>) })
260                }
261            },
262        );
263        self.message_handlers
264            .insert(type_id, ReactorItem::FutureReactorResult(handler_box));
265        self
266    }
267
268    /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
269    ///
270    /// This hook is called once, shortly after the agent transitions to the `Started` state
271    /// and its main task begins processing messages. It receives an immutable reference
272    /// to the agent in the `Started` state.
273    ///
274    /// # Arguments
275    ///
276    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
277    ///
278    /// # Returns
279    ///
280    /// Returns a mutable reference to `self` for chaining.
281    pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
282    where
283        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
284        Fut: Future<Output = ()> + Send + Sync + 'static,
285    {
286        self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
287        self
288    }
289
290    /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
291    ///
292    /// This hook is called once, just before the agent's main task (`wake`) is spawned
293    /// during the `start` process. It receives an immutable reference to the agent,
294    /// technically still in the `Started` state contextually, though the loop hasn't begun.
295    ///
296    /// # Arguments
297    ///
298    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
299    ///
300    /// # Returns
301    ///
302    /// Returns a mutable reference to `self` for chaining.
303    pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
304    where
305        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
306        Fut: Future<Output = ()> + Send + Sync + 'static,
307    {
308        self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
309        self
310    }
311
312    /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
313    ///
314    /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
315    /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
316    /// reference to the agent in the `Started` state context.
317    ///
318    /// # Arguments
319    ///
320    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
321    ///
322    /// # Returns
323    ///
324    /// Returns a mutable reference to `self` for chaining.
325    pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
326    where
327        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
328        Fut: Future<Output = ()> + Send + Sync + 'static,
329    {
330        self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
331        self
332    }
333
334    /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
335    ///
336    /// This hook is called once, just before the agent's main loop begins its shutdown sequence
337    /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
338    /// reference to the agent in the `Started` state.
339    ///
340    /// # Arguments
341    ///
342    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
343    ///
344    /// # Returns
345    ///
346    /// Returns a mutable reference to `self` for chaining.
347    pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
348    where
349        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
350        Fut: Future<Output = ()> + Send + Sync + 'static,
351    {
352        self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
353        self
354    }
355
356    /// Creates the configuration for a new child agent under this agent's supervision.
357    ///
358    /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
359    /// to be a child of the current agent. It automatically derives a hierarchical
360    /// [`Ern`] for the child based on the parent's ID and the provided `name`.
361    /// The child inherits the parent's broker reference.
362    ///
363    /// The returned agent is in the `Idle` state and still needs to be configured
364    /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
365    /// The parent agent typically calls `handle.supervise(child_handle)` after the child
366    /// is started to register it formally.
367    ///
368    /// # Arguments
369    ///
370    /// * `name`: The name segment for the child agent's [`Ern`].
371    ///
372    /// # Returns
373    ///
374    /// Returns a `Result` containing a new `ManagedAgent` instance for the child
375    /// in the `Idle` state, ready for further configuration.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if creating the child's `Ern` fails or if creating the
380    /// `AgentConfig` fails (e.g., parsing the parent ID).
381    #[instrument(skip(self))]
382    pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
383        // Configure the child with parent and broker references.
384        let config = AgentConfig::new(
385            Ern::with_root(name)?,               // Child's name segment
386            Some(self.handle.clone()),           // Parent handle
387            Some(self.runtime.broker().clone()), // Inherited broker handle
388        )?;
389        // Create the Idle agent using the internal constructor.
390        Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
391    }
392
393    // Internal constructor - not part of public API documentation
394    #[instrument]
395    pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
396        let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
397
398        if let Some(app) = runtime {
399            managed_actor.broker = app.0.broker.clone();
400            managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
401            managed_actor.cancellation_token = Some(app.0.cancellation_token.child_token());
402        }
403
404        if let Some(config) = &config {
405            managed_actor.handle.id = config.id();
406            managed_actor.parent = config.parent().clone();
407            managed_actor.handle.broker = Box::new(config.get_broker().clone());
408            if let Some(broker) = config.get_broker().clone() {
409                managed_actor.broker = broker;
410            }
411        }
412
413        debug_assert!(
414            !managed_actor.inbox.is_closed(),
415            "Agent mailbox is closed in new"
416        );
417
418        trace!("NEW ACTOR: {}", &managed_actor.handle.id());
419
420        // Ensure runtime always exists; creating a new one here is an error.
421        assert!(
422            runtime.is_some(),
423            "AgentRuntime must be provided to ManagedAgent::new"
424        );
425        managed_actor.runtime = runtime.clone().unwrap();
426        managed_actor
427            .runtime
428            .0
429            .roots
430            .insert(managed_actor.handle.id(), managed_actor.handle.clone());
431
432        managed_actor.id = managed_actor.handle.id();
433
434        managed_actor
435    }
436
437    /// Starts the agent's processing loop and transitions it to the `Started` state.
438    ///
439    /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
440    /// 1.  Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
441    /// 2.  Executes the registered `before_start` lifecycle hook.
442    /// 3.  Spawns the agent's main asynchronous task (`wake`) which handles message processing.
443    /// 4.  Closes the agent's `TaskTracker` to signal that the main task has been spawned.
444    /// 5.  Returns the agent's [`AgentHandle`] for external interaction.
445    ///
446    /// After this method returns, the agent is running and ready to process messages sent to its handle.
447    ///
448    /// # Returns
449    ///
450    /// An [`AgentHandle`] that can be used to interact with the now-running agent.
451    #[instrument(skip(self))]
452    pub async fn start(mut self) -> AgentHandle {
453        trace!("Starting agent: {}", self.id());
454        trace!("Model state before start: {:?}", self.model);
455
456        // Take ownership of handlers before converting state.
457        let message_handlers = mem::take(&mut self.message_handlers);
458        let actor_ref = self.handle.clone(); // Clone handle before consuming self.
459
460        // Convert the agent to the Started state.
461        let active_actor: ManagedAgent<Started, State> = self.into();
462        // Leak the agent into a static reference for the spawned task.
463        // The task itself is responsible for managing the agent's lifetime.
464        let actor = Box::leak(Box::new(active_actor));
465
466        trace!("Executing before_start hook for agent: {}", actor.id());
467        (actor.before_start)(actor).await; // Execute before_start hook.
468
469        trace!("Spawning main task (wake) for agent: {}", actor.id());
470        // Spawn the main message processing loop.
471        actor_ref.tracker().spawn(actor.wake(message_handlers));
472        // Close the tracker to indicate the main task is launched.
473        actor_ref.tracker().close();
474
475        trace!("Agent {} started successfully.", actor_ref.id());
476        actor_ref // Return the handle.
477    }
478}
479
480// --- Utility Function ---
481
482/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
483///
484/// This utility function is used internally by the message dispatch mechanism
485/// (specifically within the closure generated by `act_on`) to safely convert
486/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
487///
488/// # Type Parameters
489///
490/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
491///   and implement [`ActonMessage`].
492///
493/// # Arguments
494///
495/// * `msg`: A reference to the `ActonMessage` trait object.
496///
497/// # Returns
498///
499/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
500/// * `None`: If the trait object does not hold a value of type `T`.
501pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
502    // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
503    msg.as_any().downcast_ref::<T>()
504}
505
506// --- Internal Implementations ---
507// (Default, From, default_handler remain internal and undocumented)
508
509impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
510    for ManagedAgent<Started, State>
511{
512    fn from(value: ManagedAgent<Idle, State>) -> Self {
513        // Ensure cancellation_token is always present when transitioning to Started state
514        assert!(
515            value.cancellation_token.is_some(),
516            "Cannot transition to ManagedAgent<Started, State> without a cancellation_token"
517        );
518        // Move all fields from Idle state to Started state.
519        ManagedAgent::<Started, State> {
520            handle: value.handle,
521            parent: value.parent,
522            halt_signal: value.halt_signal,
523            id: value.id,
524            runtime: value.runtime,
525            model: value.model,
526            tracker: value.tracker,
527            inbox: value.inbox,
528            before_start: value.before_start,
529            after_start: value.after_start,
530            before_stop: value.before_stop,
531            after_stop: value.after_stop,
532            broker: value.broker,
533            message_handlers: value.message_handlers,
534            error_handler_map: value.error_handler_map, // transfer error handlers
535            cancellation_token: value.cancellation_token,
536            _actor_state: Default::default(),
537        }
538    }
539}
540
541impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
542    fn default() -> Self {
543        let (outbox, inbox) = channel(255); // Default channel size
544        let id: Ern = Default::default();
545        let mut handle: crate::common::AgentHandle = Default::default();
546        handle.id = id.clone();
547        handle.outbox = outbox.clone();
548
549        ManagedAgent::<Idle, State> {
550            handle,
551            id,
552            inbox,
553            // Initialize lifecycle hooks with default no-op handlers.
554            before_start: Box::new(|_| default_handler()),
555            after_start: Box::new(|_| default_handler()),
556            before_stop: Box::new(|_| default_handler()),
557            after_stop: Box::new(|_| default_handler()),
558            model: State::default(),
559            broker: Default::default(),
560            error_handler_map: std::collections::HashMap::new(),
561            parent: Default::default(),
562            runtime: Default::default(),
563            halt_signal: Default::default(),
564            tracker: Default::default(),
565            cancellation_token: Default::default(),
566            message_handlers: Default::default(),
567            _actor_state: Default::default(),
568        }
569    }
570}
571
572// Default no-op async handler for lifecycle events.
573fn default_handler() -> FutureBox {
574    Box::pin(async {})
575}