riker_patterns/ask.rs
1#![allow(dead_code)]
2
3use std::sync::{Arc, Mutex};
4
5use futures::channel::oneshot::{channel, Sender as ChannelSender};
6use futures::future::RemoteHandle;
7use futures::FutureExt;
8
9use riker::actors::*;
10
11/// Convenience fuction to send and receive a message from an actor
12///
13/// This function sends a message `msg` to the provided actor `receiver`
14/// and returns a `Future` which will be completed when `receiver` replies
15/// by sending a message to the `sender`. The sender is a temporary actor
16/// that fulfills the `Future` upon receiving the reply.
17///
18/// `futures::future::RemoteHandle` is the future returned and the task
19/// is executed on the provided executor `ctx`.
20///
21/// This pattern is especially useful for interacting with actors from outside
22/// of the actor system, such as sending data from HTTP request to an actor
23/// and returning a future to the HTTP response, or using await.
24///
25/// # Examples
26///
27/// ```
28/// # use riker::actors::*;
29/// # use riker_patterns::ask::ask;
30/// # use futures::future::RemoteHandle;
31/// # use futures::executor::block_on;
32///
33/// #[derive(Default)]
34/// struct Reply;
35///
36/// impl Actor for Reply {
37/// type Msg = String;
38///
39/// fn recv(&mut self,
40/// ctx: &Context<Self::Msg>,
41/// msg: Self::Msg,
42/// sender: Sender) {
43/// // reply to the temporary ask actor
44/// sender.as_ref().unwrap().try_tell(
45/// format!("Hello {}", msg), None
46/// ).unwrap();
47/// }
48/// }
49///
50/// // set up the actor system
51/// let sys = ActorSystem::new().unwrap();
52///
53/// // create instance of Reply actor
54/// let actor = sys.actor_of::<Reply>("reply").unwrap();
55///
56/// // ask the actor
57/// let msg = "Will Riker".to_string();
58/// let r: RemoteHandle<String> = ask(&sys, &actor, msg);
59///
60/// assert_eq!(block_on(r), "Hello Will Riker".to_string());
61/// ```
62
63pub fn ask<Msg, Ctx, R, T>(ctx: &Ctx, receiver: &T, msg: Msg) -> RemoteHandle<R>
64where
65 Msg: Message,
66 R: Message,
67 Ctx: TmpActorRefFactory + Run,
68 T: Tell<Msg>,
69{
70 let (tx, rx) = channel::<R>();
71 let tx = Arc::new(Mutex::new(Some(tx)));
72
73 let props = Props::new_from_args(Box::new(AskActor::new), tx);
74 let actor = ctx.tmp_actor_of_props(props).unwrap();
75 receiver.tell(msg, Some(actor.into()));
76
77 ctx.run(rx.map(|r| r.unwrap())).unwrap()
78}
79
80struct AskActor<Msg> {
81 tx: Arc<Mutex<Option<ChannelSender<Msg>>>>,
82}
83
84impl<Msg: Message> AskActor<Msg> {
85 fn new(tx: Arc<Mutex<Option<ChannelSender<Msg>>>>) -> BoxActor<Msg> {
86 let ask = AskActor { tx };
87 Box::new(ask)
88 }
89}
90
91impl<Msg: Message> Actor for AskActor<Msg> {
92 type Msg = Msg;
93
94 fn recv(&mut self, ctx: &Context<Msg>, msg: Msg, _: Sender) {
95 if let Ok(mut tx) = self.tx.lock() {
96 tx.take().unwrap().send(msg).unwrap();
97 }
98 ctx.stop(&ctx.myself);
99 }
100}