1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//! This module contains the `Envelope` that allow
//! to call methods of actors related to a sepcific
//! imcoming message.

use crate::Actor;
use anyhow::{anyhow, Error};
use async_trait::async_trait;
use futures::channel::oneshot;

pub(crate) struct Envelope<A: Actor> {
    handler: Box<dyn Handler<A>>,
}

impl<A: Actor> Envelope<A> {
    pub(crate) async fn handle(&mut self, actor: &mut A) -> Result<(), Error> {
        self.handler.handle(actor).await
    }

    /// Creates an `Envelope` for `Interaction`.
    pub(crate) fn interaction<I>(input: I) -> (Self, oneshot::Receiver<Result<I::Output, Error>>)
    where
        A: InteractionHandler<I>,
        I: Interaction,
    {
        let (tx, rx) = oneshot::channel();
        let handler = InteractionHandlerImpl {
            input: Some(input),
            tx: Some(tx),
        };
        let this = Self {
            handler: Box::new(handler),
        };
        (this, rx)
    }

    /// Creates an `Envelope` for `Action`.
    pub(crate) fn action<I>(input: I) -> Self
    where
        A: ActionHandler<I>,
        I: Action,
    {
        let handler = ActionHandlerImpl { input: Some(input) };
        Self {
            handler: Box::new(handler),
        }
    }
}

/// Internal `Handler` type that used by `Actor`'s routine to execute
/// `ActionHandler` or `InteractionHandler`.
#[async_trait]
trait Handler<A: Actor>: Send {
    /// Main method that expects a mutable reference to `Actor` that
    /// will be used by implementations to handle messages.
    async fn handle(&mut self, actor: &mut A) -> Result<(), Error>;
}

/// `Interaction` type can be sent to an `Actor` that implements
/// `InteractionHandler` for that message type.
/// It has to return a response of `Output` type.
pub trait Interaction: Send + 'static {
    /// The type of a response.
    type Output: Send + 'static;
}

/// Type of `Handler` to process interaction in request-response style.
#[async_trait]
pub trait InteractionHandler<I: Interaction> {
    /// Asyncronous method that receives incoming message and return a response.
    async fn handle(&mut self, input: I) -> Result<I::Output, Error>;
}

struct InteractionHandlerImpl<I, O> {
    input: Option<I>,
    tx: Option<oneshot::Sender<Result<O, Error>>>,
}

#[async_trait]
impl<A, I, O> Handler<A> for InteractionHandlerImpl<I, O>
where
    A: Actor + InteractionHandler<I>,
    I: Interaction<Output = O>,
    O: Send + 'static,
{
    async fn handle(&mut self, actor: &mut A) -> Result<(), Error> {
        let input = self
            .input
            .take()
            .expect("interaction handler called twice (no msg)");
        let response = actor.handle(input).await;
        let tx = self
            .tx
            .take()
            .expect("interaction handler called twice (no tx)");
        tx.send(response)
            .map_err(|_| anyhow!("can't send a response of interaction"))?;
        Ok(())
    }
}

/// `Action` type can be sent to an `Actor` that implements
/// `ActionHandler` for that message type.
pub trait Action: Send + 'static {}

/// Type of `Handler` to process incoming messages in one-shot style.
#[async_trait]
pub trait ActionHandler<I: Action> {
    /// Asyncronous method that receives incoming message.
    async fn handle(&mut self, input: I) -> Result<(), Error>;
}

struct ActionHandlerImpl<I> {
    input: Option<I>,
}

#[async_trait]
impl<A, I> Handler<A> for ActionHandlerImpl<I>
where
    A: Actor + ActionHandler<I>,
    I: Action,
{
    async fn handle(&mut self, actor: &mut A) -> Result<(), Error> {
        let input = self.input.take().expect("action handler called twice");
        actor.handle(input).await
    }
}