1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
#![allow(dead_code)]

use std::sync::{Arc, Mutex};

use futures::channel::oneshot::{channel, Sender as ChannelSender};
use futures::future::RemoteHandle;
use futures::FutureExt;

use riker::actors::*;

/// Convenience fuction to send and receive a message from an actor
///
/// This function sends a message `msg` to the provided actor `receiver`
/// and returns a `Future` which will be completed when `receiver` replies
/// by sending a message to the `sender`. The sender is a temporary actor
/// that fulfills the `Future` upon receiving the reply.
///
/// `futures::future::RemoteHandle` is the future returned and the task
/// is executed on the provided executor `ctx`.
///
/// This pattern is especially useful for interacting with actors from outside
/// of the actor system, such as sending data from HTTP request to an actor
/// and returning a future to the HTTP response, or using await.
///
/// # Examples
///
/// ```
/// # use riker::actors::*;
/// # use riker_patterns::ask::ask;
/// # use futures::future::RemoteHandle;
/// # use futures::executor::block_on;
///
/// #[derive(Default)]
/// struct Reply;
///
/// impl Actor for Reply {
///    type Msg = String;
///
///    fn recv(&mut self,
///                 ctx: &Context<Self::Msg>,
///                 msg: Self::Msg,
///                 sender: Sender) {
///         // reply to the temporary ask actor
///         sender.as_ref().unwrap().try_tell(
///             format!("Hello {}", msg), None
///         ).unwrap();
///     }
/// }
///
/// // set up the actor system
/// let sys = ActorSystem::new().unwrap();
///
/// // create instance of Reply actor
/// let actor = sys.actor_of::<Reply>("reply").unwrap();
///
/// // ask the actor
/// let msg = "Will Riker".to_string();
/// let r: RemoteHandle<String> = ask(&sys, &actor, msg);
///
/// assert_eq!(block_on(r), "Hello Will Riker".to_string());
/// ```

pub fn ask<Msg, Ctx, R, T>(ctx: &Ctx, receiver: &T, msg: Msg) -> RemoteHandle<R>
where
    Msg: Message,
    R: Message,
    Ctx: TmpActorRefFactory + Run,
    T: Tell<Msg>,
{
    let (tx, rx) = channel::<R>();
    let tx = Arc::new(Mutex::new(Some(tx)));

    let props = Props::new_from_args(Box::new(AskActor::new), tx);
    let actor = ctx.tmp_actor_of_props(props).unwrap();
    receiver.tell(msg, Some(actor.into()));

    ctx.run(rx.map(|r| r.unwrap())).unwrap()
}

struct AskActor<Msg> {
    tx: Arc<Mutex<Option<ChannelSender<Msg>>>>,
}

impl<Msg: Message> AskActor<Msg> {
    fn new(tx: Arc<Mutex<Option<ChannelSender<Msg>>>>) -> BoxActor<Msg> {
        let ask = AskActor { tx };
        Box::new(ask)
    }
}

impl<Msg: Message> Actor for AskActor<Msg> {
    type Msg = Msg;

    fn recv(&mut self, ctx: &Context<Msg>, msg: Msg, _: Sender) {
        if let Ok(mut tx) = self.tx.lock() {
            tx.take().unwrap().send(msg).unwrap();
        }
        ctx.stop(&ctx.myself);
    }
}