acton_core/actor/managed_agent/
idle.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::{Ern};
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{ActonInner, AgentHandle, AgentRuntime,Envelope, FutureBox, OutboundEnvelope, ReactorItem};
28use crate::message::MessageContext;
29use crate::prelude::ActonMessage;
30use crate::traits::AgentHandleInterface;
31
32/// Type-state marker for a [`ManagedAgent`] that has been configured but not yet started.
33///
34/// When a `ManagedAgent` is in the `Idle` state, it can be configured with message handlers
35/// (via [`ManagedAgent::act_on`]) and lifecycle hooks (e.g., [`ManagedAgent::before_start`],
36/// [`ManagedAgent::after_stop`]). Once configuration is complete, the agent can be
37/// transitioned to the [`Started`](super::started::Started) state by calling [`ManagedAgent::start`].
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] // Add common derives
39pub struct Idle;
40
41impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
42    /// Registers an asynchronous message handler for a specific message type `M`.
43    ///
44    /// This method is called during the agent's configuration phase (while in the `Idle` state).
45    /// It associates a specific message type `M` with a closure (`message_processor`) that
46    /// will be executed when the agent receives a message of that type after it has started.
47    ///
48    /// The framework handles the necessary type erasure and downcasting internally. The
49    /// provided `message_processor` receives the agent (in the `Started` state) and a
50    /// [`MessageContext`] containing the concrete message and metadata.
51    ///
52    /// # Type Parameters
53    ///
54    /// *   `M`: The concrete message type this handler will process. Must implement
55    ///     [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
56    ///
57    /// # Arguments
58    ///
59    /// *   `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
60    ///     and the message context (`&mut MessageContext<M>`) and returns a `Future`
61    ///     (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
62    ///
63    /// # Returns
64    ///
65    /// Returns a mutable reference to `self` to allow for method chaining during configuration.
66    #[instrument(skip(self, message_processor), level = "debug")]
67    pub fn act_on<M>(
68        &mut self,
69        message_processor: impl for<'a> Fn(
70            &'a mut ManagedAgent<Started, State>,
71            &'a mut MessageContext<M>,
72        ) -> FutureBox
73        + Send
74        + Sync
75        + 'static,
76    ) -> &mut Self
77    where
78        M: ActonMessage + Clone + Send + Sync + 'static,
79    {
80        let type_id = TypeId::of::<M>();
81        trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding message handler");
82        // Create a boxed handler that performs downcasting and calls the user's processor.
83        let handler_box = Box::new(
84            move |actor: &mut ManagedAgent<Started, State>,
85                  envelope: &mut Envelope|
86                  -> FutureBox {
87                // Downcast the trait object message back to the concrete type M.
88                if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
89                    trace!(
90                        "Downcast successful for message type: {}",
91                        std::any::type_name::<M>()
92                    );
93
94                    // Prepare the MessageContext for the handler.
95                    let mut msg_context = {
96                        let origin_envelope = OutboundEnvelope::new_with_recipient(envelope.reply_to.clone(), envelope.recipient.clone());
97                        let reply_envelope = OutboundEnvelope::new_with_recipient(envelope.recipient.clone(), envelope.reply_to.clone());
98                        MessageContext {
99                            message: concrete_msg.clone(),
100                            timestamp: envelope.timestamp,
101                            origin_envelope,
102                            reply_envelope,
103                        }
104                    };
105
106                    // Call the user-provided message processor.
107                    message_processor(actor, &mut msg_context) // Return the FutureBox directly
108                } else {
109                    // This should ideally not happen if type registration is correct.
110                    error!(
111                        type_name = std::any::type_name::<M>(),
112                        "Message handler called with incompatible message type (downcast failed)"
113                    );
114                    Box::pin(async {}) // Return an empty future on error.
115                }
116            },
117        );
118
119        // Store the type-erased handler.
120        self.message_handlers.insert(type_id, ReactorItem::FutureReactor(handler_box));
121        self
122    }
123
124
125    /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
126    ///
127    /// This hook is called once, shortly after the agent transitions to the `Started` state
128    /// and its main task begins processing messages. It receives an immutable reference
129    /// to the agent in the `Started` state.
130    ///
131    /// # Arguments
132    ///
133    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
134    ///
135    /// # Returns
136    ///
137    /// Returns a mutable reference to `self` for chaining.
138    pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
139    where
140        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
141        Fut: Future<Output=()> + Send + Sync + 'static,
142    {
143        self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
144        self
145    }
146
147    /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
148    ///
149    /// This hook is called once, just before the agent's main task (`wake`) is spawned
150    /// during the `start` process. It receives an immutable reference to the agent,
151    /// technically still in the `Started` state contextually, though the loop hasn't begun.
152    ///
153    /// # Arguments
154    ///
155    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
156    ///
157    /// # Returns
158    ///
159    /// Returns a mutable reference to `self` for chaining.
160    pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
161    where
162        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
163        Fut: Future<Output=()> + Send + Sync + 'static,
164    {
165        self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
166        self
167    }
168
169    /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
170    ///
171    /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
172    /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
173    /// reference to the agent in the `Started` state context.
174    ///
175    /// # Arguments
176    ///
177    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
178    ///
179    /// # Returns
180    ///
181    /// Returns a mutable reference to `self` for chaining.
182    pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
183    where
184        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
185        Fut: Future<Output=()> + Send + Sync + 'static,
186    {
187        self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
188        self
189    }
190
191    /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
192    ///
193    /// This hook is called once, just before the agent's main loop begins its shutdown sequence
194    /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
195    /// reference to the agent in the `Started` state.
196    ///
197    /// # Arguments
198    ///
199    /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
200    ///
201    /// # Returns
202    ///
203    /// Returns a mutable reference to `self` for chaining.
204    pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
205    where
206        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
207        Fut: Future<Output=()> + Send + Sync + 'static,
208    {
209        self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
210        self
211    }
212
213    /// Creates the configuration for a new child agent under this agent's supervision.
214    ///
215    /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
216    /// to be a child of the current agent. It automatically derives a hierarchical
217    /// [`Ern`] for the child based on the parent's ID and the provided `name`.
218    /// The child inherits the parent's broker reference.
219    ///
220    /// The returned agent is in the `Idle` state and still needs to be configured
221    /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
222    /// The parent agent typically calls `handle.supervise(child_handle)` after the child
223    /// is started to register it formally.
224    ///
225    /// # Arguments
226    ///
227    /// * `name`: The name segment for the child agent's [`Ern`].
228    ///
229    /// # Returns
230    ///
231    /// Returns a `Result` containing a new `ManagedAgent` instance for the child
232    /// in the `Idle` state, ready for further configuration.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if creating the child's `Ern` fails or if creating the
237    /// `AgentConfig` fails (e.g., parsing the parent ID).
238    #[instrument(skip(self))]
239    pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
240        // Configure the child with parent and broker references.
241        let config = AgentConfig::new(
242            Ern::with_root(name)?, // Child's name segment
243            Some(self.handle.clone()), // Parent handle
244            Some(self.runtime.broker().clone()) // Inherited broker handle
245        )?;
246        // Create the Idle agent using the internal constructor.
247        Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
248    }
249
250    // Internal constructor - not part of public API documentation
251    #[instrument]
252    pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
253        let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
254
255        if let Some(app) = runtime {
256            managed_actor.broker = app.0.broker.clone();
257            managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
258        }
259
260        if let Some(config) = &config {
261            managed_actor.handle.id = config.id();
262            managed_actor.parent = config.parent().clone();
263            managed_actor.handle.broker = Box::new(config.get_broker().clone());
264            if let Some(broker) = config.get_broker().clone() {
265                managed_actor.broker = broker;
266            }
267        }
268
269        debug_assert!(
270            !managed_actor.inbox.is_closed(),
271            "Agent mailbox is closed in new"
272        );
273
274        trace!("NEW ACTOR: {}", &managed_actor.handle.id());
275
276        managed_actor.runtime = runtime.clone().unwrap_or_else(|| AgentRuntime(ActonInner {
277            broker: managed_actor.handle.broker.clone().unwrap_or_default(),
278            ..Default::default()
279        }));
280
281        managed_actor.id = managed_actor.handle.id();
282
283        managed_actor
284    }
285
286    /// Starts the agent's processing loop and transitions it to the `Started` state.
287    ///
288    /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
289    /// 1.  Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
290    /// 2.  Executes the registered `before_start` lifecycle hook.
291    /// 3.  Spawns the agent's main asynchronous task (`wake`) which handles message processing.
292    /// 4.  Closes the agent's `TaskTracker` to signal that the main task has been spawned.
293    /// 5.  Returns the agent's [`AgentHandle`] for external interaction.
294    ///
295    /// After this method returns, the agent is running and ready to process messages sent to its handle.
296    ///
297    /// # Returns
298    ///
299    /// An [`AgentHandle`] that can be used to interact with the now-running agent.
300    #[instrument(skip(self))]
301    pub async fn start(mut self) -> AgentHandle {
302        trace!("Starting agent: {}", self.id());
303        trace!("Model state before start: {:?}", self.model);
304
305        // Take ownership of handlers before converting state.
306        let message_handlers = mem::take(&mut self.message_handlers);
307        let actor_ref = self.handle.clone(); // Clone handle before consuming self.
308
309        // Convert the agent to the Started state.
310        let active_actor: ManagedAgent<Started, State> = self.into();
311        // Leak the agent into a static reference for the spawned task.
312        // The task itself is responsible for managing the agent's lifetime.
313        let actor = Box::leak(Box::new(active_actor));
314
315        trace!("Executing before_start hook for agent: {}", actor.id());
316        (actor.before_start)(actor).await; // Execute before_start hook.
317
318        trace!("Spawning main task (wake) for agent: {}", actor.id());
319        // Spawn the main message processing loop.
320        actor_ref.tracker().spawn(actor.wake(message_handlers));
321        // Close the tracker to indicate the main task is launched.
322        actor_ref.tracker().close();
323
324        trace!("Agent {} started successfully.", actor_ref.id());
325        actor_ref // Return the handle.
326    }
327}
328
329// --- Utility Function ---
330
331/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
332///
333/// This utility function is used internally by the message dispatch mechanism
334/// (specifically within the closure generated by `act_on`) to safely convert
335/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
336///
337/// # Type Parameters
338///
339/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
340///   and implement [`ActonMessage`].
341///
342/// # Arguments
343///
344/// * `msg`: A reference to the `ActonMessage` trait object.
345///
346/// # Returns
347///
348/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
349/// * `None`: If the trait object does not hold a value of type `T`.
350pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
351    // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
352    msg.as_any().downcast_ref::<T>()
353}
354
355// --- Internal Implementations ---
356// (Default, From, default_handler remain internal and undocumented)
357
358impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
359for ManagedAgent<Started, State>
360{
361    fn from(value: ManagedAgent<Idle, State>) -> Self {
362        // Move all fields from Idle state to Started state.
363        ManagedAgent::<Started, State> {
364            handle: value.handle,
365            parent: value.parent,
366            halt_signal: value.halt_signal,
367            id: value.id,
368            runtime: value.runtime,
369            model: value.model,
370            tracker: value.tracker,
371            inbox: value.inbox,
372            before_start: value.before_start,
373            after_start: value.after_start,
374            before_stop: value.before_stop,
375            after_stop: value.after_stop,
376            broker: value.broker,
377            message_handlers: value.message_handlers,
378            _actor_state: Default::default(),
379        }
380    }
381}
382
383impl<State: Default + Send + Debug + 'static> Default
384for ManagedAgent<Idle, State>
385{
386    fn default() -> Self {
387        let (outbox, inbox) = channel(255); // Default channel size
388        let id: Ern = Default::default();
389        let mut handle: AgentHandle = Default::default();
390        handle.id = id.clone();
391        handle.outbox = outbox.clone();
392
393        ManagedAgent::<Idle, State> {
394            handle,
395            id,
396            inbox,
397            // Initialize lifecycle hooks with default no-op handlers.
398            before_start: Box::new(|_| default_handler()),
399            after_start: Box::new(|_| default_handler()),
400            before_stop: Box::new(|_| default_handler()),
401            after_stop: Box::new(|_| default_handler()),
402            model: State::default(),
403            broker: Default::default(),
404            parent: Default::default(),
405            runtime: Default::default(),
406            halt_signal: Default::default(),
407            tracker: Default::default(),
408            message_handlers: Default::default(),
409            _actor_state: Default::default(),
410        }
411    }
412}
413
414// Default no-op async handler for lifecycle events.
415fn default_handler() -> FutureBox {
416    Box::pin(async {})
417}