1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
//! An actor interface to a dispatcher for reactive actor.
use rtactor_macros::{ResponseEnum, SyncRequester};
use std::boxed::Box;
use std::time::Duration;
use crate::ActiveMailbox;
use super::actor::Addr;
use super::reactive::Behavior;
use std::any::Any;
use std::cell::Cell;
pub trait Dispatcher {
#[deprecated = "use the better named addr()"]
fn own_addr(&self) -> Addr {
self.addr()
}
/// Return the actor address of the dispatcher itself.
fn addr(&self) -> Addr;
/// Migrate a reactive inside the dispatcher and return an actor address to it.
fn register_reactive(&mut self, behavior: Box<dyn Behavior>) -> Addr;
/// Replace an implementation of a reactive by a new one.
/// This only change the object that responds to process message.
/// This is thread safe but of course the management of new and old
/// inner states transition should be taken into account.
/// All copies of the address are still valid.
///
/// Return Ok(old_behavior) or Err(new_behavior)
fn replace_reactive(
&mut self,
addr: &Addr,
behavior: Box<dyn Behavior>,
) -> Result<Box<dyn Behavior>, Box<dyn Behavior>>;
/// Extract a reactive from the dispatcher by it address.
fn unregister_reactive(&mut self, addr: &Addr) -> Option<Box<dyn Behavior>>;
}
/// A function that can be executed by the dispatcher with a `dispatcher::Request::ExecuteFn`.
pub type ExecutableFn = dyn FnOnce(&mut dyn Dispatcher) -> Box<dyn Any + Send> + Send;
#[derive(ResponseEnum, SyncRequester)]
pub enum Request {
/// Register a reactive that is Send.
/// To register reactives that are not Send, do it in the spawn_dispatcher() function.
#[response_val(Addr)]
RegisterReactive {
behavior: Cell<Option<Box<dyn Behavior + Send>>>,
},
// TODO deprecated because it make no sense. "#[deprecated]" is not used because it create a
// warning probably due to the ResponseEnum proc_macro
#[response_val(Box<dyn Behavior + Send>)]
StopReactive { addr: Addr },
/// Stop the dispatch of message and return from process().
///
/// All Request not processed are responded with a Error::ActorDisappeared. Then
/// all registered behavior are destroyed. Finally a Response::StopDispatcher is sent.
#[response_val()]
StopDispatcher {},
/// Execute a FnOnce inside the dispatcher thread.
/// The Response returns the return value of the FnOnce.
/// The Cell is necessary to allow FnOnce in the implementation.
#[response_val(Box<dyn Any + Send>)]
ExecuteFn {
executable_fn: Cell<Box<ExecutableFn>>,
},
}
/// An accessor for a dispatcher with specialized functions.
pub struct SyncAccessor {
mailbox: ActiveMailbox,
disp_addr: Addr,
}
impl SyncAccessor {
/// Create an accessor from a dispatcher address.
pub fn new(disp_addr: &Addr) -> Self {
Self {
mailbox: ActiveMailbox::new(1),
disp_addr: disp_addr.clone(),
}
}
/// Build from an existing dispatcher.
#[deprecated = "use new() instead and do join() manually if needed"]
pub fn from_existing_dispatcher(
disp_addr: Addr,
_join_handle: std::thread::JoinHandle<()>,
) -> Self {
Self {
mailbox: ActiveMailbox::new(1),
disp_addr,
}
}
/// Return the dispatcher address.
pub fn dispatcher_addr(&self) -> &Addr {
&self.disp_addr
}
/// Register a reactive that will be moved in a Box into the dispatcher.
///
/// If the Behavior is not Send, the pattern to use is to use execute_fn()
/// and to construct the behavior implementation in f.
pub fn register_reactive_for<T>(
&mut self,
reactive: T,
timeout: Duration,
) -> Result<Addr, crate::Error>
where
T: Behavior + Send + 'static,
{
let result = self.mailbox.request_for::<_, Response>(
&self.disp_addr,
Request::RegisterReactive {
behavior: Cell::new(Some(Box::new(reactive))),
},
timeout,
);
match result {
Ok(response) => {
if let Response::RegisterReactive(addr) = response {
Ok(addr)
} else {
Err(crate::Error::DowncastFailed)
}
}
Err(e) => Err(e),
}
}
/// Replace a reactive and drop the older one in the dispatcher thread.
///
/// Because Behavior are not Send in the dispatcher, ones as to use execute_fn()
/// replace_reactive() to get the old behavior.
pub fn replace_reactive_for<T>(
&mut self,
addr: &Addr,
reactive: T,
timeout: Duration,
) -> Result<(), crate::Error>
where
T: Behavior + Send + 'static,
{
let boxed_reactive = Box::new(reactive);
let addr = addr.clone();
match self.execute_fn(
move |disp| match disp.replace_reactive(&addr, boxed_reactive) {
Ok(_) => Ok(()),
Err(_) => Err(()),
},
timeout,
) {
Ok(Ok(())) => Ok(()),
Ok(Err(())) => Err(crate::Error::ActorDisappeared),
Err(e) => Err(e),
}
}
/// Same as register_reactive_for(reactive, Duration::from_secs(60)) that panics if something goes wrong.
/// To be used with a valid accessor.
pub fn register_reactive_unwrap<T>(&mut self, reactive: T) -> Addr
where
T: Behavior + Send + 'static,
{
match self.register_reactive_for(
reactive,
// One minute reaction time is way beyond a not broken system.
Duration::from_secs(60),
) {
Ok(addr) => {
if addr.is_valid() {
addr
} else {
panic!("register_reactive_unwrap() failed because addr was INVALID");
}
}
Err(e) => panic!("register_reactive_unwrap() failed with error {:?}", e),
}
}
/// Same as replace_reactive_for(addr, reactive, Duration::from_secs(60)) that panics if something goes wrong.
/// To be used with a valid accessor.
pub fn replace_reactive_unwrap<T>(&mut self, addr: &Addr, reactive: T)
where
T: Behavior + Send + 'static,
{
match self.replace_reactive_for(
addr,
reactive,
// One minute reaction time is way beyond a not broken system.
Duration::from_secs(60),
) {
Ok(()) => {}
Err(e) => panic!("replace_reactive_unwrap() failed with error {:?}", e),
}
}
/// Execute a function inside the thread of the dispatcher.
/// Useful to add to register to the dispatcher actor that are not Send.
pub fn execute_fn<F, T>(&mut self, f: F, timeout: Duration) -> Result<T, crate::Error>
where
F: FnOnce(&mut dyn Dispatcher) -> T + Send + 'static,
T: Send + 'static + Sized,
{
let result = self.mailbox.request_for::<_, Box<dyn Any + Send>>(
&self.disp_addr,
Request::ExecuteFn {
executable_fn: Cell::new(Box::new(move |disp| Box::new((f)(disp)))),
},
timeout,
);
match result {
Ok(boxed) => match boxed.downcast::<T>() {
Ok(t) => Ok(*t),
Err(_) => Err(crate::Error::DowncastFailed),
},
Err(_) => todo!(),
}
}
/// Join the thread of the dispatcher that must have been stopped by other means.
/// Return the result of join().
#[deprecated = "do join() manually if needed"]
pub fn join_dispatcher_thread(&mut self) -> Result<(), Box<(dyn Any + Send + 'static)>> {
Ok(())
}
/// Stop the dispatcher.
/// Return the result from the request_for.
pub fn stop_dispatcher(&mut self, timeout: Duration) -> Result<(), crate::Error> {
if !self.disp_addr.is_valid() {
return Ok(());
}
match self.mailbox.request_for::<_, Response>(
&self.disp_addr,
Request::StopDispatcher {},
timeout,
) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
}