acton_core/actor/managed_agent/
idle.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::{Ern};
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{ActonInner, AgentHandle, AgentRuntime,Envelope, FutureBox, OutboundEnvelope, ReactorItem};
28use crate::message::MessageContext;
29use crate::prelude::ActonMessage;
30use crate::traits::Actor;
31
/// Type-state marker for a [`ManagedAgent`] that has been built but not yet started.
///
/// `ManagedAgent<Idle, State>` exposes the configuration API in this file (message handlers
/// and lifecycle hooks) and is turned into `ManagedAgent<Started, State>` by `start`.
pub struct Idle;
34
35impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
36    /// Adds an asynchronous message handler for a specific message type.
37    ///
38    /// # Parameters
39    /// - `message_processor`: The function to handle the message.
40    #[instrument(skip(self, message_processor), level = "debug")]
41    pub fn act_on<M>(
42        &mut self,
43        message_processor: impl for<'a> Fn(
44            &'a mut ManagedAgent<Started, State>,
45            &'a mut MessageContext<M>,
46        ) -> FutureBox
47        + Send
48        + Sync
49        + 'static,
50    ) -> &mut Self
51    where
52        M: ActonMessage + Clone + Send + Sync + 'static,
53    {
54        let type_id = TypeId::of::<M>();
55        trace!(type_name=std::any::type_name::<M>(), type_id=?type_id, "Adding message handler");
56
57        // Create a boxed handler for the message type.
58        let handler_box = Box::new(
59            move |actor: &mut ManagedAgent<Started, State>,
60                  envelope: &mut Envelope|
61                  -> FutureBox {
62                trace!("Creating handler for message type: {:?}", std::any::type_name::<M>());
63
64                // Attempt to downcast the message to the expected type.
65                if let Some(concrete_msg) = envelope.message.as_any().downcast_ref::<M>() {
66                    trace!(
67                    "Downcast successful for type: {:?}",
68                    std::any::type_name::<M>()
69                );
70
71                    let message = concrete_msg.clone();
72                    let sent_time = envelope.timestamp;
73                    let mut event_record = {
74                        let msg_name = std::any::type_name::<M>();
75                        let sender = envelope.reply_to.sender.root.to_string();
76                        let recipient = envelope.recipient.sender.root.to_string();
77                        let origin_envelope = OutboundEnvelope::new_with_recipient(
78                            envelope.reply_to.clone(),
79                            envelope.recipient.clone(),
80                        );
81                        let reply_envelope = OutboundEnvelope::new_with_recipient(
82                            envelope.recipient.clone(),
83                            envelope.reply_to.clone(),
84                        );
85                        trace!("sender {sender}::{msg_name}",);
86                        trace!("recipient {recipient}::{msg_name}",);
87                        MessageContext {
88                            message,
89                            timestamp: sent_time,
90                            origin_envelope,
91                            reply_envelope,
92                        }
93                    };
94
95                    // Call the user-provided function and get the future.
96                    let user_future = message_processor(actor, &mut event_record);
97
98                    // Automatically box and pin the user future.
99                    Box::pin(user_future)
100                } else {
101                    error!(
102                    type_name = std::any::type_name::<M>(),
103                    "Message failed to downcast"
104                );
105                    // Return an immediately resolving future if downcast fails.
106                    Box::pin(async {})
107                }
108            },
109        );
110
111        // Insert the handler into the reactors map.
112        self.reactors.insert(type_id, ReactorItem::FutureReactor(handler_box));
113        self
114    }
115
116
117    /// Sets the reactor to be called when the actor wakes up.
118    ///
119    /// # Parameters
120    /// - `life_cycle_event_reactor`: The function to be called.
121    pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
122    where
123        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
124        Fut: Future<Output=()> + Send + Sync + 'static,
125    {
126        // Create a boxed handler that can be stored in the HashMap.
127        self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
128        self
129    }
130    /// Sets the reactor to be called when the actor wakes up.
131    ///
132    /// # Parameters
133    /// - `life_cycle_event_reactor`: The function to be called.
134    pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
135    where
136        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
137        Fut: Future<Output=()> + Send + Sync + 'static,
138    {
139        // Create a boxed handler that can be stored in the HashMap.
140        self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
141        self
142    }
143
144    /// Sets the reactor to be called when the actor stops processing messages in its mailbox.
145    ///
146    /// # Parameters
147    /// - `life_cycle_event_reactor`: The function to be called.
148    pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
149    where
150        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
151        Fut: Future<Output=()> + Send + Sync + 'static,
152    {
153        self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
154        self
155    }
156    /// Sets the reactor to be called just before the actor stops processing messages in its mailbox.
157    ///
158    /// # Parameters
159    /// - `life_cycle_event_reactor`: The function to be called.
160    pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
161    where
162        F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
163        Fut: Future<Output=()> + Send + Sync + 'static,
164    {
165        self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
166        self
167    }
168
169    /// Creates and supervises a new actor with the given ID and state.
170    ///
171    /// # Parameters
172    /// - `id`: The identifier for the new actor.
173    ///
174    /// # Returns
175    /// A new `Actor` instance in the idle state.
176    #[instrument(skip(self))]
177    pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
178        let config = AgentConfig::new(Ern::with_root(name)?, Some(self.handle.clone()), Some(self.runtime.broker().clone()))?;
179        Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
180    }
181
182    #[instrument]
183    pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
184        let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
185
186        if let Some(app) = runtime {
187            managed_actor.broker = app.0.broker.clone();
188            managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
189        }
190
191        if let Some(config) = &config {
192            managed_actor.handle.id = config.ern();
193            managed_actor.parent = config.parent().clone();
194            managed_actor.handle.broker = Box::new(config.get_broker().clone());
195            if let Some(broker) = config.get_broker().clone() {
196                managed_actor.broker = broker;
197            }
198        }
199
200        debug_assert!(
201            !managed_actor.inbox.is_closed(),
202            "Actor mailbox is closed in new"
203        );
204
205        trace!("NEW ACTOR: {}", &managed_actor.handle.id());
206
207        managed_actor.runtime = runtime.clone().unwrap_or_else(|| AgentRuntime(ActonInner {
208            broker: managed_actor.handle.broker.clone().unwrap_or_default(),
209            ..Default::default()
210        }));
211
212        managed_actor.id = managed_actor.handle.id();
213
214        managed_actor
215    }
216
217    /// Starts the actor and transitions it to the running state.
218    #[instrument(skip(self))]
219    pub async fn start(mut self) -> AgentHandle {
220        trace!("The model is {:?}", self.model);
221
222        let reactors = mem::take(&mut self.reactors);
223        let actor_ref = self.handle.clone();
224        trace!("actor_ref before spawn: {:?}", actor_ref.id.root.to_string());
225        let active_actor: ManagedAgent<Started, State> = self.into();
226        let actor = Box::leak(Box::new(active_actor));
227
228        debug_assert!(
229            !actor.inbox.is_closed(),
230            "Actor mailbox is closed in activate"
231        );
232        (actor.before_start)(actor).await;
233        actor_ref.tracker().spawn(actor.wake(reactors));
234        actor_ref.tracker().close();
235        trace!("actor_ref after spawn: {:?}", actor_ref.id.root.to_string());
236
237        actor_ref
238    }
239}
240
241impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
242for ManagedAgent<Started, State>
243{
244    fn from(value: ManagedAgent<Idle, State>) -> Self {
245        let on_starting = value.before_start;
246        let on_start = value.after_start;
247        let on_stopped = value.after_stop;
248        let on_before_stop = value.before_stop;
249        let halt_signal = value.halt_signal;
250        let parent = value.parent;
251        let id = value.id;
252        let tracker = value.tracker;
253        let acton = value.runtime;
254        let reactors = value.reactors;
255
256
257        debug_assert!(
258            !value.inbox.is_closed(),
259            "Actor mailbox is closed before conversion in From<Actor<Idle, State>>"
260        );
261
262        let inbox = value.inbox;
263        let handle = value.handle;
264        let model = value.model;
265        let broker = value.broker;
266
267        // tracing::trace!("Mailbox is not closed, proceeding with conversion");
268        if handle.children().is_empty() {
269            trace!(
270                "child count before Actor creation {}",
271                handle.children().len()
272            );
273        }
274        // Create and return the new actor in the running state
275        ManagedAgent::<Started, State> {
276            handle,
277            parent,
278            halt_signal,
279            id,
280            runtime: acton,
281            model,
282            tracker,
283            inbox,
284            before_start: on_starting,
285            after_start: on_start,
286            before_stop: on_before_stop,
287            after_stop: on_stopped,
288            broker,
289            reactors,
290            _actor_state: Default::default(),
291        }
292    }
293}
294
295impl<State: Default + Send + Debug + 'static> Default
296for ManagedAgent<Idle, State>
297{
298    fn default() -> Self {
299        let (outbox, inbox) = channel(255);
300        let id: Ern = Default::default();
301        let mut handle: AgentHandle = Default::default();
302        handle.id = id.clone();
303        handle.outbox = outbox.clone();
304
305        ManagedAgent::<Idle, State> {
306            handle,
307            id,
308            inbox,
309            before_start: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
310            after_start: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
311            before_stop: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
312            after_stop: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
313            model: State::default(),
314            broker: Default::default(),
315            parent: Default::default(),
316            runtime: Default::default(),
317            halt_signal: Default::default(),
318            tracker: Default::default(),
319            reactors: Default::default(),
320            _actor_state: Default::default(),
321        }
322    }
323}
324
325fn default_handler<State: Debug + Send + Default>(
326    _actor: &'_ ManagedAgent<Started, State>,
327) -> FutureBox {
328    Box::pin(async {})
329}