1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//! Actor dispatcher with strategy - dedicated thread per actor
use crate::executors::thread_pinned_executor::{ThreadPinnedExecutor, DistributionStrategy};
use crate::executors::executor::{Executor, ExecutorTask};
use crate::actors::dispatcher::Dispatcher;
use crate::actors::actor_cell::ActorCell;
use crate::actors::local_actor_ref::LocalActorRef;
use crate::actors::abstract_actor_ref::AbstractActorRef;
use crate::actors::actor_context::ActorContext;
use crate::actors::envelope::Envelope;
use crate::actors::mailbox::Mailbox;
use crate::actors::actor::{Actor, PoisonPill};
use crate::actors::message::Message;
use crate::common::tsafe::TSafe;
use std::any::Any;


/// Dispatcher that runs actor mailbox processing on its own `ThreadPinnedExecutor`.
pub struct PinnedDispatcher {
    /// Executor to which this dispatcher submits its tasks (see `Executor::execute`).
    executor: ThreadPinnedExecutor,
    // NOTE(review): initialised to 0 in `new` and never read or written in the
    // code visible in this file - confirm whether this field is still needed.
    rounds: usize
}

impl PinnedDispatcher {
    pub fn new() -> PinnedDispatcher {
        let executor = ThreadPinnedExecutor::new()
            .set_distribution_strategy(DistributionStrategy::EventLoop)
            .set_threads_count(1)
            .run();
        PinnedDispatcher {
            executor,
            rounds: 0
        }
    }

    pub fn invoke(mailbox: &TSafe<Mailbox + Send>, actor: &TSafe<Actor + Send>, cell: &TSafe<ActorCell>) {
        while true {
            let envelope = {
                let mut mailbox = mailbox.lock().unwrap();
                if mailbox.has_messages() {
                    Some(mailbox.dequeue())
                } else {
                    mailbox.set_planned(false);
                    break;
                }
            };

            if envelope.is_some() {
                let envelope = envelope.unwrap();

                let sender: Box<AbstractActorRef + Send> = {
                    if envelope.sender.is_some() {
                        envelope.sender.unwrap()
                    } else {
                        let mut system = envelope.system.lock().unwrap();
                        let dead_letters = system.dead_letters();
                        dead_letters
                    }
                };


                let msg = envelope.message;

                let handled = {
                    let mut actor = actor.lock().unwrap();
                    let ctx = ActorContext::new(
                        sender.clone(),
                        envelope.receiver.clone(),
                        envelope.system.clone());
                    let im = actor.receive(msg.clone(), ctx);
                    if im.is_ok() {
                        im.ok().unwrap()
                    } else {
                        false
                    }
                };

                if !handled {
                    let handled2 = PinnedDispatcher::internal_receive(mailbox, msg.clone(), cell);
                    if !handled2 {
                        let mut dead_letters = {
                            let mut system = envelope.system.lock().unwrap();
                            let dead_letters = system.dead_letters();
                            dead_letters
                        };
                        dead_letters.cell().lock().unwrap().send(&dead_letters.cell(), msg, Some(sender), envelope.receiver );

                    }
                }
            }
        }
    }

    pub fn internal_receive(mailbox: &TSafe<Mailbox + Send>, msg: Message, cell: &TSafe<ActorCell>) -> bool {

        if let Some(PoisonPill {}) = msg.get().downcast_ref::<PoisonPill>() {
            let mut cell_u = cell.lock().unwrap();
            cell_u.suspend();
            // +++ cell.actor.timers().cancelAll();
            let dead_letters = cell_u.system.lock().unwrap().dead_letters();
            mailbox.lock().unwrap().clean_up(Box::new(LocalActorRef::new(cell.clone(), cell_u.path.clone())), dead_letters);
            cell_u.stop(cell.clone());
            cell_u.dispatcher.lock().unwrap().stop();
        } else {
            return false
        }

        true
    }
}

/// `Executor` implementation that forwards every call to the wrapped
/// `ThreadPinnedExecutor`.
impl Executor for PinnedDispatcher {
    /// Passes the task `f` and `options` unchanged to the inner executor.
    fn execute(&mut self, f: ExecutorTask, options: Option<Box<Any>>) {
        self.executor.execute(f, options)
    }

    /// Calls `stop` on the inner executor.
    fn stop(&mut self) {
        self.executor.stop();
    }
}

impl Dispatcher for PinnedDispatcher {

    /// Enqueues `envelope` into `mailbox` and, if no processing run is
    /// currently planned for that mailbox, submits one to the executor.
    ///
    /// The mailbox's "planned" flag is checked and set under the mailbox lock,
    /// so of several concurrent callers only the one that flips the flag
    /// submits a `PinnedDispatcher::invoke` task. `_bid` is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the mailbox mutex is poisoned.
    fn dispatch(self: &mut Self, cell: TSafe<ActorCell>, _bid: usize, mailbox: TSafe<Mailbox + Send>, actor: TSafe<Actor + Send>, envelope: Envelope) {
        // The lock is released before the task is handed to the executor:
        // `invoke` locks the same mailbox, so holding the guard across
        // `execute` would make the worker wait on this call (or deadlock if
        // the task were ever run on the calling thread).
        let needs_scheduling = {
            let mut mailbox_u = mailbox.lock().unwrap();
            mailbox_u.enqueue(envelope);
            if mailbox_u.is_planned() {
                false
            } else {
                mailbox_u.set_planned(true);
                true
            }
        };

        if needs_scheduling {
            // With the guard gone, the handles can be moved into the task
            // directly; no extra clone of `mailbox` is required.
            let f = Box::new(move || {
                PinnedDispatcher::invoke(&mailbox, &actor, &cell)
            });

            self.execute(f, None);
        }
    }

    /// Always returns 0.
    fn obtain_bid(self: &mut Self) -> usize {
        0
    }
}

//impl Drop for PinnedDispatcher {
//    fn drop(&mut self) {
//        println!("PinnedDispatcher dropped")
//    }
//}