seal_rs/actors/
default_dispatcher.rs1use crate::executors::thread_pinned_executor::{ThreadPinnedExecutor, DistributionStrategy, TaskOptions};
3use crate::executors::executor::{Executor,ExecutorTask};
4use crate::actors::dispatcher::Dispatcher;
5use crate::actors::actor_cell::ActorCell;
6use crate::actors::local_actor_ref::LocalActorRef;
7use crate::actors::abstract_actor_ref::AbstractActorRef;
8use crate::actors::actor_context::ActorContext;
9use crate::actors::envelope::Envelope;
10use crate::actors::mailbox::Mailbox;
11use crate::actors::actor::{Actor, PoisonPill};
12use crate::common::tsafe::TSafe;
13use std::any::Any;
14
15
16pub struct DefaultDispatcher {
18 executor: ThreadPinnedExecutor,
19 rounds: usize
20}
21
22impl DefaultDispatcher {
23 pub fn new(_t_count: u32) -> DefaultDispatcher {
24 let executor = ThreadPinnedExecutor::new()
25 .set_distribution_strategy(DistributionStrategy::Load)
26 .run();
28 DefaultDispatcher {
29 executor,
30 rounds: 0
31 }
32 }
33
34 pub fn invoke(mailbox: &TSafe<Mailbox + Send>, actor: &TSafe<Actor + Send>, cell: &TSafe<ActorCell>) {
35 let envelope = {
36 let mut mailbox = mailbox.lock().unwrap();
37 if mailbox.has_messages() {
38 Some(mailbox.dequeue())
39 } else {
40 None
41 }
42 };
43
44 if envelope.is_some() {
45 let envelope = envelope.unwrap();
46
47 let sender: Box<AbstractActorRef + Send> = {
48 if envelope.sender.is_some() {
49 envelope.sender.unwrap()
50 } else {
51 let mut system = envelope.system.lock().unwrap();
52 let dead_letters = system.dead_letters();
53 dead_letters
54 }
55 };
56
57
58 let msg = envelope.message;
59
60 let mut actor = actor.lock().unwrap();
61 let ctx = ActorContext::new( sender.clone(), envelope.receiver.clone(), envelope.system.clone());
62 let handled = actor.receive(&msg, ctx);
63
64 if !handled {
65 let handled2 = DefaultDispatcher::internal_receive(mailbox, &msg, cell);
66 if !handled2 {
67 let mut dead_letters = {
68 let mut system = envelope.system.lock().unwrap();
69 let dead_letters = system.dead_letters();
70 dead_letters
71 };
72 dead_letters.cell().lock().unwrap().send(&dead_letters.cell(), msg, Some(sender), envelope.receiver );
73
74 }
75 }
76 }
77
78 mailbox.lock().unwrap().set_planned(false);
79 }
80
81 pub fn internal_receive(mailbox: &TSafe<Mailbox + Send>, msg: &Box<Any + Send>, cell: &TSafe<ActorCell>) -> bool {
82
83 if let Some(PoisonPill {}) = msg.downcast_ref::<PoisonPill>() {
84 let mut cell_u = cell.lock().unwrap();
85 cell_u.suspend();
86 let dead_letters = cell_u.system.lock().unwrap().dead_letters();
88 mailbox.lock().unwrap().clean_up(Box::new(LocalActorRef::new(cell.clone(), cell_u.path.clone())), dead_letters);
89 cell_u.stop(cell.clone());
90 } else {
91 return false
92 }
93
94 true
95 }
96}
97
98impl Executor for DefaultDispatcher {
99 fn execute(&mut self, f: ExecutorTask, options: Option<Box<Any>>) {
100 self.executor.execute(f, options)
101 }
102}
103
104impl Dispatcher for DefaultDispatcher {
105
106 fn dispatch(self: &mut Self, cell: TSafe<ActorCell>, bid: usize, mailbox: TSafe<Mailbox + Send>, actor: TSafe<Actor + Send>, envelope: Envelope) {
107 let mut mailbox_u = mailbox.lock().unwrap();
108 mailbox_u.enqueue(envelope);
109 let mailbox = mailbox.clone();
113 let f = Box::new(move || {
114 DefaultDispatcher::invoke(&mailbox, &actor, &cell)
115 });
116
117 self.execute(f, Some( Box::new(TaskOptions { thread_id: Some(bid) } )))
118 }
120
121 fn obtain_bid(self: &mut Self) -> usize {
122 if self.rounds == 4 - 1 {
123 self.rounds = 0;
124 } else {
125 self.rounds = self.rounds + 1;
126 }
127
128 self.rounds
129 }
130}