#![allow(clippy::missing_safety_doc)]
#![expect(
clippy::undocumented_unsafe_blocks,
reason = "module-wide FFI safety contract documented in ffi::mod.rs preamble"
)]
use std::ffi::{c_char, c_int, CStr, CString};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::{Mutex as ParkingMutex, RwLock as ParkingRwLock};
use crate::adapter::net::behavior::aggregator::{
FoldQueryClient, FoldQueryClientError, FoldQueryError, RegistryClient, RegistryClientError,
RegistryGroupSummary, RegistryRpcError, SummaryAnnouncement, DEFAULT_QUERY_DEADLINE,
DEFAULT_REGISTRY_DEADLINE,
};
use crate::adapter::net::{ChannelConfig, ChannelId, ChannelName, Visibility};
use super::mesh::MeshNodeHandle;
/// The call succeeded.
pub const NET_REGISTRY_OK: i32 = 0;
/// The RPC failed at the transport layer (a client `Transport` error).
pub const NET_REGISTRY_ERR_TRANSPORT: i32 = 1;
/// Encoding or decoding failed: a client `Codec` error, a server `DecodeFailed` reply, or a
/// JSON result that could not be turned into a C string.
pub const NET_REGISTRY_ERR_CODEC: i32 = 2;
/// The registry server replied `UnknownTemplate`.
pub const NET_REGISTRY_ERR_UNKNOWN_TEMPLATE: i32 = 3;
/// The registry server replied `DuplicateGroupName`.
pub const NET_REGISTRY_ERR_DUPLICATE_GROUP_NAME: i32 = 4;
/// The registry server replied `SpawnRejected`.
pub const NET_REGISTRY_ERR_SPAWN_REJECTED: i32 = 5;
/// The registry server replied `SpawnNotSupported`.
pub const NET_REGISTRY_ERR_SPAWN_NOT_SUPPORTED: i32 = 6;
/// The fold-query server replied `UnknownKind`.
pub const NET_REGISTRY_ERR_UNKNOWN_KIND: i32 = 7;
/// The registry server replied `UnknownGroup`.
pub const NET_REGISTRY_ERR_UNKNOWN_GROUP: i32 = 8;
/// The registry server replied `ScaleRejected`.
pub const NET_REGISTRY_ERR_SCALE_REJECTED: i32 = 9;
/// The registry server replied `ScaleNotSupported`.
pub const NET_REGISTRY_ERR_SCALE_NOT_SUPPORTED: i32 = 10;
/// An argument was rejected before any RPC was issued: a null handle or string pointer, a
/// string that is not valid UTF-8, or a value the wrapper could not interpret.
pub const NET_REGISTRY_ERR_INVALID_ARGS: i32 = 99;
/// C-facing visibility selector with a fixed `i32` representation.
///
/// Each variant corresponds to the `Visibility` variant of the same name; the explicit
/// discriminants are the raw integers accepted across the FFI boundary.
#[repr(i32)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NetVisibility {
    /// Raw value `0`; corresponds to `Visibility::Global`.
    Global = 0,
    /// Raw value `1`; corresponds to `Visibility::ParentVisible`.
    ParentVisible = 1,
    /// Raw value `2`; corresponds to `Visibility::Exported`.
    Exported = 2,
    /// Raw value `3`; corresponds to `Visibility::SubnetLocal`.
    SubnetLocal = 3,
}
impl NetVisibility {
    /// Decodes a raw FFI discriminant into the internal `Visibility`.
    ///
    /// Returns `None` for any integer outside `0..=3`.
    fn from_raw(raw: i32) -> Option<Visibility> {
        let decoded = match raw {
            0 => Visibility::Global,
            1 => Visibility::ParentVisible,
            2 => Visibility::Exported,
            3 => Visibility::SubnetLocal,
            _ => return None,
        };
        Some(decoded)
    }

