use crate::actor::context::{ActorContext, ActorStatus};
use crate::actor::describe::Describe;
use crate::actor::lifecycle::{Status, Stop};
use crate::actor::message::{
ActorMessage, Exec, Handler, Message, MessageHandler, MessageUnwrapErr, MessageWrapErr,
};
use crate::actor::metrics::ActorMetrics;
use crate::actor::scheduler::ActorType::{Anonymous, Tracked};
use crate::actor::supervised::Terminated;
use crate::actor::system::ActorSystem;
use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
#[cfg(feature = "remote")]
use crate::actor::message::Envelope;
#[cfg(feature = "remote")]
use crate::remote::{system::NodeId, RemoteActorRef};
pub use refs::*;
pub mod blocking;
pub mod context;
pub mod describe;
#[cfg(feature = "actor-events")]
pub mod event;
pub mod watch;
pub mod lifecycle;
pub mod message;
pub mod metrics;
pub mod refs;
pub mod scheduler;
pub mod supervised;
pub mod system;
pub mod worker;
pub type ActorId = Arc<str>;
pub type ActorPath = Arc<str>;
#[async_trait]
pub trait Actor: 'static + Send + Sync {
fn new_context(
&self,
system: Option<ActorSystem>,
status: ActorStatus,
boxed_ref: BoxedActorRef,
) -> ActorContext {
ActorContext::new(system, status, boxed_ref, Self::DEFAULT_TAGS)
}
async fn started(&mut self, _ctx: &mut ActorContext) {}
async fn stopped(&mut self, _ctx: &mut ActorContext) {}
async fn on_child_stopped(&mut self, _id: &ActorId, _ctx: &mut ActorContext) {}
fn actor_ref(&self, ctx: &ActorContext) -> LocalActorRef<Self>
where
Self: Sized,
{
ctx.actor_ref()
}
fn type_name() -> &'static str
where
Self: Sized,
{
std::any::type_name::<Self>()
}
fn tags(&self, ctx: &ActorContext) -> ActorTags {
ctx.tags()
}
const DEFAULT_TAGS: ActorTags = { ActorTags::None };
}
#[async_trait]
pub trait IntoActor: Actor + Sized {
async fn into_actor<'a, I: 'a + IntoActorId + Send>(
self,
id: Option<I>,
sys: &ActorSystem,
) -> Result<LocalActorRef<Self>, ActorRefErr>;
async fn into_anon_actor<'a, I: 'a + IntoActorId + Send>(
self,
id: Option<I>,
sys: &ActorSystem,
) -> Result<LocalActorRef<Self>, ActorRefErr>;
}
#[async_trait]
pub trait IntoChild: Actor + Sized {
async fn into_child(
self,
id: Option<ActorId>,
ctx: &mut ActorContext,
) -> Result<LocalActorRef<Self>, ActorRefErr>;
}
#[async_trait]
pub trait ActorFactory: Clone {
type Actor: Actor + 'static + Sync + Send;
type Recipe: ActorRecipe + 'static + Sync + Send;
async fn create(&self, recipe: Self::Recipe) -> Result<Self::Actor, ActorCreationErr>;
}
pub enum ActorCreationErr {
InvalidRecipe(String),
}
pub trait ActorRecipe: Sized {
fn read_from_bytes(bytes: &Vec<u8>) -> Option<Self>;
fn write_to_bytes(&self) -> Option<Vec<u8>>;
}
impl ActorRecipe for () {
fn read_from_bytes(_: &Vec<u8>) -> Option<Self> {
Some(())
}
fn write_to_bytes(&self) -> Option<Vec<u8>> {
Some(vec![])
}
}
pub async fn new_actor<A: Actor>(actor: A) -> Result<LocalActorRef<A>, ActorRefErr>
where
A: 'static + Sync + Send,
{
ActorSystem::global_system().new_tracked_actor(actor).await
}
pub async fn get_actor<A: Actor>(id: ActorId) -> Option<LocalActorRef<A>>
where
A: 'static + Sync + Send,
{
ActorSystem::global_system().get_tracked_actor(id).await
}
#[async_trait]
pub trait MessageReceiver<M: Message>: 'static + Send + Sync + MessageReceiverClone<M> {
fn actor_id(&self) -> &ActorId;
async fn send(&self, msg: M) -> Result<M::Result, ActorRefErr>;
fn notify(&self, msg: M) -> Result<(), ActorRefErr>;
}
pub trait MessageReceiverClone<M: Message>: 'static + Send + Sync {
fn clone_box(&self) -> Box<dyn MessageReceiver<M>>;
}
impl<T, M> MessageReceiverClone<M> for T
where
T: 'static + MessageReceiver<M> + Clone,
M: Message,
{
fn clone_box(&self) -> Box<dyn MessageReceiver<M>> {
Box::new(self.clone())
}
}
impl<M: Message> Clone for Box<dyn MessageReceiver<M>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
#[async_trait]
impl<A: Actor, M: Message> MessageReceiver<M> for LocalActorRef<A>
where
A: Handler<M>,
{
fn actor_id(&self) -> &ActorId {
self.actor_id()
}
async fn send(&self, msg: M) -> Result<M::Result, ActorRefErr> {
self.send(msg).await
}
fn notify(&self, msg: M) -> Result<(), ActorRefErr> {
self.notify(msg)
}
}
impl<A: Actor, M: Message> From<LocalActorRef<A>> for Receiver<M>
where
A: Handler<M>,
{
fn from(actor_ref: LocalActorRef<A>) -> Self {
Self(Box::new(actor_ref))
}
}
pub struct Receiver<M: Message>(Box<dyn MessageReceiver<M>>);
impl<M: Message> Clone for Receiver<M> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<M: Message> Receiver<M> {
pub fn actor_id(&self) -> &ActorId {
self.0.actor_id()
}
pub async fn send(&self, msg: M) -> Result<M::Result, ActorRefErr> {
self.0.send(msg).await
}
pub fn notify(&self, msg: M) -> Result<(), ActorRefErr> {
self.0.notify(msg)
}
}
#[async_trait]
impl<A: Actor> IntoActor for A {
async fn into_actor<'a, I: 'a + IntoActorId + Send>(
self,
id: Option<I>,
sys: &ActorSystem,
) -> Result<LocalActorRef<Self>, ActorRefErr> {
let id = id.map(|i| i.into_actor_id());
if let Some(id) = id {
sys.new_actor(id, self, Tracked).await
} else {
sys.new_anon_actor(self).await
}
}
async fn into_anon_actor<'a, I: 'a + IntoActorId + Send>(
self,
id: Option<I>,
sys: &ActorSystem,
) -> Result<LocalActorRef<Self>, ActorRefErr> {
sys.new_actor(
id.map_or_else(new_actor_id, |id| id.into_actor_id()),
self,
Anonymous,
)
.await
}
}
#[async_trait]
impl<A: Actor> IntoChild for A {
async fn into_child(
self,
id: Option<ActorId>,
ctx: &mut ActorContext,
) -> Result<LocalActorRef<Self>, ActorRefErr> {
ctx.spawn(id.unwrap_or_else(new_actor_id), self).await
}
}
pub struct ScheduledNotify<A: Actor, M: Message> {
cancellation_token: CancellationToken,
_a: PhantomData<A>,
_m: PhantomData<M>,
}
impl<A: Actor, M: Message> ScheduledNotify<A, M> {
pub(crate) fn new(cancellation_token: CancellationToken) -> Self {
ScheduledNotify {
cancellation_token,
_a: PhantomData,
_m: PhantomData,
}
}
pub fn cancel(&self) {
self.cancellation_token.cancel();
}
}
impl<A: Actor> LocalActorRef<A> {
pub fn scheduled_notify<M: Message>(&self, message: M, delay: Duration) -> ScheduledNotify<A, M>
where
A: Handler<M>,
{
let actor_ref = self.clone();
let cancellation_token = CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();
let _join_handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => { }
_ = tokio::time::sleep(delay) => {
let _ = actor_ref.notify(message);
}
}
});
ScheduledNotify::new(cancellation_token)
}
}
pub type ActorTag = Box<str>;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ActorTags {
None,
Tag(ActorTag),
Tags(Vec<ActorTag>),
}
impl From<&'static str> for ActorTags {
fn from(value: &'static str) -> Self {
Self::Tag(value.into())
}
}
impl From<String> for ActorTags {
fn from(value: String) -> Self {
Self::Tag(value.into())
}
}
impl From<Vec<&'static str>> for ActorTags {
fn from(value: Vec<&'static str>) -> Self {
Self::Tags(value.into_iter().map(|s| s.into()).collect())
}
}
impl From<Vec<String>> for ActorTags {
fn from(value: Vec<String>) -> Self {
Self::Tags(value.into_iter().map(|s| s.into()).collect())
}
}
pub fn new_actor_id() -> ActorId {
Uuid::new_v4().into_actor_id()
}
pub trait IntoActorId {
fn into_actor_id(self) -> ActorId;
}
pub trait ToActorId {
fn to_actor_id(&self) -> ActorId;
}
pub trait IntoActorPath {
fn into_actor_path(self) -> ActorPath;
}
impl<T: ToString + Send + Sync> IntoActorId for T {
fn into_actor_id(self) -> ActorId {
self.to_string().into()
}
}
impl<T: ToString + Send + Sync> ToActorId for T {
fn to_actor_id(&self) -> ActorId {
self.to_string().into()
}
}
impl<T: ToString + Send + Sync> IntoActorPath for T {
fn into_actor_path(self) -> ActorPath {
self.to_string().into()
}
}