use crate::actor_runtime::{Actor, Context};
use crate::ids::{Id, IdOf};
use crate::lifecycle;
use crate::lite_runtime::LiteTask;
use anyhow::Error;
use async_trait::async_trait;
use futures::channel::oneshot;
use std::time::Instant;
/// Type-erased container for one action addressed to the actor `A`.
///
/// The concrete input type is hidden behind a boxed `Handler`, so envelopes
/// built from different inputs all share the same type for a given actor.
pub(crate) struct Envelope<A: Actor> {
    /// Boxed handler that owns the input until it is handled.
    handler: Box<dyn Handler<A>>,
}
impl<A: Actor> Envelope<A> {
    /// Packs `input` into an envelope.
    ///
    /// Available for every input type `I` that the actor `A` can process via
    /// `ActionHandler<I>`.
    pub(crate) fn new<I>(input: I) -> Self
    where
        A: ActionHandler<I>,
        I: Action,
    {
        Self {
            handler: Box::new(ActionHandlerImpl { input: Some(input) }),
        }
    }

    /// Runs the boxed handler against `actor` and returns whatever it returns.
    pub(crate) async fn handle(
        &mut self,
        actor: &mut A,
        ctx: &mut Context<A>,
    ) -> Result<(), Error> {
        self.handler.handle(actor, ctx).await
    }
}
/// Instruction stored next to an envelope inside an `HpEnvelope`.
///
/// NOTE(review): only the shape of the variants is visible here; how each one
/// is interpreted is decided by the code that consumes `HpEnvelope`.
#[derive(Clone)]
pub(crate) enum Operation {
    /// Carries an `Id` — presumably of an entity that has finished; confirm
    /// against the consumer.
    Done { id: Id },
    /// Carries no extra data.
    Forward,
    /// Carries the `Instant` to be used as a deadline.
    Schedule { deadline: Instant },
}
/// An `Envelope` paired with the `Operation` that accompanies it.
///
/// NOTE(review): "Hp" presumably stands for "high priority" — confirm with the
/// code that creates these values.
pub(crate) struct HpEnvelope<A: Actor> {
    /// Operation attached to the envelope.
    pub operation: Operation,
    /// The wrapped action itself.
    pub envelope: Envelope<A>,
}
#[async_trait]
trait Handler<A: Actor>: Send {
async fn handle(&mut self, actor: &mut A, _ctx: &mut Context<A>) -> Result<(), Error>;
}
/// Marker for values that can be sent to an actor as an action.
///
/// Implementors must be `Send + 'static`.
pub trait Action: Send + 'static {
    /// Tells whether this action asks for high-priority treatment.
    ///
    /// The default answer is `false`; override it to opt in.
    fn is_high_priority(&self) -> bool {
        false
    }
}
/// Implemented by actors that can process the action `I`.
#[async_trait]
pub trait ActionHandler<I: Action>: Actor {
    /// Processes `input`, taking ownership of it.
    async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<(), Error>;
}
/// Holds an action until it is handed over to the actor.
struct ActionHandlerImpl<I> {
    /// `Some` until the input is taken out for handling, `None` afterwards.
    input: Option<I>,
}
/// Bridges the type-erased `Handler` to the actor's typed `ActionHandler<I>`.
#[async_trait]
impl<A, I> Handler<A> for ActionHandlerImpl<I>
where
    A: ActionHandler<I>,
    I: Action,
{
    /// Moves the stored input out and passes it to the actor.
    ///
    /// # Panics
    ///
    /// Panics when called a second time, because the input is already gone.
    async fn handle(&mut self, actor: &mut A, ctx: &mut Context<A>) -> Result<(), Error> {
        // `take` leaves `None` behind, which is what a repeated call trips over.
        let action = self.input.take().expect("action handler called twice");
        <A as ActionHandler<I>>::handle(actor, action, ctx).await
    }
}
/// Implemented by actors that can answer the interaction `I`.
#[async_trait]
pub trait InteractionHandler<I: Interaction>: Actor {
    /// Processes `input` and produces the interaction's output.
    async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<I::Output, Error>;
}
#[async_trait]
impl<T, I> ActionHandler<Interact<I>> for T
where
T: InteractionHandler<I>,
I: Interaction,
{
async fn handle(&mut self, input: Interact<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
let res = InteractionHandler::handle(self, input.request, ctx).await;
let send_res = input.responder.send(res);
match send_res {
Ok(()) => Ok(()),
Err(Ok(_)) => Err(Error::msg(
"Can't send the successful result of interaction",
)),
Err(Err(err)) => Err(err),
}
}
}
/// Action that carries an interaction request together with the channel used
/// to send its result back.
pub(crate) struct Interact<T: Interaction> {
    /// The interaction to perform.
    pub request: T,
    /// One-shot sender that receives the outcome of the interaction.
    pub responder: oneshot::Sender<Result<T::Output, Error>>,
}
/// An `Interact` wrapper takes its priority from the request it carries.
impl<T: Interaction> Action for Interact<T> {
    fn is_high_priority(&self) -> bool {
        <T as Interaction>::is_high_priority(&self.request)
    }
}
/// A request that produces a value of type `Output` when handled.
///
/// Implementors must be `Send + 'static`.
pub trait Interaction: Send + 'static {
    /// The value produced by a successful interaction.
    type Output: Send + 'static;

    /// Tells whether this interaction asks for high-priority treatment.
    ///
    /// The default answer is `false`; override it to opt in.
    fn is_high_priority(&self) -> bool {
        false
    }
}
/// Hook invoked when the actor handles a `lifecycle::Awake<A>` action.
#[async_trait]
pub trait StartedBy<A: Actor>: Actor {
    /// Reacts to the awake notification.
    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}
