use crate::actor_runtime::{Actor, Context};
use crate::forwarders::{InteractionForwarder, StreamForwarder};
use crate::ids::{Id, IdOf};
use crate::lifecycle;
use crate::lite_runtime::{LiteTask, TaskError};
use anyhow::Error;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::Stream;
use std::time::Instant;
pub struct Parcel<A: Actor> {
pub(crate) operation: Operation,
pub(crate) envelope: Envelope<A>,
}
impl<A: Actor> Parcel<A> {
pub fn pack<I>(input: I) -> Self
where
A: InstantActionHandler<I>,
I: InstantAction,
{
Self::new(Operation::Forward, input)
}
pub(crate) fn new<I>(operation: Operation, input: I) -> Self
where
A: InstantActionHandler<I>,
I: InstantAction,
{
Self {
operation,
envelope: Envelope::instant(input),
}
}
}
pub(crate) struct Envelope<A: Actor> {
handler: Box<dyn Handler<A>>,
}
impl<A: Actor> Envelope<A> {
pub(crate) async fn handle(
&mut self,
actor: &mut A,
ctx: &mut Context<A>,
) -> Result<(), Error> {
self.handler.handle(actor, ctx).await
}
pub(crate) fn new<I>(input: I) -> Self
where
A: ActionHandler<I>,
I: Action,
{
let handler = ActionHandlerImpl { input: Some(input) };
Self {
handler: Box::new(handler),
}
}
pub(crate) fn instant<I>(input: I) -> Self
where
A: InstantActionHandler<I>,
I: InstantAction,
{
let handler = InstantActionHandlerImpl { input: Some(input) };
Self {
handler: Box::new(handler),
}
}
}
#[derive(Clone)]
pub(crate) enum Operation {
Done {
id: Id,
},
Forward,
Schedule {
deadline: Instant,
},
}
#[async_trait]
trait Handler<A: Actor>: Send {
async fn handle(&mut self, actor: &mut A, _ctx: &mut Context<A>) -> Result<(), Error>;
}
pub trait Action: Send + 'static {}
#[async_trait]
pub trait ActionHandler<I: Action>: Actor {
async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<(), Error>;
}
struct ActionHandlerImpl<I> {
input: Option<I>,
}
#[async_trait]
impl<A, I> Handler<A> for ActionHandlerImpl<I>
where
A: ActionHandler<I>,
I: Action,
{
async fn handle(&mut self, actor: &mut A, ctx: &mut Context<A>) -> Result<(), Error> {
let input = self.input.take().expect("action handler called twice");
actor.handle(input, ctx).await
}
}
pub trait InstantAction: Send + 'static {}
#[async_trait]
pub trait InstantActionHandler<I: InstantAction>: Actor {
async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<(), Error>;
}
struct InstantActionHandlerImpl<I> {
input: Option<I>,
}
#[async_trait]
impl<A, I> Handler<A> for InstantActionHandlerImpl<I>
where
A: InstantActionHandler<I>,
I: InstantAction,
{
async fn handle(&mut self, actor: &mut A, ctx: &mut Context<A>) -> Result<(), Error> {
let input = self
.input
.take()
.expect("instant action handler called twice");
actor.handle(input, ctx).await
}
}
pub trait SyncAction {}
pub trait SyncActionHandler<I: SyncAction>: Actor {
fn handle(&self) -> Result<(), Error>;
}
#[async_trait]
pub trait InteractionHandler<I: Interaction>: Actor {
async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<I::Output, Error>;
}
#[async_trait]
impl<T, I> ActionHandler<Interact<I>> for T
where
T: InteractionHandler<I>,
I: Interaction,
{
async fn handle(&mut self, input: Interact<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
let res = InteractionHandler::handle(self, input.request, ctx).await;
let send_res = input.responder.send(res);
match send_res {
Ok(()) => Ok(()),
Err(Ok(_)) => Err(Error::msg(
"Can't send the successful result of interaction",
)),
Err(Err(err)) => Err(err),
}
}
}
pub type InteractionResponder<T> = oneshot::Sender<Result<T, Error>>;
pub struct Interact<T: Interaction> {
pub request: T,
pub responder: InteractionResponder<T::Output>,
}
impl<T: Interaction> Action for Interact<T> {}
pub trait Interaction: Send + 'static {
type Output: Send + 'static;
}
#[async_trait]
pub trait StartedBy<A: Actor>: Actor {
async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, S> InstantActionHandler<lifecycle::Awake<S>> for T
where
T: StartedBy<S>,
S: Actor,
{
async fn handle(
&mut self,
_input: lifecycle::Awake<S>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
StartedBy::handle(self, ctx).await
}
}
#[async_trait]
pub trait InterruptedBy<A: Actor>: Actor {
async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, S> InstantActionHandler<lifecycle::Interrupt<S>> for T
where
T: InterruptedBy<S>,
S: Actor,
{
async fn handle(
&mut self,
_input: lifecycle::Interrupt<S>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
InterruptedBy::handle(self, ctx).await
}
}
#[async_trait]
pub trait Eliminated<A: Actor>: Actor {
async fn handle(&mut self, id: IdOf<A>, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, C> InstantActionHandler<lifecycle::Done<C>> for T
where
T: Eliminated<C>,
C: Actor,
{
async fn handle(
&mut self,
done: lifecycle::Done<C>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
Eliminated::handle(self, done.id, ctx).await
}
}
#[async_trait]
pub trait TaskEliminated<T: LiteTask>: Actor {
async fn handle(
&mut self,
id: IdOf<T>,
result: Result<T::Output, TaskError>,
ctx: &mut Context<Self>,
) -> Result<(), Error>;
}
#[async_trait]
impl<T, C> InstantActionHandler<lifecycle::TaskDone<C>> for T
where
T: TaskEliminated<C>,
C: LiteTask,
{
async fn handle(
&mut self,
done: lifecycle::TaskDone<C>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
TaskEliminated::handle(self, done.id, done.result, ctx).await
}
}
#[async_trait]
pub trait InteractionDone<I: Interaction>: Actor {
async fn handle(&mut self, output: I::Output, ctx: &mut Context<Self>) -> Result<(), Error>;
async fn failed(&mut self, err: TaskError, _ctx: &mut Context<Self>) -> Result<(), Error> {
log::error!("Interaction failed: {}", err);
Ok(())
}
}
#[async_trait]
impl<T, I> TaskEliminated<InteractionForwarder<I>> for T
where
T: InteractionDone<I>,
I: Interaction,
{
async fn handle(
&mut self,
_id: IdOf<InteractionForwarder<I>>,
result: Result<I::Output, TaskError>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
match result {
Ok(output) => InteractionDone::handle(self, output, ctx).await,
Err(err) => InteractionDone::failed(self, err, ctx).await,
}
}
}
pub(crate) enum StreamItem<T> {
Chunk(Vec<T>),
Done,
}
impl<T: Send + 'static> Action for StreamItem<T> {}
#[async_trait]
pub trait Consumer<T: 'static>: Actor {
fn stream_group(&self) -> Self::GroupBy;
async fn handle(&mut self, chunk: Vec<T>, ctx: &mut Context<Self>) -> Result<(), Error>;
async fn finished(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
log::info!("Stream finished");
Ok(())
}
async fn task_failed(&mut self, err: TaskError, _ctx: &mut Context<Self>) -> Result<(), Error> {
log::error!("Consumer task failed: {}", err);
Ok(())
}
async fn task_finished(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
log::info!("Stream task finished");
Ok(())
}
}
#[async_trait]
impl<T, I> ActionHandler<StreamItem<I>> for T
where
T: Consumer<I>,
I: Send + 'static,
{
async fn handle(&mut self, msg: StreamItem<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
match msg {
StreamItem::Chunk(chunk) => Consumer::handle(self, chunk, ctx).await,
StreamItem::Done => Consumer::finished(self, ctx).await,
}
}
}
#[async_trait]
impl<T, S> TaskEliminated<StreamForwarder<S>> for T
where
T: Consumer<S::Item>,
S: Stream + Unpin + Send + 'static,
S::Item: Send,
{
async fn handle(
&mut self,
_id: IdOf<StreamForwarder<S>>,
result: Result<(), TaskError>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
match result {
Ok(()) => Consumer::task_finished(self, ctx).await,
Err(err) => Consumer::task_failed(self, err, ctx).await,
}
}
}
pub(crate) struct ScheduledItem<T> {
pub timestamp: Instant,
pub item: T,
}
impl<T: Send + 'static> InstantAction for ScheduledItem<T> {}
#[async_trait]
pub trait Scheduled<T>: Actor {
async fn handle(
&mut self,
timestamp: Instant,
item: T,
ctx: &mut Context<Self>,
) -> Result<(), Error>;
}
#[async_trait]
impl<T, I> InstantActionHandler<ScheduledItem<I>> for T
where
T: Scheduled<I>,
I: Send + 'static,
{
async fn handle(
&mut self,
msg: ScheduledItem<I>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
Scheduled::handle(self, msg.timestamp, msg.item, ctx).await
}
}