acton_core/actor/managed_agent/idle.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::Ern;
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{
28 ActonInner, AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
29};
30use crate::message::MessageContext;
31use crate::prelude::ActonMessage;
32use crate::traits::AgentHandleInterface;
33
/// Zero-sized type-state marker: the [`ManagedAgent`] exists and can be configured,
/// but its message loop has not been started yet.
///
/// While an agent carries this marker, message handlers can be registered through
/// [`ManagedAgent::act_on`] and lifecycle hooks through methods such as
/// [`ManagedAgent::before_start`] and [`ManagedAgent::after_stop`]. Calling
/// [`ManagedAgent::start`] consumes the agent and converts it to the
/// [`Started`](super::started::Started) state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Idle;
42
43use crate::common::ErrorHandler;
44use std::collections::HashMap;
45
46impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
47 /// Registers an asynchronous message handler for a specific message type `M`.
48 ///
49 /// This method is called during the agent's configuration phase (while in the `Idle` state).
50 /// It associates a specific message type `M` with a closure (`message_processor`) that
51 /// will be executed when the agent receives a message of that type after it has started.
52 ///
53 /// The framework handles the necessary type erasure and downcasting internally. The
54 /// provided `message_processor` receives the agent (in the `Started` state) and a
55 /// [`MessageContext`] containing the concrete message and metadata.
56 ///
57 /// # Type Parameters
58 ///
59 /// * `M`: The concrete message type this handler will process. Must implement
60 /// [`ActonMessage`], `Clone`, `Send`, `Sync`, and be `'static`.
61 ///
62 /// # Arguments
63 ///
64 /// * `message_processor`: An asynchronous closure that takes the agent (`&mut ManagedAgent<Started, State>`)
65 /// and the message context (`&mut MessageContext<M>`) and returns a `Future`
66 /// (specifically, a [`FutureBox`]). This closure contains the logic for handling messages of type `M`.
67 ///
68 /// # Returns
69 ///
70 /// Returns a mutable reference to `self` to allow for method chaining during configuration.
71 #[instrument(skip(self, message_processor), level = "debug")]
72 #[deprecated(
73 note = "act_on for handlers returning () will be deprecated in the next version. Use act_on_result for Result-returning handlers."
74 )]
75 pub fn act_on<M>(
76 &mut self,
77 message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
78 + Send
79 + Sync
80 + 'static,
81 ) -> &mut Self
82 where
83 M: ActonMessage + Clone + Send + Sync + 'static,
84 {
85 let type_id = TypeId::of::<M>();
86 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
87 let handler_box = Box::new(
88 move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
89 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
90 trace!(
91 "Downcast successful for message type: {}",
92 std::any::type_name::<M>()
93 );
94 let mut msg_context = {
95 let origin_envelope = OutboundEnvelope::new_with_recipient(
96 envelope.reply_to.clone(),
97 envelope.recipient.clone(),
98 actor.handle.cancellation_token.clone(),
99 );
100 let reply_envelope = OutboundEnvelope::new_with_recipient(
101 envelope.recipient.clone(),
102 envelope.reply_to.clone(),
103 actor.handle.cancellation_token.clone(),
104 );
105 MessageContext {
106 message: concrete_msg.clone(),
107 timestamp: envelope.timestamp,
108 origin_envelope,
109 reply_envelope,
110 }
111 };
112 message_processor(actor, &mut msg_context)
113 } else {
114 error!(
115 type_name = std::any::type_name::<M>(),
116 "Message handler called with incompatible message type (downcast failed)"
117 );
118 Box::pin(async {})
119 }
120 },
121 );
122 self.message_handlers
123 .insert(type_id, ReactorItem::FutureReactor(handler_box));
124 self
125 }
126
127 /// Registers an asynchronous message handler for a specific message type `M` that returns a Result (new style, preferred).
128 pub fn act_on_result<M, E, Fut>(
129 &mut self,
130 message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> Fut
131 + Send
132 + Sync
133 + 'static,
134 ) -> &mut Self
135 where
136 M: ActonMessage + Clone + Send + Sync + 'static,
137 E: std::error::Error + Send + Sync + 'static,
138 Fut: std::future::Future<Output = Result<(), E>> + Send + Sync + 'static,
139 {
140 let type_id = TypeId::of::<M>();
141 trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
142 let handler_box = Box::new(
143 move |actor: &mut ManagedAgent<Started, State>,
144 envelope: &mut Envelope|
145 -> crate::common::FutureBoxResult {
146 if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
147 trace!(
148 "Downcast successful for message type: {}",
149 std::any::type_name::<M>()
150 );
151 let mut msg_context = {
152 let origin_envelope = OutboundEnvelope::new_with_recipient(
153 envelope.reply_to.clone(),
154 envelope.recipient.clone(),
155 actor.handle.cancellation_token.clone(),
156 );
157 let reply_envelope = OutboundEnvelope::new_with_recipient(
158 envelope.recipient.clone(),
159 envelope.reply_to.clone(),
160 actor.handle.cancellation_token.clone(),
161 );
162 MessageContext {
163 message: concrete_msg.clone(),
164 timestamp: envelope.timestamp,
165 origin_envelope,
166 reply_envelope,
167 }
168 };
169 let fut = message_processor(actor, &mut msg_context);
170 Box::pin(async move {
171 match fut.await {
172 Ok(()) => Ok(()),
173 Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
174 }
175 })
176 } else {
177 error!(
178 type_name = std::any::type_name::<M>(),
179 "Result handler called with incompatible message type (downcast failed)"
180 );
181 Box::pin(async { Ok(()) })
182 }
183 },
184 );
185 self.message_handlers
186 .insert(type_id, ReactorItem::FutureReactorResult(handler_box));
187 self
188 }
189
190 /// Registers an asynchronous hook to be executed *after* the agent successfully starts its message loop.
191 ///
192 /// This hook is called once, shortly after the agent transitions to the `Started` state
193 /// and its main task begins processing messages. It receives an immutable reference
194 /// to the agent in the `Started` state.
195 ///
196 /// # Arguments
197 ///
198 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
199 ///
200 /// # Returns
201 ///
202 /// Returns a mutable reference to `self` for chaining.
203 pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
204 where
205 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
206 Fut: Future<Output = ()> + Send + Sync + 'static,
207 {
208 self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
209 self
210 }
211
212 /// Registers an asynchronous hook to be executed *before* the agent starts its message loop.
213 ///
214 /// This hook is called once, just before the agent's main task (`wake`) is spawned
215 /// during the `start` process. It receives an immutable reference to the agent,
216 /// technically still in the `Started` state contextually, though the loop hasn't begun.
217 ///
218 /// # Arguments
219 ///
220 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
221 ///
222 /// # Returns
223 ///
224 /// Returns a mutable reference to `self` for chaining.
225 pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
226 where
227 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
228 Fut: Future<Output = ()> + Send + Sync + 'static,
229 {
230 self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
231 self
232 }
233
234 /// Registers an asynchronous hook to be executed *after* the agent stops processing messages.
235 ///
236 /// This hook is called once when the agent's main loop terminates gracefully (e.g., upon
237 /// receiving a `Terminate` signal or when the inbox closes). It receives an immutable
238 /// reference to the agent in the `Started` state context.
239 ///
240 /// # Arguments
241 ///
242 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
243 ///
244 /// # Returns
245 ///
246 /// Returns a mutable reference to `self` for chaining.
247 pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
248 where
249 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
250 Fut: Future<Output = ()> + Send + Sync + 'static,
251 {
252 self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
253 self
254 }
255
256 /// Registers an asynchronous hook to be executed *before* the agent stops processing messages.
257 ///
258 /// This hook is called once, just before the agent's main loop begins its shutdown sequence
259 /// (e.g., after receiving `Terminate` but before fully stopping). It receives an immutable
260 /// reference to the agent in the `Started` state.
261 ///
262 /// # Arguments
263 ///
264 /// * `f`: An asynchronous closure that takes `&ManagedAgent<Started, State>` and returns a `Future`.
265 ///
266 /// # Returns
267 ///
268 /// Returns a mutable reference to `self` for chaining.
269 pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
270 where
271 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
272 Fut: Future<Output = ()> + Send + Sync + 'static,
273 {
274 self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
275 self
276 }
277
278 /// Creates the configuration for a new child agent under this agent's supervision.
279 ///
280 /// This method generates a `ManagedAgent<Idle, State>` instance pre-configured
281 /// to be a child of the current agent. It automatically derives a hierarchical
282 /// [`Ern`] for the child based on the parent's ID and the provided `name`.
283 /// The child inherits the parent's broker reference.
284 ///
285 /// The returned agent is in the `Idle` state and still needs to be configured
286 /// (e.g., with `act_on`, lifecycle hooks) and then started using its `start` method.
287 /// The parent agent typically calls `handle.supervise(child_handle)` after the child
288 /// is started to register it formally.
289 ///
290 /// # Arguments
291 ///
292 /// * `name`: The name segment for the child agent's [`Ern`].
293 ///
294 /// # Returns
295 ///
296 /// Returns a `Result` containing a new `ManagedAgent` instance for the child
297 /// in the `Idle` state, ready for further configuration.
298 ///
299 /// # Errors
300 ///
301 /// Returns an error if creating the child's `Ern` fails or if creating the
302 /// `AgentConfig` fails (e.g., parsing the parent ID).
303 #[instrument(skip(self))]
304 pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
305 // Configure the child with parent and broker references.
306 let config = AgentConfig::new(
307 Ern::with_root(name)?, // Child's name segment
308 Some(self.handle.clone()), // Parent handle
309 Some(self.runtime.broker().clone()), // Inherited broker handle
310 )?;
311 // Create the Idle agent using the internal constructor.
312 Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
313 }
314
315 // Internal constructor - not part of public API documentation
316 #[instrument]
317 pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
318 let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
319
320 if let Some(app) = runtime {
321 managed_actor.broker = app.0.broker.clone();
322 managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
323 managed_actor.cancellation_token = Some(app.0.cancellation_token.child_token());
324 }
325
326 if let Some(config) = &config {
327 managed_actor.handle.id = config.id();
328 managed_actor.parent = config.parent().clone();
329 managed_actor.handle.broker = Box::new(config.get_broker().clone());
330 if let Some(broker) = config.get_broker().clone() {
331 managed_actor.broker = broker;
332 }
333 }
334
335 debug_assert!(
336 !managed_actor.inbox.is_closed(),
337 "Agent mailbox is closed in new"
338 );
339
340 trace!("NEW ACTOR: {}", &managed_actor.handle.id());
341
342 // Ensure runtime always exists; creating a new one here is an error.
343 assert!(
344 runtime.is_some(),
345 "AgentRuntime must be provided to ManagedAgent::new"
346 );
347 managed_actor.runtime = runtime.clone().unwrap();
348 managed_actor
349 .runtime
350 .0
351 .roots
352 .insert(managed_actor.handle.id(), managed_actor.handle.clone());
353
354 managed_actor.id = managed_actor.handle.id();
355
356 managed_actor
357 }
358
359 /// Starts the agent's processing loop and transitions it to the `Started` state.
360 ///
361 /// This method consumes the `ManagedAgent` in the `Idle` state. It performs the following actions:
362 /// 1. Transitions the agent's type state from `Idle` to [`Started`](super::started::Started).
363 /// 2. Executes the registered `before_start` lifecycle hook.
364 /// 3. Spawns the agent's main asynchronous task (`wake`) which handles message processing.
365 /// 4. Closes the agent's `TaskTracker` to signal that the main task has been spawned.
366 /// 5. Returns the agent's [`AgentHandle`] for external interaction.
367 ///
368 /// After this method returns, the agent is running and ready to process messages sent to its handle.
369 ///
370 /// # Returns
371 ///
372 /// An [`AgentHandle`] that can be used to interact with the now-running agent.
373 #[instrument(skip(self))]
374 pub async fn start(mut self) -> AgentHandle {
375 trace!("Starting agent: {}", self.id());
376 trace!("Model state before start: {:?}", self.model);
377
378 // Take ownership of handlers before converting state.
379 let message_handlers = mem::take(&mut self.message_handlers);
380 let actor_ref = self.handle.clone(); // Clone handle before consuming self.
381
382 // Convert the agent to the Started state.
383 let active_actor: ManagedAgent<Started, State> = self.into();
384 // Leak the agent into a static reference for the spawned task.
385 // The task itself is responsible for managing the agent's lifetime.
386 let actor = Box::leak(Box::new(active_actor));
387
388 trace!("Executing before_start hook for agent: {}", actor.id());
389 (actor.before_start)(actor).await; // Execute before_start hook.
390
391 trace!("Spawning main task (wake) for agent: {}", actor.id());
392 // Spawn the main message processing loop.
393 actor_ref.tracker().spawn(actor.wake(message_handlers));
394 // Close the tracker to indicate the main task is launched.
395 actor_ref.tracker().close();
396
397 trace!("Agent {} started successfully.", actor_ref.id());
398 actor_ref // Return the handle.
399 }
400
401 /// Registers an asynchronous error handler for a specific error type `E`.
402 ///
403 /// This allows the agent to handle errors of type `E` by executing the given closure
404 /// whenever a message handler returns an error of this type.
405 ///
406 /// # Type Parameters
407 ///
408 /// * `E`: The concrete error type to handle. Must implement `std::error::Error` and be `'static`.
409 ///
410 /// # Arguments
411 /// * `error_handler`: The handler closure executed with agent, envelope, and error reference.
412 ///
413 /// # Returns
414 /// A mutable reference to `self` for chaining.
415 pub fn on_error<E>(
416 &mut self,
417 error_handler: impl for<'a, 'b> Fn(
418 &'a mut ManagedAgent<Started, State>,
419 &'b mut crate::message::Envelope,
420 &'b E,
421 ) -> crate::common::FutureBox
422 + Send
423 + Sync
424 + 'static,
425 ) -> &mut Self
426 where
427 E: std::error::Error + 'static,
428 {
429 use std::any::TypeId;
430 // Wrap handler for dynamic dispatch
431 use std::sync::Arc;
432 let handler_box: Arc<Box<crate::common::ErrorHandler<State>>> =
433 Arc::new(Box::new(move |agent, envelope, err| {
434 // Downcast the error to &E
435 if let Some(specific) = err.downcast_ref::<E>() {
436 error_handler(agent, envelope, specific)
437 } else {
438 // If type doesn't match, do nothing
439 Box::pin(async {})
440 }
441 }));
442 self.error_handler_map
443 .insert(TypeId::of::<E>(), handler_box);
444 self
445 }
446}
447
448// --- Utility Function ---
449
450/// Attempts to downcast an `ActonMessage` trait object to a concrete type `T`.
451///
452/// This utility function is used internally by the message dispatch mechanism
453/// (specifically within the closure generated by `act_on`) to safely convert
454/// a type-erased message (`&dyn ActonMessage`) back into its original concrete type (`&T`).
455///
456/// # Type Parameters
457///
458/// * `T`: The concrete message type to attempt downcasting to. Must be `'static`
459/// and implement [`ActonMessage`].
460///
461/// # Arguments
462///
463/// * `msg`: A reference to the `ActonMessage` trait object.
464///
465/// # Returns
466///
467/// * `Some(&T)`: If the trait object `msg` actually holds a value of type `T`.
468/// * `None`: If the trait object does not hold a value of type `T`.
469pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
470 // Use the Any trait's downcast_ref method provided via ActonMessage's supertraits.
471 msg.as_any().downcast_ref::<T>()
472}
473
474// --- Internal Implementations ---
475// (Default, From, default_handler remain internal and undocumented)
476
477impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
478 for ManagedAgent<Started, State>
479{
480 fn from(value: ManagedAgent<Idle, State>) -> Self {
481 // Ensure cancellation_token is always present when transitioning to Started state
482 assert!(
483 value.cancellation_token.is_some(),
484 "Cannot transition to ManagedAgent<Started, State> without a cancellation_token"
485 );
486 // Move all fields from Idle state to Started state.
487 ManagedAgent::<Started, State> {
488 handle: value.handle,
489 parent: value.parent,
490 halt_signal: value.halt_signal,
491 id: value.id,
492 runtime: value.runtime,
493 model: value.model,
494 tracker: value.tracker,
495 inbox: value.inbox,
496 before_start: value.before_start,
497 after_start: value.after_start,
498 before_stop: value.before_stop,
499 after_stop: value.after_stop,
500 broker: value.broker,
501 message_handlers: value.message_handlers,
502 error_handler_map: value.error_handler_map, // transfer error handlers
503 cancellation_token: value.cancellation_token,
504 _actor_state: Default::default(),
505 }
506 }
507}
508
509impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
510 fn default() -> Self {
511 let (outbox, inbox) = channel(255); // Default channel size
512 let id: Ern = Default::default();
513 let mut handle: crate::common::AgentHandle = Default::default();
514 handle.id = id.clone();
515 handle.outbox = outbox.clone();
516
517 ManagedAgent::<Idle, State> {
518 handle,
519 id,
520 inbox,
521 // Initialize lifecycle hooks with default no-op handlers.
522 before_start: Box::new(|_| default_handler()),
523 after_start: Box::new(|_| default_handler()),
524 before_stop: Box::new(|_| default_handler()),
525 after_stop: Box::new(|_| default_handler()),
526 model: State::default(),
527 broker: Default::default(),
528 error_handler_map: std::collections::HashMap::new(),
529 parent: Default::default(),
530 runtime: Default::default(),
531 halt_signal: Default::default(),
532 tracker: Default::default(),
533 cancellation_token: Default::default(),
534 message_handlers: Default::default(),
535 _actor_state: Default::default(),
536 }
537 }
538}
539
540// Default no-op async handler for lifecycle events.
541fn default_handler() -> FutureBox {
542 Box::pin(async {})
543}