1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
/// Example code provided in the top level README.md.
use std::{cell::RefCell, time::Duration};
use rtactor as rt;
pub mod can_controller {
use rtactor::rtactor_macros::ResponseEnum;
#[derive(Debug)]
pub enum Error {
SendFailed,
InvalidId,
InvalidLength,
}
#[derive(ResponseEnum)]
pub enum Request {
#[response_val(Result<(), Error>)]
SendMessage { id: u32, length: u8, data: [i16; 4] },
}
}
pub mod acceleration_broadcaster {
use rtactor as rt;
use rt::{
define_sim_sync_accessor, define_sync_accessor,
rtactor_macros::{ResponseEnum, SyncNotifier, SyncRequester},
};
#[derive(ResponseEnum, SyncRequester)]
pub enum Request {
Start { can_controller_addr: rt::Addr },
Stop {},
}
#[derive(SyncNotifier)]
pub enum Notification {
AccelerationSample { acceleration: [i16; 3] },
}
define_sim_sync_accessor!(SimSyncAccessor, SyncRequester, SyncNotifier);
define_sync_accessor!(SyncAccessor, SyncRequester, SyncNotifier);
}
pub mod simple_acceleration_broadcaster {
use rtactor as rt;
use std::time::Duration;
use crate::{acceleration_broadcaster, can_controller};
enum State {
Stopped,
WaitSamples,
SendingCanMessage { request_id: rt::RequestId },
}
pub struct Broadcaster {
can_controller_addr: rt::Addr,
sample_count: u32,
acceleration_sum: [i32; 3],
state: State,
timer: rt::Timer,
}
const BROADCAST_PERIOD: Duration = Duration::from_millis(500);
const CAN_ACCELERATION_ID: u32 = 0x1000;
impl Broadcaster {
pub fn new() -> Broadcaster {
Broadcaster {
can_controller_addr: rt::Addr::INVALID,
sample_count: 0,
acceleration_sum: [0; 3],
state: State::Stopped,
timer: rt::Timer::new(),
}
}
fn reset_sum(&mut self) {
for a in &mut self.acceleration_sum {
*a = 0;
}
self.sample_count = 0;
}
}
impl Default for Broadcaster {
fn default() -> Self {
Self::new()
}
}
impl rt::Behavior for Broadcaster {
fn process_message(&mut self, context: &mut rt::ProcessContext, msg: &rt::Message) {
match msg {
rt::Message::Request(request) => {
if let Some(req_data) = request.data.downcast_ref() {
self.process_broadcaster_request(context, request, req_data);
}
}
rt::Message::Response(response) => {
if let Ok(resp_data) = response.result.as_ref() {
if let Some(resp_data) = resp_data.downcast_ref() {
self.process_can_controller_response(response, resp_data);
}
}
}
rt::Message::Notification(notification) => {
if let Some(notif_data) = notification.data.downcast_ref() {
self.process_accelerometer_notification(notif_data);
} else if self.timer.is_scheduling(notification) {
self.process_timer_elapsed(context);
}
}
}
}
}
impl Broadcaster {
fn process_broadcaster_request(
&mut self,
context: &mut rt::ProcessContext,
request: &rt::Request,
data: &acceleration_broadcaster::Request,
) {
match data {
acceleration_broadcaster::Request::Start {
can_controller_addr,
} => {
if let State::Stopped = self.state {
self.can_controller_addr = can_controller_addr.clone();
context.schedule_for(&mut self.timer, BROADCAST_PERIOD);
self.state = State::WaitSamples;
self.reset_sum();
}
context.send_response(request, acceleration_broadcaster::Response::Start());
}
acceleration_broadcaster::Request::Stop {} => {
if let State::Stopped = self.state {
} else {
context.unschedule(&mut self.timer);
self.state = State::Stopped;
self.reset_sum();
}
context.send_response(request, acceleration_broadcaster::Response::Stop());
}
}
}
fn process_accelerometer_notification(
&mut self,
data: &acceleration_broadcaster::Notification,
) {
let acceleration_broadcaster::Notification::AccelerationSample { acceleration } = data;
for (i, a) in acceleration.iter().enumerate() {
self.acceleration_sum[i] += *a as i32;
}
self.sample_count += 1;
}
fn process_timer_elapsed(&mut self, context: &mut rt::ProcessContext) {
if let State::WaitSamples = self.state {
if self.sample_count > 0 {
let mut data = [0i16; 4];
for (i, a_sum) in self.acceleration_sum.into_iter().enumerate() {
let a = a_sum / (self.sample_count as i32);
data[i] = a as i16;
}
let request_id = context.send_request(
&self.can_controller_addr,
can_controller::Request::SendMessage {
id: CAN_ACCELERATION_ID,
length: 3,
data,
},
);
self.state = State::SendingCanMessage { request_id };
}
}
self.reset_sum();
context.schedule_for(&mut self.timer, BROADCAST_PERIOD);
}
fn process_can_controller_response(
&mut self,
response: &rt::Response,
data: &can_controller::Response,
) {
let can_controller::Response::SendMessage(result) = data;
if let State::SendingCanMessage { request_id } = self.state {
if response.id_eq(request_id) {
if let Err(error) = result {
println!("can error: {error:?}");
}
self.state = State::WaitSamples;
}
}
}
}
}
#[test]
fn test_broadcaster_simulated() {
const ACCELEROMETER_T_SAMPLE: Duration = Duration::from_millis(50);
// Create the simulation dispatcher. To use "SimSyncAccessor" it
// has to be wrapper in a RefCell.
let disp = RefCell::new(rt::simulation::SimulationDispatcher::new(10));
// Create the broadcaster actor in a Box. This allows to move it
// in the dispatcher with `register_reactive`. The actor is now
// managed by the dispatcher. The address of the broadcaster is returned.
let broadcaster_addr = disp
.borrow_mut()
.register_reactive(Box::new(simple_acceleration_broadcaster::Broadcaster::new()));
// Create a struct that allows to access synchronously to the broadcaster
// request and notification. This makes writing tests much less verbose.
let mut broadcaster_accessor =
acceleration_broadcaster::SimSyncAccessor::new(&disp, &broadcaster_addr);
// Create an active actor to simulate the CAN controller. Here the
// actor interface is used to fake the behavior. It is also possible to
// use mock created with mock libs like `mockall`.
let mut fake_can = rt::ActiveMailbox::new(10);
// It is necessary to bring these trait to the scope to use them.
use crate::acceleration_broadcaster::{SyncNotifier, SyncRequester};
let begin = disp.borrow_mut().now();
// Use the accessor to start the broadcaster. It is expected that
// the execution is without delay. This is possible because in the simulation
// the CPU processing take no time.
broadcaster_accessor
.start(fake_can.addr(), Duration::ZERO)
.unwrap();
// Make the simulation advance half the accelerometer sampling rate
// so the can message is guaranteed to be received in the test bellow.
// Any queued message is processed in `process_for` (but in this case none should be).
disp.borrow_mut().process_for(ACCELEROMETER_T_SAMPLE / 2);
for i in 0..3 {
for j in 0..10 {
println!(
"i={i}, j={j} {:?}",
disp.borrow_mut().now().saturating_sub(&begin)
);
// Use the accessor to send the accelerometer notification.
broadcaster_accessor
.acceleration_sample([-1000, 2000, 3000])
.unwrap();
disp.borrow_mut().process_for(ACCELEROMETER_T_SAMPLE);
}
// It is very important to use the method `active_wait_message*` of the dispatcher
// when working with active actors. This insures processing of the queued messages
// in the dispatcher and use of the simulated time. It's not the case if
// `ActiveMailbox::wait_message*` methods of `ActiveMailbox` are used.
let msg = disp
.borrow_mut()
.active_wait_message_for(&mut fake_can, Duration::ZERO)
.unwrap();
if let rt::Message::Request(request) = msg {
if let Some(can_controller::Request::SendMessage { id, length, data }) =
request.data.downcast_ref()
{
assert_eq!(*id, 0x1000);
assert_eq!(*length, 3);
assert_eq!(data[0], -1000);
assert_eq!(data[1], 2000);
assert_eq!(data[2], 3000);
} else {
panic!();
}
// Simulate the response.
fake_can
.responds(request, can_controller::Response::SendMessage(Ok(())))
.unwrap();
} else {
panic!();
}
}
broadcaster_accessor.stop(Duration::ZERO).unwrap();
}
#[test]
fn test_broadcaster_threaded() {
// Max real time duration for a dispatcher operation. For example
// CI tasks can be frozen by other activities and lead to
// test falsely failing.
const MAX_EXEC_DURATION: Duration = Duration::from_secs(5);
// Start a dispatcher in it own thread. The creation of one or
// many reactives is done with a FnOnce called inside the thread
// of the dispatcher. This allows to keep the construction single
// threaded, simplifying the use of single threaded libs and code.
// `spawn_dispatcher` returns as third value the return of the FnOnce,
// here the broadcaster address.
let builder = rt::mpsc_dispatcher::Builder::new(10);
let mut disp_accessor = builder.to_accessor();
let join_handle = std::thread::spawn(|| builder.build().process());
let mut broadcaster_accessor = acceleration_broadcaster::SyncAccessor::new(
&disp_accessor
.register_reactive_unwrap(simple_acceleration_broadcaster::Broadcaster::new()),
);
use crate::acceleration_broadcaster::SyncRequester;
let fake_can = rt::ActiveMailbox::new(10);
// Simply start and stop the broadcaster.
broadcaster_accessor
.start(fake_can.addr(), MAX_EXEC_DURATION)
.unwrap();
broadcaster_accessor.stop(MAX_EXEC_DURATION).unwrap();
// Ask the dispatcher to stop its operations.
disp_accessor.stop_dispatcher(MAX_EXEC_DURATION).unwrap();
// It is not strictly necessary to join() here because stop_dispatcher()
// insure that all behaviors are destructed and all non processed Request
// responded with an error. The thread after it will only live a short time
// to destroy the dispatcher and its helpers (TimeoutManager, ProcessContext).
// So it make sense only to wait that the memory used is given back to the system,
// seldom a concern.
join_handle.join().unwrap();
}