use crate::forwarders::StreamForwarder;
use crate::handlers::{
Consumer, Eliminated, Envelope, Interaction, InteractionDone, InteractionTask, InterruptedBy,
Operation, StartedBy, TaskEliminated,
};
use crate::ids::{Id, IdOf};
use crate::lifecycle::{Awake, Done, LifecycleNotifier, LifetimeTracker};
use crate::linkage::{Address, AddressJoint, AddressPair};
use crate::lite_runtime::{self, LiteTask, Tag, TaskAddress};
use anyhow::Error;
use async_trait::async_trait;
use futures::stream::{pending, FusedStream};
use futures::{select_biased, FutureExt, Stream, StreamExt};
use std::hash::Hash;
use thiserror::Error;
#[derive(Debug, Error)]
enum Reason {
#[error("Actor is terminating...")]
Terminating,
}
pub trait TerminationSequence: Sized {
fn termination_sequence() -> Vec<Self>;
}
impl TerminationSequence for () {
fn termination_sequence() -> Vec<Self> {
vec![()]
}
}
#[async_trait]
pub trait Actor: Sized + Send + 'static {
type GroupBy: TerminationSequence + Clone + Send + Eq + Hash;
fn log_target(&self) -> &str;
#[doc(hidden)] async fn queue_drained(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
Ok(())
}
#[doc(hidden)] async fn instant_queue_drained(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Status {
Alive,
Stop,
}
impl Status {
pub fn is_done(&self) -> bool {
*self == Status::Stop
}
}
pub(crate) fn spawn<A, S>(actor: A, supervisor: Option<Address<S>>, address_pair: AddressPair<A>)
where
A: Actor + StartedBy<S>,
S: Actor + Eliminated<A>,
{
let AddressPair { joint, address } = address_pair;
let id: Id = address.id().into();
let awake_envelope = Envelope::instant(Awake::new());
let done_notifier = {
match supervisor {
None => <dyn LifecycleNotifier<_>>::ignore(),
Some(super_addr) => {
let op = Operation::Done { id };
<dyn LifecycleNotifier<_>>::once(super_addr, op)
}
}
};
let context = Context {
alive: true,
address: address.clone(),
lifetime_tracker: LifetimeTracker::new(),
};
let runtime = ActorRuntime {
id: address.id(),
actor,
context,
awake_envelope: Some(awake_envelope),
done_notifier,
joint,
};
crate::compat::spawn_async(runtime.entrypoint());
}
pub struct Context<A: Actor> {
alive: bool,
address: Address<A>,
lifetime_tracker: LifetimeTracker<A>,
}
impl<A: Actor> Context<A> {
pub fn address(&mut self) -> &mut Address<A> {
&mut self.address
}
pub fn spawn_actor_with_addr<T>(&mut self, actor: T, pair: AddressPair<T>, group: A::GroupBy)
where
T: Actor + StartedBy<A> + InterruptedBy<A>,
A: Eliminated<T>,
{
let address = pair.address().clone();
spawn(actor, Some(self.address.clone()), pair);
self.lifetime_tracker.insert(address, group);
}
pub fn spawn_actor<T>(&mut self, actor: T, group: A::GroupBy) -> Address<T>
where
T: Actor + StartedBy<A> + InterruptedBy<A>,
A: Eliminated<T>,
{
let pair = AddressPair::new();
let address = pair.address().clone();
self.spawn_actor_with_addr(actor, pair, group);
address
}
pub fn spawn_task<T, M>(&mut self, task: T, tag: M, group: A::GroupBy) -> TaskAddress<T>
where
T: LiteTask,
A: TaskEliminated<T, M>,
M: Tag,
{
let stopper = lite_runtime::spawn(task, tag, Some(self.address.clone()));
self.lifetime_tracker.insert_task(stopper.clone(), group);
stopper
}
pub fn attach<S, M>(&mut self, stream: S, tag: M, group: A::GroupBy)
where
S: Stream + Unpin + Send + 'static,
S::Item: Send,
A: Consumer<S::Item>,
M: Tag,
{
let forwarder = StreamForwarder::new(stream, self.address.clone());
self.spawn_task(forwarder, tag, group);
}
pub fn track_interaction<I, M>(&mut self, task: InteractionTask<I>, tag: M, group: A::GroupBy)
where
I: Interaction,
A: InteractionDone<I, M>,
M: Tag,
{
self.spawn_task(task, tag, group);
}
pub fn interrupt<T>(&mut self, address: &mut Address<T>) -> Result<(), Error>
where
T: Actor + InterruptedBy<A>,
{
address.interrupt_by()
}
pub fn not_terminating(&self) -> Result<(), Error> {
if self.is_terminating() {
Err(Reason::Terminating.into())
} else {
Ok(())
}
}
pub fn stop(&mut self) {
self.alive = false;
}
pub fn shutdown(&mut self) {
self.lifetime_tracker.start_termination();
if self.lifetime_tracker.is_finished() {
self.stop();
}
}
pub fn terminate_group(&mut self, group: A::GroupBy) {
self.lifetime_tracker.terminate_group(group)
}
pub fn is_terminating(&self) -> bool {
self.lifetime_tracker.is_terminating()
}
fn termination_sequence(&mut self, sequence: Vec<A::GroupBy>) {
self.lifetime_tracker.termination_sequence(sequence);
}
}
pub struct ActorRuntime<A: Actor> {
id: IdOf<A>,
actor: A,
context: Context<A>,
awake_envelope: Option<Envelope<A>>,
done_notifier: Box<dyn LifecycleNotifier<Done<A>>>,
joint: AddressJoint<A>,
}
impl<A: Actor> ActorRuntime<A> {
async fn entrypoint(mut self) {
log::info!(target: self.actor.log_target(), "Actor started: {}", self.id);
let mut awake_envelope = self
.awake_envelope
.take()
.expect("awake envelope has to be set in spawn method!");
let term_seq = A::GroupBy::termination_sequence();
self.context.termination_sequence(term_seq);
let awake_res = awake_envelope
.handle(&mut self.actor, &mut self.context)
.await;
match awake_res {
Ok(_) => {
self.routine().await;
}
Err(err) => {
log::error!(
target: self.actor.log_target(),
"Can't call awake notification handler of the actor {:?}: {}",
self.id,
err
);
}
}
log::info!(target: self.actor.log_target(), "Actor finished: {}", self.id);
let done_event = Done::new(self.id.clone());
if let Err(err) = self.done_notifier.notify(done_event) {
log::error!(
target: self.actor.log_target(),
"Can't send done notification from the actor {:?}: {}",
self.id,
err
);
}
if !self.joint.join_tx.is_closed() {
if let Err(_err) = self.joint.join_tx.send(Status::Stop) {
log::error!(target: self.actor.log_target(), "Can't release joiners of {}", self.id);
}
}
}
async fn routine(&mut self) {
let mut scheduled_queue = crate::compat::DelayQueue::<Envelope<A>>::new().fuse();
let mut pendel = pending();
while self.context.alive {
let maybe_queue: &mut (dyn FusedStream<Item = Result<_, _>> + Unpin + Send) =
if scheduled_queue.get_ref().len() > 0 {
&mut scheduled_queue
} else {
&mut pendel
};
select_biased! {
hp_envelope = self.joint.hp_msg_rx.recv().fuse() => {
if let Some(hp_env) = hp_envelope {
let envelope = hp_env.envelope;
let process_envelope;
match hp_env.operation {
Operation::Forward => {
process_envelope = Some(envelope);
}
Operation::Done { id } => {
self.context.lifetime_tracker.remove(&id);
if self.context.lifetime_tracker.is_finished() {
self.context.stop();
}
process_envelope = Some(envelope);
}
Operation::Schedule { deadline } => {
scheduled_queue.get_mut().insert_at(envelope, deadline);
log::trace!(target: self.actor.log_target(), "Scheduled events: {}", scheduled_queue.get_ref().len());
process_envelope = None;
}
}
if let Some(mut envelope) = process_envelope {
let handle_res = envelope.handle(&mut self.actor, &mut self.context).await;
if let Err(err) = handle_res {
log::error!(target: self.actor.log_target(), "Handler for {} (high-priority) failed: {}", self.id, err);
}
}
} else {
log::trace!(target: self.actor.log_target(), "Messages stream of {} (high-priority) drained.", self.id);
if let Err(err) = self.actor.instant_queue_drained(&mut self.context).await {
log::error!(target: self.actor.log_target(), "Queue (high-priority) drained handler {} failed: {}", self.id, err);
}
}
}
opt_delayed_envelope = maybe_queue.next() => {
if let Some(delayed_envelope) = opt_delayed_envelope {
match delayed_envelope {
Ok(expired) => {
log::trace!(target: self.actor.log_target(), "Execute scheduled event. Remained: {}", scheduled_queue.get_ref().len());
let mut envelope = expired.into_inner();
let handle_res = envelope.handle(&mut self.actor, &mut self.context).await;
if let Err(err) = handle_res {
log::error!(target: self.actor.log_target(), "Handler for {} (scheduled) failed: {}", self.id, err);
}
}
Err(err) => {
log::error!(target: self.actor.log_target(), "Failed scheduled execution for {}: {}", self.id, err);
}
}
} else {
log::error!(target: self.actor.log_target(), "Delay queue of {} closed.", self.id);
if let Err(err) = self.actor.instant_queue_drained(&mut self.context).await {
log::error!(target: self.actor.log_target(), "Queue (high-priority) drained handler {} failed: {}", self.id, err);
}
}
}
lp_envelope = self.joint.msg_rx.recv().fuse() => {
if let Some(mut envelope) = lp_envelope {
let handle_res = envelope.handle(&mut self.actor, &mut self.context).await;
if let Err(err) = handle_res {
log::error!(target: self.actor.log_target(), "Handler for {} failed: {}", self.id, err);
}
} else {
log::trace!(target: self.actor.log_target(), "Messages stream of {} drained.", self.id);
if let Err(err) = self.actor.queue_drained(&mut self.context).await {
log::error!(target: self.actor.log_target(), "Queue drained handler {} failed: {}", self.id, err);
}
}
}
}
}
}
}