use crate::compat::watch;
use crate::forwarders::StreamForwarder;
use crate::handlers::{
Consumer, Eliminated, Envelope, Interaction, InteractionDone, InteractionTask, InterruptedBy,
Operation, Parcel, StartedBy, TaskEliminated,
};
use crate::ids::{Id, IdOf};
use crate::lifecycle::{Awake, Done, LifecycleNotifier, LifetimeTracker};
use crate::linkage::Address;
use crate::lite_runtime::{self, LiteTask, Tag, TaskAddress};
use anyhow::Error;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::stream::{pending, FusedStream};
use futures::{select_biased, Stream, StreamExt};
use std::hash::Hash;
use thiserror::Error;
use uuid::Uuid;
#[derive(Debug, Error)]
enum Reason {
#[error("Actor is terminating...")]
Terminating,
}
const MESSAGES_CHANNEL_DEPTH: usize = 32;
#[async_trait]
pub trait Actor: Sized + Send + 'static {
type GroupBy: Clone + Send + Eq + Hash;
fn name(&self) -> String {
let uuid = Uuid::new_v4();
format!("Actor:{}({})", std::any::type_name::<Self>(), uuid)
}
#[doc(hidden)]
async fn queue_drained(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
Ok(())
}
#[doc(hidden)]
async fn instant_queue_drained(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Status {
Alive,
Stop,
}
impl Status {
pub fn is_done(&self) -> bool {
*self == Status::Stop
}
}
pub(crate) fn spawn<A, S>(actor: A, supervisor: Option<Address<S>>) -> Address<A>
where
A: Actor + StartedBy<S>,
S: Actor + Eliminated<A>,
{
let id = Id::of_actor(&actor);
let (hp_msg_tx, hp_msg_rx) = mpsc::unbounded();
let (msg_tx, msg_rx) = mpsc::channel(MESSAGES_CHANNEL_DEPTH);
let (join_tx, join_rx) = watch::channel(Status::Alive);
let address = Address::new(id, hp_msg_tx, msg_tx, join_rx);
let id: Id = address.id().into();
let awake_envelope = Envelope::instant(Awake::new());
let done_notifier = {
match supervisor {
None => LifecycleNotifier::ignore(),
Some(super_addr) => {
let op = Operation::Done { id };
LifecycleNotifier::once(super_addr, op)
}
}
};
let context = Context {
alive: true,
address: address.clone(),
lifetime_tracker: LifetimeTracker::new(),
};
let runtime = ActorRuntime {
id: address.id(),
actor,
context,
awake_envelope: Some(awake_envelope),
done_notifier,
msg_rx,
hp_msg_rx,
join_tx,
};
crate::compat::spawn_async(runtime.entrypoint());
address
}
pub struct Context<A: Actor> {
alive: bool,
address: Address<A>,
lifetime_tracker: LifetimeTracker<A>,
}
impl<A: Actor> Context<A> {
pub fn address(&mut self) -> &mut Address<A> {
&mut self.address
}
pub fn spawn_actor<T>(&mut self, actor: T, group: A::GroupBy) -> Address<T>
where
T: Actor + StartedBy<A> + InterruptedBy<A>,
A: Eliminated<T>,
{
let address = spawn(actor, Some(self.address.clone()));
self.lifetime_tracker.insert(address.clone(), group);
address
}
pub fn spawn_task<T, M>(&mut self, task: T, tag: M, group: A::GroupBy) -> TaskAddress<T>
where
T: LiteTask,
A: TaskEliminated<T, M>,
M: Tag,
{
let stopper = lite_runtime::spawn(task, tag, Some(self.address.clone()));
self.lifetime_tracker.insert_task(stopper.clone(), group);
stopper
}
pub fn attach<S, M>(&mut self, stream: S, tag: M, group: A::GroupBy)
where
S: Stream + Unpin + Send + 'static,
S::Item: Send,
A: Consumer<S::Item>,
M: Tag,
{
let forwarder = StreamForwarder::new(stream, self.address.clone());
self.spawn_task(forwarder, tag, group);
}
pub fn track_interaction<I, M>(&mut self, task: InteractionTask<I>, tag: M, group: A::GroupBy)
where
I: Interaction,
A: InteractionDone<I, M>,
M: Tag,
{
self.spawn_task(task, tag, group);
}
pub fn interrupt<T>(&mut self, address: &mut Address<T>) -> Result<(), Error>
where
T: Actor + InterruptedBy<A>,
{
address.interrupt_by()
}
pub fn is_terminating(&self) -> bool {
self.lifetime_tracker.is_terminating()
}
pub fn not_terminating(&self) -> Result<(), Error> {
if self.is_terminating() {
Err(Reason::Terminating.into())
} else {
Ok(())
}
}
pub fn termination_sequence(&mut self, sequence: Vec<A::GroupBy>) {
self.lifetime_tracker.prioritize_termination_by(sequence);
}
pub fn stop(&mut self) {
self.alive = false;
}
pub fn shutdown(&mut self) {
self.lifetime_tracker.start_termination();
if self.lifetime_tracker.is_finished() {
self.stop();
}
}
}
pub struct ActorRuntime<A: Actor> {
id: IdOf<A>,
actor: A,
context: Context<A>,
awake_envelope: Option<Envelope<A>>,
done_notifier: Box<dyn LifecycleNotifier<Done<A>>>,
msg_rx: mpsc::Receiver<Envelope<A>>,
hp_msg_rx: mpsc::UnboundedReceiver<Parcel<A>>,
join_tx: watch::Sender<Status>,
}
impl<A: Actor> ActorRuntime<A> {
async fn entrypoint(mut self) {
log::info!("Actor started: {:?}", self.id);
let mut awake_envelope = self
.awake_envelope
.take()
.expect("awake envelope has to be set in spawn method!");
let awake_res = awake_envelope
.handle(&mut self.actor, &mut self.context)
.await;
match awake_res {
Ok(_) => {
self.routine().await;
}
Err(err) => {
log::error!(
"Can't call awake notification handler of the actor {:?}: {}",
self.id,
err
);
}
}
log::info!("Actor finished: {:?}", self.id);
let done_event = Done::new(self.id.clone());
if let Err(err) = self.done_notifier.notify(done_event) {
log::error!(
"Can't send done notification from the actor {:?}: {}",
self.id,
err
);
}
if !self.join_tx.is_closed() {
if let Err(_err) = self.join_tx.send(Status::Stop) {
log::error!("Can't release joiners of {:?}", self.id);
}
}
}
async fn routine(&mut self) {
let mut scheduled_queue = crate::compat::DelayQueue::<Envelope<A>>::new().fuse();
let mut pendel = pending();
while self.context.alive {
let maybe_queue: &mut (dyn FusedStream<Item = Result<_, _>> + Unpin + Send) =
if scheduled_queue.get_ref().len() > 0 {
&mut scheduled_queue
} else {
&mut pendel
};
select_biased! {
hp_envelope = self.hp_msg_rx.next() => {
if let Some(hp_env) = hp_envelope {
let envelope = hp_env.envelope;
let process_envelope;
match hp_env.operation {
Operation::Forward => {
process_envelope = Some(envelope);
}
Operation::Done { id } => {
self.context.lifetime_tracker.remove(&id);
if self.context.lifetime_tracker.is_finished() {
self.context.stop();
}
process_envelope = Some(envelope);
}
Operation::Schedule { deadline } => {
scheduled_queue.get_mut().insert_at(envelope, deadline);
log::trace!("Scheduled events: {}", scheduled_queue.get_ref().len());
process_envelope = None;
}
}
if let Some(mut envelope) = process_envelope {
let handle_res = envelope.handle(&mut self.actor, &mut self.context).await;
if let Err(err) = handle_res {
log::error!("Handler for {:?} (high-priority) failed: {}", self.id, err);
}
}
} else {
log::trace!("Messages stream of {:?} (high-priority) drained.", self.id);
if let Err(err) = self.actor.instant_queue_drained(&mut self.context).await {
log::error!("Queue (high-priority) drained handler {:?} failed: {}", self.id, err);
}
}
}
opt_delayed_envelope = maybe_queue.next() => {
if let Some(delayed_envelope) = opt_delayed_envelope {
match delayed_envelope {
Ok(expired) => {
log::trace!("Execute scheduled event. Remained: {}", scheduled_queue.get_ref().len());
let mut envelope = expired.into_inner();
let handle_res = envelope.handle(&mut self.actor, &mut self.context).await;
if let Err(err) = handle_res {
log::error!("Handler for {:?} (scheduled) failed: {}", self.id, err);
}
}
Err(err) => {
log::error!("Failed scheduled execution for {:?}: {}", self.id, err);
}
}
} else {
log::error!("Delay queue of {} closed.", self.id);
if let Err(err) = self.actor.instant_queue_drained(&mut self.context).await {
log::error!("Queue (high-priority) drained handler {:?} failed: {}", self.id, err);
}
}
}
lp_envelope = self.msg_rx.next() => {
if let Some(mut envelope) = lp_envelope {
let handle_res = envelope.handle(&mut self.actor, &mut self.context).await;
if let Err(err) = handle_res {
log::error!("Handler for {:?} failed: {}", self.id, err);
}
} else {
log::trace!("Messages stream of {:?} drained.", self.id);
if let Err(err) = self.actor.queue_drained(&mut self.context).await {
log::error!("Queue drained handler {:?} failed: {}", self.id, err);
}
}
}
}
}
}
}