use super::endpoint::{EndpointUri, SchemaId};
use super::transport::{IpcError, IpcResult};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns the base directory under which roplat keeps its runtime files.
///
/// The `ROPLAT_RUNTIME_DIR` environment variable takes precedence when it is set to a
/// non-empty value; otherwise the result is `<system temp dir>/roplat`.
///
/// The directory is not created by this function.
pub fn default_runtime_dir() -> PathBuf {
    match std::env::var_os("ROPLAT_RUNTIME_DIR") {
        // `var_os` keeps overrides that are not valid Unicode (legal for paths on most
        // platforms). An empty value is treated as "unset" so it cannot silently turn
        // the runtime directory into a path relative to the current working directory.
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        _ => std::env::temp_dir().join("roplat"),
    }
}
/// Record describing one published endpoint.
///
/// It is serialized to / deserialized from JSON through serde, and `RendezvousDir`
/// stores it on disk as a `.rdv` file. Field order is kept as-is because it determines
/// the layout of the serialized JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RendezvousDescriptor {
/// String form of the endpoint URI; `RendezvousDir::publish` parses it again to
/// derive the descriptor's file path.
pub uri: String,
/// Namespace copied from the endpoint URI.
pub namespace: String,
/// Endpoint name copied from the endpoint URI.
pub name: String,
/// Schema identifier copied from the endpoint URI; `RendezvousDir::lookup` rejects
/// a descriptor whose value differs from the requested URI's.
pub schema_id: SchemaId,
/// Message version copied from the endpoint URI; also compared during lookup.
pub msg_version: u16,
/// Transport identifier supplied by the publisher (the unit test uses `"tcp"`).
pub transport: String,
/// Transport address supplied by the publisher (the unit test uses `"127.0.0.1:0"`).
pub address: String,
/// OS process id of the process that constructed the descriptor; used for the
/// staleness (liveness) check.
pub publisher_pid: u32,
/// Creation time in whole seconds since the Unix epoch (0 if the system clock
/// reads earlier than the epoch).
pub created_at: u64,
}
impl RendezvousDescriptor {
pub fn new(
uri: &EndpointUri,
transport: impl Into<String>,
address: impl Into<String>,
) -> Self {
let created_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
Self {
uri: uri.to_string(),
namespace: uri.namespace.clone(),
name: uri.name.clone(),
schema_id: uri.schema_id.clone(),
msg_version: uri.msg_version,
transport: transport.into(),
address: address.into(),
publisher_pid: std::process::id(),
created_at,
}
}
fn to_json(&self) -> IpcResult<String> {
serde_json::to_string_pretty(self).map_err(|e| IpcError::Serde(e.to_string()))
}
fn from_json(s: &str) -> IpcResult<Self> {
serde_json::from_str(s).map_err(|e| IpcError::Serde(e.to_string()))
}
}
/// Handle to a directory tree holding rendezvous descriptor files.
pub struct RendezvousDir {
/// Top-level directory; descriptors are stored at `<root>/<namespace>/<name>.rdv`.
root: PathBuf,
}
impl RendezvousDir {
    /// Creates a handle rooted at `<default_runtime_dir()>/ipc`.
    ///
    /// Nothing is created on disk until [`RendezvousDir::publish`] is called.
    pub fn new_default() -> Self {
        Self { root: default_runtime_dir().join("ipc") }
    }

    /// Creates a handle rooted at an explicit directory.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Returns the root directory of this rendezvous tree.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Maps an endpoint to its descriptor file: `<root>/<namespace>/<name>.rdv`.
    fn path_for(&self, uri: &EndpointUri) -> PathBuf {
        // NOTE(review): namespace and name are joined verbatim; this presumably relies on
        // `EndpointUri::parse` rejecting path separators and `..` — confirm.
        self.root
            .join(&uri.namespace)
            .join(format!("{}.rdv", uri.name))
    }

    /// Writes `desc` to its descriptor file, replacing any previous descriptor for the
    /// same endpoint, and returns the file's path.
    ///
    /// The JSON is first written to a temporary sibling file and then renamed over the
    /// final path, so a concurrent [`RendezvousDir::lookup`] or
    /// [`RendezvousDir::scan_all`] never reads a half-written descriptor.
    ///
    /// # Errors
    ///
    /// Returns the error from `EndpointUri::parse` if `desc.uri` does not parse,
    /// `IpcError::Serde` if serialization fails, and a converted I/O error if the
    /// directory or file cannot be written.
    pub fn publish(&self, desc: &RendezvousDescriptor) -> IpcResult<PathBuf> {
        let uri = EndpointUri::parse(&desc.uri)?;
        let path = self.path_for(&uri);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let payload = desc.to_json()?;

        // Stage next to the target so the rename stays within one directory. The staging
        // name ends in `.tmp-<pid>`, not `.rdv`, so directory scans ignore it. The PID
        // suffix keeps concurrent publishers in different processes from sharing a
        // staging file.
        let mut staging_name = path
            .file_name()
            .map(|name| name.to_os_string())
            .unwrap_or_default();
        staging_name.push(format!(".tmp-{}", std::process::id()));
        let staging = path.with_file_name(staging_name);

        let staged = fs::write(&staging, payload).and_then(|()| fs::rename(&staging, &path));
        if let Err(e) = staged {
            // Best-effort cleanup; the original error is what the caller needs to see.
            let _ = fs::remove_file(&staging);
            return Err(e.into());
        }
        Ok(path)
    }

