seal_rs/actors/
default_dispatcher.rs

1//NOTOK
2use crate::executors::thread_pinned_executor::{ThreadPinnedExecutor, DistributionStrategy, TaskOptions};
3use crate::executors::executor::{Executor,ExecutorTask};
4use crate::actors::dispatcher::Dispatcher;
5use crate::actors::actor_cell::ActorCell;
6use crate::actors::local_actor_ref::LocalActorRef;
7use crate::actors::abstract_actor_ref::AbstractActorRef;
8use crate::actors::actor_context::ActorContext;
9use crate::actors::envelope::Envelope;
10use crate::actors::mailbox::Mailbox;
11use crate::actors::actor::{Actor, PoisonPill};
12use crate::common::tsafe::TSafe;
13use std::any::Any;
14
15
16//TODO нужно заменить все Mutex это возможно на RwLock
17pub struct DefaultDispatcher {
18    executor: ThreadPinnedExecutor,
19    rounds: usize
20}
21
22impl DefaultDispatcher {
23    pub fn new(_t_count: u32) -> DefaultDispatcher {
24        let executor = ThreadPinnedExecutor::new()
25            .set_distribution_strategy(DistributionStrategy::Load)
26            //.set_threads_count(1)
27            .run();
28        DefaultDispatcher {
29            executor,
30            rounds: 0
31        }
32    }
33
34    pub fn invoke(mailbox: &TSafe<Mailbox + Send>, actor: &TSafe<Actor + Send>, cell: &TSafe<ActorCell>) {
35        let envelope = {
36            let mut mailbox = mailbox.lock().unwrap();
37            if mailbox.has_messages() {
38                Some(mailbox.dequeue())
39            } else {
40                None
41            }
42        };
43
44        if envelope.is_some() {
45            let envelope = envelope.unwrap();
46
47            let sender: Box<AbstractActorRef + Send> = {
48                if envelope.sender.is_some() {
49                    envelope.sender.unwrap()
50                } else {
51                    let mut system = envelope.system.lock().unwrap();
52                    let dead_letters = system.dead_letters();
53                    dead_letters
54                }
55            };
56
57
58            let msg = envelope.message;
59
60            let mut actor = actor.lock().unwrap();
61            let ctx = ActorContext::new( sender.clone(), envelope.receiver.clone(), envelope.system.clone());
62            let handled = actor.receive(&msg, ctx);
63
64            if !handled {
65                let handled2 = DefaultDispatcher::internal_receive(mailbox, &msg, cell);
66                if !handled2 {
67                    let mut dead_letters = {
68                        let mut system = envelope.system.lock().unwrap();
69                        let dead_letters = system.dead_letters();
70                        dead_letters
71                    };
72                    dead_letters.cell().lock().unwrap().send(&dead_letters.cell(), msg, Some(sender), envelope.receiver );
73
74                }
75            }
76        }
77
78        mailbox.lock().unwrap().set_planned(false);
79    }
80
81    pub fn internal_receive(mailbox: &TSafe<Mailbox + Send>, msg: &Box<Any + Send>, cell: &TSafe<ActorCell>) -> bool {
82
83        if let Some(PoisonPill {}) = msg.downcast_ref::<PoisonPill>() {
84            let mut cell_u = cell.lock().unwrap();
85            cell_u.suspend();
86            // +++ cell.actor.timers().cancelAll();
87            let dead_letters = cell_u.system.lock().unwrap().dead_letters();
88            mailbox.lock().unwrap().clean_up(Box::new(LocalActorRef::new(cell.clone(), cell_u.path.clone())), dead_letters);
89            cell_u.stop(cell.clone());
90        } else {
91            return false
92        }
93
94        true
95    }
96}
97
98impl Executor for DefaultDispatcher {
99    fn execute(&mut self, f: ExecutorTask, options: Option<Box<Any>>) {
100        self.executor.execute(f, options)
101    }
102}
103
104impl Dispatcher for DefaultDispatcher {
105
106    fn dispatch(self: &mut Self, cell: TSafe<ActorCell>, bid: usize, mailbox: TSafe<Mailbox + Send>, actor: TSafe<Actor + Send>, envelope: Envelope) {
107        let mut mailbox_u = mailbox.lock().unwrap();
108        mailbox_u.enqueue(envelope);
109        //if !mailbox_u.is_planned() {
110            //mailbox_u.set_planned(true);
111
112            let mailbox = mailbox.clone();
113            let f = Box::new(move || {
114                DefaultDispatcher::invoke(&mailbox, &actor, &cell)
115            });
116
117            self.execute(f,  Some( Box::new(TaskOptions { thread_id: Some(bid) } )))
118        //}
119    }
120
121    fn obtain_bid(self: &mut Self) -> usize {
122        if self.rounds == 4 - 1 {
123            self.rounds = 0;
124        } else {
125            self.rounds = self.rounds + 1;
126        }
127
128        self.rounds
129    }
130}