Crate safe_drive
source ·Expand description
safe_drive: Formally Specified Rust Bindings for ROS2
safe_drive
is a Rust bindings for ROS2.
This library provides formal specifications and tested the specifications by using a model checker.
Therefore, you can clearly understand how the scheduler work and the safeness of it.safe_drive
is a ROS2 bindings
Specifications
Some algorithms we adopted are formally specified and tested the safeness by using TLA+. Original ROS2’s executor (rclcpp) is suffering from starvation. In contrast, the starvation freedom of our executor has been validated by not only dynamic analysis but also formal verification.
See specifications.
We specified and tested as follows.
- Async/await Scheduler
- Deadlock freedom
- Starvation freedom
- Single Threaded Callback Execution
- Deadlock freedom
- Starvation freedom
- Initialize Once
- Deadlock freedom
- Termination
- Initialization is performed just once
Important Types
- Context
- Node
- Selector
- Topic
- Service
Examples
safe_drive
provides single threaded and multi threaded execution methods.
The single threaded execution is based on traditional callback based execution.
You need to register callback functions for subscribers, servers, clients, and timers.
The multi threaded execution is performed by async/await fashion. You can choose any async/await’s runtime.
Single Threaded Execution
This code is an example of Pub/Sub.
use safe_drive::{
context::Context, logger::Logger, msg::common_interfaces::std_msgs, pr_info,
};
use std::time::Duration;
// First of all, you need create a context.
let ctx = Context::new().unwrap();
// Create a publish node.
let node_pub = ctx
.create_node("publish_node", None, Default::default())
.unwrap();
// Create a subscribe node.
let node_sub = ctx
.create_node("subscribe_node", None, Default::default())
.unwrap();
// Create a publisher.
// The 2nd argument is for QoS.
// If `None` is specified to the 2nd argument, the default QoS will be used.
let publisher = node_pub
.create_publisher::<std_msgs::msg::String>("example_topic", None)
.unwrap();
// Create a subscriber.
let subscriber = node_sub
.create_subscriber::<std_msgs::msg::String>("example_topic", None)
.unwrap();
// Create a selector, which is for IO multiplexing.
let mut selector = ctx.create_selector().unwrap();
// Create loggers.
let logger_pub = Logger::new("example_publisher");
let logger_sub = Logger::new("example_subscriber");
// Add subscriber to the selector.
// The 2nd argument is a callback function.
// If data arrive, the callback will be invoked.
// The 3rd argument is used to specify the callback will be invoked only once or infinitely.
// If the 3rd argument is `true`, the callback function is invoked once and unregistered.
selector.add_subscriber(
subscriber,
Box::new(move |msg| {
// Print the message
pr_info!(logger_sub, "Received: msg = {}", msg.data); // Print a message.
}),
);
// Create a wall timer, which invoke the callback periodically.
selector.add_wall_timer(
"timer_name", // name of the timer
Duration::from_millis(100),
Box::new(move || {
let mut msg = std_msgs::msg::String::new().unwrap();
msg.data.assign("Hello, World!");
pr_info!(logger_pub, "Send: msg = {}", msg.data); // Print a message.
publisher.send(&msg).unwrap();
}),
);
// Spin.
for _ in 0..10 {
selector.wait().unwrap();
}
Multi Threaded Execution
This code uses async_std
as a async/await’s runtime.
You can use other runtime such as tokio
.
use safe_drive::{
context::Context,
error::DynError,
logger::Logger,
msg::common_interfaces::std_msgs,
pr_info, pr_warn,
topic::{publisher::Publisher, subscriber::Subscriber},
};
#[allow(unused_imports)]
use async_std::{future, prelude::*};
use std::time::Duration;
// Create a context.
let ctx = Context::new().unwrap();
// Create nodes.
let node_pub = ctx
.create_node("publish_node_async", None, Default::default())
.unwrap();
let node_sub = ctx
.create_node("subscribe_node_async", None, Default::default())
.unwrap();
// Create a publisher.
let publisher = node_pub
.create_publisher::<std_msgs::msg::String>("example_topic_async", None)
.unwrap();
// Create a subscriber.
let subscriber = node_sub
.create_subscriber::<std_msgs::msg::String>("example_topic_async", None)
.unwrap();
// Create tasks.
async_std::task::block_on(async {
let p = async_std::task::spawn(run_publisher(publisher));
let s = async_std::task::spawn(run_subscriber(subscriber));
p.await;
s.await;
});
/// The publisher.
async fn run_publisher(publisher: Publisher<std_msgs::msg::String>) {
let dur = Duration::from_millis(100);
let logger = Logger::new("example_publisher_async");
let mut msg = std_msgs::msg::String::new().unwrap();
for _ in 0..10 {
// Publish a message periodically.
msg.data.assign("Hello, World!");
pr_info!(logger, "Send (async): msg = {}", msg.data);
publisher.send(&msg).unwrap();
// Sleep 100[ms].
async_std::task::sleep(dur).await;
}
}
/// The subscriber
async fn run_subscriber(mut s: Subscriber<std_msgs::msg::String>) {
let dur = Duration::from_millis(500);
let logger = Logger::new("example_subscriber_async");
loop {
// receive a message specifying timeout of 500ms
match future::timeout(dur, s.recv()).await {
Ok(Ok(msg)) => {
// received a message
pr_info!(logger, "Received (async): msg = {}", msg.data);
}
Ok(Err(e)) => panic!("{}", e), // fatal error
Err(_) => {
// timeout
pr_warn!(logger, "Subscribe (async): timeout");
break;
}
}
}
}
Modules
Node
and Selector
.Context::create_node
.select
or epoll
.
This is used to single threaded execution.
For multi threaded execution, this is used internally.Macros
Structs
ST<T>
cannot be send to another thread and shared by multiple threads.Enums
Functions
true
.
If is_halt()
is true
, some functions to receive or wait returns error to halt the process.