    /// Loads and validates the descriptor published for `uri`.
    ///
    /// # Errors
    ///
    /// * `IpcError::NotReady` — no descriptor file exists, or the recorded publisher
    ///   process is no longer alive (the stale file is then removed, best effort).
    /// * `IpcError::SchemaMismatch` — the stored schema id differs from `uri.schema_id`.
    /// * `IpcError::Protocol` — the stored message version differs from `uri.msg_version`.
    /// * `IpcError::Serde` — the file content is not a valid descriptor.
    /// * Any other I/O failure while reading, converted into the IPC error type.
    pub fn lookup(&self, uri: &EndpointUri) -> IpcResult<RendezvousDescriptor> {
        let path = self.path_for(uri);
        let s = match fs::read_to_string(&path) {
            Ok(s) => s,
            // A missing file simply means nobody has published yet.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Err(IpcError::NotReady),
            Err(e) => return Err(e.into()),
        };
        let desc = RendezvousDescriptor::from_json(&s)?;
        if desc.schema_id != uri.schema_id {
            return Err(IpcError::SchemaMismatch {
                expected: uri.schema_id.to_string(),
                actual: desc.schema_id.to_string(),
            });
        }
        if desc.msg_version != uri.msg_version {
            return Err(IpcError::Protocol(format!(
                "msg_version mismatch: expected {}, got {}",
                uri.msg_version, desc.msg_version
            )));
        }
        if !is_pid_alive(desc.publisher_pid) {
            // The publisher is gone: drop the stale descriptor so later lookups take
            // the fast not-found path. Removal failures are deliberately ignored.
            // NOTE(review): a publisher that re-publishes between the read above and
            // this removal would lose its fresh descriptor — confirm this is acceptable.
            let _ = fs::remove_file(&path);
            return Err(IpcError::NotReady);
        }
        Ok(desc)
    }

    /// Removes the descriptor file for `uri`.
    ///
    /// Succeeds when the file is already absent; other I/O failures are returned.
    pub fn withdraw(&self, uri: &EndpointUri) -> IpcResult<()> {
        let path = self.path_for(uri);
        match fs::remove_file(&path) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Collects every readable descriptor beneath the root, each paired with whether
    /// its publisher process is currently alive.
    ///
    /// Returns an empty list when the root directory does not exist. Files that cannot
    /// be read or parsed are skipped; directory traversal errors are returned.
    pub fn scan_all(&self) -> IpcResult<Vec<(RendezvousDescriptor, bool)>> {
        let mut out = Vec::new();
        if !self.root.exists() {
            return Ok(out);
        }
        scan_dir_rec(&self.root, &mut out)?;
        Ok(out)
    }
}
/// Recursively walks `dir`, appending every parseable `.rdv` descriptor to `out`
/// together with the liveness of its publisher process.
///
/// Errors from listing a directory or reading a directory entry abort the walk and are
/// returned. Descriptor files that cannot be read or parsed are silently skipped.
fn scan_dir_rec(dir: &Path, out: &mut Vec<(RendezvousDescriptor, bool)>) -> IpcResult<()> {
    for item in fs::read_dir(dir)? {
        let candidate = item?.path();

        // Sub-directories are descended into before anything else is considered.
        if candidate.is_dir() {
            scan_dir_rec(&candidate, out)?;
            continue;
        }

        // Only files carrying the descriptor extension are of interest.
        let is_descriptor = candidate.extension().is_some_and(|ext| ext == "rdv");
        if !is_descriptor {
            continue;
        }

        // Unreadable or malformed descriptors are ignored rather than failing the scan.
        let Ok(text) = fs::read_to_string(&candidate) else {
            continue;
        };
        let Ok(desc) = RendezvousDescriptor::from_json(&text) else {
            continue;
        };

        let alive = is_pid_alive(desc.publisher_pid);
        out.push((desc, alive));
    }
    Ok(())
}
/// Public entry point for the process liveness probe.
///
/// Forwards to the private `is_pid_alive`, whose implementation is selected at compile
/// time for the target platform, and returns its answer unchanged.
pub fn check_pid_alive(pid: u32) -> bool {
is_pid_alive(pid)
}
/// Linux liveness probe: a PID is reported alive when its `/proc/<pid>` entry can be
/// stat'ed.
#[cfg(target_os = "linux")]
fn is_pid_alive(pid: u32) -> bool {
    let proc_entry = format!("/proc/{pid}");
    // Any metadata failure (missing entry, permission problem) counts as "not alive".
    std::fs::metadata(&proc_entry).is_ok()
}
/// Liveness probe for Unix targets other than Linux.
///
/// Uses POSIX `kill(pid, 0)`: signal 0 delivers nothing and only performs the
/// existence and permission checks. The process is reported dead only when the call
/// fails with `ESRCH` ("no such process"); every other outcome, including `EPERM`
/// (the process exists but belongs to someone else), is reported as alive.
#[cfg(all(unix, not(target_os = "linux")))]
fn is_pid_alive(pid: u32) -> bool {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }
    // "No such process"; the value is 3 on the Unix platforms this branch targets.
    const ESRCH: i32 = 3;

