acton_core/actor/managed_agent/
idle.rsuse std::any::TypeId;
use std::fmt::Debug;
use std::future::Future;
use std::mem;
use acton_ern::{Ern};
use tokio::sync::mpsc::channel;
use tracing::*;
use crate::actor::{AgentConfig, ManagedAgent, Started};
use crate::common::{ActonInner, AgentHandle, AgentRuntime,Envelope, FutureBox, OutboundEnvelope, ReactorItem};
use crate::message::MessageContext;
use crate::prelude::ActonMessage;
use crate::traits::Actor;
pub struct Idle;
impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
#[instrument(skip(self, message_processor), level = "debug")]
pub fn act_on<M>(
&mut self,
message_processor: impl for<'a> Fn(
&'a mut ManagedAgent<Started, State>,
&'a mut MessageContext<M>,
) -> FutureBox
+ Send
+ Sync
+ 'static,
) -> &mut Self
where
M: ActonMessage + Clone + Send + Sync + 'static,
{
let type_id = TypeId::of::<M>();
trace!(type_name=std::any::type_name::<M>(), type_id=?type_id, "Adding message handler");
let handler_box = Box::new(
move |actor: &mut ManagedAgent<Started, State>,
envelope: &mut Envelope|
-> FutureBox {
trace!("Creating handler for message type: {:?}", std::any::type_name::<M>());
if let Some(concrete_msg) = envelope.message.as_any().downcast_ref::<M>() {
trace!(
"Downcast successful for type: {:?}",
std::any::type_name::<M>()
);
let message = concrete_msg.clone();
let sent_time = envelope.timestamp;
let mut event_record = {
let msg_name = std::any::type_name::<M>();
let sender = envelope.reply_to.sender.root.to_string();
let recipient = envelope.recipient.sender.root.to_string();
let origin_envelope = OutboundEnvelope::new_with_recipient(
envelope.reply_to.clone(),
envelope.recipient.clone(),
);
let reply_envelope = OutboundEnvelope::new_with_recipient(
envelope.recipient.clone(),
envelope.reply_to.clone(),
);
trace!("sender {sender}::{msg_name}",);
trace!("recipient {recipient}::{msg_name}",);
MessageContext {
message,
timestamp: sent_time,
origin_envelope,
reply_envelope,
}
};
let user_future = message_processor(actor, &mut event_record);
Box::pin(user_future)
} else {
error!(
type_name = std::any::type_name::<M>(),
"Message failed to downcast"
);
Box::pin(async {})
}
},
);
self.reactors.insert(type_id, ReactorItem::FutureReactor(handler_box));
self
}
pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output=()> + Send + Sync + 'static,
{
self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output=()> + Send + Sync + 'static,
{
self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output=()> + Send + Sync + 'static,
{
self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output=()> + Send + Sync + 'static,
{
self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
#[instrument(skip(self))]
pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
let config = AgentConfig::new(Ern::with_root(name)?, Some(self.handle.clone()), Some(self.runtime.broker().clone()))?;
Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
}
#[instrument]
pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
if let Some(app) = runtime {
managed_actor.broker = app.0.broker.clone();
managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
}
if let Some(config) = &config {
managed_actor.handle.id = config.ern();
managed_actor.parent = config.parent().clone();
managed_actor.handle.broker = Box::new(config.get_broker().clone());
if let Some(broker) = config.get_broker().clone() {
managed_actor.broker = broker;
}
}
debug_assert!(
!managed_actor.inbox.is_closed(),
"Actor mailbox is closed in new"
);
trace!("NEW ACTOR: {}", &managed_actor.handle.id());
managed_actor.runtime = runtime.clone().unwrap_or_else(|| AgentRuntime(ActonInner {
broker: managed_actor.handle.broker.clone().unwrap_or_default(),
..Default::default()
}));
managed_actor.id = managed_actor.handle.id();
managed_actor
}
#[instrument(skip(self))]
pub async fn start(mut self) -> AgentHandle {
trace!("The model is {:?}", self.model);
let reactors = mem::take(&mut self.reactors);
let actor_ref = self.handle.clone();
trace!("actor_ref before spawn: {:?}", actor_ref.id.root.to_string());
let active_actor: ManagedAgent<Started, State> = self.into();
let actor = Box::leak(Box::new(active_actor));
debug_assert!(
!actor.inbox.is_closed(),
"Actor mailbox is closed in activate"
);
(actor.before_start)(actor).await;
actor_ref.tracker().spawn(actor.wake(reactors));
actor_ref.tracker().close();
trace!("actor_ref after spawn: {:?}", actor_ref.id.root.to_string());
actor_ref
}
}
impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
for ManagedAgent<Started, State>
{
fn from(value: ManagedAgent<Idle, State>) -> Self {
let on_starting = value.before_start;
let on_start = value.after_start;
let on_stopped = value.after_stop;
let on_before_stop = value.before_stop;
let halt_signal = value.halt_signal;
let parent = value.parent;
let id = value.id;
let tracker = value.tracker;
let acton = value.runtime;
let reactors = value.reactors;
debug_assert!(
!value.inbox.is_closed(),
"Actor mailbox is closed before conversion in From<Actor<Idle, State>>"
);
let inbox = value.inbox;
let handle = value.handle;
let model = value.model;
let broker = value.broker;
if handle.children().is_empty() {
trace!(
"child count before Actor creation {}",
handle.children().len()
);
}
ManagedAgent::<Started, State> {
handle,
parent,
halt_signal,
id,
runtime: acton,
model,
tracker,
inbox,
before_start: on_starting,
after_start: on_start,
before_stop: on_before_stop,
after_stop: on_stopped,
broker,
reactors,
_actor_state: Default::default(),
}
}
}
impl<State: Default + Send + Debug + 'static> Default
for ManagedAgent<Idle, State>
{
fn default() -> Self {
let (outbox, inbox) = channel(255);
let id: Ern = Default::default();
let mut handle: AgentHandle = Default::default();
handle.id = id.clone();
handle.outbox = outbox.clone();
ManagedAgent::<Idle, State> {
handle,
id,
inbox,
before_start: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
after_start: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
before_stop: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
after_stop: Box::new(|a: &'_ ManagedAgent<Started, State>| default_handler(a)),
model: State::default(),
broker: Default::default(),
parent: Default::default(),
runtime: Default::default(),
halt_signal: Default::default(),
tracker: Default::default(),
reactors: Default::default(),
_actor_state: Default::default(),
}
}
}
fn default_handler<State: Debug + Send + Default>(
_actor: &'_ ManagedAgent<Started, State>,
) -> FutureBox {
Box::pin(async {})
}