roplat 0.2.0

roplat: just a robot operation system
Documentation
//! 三缓冲状态更新通讯内核
//!
//! 采用原子指针交换实现无锁的 SPMC(单写多读)状态更新。
//! 核心思想:内存写入/读取不是原子的,但指针交换是原子的。
//! 写入时先写入缓冲区,再原子交换 ready 指针,保证读取到完整数据。
//!
//! 缓冲区数量:N+2(1 Writer + 1 Ready + N Reader)
//! 空间开辟是惰性的,因为消息不一定有默认构造函数。

use std::ffi::c_void;
use std::sync::atomic::{AtomicPtr, Ordering};

/// 三缓冲控制块,管理 ready 槽位的原子指针
#[repr(C)]
pub struct TripleBufferCtrl {
    ready: AtomicPtr<c_void>,
}

impl TripleBufferCtrl {
    /// 创建一个新的控制块,ready 槽位初始为空
    pub fn new() -> Self {
        Self { ready: AtomicPtr::new(std::ptr::null_mut()) }
    }

    /// 创建一个带初始 ready 指针的控制块
    pub fn with_ptr(ptr: *mut c_void) -> Self {
        Self { ready: AtomicPtr::new(ptr) }
    }
}

impl Default for TripleBufferCtrl {
    fn default() -> Self {
        Self::new()
    }
}

/// C FFI:原子交换 ready 指针和 local 指针
///
/// # Safety
/// - `ctrl` 必须是有效的 `TripleBufferCtrl` 指针
/// - 调用者负责确保交换回的指针的生命周期管理
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_tb_ctrl_new() -> *mut TripleBufferCtrl {
    Box::into_raw(Box::new(TripleBufferCtrl::new()))
}

/// C FFI:释放控制块
///
/// # Safety
/// - `ctrl` 必须是 `roplat_tb_ctrl_new` 返回的指针
/// - 调用后 `ctrl` 不可再使用
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_tb_ctrl_destroy(ctrl: *mut TripleBufferCtrl) {
    if !ctrl.is_null() {
        unsafe {
            drop(Box::from_raw(ctrl));
        }
    }
}

/// C FFI:原子交换 ready 指针
///
/// # Safety
/// - `ctrl` 必须指向有效的 `TripleBufferCtrl`
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_tb_swap(
    ctrl: *mut TripleBufferCtrl,
    local: *mut c_void,
) -> *mut c_void {
    unsafe { (*ctrl).ready.swap(local, Ordering::AcqRel) }
}

/// Rust 端的泛型订阅者
pub struct Subscriber<T> {
    ctrl: *mut TripleBufferCtrl,
    read_buf: *mut T,
}

// Safety: TripleBufferCtrl 内部使用原子操作,Subscriber 持有独占的 read_buf
unsafe impl<T: Send> Send for Subscriber<T> {}
unsafe impl<T: Send + Sync> Sync for Subscriber<T> {}

impl<T> Subscriber<T> {
    /// 从控制块创建订阅者,初始 read_buf 为空
    ///
    /// # Safety
    /// - `ctrl` 必须在 Subscriber 生命周期内有效
    pub unsafe fn new(ctrl: *mut TripleBufferCtrl) -> Self {
        Self { ctrl, read_buf: std::ptr::null_mut() }
    }

    /// 获取最新数据。如果Publisher尚未发布过数据,返回 None
    pub fn get_latest(&mut self) -> Option<&T> {
        let next = unsafe { roplat_tb_swap(self.ctrl, self.read_buf as *mut c_void) };
        self.read_buf = next as *mut T;

        if self.read_buf.is_null() {
            None
        } else {
            unsafe { Some(&*self.read_buf) }
        }
    }
}

impl<T> Drop for Subscriber<T> {
    fn drop(&mut self) {
        if !self.read_buf.is_null() {
            unsafe {
                drop(Box::from_raw(self.read_buf));
            }
        }
    }
}

/// Rust 端的泛型发布者(支持 SPMC 广播)
pub struct Publisher<T> {
    ctrls: Vec<*mut TripleBufferCtrl>,
    write_buf: *mut T,
}

// Safety: Publisher 通过原子操作与 Subscriber 通信
unsafe impl<T: Send> Send for Publisher<T> {}
unsafe impl<T: Send + Sync> Sync for Publisher<T> {}

impl<T: Clone> Publisher<T> {
    /// 创建发布者,绑定到一组控制块
    ///
    /// # Safety
    /// - 所有 `ctrls` 中的指针在 Publisher 生命周期内必须有效
    pub unsafe fn new(ctrls: Vec<*mut TripleBufferCtrl>) -> Self {
        Self { ctrls, write_buf: std::ptr::null_mut() }
    }

    /// 发布数据到所有订阅者
    pub fn publish(&mut self, data: T) {
        // 1. 准备写缓冲区
        if self.write_buf.is_null() {
            self.write_buf = Box::into_raw(Box::new(data));
        } else {
            unsafe {
                std::ptr::write(self.write_buf, data);
            }
        }

        // 2. 广播交换到所有订阅者
        unsafe {
            for &ctrl in &self.ctrls {
                let next = roplat_tb_swap(ctrl, self.write_buf as *mut c_void);
                if next.is_null() {
                    // 惰性扩张:ready 槽位是空的,需要新建缓冲区
                    // 仅发生在冷启动的前几次调用
                    self.write_buf = Box::into_raw(Box::new((*self.write_buf).clone()));
                } else {
                    self.write_buf = next as *mut T;
                }
            }
        }
    }
}

