riker_patterns/
ask.rs

1#![allow(dead_code)]
2
3use std::sync::{Arc, Mutex};
4
5use futures::channel::oneshot::{channel, Sender as ChannelSender};
6use futures::future::RemoteHandle;
7use futures::FutureExt;
8
9use riker::actors::*;
10
11/// Convenience fuction to send and receive a message from an actor
12///
13/// This function sends a message `msg` to the provided actor `receiver`
14/// and returns a `Future` which will be completed when `receiver` replies
15/// by sending a message to the `sender`. The sender is a temporary actor
16/// that fulfills the `Future` upon receiving the reply.
17///
18/// `futures::future::RemoteHandle` is the future returned and the task
19/// is executed on the provided executor `ctx`.
20///
21/// This pattern is especially useful for interacting with actors from outside
22/// of the actor system, such as sending data from HTTP request to an actor
23/// and returning a future to the HTTP response, or using await.
24///
25/// # Examples
26///
27/// ```
28/// # use riker::actors::*;
29/// # use riker_patterns::ask::ask;
30/// # use futures::future::RemoteHandle;
31/// # use futures::executor::block_on;
32///
33/// #[derive(Default)]
34/// struct Reply;
35///
36/// impl Actor for Reply {
37///    type Msg = String;
38///
39///    fn recv(&mut self,
40///                 ctx: &Context<Self::Msg>,
41///                 msg: Self::Msg,
42///                 sender: Sender) {
43///         // reply to the temporary ask actor
44///         sender.as_ref().unwrap().try_tell(
45///             format!("Hello {}", msg), None
46///         ).unwrap();
47///     }
48/// }
49///
50/// // set up the actor system
51/// let sys = ActorSystem::new().unwrap();
52///
53/// // create instance of Reply actor
54/// let actor = sys.actor_of::<Reply>("reply").unwrap();
55///
56/// // ask the actor
57/// let msg = "Will Riker".to_string();
58/// let r: RemoteHandle<String> = ask(&sys, &actor, msg);
59///
60/// assert_eq!(block_on(r), "Hello Will Riker".to_string());
61/// ```
62
63pub fn ask<Msg, Ctx, R, T>(ctx: &Ctx, receiver: &T, msg: Msg) -> RemoteHandle<R>
64where
65    Msg: Message,
66    R: Message,
67    Ctx: TmpActorRefFactory + Run,
68    T: Tell<Msg>,
69{
70    let (tx, rx) = channel::<R>();
71    let tx = Arc::new(Mutex::new(Some(tx)));
72
73    let props = Props::new_from_args(Box::new(AskActor::new), tx);
74    let actor = ctx.tmp_actor_of_props(props).unwrap();
75    receiver.tell(msg, Some(actor.into()));
76
77    ctx.run(rx.map(|r| r.unwrap())).unwrap()
78}
79
80struct AskActor<Msg> {
81    tx: Arc<Mutex<Option<ChannelSender<Msg>>>>,
82}
83
84impl<Msg: Message> AskActor<Msg> {
85    fn new(tx: Arc<Mutex<Option<ChannelSender<Msg>>>>) -> BoxActor<Msg> {
86        let ask = AskActor { tx };
87        Box::new(ask)
88    }
89}
90
91impl<Msg: Message> Actor for AskActor<Msg> {
92    type Msg = Msg;
93
94    fn recv(&mut self, ctx: &Context<Msg>, msg: Msg, _: Sender) {
95        if let Ok(mut tx) = self.tx.lock() {
96            tx.take().unwrap().send(msg).unwrap();
97        }
98        ctx.stop(&ctx.myself);
99    }
100}