use std::{future::Future, sync::Arc};
use tokio::sync::{mpsc, oneshot, watch, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use crate::{
actor::InternalHandler,
envelope::SendMessage,
executor::Executor,
message::{AnonymousTaskCancelled, AsyncHandle, DeadActor},
single::{AskRx, AsyncAskRx, Noop},
Actor, ActorRef, AnonymousRef, Ask, AsyncAsk, DeadActorResult, Handler, Message,
};
/// Lifecycle phase of an actor, as tracked by its context.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ActorState {
    /// Initial state of a freshly created context.
    Running,
    /// A stop was requested while the actor was running.
    Stopping,
    /// The actor was aborted or has finished shutting down.
    Stopped,
}

impl std::fmt::Display for ActorState {
    /// Writes the variant name (`Running`, `Stopping` or `Stopped`).
    ///
    /// The name is emitted through [`std::fmt::Formatter::pad`] so that width,
    /// fill and alignment flags such as `{:>10}` are honoured; writing it with
    /// `write!` would silently ignore them. Plain `{}` output is unaffected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ActorState::Running => "Running",
            ActorState::Stopping => "Stopping",
            ActorState::Stopped => "Stopped",
        };
        f.pad(label)
    }
}
/// Signal published by a supervising context on its `notifier` watch channel,
/// to which children and anonymous tasks subscribe.
#[derive(Debug, Clone)]
pub(crate) enum SupervisorMessage {
/// The only signal defined here. `AnonymousActor::handle` treats any published
/// value as a reason to stop waiting on its future.
Shutdown,
}
/// Wrapper that drives a single future while watching a supervisor's
/// notification channel, keeping the future's output if it completes.
pub struct AnonymousActor<T> {
    /// Output of the wrapped future; `None` until (and unless) it completes.
    pub(crate) result: Option<T>,
    /// Subscription to the supervisor's notification channel.
    receiver: watch::Receiver<Option<SupervisorMessage>>,
}

impl<T> AnonymousActor<T> {
    /// Creates a wrapper with no result yet, listening on `receiver`.
    pub(crate) fn new(receiver: watch::Receiver<Option<SupervisorMessage>>) -> Self {
        AnonymousActor {
            receiver,
            result: None,
        }
    }

    /// Races `f` against the supervisor's notification channel.
    ///
    /// If the channel already holds a message, `f` is dropped without ever
    /// being polled. Otherwise whichever finishes first wins: when `f`
    /// completes its output is stored in `result`; when the channel changes
    /// first, `result` is left as `None`. The wrapper itself is returned in
    /// every case.
    pub(crate) async fn handle<F>(mut self, f: F) -> Self
    where
        F: Future<Output = T> + Send + 'static,
    {
        // Marks the current value as seen so `changed()` only fires for later updates.
        let already_signalled = self.receiver.borrow_and_update().is_some();
        if !already_signalled {
            tokio::select! {
                _ = self.receiver.changed() => {}
                output = f => {
                    self.result = Some(output);
                }
            }
        }
        self
    }
}
/// Channel on which the actor value is handed back to whoever awaits it,
/// tagged with how the hand-off was requested.
pub enum IntoFutureSender<A: Actor> {
    /// Installed by `Ctx::subscribe_and_stop`, which also stops a running actor.
    Now(oneshot::Sender<A>),
    /// Installed by `Ctx::subscribe_and_wait`, which leaves the actor's state untouched.
    UponCompletion(oneshot::Sender<A>),
}

