roplat 0.2.0

roplat: just a robot operation system
Documentation
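//! Rendezvous directory: file-based endpoint discovery.
//!
//! On startup a publisher writes a JSON descriptor to
//! `$ROPLAT_RUNTIME_DIR/ipc/<namespace>/<name>.rdv`; a subscriber reads the same path,
//! verifies that the publisher PID is still alive, and then connects to the descriptor's
//! `address`.
//!
//! Zero external dependencies (no mDNS / ZeroMQ), which suits embedded and offline scenarios.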

use super::endpoint::{EndpointUri, SchemaId};
use super::transport::{IpcError, IpcResult};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns the default runtime directory.
///
/// Resolution order:
/// - the `ROPLAT_RUNTIME_DIR` environment variable, when it is set to a non-empty value
///   (read with `var_os`, so a path that is not valid Unicode is accepted as well);
/// - otherwise `std::env::temp_dir()` joined with `roplat`, which is typically
///   `/tmp/roplat` on Unix and `%TEMP%/roplat` on Windows.
pub fn default_runtime_dir() -> PathBuf {
    match std::env::var_os("ROPLAT_RUNTIME_DIR") {
        // An empty value is treated as "unset": an empty root would silently make every
        // derived path relative to the current working directory.
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        _ => std::env::temp_dir().join("roplat"),
    }
}

/// Contents of a rendezvous file (serialized as JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RendezvousDescriptor {
    /// The endpoint URI, verbatim.
    pub uri: String,
    /// Namespace (stored redundantly so the file is easy to grep).
    pub namespace: String,
    /// Endpoint name.
    pub name: String,
    /// Message schema fingerprint.
    pub schema_id: SchemaId,
    /// Message version.
    pub msg_version: u16,
    /// Backend type string, e.g. "tcp" / "uds" / "shm".
    pub transport: String,
    /// Backend address; the format is defined by the backend itself.
    /// - tcp: "127.0.0.1:54123"
    /// - uds: "/tmp/roplat/ipc/default/sensor.sock"
    pub address: String,
    /// PID of the publisher process; subscribers use it to clean up stale files.
    pub publisher_pid: u32,
    /// Creation time as a Unix timestamp (seconds).
    pub created_at: u64,
}

impl RendezvousDescriptor {
    /// Creates a descriptor for `uri`, served by the backend `transport` at `address`.
    ///
    /// The namespace, name, schema id and message version are copied from `uri`;
    /// `publisher_pid` is the current process id and `created_at` is the current
    /// wall-clock time in whole seconds since the Unix epoch.
    pub fn new(
        uri: &EndpointUri,
        transport: impl Into<String>,
        address: impl Into<String>,
    ) -> Self {
        // A clock that reads earlier than the Unix epoch degrades to 0 instead of failing.
        let created_at = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        };

        Self {
            uri: uri.to_string(),
            namespace: uri.namespace.clone(),
            name: uri.name.clone(),
            schema_id: uri.schema_id.clone(),
            msg_version: uri.msg_version,
            transport: transport.into(),
            address: address.into(),
            publisher_pid: std::process::id(),
            created_at,
        }
    }

    /// Serializes the descriptor as pretty-printed JSON.
    ///
    /// A serialization failure is reported as `IpcError::Serde` carrying the error text.
    fn to_json(&self) -> IpcResult<String> {
        match serde_json::to_string_pretty(self) {
            Ok(text) => Ok(text),
            Err(err) => Err(IpcError::Serde(err.to_string())),
        }
    }

    /// Parses a descriptor from its JSON text.
    ///
    /// A parse failure is reported as `IpcError::Serde` carrying the error text.
    fn from_json(s: &str) -> IpcResult<Self> {
        match serde_json::from_str(s) {
            Ok(desc) => Ok(desc),
            Err(err) => Err(IpcError::Serde(err.to_string())),
        }
    }
}

/// Manager for a rendezvous directory.
///
/// Holds only the root path under which descriptor files live, so it is cheap to clone
/// and can be printed with `{:?}` for diagnostics.
#[derive(Debug, Clone)]
pub struct RendezvousDir {
    /// Root directory that contains one sub-directory per namespace.
    root: PathBuf,
}

impl RendezvousDir {
    /// Creates a manager rooted at the default runtime directory.
    ///
    /// The root path is `default_runtime_dir()/ipc`.
    pub fn new_default() -> Self {
        Self { root: default_runtime_dir().join("ipc") }
    }

    /// Creates a manager rooted at a caller-supplied path.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Returns the rendezvous root directory.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Maps an endpoint URI to its descriptor path: `<root>/<namespace>/<name>.rdv`.
    fn path_for(&self, uri: &EndpointUri) -> PathBuf {
        self.root
            .join(&uri.namespace)
            .join(format!("{}.rdv", uri.name))
    }

    /// Writes a descriptor (called by the publisher) and returns the path it was written to.
    ///
    /// The JSON is first written to a process-specific staging file next to the target and
    /// then renamed over it, so a concurrent reader sees either the previous descriptor or
    /// the complete new one, never a partially written file. The staging file does not have
    /// the `rdv` extension, so directory scans ignore it.
    ///
    /// # Errors
    ///
    /// Returns an error if `desc.uri` cannot be parsed, if serialization fails, or if any
    /// filesystem operation fails.
    pub fn publish(&self, desc: &RendezvousDescriptor) -> IpcResult<PathBuf> {
        let uri = EndpointUri::parse(&desc.uri)?;
        let path = self.path_for(&uri);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let json = desc.to_json()?;

        // `<name>.rdv` -> `<name>.rdv.<pid>.tmp`, in the same directory so the rename stays
        // on one filesystem.
        let staging = path.with_extension(format!("rdv.{}.tmp", std::process::id()));
        let staged = fs::write(&staging, json).and_then(|()| fs::rename(&staging, &path));
        if let Err(e) = staged {
            // Best-effort cleanup; the original error is what the caller needs to see.
            let _ = fs::remove_file(&staging);
            return Err(e.into());
        }
        Ok(path)
    }

    /// Reads a descriptor (called by the subscriber).
    ///
    /// # Errors
    ///
    /// - `IpcError::NotReady` if the descriptor file does not exist, or if the recorded
    ///   publisher PID is no longer alive (the stale file is removed, best-effort); callers
    ///   may poll and retry.
    /// - `IpcError::SchemaMismatch` if a live publisher advertises a different schema id.
    /// - `IpcError::Protocol` if a live publisher advertises a different message version.
    /// - Any other I/O or JSON parsing error is passed through.
    pub fn lookup(&self, uri: &EndpointUri) -> IpcResult<RendezvousDescriptor> {
        let path = self.path_for(uri);
        let s = match fs::read_to_string(&path) {
            Ok(s) => s,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Err(IpcError::NotReady),
            Err(e) => return Err(e.into()),
        };
        let desc = RendezvousDescriptor::from_json(&s)?;

        // Stale-file cleanup comes first: a descriptor left behind by a dead publisher says
        // nothing about what a future publisher will offer, so it must not be reported as a
        // schema/version mismatch (which would also leave the stale file in place forever).
        if !is_pid_alive(desc.publisher_pid) {
            let _ = fs::remove_file(&path);
            return Err(IpcError::NotReady);
        }

        // Schema / version validation against the live publisher.
        if desc.schema_id != uri.schema_id {
            return Err(IpcError::SchemaMismatch {
                expected: uri.schema_id.to_string(),
                actual: desc.schema_id.to_string(),
            });
        }
        if desc.msg_version != uri.msg_version {
            return Err(IpcError::Protocol(format!(
                "msg_version mismatch: expected {}, got {}",
                uri.msg_version, desc.msg_version
            )));
        }

        Ok(desc)
    }

    /// Removes a descriptor (called by the publisher on exit).
    ///
    /// A descriptor that is already gone is not an error.
    pub fn withdraw(&self, uri: &EndpointUri) -> IpcResult<()> {
        let path = self.path_for(uri);
        match fs::remove_file(&path) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Scans the whole rendezvous directory and lists every descriptor with a PID-alive flag.
    ///
    /// Schema / version are not validated (the CLI diagnostics path wants to see mismatches),
    /// and stale files are not removed. Returns a list of `(descriptor, is_alive)` pairs, or
    /// an empty list if the root directory does not exist.
    pub fn scan_all(&self) -> IpcResult<Vec<(RendezvousDescriptor, bool)>> {
        let mut out = Vec::new();
        if !self.root.exists() {
            return Ok(out);
        }
        scan_dir_rec(&self.root, &mut out)?;
        Ok(out)
    }
}

/// Recursively walks `dir` and appends every readable, parseable `*.rdv` descriptor to `out`
/// together with the liveness of its publisher PID.
///
/// Files that cannot be read or parsed are skipped; errors from listing a directory are
/// propagated to the caller.
fn scan_dir_rec(dir: &Path, out: &mut Vec<(RendezvousDescriptor, bool)>) -> IpcResult<()> {
    for item in fs::read_dir(dir)? {
        let candidate = item?.path();

        if candidate.is_dir() {
            scan_dir_rec(&candidate, out)?;
            continue;
        }

        let has_rdv_ext = candidate.extension().and_then(|ext| ext.to_str()) == Some("rdv");
        if !has_rdv_ext {
            continue;
        }

        // Unreadable or malformed descriptor files are silently skipped.
        let parsed = fs::read_to_string(&candidate)
            .ok()
            .and_then(|text| RendezvousDescriptor::from_json(&text).ok());
        if let Some(desc) = parsed {
            let alive = is_pid_alive(desc.publisher_pid);
            out.push((desc, alive));
        }
    }
    Ok(())
}

/// Public PID liveness check (for use by CLI diagnostics).
///
/// Thin wrapper that forwards to the platform-specific `is_pid_alive` and returns its result.
pub fn check_pid_alive(pid: u32) -> bool {
    is_pid_alive(pid)
}

// ============================================================
// PID liveness check (cross-platform, best-effort)
// ============================================================

/// Linux: reports whether a process with the given PID appears to exist.
///
/// Zero-dependency check based on the presence of `/proc/<pid>`. When procfs itself is not
/// available (`/proc/self` is missing, e.g. in a minimal container, chroot or embedded
/// image), liveness cannot be determined and the function conservatively returns `true`
/// instead of declaring every publisher dead.
#[cfg(target_os = "linux")]
fn is_pid_alive(pid: u32) -> bool {
    if Path::new(&format!("/proc/{pid}")).exists() {
        return true;
    }
    // No entry for `pid`: that only proves the process is gone if procfs is actually mounted.
    !Path::new("/proc/self").exists()
}

/// Non-Linux Unix: liveness is not actually checked; every PID is reported as alive.
#[cfg(all(unix, not(target_os = "linux")))]
fn is_pid_alive(_pid: u32) -> bool {
    // macOS / BSD: the MVP conservatively returns true and relies on the backend
    // connection failing as the fallback.
    true
}

/// Windows: reports whether a process with the given PID appears to be running.
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and inspects its exit code:
/// - open fails with `ERROR_ACCESS_DENIED`: the process exists but cannot be inspected, so it
///   is reported as alive;
/// - open fails for any other reason: reported as dead;
/// - open succeeds: alive only if the exit code is still `STILL_ACTIVE` (an exited process
///   remains openable while other handles to it exist). If the exit code cannot be queried
///   the process is conservatively reported as alive.
///
/// Best-effort: a process that genuinely exits with code 259 is indistinguishable from a
/// running one.
#[cfg(windows)]
fn is_pid_alive(pid: u32) -> bool {
    use std::ffi::c_void;
    #[link(name = "kernel32")]
    unsafe extern "system" {
        fn OpenProcess(desired_access: u32, inherit: i32, pid: u32) -> *mut c_void;
        fn GetExitCodeProcess(h: *mut c_void, exit_code: *mut u32) -> i32;
        fn CloseHandle(h: *mut c_void) -> i32;
    }
    const PROCESS_QUERY_LIMITED_INFORMATION: u32 = 0x1000;
    const ERROR_ACCESS_DENIED: i32 = 5;
    const STILL_ACTIVE: u32 = 259;

    // SAFETY: OpenProcess accepts any PID value and returns NULL on failure.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle.is_null() {
        // Must be read immediately after the failed call, before anything else can
        // overwrite the thread's last-error value.
        return std::io::Error::last_os_error().raw_os_error() == Some(ERROR_ACCESS_DENIED);
    }

    let mut exit_code: u32 = 0;
    // SAFETY: `handle` is a valid, non-null process handle opened above with query rights,
    // and `exit_code` is a live, writable u32 for the duration of the call.
    let queried = unsafe { GetExitCodeProcess(handle, &mut exit_code) } != 0;
    // SAFETY: `handle` was returned by OpenProcess and is closed exactly once.
    unsafe {
        CloseHandle(handle);
    }

    !queried || exit_code == STILL_ACTIVE
}

/// Other platforms (neither Unix nor Windows): liveness is not checked; every PID is
/// reported as alive.
#[cfg(not(any(unix, windows)))]
fn is_pid_alive(_pid: u32) -> bool {
    true
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A descriptor published by this process can be looked up again under the same URI.
    #[test]
    fn publish_and_lookup_roundtrip() {
        let scratch = std::env::temp_dir().join(format!("roplat-rdv-test-{}", std::process::id()));
        let rdv = RendezvousDir::new(&scratch);
        let endpoint = EndpointUri::parse("roplat-ipc://test/foo?msg=abc&v=1").unwrap();

        rdv.publish(&RendezvousDescriptor::new(&endpoint, "tcp", "127.0.0.1:0"))
            .unwrap();
        let found = rdv.lookup(&endpoint).unwrap();
        assert_eq!(found.address, "127.0.0.1:0");

        rdv.withdraw(&endpoint).unwrap();
        // Best-effort cleanup of the scratch directory.
        let _ = fs::remove_dir_all(&scratch);
    }

    /// Looking up an endpoint that was never published reports `NotReady`.
    #[test]
    fn lookup_missing() {
        let scratch = std::env::temp_dir().join(format!("roplat-rdv-test2-{}", std::process::id()));
        let rdv = RendezvousDir::new(&scratch);
        let endpoint = EndpointUri::parse("roplat-ipc://test/nope?msg=abc&v=1").unwrap();

        let other = rdv.lookup(&endpoint);
        if !matches!(other, Err(IpcError::NotReady)) {
            panic!("expected NotReady, got {other:?}");
        }

        let _ = fs::remove_dir_all(&scratch);
    }
}