#![allow(clippy::missing_safety_doc)]
#![expect(
clippy::undocumented_unsafe_blocks,
reason = "module-wide FFI safety contract documented in the # Safety preamble above"
)]
#![expect(
clippy::multiple_unsafe_ops_per_block,
reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract; splitting per-op would obscure the single boundary-cross"
)]
use std::ffi::{c_char, c_int, CStr, CString};
use std::mem::ManuallyDrop;
use std::sync::Arc;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tokio::runtime::Runtime;
use crate::adapter::net::identity::{
EntityId, PermissionToken, TokenCache, TokenError as CoreTokenError, TokenScope,
};
use crate::adapter::net::{
ChannelConfig as InnerChannelConfig, ChannelConfigRegistry, ChannelHash, ChannelId,
ChannelName as InnerChannelName, ChannelPublisher, EntityKeypair, MeshNode, MeshNodeConfig,
OnFailure as InnerOnFailure, PublishConfig as InnerPublishConfig,
PublishReport as InnerPublishReport, Reliability, Stream as CoreStream, StreamConfig,
StreamError, Visibility as InnerVisibility, DEFAULT_STREAM_WINDOW_BYTES,
};
use crate::adapter::net::{SubnetId, SubnetPolicy, SubnetRule};
use crate::adapter::Adapter;
use crate::error::AdapterError;
use super::handle_guard::{HandleGuard, FFI_HANDLE_FREE_DEADLINE};
use super::NetError;
// Negative status codes handed back to C callers by the mesh FFI entry points.
// A return value of 0 from those entry points means success.