impl<A: Actor> IntoFutureSender<A> {
    /// Unwraps the underlying sender, discarding the variant tag.
    pub fn into_inner(self) -> oneshot::Sender<A> {
        match self {
            Self::Now(tx) | Self::UponCompletion(tx) => tx,
        }
    }
}
/// Per-actor runtime context: owns the mailbox, tracks the lifecycle state and
/// is the entry point for spawning children and anonymous tasks.
pub struct Ctx<A: Actor> {
/// Handle wrapping the sending half of this actor's own mailbox.
address: ActorRef<A>,
/// Receiving half of the mailbox; closed by `abort`.
pub(crate) mailbox: mpsc::Receiver<Box<dyn SendMessage<A>>>,
/// Watch channel that children and anonymous tasks subscribe to.
pub(crate) notifier: watch::Sender<Option<SupervisorMessage>>,
/// Current lifecycle state; starts as `Running`.
pub(crate) state: ActorState,
/// Semaphore created with `A::max_anonymous_actors()` permits; each anonymous
/// task tries to take one permit for its lifetime.
pub(crate) max_anonymous_actors: Arc<Semaphore>,
/// Number of anonymous tasks that were started while no permit was available.
pub(crate) overflow_anonymous_actors: usize,
/// Set once somebody awaits the actor; used to hand the actor value back.
pub(crate) into_future_sender: Option<IntoFutureSender<A>>,
}
impl<A: Actor> Ctx<A> {
pub(crate) fn new() -> Ctx<A> {
let (tx, rx) = mpsc::channel(A::mailbox_size());
let (notifier, _) = watch::channel(None);
let max_anonymous_actors = Semaphore::new(A::max_anonymous_actors());
Self {
address: ActorRef::new(tx),
mailbox: rx,
state: ActorState::Running,
into_future_sender: None,
max_anonymous_actors: Arc::new(max_anonymous_actors),
overflow_anonymous_actors: 0,
notifier,
}
}
/// Consumes the context, starts `actor` on a new tokio task and returns the
/// address through which it can be reached.
pub fn run(self, actor: A) -> ActorRef<A> {
    // Clone the address first: the context itself moves into the executor.
    let address = self.address();
    tokio::spawn(Executor::new(actor, self).run_actor());
    address
}
/// Starts `actor` as a supervised child that uses the caller-provided `ctx`,
/// and returns the child's address.
///
/// The child subscribes to this context's notifier. When the child's task
/// ends, the outcome is reported to this actor as a `DeadActorResult<C>`
/// message, unless the child was awaited and its value could be handed to the
/// awaiter.
///
/// # Panics
///
/// Panics if this actor is not in the `Running` state. The "spawning" log line
/// is emitted before that check.
pub fn spawn_with<C>(&self, ctx: Ctx<C>, actor: C) -> ActorRef<C>
where
A: Handler<DeadActorResult<C>>,
C: Actor,
{
tracing::info!(parent = A::name(), actor = C::name(), "spawning");
if self.state != ActorState::Running {
panic!("Can't start an actor when stopped or stopping");
}
let address = ctx.address.clone();
let supervisor = self.address.clone();
let executor = Executor::child(actor, ctx, self.notifier.subscribe());
// The outer task awaits the inner one so that a panic or cancellation of the
// child surfaces as a `JoinError` instead of being lost.
tokio::spawn(async move {
match tokio::spawn(executor.run_supervised_actor()).await {
Ok(mut executor) => {
// Somebody awaited the child: try to hand the actor value to them.
if let Some(sender) = executor.context.into_future_sender.take() {
if let Err(actor) = sender.into_inner().send(executor.actor) {
tracing::error!(
parent = A::name(),
actor = C::name(),
"reciever dropped"
);
// The awaiter is gone; put the actor back and report to the supervisor instead.
executor.actor = actor;
} else {
// Hand-off succeeded, so the supervisor is not notified.
return;
}
}
let (_rx, dead_actor) = executor.into_dead_actor();
if (supervisor.send_async(Ok(dead_actor)).await).is_err() {
unreachable!("Tried to send dead actor {}, but supervisor {} failed to accept message", C::name(), A::name())
}
}
Err(err) => {
// Delivery failures are ignored on this path.
if err.is_cancelled() {
let _ = supervisor.send_async(DeadActor::cancelled(err)).await;
} else if err.is_panic() {
let _ = supervisor.send_async(DeadActor::panic(err)).await;
} else {
unreachable!("Tokio tasked failed in unknown way. This shouldn't happen");
}
}
};
});
address
}
pub fn spawn<C>(&self, actor: C) -> ActorRef<C>
where
A: Handler<DeadActorResult<C>>,
C: Actor,
{
self.spawn_with(Ctx::new(), actor)
}
/// Spawns a `Noop` child executor, subscribed to this context's notifier, that
/// runs `child_with_custom_handle_rx` with this actor's address and `rx`.
///
/// Presumably this forwards items from `rx` to this actor's `Handler<In>`;
/// confirm against `Executor`.
///
/// # Panics
///
/// Panics if this actor is not in the `Running` state.
pub(crate) fn spawn_anonymous_rx_handle<In>(&self, rx: mpsc::Receiver<In>)
where
    A: Handler<In>,
    In: Message,
{
    if !matches!(self.state, ActorState::Running) {
        panic!("Can't start an actor when stopped or stopping");
    }
    let child_ctx = Ctx::new();
    let supervisor = self.address();
    let child = Executor::child(Noop(), child_ctx, self.notifier.subscribe());
    tokio::spawn(child.child_with_custom_handle_rx(supervisor, rx));
}
/// Spawns a `Noop` child executor, subscribed to this context's notifier, that
/// runs `child_with_custom_ask_rx` with this actor's address and `rx`.
///
/// Presumably this serves requests from `rx` through this actor's `Ask<In>`;
/// confirm against `Executor`.
///
/// # Panics
///
/// Panics if this actor is not in the `Running` state.
pub(crate) fn spawn_anonymous_rx_ask<In>(&self, rx: AskRx<In, A>)
where
    A: Actor + Ask<In>,
    In: Message,
{
    if !matches!(self.state, ActorState::Running) {
        panic!("Can't start an actor when stopped or stopping");
    }
    let child_ctx = Ctx::new();
    let supervisor = self.address();
    let child = Executor::child(Noop(), child_ctx, self.notifier.subscribe());
    tokio::spawn(child.child_with_custom_ask_rx(supervisor, rx));
}
/// Spawns a `Noop` child executor, subscribed to this context's notifier, that
/// runs `child_with_custom_async_ask_rx` with this actor's address and `rx`.
///
/// Presumably this serves requests from `rx` through this actor's
/// `AsyncAsk<In>`; confirm against `Executor`.
///
/// # Panics
///
/// Panics if this actor is not in the `Running` state.
pub(crate) fn spawn_anonymous_rx_async<In>(&self, rx: AsyncAskRx<In, A>)
where
    A: AsyncAsk<In>,
    In: Message,
{
    if !matches!(self.state, ActorState::Running) {
        panic!("Can't start an actor when stopped or stopping");
    }
    let child_ctx = Ctx::new();
    let supervisor = self.address();
    let child = Executor::child(Noop(), child_ctx, self.notifier.subscribe());
    tokio::spawn(child.child_with_custom_async_ask_rx(supervisor, rx));
}
fn acquire_anonymous_permit(&mut self) -> Option<OwnedSemaphorePermit> {
match self.max_anonymous_actors.clone().try_acquire_owned() {
Ok(permit) => Some(permit),
Err(TryAcquireError::NoPermits) => {
self.overflow_anonymous_actors += 1;
None
}
Err(TryAcquireError::Closed) => unreachable!(),
}
}
/// Runs `future` on a background task and delivers its output to this actor
/// as a regular message.
///
/// The task is subscribed to this context's notifier and holds an
/// anonymous-task permit (if one was available) until it finishes. Outcomes:
///
/// * the future completed: its output is sent to this actor;
/// * the future did not produce a result: `AnonymousTaskCancelled::Cancel` is
///   sent internally;
/// * the inner task failed to join: an error is logged and
///   `AnonymousTaskCancelled::Panic` is sent internally.
///
/// Delivery failures are ignored. The returned [`AnonymousRef`] wraps the
/// handle of the outer task.
pub fn anonymous<F>(&mut self, future: F) -> AnonymousRef
where
    F: Future + Sync + Send + 'static,
    F::Output: Message + Send + 'static,
    A: Handler<F::Output> + InternalHandler<AnonymousTaskCancelled>,
{
    let supervisor = self.address.clone();
    let watcher = AnonymousActor::new(self.notifier.subscribe());
    let permit = self.acquire_anonymous_permit();
    let task = tokio::spawn(async move {
        // Held for the whole task so the permit is released only when it ends.
        let _permit = permit;
        let joined = tokio::spawn(watcher.handle(future)).await;
        match joined {
            Ok(finished) => {
                // `finished` (and its notifier subscription) stays alive until
                // the report below has been sent.
                if let Some(message) = finished.result {
                    let _ = supervisor.send_async(message).await;
                } else {
                    let _ = supervisor
                        .internal_send_async(AnonymousTaskCancelled::Cancel)
                        .await;
                }
            }
            Err(_) => {
                tracing::error!(parent = A::name(), actor = "anonymous", "actor paniced");
                let _ = supervisor
                    .internal_send_async(AnonymousTaskCancelled::Panic)
                    .await;
            }
        }
    });
    AnonymousRef::new(task)
}
/// Runs a unit-returning `future` on a background task and notifies this
/// actor internally once that task is over.
///
/// The task is subscribed to this context's notifier and holds an
/// anonymous-task permit (if one was available) until it finishes. The
/// returned [`AnonymousRef`] wraps the handle of the outer task.
///
/// NOTE(review): `AnonymousTaskCancelled::Success` is sent whether the inner
/// task completed, was cut short by the notifier, or failed to join, whereas
/// `anonymous` distinguishes those cases. Confirm this is intended.
pub fn anonymous_task<F>(&mut self, future: F) -> AnonymousRef
where
    F: Future<Output = ()> + Sync + Send + 'static,
{
    let supervisor = self.address.clone();
    let watcher = AnonymousActor::new(self.notifier.subscribe());
    let permit = self.acquire_anonymous_permit();
    let task = tokio::spawn(async move {
        // Held for the whole task so the permit is released only when it ends.
        let _permit = permit;
        // The join result is discarded immediately, before the report is sent.
        drop(tokio::spawn(watcher.handle(future)).await);
        let _ = supervisor
            .internal_send_async(AnonymousTaskCancelled::Success)
            .await;
    });
    AnonymousRef::new(task)
}
pub fn anonymous_handle<F>(&mut self, future: F) -> AsyncHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Message + Send + 'static,
{
let actor = AnonymousActor::<F::Output>::new(self.notifier.subscribe());
let permit = self.acquire_anonymous_permit();
let inner = tokio::spawn(async move {
let _permit = permit;
actor.handle(future).await
});
AsyncHandle(inner)
}
pub fn address(&self) -> ActorRef<A> {
self.address.clone()
}
pub(crate) fn subscribe_and_stop(&mut self, tx: oneshot::Sender<A>) {
self.into_future_sender = Some(IntoFutureSender::Now(tx));
if matches!(self.state, ActorState::Running) {
self.stop();
}
}
/// Registers `tx` as the channel that receives the actor value, without
/// changing the actor's state.
///
/// Any previously registered sender is replaced.
pub(crate) fn subscribe_and_wait(&mut self, tx: oneshot::Sender<A>) {
    let waiting = IntoFutureSender::UponCompletion(tx);
    self.into_future_sender = Some(waiting);
}
}
/// Lifecycle controls exposed by an actor context.
pub trait ActorContext {
/// Requests a shutdown. For `Ctx` this moves a `Running` actor to `Stopping`.
fn stop(&mut self);
/// Stops right away. For `Ctx` this closes the mailbox and marks the actor `Stopped`.
fn abort(&mut self);
}
impl<A: Actor> ActorContext for Ctx<A> {
    /// Moves a `Running` actor to `Stopping`; does nothing in any other state.
    fn stop(&mut self) {
        if let ActorState::Running = self.state {
            self.state = ActorState::Stopping;
        }
    }