impl<T> Drop for Publisher<T> {
    fn drop(&mut self) {
        if !self.write_buf.is_null() {
            unsafe {
                drop(Box::from_raw(self.write_buf));
            }
        }
    }
}

/// 创建一个 1-Publisher N-Subscriber 的三缓冲通道
///
/// # Safety
/// 返回的 Publisher 和 Subscriber 通过共享的 TripleBufferCtrl 连接。
/// 控制块由此函数分配,在 Publisher 和所有 Subscriber 都 drop 后需要确保被释放。
pub fn create_triple_buffer<T: Clone>(
    subscriber_count: usize,
) -> (Publisher<T>, Vec<Subscriber<T>>) {
    let mut ctrls = Vec::with_capacity(subscriber_count);
    let mut subscribers = Vec::with_capacity(subscriber_count);

    for _ in 0..subscriber_count {
        let ctrl = Box::into_raw(Box::new(TripleBufferCtrl::new()));
        ctrls.push(ctrl);
        subscribers.push(unsafe { Subscriber::new(ctrl) });
    }

    let publisher = unsafe { Publisher::new(ctrls) };
    (publisher, subscribers)
}

/// 带生命周期管理的三缓冲通道
/// 持有所有控制块的所有权,确保正确释放
pub struct TripleBufferChannel<T: Clone> {
    ctrls: Vec<*mut TripleBufferCtrl>,
    _marker: std::marker::PhantomData<T>,
}

unsafe impl<T: Clone + Send> Send for TripleBufferChannel<T> {}
unsafe impl<T: Clone + Send + Sync> Sync for TripleBufferChannel<T> {}

impl<T: Clone> TripleBufferChannel<T> {
    /// 创建带所有权管理的三缓冲通道。
    pub fn new(subscriber_count: usize) -> Self {
        let mut ctrls = Vec::with_capacity(subscriber_count);
        for _ in 0..subscriber_count {
            ctrls.push(Box::into_raw(Box::new(TripleBufferCtrl::new())));
        }
        Self { ctrls, _marker: std::marker::PhantomData }
    }

    /// 创建发布端句柄。
    pub fn publisher(&self) -> Publisher<T> {
        unsafe { Publisher::new(self.ctrls.clone()) }
    }

    /// 按索引创建订阅端句柄。
    pub fn subscriber(&self, index: usize) -> Subscriber<T> {
        assert!(index < self.ctrls.len(), "subscriber index out of bounds");
        unsafe { Subscriber::new(self.ctrls[index]) }
    }
}

impl<T: Clone> Drop for TripleBufferChannel<T> {
    fn drop(&mut self) {
        for &ctrl in &self.ctrls {
            unsafe {
                // 释放 ready 槽位中残留的数据
                let ptr = (*ctrl).ready.load(Ordering::Acquire);
                if !ptr.is_null() {
                    drop(Box::from_raw(ptr as *mut T));
                }
                drop(Box::from_raw(ctrl));
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_publish_subscribe() {
        let (mut publisher, mut subscribers) = create_triple_buffer::<i32>(1);
        let sub = &mut subscribers[0];

        // 初始无数据
        assert!(sub.get_latest().is_none());

        // 发布数据
        publisher.publish(42);
        assert_eq!(sub.get_latest(), Some(&42));

        // 更新数据
        publisher.publish(100);
        assert_eq!(sub.get_latest(), Some(&100));
    }

    #[test]
    fn test_spmc_broadcast() {
        let (mut publisher, mut subscribers) = create_triple_buffer::<String>(3);

        publisher.publish("hello".to_string());

        for sub in &mut subscribers {
            assert_eq!(sub.get_latest().map(|s| s.as_str()), Some("hello"));
        }

        publisher.publish("world".to_string());

        for sub in &mut subscribers {
            assert_eq!(sub.get_latest().map(|s| s.as_str()), Some("world"));
        }
    }

    #[test]
    fn test_lazy_allocation() {
        let (mut publisher, mut subscribers) = create_triple_buffer::<Vec<i32>>(2);

        // 第一次发布触发惰性分配
        publisher.publish(vec![1, 2, 3]);

        for sub in &mut subscribers {
            assert_eq!(sub.get_latest(), Some(&vec![1, 2, 3]));
        }
    }

    #[test]
    fn test_repr_c_struct() {
        #[derive(Clone, Debug, PartialEq)]
        #[repr(C)]
        struct SensorData {
            x: f64,
            y: f64,
            z: f64,
        }

        let (mut publisher, mut subscribers) = create_triple_buffer::<SensorData>(1);
        let sub = &mut subscribers[0];

        publisher.publish(SensorData { x: 1.0, y: 2.0, z: 3.0 });

        let data = sub.get_latest().unwrap();
        assert_eq!(data.x, 1.0);
        assert_eq!(data.y, 2.0);
        assert_eq!(data.z, 3.0);
    }
}