use crate::channel;
use crate::{
ActorError, ActorSystem, ActorSystemCmd, Blocking, CHANNEL_SIZE, ErrorHandling, LifeCycle,
Message, TypedMailbox,
};
use std::sync::Arc;
#[async_trait::async_trait]
pub trait Actor
where
Self: Sized + Send + Sync + 'static,
{
/// Type of message this actor receives; each message is passed to
/// [`Actor::handle`] wrapped in an `Arc`.
type Message: std::fmt::Debug + Sized + Send + Sync + 'static;
/// Value produced by [`Actor::handle`] on success. `run_actor` forwards it
/// through the message's result channel when the message carries one.
type Result: std::fmt::Debug + Sized + Send + 'static;
/// Error produced by [`Actor::handle`] on failure. `run_actor` logs it and
/// then applies the configured `ErrorHandling` policy.
type Error: std::fmt::Debug + std::fmt::Display + Send;
/// Returns the address this actor is registered under in the actor system.
///
/// Without the `multi-node` feature the address is a plain string slice.
#[cfg(not(feature = "multi-node"))]
fn address(&self) -> &str;
/// Returns the address this actor is registered under in the actor system.
///
/// With the `multi-node` feature the address is a
/// `crate::inter_node::Address`; its `name` field is what
/// `local_name` exposes.
#[cfg(feature = "multi-node")]
fn address(&self) -> &crate::inter_node::Address;
/// Returns the actor's name as a string slice, independent of the
/// `multi-node` feature.
///
/// Single-node builds return the address itself; multi-node builds return
/// the `name` field of the address.
#[doc(hidden)]
fn local_name(&self) -> &str {
    // Exactly one of the two bindings below survives conditional compilation.
    #[cfg(not(feature = "multi-node"))]
    let name: &str = self.address();
    #[cfg(feature = "multi-node")]
    let name: &str = &self.address().name;
    name
}
/// Processes a single message.
///
/// `run_actor` calls this for every message received from the mailbox.
/// On `Ok`, the value is sent back through the message's result channel if
/// one is present. On `Err`, the error is logged and the actor resumes,
/// restarts or stops according to the `ErrorHandling` it was started with.
async fn handle(&mut self, msg: Arc<Self::Message>) -> Result<Self::Result, Self::Error>;
/// Hook called by `run_actor` after the actor has been registered and
/// before its life cycle is set to `Receiving`. It runs on the initial
/// start and again after every restart. The default does nothing.
async fn pre_start(&mut self) {}
/// Hook called by `run_actor` when the actor is about to restart, after its
/// life cycle has been set to `Stopping` and before it is set to
/// `Restarting`. The default does nothing.
async fn pre_restart(&mut self) {}
/// Hook called by `run_actor` when the actor stops for good, after its life
/// cycle has been set to `Stopping` and before it is set to `Terminated`.
/// The default does nothing.
async fn post_stop(&mut self) {}
/// Hook called by `run_actor` at the beginning of every run after the first
/// one, i.e. once a restart has happened and before the actor registers
/// itself again. The default does nothing.
async fn post_restart(&mut self) {}
async fn run_actor(
&mut self,
actor_system_tx: channel::Sender<ActorSystemCmd>,
error_handling: ErrorHandling,
ready_tx: channel::Sender<Result<(), ActorError>>,
channel_size: Option<usize>,
) -> Result<(), ActorError> {
let mut is_restarted = false;
let size = channel_size.unwrap_or(CHANNEL_SIZE);
loop {
if is_restarted {
self.post_restart().await;
}
let (tx, mut rx) = channel::channel::<Message<Self>>(size);
let (kill_tx, mut kill_rx) = channel::channel::<()>(size);
let (restart_tx, mut restart_rx) = channel::channel::<()>(size);
let mailbox = Arc::new(TypedMailbox::<Self>::new(tx.clone()));
let mut count = 0;
let result_rx = loop {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
if let Err(e) = channel::send(
&actor_system_tx,
ActorSystemCmd::Register {
actor_type: std::any::type_name::<Self>().to_string(),
#[cfg(not(feature = "multi-node"))]
address: self.address().to_string(),
#[cfg(feature = "multi-node")]
address: self.address().clone(),
mailbox: mailbox.clone(),
restart_tx: restart_tx.clone(),
kill_tx: kill_tx.clone(),
life_cycle: if is_restarted {
LifeCycle::Restarting
} else {
LifeCycle::Starting
},
result_tx,
is_restarted,
},
)
.await
{
count += 1;
error!(
"Failed to register actor {}...({}): {:?}",
self.local_name(),
count,
e
);
if count > 10 {
let _ = channel::send(&ready_tx, Err(ActorError::UnhealthyActorSystem))
.await;
return Err(ActorError::UnhealthyActorSystem);
}
}
break result_rx;
};
match result_rx.await {
Ok(Err(e)) => {
let _ = channel::send(&ready_tx, Err(e)).await;
return Err(ActorError::AddressAlreadyExist(self.local_name().to_string()));
}
Err(e) => {
let _ = channel::send(&ready_tx, Err(ActorError::from(e))).await;
return Err(ActorError::UnhealthyActorSystem);
}
_ => {}
}
self.pre_start().await;
is_restarted = true;
let _ = channel::send(
&actor_system_tx,
ActorSystemCmd::SetLifeCycle {
address: self.local_name().to_string(),
life_cycle: LifeCycle::Receiving,
},
)
.await;
let _ = channel::send(&ready_tx, Ok(())).await;
if let Some(_) = loop {
tokio::select! {
Some(mut msg) = rx.recv() => {
let result_tx = msg.result_tx();
let msg_de = msg.inner();
match self.handle(msg_de).await {
Ok(result) => {
if let Some(result_tx) = result_tx {
let _ = result_tx.send(result);
}
}
Err(e) => {
match error_handling {
ErrorHandling::Resume => {
debug!("Handler's result has error: {:?} ...Resume this actor", e);
continue;
}
ErrorHandling::Restart => {
debug!("Handler's result has error: {:?} ...Restart this actor", e);
break None;
}
ErrorHandling::Stop => {
error!("Handler's result has error: {:?} ...Stop this actor", e);
break Some(());
}
}
}
}
}
Some(_) = kill_rx.recv() => {
info!("Kill actor: address={}", self.local_name());
break Some(());
}
Some(_) = restart_rx.recv() => {
info!("Restart actor: address={}", self.local_name());
break None;
}
};
} {
let _ = channel::send(
&actor_system_tx,
ActorSystemCmd::SetLifeCycle {
address: self.local_name().to_string(),
life_cycle: LifeCycle::Stopping,
},
)
.await;
self.post_stop().await;
let _ = channel::send(
&actor_system_tx,
ActorSystemCmd::SetLifeCycle {
address: self.local_name().to_string(),
life_cycle: LifeCycle::Terminated,
},
)
.await;
break Ok(());
}
let _ = channel::send(
&actor_system_tx,
ActorSystemCmd::SetLifeCycle {
address: self.local_name().to_string(),
life_cycle: LifeCycle::Stopping,
},
)
.await;
self.pre_restart().await;
let _ = channel::send(
&actor_system_tx,
ActorSystemCmd::SetLifeCycle {
address: self.local_name().to_string(),
life_cycle: LifeCycle::Restarting,
},
)
.await;
}
}
/// Spawns this actor on the tokio runtime and waits until it reports its
/// first readiness result.
///
/// The actor is moved into a detached task that runs `run_actor`; a failure
/// returned by `run_actor` is logged from inside that task.
///
/// # Arguments
/// * `actor_system` - supplies the command channel via `handler_tx`.
/// * `error_handling` - forwarded to `run_actor`.
/// * `blocking` - `Blocking::Blocking` runs the actor through
///   `spawn_blocking` + `block_on`; any other value uses `tokio::spawn`.
/// * `channel_size` - capacity of the readiness channel and of the
///   channels created by `run_actor`; defaults to `CHANNEL_SIZE`.
///
/// # Errors
/// Returns whatever `run_actor` reports first on the readiness channel, or
/// `ActorError::ChannelRecv` when that channel closes without a value.
async fn register(
    mut self,
    actor_system: &mut ActorSystem,
    error_handling: ErrorHandling,
    blocking: Blocking,
    channel_size: Option<usize>,
) -> Result<(), ActorError> {
    let (ready_tx, mut ready_rx) = channel::channel(channel_size.unwrap_or(CHANNEL_SIZE));
    let actor_system_tx = actor_system.handler_tx();

    // The join handle is discarded on purpose: the actor task is detached.
    if blocking != Blocking::Blocking {
        let _ = tokio::spawn(async move {
            let outcome = self
                .run_actor(actor_system_tx, error_handling, ready_tx, channel_size)
                .await;
            if let Err(e) = outcome {
                error!("Actor {} run failed: {:?}", self.local_name(), e);
            }
        });
    } else {
        let _ = tokio::task::spawn_blocking(move || {
            let runtime = tokio::runtime::Handle::current();
            let outcome = runtime.block_on(self.run_actor(
                actor_system_tx,
                error_handling,
                ready_tx,
                channel_size,
            ));
            if let Err(e) = outcome {
                error!("Actor {} run failed: {:?}", self.local_name(), e);
            }
        });
    }

    // Only the first readiness report is consumed here.
    match ready_rx.recv().await {
        Some(outcome) => outcome,
        None => Err(ActorError::ChannelRecv),
    }
}
}