acton_core/actor/managed_agent/
idle.rs1use std::any::TypeId;
18use std::fmt::Debug;
19use std::future::Future;
20use std::mem;
21
22use acton_ern::{Ern};
23use tokio::sync::mpsc::channel;
24use tracing::*;
25
26use crate::actor::{AgentConfig, ManagedAgent, Started};
27use crate::common::{ActonInner, AgentHandle, AgentRuntime,Envelope, FutureBox, OutboundEnvelope, ReactorItem};
28use crate::message::MessageContext;
29use crate::prelude::ActonMessage;
30use crate::traits::Actor;
31
32pub struct Idle;
34
35impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
36 #[instrument(skip(self, message_processor), level = "debug")]
41 pub fn act_on<M>(
42 &mut self,
43 message_processor: impl for<'a> Fn(
44 &'a mut ManagedAgent<Started, State>,
45 &'a mut MessageContext<M>,
46 ) -> FutureBox
47 + Send
48 + Sync
49 + 'static,
50 ) -> &mut Self
51 where
52 M: ActonMessage + Clone + Send + Sync + 'static,
53 {
54 let type_id = TypeId::of::<M>();
55 trace!(type_name=std::any::type_name::<M>(), type_id=?type_id, "Adding message handler");
56
57 let handler_box = Box::new(
59 move |actor: &mut ManagedAgent<Started, State>,
60 envelope: &mut Envelope|
61 -> FutureBox {
62 trace!("Creating handler for message type: {:?}", std::any::type_name::<M>());
63
64 if let Some(concrete_msg) = envelope.message.as_any().downcast_ref::<M>() {
66 trace!(
67 "Downcast successful for type: {:?}",
68 std::any::type_name::<M>()
69 );
70
71 let message = concrete_msg.clone();
72 let sent_time = envelope.timestamp;
73 let mut event_record = {
74 let msg_name = std::any::type_name::<M>();
75 let sender = envelope.reply_to.sender.root.to_string();
76 let recipient = envelope.recipient.sender.root.to_string();
77 let origin_envelope = OutboundEnvelope::new_with_recipient(
78 envelope.reply_to.clone(),
79 envelope.recipient.clone(),
80 );
81 let reply_envelope = OutboundEnvelope::new_with_recipient(
82 envelope.recipient.clone(),
83 envelope.reply_to.clone(),
84 );
85 trace!("sender {sender}::{msg_name}",);
86 trace!("recipient {recipient}::{msg_name}",);
87 MessageContext {
88 message,
89 timestamp: sent_time,
90 origin_envelope,
91 reply_envelope,
92 }
93 };
94
95 let user_future = message_processor(actor, &mut event_record);
97
98 Box::pin(user_future)
100 } else {
101 error!(
102 type_name = std::any::type_name::<M>(),
103 "Message failed to downcast"
104 );
105 Box::pin(async {})
107 }
108 },
109 );
110
111 self.reactors.insert(type_id, ReactorItem::FutureReactor(handler_box));
113 self
114 }
115
116
117 pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
122 where
123 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
124 Fut: Future<Output=()> + Send + Sync + 'static,
125 {
126 self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
128 self
129 }
130 pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
135 where
136 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
137 Fut: Future<Output=()> + Send + Sync + 'static,
138 {
139 self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
141 self
142 }
143
144 pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
149 where
150 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
151 Fut: Future<Output=()> + Send + Sync + 'static,
152 {
153 self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
154 self
155 }
156 pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
161 where
162 F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
163 Fut: Future<Output=()> + Send + Sync + 'static,
164 {
165 self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
166 self
167 }
168
169 #[instrument(skip(self))]
177 pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
178 let config = AgentConfig::new(Ern::with_root(name)?, Some(self.handle.clone()), Some(self.runtime.broker().clone()))?;
179 Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
180 }
181
182 #[instrument]
183 pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
184 let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
185
186 if let Some(app) = runtime {
187 managed_actor.broker = app.0.broker.clone();
188 managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
189 }
190
191 if let Some(config) = &config {
192 managed_actor.handle.id = config.ern();
193 managed_actor.parent = config.parent().clone();
194 managed_actor.handle.broker = Box::new(config.get_broker().clone());
195 if let Some(broker) = config.get_broker().clone() {
196 managed_actor.broker = broker;
197 }
198 }
199
200 debug_assert!(
201 !managed_actor.inbox.is_closed(),
202 "Actor mailbox is closed in new"
203 );
204
205 trace!("NEW ACTOR: {}", &managed_actor.handle.id());
206
207 managed_actor.runtime = runtime.clone().unwrap_or_else(|| AgentRuntime(ActonInner {
208 broker: managed_actor.handle.broker.clone().unwrap_or_default(),
209 ..Default::default()
210 }));
211
212 managed_actor.id = managed_actor.handle.id();
213
214 managed_actor
215 }
216
217 #[instrument(skip(self))]
219 pub async fn start(mut self) -> AgentHandle {
220 trace!("The model is {:?}", self.model);
221
222 let reactors = mem::take(&mut self.reactors);
223 let actor_ref = self.handle.clone();
224 trace!("actor_ref before spawn: {:?}", actor_ref.id.root.to_string());
225 let active_actor: ManagedAgent<Started, State> = self.into();
226 let actor = Box::leak(Box::new(active_actor));
227
228 debug_assert!(
229 !actor.inbox.is_closed(),
230 "Actor mailbox is closed in activate"
231 );
232 (actor.before_start)(actor).await;
233 actor_ref.tracker().spawn(actor.wake(reactors));
234 actor_ref.tracker().close();
235 trace!("actor_ref after spawn: {:?}", actor_ref.id.root.to_string());
236
237 actor_ref
238 }
239}
240
241impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
242for ManagedAgent<Started, State>
243{
244 fn from(value: ManagedAgent<Idle, State>) -> Self {
245 let on_starting = value.before_start;
246 let on_start = value.after_start;
247 let on_stopped = value.after_stop;
248 let on_before_stop = value.before_stop;
249 let halt_signal = value.halt_signal;
250 let parent = value.parent;
251 let id = value.id;
252 let tracker = value.tracker;
253 let acton = value.runtime;
254 let reactors = value.reactors;
255
256
257 debug_assert!(
258 !value.inbox.is_closed(),
259 "Actor mailbox is closed before conversion in From<Actor<Idle, State>>"
260 );
261
262 let inbox = value.inbox;
263 let handle = value.handle;
264 let model = value.model;
265 let broker = value.broker;
266
267 if handle.children().is_empty() {
269 trace!(
270 "child count before Actor creation {}",
271 handle.children().len()
272 );
273 }
274 ManagedAgent::<Started, State> {
276 handle,
277 parent,
278 halt_signal,
279 id,
280 runtime: acton,
281 model,
282 tracker,
283 inbox,
284 before_start: on_starting,
285 after_start: on_start,
286 before_stop: on_before_stop,
287 after_stop: on_stopped,
288 broker,
289 reactors,
290 _actor_state: Default::default(),
291 }
292 }
293}
294
295impl<State: Default + Send + Debug + 'static> Default
296for ManagedAgent<Idle, State>
297{
298 fn default() -> Self {
299 let (outbox, inbox) = channel(255);
300 let id: Ern = Default::default();
301 let mut handle: AgentHandle = Default::default();
302 handle.id = id.clone();
303 handle.outbox = outbox.clone();
304
305 ManagedAgent::<Idle, State> {
306 handle,
307 id,
308 inbox,
309 before_start: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
310 after_start: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
311 before_stop: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
312 after_stop: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
313 model: State::default(),
314 broker: Default::default(),
315 parent: Default::default(),
316 runtime: Default::default(),
317 halt_signal: Default::default(),
318 tracker: Default::default(),
319 reactors: Default::default(),
320 _actor_state: Default::default(),
321 }
322 }
323}
324
325fn default_handler<State: Debug + Send + Default>(
326 _actor: &'_ ManagedAgent<Started, State>,
327) -> FutureBox {
328 Box::pin(async {})
329}