    /// Maps an internal `Visibility` to the FFI variant of the same name.
    #[allow(dead_code)]
    fn to_raw(v: Visibility) -> NetVisibility {
        match v {
            Visibility::Global => Self::Global,
            Visibility::ParentVisible => Self::ParentVisible,
            Visibility::Exported => Self::Exported,
            Visibility::SubnetLocal => Self::SubnetLocal,
        }
    }
}
/// State behind the opaque pointer returned by `net_registry_client_new`.
pub struct RegistryClientHandle {
    /// The wrapped client. Calls clone it out under a read lock; `set_deadline` mutates it
    /// under the write lock.
    client: ParkingRwLock<RegistryClient>,
    /// Text of the most recent error recorded for this handle, if any. Exposed to C via
    /// `net_registry_last_error_detail`.
    last_error_detail: ParkingMutex<Option<CString>>,
}
/// Creates a registry client bound to the mesh node behind `mesh_handle`.
///
/// Returns a heap-allocated handle that must be released with `net_registry_client_free`,
/// or null when `mesh_handle` is null.
///
/// # Safety
///
/// `mesh_handle` must be null or point to a live `MeshNodeHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_new(
    mesh_handle: *mut MeshNodeHandle,
) -> *mut RegistryClientHandle {
    if mesh_handle.is_null() {
        return std::ptr::null_mut();
    }
    let client = RegistryClient::new(unsafe { super::mesh::mesh_node_arc(&*mesh_handle) });
    // Ownership of the allocation passes to the caller as a raw pointer.
    Box::into_raw(Box::new(RegistryClientHandle {
        client: ParkingRwLock::new(client),
        last_error_detail: ParkingMutex::new(None),
    }))
}
/// Destroys a handle created by `net_registry_client_new`. A null `handle` is a no-op.
///
/// # Safety
///
/// `handle` must be null or a pointer obtained from `net_registry_client_new` that has not
/// already been freed; it must not be used after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_free(handle: *mut RegistryClientHandle) {
    if !handle.is_null() {
        // Take ownership back so the handle and everything it owns is dropped.
        let owned = unsafe { Box::from_raw(handle) };
        drop(owned);
    }
}
/// Sets the deadline the registry client uses for subsequent calls.
///
/// `millis == 0` selects `DEFAULT_REGISTRY_DEADLINE`; any other value is taken as a
/// duration in milliseconds. A null `handle` is a no-op.
///
/// # Safety
///
/// `handle` must be null or point to a live `RegistryClientHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_set_deadline(
    handle: *mut RegistryClientHandle,
    millis: u64,
) {
    let Some(h) = (unsafe { handle.as_ref() }) else {
        return;
    };
    let deadline = match millis {
        0 => DEFAULT_REGISTRY_DEADLINE,
        ms => Duration::from_millis(ms),
    };
    h.client.write().set_deadline_mut(deadline);
}
/// Stores `kind` through `out`, silently skipping the write when `out` is null.
///
/// # Safety
///
/// `out` must be null or valid for writing a `c_int`.
#[inline]
unsafe fn write_kind(out: *mut c_int, kind: c_int) {
    if let Some(slot) = unsafe { out.as_mut() } {
        *slot = kind;
    }
}
/// Copies a NUL-terminated C string argument into an owned `String`.
///
/// Returns `None` and reports `NET_REGISTRY_ERR_INVALID_ARGS` through `out` when `ptr` is
/// null or the bytes are not valid UTF-8. `out` is left untouched on success.
///
/// # Safety
///
/// `ptr` must be null or point to a NUL-terminated string that stays valid for the call;
/// `out` must be null or valid for writing a `c_int`.
#[inline]
unsafe fn cstr_arg(ptr: *const c_char, out: *mut c_int) -> Option<String> {
    let parsed = if ptr.is_null() {
        None
    } else {
        unsafe { CStr::from_ptr(ptr) }
            .to_str()
            .ok()
            .map(str::to_owned)
    };
    if parsed.is_none() {
        unsafe { write_kind(out, NET_REGISTRY_ERR_INVALID_ARGS) };
    }
    parsed
}
/// Converts a JSON string into a raw, heap-allocated C string for the caller.
///
/// On success, writes `NET_REGISTRY_OK` through `out` and returns the pointer produced by
/// `CString::into_raw`. If `json` contains an interior NUL byte, writes
/// `NET_REGISTRY_ERR_CODEC` and returns null.
///
/// # Safety
///
/// `out` must be null or valid for writing a `c_int`.
#[inline]
unsafe fn json_to_raw(json: String, out: *mut c_int) -> *mut c_char {
    let (kind, raw) = match CString::new(json) {
        Ok(owned) => (NET_REGISTRY_OK, owned.into_raw()),
        Err(_) => (NET_REGISTRY_ERR_CODEC, std::ptr::null_mut()),
    };
    unsafe { write_kind(out, kind) };
    raw
}
/// Shared driver for registry calls that return JSON.
///
/// Runs `op` on a clone of the handle's client. On success the JSON is handed to
/// `json_to_raw`. On failure the error is classified, its detail text is stored on the
/// handle, the error kind is written through `out_error_kind`, and null is returned. A null
/// `handle` reports `NET_REGISTRY_ERR_INVALID_ARGS` without calling `op`.
///
/// # Safety
///
/// `handle` must be null or point to a live `RegistryClientHandle`; `out_error_kind` must be
/// null or valid for writing a `c_int`.
unsafe fn registry_op_json<F>(
    handle: *mut RegistryClientHandle,
    out_error_kind: *mut c_int,
    op: F,
) -> *mut c_char
where
    F: FnOnce(RegistryClient) -> Result<String, RegistryClientError>,
{
    let Some(h) = (unsafe { handle.as_ref() }) else {
        unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
        return std::ptr::null_mut();
    };
    // The read guard is a temporary of this statement, so the lock is released before `op`
    // runs.
    let client = h.client.read().clone();
    let failure = match op(client) {
        Ok(json) => return unsafe { json_to_raw(json, out_error_kind) },
        Err(e) => e,
    };
    let (kind, detail) = classify(&failure);
    store_error_detail(h, detail);
    unsafe { write_kind(out_error_kind, kind) };
    std::ptr::null_mut()
}
/// Lists the groups known to `target_node_id` and returns them as a JSON array.
///
/// Returns null immediately, without issuing the call, when `out_error_kind` is null.
/// Otherwise the result and error reporting follow `registry_op_json`.
///
/// # Safety
///
/// `handle` must be null or point to a live `RegistryClientHandle`; `out_error_kind` must be
/// null or valid for writing a `c_int`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_list(
    handle: *mut RegistryClientHandle,
    target_node_id: u64,
    out_error_kind: *mut c_int,
) -> *mut c_char {
    if out_error_kind.is_null() {
        std::ptr::null_mut()
    } else {
        unsafe {
            registry_op_json(handle, out_error_kind, |client| {
                match block_on(client.list(target_node_id)) {
                    Ok(groups) => Ok(groups_to_json(&groups)),
                    Err(e) => Err(e),
                }
            })
        }
    }
}
/// Asks `target_node_id` to spawn a group from a template and returns its summary as JSON.
///
/// Both string arguments are validated first; a null or non-UTF-8 string reports
/// `NET_REGISTRY_ERR_INVALID_ARGS` and returns null. The remaining behaviour follows
/// `registry_op_json`.
///
/// # Safety
///
/// `handle` must be null or point to a live `RegistryClientHandle`; `template_name` and
/// `group_name` must be null or NUL-terminated strings; `out_error_kind` must be null or
/// valid for writing a `c_int`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_spawn(
    handle: *mut RegistryClientHandle,
    target_node_id: u64,
    template_name: *const c_char,
    group_name: *const c_char,
    replica_count: u8,
    out_error_kind: *mut c_int,
) -> *mut c_char {
    let template = match unsafe { cstr_arg(template_name, out_error_kind) } {
        Some(text) => text,
        None => return std::ptr::null_mut(),
    };
    let group = match unsafe { cstr_arg(group_name, out_error_kind) } {
        Some(text) => text,
        None => return std::ptr::null_mut(),
    };
    unsafe {
        registry_op_json(handle, out_error_kind, |client| {
            let spawned = block_on(client.spawn(target_node_id, template, group, replica_count));
            spawned.map(|summary| group_to_json(&summary))
        })
    }
}
/// Asks `target_node_id` to unregister `group_name`.
///
/// Returns `1` when the call succeeded and the server reported `true`, `0` when it
/// succeeded and reported `false`, and `-1` on any failure. The status or error kind is
/// written through `out_error_kind`; on an RPC failure the detail text is stored on the
/// handle.
///
/// # Safety
///
/// `handle` must be null or point to a live `RegistryClientHandle`; `group_name` must be
/// null or a NUL-terminated string; `out_error_kind` must be null or valid for writing a
/// `c_int`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_client_unregister(
    handle: *mut RegistryClientHandle,
    target_node_id: u64,
    group_name: *const c_char,
    out_error_kind: *mut c_int,
) -> c_int {
    let Some(h) = (unsafe { handle.as_ref() }) else {
        unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
        return -1;
    };
    let group = match unsafe { cstr_arg(group_name, out_error_kind) } {
        Some(text) => text,
        None => return -1,
    };
    // Clone under a short-lived read lock so the RPC runs without holding it.
    let client = h.client.read().clone();
    let (kind, status) = match block_on(client.unregister(target_node_id, group)) {
        Ok(existed) => (NET_REGISTRY_OK, c_int::from(existed)),
        Err(e) => {
            let (kind, detail) = classify(&e);
            store_error_detail(h, detail);
            (kind, -1)
        }
    };
    unsafe { write_kind(out_error_kind, kind) };
    status
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_registry_last_error_detail(
handle: *mut RegistryClientHandle,
) -> *const c_char {
if handle.is_null() {
return std::ptr::null();
}
let h: &RegistryClientHandle = unsafe { &*handle };
let guard = h.last_error_detail.lock();
match guard.as_ref() {
Some(c) => c.as_ptr(),
None => std::ptr::null(),
}
}
/// Registers a channel configuration with the given name and visibility on a mesh node.
///
/// Returns `NET_REGISTRY_OK` after inserting the configuration. Returns
/// `NET_REGISTRY_ERR_INVALID_ARGS` when either pointer is null, `visibility` is not a known
/// discriminant, `name` is not valid UTF-8 or is rejected by `ChannelName::new`, or the mesh
/// node yields no channel-config registry.
///
/// # Safety
///
/// `mesh_handle` must be null or point to a live `MeshNodeHandle`; `name` must be null or a
/// NUL-terminated string.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_register_channel(
    mesh_handle: *mut MeshNodeHandle,
    name: *const c_char,
    visibility: c_int,
) -> c_int {
    if mesh_handle.is_null() || name.is_null() {
        return NET_REGISTRY_ERR_INVALID_ARGS;
    }
    let Some(vis) = NetVisibility::from_raw(visibility) else {
        return NET_REGISTRY_ERR_INVALID_ARGS;
    };
    let Ok(name_str) = (unsafe { CStr::from_ptr(name).to_str() }) else {
        return NET_REGISTRY_ERR_INVALID_ARGS;
    };
    let Ok(channel) = ChannelName::new(name_str) else {
        return NET_REGISTRY_ERR_INVALID_ARGS;
    };
    let mesh_arc: Arc<crate::adapter::net::MeshNode> =
        unsafe { super::mesh::mesh_node_arc(&*mesh_handle) };
    let Some(configs) = mesh_arc.channel_configs() else {
        return NET_REGISTRY_ERR_INVALID_ARGS;
    };
    configs.insert(ChannelConfig::new(ChannelId::new(channel)).with_visibility(vis));
    NET_REGISTRY_OK
}
/// State behind the opaque pointer returned by `net_fold_query_client_new`.
pub struct FoldQueryClientHandle {
    /// The wrapped client. Queries clone it out under a read lock; the TTL and deadline
    /// setters mutate it under the write lock.
    client: ParkingRwLock<FoldQueryClient>,
    /// Text of the most recent error recorded for this handle, if any. Exposed to C via
    /// `net_fold_query_last_error_detail`.
    last_error_detail: ParkingMutex<Option<CString>>,
}
/// Creates a fold-query client bound to the mesh node behind `mesh_handle`.
///
/// Returns a heap-allocated handle that must be released with
/// `net_fold_query_client_free`, or null when `mesh_handle` is null.
///
/// # Safety
///
/// `mesh_handle` must be null or point to a live `MeshNodeHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_new(
    mesh_handle: *mut MeshNodeHandle,
) -> *mut FoldQueryClientHandle {
    if mesh_handle.is_null() {
        return std::ptr::null_mut();
    }
    let client = FoldQueryClient::new(unsafe { super::mesh::mesh_node_arc(&*mesh_handle) });
    // Ownership of the allocation passes to the caller as a raw pointer.
    Box::into_raw(Box::new(FoldQueryClientHandle {
        client: ParkingRwLock::new(client),
        last_error_detail: ParkingMutex::new(None),
    }))
}
/// Destroys a handle created by `net_fold_query_client_new`. A null `handle` is a no-op.
///
/// # Safety
///
/// `handle` must be null or a pointer obtained from `net_fold_query_client_new` that has
/// not already been freed; it must not be used after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_free(handle: *mut FoldQueryClientHandle) {
    if !handle.is_null() {
        // Take ownership back so the handle and everything it owns is dropped.
        let owned = unsafe { Box::from_raw(handle) };
        drop(owned);
    }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_set_ttl(
handle: *mut FoldQueryClientHandle,
millis: u64,
) {
if handle.is_null() {
return;
}
let h: &FoldQueryClientHandle = unsafe { &*handle };
h.client.write().set_ttl_mut(Duration::from_millis(millis));
}
/// Sets the deadline the fold-query client uses for subsequent calls.
///
/// `millis == 0` selects `DEFAULT_QUERY_DEADLINE`; any other value is taken as a duration
/// in milliseconds. A null `handle` is a no-op.
///
/// # Safety
///
/// `handle` must be null or point to a live `FoldQueryClientHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_set_deadline(
    handle: *mut FoldQueryClientHandle,
    millis: u64,
) {
    let Some(h) = (unsafe { handle.as_ref() }) else {
        return;
    };
    let deadline = match millis {
        0 => DEFAULT_QUERY_DEADLINE,
        ms => Duration::from_millis(ms),
    };
    h.client.write().set_deadline_mut(deadline);
}
/// Runs the client's `query_latest` for fold `kind` against `target_node_id` and returns
/// the summaries as a JSON array.
///
/// Returns null immediately, without issuing the call, when `out_error_kind` is null.
/// Otherwise the result and error reporting follow `fold_query_op_json`.
///
/// # Safety
///
/// `handle` must be null or point to a live `FoldQueryClientHandle`; `out_error_kind` must
/// be null or valid for writing a `c_int`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_query_latest(
    handle: *mut FoldQueryClientHandle,
    target_node_id: u64,
    kind: u16,
    out_error_kind: *mut c_int,
) -> *mut c_char {
    if out_error_kind.is_null() {
        std::ptr::null_mut()
    } else {
        unsafe {
            fold_query_op_json(handle, out_error_kind, |client| {
                match block_on(client.query_latest(target_node_id, kind)) {
                    Ok(summaries) => Ok(summaries_to_json(&summaries)),
                    Err(e) => Err(e),
                }
            })
        }
    }
}
/// Runs the client's `query_summarize_now` for fold `kind` against `target_node_id` and
/// returns the summaries as a JSON array.
///
/// Returns null immediately, without issuing the call, when `out_error_kind` is null.
/// Otherwise the result and error reporting follow `fold_query_op_json`.
///
/// # Safety
///
/// `handle` must be null or point to a live `FoldQueryClientHandle`; `out_error_kind` must
/// be null or valid for writing a `c_int`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_query_summarize_now(
    handle: *mut FoldQueryClientHandle,
    target_node_id: u64,
    kind: u16,
    out_error_kind: *mut c_int,
) -> *mut c_char {
    if out_error_kind.is_null() {
        std::ptr::null_mut()
    } else {
        unsafe {
            fold_query_op_json(handle, out_error_kind, |client| {
                match block_on(client.query_summarize_now(target_node_id, kind)) {
                    Ok(summaries) => Ok(summaries_to_json(&summaries)),
                    Err(e) => Err(e),
                }
            })
        }
    }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_invalidate_cache(
handle: *mut FoldQueryClientHandle,
) {
if handle.is_null() {
return;
}
let h: &FoldQueryClientHandle = unsafe { &*handle };
h.client.read().invalidate_cache();
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_client_invalidate_target(
handle: *mut FoldQueryClientHandle,
target_node_id: u64,
) {
if handle.is_null() {
return;
}
let h: &FoldQueryClientHandle = unsafe { &*handle };
h.client.read().invalidate_target(target_node_id);
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_fold_query_last_error_detail(
handle: *mut FoldQueryClientHandle,
) -> *const c_char {
if handle.is_null() {
return std::ptr::null();
}
let h: &FoldQueryClientHandle = unsafe { &*handle };
let guard = h.last_error_detail.lock();
match guard.as_ref() {
Some(c) => c.as_ptr(),
None => std::ptr::null(),
}
}
/// Drives `future` to completion by delegating to `super::mesh::block_on` and returns its
/// output.
fn block_on<F>(future: F) -> F::Output
where
    F: std::future::Future,
{
    super::mesh::block_on(future)
}
/// Shared driver for fold-query calls that return JSON.
///
/// Runs `op` on a clone of the handle's client. On success the JSON is handed to
/// `json_to_raw`. On failure the error is classified, its detail text is stored on the
/// handle, the error kind is written through `out_error_kind`, and null is returned. A null
/// `handle` reports `NET_REGISTRY_ERR_INVALID_ARGS` without calling `op`.
///
/// # Safety
///
/// `handle` must be null or point to a live `FoldQueryClientHandle`; `out_error_kind` must
/// be null or valid for writing a `c_int`.
unsafe fn fold_query_op_json<F>(
    handle: *mut FoldQueryClientHandle,
    out_error_kind: *mut c_int,
    op: F,
) -> *mut c_char
where
    F: FnOnce(FoldQueryClient) -> Result<String, FoldQueryClientError>,
{
    let Some(h) = (unsafe { handle.as_ref() }) else {
        unsafe { write_kind(out_error_kind, NET_REGISTRY_ERR_INVALID_ARGS) };
        return std::ptr::null_mut();
    };
    // The read guard is a temporary of this statement, so the lock is released before `op`
    // runs.
    let client = h.client.read().clone();
    let failure = match op(client) {
        Ok(json) => return unsafe { json_to_raw(json, out_error_kind) },
        Err(e) => e,
    };
    let (kind, detail) = classify_fold_query(&failure);
    store_fold_query_error_detail(h, detail);
    unsafe { write_kind(out_error_kind, kind) };
    std::ptr::null_mut()
}
/// Maps a fold-query client error to an FFI error code and a human-readable detail string.
fn classify_fold_query(err: &FoldQueryClientError) -> (i32, String) {
    let server = match err {
        FoldQueryClientError::Transport(e) => return (NET_REGISTRY_ERR_TRANSPORT, e.to_string()),
        FoldQueryClientError::Codec(c) => return (NET_REGISTRY_ERR_CODEC, c.clone()),
        FoldQueryClientError::Server(server) => server,
    };
    match server {
        FoldQueryError::UnknownKind { kind } => (
            NET_REGISTRY_ERR_UNKNOWN_KIND,
            format!("unknown fold kind: 0x{kind:04x}"),
        ),
        // A decode failure on the server side shares the codec error code.
        FoldQueryError::DecodeFailed(s) => (NET_REGISTRY_ERR_CODEC, format!("server decode: {s}")),
    }
}
/// Records `detail` as the handle's most recent error message, replacing any previous one.
///
/// `CString::new` fails only when the text contains an interior NUL byte (a `String` is
/// always valid UTF-8). Rather than discarding the diagnostic in that case, the NUL bytes
/// are stripped so the rest of the message still reaches the C caller.
fn store_fold_query_error_detail(h: &FoldQueryClientHandle, detail: String) {
    let c = CString::new(detail).unwrap_or_else(|nul_err| {
        let mut bytes = nul_err.into_vec();
        bytes.retain(|&b| b != 0);
        // No NUL bytes remain, so this conversion cannot fail; the empty default only
        // keeps the path panic-free.
        CString::new(bytes).unwrap_or_default()
    });
    *h.last_error_detail.lock() = Some(c);
}
/// Serializes a slice of summaries as a JSON array, yielding `"[]"` if serialization fails.
fn summaries_to_json(summaries: &[SummaryAnnouncement]) -> String {
    let wire = summaries
        .iter()
        .map(SummaryWire::from)
        .collect::<Vec<SummaryWire<'_>>>();
    match serde_json::to_string(&wire) {
        Ok(json) => json,
        Err(_) => "[]".to_string(),
    }
}
/// Test-only: serializes one summary as a JSON object, yielding `"{}"` if serialization
/// fails.
#[cfg(test)]
fn summary_to_json(s: &SummaryAnnouncement) -> String {
    let wire = SummaryWire::from(s);
    match serde_json::to_string(&wire) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    }
}
/// JSON shape of one `SummaryAnnouncement` as returned across the FFI boundary.
///
/// Field names are the JSON keys.
#[derive(serde::Serialize)]
struct SummaryWire<'a> {
    /// Copied from `SummaryAnnouncement::fold_kind`.
    fold_kind: u16,
    /// `Display` rendering of `SummaryAnnouncement::source_subnet`.
    source_subnet: String,
    /// Copied from `SummaryAnnouncement::generation`.
    generation: u64,
    /// One entry per element of `SummaryAnnouncement::buckets`, in the same order.
    buckets: Vec<BucketWire<'a>>,
}
/// JSON shape of a single bucket inside a `SummaryWire`.
#[derive(serde::Serialize)]
struct BucketWire<'a> {
    /// Bucket name, borrowed from the source summary.
    name: &'a str,
    /// Count associated with the bucket.
    count: u64,
}
impl<'a> From<&'a SummaryAnnouncement> for SummaryWire<'a> {
fn from(s: &'a SummaryAnnouncement) -> Self {
Self {
fold_kind: s.fold_kind,
source_subnet: format!("{}", s.source_subnet),
generation: s.generation,
buckets: s
.buckets
.iter()
.map(|(n, c)| BucketWire {
name: n.as_str(),
count: *c,
})
.collect(),
}
}
}
/// Maps a registry client error to an FFI error code and a human-readable detail string.
fn classify(err: &RegistryClientError) -> (i32, String) {
    let rpc = match err {
        RegistryClientError::Transport(e) => return (NET_REGISTRY_ERR_TRANSPORT, e.to_string()),
        RegistryClientError::Codec(c) => return (NET_REGISTRY_ERR_CODEC, c.clone()),
        RegistryClientError::Server(rpc) => rpc,
    };
    match rpc {
        // A decode failure on the server side shares the codec error code.
        RegistryRpcError::DecodeFailed(s) => {
            (NET_REGISTRY_ERR_CODEC, format!("server decode: {s}"))
        }
        RegistryRpcError::UnknownTemplate(t) => (
            NET_REGISTRY_ERR_UNKNOWN_TEMPLATE,
            format!("unknown template: {t}"),
        ),
        RegistryRpcError::DuplicateGroupName(n) => (
            NET_REGISTRY_ERR_DUPLICATE_GROUP_NAME,
            format!("duplicate group name: {n}"),
        ),
        RegistryRpcError::SpawnRejected(d) => (
            NET_REGISTRY_ERR_SPAWN_REJECTED,
            format!("spawn rejected: {d}"),
        ),
        RegistryRpcError::SpawnNotSupported => (
            NET_REGISTRY_ERR_SPAWN_NOT_SUPPORTED,
            String::from("daemon is read-only (no spawn handler installed)"),
        ),
        RegistryRpcError::UnknownGroup(g) => (
            NET_REGISTRY_ERR_UNKNOWN_GROUP,
            format!("unknown group: {g}"),
        ),
        RegistryRpcError::ScaleRejected(d) => (
            NET_REGISTRY_ERR_SCALE_REJECTED,
            format!("scale rejected: {d}"),
        ),
        RegistryRpcError::ScaleNotSupported => (
            NET_REGISTRY_ERR_SCALE_NOT_SUPPORTED,
            String::from("daemon doesn't accept dynamic scale (no scaler installed)"),
        ),
    }
}
/// Records `detail` as the handle's most recent error message, replacing any previous one.
///
/// `CString::new` fails only when the text contains an interior NUL byte (a `String` is
/// always valid UTF-8). Rather than discarding the diagnostic in that case, the NUL bytes
/// are stripped so the rest of the message still reaches the C caller.
fn store_error_detail(h: &RegistryClientHandle, detail: String) {
    let c = CString::new(detail).unwrap_or_else(|nul_err| {
        let mut bytes = nul_err.into_vec();
        bytes.retain(|&b| b != 0);
        // No NUL bytes remain, so this conversion cannot fail; the empty default only
        // keeps the path panic-free.
        CString::new(bytes).unwrap_or_default()
    });
    *h.last_error_detail.lock() = Some(c);
}
/// Serializes a slice of group summaries as a JSON array, yielding `"[]"` if serialization
/// fails.
fn groups_to_json(groups: &[RegistryGroupSummary]) -> String {
    let wire = groups
        .iter()
        .map(GroupWire::from)
        .collect::<Vec<GroupWire<'_>>>();
    match serde_json::to_string(&wire) {
        Ok(json) => json,
        Err(_) => "[]".to_string(),
    }
}
/// Serializes one group summary as a JSON object, yielding `"{}"` if serialization fails.
fn group_to_json(g: &RegistryGroupSummary) -> String {
    let wire = GroupWire::from(g);
    match serde_json::to_string(&wire) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    }
}
/// JSON shape of one `RegistryGroupSummary` as returned across the FFI boundary.
///
/// Field names are the JSON keys.
#[derive(serde::Serialize)]
struct GroupWire<'a> {
    /// Group name, borrowed from the source summary.
    name: &'a str,
    /// Hex encoding of the summary's `group_seed`.
    group_seed_hex: String,
    /// One entry per replica in the source summary, in the same order.
    replicas: Vec<ReplicaWire<'a>>,
}
/// JSON shape of a single replica inside a `GroupWire`.
#[derive(serde::Serialize)]
struct ReplicaWire<'a> {
    /// Copied from the replica summary's `generation`.
    generation: u64,
    /// Copied from the replica summary's `healthy` flag.
    healthy: bool,
    /// Optional diagnostic text, borrowed from the source; serialized as `null` when absent.
    diagnostic: Option<&'a str>,
    /// Optional placement node id; serialized as `null` when absent.
    placement_node_id: Option<u64>,
}
impl<'a> From<&'a RegistryGroupSummary> for GroupWire<'a> {
fn from(g: &'a RegistryGroupSummary) -> Self {
Self {
name: g.name.as_str(),
group_seed_hex: hex::encode(g.group_seed),
replicas: g
.replicas
.iter()
.map(|r| ReplicaWire {
generation: r.generation,
healthy: r.healthy,
diagnostic: r.diagnostic.as_deref(),
placement_node_id: r.placement_node_id,
})
.collect(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every fragment occurs somewhere in `json`.
    fn assert_contains_all(json: &str, fragments: &[&str]) {
        for &fragment in fragments {
            assert!(json.contains(fragment), "missing {fragment} in {json}");
        }
    }

    /// Known raw discriminants decode to the matching `Visibility`; unknown ones are rejected.
    #[test]
    fn visibility_round_trips_through_raw() {
        let known = [
            (0, Visibility::Global),
            (1, Visibility::ParentVisible),
            (2, Visibility::Exported),
            (3, Visibility::SubnetLocal),
        ];
        for (raw, expected) in known {
            let back = NetVisibility::from_raw(raw).expect("known discriminant");
            assert_eq!(format!("{back:?}"), format!("{expected:?}"));
        }
        for unknown in [99, -1] {
            assert!(NetVisibility::from_raw(unknown).is_none());
        }
    }

    /// The group JSON carries each wire field for both a healthy and an unhealthy replica.
    #[test]
    fn group_to_json_includes_every_documented_field() {
        let healthy_replica = crate::adapter::net::behavior::aggregator::RegistryReplicaSummary {
            generation: 42,
            healthy: true,
            diagnostic: None,
            placement_node_id: Some(0xBEEF),
        };
        let stuck_replica = crate::adapter::net::behavior::aggregator::RegistryReplicaSummary {
            generation: 0,
            healthy: false,
            diagnostic: Some("stuck".into()),
            placement_node_id: None,
        };
        let g = RegistryGroupSummary {
            name: "alpha".into(),
            group_seed: [0xABu8; 32],
            source_subnet: crate::adapter::net::subnet::SubnetId::GLOBAL,
            fold_kinds: vec![0x0001],
            replicas: vec![healthy_replica, stuck_replica],
        };
        let json = group_to_json(&g);
        // 32 seed bytes of 0xAB encode to "ab" repeated 32 times.
        let seed_fragment = format!("\"group_seed_hex\":\"{}\"", "ab".repeat(32));
        assert_contains_all(
            &json,
            &[
                "\"name\":\"alpha\"",
                seed_fragment.as_str(),
                "\"generation\":42",
                "\"healthy\":true",
                "\"diagnostic\":null",
                "\"placement_node_id\":48879",
                "\"healthy\":false",
                "\"diagnostic\":\"stuck\"",
                "\"placement_node_id\":null",
            ],
        );
    }

    /// The summary JSON carries each wire field, including every bucket.
    #[test]
    fn summary_to_json_includes_every_documented_field() {
        let s = SummaryAnnouncement {
            fold_kind: 0x42,
            source_subnet: crate::adapter::net::subnet::SubnetId::GLOBAL,
            generation: 7,
            buckets: vec![("alpha".into(), 1), ("beta".into(), 2)],
        };
        let json = summary_to_json(&s);
        assert_contains_all(
            &json,
            &[
                "\"fold_kind\":66",
                "\"source_subnet\":\"global\"",
                "\"generation\":7",
                "\"name\":\"alpha\"",
                "\"count\":1",
                "\"name\":\"beta\"",
                "\"count\":2",
            ],
        );
    }

    /// Every `FoldQueryClientError` variant maps to its expected FFI error code.
    #[test]
    fn classify_fold_query_maps_every_variant() {
        use crate::adapter::net::mesh_rpc::RpcError;

        let transport = FoldQueryClientError::Transport(RpcError::NoRoute {
            target: 0,
            reason: String::new(),
        });
        let (transport_code, _) = classify_fold_query(&transport);
        assert_eq!(transport_code, NET_REGISTRY_ERR_TRANSPORT);

        let codec = FoldQueryClientError::Codec("bad".into());
        let (codec_code, _) = classify_fold_query(&codec);
        assert_eq!(codec_code, NET_REGISTRY_ERR_CODEC);

        let unknown_kind = FoldQueryClientError::Server(FoldQueryError::UnknownKind { kind: 0x42 });
        let (kind_code, detail) = classify_fold_query(&unknown_kind);
        assert_eq!(kind_code, NET_REGISTRY_ERR_UNKNOWN_KIND);
        assert!(detail.contains("0x0042"));

        let decode_failed =
            FoldQueryClientError::Server(FoldQueryError::DecodeFailed("boom".into()));
        let (decode_code, _) = classify_fold_query(&decode_failed);
        assert_eq!(decode_code, NET_REGISTRY_ERR_CODEC);
    }
}