use std::ffi::c_void;
use std::sync::atomic::{AtomicPtr, Ordering};
/// Control word for one publisher/subscriber pair.
///
/// It holds a single atomic pointer slot through which the two sides exchange
/// type-erased buffer pointers. `#[repr(C)]` keeps the layout C-compatible for the
/// `roplat_tb_*` entry points that receive this struct by pointer.
#[repr(C)]
pub struct TripleBufferCtrl {
    /// Buffer currently parked in the exchange slot; null when the slot is empty.
    ready: AtomicPtr<c_void>,
}

impl TripleBufferCtrl {
    /// Creates a control word whose exchange slot is empty (null).
    pub fn new() -> Self {
        Self::with_ptr(std::ptr::null_mut())
    }

    /// Creates a control word whose exchange slot initially holds `ptr`.
    ///
    /// The pointer is stored as-is; it is neither dereferenced nor validated here.
    pub fn with_ptr(ptr: *mut c_void) -> Self {
        let ready = AtomicPtr::new(ptr);
        Self { ready }
    }
}

impl Default for TripleBufferCtrl {
    /// Equivalent to [`TripleBufferCtrl::new`]: an empty exchange slot.
    fn default() -> Self {
        Self::with_ptr(std::ptr::null_mut())
    }
}
/// C entry point: heap-allocates a fresh, empty [`TripleBufferCtrl`] and hands
/// ownership of it to the caller as a raw pointer.
///
/// # Safety
///
/// The body has no preconditions of its own. The returned pointer comes from
/// `Box::into_raw`, so it must eventually be released by rebuilding the `Box`
/// (which is what `roplat_tb_ctrl_destroy` does) and must not be freed any other way.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_tb_ctrl_new() -> *mut TripleBufferCtrl {
    let boxed = Box::new(TripleBufferCtrl::new());
    Box::into_raw(boxed)
}
/// C entry point: releases a [`TripleBufferCtrl`] by rebuilding and dropping its `Box`.
///
/// A null `ctrl` is accepted and ignored. Only the control word itself is released;
/// whatever pointer is still stored in its exchange slot is not touched.
///
/// # Safety
///
/// `ctrl` must be null or a pointer produced by `Box::into_raw` on a
/// `Box<TripleBufferCtrl>` (for example by `roplat_tb_ctrl_new`) that has not been
/// freed yet. The pointer must not be used after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_tb_ctrl_destroy(ctrl: *mut TripleBufferCtrl) {
    if ctrl.is_null() {
        return;
    }
    // SAFETY: non-null, and the caller guarantees it is a live `Box` allocation.
    let owned = unsafe { Box::from_raw(ctrl) };
    drop(owned);
}
/// C entry point: atomically exchanges `local` with the pointer parked in `ctrl`'s
/// exchange slot and returns the pointer that was parked there (possibly null).
///
/// After the call the slot holds `local` and the caller holds the returned pointer.
/// The pointers are opaque here and are never dereferenced.
///
/// A null `ctrl` is treated as "no channel": nothing is exchanged and `local` is
/// returned unchanged, so the caller keeps the buffer it passed in. This matches
/// `roplat_tb_ctrl_destroy`, which also tolerates null.
///
/// # Safety
///
/// If `ctrl` is non-null it must point to a live, properly aligned
/// [`TripleBufferCtrl`] for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_tb_swap(
    ctrl: *mut TripleBufferCtrl,
    local: *mut c_void,
) -> *mut c_void {
    // SAFETY: `as_ref` handles null; the caller guarantees a non-null `ctrl` is live.
    match unsafe { ctrl.as_ref() } {
        // AcqRel: release publishes our writes to `local`'s buffer to whoever takes it
        // next; acquire makes the previous holder's writes to the returned buffer visible.
        Some(ctrl) => ctrl.ready.swap(local, Ordering::AcqRel),
        None => local,
    }
}
/// Reading end attached to one [`TripleBufferCtrl`].
///
/// It keeps the buffer it most recently took out of the control word's exchange slot
/// and frees that buffer when dropped. The control word is only referenced through a
/// raw pointer and is never freed by this type.
pub struct Subscriber<T> {
    /// Control word to exchange buffers through. Not owned.
    ctrl: *mut TripleBufferCtrl,
    /// Buffer received by the last exchange; null until a non-empty slot has been read.
    /// Treated as a `Box<T>` allocation when freed.
    read_buf: *mut T,
}

