roplat 0.1.0

roplat: just a robot operation system
Documentation
use futures::future::BoxFuture;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::RwLock;

// ==========================================
// 核心抽象 (Core Abstractions)
// ==========================================

/// 分发特征:定义资源具体的执行逻辑
/// 通常由 #[roplat::interface] 宏自动生成实现
#[async_trait::async_trait]
pub trait Dispatch<Cmd> {
    async fn dispatch(&mut self, cmd: Cmd);
}

/// 抽象的一次性回复句柄
/// 用于将 "回复通道的发送端" 塞入 Cmd 枚举中发送给远端
pub trait ReplyHandle<T>: Send {
    fn send(self: Box<Self>, val: T);
}

// 为 tokio::sync::oneshot::Sender 实现 ReplyHandle
impl<T: Send> ReplyHandle<T> for tokio::sync::oneshot::Sender<T> {
    fn send(self: Box<Self>, val: T) {
        let _ = (*self).send(val);
    }
}

/// 后端特征:定义“指令如何送达”
/// 改为 async trait,移除原本复杂的 ReturnFactory 泛型,利用 Future
#[async_trait::async_trait]
pub trait Backend<T: ?Sized, C>: Send + Sync {
    /// Fire-and-Forget (无返回值)
    /// 适用于单向通知,如 Sensor 数据发布
    async fn send(&self, cmd: C);

    /// Request-Response (有返回值)
    /// R: 返回值类型
    /// FLocal:  当后端是 Local 时,如何直接操作 &mut T
    /// FRemote: 当后端是 Remote/Arrow 时,如何将 ReplyHandle 打包进 Cmd
    async fn request<R, FLocal, FRemote>(&self, f_local: FLocal, f_remote: FRemote) -> R
    where
        R: Send + 'static,
        FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
        FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send;
}

// ==========================================
// 1. Local Backend (进程内/协程间通讯)
// ==========================================

/// Local call backend
/// 修正:引入 RwLock 提供内部可变性和线程安全
#[derive(Clone)]
pub struct Local<T: ?Sized> {
    // 使用 Tokio 的读写锁,支持在 async 环境下挂起而不是阻塞线程
    inner: Arc<RwLock<T>>,
}

impl<T: ?Sized> Local<T> {
    pub fn new(inner: Arc<RwLock<T>>) -> Self {
        Self { inner }
    }
}

#[async_trait::async_trait]
impl<T, C> Backend<T, C> for Local<T>
where
    T: ?Sized + Dispatch<C> + Send + Sync,
    C: Send + 'static,
{
    #[inline]
    async fn send(&self, cmd: C) {
        // 获取写锁,执行 dispatch
        let mut guard = self.inner.write().await;
        guard.dispatch(cmd).await;
    }

    #[inline]
    async fn request<R, FLocal, FRemote>(&self, f_local: FLocal, _f_remote: FRemote) -> R
    where
        R: Send + 'static,
        FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
        FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send,
    {
        // 获取写锁,直接调用闭包操作数据
        // 这里实现了“零拷贝”,因为直接操作了内存引用
        let mut guard = self.inner.write().await;
        f_local(&mut *guard).await
    }
}

// ==========================================
// 2. Remote Backend (跨进程/网络通讯)
// ==========================================

/// 传输层抽象
pub trait Transport<C>: Send + Sync + Clone {
    fn send_msg(&self, cmd: C);
}

#[derive(Clone)]
pub struct Remote<Tr, C> {
    transport: Tr,
    _marker: PhantomData<C>,
}

#[async_trait::async_trait]
impl<T, C, Tr> Backend<T, C> for Remote<Tr, C>
where
    T: ?Sized + Sync,
    C: Send + Sync + 'static,
    Tr: Transport<C>,
{
    #[inline]
    async fn send(&self, cmd: C) {
        self.transport.send_msg(cmd);
    }

    async fn request<R, FLocal, FRemote>(&self, _f_local: FLocal, f_remote: FRemote) -> R
    where
        R: Send + 'static,
        FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
        FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send,
    {
        // 1. 创建异步通道
        let (tx, rx) = tokio::sync::oneshot::channel();

        // 2. 将发送端 tx 封装进 Cmd
        // 用户侧宏生成的代码负责把这个 Box<ReplyHandle> 放到 Enum 里的 reply_to 字段
        let cmd = f_remote(Box::new(tx));

        // 3. 发送消息
        self.transport.send_msg(cmd);

        // 4. 异步等待回传,不阻塞线程
        rx.await.expect("Remote channel closed unexpectedly")
    }
}

// ==========================================
// 3. Arrow Backend (共享内存/跨语言大数据)
// ==========================================

/// Arrow Backend 专用传输层
/// 它不仅发送控制指令,还管理着一块共享内存区域 (Shm)
pub trait ArrowTransport<C>: Transport<C> {
    /// 获取共享内存的元数据或指针
    fn get_shm_handle(&self) -> String; // 示例:返回 Shared Memory ID
}

/// Arrow Backend
/// 专门用于处理基于 Apache Arrow 的数据交互
/// 场景:Python 节点写入 Arrow 共享内存,Rust 节点通过此 Backend 读取,无需序列化
#[derive(Clone)]
pub struct ArrowShm<Tr, C> {
    transport: Tr,
    _marker: PhantomData<C>,
}

#[async_trait::async_trait]
impl<T, C, Tr> Backend<T, C> for ArrowShm<Tr, C>
where
    T: ?Sized + Sync,
    C: Send + Sync + 'static, // Cmd 中通常包含 Shm 的 Offset 或 Metadata
    Tr: ArrowTransport<C>,
{
    #[inline]
    async fn send(&self, cmd: C) {
        // Arrow 后端的 Send 通常意味着:
        // "我更新了共享内存区域 X,新的 Offset 是 Y,请查收"
        self.transport.send_msg(cmd);
    }

    async fn request<R, FLocal, FRemote>(&self, _f_local: FLocal, f_remote: FRemote) -> R
    where
        R: Send + 'static,
        FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
        FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send,
    {
        // Arrow 的 request 逻辑与 Remote 类似:
        // 1. 建立回传通道
        let (tx, rx) = tokio::sync::oneshot::channel();

        // 2. 打包指令 (包含 ReplyHandle)
        // 注意:这里的 Cmd 可能会携带 "请把结果写入到某个 Arrow Buffer" 的指示
        let cmd = f_remote(Box::new(tx));

        // 3. 发送
        self.transport.send_msg(cmd);

        // 4. 等待
        // 返回的 R 可能是 Arrow Array 的引用或者是包含 Shm 指针的结构体
        rx.await.expect("Arrow IPC channel closed")
    }
}