    // `kill` interprets 0 and negative values as process-group selectors, and values
    // above `i32::MAX` cannot be represented as a `pid_t`. Stay conservative for those
    // and answer "alive" instead of probing something other than the requested PID.
    if pid == 0 || pid > i32::MAX as u32 {
        return true;
    }

    // SAFETY: `kill` takes plain integers and, with signal 0, has no effect on the
    // target process; it only reports whether the PID could be signalled.
    let rc = unsafe { kill(pid as i32, 0) };
    if rc == 0 {
        return true;
    }
    std::io::Error::last_os_error().raw_os_error() != Some(ESRCH)
}
/// Windows liveness probe.
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and asks for its exit
/// code:
///
/// * If the process cannot be opened because access is denied, it exists but is not
///   ours to query, so it is reported alive. Reporting it dead would let callers
///   discard the descriptor of a live publisher.
/// * If it cannot be opened for any other reason, it is reported dead.
/// * If it can be opened, it is alive only while its exit code is `STILL_ACTIVE`. A
///   process object can outlive its process for as long as any handle to it stays
///   open, so a successful open alone is not proof of life.
#[cfg(windows)]
fn is_pid_alive(pid: u32) -> bool {
    use std::ffi::c_void;
    #[link(name = "kernel32")]
    unsafe extern "system" {
        fn OpenProcess(desired_access: u32, inherit: i32, pid: u32) -> *mut c_void;
        fn GetExitCodeProcess(h: *mut c_void, exit_code: *mut u32) -> i32;
        fn GetLastError() -> u32;
        fn CloseHandle(h: *mut c_void) -> i32;
    }
    const PROCESS_QUERY_LIMITED_INFORMATION: u32 = 0x1000;
    const ERROR_ACCESS_DENIED: u32 = 5;
    const STILL_ACTIVE: u32 = 259;

    // SAFETY: the calls take plain integers, a handle that was just returned by
    // `OpenProcess` and checked for null, and a pointer to a live local `u32`. The
    // handle is closed exactly once on the only path where it was opened.
    unsafe {
        let h = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if h.is_null() {
            // Must be read immediately after the failing call.
            return GetLastError() == ERROR_ACCESS_DENIED;
        }
        let mut exit_code: u32 = 0;
        let queried = GetExitCodeProcess(h, &mut exit_code) != 0;
        CloseHandle(h);
        // If the exit code cannot be read, fall back to "the handle opened, so treat
        // it as alive", which is what a bare open check would have concluded.
        !queried || exit_code == STILL_ACTIVE
    }
}
/// Liveness probe for targets that are neither Unix nor Windows.
///
/// No probe is implemented for these targets, so every PID is reported as alive and
/// callers never classify a descriptor as stale.
#[cfg(not(any(unix, windows)))]
fn is_pid_alive(_pid: u32) -> bool {
// Conservative answer: without a way to check, never claim a publisher is dead.
true
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A descriptor published by this (live) process can be looked up again and
    /// carries the address it was published with.
    #[test]
    fn publish_and_lookup_roundtrip() {
        // Per-process scratch directory so parallel test runs do not collide.
        let scratch = format!("roplat-rdv-test-{}", std::process::id());
        let tmp = std::env::temp_dir().join(scratch);
        let dir = RendezvousDir::new(&tmp);

        let uri = EndpointUri::parse("roplat-ipc://test/foo?msg=abc&v=1").unwrap();
        dir.publish(&RendezvousDescriptor::new(&uri, "tcp", "127.0.0.1:0"))
            .unwrap();

        let got = dir.lookup(&uri).unwrap();
        assert_eq!(got.address, "127.0.0.1:0");

        dir.withdraw(&uri).unwrap();
        let _ = fs::remove_dir_all(&tmp);
    }

    /// Looking up an endpoint that was never published reports `NotReady`.
    #[test]
    fn lookup_missing() {
        let scratch = format!("roplat-rdv-test2-{}", std::process::id());
        let tmp = std::env::temp_dir().join(scratch);
        let dir = RendezvousDir::new(&tmp);

        let uri = EndpointUri::parse("roplat-ipc://test/nope?msg=abc&v=1").unwrap();
        let other = dir.lookup(&uri);
        if !matches!(other, Err(IpcError::NotReady)) {
            panic!("expected NotReady, got {other:?}");
        }

        let _ = fs::remove_dir_all(&tmp);
    }
}