Crate rtactor

Crate rtactor 

Source
Expand description

A framework to implement the reactive pattern on real-time systems.

§Example of reactive actor executed by a dispatcher in its thread

use rtactor::dispatcher;
use rtactor::{spawn_dispatcher, ActiveMailbox, Addr, Behavior, Message, ProcessContext, send_notification};
use std::time::Duration;

   // A very simple reactive actor that allows incrementing and querying an integer.
   struct TestReactive {
       pub val: i32,
   }

   enum Notification {
       Increment(i32),
   }

   enum Request {
       GetValue,
       ToString(String /*label*/),
   }

   enum Response {
       GetValue(i32),
       ToString(String),
   }

   impl Behavior for TestReactive {
       fn process_message<'a>(&mut self, context: &'a mut ProcessContext, msg: &Message) {
           match msg {
               Message::Notification(notif) => {
                   if let Some(notif) = notif.data.downcast_ref::<Notification>() {
                       match notif {
                           Notification::Increment(increment) => self.val += increment,
                       }
                   }
               }
               Message::Request(request) => {
                   if let Some(data) = request.data.downcast_ref::<Request>() {
                       match data {
                           Request::GetValue => {
                               context.send_response(request, Response::GetValue(self.val));
                           }

                           Request::ToString(label) => context.send_response(
                               &request,
                               Response::ToString(format!("{label}: {}", self.val)),
                           ),
                       }
                   }
               }
               _ => panic!(),
           }
       }
   }

   let initial_value = 0;

   // Start a dispatcher inside its own thread.
   // The active object is created with the closure called inside the dispatcher thread.
   // This allows to have reactive object that are not movable between threads.
   let (dispatcher_addr, join_handle, test_reactive_addr) = spawn_dispatcher(10, move |disp| {
       // Create a reactive object on the heap.
       let test_reactive = Box::new(TestReactive { val: initial_value });
       // Move it inside the dispatcher and return the reactive address as the return of `setup_func`
       disp.register_reactive(test_reactive)
   });

   send_notification(&test_reactive_addr, Notification::Increment(10))
   .unwrap();

   // Create an active object to interact with the reactive under test.
   let mut prober = ActiveMailbox::new(1);

   // Request the value.
   let result = prober.request_for::<_, Response>(
       &test_reactive_addr,
       Request::GetValue,
       Duration::from_secs(10),
   );

   if let Ok(Response::GetValue(val)) = result {
       assert_eq!(val, 10);
   } else {
       panic!();
   }

   // An other notification.
   send_notification(&test_reactive_addr, Notification::Increment(-3))
   .unwrap();

   // An other different request.
   let result = prober.request_for::<_, Response>(
       &test_reactive_addr,
       Request::ToString("the value".to_string()),
       Duration::from_secs(10),
   );
   if let Ok(Response::ToString(s)) = result {
       assert_eq!(s, "the value: 7");
   } else {
       panic!();
   }

   // Request to stop the dispatcher using its own address.
   let result = prober.request_for::<_, dispatcher::Response>(
       &dispatcher_addr,
       dispatcher::Request::StopDispatcher{},
       Duration::from_secs(10),
   );
   if let Ok(dispatcher::Response::StopDispatcher()) = result {
   } else {
       panic!();
   }

   // Wait that the dispatcher thread finishes.
   join_handle.join().unwrap();

§Example of simulation of a reactive actor in a single threaded test

use rtactor::simulation::SimulationDispatcher;
use rtactor::{ActiveMailbox, Behavior, Message, ProcessContext, send_notification};
use std::time::Duration;

// A very simple reactive actor that allows incrementing and querying an integer.
struct TestReactive {
    pub val: i32,
}

enum Notification {
    Increment(i32),
}

enum Request {
    GetValue,
    ToString(String /*label*/),
}

enum Response {
    GetValue(i32),
    ToString(String),
}

