1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#![allow(dead_code)]

use std::sync::{Arc, Mutex};

use futures::channel::oneshot::{channel, Sender, Receiver, Canceled};
use futures::{Future, Poll, task};

use riker::actors::*;

/// Sends `msg` to `receiver` and returns a boxed future for the reply.
///
/// Builds an [`Ask`] from `ctx`, `receiver` and `msg`, passes that future to
/// `ctx.execute`, and boxes whatever `execute` returns.
///
/// The returned future yields a `Msg` on success and `Canceled` on failure.
/// NOTE(review): presumably `execute` schedules the future on the context's
/// executor and returns a handle to its result; confirm against
/// `ExecutionContext`.
///
/// # Panics
///
/// Panics if `Ask::new` panics, which happens when the result of
/// `ctx.tmp_actor_of` cannot be unwrapped.
pub fn ask<Msg, Ctx, T>(ctx: &Ctx, receiver: &T, msg: Msg)
                        -> Box<Future<Item=Msg, Error=Canceled> + Send>
    where Msg: Message,
            Ctx: TmpActorRefFactory<Msg=Msg> + ExecutionContext,
            T: Tell<Msg=Msg>
{
    // `receiver` is already a `&T`; cloning it would only copy the reference,
    // so it is forwarded as-is.
    let pending = Ask::new(ctx, receiver, msg);
    Box::new(ctx.execute(pending))
}

/// Future half of an ask: wraps the oneshot `Receiver` that yields the reply.
///
/// Created by `Ask::new`; polling an `Ask` polls the wrapped receiver.
pub struct Ask<Msg: Message> {
    /// Receiving end of the oneshot channel whose sender is handed to the
    /// temporary reply actor.
    inner: Receiver<Msg>,
}

impl<Msg: Message> Ask<Msg> {
    /// Tells `receiver` the message `msg` and returns an `Ask` that resolves
    /// with the reply.
    ///
    /// Steps, in order:
    /// 1. open a oneshot channel for the reply;
    /// 2. create a temporary actor through `ctx.tmp_actor_of`, built by
    ///    `AskActor::new` around the channel's sending half;
    /// 3. call `receiver.tell(msg, ..)` with that temporary actor as the
    ///    second argument (presumably the sender to reply to; confirm against
    ///    `Tell`).
    ///
    /// # Panics
    ///
    /// Panics if the result of `ctx.tmp_actor_of` cannot be unwrapped.
    pub fn new<Ctx, T>(ctx: &Ctx, receiver: &T, msg: Msg) -> Ask<Msg>
        where Ctx: TmpActorRefFactory<Msg=Msg>, T: Tell<Msg=Msg>
    {
        let (reply_tx, reply_rx) = channel::<Msg>();

        // The sender sits in an `Option` so the actor can later move it out
        // from behind `&mut self`; `Sender::send` consumes the sender.
        let shared_tx = Arc::new(Mutex::new(Some(reply_tx)));

        let reply_to = ctx
            .tmp_actor_of(Props::new_args(Box::new(AskActor::new), shared_tx))
            .unwrap();
        receiver.tell(msg, Some(reply_to));

        Ask { inner: reply_rx }
    }
}

impl<Msg: Message> Future for Ask<Msg> {
    type Item = Msg;
    type Error = Canceled;

    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
        self.inner.poll(cx)
    }
}

/// Actor that forwards a message it receives into a oneshot channel.
///
/// `Ask::new` creates one as a temporary actor and passes it along with the
/// outgoing message.
struct AskActor<Msg> {
    /// Shared slot holding the channel's sending half; `receive` takes the
    /// sender out of the slot and sends the received message through it.
    tx: Arc<Mutex<Option<Sender<Msg>>>>,
}

impl<Msg: Message> AskActor<Msg> {
    /// Builds a boxed `AskActor` around the shared sender slot `tx`.
    ///
    /// Returns a `BoxActor<Msg>`; `Ask::new` passes this function to
    /// `Props::new_args` as the actor factory.
    fn new(tx: Arc<Mutex<Option<Sender<Msg>>>>) -> BoxActor<Msg> {
        Box::new(AskActor { tx })
    }
}

impl<Msg: Message> Actor for AskActor<Msg> {
    type Msg = Msg;

    /// Forwards the received message into the oneshot channel, then asks the
    /// context to stop this actor.
    ///
    /// Delivery is best-effort and never panics:
    /// - if the sender slot is already empty (a message was forwarded
    ///   earlier), the extra message is dropped;
    /// - if the receiving half of the channel is gone (the asking future was
    ///   dropped), the message is dropped;
    /// - if the mutex is poisoned, nothing is sent.
    ///
    /// In every case `ctx.stop(&ctx.myself)` is still called.
    fn receive(&mut self, ctx: &Context<Msg>, msg: Msg, _: Option<ActorRef<Msg>>) {
        if let Ok(mut slot) = self.tx.lock() {
            // `take` leaves `None` behind, so only the first message is sent.
            if let Some(sender) = slot.take() {
                // `send` returns the message back as `Err` when nobody is
                // listening any more; there is no one left to notify, so the
                // message is intentionally discarded.
                let _ = sender.send(msg);
            }
        }

        ctx.stop(&ctx.myself);
    }
}