Expand description
A framework to implement the reactive pattern on real-time systems.
§Example of reactive actor executed by a dispatcher in its thread
use rtactor::dispatcher;
use rtactor::{spawn_dispatcher, ActiveMailbox, Addr, Behavior, Message, ProcessContext, send_notification};
use std::time::Duration;
// A very simple reactive actor that allows incrementing and querying an integer.
struct TestReactive {
pub val: i32,
}
enum Notification {
Increment(i32),
}
enum Request {
GetValue,
ToString(String /*label*/),
}
enum Response {
GetValue(i32),
ToString(String),
}
impl Behavior for TestReactive {
fn process_message<'a>(&mut self, context: &'a mut ProcessContext, msg: &Message) {
match msg {
Message::Notification(notif) => {
if let Some(notif) = notif.data.downcast_ref::<Notification>() {
match notif {
Notification::Increment(increment) => self.val += increment,
}
}
}
Message::Request(request) => {
if let Some(data) = request.data.downcast_ref::<Request>() {
match data {
Request::GetValue => {
context.send_response(request, Response::GetValue(self.val));
}
Request::ToString(label) => context.send_response(
&request,
Response::ToString(format!("{label}: {}", self.val)),
),
}
}
}
_ => panic!(),
}
}
}
let initial_value = 0;
// Start a dispatcher inside its own thread.
// The active object is created with the closure called inside the dispatcher thread.
// This allows to have reactive object that are not movable between threads.
let (dispatcher_addr, join_handle, test_reactive_addr) = spawn_dispatcher(10, move |disp| {
// Create a reactive object on the heap.
let test_reactive = Box::new(TestReactive { val: initial_value });
// Move it inside the dispatcher and return the reactive address as the return of `setup_func`
disp.register_reactive(test_reactive)
});
send_notification(&test_reactive_addr, Notification::Increment(10))
.unwrap();
// Create an active object to interact with the reactive under test.
let mut prober = ActiveMailbox::new(1);
// Request the value.
let result = prober.request_for::<_, Response>(
&test_reactive_addr,
Request::GetValue,
Duration::from_secs(10),
);
if let Ok(Response::GetValue(val)) = result {
assert_eq!(val, 10);
} else {
panic!();
}
// An other notification.
send_notification(&test_reactive_addr, Notification::Increment(-3))
.unwrap();
// An other different request.
let result = prober.request_for::<_, Response>(
&test_reactive_addr,
Request::ToString("the value".to_string()),
Duration::from_secs(10),
);
if let Ok(Response::ToString(s)) = result {
assert_eq!(s, "the value: 7");
} else {
panic!();
}
// Request to stop the dispatcher using its own address.
let result = prober.request_for::<_, dispatcher::Response>(
&dispatcher_addr,
dispatcher::Request::StopDispatcher{},
Duration::from_secs(10),
);
if let Ok(dispatcher::Response::StopDispatcher()) = result {
} else {
panic!();
}
// Wait that the dispatcher thread finishes.
join_handle.join().unwrap();
§Example of simulation of a reactive actor in a single threaded test
use rtactor::simulation::SimulationDispatcher;
use rtactor::{ActiveMailbox, Behavior, Message, ProcessContext, send_notification};
use std::time::Duration;
// A very simple reactive actor that allows incrementing and querying an integer.
struct TestReactive {
pub val: i32,
}
enum Notification {
Increment(i32),
}
enum Request {
GetValue,
ToString(String /*label*/),
}
enum Response {
GetValue(i32),
ToString(String),
}
impl Behavior for TestReactive {
fn process_message<'a>(&mut self, context: &'a mut ProcessContext, msg: &Message) {
match msg {
Message::Notification(notif) => {
if let Some(notif) = notif.data.downcast_ref::<Notification>() {
match notif {
Notification::Increment(increment) => self.val += increment,
}
}
}
Message::Request(request) => {
if let Some(data) = request.data.downcast_ref::<Request>() {
match data {
Request::GetValue => {
context.send_response(request, Response::GetValue(self.val))
}
Request::ToString(label) => context.send_response(
&request,
Response::ToString(format!("{label}: {}", self.val)),
),
}
}
}
_ => panic!(),
}
}
}
// Create a simulation dispatcher.
let mut disp = SimulationDispatcher::new(10);
// Create a reactive object on the heap.
let test_reactive = Box::new(TestReactive { val: 0 });
// Move it inside the dispatcher. It starts the dispatch of messages for it.
let test_reactive_addr = disp.register_reactive(test_reactive);
// Send a notification to the reactive.
send_notification(&test_reactive_addr, Notification::Increment(10))
.unwrap();
// Create an active object to interact with the reactive under test.
let mut prober = ActiveMailbox::new(1);
// Ask the simulation dispatcher to simulate a request by the active actor.
let result = disp.active_request_for::<_, Response>(
&mut prober,
&test_reactive_addr,
Request::GetValue,
Duration::from_secs(10),
);
if let Ok(Response::GetValue(val)) = result {
assert_eq!(val, 10);
} else {
panic!();
}
// An other notification.
send_notification(&test_reactive_addr, Notification::Increment(-3))
.unwrap();
// An other different request.
let result = disp.active_request_for::<_, Response>(
&mut prober,
&test_reactive_addr,
Request::ToString("the value".to_string()),
Duration::from_secs(10),
);
if let Ok(Response::ToString(s)) = result {
assert_eq!(s, "the value: 7");
} else {
panic!();
}
// No need to stop the dispatcher, there is no thread, everything is single threaded.
// The reactive actor will be dropped by the drop of the simulation dispatcher.
§Doc about actors
A good explanation why the actor pattern is good for multitask: https://getakka.net/articles/intro/what-problems-does-actor-model-solve.html
A well designed rust actor framework (but not suitable for real-time): https://docs.rs/axiom/latest/axiom/
Re-exports§
pub extern crate rtactor_macros;
pub use mpsc_dispatcher::spawn_dispatcher;
pub use mpsc_dispatcher::MpscDispatcher;
pub use async_actor::AsyncAccessor;
pub use async_actor::AsyncMailbox;
Modules§
- async_
actor - dispatcher
- An actor interface to a dispatcher for reactive actor.
- mpsc_
dispatcher - Dispatcher implementation based on std::sync::mpsc::sync_channel.
- profiled_
actor - An interface for actor profiled by profiling_aggregator and the corresponding Metrics struct.
- profiling_
aggregator - An actor were addresses of profiled_actor are registered and collect metrics when a snapshot is taken.
- simulation
- Various tools to allow time simulation of actors.
Macros§
- define_
async_ accessor - Create a struct that allows a blocking access to a address and implements one or more AsyncRequester or AsyncNotifier
- define_
sim_ sync_ accessor - Create a struct that allows a simulation access to an address and implement some or more SyncRequester or SyncNotifier
- define_
sync_ accessor - Create a struct that allows a blocking access to a address and implements one or more SyncRequester or SyncNotifier
Structs§
- Active
Mailbox - Actor that has it own message queue and manage actively how to wait on it.
- Addr
- Way to send message to an actor.
- Dummy
Behavior - An implementation of behavior that do nothing in process_message().
- Instant
- Point in time similar to
std::time::Instant
but that allows simulation for reactive. - Notification
- Message without any confirmation.
- Process
Context - An way to interact with the dispatcher that is only valid during
Behavior::process_message()
. - Request
- Message to ask something and that will generate a
Response
tosrc
withrequest_id
=id
. - Response
- Message to respond to a
Request
. - Timeout
- Data send in Notification when a timer mature.
- Timer
- An handle to a timer that can send a message to a Behavior at a given time.
Enums§
- Error
- Error returned during actor operations.
- Message
- The basic block of information that can be exchanged between actors.
Traits§
- Behavior
- The custom implementation of message handling for a given reactive actor.
- Sync
Accessor - Super trait used by SyncRequester and SyncNotifier to send requests and notifications
Functions§
- send_
notification - Send notification.
Type Aliases§
- Active
Actor Deprecated - Request
Id - Identifier of a request that allows to drop response not part of the transaction.