use std::any::TypeId;
use std::fmt::Debug;
use std::future::Future;
use std::mem;
use acton_ern::Ern;
use tokio::sync::mpsc::channel;
use tracing::*;
use crate::actor::{AgentConfig, ManagedAgent, Started};
use crate::common::{
ActonInner, AgentHandle, AgentRuntime, Envelope, FutureBox, OutboundEnvelope, ReactorItem,
};
use crate::message::MessageContext;
use crate::prelude::ActonMessage;
use crate::traits::AgentHandleInterface;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Idle;
use crate::common::ErrorHandler;
use std::collections::HashMap;
impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
#[instrument(skip(self, message_processor), level = "debug")]
#[deprecated(
note = "act_on for handlers returning () will be deprecated in the next version. Use act_on_result for Result-returning handlers."
)]
pub fn act_on<M>(
&mut self,
message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> FutureBox
+ Send
+ Sync
+ 'static,
) -> &mut Self
where
M: ActonMessage + Clone + Send + Sync + 'static,
{
let type_id = TypeId::of::<M>();
trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding legacy message handler (will be deprecated)");
let handler_box = Box::new(
move |actor: &mut ManagedAgent<Started, State>, envelope: &mut Envelope| -> FutureBox {
if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
trace!(
"Downcast successful for message type: {}",
std::any::type_name::<M>()
);
let mut msg_context = {
let origin_envelope = OutboundEnvelope::new_with_recipient(
envelope.reply_to.clone(),
envelope.recipient.clone(),
actor.handle.cancellation_token.clone(),
);
let reply_envelope = OutboundEnvelope::new_with_recipient(
envelope.recipient.clone(),
envelope.reply_to.clone(),
actor.handle.cancellation_token.clone(),
);
MessageContext {
message: concrete_msg.clone(),
timestamp: envelope.timestamp,
origin_envelope,
reply_envelope,
}
};
message_processor(actor, &mut msg_context)
} else {
error!(
type_name = std::any::type_name::<M>(),
"Message handler called with incompatible message type (downcast failed)"
);
Box::pin(async {})
}
},
);
self.message_handlers
.insert(type_id, ReactorItem::FutureReactor(handler_box));
self
}
pub fn act_on_result<M, E, Fut>(
&mut self,
message_processor: impl for<'a> Fn(&'a mut ManagedAgent<Started, State>, &'a mut MessageContext<M>) -> Fut
+ Send
+ Sync
+ 'static,
) -> &mut Self
where
M: ActonMessage + Clone + Send + Sync + 'static,
E: std::error::Error + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<(), E>> + Send + Sync + 'static,
{
let type_id = TypeId::of::<M>();
trace!(type_name=std::any::type_name::<M>(),type_id=?type_id, " Adding Result-returning message handler");
let handler_box = Box::new(
move |actor: &mut ManagedAgent<Started, State>,
envelope: &mut Envelope|
-> crate::common::FutureBoxResult {
if let Some(concrete_msg) = downcast_message::<M>(&*envelope.message) {
trace!(
"Downcast successful for message type: {}",
std::any::type_name::<M>()
);
let mut msg_context = {
let origin_envelope = OutboundEnvelope::new_with_recipient(
envelope.reply_to.clone(),
envelope.recipient.clone(),
actor.handle.cancellation_token.clone(),
);
let reply_envelope = OutboundEnvelope::new_with_recipient(
envelope.recipient.clone(),
envelope.reply_to.clone(),
actor.handle.cancellation_token.clone(),
);
MessageContext {
message: concrete_msg.clone(),
timestamp: envelope.timestamp,
origin_envelope,
reply_envelope,
}
};
let fut = message_processor(actor, &mut msg_context);
Box::pin(async move {
match fut.await {
Ok(()) => Ok(()),
Err(e) => Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
}
})
} else {
error!(
type_name = std::any::type_name::<M>(),
"Result handler called with incompatible message type (downcast failed)"
);
Box::pin(async { Ok(()) })
}
},
);
self.message_handlers
.insert(type_id, ReactorItem::FutureReactorResult(handler_box));
self
}
pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + Sync + 'static,
{
self.after_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + Sync + 'static,
{
self.before_start = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + Sync + 'static,
{
self.after_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
where
F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + Sync + 'static,
{
self.before_stop = Box::new(move |agent| Box::pin(f(agent)) as FutureBox);
self
}
#[instrument(skip(self))]
pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
let config = AgentConfig::new(
Ern::with_root(name)?, Some(self.handle.clone()), Some(self.runtime.broker().clone()), )?;
Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(config)).await)
}
#[instrument]
pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
let mut managed_actor: ManagedAgent<Idle, State> = ManagedAgent::default();
if let Some(app) = runtime {
managed_actor.broker = app.0.broker.clone();
managed_actor.handle.broker = Box::new(Some(app.0.broker.clone()));
managed_actor.cancellation_token = Some(app.0.cancellation_token.child_token());
}
if let Some(config) = &config {
managed_actor.handle.id = config.id();
managed_actor.parent = config.parent().clone();
managed_actor.handle.broker = Box::new(config.get_broker().clone());
if let Some(broker) = config.get_broker().clone() {
managed_actor.broker = broker;
}
}
debug_assert!(
!managed_actor.inbox.is_closed(),
"Agent mailbox is closed in new"
);
trace!("NEW ACTOR: {}", &managed_actor.handle.id());
assert!(
runtime.is_some(),
"AgentRuntime must be provided to ManagedAgent::new"
);
managed_actor.runtime = runtime.clone().unwrap();
managed_actor
.runtime
.0
.roots
.insert(managed_actor.handle.id(), managed_actor.handle.clone());
managed_actor.id = managed_actor.handle.id();
managed_actor
}
#[instrument(skip(self))]
pub async fn start(mut self) -> AgentHandle {
trace!("Starting agent: {}", self.id());
trace!("Model state before start: {:?}", self.model);
let message_handlers = mem::take(&mut self.message_handlers);
let actor_ref = self.handle.clone();
let active_actor: ManagedAgent<Started, State> = self.into();
let actor = Box::leak(Box::new(active_actor));
trace!("Executing before_start hook for agent: {}", actor.id());
(actor.before_start)(actor).await;
trace!("Spawning main task (wake) for agent: {}", actor.id());
actor_ref.tracker().spawn(actor.wake(message_handlers));
actor_ref.tracker().close();
trace!("Agent {} started successfully.", actor_ref.id());
actor_ref }
pub fn on_error<E>(
&mut self,
error_handler: impl for<'a, 'b> Fn(
&'a mut ManagedAgent<Started, State>,
&'b mut crate::message::Envelope,
&'b E,
) -> crate::common::FutureBox
+ Send
+ Sync
+ 'static,
) -> &mut Self
where
E: std::error::Error + 'static,
{
use std::any::TypeId;
use std::sync::Arc;
let handler_box: Arc<Box<crate::common::ErrorHandler<State>>> =
Arc::new(Box::new(move |agent, envelope, err| {
if let Some(specific) = err.downcast_ref::<E>() {
error_handler(agent, envelope, specific)
} else {
Box::pin(async {})
}
}));
self.error_handler_map
.insert(TypeId::of::<E>(), handler_box);
self
}
}
pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
msg.as_any().downcast_ref::<T>()
}
impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
for ManagedAgent<Started, State>
{
fn from(value: ManagedAgent<Idle, State>) -> Self {
assert!(
value.cancellation_token.is_some(),
"Cannot transition to ManagedAgent<Started, State> without a cancellation_token"
);
ManagedAgent::<Started, State> {
handle: value.handle,
parent: value.parent,
halt_signal: value.halt_signal,
id: value.id,
runtime: value.runtime,
model: value.model,
tracker: value.tracker,
inbox: value.inbox,
before_start: value.before_start,
after_start: value.after_start,
before_stop: value.before_stop,
after_stop: value.after_stop,
broker: value.broker,
message_handlers: value.message_handlers,
error_handler_map: value.error_handler_map, cancellation_token: value.cancellation_token,
_actor_state: Default::default(),
}
}
}
impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
fn default() -> Self {
let (outbox, inbox) = channel(255); let id: Ern = Default::default();
let mut handle: crate::common::AgentHandle = Default::default();
handle.id = id.clone();
handle.outbox = outbox.clone();
ManagedAgent::<Idle, State> {
handle,
id,
inbox,
before_start: Box::new(|_| default_handler()),
after_start: Box::new(|_| default_handler()),
before_stop: Box::new(|_| default_handler()),
after_stop: Box::new(|_| default_handler()),
model: State::default(),
broker: Default::default(),
error_handler_map: std::collections::HashMap::new(),
parent: Default::default(),
runtime: Default::default(),
halt_signal: Default::default(),
tracker: Default::default(),
cancellation_token: Default::default(),
message_handlers: Default::default(),
_actor_state: Default::default(),
}
}
}
fn default_handler() -> FutureBox {
Box::pin(async {})
}