net/ffi/mod.rs
1//! C FFI bindings for cross-language integration.
2//!
3//! This module provides a C-compatible API for using Net from
4//! other languages (Python, Node.js, Go, etc.).
5//!
6//! # Safety
7//!
8//! All public FFI functions in this module accept raw pointers from C code.
9//! Each is declared `pub unsafe extern "C" fn` so the unsafety is
10//! explicit at the type level; the module-wide contract callers
11//! must uphold is:
12//! - Pointers are valid and properly aligned
13//! - Opaque handle pointers (`*mut T`) were produced by this crate's
14//! matching constructor (`Box::into_raw` inside the FFI surface).
15//! Foreign-allocated pointers, even if valid and aligned, will UB
16//! when consumed by `Box::from_raw` in the corresponding `_free`.
17//! - String pointers point to valid UTF-8 data
18//! - Buffer sizes are accurate
19//! - Handles are not used after `net_shutdown`
20//!
21//! The per-function `# Safety` rustdoc is intentionally suppressed
22//! at the module level — every entry point shares the same contract
23//! and the module doc-comment above (plus `include/README.md`) is
24//! the source of truth. Adding individual `# Safety` blocks would
25//! duplicate the same wording 200 times without adding signal.
26//!
27//! # Thread Safety
28//!
29//! All FFI functions are thread-safe. The event bus handle can be shared
30//! across threads.
31//!
32#![allow(clippy::missing_safety_doc)]
33// The cross-cutting C-side safety contract for every `unsafe` block in
34// this module is documented in the `# Safety` section above:
35// caller-validated pointer / length / lifetime / handle-not-after-shutdown
36// invariants documented in `include/net.h`. Inlining `// SAFETY:` on each
37// block would add ~200 identical "see module preamble" comments without
38// adding any signal beyond what the preamble already says.
39#![expect(
40 clippy::undocumented_unsafe_blocks,
41 reason = "module-wide FFI safety contract documented in the # Safety preamble above"
42)]
43#![expect(
44 clippy::multiple_unsafe_ops_per_block,
45 reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract; splitting per-op would obscure the single boundary-cross"
46)]
47
48//! # Tokio runtime restriction
49//!
50//! Internal FFI ops (`net_poll`, `net_flush`, `net_shutdown`,
51//! `net_redex_*`, `net_mesh_new`, the cortex FFI, the mesh FFI)
52//! drive the bus's tokio runtime via `Runtime::block_on`. That
53//! function panics with "Cannot start a runtime from within a
54//! runtime" if the calling thread is already inside a tokio
55//! runtime context. The functions are `extern "C"`, so a panic
56//! unwinds across the FFI boundary into C / Go-cgo / Python /
57//! NAPI — undefined behavior.
58//!
59//! **The common-case C / Go / Python caller has no Rust tokio
60//! runtime, so this is unreachable for them.** The narrow path is:
61//!
62//! - A **Rust** caller loads the cdylib and calls these
63//! functions from inside its own `#[tokio::main]` (or any
64//! thread that has called `Runtime::enter()`).
65//! - A non-Rust caller embeds a Rust library that runs its own
66//! tokio runtime and forwards calls into this cdylib on the
67//! same thread.
68//!
69//! Both forms are unusual but reachable. **Do not call any FFI
70//! op from a thread that already holds a tokio runtime
71//! context.** If you must, spawn the FFI call on a fresh OS
72//! thread that doesn't carry a runtime guard, or wrap the call
73//! with `tokio::task::spawn_blocking(|| net_xxx(...))` to escape
74//! the worker pool.
75//!
76//! `net_init` (`mod.rs:284-316`) hardens against this for runtime
77//! *construction*; the steady-state ops do not, since the cost
78//! of a `Handle::try_current()` check on every poll would be
79//! measurable for the common path that doesn't hit the bug.
80//!
81//! # `catch_unwind` + caller-held locks
82//!
83//! Several FFI entries (`net_blob_publish`, `net_blob_resolve`,
84//! `net_*_wait_for_token`) wrap their body in
85//! `std::panic::catch_unwind(AssertUnwindSafe(...))` so a panic
86//! during the call returns a typed `NET_ERR_BLOB_PANIC` /
87//! `NET_ERR_PANIC` code rather than unwinding across the FFI
88//! boundary. That stops the substrate-side undefined behavior,
89//! but it does NOT make the wrapped code transparently panic-safe
90//! from the caller's perspective.
91//!
92//! **If the caller invokes an FFI op while holding an OS-level
93//! lock, a `sync.Mutex` (Go), `threading.Lock` (Python), or any
94//! other mutex with poisoning semantics, and the FFI body panics,
95//! the mutex is left in a poisoned state.** Subsequent acquires
96//! on the same mutex by the caller observe the poisoning and
97//! either error (Rust `parking_lot` with `poison_on_unwind`) or
98//! deadlock (Go's `sync.Mutex` doesn't poison; the caller has
99//! observed a return value that may not reflect the state of
100//! the FFI op).
101//!
102//! Recommended caller pattern: **do not hold a caller-side lock
103//! across an FFI call**. Acquire the lock, prepare the inputs,
104//! release the lock, then call the FFI. Re-acquire if you need
105//! to update caller state with the result.
106//!
107//! The hazard is documented per-binding in:
108//! - Python: `bindings/python/README.md` (caller-mutex notes)
109//! - Node: `bindings/node/README.md`
110//! - Go: `bindings/go/net/redex.go` lifecycle docs
111//! - C: `include/net.h` (every wait-family declaration)
112//!
113//! # Memory Management
114//!
115//! - Handles returned by `net_init` must be freed with `net_shutdown`
116//! - String buffers passed to `net_poll` are owned by the caller
117//! - Error codes are returned as integers (0 = success, negative = error)
118//!
119//! # Example (C)
120//!
121//! ```c
122//! #include "net.h"
123//!
124//! int main() {
125//! // Initialize with default config
126//! void* bus = net_init("{\"num_shards\": 4}");
127//! if (!bus) return 1;
128//!
129//! // Ingest an event
130//! int result = net_ingest(bus, "{\"token\": \"hello\"}", 19);
131//! if (result < 0) { /* handle error */ }
132//!
133//! // Poll events
134//! char buffer[65536];
135//! result = net_poll(bus, "{\"limit\": 100}", buffer, sizeof(buffer));
136//!
137//! // Shutdown
138//! net_shutdown(bus);
139//! return 0;
140//! }
141//! ```
142
143// FFI functions accept raw pointers but are not marked `unsafe` to maintain
144// C ABI compatibility. Safety is documented in the module-level docs.
145#![allow(clippy::not_unsafe_ptr_arg_deref)]
146
147use std::ffi::CStr;
148use std::os::raw::{c_char, c_int};
149use std::ptr;
150
151use tokio::runtime::Runtime;
152
153use crate::bus::EventBus;
154use crate::config::EventBusConfig;
155use crate::consumer::ConsumeRequest;
156use crate::event::{Event, RawEvent};
157
158/// C FFI for CortEX / NetDb / RedexFile. Requires `netdb` (for the
159/// unified facade) and `redex-disk` (for persistent storage paths on
160/// `Redex` / `RedexFile`). Go / cgo consumers target this surface.
161///
162/// `missing_docs` is suppressed on this module: these are extern "C"
163/// shims over already-documented Rust adapters, and the per-function
164/// contract is documented in the binding-side READMEs (Go / TS / Py).
165/// Re-documenting each shim would duplicate with drift risk.
166/// Per-FFI-handle quiescing protocol shared by cortex / mesh
167/// handles to close the audit-#23/#24/#25 use-after-free hazards
168/// when a `_free` races a concurrent op. See module docs for the
169/// soundness story (intentional box leak) and the per-handle
170/// recipe.
171#[cfg(any(
172 all(feature = "netdb", feature = "redex-disk"),
173 feature = "net",
174 feature = "redis",
175))]
176pub mod handle_guard;
177
178#[cfg(all(feature = "netdb", feature = "redex-disk"))]
179#[allow(missing_docs)]
180pub mod cortex;
181
182/// C FFI for the Dataforts Phase 3 blob surface. Exposes the
183/// BlobRef wire codec, the global adapter registry, and the
184/// `publish_blob` / `resolve_payload` helpers for cgo / native
185/// consumers.
186#[cfg(feature = "dataforts")]
187#[allow(missing_docs)]
188pub mod blob;
189
190/// Stub definitions for the `net_mesh_blob_adapter_*` symbols
191/// when the `dataforts / netdb / redex-disk` feature triple is
192/// off. cgo / dlsym consumers link these symbols unconditionally
193/// (see `bindings/go/blob.go`), so a libnet built without the
194/// triple must still satisfy them — each stub returns
195/// `NET_ERR_FEATURE_NOT_BUILT` (or null) so Go programs route to
196/// a clean error rather than fail at program load. The module is
197/// empty when the feature triple is on (the real impls in
198/// `ffi::blob` cover the same symbol names).
199#[allow(missing_docs)]
200pub mod blob_stubs;
201
202/// C FFI for the encrypted-UDP mesh transport + channels. Requires
203/// the `net` feature (which brings in the crypto + transport). Go /
204/// cgo consumers target this surface alongside `ffi::cortex`. See
205/// the `ffi::cortex` note for why `missing_docs` is suppressed here.
206#[cfg(feature = "net")]
207#[allow(missing_docs)]
208pub mod mesh;
209
210/// C FFI for the `aggregator.registry` RPC client + channel
211/// visibility setter. Stage 5 of `SDK_AGGREGATOR_SUBNET_PLAN.md`.
212/// Rides the `net` feature alongside `ffi::mesh` because every
213/// op needs a `MeshNodeHandle`.
214#[cfg(feature = "net")]
215#[allow(missing_docs)]
216pub mod aggregator;
217
218/// C FFI for stateless predicate evaluation (Phase 9c of
219/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — no handles,
220/// no state. Mirrors the SDK-layer `evaluatePredicate` /
221/// `evaluate_predicate` surface every binding ships, exposed at
222/// the C ABI for raw consumers (C / C++ / Zig / Swift / etc.).
223#[cfg(feature = "net")]
224pub mod predicate;
225
226/// C FFI for stateless capability-set validation (Phase 9a of
227/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helper — `caps_json`
228/// in, `report_json` out. Mirrors the SDK-layer
229/// `validate_capabilities` surface, exposed at the C ABI for raw
230/// consumers.
231#[cfg(feature = "net")]
232pub mod schema;
233
234/// C FFI for predicate debug-session helpers (Phase 9d of
235/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — single-eval
236/// `evaluate_with_trace`, corpus-wide
237/// `aggregate_debug_report`, and host-side
238/// `redact_metadata_keys`. Mirror what every other binding
239/// ships at the SDK layer; exposed at the C ABI for raw
240/// consumers.
241#[cfg(feature = "net")]
242pub mod predicate_debug;
243
244/// C FFI for the Redis Streams consumer-side dedup helper. Mirrors
245/// the Rust `net::adapter::RedisStreamDedup` surface for Go / C / Zig
246/// consumers. See `ffi::redis_dedup` module docs for the wire
247/// shape and the dedup contract.
248#[cfg(feature = "redis")]
249pub mod redis_dedup;
250
251#[cfg(feature = "net")]
252use crate::adapter::net::{NetAdapterConfig, ReliabilityConfig, StaticKeypair};
253#[cfg(any(feature = "redis", feature = "jetstream", feature = "net"))]
254use crate::config::AdapterConfig;
255#[cfg(feature = "jetstream")]
256use crate::config::JetStreamAdapterConfig;
257#[cfg(feature = "redis")]
258use crate::config::RedisAdapterConfig;
259#[cfg(feature = "net")]
260use std::ffi::CString;
261
262/// Opaque handle to an event bus instance.
263///
264/// This wraps the EventBus along with a Tokio runtime for async operations.
265///
266/// # Lifetime / soundness
267///
268/// The handle storage is *intentionally leaked* on `net_shutdown` rather
269/// than freed via `Box::from_raw`. Reasoning: every FFI entry point
270/// dereferences the C-side `*mut NetHandle` to access the atomics that
271/// gate shutdown. The previous Dekker-style SeqCst handshake between
272/// `FfiOpGuard::try_enter` (which calls `fetch_add` on `active_ops`) and
273/// `net_shutdown` (which loads `active_ops` then `Box::from_raw`s the
274/// handle) was unsound: SeqCst orders the atomic operations only — the
275/// non-atomic `Box::from_raw` could deallocate the storage between
276/// shutdown's load and a concurrent FFI op's `fetch_add`, producing a
277/// use-after-free on the freed atomic. By never freeing the box, the
278/// atomic memory backing the handle is always valid; concurrent FFI ops
279/// observe `shutting_down=true` after shutdown signals it and bail
280/// before touching `bus`/`runtime`.
281///
282/// `bus` and `runtime` are stored in `ManuallyDrop` so that
283/// `net_shutdown` can `take` them out (via `ptr::read`) in order to
284/// call `bus.shutdown().await`. Because `shutting_down` is set first
285/// and shutdown waits for `active_ops` to drop to zero before reading
286/// these fields, no FFI op can be racing the read. If the wait times
287/// out, the `ptr::read` is skipped and both fields are leaked along
288/// with the box.
289pub struct NetHandle {
290 /// Owned `EventBus`. Read out via `ManuallyDrop::take` during
291 /// shutdown once `active_ops` has drained to zero. After that
292 /// point, `shutting_down` is `true` and no FFI op may access this
293 /// field.
294 bus: std::mem::ManuallyDrop<EventBus>,
295 /// Owned tokio runtime. Same lifetime contract as `bus`.
296 runtime: std::mem::ManuallyDrop<Runtime>,
297 /// Set to `true` once `net_shutdown` begins. All other FFI
298 /// functions check this flag and return `ShuttingDown` before
299 /// touching `bus` / `runtime`.
300 shutting_down: std::sync::atomic::AtomicBool,
301 /// Number of in-flight FFI operations (excluding shutdown itself).
302 /// `net_shutdown` spins until this drops to zero (with a deadline)
303 /// before reading `bus` / `runtime` to call shutdown.
304 active_ops: std::sync::atomic::AtomicU32,
305 /// Set to `true` after `net_shutdown` has consumed `bus` /
306 /// `runtime` via `ManuallyDrop::take`. A second `net_shutdown`
307 /// call observes this and returns `Success` without re-taking
308 /// (which would be UB). FFI ops also check this before touching
309 /// `bus` / `runtime`, defending against a contract-violating
310 /// caller that races a post-shutdown call.
311 bus_taken: std::sync::atomic::AtomicBool,
312 /// Set to `true` after `bus.shutdown()` returns from the
313 /// first `net_shutdown` call. A second/third concurrent
314 /// `net_shutdown` caller spins until this flips before
315 /// returning success — without this gate the second caller
316 /// observed `bus_taken == true` and returned `Success` while
317 /// the first caller was still mid-`block_on(bus.shutdown())`,
318 /// falsely signaling completion of an in-progress shutdown.
319 shutdown_completed: std::sync::atomic::AtomicBool,
320}
321
322/// Maximum time `net_shutdown` will wait for in-flight FFI operations
323/// to complete before giving up. If the deadline expires, the bus is
324/// leaked rather than read out — leaking is correct (the box is
325/// already leaked permanently for soundness reasons) but means the
326/// adapter's `flush()` / `shutdown()` won't run.
327const FFI_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);
328
329/// RAII guard that increments `active_ops` on creation and decrements on drop.
330struct FfiOpGuard<'a> {
331 handle: &'a NetHandle,
332}
333
334impl<'a> FfiOpGuard<'a> {
335 /// Try to enter an FFI operation. Returns `None` if the handle is
336 /// shutting down or if `bus` / `runtime` have already been taken.
337 ///
338 /// Soundness rests on the fact that the box backing `handle` is
339 /// never freed (see `NetHandle` doc). The `fetch_add` is therefore
340 /// always on valid memory regardless of whether shutdown is in
341 /// progress. The subsequent loads decide whether the op is allowed
342 /// to proceed; if shutdown was signaled or `bus_taken` flipped
343 /// before our increment was visible, we bail without touching
344 /// `bus` / `runtime`. The `bus_taken` check defends against a
345 /// contract-violating caller that races a post-shutdown call: even
346 /// if `shutting_down` was reset somehow, an op that would touch the
347 /// already-taken `ManuallyDrop` fields is rejected.
348 fn try_enter(handle: &'a NetHandle) -> Option<Self> {
349 handle
350 .active_ops
351 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
352 if handle
353 .shutting_down
354 .load(std::sync::atomic::Ordering::SeqCst)
355 || handle.bus_taken.load(std::sync::atomic::Ordering::SeqCst)
356 {
357 handle
358 .active_ops
359 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
360 None
361 } else {
362 Some(Self { handle })
363 }
364 }
365}
366
367impl Drop for FfiOpGuard<'_> {
368 fn drop(&mut self) {
369 self.handle
370 .active_ops
371 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
372 }
373}
374
375/// Returns `true` when `handle` is non-null and aligned for
376/// `NetHandle`. Every `extern "C"` entry point that derefs the
377/// raw handle must gate on this — a misaligned pointer produced
378/// by an over-eager `void *` cast in a foreign caller would be
379/// immediate UB on `&*handle`, even before the `is_null` check.
380#[inline]
381fn handle_is_valid(handle: *const NetHandle) -> bool {
382 !handle.is_null() && (handle as usize).is_multiple_of(std::mem::align_of::<NetHandle>())
383}
384
385/// Error codes returned by FFI functions.
386#[repr(C)]
387pub enum NetError {
388 /// Success (no error).
389 Success = 0,
390 /// Null pointer passed.
391 NullPointer = -1,
392 /// Invalid UTF-8 string.
393 InvalidUtf8 = -2,
394 /// Invalid JSON.
395 InvalidJson = -3,
396 /// Initialization failed.
397 InitFailed = -4,
398 /// Ingestion failed (backpressure).
399 IngestionFailed = -5,
400 /// Poll failed.
401 PollFailed = -6,
402 /// Buffer too small.
403 BufferTooSmall = -7,
404 /// Shutting down.
405 ShuttingDown = -8,
406 /// Integer overflow: result does not fit in `c_int`.
407 IntOverflow = -9,
408 /// Stream handle does not belong to the supplied node handle.
409 /// Previously the send-family FFIs accepted any (stream, node)
410 /// pair without verifying they were created from the same node,
411 /// allowing silent cross-session traffic.
412 MismatchedHandles = -10,
413 /// `CString::new` failure: the input bytes are valid UTF-8 by
414 /// Rust's `String` invariant but contain an interior NUL byte
415 /// — and the C ABI cannot represent that, since C strings are
416 /// NUL-terminated. Pre-fix this was reported as
417 /// `InvalidUtf8`, which was wrong: the input is UTF-8-valid;
418 /// it just has a NUL where C expects it not to. A binding
419 /// reading the typed error and seeing "invalid UTF-8" would
420 /// chase the wrong cause.
421 InteriorNul = -11,
422 /// Unknown error.
423 Unknown = -99,
424}
425
426impl From<NetError> for c_int {
427 fn from(e: NetError) -> Self {
428 e as c_int
429 }
430}
431
432/// Enter an FFI operation with lifetime protection. Returns an `FfiOpGuard`
433/// that prevents `net_shutdown` from deallocating the handle until the guard
434/// is dropped. Returns `Err` with the error code if shutdown is in progress.
435#[inline]
436fn enter_ffi_op(handle: &NetHandle) -> Result<FfiOpGuard<'_>, c_int> {
437 FfiOpGuard::try_enter(handle).ok_or(NetError::ShuttingDown.into())
438}
439
440/// Initialize a new event bus.
441///
442/// # Parameters
443///
444/// - `config_json`: JSON configuration string (UTF-8, null-terminated).
445/// Pass NULL or empty string for default configuration.
446///
447/// # Returns
448///
449/// Opaque handle to the event bus, or NULL on failure.
450/// The handle must be freed with `net_shutdown`.
451///
452/// # Example Configuration
453///
454/// ```json
455/// {
456/// "num_shards": 8,
457/// "ring_buffer_capacity": 1048576,
458/// "backpressure_mode": "DropOldest",
459/// "batch": {
460/// "min_size": 1000,
461/// "max_size": 10000,
462/// "max_delay_ms": 10
463/// }
464/// }
465/// ```
466#[unsafe(no_mangle)]
467pub unsafe extern "C" fn net_init(config_json: *const c_char) -> *mut NetHandle {
468 // Parse and validate the config BEFORE constructing the tokio
469 // runtime. Building the runtime first would let any subsequent
470 // early-return path (`CStr::to_str` Err, `parse_config_json`
471 // returning None, `EventBus::new` returning Err) drop the
472 // local `Runtime` on function return. Dropping a multi-thread
473 // tokio runtime from inside ANOTHER tokio runtime's worker
474 // thread panics with "Cannot drop a runtime in a context where
475 // blocking is not allowed", unwinding across this `extern "C"`
476 // boundary into a Python / Go-cgo / NAPI / PyO3 caller —
477 // undefined behaviour. By validating inputs first, the runtime
478 // is only built once we know it will be installed into the
479 // `NetHandle` and survive the call.
480 let config = if config_json.is_null() {
481 EventBusConfig::default()
482 } else {
483 let config_str = match unsafe { CStr::from_ptr(config_json) }.to_str() {
484 Ok("") => EventBusConfig::default(),
485 Ok(s) => match parse_config_json(s) {
486 Some(cfg) => cfg,
487 None => return ptr::null_mut(),
488 },
489 Err(_) => return ptr::null_mut(),
490 };
491 config_str
492 };
493
494 // Now construct the runtime — its lifetime is tied to the
495 // returned `NetHandle` (via `create_with_config`), so the only
496 // remaining drop is on `net_shutdown`, which already handles
497 // it via `runtime.block_on(...)` (see #74) outside any other
498 // tokio context.
499 let runtime = match Runtime::new() {
500 Ok(rt) => rt,
501 Err(_) => return ptr::null_mut(),
502 };
503
504 create_with_config(runtime, config)
505}
506
507/// Parse JSON configuration into EventBusConfig.
508///
509/// Supports:
510/// - `num_shards`: number of shards
511/// - `ring_buffer_capacity`: ring buffer size per shard
512/// - `backpressure_mode`: "DropNewest", "DropOldest", "FailProducer"
513fn parse_config_json(json_str: &str) -> Option<EventBusConfig> {
514 let value: serde_json::Value = serde_json::from_str(json_str).ok()?;
515
516 let mut builder = EventBusConfig::builder();
517
518 if let Some(num_shards) = value.get("num_shards").and_then(|v| v.as_u64()) {
519 let num_shards = u16::try_from(num_shards).ok()?;
520 builder = builder.num_shards(num_shards);
521 }
522
523 if let Some(capacity) = value.get("ring_buffer_capacity").and_then(|v| v.as_u64()) {
524 let capacity = usize::try_from(capacity).ok()?;
525 builder = builder.ring_buffer_capacity(capacity);
526 }
527
528 if let Some(bp_value) = value.get("backpressure_mode") {
529 let bp_mode = if let Some(mode) = bp_value.as_str() {
530 match mode {
531 "DropNewest" | "drop_newest" => crate::config::BackpressureMode::DropNewest,
532 "DropOldest" | "drop_oldest" => crate::config::BackpressureMode::DropOldest,
533 "FailProducer" | "fail_producer" => crate::config::BackpressureMode::FailProducer,
534 // Pre-fix every other string silently fell back to
535 // `DropNewest`. A typo (`"DropOldset"`) thus
536 // changed durability profile at deploy time with
537 // no error. Reject unknowns to match the contract
538 // already enforced by `parse_poll_request_json`.
539 _ => return None,
540 }
541 } else if let Some(obj) = bp_value.as_object() {
542 // Object form: `{"Sample": {"rate": N}}` for the
543 // sampling mode that has an associated value.
544 if let Some(sample) = obj.get("Sample").or_else(|| obj.get("sample")) {
545 let rate = sample.get("rate").and_then(|v| v.as_u64())?;
546 let rate = u32::try_from(rate).ok()?;
547 if rate == 0 {
548 // Validated again by `EventBusConfig::validate`,
549 // but reject earlier so the parser surface
550 // matches the validator surface.
551 return None;
552 }
553 crate::config::BackpressureMode::Sample { rate }
554 } else {
555 return None;
556 }
557 } else {
558 return None;
559 };
560 builder = builder.backpressure_mode(bp_mode);
561 }
562
563 // Parse Redis config
564 #[cfg(feature = "redis")]
565 if let Some(redis) = value.get("redis") {
566 if let Some(url) = redis.get("url").and_then(|v| v.as_str()) {
567 let mut redis_config = RedisAdapterConfig::new(url);
568
569 if let Some(prefix) = redis.get("prefix").and_then(|v| v.as_str()) {
570 redis_config = redis_config.with_prefix(prefix);
571 }
572 if let Some(max_len) = redis.get("max_stream_len").and_then(|v| v.as_u64()) {
573 let max_len = usize::try_from(max_len).ok()?;
574 redis_config = redis_config.with_max_stream_len(max_len);
575 }
576 if let Some(pipeline_size) = redis.get("pipeline_size").and_then(|v| v.as_u64()) {
577 let pipeline_size = usize::try_from(pipeline_size).ok()?;
578 redis_config = redis_config.with_pipeline_size(pipeline_size);
579 }
580
581 builder = builder.adapter(AdapterConfig::Redis(redis_config));
582 }
583 }
584
585 // Parse JetStream config
586 #[cfg(feature = "jetstream")]
587 if let Some(jetstream) = value.get("jetstream") {
588 if let Some(url) = jetstream.get("url").and_then(|v| v.as_str()) {
589 let mut js_config = JetStreamAdapterConfig::new(url);
590
591 if let Some(prefix) = jetstream.get("prefix").and_then(|v| v.as_str()) {
592 js_config = js_config.with_prefix(prefix);
593 }
594 if let Some(max_messages) = jetstream.get("max_messages").and_then(|v| v.as_i64()) {
595 js_config = js_config.with_max_messages(max_messages);
596 }
597 if let Some(replicas) = jetstream.get("replicas").and_then(|v| v.as_u64()) {
598 let replicas = usize::try_from(replicas).ok()?;
599 js_config = js_config.with_replicas(replicas);
600 }
601
602 builder = builder.adapter(AdapterConfig::JetStream(js_config));
603 }
604 }
605
606 // Parse Net config
607 #[cfg(feature = "net")]
608 if let Some(net) = value.get("net") {
609 let bind_addr: std::net::SocketAddr = net
610 .get("bind_addr")
611 .and_then(|v| v.as_str())
612 .and_then(|s| s.parse().ok())?;
613
614 let peer_addr: std::net::SocketAddr = net
615 .get("peer_addr")
616 .and_then(|v| v.as_str())
617 .and_then(|s| s.parse().ok())?;
618
619 let psk: [u8; 32] = net
620 .get("psk")
621 .and_then(|v| v.as_str())
622 .and_then(|s| hex::decode(s).ok())
623 .and_then(|v| v.try_into().ok())?;
624
625 let role = net
626 .get("role")
627 .and_then(|v| v.as_str())
628 .unwrap_or("initiator");
629
630 let mut net_config = match role {
631 "initiator" => {
632 let peer_pubkey: [u8; 32] = net
633 .get("peer_public_key")
634 .and_then(|v| v.as_str())
635 .and_then(|s| hex::decode(s).ok())
636 .and_then(|v| v.try_into().ok())?;
637 NetAdapterConfig::initiator(bind_addr, peer_addr, psk, peer_pubkey)
638 }
639 "responder" => {
640 let secret_key: [u8; 32] = net
641 .get("secret_key")
642 .and_then(|v| v.as_str())
643 .and_then(|s| hex::decode(s).ok())
644 .and_then(|v| v.try_into().ok())?;
645 let public_key: [u8; 32] = net
646 .get("public_key")
647 .and_then(|v| v.as_str())
648 .and_then(|s| hex::decode(s).ok())
649 .and_then(|v| v.try_into().ok())?;
650 let keypair = StaticKeypair::from_keys(secret_key, public_key);
651 NetAdapterConfig::responder(bind_addr, peer_addr, psk, keypair)
652 }
653 _ => return None,
654 };
655
656 // Apply optional settings
657 if let Some(reliability) = net.get("reliability").and_then(|v| v.as_str()) {
658 net_config = net_config.with_reliability(match reliability {
659 "light" => ReliabilityConfig::Light,
660 "full" => ReliabilityConfig::Full,
661 _ => ReliabilityConfig::None,
662 });
663 }
664
665 if let Some(pool_size) = net.get("packet_pool_size").and_then(|v| v.as_u64()) {
666 if let Ok(size) = usize::try_from(pool_size) {
667 net_config = net_config.with_pool_size(size);
668 }
669 }
670
671 // Reject `0` for `heartbeat_interval_ms` and
672 // `session_timeout_ms`. `EventBusConfig::validate` rejects
673 // zero `Duration`s for `cooldown`, `metrics_window`, etc.,
674 // but the Net adapter's JSON parser had no equivalent guard
675 // — a `0` here flowed through to `Duration::from_millis(0)`,
676 // which on the heartbeat path busy-loops the heartbeat task
677 // and saturates a CPU. Treat zero as a misconfig and refuse
678 // to build the bus, surfacing as `InvalidJson` so the FFI
679 // caller sees a typed failure rather than a hung daemon.
680 if let Some(interval_ms) = net.get("heartbeat_interval_ms").and_then(|v| v.as_u64()) {
681 if interval_ms == 0 {
682 return None;
683 }
684 net_config =
685 net_config.with_heartbeat_interval(std::time::Duration::from_millis(interval_ms));
686 }
687
688 if let Some(timeout_ms) = net.get("session_timeout_ms").and_then(|v| v.as_u64()) {
689 if timeout_ms == 0 {
690 return None;
691 }
692 net_config =
693 net_config.with_session_timeout(std::time::Duration::from_millis(timeout_ms));
694 }
695
696 if let Some(batched) = net.get("batched_io").and_then(|v| v.as_bool()) {
697 net_config = net_config.with_batched_io(batched);
698 }
699
700 builder = builder.adapter(AdapterConfig::Net(Box::new(net_config)));
701 }
702
703 builder.build().ok()
704}
705
706fn create_with_config(runtime: Runtime, config: EventBusConfig) -> *mut NetHandle {
707 let bus = match runtime.block_on(EventBus::new(config)) {
708 Ok(bus) => bus,
709 Err(_) => {
710 // Send the runtime off to a fresh OS thread for
711 // dropping. Dropping a multi-thread tokio `Runtime`
712 // from inside another tokio runtime's worker thread
713 // panics ("Cannot drop a runtime in a context where
714 // blocking is not allowed"); a panic here would unwind
715 // across this `extern "C"` frame. The fresh thread
716 // guarantees a non-tokio context, so the drop is sound
717 // regardless of the caller's runtime environment. We
718 // don't `join()` the thread — the drop completes on
719 // its own and the caller has already been told
720 // `net_init` failed (returning null).
721 std::thread::spawn(move || drop(runtime));
722 return ptr::null_mut();
723 }
724 };
725
726 let handle = Box::new(NetHandle {
727 bus: std::mem::ManuallyDrop::new(bus),
728 runtime: std::mem::ManuallyDrop::new(runtime),
729 shutting_down: std::sync::atomic::AtomicBool::new(false),
730 active_ops: std::sync::atomic::AtomicU32::new(0),
731 bus_taken: std::sync::atomic::AtomicBool::new(false),
732 shutdown_completed: std::sync::atomic::AtomicBool::new(false),
733 });
734
735 Box::into_raw(handle)
736}
737
738/// Ingest a single event.
739///
740/// # Parameters
741///
742/// - `handle`: Event bus handle from `net_init`.
743/// - `event_json`: JSON event string (UTF-8).
744/// - `len`: Length of the event string in bytes.
745///
746/// # Returns
747///
748/// - `0` on success
749/// - Negative error code on failure
750#[unsafe(no_mangle)]
751pub unsafe extern "C" fn net_ingest(
752 handle: *mut NetHandle,
753 event_json: *const c_char,
754 len: usize,
755) -> c_int {
756 if !handle_is_valid(handle) || event_json.is_null() {
757 return NetError::NullPointer.into();
758 }
759
760 let handle = unsafe { &*handle };
761 let _guard = match enter_ffi_op(handle) {
762 Ok(g) => g,
763 Err(err) => return err,
764 };
765
766 // `slice::from_raw_parts` requires `len <= isize::MAX`. A
767 // C caller passing a sign-extended `-1` (or any
768 // `len > isize::MAX as usize`) triggers immediate UB before
769 // any other validation runs. Reject such inputs explicitly
770 // — caller should never see this in practice; surfacing a
771 // typed error is safer than UB.
772 if len > isize::MAX as usize {
773 return NetError::InvalidJson.into();
774 }
775 // Parse event JSON
776 let json_bytes = unsafe { std::slice::from_raw_parts(event_json as *const u8, len) };
777 let json_str = match std::str::from_utf8(json_bytes) {
778 Ok(s) => s,
779 Err(_) => return NetError::InvalidUtf8.into(),
780 };
781
782 let event = match Event::from_str(json_str) {
783 Ok(e) => e,
784 Err(_) => return NetError::InvalidJson.into(),
785 };
786
787 // Ingest
788 match handle.bus.ingest(event) {
789 Ok(_) => NetError::Success.into(),
790 Err(_) => NetError::IngestionFailed.into(),
791 }
792}
793
794/// Ingest a raw JSON string (fastest path).
795///
796/// The JSON string is stored directly without parsing.
797/// This is the recommended method for high-throughput ingestion.
798///
799/// # Parameters
800///
801/// - `handle`: Event bus handle from `net_init`.
802/// - `json`: JSON string (UTF-8).
803/// - `len`: Length of the JSON string in bytes.
804///
805/// # Returns
806///
807/// - `0` on success
808/// - Negative error code on failure
809#[unsafe(no_mangle)]
810pub unsafe extern "C" fn net_ingest_raw(
811 handle: *mut NetHandle,
812 json: *const c_char,
813 len: usize,
814) -> c_int {
815 if !handle_is_valid(handle) || json.is_null() {
816 return NetError::NullPointer.into();
817 }
818
819 let handle = unsafe { &*handle };
820 let _guard = match enter_ffi_op(handle) {
821 Ok(g) => g,
822 Err(err) => return err,
823 };
824
825 // `slice::from_raw_parts` requires `len <= isize::MAX`.
826 if len > isize::MAX as usize {
827 return NetError::InvalidJson.into();
828 }
829 let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
830 let json_str = match std::str::from_utf8(json_bytes) {
831 Ok(s) => s,
832 Err(_) => return NetError::InvalidUtf8.into(),
833 };
834
835 let raw = RawEvent::from_str(json_str);
836
837 match handle.bus.ingest_raw(raw) {
838 Ok(_) => NetError::Success.into(),
839 Err(_) => NetError::IngestionFailed.into(),
840 }
841}
842
843/// Ingest multiple raw JSON strings (fastest batch path).
844///
845/// # Parameters
846///
847/// - `handle`: Event bus handle.
848/// - `jsons`: Array of pointers to JSON strings.
849/// - `lens`: Array of lengths for each JSON string.
850/// - `count`: Number of events in the arrays.
851///
852/// # Returns
853///
854/// Number of successfully ingested events, or negative error code.
855#[unsafe(no_mangle)]
856pub unsafe extern "C" fn net_ingest_raw_batch(
857 handle: *mut NetHandle,
858 jsons: *const *const c_char,
859 lens: *const usize,
860 count: usize,
861) -> c_int {
862 if !handle_is_valid(handle) || jsons.is_null() || lens.is_null() {
863 return NetError::NullPointer.into();
864 }
865 if count == 0 {
866 return 0;
867 }
868
869 let handle = unsafe { &*handle };
870 let _guard = match enter_ffi_op(handle) {
871 Ok(g) => g,
872 Err(err) => return err,
873 };
874 let mut events = Vec::with_capacity(count);
875 // Track per-entry drops so the caller's accounting can
876 // reconcile the returned count against the input count.
877 // Pre-fix per-entry rejects (null pointer, oversized length,
878 // invalid UTF-8) were silently `continue`-d and the caller
879 // saw `count - drops` accepted events without any signal as
880 // to which input indices were dropped. A binding that
881 // attributed the drop to back-pressure and retried got the
882 // wrong indices and double-published the good ones.
883 //
884 // The C-API contract is "returns count of accepted events";
885 // expanding it to take an out-param of dropped indices is
886 // an API addition, not a fix-in-place. Emit `tracing::warn!`
887 // with the offending index AND reason so operators
888 // observing the bus can correlate drop counts to specific
889 // inputs without changing the C surface. For high-volume
890 // bindings this should still be sized at one log line per
891 // dropped entry; if that ever matters in practice the
892 // `*_ex` follow-up can return the indices structurally.
893 let mut dropped_null = 0usize;
894 let mut dropped_oversize = 0usize;
895 let mut dropped_invalid_utf8 = 0usize;
896
897 for i in 0..count {
898 let json_ptr = unsafe { *jsons.add(i) };
899 let len = unsafe { *lens.add(i) };
900
901 if json_ptr.is_null() {
902 tracing::warn!(
903 index = i,
904 "net_ingest_raw_batch: dropping entry with null pointer"
905 );
906 dropped_null += 1;
907 continue;
908 }
909
910 // `slice::from_raw_parts` requires `len <= isize::MAX`.
911 // Skip pathological per-entry lengths rather than UB.
912 if len > isize::MAX as usize {
913 tracing::warn!(
914 index = i,
915 len,
916 "net_ingest_raw_batch: dropping entry with len > isize::MAX"
917 );
918 dropped_oversize += 1;
919 continue;
920 }
921 let json_bytes = unsafe { std::slice::from_raw_parts(json_ptr as *const u8, len) };
922 match std::str::from_utf8(json_bytes) {
923 Ok(json_str) => events.push(RawEvent::from_str(json_str)),
924 Err(_) => {
925 tracing::warn!(
926 index = i,
927 "net_ingest_raw_batch: dropping entry with invalid UTF-8"
928 );
929 dropped_invalid_utf8 += 1;
930 }
931 }
932 }
933 let total_dropped = dropped_null + dropped_oversize + dropped_invalid_utf8;
934 if total_dropped > 0 {
935 // Aggregate summary for log-pipeline filters that fold
936 // per-index lines.
937 tracing::warn!(
938 input_count = count,
939 dropped_null,
940 dropped_oversize,
941 dropped_invalid_utf8,
942 "net_ingest_raw_batch: {} of {} entries dropped before ingest",
943 total_dropped,
944 count,
945 );
946 }
947
948 let count = handle.bus.ingest_raw_batch(events);
949 // Returning `c_int::MAX` on overflow would be ambiguous with a real
950 // `INT_MAX` ingest. Signal overflow explicitly so callers doing
951 // accounting in high-throughput paths do not silently miscount.
952 c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
953}
954
955/// Ingest multiple events.
956///
957/// # Parameters
958///
959/// - `handle`: Event bus handle.
960/// - `events_json`: JSON array of events (UTF-8, null-terminated).
961///
962/// # Returns
963///
964/// Number of successfully ingested events, or negative error code.
965#[unsafe(no_mangle)]
966pub unsafe extern "C" fn net_ingest_batch(
967 handle: *mut NetHandle,
968 events_json: *const c_char,
969) -> c_int {
970 if !handle_is_valid(handle) || events_json.is_null() {
971 return NetError::NullPointer.into();
972 }
973
974 let handle = unsafe { &*handle };
975 let _guard = match enter_ffi_op(handle) {
976 Ok(g) => g,
977 Err(err) => return err,
978 };
979
980 let json_str = match unsafe { CStr::from_ptr(events_json) }.to_str() {
981 Ok(s) => s,
982 Err(_) => return NetError::InvalidUtf8.into(),
983 };
984
985 // Parse as JSON array
986 let array: Vec<serde_json::Value> = match serde_json::from_str(json_str) {
987 Ok(a) => a,
988 Err(_) => return NetError::InvalidJson.into(),
989 };
990
991 let events: Vec<Event> = array.into_iter().map(Event::new).collect();
992 let count = handle.bus.ingest_batch(events);
993
994 // Returning `c_int::MAX` on overflow would be ambiguous with a real
995 // `INT_MAX` ingest. Signal overflow explicitly — matches the
996 // `net_ingest_raw_batch` contract.
997 c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
998}
999
1000/// Parse the JSON request body passed to `net_poll` into a
1001/// `ConsumeRequest`. Returns the negative `NetError` code on parse
1002/// failure so the caller can surface it back across FFI. Both `limit`
1003/// and `cursor` are optional, but if either key is present with the
1004/// wrong JSON type it is an explicit error — silently falling back to
1005/// the default would hide caller bugs (e.g. the Go binding that
1006/// previously serialized `cursor` but had it dropped server-side).
1007fn parse_poll_request_json(json_str: &str) -> Result<ConsumeRequest, c_int> {
1008 let value: serde_json::Value =
1009 serde_json::from_str(json_str).map_err(|_| c_int::from(NetError::InvalidJson))?;
1010
1011 let limit = match value.get("limit") {
1012 None | Some(serde_json::Value::Null) => 100usize,
1013 Some(v) => match v.as_u64() {
1014 // `as usize` would silently truncate on 32-bit targets for
1015 // values above `usize::MAX`. Reject such inputs explicitly
1016 // so a caller asking for e.g. 2^33 events on a wasm32
1017 // build gets `InvalidJson` instead of a tiny wrap-around.
1018 Some(n) => usize::try_from(n).map_err(|_| c_int::from(NetError::InvalidJson))?,
1019 None => return Err(NetError::InvalidJson.into()),
1020 },
1021 };
1022 let cursor = match value.get("cursor") {
1023 None | Some(serde_json::Value::Null) => None,
1024 Some(v) => match v.as_str() {
1025 Some(s) => Some(s.to_owned()),
1026 None => return Err(NetError::InvalidJson.into()),
1027 },
1028 };
1029 let mut req = ConsumeRequest::new(limit);
1030 req.from_id = cursor;
1031 Ok(req)
1032}
1033
1034/// Poll events from the bus.
1035///
1036/// # Parameters
1037///
1038/// - `handle`: Event bus handle.
1039/// - `request_json`: JSON request string (UTF-8, null-terminated).
1040/// Example: `{"limit": 100, "ordering": "InsertionTs"}`
1041/// - `out_buffer`: Output buffer for JSON response.
1042/// - `buffer_len`: Size of the output buffer.
1043///
1044/// # Returns
1045///
1046/// - Number of bytes written to buffer on success
1047/// - Negative error code on failure
1048#[unsafe(no_mangle)]
1049pub unsafe extern "C" fn net_poll(
1050 handle: *mut NetHandle,
1051 request_json: *const c_char,
1052 out_buffer: *mut c_char,
1053 buffer_len: usize,
1054) -> c_int {
1055 if !handle_is_valid(handle) || out_buffer.is_null() {
1056 return NetError::NullPointer.into();
1057 }
1058
1059 let handle = unsafe { &*handle };
1060 let _guard = match enter_ffi_op(handle) {
1061 Ok(g) => g,
1062 Err(err) => return err,
1063 };
1064
1065 // Parse request
1066 let request = if request_json.is_null() {
1067 ConsumeRequest::new(100)
1068 } else {
1069 let json_str = match unsafe { CStr::from_ptr(request_json) }.to_str() {
1070 Ok(s) => s,
1071 Err(_) => return NetError::InvalidUtf8.into(),
1072 };
1073 match parse_poll_request_json(json_str) {
1074 Ok(req) => req,
1075 Err(code) => return code,
1076 }
1077 };
1078
1079 // Reject buffers too small to even hold an empty-response
1080 // JSON envelope. This catches the degenerate "tiny buffer"
1081 // case before we hit the adapter — `BufferTooSmall` returned
1082 // here means "no work was done, caller's cursor is unchanged."
1083 // 256 bytes comfortably fits the empty-response JSON below
1084 // even with a long echoed `next_id` cursor.
1085 const MIN_RESPONSE_BUFFER: usize = 256;
1086 if buffer_len < MIN_RESPONSE_BUFFER {
1087 return NetError::BufferTooSmall.into();
1088 }
1089
1090 // Stash the cursor before moving `request` into `poll()` so
1091 // the post-poll fallback can echo it back to the caller. On
1092 // overflow we write a minimal "no events delivered, cursor
1093 // unchanged" response so the caller's next poll re-fetches
1094 // the same range — events are not lost on idempotent
1095 // adapters (Redis XRANGE, JetStream direct_get).
1096 let cursor_snapshot = request.from_id.clone();
1097
1098 // Poll
1099 let response = match handle.runtime.block_on(handle.bus.poll(request)) {
1100 Ok(r) => r,
1101 Err(_) => return NetError::PollFailed.into(),
1102 };
1103
1104 // Serialize response. Events that fail to parse are included as raw
1105 // strings so the caller can see all events and detect parse failures.
1106 let total_events = response.events.len();
1107 let mut parsed_events: Vec<serde_json::Value> = Vec::with_capacity(total_events);
1108 let mut parse_errors: usize = 0;
1109 for e in &response.events {
1110 match e.parse() {
1111 Ok(v) => parsed_events.push(v),
1112 Err(_) => {
1113 parse_errors += 1;
1114 // Include the raw bytes as a string so the caller doesn't silently lose events
1115 if let Ok(raw) = e.raw_str() {
1116 parsed_events.push(serde_json::Value::String(raw.to_string()));
1117 }
1118 }
1119 }
1120 }
1121 let response_json = match serde_json::to_string(&serde_json::json!({
1122 "events": parsed_events,
1123 "next_id": response.next_id,
1124 "has_more": response.has_more,
1125 "count": parsed_events.len(),
1126 "parse_errors": parse_errors,
1127 })) {
1128 Ok(s) => s,
1129 Err(_) => return NetError::Unknown.into(),
1130 };
1131
1132 // Buffer overflow: emit a minimal fallback response that echoes
1133 // the caller's original cursor as `next_id`. The caller's next
1134 // poll runs against the same range and re-delivers the events
1135 // (idempotent on Redis XRANGE / JetStream direct_get). Without
1136 // this, a caller that trusts `next_id` blindly would advance
1137 // past the unread batch.
1138 if response_json.len() + 1 > buffer_len {
1139 let fallback = serde_json::to_string(&serde_json::json!({
1140 "events": [],
1141 "next_id": cursor_snapshot,
1142 "has_more": true,
1143 "count": 0,
1144 "parse_errors": 0,
1145 "buffer_too_small": true,
1146 "events_dropped": total_events,
1147 }))
1148 .unwrap_or_else(|_| String::from(
1149 r#"{"events":[],"next_id":null,"has_more":true,"count":0,"parse_errors":0,"buffer_too_small":true}"#
1150 ));
1151 if fallback.len() < buffer_len {
1152 unsafe {
1153 ptr::copy_nonoverlapping(
1154 fallback.as_ptr() as *const c_char,
1155 out_buffer,
1156 fallback.len(),
1157 );
1158 *out_buffer.add(fallback.len()) = 0;
1159 }
1160 }
1161 return NetError::BufferTooSmall.into();
1162 }
1163
1164 // Copy to output buffer
1165 unsafe {
1166 ptr::copy_nonoverlapping(
1167 response_json.as_ptr() as *const c_char,
1168 out_buffer,
1169 response_json.len(),
1170 );
1171 *out_buffer.add(response_json.len()) = 0; // Null terminate
1172 }
1173
1174 // Data was already copied into the caller's buffer; a
1175 // `c_int` overflow here means the byte count exceeds c_int's
1176 // range, NOT that the buffer was too small. Returning
1177 // `BufferTooSmall` would tell the caller to "resize and retry"
1178 // when retrying can't fix the actual condition. `IntOverflow`
1179 // is the documented variant for this case.
1180 match c_int::try_from(response_json.len()) {
1181 Ok(n) => n,
1182 Err(_) => NetError::IntOverflow.into(),
1183 }
1184}
1185
1186/// Get event bus statistics.
1187///
1188/// # Parameters
1189///
1190/// - `handle`: Event bus handle.
1191/// - `out_buffer`: Output buffer for JSON statistics.
1192/// - `buffer_len`: Size of the output buffer.
1193///
1194/// # Returns
1195///
1196/// Number of bytes written, or negative error code.
1197#[unsafe(no_mangle)]
1198pub unsafe extern "C" fn net_stats(
1199 handle: *mut NetHandle,
1200 out_buffer: *mut c_char,
1201 buffer_len: usize,
1202) -> c_int {
1203 if !handle_is_valid(handle) || out_buffer.is_null() {
1204 return NetError::NullPointer.into();
1205 }
1206
1207 let handle = unsafe { &*handle };
1208 let _guard = match enter_ffi_op(handle) {
1209 Ok(g) => g,
1210 Err(err) => return err,
1211 };
1212 let stats = handle.bus.stats();
1213 let shard_stats = handle.bus.shard_stats();
1214
1215 let stats_json = match serde_json::to_string(&serde_json::json!({
1216 "events_ingested": stats.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
1217 "events_dropped": stats.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
1218 "batches_dispatched": stats.batches_dispatched.load(std::sync::atomic::Ordering::Relaxed),
1219 "shard_events_ingested": shard_stats.events_ingested,
1220 "shard_events_dropped": shard_stats.events_dropped,
1221 "shard_batches_dispatched": shard_stats.batches_dispatched,
1222 })) {
1223 Ok(s) => s,
1224 Err(_) => return NetError::Unknown.into(),
1225 };
1226
1227 if stats_json.len() + 1 > buffer_len {
1228 return NetError::BufferTooSmall.into();
1229 }
1230
1231 unsafe {
1232 ptr::copy_nonoverlapping(
1233 stats_json.as_ptr() as *const c_char,
1234 out_buffer,
1235 stats_json.len(),
1236 );
1237 *out_buffer.add(stats_json.len()) = 0;
1238 }
1239
1240 // See net_poll above — the data was already copied, so an
1241 // overflowing length is `IntOverflow`, not `BufferTooSmall`.
1242 match c_int::try_from(stats_json.len()) {
1243 Ok(n) => n,
1244 Err(_) => NetError::IntOverflow.into(),
1245 }
1246}
1247
1248/// Flush all pending batches to the adapter.
1249///
1250/// # Parameters
1251///
1252/// - `handle`: Event bus handle.
1253///
1254/// # Returns
1255///
1256/// - `0` on success
1257/// - Negative error code on failure
1258#[unsafe(no_mangle)]
1259pub unsafe extern "C" fn net_flush(handle: *mut NetHandle) -> c_int {
1260 if !handle_is_valid(handle) {
1261 return NetError::NullPointer.into();
1262 }
1263
1264 let handle = unsafe { &*handle };
1265 let _guard = match enter_ffi_op(handle) {
1266 Ok(g) => g,
1267 Err(err) => return err,
1268 };
1269
1270 match handle.runtime.block_on(handle.bus.flush()) {
1271 Ok(_) => NetError::Success.into(),
1272 Err(_) => NetError::Unknown.into(),
1273 }
1274}
1275
1276/// Shut down the event bus and free resources.
1277///
1278/// # Parameters
1279///
1280/// - `handle`: Event bus handle. After this call, the handle is invalid.
1281///
1282/// # Returns
1283///
1284/// - `0` on success
1285/// - Negative error code on failure (including `Unknown` if the
1286/// bounded wait for in-flight FFI operations expired before the bus
1287/// could be shut down cleanly)
1288///
1289/// # Notes
1290///
1291/// The handle's storage is intentionally leaked: the box is never
1292/// returned to the allocator. See `NetHandle`'s docs for why. This is
1293/// a one-time cost per shutdown — typically per-process, since most C
1294/// callers initialize the bus once and shut down once.
1295#[unsafe(no_mangle)]
1296pub unsafe extern "C" fn net_shutdown(handle: *mut NetHandle) -> c_int {
1297 if !handle_is_valid(handle) {
1298 return NetError::NullPointer.into();
1299 }
1300
1301 // Scope the `&NetHandle` borrow into an inner block so it is
1302 // verifiably out of scope before the
1303 // `ManuallyDrop::take(&mut (*handle).bus)` calls below.
1304 // Holding an `&NetHandle` in scope for the whole function
1305 // while taking a raw `&mut (*handle).bus` later would rely on
1306 // NLL ending the immutable borrow before the mutable take —
1307 // a pattern fragile under stacked/tree borrow models. The
1308 // block-scoped borrow makes the lifetime constraint explicit
1309 // and obvious to both the compiler and any future maintainer.
1310 let drained_and_taken = {
1311 // SAFETY: The C contract guarantees `handle` is valid here and that
1312 // `net_shutdown` is not called concurrently with itself. Future
1313 // dereferences of the box from concurrent FFI ops on other threads
1314 // are also sound because we never free the box (see below).
1315 let handle_ref = unsafe { &*handle };
1316
1317 // Signal shutdown so concurrent FFI calls bail before touching
1318 // `bus`/`runtime`. SeqCst pairs with `FfiOpGuard::try_enter`.
1319 handle_ref
1320 .shutting_down
1321 .store(true, std::sync::atomic::Ordering::SeqCst);
1322
1323 // Bounded wait for in-flight ops to drain. Without a deadline, a
1324 // hung concurrent operation (e.g. `net_flush` against a stalled
1325 // adapter) would pin a CPU at 100% inside this loop forever.
1326 //
1327 // `std::hint::spin_loop()` is a CPU pause hint, not a yield. On
1328 // a single-threaded executor (or any configuration where the FFI
1329 // caller's thread is the same one that needs to make progress on
1330 // the in-flight async work) the tight spin starves the very tokio
1331 // worker we're waiting for, *causing* the deadline to expire when
1332 // it otherwise wouldn't. `thread::yield_now` lets the OS schedule
1333 // whatever's blocked, and a 1ms `thread::sleep` between yields
1334 // prevents the loop from saturating a CPU on platforms where
1335 // `yield_now` is a
1336 // near-no-op under low contention. The drain we expect to take
1337 // milliseconds, so a millisecond-granularity poll is fine.
1338 let deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
1339 let mut drained = false;
1340 loop {
1341 if handle_ref
1342 .active_ops
1343 .load(std::sync::atomic::Ordering::SeqCst)
1344 == 0
1345 {
1346 drained = true;
1347 break;
1348 }
1349 if std::time::Instant::now() >= deadline {
1350 break;
1351 }
1352 std::thread::yield_now();
1353 std::thread::sleep(std::time::Duration::from_millis(1));
1354 }
1355
1356 if !drained {
1357 // In-flight ops may still be reading `bus`/`runtime`; reading
1358 // them out via `ManuallyDrop::take` would race those readers.
1359 // Leak both fields along with the box. Future ops still see
1360 // `shutting_down=true` and bail before touching either field,
1361 // so the leaked memory is never read again.
1362 return NetError::Unknown.into();
1363 }
1364
1365 // Idempotent shutdown: if a previous `net_shutdown` already
1366 // moved out the bus/runtime, do not call `ManuallyDrop::take`
1367 // a second time (that would be UB). The first call may still
1368 // be inside `runtime.block_on(bus.shutdown())` though — pre-
1369 // fix the second caller observed `bus_taken == true` and
1370 // returned `Success` immediately, falsely signaling
1371 // completion of an in-progress shutdown. Spin on
1372 // `shutdown_completed` (set by the first caller AFTER
1373 // `bus.shutdown()` returns) so subsequent callers wait for
1374 // the actual completion.
1375 if handle_ref
1376 .bus_taken
1377 .swap(true, std::sync::atomic::Ordering::SeqCst)
1378 {
1379 // Wait for the first caller to actually finish.
1380 // Bounded by the same FFI_SHUTDOWN_DEADLINE as the
1381 // `active_ops` drain — if the first caller is wedged
1382 // longer than that, we surface a Transient error rather
1383 // than block forever.
1384 let inner_deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
1385 while !handle_ref
1386 .shutdown_completed
1387 .load(std::sync::atomic::Ordering::Acquire)
1388 {
1389 if std::time::Instant::now() >= inner_deadline {
1390 return NetError::Unknown.into();
1391 }
1392 std::thread::yield_now();
1393 std::thread::sleep(std::time::Duration::from_millis(1));
1394 }
1395 return NetError::Success.into();
1396 }
1397 drained
1398 };
1399 let _ = drained_and_taken;
1400
1401 // SAFETY: `active_ops` reached zero with `shutting_down=true`, so:
1402 // - Every FFI op that started before shutdown has fully
1403 // completed (decremented `active_ops` on guard drop).
1404 // - Any future FFI op will observe `shutting_down=true` and
1405 // bail in `try_enter` before touching `bus` / `runtime`.
1406 // Plus, `bus_taken` was just CAS'd from false → true, so no other
1407 // shutdown is concurrently moving the same fields out. The
1408 // immutable `handle_ref` borrow above has been dropped (block
1409 // scope ended), so the `&mut`-via-raw-pointer below is the
1410 // only live access — no stacked/tree-borrow race.
1411 //
1412 // We deliberately do NOT call `Box::from_raw` here. The box's
1413 // `shutting_down` / `active_ops` / `bus_taken` atomics must remain
1414 // valid memory because future FFI ops still dereference the
1415 // C-side pointer to check them. Leaking the box is the
1416 // correctness fix for the previous use-after-free; the per-handle
1417 // storage cost is a one-time overhead.
1418 let bus = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).bus) };
1419 let runtime = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).runtime) };
1420
1421 // Flush pending batches and gracefully shut down the adapter
1422 // before dropping the runtime. Without this, pending events in
1423 // ring buffers and batch workers would be silently lost.
1424 let result = runtime.block_on(bus.shutdown());
1425
1426 // `bus` and `runtime` go out of scope here and are dropped.
1427 // The leaked box keeps the atomics alive for any straggler ops.
1428
1429 // Signal completion to any second/third caller spinning on
1430 // `shutdown_completed` in the idempotent path above. Done
1431 // AFTER `bus.shutdown()` returns and AFTER the bus / runtime
1432 // drop, so subsequent callers can rely on this flag as a
1433 // hard "shutdown is fully done" barrier.
1434 unsafe { &*handle }
1435 .shutdown_completed
1436 .store(true, std::sync::atomic::Ordering::Release);
1437
1438 match result {
1439 Ok(()) => NetError::Success.into(),
1440 Err(_) => NetError::Unknown.into(),
1441 }
1442}
1443
1444/// Get the number of shards.
1445///
1446/// # Parameters
1447///
1448/// - `handle`: Event bus handle.
1449///
1450/// # Returns
1451///
1452/// Number of shards, or 0 if handle is null.
1453#[unsafe(no_mangle)]
1454pub unsafe extern "C" fn net_num_shards(handle: *mut NetHandle) -> u16 {
1455 if !handle_is_valid(handle) {
1456 return 0;
1457 }
1458 let handle = unsafe { &*handle };
1459 let _guard = match enter_ffi_op(handle) {
1460 Ok(g) => g,
1461 Err(_) => return 0,
1462 };
1463 handle.bus.num_shards()
1464}
1465
1466/// Get the library version.
1467///
1468/// # Returns
1469///
1470/// Version string (static, do not free).
1471#[unsafe(no_mangle)]
1472pub unsafe extern "C" fn net_version() -> *const c_char {
1473 static VERSION: &[u8] = b"0.8.0\0";
1474 VERSION.as_ptr() as *const c_char
1475}
1476
1477/// Generate a new Net keypair.
1478///
1479/// # Returns
1480///
1481/// JSON string with hex-encoded public_key and secret_key.
1482/// The caller must free the returned string with `net_free_string`.
1483/// Returns NULL if Net feature is not enabled.
1484#[cfg(feature = "net")]
1485#[unsafe(no_mangle)]
1486pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
1487 let keypair = StaticKeypair::generate();
1488 let json = serde_json::json!({
1489 "public_key": hex::encode(keypair.public_key()),
1490 "secret_key": hex::encode(keypair.secret_key()),
1491 });
1492
1493 match CString::new(json.to_string()) {
1494 Ok(s) => s.into_raw(),
1495 Err(_) => ptr::null_mut(),
1496 }
1497}
1498
1499/// Free a string returned by Net functions.
1500///
1501/// # Parameters
1502///
1503/// - `s`: String pointer returned by `net_generate_keypair` or similar.
1504#[cfg(feature = "net")]
1505#[unsafe(no_mangle)]
1506pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
1507 if !s.is_null() {
1508 unsafe {
1509 drop(CString::from_raw(s));
1510 }
1511 }
1512}
1513
1514// `net.h` declares both `net_generate_keypair` and
1515// `net_free_string` unconditionally — a consumer linking against
1516// a cdylib built without the `net` feature would otherwise hit
1517// a load-time missing-symbol error despite the header advertising
1518// the symbol. Provide always-empty stubs so the symbol is
1519// resolvable on every build configuration. Mirrors the
1520// `nat-traversal` cfg pattern in `mesh.rs`.
1521
1522/// Stub for builds without the `net` feature.
1523///
1524/// `net.h` declares `net_generate_keypair` unconditionally, so
1525/// the symbol must be resolvable on every build configuration.
1526/// Returns NULL since keypair generation requires the net feature.
1527#[cfg(not(feature = "net"))]
1528#[unsafe(no_mangle)]
1529pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
1530 ptr::null_mut()
1531}
1532
1533/// Stub for builds without the `net` feature.
1534///
1535/// Mirrors the always-on signature in `net.h`. Reclaims a
1536/// CString-allocated pointer if non-null.
1537#[cfg(not(feature = "net"))]
1538#[unsafe(no_mangle)]
1539pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
1540 if !s.is_null() {
1541 unsafe {
1542 drop(std::ffi::CString::from_raw(s));
1543 }
1544 }
1545}
1546
1547// =========================================================================
1548// Structured (non-JSON) API — _ex variants
1549// =========================================================================
1550
1551/// Ingestion receipt for C consumers.
1552#[repr(C)]
1553pub struct NetReceipt {
1554 /// Shard the event was assigned to.
1555 pub shard_id: u16,
1556 /// Insertion timestamp (nanoseconds).
1557 pub timestamp: u64,
1558}
1559
1560// Pin layout invariants for `NetReceipt`. `#[repr(C)]` already
1561// gives C ABI compatibility per platform, but doesn't catch a
1562// future field-reorder or field-add — both would silently break
1563// any C/Go/Python binding that hard-codes the struct layout.
1564// Static asserts on 64-bit targets (the production deployment
1565// shape) trip CI before such a change reaches a binary release.
1566//
1567// 64-bit: `u16 (2) + 6 pad + u64 (8)` = 16 bytes, alignment 8.
1568#[cfg(target_pointer_width = "64")]
1569const _: () = assert!(
1570 std::mem::size_of::<NetReceipt>() == 16,
1571 "NetReceipt size changed on 64-bit; bindings hard-code 16. \
1572 If the change is intentional, bump the binding versions and \
1573 update this assertion."
1574);
1575#[cfg(target_pointer_width = "64")]
1576const _: () = assert!(
1577 std::mem::align_of::<NetReceipt>() == 8,
1578 "NetReceipt alignment changed on 64-bit; bindings expect 8."
1579);
1580
1581/// A single stored event for C consumers.
1582///
1583/// # Safety contract for callers
1584///
1585/// `id`/`id_len` and `raw`/`raw_len` are produced by Rust as a
1586/// `Box<[u8]>` whose fat-pointer length is reconstructed at free
1587/// time from `id_len` / `raw_len`. The fields are `pub` because
1588/// `#[repr(C)]` exposes them to C, **but they must be treated as
1589/// read-only** between the `net_poll_*` call that produced them
1590/// and the `net_free_poll_result` that consumes them.
1591///
1592/// Mutating `id_len` or `raw_len` (or copying the struct, replacing
1593/// the pointer, and then freeing) causes
1594/// `Box::from_raw(slice_from_raw_parts_mut(ptr, wrong_len))` to be
1595/// undefined behavior on free — the allocator records the
1596/// allocation size and any mismatch is UB.
1597#[repr(C)]
1598pub struct NetEvent {
1599 /// Event ID (not null-terminated, use `id_len`).
1600 /// Read-only after `net_poll_*`; do not mutate.
1601 pub id: *const c_char,
1602 /// Length of the event ID. Read-only after `net_poll_*`; do not
1603 /// mutate (mutation causes UB on free).
1604 pub id_len: usize,
1605 /// Raw JSON payload (not null-terminated, use `raw_len`).
1606 /// Read-only after `net_poll_*`; do not mutate.
1607 pub raw: *const c_char,
1608 /// Length of the raw JSON payload. Read-only after
1609 /// `net_poll_*`; do not mutate (mutation causes UB on free).
1610 pub raw_len: usize,
1611 /// Insertion timestamp (nanoseconds).
1612 pub insertion_ts: u64,
1613 /// Shard ID.
1614 pub shard_id: u16,
1615}
1616
1617// Pin layout invariants for `NetEvent`. See `NetReceipt`'s
1618// asserts for rationale. Bindings (C, Go, Python, Node) hard-
1619// code 48 bytes on 64-bit; an accidental reorder or new field
1620// would silently shift every offset.
1621//
1622// 64-bit: `4 × 8 (ptrs/usize) + u64 (8) + u16 (2) + 6 trail` = 48.
1623#[cfg(target_pointer_width = "64")]
1624const _: () = assert!(
1625 std::mem::size_of::<NetEvent>() == 48,
1626 "NetEvent size changed on 64-bit; bindings hard-code 48. \
1627 If the change is intentional, bump the binding versions and \
1628 update this assertion."
1629);
1630#[cfg(target_pointer_width = "64")]
1631const _: () = assert!(
1632 std::mem::align_of::<NetEvent>() == 8,
1633 "NetEvent alignment changed on 64-bit; bindings expect 8."
1634);
1635
1636/// Poll result for C consumers.
1637#[repr(C)]
1638pub struct NetPollResult {
1639 /// Array of events. Free with `net_free_poll_result`.
1640 pub events: *mut NetEvent,
1641 /// Number of events in the array.
1642 pub count: usize,
1643 /// Cursor for the next poll (null-terminated). NULL if no more.
1644 pub next_id: *mut c_char,
1645 /// 1 if more events are available, 0 otherwise.
1646 pub has_more: c_int,
1647}
1648
1649/// Stats for C consumers.
1650#[repr(C)]
1651pub struct NetStats {
1652 /// Total events ingested.
1653 pub events_ingested: u64,
1654 /// Events dropped due to backpressure.
1655 pub events_dropped: u64,
1656 /// Batches dispatched to the adapter.
1657 pub batches_dispatched: u64,
1658}
1659
1660/// Ingest raw JSON with structured receipt.
1661#[unsafe(no_mangle)]
1662pub unsafe extern "C" fn net_ingest_raw_ex(
1663 handle: *mut NetHandle,
1664 json: *const c_char,
1665 len: usize,
1666 out: *mut NetReceipt,
1667) -> c_int {
1668 if !handle_is_valid(handle) || json.is_null() {
1669 return NetError::NullPointer.into();
1670 }
1671
1672 let handle = unsafe { &*handle };
1673 let _guard = match enter_ffi_op(handle) {
1674 Ok(g) => g,
1675 Err(err) => return err,
1676 };
1677
1678 // `slice::from_raw_parts` requires `len <= isize::MAX`.
1679 if len > isize::MAX as usize {
1680 return NetError::InvalidJson.into();
1681 }
1682 let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
1683 let json_str = match std::str::from_utf8(json_bytes) {
1684 Ok(s) => s,
1685 Err(_) => return NetError::InvalidUtf8.into(),
1686 };
1687
1688 let raw = RawEvent::from_str(json_str);
1689
1690 match handle.bus.ingest_raw(raw) {
1691 Ok((shard_id, timestamp)) => {
1692 if !out.is_null() {
1693 unsafe {
1694 (*out).shard_id = shard_id;
1695 (*out).timestamp = timestamp;
1696 }
1697 }
1698 NetError::Success.into()
1699 }
1700 Err(_) => NetError::IngestionFailed.into(),
1701 }
1702}
1703
1704/// Poll events with structured result (no JSON overhead).
1705///
1706/// The caller must free the result with `net_free_poll_result`.
1707#[unsafe(no_mangle)]
1708pub unsafe extern "C" fn net_poll_ex(
1709 handle: *mut NetHandle,
1710 limit: usize,
1711 cursor: *const c_char,
1712 out: *mut NetPollResult,
1713) -> c_int {
1714 if !handle_is_valid(handle) || out.is_null() {
1715 return NetError::NullPointer.into();
1716 }
1717
1718 // Pre-validate `limit` BEFORE calling `bus.poll` — the bus
1719 // advances the consumer cursor before returning, so any
1720 // post-poll allocation failure (e.g. `Layout::array::<NetEvent>`
1721 // overflow on a pathological `count`, or `std::alloc::alloc`
1722 // returning null under memory pressure) would drop the response
1723 // and lose every event the cursor just stepped past. Reject
1724 // requests whose `count * size_of::<NetEvent>` would overflow
1725 // `isize::MAX` (the `Layout::array` cap) up front, so the
1726 // failure happens before the cursor moves.
1727 if limit > 0
1728 && (std::mem::size_of::<NetEvent>())
1729 .checked_mul(limit)
1730 .is_none_or(|v| v > isize::MAX as usize)
1731 {
1732 return NetError::IntOverflow.into();
1733 }
1734
1735 let handle = unsafe { &*handle };
1736 let _guard = match enter_ffi_op(handle) {
1737 Ok(g) => g,
1738 Err(err) => return err,
1739 };
1740
1741 let mut request = ConsumeRequest::new(limit);
1742 if !cursor.is_null() {
1743 if let Ok(s) = unsafe { CStr::from_ptr(cursor) }.to_str() {
1744 if !s.is_empty() {
1745 request = request.from(s);
1746 }
1747 }
1748 }
1749
1750 let response = match handle.runtime.block_on(handle.bus.poll(request)) {
1751 Ok(r) => r,
1752 Err(_) => return NetError::PollFailed.into(),
1753 };
1754
1755 let count = response.events.len();
1756
1757 // Allocate events array.
1758 //
1759 // Each iteration allocates two boxed byte slices via
1760 // `Vec::to_vec().into_boxed_slice()`, which panic on OOM in
1761 // the global allocator. A panic across this `extern "C"`
1762 // body is UB — under the cgo/N-API/cffi unwind model the
1763 // panic propagates into a frame that doesn't expect it. Wrap
1764 // the per-event build in `catch_unwind`, track how many
1765 // events we've fully written, and on panic / mid-loop
1766 // failure free the partial array via `free_events_array`
1767 // so neither UB nor the partial allocations leak.
1768 let events_ptr = if count > 0 {
1769 let layout = match std::alloc::Layout::array::<NetEvent>(count) {
1770 Ok(l) => l,
1771 Err(_) => return NetError::Unknown.into(),
1772 };
1773 let ptr = unsafe { std::alloc::alloc(layout) as *mut NetEvent };
1774 if ptr.is_null() {
1775 return NetError::Unknown.into();
1776 }
1777
1778 // Shared counter so the outer scope can clean up partial
1779 // writes if any iteration panics.
1780 let completed = std::cell::Cell::new(0usize);
1781 let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1782 for (i, event) in response.events.iter().enumerate() {
1783 let id_bytes = event.id.as_bytes().to_vec().into_boxed_slice();
1784 let id_len = id_bytes.len();
1785 let id_ptr = Box::into_raw(id_bytes) as *const c_char;
1786
1787 let raw_bytes = event.raw.to_vec().into_boxed_slice();
1788 let raw_len = raw_bytes.len();
1789 let raw_ptr = Box::into_raw(raw_bytes) as *const c_char;
1790
1791 unsafe {
1792 ptr.add(i).write(NetEvent {
1793 id: id_ptr,
1794 id_len,
1795 raw: raw_ptr,
1796 raw_len,
1797 insertion_ts: event.insertion_ts,
1798 shard_id: event.shard_id,
1799 });
1800 }
1801 completed.set(i + 1);
1802 }
1803 }));
1804 if build_result.is_err() {
1805 // A panic landed mid-loop. Free fully-written events
1806 // (those past `completed.get()` were never written, so
1807 // the inner `id`/`raw` pointers aren't valid). The
1808 // events array was allocated for `count` NetEvent
1809 // slots, so the dealloc must use that same layout.
1810 free_events_array_partial(ptr, completed.get(), count);
1811 return NetError::Unknown.into();
1812 }
1813 ptr
1814 } else {
1815 ptr::null_mut()
1816 };
1817
1818 // Leak next_id if present.
1819 let next_id_ptr = match response.next_id {
1820 Some(ref s) => match std::ffi::CString::new(s.as_str()) {
1821 Ok(c) => c.into_raw(),
1822 Err(_) => {
1823 // Free already-allocated events before returning
1824 // error. `s.as_str()` is valid UTF-8 by `String`
1825 // invariant, so this is the interior-NUL path —
1826 // an upstream cursor id that contains `\0` cannot
1827 // round-trip through a C string. Pre-fix this
1828 // returned `InvalidUtf8`, which mis-described
1829 // the cause; bindings now see the more accurate
1830 // `InteriorNul`.
1831 free_events_array(events_ptr, count);
1832 return NetError::InteriorNul.into();
1833 }
1834 },
1835 None => ptr::null_mut(),
1836 };
1837
1838 unsafe {
1839 (*out).events = events_ptr;
1840 (*out).count = count;
1841 (*out).next_id = next_id_ptr;
1842 (*out).has_more = if response.has_more { 1 } else { 0 };
1843 }
1844
1845 NetError::Success.into()
1846}
1847
1848/// Free an events array and all its id/raw allocations.
1849///
1850/// `count` is the number of fully-written events (those whose
1851/// inner `id` / `raw` boxed slices were initialized). It must
1852/// also match the `Layout::array::<NetEvent>` used at allocation
1853/// time — every existing caller writes exactly `count` events
1854/// before invoking this function. For partial-cleanup paths
1855/// (e.g. panic mid-build), use [`free_events_array_partial`].
1856fn free_events_array(events: *mut NetEvent, count: usize) {
1857 free_events_array_partial(events, count, count);
1858}
1859
1860/// Free an events array where only `walk_count` entries have
1861/// fully-initialized `id`/`raw` allocations, but the array
1862/// itself was allocated for `alloc_count` slots. Per-event
1863/// boxes are freed for `0..walk_count`; the array is then
1864/// deallocated with the original `Layout::array::<NetEvent>(alloc_count)`
1865/// to match the allocation. Used by `net_poll_ex`'s panic-mid-loop
1866/// recovery path.
1867fn free_events_array_partial(events: *mut NetEvent, walk_count: usize, alloc_count: usize) {
1868 if events.is_null() || alloc_count == 0 {
1869 return;
1870 }
1871 for i in 0..walk_count {
1872 let event = unsafe { &*events.add(i) };
1873 if !event.id.is_null() {
1874 unsafe {
1875 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
1876 event.id as *mut u8,
1877 event.id_len,
1878 ));
1879 }
1880 }
1881 if !event.raw.is_null() {
1882 unsafe {
1883 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
1884 event.raw as *mut u8,
1885 event.raw_len,
1886 ));
1887 }
1888 }
1889 }
1890 if let Ok(layout) = std::alloc::Layout::array::<NetEvent>(alloc_count) {
1891 unsafe {
1892 std::alloc::dealloc(events as *mut u8, layout);
1893 }
1894 }
1895}
1896
1897/// Free the internal allocations of a poll result returned by `net_poll_ex`.
1898///
1899/// This frees the events array (including each event's `id` and `raw` buffers)
1900/// and the `next_id` string. It does **not** free the `NetPollResult` struct
1901/// itself, which is caller-provided (typically stack-allocated or managed by
1902/// the caller).
1903#[unsafe(no_mangle)]
1904pub unsafe extern "C" fn net_free_poll_result(result: *mut NetPollResult) {
1905 if result.is_null() {
1906 return;
1907 }
1908
1909 let result = unsafe { &mut *result };
1910
1911 // Free events array and all id/raw allocations.
1912 free_events_array(result.events, result.count);
1913
1914 // Free next_id.
1915 if !result.next_id.is_null() {
1916 unsafe {
1917 drop(std::ffi::CString::from_raw(result.next_id));
1918 }
1919 }
1920
1921 // Null the fields so a second `net_free_poll_result` on the
1922 // same struct is a safe no-op rather than a double-free. The
1923 // C header's contract just says "free a poll result"; without
1924 // this clear, a defensive caller calling free twice (or two
1925 // wrappers each calling free in their destructor) would
1926 // re-`Box::from_raw` an already-freed pointer.
1927 result.events = std::ptr::null_mut();
1928 result.count = 0;
1929 result.next_id = std::ptr::null_mut();
1930 result.has_more = 0;
1931}
1932
1933/// Get stats without JSON serialization.
1934#[unsafe(no_mangle)]
1935pub unsafe extern "C" fn net_stats_ex(handle: *mut NetHandle, out: *mut NetStats) -> c_int {
1936 if !handle_is_valid(handle) || out.is_null() {
1937 return NetError::NullPointer.into();
1938 }
1939
1940 let handle = unsafe { &*handle };
1941 let _guard = match enter_ffi_op(handle) {
1942 Ok(g) => g,
1943 Err(err) => return err,
1944 };
1945 let stats = handle.bus.stats();
1946
1947 unsafe {
1948 (*out).events_ingested = stats
1949 .events_ingested
1950 .load(std::sync::atomic::Ordering::Relaxed);
1951 (*out).events_dropped = stats
1952 .events_dropped
1953 .load(std::sync::atomic::Ordering::Relaxed);
1954 (*out).batches_dispatched = stats
1955 .batches_dispatched
1956 .load(std::sync::atomic::Ordering::Relaxed);
1957 }
1958
1959 NetError::Success.into()
1960}
1961
1962#[cfg(test)]
1963mod tests {
1964 use super::*;
1965
1966 #[test]
1967 fn test_parse_config_valid() {
1968 let config = parse_config_json(r#"{"num_shards": 8}"#);
1969 assert!(config.is_some());
1970 }
1971
1972 #[test]
1973 fn test_parse_config_num_shards_overflow() {
1974 // u16::MAX is 65535, so 65536 should fail
1975 let config = parse_config_json(r#"{"num_shards": 65536}"#);
1976 assert!(
1977 config.is_none(),
1978 "num_shards exceeding u16::MAX should fail"
1979 );
1980
1981 // Much larger value should also fail
1982 let config = parse_config_json(r#"{"num_shards": 100000}"#);
1983 assert!(
1984 config.is_none(),
1985 "num_shards exceeding u16::MAX should fail"
1986 );
1987 }
1988
1989 #[test]
1990 fn test_parse_config_num_shards_max_valid() {
1991 // u16::MAX (65535) should be valid
1992 let config = parse_config_json(r#"{"num_shards": 65535}"#);
1993 assert!(config.is_some(), "num_shards at u16::MAX should be valid");
1994 }
1995
1996 #[test]
1997 fn test_parse_config_invalid_json() {
1998 let config = parse_config_json(r#"{"num_shards": invalid}"#);
1999 assert!(config.is_none());
2000 }
2001
2002 #[test]
2003 fn test_parse_config_empty() {
2004 let config = parse_config_json(r#"{}"#);
2005 assert!(config.is_some(), "empty config should use defaults");
2006 }
2007
2008 /// Pin: known `backpressure_mode` strings round-trip; an
2009 /// unknown value (typo) is rejected with `None`, not silently
2010 /// downgraded to `DropNewest`. Pre-fix a deploy-time typo
2011 /// like `"DropOldset"` swapped the operator's intended
2012 /// durability for `DropNewest` with no diagnostic.
2013 #[test]
2014 fn parse_config_rejects_unknown_backpressure_mode() {
2015 // Known values still parse.
2016 for s in [
2017 "DropNewest",
2018 "drop_newest",
2019 "DropOldest",
2020 "drop_oldest",
2021 "FailProducer",
2022 "fail_producer",
2023 ] {
2024 let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
2025 assert!(cfg.is_some(), "known mode `{}` must parse", s);
2026 }
2027
2028 // Typos must fail.
2029 for s in ["DropOldset", "FailProduce", "drop_oldst", "garbage", ""] {
2030 let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
2031 assert!(
2032 cfg.is_none(),
2033 "unknown mode `{}` must reject (pre-fix this silently \
2034 fell through to DropNewest)",
2035 s,
2036 );
2037 }
2038
2039 // Wrong JSON type also fails — pre-fix this hit the
2040 // `and_then(|v| v.as_str())` short-circuit and was
2041 // ignored entirely.
2042 let cfg = parse_config_json(r#"{"backpressure_mode": 42}"#);
2043 assert!(
2044 cfg.is_none(),
2045 "non-string non-object backpressure_mode must reject"
2046 );
2047 let cfg = parse_config_json(r#"{"backpressure_mode": true}"#);
2048 assert!(cfg.is_none(), "boolean backpressure_mode must reject");
2049 }
2050
2051 /// Pin: the `Sample { rate }` mode is reachable from JSON
2052 /// via `{"backpressure_mode": {"Sample": {"rate": N}}}`,
2053 /// and a zero rate is rejected (validator already rejects
2054 /// it; the parser must too, so the surface is consistent).
2055 #[test]
2056 fn parse_config_supports_sample_mode_with_validation() {
2057 let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 10}}}"#);
2058 assert!(cfg.is_some(), "Sample with non-zero rate must parse");
2059
2060 let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 0}}}"#);
2061 assert!(cfg.is_none(), "Sample with rate=0 must reject");
2062
2063 let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {}}}"#);
2064 assert!(cfg.is_none(), "Sample missing rate must reject");
2065 }
2066
2067 // Regression: the Go binding's `Poll(limit, cursor)` serializes a
2068 // `"cursor"` field that the FFI JSON path previously ignored —
2069 // cross-shard pagination silently broke. `parse_poll_request_json`
2070 // must round-trip the cursor into `ConsumeRequest.from_id`.
2071 #[test]
2072 fn test_parse_poll_request_preserves_cursor() {
2073 let req = parse_poll_request_json(r#"{"limit": 50, "cursor": "abc:123"}"#).unwrap();
2074 assert_eq!(req.limit, 50);
2075 assert_eq!(req.from_id.as_deref(), Some("abc:123"));
2076 }
2077
2078 #[test]
2079 fn test_parse_poll_request_no_cursor_defaults_to_none() {
2080 let req = parse_poll_request_json(r#"{"limit": 10}"#).unwrap();
2081 assert_eq!(req.limit, 10);
2082 assert_eq!(req.from_id, None);
2083 }
2084
2085 #[test]
2086 fn test_parse_poll_request_empty_uses_default_limit() {
2087 let req = parse_poll_request_json(r#"{}"#).unwrap();
2088 assert_eq!(req.limit, 100);
2089 assert_eq!(req.from_id, None);
2090 }
2091
2092 // Regression: a wrong-typed `"limit"` previously hit
2093 // `.as_u64().unwrap_or(100)` and silently defaulted. Caller bugs
2094 // (e.g. sending a string or a negative number) must surface as
2095 // `InvalidJson` instead.
2096 #[test]
2097 fn test_parse_poll_request_wrong_type_limit_errors() {
2098 let err = parse_poll_request_json(r#"{"limit": "50"}"#).unwrap_err();
2099 assert_eq!(err, c_int::from(NetError::InvalidJson));
2100 let err = parse_poll_request_json(r#"{"limit": -1}"#).unwrap_err();
2101 assert_eq!(err, c_int::from(NetError::InvalidJson));
2102 }
2103
2104 #[test]
2105 fn test_parse_poll_request_wrong_type_cursor_errors() {
2106 let err = parse_poll_request_json(r#"{"cursor": 123}"#).unwrap_err();
2107 assert_eq!(err, c_int::from(NetError::InvalidJson));
2108 }
2109
2110 #[test]
2111 fn test_parse_poll_request_null_fields_use_defaults() {
2112 let req = parse_poll_request_json(r#"{"limit": null, "cursor": null}"#).unwrap();
2113 assert_eq!(req.limit, 100);
2114 assert_eq!(req.from_id, None);
2115 }
2116
2117 /// `usize::MAX` is always a valid usize regardless of target
2118 /// pointer width, so it must parse successfully on both 32- and
2119 /// 64-bit builds. This pins the boundary case.
2120 #[test]
2121 fn test_parse_poll_request_limit_at_usize_max() {
2122 let json = format!(r#"{{"limit": {}}}"#, usize::MAX);
2123 let req = parse_poll_request_json(&json).unwrap();
2124 assert_eq!(req.limit, usize::MAX);
2125 }
2126
2127 /// Regression: `as usize` silently truncates on 32-bit targets
2128 /// for `u64` values above `usize::MAX`. The parser must return
2129 /// `InvalidJson` instead of wrapping. We only run this on 32-bit
2130 /// targets because on 64-bit `usize::MAX == u64::MAX`, leaving
2131 /// nothing that fits in u64 but not usize.
2132 #[cfg(target_pointer_width = "32")]
2133 #[test]
2134 fn test_parse_poll_request_limit_overflows_usize_on_32bit() {
2135 // 2^33 — fits in u64, but exceeds usize::MAX on a 32-bit build.
2136 let err = parse_poll_request_json(r#"{"limit": 8589934592}"#).unwrap_err();
2137 assert_eq!(err, c_int::from(NetError::InvalidJson));
2138 }
2139
2140 /// CR-22: pin parity between the Rust-side `NetError` enum and
2141 /// the two C-header copies. The Rust enum is the source of
2142 /// truth; C / Go consumers `errors.Is` against the named codes.
2143 /// Pre-CR-22 the headers were missing `-9` (IntOverflow) and
2144 /// `-10` (MismatchedHandles); a consumer receiving those values
2145 /// would fall into the unknown-code branch and lose actionable
2146 /// distinction.
2147 ///
2148 /// We extract every integer literal that appears as the
2149 /// right-hand side of an `= ` token in the file and check
2150 /// that each Rust-side value is present in BOTH headers. The
2151 /// test does NOT verify symbolic names; the sealing
2152 /// constraint is the numeric value alone.
2153 ///
2154 /// Both `include_str!` paths point inside `net/crates/net/`.
2155 /// `include/net.go.h` is a manually-synced mirror of the
2156 /// repo-root `go/net.h`. Reaching outside the crate root
2157 /// (`include_str!("../../../../../go/net.h")`) breaks
2158 /// `cargo publish` and any out-of-repo vendoring of this
2159 /// crate, so the in-crate copy is the supported source. A
2160 /// drift between the two surfaces here as a parity-test
2161 /// failure: one of them will be missing the new value.
2162 #[test]
2163 fn cr22_c_header_parity_with_rust_neterror() {
2164 let primary = include_str!("../../include/net.h");
2165 let go_copy = include_str!("../../include/net.go.h");
2166
2167 // The Rust enum's full set of values (mirrors `pub enum
2168 // NetError` above). When a new variant is added in the
2169 // Rust source, this list — AND both headers — must be
2170 // updated together. The asserts that follow then catch a
2171 // missing header update at the next CI run.
2172 let rust_values: &[i32] = &[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -99];
2173
2174 // Pull every numeric literal that looks like an enum-value
2175 // assignment (`= <number>` followed by `,` or whitespace).
2176 // Whitespace-tolerant: skips `= 0`, `= 0`, `= -10`, etc.
2177 fn extract_assigned_values(src: &str) -> Vec<i32> {
2178 let mut out = Vec::new();
2179 let mut chars = src.chars().peekable();
2180 while let Some(c) = chars.next() {
2181 if c != '=' {
2182 continue;
2183 }
2184 // Skip whitespace.
2185 while let Some(&peek) = chars.peek() {
2186 if peek == ' ' || peek == '\t' {
2187 chars.next();
2188 } else {
2189 break;
2190 }
2191 }
2192 // Optional sign.
2193 let mut buf = String::new();
2194 if let Some(&peek) = chars.peek() {
2195 if peek == '-' || peek == '+' {
2196 buf.push(peek);
2197 chars.next();
2198 }
2199 }
2200 // Digits.
2201 let mut had_digit = false;
2202 while let Some(&peek) = chars.peek() {
2203 if peek.is_ascii_digit() {
2204 buf.push(peek);
2205 chars.next();
2206 had_digit = true;
2207 } else {
2208 break;
2209 }
2210 }
2211 if had_digit {
2212 if let Ok(v) = buf.parse::<i32>() {
2213 out.push(v);
2214 }
2215 }
2216 }
2217 out
2218 }
2219
2220 let primary_vals = extract_assigned_values(primary);
2221 let go_vals = extract_assigned_values(go_copy);
2222
2223 for &v in rust_values {
2224 assert!(
2225 primary_vals.contains(&v),
2226 "CR-22 regression: include/net.h is missing the value {} \
2227 (Rust NetError defines it). Add the matching `NET_ERR_*` \
2228 enumerator before merging.",
2229 v
2230 );
2231 assert!(
2232 go_vals.contains(&v),
2233 "CR-22 regression: bindings/go/net/net.h is missing the value {} \
2234 (Rust NetError defines it).",
2235 v
2236 );
2237 }
2238 }
2239
2240 /// CR-5: pin that `examples/capability.c` does not double-include
2241 /// `net.h` and `net.go.h`. Both files use the `NET_SDK_H` include
2242 /// guard, so when both are included in one TU the second is
2243 /// silently skipped — every `net_validate_capabilities` /
2244 /// `net_predicate_*` call the example makes becomes an
2245 /// implicit-declaration error on GCC 14+/Clang 16+, and a silent
2246 /// `int`-return miscompile on older toolchains. The deeper fix
2247 /// (renaming one guard so they compose cleanly) is tracked as
2248 /// CR-28; this test catches the example-level regression.
2249 #[test]
2250 fn cr5_example_does_not_double_include_net_headers() {
2251 let example = include_str!("../../examples/capability.c");
2252 let net_h_included = example.contains("#include \"../include/net.h\"");
2253 let net_go_h_included = example.contains("#include \"../include/net.go.h\"");
2254 assert!(
2255 net_go_h_included,
2256 "examples/capability.c must include net.go.h to declare \
2257 net_validate_capabilities + net_predicate_* symbols"
2258 );
2259 assert!(
2260 !net_h_included,
2261 "examples/capability.c must NOT also include net.h: \
2262 both headers share the NET_SDK_H guard, so the second \
2263 include is silently skipped, leaving the example's \
2264 net_predicate_* calls implicitly declared. Drop the \
2265 redundant include — net.go.h is a superset."
2266 );
2267 }
2268
2269 /// `handle_is_valid` rejects null and any pointer not aligned for
2270 /// `NetHandle`. A foreign caller producing a misaligned pointer
2271 /// (e.g. via an over-eager `void *` cast on a packed struct) hits
2272 /// `&*handle` UB before any other check fires; this gate is the
2273 /// pre-deref discriminator.
2274 #[test]
2275 fn handle_is_valid_rejects_null_and_misaligned() {
2276 // Null is rejected.
2277 assert!(
2278 !handle_is_valid(std::ptr::null::<NetHandle>()),
2279 "null pointer must not be considered a valid handle"
2280 );
2281
2282 // Aligned but non-null is accepted (we use a small backing
2283 // buffer to materialize a pointer without dereferencing it).
2284 // `align_of::<NetHandle>()` is the alignment we must match.
2285 let align = std::mem::align_of::<NetHandle>();
2286 let buf = vec![0u8; align * 2];
2287 let base = buf.as_ptr() as usize;
2288 let aligned = (base + align - 1) & !(align - 1);
2289 let aligned_ptr = aligned as *const NetHandle;
2290 assert!(
2291 handle_is_valid(aligned_ptr),
2292 "aligned non-null pointer must validate (align={align}, ptr={aligned_ptr:p})"
2293 );
2294
2295 // A pointer one byte past `aligned_ptr` is misaligned for any
2296 // type with align > 1, and `NetHandle` (containing `AtomicU32`,
2297 // `AtomicBool`, ManuallyDrop'd EventBus + Runtime) easily
2298 // exceeds 1.
2299 if align > 1 {
2300 let misaligned_ptr = (aligned + 1) as *const NetHandle;
2301 assert!(
2302 !handle_is_valid(misaligned_ptr),
2303 "misaligned pointer must be rejected (align={align}, ptr={misaligned_ptr:p})"
2304 );
2305 }
2306 }
2307
2308 /// Pin: zero values for `heartbeat_interval_ms` and
2309 /// `session_timeout_ms` must reject the entire config (parser
2310 /// returns `None`). Pre-fix the parser threaded `0` through
2311 /// to `Duration::from_millis(0)`, which on the Net adapter's
2312 /// heartbeat path results in a busy-loop that pegs a CPU and
2313 /// produces no diagnostic — the FFI caller saw a successful
2314 /// `net_init` followed by a hung daemon. The validator-level
2315 /// guard for cooldown / metrics_window has no equivalent on
2316 /// the Net-adapter side, so the parser is the only place that
2317 /// can refuse the build.
2318 #[cfg(feature = "net")]
2319 #[test]
2320 fn parse_config_rejects_zero_heartbeat_and_session_timeout() {
2321 // 32-byte hex strings (64 chars) so `hex::decode` produces
2322 // exactly the [u8; 32] the parser requires for `psk` and
2323 // `peer_public_key`.
2324 let psk = "0".repeat(64);
2325 let peer_pk = "1".repeat(64);
2326
2327 // Sanity: a config with both fields *non-zero* must parse
2328 // successfully — proves the rejection in the negative
2329 // cases below is caused by the zero, not a missing
2330 // required field on the surrounding `net` block.
2331 let baseline = format!(
2332 r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
2333 "psk":"{psk}","peer_public_key":"{peer_pk}",
2334 "heartbeat_interval_ms":1000,"session_timeout_ms":30000}}}}"#
2335 );
2336 assert!(
2337 parse_config_json(&baseline).is_some(),
2338 "baseline net config with non-zero heartbeat/session_timeout must parse"
2339 );
2340
2341 // heartbeat_interval_ms = 0 → reject.
2342 let zero_hb = format!(
2343 r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
2344 "psk":"{psk}","peer_public_key":"{peer_pk}",
2345 "heartbeat_interval_ms":0,"session_timeout_ms":30000}}}}"#
2346 );
2347 assert!(
2348 parse_config_json(&zero_hb).is_none(),
2349 "heartbeat_interval_ms=0 must reject (pre-fix this produced a CPU-pegging busy loop)"
2350 );
2351
2352 // session_timeout_ms = 0 → reject.
2353 let zero_to = format!(
2354 r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
2355 "psk":"{psk}","peer_public_key":"{peer_pk}",
2356 "heartbeat_interval_ms":1000,"session_timeout_ms":0}}}}"#
2357 );
2358 assert!(
2359 parse_config_json(&zero_to).is_none(),
2360 "session_timeout_ms=0 must reject"
2361 );
2362 }
2363}