roplat 0.2.0

roplat: just a robot operation system
Documentation
//! IPC 环形队列:跨进程 FIFO 语义
//!
//! 与 `comm::ring_buffer` 对应的进程间版本。契约保持一致:
//! - 写端 `try_push(&T)`:非阻塞发布
//! - 读端 `try_pop()`:非阻塞接收 `Option<T>`
//!
//! 实现差异:
//! - 不共享内存,通过 `IpcTransport` 搬运序列化后的字节
//! - `T` 必须 `#[repr(C)] + Copy`,直接按字节搬运(与三缓冲、进程内 ring_buffer 一致)
//! - 容量语义由后端的缓冲策略决定;MVP 背靠 TCP 后端的内存队列,容量近似为无界
//!
//! 为未来扩展保留口子:
//! - 将来可引入 `Serializer` trait 允许 bincode / capnp 编码非 `repr(C)` 类型
//! - 共享内存后端可在 ctrl 块上实现真正的固定容量环

use super::endpoint::EndpointUri;
use super::rendezvous::RendezvousDir;
use super::transport::{IpcError, IpcResult, IpcTransport, IpcTransportHandle};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// 工厂函数返回的一对 IPC 环形端点(仅目标角色端为 `Some`)。
pub type IpcRingPair<T> = IpcResult<(Option<IpcRingWriter<T>>, Option<IpcRingReader<T>>)>;

/// 订阅侧连接选项
///
/// 封装「发布者晚启动」的普适情况:只需配置 `connect_timeout` 与 `retry_interval`,
/// 工厂函数会在内部循环消化 `IpcError::NotReady`,避免每个调用方都手写重试逻辑。
#[derive(Debug, Clone)]
pub struct ConnectOptions {
    /// 等待发布者就绪的总超时;`None` 表示无限等待
    pub connect_timeout: Option<Duration>,
    /// 两次 rendezvous 查询之间的间隔
    pub retry_interval: Duration,
}

impl ConnectOptions {
    /// 非阻塞(立即返回)
    pub const fn non_blocking() -> Self {
        Self {
            connect_timeout: Some(Duration::from_millis(0)),
            retry_interval: Duration::from_millis(0),
        }
    }

    /// 无限等待发布者就绪,间隔 500ms
    pub const fn wait_forever() -> Self {
        Self {
            connect_timeout: None,
            retry_interval: Duration::from_millis(500),
        }
    }

    /// 自定义超时与间隔
    pub const fn with_timeout(connect_timeout: Duration) -> Self {
        Self {
            connect_timeout: Some(connect_timeout),
            retry_interval: Duration::from_millis(500),
        }
    }
}

impl Default for ConnectOptions {
    /// 默认:30 秒超时、500ms 重试间隔(对齐 CI/本地常见场景)
    fn default() -> Self {
        Self::with_timeout(Duration::from_secs(30))
    }
}

/// TCP 后端发布侧可调选项
#[derive(Debug, Clone)]
pub struct TcpOptions {
    /// 每个订阅者 outbox 的高水位;`None` 表示无界
    pub high_watermark: Option<usize>,
    /// 超过水位时的处理策略
    pub overflow: OverflowPolicy,
}

impl Default for TcpOptions {
    fn default() -> Self {
        Self { high_watermark: None, overflow: OverflowPolicy::DropOldest }
    }
}

/// 背压策略:outbox 写入超过 `high_watermark` 时的行为
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// 丢弃队首最旧一帧,写入新帧(力推,默认)
    DropOldest,
    /// 丢弃当前新帧(try_push 返回 false)
    DropNewest,
    /// 返回错误(由 Transport 向上暴露)
    Error,
}

/// IPC 资源选项
#[derive(Debug, Clone, Default)]
pub struct IpcOptions {
    /// 订阅侧连接选项
    pub connect: ConnectOptions,
    /// TCP 后端选项(发布侧生效)
    pub tcp: TcpOptions,
}

/// IPC 环形队列写端
pub struct IpcRingWriter<T: Copy> {
    transport: IpcTransportHandle,
    _marker: PhantomData<T>,
}

impl<T: Copy> IpcRingWriter<T> {
    /// 从已有 transport 构造(测试 / 自定义后端用)
    pub fn from_transport(transport: IpcTransportHandle) -> Self {
        Self { transport, _marker: PhantomData }
    }

    /// 尝试发送一帧
    ///
    /// MVP:总是入队到后端 outbox;满的概念由后端定义(TCP 后端当前近似无界)。
    pub fn try_push(&self, value: &T) -> bool {
        let bytes = value_to_bytes(value);
        self.transport.publish(bytes).is_ok()
    }

    /// 对端是否已连接
    pub fn is_connected(&self) -> bool {
        self.transport.is_ready()
    }

    /// 底层 transport(诊断用)
    pub fn transport(&self) -> &IpcTransportHandle {
        &self.transport
    }
}

/// IPC 环形队列读端
pub struct IpcRingReader<T: Copy> {
    transport: IpcTransportHandle,
    _marker: PhantomData<T>,
}

impl<T: Copy> IpcRingReader<T> {
    /// 用给定 transport 构造读端。
    pub fn from_transport(transport: IpcTransportHandle) -> Self {
        Self { transport, _marker: PhantomData }
    }

