//! C FFI bindings for cross-language integration.
//!
//! This module provides a C-compatible API for using Net from
//! other languages (Python, Node.js, Go, etc.).
//!
//! # Safety
//!
//! All public FFI functions in this module accept raw pointers from C code.
//! Each is declared `pub unsafe extern "C" fn` so the unsafety is
//! explicit at the type level; the module-wide contract callers
//! must uphold is:
//! - Pointers are valid and properly aligned
//! - Opaque handle pointers (`*mut T`) were produced by this crate's
//!   matching constructor (`Box::into_raw` inside the FFI surface).
//!   Foreign-allocated pointers, even if valid and aligned, cause UB
//!   when consumed by `Box::from_raw` in the corresponding `_free`.
//! - String pointers point to valid UTF-8 data
//! - Buffer sizes are accurate
//! - Handles are not used after `net_shutdown`
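/*!

The handle rule in the second bullet can be sketched as a
constructor / accessor / destructor triple. `Widget`, `widget_new`,
`widget_value`, and `widget_free` are hypothetical names used only for
illustration, not symbols this crate exports. The point is that
`Box::from_raw` in the destructor is sound only on a pointer that came
from the matching `Box::into_raw`.

```rust
/// Hypothetical opaque type, handed to C as `*mut Widget`.
#[repr(C)]
struct Widget {
    value: u32,
}

/// Constructor: the only sanctioned source of a `*mut Widget`.
extern "C" fn widget_new(value: u32) -> *mut Widget {
    Box::into_raw(Box::new(Widget { value }))
}

/// Reads through a handle from `widget_new`; returns 0 for null.
///
/// # Safety
/// `w` must be null or a live pointer returned by `widget_new`.
unsafe extern "C" fn widget_value(w: *const Widget) -> u32 {
    if w.is_null() {
        return 0;
    }
    unsafe { (*w).value }
}

/// Destructor: reclaims the `Box`. Passing memory that did not come
/// from `widget_new` (for example, a C `malloc` block) is UB here.
///
/// # Safety
/// `w` must be null or a pointer from `widget_new` not yet freed.
unsafe extern "C" fn widget_free(w: *mut Widget) {
    if !w.is_null() {
        drop(unsafe { Box::from_raw(w) });
    }
}
```
*/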
//!
//! The per-function `# Safety` rustdoc is intentionally suppressed
//! at the module level — every entry point shares the same contract
//! and the module doc-comment above (plus `include/README.md`) is
//! the source of truth. Adding individual `# Safety` blocks would
//! duplicate the same wording 200 times without adding signal.
//!
//! # Thread Safety
//!
//! All FFI functions are thread-safe. The event bus handle can be shared
//! across threads.
//!
#![allow(clippy::missing_safety_doc)]
// The cross-cutting C-side safety contract for every `unsafe` block in
// this module is the one stated in the `# Safety` section above
// (caller-validated pointer / length / lifetime / handle-not-after-shutdown
// invariants, also documented in `include/net.h`). Inlining `// SAFETY:`
// on each block would add ~200 identical "see module preamble" comments
// without adding any signal beyond what the preamble already says.
#![expect(
    clippy::undocumented_unsafe_blocks,
    reason = "module-wide FFI safety contract documented in the # Safety preamble above"
)]
#![expect(
    clippy::multiple_unsafe_ops_per_block,
    reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract; splitting per-op would obscure the single boundary-cross"
)]

//! # Tokio runtime restriction
//!
//! Internal FFI ops (`net_poll`, `net_flush`, `net_shutdown`,
//! `net_redex_*`, `net_mesh_new`, the cortex FFI, the mesh FFI)
//! drive the bus's tokio runtime via `Runtime::block_on`. That
//! function panics with "Cannot start a runtime from within a
//! runtime" if the calling thread is already inside a tokio
//! runtime context. The functions are `extern "C"`, so a panic
//! unwinds across the FFI boundary into C / Go-cgo / Python /
//! NAPI — undefined behavior.
//!
//! **The common-case C / Go / Python caller has no Rust tokio
//! runtime, so this is unreachable for them.** The narrow path is:
//!
//! - A **Rust** caller loads the cdylib and calls these
//!   functions from inside its own `#[tokio::main]` (or any
//!   thread that has called `Runtime::enter()`).
//! - A non-Rust caller embeds a Rust library that runs its own
//!   tokio runtime and forwards calls into this cdylib on the
//!   same thread.
//!
//! Both forms are unusual but reachable. **Do not call any FFI
//! op from a thread that already holds a tokio runtime
//! context.** If you must, spawn the FFI call on a fresh OS
//! thread that doesn't carry a runtime guard, or wrap the call
//! with `tokio::task::spawn_blocking(|| net_xxx(...))` to escape
//! the worker pool.
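/*!

A std-only sketch of the fresh-OS-thread escape hatch described
above. `call_on_fresh_thread` is a hypothetical helper, not part of
this crate. A newly spawned thread carries no tokio runtime context,
so a `Runtime::block_on` made inside it does not hit the
nested-runtime panic.

```rust
use std::thread;

/// Runs `f` on a newly spawned OS thread (which has no tokio runtime
/// context) and returns its result, or the panic payload if `f`
/// panicked.
fn call_on_fresh_thread<F, R>(f: F) -> thread::Result<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    thread::spawn(f).join()
}
```

Raw pointers are not `Send`, so a real caller would have to move the
`NetHandle` pointer into the closure through a `Send` wrapper (or as a
`usize`) whose soundness it vouches for.
*/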
//!
//! `net_init` (defined below) hardens against this for runtime
//! *construction*; the steady-state ops do not, since the cost
//! of a `Handle::try_current()` check on every poll would be
//! measurable for the common path that doesn't hit the bug.
//!
//! # `catch_unwind` + caller-held locks
//!
//! Several FFI entries (`net_blob_publish`, `net_blob_resolve`,
//! `net_*_wait_for_token`) wrap their body in
//! `std::panic::catch_unwind(AssertUnwindSafe(...))` so a panic
//! during the call returns a typed `NET_ERR_BLOB_PANIC` /
//! `NET_ERR_PANIC` code rather than unwinding across the FFI
//! boundary. That stops the substrate-side undefined behavior,
//! but it does NOT make the wrapped code transparently panic-safe
//! from the caller's perspective.
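/*!

A std-only sketch of that wrapping. `ffi_boundary` and `ERR_PANIC`
are hypothetical names; the real `NET_ERR_BLOB_PANIC` /
`NET_ERR_PANIC` values are defined elsewhere and are not assumed here.

```rust
use std::os::raw::c_int;
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical panic code for this sketch only.
const ERR_PANIC: c_int = -100;

/// Runs an FFI body and converts a panic into `ERR_PANIC` instead of
/// letting it unwind into the foreign caller.
fn ffi_boundary<F: FnOnce() -> c_int>(body: F) -> c_int {
    catch_unwind(AssertUnwindSafe(body)).unwrap_or(ERR_PANIC)
}
```

The unwind still runs destructors inside the wrapped body before
`catch_unwind` returns, so a `std::sync::Mutex` guard held inside the
body leaves that mutex poisoned.
*/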
//!
//! **If the caller invokes an FFI op while holding a caller-side
//! lock (a Go `sync.Mutex`, a Python `threading.Lock`, a Rust
//! `std::sync::Mutex`, ...) and the FFI body panics, the state that
//! lock protects can be left inconsistent.** `catch_unwind` stops
//! the unwind inside the FFI frame, so the caller's lock is neither
//! released nor poisoned by the panic: the call simply returns the
//! panic code, and the caller may act on a result that does not
//! reflect how far the FFI op got. Only Rust `std::sync` lock guards
//! held *inside* the panicking frames are poisoned (later `lock()`
//! calls return `Err(PoisonError)`); `parking_lot` locks, Go's
//! `sync.Mutex`, and Python's `threading.Lock` have no poisoning,
//! so nothing signals the inconsistency.
//!
//! Recommended caller pattern: **do not hold a caller-side lock
//! across an FFI call**. Acquire the lock, prepare the inputs,
//! release the lock, then call the FFI. Re-acquire if you need
//! to update caller state with the result.
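/*!

The recommended pattern, sketched in Rust. `CallerState` and
`call_without_holding_lock` are hypothetical names, and the `ffi_call`
closure stands in for any `net_*` op; a Go or Python caller would
follow the same three steps.

```rust
use std::os::raw::c_int;
use std::sync::Mutex;

/// Hypothetical caller-side state guarded by a mutex.
struct CallerState {
    pending: Vec<u8>,
    last_rc: c_int,
}

fn call_without_holding_lock(
    state: &Mutex<CallerState>,
    ffi_call: impl FnOnce(&[u8]) -> c_int,
) -> c_int {
    // 1. Acquire, copy the inputs out, release (the guard drops at block end).
    let input = {
        let guard = state.lock().unwrap();
        guard.pending.clone()
    };
    // 2. Cross the FFI boundary with no caller-side lock held.
    let rc = ffi_call(input.as_slice());
    // 3. Re-acquire only to record the result.
    state.lock().unwrap().last_rc = rc;
    rc
}
```
*/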
//!
//! The hazard is documented per-binding in:
//!   - Python: `bindings/python/README.md` (caller-mutex notes)
//!   - Node:   `bindings/node/README.md`
//!   - Go:     `bindings/go/net/redex.go` lifecycle docs
//!   - C:      `include/net.h` (every wait-family declaration)
//!
//! # Memory Management
//!
//! - Handles returned by `net_init` must be freed with `net_shutdown`
//! - String buffers passed to `net_poll` are owned by the caller
//! - Error codes are returned as integers (0 = success, negative = error)
//!
//! # Example (C)
//!
//! ```c
//! #include <string.h>
//! #include "net.h"
//!
//! int main(void) {
//!     // Initialize with a 4-shard config (pass NULL for the defaults)
//!     void* bus = net_init("{\"num_shards\": 4}");
//!     if (!bus) return 1;
//!
//!     // Ingest an event; `len` is the byte length of the JSON text
//!     const char* event = "{\"token\": \"hello\"}";
//!     int result = net_ingest(bus, event, strlen(event));
//!     if (result < 0) { /* handle error */ }
//!
//!     // Poll events into a caller-owned buffer
//!     char buffer[65536];
//!     result = net_poll(bus, "{\"limit\": 100}", buffer, sizeof(buffer));
//!     if (result < 0) { /* handle error */ }
//!
//!     // Shutdown
//!     net_shutdown(bus);
//!     return 0;
//! }
//! ```

// `clippy::not_unsafe_ptr_arg_deref` fires on public functions that
// dereference raw-pointer arguments without being marked `unsafe`.
// The entry points in this file are declared `pub unsafe extern "C" fn`
// (marking them `unsafe` does not change the C ABI); the allow is kept
// for any shim, here or in a submodule, that is not. Safety is
// documented in the module-level docs.
#![allow(clippy::not_unsafe_ptr_arg_deref)]

use std::ffi::CStr;
use std::os::raw::{c_char, c_int};
use std::ptr;

use tokio::runtime::Runtime;

use crate::bus::EventBus;
use crate::config::EventBusConfig;
use crate::consumer::ConsumeRequest;
use crate::event::{Event, RawEvent};

/// Per-FFI-handle quiescing protocol shared by cortex / mesh
/// handles to close the audit-#23/#24/#25 use-after-free hazards
/// when a `_free` races a concurrent op. See module docs for the
/// soundness story (intentional box leak) and the per-handle
/// recipe.
#[cfg(any(
    all(feature = "netdb", feature = "redex-disk"),
    feature = "net",
    feature = "redis",
))]
pub mod handle_guard;

/// C FFI for CortEX / NetDb / RedexFile. Requires `netdb` (for the
/// unified facade) and `redex-disk` (for persistent storage paths on
/// `Redex` / `RedexFile`). Go / cgo consumers target this surface.
///
/// `missing_docs` is suppressed on this module: these are extern "C"
/// shims over already-documented Rust adapters, and the per-function
/// contract is documented in the binding-side READMEs (Go / TS / Py).
/// Re-documenting each shim would duplicate with drift risk.
#[cfg(all(feature = "netdb", feature = "redex-disk"))]
#[allow(missing_docs)]
pub mod cortex;

/// C FFI for the Dataforts Phase 3 blob surface. Exposes the
/// BlobRef wire codec, the global adapter registry, and the
/// `publish_blob` / `resolve_payload` helpers for cgo / native
/// consumers.
#[cfg(feature = "dataforts")]
#[allow(missing_docs)]
pub mod blob;

/// Stub definitions for the `net_mesh_blob_adapter_*` symbols
/// when the `dataforts / netdb / redex-disk` feature triple is
/// off. cgo / dlsym consumers link these symbols unconditionally
/// (see `bindings/go/blob.go`), so a libnet built without the
/// triple must still satisfy them — each stub returns
/// `NET_ERR_FEATURE_NOT_BUILT` (or null) so Go programs route to
/// a clean error rather than fail at program load. The module is
/// empty when the feature triple is on (the real impls in
/// `ffi::blob` cover the same symbol names).
#[allow(missing_docs)]
pub mod blob_stubs;

/// C FFI for the encrypted-UDP mesh transport + channels. Requires
/// the `net` feature (which brings in the crypto + transport). Go /
/// cgo consumers target this surface alongside `ffi::cortex`. See
/// the `ffi::cortex` note for why `missing_docs` is suppressed here.
#[cfg(feature = "net")]
#[allow(missing_docs)]
pub mod mesh;

/// C FFI for the `aggregator.registry` RPC client + channel
/// visibility setter. Stage 5 of `SDK_AGGREGATOR_SUBNET_PLAN.md`.
/// Gated on the `net` feature (shared with `ffi::mesh`, since every
/// op needs a `MeshNodeHandle`) and on `cortex` (since the
/// underlying `behavior::aggregator` module's RPC surface is
/// cortex-only: `mesh_rpc`, `cortex::rpc`, `postcard`).
#[cfg(all(feature = "net", feature = "cortex"))]
#[allow(missing_docs)]
pub mod aggregator;

/// C FFI for stateless predicate evaluation (Phase 9c of
/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — no handles,
/// no state. Mirrors the SDK-layer `evaluatePredicate` /
/// `evaluate_predicate` surface every binding ships, exposed at
/// the C ABI for raw consumers (C / C++ / Zig / Swift / etc.).
#[cfg(feature = "net")]
pub mod predicate;

/// C FFI for stateless capability-set validation (Phase 9a of
/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helper — `caps_json`
/// in, `report_json` out. Mirrors the SDK-layer
/// `validate_capabilities` surface, exposed at the C ABI for raw
/// consumers.
#[cfg(feature = "net")]
pub mod schema;

/// C FFI for predicate debug-session helpers (Phase 9d of
/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — single-eval
/// `evaluate_with_trace`, corpus-wide
/// `aggregate_debug_report`, and host-side
/// `redact_metadata_keys`. Mirrors what every other binding
/// ships at the SDK layer; exposed at the C ABI for raw
/// consumers.
#[cfg(feature = "net")]
pub mod predicate_debug;

/// C FFI for the Redis Streams consumer-side dedup helper. Mirrors
/// the Rust `net::adapter::RedisStreamDedup` surface for Go / C / Zig
/// consumers. See `ffi::redis_dedup` module docs for the wire
/// shape and the dedup contract.
#[cfg(feature = "redis")]
pub mod redis_dedup;

#[cfg(feature = "net")]
use crate::adapter::net::{NetAdapterConfig, ReliabilityConfig, StaticKeypair};
#[cfg(any(feature = "redis", feature = "jetstream", feature = "net"))]
use crate::config::AdapterConfig;
#[cfg(feature = "jetstream")]
use crate::config::JetStreamAdapterConfig;
#[cfg(feature = "redis")]
use crate::config::RedisAdapterConfig;
#[cfg(feature = "net")]
use std::ffi::CString;

/// Opaque handle to an event bus instance.
///
/// This wraps the EventBus along with a Tokio runtime for async operations.
///
/// # Lifetime / soundness
///
/// The handle storage is *intentionally leaked* on `net_shutdown` rather
/// than freed via `Box::from_raw`. Reasoning: every FFI entry point
/// dereferences the C-side `*mut NetHandle` to access the atomics that
/// gate shutdown. The previous Dekker-style SeqCst handshake between
/// `FfiOpGuard::try_enter` (which calls `fetch_add` on `active_ops`) and
/// `net_shutdown` (which loads `active_ops` then `Box::from_raw`s the
/// handle) was unsound: SeqCst orders the atomic operations only — the
/// non-atomic `Box::from_raw` could deallocate the storage between
/// shutdown's load and a concurrent FFI op's `fetch_add`, producing a
/// use-after-free on the freed atomic. By never freeing the box, the
/// atomic memory backing the handle is always valid; concurrent FFI ops
/// observe `shutting_down=true` after shutdown signals it and bail
/// before touching `bus`/`runtime`.
///
/// `bus` and `runtime` are stored in `ManuallyDrop` so that
/// `net_shutdown` can `take` them out (via `ptr::read`) in order to
/// call `bus.shutdown().await`. Because `shutting_down` is set first
/// and shutdown waits for `active_ops` to drop to zero before reading
/// these fields, no FFI op can be racing the read. If the wait times
/// out, the `ptr::read` is skipped and both fields are leaked along
/// with the box.
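/**

A std-only sketch of the two invariants above: the box is leaked
(never handed to `Box::from_raw`), and the `ManuallyDrop` payload is
read out at most once, gated by an atomic flag in the spirit of
`bus_taken`. `Handle`, `handle_new`, and `take_payload` are
hypothetical names, `String` stands in for `EventBus`, and the drain
wait on `active_ops` is omitted.

```rust
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for `NetHandle`.
struct Handle {
    payload: ManuallyDrop<String>,
    taken: AtomicBool,
}

/// Allocates a handle for C. It is never passed to `Box::from_raw`,
/// so its atomics stay readable by any late caller.
fn handle_new(s: &str) -> *mut Handle {
    Box::into_raw(Box::new(Handle {
        payload: ManuallyDrop::new(s.to_owned()),
        taken: AtomicBool::new(false),
    }))
}

/// Moves the payload out at most once.
///
/// # Safety
/// `h` must come from `handle_new`.
unsafe fn take_payload(h: *mut Handle) -> Option<String> {
    let h = unsafe { &*h };
    if h.taken.swap(true, Ordering::SeqCst) {
        return None; // already taken: a second `ptr::read` would double-drop
    }
    // The flag guarantees this bitwise copy happens exactly once; the
    // `ManuallyDrop` wrapper keeps the original from being dropped too.
    Some(unsafe { ptr::read(&*h.payload) })
}
```
*/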
pub struct NetHandle {
    /// Owned `EventBus`. Read out via `ManuallyDrop::take` during
    /// shutdown once `active_ops` has drained to zero. After that
    /// point, `shutting_down` is `true` and no FFI op may access this
    /// field.
    bus: std::mem::ManuallyDrop<EventBus>,
    /// Owned tokio runtime. Same lifetime contract as `bus`.
    runtime: std::mem::ManuallyDrop<Runtime>,
    /// Set to `true` once `net_shutdown` begins. All other FFI
    /// functions check this flag and return `ShuttingDown` before
    /// touching `bus` / `runtime`.
    shutting_down: std::sync::atomic::AtomicBool,
    /// Number of in-flight FFI operations (excluding shutdown itself).
    /// `net_shutdown` spins until this drops to zero (with a deadline)
    /// before reading `bus` / `runtime` to call shutdown.
    active_ops: std::sync::atomic::AtomicU32,
    /// Set to `true` after `net_shutdown` has consumed `bus` /
    /// `runtime` via `ManuallyDrop::take`. A second `net_shutdown`
    /// call observes this and returns `Success` without re-taking
    /// (which would be UB). FFI ops also check this before touching
    /// `bus` / `runtime`, defending against a contract-violating
    /// caller that races a post-shutdown call.
    bus_taken: std::sync::atomic::AtomicBool,
    /// Set to `true` after `bus.shutdown()` returns from the
    /// first `net_shutdown` call. A second/third concurrent
    /// `net_shutdown` caller spins until this flips before
    /// returning success. Without this gate, a second caller would
    /// observe `bus_taken == true` and return `Success` while the
    /// first caller was still mid-`block_on(bus.shutdown())`,
    /// falsely signaling completion of an in-progress shutdown.
    shutdown_completed: std::sync::atomic::AtomicBool,
}

/// Maximum time `net_shutdown` will wait for in-flight FFI operations
/// to complete before giving up. If the deadline expires, the bus is
/// leaked rather than read out — leaking is correct (the box is
/// already leaked permanently for soundness reasons) but means the
/// adapter's `flush()` / `shutdown()` won't run.
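/**

A std-only sketch of a deadline-bounded drain. `wait_for_drain` is a
hypothetical helper, not the body of `net_shutdown`. It spins
(yielding the thread) until the in-flight counter reaches zero or the
deadline passes, and reports which happened so the caller can choose
between reading the fields out and leaking them.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};

/// Returns `true` if `active_ops` drained to zero before `deadline`.
fn wait_for_drain(active_ops: &AtomicU32, deadline: Duration) -> bool {
    let start = Instant::now();
    while active_ops.load(Ordering::SeqCst) != 0 {
        if start.elapsed() >= deadline {
            return false; // give up: the caller must leak, not read
        }
        std::thread::yield_now();
    }
    true
}
```
*/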
const FFI_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);

/// RAII guard that increments `active_ops` on creation and decrements on drop.
struct FfiOpGuard<'a> {
    handle: &'a NetHandle,
}

impl<'a> FfiOpGuard<'a> {
    /// Try to enter an FFI operation. Returns `None` if the handle is
    /// shutting down or if `bus` / `runtime` have already been taken.
    ///
    /// Soundness rests on the fact that the box backing `handle` is
    /// never freed (see `NetHandle` doc). The `fetch_add` is therefore
    /// always on valid memory regardless of whether shutdown is in
    /// progress. The subsequent loads decide whether the op is allowed
    /// to proceed; if shutdown was signaled or `bus_taken` flipped
    /// before our increment was visible, we bail without touching
    /// `bus` / `runtime`. The `bus_taken` check defends against a
    /// contract-violating caller that races a post-shutdown call: even
    /// if `shutting_down` was reset somehow, an op that would touch the
    /// already-taken `ManuallyDrop` fields is rejected.
    fn try_enter(handle: &'a NetHandle) -> Option<Self> {
        handle
            .active_ops
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        if handle
            .shutting_down
            .load(std::sync::atomic::Ordering::SeqCst)
            || handle.bus_taken.load(std::sync::atomic::Ordering::SeqCst)
        {
            handle
                .active_ops
                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
            None
        } else {
            Some(Self { handle })
        }
    }
}

impl Drop for FfiOpGuard<'_> {
    fn drop(&mut self) {
        self.handle
            .active_ops
            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
    }
}

/// Returns `true` when `handle` is non-null and aligned for
/// `NetHandle`. Every `extern "C"` entry point that derefs the
/// raw handle must gate on this — a misaligned pointer produced
/// by an over-eager `void *` cast in a foreign caller would be
/// immediate UB on `&*handle`, even though it passes the `is_null`
/// check.
#[inline]
fn handle_is_valid(handle: *const NetHandle) -> bool {
    !handle.is_null() && (handle as usize).is_multiple_of(std::mem::align_of::<NetHandle>())
}

/// Error codes returned by FFI functions.
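/**

Callers receive these as plain `c_int` values. A sketch of decoding a
return code on the consuming side, assuming (as the module docs state)
that negative values are errors. `error_name` and `check` are
hypothetical helpers, not exported by this crate; the numeric values
are copied from the variants below.

```rust
use std::os::raw::c_int;

/// Hypothetical caller-side decoder mirroring the `NetError` values.
fn error_name(code: c_int) -> &'static str {
    match code {
        0 => "Success",
        -1 => "NullPointer",
        -2 => "InvalidUtf8",
        -3 => "InvalidJson",
        -4 => "InitFailed",
        -5 => "IngestionFailed",
        -6 => "PollFailed",
        -7 => "BufferTooSmall",
        -8 => "ShuttingDown",
        -9 => "IntOverflow",
        -10 => "MismatchedHandles",
        -11 => "InteriorNul",
        -99 => "Unknown",
        _ => "unrecognized",
    }
}

/// Treats non-negative return values as success and names negative ones.
fn check(rc: c_int) -> Result<c_int, &'static str> {
    if rc >= 0 { Ok(rc) } else { Err(error_name(rc)) }
}
```
*/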
#[repr(C)]
pub enum NetError {
    /// Success (no error).
    Success = 0,
    /// Null pointer passed (also used for a misaligned handle, which
    /// `handle_is_valid` rejects).
    NullPointer = -1,
    /// Invalid UTF-8 string.
    InvalidUtf8 = -2,
    /// Invalid JSON.
    InvalidJson = -3,
    /// Initialization failed.
    InitFailed = -4,
    /// Ingestion failed (backpressure).
    IngestionFailed = -5,
    /// Poll failed.
    PollFailed = -6,
    /// Buffer too small.
    BufferTooSmall = -7,
    /// Shutting down.
    ShuttingDown = -8,
    /// Integer overflow: result does not fit in `c_int`.
    IntOverflow = -9,
    /// Stream handle does not belong to the supplied node handle.
    /// Previously the send-family FFIs accepted any (stream, node)
    /// pair without verifying they were created from the same node,
    /// allowing silent cross-session traffic.
    MismatchedHandles = -10,
    /// `CString::new` failure: the input bytes are valid UTF-8 by
    /// Rust's `String` invariant but contain an interior NUL byte
    /// — and the C ABI cannot represent that, since C strings are
    /// NUL-terminated. Pre-fix this was reported as
    /// `InvalidUtf8`, which was wrong: the input is UTF-8-valid;
    /// it just has a NUL where C expects it not to. A binding
    /// reading the typed error and seeing "invalid UTF-8" would
    /// chase the wrong cause.
    InteriorNul = -11,
    /// Unknown error.
    Unknown = -99,
}

impl From<NetError> for c_int {
    fn from(e: NetError) -> Self {
        e as c_int
    }
}

/// Enter an FFI operation with lifetime protection. Returns an
/// `FfiOpGuard` that keeps `active_ops` above zero, so `net_shutdown`
/// will not read `bus` / `runtime` out of the handle until the guard
/// is dropped. Returns `Err` with the `ShuttingDown` code if shutdown
/// is in progress or those fields have already been taken.
#[inline]
fn enter_ffi_op(handle: &NetHandle) -> Result<FfiOpGuard<'_>, c_int> {
    FfiOpGuard::try_enter(handle).ok_or(NetError::ShuttingDown.into())
}

/// Initialize a new event bus.
///
/// # Parameters
///
/// - `config_json`: JSON configuration string (UTF-8, null-terminated).
///   Pass NULL or empty string for default configuration.
///
/// # Returns
///
/// Opaque handle to the event bus, or NULL on failure.
/// The handle must be freed with `net_shutdown`.
///
/// # Example Configuration
///
/// ```json
/// {
///   "num_shards": 8,
///   "ring_buffer_capacity": 1048576,
///   "backpressure_mode": "DropOldest"
/// }
/// ```
///
/// Keys that `parse_config_json` does not read (a `batch` section, for
/// example) are silently ignored.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_init(config_json: *const c_char) -> *mut NetHandle {
    // Parse and validate the config BEFORE constructing the tokio
    // runtime. Building the runtime first would let any subsequent
    // early-return path (`CStr::to_str` Err, `parse_config_json`
    // returning None, `EventBus::new` returning Err) drop the
    // local `Runtime` on function return. Dropping a multi-thread
    // tokio runtime from inside ANOTHER tokio runtime's worker
    // thread panics with "Cannot drop a runtime in a context where
    // blocking is not allowed", unwinding across this `extern "C"`
    // boundary into a Python / Go-cgo / NAPI / PyO3 caller —
    // undefined behaviour. By validating inputs first, the runtime
    // is only built once the config is known to be valid; the one
    // remaining failure path (`EventBus::new` returning Err) is
    // handled in `create_with_config`, which drops the runtime on a
    // fresh OS thread.
    let config = if config_json.is_null() {
        EventBusConfig::default()
    } else {
        match unsafe { CStr::from_ptr(config_json) }.to_str() {
            Ok("") => EventBusConfig::default(),
            Ok(s) => match parse_config_json(s) {
                Some(cfg) => cfg,
                None => return ptr::null_mut(),
            },
            Err(_) => return ptr::null_mut(),
        }
    };

    // Now construct the runtime. On success its lifetime is tied to
    // the returned `NetHandle` (via `create_with_config`), so the only
    // remaining drop is in `net_shutdown`, which already handles it
    // via `runtime.block_on(...)` (see #74) outside any other tokio
    // context.
    let runtime = match Runtime::new() {
        Ok(rt) => rt,
        Err(_) => return ptr::null_mut(),
    };

    create_with_config(runtime, config)
}

/// Parse JSON configuration into EventBusConfig.
///
/// Supports:
/// - `num_shards`: number of shards (must fit in a `u16`)
/// - `ring_buffer_capacity`: ring buffer size per shard
/// - `backpressure_mode`: "DropNewest", "DropOldest", "FailProducer"
///   (or their snake_case forms), or `{"Sample": {"rate": N}}` with `N > 0`
/// - `redis`, `jetstream`, `net`: adapter sections, read only when the
///   matching cargo feature is enabled
///
/// Returns `None` if the JSON is malformed, a supported key holds a
/// value the checks below reject, or `build()` fails.
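/**

Two of the rejection rules, sketched std-only (no `serde_json`) with
hypothetical names: an unknown backpressure string is an error rather
than a silent fallback, and a zero millisecond interval is refused
rather than turned into a busy-looping zero `Duration`. `Mode` is an
illustrative stand-in for `BackpressureMode` without the `Sample` form.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the string forms of `BackpressureMode`.
#[derive(Debug, PartialEq)]
enum Mode {
    DropNewest,
    DropOldest,
    FailProducer,
}

fn parse_mode(s: &str) -> Option<Mode> {
    match s {
        "DropNewest" | "drop_newest" => Some(Mode::DropNewest),
        "DropOldest" | "drop_oldest" => Some(Mode::DropOldest),
        "FailProducer" | "fail_producer" => Some(Mode::FailProducer),
        _ => None, // e.g. the typo "DropOldset" is rejected
    }
}

/// Converts a millisecond setting, refusing zero.
fn nonzero_millis(ms: u64) -> Option<Duration> {
    (ms != 0).then(|| Duration::from_millis(ms))
}
```
*/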
fn parse_config_json(json_str: &str) -> Option<EventBusConfig> {
    let value: serde_json::Value = serde_json::from_str(json_str).ok()?;

    let mut builder = EventBusConfig::builder();

    if let Some(num_shards) = value.get("num_shards").and_then(|v| v.as_u64()) {
        let num_shards = u16::try_from(num_shards).ok()?;
        builder = builder.num_shards(num_shards);
    }

    if let Some(capacity) = value.get("ring_buffer_capacity").and_then(|v| v.as_u64()) {
        let capacity = usize::try_from(capacity).ok()?;
        builder = builder.ring_buffer_capacity(capacity);
    }

    if let Some(bp_value) = value.get("backpressure_mode") {
        let bp_mode = if let Some(mode) = bp_value.as_str() {
            match mode {
                "DropNewest" | "drop_newest" => crate::config::BackpressureMode::DropNewest,
                "DropOldest" | "drop_oldest" => crate::config::BackpressureMode::DropOldest,
                "FailProducer" | "fail_producer" => crate::config::BackpressureMode::FailProducer,
                // Pre-fix every other string silently fell back to
                // `DropNewest`. A typo (`"DropOldset"`) thus
                // changed durability profile at deploy time with
                // no error. Reject unknowns to match the contract
                // already enforced by `parse_poll_request_json`.
                _ => return None,
            }
        } else if let Some(obj) = bp_value.as_object() {
            // Object form: `{"Sample": {"rate": N}}` for the
            // sampling mode that has an associated value.
            if let Some(sample) = obj.get("Sample").or_else(|| obj.get("sample")) {
                let rate = sample.get("rate").and_then(|v| v.as_u64())?;
                let rate = u32::try_from(rate).ok()?;
                if rate == 0 {
                    // Validated again by `EventBusConfig::validate`,
                    // but reject earlier so the parser surface
                    // matches the validator surface.
                    return None;
                }
                crate::config::BackpressureMode::Sample { rate }
            } else {
                return None;
            }
        } else {
            return None;
        };
        builder = builder.backpressure_mode(bp_mode);
    }

    // Parse Redis config
    #[cfg(feature = "redis")]
    if let Some(redis) = value.get("redis") {
        if let Some(url) = redis.get("url").and_then(|v| v.as_str()) {
            let mut redis_config = RedisAdapterConfig::new(url);

            if let Some(prefix) = redis.get("prefix").and_then(|v| v.as_str()) {
                redis_config = redis_config.with_prefix(prefix);
            }
            if let Some(max_len) = redis.get("max_stream_len").and_then(|v| v.as_u64()) {
                let max_len = usize::try_from(max_len).ok()?;
                redis_config = redis_config.with_max_stream_len(max_len);
            }
            if let Some(pipeline_size) = redis.get("pipeline_size").and_then(|v| v.as_u64()) {
                let pipeline_size = usize::try_from(pipeline_size).ok()?;
                redis_config = redis_config.with_pipeline_size(pipeline_size);
            }

            builder = builder.adapter(AdapterConfig::Redis(redis_config));
        }
    }

    // Parse JetStream config
    #[cfg(feature = "jetstream")]
    if let Some(jetstream) = value.get("jetstream") {
        if let Some(url) = jetstream.get("url").and_then(|v| v.as_str()) {
            let mut js_config = JetStreamAdapterConfig::new(url);

            if let Some(prefix) = jetstream.get("prefix").and_then(|v| v.as_str()) {
                js_config = js_config.with_prefix(prefix);
            }
            if let Some(max_messages) = jetstream.get("max_messages").and_then(|v| v.as_i64()) {
                js_config = js_config.with_max_messages(max_messages);
            }
            if let Some(replicas) = jetstream.get("replicas").and_then(|v| v.as_u64()) {
                let replicas = usize::try_from(replicas).ok()?;
                js_config = js_config.with_replicas(replicas);
            }

            builder = builder.adapter(AdapterConfig::JetStream(js_config));
        }
    }

    // Parse Net config
    #[cfg(feature = "net")]
    if let Some(net) = value.get("net") {
        let bind_addr: std::net::SocketAddr = net
            .get("bind_addr")
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse().ok())?;

        let peer_addr: std::net::SocketAddr = net
            .get("peer_addr")
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse().ok())?;

        let psk: [u8; 32] = net
            .get("psk")
            .and_then(|v| v.as_str())
            .and_then(|s| hex::decode(s).ok())
            .and_then(|v| v.try_into().ok())?;

        let role = net
            .get("role")
            .and_then(|v| v.as_str())
            .unwrap_or("initiator");

        let mut net_config = match role {
            "initiator" => {
                let peer_pubkey: [u8; 32] = net
                    .get("peer_public_key")
                    .and_then(|v| v.as_str())
                    .and_then(|s| hex::decode(s).ok())
                    .and_then(|v| v.try_into().ok())?;
                NetAdapterConfig::initiator(bind_addr, peer_addr, psk, peer_pubkey)
            }
            "responder" => {
                let secret_key: [u8; 32] = net
                    .get("secret_key")
                    .and_then(|v| v.as_str())
                    .and_then(|s| hex::decode(s).ok())
                    .and_then(|v| v.try_into().ok())?;
                let public_key: [u8; 32] = net
                    .get("public_key")
                    .and_then(|v| v.as_str())
                    .and_then(|s| hex::decode(s).ok())
                    .and_then(|v| v.try_into().ok())?;
                let keypair = StaticKeypair::from_keys(secret_key, public_key);
                NetAdapterConfig::responder(bind_addr, peer_addr, psk, keypair)
            }
            _ => return None,
        };

        // Apply optional settings
        if let Some(reliability) = net.get("reliability").and_then(|v| v.as_str()) {
            // Unrecognized strings fall back to `ReliabilityConfig::None`
            // (unlike `backpressure_mode` above, which rejects unknowns).
            net_config = net_config.with_reliability(match reliability {
                "light" => ReliabilityConfig::Light,
                "full" => ReliabilityConfig::Full,
                _ => ReliabilityConfig::None,
            });
        }

        if let Some(pool_size) = net.get("packet_pool_size").and_then(|v| v.as_u64()) {
            if let Ok(size) = usize::try_from(pool_size) {
                net_config = net_config.with_pool_size(size);
            }
        }

        // Reject `0` for `heartbeat_interval_ms` and
        // `session_timeout_ms`. `EventBusConfig::validate` rejects
        // zero `Duration`s for `cooldown`, `metrics_window`, etc.,
        // but the Net adapter's JSON parser had no equivalent guard
        // — a `0` here flowed through to `Duration::from_millis(0)`,
        // which on the heartbeat path busy-loops the heartbeat task
        // and saturates a CPU. Treat zero as a misconfig and refuse
        // to build the bus, surfacing as `InvalidJson` so the FFI
        // caller sees a typed failure rather than a hung daemon.
        if let Some(interval_ms) = net.get("heartbeat_interval_ms").and_then(|v| v.as_u64()) {
            if interval_ms == 0 {
                return None;
            }
            net_config =
                net_config.with_heartbeat_interval(std::time::Duration::from_millis(interval_ms));
        }

        if let Some(timeout_ms) = net.get("session_timeout_ms").and_then(|v| v.as_u64()) {
            if timeout_ms == 0 {
                return None;
            }
            net_config =
                net_config.with_session_timeout(std::time::Duration::from_millis(timeout_ms));
        }

        if let Some(batched) = net.get("batched_io").and_then(|v| v.as_bool()) {
            net_config = net_config.with_batched_io(batched);
        }

        builder = builder.adapter(AdapterConfig::Net(Box::new(net_config)));
    }

    builder.build().ok()
}

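/**
A std-only sketch of the off-thread drop used in the `Err` arm below.
`drop_on_fresh_thread` is a hypothetical helper, not part of this
crate: it moves a value onto a newly spawned OS thread so its
destructor never runs on the calling thread.

```rust
use std::thread::{self, JoinHandle};

/// Drops `value` on a freshly spawned thread. The handle is returned
/// only so a test can wait; detaching it (as the code below does)
/// is also fine.
fn drop_on_fresh_thread<T: Send + 'static>(value: T) -> JoinHandle<()> {
    thread::spawn(move || drop(value))
}
```
*/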
708fn create_with_config(runtime: Runtime, config: EventBusConfig) -> *mut NetHandle {
709    let bus = match runtime.block_on(EventBus::new(config)) {
710        Ok(bus) => bus,
711        Err(_) => {
712            // Send the runtime off to a fresh OS thread for
713            // dropping. Dropping a multi-thread tokio `Runtime`
714            // from inside another tokio runtime's worker thread
715            // panics ("Cannot drop a runtime in a context where
716            // blocking is not allowed"); a panic here would unwind
717            // across this `extern "C"` frame. The fresh thread
718            // guarantees a non-tokio context, so the drop is sound
719            // regardless of the caller's runtime environment. We
720            // don't `join()` the thread — the drop completes on
721            // its own and the caller has already been told
722            // `net_init` failed (returning null).
723            std::thread::spawn(move || drop(runtime));
724            return ptr::null_mut();
725        }
726    };
727
728    let handle = Box::new(NetHandle {
729        bus: std::mem::ManuallyDrop::new(bus),
730        runtime: std::mem::ManuallyDrop::new(runtime),
731        shutting_down: std::sync::atomic::AtomicBool::new(false),
732        active_ops: std::sync::atomic::AtomicU32::new(0),
733        bus_taken: std::sync::atomic::AtomicBool::new(false),
734        shutdown_completed: std::sync::atomic::AtomicBool::new(false),
735    });
736
737    Box::into_raw(handle)
738}
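
/*
Illustrative sketch of the off-thread drop in `create_with_config`, using
only `std`. A test type that reports which thread dropped it stands in for
tokio's `Runtime`, so the tokio-specific panic itself is not reproduced.
`drop_on_fresh_thread` and `ReportsDropThread` are hypothetical names.
Returning the `JoinHandle` is a choice made here so a test can wait; the
code above deliberately does not join.

```rust
use std::sync::mpsc;
use std::thread;

/// Move `value` to a newly spawned OS thread and drop it there, so its
/// destructor never runs on (or panics on) the calling thread.
fn drop_on_fresh_thread<T: Send + 'static>(value: T) -> thread::JoinHandle<()> {
    thread::spawn(move || drop(value))
}

/// Test type that reports which thread ran its destructor.
struct ReportsDropThread(mpsc::Sender<thread::ThreadId>);

impl Drop for ReportsDropThread {
    fn drop(&mut self) {
        // Ignore send errors: the receiver may already be gone.
        let _ = self.0.send(thread::current().id());
    }
}
```
*/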

/// Ingest a single event.
///
/// # Parameters
///
/// - `handle`: Event bus handle from `net_init`.
/// - `event_json`: JSON event string (UTF-8). Exactly `len` bytes are
///   read; the string need not be null-terminated.
/// - `len`: Length of the event string in bytes.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure (for example `NullPointer`,
///   `InvalidUtf8`, `InvalidJson`, or `IngestionFailed`)
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest(
    handle: *mut NetHandle,
    event_json: *const c_char,
    len: usize,
) -> c_int {
    if !handle_is_valid(handle) || event_json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // `slice::from_raw_parts` requires `len <= isize::MAX`. A
    // C caller passing a sign-extended `-1` (or any
    // `len > isize::MAX as usize`) would trigger immediate UB before
    // any other validation runs, so reject such inputs explicitly.
    // A well-behaved caller never hits this; returning a typed
    // error is safer than UB.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    // Parse event JSON
    let json_bytes = unsafe { std::slice::from_raw_parts(event_json as *const u8, len) };
    let json_str = match std::str::from_utf8(json_bytes) {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    let event = match Event::from_str(json_str) {
        Ok(e) => e,
        Err(_) => return NetError::InvalidJson.into(),
    };

    // Ingest
    match handle.bus.ingest(event) {
        Ok(_) => NetError::Success.into(),
        Err(_) => NetError::IngestionFailed.into(),
    }
}
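
/*
Illustrative sketch of the checks `net_ingest` and `net_ingest_raw` apply
to a caller's `(pointer, length)` pair before building a `&str`, gathered
into one hypothetical helper. `borrow_utf8` and `BorrowError` are
illustrative names; the real functions report these cases as
`NullPointer`, `InvalidJson` and `InvalidUtf8`. A `usize::MAX` length plays
the role of a sign-extended `-1` from C.

```rust
/// Why a caller-supplied buffer was rejected (hypothetical type).
#[derive(Debug, PartialEq)]
enum BorrowError {
    Null,
    TooLong,
    InvalidUtf8,
}

/// Borrow `len` bytes at `ptr` as UTF-8 text, checking the length bound
/// that `slice::from_raw_parts` requires before calling it.
///
/// # Safety
/// If `ptr` is non-null and `len <= isize::MAX`, `ptr` must point to `len`
/// readable bytes that stay valid and unchanged for the lifetime `'a`.
unsafe fn borrow_utf8<'a>(ptr: *const u8, len: usize) -> Result<&'a str, BorrowError> {
    if ptr.is_null() {
        return Err(BorrowError::Null);
    }
    // Must be checked before from_raw_parts: violating it is UB, not an error.
    if len > isize::MAX as usize {
        return Err(BorrowError::TooLong);
    }
    let bytes = unsafe { std::slice::from_raw_parts(ptr, len) };
    std::str::from_utf8(bytes).map_err(|_| BorrowError::InvalidUtf8)
}
```
*/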

/// Ingest a raw JSON string (fastest path).
///
/// The string is checked for valid UTF-8 but not parsed as JSON; it is
/// stored as-is. This is the recommended method for high-throughput
/// ingestion.
///
/// # Parameters
///
/// - `handle`: Event bus handle from `net_init`.
/// - `json`: JSON string (UTF-8). Exactly `len` bytes are read; the
///   string need not be null-terminated.
/// - `len`: Length of the JSON string in bytes.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw(
    handle: *mut NetHandle,
    json: *const c_char,
    len: usize,
) -> c_int {
    if !handle_is_valid(handle) || json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // `slice::from_raw_parts` requires `len <= isize::MAX`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
    let json_str = match std::str::from_utf8(json_bytes) {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    let raw = RawEvent::from_str(json_str);

    match handle.bus.ingest_raw(raw) {
        Ok(_) => NetError::Success.into(),
        Err(_) => NetError::IngestionFailed.into(),
    }
}

/// Ingest multiple raw JSON strings (fastest batch path).
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `jsons`: Array of `count` pointers to JSON strings (UTF-8). Each is
///   read as exactly its `lens` entry, so null termination is not required.
/// - `lens`: Array of `count` byte lengths, one per JSON string.
/// - `count`: Number of events in the arrays.
///
/// # Returns
///
/// Number of successfully ingested events, or negative error code.
/// Entries with a null pointer, a length above `isize::MAX`, or invalid
/// UTF-8 are skipped (and logged), so the result can be less than `count`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw_batch(
    handle: *mut NetHandle,
    jsons: *const *const c_char,
    lens: *const usize,
    count: usize,
) -> c_int {
    if !handle_is_valid(handle) || jsons.is_null() || lens.is_null() {
        return NetError::NullPointer.into();
    }
    if count == 0 {
        return 0;
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };
    let mut events = Vec::with_capacity(count);
    // Track per-entry drops so the caller's accounting can
    // reconcile the returned count against the input count.
    // Previously, per-entry rejects (null pointer, oversized length,
    // invalid UTF-8) were silently skipped with `continue`, and the
    // caller saw `count - drops` accepted events with no signal as
    // to which input indices were dropped. A binding that
    // attributed the shortfall to back-pressure and retried got the
    // wrong indices and double-published the good ones.
    //
    // The C-API contract is "returns count of accepted events";
    // adding an out-param of dropped indices would be an API
    // addition, not a fix in place. Instead, emit `tracing::warn!`
    // with the offending index AND reason so operators observing
    // the bus can correlate drop counts with specific inputs
    // without changing the C surface. High-volume bindings should
    // budget for one log line per dropped entry; if that becomes a
    // problem, a `*_ex` follow-up can return the indices
    // structurally.
    let mut dropped_null = 0usize;
    let mut dropped_oversize = 0usize;
    let mut dropped_invalid_utf8 = 0usize;

    for i in 0..count {
        let json_ptr = unsafe { *jsons.add(i) };
        let len = unsafe { *lens.add(i) };

        if json_ptr.is_null() {
            tracing::warn!(
                index = i,
                "net_ingest_raw_batch: dropping entry with null pointer"
            );
            dropped_null += 1;
            continue;
        }

        // `slice::from_raw_parts` requires `len <= isize::MAX`.
        // Skip pathological per-entry lengths rather than risk UB.
        if len > isize::MAX as usize {
            tracing::warn!(
                index = i,
                len,
                "net_ingest_raw_batch: dropping entry with len > isize::MAX"
            );
            dropped_oversize += 1;
            continue;
        }
        let json_bytes = unsafe { std::slice::from_raw_parts(json_ptr as *const u8, len) };
        match std::str::from_utf8(json_bytes) {
            Ok(json_str) => events.push(RawEvent::from_str(json_str)),
            Err(_) => {
                tracing::warn!(
                    index = i,
                    "net_ingest_raw_batch: dropping entry with invalid UTF-8"
                );
                dropped_invalid_utf8 += 1;
            }
        }
    }
    let total_dropped = dropped_null + dropped_oversize + dropped_invalid_utf8;
    if total_dropped > 0 {
        // Aggregate summary for log-pipeline filters that fold
        // per-index lines.
        tracing::warn!(
            input_count = count,
            dropped_null,
            dropped_oversize,
            dropped_invalid_utf8,
            "net_ingest_raw_batch: {} of {} entries dropped before ingest",
            total_dropped,
            count,
        );
    }

    let count = handle.bus.ingest_raw_batch(events);
    // Returning `c_int::MAX` on overflow would be ambiguous with a real
    // `INT_MAX` ingest. Signal overflow explicitly so callers doing
    // accounting in high-throughput paths do not silently miscount.
    c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
}
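
/*
The comment in `net_ingest_raw_batch` mentions a possible `*_ex` follow-up
that would return dropped indices structurally. The sketch below is a
hypothetical illustration of that bookkeeping only. No such function
appears in this section, and `partition_batch` and `DropReason` are
illustrative names. It applies the same three per-entry checks and collects
`(index, reason)` pairs instead of only logging them.

```rust
/// Why a batch entry was skipped (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DropReason {
    NullPointer,
    Oversize,
    InvalidUtf8,
}

/// Apply the per-entry checks from `net_ingest_raw_batch`, returning the
/// accepted strings plus an `(index, reason)` pair for every skipped entry.
///
/// # Safety
/// Every non-null pointer whose `len <= isize::MAX` must point to `len`
/// readable bytes that stay valid for the lifetime `'a`.
unsafe fn partition_batch<'a>(
    entries: &[(*const u8, usize)],
) -> (Vec<&'a str>, Vec<(usize, DropReason)>) {
    let mut accepted = Vec::with_capacity(entries.len());
    let mut dropped = Vec::new();
    for (i, &(ptr, len)) in entries.iter().enumerate() {
        if ptr.is_null() {
            dropped.push((i, DropReason::NullPointer));
            continue;
        }
        if len > isize::MAX as usize {
            dropped.push((i, DropReason::Oversize));
            continue;
        }
        let bytes = unsafe { std::slice::from_raw_parts(ptr, len) };
        match std::str::from_utf8(bytes) {
            Ok(s) => accepted.push(s),
            Err(_) => dropped.push((i, DropReason::InvalidUtf8)),
        }
    }
    (accepted, dropped)
}
```

A binding holding such a list could retry or report exactly the rejected
indices instead of inferring them from the gap between `count` and the
returned value.
*/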

/// Ingest multiple events.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `events_json`: JSON array of events (UTF-8, null-terminated).
///
/// # Returns
///
/// Number of successfully ingested events, or negative error code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_batch(
    handle: *mut NetHandle,
    events_json: *const c_char,
) -> c_int {
    if !handle_is_valid(handle) || events_json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    let json_str = match unsafe { CStr::from_ptr(events_json) }.to_str() {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    // Parse as JSON array
    let array: Vec<serde_json::Value> = match serde_json::from_str(json_str) {
        Ok(a) => a,
        Err(_) => return NetError::InvalidJson.into(),
    };

    let events: Vec<Event> = array.into_iter().map(Event::new).collect();
    let count = handle.bus.ingest_batch(events);

    // Returning `c_int::MAX` on overflow would be ambiguous with a real
    // `INT_MAX` ingest. Signal overflow explicitly, matching the
    // `net_ingest_raw_batch` contract.
    c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
}
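
/*
Sketch of the overflow-aware conversion both batch functions apply to their
return value. `INT_OVERFLOW` is a placeholder chosen for this sketch; the
real `NetError::IntOverflow` code is defined elsewhere in the crate and is
not shown in this section. `count_to_c_int` is a hypothetical name.

```rust
use std::os::raw::c_int;

/// Placeholder for `NetError::IntOverflow`; the real value is not shown here.
const INT_OVERFLOW: c_int = -100;

/// Convert an ingest count to the C return type. A count above `c_int::MAX`
/// becomes an explicit negative error code instead of saturating to
/// `INT_MAX`, which a caller could not tell apart from a real count.
fn count_to_c_int(count: usize) -> c_int {
    c_int::try_from(count).unwrap_or(INT_OVERFLOW)
}
```
*/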

/// Parse the JSON request body passed to `net_poll` into a
/// `ConsumeRequest`. Returns the negative `NetError` code on parse
/// failure so the caller can surface it back across FFI. Both `limit`
/// and `cursor` are optional (a JSON `null` counts as absent), but if
/// either key is present with the wrong JSON type it is an explicit
/// error. Silently falling back to the default would hide caller bugs
/// (e.g. the Go binding that previously serialized `cursor` but had it
/// dropped server-side).
fn parse_poll_request_json(json_str: &str) -> Result<ConsumeRequest, c_int> {
    let value: serde_json::Value =
        serde_json::from_str(json_str).map_err(|_| c_int::from(NetError::InvalidJson))?;

    let limit = match value.get("limit") {
        None | Some(serde_json::Value::Null) => 100usize,
        Some(v) => match v.as_u64() {
            // `as usize` would silently truncate on 32-bit targets for
            // values above `usize::MAX`. Reject such inputs explicitly
            // so a caller asking for e.g. 2^33 events on a wasm32
            // build gets `InvalidJson` instead of a tiny wrap-around.
            Some(n) => usize::try_from(n).map_err(|_| c_int::from(NetError::InvalidJson))?,
            None => return Err(NetError::InvalidJson.into()),
        },
    };
    let cursor = match value.get("cursor") {
        None | Some(serde_json::Value::Null) => None,
        Some(v) => match v.as_str() {
            Some(s) => Some(s.to_owned()),
            None => return Err(NetError::InvalidJson.into()),
        },
    };
    let mut req = ConsumeRequest::new(limit);
    req.from_id = cursor;
    Ok(req)
}
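
/*
Sketch of the rule in `parse_poll_request_json`: absent or `null` means
default, and a wrong type means error. It uses a small hypothetical `Json`
enum instead of `serde_json::Value` so the example needs no external crate.
`PollRequest` and `parse_request` are illustrative stand-ins for
`ConsumeRequest` and the real parser.

```rust
/// Hypothetical stand-in for the subset of `serde_json::Value` used here.
#[allow(dead_code)]
enum Json {
    Null,
    Bool(bool),
    Number(u64),
    Str(String),
}

#[derive(Debug, PartialEq)]
struct PollRequest {
    limit: usize,
    cursor: Option<String>,
}

/// Absent or `null` falls back to the default; any other type is an error,
/// so a caller bug (e.g. a cursor serialized as a number) is surfaced.
fn parse_request(limit: Option<&Json>, cursor: Option<&Json>) -> Result<PollRequest, &'static str> {
    let limit = match limit {
        None | Some(Json::Null) => 100,
        // try_from guards 32-bit targets where a u64 may not fit in usize.
        Some(Json::Number(n)) => usize::try_from(*n).map_err(|_| "limit out of range")?,
        Some(_) => return Err("limit must be a non-negative integer"),
    };
    let cursor = match cursor {
        None | Some(Json::Null) => None,
        Some(Json::Str(s)) => Some(s.clone()),
        Some(_) => return Err("cursor must be a string"),
    };
    Ok(PollRequest { limit, cursor })
}
```
*/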

/// Poll events from the bus.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `request_json`: JSON request string (UTF-8, null-terminated), or
///   NULL to use the defaults (`limit` 100, no cursor). Only `limit`
///   and `cursor` are read; other keys are ignored.
///   Example: `{"limit": 100, "cursor": "<next_id from the previous response>"}`
/// - `out_buffer`: Output buffer for the null-terminated JSON response.
/// - `buffer_len`: Size of the output buffer in bytes (at least 256).
///
/// # Returns
///
/// - Number of bytes written to the buffer on success, excluding the
///   null terminator
/// - `BufferTooSmall` if `buffer_len` is below 256 (nothing is written)
///   or the response does not fit; in the latter case a fallback
///   response that echoes the request cursor as `next_id` is written
///   if it fits
/// - Another negative error code on other failures
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_poll(
    handle: *mut NetHandle,
    request_json: *const c_char,
    out_buffer: *mut c_char,
    buffer_len: usize,
) -> c_int {
    if !handle_is_valid(handle) || out_buffer.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // Parse request
    let request = if request_json.is_null() {
        ConsumeRequest::new(100)
    } else {
        let json_str = match unsafe { CStr::from_ptr(request_json) }.to_str() {
            Ok(s) => s,
            Err(_) => return NetError::InvalidUtf8.into(),
        };
        match parse_poll_request_json(json_str) {
            Ok(req) => req,
            Err(code) => return code,
        }
    };

    // Reject buffers too small to even hold an empty-response
    // JSON envelope. This catches the degenerate "tiny buffer"
    // case before we hit the adapter: `BufferTooSmall` returned
    // here means "no work was done, caller's cursor is unchanged."
    // 256 bytes fits the fallback envelope below for typical
    // cursor lengths; if an unusually long echoed `next_id` makes
    // even the fallback too large, nothing is written and
    // `BufferTooSmall` is still returned.
    const MIN_RESPONSE_BUFFER: usize = 256;
    if buffer_len < MIN_RESPONSE_BUFFER {
        return NetError::BufferTooSmall.into();
    }

    // Stash the cursor before moving `request` into `poll()` so
    // the post-poll fallback can echo it back to the caller. On
    // overflow we write a minimal "no events delivered, cursor
    // unchanged" response so the caller's next poll re-fetches
    // the same range; events are not lost on idempotent
    // adapters (Redis XRANGE, JetStream direct_get).
    let cursor_snapshot = request.from_id.clone();

    // Poll
    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
        Ok(r) => r,
        Err(_) => return NetError::PollFailed.into(),
    };

    // Serialize response. Events that fail to parse are included as
    // raw strings when their bytes are valid UTF-8, so the caller can
    // still see them; events that are not valid UTF-8 are omitted.
    // Both cases are counted in `parse_errors`.
    let total_events = response.events.len();
    let mut parsed_events: Vec<serde_json::Value> = Vec::with_capacity(total_events);
    let mut parse_errors: usize = 0;
    for e in &response.events {
        match e.parse() {
            Ok(v) => parsed_events.push(v),
            Err(_) => {
                parse_errors += 1;
                // Include the raw bytes as a string so the caller doesn't silently lose events
                if let Ok(raw) = e.raw_str() {
                    parsed_events.push(serde_json::Value::String(raw.to_string()));
                }
            }
        }
    }
    let response_json = match serde_json::to_string(&serde_json::json!({
        "events": parsed_events,
        "next_id": response.next_id,
        "has_more": response.has_more,
        "count": parsed_events.len(),
        "parse_errors": parse_errors,
    })) {
        Ok(s) => s,
        Err(_) => return NetError::Unknown.into(),
    };

    // Buffer overflow: emit a minimal fallback response that echoes
    // the caller's original cursor as `next_id`. The caller's next
    // poll runs against the same range and re-delivers the events
    // (idempotent on Redis XRANGE / JetStream direct_get). Without
    // this, a caller that trusts `next_id` blindly would advance
    // past the unread batch.
    if response_json.len() + 1 > buffer_len {
        let fallback = serde_json::to_string(&serde_json::json!({
            "events": [],
            "next_id": cursor_snapshot,
            "has_more": true,
            "count": 0,
            "parse_errors": 0,
            "buffer_too_small": true,
            "events_dropped": total_events,
        }))
        .unwrap_or_else(|_| String::from(
            r#"{"events":[],"next_id":null,"has_more":true,"count":0,"parse_errors":0,"buffer_too_small":true}"#
        ));
        if fallback.len() < buffer_len {
            unsafe {
                ptr::copy_nonoverlapping(
                    fallback.as_ptr() as *const c_char,
                    out_buffer,
                    fallback.len(),
                );
                *out_buffer.add(fallback.len()) = 0;
            }
        }
        return NetError::BufferTooSmall.into();
    }

    // Copy to output buffer
    unsafe {
        ptr::copy_nonoverlapping(
            response_json.as_ptr() as *const c_char,
            out_buffer,
            response_json.len(),
        );
        *out_buffer.add(response_json.len()) = 0; // Null terminate
    }

    // Data was already copied into the caller's buffer; a
    // `c_int` overflow here means the byte count exceeds c_int's
    // range, NOT that the buffer was too small. Returning
    // `BufferTooSmall` would tell the caller to "resize and retry"
    // when retrying can't fix the actual condition. `IntOverflow`
    // is the documented variant for this case.
    match c_int::try_from(response_json.len()) {
        Ok(n) => n,
        Err(_) => NetError::IntOverflow.into(),
    }
}
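
/*
Sketch of how `net_poll` decides what lands in the caller's buffer: the
full response if it fits together with its null terminator, otherwise the
smaller fallback envelope (which in `net_poll` echoes the request cursor as
`next_id`) if that fits, otherwise nothing. It works on a byte slice rather
than a raw `*mut c_char`, and `WriteOutcome`, `put_c_string` and
`write_with_fallback` are hypothetical names.

```rust
/// What ended up in the caller's buffer (hypothetical type).
#[derive(Debug, PartialEq)]
enum WriteOutcome {
    /// Full response written; the value is its length excluding the NUL.
    Full(usize),
    /// Only the fallback envelope fit (the real code still reports an error).
    Fallback,
    /// Nothing fit; the buffer is unchanged.
    Nothing,
}

/// Copy `s` plus a trailing NUL into `buf` if both fit; report success.
fn put_c_string(buf: &mut [u8], s: &str) -> bool {
    if s.len() + 1 > buf.len() {
        return false;
    }
    buf[..s.len()].copy_from_slice(s.as_bytes());
    buf[s.len()] = 0;
    true
}

fn write_with_fallback(buf: &mut [u8], full: &str, fallback: &str) -> WriteOutcome {
    if put_c_string(buf, full) {
        WriteOutcome::Full(full.len())
    } else if put_c_string(buf, fallback) {
        WriteOutcome::Fallback
    } else {
        WriteOutcome::Nothing
    }
}
```

In `net_poll`, both non-`Full` outcomes return `BufferTooSmall`. The
slice-based form makes the "the NUL must fit too" arithmetic easy to test.
*/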

/// Get event bus statistics.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `out_buffer`: Output buffer for the null-terminated JSON statistics.
/// - `buffer_len`: Size of the output buffer in bytes.
///
/// # Returns
///
/// Number of bytes written (excluding the null terminator), or a
/// negative error code. `BufferTooSmall` means nothing was written.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_stats(
    handle: *mut NetHandle,
    out_buffer: *mut c_char,
    buffer_len: usize,
) -> c_int {
    if !handle_is_valid(handle) || out_buffer.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };
    let stats = handle.bus.stats();
    let shard_stats = handle.bus.shard_stats();

    let stats_json = match serde_json::to_string(&serde_json::json!({
        "events_ingested": stats.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
        "events_dropped": stats.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
        "batches_dispatched": stats.batches_dispatched.load(std::sync::atomic::Ordering::Relaxed),
        "shard_events_ingested": shard_stats.events_ingested,
        "shard_events_dropped": shard_stats.events_dropped,
        "shard_batches_dispatched": shard_stats.batches_dispatched,
    })) {
        Ok(s) => s,
        Err(_) => return NetError::Unknown.into(),
    };

    if stats_json.len() + 1 > buffer_len {
        return NetError::BufferTooSmall.into();
    }

    unsafe {
        ptr::copy_nonoverlapping(
            stats_json.as_ptr() as *const c_char,
            out_buffer,
            stats_json.len(),
        );
        *out_buffer.add(stats_json.len()) = 0;
    }

    // As in `net_poll` above, the data was already copied, so an
    // overflowing length is `IntOverflow`, not `BufferTooSmall`.
    match c_int::try_from(stats_json.len()) {
        Ok(n) => n,
        Err(_) => NetError::IntOverflow.into(),
    }
}
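
/*
Sketch of a caller-side pattern for a `net_stats`-style function: retry
with a doubled buffer while the call reports `BufferTooSmall`. The FFI call
is modeled as a Rust closure over a byte slice, and `BUFFER_TOO_SMALL` is a
placeholder (the real `NetError` value is not shown in this section).
`call_with_growing_buffer` is a hypothetical helper. For `net_poll`, a
retry should reuse the same request cursor, which the fallback response
echoes as `next_id`.

```rust
use std::os::raw::c_int;

/// Placeholder for `NetError::BufferTooSmall`; the real value is not shown here.
const BUFFER_TOO_SMALL: c_int = -3;

/// `call` fills `buf` and returns the byte count written (excluding the NUL)
/// or a negative error code. On `BUFFER_TOO_SMALL` the buffer is doubled,
/// up to `max_len`; any other error is returned unchanged.
fn call_with_growing_buffer<F>(mut call: F, start_len: usize, max_len: usize) -> Result<String, c_int>
where
    F: FnMut(&mut [u8]) -> c_int,
{
    let mut len = start_len.max(1);
    loop {
        let mut buf = vec![0u8; len];
        let rc = call(&mut buf);
        if rc >= 0 {
            // Clamp in case a misbehaving callee reports more than it had room for.
            let n = (rc as usize).min(buf.len());
            return Ok(String::from_utf8_lossy(&buf[..n]).into_owned());
        }
        if rc != BUFFER_TOO_SMALL || len >= max_len {
            return Err(rc);
        }
        len = len.saturating_mul(2).min(max_len);
    }
}
```
*/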

/// Flush all pending batches to the adapter.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_flush(handle: *mut NetHandle) -> c_int {
    if !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    match handle.runtime.block_on(handle.bus.flush()) {
        Ok(_) => NetError::Success.into(),
        Err(_) => NetError::Unknown.into(),
    }
}

/// Shut down the event bus and free resources.
///
/// # Parameters
///
/// - `handle`: Event bus handle. After this call, the handle is invalid.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure (including `Unknown` if the
///   bounded wait for in-flight FFI operations expired before the bus
///   could be shut down cleanly)
///
/// # Notes
///
/// The handle's storage is intentionally leaked: the box is never
/// returned to the allocator. See `NetHandle`'s docs for why. This is
/// a one-time cost per shutdown, typically once per process, since
/// most C callers initialize the bus once and shut it down once.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_shutdown(handle: *mut NetHandle) -> c_int {
    if !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }

    // Scope the `&NetHandle` borrow into an inner block so it is
    // verifiably out of scope before the
    // `ManuallyDrop::take(&mut (*handle).bus)` calls below.
    // Holding an `&NetHandle` in scope for the whole function
    // while taking a raw `&mut (*handle).bus` later would rely on
    // NLL ending the immutable borrow before the mutable take, a
    // pattern that is fragile under stacked/tree borrow models. The
    // block-scoped borrow makes the lifetime constraint explicit
    // and obvious to both the compiler and any future maintainer.
    let drained_and_taken = {
        // SAFETY: The C contract guarantees `handle` is valid here and that
        // `net_shutdown` is not called concurrently with itself. Future
        // dereferences of the box from concurrent FFI ops on other threads
        // are also sound because we never free the box (see below).
        let handle_ref = unsafe { &*handle };

        // Signal shutdown so concurrent FFI calls bail before touching
        // `bus`/`runtime`. SeqCst pairs with `FfiOpGuard::try_enter`.
        handle_ref
            .shutting_down
            .store(true, std::sync::atomic::Ordering::SeqCst);

        // Bounded wait for in-flight ops to drain. Without a deadline, a
        // hung concurrent operation (e.g. `net_flush` against a stalled
        // adapter) would pin a CPU at 100% inside this loop forever.
        //
        // `std::hint::spin_loop()` is a CPU pause hint, not a yield. On
        // a single-threaded executor (or any configuration where the FFI
        // caller's thread is the same one that needs to make progress on
        // the in-flight async work), a tight spin starves the very tokio
        // worker we're waiting for, *causing* the deadline to expire when
        // it otherwise wouldn't. `thread::yield_now` lets the OS schedule
        // whatever is blocked, and a 1ms `thread::sleep` between yields
        // keeps the loop from saturating a CPU on platforms where
        // `yield_now` is a near-no-op under low contention. We expect the
        // drain to take milliseconds, so millisecond-granularity polling
        // is fine.
        let deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
        let mut drained = false;
        loop {
            if handle_ref
                .active_ops
                .load(std::sync::atomic::Ordering::SeqCst)
                == 0
            {
                drained = true;
                break;
            }
            if std::time::Instant::now() >= deadline {
                break;
            }
            std::thread::yield_now();
            std::thread::sleep(std::time::Duration::from_millis(1));
        }

        if !drained {
            // In-flight ops may still be reading `bus`/`runtime`; reading
            // them out via `ManuallyDrop::take` would race those readers.
            // Leak both fields along with the box. Future ops still see
            // `shutting_down=true` and bail before touching either field,
            // so the leaked memory is never read again.
            return NetError::Unknown.into();
        }

        // Idempotent shutdown: if a previous `net_shutdown` already
        // moved out the bus/runtime, do not call `ManuallyDrop::take`
        // a second time (that would be UB). The first call may still
        // be inside `runtime.block_on(bus.shutdown())`, though. Before
        // this fix, the second caller observed `bus_taken == true` and
        // returned `Success` immediately, falsely signaling completion
        // of an in-progress shutdown. Spin on `shutdown_completed`
        // (set by the first caller AFTER `bus.shutdown()` returns) so
        // subsequent callers wait for the actual completion.
        if handle_ref
            .bus_taken
            .swap(true, std::sync::atomic::Ordering::SeqCst)
        {
            // Wait for the first caller to actually finish.
            // Bounded by the same FFI_SHUTDOWN_DEADLINE as the
            // `active_ops` drain: if the first caller is wedged
            // longer than that, we return `Unknown` rather than
            // block forever.
            let inner_deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
            while !handle_ref
                .shutdown_completed
                .load(std::sync::atomic::Ordering::Acquire)
            {
                if std::time::Instant::now() >= inner_deadline {
                    return NetError::Unknown.into();
                }
                std::thread::yield_now();
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            return NetError::Success.into();
        }
        drained
    };
    let _ = drained_and_taken;

    // SAFETY: `active_ops` reached zero with `shutting_down=true`, so:
    //   - Every FFI op that started before shutdown has fully
    //     completed (decremented `active_ops` on guard drop).
    //   - Any future FFI op will observe `shutting_down=true` and
    //     bail in `try_enter` before touching `bus` / `runtime`.
    // Plus, `bus_taken` was just atomically swapped from false to true,
    // so no other shutdown is concurrently moving the same fields out.
    // The immutable `handle_ref` borrow above has been dropped (block
    // scope ended), so the `&mut`-via-raw-pointer below is the only
    // live access; there is no stacked/tree-borrow race.
    //
    // We deliberately do NOT call `Box::from_raw` here. The box's
    // `shutting_down` / `active_ops` / `bus_taken` / `shutdown_completed`
    // atomics must remain valid memory because future FFI ops (and
    // repeat `net_shutdown` calls) still dereference the C-side pointer
    // to check them. Leaking the box is the correctness fix for the
    // previous use-after-free; the per-handle storage cost is a
    // one-time overhead.
    let result = {
        let bus = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).bus) };
        let runtime = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).runtime) };

        // Flush pending batches and gracefully shut down the adapter
        // before dropping the runtime. Without this, pending events in
        // ring buffers and batch workers would be silently lost.
        //
        // `runtime`, and `bus` if `shutdown` borrows rather than
        // consumes it, are dropped when this block ends, i.e. before
        // `shutdown_completed` is set below. The leaked box keeps the
        // atomics alive for any straggler ops.
        runtime.block_on(bus.shutdown())
    };

    // Signal completion to any second/third caller spinning on
    // `shutdown_completed` in the idempotent path above. Done
    // AFTER `bus.shutdown()` returns and AFTER the bus / runtime
    // drop, so subsequent callers can rely on this flag as a
    // hard "shutdown is fully done" barrier.
    unsafe { &*handle }
        .shutdown_completed
        .store(true, std::sync::atomic::Ordering::Release);

    match result {
        Ok(()) => NetError::Success.into(),
        Err(_) => NetError::Unknown.into(),
    }
}
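
/*
A simplified, std-only model of the shutdown protocol above, showing how
the pieces interact. `shutting_down` plus `active_ops` gate new operations.
The drain is bounded and yields/sleeps instead of spinning. A swap on
`taken` (standing in for `bus_taken`) lets exactly one caller tear down,
and later callers wait on `completed` (standing in for
`shutdown_completed`). `Gate`, `OpGuard`, `wait_until` and `teardown` are
hypothetical names. `FfiOpGuard::try_enter` is not shown in this section,
so the increment-then-check order in `try_enter` below is an assumption
about how it pairs with the `SeqCst` store.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for the atomics in `NetHandle`.
#[derive(Default)]
struct Gate {
    shutting_down: AtomicBool,
    active_ops: AtomicU32,
    taken: AtomicBool,     // plays the role of `bus_taken`
    completed: AtomicBool, // plays the role of `shutdown_completed`
}

/// RAII guard for one in-flight operation.
struct OpGuard<'a>(&'a Gate);

impl Drop for OpGuard<'_> {
    fn drop(&mut self) {
        self.0.active_ops.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Poll `cond` (yield, then sleep 1 ms) until it holds or `limit` elapses.
fn wait_until(limit: Duration, cond: impl Fn() -> bool) -> bool {
    let deadline = Instant::now() + limit;
    loop {
        if cond() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        std::thread::yield_now();
        std::thread::sleep(Duration::from_millis(1));
    }
}

impl Gate {
    /// Count the op first, then check the flag (both SeqCst): either the
    /// drain sees this op's increment, or this op sees `shutting_down`.
    fn try_enter(&self) -> Option<OpGuard<'_>> {
        self.active_ops.fetch_add(1, Ordering::SeqCst);
        if self.shutting_down.load(Ordering::SeqCst) {
            self.active_ops.fetch_sub(1, Ordering::SeqCst);
            return None;
        }
        Some(OpGuard(self))
    }

    /// Returns true once shutdown has fully completed; `teardown` runs at
    /// most once, on the caller that wins the swap on `taken`.
    fn shutdown(&self, limit: Duration, teardown: impl FnOnce()) -> bool {
        self.shutting_down.store(true, Ordering::SeqCst);
        if !wait_until(limit, || self.active_ops.load(Ordering::SeqCst) == 0) {
            return false; // ops still in flight: give up rather than race them
        }
        if self.taken.swap(true, Ordering::SeqCst) {
            // Another caller owns teardown; wait for it to finish.
            return wait_until(limit, || self.completed.load(Ordering::Acquire));
        }
        teardown();
        self.completed.store(true, Ordering::Release);
        true
    }
}
```
*/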

/// Get the number of shards.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
///
/// # Returns
///
/// Number of shards, or 0 if `handle` is null or invalid, or if the
/// call cannot enter the bus (e.g. during shutdown).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_num_shards(handle: *mut NetHandle) -> u16 {
    if !handle_is_valid(handle) {
        return 0;
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(_) => return 0,
    };
    handle.bus.num_shards()
}

/// Get the library version.
///
/// # Returns
///
/// Version string (static, do not free).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_version() -> *const c_char {
    static VERSION: &[u8] = b"0.8.0\0";
    VERSION.as_ptr() as *const c_char
}
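
/*
Sketch of a unit-test style check for the invariant `net_version` relies
on: the static byte string must be a well-formed C string, with exactly one
NUL at the end. It copies the literal instead of referencing this
function's local `VERSION`, and `version_ptr` and `version_str` are
hypothetical helpers.

```rust
use std::ffi::{c_char, CStr};

static VERSION: &[u8] = b"0.8.0\0";

fn version_ptr() -> *const c_char {
    VERSION.as_ptr() as *const c_char
}

/// Panics unless `VERSION` is a well-formed C string (one trailing NUL,
/// no interior NULs) whose contents are valid UTF-8.
fn version_str() -> &'static str {
    CStr::from_bytes_with_nul(VERSION)
        .expect("VERSION must end with exactly one NUL")
        .to_str()
        .expect("VERSION must be UTF-8")
}
```
*/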

/// Generate a new Net keypair.
///
/// # Returns
///
/// JSON string with hex-encoded `public_key` and `secret_key`.
/// The caller must free the returned string with `net_free_string`.
/// Returns NULL if the `net` feature is not enabled.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    let keypair = StaticKeypair::generate();
    let json = serde_json::json!({
        "public_key": hex::encode(keypair.public_key()),
        "secret_key": hex::encode(keypair.secret_key()),
    });

    match CString::new(json.to_string()) {
        Ok(s) => s.into_raw(),
        Err(_) => ptr::null_mut(),
    }
}

/// Free a string returned by Net functions.
///
/// # Parameters
///
/// - `s`: String pointer returned by `net_generate_keypair` or similar.
///   Passing NULL is a no-op. Do not release these strings with C
///   `free()`; they were allocated by Rust.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if !s.is_null() {
        unsafe {
            drop(CString::from_raw(s));
        }
    }
}
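
/*
Sketch of the ownership hand-off behind `net_generate_keypair` and
`net_free_string`. `CString::into_raw` gives the allocation to C, and only
`CString::from_raw` on the same pointer may release it (C's `free()` would
use the wrong allocator). `export_string` and `free_string` are
hypothetical names. As in `net_generate_keypair`, an interior NUL yields
NULL.

```rust
use std::ffi::{c_char, CString};
use std::ptr;

/// Hand a Rust string to C. Ownership moves to the caller, who must give
/// it back through `free_string`. Strings with an interior NUL yield NULL.
fn export_string(s: &str) -> *mut c_char {
    match CString::new(s) {
        Ok(c) => c.into_raw(),
        Err(_) => ptr::null_mut(),
    }
}

/// Reclaim a pointer produced by `export_string`; NULL is a no-op.
///
/// # Safety
/// `p` must be NULL or come from `export_string`, and must not be freed twice.
unsafe fn free_string(p: *mut c_char) {
    if !p.is_null() {
        drop(unsafe { CString::from_raw(p) });
    }
}
```
*/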
1515
// `net.h` declares both `net_generate_keypair` and
// `net_free_string` unconditionally, so a consumer linking against
// a cdylib built without the `net` feature would otherwise hit
// a load-time missing-symbol error despite the header advertising
// the symbol. Provide stubs so both symbols are resolvable on
// every build configuration. Mirrors the `nat-traversal` cfg
// pattern in `mesh.rs`.

/// Stub for builds without the `net` feature.
///
/// `net.h` declares `net_generate_keypair` unconditionally, so
/// the symbol must be resolvable on every build configuration.
/// Always returns NULL, since keypair generation requires the
/// `net` feature.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    ptr::null_mut()
}

/// Stub for builds without the `net` feature.
///
/// Mirrors the always-on signature in `net.h`. Reclaims a
/// CString-allocated pointer if non-null.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if !s.is_null() {
        unsafe {
            drop(std::ffi::CString::from_raw(s));
        }
    }
}

// =========================================================================
// Structured (non-JSON) API — _ex variants
// =========================================================================

/// Ingestion receipt for C consumers.
#[repr(C)]
pub struct NetReceipt {
    /// Shard the event was assigned to.
    pub shard_id: u16,
    /// Insertion timestamp (nanoseconds).
    pub timestamp: u64,
}

// Pin layout invariants for `NetReceipt`. `#[repr(C)]` already
// gives C ABI compatibility per platform, but doesn't catch a
// future field-reorder or field-add — both would silently break
// any C/Go/Python binding that hard-codes the struct layout.
// Static asserts on 64-bit targets (the production deployment
// shape) trip CI before such a change reaches a binary release.
//
// 64-bit: `u16 (2) + 6 pad + u64 (8)` = 16 bytes, alignment 8.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::size_of::<NetReceipt>() == 16,
    "NetReceipt size changed on 64-bit; bindings hard-code 16. \
     If the change is intentional, bump the binding versions and \
     update this assertion."
);
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::align_of::<NetReceipt>() == 8,
    "NetReceipt alignment changed on 64-bit; bindings expect 8."
);

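The `u16 (2) + 6 pad + u64 (8)` arithmetic follows from C layout rules: each field starts at the next offset that is a multiple of its alignment, and the total size is rounded up to the struct's alignment. A sketch that checks this with a hypothetical mirror struct `ReceiptMirror` (same field order as `NetReceipt`) and `std::mem::offset_of!` (stable since Rust 1.77), assuming a 64-bit target where `u64` is 8-byte aligned:

```rust
use std::mem::{align_of, offset_of, size_of};

// Hypothetical mirror of `NetReceipt`: same field order, same `#[repr(C)]`.
#[repr(C)]
pub struct ReceiptMirror {
    pub shard_id: u16,
    pub timestamp: u64,
}

/// Returns (shard_id offset, timestamp offset, size, align).
pub fn receipt_layout() -> (usize, usize, usize, usize) {
    (
        offset_of!(ReceiptMirror, shard_id),
        // On 64-bit, 6 padding bytes push the u64 to offset 8.
        offset_of!(ReceiptMirror, timestamp),
        size_of::<ReceiptMirror>(),
        align_of::<ReceiptMirror>(),
    )
}
```

On some 32-bit targets `u64` is only 4-byte aligned, which changes the padding; that is consistent with gating the real assertions on `target_pointer_width = "64"`.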
/// A single stored event for C consumers.
///
/// # Safety contract for callers
///
/// `id`/`id_len` and `raw`/`raw_len` are each produced by Rust as a
/// `Box<[u8]>` whose fat-pointer length is reconstructed at free
/// time from `id_len` / `raw_len`. The fields are `pub` because
/// `#[repr(C)]` exposes them to C, **but they must be treated as
/// read-only** between the `net_poll_*` call that produced them
/// and the `net_free_poll_result` that consumes them.
///
/// Mutating `id_len` or `raw_len` (or copying the struct, replacing
/// the pointer, and then freeing) makes the
/// `Box::from_raw(slice_from_raw_parts_mut(ptr, wrong_len))` call on
/// free undefined behavior: the allocator records the allocation
/// size, and any mismatch is UB.
#[repr(C)]
pub struct NetEvent {
    /// Event ID (not null-terminated, use `id_len`).
    /// Read-only after `net_poll_*`; do not mutate.
    pub id: *const c_char,
    /// Length of the event ID. Read-only after `net_poll_*`; do not
    /// mutate (mutation causes UB on free).
    pub id_len: usize,
    /// Raw JSON payload (not null-terminated, use `raw_len`).
    /// Read-only after `net_poll_*`; do not mutate.
    pub raw: *const c_char,
    /// Length of the raw JSON payload. Read-only after
    /// `net_poll_*`; do not mutate (mutation causes UB on free).
    pub raw_len: usize,
    /// Insertion timestamp (nanoseconds).
    pub insertion_ts: u64,
    /// Shard ID.
    pub shard_id: u16,
}

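The free-time contract rests on rebuilding each boxed slice from exactly the pointer and length that `Box::into_raw` produced. A sketch of that round trip, using hypothetical helpers `leak_bytes` and `reclaim_bytes` that stand in for what `net_poll_ex` and `free_events_array_partial` do with `id` and `raw`:

```rust
use std::ptr;

/// Hypothetical: leak a byte slice as a thin (ptr, len) pair, the
/// shape stored in `NetEvent::id`/`id_len`.
pub fn leak_bytes(bytes: &[u8]) -> (*const u8, usize) {
    let boxed: Box<[u8]> = bytes.to_vec().into_boxed_slice();
    let len = boxed.len();
    // `Box::into_raw` yields a fat `*mut [u8]`; keep only its data half.
    let data = Box::into_raw(boxed) as *const u8;
    (data, len)
}

/// Hypothetical: rebuild the box and return its contents.
///
/// # Safety
/// `data`/`len` must be exactly a pair returned by `leak_bytes`,
/// unmodified and not reclaimed before.
pub unsafe fn reclaim_bytes(data: *const u8, len: usize) -> Vec<u8> {
    // Re-attach the length to form the fat pointer again. A different
    // `len` would describe a different allocation size (UB).
    let fat = ptr::slice_from_raw_parts_mut(data as *mut u8, len);
    let boxed: Box<[u8]> = unsafe { Box::from_raw(fat) };
    boxed.into_vec()
}
```

The length is the only record of the allocation size on the C side, which is why the struct documentation treats `id_len` and `raw_len` as read-only.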
// Pin layout invariants for `NetEvent`. See `NetReceipt`'s
// asserts for rationale. Bindings (C, Go, Python, Node)
// hard-code 48 bytes on 64-bit; an accidental reorder or new
// field would silently shift every offset.
//
// 64-bit: `4 × 8 (ptrs/usize) + u64 (8) + u16 (2) + 6 trailing pad` = 48.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::size_of::<NetEvent>() == 48,
    "NetEvent size changed on 64-bit; bindings hard-code 48. \
     If the change is intentional, bump the binding versions and \
     update this assertion."
);
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::align_of::<NetEvent>() == 8,
    "NetEvent alignment changed on 64-bit; bindings expect 8."
);

/// Poll result for C consumers.
#[repr(C)]
pub struct NetPollResult {
    /// Array of `count` events (NULL when `count` is 0). Free the
    /// whole result with `net_free_poll_result`.
    pub events: *mut NetEvent,
    /// Number of events in the array.
    pub count: usize,
    /// Cursor for the next poll (null-terminated). NULL if no more.
    pub next_id: *mut c_char,
    /// 1 if more events are available, 0 otherwise.
    pub has_more: c_int,
}

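Unlike `NetReceipt` and `NetEvent`, `NetPollResult` has no layout pin in this section. Applying the same C layout rules to its fields gives 32 bytes on a typical 64-bit target: three 8-byte fields, a 4-byte `c_int`, and 4 bytes of trailing padding. A sketch that checks those numbers with a hypothetical mirror struct, assuming a 64-bit target with a 32-bit `c_int`:

```rust
use std::ffi::{c_char, c_int};
use std::mem::offset_of;

// Hypothetical mirror of `NetPollResult`. `*mut u8` stands in for
// `*mut NetEvent`: only pointer size and alignment matter here.
#[repr(C)]
pub struct PollResultMirror {
    pub events: *mut u8,
    pub count: usize,
    pub next_id: *mut c_char,
    pub has_more: c_int,
}

/// Field offsets in declaration order.
pub fn poll_result_offsets() -> [usize; 4] {
    [
        offset_of!(PollResultMirror, events),
        offset_of!(PollResultMirror, count),
        offset_of!(PollResultMirror, next_id),
        // The 4-byte c_int is followed by 4 bytes of tail padding.
        offset_of!(PollResultMirror, has_more),
    ]
}
```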
/// Stats for C consumers.
#[repr(C)]
pub struct NetStats {
    /// Total events ingested.
    pub events_ingested: u64,
    /// Events dropped due to backpressure.
    pub events_dropped: u64,
    /// Batches dispatched to the adapter.
    pub batches_dispatched: u64,
}

/// Ingest raw JSON with a structured receipt.
///
/// `json` points to `len` bytes of UTF-8 (no NUL terminator is
/// required). `out` may be NULL, in which case the receipt is
/// discarded.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw_ex(
    handle: *mut NetHandle,
    json: *const c_char,
    len: usize,
    out: *mut NetReceipt,
) -> c_int {
    if !handle_is_valid(handle) || json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // `slice::from_raw_parts` requires `len <= isize::MAX`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
    let json_str = match std::str::from_utf8(json_bytes) {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    let raw = RawEvent::from_str(json_str);

    match handle.bus.ingest_raw(raw) {
        Ok((shard_id, timestamp)) => {
            if !out.is_null() {
                unsafe {
                    (*out).shard_id = shard_id;
                    (*out).timestamp = timestamp;
                }
            }
            NetError::Success.into()
        }
        Err(_) => NetError::IngestionFailed.into(),
    }
}

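The input checks in `net_ingest_raw_ex` run in a fixed order: null check, the `isize::MAX` bound required by `slice::from_raw_parts`, then UTF-8 validation. A sketch of that sequence as a standalone function; `borrow_utf8` and `BufError` are hypothetical names, and the real function maps these cases to `NetError` codes instead:

```rust
use std::ffi::c_char;
use std::{slice, str};

/// Hypothetical error cases mirroring the checks above.
#[derive(Debug, PartialEq, Eq)]
pub enum BufError {
    Null,
    TooLong,
    NotUtf8,
}

/// Borrows `len` bytes at `p` as a `&str`.
///
/// # Safety
/// If non-null and `len <= isize::MAX`, `p` must be valid for reads
/// of `len` bytes for the lifetime `'a`.
pub unsafe fn borrow_utf8<'a>(p: *const c_char, len: usize) -> Result<&'a str, BufError> {
    if p.is_null() {
        return Err(BufError::Null);
    }
    // Checked before any memory is touched: `from_raw_parts`
    // requires the total byte length to fit in `isize`.
    if len > isize::MAX as usize {
        return Err(BufError::TooLong);
    }
    let bytes = unsafe { slice::from_raw_parts(p as *const u8, len) };
    str::from_utf8(bytes).map_err(|_| BufError::NotUtf8)
}
```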
/// Poll events with structured result (no JSON overhead).
///
/// `cursor` may be NULL or empty to poll without a starting cursor;
/// a cursor that is not valid UTF-8 is silently ignored in the same way.
/// The caller must free the result with `net_free_poll_result`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_poll_ex(
    handle: *mut NetHandle,
    limit: usize,
    cursor: *const c_char,
    out: *mut NetPollResult,
) -> c_int {
    if !handle_is_valid(handle) || out.is_null() {
        return NetError::NullPointer.into();
    }

    // Pre-validate `limit` BEFORE calling `bus.poll`: the bus
    // advances the consumer cursor before returning, so any
    // post-poll allocation failure (e.g. `Layout::array::<NetEvent>`
    // overflow on a pathological `count`, or `std::alloc::alloc`
    // returning null under memory pressure) would drop the response
    // and lose every event the cursor just stepped past. Reject
    // requests whose `limit * size_of::<NetEvent>()` would overflow
    // or exceed `isize::MAX` (the `Layout::array` cap) up front, so
    // the failure happens before the cursor moves.
    if limit > 0
        && std::mem::size_of::<NetEvent>()
            .checked_mul(limit)
            .is_none_or(|v| v > isize::MAX as usize)
    {
        return NetError::IntOverflow.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    let mut request = ConsumeRequest::new(limit);
    if !cursor.is_null() {
        if let Ok(s) = unsafe { CStr::from_ptr(cursor) }.to_str() {
            if !s.is_empty() {
                request = request.from(s);
            }
        }
    }

    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
        Ok(r) => r,
        Err(_) => return NetError::PollFailed.into(),
    };

    let count = response.events.len();

    // Allocate events array.
    //
    // Each iteration allocates two boxed byte slices via
    // `<[u8]>::to_vec().into_boxed_slice()`. With the default `std`
    // OOM handler an allocation failure aborts, but if it is
    // configured to unwind, the resulting panic must not escape this
    // `extern "C"` body into a foreign frame (cgo / N-API / cffi)
    // that doesn't expect it. Wrap the per-event build in
    // `catch_unwind`, track how many events have been fully written,
    // and on panic free the completed events and the array via
    // `free_events_array_partial`. An `id` buffer allocated by the
    // iteration that panicked (before its event was written into the
    // array) is not reclaimed.
    let events_ptr = if count > 0 {
        let layout = match std::alloc::Layout::array::<NetEvent>(count) {
            Ok(l) => l,
            Err(_) => return NetError::Unknown.into(),
        };
        let ptr = unsafe { std::alloc::alloc(layout) as *mut NetEvent };
        if ptr.is_null() {
            return NetError::Unknown.into();
        }

        // Shared counter so the outer scope can clean up partial
        // writes if any iteration panics.
        let completed = std::cell::Cell::new(0usize);
        let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            for (i, event) in response.events.iter().enumerate() {
                let id_bytes = event.id.as_bytes().to_vec().into_boxed_slice();
                let id_len = id_bytes.len();
                let id_ptr = Box::into_raw(id_bytes) as *const c_char;

                let raw_bytes = event.raw.to_vec().into_boxed_slice();
                let raw_len = raw_bytes.len();
                let raw_ptr = Box::into_raw(raw_bytes) as *const c_char;

                unsafe {
                    ptr.add(i).write(NetEvent {
                        id: id_ptr,
                        id_len,
                        raw: raw_ptr,
                        raw_len,
                        insertion_ts: event.insertion_ts,
                        shard_id: event.shard_id,
                    });
                }
                completed.set(i + 1);
            }
        }));
        if build_result.is_err() {
            // A panic landed mid-loop. Free fully-written events
            // (those past `completed.get()` were never written, so
            // the inner `id`/`raw` pointers aren't valid). The
            // events array was allocated for `count` NetEvent
            // slots, so the dealloc must use that same layout.
            free_events_array_partial(ptr, completed.get(), count);
            return NetError::Unknown.into();
        }
        ptr
    } else {
        ptr::null_mut()
    };

    // Hand `next_id` to the caller as an owned C string, if present;
    // `net_free_poll_result` reclaims it.
    let next_id_ptr = match response.next_id {
        Some(ref s) => match std::ffi::CString::new(s.as_str()) {
            Ok(c) => c.into_raw(),
            Err(_) => {
                // Free already-allocated events before returning the
                // error. `s.as_str()` is valid UTF-8 by `String`
                // invariant, so this is the interior-NUL path:
                // an upstream cursor id that contains `\0` cannot
                // round-trip through a C string. Pre-fix this
                // returned `InvalidUtf8`, which mis-described
                // the cause; bindings now see the more accurate
                // `InteriorNul`.
                free_events_array(events_ptr, count);
                return NetError::InteriorNul.into();
            }
        },
        None => ptr::null_mut(),
    };

    unsafe {
        (*out).events = events_ptr;
        (*out).count = count;
        (*out).next_id = next_id_ptr;
        (*out).has_more = if response.has_more { 1 } else { 0 };
    }

    NetError::Success.into()
}

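The up-front `limit` check in `net_poll_ex` is meant to reject exactly the requests that `Layout::array::<NetEvent>` would later refuse. A sketch of the same predicate as a generic function, cross-checked against `Layout::array`; `limit_fits` and `EventLike` (a hypothetical 48-byte stand-in for `NetEvent`) are illustrative names only:

```rust
use std::mem::size_of;

/// Hypothetical stand-in with the same 48-byte size as `NetEvent`.
#[repr(C)]
pub struct EventLike {
    pub words: [u64; 6],
}

/// True if `limit` elements of `T` stay within the `isize::MAX`
/// byte cap: `limit * size_of::<T>()` must neither overflow `usize`
/// nor exceed `isize::MAX`. `limit == 0` is always accepted.
pub fn limit_fits<T>(limit: usize) -> bool {
    limit == 0
        || size_of::<T>()
            .checked_mul(limit)
            .is_some_and(|bytes| bytes <= isize::MAX as usize)
}
```

For element types whose size is a multiple of their alignment, a byte count at or below `isize::MAX` is also within the slightly tighter bound `Layout::array` applies, so the two agree; the test below checks this at the boundary.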
/// Free an events array and all its id/raw allocations.
///
/// `count` is the number of fully-written events (those whose
/// inner `id` / `raw` boxed slices were initialized). It must
/// also match the `Layout::array::<NetEvent>` used at allocation
/// time — every existing caller writes exactly `count` events
/// before invoking this function. For partial-cleanup paths
/// (e.g. panic mid-build), use [`free_events_array_partial`].
fn free_events_array(events: *mut NetEvent, count: usize) {
    free_events_array_partial(events, count, count);
}

/// Free an events array where only `walk_count` entries have
/// fully-initialized `id`/`raw` allocations, but the array
/// itself was allocated for `alloc_count` slots. Per-event
/// boxes are freed for `0..walk_count`; the array is then
/// deallocated with the original `Layout::array::<NetEvent>(alloc_count)`
/// to match the allocation. Used by `net_poll_ex`'s panic-mid-loop
/// recovery path.
fn free_events_array_partial(events: *mut NetEvent, walk_count: usize, alloc_count: usize) {
    if events.is_null() || alloc_count == 0 {
        return;
    }
    for i in 0..walk_count {
        let event = unsafe { &*events.add(i) };
        if !event.id.is_null() {
            unsafe {
                let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
                    event.id as *mut u8,
                    event.id_len,
                ));
            }
        }
        if !event.raw.is_null() {
            unsafe {
                let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
                    event.raw as *mut u8,
                    event.raw_len,
                ));
            }
        }
    }
    if let Ok(layout) = std::alloc::Layout::array::<NetEvent>(alloc_count) {
        unsafe {
            std::alloc::dealloc(events as *mut u8, layout);
        }
    }
}

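The recovery path in `net_poll_ex` combines `catch_unwind`, a `Cell` counter of fully written slots, and a dealloc that reuses the original array layout. A self-contained sketch of that pattern over an array of `String`s; `build_or_cleanup`, `free_all`, and `BuildError` are hypothetical, and `make` simulates the per-element allocation that might panic:

```rust
use std::alloc::{alloc, dealloc, Layout};
use std::cell::Cell;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::ptr;

#[derive(Debug, PartialEq, Eq)]
pub enum BuildError {
    /// The array itself could not be allocated.
    Alloc,
    /// `make` panicked; `cleaned_up` completed entries were dropped.
    Panicked { cleaned_up: usize },
}

/// Builds a raw array of `n` strings where `make(i)` may panic.
pub fn build_or_cleanup(
    n: usize,
    make: impl Fn(usize) -> String,
) -> Result<(*mut String, usize), BuildError> {
    if n == 0 {
        return Ok((ptr::null_mut(), 0));
    }
    let layout = Layout::array::<String>(n).map_err(|_| BuildError::Alloc)?;
    let arr = unsafe { alloc(layout) as *mut String };
    if arr.is_null() {
        return Err(BuildError::Alloc);
    }

    // Counts fully written slots; still readable after a caught panic.
    let completed = Cell::new(0usize);
    let outcome = catch_unwind(AssertUnwindSafe(|| {
        for i in 0..n {
            let value = make(i);
            unsafe { arr.add(i).write(value) };
            completed.set(i + 1);
        }
    }));

    if outcome.is_err() {
        let done = completed.get();
        unsafe {
            // Only slots 0..done hold initialized values.
            for i in 0..done {
                ptr::drop_in_place(arr.add(i));
            }
            // Same n-slot layout as the allocation, not `done` slots.
            dealloc(arr as *mut u8, layout);
        }
        return Err(BuildError::Panicked { cleaned_up: done });
    }
    Ok((arr, n))
}

/// Frees an array returned by a successful `build_or_cleanup`.
///
/// # Safety
/// `arr`/`n` must come from `build_or_cleanup` and not be freed twice.
pub unsafe fn free_all(arr: *mut String, n: usize) {
    if arr.is_null() || n == 0 {
        return;
    }
    unsafe {
        for i in 0..n {
            ptr::drop_in_place(arr.add(i));
        }
        dealloc(arr as *mut u8, Layout::array::<String>(n).unwrap());
    }
}
```

The panic test prints the simulated panic message to stderr; that output is expected.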
/// Free the internal allocations of a poll result returned by `net_poll_ex`.
///
/// This frees the events array (including each event's `id` and `raw` buffers)
/// and the `next_id` string. It does **not** free the `NetPollResult` struct
/// itself, which is caller-provided (typically stack-allocated or managed by
/// the caller).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_poll_result(result: *mut NetPollResult) {
    if result.is_null() {
        return;
    }

    let result = unsafe { &mut *result };

    // Free events array and all id/raw allocations.
    free_events_array(result.events, result.count);

    // Free next_id.
    if !result.next_id.is_null() {
        unsafe {
            drop(std::ffi::CString::from_raw(result.next_id));
        }
    }

    // Null the fields so a second `net_free_poll_result` on the
    // same struct is a safe no-op rather than a double-free. The
    // C header's contract just says "free a poll result"; without
    // this clear, a defensive caller calling free twice (or two
    // wrappers each calling free in their destructor) would
    // re-`Box::from_raw` an already-freed pointer.
    result.events = std::ptr::null_mut();
    result.count = 0;
    result.next_id = std::ptr::null_mut();
    result.has_more = 0;
}

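The "free, then null the fields" step is what makes a repeated free of the same struct harmless. A reduced sketch of the pattern with a hypothetical caller-owned struct `OwnedResult` holding one Rust-allocated C string; `fill` and `free_result` are illustrative names, not crate API:

```rust
use std::ffi::{c_char, CString};
use std::ptr;

/// Hypothetical caller-owned result with one Rust-allocated field.
#[repr(C)]
pub struct OwnedResult {
    pub name: *mut c_char,
    pub len: usize,
}

/// Fills `out` with an owned copy of `s` (no interior NULs expected).
pub fn fill(out: &mut OwnedResult, s: &str) {
    let c = CString::new(s).expect("no interior NUL");
    out.len = s.len();
    out.name = c.into_raw();
}

/// Frees `name`, then nulls the fields so a repeat call is a no-op.
///
/// # Safety
/// `r` must be NULL or point to an `OwnedResult` filled by `fill`
/// (or already freed by this function).
pub unsafe fn free_result(r: *mut OwnedResult) {
    if r.is_null() {
        return;
    }
    let r = unsafe { &mut *r };
    if !r.name.is_null() {
        drop(unsafe { CString::from_raw(r.name) });
    }
    r.name = ptr::null_mut();
    r.len = 0;
}
```

Nulling only protects repeated calls on the same struct. If a caller copies the struct by value and frees both copies, the second copy still holds the original pointers and the second free is a double-free.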
/// Get stats without JSON serialization.
///
/// Each counter is loaded independently with `Relaxed` ordering, so
/// while events are being ingested concurrently the three values are
/// not guaranteed to form a single consistent snapshot.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_stats_ex(handle: *mut NetHandle, out: *mut NetStats) -> c_int {
    if !handle_is_valid(handle) || out.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };
    let stats = handle.bus.stats();

    unsafe {
        (*out).events_ingested = stats
            .events_ingested
            .load(std::sync::atomic::Ordering::Relaxed);
        (*out).events_dropped = stats
            .events_dropped
            .load(std::sync::atomic::Ordering::Relaxed);
        (*out).batches_dispatched = stats
            .batches_dispatched
            .load(std::sync::atomic::Ordering::Relaxed);
    }

    NetError::Success.into()
}

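`net_stats_ex` copies atomic counters into a plain `#[repr(C)]` struct with independent `Relaxed` loads. A sketch of that copy-out pattern with a hypothetical two-counter block; `Counters`, `CountersOut`, and `record` are illustrative, and the "accepted vs dropped" split is an assumption of the sketch:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter block standing in for the bus stats.
#[derive(Default)]
pub struct Counters {
    pub ingested: AtomicU64,
    pub dropped: AtomicU64,
}

/// Plain-value copy suitable for handing across an FFI boundary.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CountersOut {
    pub ingested: u64,
    pub dropped: u64,
}

impl Counters {
    /// Counts one event as ingested or dropped.
    pub fn record(&self, accepted: bool) {
        let counter = if accepted { &self.ingested } else { &self.dropped };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Each load is atomic on its own, but a writer may run between
    /// the two loads, so the pair is not a point-in-time snapshot
    /// while writers are active.
    pub fn snapshot(&self) -> CountersOut {
        CountersOut {
            ingested: self.ingested.load(Ordering::Relaxed),
            dropped: self.dropped.load(Ordering::Relaxed),
        }
    }
}
```

Once all writers have been joined, the joins order every increment before the loads, so a snapshot taken afterwards is exact; the test relies on that.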
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_config_valid() {
        let config = parse_config_json(r#"{"num_shards": 8}"#);
        assert!(config.is_some());
    }

    #[test]
    fn test_parse_config_num_shards_overflow() {
        // u16::MAX is 65535, so 65536 should fail
        let config = parse_config_json(r#"{"num_shards": 65536}"#);
        assert!(
            config.is_none(),
            "num_shards exceeding u16::MAX should fail"
        );

        // Much larger value should also fail
        let config = parse_config_json(r#"{"num_shards": 100000}"#);
        assert!(
            config.is_none(),
            "num_shards exceeding u16::MAX should fail"
        );
    }

    #[test]
    fn test_parse_config_num_shards_max_valid() {
        // u16::MAX (65535) should be valid
        let config = parse_config_json(r#"{"num_shards": 65535}"#);
        assert!(config.is_some(), "num_shards at u16::MAX should be valid");
    }

    #[test]
    fn test_parse_config_invalid_json() {
        let config = parse_config_json(r#"{"num_shards": invalid}"#);
        assert!(config.is_none());
    }

    #[test]
    fn test_parse_config_empty() {
        let config = parse_config_json(r#"{}"#);
        assert!(config.is_some(), "empty config should use defaults");
    }

    /// Pin: known `backpressure_mode` strings round-trip; an
    /// unknown value (typo) is rejected with `None`, not silently
    /// downgraded to `DropNewest`. Pre-fix a deploy-time typo
    /// like `"DropOldset"` swapped the operator's intended
    /// durability for `DropNewest` with no diagnostic.
    #[test]
    fn parse_config_rejects_unknown_backpressure_mode() {
        // Known values still parse.
        for s in [
            "DropNewest",
            "drop_newest",
            "DropOldest",
            "drop_oldest",
            "FailProducer",
            "fail_producer",
        ] {
            let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
            assert!(cfg.is_some(), "known mode `{}` must parse", s);
        }

        // Typos must fail.
        for s in ["DropOldset", "FailProduce", "drop_oldst", "garbage", ""] {
            let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
            assert!(
                cfg.is_none(),
                "unknown mode `{}` must reject (pre-fix this silently \
                 fell through to DropNewest)",
                s,
            );
        }

        // Wrong JSON type also fails — pre-fix this hit the
        // `and_then(|v| v.as_str())` short-circuit and was
        // ignored entirely.
        let cfg = parse_config_json(r#"{"backpressure_mode": 42}"#);
        assert!(
            cfg.is_none(),
            "non-string non-object backpressure_mode must reject"
        );
        let cfg = parse_config_json(r#"{"backpressure_mode": true}"#);
        assert!(cfg.is_none(), "boolean backpressure_mode must reject");
    }

    /// Pin: the `Sample { rate }` mode is reachable from JSON
    /// via `{"backpressure_mode": {"Sample": {"rate": N}}}`,
    /// and a zero rate is rejected (validator already rejects
    /// it; the parser must too, so the surface is consistent).
    #[test]
    fn parse_config_supports_sample_mode_with_validation() {
        let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 10}}}"#);
        assert!(cfg.is_some(), "Sample with non-zero rate must parse");

        let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 0}}}"#);
        assert!(cfg.is_none(), "Sample with rate=0 must reject");

        let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {}}}"#);
        assert!(cfg.is_none(), "Sample missing rate must reject");
    }

    // Regression: the Go binding's `Poll(limit, cursor)` serializes a
    // `"cursor"` field that the FFI JSON path previously ignored —
    // cross-shard pagination silently broke. `parse_poll_request_json`
    // must round-trip the cursor into `ConsumeRequest.from_id`.
    #[test]
    fn test_parse_poll_request_preserves_cursor() {
        let req = parse_poll_request_json(r#"{"limit": 50, "cursor": "abc:123"}"#).unwrap();
        assert_eq!(req.limit, 50);
        assert_eq!(req.from_id.as_deref(), Some("abc:123"));
    }

    #[test]
    fn test_parse_poll_request_no_cursor_defaults_to_none() {
        let req = parse_poll_request_json(r#"{"limit": 10}"#).unwrap();
        assert_eq!(req.limit, 10);
        assert_eq!(req.from_id, None);
    }

    #[test]
    fn test_parse_poll_request_empty_uses_default_limit() {
        let req = parse_poll_request_json(r#"{}"#).unwrap();
        assert_eq!(req.limit, 100);
        assert_eq!(req.from_id, None);
    }

    // Regression: a wrong-typed `"limit"` previously hit
    // `.as_u64().unwrap_or(100)` and silently defaulted. Caller bugs
    // (e.g. sending a string or a negative number) must surface as
    // `InvalidJson` instead.
    #[test]
    fn test_parse_poll_request_wrong_type_limit_errors() {
        let err = parse_poll_request_json(r#"{"limit": "50"}"#).unwrap_err();
        assert_eq!(err, c_int::from(NetError::InvalidJson));
        let err = parse_poll_request_json(r#"{"limit": -1}"#).unwrap_err();
        assert_eq!(err, c_int::from(NetError::InvalidJson));
    }

    #[test]
    fn test_parse_poll_request_wrong_type_cursor_errors() {
        let err = parse_poll_request_json(r#"{"cursor": 123}"#).unwrap_err();
        assert_eq!(err, c_int::from(NetError::InvalidJson));
    }

    #[test]
    fn test_parse_poll_request_null_fields_use_defaults() {
        let req = parse_poll_request_json(r#"{"limit": null, "cursor": null}"#).unwrap();
        assert_eq!(req.limit, 100);
        assert_eq!(req.from_id, None);
    }

    /// `usize::MAX` is always a valid usize regardless of target
    /// pointer width, so it must parse successfully on both 32- and
    /// 64-bit builds. This pins the boundary case.
    #[test]
    fn test_parse_poll_request_limit_at_usize_max() {
        let json = format!(r#"{{"limit": {}}}"#, usize::MAX);
        let req = parse_poll_request_json(&json).unwrap();
        assert_eq!(req.limit, usize::MAX);
    }

    /// Regression: `as usize` silently truncates on 32-bit targets
    /// for `u64` values above `usize::MAX`. The parser must return
    /// `InvalidJson` instead of wrapping. We only run this on 32-bit
    /// targets because on 64-bit `usize::MAX == u64::MAX`, leaving
    /// nothing that fits in u64 but not usize.
    #[cfg(target_pointer_width = "32")]
    #[test]
    fn test_parse_poll_request_limit_overflows_usize_on_32bit() {
        // 2^33 — fits in u64, but exceeds usize::MAX on a 32-bit build.
        let err = parse_poll_request_json(r#"{"limit": 8589934592}"#).unwrap_err();
        assert_eq!(err, c_int::from(NetError::InvalidJson));
    }

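The truncation regression comes down to `as usize` versus a checked conversion. A sketch of the checked form using the standard `usize::try_from`; `limit_from_u64`, `limit_truncating`, and `InvalidLimit` are hypothetical names, not the crate's parser:

```rust
/// Hypothetical error carrying the rejected value.
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidLimit(pub u64);

/// Checked conversion: fails instead of wrapping when `v` does not
/// fit in this target's `usize`.
pub fn limit_from_u64(v: u64) -> Result<usize, InvalidLimit> {
    usize::try_from(v).map_err(|_| InvalidLimit(v))
}

/// The unchecked cast: keeps only the low bits on 32-bit targets.
pub fn limit_truncating(v: u64) -> usize {
    v as usize
}
```

On a 64-bit target every `u64` converts, so the error branch is only reachable on 32-bit builds, which is why the regression test above is gated on `target_pointer_width = "32"`.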
    /// CR-22: pin parity between the Rust-side `NetError` enum and
    /// the two C-header copies. The Rust enum is the source of
    /// truth; C consumers compare against, and Go consumers
    /// `errors.Is` against, the named codes. Pre-CR-22 the headers
    /// were missing `-9` (IntOverflow) and `-10` (MismatchedHandles);
    /// a consumer receiving those values would fall into the
    /// unknown-code branch and lose actionable distinction.
    ///
    /// We extract every integer literal that appears as the
    /// right-hand side of an `=` token in each header and check
    /// that each Rust-side value is present in BOTH headers. The
    /// test does NOT verify symbolic names; the enforced
    /// constraint is the numeric value alone. Because any `= <n>`
    /// in a header counts, this is a coarse check: a value
    /// assigned in an unrelated enum also satisfies it.
    ///
    /// Both `include_str!` paths point inside `net/crates/net/`.
    /// `include/net.go.h` is a manually-synced mirror of the
    /// repo-root `go/net.h`. Reaching outside the crate root
    /// (`include_str!("../../../../../go/net.h")`) breaks
    /// `cargo publish` and any out-of-repo vendoring of this
    /// crate, so the in-crate copy is the supported source. Drift
    /// between the two headers surfaces here as a parity-test
    /// failure: one of them will be missing the new value.
    #[test]
    fn cr22_c_header_parity_with_rust_neterror() {
        let primary = include_str!("../../include/net.h");
        let go_copy = include_str!("../../include/net.go.h");

        // The Rust enum's full set of values (mirrors `pub enum
        // NetError` above). When a new variant is added in the
        // Rust source, this list — AND both headers — must be
        // updated together. The asserts that follow then catch a
        // missing header update at the next CI run.
        let rust_values: &[i32] = &[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -99];

        // Pull every integer literal that directly follows an `=`
        // (after optional spaces/tabs and an optional sign). This
        // accepts `= 0`, `=  0`, `= -10`, etc.; whatever follows the
        // digits is not inspected.
        fn extract_assigned_values(src: &str) -> Vec<i32> {
            let mut out = Vec::new();
            let mut chars = src.chars().peekable();
            while let Some(c) = chars.next() {
                if c != '=' {
                    continue;
                }
                // Skip whitespace.
                while let Some(&peek) = chars.peek() {
                    if peek == ' ' || peek == '\t' {
                        chars.next();
                    } else {
                        break;
                    }
                }
                // Optional sign.
                let mut buf = String::new();
                if let Some(&peek) = chars.peek() {
                    if peek == '-' || peek == '+' {
                        buf.push(peek);
                        chars.next();
                    }
                }
                // Digits.
                let mut had_digit = false;
                while let Some(&peek) = chars.peek() {
                    if peek.is_ascii_digit() {
                        buf.push(peek);
                        chars.next();
                        had_digit = true;
                    } else {
                        break;
                    }
                }
                if had_digit {
                    if let Ok(v) = buf.parse::<i32>() {
                        out.push(v);
                    }
                }
            }
            out
        }

        let primary_vals = extract_assigned_values(primary);
        let go_vals = extract_assigned_values(go_copy);

        for &v in rust_values {
            assert!(
                primary_vals.contains(&v),
                "CR-22 regression: include/net.h is missing the value {} \
                 (Rust NetError defines it). Add the matching `NET_ERR_*` \
                 enumerator before merging.",
                v
            );
            assert!(
                go_vals.contains(&v),
                "CR-22 regression: include/net.go.h is missing the value {} \
                 (Rust NetError defines it).",
                v
            );
        }
    }

2242    /// CR-5: pin that `examples/capability.c` does not double-include
2243    /// `net.h` and `net.go.h`. Both files use the `NET_SDK_H` include
2244    /// guard, so when both are included in one TU the second is
2245    /// silently skipped — every `net_validate_capabilities` /
2246    /// `net_predicate_*` call the example makes becomes an
2247    /// implicit-declaration error on GCC 14+/Clang 16+, and a silent
2248    /// `int`-return miscompile on older toolchains. The deeper fix
2249    /// (renaming one guard so they compose cleanly) is tracked as
2250    /// CR-28; this test catches the example-level regression.
2251    #[test]
2252    fn cr5_example_does_not_double_include_net_headers() {
2253        let example = include_str!("../../examples/capability.c");
2254        let net_h_included = example.contains("#include \"../include/net.h\"");
2255        let net_go_h_included = example.contains("#include \"../include/net.go.h\"");
2256        assert!(
2257            net_go_h_included,
2258            "examples/capability.c must include net.go.h to declare \
2259             net_validate_capabilities + net_predicate_* symbols"
2260        );
2261        assert!(
2262            !net_h_included,
2263            "examples/capability.c must NOT also include net.h: \
2264             both headers share the NET_SDK_H guard, so the second \
2265             include is silently skipped, leaving the example's \
2266             net_predicate_* calls implicitly declared. Drop the \
2267             redundant include — net.go.h is a superset."
2268        );
2269    }

    /// `handle_is_valid` rejects null and any pointer not aligned for
    /// `NetHandle`. A foreign caller that produces a misaligned pointer
    /// (e.g. via an over-eager `void *` cast on a packed struct) would
    /// hit UB at `&*handle` before any other check runs. This gate is
    /// the discriminator that runs before the dereference.
    #[test]
    fn handle_is_valid_rejects_null_and_misaligned() {
        // Null is rejected.
        assert!(
            !handle_is_valid(std::ptr::null::<NetHandle>()),
            "null pointer must not be considered a valid handle"
        );

        // An aligned, non-null pointer is accepted. A small backing
        // buffer materializes such a pointer without ever dereferencing
        // it; `align_of::<NetHandle>()` is the alignment to match.
        let align = std::mem::align_of::<NetHandle>();
        let buf = vec![0u8; align * 2];
        let base = buf.as_ptr() as usize;
        // Round `base` up to the next multiple of `align` (a power of two).
        // Because `buf` spans `2 * align` bytes, both `aligned` and
        // `aligned + 1` stay inside it.
        let aligned = (base + align - 1) & !(align - 1);
        let aligned_ptr = aligned as *const NetHandle;
        assert!(
            handle_is_valid(aligned_ptr),
            "aligned non-null pointer must validate (align={align}, ptr={aligned_ptr:p})"
        );

        // A pointer one byte past `aligned_ptr` is misaligned for any
        // type with alignment > 1. `NetHandle` contains `AtomicU32` and
        // `AtomicBool` fields plus `ManuallyDrop`-wrapped `EventBus` and
        // `Runtime` values, so its alignment is at least 4.
        if align > 1 {
            let misaligned_ptr = (aligned + 1) as *const NetHandle;
            assert!(
                !handle_is_valid(misaligned_ptr),
                "misaligned pointer must be rejected (align={align}, ptr={misaligned_ptr:p})"
            );
        }
    }
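
    // A hypothetical, std-only sketch of the kind of pre-deref gate that
    // `handle_is_valid` performs. The real implementation may differ, and
    // `sketch_pointer_gate`, `ptr_passes_gate`, and `align_up` are
    // illustrative names, not crate API. The gate rejects null, then
    // rejects any address that is not a multiple of `align_of::<T>()`.
    // Passing it does not prove the pointer came from `Box::into_raw`.
    // It only rules out pointers for which `&*ptr` is UB no matter where
    // they came from.
    mod sketch_pointer_gate {
        use std::mem::align_of;

        /// Null and alignment check, performed without dereferencing `ptr`.
        pub fn ptr_passes_gate<T>(ptr: *const T) -> bool {
            !ptr.is_null() && (ptr as usize) % align_of::<T>() == 0
        }

        /// Round `addr` up to the next multiple of `align` (a power of two).
        /// This is the same arithmetic the test above uses for `aligned`.
        pub fn align_up(addr: usize, align: usize) -> usize {
            debug_assert!(align.is_power_of_two());
            (addr + align - 1) & !(align - 1)
        }

        #[test]
        fn gate_rejects_null_and_off_by_one() {
            // `NonNull::dangling()` is non-null and well aligned for `T`.
            // It is only inspected as an address here, never dereferenced.
            let ok = std::ptr::NonNull::<u64>::dangling().as_ptr() as *const u64;
            assert!(ptr_passes_gate(ok));
            assert!(!ptr_passes_gate(std::ptr::null::<u64>()));
            // `u64` has alignment > 1 on every target, so +1 is misaligned.
            let off_by_one = (ok as usize + 1) as *const u64;
            assert!(!ptr_passes_gate(off_by_one));
            assert_eq!(align_up(9, 8), 16);
            assert_eq!(align_up(16, 8), 16);
        }
    }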

    /// Pin: zero values for `heartbeat_interval_ms` and
    /// `session_timeout_ms` must reject the entire config (the parser
    /// returns `None`). Before the fix, the parser passed `0` straight
    /// through to `Duration::from_millis(0)`. On the Net adapter's
    /// heartbeat path, that produces a busy loop that pegs a CPU without
    /// any diagnostic: the FFI caller saw a successful `net_init`
    /// followed by a hung daemon. The validator-level guard that covers
    /// cooldown / metrics_window has no equivalent on the Net-adapter
    /// side, so the parser is the only place that can refuse the build.
    #[cfg(feature = "net")]
    #[test]
    fn parse_config_rejects_zero_heartbeat_and_session_timeout() {
        // 64-character hex strings, so `hex::decode` yields exactly the
        // 32 bytes (`[u8; 32]`) the parser requires for `psk` and
        // `peer_public_key`.
        let psk = "0".repeat(64);
        let peer_pk = "1".repeat(64);

        // Sanity check: a config with both fields *non-zero* must parse.
        // This proves that the rejections in the negative cases below
        // are caused by the zero, not by a missing required field in
        // the surrounding `net` block.
        let baseline = format!(
            r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
                "psk":"{psk}","peer_public_key":"{peer_pk}",
                "heartbeat_interval_ms":1000,"session_timeout_ms":30000}}}}"#
        );
        assert!(
            parse_config_json(&baseline).is_some(),
            "baseline net config with non-zero heartbeat/session_timeout must parse"
        );

        // heartbeat_interval_ms = 0 → reject.
        let zero_hb = format!(
            r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
                "psk":"{psk}","peer_public_key":"{peer_pk}",
                "heartbeat_interval_ms":0,"session_timeout_ms":30000}}}}"#
        );
        assert!(
            parse_config_json(&zero_hb).is_none(),
            "heartbeat_interval_ms=0 must reject (pre-fix this produced a CPU-pegging busy loop)"
        );

        // session_timeout_ms = 0 → reject.
        let zero_to = format!(
            r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
                "psk":"{psk}","peer_public_key":"{peer_pk}",
                "heartbeat_interval_ms":1000,"session_timeout_ms":0}}}}"#
        );
        assert!(
            parse_config_json(&zero_to).is_none(),
            "session_timeout_ms=0 must reject"
        );
    }
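
    // A hypothetical, std-only sketch of the parser-side guard that the
    // test above pins. `sketch_nonzero_millis`, `millis_field`, and
    // `timing_pair` are illustrative names, and `parse_config_json` may be
    // written differently. Each millisecond field is routed through
    // `NonZeroU64`, so a `0` becomes `None` before any `Duration` exists.
    // `?` then propagates that `None`, so the whole config is refused.
    mod sketch_nonzero_millis {
        use std::num::NonZeroU64;
        use std::time::Duration;

        /// `Some(Duration)` for a positive millisecond count, `None` for 0.
        pub fn millis_field(ms: u64) -> Option<Duration> {
            NonZeroU64::new(ms).map(|n| Duration::from_millis(n.get()))
        }

        /// Hypothetical pair of timing fields; either being 0 rejects both.
        pub fn timing_pair(
            heartbeat_ms: u64,
            session_timeout_ms: u64,
        ) -> Option<(Duration, Duration)> {
            // A zero in either field short-circuits to `None` via `?`.
            Some((millis_field(heartbeat_ms)?, millis_field(session_timeout_ms)?))
        }

        #[test]
        fn zero_millis_rejects_the_pair() {
            assert_eq!(
                timing_pair(1000, 30_000),
                Some((Duration::from_millis(1000), Duration::from_secs(30)))
            );
            assert_eq!(timing_pair(0, 30_000), None);
            assert_eq!(timing_pair(1000, 0), None);
        }
    }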
}