impl Behavior for TestReactive {
    fn process_message<'a>(&mut self, context: &'a mut ProcessContext, msg: &Message) {
        match msg {
            Message::Notification(notif) => {
                if let Some(notif) = notif.data.downcast_ref::<Notification>() {
                    match notif {
                        Notification::Increment(increment) => self.val += increment,
                    }
                }
            }
            Message::Request(request) => {
                if let Some(data) = request.data.downcast_ref::<Request>() {
                    match data {
                        Request::GetValue => {
                            context.send_response(request, Response::GetValue(self.val))
                        }

                        Request::ToString(label) => context.send_response(
                            &request,
                            Response::ToString(format!("{label}: {}", self.val)),
                        ),
                    }
                }
            }
            _ => panic!(),
        }
    }
}

// Create a simulation dispatcher.
let mut disp = SimulationDispatcher::new(10);

// Create a reactive object on the heap.
let test_reactive = Box::new(TestReactive { val: 0 });

// Move it inside the dispatcher. It starts the dispatch of messages for it.
let test_reactive_addr = disp.register_reactive(test_reactive);

// Send a notification to the reactive.
    send_notification(&test_reactive_addr, Notification::Increment(10))
    .unwrap();

// Create an active object to interact with the reactive under test.
let mut prober = ActiveMailbox::new(1);

// Ask the simulation dispatcher to simulate a request by the active actor.
let result = disp.active_request_for::<_, Response>(
    &mut prober,
    &test_reactive_addr,
    Request::GetValue,
    Duration::from_secs(10),
);
if let Ok(Response::GetValue(val)) = result {
    assert_eq!(val, 10);
} else {
    panic!();
}

// An other notification.
    send_notification(&test_reactive_addr, Notification::Increment(-3))
    .unwrap();

// An other different request.
let result = disp.active_request_for::<_, Response>(
    &mut prober,
    &test_reactive_addr,
    Request::ToString("the value".to_string()),
    Duration::from_secs(10),
);
if let Ok(Response::ToString(s)) = result {
    assert_eq!(s, "the value: 7");
} else {
    panic!();
}

// No need to stop the dispatcher, there is no thread, everything is single threaded.
// The reactive actor will be dropped by the drop of the simulation dispatcher.

§Doc about actors

A good explanation why the actor pattern is good for multitask: https://getakka.net/articles/intro/what-problems-does-actor-model-solve.html

A well designed rust actor framework (but not suitable for real-time): https://docs.rs/axiom/latest/axiom/

Re-exports§

pub extern crate rtactor_macros;
pub use mpsc_dispatcher::spawn_dispatcher;
pub use mpsc_dispatcher::MpscDispatcher;
pub use async_actor::AsyncAccessor;
pub use async_actor::AsyncMailbox;

Modules§

async_actor
dispatcher
An actor interface to a dispatcher for reactive actor.
mpsc_dispatcher
Dispatcher implementation based on std::sync::mpsc::sync_channel.
profiled_actor
An interface for actor profiled by profiling_aggregator and the corresponding Metrics struct.
profiling_aggregator
An actor were addresses of profiled_actor are registered and collect metrics when a snapshot is taken.
simulation
Various tools to allow time simulation of actors.

Macros§

define_async_accessor
Create a struct that allows a blocking access to a address and implements one or more AsyncRequester or AsyncNotifier
define_sim_sync_accessor
Create a struct that allows a simulation access to an address and implement some or more SyncRequester or SyncNotifier
define_sync_accessor
Create a struct that allows a blocking access to a address and implements one or more SyncRequester or SyncNotifier

Structs§

ActiveMailbox
Actor that has it own message queue and manage actively how to wait on it.
Addr
Way to send message to an actor.
DummyBehavior
An implementation of behavior that do nothing in process_message().
Instant
Point in time similar to std::time::Instant but that allows simulation for reactive.
Notification
Message without any confirmation.
ProcessContext
An way to interact with the dispatcher that is only valid during Behavior::process_message().
Request
Message to ask something and that will generate a Response to src with request_id=id.
Response
Message to respond to a Request.
Timeout
Data send in Notification when a timer mature.
Timer
An handle to a timer that can send a message to a Behavior at a given time.

Enums§

Error
Error returned during actor operations.
Message
The basic block of information that can be exchanged between actors.

Traits§

Behavior
The custom implementation of message handling for a given reactive actor.
SyncAccessor
Super trait used by SyncRequester and SyncNotifier to send requests and notifications

Functions§

send_notification
Send notification.

Type Aliases§

ActiveActorDeprecated
RequestId
Identifier of a request that allows to drop response not part of the transaction.