    /// Closes the mailbox and marks the actor `Stopped`, unless it already is.
    fn abort(&mut self) {
        match self.state {
            ActorState::Running | ActorState::Stopping => {
                self.mailbox.close();
                self.state = ActorState::Stopped;
            }
            ActorState::Stopped => {}
        }
    }
}
/// Test-only sanity checks that run whenever a context is dropped.
#[cfg(test)]
impl<A: Actor> Drop for Ctx<A> {
    /// Verifies that the context is dropped in a clean state: nothing left in
    /// the mailbox, a terminal lifecycle state, no live notifier subscribers
    /// and no pending awaiter. A leftover message or a `Running` state is only
    /// reported on stderr; the other violations fail an assertion.
    fn drop(&mut self) {
        match self.mailbox.try_recv() {
            Err(err) => assert_eq!(
                err,
                mpsc::error::TryRecvError::Empty,
                "Actors mailbox should be empty"
            ),
            Ok(_) => eprintln!(
                "MAILBOX FOR ACTOR SHOULD BE CLOSED. ACTOR {} PROBABLY PANICED",
                A::name()
            ),
        }
        match self.state {
            ActorState::Running => eprintln!(
                "ACTOR {} IS BEING DESTORIED IN A RUNNING STATE. PROBABLY PANICED",
                A::name()
            ),
            other => assert_eq!(
                other,
                ActorState::Stopped,
                "Actor {} is not in correct state",
                A::name()
            ),
        }
        let live_subscribers = self.notifier.receiver_count();
        assert_eq!(live_subscribers, 0, "All Actor children should be dead");
        assert!(
            self.into_future_sender.is_none(),
            "Actor should not be waiting for the future"
        );
    }
}
#[cfg(test)]
mod tests {
// Lifecycle tests. Each test runs a `DebuggableActor`, awaits its address to
// get the actor value back, and then replays the recorded lifecycle callbacks
// and messages in order.
use std::{collections::VecDeque, time::Duration};
use crate::{
message::ChildError, Actor, ActorContext, ActorRef, Ctx, DeadActorResult, Handler,
IntoFutureError,
};
/// Lifecycle callbacks recorded by `DebuggableActor`.
#[derive(Debug, PartialEq, Eq)]
enum ActorLifecycle {
Start,
PreRun,
PostRun,
Stopping,
Stopped,
End,
}
/// Marker for the payload types that can be wrapped in a `DebuggableActor`.
trait DebugActor: Send + Sync + 'static {
const DEBUG_KIND: &'static str;
/// Starts a default `DebuggableActor<Self>` on a new context.
/// Note that `self` is not used: the wrapper is built with `Default`.
fn start(self) -> ActorRef<DebuggableActor<Self>>
where
Self: Default + Send + Sync + 'static,
{
Ctx::new().run(DebuggableActor::default())
}
}
/// Actor that records every lifecycle callback and every handled message so
/// tests can assert on their exact order afterwards.
#[derive(Default)]
struct DebuggableActor<A: Send + Sync + 'static> {
// Lifecycle callbacks, oldest first.
state: VecDeque<ActorLifecycle>,
// Handled messages, oldest first.
messages: VecDeque<TestMessage>,
// Payload that distinguishes parent and child actors.
inner: A,
}
impl<A: Send + Sync + 'static> DebuggableActor<A> {
/// Appends a lifecycle event to the log.
fn push_state(&mut self, state: ActorLifecycle) {
self.state.push_back(state);
}
/// Appends a handled message to the log.
fn push_message(&mut self, message: TestMessage) {
self.messages.push_back(message)
}
/// Pops the oldest recorded lifecycle event.
fn shift_state(&mut self) -> Option<ActorLifecycle> {
self.state.pop_front()
}
/// Pops the oldest recorded message.
fn shift_message(&mut self) -> Option<TestMessage> {
self.messages.pop_front()
}
/// Asserts that the next events are `PreRun`, the message `msg`, `PostRun`.
fn expect_message(&mut self, msg: TestMessage) {
assert_eq!(self.shift_state(), Some(ActorLifecycle::PreRun));
assert_eq!(self.shift_message(), Some(msg));
assert_eq!(self.shift_state(), Some(ActorLifecycle::PostRun));
}
/// Asserts a `PreRun`/`PostRun` pair without consuming a recorded message,
/// i.e. something was handled that left no entry in `messages`
/// (presumably an internal message of the runtime).
fn expect_system_message(&mut self) {
assert_eq!(self.shift_state(), Some(ActorLifecycle::PreRun));
assert_eq!(self.shift_state(), Some(ActorLifecycle::PostRun));
}
/// Asserts that the next lifecycle event is `Start`.
fn expect_start(&mut self) {
assert_eq!(self.shift_state(), Some(ActorLifecycle::Start));
}
/// Asserts that the next lifecycle event is `Stopping`.
fn expect_stopping(&mut self) {
assert_eq!(self.shift_state(), Some(ActorLifecycle::Stopping));
}
/// Asserts that the next lifecycle event is `Stopped`.
fn expect_stopped(&mut self) {
assert_eq!(self.shift_state(), Some(ActorLifecycle::Stopped));
}
/// Asserts that the next lifecycle event is `End`.
fn expect_end(&mut self) {
assert_eq!(self.shift_state(), Some(ActorLifecycle::End));
}
/// Removes the first `Stopped` event wherever it sits in the log.
/// Panics if there is none.
fn expect_stopped_event_in_state(&mut self) {
let index = self
.state
.iter()
.position(|p| matches!(p, ActorLifecycle::Stopped));
self.state.remove(index.unwrap());
}
/// Asserts that both logs have been fully consumed.
fn is_empty(&mut self) {
assert_eq!(self.shift_message(), None);
assert_eq!(self.shift_state(), None);
}
/// Returns the wrapped payload.
fn inner(self) -> A {
self.inner
}
}
// Every lifecycle hook just records that it was called.
impl<A: Send + Sync + 'static> Actor for DebuggableActor<A> {
fn pre_run(&mut self, _: &mut Ctx<Self>) {
self.push_state(ActorLifecycle::PreRun)
}
fn post_run(&mut self, _: &mut Ctx<Self>) {
self.push_state(ActorLifecycle::PostRun)
}
fn on_stopping(&mut self, _: &mut Ctx<Self>) {
self.push_state(ActorLifecycle::Stopping)
}
fn on_stopped(&mut self, _: &mut Ctx<Self>) {
self.push_state(ActorLifecycle::Stopped)
}
fn on_end(&mut self, _: &mut Ctx<Self>) {
self.push_state(ActorLifecycle::End)
}
fn start(self) -> crate::ActorRef<Self>
where
Self: Actor,
{
Ctx::new().run(self)
}
fn on_start(&mut self, _: &mut Ctx<Self>)
where
Self: Actor,
{
self.push_state(ActorLifecycle::Start)
}
}
/// How a child actor ended, as seen by its parent.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum TestResult {
Ok(usize),
Panic,
Cancel,
}
/// Commands understood by `DebuggableActor`, plus the notifications it records.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum TestMessage {
/// Recorded only.
Normal,
/// Spawn a child carrying the given number and optionally send it one message.
Spawn(usize, Option<Box<TestMessage>>),
/// Start an anonymous future that sleeps for the given milliseconds and then
/// yields `DespawnAnonymous` with the given number.
SpawnAnonymous(u64, usize),
/// Start an anonymous task that sleeps for the given milliseconds.
SpawnAnonymousTask(u64),
/// Output of a completed anonymous future.
DespawnAnonymous(usize),
/// Recorded when a child reports its end.
Despawn(TestResult),
/// Call `stop` on the context.
Stop,
/// Call `abort` on the context.
Abort,
/// Panic inside the handler.
Panic,
}
impl<A: Send + Sync + 'static> Handler<TestMessage> for DebuggableActor<A> {
/// Records the message, then performs the action it describes.
fn handle(&mut self, message: TestMessage, context: &mut Ctx<Self>) {
self.push_message(message.clone());
match message {
TestMessage::Normal => {}
TestMessage::Spawn(num, mut opt) => {
let inner = ChildActor { inner: num };
let addr = context.spawn(DebuggableActor {
state: Default::default(),
messages: Default::default(),
inner,
});
if let Some(msg) = opt.take() {
addr.try_send(*msg);
}
}
TestMessage::SpawnAnonymous(time, num) => {
context.anonymous(async move {
tokio::time::sleep(Duration::from_millis(time)).await;
TestMessage::DespawnAnonymous(num)
});
}
TestMessage::SpawnAnonymousTask(time) => {
context.anonymous_task(tokio::time::sleep(Duration::from_millis(time)));
}
TestMessage::Stop => context.stop(),
TestMessage::Abort => context.abort(),
TestMessage::Panic => panic!("AAAHHH"),
// `DespawnAnonymous` and `Despawn` are only recorded.
_ => {
}
}
}
}
type DeadChildActor = DeadActorResult<DebuggableActor<ChildActor>>;
impl<A: Send + Sync + 'static> Handler<DeadChildActor> for DebuggableActor<A> {
/// Records a child's end as a `TestMessage::Despawn`.
fn handle(&mut self, message: DeadChildActor, _context: &mut Ctx<Self>) {
let msg = match message {
Ok(actor) => TestMessage::Despawn(TestResult::Ok(actor.actor.inner().inner)),
Err(ChildError::Panic(_)) => TestMessage::Despawn(TestResult::Panic),
Err(ChildError::Cancelled(_)) => TestMessage::Despawn(TestResult::Cancel),
};
self.push_message(msg);
}
}
/// Payload of the top-level test actor.
#[derive(Default)]
struct ParentActor {}
impl DebugActor for ParentActor {
const DEBUG_KIND: &'static str = "ParentActor";
}
/// Payload of spawned children; `inner` identifies the child.
struct ChildActor {
inner: usize,
}
impl DebugActor for ChildActor {
const DEBUG_KIND: &'static str = "ChildActor";
}
// NOTE(review): the expected size presumably assumes a 64-bit target and the
// current field layout. Confirm before running on other targets.
#[test]
fn size_of_context() {
assert_eq!(64, std::mem::size_of::<Ctx<DebuggableActor<ParentActor>>>())
}
/// Starts a default `DebuggableActor<Inner>`, sends it `messages` in order and
/// awaits the address to get the finished actor back.
async fn start_message_and_stop_test_actor<Inner: DebugActor + Default>(
messages: &[TestMessage],
) -> DebuggableActor<Inner> {
let addr = Inner::default().start();
for msg in messages {
addr.send(msg.clone()).unwrap();
}
addr.await.unwrap()
}
// Awaiting the address stops the actor; awaiting a clone afterwards reports a
// closed mailbox.
#[tokio::test]
async fn run_actor_to_complition() {
let addr = Ctx::new().run(DebuggableActor::<ParentActor>::default());
let addr2 = addr.clone();
let mut actor = addr.await.unwrap();
actor.expect_start();
actor.expect_system_message(); actor.expect_stopping();
actor.expect_stopped();
actor.expect_end();
actor.is_empty();
assert_eq!(addr2.await.err(), Some(IntoFutureError::MailboxClosed));
}
// Messages queued behind `Stop` are still handled before the actor ends.
#[tokio::test]
async fn stop_running_actor() {
use TestMessage::*;
let messages = vec![Normal, Stop, Normal, Normal];
let mut actor: DebuggableActor<ParentActor> =
start_message_and_stop_test_actor::<ParentActor>(&messages).await;
actor.expect_start();
actor.expect_message(Normal);
actor.expect_message(Stop);
actor.expect_stopping();
actor.expect_stopped();
actor.expect_message(Normal);
actor.expect_message(Normal);
actor.expect_system_message(); actor.expect_end();
actor.is_empty();
}
// Messages queued behind `Abort` are still handled before the actor ends.
#[tokio::test]
async fn abort_running_actor() {
use TestMessage::*;
let messages = vec![Normal, Abort, Normal, Normal];
let mut actor = start_message_and_stop_test_actor::<ParentActor>(&messages).await;
actor.expect_start();
actor.expect_message(Normal);
actor.expect_message(Abort);
actor.expect_stopping();
actor.expect_stopped();
actor.expect_message(Normal);
actor.expect_message(Normal);
actor.expect_system_message();
actor.expect_end();
actor.is_empty();
}
// Expected to panic: the helper unwraps the result of sending to and awaiting
// an actor whose handler panics.
#[tokio::test]
#[should_panic]
async fn panic_during_actor_running() {
use TestMessage::*;
let messages = vec![Normal, Panic, Normal, Normal];
let _ = start_message_and_stop_test_actor::<ParentActor>(&messages).await;
}
// The parent records the child's end between its own `Stopping` and `Stopped`.
#[tokio::test]
async fn run_parent_and_child_to_complition() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::Spawn(1, None));
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::Spawn(1, None));
debuggable.expect_system_message(); debuggable.expect_stopping();
debuggable.expect_message(TestMessage::Despawn(TestResult::Ok(1)));
debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
// Ten children: their end notifications may arrive in any order, so the
// recorded messages are sorted before being compared.
#[tokio::test]
async fn run_parent_and_many_child_to_complition() {
let range = 1..=10;
let addr = DebuggableActor::<ParentActor>::default().start();
for i in range.clone() {
addr.send_async(TestMessage::Spawn(i, None)).await.unwrap();
}
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
for i in range.clone() {
debuggable.expect_message(TestMessage::Spawn(i, None));
}
debuggable.expect_system_message(); debuggable.expect_stopping();
let mut list = vec![];
while let Some(msg) = debuggable.shift_message() {
list.push(msg);
}
list.sort();
// `Stopped` is removed from wherever it was recorded, leaving only the
// `PreRun`/`PostRun` pairs of the ten notifications.
debuggable.expect_stopped_event_in_state();
for i in range.rev() {
assert_eq!(list.pop(), Some(TestMessage::Despawn(TestResult::Ok(i))));
debuggable.expect_system_message(); }
debuggable.expect_end();
debuggable.is_empty();
}
// The child is told to stop right after being spawned.
#[tokio::test]
async fn child_actor_stops_by_itself() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::Spawn(1, Some(Box::new(TestMessage::Stop))));
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::Spawn(1, Some(Box::new(TestMessage::Stop))));
debuggable.expect_system_message(); debuggable.expect_stopping();
debuggable.expect_message(TestMessage::Despawn(TestResult::Ok(1)));
debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
// A panicking child is reported to the parent as `TestResult::Panic`.
#[tokio::test]
async fn child_actor_panics() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::Spawn(1, Some(Box::new(TestMessage::Panic))));
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::Spawn(1, Some(Box::new(TestMessage::Panic))));
debuggable.expect_system_message(); debuggable.expect_stopping();
debuggable.expect_message(TestMessage::Despawn(TestResult::Panic));
debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
// The test waits 600 ms for a 500 ms anonymous future before awaiting the actor.
#[tokio::test]
async fn run_parent_and_anonymous_actor_to_complition() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::SpawnAnonymous(500, 1));
tokio::time::sleep(Duration::from_millis(600)).await;
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::SpawnAnonymous(500, 1));
debuggable.expect_system_message(); debuggable.expect_message(TestMessage::DespawnAnonymous(1));
debuggable.expect_stopping();
debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
// The actor is awaited immediately, long before the 1000 ms future can finish.
#[tokio::test]
async fn run_parent_and_cancel_anonymous_actor() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::SpawnAnonymous(1000, 1));
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::SpawnAnonymous(1000, 1));
debuggable.expect_system_message(); debuggable.expect_stopping();
debuggable.expect_system_message(); debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
// NOTE(review): the test sleeps 50 ms while the task sleeps 100 ms, so the task
// has probably not finished when the actor is awaited. Confirm the intended
// timings for a "to completion" scenario.
#[tokio::test]
async fn run_parent_and_anonymous_task_to_complition() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::SpawnAnonymousTask(100));
tokio::time::sleep(Duration::from_millis(50)).await;
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::SpawnAnonymousTask(100));
debuggable.expect_system_message(); debuggable.expect_stopping();
debuggable.expect_system_message(); debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
// The actor is awaited immediately, long before the 1000 ms task can finish.
#[tokio::test]
async fn run_parent_and_cancel_anonymous_task() {
let addr = DebuggableActor::<ParentActor>::default().start();
addr.try_send(TestMessage::SpawnAnonymousTask(1000));
let mut debuggable = addr.await.unwrap();
debuggable.expect_start();
debuggable.expect_message(TestMessage::SpawnAnonymousTask(1000));
debuggable.expect_system_message(); debuggable.expect_stopping();
debuggable.expect_system_message(); debuggable.expect_stopped();
debuggable.expect_end();
debuggable.is_empty();
}
}