use roplat::comm::opaque::{OpaqueData, TypedOpaque};
use roplat::comm::triple_buffer::{TripleBufferChannel, create_triple_buffer};
use std::ffi::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
#[derive(Clone, Debug, PartialEq)]
#[repr(C)]
struct SensorData {
x: f64,
y: f64,
z: f64,
timestamp: u64,
}
#[derive(Clone, Debug, PartialEq)]
#[repr(C)]
struct MotorCommand {
velocity: f32,
torque: f32,
position: f64,
}
#[derive(Clone, Debug, PartialEq)]
#[repr(C)]
struct JointState {
positions: [f64; 6],
velocities: [f64; 6],
efforts: [f64; 6],
}
#[test]
fn test_transparent_sensor_data_communication() {
let (mut pub_sensor, mut subs) = create_triple_buffer::<SensorData>(1);
let sub = &mut subs[0];
assert!(sub.get_latest().is_none());
pub_sensor.publish(SensorData { x: 1.0, y: 2.0, z: 3.0, timestamp: 1000 });
let data = sub.get_latest().unwrap();
assert_eq!(data.x, 1.0);
assert_eq!(data.y, 2.0);
assert_eq!(data.z, 3.0);
assert_eq!(data.timestamp, 1000);
}
#[test]
fn test_transparent_motor_command_communication() {
let (mut publisher, mut subs) = create_triple_buffer::<MotorCommand>(1);
let sub = &mut subs[0];
publisher.publish(MotorCommand { velocity: std::f32::consts::PI, torque: 1.5, position: 90.0 });
let cmd = sub.get_latest().unwrap();
assert_eq!(cmd.velocity, std::f32::consts::PI);
assert_eq!(cmd.torque, 1.5);
assert_eq!(cmd.position, 90.0);
}
#[test]
fn test_transparent_array_type_communication() {
let (mut publisher, mut subs) = create_triple_buffer::<JointState>(1);
let sub = &mut subs[0];
let state = JointState {
positions: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
velocities: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
efforts: [10.0, 20.0, 30.0, 40.0, 50.0, 60.0],
};
publisher.publish(state.clone());
let received = sub.get_latest().unwrap();
assert_eq!(received, &state);
}
#[test]
fn test_spmc_broadcast_to_multiple_subscribers() {
let (mut publisher, mut subs) = create_triple_buffer::<SensorData>(5);
let data = SensorData { x: 42.0, y: -1.0, z: 0.5, timestamp: 999 };
publisher.publish(data.clone());
for (i, sub) in subs.iter_mut().enumerate() {
let received = sub
.get_latest()
.unwrap_or_else(|| panic!("Subscriber {} should have data", i));
assert_eq!(received, &data, "Subscriber {} received wrong data", i);
}
}
#[test]
fn test_spmc_subscribers_see_latest_update() {
let (mut publisher, mut subs) = create_triple_buffer::<i32>(3);
publisher.publish(1);
publisher.publish(2);
publisher.publish(3);
for sub in &mut subs {
let val = sub.get_latest().unwrap();
assert!(*val >= 1 && *val <= 3, "Value {} out of range", *val);
}
}
#[test]
fn test_lazy_allocation_no_default_constructor() {
#[derive(Clone, Debug, PartialEq)]
struct NoDefault {
data: Vec<u8>,
label: String,
}
let (mut publisher, mut subs) = create_triple_buffer::<NoDefault>(2);
publisher.publish(NoDefault { data: vec![1, 2, 3, 4, 5], label: "test".to_string() });
for sub in &mut subs {
let val = sub.get_latest().unwrap();
assert_eq!(val.data, vec![1, 2, 3, 4, 5]);
assert_eq!(val.label, "test");
}
}
#[test]
fn test_lazy_allocation_convergence() {
let (mut publisher, mut subs) = create_triple_buffer::<u64>(2);
for i in 0..100 {
publisher.publish(i);
}
for sub in &mut subs {
let val = sub.get_latest().unwrap();
assert!(*val >= 90, "Value {} should be near 99", *val);
}
}
#[test]
fn test_channel_lifecycle() {
let channel = TripleBufferChannel::<SensorData>::new(2);
let mut publisher = channel.publisher();
let mut sub0 = channel.subscriber(0);
let mut sub1 = channel.subscriber(1);
publisher.publish(SensorData { x: 10.0, y: 20.0, z: 30.0, timestamp: 42 });
assert_eq!(sub0.get_latest().unwrap().x, 10.0);
assert_eq!(sub1.get_latest().unwrap().x, 10.0);
}
#[test]
fn test_multithreaded_publish_subscribe() {
use std::sync::Arc;
let (mut publisher, mut subs) = create_triple_buffer::<u64>(1);
let mut sub = subs.pop().unwrap();
let done = Arc::new(AtomicBool::new(false));
let done_clone = done.clone();
let producer = thread::spawn(move || {
for i in 0..1000u64 {
publisher.publish(i);
}
done_clone.store(true, Ordering::SeqCst);
});
let consumer = thread::spawn(move || {
let mut max_seen: Option<u64> = None;
let mut any_value: Option<u64> = None;
loop {
if let Some(&val) = sub.get_latest() {
assert!(val < 1000, "value out of range: {}", val);
any_value = Some(val);
max_seen = Some(max_seen.map_or(val, |m| m.max(val)));
}
if done.load(Ordering::SeqCst) {
if let Some(&val) = sub.get_latest() {
assert!(val < 1000);
any_value = Some(val);
max_seen = Some(max_seen.map_or(val, |m| m.max(val)));
}
break;
}
}
(any_value, max_seen)
});
producer.join().unwrap();
let (final_val, max_val) = consumer.join().unwrap();
assert!(final_val.is_some(), "Consumer should have seen some values");
assert!(
max_val.unwrap_or(0) >= 900,
"Consumer should observe values close to producer's tail, got max={:?}",
max_val
);
}
#[test]
fn test_opaque_data_create_and_destroy() {
static DESTROYED: AtomicBool = AtomicBool::new(false);
unsafe extern "C" fn destroy_data(ptr: *mut c_void) {
unsafe {
let _ = Box::from_raw(ptr as *mut String);
}
DESTROYED.store(true, Ordering::SeqCst);
}
{
let data = Box::into_raw(Box::new("hello opaque".to_string()));
let opaque = unsafe {
OpaqueData::new(
data as *mut c_void,
destroy_data,
std::mem::size_of::<String>(),
"String",
)
};
assert_eq!(opaque.type_id(), "String");
assert!(!opaque.is_null());
unsafe {
let s = opaque.cast_ref::<String>();
assert_eq!(s, "hello opaque");
}
}
assert!(DESTROYED.load(Ordering::SeqCst));
}
#[test]
fn test_typed_opaque_marker() {
struct CppNode;
struct PyNode;
unsafe extern "C" fn destroy_i32(ptr: *mut c_void) {
unsafe {
let _ = Box::from_raw(ptr as *mut i32);
}
}
let cpp_data = Box::into_raw(Box::new(42i32));
let cpp_opaque =
unsafe { OpaqueData::new(cpp_data as *mut c_void, destroy_i32, 4, "cpp_node_data") };
let typed_cpp = TypedOpaque::<CppNode>::from_opaque(cpp_opaque);
let py_data = Box::into_raw(Box::new(100i32));
let py_opaque =
unsafe { OpaqueData::new(py_data as *mut c_void, destroy_i32, 4, "py_node_data") };
let typed_py = TypedOpaque::<PyNode>::from_opaque(py_opaque);
assert_eq!(typed_cpp.inner().type_id(), "cpp_node_data");
assert_eq!(typed_py.inner().type_id(), "py_node_data");
}
#[test]
fn test_opaque_ffi_functions() {
static FFI_DESTROYED: AtomicBool = AtomicBool::new(false);
unsafe extern "C" fn ffi_destroy(ptr: *mut c_void) {
unsafe {
let _ = Box::from_raw(ptr as *mut [f64; 3]);
}
FFI_DESTROYED.store(true, Ordering::SeqCst);
}
let value = Box::into_raw(Box::new([1.0f64, 2.0, 3.0]));
let type_id = std::ffi::CString::new("f64_array_3").unwrap();
unsafe {
use roplat::comm::opaque::{
roplat_opaque_create, roplat_opaque_destroy, roplat_opaque_get_ptr,
roplat_opaque_get_size,
};
let opaque = roplat_opaque_create(
value as *mut c_void,
ffi_destroy,
std::mem::size_of::<[f64; 3]>(),
type_id.as_ptr(),
);
assert!(!opaque.is_null());
assert_eq!(roplat_opaque_get_size(opaque), 24);
let ptr = roplat_opaque_get_ptr(opaque);
let arr = &*(ptr as *const [f64; 3]);
assert_eq!(arr, &[1.0, 2.0, 3.0]);
roplat_opaque_destroy(opaque);
}
assert!(FFI_DESTROYED.load(Ordering::SeqCst));
}
#[roplat::roplat_msg]
#[repr(C)]
#[derive(Clone, Debug, PartialEq)]
pub struct TestMsg {
pub x: f64,
pub y: f64,
pub z: f64,
pub id: u32,
pub active: bool,
}
#[test]
fn test_roplat_msg_c_header_generation() {
let header = TestMsg::C_HEADER;
assert!(header.contains("#ifndef ROPLAT_MSG_TESTMSG_H"));
assert!(header.contains("#define ROPLAT_MSG_TESTMSG_H"));
assert!(header.contains("typedef struct TestMsg"));
assert!(header.contains("double x;"));
assert!(header.contains("double y;"));
assert!(header.contains("double z;"));
assert!(header.contains("uint32_t id;"));
assert!(header.contains("bool active;"));
assert!(header.contains("} TestMsg;"));
assert!(header.contains("#include <stdint.h>"));
assert!(header.contains("extern \"C\""));
}
#[test]
fn test_roplat_msg_python_stub_generation() {
let stub = TestMsg::PYTHON_STUB;
assert!(stub.contains("import ctypes"));
assert!(stub.contains("class TestMsg(ctypes.Structure):"));
assert!(stub.contains("_fields_ = ["));
assert!(stub.contains("(\"x\", ctypes.c_double)"));
assert!(stub.contains("(\"y\", ctypes.c_double)"));
assert!(stub.contains("(\"z\", ctypes.c_double)"));
assert!(stub.contains("(\"id\", ctypes.c_uint32)"));
assert!(stub.contains("(\"active\", ctypes.c_bool)"));
}
#[roplat::roplat_msg]
#[repr(C)]
#[derive(Clone, Debug, PartialEq)]
pub struct ArrayMsg {
pub data: [f32; 4],
pub count: u32,
}
#[test]
fn test_roplat_msg_array_field() {
let header = ArrayMsg::C_HEADER;
assert!(header.contains("float data[4];"));
assert!(header.contains("uint32_t count;"));
let stub = ArrayMsg::PYTHON_STUB;
assert!(stub.contains("(\"data\", ctypes.c_float * 4)"));
assert!(stub.contains("(\"count\", ctypes.c_uint32)"));
}
#[test]
fn test_roplat_msg_type_with_triple_buffer() {
let (mut publisher, mut subs) = create_triple_buffer::<TestMsg>(2);
let msg = TestMsg { x: 1.5, y: 2.5, z: 3.5, id: 42, active: true };
publisher.publish(msg.clone());
for sub in &mut subs {
let received = sub.get_latest().unwrap();
assert_eq!(received, &msg);
}
let msg2 = TestMsg { x: 10.0, y: 20.0, z: 30.0, id: 43, active: false };
publisher.publish(msg2.clone());
for sub in &mut subs {
let received = sub.get_latest().unwrap();
assert_eq!(received, &msg2);
}
}
#[derive(Clone, Debug)]
struct OpaqueWrapper {
type_id: &'static str,
data: Vec<u8>,
}
#[test]
fn test_opaque_type_intra_language_comm() {
let (mut publisher, mut subs) = create_triple_buffer::<OpaqueWrapper>(1);
let sub = &mut subs[0];
let wrapper = OpaqueWrapper { type_id: "CppSensorNode", data: vec![0xDE, 0xAD, 0xBE, 0xEF] };
publisher.publish(wrapper.clone());
let received = sub.get_latest().unwrap();
assert_eq!(received.type_id, "CppSensorNode");
assert_eq!(received.data, vec![0xDE, 0xAD, 0xBE, 0xEF]);
}
use roplat::comm::ring_buffer::{RingBufferChannel, create_ring_buffer};
#[test]
fn test_ring_buffer_transparent_sensor_data() {
let (mut writer, mut reader) = create_ring_buffer::<SensorData>(4);
let s1 = SensorData { x: 1.0, y: 2.0, z: 3.0, timestamp: 100 };
let s2 = SensorData { x: 4.0, y: 5.0, z: 6.0, timestamp: 200 };
assert!(writer.try_push(&s1));
assert!(writer.try_push(&s2));
let r1 = reader.try_pop().unwrap();
assert_eq!(r1.x, 1.0);
assert_eq!(r1.timestamp, 100);
let r2 = reader.try_pop().unwrap();
assert_eq!(r2.x, 4.0);
assert_eq!(r2.timestamp, 200);
assert!(reader.try_pop().is_none());
}
#[test]
fn test_ring_buffer_fifo_ordering() {
let (mut writer, mut reader) = create_ring_buffer::<MotorCommand>(8);
for i in 0..8 {
writer.try_push(&MotorCommand { velocity: i as f32, torque: 0.0, position: 0.0 });
}
for i in 0..8 {
let cmd = reader.try_pop().unwrap();
assert_eq!(cmd.velocity, i as f32, "FIFO ordering broken at index {i}");
}
}
#[test]
fn test_ring_buffer_full_behavior() {
let (mut writer, mut reader) = create_ring_buffer::<u64>(3);
assert!(writer.try_push(&1));
assert!(writer.try_push(&2));
assert!(writer.try_push(&3));
assert!(!writer.try_push(&4));
assert_eq!(reader.try_pop(), Some(1));
assert!(writer.try_push(&4)); }
#[test]
fn test_ring_buffer_force_push_drops_oldest() {
let (mut writer, mut reader) = create_ring_buffer::<i32>(2);
writer.force_push(&10);
writer.force_push(&20);
writer.force_push(&30);
assert_eq!(reader.try_pop(), Some(20));
assert_eq!(reader.try_pop(), Some(30));
assert!(reader.try_pop().is_none());
}
#[test]
fn test_ring_buffer_channel_lifecycle() {
let channel = RingBufferChannel::<SensorData>::new(4);
assert_eq!(channel.capacity(), 4);
let mut writer = channel.writer();
let mut reader = channel.reader();
let data = SensorData { x: 9.0, y: 8.0, z: 7.0, timestamp: 999 };
writer.try_push(&data);
let got = reader.try_pop().unwrap();
assert_eq!(got.z, 7.0);
}
#[test]
fn test_ring_buffer_spsc_multithreaded() {
let (mut writer, mut reader) = create_ring_buffer::<u64>(128);
let count = 50_000u64;
let producer = thread::spawn(move || {
for i in 0..count {
while !writer.try_push(&i) {
std::hint::spin_loop();
}
}
});
let consumer = thread::spawn(move || {
let mut received = Vec::with_capacity(count as usize);
while received.len() < count as usize {
if let Some(v) = reader.try_pop() {
received.push(v);
} else {
std::hint::spin_loop();
}
}
received
});
producer.join().unwrap();
let received = consumer.join().unwrap();
assert_eq!(received.len(), count as usize);
for (i, &v) in received.iter().enumerate() {
assert_eq!(v, i as u64, "FIFO violated at position {i}");
}
}