// The raw pointers suppress the auto traits; these impls restore `Send`/`Sync`
// under the stated bounds on `T`.
unsafe impl<T: Send> Send for Subscriber<T> {}
unsafe impl<T: Send + Sync> Sync for Subscriber<T> {}

impl<T> Subscriber<T> {
    /// Creates a subscriber that holds no buffer yet.
    ///
    /// # Safety
    ///
    /// `ctrl` is dereferenced by [`Subscriber::get_latest`], so it must point to a live
    /// [`TripleBufferCtrl`] whenever that method is called. Every non-null pointer that
    /// reaches the exchange slot must be a `Box<T>` allocation holding an initialized
    /// `T`, because received buffers are read as `T` and freed as `Box<T>`.
    pub unsafe fn new(ctrl: *mut TripleBufferCtrl) -> Self {
        Self {
            read_buf: std::ptr::null_mut(),
            ctrl,
        }
    }

    /// Exchanges this subscriber's current buffer with whatever is parked in the
    /// control word, and returns a reference to the buffer received, or `None` if the
    /// slot was empty (null).
    ///
    /// The exchange is unconditional: the buffer returned by the previous call (or null
    /// on the first call) is parked in the slot every time.
    ///
    /// NOTE(review): because of that, calling this twice with no publish in between
    /// hands back whatever this subscriber parked on the previous call (an older
    /// value, or `None`) instead of the newest one. Nothing in the slot marks a buffer
    /// as fresh.
    pub fn get_latest(&mut self) -> Option<&T> {
        let returned = self.read_buf.cast::<c_void>();
        // SAFETY: `ctrl` validity was promised to `Subscriber::new`.
        let received = unsafe { roplat_tb_swap(self.ctrl, returned) };
        self.read_buf = received.cast::<T>();
        // SAFETY: a non-null buffer taken from the slot is an initialized `T` per the
        // contract of `Subscriber::new`; `as_ref` maps null to `None`.
        unsafe { self.read_buf.as_ref() }
    }
}

impl<T> Drop for Subscriber<T> {
    /// Frees the buffer currently held, if any. The control word is left alone.
    fn drop(&mut self) {
        if self.read_buf.is_null() {
            return;
        }
        // SAFETY: a non-null `read_buf` is a `Box<T>` allocation this subscriber owns.
        let held = unsafe { Box::from_raw(self.read_buf) };
        drop(held);
    }
}
/// Writing end that broadcasts each published value to every attached
/// [`TripleBufferCtrl`].
///
/// It keeps at most one spare buffer, the one most recently taken back from an
/// exchange slot, and frees it when dropped. The control words are only referenced
/// through raw pointers and are never freed by this type.
pub struct Publisher<T> {
    /// Control words to publish into, one per subscriber. Not owned.
    ctrls: Vec<*mut TripleBufferCtrl>,
    /// Spare buffer available for the next write, or null if there is none.
    /// Always a `Box<T>` allocation holding an initialized (possibly stale) `T`.
    write_buf: *mut T,
}

// The raw pointers suppress the auto traits; these impls restore `Send`/`Sync`
// under the stated bounds on `T`.
unsafe impl<T: Send> Send for Publisher<T> {}
unsafe impl<T: Send + Sync> Sync for Publisher<T> {}

impl<T: Clone> Publisher<T> {
    /// Creates a publisher with no spare buffer.
    ///
    /// # Safety
    ///
    /// Every pointer in `ctrls` is dereferenced by [`Publisher::publish`], so each must
    /// point to a live [`TripleBufferCtrl`] whenever that method is called. Every
    /// non-null pointer that reaches an exchange slot must be a `Box<T>` allocation
    /// holding an initialized `T`, because buffers taken back from a slot are
    /// overwritten as `T` and freed as `Box<T>`.
    pub unsafe fn new(ctrls: Vec<*mut TripleBufferCtrl>) -> Self {
        Self { ctrls, write_buf: std::ptr::null_mut() }
    }