// Mesh node lifecycle / transport codes.
pub(crate) const NET_ERR_MESH_INIT: c_int = -110;
pub(crate) const NET_ERR_MESH_HANDSHAKE: c_int = -111;
pub(crate) const NET_ERR_MESH_BACKPRESSURE: c_int = -112;
pub(crate) const NET_ERR_MESH_NOT_CONNECTED: c_int = -113;
pub(crate) const NET_ERR_MESH_TRANSPORT: c_int = -114;
// Channel codes.
pub(crate) const NET_ERR_CHANNEL: c_int = -115;
pub(crate) const NET_ERR_CHANNEL_AUTH: c_int = -116;
// Identity / permission-token codes; `token_err_to_code` maps the core token
// error variants onto these.
pub(crate) const NET_ERR_IDENTITY: c_int = -120;
pub(crate) const NET_ERR_TOKEN_INVALID_FORMAT: c_int = -121;
pub(crate) const NET_ERR_TOKEN_INVALID_SIGNATURE: c_int = -122;
pub(crate) const NET_ERR_TOKEN_EXPIRED: c_int = -123;
pub(crate) const NET_ERR_TOKEN_NOT_YET_VALID: c_int = -124;
pub(crate) const NET_ERR_TOKEN_DELEGATION_EXHAUSTED: c_int = -125;
pub(crate) const NET_ERR_TOKEN_DELEGATION_NOT_ALLOWED: c_int = -126;
pub(crate) const NET_ERR_TOKEN_NOT_AUTHORIZED: c_int = -127;
// NAT-traversal codes; only compiled with the `nat-traversal` feature because
// `traversal_err_to_code` is their only producer in this file.
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_REFLEX_TIMEOUT: c_int = -130;
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_PEER_NOT_REACHABLE: c_int = -131;
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_TRANSPORT: c_int = -132;
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_RENDEZVOUS_NO_RELAY: c_int = -133;
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_RENDEZVOUS_REJECTED: c_int = -134;
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_PUNCH_FAILED: c_int = -135;
#[cfg(feature = "nat-traversal")]
pub(crate) const NET_ERR_TRAVERSAL_PORT_MAP_UNAVAILABLE: c_int = -136;
// Deliberately NOT feature-gated: the stub entry points compiled when
// `nat-traversal` is disabled return this code.
pub(crate) const NET_ERR_TRAVERSAL_UNSUPPORTED: c_int = -137;
/// Translates a traversal error into the `NET_ERR_TRAVERSAL_*` status code
/// returned across the C ABI. Payloads carried by a variant are ignored.
#[cfg(feature = "nat-traversal")]
fn traversal_err_to_code(e: &crate::adapter::net::traversal::TraversalError) -> c_int {
    use crate::adapter::net::traversal::TraversalError as Failure;
    // Exhaustive on purpose: a new variant must be given a code explicitly.
    match e {
        Failure::Unsupported => NET_ERR_TRAVERSAL_UNSUPPORTED,
        Failure::PortMapUnavailable => NET_ERR_TRAVERSAL_PORT_MAP_UNAVAILABLE,
        Failure::PunchFailed => NET_ERR_TRAVERSAL_PUNCH_FAILED,
        Failure::RendezvousRejected(_) => NET_ERR_TRAVERSAL_RENDEZVOUS_REJECTED,
        Failure::RendezvousNoRelay => NET_ERR_TRAVERSAL_RENDEZVOUS_NO_RELAY,
        Failure::Transport(_) => NET_ERR_TRAVERSAL_TRANSPORT,
        Failure::PeerNotReachable => NET_ERR_TRAVERSAL_PEER_NOT_REACHABLE,
        Failure::ReflexTimeout => NET_ERR_TRAVERSAL_REFLEX_TIMEOUT,
    }
}
/// Returns the lowercase wire name (`"open"`, `"cone"`, `"symmetric"`,
/// `"unknown"`) for a NAT classification.
#[cfg(feature = "nat-traversal")]
fn nat_class_to_str(class: crate::adapter::net::traversal::classify::NatClass) -> &'static str {
    use crate::adapter::net::traversal::classify::NatClass as Kind;
    match class {
        Kind::Unknown => "unknown",
        Kind::Symmetric => "symmetric",
        Kind::Cone => "cone",
        Kind::Open => "open",
    }
}
/// Translates a core token error into the status code returned across the C ABI.
///
/// Two variants have no dedicated code: `ZeroTtl` is reported as
/// `NET_ERR_TOKEN_INVALID_FORMAT` and `ReadOnly` as `NET_ERR_IDENTITY`.
fn token_err_to_code(e: &CoreTokenError) -> c_int {
    match e {
        CoreTokenError::InvalidFormat | CoreTokenError::ZeroTtl => NET_ERR_TOKEN_INVALID_FORMAT,
        CoreTokenError::InvalidSignature => NET_ERR_TOKEN_INVALID_SIGNATURE,
        CoreTokenError::Expired => NET_ERR_TOKEN_EXPIRED,
        CoreTokenError::NotYetValid => NET_ERR_TOKEN_NOT_YET_VALID,
        CoreTokenError::DelegationExhausted => NET_ERR_TOKEN_DELEGATION_EXHAUSTED,
        CoreTokenError::DelegationNotAllowed => NET_ERR_TOKEN_DELEGATION_NOT_ALLOWED,
        CoreTokenError::NotAuthorized => NET_ERR_TOKEN_NOT_AUTHORIZED,
        CoreTokenError::ReadOnly => NET_ERR_IDENTITY,
    }
}
/// Returns the process-wide multi-threaded tokio runtime used by the mesh FFI,
/// building it on first use.
///
/// If the runtime cannot be built the process is aborted after printing a
/// diagnostic to stderr, rather than letting a panic unwind into foreign code.
fn runtime() -> &'static Arc<Runtime> {
    use std::sync::OnceLock;
    static RT: OnceLock<Arc<Runtime>> = OnceLock::new();
    RT.get_or_init(|| {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .map(Arc::new)
            .unwrap_or_else(|e| {
                eprintln!(
                    "FATAL: mesh FFI tokio runtime build failure ({e:?}); aborting to avoid panic across the FFI boundary"
                );
                std::process::abort()
            })
    })
}
/// Drives `future` to completion on the shared mesh FFI runtime and returns
/// its output.
///
/// If the calling thread is already inside a tokio runtime context the process
/// is aborted after printing a diagnostic to stderr; no nested `block_on` is
/// attempted.
pub(super) fn block_on<F: std::future::Future>(future: F) -> F::Output {
    let already_in_runtime = tokio::runtime::Handle::try_current().is_ok();
    if !already_in_runtime {
        return runtime().block_on(future);
    }
    eprintln!(
        "FATAL: mesh FFI called from inside a tokio runtime context; \
         aborting to avoid runtime-in-runtime panic across the FFI boundary"
    );
    std::process::abort()
}
/// Copies a NUL-terminated C string into an owned `String`.
///
/// Returns `None` when `p` is null or the bytes are not valid UTF-8.
///
/// # Safety
///
/// A non-null `p` must satisfy the requirements of [`CStr::from_ptr`]: it must
/// point to a NUL-terminated buffer that is valid for reads.
#[inline]
pub(super) unsafe fn c_str_to_string(p: *const c_char) -> Option<String> {
    if p.is_null() {
        return None;
    }
    match CStr::from_ptr(p).to_str() {
        Ok(text) => Some(text.to_owned()),
        Err(_) => None,
    }
}
fn write_json_out<T: Serialize>(
value: &T,
out_ptr: *mut *mut c_char,
out_len: *mut usize,
) -> c_int {
if out_ptr.is_null() || out_len.is_null() {
return NetError::NullPointer.into();
}
let Ok(s) = serde_json::to_string(value) else {
return NetError::Unknown.into();
};
let len = s.len();
let Ok(cs) = CString::new(s) else {
return NetError::Unknown.into();
};
unsafe {
*out_ptr = cs.into_raw();
*out_len = len;
}
0
}
/// Hands an owned string to the caller through `out_ptr` / `out_len`.
///
/// Returns `NetError::NullPointer` when either out-parameter is null and
/// `NetError::Unknown` when `s` contains an interior NUL byte (so it cannot
/// become a C string). On success returns `0`, with `*out_ptr` set to a
/// pointer produced by `CString::into_raw` and `*out_len` set to the string
/// length in bytes (excluding the terminating NUL). Nothing is written on
/// failure.
pub(super) fn write_string_out(s: String, out_ptr: *mut *mut c_char, out_len: *mut usize) -> c_int {
    if out_ptr.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let byte_len = s.len();
    match CString::new(s) {
        Ok(c_string) => {
            // Both pointers were null-checked above; the caller contract
            // requires them to be valid for writes.
            unsafe {
                out_ptr.write(c_string.into_raw());
                out_len.write(byte_len);
            }
            0
        }
        Err(_) => NetError::Unknown.into(),
    }
}
/// Translates an adapter error into a mesh status code: connection errors
/// become `NET_ERR_MESH_HANDSHAKE`, everything else `NET_ERR_MESH_TRANSPORT`.
fn adapter_err_to_code(err: &AdapterError) -> c_int {
    if matches!(err, AdapterError::Connection(_)) {
        NET_ERR_MESH_HANDSHAKE
    } else {
        NET_ERR_MESH_TRANSPORT
    }
}
/// Translates a stream send error into the matching `NET_ERR_MESH_*` code.
fn stream_err_to_code(err: &StreamError) -> c_int {
    // Exhaustive on purpose: a new variant must be given a code explicitly.
    match err {
        StreamError::Transport(_) => NET_ERR_MESH_TRANSPORT,
        StreamError::NotConnected => NET_ERR_MESH_NOT_CONNECTED,
        StreamError::Backpressure => NET_ERR_MESH_BACKPRESSURE,
    }
}
/// JSON shape of the `subnet_policy` field of the mesh-node config; converted
/// by `subnet_policy_from_json`.
#[derive(Deserialize)]
struct SubnetPolicyJson {
/// Policy rules; a missing field deserializes as an empty list.
#[serde(default)]
rules: Vec<SubnetRuleJson>,
}
/// JSON shape of one subnet policy rule.
///
/// Numbers are accepted as `u32` and range-checked during conversion in
/// `subnet_policy_from_json`, which rejects out-of-range values.
#[derive(Deserialize)]
struct SubnetRuleJson {
/// Tag prefix handed to `SubnetRule::new`.
tag_prefix: String,
/// Level handed to `SubnetRule::new`; conversion rejects values above 3.
level: u32,
/// Tag value -> subnet byte; conversion rejects 0 and values above 255.
/// A missing field deserializes as an empty map.
#[serde(default)]
values: std::collections::HashMap<String, u32>,
}
/// Narrows `value` to a `u8`, returning `None` when it does not fit (> 255).
fn u8_from_u32(value: u32) -> Option<u8> {
    u8::try_from(value).ok()
}
/// Builds a `SubnetId` from the JSON `subnet` array.
///
/// Returns `None` unless there are between one and four levels and every level
/// fits in a `u8`.
fn subnet_id_from_json(levels: Vec<u32>) -> Option<SubnetId> {
    let depth = levels.len();
    if !(1..=4).contains(&depth) {
        return None;
    }
    let mut bytes = [0u8; 4];
    // `zip` stops after `depth` entries, leaving the unused tail zeroed.
    for (slot, raw) in bytes.iter_mut().zip(&levels) {
        *slot = u8_from_u32(*raw)?;
    }
    Some(SubnetId::new(&bytes[..depth]))
}
/// Converts the JSON policy into a `SubnetPolicy`.
///
/// Returns `None` as soon as any rule is invalid: a level that is above 3, or
/// a mapped value that is 0 or does not fit in a `u8`.
fn subnet_policy_from_json(p: SubnetPolicyJson) -> Option<SubnetPolicy> {
    p.rules
        .into_iter()
        .try_fold(SubnetPolicy::new(), |policy, rule_json| {
            let level = u8_from_u32(rule_json.level).filter(|lvl| *lvl <= 3)?;
            let mut rule = SubnetRule::new(rule_json.tag_prefix, level);
            for (tag_value, raw_val) in rule_json.values {
                let mapped = u8_from_u32(raw_val).filter(|byte| *byte != 0)?;
                rule = rule.map(tag_value, mapped);
            }
            Some(policy.add_rule(rule))
        })
}
/// JSON configuration accepted by `net_mesh_new`.
#[derive(Deserialize)]
struct MeshNewConfig {
/// Local bind address; must parse as a `std::net::SocketAddr`.
bind_addr: String,
/// Pre-shared key as hex; must decode to exactly 32 bytes.
psk_hex: String,
/// Heartbeat interval in milliseconds; `net_mesh_new` rejects 0.
heartbeat_ms: Option<u64>,
/// Session timeout in milliseconds; `net_mesh_new` rejects 0.
session_timeout_ms: Option<u64>,
/// Forwarded to `MeshNodeConfig::with_num_shards` when present.
num_shards: Option<u16>,
/// Forwarded (as milliseconds) to `with_capability_gc_interval` when present.
capability_gc_interval_ms: Option<u64>,
/// Forwarded to `with_require_signed_capabilities` when present.
require_signed_capabilities: Option<bool>,
/// Subnet levels: one to four entries, each at most 255.
subnet: Option<Vec<u32>>,
/// Optional subnet policy; see `SubnetPolicyJson`.
subnet_policy: Option<SubnetPolicyJson>,
/// Hex-encoded 32-byte identity seed; a fresh keypair is generated if absent.
identity_seed_hex: Option<String>,
/// Socket address used as reflex override; only applied when the
/// `nat-traversal` feature is enabled, otherwise ignored.
#[serde(default)]
reflex_override: Option<String>,
/// Only applied when the `port-mapping` feature is enabled, otherwise ignored.
#[serde(default)]
try_port_mapping: bool,
}
/// Opaque handle to a mesh node, handed to C callers as a raw pointer by
/// `net_mesh_new` and torn down by `net_mesh_free`.
pub struct MeshNodeHandle {
// Wrapped in `ManuallyDrop` so `net_mesh_free` decides when (and whether) the
// node is dropped. NOTE: calling `.clone()` directly on this field clones the
// `ManuallyDrop` wrapper; use `Arc::clone(&h.inner)` to get a droppable `Arc`.
inner: ManuallyDrop<Arc<MeshNode>>,
// Channel config registry installed on the node in `net_mesh_new`.
channel_configs: ManuallyDrop<Arc<ChannelConfigRegistry>>,
// Entry points call `guard.try_enter()` before touching `inner`;
// `net_mesh_free` calls `guard.begin_free(..)` before dropping it.
guard: HandleGuard,
}
/// Creates a mesh node from a JSON configuration (see `MeshNewConfig`) and
/// writes a new handle to `*out_handle`.
///
/// Returns `0` on success. Failure codes:
/// - `NetError::NullPointer`: `config_json` or `out_handle` is null.
/// - `NetError::InvalidUtf8`: `config_json` is not valid UTF-8.
/// - `NetError::InvalidJson`: the JSON does not match `MeshNewConfig`, or
///   `heartbeat_ms` / `session_timeout_ms` is 0.
/// - `NET_ERR_MESH_INIT`: bad bind address, PSK, subnet, subnet policy, reflex
///   override or identity seed, or `MeshNode::new` failed.
///
/// `*out_handle` is only written on success.
///
/// # Safety
///
/// `config_json` must be null or a valid NUL-terminated string; `out_handle`
/// must be null or valid for a pointer-sized write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_new(
config_json: *const c_char,
out_handle: *mut *mut MeshNodeHandle,
) -> c_int {
if config_json.is_null() || out_handle.is_null() {
return NetError::NullPointer.into();
}
let Some(s) = (unsafe { c_str_to_string(config_json) }) else {
return NetError::InvalidUtf8.into();
};
let cfg: MeshNewConfig = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return NetError::InvalidJson.into(),
};
let bind_addr: std::net::SocketAddr = match cfg.bind_addr.parse() {
Ok(a) => a,
Err(_) => return NET_ERR_MESH_INIT,
};
// The pre-shared key must be exactly 32 bytes of hex.
let psk_bytes = match hex::decode(&cfg.psk_hex) {
Ok(b) => b,
Err(_) => return NET_ERR_MESH_INIT,
};
if psk_bytes.len() != 32 {
return NET_ERR_MESH_INIT;
}
let mut psk = [0u8; 32];
psk.copy_from_slice(&psk_bytes);
let mut node_cfg = MeshNodeConfig::new(bind_addr, psk);
// Optional tuning knobs: each is applied only when present in the JSON.
if let Some(ms) = cfg.heartbeat_ms {
if ms == 0 {
return NetError::InvalidJson.into();
}
node_cfg = node_cfg.with_heartbeat_interval(std::time::Duration::from_millis(ms));
}
if let Some(ms) = cfg.session_timeout_ms {
if ms == 0 {
return NetError::InvalidJson.into();
}
node_cfg = node_cfg.with_session_timeout(std::time::Duration::from_millis(ms));
}
if let Some(n) = cfg.num_shards {
node_cfg = node_cfg.with_num_shards(n);
}
// NOTE(review): unlike the two intervals above, a value of 0 is not rejected
// here — confirm that a zero GC interval is acceptable to the core.
if let Some(ms) = cfg.capability_gc_interval_ms {
node_cfg = node_cfg.with_capability_gc_interval(std::time::Duration::from_millis(ms));
}
if let Some(b) = cfg.require_signed_capabilities {
node_cfg = node_cfg.with_require_signed_capabilities(b);
}
if let Some(levels) = cfg.subnet {
let Some(id) = subnet_id_from_json(levels) else {
return NET_ERR_MESH_INIT;
};
node_cfg = node_cfg.with_subnet(id);
}
if let Some(policy_js) = cfg.subnet_policy {
let Some(policy) = subnet_policy_from_json(policy_js) else {
return NET_ERR_MESH_INIT;
};
node_cfg = node_cfg.with_subnet_policy(Arc::new(policy));
}
// Feature-gated options: parsed and applied when the feature is compiled in,
// otherwise explicitly discarded so the field is still "used".
#[cfg(feature = "nat-traversal")]
if let Some(external_str) = cfg.reflex_override.as_deref() {
let Ok(external) = external_str.parse::<std::net::SocketAddr>() else {
return NET_ERR_MESH_INIT;
};
node_cfg = node_cfg.with_reflex_override(external);
}
#[cfg(not(feature = "nat-traversal"))]
let _ = cfg.reflex_override;
#[cfg(feature = "port-mapping")]
if cfg.try_port_mapping {
node_cfg = node_cfg.with_try_port_mapping(true);
}
#[cfg(not(feature = "port-mapping"))]
let _ = cfg.try_port_mapping;
// Use the caller-supplied 32-byte seed if given, otherwise a fresh keypair.
let identity = match cfg.identity_seed_hex {
Some(seed_hex) => {
let bytes = match hex::decode(&seed_hex) {
Ok(b) => b,
Err(_) => return NET_ERR_MESH_INIT,
};
if bytes.len() != 32 {
return NET_ERR_MESH_INIT;
}
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
EntityKeypair::from_bytes(arr)
}
None => EntityKeypair::generate(),
};
let result = block_on(async move { MeshNode::new(identity, node_cfg).await });
match result {
Ok(mut node) => {
// Install the registry and token cache while the node is still uniquely
// owned, before it is wrapped in an `Arc`.
let channel_configs = Arc::new(ChannelConfigRegistry::new());
node.set_channel_configs(channel_configs.clone());
node.set_token_cache(Arc::new(TokenCache::new()));
let handle = Box::new(MeshNodeHandle {
inner: ManuallyDrop::new(Arc::new(node)),
channel_configs: ManuallyDrop::new(channel_configs),
guard: HandleGuard::new(),
});
// Ownership of the box passes to the caller as a raw pointer.
unsafe {
*out_handle = Box::into_raw(handle);
}
0
}
Err(_) => NET_ERR_MESH_INIT,
}
}
/// Tears down a node handle created by `net_mesh_new`. A null `handle` is a
/// no-op.
///
/// Calls `guard.begin_free(FFI_HANDLE_FREE_DEADLINE)`. If that returns `true`,
/// the node `Arc` and the channel-config `Arc` are taken out of their
/// `ManuallyDrop` wrappers and dropped (node first). If it returns `false`, a
/// warning is logged and both are left in place (leaked).
///
/// NOTE(review): the `MeshNodeHandle` allocation itself is not reclaimed with
/// `Box::from_raw` in this function — presumably so the guard stays readable
/// for late callers; confirm against `HandleGuard`'s documentation.
///
/// # Safety
///
/// `handle` must be null or a pointer previously returned by `net_mesh_new`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_free(handle: *mut MeshNodeHandle) {
if handle.is_null() {
return;
}
let h: &MeshNodeHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// `h` is not used past this point, so the exclusive reborrow does not
// overlap a live shared borrow.
let mh = &mut *handle;
let inner = ManuallyDrop::take(&mut mh.inner);
let configs = ManuallyDrop::take(&mut mh.channel_configs);
drop(inner);
drop(configs);
}
} else {
tracing::warn!(
"net_mesh_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// Returns a new strong reference to the node owned by `h`.
pub(super) fn mesh_node_arc(h: &MeshNodeHandle) -> Arc<MeshNode> {
    // Go through an explicit `&Arc` so the `Arc` is cloned, not the
    // `ManuallyDrop` wrapper around it.
    let shared: &Arc<MeshNode> = &h.inner;
    Arc::clone(shared)
}
/// Returns a heap-allocated clone of the node's `Arc`, or null when `handle`
/// is null or the handle guard refuses entry.
///
/// The returned pointer must be released with `net_mesh_arc_free`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_arc_clone(handle: *mut MeshNodeHandle) -> *mut Arc<MeshNode> {
    if handle.is_null() {
        return std::ptr::null_mut();
    }
    let h = unsafe { &*handle };
    // `_op` stays alive until the function returns.
    let Some(_op) = h.guard.try_enter() else {
        return std::ptr::null_mut();
    };
    let shared: &Arc<MeshNode> = &h.inner;
    Box::into_raw(Box::new(Arc::clone(shared)))
}
/// Returns a heap-allocated clone of the handle's channel-config registry
/// `Arc`, or null when `handle` is null or the handle guard refuses entry.
///
/// The returned pointer must be released with
/// `net_mesh_channel_configs_arc_free`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_channel_configs_arc_clone(
    handle: *mut MeshNodeHandle,
) -> *mut Arc<ChannelConfigRegistry> {
    if handle.is_null() {
        return std::ptr::null_mut();
    }
    let h = unsafe { &*handle };
    // `_op` stays alive until the function returns.
    let Some(_op) = h.guard.try_enter() else {
        return std::ptr::null_mut();
    };
    let registry: &Arc<ChannelConfigRegistry> = &h.channel_configs;
    Box::into_raw(Box::new(Arc::clone(registry)))
}
/// Releases an `Arc<MeshNode>` box obtained from `net_mesh_arc_clone`.
/// A null pointer is a no-op.
///
/// # Safety
///
/// `p` must be null or a pointer returned by `net_mesh_arc_clone` that has not
/// already been freed.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_arc_free(p: *mut Arc<MeshNode>) {
    if !p.is_null() {
        // Rebuild the box so both the allocation and the strong count go away.
        let reclaimed = unsafe { Box::from_raw(p) };
        drop(reclaimed);
    }
}
/// Releases an `Arc<ChannelConfigRegistry>` box obtained from
/// `net_mesh_channel_configs_arc_clone`. A null pointer is a no-op.
///
/// # Safety
///
/// `p` must be null or a pointer returned by
/// `net_mesh_channel_configs_arc_clone` that has not already been freed.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_channel_configs_arc_free(p: *mut Arc<ChannelConfigRegistry>) {
    if !p.is_null() {
        // Rebuild the box so both the allocation and the strong count go away.
        let reclaimed = unsafe { Box::from_raw(p) };
        drop(reclaimed);
    }
}
/// Writes the node's public key, hex-encoded, to `out_ptr` / `out_len`.
///
/// Returns `0` on success, `NetError::NullPointer` if any pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, or the error
/// from `write_string_out`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; the
/// out-parameters must be null or valid for writes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_public_key_hex(
    handle: *mut MeshNodeHandle,
    out_ptr: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_ptr.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    write_string_out(hex::encode(h.inner.public_key()), out_ptr, out_len)
}
/// Returns the node's id, or `0` when `handle` is null or the handle guard
/// refuses entry.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_node_id(handle: *mut MeshNodeHandle) -> u64 {
    if handle.is_null() {
        return 0;
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return 0;
    };
    h.inner.node_id()
}
/// Copies the node's 32-byte entity id into `out`.
///
/// Returns `0` on success, `NetError::NullPointer` if a pointer is null, or
/// `NetError::ShuttingDown` if the handle guard refuses entry.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; `out` must
/// be null or valid for 32 bytes of writes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_entity_id(handle: *mut MeshNodeHandle, out: *mut u8) -> c_int {
    if handle.is_null() || out.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let bytes = h.inner.entity_id().as_bytes();
    // The destination is caller-owned memory, so it cannot overlap the id bytes.
    unsafe {
        out.copy_from_nonoverlapping(bytes.as_ptr(), 32);
    }
    0
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_connect(
handle: *mut MeshNodeHandle,
peer_addr: *const c_char,
peer_pubkey_hex: *const c_char,
peer_node_id: u64,
) -> c_int {
if handle.is_null() || peer_addr.is_null() || peer_pubkey_hex.is_null() {
return NetError::NullPointer.into();
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let Some(addr_s) = (unsafe { c_str_to_string(peer_addr) }) else {
return NetError::InvalidUtf8.into();
};
let addr: std::net::SocketAddr = match addr_s.parse() {
Ok(a) => a,
Err(_) => return NET_ERR_MESH_HANDSHAKE,
};
let Some(pk_s) = (unsafe { c_str_to_string(peer_pubkey_hex) }) else {
return NetError::InvalidUtf8.into();
};
let pk_bytes = match hex::decode(pk_s) {
Ok(b) => b,
Err(_) => return NET_ERR_MESH_HANDSHAKE,
};
if pk_bytes.len() != 32 {
return NET_ERR_MESH_HANDSHAKE;
}
let mut pk = [0u8; 32];
pk.copy_from_slice(&pk_bytes);
let node = h.inner.clone();
match block_on(async move { node.connect(addr, &pk, peer_node_id).await }) {
Ok(_) => 0,
Err(e) => adapter_err_to_code(&e),
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_accept(
handle: *mut MeshNodeHandle,
peer_node_id: u64,
out_addr: *mut *mut c_char,
out_len: *mut usize,
) -> c_int {
if handle.is_null() || out_addr.is_null() || out_len.is_null() {
return NetError::NullPointer.into();
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let node = h.inner.clone();
match block_on(async move { node.accept(peer_node_id).await }) {
Ok((addr, _)) => write_string_out(addr.to_string(), out_addr, out_len),
Err(e) => adapter_err_to_code(&e),
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_start(handle: *mut MeshNodeHandle) -> c_int {
if handle.is_null() {
return NetError::NullPointer.into();
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let node = h.inner.clone();
block_on(async move { node.start() });
0
}
/// Blocks on `MeshNode::shutdown`.
///
/// Returns `0` on success, `NetError::NullPointer` if `handle` is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, or the code
/// from `adapter_err_to_code` if shutdown fails. The handle itself is not
/// freed here.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_shutdown(handle: *mut MeshNodeHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // Borrows the node for the duration of the call; no extra reference needed.
    let outcome = block_on(async { h.inner.shutdown().await });
    match outcome {
        Err(e) => adapter_err_to_code(&e),
        Ok(()) => 0,
    }
}
/// Writes this node's NAT classification (`"open"`, `"cone"`, `"symmetric"`
/// or `"unknown"`) to `out_str` / `out_len`.
///
/// Returns `0` on success, `NetError::NullPointer` if any pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, or the error
/// from `write_string_out`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; the
/// out-parameters must be null or valid for writes.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_nat_type(
    handle: *mut MeshNodeHandle,
    out_str: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_str.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let label = nat_class_to_str(h.inner.nat_class());
    write_string_out(label.to_string(), out_str, out_len)
}
/// Writes this node's reflex address to `out_str` / `out_len`, or an empty
/// string when `MeshNode::reflex_addr` returns `None`.
///
/// Returns `0` on success, `NetError::NullPointer` if any pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, or the error
/// from `write_string_out`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; the
/// out-parameters must be null or valid for writes.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_reflex_addr(
    handle: *mut MeshNodeHandle,
    out_str: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_str.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let rendered = match h.inner.reflex_addr() {
        Some(addr) => addr.to_string(),
        None => String::new(),
    };
    write_string_out(rendered, out_str, out_len)
}
/// Writes the NAT classification recorded for `peer_node_id` (`"open"`,
/// `"cone"`, `"symmetric"` or `"unknown"`) to `out_str` / `out_len`.
///
/// Returns `0` on success, `NetError::NullPointer` if any pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, or the error
/// from `write_string_out`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; the
/// out-parameters must be null or valid for writes.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_peer_nat_type(
    handle: *mut MeshNodeHandle,
    peer_node_id: u64,
    out_str: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_str.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let label = nat_class_to_str(h.inner.peer_nat_class(peer_node_id));
    write_string_out(label.to_string(), out_str, out_len)
}
/// Blocks on `MeshNode::probe_reflex(peer_node_id)` and, on success, writes
/// the returned address (formatted with `to_string`) to `out_str` / `out_len`.
///
/// Returns `0` on success, `NetError::NullPointer` if any pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, the code from
/// `traversal_err_to_code` if the probe fails, or the error from
/// `write_string_out`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; the
/// out-parameters must be null or valid for writes.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_probe_reflex(
    handle: *mut MeshNodeHandle,
    peer_node_id: u64,
    out_str: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_str.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // `h.inner` is a `ManuallyDrop<Arc<_>>`: a bare `.clone()` would clone the
    // `ManuallyDrop` wrapper, whose copy never releases its strong count.
    // Clone the `Arc` itself so the reference is dropped after the call.
    let node: Arc<MeshNode> = Arc::clone(&h.inner);
    match block_on(async move { node.probe_reflex(peer_node_id).await }) {
        Ok(addr) => write_string_out(addr.to_string(), out_str, out_len),
        Err(e) => traversal_err_to_code(&e),
    }
}
/// Blocks on `MeshNode::reclassify_nat`.
///
/// Returns `0` once the call completes, `NetError::NullPointer` if `handle`
/// is null, or `NetError::ShuttingDown` if the handle guard refuses entry.
/// Whatever `reclassify_nat` returns is discarded.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_reclassify_nat(handle: *mut MeshNodeHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // `h.inner` is a `ManuallyDrop<Arc<_>>`: a bare `.clone()` would clone the
    // `ManuallyDrop` wrapper, whose copy never releases its strong count.
    // Clone the `Arc` itself so the reference is dropped after the call.
    let node: Arc<MeshNode> = Arc::clone(&h.inner);
    block_on(async move { node.reclassify_nat().await });
    0
}
/// Copies the node's traversal counters into the given out-parameters.
///
/// Each out-parameter is optional: a null pointer skips that counter.
/// Returns `0` on success, `NetError::NullPointer` if `handle` is null, or
/// `NetError::ShuttingDown` if the handle guard refuses entry.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; each
/// out-parameter must be null or valid for a `u64` write.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_traversal_stats(
    handle: *mut MeshNodeHandle,
    out_punches_attempted: *mut u64,
    out_punches_succeeded: *mut u64,
    out_relay_fallbacks: *mut u64,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let snap = h.inner.traversal_stats();
    let outputs: [(*mut u64, u64); 3] = [
        (out_punches_attempted, snap.punches_attempted),
        (out_punches_succeeded, snap.punches_succeeded),
        (out_relay_fallbacks, snap.relay_fallbacks),
    ];
    for (dst, value) in outputs {
        if !dst.is_null() {
            unsafe {
                dst.write(value);
            }
        }
    }
    0
}
/// Blocks on `MeshNode::connect_direct(peer_node_id, pubkey, coordinator)`.
///
/// `peer_pubkey_hex` must be the hex encoding of exactly 32 bytes.
///
/// Returns `0` on success. Failure codes:
/// - `NetError::NullPointer`: `handle` or `peer_pubkey_hex` is null.
/// - `NetError::ShuttingDown`: the handle guard refused entry.
/// - `NetError::InvalidUtf8`: the key string is not valid UTF-8.
/// - `NET_ERR_MESH_HANDSHAKE`: the key is not 32 bytes of valid hex.
/// - otherwise the code produced by `traversal_err_to_code`.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`;
/// `peer_pubkey_hex` must be null or a valid NUL-terminated string.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_connect_direct(
    handle: *mut MeshNodeHandle,
    peer_node_id: u64,
    peer_pubkey_hex: *const c_char,
    coordinator: u64,
) -> c_int {
    if handle.is_null() || peer_pubkey_hex.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(pk_s) = (unsafe { c_str_to_string(peer_pubkey_hex) }) else {
        return NetError::InvalidUtf8.into();
    };
    // Bad hex and wrong length are reported with the same code.
    let Some(pk) = hex::decode(pk_s)
        .ok()
        .and_then(|raw| <[u8; 32]>::try_from(raw).ok())
    else {
        return NET_ERR_MESH_HANDSHAKE;
    };
    // `h.inner` is a `ManuallyDrop<Arc<_>>`: a bare `.clone()` would clone the
    // `ManuallyDrop` wrapper, whose copy never releases its strong count.
    // Clone the `Arc` itself so the reference is dropped after the call.
    let node: Arc<MeshNode> = Arc::clone(&h.inner);
    match block_on(async move { node.connect_direct(peer_node_id, &pk, coordinator).await }) {
        Ok(_) => 0,
        Err(e) => traversal_err_to_code(&e),
    }
}
/// Parses `external` as a socket address and passes it to
/// `MeshNode::set_reflex_override`.
///
/// Returns `0` on success, `NetError::NullPointer` if a pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry,
/// `NetError::InvalidUtf8` if `external` is not valid UTF-8, or
/// `NET_ERR_MESH_INIT` if it does not parse as a socket address.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`; `external`
/// must be null or a valid NUL-terminated string.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_set_reflex_override(
    handle: *mut MeshNodeHandle,
    external: *const c_char,
) -> c_int {
    if handle.is_null() || external.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let text = match unsafe { c_str_to_string(external) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    match text.parse::<std::net::SocketAddr>() {
        Ok(addr) => {
            h.inner.set_reflex_override(addr);
            0
        }
        Err(_) => NET_ERR_MESH_INIT,
    }
}
/// Calls `MeshNode::clear_reflex_override`.
///
/// Returns `0` on success, `NetError::NullPointer` if `handle` is null, or
/// `NetError::ShuttingDown` if the handle guard refuses entry.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`.
#[cfg(feature = "nat-traversal")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_clear_reflex_override(handle: *mut MeshNodeHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    h.inner.clear_reflex_override();
    0
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument (nothing is dereferenced or written)
/// and returns `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_nat_type(
_handle: *mut MeshNodeHandle,
_out_str: *mut *mut c_char,
_out_len: *mut usize,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument (nothing is dereferenced or written)
/// and returns `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_reflex_addr(
_handle: *mut MeshNodeHandle,
_out_str: *mut *mut c_char,
_out_len: *mut usize,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument (nothing is dereferenced or written)
/// and returns `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_peer_nat_type(
_handle: *mut MeshNodeHandle,
_peer_node_id: u64,
_out_str: *mut *mut c_char,
_out_len: *mut usize,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument (nothing is dereferenced or written)
/// and returns `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_probe_reflex(
_handle: *mut MeshNodeHandle,
_peer_node_id: u64,
_out_str: *mut *mut c_char,
_out_len: *mut usize,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores its argument and returns
/// `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_reclassify_nat(_handle: *mut MeshNodeHandle) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument (no counter is written) and returns
/// `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_traversal_stats(
_handle: *mut MeshNodeHandle,
_out_punches_attempted: *mut u64,
_out_punches_succeeded: *mut u64,
_out_relay_fallbacks: *mut u64,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument and returns
/// `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_connect_direct(
_handle: *mut MeshNodeHandle,
_peer_node_id: u64,
_peer_pubkey_hex: *const c_char,
_coordinator: u64,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores every argument and returns
/// `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_set_reflex_override(
_handle: *mut MeshNodeHandle,
_external: *const c_char,
) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Stub compiled when the `nat-traversal` feature is disabled, so the symbol
/// still exists. Ignores its argument and returns
/// `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(not(feature = "nat-traversal"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_clear_reflex_override(_handle: *mut MeshNodeHandle) -> c_int {
NET_ERR_TRAVERSAL_UNSUPPORTED
}
/// Optional JSON configuration accepted by `net_mesh_open_stream`; every field
/// may be omitted, and a null config pointer uses `Default`.
#[derive(Deserialize, Default)]
struct StreamOpenConfig {
/// `"fire_and_forget"` (also the default when absent) or `"reliable"`;
/// any other string makes `net_mesh_open_stream` fail.
reliability: Option<String>,
/// Stream window in bytes; defaults to `DEFAULT_STREAM_WINDOW_BYTES`.
window_bytes: Option<u32>,
/// Fairness weight; defaults to 1.
fairness_weight: Option<u8>,
}
/// Opaque handle to an open stream, handed to C callers as a raw pointer by
/// `net_mesh_open_stream` and torn down by `net_mesh_stream_free`.
pub struct MeshStreamHandle {
// Wrapped in `ManuallyDrop` so `net_mesh_stream_free` decides when (and
// whether) it is dropped. NOTE: `.clone()` directly on this field clones the
// `ManuallyDrop` wrapper, not the stream.
stream: ManuallyDrop<CoreStream>,
// Strong reference to the node the stream was opened on; `handles_match`
// compares it by pointer against a node handle.
_node: ManuallyDrop<Arc<MeshNode>>,
// Entry points call `guard.try_enter()` before using `stream`;
// `net_mesh_stream_free` calls `guard.begin_free(..)` before dropping it.
guard: HandleGuard,
}
/// Opens a stream to `peer_node_id` with the given `stream_id` and writes a
/// new stream handle to `*out_stream`.
///
/// `config_json` may be null (defaults apply) or a JSON object matching
/// `StreamOpenConfig`.
///
/// Returns `0` on success. Failure codes:
/// - `NetError::NullPointer`: `handle` or `out_stream` is null.
/// - `NetError::ShuttingDown`: the handle guard refused entry.
/// - `NetError::InvalidUtf8` / `NetError::InvalidJson`: bad `config_json`.
/// - `NET_ERR_MESH_TRANSPORT`: unrecognised `reliability` string.
/// - otherwise the code produced by `adapter_err_to_code`.
///
/// `*out_stream` is only written on success.
///
/// # Safety
///
/// `handle` must be null or a pointer returned by `net_mesh_new`;
/// `config_json` must be null or a valid NUL-terminated string; `out_stream`
/// must be null or valid for a pointer-sized write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_open_stream(
    handle: *mut MeshNodeHandle,
    peer_node_id: u64,
    stream_id: u64,
    config_json: *const c_char,
    out_stream: *mut *mut MeshStreamHandle,
) -> c_int {
    if handle.is_null() || out_stream.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let cfg_json = if config_json.is_null() {
        StreamOpenConfig::default()
    } else {
        let Some(raw) = (unsafe { c_str_to_string(config_json) }) else {
            return NetError::InvalidUtf8.into();
        };
        let Ok(parsed) = serde_json::from_str::<StreamOpenConfig>(&raw) else {
            return NetError::InvalidJson.into();
        };
        parsed
    };
    let reliability = match cfg_json.reliability.as_deref() {
        Some("reliable") => Reliability::Reliable,
        Some("fire_and_forget") | None => Reliability::FireAndForget,
        Some(_) => return NET_ERR_MESH_TRANSPORT,
    };
    let cfg = StreamConfig::new()
        .with_reliability(reliability)
        .with_window_bytes(cfg_json.window_bytes.unwrap_or(DEFAULT_STREAM_WINDOW_BYTES))
        .with_fairness_weight(cfg_json.fairness_weight.unwrap_or(1));
    let stream = match h.inner.open_stream(peer_node_id, stream_id, cfg) {
        Ok(stream) => stream,
        Err(e) => return adapter_err_to_code(&e),
    };
    // The stream handle keeps its own strong reference to the node.
    let owner: Arc<MeshNode> = Arc::clone(&h.inner);
    let boxed = Box::new(MeshStreamHandle {
        stream: ManuallyDrop::new(stream),
        _node: ManuallyDrop::new(owner),
        guard: HandleGuard::new(),
    });
    // Ownership of the box passes to the caller as a raw pointer.
    unsafe {
        out_stream.write(Box::into_raw(boxed));
    }
    0
}
/// Tears down a stream handle created by `net_mesh_open_stream`. A null
/// `handle` is a no-op.
///
/// Calls `guard.begin_free(FFI_HANDLE_FREE_DEADLINE)`. If that returns `true`,
/// the stream and the node reference are taken out of their `ManuallyDrop`
/// wrappers and dropped. If it returns `false`, a warning is logged and both
/// are left in place (leaked).
///
/// NOTE(review): the `MeshStreamHandle` allocation itself is not reclaimed
/// with `Box::from_raw` in this function — presumably so the guard stays
/// readable for late callers; confirm against `HandleGuard`'s documentation.
///
/// # Safety
///
/// `handle` must be null or a pointer previously returned by
/// `net_mesh_open_stream`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_stream_free(handle: *mut MeshStreamHandle) {
if handle.is_null() {
return;
}
let h: &MeshStreamHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// Drop order: `node` is dropped explicitly first; `_stream` is dropped
// when this block ends.
let _stream = ManuallyDrop::take(&mut (*handle).stream);
let node = ManuallyDrop::take(&mut (*handle)._node);
drop(node);
}
} else {
tracing::warn!(
"net_mesh_stream_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// Copies `count` caller-owned buffers into owned `Bytes` values.
///
/// Entry `i` is described by `payloads[i]` (pointer) and `lens[i]` (length).
/// A null pointer with length 0 yields an empty payload.
///
/// Returns `None` when:
/// - an entry has a null pointer with a non-zero length,
/// - an entry's length exceeds `isize::MAX`, or
/// - space for `count` entries cannot be reserved (absurd `count`).
///
/// # Safety
///
/// When `count > 0`, `payloads` and `lens` must each point to `count` readable
/// elements, and every non-null `payloads[i]` must be valid for reads of
/// `lens[i]` bytes.
unsafe fn collect_payloads(
    payloads: *const *const u8,
    lens: *const usize,
    count: usize,
) -> Option<Vec<Bytes>> {
    // `count` comes straight from the foreign caller. `Vec::with_capacity`
    // would panic (capacity overflow) or abort (allocation failure) on an
    // absurd value, taking the host process down. Reserve fallibly instead and
    // report the failure through the normal `None` path.
    let mut out: Vec<Bytes> = Vec::new();
    out.try_reserve_exact(count).ok()?;
    for i in 0..count {
        let ptr = *payloads.add(i);
        let len = *lens.add(i);
        if ptr.is_null() {
            if len != 0 {
                return None;
            }
            out.push(Bytes::new());
            continue;
        }
        // `slice::from_raw_parts` requires the total size to fit in `isize`.
        if len > isize::MAX as usize {
            return None;
        }
        let slice = std::slice::from_raw_parts(ptr, len);
        out.push(Bytes::copy_from_slice(slice));
    }
    Some(out)
}
#[inline]
fn handles_match(sh: &MeshStreamHandle, nh: &MeshNodeHandle) -> bool {
Arc::ptr_eq(&sh._node, &nh.inner)
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_send(
handle: *mut MeshStreamHandle,
payloads: *const *const u8,
lens: *const usize,
count: usize,
node_handle: *mut MeshNodeHandle,
) -> c_int {
if handle.is_null() || node_handle.is_null() {
return NetError::NullPointer.into();
}
if count > 0 && (payloads.is_null() || lens.is_null()) {
return NetError::NullPointer.into();
}
let sh = unsafe { &*handle };
let nh = unsafe { &*node_handle };
let _sh_op = match sh.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let _nh_op = match nh.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
if !handles_match(sh, nh) {
return NetError::MismatchedHandles.into();
}
let payloads = match unsafe { collect_payloads(payloads, lens, count) } {
Some(v) => v,
None => return NetError::NullPointer.into(),
};
let node = nh.inner.clone();
let stream = sh.stream.clone();
match block_on(async move { node.send_on_stream(&stream, &payloads).await }) {
Ok(()) => 0,
Err(e) => stream_err_to_code(&e),
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_send_with_retry(
handle: *mut MeshStreamHandle,
payloads: *const *const u8,
lens: *const usize,
count: usize,
max_retries: u32,
node_handle: *mut MeshNodeHandle,
) -> c_int {
if handle.is_null() || node_handle.is_null() {
return NetError::NullPointer.into();
}
if count > 0 && (payloads.is_null() || lens.is_null()) {
return NetError::NullPointer.into();
}
let sh = unsafe { &*handle };
let nh = unsafe { &*node_handle };
let _sh_op = match sh.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let _nh_op = match nh.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
if !handles_match(sh, nh) {
return NetError::MismatchedHandles.into();
}
let payloads = match unsafe { collect_payloads(payloads, lens, count) } {
Some(v) => v,
None => return NetError::NullPointer.into(),
};
let node = nh.inner.clone();
let stream = sh.stream.clone();
match block_on(async move {
node.send_with_retry(&stream, &payloads, max_retries as usize)
.await
}) {
Ok(()) => 0,
Err(e) => stream_err_to_code(&e),
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_send_blocking(
handle: *mut MeshStreamHandle,
payloads: *const *const u8,
lens: *const usize,
count: usize,
node_handle: *mut MeshNodeHandle,
) -> c_int {
if handle.is_null() || node_handle.is_null() {
return NetError::NullPointer.into();
}
if count > 0 && (payloads.is_null() || lens.is_null()) {
return NetError::NullPointer.into();
}
let sh = unsafe { &*handle };
let nh = unsafe { &*node_handle };
let _sh_op = match sh.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let _nh_op = match nh.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
if !handles_match(sh, nh) {
return NetError::MismatchedHandles.into();
}
let payloads = match unsafe { collect_payloads(payloads, lens, count) } {
Some(v) => v,
None => return NetError::NullPointer.into(),
};
let node = nh.inner.clone();
let stream = sh.stream.clone();
match block_on(async move { node.send_blocking(&stream, &payloads).await }) {
Ok(()) => 0,
Err(e) => stream_err_to_code(&e),
}
}
/// JSON shape emitted by `net_mesh_stream_stats`.
///
/// Every field is copied one-to-one from the same-named field of the value
/// returned by `MeshNode::stream_stats`; field order here is the order of the
/// keys in the emitted JSON object.
#[derive(Serialize)]
struct StreamStatsJson {
tx_seq: u64,
rx_seq: u64,
inbound_pending: u64,
// Presumably nanoseconds, per the `_ns` suffix — confirm against the core type.
last_activity_ns: u64,
active: bool,
backpressure_events: u64,
tx_credit_remaining: u32,
tx_window: u32,
credit_grants_received: u64,
credit_grants_sent: u64,
}
/// Writes the statistics for stream `(peer_node_id, stream_id)` as a JSON
/// object to `out_json` / `out_len`, or the JSON literal `null` when
/// `MeshNode::stream_stats` returns `None`.
///
/// Returns `0` on success, `NetError::NullPointer` if any pointer is null,
/// `NetError::ShuttingDown` if the handle guard refuses entry, or the error
/// from the string writer.
///
/// # Safety
///
/// `node_handle` must be null or a pointer returned by `net_mesh_new`; the
/// out-parameters must be null or valid for writes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_stream_stats(
    node_handle: *mut MeshNodeHandle,
    peer_node_id: u64,
    stream_id: u64,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if node_handle.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*node_handle };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(s) = h.inner.stream_stats(peer_node_id, stream_id) else {
        return write_string_out("null".to_string(), out_json, out_len);
    };
    let js = StreamStatsJson {
        tx_seq: s.tx_seq,
        rx_seq: s.rx_seq,
        inbound_pending: s.inbound_pending,
        last_activity_ns: s.last_activity_ns,
        active: s.active,
        backpressure_events: s.backpressure_events,
        tx_credit_remaining: s.tx_credit_remaining,
        tx_window: s.tx_window,
        credit_grants_received: s.credit_grants_received,
        credit_grants_sent: s.credit_grants_sent,
    };
    write_json_out(&js, out_json, out_len)
}
/// JSON shape of one event in the array emitted by `net_mesh_recv_shard`.
#[derive(Serialize)]
struct RecvEventJson {
/// Event id, copied from the polled event.
id: String,
/// The event's raw bytes, base64-encoded by `encode_b64` (standard alphabet,
/// `=` padding).
payload_b64: String,
/// Copied from the polled event's `insertion_ts`.
insertion_ts: u64,
/// Copied from the polled event's `shard_id`.
shard_id: u16,
}
/// Polls one shard of the node for up to `limit` events and writes them out
/// as a JSON array of `RecvEventJson` objects.
///
/// # Returns
/// * the `NetError::NullPointer` code if any pointer argument is null;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `adapter_err_to_code` mapping if polling the shard fails;
/// * otherwise the result of `write_json_out`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_recv_shard(
    handle: *mut MeshNodeHandle,
    shard_id: u16,
    limit: u32,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let mesh = unsafe { &*handle };
    let Some(_op) = mesh.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let node = mesh.inner.clone();
    let max_events = limit as usize;
    let polled = match block_on(async move { node.poll_shard(shard_id, None, max_events).await }) {
        Ok(batch) => batch,
        Err(err) => return adapter_err_to_code(&err),
    };
    // Raw payload bytes are not valid JSON text, so they travel as base64.
    let mut events: Vec<RecvEventJson> = Vec::new();
    for event in polled.events {
        events.push(RecvEventJson {
            id: event.id,
            payload_b64: encode_b64(&event.raw),
            insertion_ts: event.insertion_ts,
            shard_id: event.shard_id,
        });
    }
    write_json_out(&events, out_json, out_len)
}
/// Encodes `bytes` as base64 text using the standard `A-Z a-z 0-9 + /`
/// alphabet, padding the final group with `=` to a multiple of four
/// characters.
fn encode_b64(bytes: &[u8]) -> String {
    const ALPH: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Maps a 6-bit value to its alphabet character.
    let sym = |sextet: u8| ALPH[usize::from(sextet)] as char;

    let mut out = String::with_capacity(bytes.len().div_ceil(3) * 4);
    let mut triples = bytes.chunks_exact(3);
    for triple in &mut triples {
        let (a, b, c) = (triple[0], triple[1], triple[2]);
        out.push(sym(a >> 2));
        out.push(sym(((a & 0b11) << 4) | (b >> 4)));
        out.push(sym(((b & 0b1111) << 2) | (c >> 6)));
        out.push(sym(c & 0b11_1111));
    }
    // One or two trailing bytes produce a padded final quartet.
    match triples.remainder() {
        &[a] => {
            out.push(sym(a >> 2));
            out.push(sym((a & 0b11) << 4));
            out.push_str("==");
        }
        &[a, b] => {
            out.push(sym(a >> 2));
            out.push(sym(((a & 0b11) << 4) | (b >> 4)));
            out.push(sym((b & 0b1111) << 2));
            out.push('=');
        }
        _ => {}
    }
    out
}
/// JSON input accepted by `net_mesh_register_channel`.
///
/// Only `name` is required. Each optional field, when present, is applied to
/// the channel config through the matching `with_*` builder call; absent
/// fields leave the builder's defaults untouched. `visibility` must be one of
/// the strings understood by `parse_visibility`.
#[derive(Deserialize)]
struct ChannelConfigInput {
name: String,
visibility: Option<String>,
reliable: Option<bool>,
require_token: Option<bool>,
priority: Option<u8>,
max_rate_pps: Option<u32>,
publish_caps: Option<CapabilityFilterJson>,
subscribe_caps: Option<CapabilityFilterJson>,
}
/// Maps a visibility keyword to its `InnerVisibility` variant.
///
/// Matching is exact and case-sensitive; any other string yields `None`.
fn parse_visibility(s: &str) -> Option<InnerVisibility> {
    let visibility = match s {
        "subnet-local" => InnerVisibility::SubnetLocal,
        "parent-visible" => InnerVisibility::ParentVisible,
        "exported" => InnerVisibility::Exported,
        "global" => InnerVisibility::Global,
        _ => return None,
    };
    Some(visibility)
}
/// Registers a channel configuration, described as JSON, on the node handle.
///
/// `config_json` is parsed as a `ChannelConfigInput`; the resulting config is
/// inserted into the handle's `channel_configs`.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `handle` or `config_json` is null;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `NetError::InvalidUtf8` code if the string cannot be read;
/// * the `NetError::InvalidJson` code if it does not deserialize;
/// * `NET_ERR_CHANNEL` for a rejected channel name or unknown visibility.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_register_channel(
    handle: *mut MeshNodeHandle,
    config_json: *const c_char,
) -> c_int {
    if handle.is_null() || config_json.is_null() {
        return NetError::NullPointer.into();
    }
    let mesh = unsafe { &*handle };
    let Some(_op) = mesh.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(raw) = (unsafe { c_str_to_string(config_json) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(input) = serde_json::from_str::<ChannelConfigInput>(&raw) else {
        return NetError::InvalidJson.into();
    };
    let Ok(name) = InnerChannelName::new(&input.name) else {
        return NET_ERR_CHANNEL;
    };

    let mut config = InnerChannelConfig::new(ChannelId::new(name));
    if let Some(keyword) = input.visibility {
        match parse_visibility(&keyword) {
            Some(visibility) => config = config.with_visibility(visibility),
            None => return NET_ERR_CHANNEL,
        }
    }
    if let Some(reliable) = input.reliable {
        config = config.with_reliable(reliable);
    }
    if let Some(require_token) = input.require_token {
        config = config.with_require_token(require_token);
    }
    if let Some(priority) = input.priority {
        config = config.with_priority(priority);
    }
    if let Some(rate) = input.max_rate_pps {
        config = config.with_rate_limit(rate);
    }
    if let Some(caps) = input.publish_caps {
        config = config.with_publish_caps(capability_filter_from_json(caps));
    }
    if let Some(caps) = input.subscribe_caps {
        config = config.with_subscribe_caps(capability_filter_from_json(caps));
    }
    mesh.channel_configs.insert(config);
    0
}
/// Subscribes this node to `channel` as published by `publisher_node_id`.
///
/// Thin C entry point over `subscribe_or_unsubscribe` with the subscribe flag
/// set; the return code is whatever that helper produces.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_subscribe_channel(
    handle: *mut MeshNodeHandle,
    publisher_node_id: u64,
    channel: *const c_char,
) -> c_int {
    const SUBSCRIBE: bool = true;
    subscribe_or_unsubscribe(handle, publisher_node_id, channel, SUBSCRIBE)
}
/// Removes this node's subscription to `channel` on `publisher_node_id`.
///
/// Thin C entry point over `subscribe_or_unsubscribe` with the subscribe flag
/// cleared; the return code is whatever that helper produces.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_unsubscribe_channel(
    handle: *mut MeshNodeHandle,
    publisher_node_id: u64,
    channel: *const c_char,
) -> c_int {
    const SUBSCRIBE: bool = false;
    subscribe_or_unsubscribe(handle, publisher_node_id, channel, SUBSCRIBE)
}
/// Subscribes to `channel` on `publisher_node_id`, presenting a serialized
/// permission token (`token` / `token_len`) with the request.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `handle`, `channel` or `token` is null;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `NetError::InvalidUtf8` code if `channel` cannot be read;
/// * `NET_ERR_CHANNEL` if the channel name is rejected;
/// * the `NetError::InvalidJson` code if `token_len` exceeds `isize::MAX`;
/// * the `token_err_to_code` mapping if the token bytes do not parse;
/// * the `adapter_err_to_channel_code` mapping if the subscription fails.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_subscribe_channel_with_token(
    handle: *mut MeshNodeHandle,
    publisher_node_id: u64,
    channel: *const c_char,
    token: *const u8,
    token_len: usize,
) -> c_int {
    if handle.is_null() || channel.is_null() || token.is_null() {
        return NetError::NullPointer.into();
    }
    let mesh = unsafe { &*handle };
    let Some(_op) = mesh.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(channel_name) = (unsafe { c_str_to_string(channel) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(name) = InnerChannelName::new(&channel_name) else {
        return NET_ERR_CHANNEL;
    };
    // `slice::from_raw_parts` requires the byte length to fit in `isize`.
    if token_len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let token_bytes = unsafe { std::slice::from_raw_parts(token, token_len) };
    let permission = match PermissionToken::from_bytes(token_bytes) {
        Ok(parsed) => parsed,
        Err(err) => return token_err_to_code(&err),
    };
    let node = mesh.inner.clone();
    let outcome = block_on(async move {
        node.subscribe_channel_with_token(publisher_node_id, name, permission)
            .await
    });
    match outcome {
        Ok(()) => 0,
        Err(err) => adapter_err_to_channel_code(&err),
    }
}
/// Shared body of `net_mesh_subscribe_channel` and
/// `net_mesh_unsubscribe_channel`.
///
/// `subscribe` selects which node operation runs. Both pointers are
/// dereferenced here, so callers must pass pointers that are either null or
/// valid.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `handle` or `channel` is null;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `NetError::InvalidUtf8` code if `channel` cannot be read;
/// * `NET_ERR_CHANNEL` if the channel name is rejected;
/// * the `adapter_err_to_channel_code` mapping if the node operation fails.
fn subscribe_or_unsubscribe(
    handle: *mut MeshNodeHandle,
    publisher_node_id: u64,
    channel: *const c_char,
    subscribe: bool,
) -> c_int {
    if handle.is_null() || channel.is_null() {
        return NetError::NullPointer.into();
    }
    let mesh = unsafe { &*handle };
    let Some(_op) = mesh.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(channel_name) = (unsafe { c_str_to_string(channel) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(name) = InnerChannelName::new(&channel_name) else {
        return NET_ERR_CHANNEL;
    };
    let node = mesh.inner.clone();
    let outcome = block_on(async move {
        if subscribe {
            node.subscribe_channel(publisher_node_id, name).await
        } else {
            node.unsubscribe_channel(publisher_node_id, name).await
        }
    });
    match outcome {
        Ok(()) => 0,
        Err(err) => adapter_err_to_channel_code(&err),
    }
}
/// Maps an adapter error from a channel operation to an FFI error code.
///
/// A `Connection` error whose message is the membership-rejection prefix
/// followed by `Some(Unauthorized)` (surrounding whitespace ignored) becomes
/// `NET_ERR_CHANNEL_AUTH`; every other error becomes `NET_ERR_CHANNEL`.
fn adapter_err_to_channel_code(err: &AdapterError) -> c_int {
    const REJECTION_PREFIX: &str = "membership request rejected: ";
    const UNAUTHORIZED: &str = "Some(Unauthorized)";
    match err {
        AdapterError::Connection(msg)
            if msg.strip_prefix(REJECTION_PREFIX).map(str::trim) == Some(UNAUTHORIZED) =>
        {
            NET_ERR_CHANNEL_AUTH
        }
        _ => NET_ERR_CHANNEL,
    }
}
/// Optional JSON configuration accepted by `net_mesh_publish`.
///
/// When the caller passes a null config pointer the `Default` value (all
/// fields `None`) is used. `net_mesh_publish` treats a missing `reliability`
/// as `"fire_and_forget"`, a missing `on_failure` as `"best_effort"`, and a
/// missing `max_inflight` as 32.
#[derive(Deserialize, Default)]
struct PublishConfigInput {
reliability: Option<String>,
on_failure: Option<String>,
max_inflight: Option<u32>,
}
/// JSON object written by `net_mesh_publish` on success; built from the core
/// publish report by `to_publish_report_json`.
#[derive(Serialize)]
struct PublishReportJson {
attempted: u32,
delivered: u32,
errors: Vec<PublishFailureJson>,
}
/// One entry of `PublishReportJson::errors`: the node id paired with the
/// error rendered as text.
#[derive(Serialize)]
struct PublishFailureJson {
node_id: u64,
message: String,
}
/// Converts the core publish report into its JSON-serializable FFI form.
///
/// `attempted` and `delivered` are narrowed to `u32` for the wire format.
/// Values that do not fit saturate at `u32::MAX` rather than silently
/// wrapping, so an oversized counter can never be reported as a small one.
/// Each per-node error is rendered through its `Display` implementation.
fn to_publish_report_json(r: InnerPublishReport) -> PublishReportJson {
    let attempted = u32::try_from(r.attempted).unwrap_or(u32::MAX);
    let delivered = u32::try_from(r.delivered).unwrap_or(u32::MAX);
    let errors = r
        .errors
        .into_iter()
        .map(|(node_id, err)| PublishFailureJson {
            node_id,
            message: err.to_string(),
        })
        .collect();
    PublishReportJson {
        attempted,
        delivered,
        errors,
    }
}
/// Publishes `payload` (`len` bytes) on `channel` and writes a JSON publish
/// report to `out_json` / `out_len`.
///
/// `config_json` may be null, in which case the default publish options are
/// used. Otherwise it is parsed as a `PublishConfigInput`. A zero `len`
/// publishes an empty payload and never reads `payload`.
///
/// # Returns
/// * the result of `write_json_out` on success;
/// * the `NetError::NullPointer` code if `handle`, `channel`, `out_json` or
///   `out_len` is null, or if `payload` is null while `len` is non-zero;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `NetError::InvalidUtf8` code if a string argument cannot be read;
/// * the `NetError::InvalidJson` code if the config does not deserialize or
///   `len` exceeds `isize::MAX`;
/// * `NET_ERR_CHANNEL` for a rejected channel name or an unknown
///   `reliability` / `on_failure` keyword;
/// * the `adapter_err_to_channel_code` mapping if publishing fails.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_publish(
    handle: *mut MeshNodeHandle,
    channel: *const c_char,
    payload: *const u8,
    len: usize,
    config_json: *const c_char,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || channel.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let mesh = unsafe { &*handle };
    let Some(_op) = mesh.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(channel_name) = (unsafe { c_str_to_string(channel) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(name) = InnerChannelName::new(&channel_name) else {
        return NET_ERR_CHANNEL;
    };

    let options = if config_json.is_null() {
        PublishConfigInput::default()
    } else {
        let Some(raw) = (unsafe { c_str_to_string(config_json) }) else {
            return NetError::InvalidUtf8.into();
        };
        let Ok(parsed) = serde_json::from_str::<PublishConfigInput>(&raw) else {
            return NetError::InvalidJson.into();
        };
        parsed
    };
    // A missing keyword behaves exactly like its default spelling.
    let reliability = match options.reliability.as_deref().unwrap_or("fire_and_forget") {
        "fire_and_forget" => Reliability::FireAndForget,
        "reliable" => Reliability::Reliable,
        _ => return NET_ERR_CHANNEL,
    };
    let on_failure = match options.on_failure.as_deref().unwrap_or("best_effort") {
        "best_effort" => InnerOnFailure::BestEffort,
        "fail_fast" => InnerOnFailure::FailFast,
        "collect" => InnerOnFailure::Collect,
        _ => return NET_ERR_CHANNEL,
    };
    let max_inflight = options.max_inflight.map_or(32, |n| n as usize);
    let publisher = ChannelPublisher::new(
        name,
        InnerPublishConfig {
            reliability,
            on_failure,
            max_inflight,
        },
    );

    let body = match len {
        0 => Bytes::new(),
        _ if payload.is_null() => return NetError::NullPointer.into(),
        // `slice::from_raw_parts` requires the byte length to fit in `isize`.
        n if n > isize::MAX as usize => return NetError::InvalidJson.into(),
        n => Bytes::copy_from_slice(unsafe { std::slice::from_raw_parts(payload, n) }),
    };

    let node = mesh.inner.clone();
    let outcome = block_on(async move { node.publish(&publisher, body).await });
    match outcome {
        Ok(report) => write_json_out(&to_publish_report_json(report), out_json, out_len),
        Err(err) => adapter_err_to_channel_code(&err),
    }
}
/// Opaque identity handle handed across the FFI boundary.
///
/// Created (boxed, then released with `Box::into_raw`) by
/// `net_identity_generate` and `net_identity_from_seed`. The two `Arc`s sit in
/// `ManuallyDrop` so that `net_identity_free` can take and drop them
/// explicitly once the guard reports that freeing may proceed.
pub struct IdentityHandle {
/// Keypair used by the identity accessors, signing and token issuance.
keypair: ManuallyDrop<Arc<EntityKeypair>>,
/// Token cache written by `net_identity_install_token` and read by
/// `net_identity_lookup_token`.
cache: ManuallyDrop<Arc<TokenCache>>,
/// Entered via `try_enter` by every operation on the handle; `begin_free`
/// is called on it during teardown.
guard: HandleGuard,
}
/// Copies `src` into a freshly allocated buffer and hands it to the caller
/// through `out_ptr` / `out_len`.
///
/// An empty `src` yields a null pointer and length 0 without allocating.
/// Non-empty buffers are allocated with `Layout::array::<u8>(len)`, the same
/// layout `net_free_bytes` deallocates with.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if either out-pointer is null;
/// * `NET_ERR_IDENTITY` if the layout cannot be computed.
///
/// Allocation failure is routed to `std::alloc::handle_alloc_error`.
fn alloc_bytes(src: &[u8], out_ptr: *mut *mut u8, out_len: *mut usize) -> c_int {
    if out_ptr.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let byte_count = src.len();
    let buffer = if byte_count == 0 {
        std::ptr::null_mut()
    } else {
        let Ok(layout) = std::alloc::Layout::array::<u8>(byte_count) else {
            return NET_ERR_IDENTITY;
        };
        let fresh = unsafe { std::alloc::alloc(layout) };
        if fresh.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), fresh, byte_count) };
        fresh
    };
    unsafe {
        *out_ptr = buffer;
        *out_len = byte_count;
    }
    0
}
/// Frees a byte buffer previously returned through an `out_ptr` / `out_len`
/// pair by this library.
///
/// A null `ptr` or a zero `len` is a no-op. `len` must be the exact length
/// reported when the buffer was handed out, because it is used to rebuild the
/// allocation layout. If no valid layout exists for `len` the call returns
/// without freeing.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_bytes(ptr: *mut u8, len: usize) {
    if ptr.is_null() || len == 0 {
        return;
    }
    if let Ok(layout) = std::alloc::Layout::array::<u8>(len) {
        unsafe { std::alloc::dealloc(ptr, layout) };
    }
}
/// Builds an `EntityId` from a caller-supplied 32-byte buffer.
///
/// Returns `None` when `bytes` is null or `len` is not exactly 32. Otherwise
/// 32 bytes are read from `bytes`.
fn entity_id_from_bytes(bytes: *const u8, len: usize) -> Option<EntityId> {
    if len != 32 || bytes.is_null() {
        return None;
    }
    let mut raw = [0u8; 32];
    unsafe { std::ptr::copy_nonoverlapping(bytes, raw.as_mut_ptr(), 32) };
    Some(EntityId::from_bytes(raw))
}
/// Parses a JSON array of scope names into the union of the matching
/// `TokenScope` flags.
///
/// Recognized names are `"publish"`, `"subscribe"`, `"admin"` and
/// `"delegate"`. Returns `None` if `raw` is not a JSON array of strings or
/// contains any other name. An empty array yields `TokenScope::NONE`.
fn parse_scope_list(raw: &str) -> Option<TokenScope> {
    let names: Vec<String> = serde_json::from_str(raw).ok()?;
    names.iter().try_fold(TokenScope::NONE, |granted, name| {
        let flag = match name.as_str() {
            "publish" => TokenScope::PUBLISH,
            "subscribe" => TokenScope::SUBSCRIBE,
            "admin" => TokenScope::ADMIN,
            "delegate" => TokenScope::DELEGATE,
            _ => return None,
        };
        Some(granted.union(flag))
    })
}
/// Lists the names of the flags contained in `scope`, always in the order
/// `publish`, `subscribe`, `admin`, `delegate`.
fn scope_to_strings(scope: TokenScope) -> Vec<&'static str> {
    let table = [
        (TokenScope::PUBLISH, "publish"),
        (TokenScope::SUBSCRIBE, "subscribe"),
        (TokenScope::ADMIN, "admin"),
        (TokenScope::DELEGATE, "delegate"),
    ];
    let mut names = Vec::new();
    for (flag, name) in table {
        if scope.contains(flag) {
            names.push(name);
        }
    }
    names
}
/// Validates `channel` as a channel name and returns its hash, or `None` if
/// the name is rejected.
fn channel_name_to_hash(channel: &str) -> Option<ChannelHash> {
    let name = InnerChannelName::new(channel).ok()?;
    Some(name.hash())
}
/// Creates an identity with a freshly generated keypair and an empty token
/// cache, storing the new handle in `*out_handle`.
///
/// Returns `0` on success, or the `NetError::NullPointer` code if
/// `out_handle` is null. Release the handle with `net_identity_free`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_generate(out_handle: *mut *mut IdentityHandle) -> c_int {
    if out_handle.is_null() {
        return NetError::NullPointer.into();
    }
    let keypair = Arc::new(EntityKeypair::generate());
    let cache = Arc::new(TokenCache::new());
    let boxed = Box::new(IdentityHandle {
        keypair: ManuallyDrop::new(keypair),
        cache: ManuallyDrop::new(cache),
        guard: HandleGuard::new(),
    });
    unsafe { out_handle.write(Box::into_raw(boxed)) };
    0
}
/// Creates an identity from a 32-byte seed, with an empty token cache, and
/// stores the new handle in `*out_handle`.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `seed` or `out_handle` is null;
/// * `NET_ERR_IDENTITY` if `seed_len` is not 32.
///
/// Release the handle with `net_identity_free`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_from_seed(
    seed: *const u8,
    seed_len: usize,
    out_handle: *mut *mut IdentityHandle,
) -> c_int {
    if seed.is_null() || out_handle.is_null() {
        return NetError::NullPointer.into();
    }
    if seed_len != 32 {
        return NET_ERR_IDENTITY;
    }
    let mut seed_bytes = [0u8; 32];
    unsafe { std::ptr::copy_nonoverlapping(seed, seed_bytes.as_mut_ptr(), 32) };
    let keypair = Arc::new(EntityKeypair::from_bytes(seed_bytes));
    let cache = Arc::new(TokenCache::new());
    let boxed = Box::new(IdentityHandle {
        keypair: ManuallyDrop::new(keypair),
        cache: ManuallyDrop::new(cache),
        guard: HandleGuard::new(),
    });
    unsafe { out_handle.write(Box::into_raw(boxed)) };
    0
}
/// Releases the keypair and token cache owned by an identity handle.
///
/// A null `handle` is a no-op. Otherwise the handle's guard is asked to begin
/// freeing with `FFI_HANDLE_FREE_DEADLINE`. Only if that succeeds are the two
/// `ManuallyDrop` fields taken out and dropped. If it does not succeed, a
/// warning is logged and the inner values are deliberately left in place
/// (leaked) rather than risking a use-after-free.
///
/// NOTE(review): the `IdentityHandle` allocation itself is not deallocated in
/// this function — presumably so that late callers can still reach the guard
/// and observe shutdown; confirm against `HandleGuard`'s contract.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_free(handle: *mut IdentityHandle) {
if handle.is_null() {
return;
}
let h: &IdentityHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// Take each inner value out of its `ManuallyDrop` exactly once, then drop it.
let mh = &mut *handle;
let kp = ManuallyDrop::take(&mut mh.keypair);
let cache = ManuallyDrop::take(&mut mh.cache);
drop(kp);
drop(cache);
}
} else {
tracing::warn!(
"net_identity_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// Copies the identity's 32 secret bytes into the caller's buffer `out`.
///
/// `out` must have room for 32 bytes. Returns `0` on success, the
/// `NetError::NullPointer` code if either pointer is null, or the
/// `NetError::ShuttingDown` code if the handle guard refuses entry.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_to_seed(handle: *mut IdentityHandle, out: *mut u8) -> c_int {
    if handle.is_null() || out.is_null() {
        return NetError::NullPointer.into();
    }
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let secret = identity.keypair.secret_bytes();
    unsafe { std::ptr::copy_nonoverlapping(secret.as_ptr(), out, 32) };
    0
}
/// Copies the identity's 32-byte entity id into the caller's buffer `out`.
///
/// `out` must have room for 32 bytes. Returns `0` on success, the
/// `NetError::NullPointer` code if either pointer is null, or the
/// `NetError::ShuttingDown` code if the handle guard refuses entry.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_entity_id(
    handle: *mut IdentityHandle,
    out: *mut u8,
) -> c_int {
    if handle.is_null() || out.is_null() {
        return NetError::NullPointer.into();
    }
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let entity = identity.keypair.entity_id().as_bytes();
    unsafe { std::ptr::copy_nonoverlapping(entity.as_ptr(), out, 32) };
    0
}
/// Returns the node id of the identity's keypair.
///
/// Returns `0` when `handle` is null or the handle guard refuses entry, so a
/// result of `0` does not by itself distinguish failure from a real value.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_node_id(handle: *mut IdentityHandle) -> u64 {
    if handle.is_null() {
        return 0;
    }
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return 0;
    };
    identity.keypair.node_id()
}
/// Returns the origin hash of the identity's keypair.
///
/// Returns `0` when `handle` is null or the handle guard refuses entry, so a
/// result of `0` does not by itself distinguish failure from a real value.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_origin_hash(handle: *mut IdentityHandle) -> u64 {
    if handle.is_null() {
        return 0;
    }
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return 0;
    };
    identity.keypair.origin_hash()
}
/// Signs `len` bytes at `msg` with the identity's keypair and writes the
/// 64-byte signature to `out_sig`.
///
/// A zero `len` signs the empty message and never reads `msg`. `out_sig`
/// must have room for 64 bytes.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `handle` or `out_sig` is null, or if
///   `msg` is null while `len` is non-zero;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `NetError::InvalidJson` code if `len` exceeds `isize::MAX`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_sign(
    handle: *mut IdentityHandle,
    msg: *const u8,
    len: usize,
    out_sig: *mut u8,
) -> c_int {
    if handle.is_null() || out_sig.is_null() {
        return NetError::NullPointer.into();
    }
    if len > 0 && msg.is_null() {
        return NetError::NullPointer.into();
    }
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let message: &[u8] = match len {
        0 => &[],
        // `slice::from_raw_parts` requires the byte length to fit in `isize`.
        n if n > isize::MAX as usize => return NetError::InvalidJson.into(),
        n => unsafe { std::slice::from_raw_parts(msg, n) },
    };
    let signature = identity.keypair.sign(message).to_bytes();
    unsafe { std::ptr::copy_nonoverlapping(signature.as_ptr(), out_sig, 64) };
    0
}
/// Issues a permission token signed by `signer` and returns its serialized
/// bytes through `out_token` / `out_token_len`.
///
/// `subject` must point to a 32-byte entity id. `scope_json` is a JSON array
/// of scope names as accepted by `parse_scope_list`, and `channel` is the
/// channel name the token is bound to. Free the returned buffer with
/// `net_free_bytes`.
///
/// NOTE(review): `scope_json` and `channel` are not null-checked here; this
/// relies on `c_str_to_string` coping with null — confirm.
///
/// # Returns
/// * the result of `alloc_bytes` on success;
/// * the `NetError::NullPointer` code if `signer`, `out_token` or
///   `out_token_len` is null;
/// * `NET_ERR_IDENTITY` for a bad subject, scope list or channel name;
/// * the `NetError::InvalidUtf8` code if a string argument cannot be read;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `token_err_to_code` mapping if issuing fails.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_issue_token(
    signer: *mut IdentityHandle,
    subject: *const u8,
    subject_len: usize,
    scope_json: *const c_char,
    channel: *const c_char,
    ttl_seconds: u32,
    delegation_depth: u8,
    out_token: *mut *mut u8,
    out_token_len: *mut usize,
) -> c_int {
    if signer.is_null() || out_token.is_null() || out_token_len.is_null() {
        return NetError::NullPointer.into();
    }
    let subject_id = match entity_id_from_bytes(subject, subject_len) {
        Some(id) => id,
        None => return NET_ERR_IDENTITY,
    };
    let scope_text = match unsafe { c_str_to_string(scope_json) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let scope = match parse_scope_list(&scope_text) {
        Some(flags) => flags,
        None => return NET_ERR_IDENTITY,
    };
    let channel_text = match unsafe { c_str_to_string(channel) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let channel_hash = match channel_name_to_hash(&channel_text) {
        Some(hash) => hash,
        None => return NET_ERR_IDENTITY,
    };
    // The guard is entered only after all argument validation has passed.
    let identity = unsafe { &*signer };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let issued = PermissionToken::try_issue(
        &identity.keypair,
        subject_id,
        scope,
        channel_hash,
        u64::from(ttl_seconds),
        delegation_depth,
    );
    match issued {
        Ok(token) => alloc_bytes(&token.to_bytes(), out_token, out_token_len),
        Err(err) => token_err_to_code(&err),
    }
}
/// Parses a serialized permission token and inserts it into the identity's
/// token cache.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `handle` or `token` is null;
/// * the `NetError::InvalidJson` code if `len` exceeds `isize::MAX`;
/// * the `token_err_to_code` mapping if parsing or insertion fails;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_install_token(
    handle: *mut IdentityHandle,
    token: *const u8,
    len: usize,
) -> c_int {
    if handle.is_null() || token.is_null() {
        return NetError::NullPointer.into();
    }
    // `slice::from_raw_parts` requires the byte length to fit in `isize`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let token_bytes = unsafe { std::slice::from_raw_parts(token, len) };
    let permission = match PermissionToken::from_bytes(token_bytes) {
        Ok(parsed) => parsed,
        Err(err) => return token_err_to_code(&err),
    };
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    if let Err(err) = identity.cache.insert(permission) {
        return token_err_to_code(&err);
    }
    0
}
/// Looks up the cached token for (`subject`, `channel`) and returns its
/// serialized bytes through `out_token` / `out_token_len`.
///
/// A cache miss is not an error: the function returns `0` with a null
/// `*out_token` and a zero `*out_token_len`. Free a returned buffer with
/// `net_free_bytes`.
///
/// NOTE(review): `channel` is not null-checked here; this relies on
/// `c_str_to_string` coping with null — confirm.
///
/// # Returns
/// * `0` on a miss, or the result of `alloc_bytes` on a hit;
/// * the `NetError::NullPointer` code if `handle`, `out_token` or
///   `out_token_len` is null;
/// * `NET_ERR_IDENTITY` for a bad subject or channel name;
/// * the `NetError::InvalidUtf8` code if `channel` cannot be read;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_lookup_token(
    handle: *mut IdentityHandle,
    subject: *const u8,
    subject_len: usize,
    channel: *const c_char,
    out_token: *mut *mut u8,
    out_token_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_token.is_null() || out_token_len.is_null() {
        return NetError::NullPointer.into();
    }
    let subject_id = match entity_id_from_bytes(subject, subject_len) {
        Some(id) => id,
        None => return NET_ERR_IDENTITY,
    };
    let channel_text = match unsafe { c_str_to_string(channel) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let channel_hash = match channel_name_to_hash(&channel_text) {
        Some(hash) => hash,
        None => return NET_ERR_IDENTITY,
    };
    let identity = unsafe { &*handle };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    if let Some(token) = identity.cache.get(&subject_id, channel_hash) {
        return alloc_bytes(&token.to_bytes(), out_token, out_token_len);
    }
    unsafe {
        *out_token = std::ptr::null_mut();
        *out_token_len = 0;
    }
    0
}
/// Returns the number of tokens currently held in the identity's token cache.
///
/// Returns `0` when `handle` is null or the handle guard refuses entry, so a
/// result of `0` does not by itself distinguish failure from an empty cache.
/// A count that does not fit in `u32` saturates at `u32::MAX` instead of
/// wrapping around to a small number.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_identity_token_cache_len(handle: *mut IdentityHandle) -> u32 {
    if handle.is_null() {
        return 0;
    }
    let h = unsafe { &*handle };
    let Some(_op) = h.guard.try_enter() else {
        return 0;
    };
    u32::try_from(h.cache.len()).unwrap_or(u32::MAX)
}
/// JSON object written by `net_parse_token`.
///
/// The `*_hex` fields are `hex::encode` renderings of the token's issuer,
/// subject and signature bytes; `scope` comes from `scope_to_strings`; the
/// remaining fields are copied from the parsed token unchanged.
#[derive(Serialize)]
struct ParsedTokenJson {
issuer_hex: String,
subject_hex: String,
scope: Vec<&'static str>,
channel_hash: ChannelHash,
not_before: u64,
not_after: u64,
delegation_depth: u8,
nonce: u64,
signature_hex: String,
}
/// Decodes a serialized permission token and writes its fields as a
/// `ParsedTokenJson` object to `out_json` / `out_len`.
///
/// This only decodes the token; no verification is requested here.
///
/// # Returns
/// * the result of `write_json_out` on success;
/// * the `NetError::NullPointer` code if any pointer argument is null;
/// * the `NetError::InvalidJson` code if `len` exceeds `isize::MAX`;
/// * the `token_err_to_code` mapping if the bytes do not parse.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_parse_token(
    token: *const u8,
    len: usize,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if token.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    // `slice::from_raw_parts` requires the byte length to fit in `isize`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let token_bytes = unsafe { std::slice::from_raw_parts(token, len) };
    match PermissionToken::from_bytes(token_bytes) {
        Err(err) => token_err_to_code(&err),
        Ok(decoded) => {
            let view = ParsedTokenJson {
                issuer_hex: hex::encode(decoded.issuer.as_bytes()),
                subject_hex: hex::encode(decoded.subject.as_bytes()),
                scope: scope_to_strings(decoded.scope),
                channel_hash: decoded.channel_hash,
                not_before: decoded.not_before,
                not_after: decoded.not_after,
                delegation_depth: decoded.delegation_depth,
                nonce: decoded.nonce,
                signature_hex: hex::encode(decoded.signature),
            };
            write_json_out(&view, out_json, out_len)
        }
    }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_verify_token(
token: *const u8,
len: usize,
out_ok: *mut c_int,
) -> c_int {
if token.is_null() || out_ok.is_null() {
return NetError::NullPointer.into();
}
if len > isize::MAX as usize {
return NetError::InvalidJson.into();
}
let slice = unsafe { std::slice::from_raw_parts(token, len) };
let parsed = match PermissionToken::from_bytes(slice) {
Ok(t) => t,
Err(e) => return token_err_to_code(&e),
};
unsafe {
*out_ok = if parsed.verify().is_ok() { 1 } else { 0 };
}
0
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_token_is_expired(
token: *const u8,
len: usize,
out_expired: *mut c_int,
) -> c_int {
if token.is_null() || out_expired.is_null() {
return NetError::NullPointer.into();
}
if len > isize::MAX as usize {
return NetError::InvalidJson.into();
}
let slice = unsafe { std::slice::from_raw_parts(token, len) };
let parsed = match PermissionToken::from_bytes(slice) {
Ok(t) => t,
Err(e) => return token_err_to_code(&e),
};
unsafe {
*out_expired = if parsed.is_expired() { 1 } else { 0 };
}
0
}
/// Derives a child token from `parent` for `new_subject`, signed by `signer`
/// and restricted to the scopes in `restricted_scope_json`.
///
/// `new_subject` must point to a 32-byte entity id. The serialized child
/// token is returned through `out_token` / `out_token_len`; free it with
/// `net_free_bytes`.
///
/// # Returns
/// * the result of `alloc_bytes` on success;
/// * the `NetError::NullPointer` code if any pointer argument is null;
/// * the `NetError::InvalidJson` code if `parent_len` exceeds `isize::MAX`;
/// * the `token_err_to_code` mapping if the parent does not parse or
///   delegation fails;
/// * `NET_ERR_IDENTITY` for a bad subject or scope list;
/// * the `NetError::InvalidUtf8` code if the scope string cannot be read;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_delegate_token(
    signer: *mut IdentityHandle,
    parent: *const u8,
    parent_len: usize,
    new_subject: *const u8,
    new_subject_len: usize,
    restricted_scope_json: *const c_char,
    out_token: *mut *mut u8,
    out_token_len: *mut usize,
) -> c_int {
    let any_null = signer.is_null()
        || parent.is_null()
        || new_subject.is_null()
        || restricted_scope_json.is_null()
        || out_token.is_null()
        || out_token_len.is_null();
    if any_null {
        return NetError::NullPointer.into();
    }
    // `slice::from_raw_parts` requires the byte length to fit in `isize`.
    if parent_len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let parent_bytes = unsafe { std::slice::from_raw_parts(parent, parent_len) };
    let parent_token = match PermissionToken::from_bytes(parent_bytes) {
        Ok(parsed) => parsed,
        Err(err) => return token_err_to_code(&err),
    };
    let subject_id = match entity_id_from_bytes(new_subject, new_subject_len) {
        Some(id) => id,
        None => return NET_ERR_IDENTITY,
    };
    let scope_text = match unsafe { c_str_to_string(restricted_scope_json) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let scope = match parse_scope_list(&scope_text) {
        Some(flags) => flags,
        None => return NET_ERR_IDENTITY,
    };
    let identity = unsafe { &*signer };
    let Some(_op) = identity.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let delegated = parent_token.delegate(&identity.keypair, subject_id, scope);
    match delegated {
        Ok(child) => alloc_bytes(&child.to_bytes(), out_token, out_token_len),
        Err(err) => token_err_to_code(&err),
    }
}
/// Computes the hash of a channel name and stores it in `*out_hash`.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `channel` or `out_hash` is null;
/// * the `NetError::InvalidUtf8` code if `channel` cannot be read;
/// * `NET_ERR_IDENTITY` if the channel name is rejected.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_channel_hash(channel: *const c_char, out_hash: *mut u64) -> c_int {
    if channel.is_null() || out_hash.is_null() {
        return NetError::NullPointer.into();
    }
    let Some(name) = (unsafe { c_str_to_string(channel) }) else {
        return NetError::InvalidUtf8.into();
    };
    match channel_name_to_hash(&name) {
        Some(hash) => {
            unsafe { *out_hash = hash };
            0
        }
        None => NET_ERR_IDENTITY,
    }
}
use crate::adapter::net::behavior::capability::{
AcceleratorInfo, AcceleratorType, CapabilityFilter, CapabilitySet, GpuInfo, GpuVendor,
HardwareCapabilities, Modality, ModelCapability, ResourceLimits, SoftwareCapabilities,
ToolCapability, TAG_SCOPE_REGION_PREFIX, TAG_SCOPE_SUBNET_LOCAL, TAG_SCOPE_TENANT_PREFIX,
};
/// Maps a GPU vendor name to `GpuVendor`, ignoring ASCII case.
///
/// Names that are not recognized map to `GpuVendor::Unknown`.
fn parse_gpu_vendor_cap(s: &str) -> GpuVendor {
    let known = [
        ("nvidia", GpuVendor::Nvidia),
        ("amd", GpuVendor::Amd),
        ("intel", GpuVendor::Intel),
        ("apple", GpuVendor::Apple),
        ("qualcomm", GpuVendor::Qualcomm),
    ];
    for (label, vendor) in known {
        if s.eq_ignore_ascii_case(label) {
            return vendor;
        }
    }
    GpuVendor::Unknown
}
/// Returns the lowercase name of a `GpuVendor`; the inverse of
/// `parse_gpu_vendor_cap` for the names that function recognizes.
fn gpu_vendor_to_string_cap(v: GpuVendor) -> &'static str {
    let label = match v {
        GpuVendor::Unknown => "unknown",
        GpuVendor::Qualcomm => "qualcomm",
        GpuVendor::Apple => "apple",
        GpuVendor::Intel => "intel",
        GpuVendor::Amd => "amd",
        GpuVendor::Nvidia => "nvidia",
    };
    label
}
/// Maps a modality name to `Modality`, ignoring ASCII case.
///
/// `ToolUse` accepts the spellings `tool-use`, `tool_use` and `tooluse`.
/// Names that are not recognized yield `None`, so callers can decide how to
/// treat a typo.
fn parse_modality_cap(s: &str) -> Option<Modality> {
    let known = [
        ("text", Modality::Text),
        ("image", Modality::Image),
        ("audio", Modality::Audio),
        ("video", Modality::Video),
        ("code", Modality::Code),
        ("embedding", Modality::Embedding),
        ("tool-use", Modality::ToolUse),
        ("tool_use", Modality::ToolUse),
        ("tooluse", Modality::ToolUse),
    ];
    for (label, modality) in known {
        if s.eq_ignore_ascii_case(label) {
            return Some(modality);
        }
    }
    None
}
/// Maps an accelerator kind name to `AcceleratorType`, ignoring ASCII case.
///
/// Names that are not recognized map to `AcceleratorType::Unknown`.
fn parse_accelerator_type_cap(s: &str) -> AcceleratorType {
    let known = [
        ("tpu", AcceleratorType::Tpu),
        ("npu", AcceleratorType::Npu),
        ("fpga", AcceleratorType::Fpga),
        ("asic", AcceleratorType::Asic),
        ("dsp", AcceleratorType::Dsp),
    ];
    for (label, kind) in known {
        if s.eq_ignore_ascii_case(label) {
            return kind;
        }
    }
    AcceleratorType::Unknown
}
/// JSON input accepted by `net_mesh_announce_capabilities`; converted to a
/// `CapabilitySet` by `capability_set_from_json`.
///
/// Every field is optional: missing sections deserialize to `None` or an
/// empty list.
#[derive(Deserialize, Default)]
struct CapabilitySetJson {
#[serde(default)]
hardware: Option<HardwareJson>,
#[serde(default)]
software: Option<SoftwareJson>,
#[serde(default)]
models: Vec<ModelJson>,
#[serde(default)]
tools: Vec<ToolJson>,
// Tags equal to `TAG_SCOPE_SUBNET_LOCAL`, or starting with the tenant/region
// scope prefixes, are turned into scope settings rather than plain tags.
#[serde(default)]
tags: Vec<String>,
#[serde(default)]
limits: Option<LimitsJson>,
}
/// Hardware section of `CapabilitySetJson`; converted by `hardware_from_json`.
///
/// CPU counts are saturated to `u16` during conversion. `cpu_threads` is only
/// used when `cpu_cores` is present; with cores alone, the thread count is set
/// equal to the core count.
#[derive(Deserialize, Default)]
struct HardwareJson {
cpu_cores: Option<u32>,
cpu_threads: Option<u32>,
memory_gb: Option<u32>,
gpu: Option<GpuJson>,
#[serde(default)]
additional_gpus: Vec<GpuJson>,
storage_gb: Option<u64>,
network_gbps: Option<u32>,
#[serde(default)]
accelerators: Vec<AcceleratorJson>,
}
/// One GPU description; converted by `gpu_info_from_json`.
///
/// `vendor` is parsed with `parse_gpu_vendor_cap` (missing or unrecognized
/// values become `GpuVendor::Unknown`). `compute_units`, `tensor_cores` and
/// `fp16_tflops_x10` are saturated to `u16`; the latter is then divided by
/// 10.0 to obtain the TFLOPS figure.
#[derive(Deserialize)]
struct GpuJson {
vendor: Option<String>,
#[serde(default)]
model: String,
#[serde(default)]
vram_gb: u32,
compute_units: Option<u32>,
tensor_cores: Option<u32>,
fp16_tflops_x10: Option<u32>,
}
/// One accelerator description; converted by `accelerator_from_json`.
///
/// `kind` is parsed with `parse_accelerator_type_cap`. A missing `memory_gb`
/// or `tops_x10` becomes 0, and `tops_x10` is saturated to `u16`.
#[derive(Deserialize)]
struct AcceleratorJson {
#[serde(default)]
kind: String,
#[serde(default)]
model: String,
memory_gb: Option<u32>,
tops_x10: Option<u32>,
}
/// Software section of `CapabilitySetJson`; converted by `software_from_json`.
///
/// `runtimes`, `frameworks` and `drivers` are lists of string arrays. Each
/// inner array contributes its first two elements as a pair, and arrays with
/// fewer than two elements are dropped (see `pair_vec`). A missing `os` or
/// `os_version` becomes the empty string.
#[derive(Deserialize, Default)]
struct SoftwareJson {
os: Option<String>,
os_version: Option<String>,
#[serde(default)]
runtimes: Vec<Vec<String>>,
#[serde(default)]
frameworks: Vec<Vec<String>>,
cuda_version: Option<String>,
#[serde(default)]
drivers: Vec<Vec<String>>,
}
/// One model description; converted by `model_from_json`.
///
/// Entries of `modalities` are parsed with `parse_modality_cap`; unrecognized
/// entries are skipped with a warning.
#[derive(Deserialize)]
struct ModelJson {
#[serde(default)]
model_id: String,
#[serde(default)]
family: String,
parameters_b_x10: Option<u32>,
context_length: Option<u32>,
quantization: Option<String>,
#[serde(default)]
modalities: Vec<String>,
tokens_per_sec: Option<u32>,
loaded: Option<bool>,
}
/// One tool description; converted by `tool_from_json`, which applies each
/// present optional field through the matching `ToolCapability` builder call.
#[derive(Deserialize)]
struct ToolJson {
#[serde(default)]
tool_id: String,
#[serde(default)]
name: String,
version: Option<String>,
input_schema: Option<String>,
output_schema: Option<String>,
#[serde(default)]
requires: Vec<String>,
estimated_time_ms: Option<u32>,
stateless: Option<bool>,
}
/// Resource-limits section of `CapabilitySetJson`; converted by
/// `limits_from_json`. Absent fields leave the `ResourceLimits::new()` values
/// untouched.
#[derive(Deserialize, Default)]
struct LimitsJson {
max_concurrent_requests: Option<u32>,
max_tokens_per_request: Option<u32>,
rate_limit_rpm: Option<u32>,
max_batch_size: Option<u32>,
max_input_bytes: Option<u32>,
max_output_bytes: Option<u32>,
}
/// JSON form of a capability filter; converted by
/// `capability_filter_from_json`.
///
/// Used as the input of `net_mesh_find_nodes` and as the `publish_caps` /
/// `subscribe_caps` fields of `ChannelConfigInput`. `require_gpu` only has an
/// effect when it is `true`. Unrecognized `require_modalities` entries are
/// dropped with a warning, which makes the resulting filter more permissive.
#[derive(Deserialize, Default)]
struct CapabilityFilterJson {
#[serde(default)]
require_tags: Vec<String>,
#[serde(default)]
require_models: Vec<String>,
#[serde(default)]
require_tools: Vec<String>,
min_memory_gb: Option<u32>,
require_gpu: Option<bool>,
gpu_vendor: Option<String>,
min_vram_gb: Option<u32>,
min_context_length: Option<u32>,
#[serde(default)]
require_modalities: Vec<String>,
}
/// Turns a list of string arrays into `(first, second)` pairs.
///
/// Arrays with fewer than two elements are dropped; elements beyond the
/// second are ignored. Input order is preserved.
fn pair_vec(xs: Vec<Vec<String>>) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for entry in xs {
        let mut parts = entry.into_iter();
        if let (Some(first), Some(second)) = (parts.next(), parts.next()) {
            pairs.push((first, second));
        }
    }
    pairs
}
/// Narrows a `u32` to `u16`, clamping values above `u16::MAX` to `u16::MAX`.
#[inline]
fn saturating_u16_cap(v: u32) -> u16 {
    u16::try_from(v).unwrap_or(u16::MAX)
}
fn gpu_info_from_json(g: GpuJson) -> GpuInfo {
let vendor = g
.vendor
.as_deref()
.map(parse_gpu_vendor_cap)
.unwrap_or(GpuVendor::Unknown);
let mut info = GpuInfo::new(vendor, g.model, g.vram_gb);
if let Some(cu) = g.compute_units {
info = info.with_compute_units(saturating_u16_cap(cu));
}
if let Some(tc) = g.tensor_cores {
info = info.with_tensor_cores(saturating_u16_cap(tc));
}
if let Some(tf) = g.fp16_tflops_x10 {
let tf_capped = saturating_u16_cap(tf);
info = info.with_fp16_tflops(tf_capped as f32 / 10.0);
}
info
}
/// Converts an `AcceleratorJson` into an `AcceleratorInfo`.
///
/// A missing `memory_gb` or `tops_x10` becomes 0; `tops_x10` is saturated to
/// `u16`.
fn accelerator_from_json(a: AcceleratorJson) -> AcceleratorInfo {
    let accel_type = parse_accelerator_type_cap(&a.kind);
    let memory_gb = a.memory_gb.unwrap_or_default();
    let tops_x10 = a.tops_x10.map_or(0, saturating_u16_cap);
    AcceleratorInfo {
        accel_type,
        model: a.model,
        memory_gb,
        tops_x10,
    }
}
fn hardware_from_json(h: HardwareJson) -> HardwareCapabilities {
let mut hw = HardwareCapabilities::new();
match (h.cpu_cores, h.cpu_threads) {
(Some(c), Some(t)) => hw = hw.with_cpu(saturating_u16_cap(c), saturating_u16_cap(t)),
(Some(c), None) => {
let c16 = saturating_u16_cap(c);
hw = hw.with_cpu(c16, c16);
}
_ => {}
}
if let Some(mb) = h.memory_gb {
hw = hw.with_memory(mb);
}
if let Some(g) = h.gpu {
hw = hw.with_gpu(gpu_info_from_json(g));
}
for g in h.additional_gpus {
hw = hw.add_gpu(gpu_info_from_json(g));
}
if let Some(mb) = h.storage_gb {
hw = hw.with_storage(mb);
}
if let Some(gbps) = h.network_gbps {
hw = hw.with_network(gbps);
}
for a in h.accelerators {
hw = hw.add_accelerator(accelerator_from_json(a));
}
hw
}
fn software_from_json(s: SoftwareJson) -> SoftwareCapabilities {
let mut sw = SoftwareCapabilities::new()
.with_os(s.os.unwrap_or_default(), s.os_version.unwrap_or_default());
for (k, v) in pair_vec(s.runtimes) {
sw = sw.add_runtime(k, v);
}
for (k, v) in pair_vec(s.frameworks) {
sw = sw.add_framework(k, v);
}
if let Some(c) = s.cuda_version {
sw = sw.with_cuda(c);
}
sw.drivers = pair_vec(s.drivers);
sw
}
fn model_from_json(m: ModelJson) -> ModelCapability {
let mut mc = ModelCapability::new(m.model_id, m.family);
if let Some(p) = m.parameters_b_x10 {
mc.parameters_b_x10 = p;
}
if let Some(c) = m.context_length {
mc = mc.with_context_length(c);
}
if let Some(q) = m.quantization {
mc = mc.with_quantization(q);
}
for modality in m.modalities {
match parse_modality_cap(&modality) {
Some(parsed) => mc = mc.add_modality(parsed),
None => {
tracing::warn!(
modality = %modality,
"announce_capabilities: unknown modality string (typo?), \
skipping rather than the pre-fix silent fallback to Text — \
advertising a Text capability the node doesn't actually \
have produced wrong scheduling decisions on the receiver",
);
}
}
}
if let Some(t) = m.tokens_per_sec {
mc = mc.with_tokens_per_sec(t);
}
if let Some(l) = m.loaded {
mc = mc.with_loaded(l);
}
mc
}
fn tool_from_json(t: ToolJson) -> ToolCapability {
let mut tc = ToolCapability::new(t.tool_id, t.name);
if let Some(v) = t.version {
tc = tc.with_version(v);
}
if let Some(s) = t.input_schema {
tc = tc.with_input_schema(s);
}
if let Some(s) = t.output_schema {
tc = tc.with_output_schema(s);
}
for r in t.requires {
tc = tc.requires(r);
}
if let Some(ms) = t.estimated_time_ms {
tc = tc.with_estimated_time(ms);
}
if let Some(st) = t.stateless {
tc = tc.with_stateless(st);
}
tc
}
fn limits_from_json(l: LimitsJson) -> ResourceLimits {
let mut rl = ResourceLimits::new();
if let Some(n) = l.max_concurrent_requests {
rl = rl.with_max_concurrent(n);
}
if let Some(n) = l.max_tokens_per_request {
rl = rl.with_max_tokens(n);
}
if let Some(n) = l.rate_limit_rpm {
rl = rl.with_rate_limit(n);
}
if let Some(n) = l.max_batch_size {
rl = rl.with_max_batch(n);
}
if let Some(n) = l.max_input_bytes {
rl.max_input_bytes = n;
}
if let Some(n) = l.max_output_bytes {
rl.max_output_bytes = n;
}
rl
}
fn capability_set_from_json(caps: CapabilitySetJson) -> CapabilitySet {
let mut cs = CapabilitySet::new();
if let Some(h) = caps.hardware {
cs = cs.with_hardware(hardware_from_json(h));
}
if let Some(s) = caps.software {
cs = cs.with_software(software_from_json(s));
}
for m in caps.models {
cs = cs.add_model(model_from_json(m));
}
for t in caps.tools {
cs = cs.add_tool(tool_from_json(t));
}
for tag in caps.tags {
if tag == TAG_SCOPE_SUBNET_LOCAL {
cs = cs.with_subnet_local_scope();
} else if let Some(id) = tag.strip_prefix(TAG_SCOPE_TENANT_PREFIX) {
cs = cs.with_tenant_scope(id);
} else if let Some(name) = tag.strip_prefix(TAG_SCOPE_REGION_PREFIX) {
cs = cs.with_region_scope(name);
} else {
cs = cs.add_tag(tag);
}
}
if let Some(l) = caps.limits {
cs = cs.with_limits(limits_from_json(l));
}
cs
}
fn capability_filter_from_json(f: CapabilityFilterJson) -> CapabilityFilter {
let mut cf = CapabilityFilter::new();
for t in f.require_tags {
cf = cf.require_tag(t);
}
for m in f.require_models {
cf = cf.require_model(m);
}
for t in f.require_tools {
cf = cf.require_tool(t);
}
if let Some(mb) = f.min_memory_gb {
cf = cf.with_min_memory(mb);
}
if f.require_gpu.unwrap_or(false) {
cf = cf.require_gpu();
}
if let Some(v) = f.gpu_vendor {
cf = cf.with_gpu_vendor(parse_gpu_vendor_cap(&v));
}
if let Some(mb) = f.min_vram_gb {
cf = cf.with_min_vram(mb);
}
if let Some(n) = f.min_context_length {
cf = cf.with_min_context(n);
}
for m in f.require_modalities {
match parse_modality_cap(&m) {
Some(parsed) => cf = cf.require_modality(parsed),
None => {
tracing::warn!(
modality = %m,
"find_nodes: unknown modality string in require_modalities \
filter (typo?), dropping the constraint; the resulting \
filter is too permissive — pre-fix it was silently \
re-interpreted as `require Text`, which returned the \
wrong nodes",
);
}
}
}
cf
}
/// Error code returned by `net_mesh_announce_capabilities` when the node's
/// `announce_capabilities` call fails.
pub(crate) const NET_ERR_CAPABILITY: c_int = -128;
/// Announces this node's capabilities, described as JSON, to the mesh.
///
/// `caps_json` is parsed as a `CapabilitySetJson` and converted with
/// `capability_set_from_json` before being handed to the node.
///
/// # Returns
/// * `0` on success;
/// * the `NetError::NullPointer` code if `handle` or `caps_json` is null;
/// * the `NetError::ShuttingDown` code if the handle guard refuses entry;
/// * the `NetError::InvalidUtf8` code if the string cannot be read;
/// * the `NetError::InvalidJson` code if it does not deserialize;
/// * `NET_ERR_CAPABILITY` if the announcement fails.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_announce_capabilities(
    handle: *mut MeshNodeHandle,
    caps_json: *const c_char,
) -> c_int {
    if handle.is_null() || caps_json.is_null() {
        return NetError::NullPointer.into();
    }
    let mesh = unsafe { &*handle };
    let Some(_op) = mesh.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(raw) = (unsafe { c_str_to_string(caps_json) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(input) = serde_json::from_str::<CapabilitySetJson>(&raw) else {
        return NetError::InvalidJson.into();
    };
    let capabilities = capability_set_from_json(input);
    let node = mesh.inner.clone();
    let outcome = block_on(async move { node.announce_capabilities(capabilities).await });
    if outcome.is_err() {
        return NET_ERR_CAPABILITY;
    }
    0
}
/// Looks up the nodes matching a JSON capability filter and hands the result
/// to `write_json_out`, which reports it through `out_json` / `out_len`.
///
/// Returns whatever `write_json_out` returns, or earlier:
/// - `NetError::NullPointer` if any pointer argument is null,
/// - `NetError::ShuttingDown` if the handle guard refuses entry,
/// - `NetError::InvalidUtf8` if `c_str_to_string` yields `None`,
/// - `NetError::InvalidJson` if the filter text does not deserialize.
///
/// # Safety
/// `handle` is dereferenced and `filter_json` is read through
/// `c_str_to_string`; both must be valid for those reads. `out_json` and
/// `out_len` are forwarded to `write_json_out`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_find_nodes(
    handle: *mut MeshNodeHandle,
    filter_json: *const c_char,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    let any_null =
        handle.is_null() || filter_json.is_null() || out_json.is_null() || out_len.is_null();
    if any_null {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let node_handle = unsafe { &*handle };
    // The guard token must stay alive until the function returns.
    let Some(_op) = node_handle.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let text = match unsafe { c_str_to_string(filter_json) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let Ok(wire) = serde_json::from_str::<CapabilityFilterJson>(&text) else {
        return NetError::InvalidJson.into();
    };
    let filter = capability_filter_from_json(wire);
    let matching = node_handle.inner.find_nodes_by_filter(&filter);
    write_json_out(&matching, out_json, out_len)
}
/// JSON wire form of a scope filter.
///
/// `kind` selects the variant; `scope_filter_from_json` reads at most one of
/// the optional payload fields, chosen by that selector. All payload fields
/// may be omitted from the JSON.
#[derive(serde::Deserialize)]
struct ScopeFilterJson {
/// Variant selector, e.g. `"any"`, `"tenant"`, `"regions"`.
kind: String,
/// Payload read when `kind` is `"tenant"`.
#[serde(default)]
tenant: Option<String>,
/// Payload read when `kind` is `"tenants"`.
#[serde(default)]
tenants: Option<Vec<String>>,
/// Payload read when `kind` is `"region"`.
#[serde(default)]
region: Option<String>,
/// Payload read when `kind` is `"regions"`.
#[serde(default)]
regions: Option<Vec<String>>,
}
/// Owned form of a scope filter, produced by `scope_filter_from_json`.
///
/// `with_scope_filter` converts it into the borrowing
/// `capability::ScopeFilter` variant of the same name for the duration of a
/// callback.
enum ScopeFilterOwned {
/// No scope restriction payload.
Any,
/// Selected by `kind` = `"global_only"` / `"globalOnly"`.
GlobalOnly,
/// Selected by `kind` = `"same_subnet"` / `"sameSubnet"`.
SameSubnet,
/// A single non-empty tenant id.
Tenant(String),
/// A non-empty list of non-empty tenant ids.
Tenants(Vec<String>),
/// A single non-empty region name.
Region(String),
/// A non-empty list of non-empty region names.
Regions(Vec<String>),
}
fn scope_filter_from_json(f: ScopeFilterJson) -> ScopeFilterOwned {
match f.kind.as_str() {
"any" => ScopeFilterOwned::Any,
"global_only" | "globalOnly" => ScopeFilterOwned::GlobalOnly,
"same_subnet" | "sameSubnet" => ScopeFilterOwned::SameSubnet,
"tenant" => match f.tenant {
Some(t) if !t.is_empty() => ScopeFilterOwned::Tenant(t),
_ => ScopeFilterOwned::Any,
},
"tenants" => match f.tenants {
Some(ts) => {
let cleaned: Vec<String> = ts.into_iter().filter(|t| !t.is_empty()).collect();
if cleaned.is_empty() {
ScopeFilterOwned::Any
} else {
ScopeFilterOwned::Tenants(cleaned)
}
}
None => ScopeFilterOwned::Any,
},
"region" => match f.region {
Some(r) if !r.is_empty() => ScopeFilterOwned::Region(r),
_ => ScopeFilterOwned::Any,
},
"regions" => match f.regions {
Some(rs) => {
let cleaned: Vec<String> = rs.into_iter().filter(|r| !r.is_empty()).collect();
if cleaned.is_empty() {
ScopeFilterOwned::Any
} else {
ScopeFilterOwned::Regions(cleaned)
}
}
None => ScopeFilterOwned::Any,
},
_ => ScopeFilterOwned::Any,
}
}
/// Lends `owned` to `f` as a borrowing `capability::ScopeFilter` and returns
/// whatever `f` returns.
///
/// The plural variants need a `&[&str]`, so a temporary vector of string
/// slices is built for them and kept alive for the duration of the call.
fn with_scope_filter<R>(
    owned: &ScopeFilterOwned,
    f: impl FnOnce(&crate::adapter::net::behavior::capability::ScopeFilter<'_>) -> R,
) -> R {
    use crate::adapter::net::behavior::capability::ScopeFilter as Borrowed;

    // Backing storage for the slice handed to `Tenants` / `Regions`; it is
    // only initialised on those two arms.
    let slices: Vec<&str>;
    let view = match owned {
        ScopeFilterOwned::Any => Borrowed::Any,
        ScopeFilterOwned::GlobalOnly => Borrowed::GlobalOnly,
        ScopeFilterOwned::SameSubnet => Borrowed::SameSubnet,
        ScopeFilterOwned::Tenant(tenant) => Borrowed::Tenant(tenant.as_str()),
        ScopeFilterOwned::Region(region) => Borrowed::Region(region.as_str()),
        ScopeFilterOwned::Tenants(list) => {
            slices = list.iter().map(String::as_str).collect();
            Borrowed::Tenants(slices.as_slice())
        }
        ScopeFilterOwned::Regions(list) => {
            slices = list.iter().map(String::as_str).collect();
            Borrowed::Regions(slices.as_slice())
        }
    };
    f(&view)
}
/// Scoped variant of the node lookup: matches nodes against a JSON
/// capability filter restricted by a JSON scope filter, and hands the result
/// to `write_json_out`.
///
/// Returns whatever `write_json_out` returns, or earlier:
/// - `NetError::NullPointer` if any pointer argument is null,
/// - `NetError::ShuttingDown` if the handle guard refuses entry,
/// - `NetError::InvalidUtf8` if `c_str_to_string` yields `None` for either
///   string,
/// - `NetError::InvalidJson` if either text does not deserialize.
///
/// Both strings are decoded before either is parsed, and the capability
/// filter is parsed before the scope filter.
///
/// # Safety
/// `handle` is dereferenced and both JSON pointers are read through
/// `c_str_to_string`; all three must be valid for those reads. `out_json`
/// and `out_len` are forwarded to `write_json_out`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_find_nodes_scoped(
    handle: *mut MeshNodeHandle,
    filter_json: *const c_char,
    scope_json: *const c_char,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    let any_null = handle.is_null()
        || filter_json.is_null()
        || scope_json.is_null()
        || out_json.is_null()
        || out_len.is_null();
    if any_null {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let node_handle = unsafe { &*handle };
    // The guard token must stay alive until the function returns.
    let Some(_op) = node_handle.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let filter_text = match unsafe { c_str_to_string(filter_json) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let scope_text = match unsafe { c_str_to_string(scope_json) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let Ok(filter_wire) = serde_json::from_str::<CapabilityFilterJson>(&filter_text) else {
        return NetError::InvalidJson.into();
    };
    let Ok(scope_wire) = serde_json::from_str::<ScopeFilterJson>(&scope_text) else {
        return NetError::InvalidJson.into();
    };
    let filter = capability_filter_from_json(filter_wire);
    let scope = scope_filter_from_json(scope_wire);
    let matching = with_scope_filter(&scope, |borrowed| {
        node_handle
            .inner
            .find_nodes_by_filter_scoped(&filter, borrowed)
    });
    write_json_out(&matching, out_json, out_len)
}
/// JSON wire form of a capability requirement: a filter plus four preference
/// weights. Every field may be omitted; an omitted field takes its type's
/// `Default` value (so `0.0` for the weights).
#[derive(serde::Deserialize)]
struct CapabilityRequirementJson {
/// Constraints, converted by `capability_filter_from_json`.
#[serde(default)]
filter: CapabilityFilterJson,
/// Weight forwarded to `CapabilityRequirement::prefer_memory`.
#[serde(default)]
prefer_more_memory: f32,
/// Weight forwarded to `CapabilityRequirement::prefer_vram`.
#[serde(default)]
prefer_more_vram: f32,
/// Weight forwarded to `CapabilityRequirement::prefer_speed`.
#[serde(default)]
prefer_faster_inference: f32,
/// Weight forwarded to `CapabilityRequirement::prefer_loaded`.
#[serde(default)]
prefer_loaded_models: f32,
}
fn capability_requirement_from_json(
j: CapabilityRequirementJson,
) -> crate::adapter::net::behavior::capability::CapabilityRequirement {
crate::adapter::net::behavior::capability::CapabilityRequirement::from_filter(
capability_filter_from_json(j.filter),
)
.prefer_memory(j.prefer_more_memory)
.prefer_vram(j.prefer_more_vram)
.prefer_speed(j.prefer_faster_inference)
.prefer_loaded(j.prefer_loaded_models)
}
/// Picks the best node for a JSON capability requirement.
///
/// On a `0` return both out-parameters have been written:
/// - match found: `*out_has_match = 1` and `*out_node_id` is the node id,
/// - no match: `*out_has_match = 0` and `*out_node_id = 0`.
///
/// The id is zeroed on the no-match path so a caller that passed an
/// uninitialised local never reads indeterminate memory; `out_has_match`
/// remains the only authoritative indicator of a match.
///
/// Error returns (no out-parameter is written):
/// - `NetError::NullPointer` if any pointer argument is null,
/// - `NetError::ShuttingDown` if the handle guard refuses entry,
/// - `NetError::InvalidUtf8` if `c_str_to_string` yields `None`,
/// - `NetError::InvalidJson` if the requirement text does not deserialize.
///
/// # Safety
/// `handle` is dereferenced, `requirement_json` is read through
/// `c_str_to_string`, and `out_node_id` / `out_has_match` are written
/// through; all four must be valid for those accesses.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_find_best_node(
    handle: *mut MeshNodeHandle,
    requirement_json: *const c_char,
    out_node_id: *mut u64,
    out_has_match: *mut c_int,
) -> c_int {
    if handle.is_null()
        || requirement_json.is_null()
        || out_node_id.is_null()
        || out_has_match.is_null()
    {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let h = unsafe { &*handle };
    // The guard token must stay alive until the function returns.
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let Some(s) = (unsafe { c_str_to_string(requirement_json) }) else {
        return NetError::InvalidUtf8.into();
    };
    let parsed: CapabilityRequirementJson = match serde_json::from_str(&s) {
        Ok(v) => v,
        Err(_) => return NetError::InvalidJson.into(),
    };
    let req = capability_requirement_from_json(parsed);
    let (node_id, has_match) = match h.inner.find_best_node(&req) {
        Some(id) => (id, 1),
        // Deterministic placeholder; the flag tells the caller to ignore it.
        None => (0, 0),
    };
    // SAFETY: both out-pointers were checked non-null above; validity is the
    // caller's contract.
    unsafe {
        *out_node_id = node_id;
        *out_has_match = has_match;
    }
    0
}
/// Picks the best node for a JSON capability requirement, restricted by a
/// JSON scope filter.
///
/// On a `0` return both out-parameters have been written:
/// - match found: `*out_has_match = 1` and `*out_node_id` is the node id,
/// - no match: `*out_has_match = 0` and `*out_node_id = 0`.
///
/// The id is zeroed on the no-match path so a caller that passed an
/// uninitialised local never reads indeterminate memory; `out_has_match`
/// remains the only authoritative indicator of a match.
///
/// Error returns (no out-parameter is written):
/// - `NetError::NullPointer` if any pointer argument is null,
/// - `NetError::ShuttingDown` if the handle guard refuses entry,
/// - `NetError::InvalidUtf8` if `c_str_to_string` yields `None` for either
///   string,
/// - `NetError::InvalidJson` if either text does not deserialize.
///
/// # Safety
/// `handle` is dereferenced, both JSON pointers are read through
/// `c_str_to_string`, and `out_node_id` / `out_has_match` are written
/// through; all five must be valid for those accesses.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_find_best_node_scoped(
    handle: *mut MeshNodeHandle,
    requirement_json: *const c_char,
    scope_json: *const c_char,
    out_node_id: *mut u64,
    out_has_match: *mut c_int,
) -> c_int {
    if handle.is_null()
        || requirement_json.is_null()
        || scope_json.is_null()
        || out_node_id.is_null()
        || out_has_match.is_null()
    {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let h = unsafe { &*handle };
    // The guard token must stay alive until the function returns.
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let Some(req_s) = (unsafe { c_str_to_string(requirement_json) }) else {
        return NetError::InvalidUtf8.into();
    };
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let Some(scope_s) = (unsafe { c_str_to_string(scope_json) }) else {
        return NetError::InvalidUtf8.into();
    };
    let parsed_req: CapabilityRequirementJson = match serde_json::from_str(&req_s) {
        Ok(v) => v,
        Err(_) => return NetError::InvalidJson.into(),
    };
    let parsed_scope: ScopeFilterJson = match serde_json::from_str(&scope_s) {
        Ok(v) => v,
        Err(_) => return NetError::InvalidJson.into(),
    };
    let req = capability_requirement_from_json(parsed_req);
    let owned = scope_filter_from_json(parsed_scope);
    let result = with_scope_filter(&owned, |sf| h.inner.find_best_node_scoped(&req, sf));
    let (node_id, has_match) = match result {
        Some(id) => (id, 1),
        // Deterministic placeholder; the flag tells the caller to ignore it.
        None => (0, 0),
    };
    // SAFETY: both out-pointers were checked non-null above; validity is the
    // caller's contract.
    unsafe {
        *out_node_id = node_id;
        *out_has_match = has_match;
    }
    0
}
/// Round-trips a GPU vendor string through `parse_gpu_vendor_cap` and
/// `gpu_vendor_to_string_cap`, handing the resulting spelling to
/// `write_string_out`.
///
/// Returns whatever `write_string_out` returns, or earlier:
/// - `NetError::NullPointer` if any pointer argument is null,
/// - `NetError::InvalidUtf8` if `c_str_to_string` yields `None`.
///
/// # Safety
/// `raw` is read through `c_str_to_string` and must be valid for that read.
/// `out_json` and `out_len` are forwarded to `write_string_out`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_normalize_gpu_vendor(
    raw: *const c_char,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    let any_null = raw.is_null() || out_json.is_null() || out_len.is_null();
    if any_null {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; validity is the caller's contract.
    let input = match unsafe { c_str_to_string(raw) } {
        Some(text) => text,
        None => return NetError::InvalidUtf8.into(),
    };
    let vendor = parse_gpu_vendor_cap(&input);
    let canonical = gpu_vendor_to_string_cap(vendor).to_string();
    write_string_out(canonical, out_json, out_len)
}
// Unit tests for the FFI helpers and handle lifecycle in this file.
// Several tests deliberately call an entry point on a handle that has
// already been passed to its `*_free` function; statement order matters.
#[cfg(test)]
mod tests {
use super::*;
// Values up to `u16::MAX` pass through unchanged; larger ones clamp to
// `u16::MAX`.
#[test]
fn saturating_u16_cap_clamps_at_u16_max() {
assert_eq!(saturating_u16_cap(0), 0);
assert_eq!(saturating_u16_cap(42), 42);
assert_eq!(saturating_u16_cap(u16::MAX as u32), u16::MAX);
assert_eq!(saturating_u16_cap(u16::MAX as u32 + 1), u16::MAX);
assert_eq!(saturating_u16_cap(u32::MAX), u16::MAX);
}
// Known modality names (including case and separator variants) must parse;
// misspellings and the empty string must yield `None`.
#[test]
fn parse_modality_cap_returns_none_on_unknown_strings() {
for (s, expected) in [
("text", Modality::Text),
("Text", Modality::Text),
("TEXT", Modality::Text),
("image", Modality::Image),
("audio", Modality::Audio),
("video", Modality::Video),
("code", Modality::Code),
("embedding", Modality::Embedding),
("tool-use", Modality::ToolUse),
("tool_use", Modality::ToolUse),
("tooluse", Modality::ToolUse),
] {
assert_eq!(
parse_modality_cap(s),
Some(expected),
"known modality `{s}` must parse",
);
}
for s in ["audoi", "imageX", "vidoe", "embeding", "garbage", ""] {
assert_eq!(
parse_modality_cap(s),
None,
"unknown modality `{s}` must return None — pre-fix this \
fell back to Modality::Text, advertising a capability \
the node didn't actually have",
);
}
}
// An oversized `fp16_tflops_x10` must come back as `u16::MAX`; a small
// value must come back unchanged.
#[test]
fn gpu_info_from_json_saturates_fp16_tflops_to_u16_max() {
let g = GpuJson {
vendor: None,
model: "test".to_string(),
vram_gb: 0,
compute_units: None,
tensor_cores: None,
fp16_tflops_x10: Some(1_000_000_000u32),
};
let info = gpu_info_from_json(g);
assert_eq!(
info.fp16_tflops_x10,
u16::MAX as u32,
"fp16_tflops_x10 must saturate at u16::MAX (65535) instead of \
losing precision through the f32 round-trip; got {}",
info.fp16_tflops_x10,
);
let g_small = GpuJson {
vendor: None,
model: "test".to_string(),
vram_gb: 0,
compute_units: None,
tensor_cores: None,
fp16_tflops_x10: Some(425), };
let info_small = gpu_info_from_json(g_small);
assert_eq!(
info_small.fp16_tflops_x10, 425,
"small fp16_tflops_x10 must round-trip exactly"
);
}
// `alloc_bytes` must report the source length and an identical copy of the
// bytes; a zero-length source must produce a null pointer. Every buffer is
// released through `net_free_bytes`.
#[test]
fn alloc_bytes_round_trip_across_sizes() {
for size in [0usize, 1, 15, 16, 17, 32, 64, 1024, 8192] {
let src: Vec<u8> = (0..size).map(|i| (i as u8).wrapping_mul(37)).collect();
let mut ptr: *mut u8 = std::ptr::null_mut();
let mut len: usize = 0;
let rc = alloc_bytes(&src, &mut ptr as *mut _, &mut len as *mut _);
assert_eq!(rc, 0);
assert_eq!(len, size);
if size == 0 {
assert!(ptr.is_null());
} else {
assert!(!ptr.is_null());
let observed = unsafe { std::slice::from_raw_parts(ptr, len) };
assert_eq!(observed, &src[..]);
}
unsafe { net_free_bytes(ptr, len) };
}
}
// Passes only if none of these calls crash: a null pointer with any length,
// and a non-null pointer with length 0.
#[test]
fn net_free_bytes_null_and_zero_len_are_noops() {
unsafe { net_free_bytes(std::ptr::null_mut(), 0) };
unsafe { net_free_bytes(std::ptr::null_mut(), 42) };
let mut sentinel: u8 = 0;
unsafe { net_free_bytes(&mut sentinel as *mut u8, 0) };
}
// A `usize::MAX` length paired with a stack pointer must return without
// panicking and without modifying the pointee.
#[test]
fn net_free_bytes_does_not_panic_on_oversized_len() {
let mut sentinel: u8 = 0;
let ptr = &mut sentinel as *mut u8;
unsafe { net_free_bytes(ptr, usize::MAX) };
assert_eq!(sentinel, 0, "sentinel must not have been written through");
}
// Holds an extra `Arc` clone of the node across `net_mesh_shutdown` and
// checks that the shutdown flag is set anyway.
#[test]
fn net_mesh_shutdown_runs_even_with_outstanding_arc_refs() {
let cfg = serde_json::json!({
"bind_addr": "127.0.0.1:0",
"psk_hex": "0".repeat(64),
});
let cfg_c = CString::new(cfg.to_string()).unwrap();
let mut out: *mut MeshNodeHandle = std::ptr::null_mut();
let rc = unsafe { net_mesh_new(cfg_c.as_ptr(), &mut out) };
assert_eq!(rc, 0, "net_mesh_new failed: {rc}");
assert!(!out.is_null());
let inner_clone = {
let h = unsafe { &*out };
Arc::clone(&h.inner)
};
assert!(Arc::strong_count(&inner_clone) >= 2);
assert!(!inner_clone.is_shutdown());
let rc = unsafe { net_mesh_shutdown(out) };
assert_eq!(rc, 0, "net_mesh_shutdown returned {rc}");
assert!(
inner_clone.is_shutdown(),
"shutdown flag must be set even when extra Arc refs are outstanding"
);
drop(inner_clone);
unsafe { net_mesh_free(out) };
}
// Builds a stream handle that shares node A's `Arc` and checks that
// `handles_match` accepts it with node A's handle and rejects it with
// node B's.
#[test]
fn handles_match_rejects_stream_node_mismatch() {
fn make_node_handle() -> *mut MeshNodeHandle {
let cfg = serde_json::json!({
"bind_addr": "127.0.0.1:0",
"psk_hex": "0".repeat(64),
});
let cfg_c = CString::new(cfg.to_string()).unwrap();
let mut out: *mut MeshNodeHandle = std::ptr::null_mut();
let rc = unsafe { net_mesh_new(cfg_c.as_ptr(), &mut out) };
assert_eq!(rc, 0);
assert!(!out.is_null());
out
}
let nh_a = make_node_handle();
let nh_b = make_node_handle();
let sh_a = {
let h = unsafe { &*nh_a };
let node_clone: Arc<MeshNode> = Arc::clone(&h.inner);
MeshStreamHandle {
stream: ManuallyDrop::new(CoreStream {
peer_node_id: 0xDEAD,
stream_id: 1,
epoch: 0,
config: StreamConfig::new(),
}),
_node: ManuallyDrop::new(node_clone),
guard: HandleGuard::new(),
}
};
assert!(
handles_match(&sh_a, unsafe { &*nh_a }),
"stream from node_a + node_a handle must match"
);
assert!(
!handles_match(&sh_a, unsafe { &*nh_b }),
"stream from node_a + node_b handle must be rejected (#19)"
);
// The stream handle's fields are `ManuallyDrop`, so they are taken out and
// dropped by hand before the node handles are freed.
unsafe {
let mut sh_a = sh_a;
let _ = ManuallyDrop::take(&mut sh_a.stream);
let _ = ManuallyDrop::take(&mut sh_a._node);
}
unsafe { net_mesh_free(nh_a) };
unsafe { net_mesh_free(nh_b) };
}
// Passes only if a second `net_mesh_free` on the same pointer does not
// crash.
// NOTE(review): this relies on `net_mesh_free` leaving the handle readable
// after the first call — confirm against its implementation.
#[test]
fn net_mesh_free_is_idempotent() {
let cfg = serde_json::json!({
"bind_addr": "127.0.0.1:0",
"psk_hex": "0".repeat(64),
});
let cfg_c = CString::new(cfg.to_string()).unwrap();
let mut nh: *mut MeshNodeHandle = std::ptr::null_mut();
assert_eq!(unsafe { net_mesh_new(cfg_c.as_ptr(), &mut nh) }, 0);
assert!(!nh.is_null());
unsafe { net_mesh_free(nh) };
unsafe { net_mesh_free(nh) };
}
// Passes only if a second `net_identity_free` on the same pointer does not
// crash.
#[test]
fn net_identity_free_is_idempotent() {
let mut h: *mut IdentityHandle = std::ptr::null_mut();
assert_eq!(unsafe { net_identity_generate(&mut h) }, 0);
assert!(!h.is_null());
unsafe { net_identity_free(h) };
unsafe { net_identity_free(h) };
}
// A worker thread enters the handle guard and holds the operation token
// until `release` is set. A second thread sets `release` after 50 ms.
// `net_mesh_free` is expected to block for at least 40 ms, i.e. until the
// worker has let go.
#[test]
fn net_mesh_free_waits_for_inflight_op() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
let cfg = serde_json::json!({
"bind_addr": "127.0.0.1:0",
"psk_hex": "0".repeat(64),
});
let cfg_c = CString::new(cfg.to_string()).unwrap();
let mut nh: *mut MeshNodeHandle = std::ptr::null_mut();
assert_eq!(unsafe { net_mesh_new(cfg_c.as_ptr(), &mut nh) }, 0);
assert!(!nh.is_null());
// The pointer crosses into the worker thread as an integer.
let nh_addr = nh as usize;
let started = Arc::new(AtomicBool::new(false));
let release = Arc::new(AtomicBool::new(false));
let started_w = started.clone();
let release_w = release.clone();
let worker = std::thread::spawn(move || {
let h = unsafe { &*(nh_addr as *mut MeshNodeHandle) };
let op = h.guard.try_enter().expect("entry must succeed pre-free");
started_w.store(true, Ordering::SeqCst);
while !release_w.load(Ordering::SeqCst) {
std::thread::sleep(Duration::from_millis(1));
}
drop(op);
});
// Wait until the worker holds the operation token before freeing.
while !started.load(Ordering::SeqCst) {
std::thread::yield_now();
}
let release_clone = release.clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(50));
release_clone.store(true, Ordering::SeqCst);
});
let t0 = Instant::now();
unsafe { net_mesh_free(nh) };
let elapsed = t0.elapsed();
assert!(
elapsed >= Duration::from_millis(40),
"net_mesh_free returned in {:?} — pre-fix it would have proceeded \
immediately and the worker's subsequent op would UAF",
elapsed,
);
worker.join().unwrap();
}
// After `net_mesh_free`, `net_mesh_stream_stats` on the same pointer must
// return `ShuttingDown` and leave `out_json` null.
#[test]
fn net_mesh_stream_stats_returns_shutting_down_after_free() {
let cfg = serde_json::json!({
"bind_addr": "127.0.0.1:0",
"psk_hex": "0".repeat(64),
});
let cfg_c = CString::new(cfg.to_string()).unwrap();
let mut nh: *mut MeshNodeHandle = std::ptr::null_mut();
assert_eq!(unsafe { net_mesh_new(cfg_c.as_ptr(), &mut nh) }, 0);
assert!(!nh.is_null());
unsafe { net_mesh_free(nh) };
let mut out_json: *mut c_char = std::ptr::null_mut();
let mut out_len: usize = 0;
let rc = unsafe { net_mesh_stream_stats(nh, 0xDEAD, 1, &mut out_json, &mut out_len) };
assert_eq!(
rc,
NetError::ShuttingDown as c_int,
"post-free stream_stats must surface ShuttingDown (got {rc})",
);
assert!(
out_json.is_null(),
"no payload may be written after the guard fires",
);
}
// After `net_identity_free`, `net_identity_issue_token` with the same
// signer must return `ShuttingDown` and leave `out_token` null.
#[test]
fn net_identity_issue_token_returns_shutting_down_after_free() {
let mut signer: *mut IdentityHandle = std::ptr::null_mut();
assert_eq!(unsafe { net_identity_generate(&mut signer) }, 0);
assert!(!signer.is_null());
unsafe { net_identity_free(signer) };
let subject = [0u8; 32];
let scope = CString::new("[\"publish\"]").unwrap();
let channel = CString::new("test-channel").unwrap();
let mut out_token: *mut u8 = std::ptr::null_mut();
let mut out_token_len: usize = 0;
let rc = unsafe {
net_identity_issue_token(
signer,
subject.as_ptr(),
subject.len(),
scope.as_ptr(),
channel.as_ptr(),
60,
0,
&mut out_token,
&mut out_token_len,
)
};
assert_eq!(
rc,
NetError::ShuttingDown as c_int,
"post-free issue_token must surface ShuttingDown (got {rc})",
);
assert!(out_token.is_null(), "no token bytes may be allocated");
}
// Issues a parent token while the signer is live, frees the signer, then
// checks that `net_delegate_token` returns `ShuttingDown` and leaves
// `child_bytes` null. The parent token bytes are released at the end.
#[test]
fn net_delegate_token_returns_shutting_down_after_free() {
let mut signer: *mut IdentityHandle = std::ptr::null_mut();
assert_eq!(unsafe { net_identity_generate(&mut signer) }, 0);
assert!(!signer.is_null());
let subject = [0u8; 32];
let scope = CString::new("[\"publish\",\"delegate\"]").unwrap();
let channel = CString::new("test-channel").unwrap();
let mut parent_bytes: *mut u8 = std::ptr::null_mut();
let mut parent_len: usize = 0;
assert_eq!(
unsafe {
net_identity_issue_token(
signer,
subject.as_ptr(),
subject.len(),
scope.as_ptr(),
channel.as_ptr(),
60,
1,
&mut parent_bytes,
&mut parent_len,
)
},
0,
);
assert!(!parent_bytes.is_null());
unsafe { net_identity_free(signer) };
let new_subject = [1u8; 32];
let restricted = CString::new("[\"publish\"]").unwrap();
let mut child_bytes: *mut u8 = std::ptr::null_mut();
let mut child_len: usize = 0;
let rc = unsafe {
net_delegate_token(
signer,
parent_bytes,
parent_len,
new_subject.as_ptr(),
new_subject.len(),
restricted.as_ptr(),
&mut child_bytes,
&mut child_len,
)
};
assert_eq!(
rc,
NetError::ShuttingDown as c_int,
"post-free delegate_token must surface ShuttingDown (got {rc})",
);
assert!(child_bytes.is_null(), "no child token may be allocated");
unsafe { net_free_bytes(parent_bytes, parent_len) };
}
// CPU core and thread counts above `u16::MAX` must come back as
// `u16::MAX`.
#[test]
fn hardware_from_json_saturates_overflow_cpu_fields() {
let h = HardwareJson {
cpu_cores: Some(70_000),
cpu_threads: Some(200_000),
memory_gb: None,
gpu: None,
additional_gpus: Vec::new(),
storage_gb: None,
network_gbps: None,
accelerators: Vec::new(),
};
let hw = hardware_from_json(h);
assert_eq!(hw.cpu_cores, u16::MAX);
assert_eq!(hw.cpu_threads, u16::MAX);
}
// The three token entry points must return `InvalidJson` for a
// `usize::MAX` length; `net_parse_token` must also leave `out_json` null.
#[test]
fn token_entry_points_reject_oversize_len() {
let invalid_json: c_int = NetError::InvalidJson.into();
let mut sentinel: u8 = 0;
let token = &mut sentinel as *mut u8 as *const u8;
let mut out_json: *mut c_char = std::ptr::null_mut();
let mut out_len: usize = 0;
assert_eq!(
unsafe { net_parse_token(token, usize::MAX, &mut out_json, &mut out_len) },
invalid_json,
);
assert!(out_json.is_null());
let mut out_ok: c_int = -42;
assert_eq!(
unsafe { net_verify_token(token, usize::MAX, &mut out_ok) },
invalid_json,
);
let mut out_expired: c_int = -42;
assert_eq!(
unsafe { net_token_is_expired(token, usize::MAX, &mut out_expired) },
invalid_json,
);
assert_eq!(
sentinel, 0,
"sentinel must not be touched: the length guard fires before any deref"
);
}
}
/// Tests for the NAT-traversal stub entry points, compiled only when the
/// `nat-traversal` feature is disabled. Every stub is called with null
/// handles and is expected to return `NET_ERR_TRAVERSAL_UNSUPPORTED`.
#[cfg(all(test, not(feature = "nat-traversal")))]
mod nat_traversal_stub_tests {
    use super::*;
    use std::ptr;

    #[test]
    fn nat_type_stub_returns_unsupported() {
        let mut out_str: *mut c_char = ptr::null_mut();
        let mut out_len: usize = 0;
        let code = net_mesh_nat_type(ptr::null_mut(), &mut out_str, &mut out_len);
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn reflex_addr_stub_returns_unsupported() {
        let mut out_str: *mut c_char = ptr::null_mut();
        let mut out_len: usize = 0;
        let code = net_mesh_reflex_addr(ptr::null_mut(), &mut out_str, &mut out_len);
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn peer_nat_type_stub_returns_unsupported() {
        let mut out_str: *mut c_char = ptr::null_mut();
        let mut out_len: usize = 0;
        let code = net_mesh_peer_nat_type(ptr::null_mut(), 0, &mut out_str, &mut out_len);
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn probe_reflex_stub_returns_unsupported() {
        let mut out_str: *mut c_char = ptr::null_mut();
        let mut out_len: usize = 0;
        let code = net_mesh_probe_reflex(ptr::null_mut(), 0, &mut out_str, &mut out_len);
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn reclassify_nat_stub_returns_unsupported() {
        let code = net_mesh_reclassify_nat(ptr::null_mut());
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn traversal_stats_stub_returns_unsupported() {
        let mut a: u64 = 0;
        let mut b: u64 = 0;
        let mut c: u64 = 0;
        let code = net_mesh_traversal_stats(ptr::null_mut(), &mut a, &mut b, &mut c);
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn connect_direct_stub_returns_unsupported() {
        let code = net_mesh_connect_direct(ptr::null_mut(), 0, ptr::null(), 0);
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn set_reflex_override_stub_returns_unsupported() {
        let code = net_mesh_set_reflex_override(ptr::null_mut(), ptr::null());
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    #[test]
    fn clear_reflex_override_stub_returns_unsupported() {
        let code = net_mesh_clear_reflex_override(ptr::null_mut());
        assert_eq!(code, NET_ERR_TRAVERSAL_UNSUPPORTED);
    }

    /// Pins the numeric value of the "unsupported" status code.
    #[test]
    fn unsupported_code_is_stable() {
        assert_eq!(NET_ERR_TRAVERSAL_UNSUPPORTED, -137);
    }
}

/// Capability-conversion and payload-collection tests.
///
/// These have nothing to do with NAT traversal, so they live in their own
/// module gated on `test` alone. Inside the feature-gated stub module they
/// were skipped in every build that enabled `nat-traversal`.
#[cfg(test)]
mod capability_and_payload_tests {
    use super::*;

    /// A capability-set JSON document with a nested GPU object must keep its
    /// vendor, memory, VRAM and tag through `capability_set_from_json`.
    #[test]
    fn capability_set_from_go_marshal_preserves_gpu_vendor() {
        let json = r#"{"hardware":{"cpu_cores":16,"memory_gb":64,"gpu":{"vendor":"nvidia","model":"h100","vram_gb":80}},"tags":["gpu"]}"#;
        let parsed: CapabilitySetJson = serde_json::from_str(json).expect("JSON should parse");
        let caps = capability_set_from_json(parsed);
        let views = caps.views();
        assert_eq!(
            views.hardware().gpu_vendor(),
            Some(super::GpuVendor::Nvidia),
            "vendor lost in conversion"
        );
        assert_eq!(views.hardware().memory_gb, 64);
        assert_eq!(views.hardware().total_vram_gb(), 80);
        assert!(caps.has_tag("gpu"));
    }

    /// A null payload pointer paired with a non-zero length must make
    /// `collect_payloads` reject the whole batch.
    #[test]
    fn collect_payloads_rejects_null_entry_with_nonzero_length() {
        let buf_a = b"hello".as_slice();
        let buf_b = b"world".as_slice();
        let ptrs: [*const u8; 3] = [buf_a.as_ptr(), std::ptr::null(), buf_b.as_ptr()];
        let lens: [usize; 3] = [buf_a.len(), 4, buf_b.len()];
        let result = unsafe { collect_payloads(ptrs.as_ptr(), lens.as_ptr(), 3) };
        assert!(
            result.is_none(),
            "null entry with non-zero length must reject the whole batch"
        );
    }

    /// A null payload pointer paired with length 0 is accepted and yields an
    /// empty payload in that position.
    #[test]
    fn collect_payloads_allows_null_entry_with_zero_length() {
        let buf_a = b"hello".as_slice();
        let ptrs: [*const u8; 2] = [buf_a.as_ptr(), std::ptr::null()];
        let lens: [usize; 2] = [buf_a.len(), 0];
        let result = unsafe { collect_payloads(ptrs.as_ptr(), lens.as_ptr(), 2) }
            .expect("zero-length null is treated as empty payload");
        assert_eq!(result.len(), 2);
        assert_eq!(&result[0][..], b"hello");
        assert!(result[1].is_empty());
    }

    /// Two non-null payloads come back in order with their bytes intact.
    #[test]
    fn collect_payloads_happy_path() {
        let buf_a = b"abc".as_slice();
        let buf_b = b"defg".as_slice();
        let ptrs: [*const u8; 2] = [buf_a.as_ptr(), buf_b.as_ptr()];
        let lens: [usize; 2] = [buf_a.len(), buf_b.len()];
        let result = unsafe { collect_payloads(ptrs.as_ptr(), lens.as_ptr(), 2) }
            .expect("non-null entries should succeed");
        assert_eq!(result.len(), 2);
        assert_eq!(&result[0][..], b"abc");
        assert_eq!(&result[1][..], b"defg");
    }
}