1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use riker::actors::*;

use std::sync::{Arc, Mutex};

use futures::{
    channel::oneshot::{channel, Sender as ChannelSender},
    future::RemoteHandle,
    FutureExt,
};

// riker ask pattern
pub fn ask<Msg, Ctx, R, T>(ctx: &Ctx, receiver: &T, msg: Msg) -> RemoteHandle<R>
where
    Msg: Message,
    R: Message,
    Ctx: TmpActorRefFactory + Run,
    T: Tell<Msg>,
{
    let (tx, rx) = channel::<R>();
    let tx = Arc::new(Mutex::new(Some(tx)));

    let props = Props::new_from_args(Box::new(AskActor::boxed), tx);
    let actor = ctx.tmp_actor_of_props(props).unwrap();
    receiver.tell(msg, Some(actor.into()));

    ctx.run(rx.map(|r| r.unwrap())).unwrap()
}

struct AskActor<Msg> {
    tx: Arc<Mutex<Option<ChannelSender<Msg>>>>,
}

impl<Msg: Message> AskActor<Msg> {
    fn boxed(tx: Arc<Mutex<Option<ChannelSender<Msg>>>>) -> BoxActor<Msg> {
        let ask = AskActor { tx };
        Box::new(ask)
    }
}

impl<Msg: Message> Actor for AskActor<Msg> {
    type Msg = Msg;

    fn recv(&mut self, ctx: &Context<Msg>, msg: Msg, _: Sender) {
        if let Ok(mut tx) = self.tx.lock() {
            tx.take().unwrap().send(msg).unwrap();
        }
        ctx.stop(&ctx.myself);
    }
}