1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
//! Functionality for working with messages whose type is not statically known.
//!
//! This is useful for writing generic tools such as introspection tools, bridges to
//! other communication systems, or nodes that manipulate messages à la `topic_tools`.
//!
//! The central type of this module is [`DynamicMessage`].
use std::{
collections::HashMap,
fmt::{self, Display},
ops::Deref,
path::PathBuf,
sync::{Arc, Mutex, OnceLock},
};
use rosidl_runtime_rs::RmwMessage;
use crate::rcl_bindings::{
rosidl_typesupport_introspection_c__MessageMembers_s as rosidl_message_members_t, *,
};
mod dynamic_publisher;
mod dynamic_subscription;
mod error;
mod field_access;
mod message_structure;
pub use dynamic_publisher::*;
pub use dynamic_subscription::*;
pub use error::*;
pub use field_access::*;
pub use message_structure::*;
/// A struct to cache loaded shared libraries for dynamic messages, indexing them by name.
#[derive(Default)]
pub struct DynamicMessageLibraryCache(HashMap<String, Arc<libloading::Library>>);
impl DynamicMessageLibraryCache {
/// Get a reference to the library for the specific `package_name`. Attempt to load and store
/// it in the cache if it is not currently loaded.
pub fn get_or_load(
&mut self,
package_name: &str,
) -> Result<Arc<libloading::Library>, DynamicMessageError> {
use std::collections::hash_map::Entry;
let lib = match self.0.entry(package_name.into()) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => entry
.insert(get_type_support_library(
package_name,
INTROSPECTION_TYPE_SUPPORT_IDENTIFIER,
)?)
.clone(),
};
Ok(lib)
}
/// Remove a package_name from the cache. Return `true` if it was removed, `false` otherwise
///
/// This function can be used to reduce memory footprint if the message library is not used
/// anymore.
/// Note that since shared libraries are wrapped by an `Arc` this does _not_ unload the library
/// until all other structures that reference it ([`DynamicMessage`] or
/// [`DynamicMessageMetadata`]) are also dropped.
pub fn unload(&mut self, package_name: &str) -> bool {
self.0.remove(package_name).is_some()
}
}
/// A global cache for loaded message packages.
///
/// Since creating a new dynamic message requires loading a shared library from the file system, by
/// caching loaded libraries we can reduce the overhead for preloaded libraries to
/// just a [`Arc::clone`].
pub fn get_dynamic_message_package_cache() -> &'static Mutex<DynamicMessageLibraryCache> {
static DYNAMIC_MESSAGE_PACKAGE_CACHE: OnceLock<Mutex<DynamicMessageLibraryCache>> =
OnceLock::new();
DYNAMIC_MESSAGE_PACKAGE_CACHE.get_or_init(|| Default::default())
}
/// A parsed/validated message type name of the form `<package_name>/msg/<type_name>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MessageTypeName {
/// The package name, which acts as a namespace.
pub package_name: String,
/// The name of the message type in the package.
pub type_name: String,
}
/// A runtime representation of the message "class".
///
/// This is not an instance of a message itself, but it
/// can be used as a factory to create message instances.
#[derive(Clone)]
pub struct DynamicMessageMetadata {
message_type: MessageTypeName,
// The library needs to be kept loaded in order to keep the type_support_ptr valid.
// This is the introspection type support library, not the regular one.
#[allow(dead_code)]
introspection_type_support_library: Arc<libloading::Library>,
type_support_ptr: *const rosidl_message_type_support_t,
structure: MessageStructure,
fini_function: unsafe extern "C" fn(*mut std::os::raw::c_void),
}
/// A message whose type is not known at compile-time.
///
/// This type allows inspecting the structure of the message as well as the
/// values contained in it.
/// It also allows _modifying_ the values, but not the structure, because
/// even a dynamic message must always correspond to a specific message type.
// There is no clone function yet, we need to add that in rosidl.
pub struct DynamicMessage {
metadata: DynamicMessageMetadata,
// This is aligned to the maximum possible alignment of a message (8)
// by the use of a special allocation function.
storage: Box<[u8]>,
// This type allows moving the message contents out into another message,
// in which case the drop impl is not responsible for calling fini anymore
needs_fini: bool,
}
/// This is an analogue of rclcpp::get_typesupport_library.
fn get_type_support_library(
package_name: &str,
type_support_identifier: &str,
) -> Result<Arc<libloading::Library>, DynamicMessageError> {
use DynamicMessageError::RequiredPrefixNotSourced;
// Creating this is pretty cheap, it just parses an env var
let ament = ament_rs::Ament::new().map_err(|_| RequiredPrefixNotSourced {
package: package_name.to_owned(),
})?;
let prefix = PathBuf::from(ament.find_package(package_name).ok_or(
RequiredPrefixNotSourced {
package: package_name.to_owned(),
},
)?);
#[cfg(target_os = "windows")]
let library_path = prefix.join("bin").join(format!(
"{}__{}.dll",
&package_name, type_support_identifier
));
#[cfg(target_os = "macos")]
let library_path = prefix.join("lib").join(format!(
"lib{}__{}.dylib",
&package_name, type_support_identifier
));
#[cfg(all(not(target_os = "windows"), not(target_os = "macos")))]
let library_path = prefix.join("lib").join(format!(
"lib{}__{}.so",
&package_name, type_support_identifier
));
Ok({
// SAFETY: This function is unsafe because it may execute initialization/termination routines
// contained in the library. A type support library should not cause problems there.
let lib = unsafe { libloading::Library::new(library_path) };
let lib = lib.map_err(DynamicMessageError::LibraryLoadingError)?;
Arc::new(lib)
})
}
/// This is an analogue of rclcpp::get_typesupport_handle.
///
/// It is unsafe because it would be theoretically possible to pass in a library that has
/// the expected symbol defined, but with an unexpected type.
unsafe fn get_type_support_handle(
type_support_library: &libloading::Library,
type_support_identifier: &str,
message_type: &MessageTypeName,
) -> Result<*const rosidl_message_type_support_t, DynamicMessageError> {
let symbol_name = format!(
"{}__get_message_type_support_handle__{}__msg__{}",
type_support_identifier, &message_type.package_name, &message_type.type_name
);
// SAFETY: We know that the symbol has this type, from the safety requirement of this function.
let getter: libloading::Symbol<unsafe extern "C" fn() -> *const rosidl_message_type_support_t> = /* unsafe */ {
type_support_library
.get(symbol_name.as_bytes())
.map_err(|_| DynamicMessageError::InvalidMessageType)?
};
// SAFETY: The caller is responsible for keeping the library loaded while
// using this pointer.
let type_support_ptr = /* unsafe */ { getter() };
Ok(type_support_ptr)
}
const INTROSPECTION_TYPE_SUPPORT_IDENTIFIER: &str = "rosidl_typesupport_introspection_c";
// ========================= impl for MessageTypeName =========================
impl TryFrom<&str> for MessageTypeName {
type Error = DynamicMessageError;
fn try_from(full_message_type: &str) -> Result<Self, Self::Error> {
let mut parts = full_message_type.split('/');
use DynamicMessageError::InvalidMessageTypeSyntax;
let package_name = parts
.next()
.ok_or(InvalidMessageTypeSyntax {
input: full_message_type.to_owned(),
})?
.to_owned();
if Some("msg") != parts.next() {
return Err(InvalidMessageTypeSyntax {
input: full_message_type.to_owned(),
});
};
let type_name = parts
.next()
.ok_or(InvalidMessageTypeSyntax {
input: full_message_type.to_owned(),
})?
.to_owned();
if parts.next().is_some() {
return Err(InvalidMessageTypeSyntax {
input: full_message_type.to_owned(),
});
}
Ok(Self {
package_name,
type_name,
})
}
}
impl Display for MessageTypeName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}/msg/{}", &self.package_name, &self.type_name)
}
}
// ========================= impl for DynamicMessageMetadata =========================
impl Deref for DynamicMessageMetadata {
type Target = MessageStructure;
fn deref(&self) -> &Self::Target {
&self.structure
}
}
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for DynamicMessageMetadata {}
// SAFETY: The type_support_ptr member is the one that makes this type not implement Sync
// automatically, but it is not used for interior mutability.
unsafe impl Sync for DynamicMessageMetadata {}
impl DynamicMessageMetadata {
/// Loads the metadata for the given message type.
///
/// See [`DynamicMessage::new()`] for the expected format of the `full_message_type`.
pub fn new(message_type: MessageTypeName) -> Result<Self, DynamicMessageError> {
// SAFETY: The symbol type of the type support getter function can be trusted
// assuming the install dir hasn't been tampered with.
// The pointer returned by this function is kept valid by keeping the library loaded.
let library = get_dynamic_message_package_cache()
.lock()
.unwrap()
.get_or_load(&message_type.package_name)?;
let type_support_ptr = unsafe {
get_type_support_handle(
&*library,
INTROSPECTION_TYPE_SUPPORT_IDENTIFIER,
&message_type,
)?
};
// SAFETY: The pointer returned by get_type_support_handle() is always valid.
let type_support = unsafe { &*type_support_ptr };
debug_assert!(!type_support.data.is_null());
let message_members: &rosidl_message_members_t =
// SAFETY: The data pointer is supposed to be always valid.
unsafe { &*(type_support.data as *const rosidl_message_members_t) };
// SAFETY: The message members coming from a type support library will always be valid.
let structure = unsafe { MessageStructure::from_rosidl_message_members(message_members) };
// The fini function will always exist.
let fini_function = message_members.fini_function.unwrap();
let metadata = DynamicMessageMetadata {
message_type,
introspection_type_support_library: library,
type_support_ptr,
structure,
fini_function,
};
Ok(metadata)
}
/// Instantiates a new message.
pub fn create(&self) -> Result<DynamicMessage, DynamicMessageError> {
// Get an aligned boxed slice. This is inspired by the maligned crate.
use std::alloc::Layout;
// As mentioned in the struct definition, the maximum alignment required is 8.
let layout = Layout::from_size_align(self.structure.size, 8).unwrap();
let mut storage = unsafe {
assert_ne!(self.structure.size, 0);
// SAFETY: The layout has non-zero size.
let ptr = std::alloc::alloc_zeroed(layout);
// SAFETY: This is valid, memory in ptr has appropriate size and is initialized
let slice = std::slice::from_raw_parts_mut(ptr, self.structure.size);
// The mutable reference decays into a (fat) *mut [u8]
Box::from_raw(slice)
};
// SAFETY: The pointer returned by get_type_support_handle() is always valid.
let type_support = unsafe { &*self.type_support_ptr };
let message_members: &rosidl_message_members_t =
// SAFETY: The data pointer is supposed to be always valid.
unsafe { &*(type_support.data as *const rosidl_message_members_t) };
// SAFETY: The init function is passed zeroed memory of the correct alignment.
unsafe {
(message_members.init_function.unwrap())(
storage.as_mut_ptr() as _,
rosidl_runtime_c__message_initialization::ROSIDL_RUNTIME_C_MSG_INIT_ALL,
);
};
let dyn_msg = DynamicMessage {
metadata: self.clone(),
storage,
needs_fini: true,
};
Ok(dyn_msg)
}
/// Returns a description of the message structure.
pub fn structure(&self) -> &MessageStructure {
&self.structure
}
}
// ========================= impl for DynamicMessage =========================
impl Deref for DynamicMessage {
type Target = MessageStructure;
fn deref(&self) -> &Self::Target {
&self.metadata.structure
}
}
impl Drop for DynamicMessage {
fn drop(&mut self) {
if self.needs_fini {
// SAFETY: The fini_function expects to be passed a pointer to the message
unsafe { (self.metadata.fini_function)(self.storage.as_mut_ptr() as _) }
}
}
}
impl PartialEq for DynamicMessage {
fn eq(&self, other: &Self) -> bool {
self.metadata.type_support_ptr == other.metadata.type_support_ptr
&& self.storage == other.storage
}
}
impl Eq for DynamicMessage {}
impl DynamicMessage {
/// Dynamically loads a type support library for the specified type and creates a message instance.
///
/// The full message type is of the form `<package>/msg/<type_name>`, e.g.
/// `std_msgs/msg/String`.
///
/// The message instance will contain the default values of the message type.
///
/// This method might not be the most efficient because of the library loading. If you need to
/// instantiate multiple messages of the same type consider using
/// [`crate::dynamic_message::DynamicMessageMetadata::create`]
pub fn new(message_type: MessageTypeName) -> Result<Self, DynamicMessageError> {
DynamicMessageMetadata::new(message_type)?.create()
}
/// See [`DynamicMessageView::get()`][1].
///
/// [1]: crate::dynamic_message::DynamicMessageView::get
pub fn get(&self, field_name: &str) -> Option<Value<'_>> {
let field_info = self.metadata.structure.get_field_info(field_name)?;
// For the unwrap_or, see DynamicMessageViewMut::get_mut
let size = field_info.size().unwrap_or(1);
let bytes = &self.storage[field_info.offset..field_info.offset + size];
// SAFETY: The bytes contain a valid field of the type recorded in field_info.
unsafe { Value::new(bytes, field_info) }
}
/// See [`DynamicMessageViewMut::get_mut()`][1].
///
/// [1]: crate::dynamic_message::DynamicMessageViewMut::get_mut
pub fn get_mut(&mut self, field_name: &str) -> Option<ValueMut<'_>> {
let field_info = self.metadata.structure.get_field_info(field_name)?;
// For the unwrap_or, see DynamicMessageViewMut::get_mut
let size = field_info.size().unwrap_or(1);
let bytes = &mut self.storage[field_info.offset..field_info.offset + size];
// SAFETY: The bytes contain a valid field of the type recorded in field_info.
Some(unsafe { ValueMut::new(bytes, field_info) })
}
/// Returns a description of the message structure.
pub fn structure(&self) -> &MessageStructure {
&self.metadata.structure
}
/// Iterate over all fields in declaration order.
pub fn iter(&self) -> impl Iterator<Item = (&str, Value<'_>)> + '_ {
self.metadata.structure.fields.iter().map(|field_info| {
let value = self.get(&field_info.name).unwrap();
(field_info.name.as_str(), value)
})
}
/// Iterate over all fields in declaration order (mutable version).
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&str, ValueMut<'_>)> + '_ {
self.view_mut().iter_mut()
}
/// Returns a view object of this message.
///
/// The purpose for this conversion is to allow uniform handling of this top-level message
/// and nested messages contained in it through a [`DynamicMessageView`].
pub fn view(&self) -> DynamicMessageView<'_> {
DynamicMessageView {
structure: &self.metadata.structure,
storage: &self.storage,
}
}
/// Returns a mutable view object of this message.
///
/// The purpose for this conversion is to allow uniform handling of this top-level message
/// and nested messages contained in it through a [`DynamicMessageViewMut`].
pub fn view_mut(&mut self) -> DynamicMessageViewMut<'_> {
DynamicMessageViewMut {
structure: &self.metadata.structure,
storage: &mut self.storage,
}
}
/// Converts a statically typed RMW-native message into a `DynamicMessage`.
pub fn convert_from_rmw_message<T>(mut msg: T) -> Result<Self, DynamicMessageError>
where
T: RmwMessage,
{
let message_type = MessageTypeName::try_from(<T as RmwMessage>::TYPE_NAME)?;
let mut dyn_msg = Self::new(message_type)?;
let align = std::mem::align_of::<T>();
assert_eq!(dyn_msg.storage.as_ptr().align_offset(align), 0);
{
// SAFETY: This transmutes the slice of bytes into a &mut T. This is fine, since
// under the hood it *is* a T.
// However, the resulting value is not seen as borrowing from dyn_msg by the borrow checker,
// so we are careful to not create a second mutable reference before dropping this one,
// since that would be UB.
let dyn_msg_transmuted = unsafe { &mut *(dyn_msg.storage.as_mut_ptr() as *mut T) };
// We cannot simply overwrite one message with the other, or we will get a memory leak/double-free.
// Swapping is the solution.
std::mem::swap(&mut msg, dyn_msg_transmuted);
}
Ok(dyn_msg)
}
/// Converts a `DynamicMessage` into a statically typed RMW-native message.
///
/// If the RMW-native message type does not match the underlying message type of this `DynamicMessage`,
/// it is not converted but instead returned unchanged.
pub fn convert_into_rmw_message<T>(mut self) -> Result<T, Self>
where
T: RmwMessage,
{
if <T as RmwMessage>::TYPE_NAME == self.metadata.message_type.to_string() {
// SAFETY: Even though a zero-initialized message might not match RMW expectations for
// what a message should look like, it is safe to temporarily have a zero-initialized
// value, i.e. it is not undefined behavior to do this since it's a C struct, and an
// all-zeroes bit pattern is always a valid instance of any C struct.
let mut dest = unsafe { std::mem::zeroed::<T>() };
let dest_ptr = &mut dest as *mut T as *mut u8;
// This reinterprets the struct as a slice of bytes.
// The bytes copied into the dest slice are a valid value of T, as ensured by comparison
// of the type support pointers.
let dest_slice =
unsafe { std::slice::from_raw_parts_mut(dest_ptr, std::mem::size_of::<T>()) };
// This creates a shallow copy, with ownership of the "deep" (or inner) parts moving
// into the destination.
dest_slice.copy_from_slice(&self.storage);
// Don't run the fini function on the src data anymore, because the inner parts would be
// double-freed by dst and src.
self.needs_fini = false;
Ok(dest)
} else {
Err(self)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn traits() {
use crate::test_helpers::*;
assert_send::<DynamicMessageMetadata>();
assert_sync::<DynamicMessageMetadata>();
assert_send::<DynamicMessage>();
assert_sync::<DynamicMessage>();
}
#[test]
fn invalid_message_type_name() {
assert!(matches!(
MessageTypeName::try_from("x"),
Err(DynamicMessageError::InvalidMessageTypeSyntax { .. })
));
assert!(matches!(
MessageTypeName::try_from("x/y"),
Err(DynamicMessageError::InvalidMessageTypeSyntax { .. })
));
assert!(matches!(
MessageTypeName::try_from("x//y"),
Err(DynamicMessageError::InvalidMessageTypeSyntax { .. })
));
assert!(matches!(
MessageTypeName::try_from("x/msg/y/z"),
Err(DynamicMessageError::InvalidMessageTypeSyntax { .. })
));
// This is valid, but not found in the prefix
let message_type = MessageTypeName::try_from("x/msg/y").unwrap();
assert!(matches!(
DynamicMessage::new(message_type),
Err(DynamicMessageError::RequiredPrefixNotSourced { .. })
));
}
#[test]
fn message_getters_setters() {
let message_type = MessageTypeName {
package_name: "test_msgs".to_owned(),
type_name: "BasicTypes".to_owned(),
};
let mut message = DynamicMessage::new(message_type).unwrap();
{
// Access non existing values
assert!(message.get("invalid_value").is_none());
}
{
// Get then set a sample value
let value = message.get_mut("int32_value").unwrap();
let ValueMut::Simple(value) = value else {
panic!("Unexpected value type, expected Simple value");
};
let SimpleValueMut::Int32(value) = value else {
panic!("Unexpected value type, expected Int32");
};
assert_eq!(*value, 0);
*value = 42;
}
{
// Read previously set value
let value = message.get("int32_value").unwrap();
let Value::Simple(value) = value else {
panic!("Unexpected value type, expected Simple value");
};
let SimpleValue::Int32(value) = value else {
panic!("Unexpected value type, expected Int32");
};
assert_eq!(*value, 42);
}
}
#[test]
fn message_package_cache() {
let package_name = "test_msgs";
// Create a weak reference to avoid increasing reference count
let mut cache = DynamicMessageLibraryCache::default();
let lib = Arc::downgrade(&cache.get_or_load(package_name).unwrap());
{
// Mock a user of the library (i.e. message)
let _mock = lib.upgrade().unwrap();
assert!(cache.unload(package_name));
assert!(!cache.unload("non_existing_package"));
// The library should _still_ be loaded since the message holds a reference
assert!(lib.upgrade().is_some());
}
// Now the library should be unloaded and the reference should not be upgradeable
assert!(lib.upgrade().is_none());
}
}