use actor_system::*;
use async_trait::async_trait;
use flume::*;
mod actor_system;
/// Convenience re-exports of this crate's public actor API.
///
/// Importing this module's contents brings the actor traits together with the
/// address, channel and context types into scope in one go.
pub mod prelude {
pub use crate::actor_system::ActorSystem;
pub use crate::{
Actor, ActorBehavior, ActorBehaviorAsync, ActorExt, ActorHooks, ActorHooksAsync, Addr,
Address, Chan, Channel, Context, HasLength, MappedChannel,
};
// Everything from `tractor_macros` is re-exported only when the `macros` feature is enabled.
#[cfg(feature = "macros")]
pub use tractor_macros::*;
}
/// Base trait of every actor: associates an actor type with the message type it receives.
///
/// Implementors have to be `Sized + Send + 'static`.
pub trait Actor: Sized + Send + 'static {
/// Type of the messages delivered to this actor; it must be `Send`.
type Msg: Send;
}
/// Synchronous message handling for an [`Actor`].
///
/// Every implementor also receives [`ActorBehaviorAsync`] through a blanket impl in this file.
pub trait ActorBehavior: Actor {
/// Handles a single message; `ctx` gives access to the actor's own address.
fn handle(&mut self, msg: Self::Msg, ctx: &Context<Self>);
}
/// Optional synchronous lifecycle callbacks for an [`Actor`]; both default to doing nothing.
///
/// Every implementor also receives [`ActorHooksAsync`] through a blanket impl in this file.
pub trait ActorHooks: Actor {
/// Invoked (via the `ActorHooksAsync` blanket impl) before the actor loop starts processing
/// messages.
fn started(&mut self, _ctx: &Context<Self>) {}
/// Invoked (via the `ActorHooksAsync` blanket impl) after the actor loop has finished
/// processing messages.
fn stopped(&mut self) {}
}
/// Asynchronous lifecycle callbacks for an [`Actor`]; both default to doing nothing.
///
/// The actor loop awaits `started` before message processing and `stopped` afterwards.
#[async_trait]
pub trait ActorHooksAsync: Actor {
/// Awaited by the actor loop before any message is processed.
async fn started(&mut self, _ctx: &Context<Self>) {}
/// Awaited by the actor loop once message processing has ended.
async fn stopped(&mut self) {}
}
/// Asynchronous message handling for an [`Actor`].
///
/// This is the trait the actor loop actually drives; synchronous `ActorBehavior`
/// implementations are adapted to it by a blanket impl in this file.
#[async_trait]
pub trait ActorBehaviorAsync: Actor {
/// Handles a single message; `ctx` gives access to the actor's own address.
async fn handle(&mut self, msg: Self::Msg, ctx: &Context<Self>);
}
#[async_trait]
impl<T: ActorBehavior> ActorBehaviorAsync for T {
async fn handle(&mut self, msg: <Self as Actor>::Msg, ctx: &Context<Self>) {
<Self as ActorBehavior>::handle(self, msg, ctx);
}
}
#[async_trait]
impl<T: ActorHooks> ActorHooksAsync for T {
async fn started(&mut self, ctx: &Context<Self>) {
<Self as ActorHooks>::started(self, ctx);
}
async fn stopped(&mut self) {
<Self as ActorHooks>::stopped(self);
}
}
/// Extension trait providing [`start`](ActorExt::start) for actors that have both an
/// asynchronous behavior and asynchronous hooks.
pub trait ActorExt: Actor + ActorBehaviorAsync + ActorHooksAsync {
    /// Consumes the actor, hands its run loop to `ActorSystem::spawn_actor` and returns the
    /// address under which it can be reached.
    ///
    /// The mailbox is backed by an unbounded channel.
    fn start(self) -> Addr<Self> {
        let (tx, rx) = unbounded::<ActorMessage<Self::Msg>>();
        let addr = Addr(Chan(tx));
        // The context owns a clone of the address; cloning posts a `Ref` notification
        // into the freshly created mailbox.
        let context = Context {
            myself: addr.clone(),
        };
        let mailbox = Mailbox {
            receiver: rx,
            consumed: false,
        };
        ActorSystem::spawn_actor(actor_loop(self, context, mailbox));
        addr
    }
}
/// Blanket impl: every actor with asynchronous behavior and hooks gets [`ActorExt`].
impl<T: Actor + ActorBehaviorAsync + ActorHooksAsync> ActorExt for T {}
/// Implemented by types that can report a length.
///
/// For the channel and address types in this file the value is forwarded from the
/// underlying sender's `len`.
pub trait HasLength {
/// Returns the current length.
fn len(&self) -> usize;
}
/// A `Send`-able sink for messages of type `T` that can also report its length.
pub trait Channel<T: Sized>: HasLength + Send {
/// Sends `msg` into the channel. Nothing is returned, so failures cannot be reported
/// through the return value.
fn send(&self, msg: T);
}
/// Adapter that turns a `Channel<FROM>` into a `Channel<TO>`.
///
/// Each `TO` message is converted with `map` and the result is forwarded to the wrapped
/// channel.
pub struct MappedChannel<FROM, TO, CHAN, MAP>
where
FROM: Send,
TO: Send,
CHAN: Channel<FROM>,
MAP: Fn(TO) -> FROM + Send,
{
// The wrapped channel that receives the converted messages.
channel: CHAN,
// Conversion applied to every message before it is forwarded.
map: MAP,
// Zero-sized markers so the otherwise unused type parameters are accepted.
_from: std::marker::PhantomData<FROM>,
_to: std::marker::PhantomData<TO>,
}
impl<FROM, TO, CHAN, MAP> MappedChannel<FROM, TO, CHAN, MAP>
where
FROM: Send,
TO: Send,
CHAN: Channel<FROM>,
MAP: Fn(TO) -> FROM + Send,
{
pub fn from(channel: CHAN, map: MAP) -> Self {
Self {
channel,
map,
_from: std::marker::PhantomData,
_to: std::marker::PhantomData,
}
}
}
impl<FROM, TO, CHAN, MAP> HasLength for MappedChannel<FROM, TO, CHAN, MAP>
where
FROM: Send,
TO: Send,
CHAN: Channel<FROM>,
MAP: Fn(TO) -> FROM + Send,
{
fn len(&self) -> usize {
self.channel.len()
}
}
impl<FROM, TO, CHAN, MAP> Channel<TO> for MappedChannel<FROM, TO, CHAN, MAP>
where
    FROM: Send,
    TO: Send,
    CHAN: Channel<FROM>,
    MAP: Fn(TO) -> FROM + Send,
{
    /// Converts `msg` with the stored mapping function and forwards the result to the
    /// wrapped channel.
    fn send(&self, msg: TO) {
        let converted: FROM = (self.map)(msg);
        self.channel.send(converted);
    }
}
/// Something that can deliver messages of actor `T`'s message type and report a length.
pub trait Address<T: Actor>: HasLength {
/// Sends `msg` to the actor. Nothing is returned, so failures cannot be reported through
/// the return value.
fn send(&self, msg: T::Msg);
}
/// Envelope for everything that travels through an actor's mailbox.
enum ActorMessage<T: Sized + Send> {
/// Posted when a `Chan` is cloned; the message loop increments its reference tally.
Ref,
/// Posted when a `Chan` is dropped; the message loop decrements its reference tally.
UnRef,
/// A user message that is passed to the actor's `handle`.
Msg(T),
}
/// Sending half of an actor mailbox.
///
/// Cloning and dropping a `Chan` post `Ref` / `UnRef` notifications into the mailbox so the
/// receiving loop can keep a tally of handles.
pub struct Chan<T: Send>(Sender<ActorMessage<T>>);
impl<T: Send> Clone for Chan<T> {
fn clone(&self) -> Self {
let () = self.0.send(ActorMessage::Ref).unwrap();
Self(self.0.clone())
}
}
impl<T: Send> HasLength for Chan<T> {
fn len(&self) -> usize {
self.0.len()
}
}
impl<T: Send> Channel<T> for Chan<T> {
    /// Wraps `msg` in an `ActorMessage::Msg` envelope and puts it into the mailbox.
    ///
    /// # Panics
    /// Panics if the message cannot be sent.
    fn send(&self, msg: T) {
        let envelope = ActorMessage::Msg(msg);
        self.0.send(envelope).unwrap();
    }
}
impl<T: Send> Drop for Chan<T> {
    /// Notifies the mailbox that this handle is going away by posting `UnRef`.
    ///
    /// The notification is best-effort and never panics.
    fn drop(&mut self) {
        // A failed send means the receiving side no longer exists, so there is nobody left
        // to keep a reference tally and the notification is simply not needed. Unwrapping
        // here would turn an ordinary drop of a stale handle into a panic, and a panic
        // raised while the thread is already unwinding aborts the whole process.
        let _ = self.0.send(ActorMessage::UnRef);
    }
}
/// Typed handle to an actor of type `T`; a thin wrapper around a `Chan` carrying `T::Msg`.
pub struct Addr<T: Actor>(Chan<T::Msg>);
impl<T: Actor> Clone for Addr<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: Actor> HasLength for Addr<T> {
fn len(&self) -> usize {
self.0.len()
}
}
impl<T: Actor> Address<T> for Addr<T> {
fn send(&self, msg: T::Msg) {
self.0.send(msg);
}
}
impl<T: Actor> Addr<T> {
pub fn as_chan(&self) -> &Chan<T::Msg> {
&self.0
}
pub fn chan(&self) -> Chan<T::Msg> {
self.0.clone()
}
pub fn into_chan(self) -> Chan<T::Msg> {
self.0
}
}
/// Per-actor context passed to message handlers and to the `started` hook.
pub struct Context<T: Actor> {
// Address of the actor this context belongs to.
myself: Addr<T>,
}
impl<T: Actor> Context<T> {
pub fn myself(&self) -> &Addr<T> {
&self.myself
}
}
/// Receiving half of an actor's message channel plus a completion flag.
pub(crate) struct Mailbox<T: Send + Sized> {
/// Receiver from which the message loop takes `ActorMessage`s.
pub(crate) receiver: Receiver<ActorMessage<T>>,
/// Starts out `false`; the message loop sets it to `true` when it stops because no
/// message could be received while its reference tally was zero.
pub(crate) consumed: bool,
}
/// Takes items from `mailbox` and dispatches them to `actor` until the actor should stop.
///
/// Besides user messages the mailbox carries `Ref` / `UnRef` notifications, which are
/// tallied in a local counter. The loop ends when either
/// * no item can be received while the counter is zero (`mailbox.consumed` is then set to
///   `true`), or
/// * `ActorSystem::is_terminating()` returns `true` after an item has been processed.
///
/// `context` is taken by value and dropped before returning.
///
/// # Panics
/// Panics with "Mailbox was closed prematurely" if no item can be received while the
/// counter is still non-zero.
async fn process_messages<T>(actor: &mut T, mailbox: &mut Mailbox<T::Msg>, context: Context<T>)
where
T: Actor + ActorBehaviorAsync,
{
// Running tally: incremented for every `Ref`, decremented for every `UnRef`.
let mut ref_cnt: usize = 0;
// Wait for the first item; a receive error is turned into `None`.
let mut next_msg = mailbox.receiver.recv_async().await.ok();
loop {
match next_msg.take() {
// User message: let the actor handle it.
Some(ActorMessage::Msg(msg)) => actor.handle(msg, &context).await,
Some(ActorMessage::Ref) => {
ref_cnt += 1;
}
Some(ActorMessage::UnRef) => {
ref_cnt -= 1;
}
None => {
if ref_cnt == 0 {
// Nothing left to receive and no handle accounted for: finished.
mailbox.consumed = true;
break;
} else {
panic!("Mailbox was closed prematurely");
}
}
}
// Stop as soon as the actor system reports that it is terminating; `consumed` is
// left untouched on this path.
if ActorSystem::is_terminating() {
break;
}
// While the tally is positive, wait for the next item. Otherwise only take what is
// already queued, so an empty mailbox ends the loop through the `None` arm above.
next_msg = if ref_cnt > 0 {
mailbox.receiver.recv_async().await.ok()
} else {
mailbox.receiver.try_recv().ok()
};
}
drop(context);
}
/// Runs one actor through its whole lifecycle: the `started` hook, message processing, then
/// the `stopped` hook.
///
/// Returns the actor together with its mailbox once the lifecycle has completed. `context`
/// is handed over to message processing and is gone by the time `stopped` runs.
async fn actor_loop<T>(
    mut actor: T,
    context: Context<T>,
    mut mailbox: Mailbox<T::Msg>,
) -> (T, Mailbox<T::Msg>)
where
    T: Actor + ActorBehaviorAsync + ActorHooksAsync,
{
    actor.started(&context).await;
    process_messages(&mut actor, &mut mailbox, context).await;
    actor.stopped().await;
    (actor, mailbox)
}