1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use std::sync::Arc;

use futures::{channel::mpsc::Sender, task::SpawnExt, SinkExt};

use crate::{
    actor::{MsgError, MsgResult},
    kernel::{
        mailbox::{AnySender, MailboxSchedule, MailboxSender},
        KernelMsg,
    },
    system::ActorSystem,
    AnyMessage, Envelope, Message,
};

#[derive(Clone)]
pub struct KernelRef {
    pub tx: Sender<KernelMsg>,
}

impl KernelRef {
    pub(crate) fn schedule(&self, sys: &ActorSystem) {
        self.send(KernelMsg::RunActor, sys);
    }

    pub(crate) fn restart(&self, sys: &ActorSystem) {
        self.send(KernelMsg::RestartActor, sys);
    }

    pub(crate) fn terminate(&self, sys: &ActorSystem) {
        self.send(KernelMsg::TerminateActor, sys);
    }

    pub(crate) fn sys_init(&self, sys: &ActorSystem) {
        self.send(KernelMsg::Sys(sys.clone()), sys);
    }

    fn send(&self, msg: KernelMsg, sys: &ActorSystem) {
        let mut tx = self.tx.clone();
        sys.exec
            .spawn(async move {
                drop(tx.send(msg).await);
            })
            .unwrap();
    }
}

pub fn dispatch<Msg>(
    msg: Envelope<Msg>,
    mbox: &MailboxSender<Msg>,
    kernel: &KernelRef,
    sys: &ActorSystem,
) -> MsgResult<Envelope<Msg>>
where
    Msg: Message,
{
    match mbox.try_enqueue(msg) {
        Ok(_) => {
            if !mbox.is_scheduled() {
                mbox.set_scheduled(true);
                kernel.schedule(sys);
            }

            Ok(())
        }
        Err(e) => Err(MsgError::new(e.msg)),
    }
}

pub fn dispatch_any(
    msg: &mut AnyMessage,
    sender: crate::actor::Sender,
    mbox: &Arc<dyn AnySender>,
    kernel: &KernelRef,
    sys: &ActorSystem,
) -> Result<(), ()> {
    match mbox.try_any_enqueue(msg, sender) {
        Ok(_) => {
            if !mbox.is_sched() {
                mbox.set_sched(true);
                kernel.schedule(sys);
            }

            Ok(())
        }
        Err(_) => Err(()),
    }
}

unsafe impl Send for KernelRef {}
unsafe impl Sync for KernelRef {}