roplat 0.2.0

roplat: just a robot operation system
Documentation
//! 跨进程三缓冲语义:最新值覆盖
//!
//! 与 `comm::triple_buffer` 对应的 IPC 版本。写端始终广播最新一帧,读端 `get_latest`
//! 时从 inbox 抽干并保留最后一条,实现"只关心最新值"的 SPMC 语义。
//!
//! 与真正的共享内存三缓冲相比,当前 MVP 实现复用 TCP 广播链路,延迟与吞吐受限于
//! 字节搬运开销。真·零拷贝共享内存版本在 shm 后端中实现(见 TODO)。

use super::endpoint::{EndpointUri, Role};
use super::rendezvous::RendezvousDir;
use super::ring_buffer::{IpcOptions, create_ipc_ring_buffer_with_opts};
use super::transport::{IpcResult, IpcTransport, IpcTransportHandle};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::sync::{Arc, Mutex};

/// 工厂函数返回的一对 IPC 三缓冲端点(仅目标角色端为 `Some`)。
pub type IpcTriplePair<T> = IpcResult<(Option<IpcTripleWriter<T>>, Option<IpcTripleReader<T>>)>;

/// IPC 三缓冲写端
pub struct IpcTripleWriter<T: Copy> {
    transport: IpcTransportHandle,
    _marker: PhantomData<T>,
}

impl<T: Copy> IpcTripleWriter<T> {
    /// 用给定 transport 构造写端。
    pub fn from_transport(transport: IpcTransportHandle) -> Self {
        Self { transport, _marker: PhantomData }
    }

    /// 发布最新一帧;永不阻塞
    pub fn publish(&self, value: &T) -> bool {
        let bytes = unsafe {
            std::slice::from_raw_parts(value as *const T as *const u8, std::mem::size_of::<T>())
        };
        self.transport.publish(bytes).is_ok()
    }

    /// 对端是否已连接。
    pub fn is_connected(&self) -> bool {
        self.transport.is_ready()
    }
}

/// IPC 三缓冲读端
///
/// `get_latest` 会抽干 inbox,仅保留最后一帧并返回。读端语义为"最新可见值",
/// 中间帧可能被丢弃 —— 这正是三缓冲的预期语义。
pub struct IpcTripleReader<T: Copy> {
    transport: IpcTransportHandle,
    /// 已见过的最新帧缓存。subscribe 侧多次 `get_latest` 返回同一份拷贝
    /// 直到后台线程又塞进来新数据
    latest: Mutex<Option<T>>,
    _marker: PhantomData<T>,
}

impl<T: Copy> IpcTripleReader<T> {
    /// 用给定 transport 构造读端。
    pub fn from_transport(transport: IpcTransportHandle) -> Self {
        Self { transport, latest: Mutex::new(None), _marker: PhantomData }
    }

    /// 抽干 inbox,保留最后一帧并返回当前可见的最新值
    ///
    /// 返回值的快照由内部 `Mutex<Option<T>>` 持有,读端调用后下一次 `get_latest`
    /// 仍会返回这份缓存(直到发布者又塞入新数据)—— 对齐 `Subscriber::get_latest` 语义。
    pub fn get_latest(&self) -> IpcResult<Option<T>> {
        // 抽干 inbox,只留最后一帧
        let mut last_bytes: Option<Vec<u8>> = None;
        while let Some(b) = self.transport.try_recv()? {
            last_bytes = Some(b);
        }

        if let Some(bytes) = last_bytes
            && bytes.len() == std::mem::size_of::<T>()
        {
            // SAFETY: T: Copy 允许按字节构造;size 已核对
            let mut v: MaybeUninit<T> = MaybeUninit::uninit();
            unsafe {
                std::ptr::copy_nonoverlapping(
                    bytes.as_ptr(),
                    v.as_mut_ptr() as *mut u8,
                    bytes.len(),
                );
                let value = v.assume_init();
                let mut guard = self.latest.lock().expect("latest poisoned");
                *guard = Some(value);
            }
            // 长度不符时静默丢弃(MVP 策略,与 ring_buffer 读端一致)
        }

        Ok(*self.latest.lock().expect("latest poisoned"))
    }

    /// 对端是否已连接。
    pub fn is_connected(&self) -> bool {
        self.transport.is_ready()
    }
}

/// 工厂函数:创建一对 IPC 三缓冲端点(默认 TCP 后端)
///
/// 底层复用 `create_ipc_ring_buffer_with_opts` 的 transport 栈,仅在语义层
/// 把"FIFO 队列 + 抽干到最新"包装成三缓冲读端。
pub fn create_ipc_triple_buffer<T: Copy>(
    uri: &EndpointUri,
    role: Role,
    rdv: RendezvousDir,
) -> IpcTriplePair<T> {
    create_ipc_triple_buffer_with_opts(uri, role, rdv, &IpcOptions::default())
}

/// 带选项版本
pub fn create_ipc_triple_buffer_with_opts<T: Copy>(
    uri: &EndpointUri,
    role: Role,
    rdv: RendezvousDir,
    opts: &IpcOptions,
) -> IpcTriplePair<T> {
    // 借用 ring_buffer 工厂建立 transport,然后只取 transport 句柄重新包装
    let (w, r) = create_ipc_ring_buffer_with_opts::<T>(uri, role, rdv, opts)?;
    match role {
        Role::Publisher => {
            let t: Arc<dyn IpcTransport> = w.expect("publisher writer").transport().clone();
            Ok((Some(IpcTripleWriter::from_transport(t)), None))
        }
        Role::Subscriber => {
            let t: Arc<dyn IpcTransport> = r.expect("subscriber reader").transport().clone();
            Ok((None, Some(IpcTripleReader::from_transport(t))))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::super::backend::loopback::LoopbackTransport;
    use super::*;

    #[repr(C)]
    #[derive(Copy, Clone, PartialEq, Debug)]
    struct Pose {
        seq: u64,
        x: f32,
    }

    #[test]
    fn loopback_latest_wins() {
        let t: Arc<dyn IpcTransport> = Arc::new(LoopbackTransport::new());
        let w = IpcTripleWriter::<Pose>::from_transport(t.clone());
        let r = IpcTripleReader::<Pose>::from_transport(t);

        // 无数据时返回 None
        assert_eq!(r.get_latest().unwrap(), None);

        // 连发三帧,读端只应看到最后一帧
        assert!(w.publish(&Pose { seq: 1, x: 0.1 }));
        assert!(w.publish(&Pose { seq: 2, x: 0.2 }));
        assert!(w.publish(&Pose { seq: 3, x: 0.3 }));
        assert_eq!(r.get_latest().unwrap(), Some(Pose { seq: 3, x: 0.3 }));

        // 无新帧时仍返回缓存(三缓冲语义:最新值持久)
        assert_eq!(r.get_latest().unwrap(), Some(Pose { seq: 3, x: 0.3 }));

        // 再来一帧后切换
        assert!(w.publish(&Pose { seq: 4, x: 0.4 }));
        assert_eq!(r.get_latest().unwrap(), Some(Pose { seq: 4, x: 0.4 }));
    }
}