#![allow(clippy::missing_safety_doc)]
#![expect(
clippy::undocumented_unsafe_blocks,
reason = "module-wide FFI safety contract documented in the # Safety preamble above"
)]
#![expect(
clippy::multiple_unsafe_ops_per_block,
reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract; splitting per-op would obscure the single boundary-cross"
)]
#![allow(clippy::not_unsafe_ptr_arg_deref)]
use std::ffi::CStr;
use std::os::raw::{c_char, c_int};
use std::ptr;
use tokio::runtime::Runtime;
use crate::bus::EventBus;
use crate::config::EventBusConfig;
use crate::consumer::ConsumeRequest;
use crate::event::{Event, RawEvent};
#[cfg(any(
all(feature = "netdb", feature = "redex-disk"),
feature = "net",
feature = "redis",
))]
pub mod handle_guard;
#[cfg(all(feature = "netdb", feature = "redex-disk"))]
#[allow(missing_docs)]
pub mod cortex;
#[cfg(feature = "dataforts")]
#[allow(missing_docs)]
pub mod blob;
#[allow(missing_docs)]
pub mod blob_stubs;
#[cfg(feature = "net")]
#[allow(missing_docs)]
pub mod mesh;
#[cfg(feature = "net")]
pub mod predicate;
#[cfg(feature = "net")]
pub mod schema;
#[cfg(feature = "net")]
pub mod predicate_debug;
#[cfg(feature = "redis")]
pub mod redis_dedup;
#[cfg(feature = "net")]
use crate::adapter::net::{NetAdapterConfig, ReliabilityConfig, StaticKeypair};
#[cfg(any(feature = "redis", feature = "jetstream", feature = "net"))]
use crate::config::AdapterConfig;
#[cfg(feature = "jetstream")]
use crate::config::JetStreamAdapterConfig;
#[cfg(feature = "redis")]
use crate::config::RedisAdapterConfig;
#[cfg(feature = "net")]
use std::ffi::CString;
/// Opaque handle handed out to C callers.
///
/// Instances are heap-allocated by `create_with_config` and passed back into
/// every `net_*` entry point as a raw pointer. `bus` and `runtime` are wrapped
/// in `ManuallyDrop` so that `net_shutdown` can move them out through the raw
/// pointer with `ManuallyDrop::take`.
pub struct NetHandle {
/// Event bus that the ingest/poll/stats/flush entry points operate on.
bus: std::mem::ManuallyDrop<EventBus>,
/// Tokio runtime used to `block_on` the bus's async operations.
runtime: std::mem::ManuallyDrop<Runtime>,
/// Set by `net_shutdown`; once true, `FfiOpGuard::try_enter` refuses new operations.
shutting_down: std::sync::atomic::AtomicBool,
/// Number of FFI operations currently holding an `FfiOpGuard`.
active_ops: std::sync::atomic::AtomicU32,
/// Set by the `net_shutdown` call that wins the right to move `bus`/`runtime` out.
bus_taken: std::sync::atomic::AtomicBool,
/// Set (Release) after `bus.shutdown()` has returned; later `net_shutdown` callers wait on it.
shutdown_completed: std::sync::atomic::AtomicBool,
}
/// Upper bound used by `net_shutdown` for each of its waits: draining in-flight
/// operations, and waiting for a concurrent shutdown call to finish.
const FFI_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);
/// Guard marking one in-flight FFI operation on a [`NetHandle`].
///
/// While a guard is alive the handle's `active_ops` counter includes it;
/// dropping the guard removes it again.
struct FfiOpGuard<'a> {
    handle: &'a NetHandle,
}

impl<'a> FfiOpGuard<'a> {
    /// Registers a new operation on `handle`.
    ///
    /// Returns `None` (leaving `active_ops` unchanged on exit) when the handle
    /// is shutting down or its bus has already been taken.
    fn try_enter(handle: &'a NetHandle) -> Option<Self> {
        use std::sync::atomic::Ordering;

        // Count ourselves in *before* looking at the flags: `net_shutdown`
        // sets `shutting_down` first and then waits for the counter to reach
        // zero, so this order means it cannot miss an operation that got in.
        handle.active_ops.fetch_add(1, Ordering::SeqCst);
        let guard = Self { handle };

        let closing = handle.shutting_down.load(Ordering::SeqCst)
            || handle.bus_taken.load(Ordering::SeqCst);
        if closing {
            // Dropping `guard` here performs the matching decrement.
            return None;
        }
        Some(guard)
    }
}

impl Drop for FfiOpGuard<'_> {
    /// Removes this operation from the handle's in-flight count.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.handle.active_ops.fetch_sub(1, Ordering::AcqRel);
    }
}
#[inline]
fn handle_is_valid(handle: *const NetHandle) -> bool {
!handle.is_null() && (handle as usize).is_multiple_of(std::mem::align_of::<NetHandle>())
}
/// Status codes returned across the C ABI.
///
/// Zero is success and every failure is negative, so entry points that return
/// a count or byte length can share the same `c_int` return channel. The
/// numeric values are part of the ABI (the C headers mirror them) and must
/// not be renumbered.
#[repr(C)]
pub enum NetError {
    /// The call completed successfully.
    Success = 0,
    /// A required pointer was null, or the handle failed `handle_is_valid`.
    NullPointer = -1,
    /// An input string or byte buffer was not valid UTF-8.
    InvalidUtf8 = -2,
    /// Input could not be parsed as the expected JSON (also used for lengths above `isize::MAX`).
    InvalidJson = -3,
    /// Initialisation failure. NOTE(review): not produced in this file — confirm where it is used.
    InitFailed = -4,
    /// The bus rejected an ingest call.
    IngestionFailed = -5,
    /// The bus poll call failed.
    PollFailed = -6,
    /// The caller-provided output buffer cannot hold the response.
    BufferTooSmall = -7,
    /// The handle is shutting down (or already shut down); no new operations are accepted.
    ShuttingDown = -8,
    /// A count or size does not fit the integer type required by the ABI.
    IntOverflow = -9,
    /// NOTE(review): not produced in this file — presumably for calls taking two handles; confirm.
    MismatchedHandles = -10,
    /// A string to be returned as a C string contained an interior NUL byte.
    InteriorNul = -11,
    /// Any failure without a more specific code.
    Unknown = -99,
}