    /// 非阻塞弹出一帧;无数据返回 None
    ///
    /// 若收到的字节长度与 `size_of::<T>()` 不符,视为协议错误并丢弃该帧返回 None。
    pub fn try_pop(&self) -> IpcResult<Option<T>> {
        match self.transport.try_recv()? {
            None => Ok(None),
            Some(bytes) => {
                if bytes.len() != std::mem::size_of::<T>() {
                    // 长度错配:发布侧 schema 偏差或被篡改;MVP 选择丢弃并返回 None
                    return Ok(None);
                }
                // SAFETY: T: Copy 保证按字节拷贝安全;size 已核对
                let mut v: MaybeUninit<T> = MaybeUninit::uninit();
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        bytes.as_ptr(),
                        v.as_mut_ptr() as *mut u8,
                        bytes.len(),
                    );
                    Ok(Some(v.assume_init()))
                }
            }
        }
    }

    /// 对端是否已连接。
    pub fn is_connected(&self) -> bool {
        self.transport.is_ready()
    }

    /// 底层 transport(诊断用)。
    pub fn transport(&self) -> &IpcTransportHandle {
        &self.transport
    }
}

/// 工厂函数:创建一对 IPC 环形队列端点,使用 TCP 后端
///
/// 角色由调用方决定:
/// - `role = Role::Publisher` → 绑定本地端口、写 rendezvous、返回 writer
/// - `role = Role::Subscriber` → 读 rendezvous、连接发布者、返回 reader
///
/// 返回 `(Option<Writer>, Option<Reader>)`,仅目标角色的一端为 `Some`。
/// 这样两侧调用同一工厂、只根据 role 取各自端点,避免双端共用资源。
pub fn create_ipc_ring_buffer<T: Copy>(
    uri: &EndpointUri,
    role: super::endpoint::Role,
    rdv: RendezvousDir,
) -> IpcRingPair<T> {
    create_ipc_ring_buffer_with_opts(uri, role, rdv, &IpcOptions::default())
}

/// 带选项版工厂函数
///
/// 与 `create_ipc_ring_buffer` 相同,但:
/// - 订阅侧会按 `opts.connect` 自动消化 `IpcError::NotReady` 并重试
/// - 发布侧会把 `opts.tcp` 下发到 `TcpTransport`(背压水位)
pub fn create_ipc_ring_buffer_with_opts<T: Copy>(
    uri: &EndpointUri,
    role: super::endpoint::Role,
    rdv: RendezvousDir,
    opts: &IpcOptions,
) -> IpcRingPair<T> {
    use super::backend::TcpTransport;
    use super::endpoint::Role;

    match role {
        Role::Publisher => {
            let transport: Arc<dyn IpcTransport> = Arc::new(
                TcpTransport::bind_publisher_with_opts(uri, rdv, opts.tcp.clone())?,
            );
            Ok((Some(IpcRingWriter::from_transport(transport)), None))
        }
        Role::Subscriber => {
            let transport = connect_subscriber_retry::<TcpTransport>(uri, &rdv, &opts.connect)?;
            Ok((None, Some(IpcRingReader::from_transport(transport))))
        }
    }
}

fn connect_subscriber_retry<B: 'static + ConnectSubscriber>(
    uri: &EndpointUri,
    rdv: &RendezvousDir,
    opts: &ConnectOptions,
) -> IpcResult<IpcTransportHandle> {
    let deadline = opts.connect_timeout.map(|d| Instant::now() + d);
    loop {
        match B::connect_subscriber(uri, rdv) {
            Ok(t) => return Ok(Arc::new(t)),
            Err(e @ (IpcError::NotReady | IpcError::Io(_))) => {
                if let Some(dl) = deadline
                    && Instant::now() >= dl
                {
                    return Err(if matches!(e, IpcError::NotReady) {
                        IpcError::NotReady
                    } else {
                        e
                    });
                }
                std::thread::sleep(opts.retry_interval);
            }
            Err(e) => return Err(e),
        }
    }
}

/// 订阅端构造 trait,用于让重试工具与具体后端解耦
pub(crate) trait ConnectSubscriber: Sized + IpcTransport {
    fn connect_subscriber(uri: &EndpointUri, rdv: &RendezvousDir) -> IpcResult<Self>;
}

impl ConnectSubscriber for super::backend::TcpTransport {
    fn connect_subscriber(uri: &EndpointUri, rdv: &RendezvousDir) -> IpcResult<Self> {
        Self::connect_subscriber(uri, rdv)
    }
}

// ============================================================
// 内部工具
// ============================================================

#[inline]
fn value_to_bytes<T: Copy>(value: &T) -> &[u8] {
    // SAFETY: T: Copy 允许按字节读取;生命周期绑定到 value 的借用
    unsafe { std::slice::from_raw_parts(value as *const T as *const u8, std::mem::size_of::<T>()) }
}

#[cfg(test)]
mod tests {
    use super::super::backend::loopback::LoopbackTransport;
    use super::*;

    #[repr(C)]
    #[derive(Copy, Clone, PartialEq, Debug)]
    struct Pose {
        x: f32,
        y: f32,
        z: f32,
    }

    #[test]
    fn loopback_push_pop() {
        let t: Arc<dyn IpcTransport> = Arc::new(LoopbackTransport::new());
        let w = IpcRingWriter::<Pose>::from_transport(t.clone());
        let r = IpcRingReader::<Pose>::from_transport(t);
        let p = Pose { x: 1.0, y: 2.0, z: 3.0 };
        assert!(w.try_push(&p));
        assert_eq!(r.try_pop().unwrap(), Some(p));
        assert_eq!(r.try_pop().unwrap(), None);
    }
}