//! Channel-backed handle used to send requests to the kernel.
use std::sync::mpsc::{channel, Sender};

use futures::{Future, FutureExt};
use futures::future::RemoteHandle;

use crate::protocol::{Message, Envelope, SystemEnvelope, Enqueued};
use crate::actor::{BoxActor, ActorRef, ActorId, BoxActorProd};
use crate::actor::{CreateError, MsgError, MsgResult};
use crate::kernel::{KernelMsg, MailboxSender, MailboxSchedule};
use crate::system::Job;

use self::KernelMsg::{CreateActor, RestartActor, TerminateActor};
use self::KernelMsg::{ParkActor, UnparkActor, Stop, RunFuture};

/// Cloneable handle holding the sending halves of two channels: one carrying
/// [`KernelMsg`] values and one carrying [`Job`] values.
#[derive(Clone)]
pub struct KernelRef<Msg>
where
    Msg: Message,
{
    /// Sender for `KernelMsg` requests.
    pub kernel_tx: Sender<KernelMsg<Msg>>,
    /// Sender for `Job` values.
    pub timer_tx: Sender<Job<Msg>>,
}

impl<Msg> KernelRef<Msg>
    where Msg: Message
{
    pub fn dispatch(&self,
                    msg: Envelope<Msg>,
                    mbox: &MailboxSender<Msg>) -> MsgResult<Envelope<Msg>> {
        let msg = Enqueued::ActorMsg(msg);

        match mbox.try_enqueue(msg) {
            Ok(_) => {
                if !mbox.is_scheduled() {
                    self.schedule_actor(mbox);
                }
                
                Ok(())
            }
            Err(e) => Err(MsgError::new(e.msg.into()))
        }
    }

    pub fn dispatch_sys(&self,
                        msg: SystemEnvelope<Msg>,
                        mbox: &MailboxSender<Msg>) -> MsgResult<SystemEnvelope<Msg>> {
        let msg = Enqueued::SystemMsg(msg);
        match mbox.try_sys_enqueue(msg) {
            Ok(_) => {
                if !mbox.is_scheduled() {
                    self.schedule_actor(mbox);
                }
                Ok(())
            }
            Err(e) => Err(MsgError::new(e.msg.into()))
        }
    }

    pub fn schedule_actor<Mbs>(&self, mbox: &Mbs)
        where Mbs: MailboxSchedule<Msg=Msg>
    {
        mbox.set_scheduled(true);
        send(UnparkActor(mbox.uid()), &self.kernel_tx);
    }

    pub fn create_actor(&self,
                        props: BoxActorProd<Msg>,
                        name: &str,
                        parent: &ActorRef<Msg>) -> Result<ActorRef<Msg>, CreateError> {
        let (tx, rx) = channel();
        let msg = CreateActor(props,
                                name.to_string(),
                                parent.clone(),
                                tx);
        send(msg, &self.kernel_tx);

        rx.recv().unwrap()
    }

    pub fn park_actor(&self, uid: ActorId, actor: Option<BoxActor<Msg>>) {
        send(ParkActor(uid, actor), &self.kernel_tx);
    }

    pub fn terminate_actor(&self, uid: ActorId) {
        send(TerminateActor(uid), &self.kernel_tx);
    }

    pub fn restart_actor(&self, uid: ActorId) {
        send(RestartActor(uid), &self.kernel_tx);
    }

    pub fn stop_kernel(&self) {
        send(Stop, &self.kernel_tx);
    }

    pub fn execute<F>(&self, f: F) -> RemoteHandle<F::Output>
        where F: Future + Send + 'static,
                <F as Future>::Output: std::marker::Send
    {
        let (r, rh) = f.remote_handle();
        send(RunFuture(r.boxed()), &self.kernel_tx);
        rh
    }

    pub fn schedule(&self, job: Job<Msg>) {
        drop(self.timer_tx.send(job))
    }
}

/// Best-effort send of `msg` on `tx`.
///
/// The `Result` of the channel send is deliberately discarded, so a closed
/// channel is not reported to the caller.
fn send<Msg>(msg: KernelMsg<Msg>, tx: &Sender<KernelMsg<Msg>>)
where
    Msg: Message,
{
    let _ = tx.send(msg);
}

// SAFETY / NOTE(review): these impls assert unconditionally that `KernelRef`
// may be moved to and shared between threads. Nothing in this file proves
// that invariant; confirm it holds for every `Msg` before relying on it.
unsafe impl<Msg> Send for KernelRef<Msg> where Msg: Message {}
unsafe impl<Msg> Sync for KernelRef<Msg> where Msg: Message {}

// Temporary escape hatch for getting the original message back out of the
// `Enqueued` wrapper. The mailbox and queue functions should instead be
// refactored so that they do not require `Enqueued`.
//
// Implemented as `From` so that both `Envelope::from(..)` and `.into()` work;
// the standard blanket impl supplies `Into`.
impl<Msg: Message> From<Enqueued<Msg>> for Envelope<Msg> {
    /// Extracts the envelope carried by `Enqueued::ActorMsg`.
    ///
    /// # Panics
    ///
    /// Panics if `enqueued` is any variant other than `Enqueued::ActorMsg`.
    fn from(enqueued: Enqueued<Msg>) -> Envelope<Msg> {
        match enqueued {
            Enqueued::ActorMsg(envelope) => envelope,
            _ => panic!("expected Enqueued::ActorMsg, found a different Enqueued variant"),
        }
    }
}

// Temporary escape hatch for getting the original system message back out of
// the `Enqueued` wrapper. The mailbox and queue functions should instead be
// refactored so that they do not require `Enqueued`.
//
// Implemented as `From` so that both `SystemEnvelope::from(..)` and `.into()`
// work; the standard blanket impl supplies `Into`.
impl<Msg: Message> From<Enqueued<Msg>> for SystemEnvelope<Msg> {
    /// Extracts the envelope carried by `Enqueued::SystemMsg`.
    ///
    /// # Panics
    ///
    /// Panics if `enqueued` is any variant other than `Enqueued::SystemMsg`.
    fn from(enqueued: Enqueued<Msg>) -> SystemEnvelope<Msg> {
        match enqueued {
            Enqueued::SystemMsg(envelope) => envelope,
            _ => panic!("expected Enqueued::SystemMsg, found a different Enqueued variant"),
        }
    }
}