use std::any::TypeId;
use std::fmt::Debug;
use std::future::Future;
use std::mem;
use acton_ern::{Ern};
use tokio::sync::mpsc::channel;
use tracing::*;
use crate::actor::{AgentConfig, ManagedAgent, Started};
use crate::common::{ActonInner, AgentHandle, AgentRuntime,Envelope, FutureBox, OutboundEnvelope, ReactorItem};
use crate::message::MessageContext;
use crate::prelude::ActonMessage;
use crate::traits::AgentHandleInterface;
/// Type-state marker for an agent that has been created but not yet started.
///
/// The type carries no data. It appears as the first type parameter of
/// `ManagedAgent<Idle, State>`, which is the type the configuration methods
/// (`act_on`, the lifecycle-hook setters, `create_child`) and `start` in this
/// file are implemented on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Idle;
impl<State: Default + Send + Debug + 'static> ManagedAgent<Idle, State> {
/// Registers an asynchronous handler for messages of type `M`.
///
/// `message_processor` is wrapped in a type-erased closure and stored in
/// `self.message_handlers` under `TypeId::of::<M>()` as a
/// `ReactorItem::FutureReactor`. When that wrapper is invoked with an
/// `Envelope` it:
///
/// 1. downcasts the envelope's payload to `M` with `downcast_message`;
/// 2. builds a `MessageContext<M>` holding a clone of the message, the
///    envelope's timestamp, and two `OutboundEnvelope`s derived from the
///    envelope's `reply_to` / `recipient` fields (in opposite argument order);
/// 3. returns the future produced by `message_processor`.
///
/// If the downcast fails, an error is logged and a future that completes
/// immediately without doing anything is returned instead.
///
/// Returns `&mut Self` so registrations can be chained.
#[instrument(skip(self, message_processor), level = "debug")]
pub fn act_on<M>(
    &mut self,
    message_processor: impl for<'a> Fn(
            &'a mut ManagedAgent<Started, State>,
            &'a mut MessageContext<M>,
        ) -> FutureBox
        + Send
        + Sync
        + 'static,
) -> &mut Self
where
    M: ActonMessage + Clone + Send + Sync + 'static,
{
    let message_type = TypeId::of::<M>();
    trace!(
        type_name = std::any::type_name::<M>(),
        type_id = ?message_type,
        " Adding message handler"
    );

    // The parameter annotations are required: they make the closure generic
    // over the lifetimes of both references.
    let reactor = Box::new(
        move |agent: &mut ManagedAgent<Started, State>, incoming: &mut Envelope| -> FutureBox {
            // Guard clause: bail out with a no-op future when the payload is
            // not an `M`.
            let payload = match downcast_message::<M>(&*incoming.message) {
                Some(payload) => payload,
                None => {
                    error!(
                        type_name = std::any::type_name::<M>(),
                        "Message handler called with incompatible message type (downcast failed)"
                    );
                    return Box::pin(async {});
                }
            };
            trace!(
                "Downcast successful for message type: {}",
                std::any::type_name::<M>()
            );

            // Same two endpoints, opposite argument order.
            let origin_envelope = OutboundEnvelope::new_with_recipient(
                incoming.reply_to.clone(),
                incoming.recipient.clone(),
            );
            let reply_envelope = OutboundEnvelope::new_with_recipient(
                incoming.recipient.clone(),
                incoming.reply_to.clone(),
            );

            let mut context = MessageContext {
                message: payload.clone(),
                timestamp: incoming.timestamp,
                origin_envelope,
                reply_envelope,
            };
            message_processor(agent, &mut context)
        },
    );

    self.message_handlers
        .insert(message_type, ReactorItem::FutureReactor(reactor));
    self
}
/// Installs `f` as the agent's `after_start` hook, replacing the stored one.
///
/// `f` receives a shared reference to the started agent and returns a future;
/// each invocation of the hook boxes and pins that future as a `FutureBox`.
/// The call site of the hook is not in this part of the file.
///
/// Returns `&mut Self` so configuration calls can be chained.
pub fn after_start<F, Fut>(&mut self, f: F) -> &mut Self
where
    F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + Sync + 'static,
{
    // The closure's declared return type performs the coercion to `FutureBox`.
    self.after_start = Box::new(move |agent| -> FutureBox { Box::pin(f(agent)) });
    self
}
/// Installs `f` as the agent's `before_start` hook, replacing the stored one.
///
/// `f` receives a shared reference to the started agent and returns a future;
/// each invocation of the hook boxes and pins that future as a `FutureBox`.
/// `start` awaits this hook before it spawns the agent's main task.
///
/// Returns `&mut Self` so configuration calls can be chained.
pub fn before_start<F, Fut>(&mut self, f: F) -> &mut Self
where
    F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + Sync + 'static,
{
    // The closure's declared return type performs the coercion to `FutureBox`.
    self.before_start = Box::new(move |agent| -> FutureBox { Box::pin(f(agent)) });
    self
}
/// Installs `f` as the agent's `after_stop` hook, replacing the stored one.
///
/// `f` receives a shared reference to the started agent and returns a future;
/// each invocation of the hook boxes and pins that future as a `FutureBox`.
/// The call site of the hook is not in this part of the file.
///
/// Returns `&mut Self` so configuration calls can be chained.
pub fn after_stop<F, Fut>(&mut self, f: F) -> &mut Self
where
    F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + Sync + 'static,
{
    // The closure's declared return type performs the coercion to `FutureBox`.
    self.after_stop = Box::new(move |agent| -> FutureBox { Box::pin(f(agent)) });
    self
}
/// Installs `f` as the agent's `before_stop` hook, replacing the stored one.
///
/// `f` receives a shared reference to the started agent and returns a future;
/// each invocation of the hook boxes and pins that future as a `FutureBox`.
/// The call site of the hook is not in this part of the file.
///
/// Returns `&mut Self` so configuration calls can be chained.
pub fn before_stop<F, Fut>(&mut self, f: F) -> &mut Self
where
    F: for<'b> Fn(&'b ManagedAgent<Started, State>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + Sync + 'static,
{
    // The closure's declared return type performs the coercion to `FutureBox`.
    self.before_stop = Box::new(move |agent| -> FutureBox { Box::pin(f(agent)) });
    self
}
/// Creates a new, not-yet-started agent configured as a child of this one.
///
/// The child's `AgentConfig` is built from a root `Ern` made from `name`,
/// a clone of this agent's handle as the parent, and a clone of the broker
/// obtained from this agent's runtime. The child is then constructed with a
/// clone of this agent's runtime.
///
/// # Errors
///
/// Returns an error if `Ern::with_root` or `AgentConfig::new` fails.
#[instrument(skip(self))]
pub async fn create_child(&self, name: String) -> anyhow::Result<ManagedAgent<Idle, State>> {
    let child_id = Ern::with_root(name)?;
    let parent_handle = Some(self.handle.clone());
    let broker_handle = Some(self.runtime.broker().clone());
    let child_config = AgentConfig::new(child_id, parent_handle, broker_handle)?;

    Ok(ManagedAgent::new(&Some(self.runtime().clone()), Some(child_config)).await)
}
/// Crate-internal constructor: builds an idle agent from an optional runtime
/// and an optional configuration.
///
/// Starting from `ManagedAgent::default()`:
///
/// * if `runtime` is `Some`, its broker is copied into both the agent and the
///   agent's handle;
/// * if `config` is `Some`, the handle's id and the agent's parent are taken
///   from it, the handle's broker is overwritten with the config's broker
///   setting, and the agent's own broker is overwritten only when the config
///   actually carries one;
/// * the agent's runtime is a clone of `runtime`, or — when `runtime` is
///   `None` — a fresh `AgentRuntime` whose broker is the handle's broker (or
///   the default if the handle has none) with every other field defaulted.
///
/// Finally the agent's `id` is synchronised with its handle's id.
#[instrument]
pub(crate) async fn new(runtime: &Option<AgentRuntime>, config: Option<AgentConfig>) -> Self {
    let mut agent: ManagedAgent<Idle, State> = ManagedAgent::default();

    if let Some(rt) = runtime {
        agent.broker = rt.0.broker.clone();
        agent.handle.broker = Box::new(Some(rt.0.broker.clone()));
    }

    if let Some(cfg) = &config {
        agent.handle.id = cfg.id();
        agent.parent = cfg.parent().clone();

        // NOTE(review): when the config carries no broker, the handle's broker
        // becomes `None` while `agent.broker` keeps whatever the runtime
        // supplied — confirm this asymmetry is intended.
        let configured_broker = cfg.get_broker().clone();
        if let Some(broker) = &configured_broker {
            agent.broker = broker.clone();
        }
        agent.handle.broker = Box::new(configured_broker);
    }

    debug_assert!(
        !agent.inbox.is_closed(),
        "Agent mailbox is closed in new"
    );
    trace!("NEW ACTOR: {}", &agent.handle.id());

    agent.runtime = match runtime {
        Some(existing) => existing.clone(),
        // No runtime supplied: synthesise one around the handle's broker.
        None => AgentRuntime(ActonInner {
            broker: agent.handle.broker.clone().unwrap_or_default(),
            ..Default::default()
        }),
    };
    agent.id = agent.handle.id();
    agent
}
#[instrument(skip(self))]
pub async fn start(mut self) -> AgentHandle {
trace!("Starting agent: {}", self.id());
trace!("Model state before start: {:?}", self.model);
let message_handlers = mem::take(&mut self.message_handlers);
let actor_ref = self.handle.clone();
let active_actor: ManagedAgent<Started, State> = self.into();
let actor = Box::leak(Box::new(active_actor));
trace!("Executing before_start hook for agent: {}", actor.id());
(actor.before_start)(actor).await;
trace!("Spawning main task (wake) for agent: {}", actor.id());
actor_ref.tracker().spawn(actor.wake(message_handlers));
actor_ref.tracker().close();
trace!("Agent {} started successfully.", actor_ref.id());
actor_ref }
}
/// Attempts to view a type-erased message as the concrete type `T`.
///
/// Goes through the message's `as_any` accessor and returns `Some(&T)` when
/// the downcast to `T` succeeds, `None` otherwise.
pub fn downcast_message<T: ActonMessage + 'static>(msg: &dyn ActonMessage) -> Option<&T> {
    let erased = msg.as_any();
    erased.downcast_ref::<T>()
}
/// Type-state transition from `Idle` to `Started`.
///
/// Every field of the idle agent is moved across unchanged; only the
/// `_actor_state` marker is re-created through `Default`.
impl<State: Default + Send + Debug + 'static> From<ManagedAgent<Idle, State>>
    for ManagedAgent<Started, State>
{
    fn from(value: ManagedAgent<Idle, State>) -> Self {
        // Take the idle agent apart; its state marker is discarded.
        let ManagedAgent {
            handle,
            parent,
            halt_signal,
            id,
            runtime,
            model,
            tracker,
            inbox,
            before_start,
            after_start,
            before_stop,
            after_stop,
            broker,
            message_handlers,
            _actor_state: _,
        } = value;

        Self {
            handle,
            parent,
            halt_signal,
            id,
            runtime,
            model,
            tracker,
            inbox,
            before_start,
            after_start,
            before_stop,
            after_stop,
            broker,
            message_handlers,
            _actor_state: Default::default(),
        }
    }
}
/// Builds a blank idle agent.
///
/// A bounded mailbox channel is created: the sending half is stored in the
/// agent's handle (`handle.outbox`), the receiving half becomes the agent's
/// `inbox`. The handle and the agent share the same default `Ern` id. All
/// four lifecycle hooks are initialised to `default_handler`, the model is
/// `State::default()`, and every remaining field takes its `Default` value.
impl<State: Default + Send + Debug + 'static> Default for ManagedAgent<Idle, State> {
    fn default() -> Self {
        // Capacity of the bounded mailbox channel created for each agent.
        const MAILBOX_CAPACITY: usize = 255;

        let (outbox, inbox) = channel(MAILBOX_CAPACITY);
        let id: Ern = Default::default();

        let mut handle: AgentHandle = Default::default();
        handle.id = id.clone();
        // The sender is moved, not cloned: nothing else in this function uses
        // it, so a clone would only create a second sender to drop at once.
        handle.outbox = outbox;

        ManagedAgent::<Idle, State> {
            handle,
            id,
            inbox,
            before_start: Box::new(|_| default_handler()),
            after_start: Box::new(|_| default_handler()),
            before_stop: Box::new(|_| default_handler()),
            after_stop: Box::new(|_| default_handler()),
            model: State::default(),
            broker: Default::default(),
            parent: Default::default(),
            runtime: Default::default(),
            halt_signal: Default::default(),
            tracker: Default::default(),
            message_handlers: Default::default(),
            _actor_state: Default::default(),
        }
    }
}
/// Returns a boxed, pinned future that does nothing and completes on its
/// first poll.
fn default_handler() -> FutureBox {
    let noop = async {};
    Box::pin(noop)
}