    /// Publishes `data` to every control word, in order.
    ///
    /// For each control word the spare buffer is refilled with the value (or a new
    /// buffer is allocated if there is no spare), swapped into the exchange slot, and
    /// whatever was parked there becomes the new spare. Refilling before *every* swap
    /// ensures a stale buffer taken back from one subscriber is never handed to the
    /// next one unchanged. The value is cloned for all control words but the last,
    /// which receives `data` itself. With no control words, `data` is simply dropped.
    pub fn publish(&mut self, data: T) {
        let Some((&last, rest)) = self.ctrls.split_last() else {
            return;
        };
        for &ctrl in rest {
            // SAFETY: `ctrl` validity and buffer provenance were promised to `new`.
            unsafe { Self::hand_off(&mut self.write_buf, ctrl, data.clone()) };
        }
        // SAFETY: as above.
        unsafe { Self::hand_off(&mut self.write_buf, last, data) };
    }

    /// Stores `value` in the spare buffer (allocating one if `*spare` is null), swaps
    /// that buffer into `ctrl`'s exchange slot, and leaves the buffer taken out of the
    /// slot in `*spare`.
    ///
    /// # Safety
    ///
    /// `ctrl` must point to a live control word, and a non-null `*spare` must be an
    /// exclusively owned `Box<T>` allocation holding an initialized `T`.
    unsafe fn hand_off(spare: &mut *mut T, ctrl: *mut TripleBufferCtrl, value: T) {
        let outgoing = if spare.is_null() {
            Box::into_raw(Box::new(value))
        } else {
            // Plain assignment (not `ptr::write`) so the stale value already in the
            // buffer is dropped instead of leaked.
            // SAFETY: non-null spare is a live, initialized, exclusively owned `T`.
            unsafe { **spare = value };
            *spare
        };
        // SAFETY: the caller guarantees `ctrl` is live.
        let recycled = unsafe { roplat_tb_swap(ctrl, outgoing.cast::<c_void>()) };
        *spare = recycled.cast::<T>();
    }
}

impl<T> Drop for Publisher<T> {
    /// Frees the spare buffer, if any. Control words and buffers still parked in
    /// their exchange slots are left alone.
    fn drop(&mut self) {
        if self.write_buf.is_null() {
            return;
        }
        // SAFETY: a non-null `write_buf` is a `Box<T>` allocation this publisher owns.
        let spare = unsafe { Box::from_raw(self.write_buf) };
        drop(spare);
    }
}
pub fn create_triple_buffer<T: Clone>(
subscriber_count: usize,
) -> (Publisher<T>, Vec<Subscriber<T>>) {
let mut ctrls = Vec::with_capacity(subscriber_count);
let mut subscribers = Vec::with_capacity(subscriber_count);
for _ in 0..subscriber_count {
let ctrl = Box::into_raw(Box::new(TripleBufferCtrl::new()));
ctrls.push(ctrl);
subscribers.push(unsafe { Subscriber::new(ctrl) });
}
let publisher = unsafe { Publisher::new(ctrls) };
(publisher, subscribers)
}
/// Owner of a set of control words from which publishers and subscribers are created.
///
/// Unlike the endpoints it hands out, the channel frees the control words when it
/// is dropped, together with any buffer still parked in their exchange slots.
///
/// NOTE(review): endpoints returned by [`TripleBufferChannel::publisher`] and
/// [`TripleBufferChannel::subscriber`] hold raw copies of the control-word pointers
/// with no lifetime tie to the channel. Using one after the channel has been dropped
/// would touch freed memory, so callers must keep the channel alive longer.
pub struct TripleBufferChannel<T: Clone> {
    /// Control words owned by the channel, one per subscriber index.
    ctrls: Vec<*mut TripleBufferCtrl>,
    /// Records the buffer element type so `Drop` can free parked buffers as `Box<T>`.
    _marker: std::marker::PhantomData<T>,
}