/// Routes `lifecycle::Awake<S>` actions to `StartedBy<S>`.
#[async_trait]
impl<T, S> ActionHandler<lifecycle::Awake<S>> for T
where
    T: StartedBy<S>,
    S: Actor,
{
    /// Forwards to `StartedBy::handle`; the action value itself is not used.
    async fn handle(
        &mut self,
        _input: lifecycle::Awake<S>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        <T as StartedBy<S>>::handle(self, ctx).await
    }
}
/// Hook invoked when the actor handles a `lifecycle::Interrupt<A>` action.
#[async_trait]
pub trait InterruptedBy<A: Actor>: Actor {
    /// Reacts to the interruption notification.
    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}
/// Routes `lifecycle::Interrupt<S>` actions to `InterruptedBy<S>`.
#[async_trait]
impl<T, S> ActionHandler<lifecycle::Interrupt<S>> for T
where
    T: InterruptedBy<S>,
    S: Actor,
{
    /// Forwards to `InterruptedBy::handle`; the action value itself is not used.
    async fn handle(
        &mut self,
        _input: lifecycle::Interrupt<S>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        <T as InterruptedBy<S>>::handle(self, ctx).await
    }
}
/// Hook invoked when the actor handles a `lifecycle::Done<A>` action.
#[async_trait]
pub trait Eliminated<A: Actor>: Actor {
    /// Reacts to the notification; `id` is the identifier carried by it.
    async fn handle(&mut self, id: IdOf<A>, ctx: &mut Context<Self>) -> Result<(), Error>;
}
/// Routes `lifecycle::Done<C>` actions to `Eliminated<C>`.
#[async_trait]
impl<T, C> ActionHandler<lifecycle::Done<C>> for T
where
    T: Eliminated<C>,
    C: Actor,
{
    /// Takes the `id` out of the notification and passes it to
    /// `Eliminated::handle`.
    async fn handle(
        &mut self,
        done: lifecycle::Done<C>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        let id = done.id;
        <T as Eliminated<C>>::handle(self, id, ctx).await
    }
}
/// Hook invoked when the actor handles a `lifecycle::TaskDone<T>` action.
#[async_trait]
pub trait TaskEliminated<T: LiteTask>: Actor {
    /// Reacts to the notification; `id` is the identifier carried by it.
    async fn handle(&mut self, id: IdOf<T>, ctx: &mut Context<Self>) -> Result<(), Error>;
}
/// Routes `lifecycle::TaskDone<C>` actions to `TaskEliminated<C>`.
#[async_trait]
impl<T, C> ActionHandler<lifecycle::TaskDone<C>> for T
where
    T: TaskEliminated<C>,
    C: LiteTask,
{
    /// Takes the `id` out of the notification and passes it to
    /// `TaskEliminated::handle`.
    async fn handle(
        &mut self,
        done: lifecycle::TaskDone<C>,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        let id = done.id;
        <T as TaskEliminated<C>>::handle(self, id, ctx).await
    }
}
/// Action wrapper around a single item; handled by actors implementing
/// `Consumer<T>`.
pub(crate) struct StreamItem<T> {
    /// The wrapped item.
    pub item: T,
}
/// Stream items keep the default priority of `Action`.
impl<T> Action for StreamItem<T> where T: Send + 'static {}
/// Implemented by actors that consume items of type `T` delivered as
/// `StreamItem<T>` actions.
#[async_trait]
pub trait Consumer<T>: Actor {
    /// Processes one item.
    async fn handle(&mut self, item: T, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, I> ActionHandler<StreamItem<I>> for T
where
T: Consumer<I>,
I: Send + 'static,
{
async fn handle(&mut self, msg: StreamItem<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
Consumer::handle(self, msg.item, ctx).await
}
}
/// Implemented by actors that consume fallible items, i.e. values of type
/// `Result<T, Self::Error>`.
#[async_trait]
pub trait TryConsumer<T>: Actor {
    /// Error half of the consumed `Result`.
    type Error;

    /// Processes a successfully produced item.
    async fn handle(&mut self, item: T, ctx: &mut Context<Self>) -> Result<(), Error>;

    /// Processes an error produced in place of an item.
    async fn error(&mut self, error: Self::Error, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, I> Consumer<Result<I, T::Error>> for T
where
T: TryConsumer<I>,
T::Error: Send,
I: Send + 'static,
{
async fn handle(
&mut self,
result: Result<I, T::Error>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
match result {
Ok(item) => TryConsumer::handle(self, item, ctx).await,
Err(err) => TryConsumer::error(self, err, ctx).await,
}
}
}
/// Action wrapper around an item and an accompanying `Instant`; handled by
/// actors implementing `Scheduled<T>`.
pub(crate) struct ScheduledItem<T> {
    /// Time point passed to the handler together with the item.
    pub timestamp: Instant,
    /// The wrapped item.
    pub item: T,
}
/// Scheduled items always report themselves as high priority.
impl<T> Action for ScheduledItem<T>
where
    T: Send + 'static,
{
    fn is_high_priority(&self) -> bool {
        true
    }
}
/// Implemented by actors that process items of type `T` delivered as
/// `ScheduledItem<T>` actions.
#[async_trait]
pub trait Scheduled<T>: Actor {
    /// Processes one item together with the timestamp it was delivered with.
    async fn handle(
        &mut self,
        timestamp: Instant,
        item: T,
        ctx: &mut Context<Self>,
    ) -> Result<(), Error>;
}
#[async_trait]
impl<T, I> ActionHandler<ScheduledItem<I>> for T
where
T: Scheduled<I>,
I: Send + 'static,
{
async fn handle(
&mut self,
msg: ScheduledItem<I>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
Scheduled::handle(self, msg.timestamp, msg.item, ctx).await
}
}