use crate::actor_runtime::{Actor, Context};
use crate::forwarders::StreamForwarder;
use crate::ids::{Id, IdOf};
use crate::lifecycle;
use crate::linkage::{ActionRecipient, Address};
use crate::lite_runtime::{LiteTask, Tag, TaskError};
use anyhow::Error;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::Stream;
use std::convert::identity;
use std::time::Instant;
pub struct Parcel<A: Actor> {
pub(crate) operation: Operation,
pub(crate) envelope: Envelope<A>,
}
impl<A: Actor> Parcel<A> {
pub fn pack<I>(input: I) -> Self
where
A: InstantActionHandler<I>,
I: InstantAction,
{
Self::new(Operation::Forward, input)
}
pub(crate) fn new<I>(operation: Operation, input: I) -> Self
where
A: InstantActionHandler<I>,
I: InstantAction,
{
Self {
operation,
envelope: Envelope::instant(input),
}
}
}
pub(crate) struct Envelope<A: Actor> {
handler: Box<dyn Handler<A>>,
}
impl<A: Actor> Envelope<A> {
pub(crate) async fn handle(
&mut self,
actor: &mut A,
ctx: &mut Context<A>,
) -> Result<(), Error> {
self.handler.handle(actor, ctx).await
}
pub(crate) fn new<I>(input: I) -> Self
where
A: ActionHandler<I>,
I: Action,
{
let handler = ActionHandlerImpl { input: Some(input) };
Self {
handler: Box::new(handler),
}
}
pub(crate) fn instant<I>(input: I) -> Self
where
A: InstantActionHandler<I>,
I: InstantAction,
{
let handler = InstantActionHandlerImpl { input: Some(input) };
Self {
handler: Box::new(handler),
}
}
}
#[derive(Clone)]
pub(crate) enum Operation {
Done {
id: Id,
},
Forward,
Schedule {
deadline: Instant,
},
}
#[async_trait]
trait Handler<A: Actor>: Send {
async fn handle(&mut self, actor: &mut A, _ctx: &mut Context<A>) -> Result<(), Error>;
}
pub trait Action: Send + 'static {}
#[async_trait]
pub trait ActionHandler<I: Action>: Actor {
async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<(), Error>;
}
struct ActionHandlerImpl<I> {
input: Option<I>,
}
#[async_trait]
impl<A, I> Handler<A> for ActionHandlerImpl<I>
where
A: ActionHandler<I>,
I: Action,
{
async fn handle(&mut self, actor: &mut A, ctx: &mut Context<A>) -> Result<(), Error> {
let input = self.input.take().expect("action handler called twice");
actor.handle(input, ctx).await
}
}
pub trait InstantAction: Send + 'static {}
#[async_trait]
pub trait InstantActionHandler<I: InstantAction>: Actor {
async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<(), Error>;
}
struct InstantActionHandlerImpl<I> {
input: Option<I>,
}
#[async_trait]
impl<A, I> Handler<A> for InstantActionHandlerImpl<I>
where
A: InstantActionHandler<I>,
I: InstantAction,
{
async fn handle(&mut self, actor: &mut A, ctx: &mut Context<A>) -> Result<(), Error> {
let input = self
.input
.take()
.expect("instant action handler called twice");
actor.handle(input, ctx).await
}
}
pub trait SyncAction {}
pub trait SyncActionHandler<I: SyncAction>: Actor {
fn handle(&self) -> Result<(), Error>;
}
#[async_trait]
pub trait InteractionHandler<I: Interaction>: Actor {
async fn handle(&mut self, input: I, _ctx: &mut Context<Self>) -> Result<I::Output, Error>;
}
#[async_trait]
impl<T, I> ActionHandler<Interact<I>> for T
where
T: InteractionHandler<I>,
I: Interaction,
{
async fn handle(&mut self, input: Interact<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
let res = InteractionHandler::handle(self, input.request, ctx).await;
let send_res = input.responder.send(res);
match send_res {
Ok(()) => Ok(()),
Err(Ok(_)) => Err(Error::msg(
"Can't send the successful result of interaction",
)),
Err(Err(err)) => Err(err),
}
}
}
pub type InteractionResponder<T> = oneshot::Sender<Result<T, Error>>;
pub struct Interact<T: Interaction> {
pub request: T,
pub responder: InteractionResponder<T::Output>,
}
impl<T: Interaction> Action for Interact<T> {}
pub trait Interaction: Send + 'static {
type Output: Send + 'static;
}
pub struct InteractionTask<I: Interaction> {
recipient: Box<dyn ActionRecipient<Interact<I>>>,
request: I,
}
impl<I: Interaction> InteractionTask<I> {
pub(crate) fn new<T>(address: &Address<T>, request: I) -> Self
where
T: ActionHandler<Interact<I>>,
{
Self {
recipient: address.action_recipient(),
request,
}
}
pub async fn recv(mut self) -> Result<I::Output, Error> {
let (responder, rx) = oneshot::channel();
let input = Interact {
request: self.request,
responder,
};
self.recipient.act(input).await?;
rx.await.map_err(Error::from).and_then(identity)
}
}
#[async_trait]
impl<I> LiteTask for InteractionTask<I>
where
I: Interaction,
{
type Output = I::Output;
async fn interruptable_routine(mut self) -> Result<Self::Output, Error> {
self.recv().await
}
}
#[async_trait]
pub trait StartedBy<A: Actor>: Actor {
async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, S> InstantActionHandler<lifecycle::Awake<S>> for T
where
T: StartedBy<S>,
S: Actor,
{
async fn handle(
&mut self,
_input: lifecycle::Awake<S>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
StartedBy::handle(self, ctx).await
}
}
#[async_trait]
pub trait InterruptedBy<A: Actor>: Actor {
async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, S> InstantActionHandler<lifecycle::Interrupt<S>> for T
where
T: InterruptedBy<S>,
S: Actor,
{
async fn handle(
&mut self,
_input: lifecycle::Interrupt<S>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
InterruptedBy::handle(self, ctx).await
}
}
#[async_trait]
pub trait Eliminated<A: Actor>: Actor {
async fn handle(&mut self, id: IdOf<A>, ctx: &mut Context<Self>) -> Result<(), Error>;
}
#[async_trait]
impl<T, C> InstantActionHandler<lifecycle::Done<C>> for T
where
T: Eliminated<C>,
C: Actor,
{
async fn handle(
&mut self,
done: lifecycle::Done<C>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
Eliminated::handle(self, done.id, ctx).await
}
}
#[async_trait]
pub trait TaskEliminated<T: LiteTask, M: Tag>: Actor {
async fn handle(
&mut self,
id: IdOf<T>,
tag: M,
result: Result<T::Output, TaskError>,
ctx: &mut Context<Self>,
) -> Result<(), Error>;
}
#[async_trait]
impl<T, C, M> InstantActionHandler<lifecycle::TaskDone<C, M>> for T
where
T: TaskEliminated<C, M>,
C: LiteTask,
M: Tag,
{
async fn handle(
&mut self,
done: lifecycle::TaskDone<C, M>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
TaskEliminated::handle(self, done.id, done.tag, done.result, ctx).await
}
}
#[async_trait]
pub trait InteractionDone<I: Interaction, M: Tag>: Actor {
async fn handle(
&mut self,
tag: M,
output: I::Output,
ctx: &mut Context<Self>,
) -> Result<(), Error>;
async fn failed(
&mut self,
_tag: M,
err: TaskError,
_ctx: &mut Context<Self>,
) -> Result<(), Error> {
if let Some(err) = err.into_other() {
log::error!("Interaction failed: {}", err);
}
Ok(())
}
}
#[async_trait]
impl<T, I, M> TaskEliminated<InteractionTask<I>, M> for T
where
T: InteractionDone<I, M>,
I: Interaction,
M: Tag,
{
async fn handle(
&mut self,
_id: IdOf<InteractionTask<I>>,
tag: M,
result: Result<I::Output, TaskError>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
match result {
Ok(output) => InteractionDone::handle(self, tag, output, ctx).await,
Err(err) => InteractionDone::failed(self, tag, err, ctx).await,
}
}
}
pub(crate) enum StreamItem<T> {
Item(T),
Done,
}
impl<T: Send + 'static> Action for StreamItem<T> {}
#[async_trait]
pub trait Consumer<T: 'static>: Actor {
async fn handle(&mut self, item: T, ctx: &mut Context<Self>) -> Result<(), Error>;
async fn finished(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
log::info!("Stream finished");
Ok(())
}
async fn task_failed(&mut self, err: TaskError, _ctx: &mut Context<Self>) -> Result<(), Error> {
if let Some(err) = err.into_other() {
log::error!("Consumer task failed: {}", err);
}
Ok(())
}
async fn task_finished(&mut self, _ctx: &mut Context<Self>) -> Result<(), Error> {
log::info!("Stream task finished");
Ok(())
}
}
#[async_trait]
impl<T, I> ActionHandler<StreamItem<I>> for T
where
T: Consumer<I>,
I: Send + 'static,
{
async fn handle(&mut self, msg: StreamItem<I>, ctx: &mut Context<Self>) -> Result<(), Error> {
match msg {
StreamItem::Item(item) => Consumer::handle(self, item, ctx).await,
StreamItem::Done => Consumer::finished(self, ctx).await,
}
}
}
#[async_trait]
impl<T, S, M> TaskEliminated<StreamForwarder<S>, M> for T
where
T: Consumer<S::Item>,
S: Stream + Unpin + Send + 'static,
S::Item: Send,
M: Tag,
{
async fn handle(
&mut self,
_id: IdOf<StreamForwarder<S>>,
_tag: M,
result: Result<(), TaskError>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
match result {
Ok(()) => Consumer::task_finished(self, ctx).await,
Err(err) => Consumer::task_failed(self, err, ctx).await,
}
}
}
pub trait StreamAcceptor<T>: Actor {
fn stream_group(&self) -> Self::GroupBy;
}
pub(crate) struct ScheduledItem<T> {
pub timestamp: Instant,
pub item: T,
}
impl<T: Send + 'static> InstantAction for ScheduledItem<T> {}
#[async_trait]
pub trait Scheduled<T>: Actor {
async fn handle(
&mut self,
timestamp: Instant,
item: T,
ctx: &mut Context<Self>,
) -> Result<(), Error>;
}
#[async_trait]
impl<T, I> InstantActionHandler<ScheduledItem<I>> for T
where
T: Scheduled<I>,
I: Send + 'static,
{
async fn handle(
&mut self,
msg: ScheduledItem<I>,
ctx: &mut Context<Self>,
) -> Result<(), Error> {
Scheduled::handle(self, msg.timestamp, msg.item, ctx).await
}
}