// The raw pointers suppress the auto traits; these impls restore `Send`/`Sync`
// under the stated bounds on `T`.
unsafe impl<T: Clone + Send> Send for TripleBufferChannel<T> {}
unsafe impl<T: Clone + Send + Sync> Sync for TripleBufferChannel<T> {}

impl<T: Clone> TripleBufferChannel<T> {
    /// Allocates `subscriber_count` empty control words.
    pub fn new(subscriber_count: usize) -> Self {
        let ctrls = (0..subscriber_count)
            .map(|_| Box::into_raw(Box::new(TripleBufferCtrl::new())))
            .collect();
        Self {
            ctrls,
            _marker: std::marker::PhantomData,
        }
    }

    /// Returns a publisher that broadcasts to all of this channel's control words.
    pub fn publisher(&self) -> Publisher<T> {
        let shared = self.ctrls.clone();
        // SAFETY: the pointers are live for as long as the channel is (see type docs).
        unsafe { Publisher::new(shared) }
    }

    /// Returns a subscriber attached to the control word at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not less than the number of control words.
    pub fn subscriber(&self, index: usize) -> Subscriber<T> {
        assert!(index < self.ctrls.len(), "subscriber index out of bounds");
        let ctrl = self.ctrls[index];
        // SAFETY: the pointer is live for as long as the channel is (see type docs).
        unsafe { Subscriber::new(ctrl) }
    }
}

impl<T: Clone> Drop for TripleBufferChannel<T> {
    /// For each control word: frees the buffer parked in its slot (if any) as a
    /// `Box<T>`, then frees the control word itself.
    fn drop(&mut self) {
        for ctrl in self.ctrls.iter().copied() {
            // SAFETY: each `ctrl` came from `Box::into_raw` in `new` and is freed only
            // here; a non-null parked pointer is expected to be a `Box<T>` allocation.
            unsafe {
                let parked = (*ctrl).ready.load(Ordering::Acquire).cast::<T>();
                if !parked.is_null() {
                    drop(Box::from_raw(parked));
                }
                drop(Box::from_raw(ctrl));
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A lone subscriber sees nothing before the first publish, then each published
    /// value in turn.
    #[test]
    fn test_basic_publish_subscribe() {
        let (mut publisher, mut subscribers) = create_triple_buffer::<i32>(1);
        let sub = &mut subscribers[0];
        assert!(sub.get_latest().is_none());

        for value in [42, 100] {
            publisher.publish(value);
            assert_eq!(sub.get_latest(), Some(&value));
        }
    }

    /// Each of three subscribers receives every value published.
    #[test]
    fn test_spmc_broadcast() {
        let (mut publisher, mut subscribers) = create_triple_buffer::<String>(3);

        for word in ["hello", "world"] {
            publisher.publish(word.to_string());
            for sub in subscribers.iter_mut() {
                assert_eq!(sub.get_latest().map(String::as_str), Some(word));
            }
        }
    }

    /// The first publish works although no buffers have been allocated beforehand.
    #[test]
    fn test_lazy_allocation() {
        let (mut publisher, mut subscribers) = create_triple_buffer::<Vec<i32>>(2);
        let payload = vec![1, 2, 3];

        publisher.publish(payload.clone());
        for sub in subscribers.iter_mut() {
            assert_eq!(sub.get_latest(), Some(&payload));
        }
    }

    /// A plain `#[repr(C)]` struct payload round-trips field by field.
    #[test]
    fn test_repr_c_struct() {
        #[derive(Clone, Debug, PartialEq)]
        #[repr(C)]
        struct SensorData {
            x: f64,
            y: f64,
            z: f64,
        }

        let (mut publisher, mut subscribers) = create_triple_buffer::<SensorData>(1);
        let sub = &mut subscribers[0];

        let sample = SensorData { x: 1.0, y: 2.0, z: 3.0 };
        publisher.publish(sample);

        let received = sub.get_latest().unwrap();
        assert_eq!(received.x, 1.0);
        assert_eq!(received.y, 2.0);
        assert_eq!(received.z, 3.0);
    }
}