impl From<NetError> for c_int {
    /// Yields the enum's declared discriminant as the raw status code.
    fn from(code: NetError) -> c_int {
        code as c_int
    }
}
/// Registers an FFI operation on `handle`, translating a refusal into the
/// `NetError::ShuttingDown` status code so callers can `return` it directly.
#[inline]
fn enter_ffi_op(handle: &NetHandle) -> Result<FfiOpGuard<'_>, c_int> {
    match FfiOpGuard::try_enter(handle) {
        Some(guard) => Ok(guard),
        None => Err(c_int::from(NetError::ShuttingDown)),
    }
}
/// Creates a new handle from an optional JSON configuration string.
///
/// A null pointer or an empty string selects `EventBusConfig::default()`.
/// Returns null when the string is not valid UTF-8, when `parse_config_json`
/// rejects it, when the Tokio runtime cannot be created, or when the bus
/// itself fails to start.
///
/// # Safety
///
/// `config_json` must be null or point to a NUL-terminated string that stays
/// valid for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_init(config_json: *const c_char) -> *mut NetHandle {
    let config = if config_json.is_null() {
        EventBusConfig::default()
    } else {
        let Ok(text) = unsafe { CStr::from_ptr(config_json) }.to_str() else {
            return ptr::null_mut();
        };
        if text.is_empty() {
            EventBusConfig::default()
        } else {
            let Some(parsed) = parse_config_json(text) else {
                return ptr::null_mut();
            };
            parsed
        }
    };

    // The runtime is only created once the configuration is known to be good.
    let Ok(runtime) = Runtime::new() else {
        return ptr::null_mut();
    };
    create_with_config(runtime, config)
}
/// Builds an [`EventBusConfig`] from the JSON text passed to `net_init`.
///
/// Recognised top-level keys (all optional):
///
/// * `num_shards` — must fit in `u16`.
/// * `ring_buffer_capacity` — must fit in `usize`.
/// * `backpressure_mode` — `"DropNewest"`/`"drop_newest"`,
///   `"DropOldest"`/`"drop_oldest"`, `"FailProducer"`/`"fail_producer"`, or
///   `{"Sample": {"rate": N}}` (also `"sample"`) with a non-zero `u32` rate.
/// * `redis`, `jetstream`, `net` — adapter sections, compiled in only when the
///   matching cargo feature is enabled. A `redis`/`jetstream` section without
///   a string `url` is ignored.
///
/// Returns `None` when the text is not valid JSON, when a recognised value is
/// out of range or misspelled, or when the builder rejects the result.
/// Numeric keys whose JSON value is not an unsigned integer are ignored
/// rather than rejected.
fn parse_config_json(json_str: &str) -> Option<EventBusConfig> {
    let value: serde_json::Value = serde_json::from_str(json_str).ok()?;
    let mut builder = EventBusConfig::builder();

    if let Some(num_shards) = value.get("num_shards").and_then(|v| v.as_u64()) {
        let num_shards = u16::try_from(num_shards).ok()?;
        builder = builder.num_shards(num_shards);
    }
    if let Some(capacity) = value.get("ring_buffer_capacity").and_then(|v| v.as_u64()) {
        let capacity = usize::try_from(capacity).ok()?;
        builder = builder.ring_buffer_capacity(capacity);
    }

    if let Some(bp_value) = value.get("backpressure_mode") {
        let bp_mode = if let Some(mode) = bp_value.as_str() {
            match mode {
                "DropNewest" | "drop_newest" => crate::config::BackpressureMode::DropNewest,
                "DropOldest" | "drop_oldest" => crate::config::BackpressureMode::DropOldest,
                "FailProducer" | "fail_producer" => crate::config::BackpressureMode::FailProducer,
                // A typo must not silently select some other mode.
                _ => return None,
            }
        } else if let Some(obj) = bp_value.as_object() {
            let sample = obj.get("Sample").or_else(|| obj.get("sample"))?;
            let rate = sample.get("rate").and_then(|v| v.as_u64())?;
            let rate = u32::try_from(rate).ok()?;
            if rate == 0 {
                return None;
            }
            crate::config::BackpressureMode::Sample { rate }
        } else {
            return None;
        };
        builder = builder.backpressure_mode(bp_mode);
    }

    #[cfg(feature = "redis")]
    if let Some(redis) = value.get("redis") {
        if let Some(url) = redis.get("url").and_then(|v| v.as_str()) {
            let mut redis_config = RedisAdapterConfig::new(url);
            if let Some(prefix) = redis.get("prefix").and_then(|v| v.as_str()) {
                redis_config = redis_config.with_prefix(prefix);
            }
            if let Some(max_len) = redis.get("max_stream_len").and_then(|v| v.as_u64()) {
                let max_len = usize::try_from(max_len).ok()?;
                redis_config = redis_config.with_max_stream_len(max_len);
            }
            if let Some(pipeline_size) = redis.get("pipeline_size").and_then(|v| v.as_u64()) {
                let pipeline_size = usize::try_from(pipeline_size).ok()?;
                redis_config = redis_config.with_pipeline_size(pipeline_size);
            }
            builder = builder.adapter(AdapterConfig::Redis(redis_config));
        }
    }

    #[cfg(feature = "jetstream")]
    if let Some(jetstream) = value.get("jetstream") {
        if let Some(url) = jetstream.get("url").and_then(|v| v.as_str()) {
            let mut js_config = JetStreamAdapterConfig::new(url);
            if let Some(prefix) = jetstream.get("prefix").and_then(|v| v.as_str()) {
                js_config = js_config.with_prefix(prefix);
            }
            if let Some(max_messages) = jetstream.get("max_messages").and_then(|v| v.as_i64()) {
                js_config = js_config.with_max_messages(max_messages);
            }
            if let Some(replicas) = jetstream.get("replicas").and_then(|v| v.as_u64()) {
                let replicas = usize::try_from(replicas).ok()?;
                js_config = js_config.with_replicas(replicas);
            }
            builder = builder.adapter(AdapterConfig::JetStream(js_config));
        }
    }

    #[cfg(feature = "net")]
    if let Some(net) = value.get("net") {
        // Reads `field` as a hex string that decodes to exactly 32 bytes.
        let key32 = |field: &str| -> Option<[u8; 32]> {
            let text = net.get(field)?.as_str()?;
            let bytes = hex::decode(text).ok()?;
            bytes.try_into().ok()
        };
        // Reads `field` as a socket address in string form.
        let socket_addr = |field: &str| -> Option<std::net::SocketAddr> {
            net.get(field)?.as_str()?.parse().ok()
        };

        let bind_addr = socket_addr("bind_addr")?;
        let peer_addr = socket_addr("peer_addr")?;
        let psk = key32("psk")?;
        let role = net
            .get("role")
            .and_then(|v| v.as_str())
            .unwrap_or("initiator");
        let mut net_config = match role {
            "initiator" => {
                let peer_pubkey = key32("peer_public_key")?;
                NetAdapterConfig::initiator(bind_addr, peer_addr, psk, peer_pubkey)
            }
            "responder" => {
                let secret_key = key32("secret_key")?;
                let public_key = key32("public_key")?;
                let keypair = StaticKeypair::from_keys(secret_key, public_key);
                NetAdapterConfig::responder(bind_addr, peer_addr, psk, keypair)
            }
            _ => return None,
        };

        if let Some(reliability) = net.get("reliability").and_then(|v| v.as_str()) {
            // Unknown spellings are rejected instead of silently disabling
            // reliability, matching how `backpressure_mode` and `role` treat
            // unrecognised strings.
            let level = match reliability {
                "none" => ReliabilityConfig::None,
                "light" => ReliabilityConfig::Light,
                "full" => ReliabilityConfig::Full,
                _ => return None,
            };
            net_config = net_config.with_reliability(level);
        }
        if let Some(pool_size) = net.get("packet_pool_size").and_then(|v| v.as_u64()) {
            // Out-of-range sizes reject, like every other numeric field here.
            let pool_size = usize::try_from(pool_size).ok()?;
            net_config = net_config.with_pool_size(pool_size);
        }
        if let Some(interval_ms) = net.get("heartbeat_interval_ms").and_then(|v| v.as_u64()) {
            if interval_ms == 0 {
                return None;
            }
            net_config =
                net_config.with_heartbeat_interval(std::time::Duration::from_millis(interval_ms));
        }
        if let Some(timeout_ms) = net.get("session_timeout_ms").and_then(|v| v.as_u64()) {
            if timeout_ms == 0 {
                return None;
            }
            net_config =
                net_config.with_session_timeout(std::time::Duration::from_millis(timeout_ms));
        }
        if let Some(batched) = net.get("batched_io").and_then(|v| v.as_bool()) {
            net_config = net_config.with_batched_io(batched);
        }
        builder = builder.adapter(AdapterConfig::Net(Box::new(net_config)));
    }

    builder.build().ok()
}
/// Starts an [`EventBus`] on `runtime` and boxes both into a fresh handle.
///
/// Returns null when the bus fails to start. Otherwise ownership of the
/// returned allocation passes to the caller as a raw pointer.
fn create_with_config(runtime: Runtime, config: EventBusConfig) -> *mut NetHandle {
    use std::mem::ManuallyDrop;
    use std::sync::atomic::{AtomicBool, AtomicU32};

    let Ok(bus) = runtime.block_on(EventBus::new(config)) else {
        // The runtime is torn down on a throwaway thread rather than here.
        // NOTE(review): presumably so the teardown cannot stall or panic on
        // the caller's thread — confirm.
        std::thread::spawn(move || drop(runtime));
        return ptr::null_mut();
    };

    Box::into_raw(Box::new(NetHandle {
        bus: ManuallyDrop::new(bus),
        runtime: ManuallyDrop::new(runtime),
        shutting_down: AtomicBool::new(false),
        active_ops: AtomicU32::new(0),
        bus_taken: AtomicBool::new(false),
        shutdown_completed: AtomicBool::new(false),
    }))
}
/// Parses `len` bytes at `event_json` as an [`Event`] and ingests it.
///
/// Returns `Success`, or one of `NullPointer`, `ShuttingDown`, `InvalidJson`
/// (unparseable event or `len > isize::MAX`), `InvalidUtf8`, `IngestionFailed`.
///
/// # Safety
///
/// `handle` must come from `net_init`, and `event_json` must be readable for
/// `len` bytes for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest(
    handle: *mut NetHandle,
    event_json: *const c_char,
    len: usize,
) -> c_int {
    if event_json.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    // `slice::from_raw_parts` requires the length to fit in `isize`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let payload = unsafe { std::slice::from_raw_parts(event_json.cast::<u8>(), len) };
    let Ok(text) = std::str::from_utf8(payload) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(event) = Event::from_str(text) else {
        return NetError::InvalidJson.into();
    };

    if handle.bus.ingest(event).is_ok() {
        NetError::Success.into()
    } else {
        NetError::IngestionFailed.into()
    }
}
/// Ingests `len` bytes at `json` as a [`RawEvent`] without parsing the JSON.
///
/// Only UTF-8 validity is checked here. Returns `Success`, or one of
/// `NullPointer`, `ShuttingDown`, `InvalidJson` (for `len > isize::MAX`),
/// `InvalidUtf8`, `IngestionFailed`.
///
/// # Safety
///
/// `handle` must come from `net_init`, and `json` must be readable for `len`
/// bytes for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw(
    handle: *mut NetHandle,
    json: *const c_char,
    len: usize,
) -> c_int {
    if json.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    // `slice::from_raw_parts` requires the length to fit in `isize`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let payload = unsafe { std::slice::from_raw_parts(json.cast::<u8>(), len) };
    let Ok(text) = std::str::from_utf8(payload) else {
        return NetError::InvalidUtf8.into();
    };

    let outcome = handle.bus.ingest_raw(RawEvent::from_str(text));
    if outcome.is_ok() {
        NetError::Success.into()
    } else {
        NetError::IngestionFailed.into()
    }
}
/// Ingests up to `count` raw JSON events given as parallel pointer/length arrays.
///
/// Entries that are null, longer than `isize::MAX`, or not valid UTF-8 are
/// skipped with a warning; the remaining entries are handed to the bus in one
/// batch. Returns the count reported by the bus, `0` for an empty batch, or a
/// negative `NetError` (`NullPointer`, `ShuttingDown`, `IntOverflow`).
///
/// # Safety
///
/// `handle` must come from `net_init`. `jsons` and `lens` must each be
/// readable for `count` elements, and every non-null `jsons[i]` must be
/// readable for `lens[i]` bytes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw_batch(
    handle: *mut NetHandle,
    jsons: *const *const c_char,
    lens: *const usize,
    count: usize,
) -> c_int {
    if jsons.is_null() || lens.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    if count == 0 {
        // Nothing to ingest: zero events accepted.
        return 0;
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    let mut events = Vec::with_capacity(count);
    let mut dropped_null = 0usize;
    let mut dropped_oversize = 0usize;
    let mut dropped_invalid_utf8 = 0usize;

    for index in 0..count {
        let (entry_ptr, entry_len) = unsafe { (*jsons.add(index), *lens.add(index)) };

        if entry_ptr.is_null() {
            tracing::warn!(
                index,
                "net_ingest_raw_batch: dropping entry with null pointer"
            );
            dropped_null += 1;
            continue;
        }
        if entry_len > isize::MAX as usize {
            tracing::warn!(
                index,
                len = entry_len,
                "net_ingest_raw_batch: dropping entry with len > isize::MAX"
            );
            dropped_oversize += 1;
            continue;
        }

        let bytes = unsafe { std::slice::from_raw_parts(entry_ptr.cast::<u8>(), entry_len) };
        if let Ok(text) = std::str::from_utf8(bytes) {
            events.push(RawEvent::from_str(text));
        } else {
            tracing::warn!(
                index,
                "net_ingest_raw_batch: dropping entry with invalid UTF-8"
            );
            dropped_invalid_utf8 += 1;
        }
    }

    // One summary line per call, in addition to the per-entry warnings above.
    let total_dropped = dropped_null + dropped_oversize + dropped_invalid_utf8;
    if total_dropped > 0 {
        tracing::warn!(
            input_count = count,
            dropped_null,
            dropped_oversize,
            dropped_invalid_utf8,
            "net_ingest_raw_batch: {} of {} entries dropped before ingest",
            total_dropped,
            count,
        );
    }

    let ingested = handle.bus.ingest_raw_batch(events);
    match c_int::try_from(ingested) {
        Ok(n) => n,
        Err(_) => NetError::IntOverflow.into(),
    }
}
/// Ingests a JSON array of events supplied as one NUL-terminated string.
///
/// Every array element becomes one [`Event`]. Returns the count reported by
/// the bus, or a negative `NetError` (`NullPointer`, `ShuttingDown`,
/// `InvalidUtf8`, `InvalidJson`, `IntOverflow`).
///
/// # Safety
///
/// `handle` must come from `net_init`, and `events_json` must point to a
/// NUL-terminated string that stays valid for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_batch(
    handle: *mut NetHandle,
    events_json: *const c_char,
) -> c_int {
    if events_json.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    let Ok(text) = unsafe { CStr::from_ptr(events_json) }.to_str() else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(items) = serde_json::from_str::<Vec<serde_json::Value>>(text) else {
        return NetError::InvalidJson.into();
    };

    let mut events: Vec<Event> = Vec::with_capacity(items.len());
    events.extend(items.into_iter().map(Event::new));

    let ingested = handle.bus.ingest_batch(events);
    match c_int::try_from(ingested) {
        Ok(n) => n,
        Err(_) => NetError::IntOverflow.into(),
    }
}
fn parse_poll_request_json(json_str: &str) -> Result<ConsumeRequest, c_int> {
let value: serde_json::Value =
serde_json::from_str(json_str).map_err(|_| c_int::from(NetError::InvalidJson))?;
let limit = match value.get("limit") {
None | Some(serde_json::Value::Null) => 100usize,
Some(v) => match v.as_u64() {
Some(n) => usize::try_from(n).map_err(|_| c_int::from(NetError::InvalidJson))?,
None => return Err(NetError::InvalidJson.into()),
},
};
let cursor = match value.get("cursor") {
None | Some(serde_json::Value::Null) => None,
Some(v) => match v.as_str() {
Some(s) => Some(s.to_owned()),
None => return Err(NetError::InvalidJson.into()),
},
};
let mut req = ConsumeRequest::new(limit);
req.from_id = cursor;
Ok(req)
}
/// Polls the bus and writes the result into `out_buffer` as NUL-terminated JSON.
///
/// `request_json` may be null (poll with a limit of 100 and no cursor) or a
/// NUL-terminated JSON object understood by `parse_poll_request_json`.
///
/// The response object carries `events`, `next_id`, `has_more`, `count` and
/// `parse_errors`. An event that fails to parse is emitted as a JSON string
/// holding its raw text when that text is readable, and is counted in
/// `parse_errors` either way.
///
/// Returns the number of bytes written (excluding the NUL), or a negative
/// `NetError`. When the response does not fit, `BufferTooSmall` is returned
/// and, if it fits, a small fallback object is written instead that repeats
/// the caller's cursor in `next_id` and sets `buffer_too_small`.
///
/// # Safety
///
/// `handle` must come from `net_init`. `request_json` must be null or a valid
/// NUL-terminated string. `out_buffer` must be writable for `buffer_len` bytes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_poll(
    handle: *mut NetHandle,
    request_json: *const c_char,
    out_buffer: *mut c_char,
    buffer_len: usize,
) -> c_int {
    // Smallest buffer accepted at all.
    const MIN_RESPONSE_BUFFER: usize = 256;

    if out_buffer.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    let request = if request_json.is_null() {
        ConsumeRequest::new(100)
    } else {
        let Ok(text) = unsafe { CStr::from_ptr(request_json) }.to_str() else {
            return NetError::InvalidUtf8.into();
        };
        match parse_poll_request_json(text) {
            Ok(parsed) => parsed,
            Err(code) => return code,
        }
    };

    if buffer_len < MIN_RESPONSE_BUFFER {
        return NetError::BufferTooSmall.into();
    }

    // Copies `text` plus a trailing NUL into the caller's buffer. Every call
    // site has already checked that `text.len() + 1 <= buffer_len`.
    let write_c_string = |text: &str| unsafe {
        ptr::copy_nonoverlapping(text.as_ptr() as *const c_char, out_buffer, text.len());
        *out_buffer.add(text.len()) = 0;
    };

    // Remember where the caller was, so the too-small fallback can hand the
    // same cursor back.
    let cursor_snapshot = request.from_id.clone();
    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
        Ok(polled) => polled,
        Err(_) => return NetError::PollFailed.into(),
    };

    let total_events = response.events.len();
    let mut parsed_events: Vec<serde_json::Value> = Vec::with_capacity(total_events);
    let mut parse_errors: usize = 0;
    for event in &response.events {
        if let Ok(value) = event.parse() {
            parsed_events.push(value);
            continue;
        }
        parse_errors += 1;
        if let Ok(raw) = event.raw_str() {
            parsed_events.push(serde_json::Value::String(raw.to_string()));
        }
    }

    let payload = serde_json::json!({
        "events": parsed_events,
        "next_id": response.next_id,
        "has_more": response.has_more,
        "count": parsed_events.len(),
        "parse_errors": parse_errors,
    });
    let Ok(response_json) = serde_json::to_string(&payload) else {
        return NetError::Unknown.into();
    };

    if response_json.len() + 1 > buffer_len {
        let fallback = serde_json::to_string(&serde_json::json!({
            "events": [],
            "next_id": cursor_snapshot,
            "has_more": true,
            "count": 0,
            "parse_errors": 0,
            "buffer_too_small": true,
            "events_dropped": total_events,
        }))
        .unwrap_or_else(|_| String::from(
            r#"{"events":[],"next_id":null,"has_more":true,"count":0,"parse_errors":0,"buffer_too_small":true}"#
        ));
        // A long cursor can make even the fallback too large; then the
        // buffer is left untouched.
        if fallback.len() < buffer_len {
            write_c_string(&fallback);
        }
        return NetError::BufferTooSmall.into();
    }

    write_c_string(&response_json);
    c_int::try_from(response_json.len()).unwrap_or_else(|_| NetError::IntOverflow.into())
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_stats(
handle: *mut NetHandle,
out_buffer: *mut c_char,
buffer_len: usize,
) -> c_int {
if !handle_is_valid(handle) || out_buffer.is_null() {
return NetError::NullPointer.into();
}
let handle = unsafe { &*handle };
let _guard = match enter_ffi_op(handle) {
Ok(g) => g,
Err(err) => return err,
};
let stats = handle.bus.stats();
let shard_stats = handle.bus.shard_stats();
let stats_json = match serde_json::to_string(&serde_json::json!({
"events_ingested": stats.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
"events_dropped": stats.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
"batches_dispatched": stats.batches_dispatched.load(std::sync::atomic::Ordering::Relaxed),
"shard_events_ingested": shard_stats.events_ingested,
"shard_events_dropped": shard_stats.events_dropped,
"shard_batches_dispatched": shard_stats.batches_dispatched,
})) {
Ok(s) => s,
Err(_) => return NetError::Unknown.into(),
};
if stats_json.len() + 1 > buffer_len {
return NetError::BufferTooSmall.into();
}
unsafe {
ptr::copy_nonoverlapping(
stats_json.as_ptr() as *const c_char,
out_buffer,
stats_json.len(),
);
*out_buffer.add(stats_json.len()) = 0;
}
match c_int::try_from(stats_json.len()) {
Ok(n) => n,
Err(_) => NetError::IntOverflow.into(),
}
}
/// Blocks until the bus's `flush()` future completes.
///
/// Returns `Success`, `NullPointer`, `ShuttingDown`, or `Unknown` when the
/// flush itself reports an error.
///
/// # Safety
///
/// `handle` must come from `net_init`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_flush(handle: *mut NetHandle) -> c_int {
    if !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    let flushed = handle.runtime.block_on(handle.bus.flush());
    if flushed.is_ok() {
        NetError::Success.into()
    } else {
        NetError::Unknown.into()
    }
}
/// Shuts the bus down and moves the bus and runtime out of the handle.
///
/// Steps, in order:
/// 1. Set `shutting_down` so `FfiOpGuard::try_enter` refuses new operations.
/// 2. Wait up to `FFI_SHUTDOWN_DEADLINE` for `active_ops` to reach zero;
///    return `Unknown` if it does not (the flag stays set).
/// 3. Claim `bus_taken`. If another call already claimed it, wait up to
///    `FFI_SHUTDOWN_DEADLINE` for that call to set `shutdown_completed` and
///    return `Success` (or `Unknown` on timeout).
/// 4. Otherwise take `bus` and `runtime` out of their `ManuallyDrop` slots,
///    run `bus.shutdown()` on the runtime, and publish `shutdown_completed`.
///
/// The `NetHandle` allocation itself is not freed in this function.
///
/// # Safety
///
/// `handle` must come from `net_init`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_shutdown(handle: *mut NetHandle) -> c_int {
if !handle_is_valid(handle) {
return NetError::NullPointer.into();
}
// The shared reference is confined to this block so it is no longer live
// when the fields are moved out through the raw pointer below.
let drained_and_taken = {
let handle_ref = unsafe { &*handle };
// From here on, new operations are refused by `FfiOpGuard::try_enter`.
handle_ref
.shutting_down
.store(true, std::sync::atomic::Ordering::SeqCst);
let deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
let mut drained = false;
// Poll until every in-flight operation has dropped its guard, or give up
// at the deadline.
loop {
if handle_ref
.active_ops
.load(std::sync::atomic::Ordering::SeqCst)
== 0
{
drained = true;
break;
}
if std::time::Instant::now() >= deadline {
break;
}
std::thread::yield_now();
std::thread::sleep(std::time::Duration::from_millis(1));
}
if !drained {
// Operations are still running; taking the bus now would pull it out
// from under them.
return NetError::Unknown.into();
}
// `swap` returns the previous value: `true` means another shutdown call
// already owns the bus/runtime, so this call only waits for it.
if handle_ref
.bus_taken
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
let inner_deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
while !handle_ref
.shutdown_completed
.load(std::sync::atomic::Ordering::Acquire)
{
if std::time::Instant::now() >= inner_deadline {
return NetError::Unknown.into();
}
std::thread::yield_now();
std::thread::sleep(std::time::Duration::from_millis(1));
}
// Reported as success regardless of the result the owning call got.
return NetError::Success.into();
}
drained
};
// The value is always `true` here (every other path returned above).
let _ = drained_and_taken;
// This call won the `bus_taken` swap, so it is the only one that moves the
// fields out.
let bus = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).bus) };
let runtime = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).runtime) };
let result = runtime.block_on(bus.shutdown());
// Release pairs with the Acquire load in the waiting branch above.
unsafe { &*handle }
.shutdown_completed
.store(true, std::sync::atomic::Ordering::Release);
match result {
Ok(()) => NetError::Success.into(),
Err(_) => NetError::Unknown.into(),
}
}
/// Returns the bus's shard count, or `0` when the handle is invalid or
/// shutting down.
///
/// # Safety
///
/// `handle` must come from `net_init`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_num_shards(handle: *mut NetHandle) -> u16 {
    if !handle_is_valid(handle) {
        return 0;
    }
    let handle = unsafe { &*handle };
    let Ok(_guard) = enter_ffi_op(handle) else {
        return 0;
    };
    handle.bus.num_shards()
}
/// Returns the library version as a static NUL-terminated string.
///
/// The pointer refers to static data and must not be freed or written to.
///
/// # Safety
///
/// Always safe to call; the function is `unsafe` only as part of the FFI surface.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_version() -> *const c_char {
    // Built through `CStr` so a missing or misplaced NUL fails the build.
    static VERSION: &CStr = match CStr::from_bytes_with_nul(b"0.8.0\0") {
        Ok(version) => version,
        Err(_) => panic!("version literal must end in exactly one NUL byte"),
    };
    VERSION.as_ptr()
}
/// Generates a fresh static keypair and returns it as a JSON C string of the
/// form `{"public_key": "<hex>", "secret_key": "<hex>"}`.
///
/// The returned string is heap-allocated by Rust and must be released with
/// `net_free_string`. Returns null if the string cannot be built.
///
/// # Safety
///
/// Always safe to call; the function is `unsafe` only as part of the FFI surface.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    let keypair = StaticKeypair::generate();
    let public_hex = hex::encode(keypair.public_key());
    let secret_hex = hex::encode(keypair.secret_key());
    let json = serde_json::json!({
        "public_key": public_hex,
        "secret_key": secret_hex,
    });
    CString::new(json.to_string()).map_or(ptr::null_mut(), CString::into_raw)
}
/// Releases a string previously returned by this library (for example by
/// `net_generate_keypair`). Null is accepted and ignored.
///
/// # Safety
///
/// `s` must be null or a pointer obtained from `CString::into_raw` inside
/// this library, and must not be used after this call.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if s.is_null() {
        return;
    }
    // Re-owning the allocation and letting it fall out of scope frees it.
    let reclaimed = unsafe { CString::from_raw(s) };
    drop(reclaimed);
}
/// Stub used when the `net` feature is disabled: no keypair can be generated,
/// so the result is always null.
///
/// # Safety
///
/// Always safe to call; the function is `unsafe` only as part of the FFI surface.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    ptr::null_mut::<c_char>()
}
/// Releases a string previously allocated by this library through
/// `CString::into_raw`. Null is accepted and ignored.
///
/// This variant is compiled when the `net` feature is disabled, so the symbol
/// exists in every build.
///
/// # Safety
///
/// `s` must be null or a pointer obtained from `CString::into_raw` inside
/// this library, and must not be used after this call.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if s.is_null() {
        return;
    }
    // Re-owning the allocation and letting it fall out of scope frees it.
    let reclaimed = unsafe { std::ffi::CString::from_raw(s) };
    drop(reclaimed);
}
/// Receipt filled in by `net_ingest_raw_ex` after a successful ingest.
///
/// The layout is part of the C ABI; the compile-time assertions below pin its
/// size and alignment on 64-bit targets.
#[repr(C)]
pub struct NetReceipt {
/// Shard the event was routed to, as reported by the bus.
pub shard_id: u16,
/// Timestamp reported by the bus for the ingest. NOTE(review): unit is not visible here — confirm.
pub timestamp: u64,
}
// Layout guard: 2 bytes of `shard_id`, padding, then an 8-byte `timestamp`.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
std::mem::size_of::<NetReceipt>() == 16,
"NetReceipt size changed on 64-bit; bindings hard-code 16. \
If the change is intentional, bump the binding versions and \
update this assertion."
);
// Alignment guard for the same struct.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
std::mem::align_of::<NetReceipt>() == 8,
"NetReceipt alignment changed on 64-bit; bindings expect 8."
);
/// One event as returned inside a [`NetPollResult`].
///
/// `id` and `raw` point to separately heap-allocated byte buffers produced by
/// `net_poll_ex`; they carry explicit lengths and are not NUL-terminated.
/// They are released by `net_free_poll_result`.
#[repr(C)]
pub struct NetEvent {
/// Pointer to the event id bytes.
pub id: *const c_char,
/// Length of `id` in bytes.
pub id_len: usize,
/// Pointer to the raw event payload bytes.
pub raw: *const c_char,
/// Length of `raw` in bytes.
pub raw_len: usize,
/// Insertion timestamp copied from the stored event.
pub insertion_ts: u64,
/// Shard the event was stored on.
pub shard_id: u16,
}
// Layout guard: bindings hard-code this size on 64-bit targets.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
std::mem::size_of::<NetEvent>() == 48,
"NetEvent size changed on 64-bit; bindings hard-code 48. \
If the change is intentional, bump the binding versions and \
update this assertion."
);
// Alignment guard for the same struct.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
std::mem::align_of::<NetEvent>() == 8,
"NetEvent alignment changed on 64-bit; bindings expect 8."
);
/// Output of `net_poll_ex`. Must be released with `net_free_poll_result`,
/// which frees the owned memory and resets every field.
#[repr(C)]
pub struct NetPollResult {
/// Array of `count` events; null when `count` is zero.
pub events: *mut NetEvent,
/// Number of entries in `events`.
pub count: usize,
/// NUL-terminated cursor for the next poll, or null when the bus returned none.
pub next_id: *mut c_char,
/// `1` when the bus reported more events are available, `0` otherwise.
pub has_more: c_int,
}
/// Bus-wide counters filled in by `net_stats_ex`.
#[repr(C)]
pub struct NetStats {
/// Value of the bus's `events_ingested` counter.
pub events_ingested: u64,
/// Value of the bus's `events_dropped` counter.
pub events_dropped: u64,
/// Value of the bus's `batches_dispatched` counter.
pub batches_dispatched: u64,
}
/// Like `net_ingest_raw`, but additionally reports where the event landed.
///
/// On success, and when `out` is non-null, `out.shard_id` and `out.timestamp`
/// are filled from the values returned by the bus. Returns `Success`, or one
/// of `NullPointer`, `ShuttingDown`, `InvalidJson` (for `len > isize::MAX`),
/// `InvalidUtf8`, `IngestionFailed`.
///
/// # Safety
///
/// `handle` must come from `net_init`, `json` must be readable for `len`
/// bytes, and `out` must be null or writable as a `NetReceipt`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw_ex(
    handle: *mut NetHandle,
    json: *const c_char,
    len: usize,
    out: *mut NetReceipt,
) -> c_int {
    if json.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    // `slice::from_raw_parts` requires the length to fit in `isize`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let payload = unsafe { std::slice::from_raw_parts(json.cast::<u8>(), len) };
    let Ok(text) = std::str::from_utf8(payload) else {
        return NetError::InvalidUtf8.into();
    };

    let Ok((shard_id, timestamp)) = handle.bus.ingest_raw(RawEvent::from_str(text)) else {
        return NetError::IngestionFailed.into();
    };

    // The receipt is optional; field-wise writes through the raw pointer.
    if !out.is_null() {
        unsafe {
            (*out).shard_id = shard_id;
            (*out).timestamp = timestamp;
        }
    }
    NetError::Success.into()
}
/// Polls up to `limit` events and returns them as C structs in `*out`.
///
/// `cursor` may be null or an empty string to poll without a cursor;
/// otherwise it must be a NUL-terminated UTF-8 cursor (normally the `next_id`
/// of a previous result). A cursor that is not valid UTF-8 is rejected with
/// `InvalidUtf8` rather than being silently ignored, because ignoring it
/// would restart the poll without a cursor and hand the caller events it has
/// already seen.
///
/// On `Success` every field of `*out` is written and the caller owns the
/// result until it passes it to `net_free_poll_result`. On any error `*out`
/// is left untouched and nothing needs to be freed.
///
/// Errors: `NullPointer`, `IntOverflow` (the event array for `limit` entries
/// would exceed `isize::MAX` bytes), `ShuttingDown`, `InvalidUtf8`,
/// `PollFailed`, `InteriorNul` (the next cursor contains a NUL byte),
/// `Unknown` (allocation failure or a panic while copying events).
///
/// # Safety
///
/// `handle` must come from `net_init`, `cursor` must be null or a valid
/// NUL-terminated string, and `out` must be writable as a `NetPollResult`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_poll_ex(
    handle: *mut NetHandle,
    limit: usize,
    cursor: *const c_char,
    out: *mut NetPollResult,
) -> c_int {
    if !handle_is_valid(handle) || out.is_null() {
        return NetError::NullPointer.into();
    }
    // Refuse limits whose event array could never be allocated.
    if limit > 0
        && std::mem::size_of::<NetEvent>()
            .checked_mul(limit)
            .is_none_or(|bytes| bytes > isize::MAX as usize)
    {
        return NetError::IntOverflow.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    let mut request = ConsumeRequest::new(limit);
    if !cursor.is_null() {
        match unsafe { CStr::from_ptr(cursor) }.to_str() {
            // An empty cursor means "no cursor".
            Ok("") => {}
            Ok(s) => request = request.from(s),
            Err(_) => return NetError::InvalidUtf8.into(),
        }
    }

    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
        Ok(r) => r,
        Err(_) => return NetError::PollFailed.into(),
    };

    let count = response.events.len();
    let events_ptr = if count > 0 {
        let layout = match std::alloc::Layout::array::<NetEvent>(count) {
            Ok(l) => l,
            Err(_) => return NetError::Unknown.into(),
        };
        let ptr = unsafe { std::alloc::alloc(layout) as *mut NetEvent };
        if ptr.is_null() {
            return NetError::Unknown.into();
        }

        // Number of slots fully written so far; on a panic only those are
        // walked when cleaning up.
        let completed = std::cell::Cell::new(0usize);
        let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            for (i, event) in response.events.iter().enumerate() {
                // Build both owned buffers first: everything that can panic
                // happens while they are still `Box`es and would be dropped
                // normally during unwinding.
                let id_bytes = event.id.as_bytes().to_vec().into_boxed_slice();
                let raw_bytes = event.raw.to_vec().into_boxed_slice();
                let id_len = id_bytes.len();
                let raw_len = raw_bytes.len();

                // From here to the slot write nothing can panic, so the raw
                // pointers cannot be lost.
                let id_ptr = Box::into_raw(id_bytes) as *const c_char;
                let raw_ptr = Box::into_raw(raw_bytes) as *const c_char;
                unsafe {
                    ptr.add(i).write(NetEvent {
                        id: id_ptr,
                        id_len,
                        raw: raw_ptr,
                        raw_len,
                        insertion_ts: event.insertion_ts,
                        shard_id: event.shard_id,
                    });
                }
                completed.set(i + 1);
            }
        }));
        if build_result.is_err() {
            free_events_array_partial(ptr, completed.get(), count);
            return NetError::Unknown.into();
        }
        ptr
    } else {
        ptr::null_mut()
    };

    let next_id_ptr = match response.next_id {
        Some(ref s) => match std::ffi::CString::new(s.as_str()) {
            Ok(c) => c.into_raw(),
            Err(_) => {
                // Nothing has been published to the caller yet, so the event
                // array is released here.
                free_events_array(events_ptr, count);
                return NetError::InteriorNul.into();
            }
        },
        None => ptr::null_mut(),
    };

    unsafe {
        (*out).events = events_ptr;
        (*out).count = count;
        (*out).next_id = next_id_ptr;
        (*out).has_more = if response.has_more { 1 } else { 0 };
    }
    NetError::Success.into()
}
/// Frees a fully populated event array: all `count` entries own their
/// buffers, and the array itself was allocated for `count` entries.
fn free_events_array(events: *mut NetEvent, count: usize) {
    let (walk_count, alloc_count) = (count, count);
    free_events_array_partial(events, walk_count, alloc_count);
}
fn free_events_array_partial(events: *mut NetEvent, walk_count: usize, alloc_count: usize) {
if events.is_null() || alloc_count == 0 {
return;
}
for i in 0..walk_count {
let event = unsafe { &*events.add(i) };
if !event.id.is_null() {
unsafe {
let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
event.id as *mut u8,
event.id_len,
));
}
}
if !event.raw.is_null() {
unsafe {
let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
event.raw as *mut u8,
event.raw_len,
));
}
}
}
if let Ok(layout) = std::alloc::Layout::array::<NetEvent>(alloc_count) {
unsafe {
std::alloc::dealloc(events as *mut u8, layout);
}
}
}
/// Releases everything owned by a `NetPollResult` filled in by `net_poll_ex`
/// and resets its fields, so calling it again on the same struct is a no-op.
/// A null `result` is ignored.
///
/// # Safety
///
/// `result` must be null or point to a `NetPollResult` whose fields are
/// either as written by `net_poll_ex` or already reset by this function.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_poll_result(result: *mut NetPollResult) {
    if result.is_null() {
        return;
    }
    let poll = unsafe { &mut *result };

    free_events_array(poll.events, poll.count);
    if !poll.next_id.is_null() {
        let cursor = unsafe { std::ffi::CString::from_raw(poll.next_id) };
        drop(cursor);
    }

    // Leave no dangling pointers behind in the caller's struct.
    poll.events = std::ptr::null_mut();
    poll.count = 0;
    poll.next_id = std::ptr::null_mut();
    poll.has_more = 0;
}
/// Copies the bus-wide counters into `*out`.
///
/// Returns `Success`, `NullPointer`, or `ShuttingDown`.
///
/// # Safety
///
/// `handle` must come from `net_init`, and `out` must be writable as a `NetStats`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_stats_ex(handle: *mut NetHandle, out: *mut NetStats) -> c_int {
    use std::sync::atomic::Ordering::Relaxed;

    if out.is_null() || !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(guard) => guard,
        Err(code) => return code,
    };

    let counters = handle.bus.stats();
    let ingested = counters.events_ingested.load(Relaxed);
    let dropped = counters.events_dropped.load(Relaxed);
    let dispatched = counters.batches_dispatched.load(Relaxed);
    unsafe {
        (*out).events_ingested = ingested;
        (*out).events_dropped = dropped;
        (*out).batches_dispatched = dispatched;
    }
    NetError::Success.into()
}
// Unit tests for the pure helpers of the FFI layer (config and poll-request
// parsing, handle validation) plus consistency checks against the C headers
// and example shipped with the crate.
#[cfg(test)]
mod tests {
use super::*;
// A minimal config with one recognised key parses.
#[test]
fn test_parse_config_valid() {
let config = parse_config_json(r#"{"num_shards": 8}"#);
assert!(config.is_some());
}
// `num_shards` values that do not fit in `u16` reject the whole config.
#[test]
fn test_parse_config_num_shards_overflow() {
let config = parse_config_json(r#"{"num_shards": 65536}"#);
assert!(
config.is_none(),
"num_shards exceeding u16::MAX should fail"
);
let config = parse_config_json(r#"{"num_shards": 100000}"#);
assert!(
config.is_none(),
"num_shards exceeding u16::MAX should fail"
);
}
// The largest `u16` value is still accepted.
#[test]
fn test_parse_config_num_shards_max_valid() {
let config = parse_config_json(r#"{"num_shards": 65535}"#);
assert!(config.is_some(), "num_shards at u16::MAX should be valid");
}
// Text that is not valid JSON rejects.
#[test]
fn test_parse_config_invalid_json() {
let config = parse_config_json(r#"{"num_shards": invalid}"#);
assert!(config.is_none());
}
// An empty object falls back to builder defaults.
#[test]
fn test_parse_config_empty() {
let config = parse_config_json(r#"{}"#);
assert!(config.is_some(), "empty config should use defaults");
}
// Every documented spelling of a backpressure mode parses; misspellings and
// non-string, non-object values reject.
#[test]
fn parse_config_rejects_unknown_backpressure_mode() {
for s in [
"DropNewest",
"drop_newest",
"DropOldest",
"drop_oldest",
"FailProducer",
"fail_producer",
] {
let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
assert!(cfg.is_some(), "known mode `{}` must parse", s);
}
for s in ["DropOldset", "FailProduce", "drop_oldst", "garbage", ""] {
let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
assert!(
cfg.is_none(),
"unknown mode `{}` must reject (pre-fix this silently \
fell through to DropNewest)",
s,
);
}
let cfg = parse_config_json(r#"{"backpressure_mode": 42}"#);
assert!(
cfg.is_none(),
"non-string non-object backpressure_mode must reject"
);
let cfg = parse_config_json(r#"{"backpressure_mode": true}"#);
assert!(cfg.is_none(), "boolean backpressure_mode must reject");
}
// The object form `{"Sample": {"rate": N}}` requires a present, non-zero rate.
#[test]
fn parse_config_supports_sample_mode_with_validation() {
let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 10}}}"#);
assert!(cfg.is_some(), "Sample with non-zero rate must parse");
let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 0}}}"#);
assert!(cfg.is_none(), "Sample with rate=0 must reject");
let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {}}}"#);
assert!(cfg.is_none(), "Sample missing rate must reject");
}
// Both `limit` and `cursor` are carried into the request.
#[test]
fn test_parse_poll_request_preserves_cursor() {
let req = parse_poll_request_json(r#"{"limit": 50, "cursor": "abc:123"}"#).unwrap();
assert_eq!(req.limit, 50);
assert_eq!(req.from_id.as_deref(), Some("abc:123"));
}
// A missing cursor leaves `from_id` unset.
#[test]
fn test_parse_poll_request_no_cursor_defaults_to_none() {
let req = parse_poll_request_json(r#"{"limit": 10}"#).unwrap();
assert_eq!(req.limit, 10);
assert_eq!(req.from_id, None);
}
// A missing limit defaults to 100.
#[test]
fn test_parse_poll_request_empty_uses_default_limit() {
let req = parse_poll_request_json(r#"{}"#).unwrap();
assert_eq!(req.limit, 100);
assert_eq!(req.from_id, None);
}
// A string or negative `limit` is an error, not a silent default.
#[test]
fn test_parse_poll_request_wrong_type_limit_errors() {
let err = parse_poll_request_json(r#"{"limit": "50"}"#).unwrap_err();
assert_eq!(err, c_int::from(NetError::InvalidJson));
let err = parse_poll_request_json(r#"{"limit": -1}"#).unwrap_err();
assert_eq!(err, c_int::from(NetError::InvalidJson));
}
// A non-string `cursor` is an error.
#[test]
fn test_parse_poll_request_wrong_type_cursor_errors() {
let err = parse_poll_request_json(r#"{"cursor": 123}"#).unwrap_err();
assert_eq!(err, c_int::from(NetError::InvalidJson));
}
// Explicit `null` behaves like an absent key.
#[test]
fn test_parse_poll_request_null_fields_use_defaults() {
let req = parse_poll_request_json(r#"{"limit": null, "cursor": null}"#).unwrap();
assert_eq!(req.limit, 100);
assert_eq!(req.from_id, None);
}
// The largest representable limit round-trips unchanged.
#[test]
fn test_parse_poll_request_limit_at_usize_max() {
let json = format!(r#"{{"limit": {}}}"#, usize::MAX);
let req = parse_poll_request_json(&json).unwrap();
assert_eq!(req.limit, usize::MAX);
}
// On 32-bit targets a limit above `usize::MAX` must be rejected.
#[cfg(target_pointer_width = "32")]
#[test]
fn test_parse_poll_request_limit_overflows_usize_on_32bit() {
let err = parse_poll_request_json(r#"{"limit": 8589934592}"#).unwrap_err();
assert_eq!(err, c_int::from(NetError::InvalidJson));
}
// Every `NetError` discriminant must appear as an assigned value in both C
// headers. The list below is maintained by hand and must be extended when a
// variant is added.
#[test]
fn cr22_c_header_parity_with_rust_neterror() {
let primary = include_str!("../../include/net.h");
let go_copy = include_str!("../../include/net.go.h");
let rust_values: &[i32] = &[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -99];
// Collects every integer literal that directly follows an `=` (after
// optional spaces/tabs and an optional sign) anywhere in `src`.
fn extract_assigned_values(src: &str) -> Vec<i32> {
let mut out = Vec::new();
let mut chars = src.chars().peekable();
while let Some(c) = chars.next() {
if c != '=' {
continue;
}
// Skip horizontal whitespace between `=` and the value.
while let Some(&peek) = chars.peek() {
if peek == ' ' || peek == '\t' {
chars.next();
} else {
break;
}
}
let mut buf = String::new();
// Optional leading sign.
if let Some(&peek) = chars.peek() {
if peek == '-' || peek == '+' {
buf.push(peek);
chars.next();
}
}
let mut had_digit = false;
while let Some(&peek) = chars.peek() {
if peek.is_ascii_digit() {
buf.push(peek);
chars.next();
had_digit = true;
} else {
break;
}
}
// Only record it when at least one digit followed the `=`.
if had_digit {
if let Ok(v) = buf.parse::<i32>() {
out.push(v);
}
}
}
out
}
let primary_vals = extract_assigned_values(primary);
let go_vals = extract_assigned_values(go_copy);
for &v in rust_values {
assert!(
primary_vals.contains(&v),
"CR-22 regression: include/net.h is missing the value {} \
(Rust NetError defines it). Add the matching `NET_ERR_*` \
enumerator before merging.",
v
);
// NOTE(review): the message below names bindings/go/net/net.h while the
// file checked is include/net.go.h — confirm they are the same header.
assert!(
go_vals.contains(&v),
"CR-22 regression: bindings/go/net/net.h is missing the value {} \
(Rust NetError defines it).",
v
);
}
}
// The C example must include `net.go.h` and must not also include `net.h`.
#[test]
fn cr5_example_does_not_double_include_net_headers() {
let example = include_str!("../../examples/capability.c");
let net_h_included = example.contains("#include \"../include/net.h\"");
let net_go_h_included = example.contains("#include \"../include/net.go.h\"");
assert!(
net_go_h_included,
"examples/capability.c must include net.go.h to declare \
net_validate_capabilities + net_predicate_* symbols"
);
assert!(
!net_h_included,
"examples/capability.c must NOT also include net.h: \
both headers share the NET_SDK_H guard, so the second \
include is silently skipped, leaving the example's \
net_predicate_* calls implicitly declared. Drop the \
redundant include — net.go.h is a superset."
);
}
// `handle_is_valid` rejects null and misaligned pointers and accepts an
// aligned non-null one. The pointers are never dereferenced.
#[test]
fn handle_is_valid_rejects_null_and_misaligned() {
assert!(
!handle_is_valid(std::ptr::null::<NetHandle>()),
"null pointer must not be considered a valid handle"
);
let align = std::mem::align_of::<NetHandle>();
// Twice the alignment guarantees an aligned address inside the buffer.
let buf = vec![0u8; align * 2];
let base = buf.as_ptr() as usize;
// Round `base` up to the next multiple of `align`.
let aligned = (base + align - 1) & !(align - 1);
let aligned_ptr = aligned as *const NetHandle;
assert!(
handle_is_valid(aligned_ptr),
"aligned non-null pointer must validate (align={align}, ptr={aligned_ptr:p})"
);
if align > 1 {
let misaligned_ptr = (aligned + 1) as *const NetHandle;
assert!(
!handle_is_valid(misaligned_ptr),
"misaligned pointer must be rejected (align={align}, ptr={misaligned_ptr:p})"
);
}
}
// With the `net` adapter, zero heartbeat or session-timeout values reject
// while the same config with non-zero values parses.
#[cfg(feature = "net")]
#[test]
fn parse_config_rejects_zero_heartbeat_and_session_timeout() {
let psk = "0".repeat(64);
let peer_pk = "1".repeat(64);
let baseline = format!(
r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
"psk":"{psk}","peer_public_key":"{peer_pk}",
"heartbeat_interval_ms":1000,"session_timeout_ms":30000}}}}"#
);
assert!(
parse_config_json(&baseline).is_some(),
"baseline net config with non-zero heartbeat/session_timeout must parse"
);
let zero_hb = format!(
r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
"psk":"{psk}","peer_public_key":"{peer_pk}",
"heartbeat_interval_ms":0,"session_timeout_ms":30000}}}}"#
);
assert!(
parse_config_json(&zero_hb).is_none(),
"heartbeat_interval_ms=0 must reject (pre-fix this produced a CPU-pegging busy loop)"
);
let zero_to = format!(
r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
"psk":"{psk}","peer_public_key":"{peer_pk}",
"heartbeat_interval_ms":1000,"session_timeout_ms":0}}}}"#
);
assert!(
parse_config_json(&zero_to).is_none(),
"session_timeout_ms=0 must reject"
);
}
}