Skip to main content

net/ffi/
mod.rs

1//! C FFI bindings for cross-language integration.
2//!
3//! This module provides a C-compatible API for using Net from
4//! other languages (Python, Node.js, Go, etc.).
5//!
6//! # Safety
7//!
8//! All public FFI functions in this module accept raw pointers from C code.
9//! Each is declared `pub unsafe extern "C" fn` so the unsafety is
10//! explicit at the type level; the module-wide contract callers
11//! must uphold is:
12//! - Pointers are valid and properly aligned
13//! - String pointers point to valid UTF-8 data
14//! - Buffer sizes are accurate
15//! - Handles are not used after `net_shutdown`
16//!
17//! The per-function `# Safety` rustdoc is intentionally suppressed
18//! at the module level — every entry point shares the same contract
19//! and the module doc-comment above (plus `include/README.md`) is
20//! the source of truth. Adding individual `# Safety` blocks would
21//! duplicate the same wording 200 times without adding signal.
22//!
23//! # Thread Safety
24//!
25//! All FFI functions are thread-safe. The event bus handle can be shared
26//! across threads.
27//!
28#![allow(clippy::missing_safety_doc)]
29
30//! # Tokio runtime restriction
31//!
32//! Internal FFI ops (`net_poll`, `net_flush`, `net_shutdown`,
33//! `net_redex_*`, `net_mesh_new`, the cortex FFI, the mesh FFI)
34//! drive the bus's tokio runtime via `Runtime::block_on`. That
35//! function panics with "Cannot start a runtime from within a
36//! runtime" if the calling thread is already inside a tokio
37//! runtime context. The functions are `extern "C"`, so a panic
38//! unwinds across the FFI boundary into C / Go-cgo / Python /
39//! NAPI — undefined behavior.
40//!
41//! **The common-case C / Go / Python caller has no Rust tokio
42//! runtime, so this is unreachable for them.** The narrow path is:
43//!
44//! - A **Rust** caller loads the cdylib and calls these
45//!   functions from inside its own `#[tokio::main]` (or any
46//!   thread that has called `Runtime::enter()`).
47//! - A non-Rust caller embeds a Rust library that runs its own
48//!   tokio runtime and forwards calls into this cdylib on the
49//!   same thread.
50//!
51//! Both forms are unusual but reachable. **Do not call any FFI
52//! op from a thread that already holds a tokio runtime
53//! context.** If you must, spawn the FFI call on a fresh OS
54//! thread that doesn't carry a runtime guard, or wrap the call
55//! with `tokio::task::spawn_blocking(|| net_xxx(...))` to escape
56//! the worker pool.
57//!
//! `net_init` (see the comments in its body) hardens against this for runtime
59//! *construction*; the steady-state ops do not, since the cost
60//! of a `Handle::try_current()` check on every poll would be
61//! measurable for the common path that doesn't hit the bug.
62//!
63//! # `catch_unwind` + caller-held locks
64//!
65//! Several FFI entries (`net_blob_publish`, `net_blob_resolve`,
66//! `net_*_wait_for_token`) wrap their body in
67//! `std::panic::catch_unwind(AssertUnwindSafe(...))` so a panic
68//! during the call returns a typed `NET_ERR_BLOB_PANIC` /
69//! `NET_ERR_PANIC` code rather than unwinding across the FFI
70//! boundary. That stops the substrate-side undefined behavior,
71//! but it does NOT make the wrapped code transparently panic-safe
72//! from the caller's perspective.
73//!
74//! **If the caller invokes an FFI op while holding an OS-level
75//! lock, a `sync.Mutex` (Go), `threading.Lock` (Python), or any
76//! other mutex with poisoning semantics, and the FFI body panics,
77//! the mutex is left in a poisoned state.** Subsequent acquires
78//! on the same mutex by the caller observe the poisoning and
79//! either error (Rust `parking_lot` with `poison_on_unwind`) or
80//! deadlock (Go's `sync.Mutex` doesn't poison; the caller has
81//! observed a return value that may not reflect the state of
82//! the FFI op).
83//!
84//! Recommended caller pattern: **do not hold a caller-side lock
85//! across an FFI call**. Acquire the lock, prepare the inputs,
86//! release the lock, then call the FFI. Re-acquire if you need
87//! to update caller state with the result.
88//!
89//! The hazard is documented per-binding in:
90//!   - Python: `bindings/python/README.md` (caller-mutex notes)
91//!   - Node:   `bindings/node/README.md`
92//!   - Go:     `bindings/go/net/redex.go` lifecycle docs
93//!   - C:      `include/net.h` (every wait-family declaration)
94//!
95//! # Memory Management
96//!
97//! - Handles returned by `net_init` must be freed with `net_shutdown`
98//! - String buffers passed to `net_poll` are owned by the caller
99//! - Error codes are returned as integers (0 = success, negative = error)
100//!
101//! # Example (C)
102//!
103//! ```c
104//! #include "net.h"
105//!
106//! int main() {
107//!     // Initialize with default config
108//!     void* bus = net_init("{\"num_shards\": 4}");
109//!     if (!bus) return 1;
110//!
111//!     // Ingest an event
//!     int result = net_ingest(bus, "{\"token\": \"hello\"}", 18);
113//!     if (result < 0) { /* handle error */ }
114//!
115//!     // Poll events
116//!     char buffer[65536];
117//!     result = net_poll(bus, "{\"limit\": 100}", buffer, sizeof(buffer));
118//!
119//!     // Shutdown
120//!     net_shutdown(bus);
121//!     return 0;
122//! }
123//! ```
124
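// The entry points in this file are declared `unsafe extern "C" fn` (see the
// module-level `# Safety` docs). This allow is retained for any
// safe-signature helper or shim under this module that still takes a raw
// pointer. NOTE(review): confirm it is still needed; drop it if clippy is
// clean without it.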
127#![allow(clippy::not_unsafe_ptr_arg_deref)]
128
129use std::ffi::CStr;
130use std::os::raw::{c_char, c_int};
131use std::ptr;
132
133use tokio::runtime::Runtime;
134
135use crate::bus::EventBus;
136use crate::config::EventBusConfig;
137use crate::consumer::ConsumeRequest;
138use crate::event::{Event, RawEvent};
139
/// Per-FFI-handle quiescing protocol shared by cortex / mesh
/// handles to close the audit-#23/#24/#25 use-after-free hazards
/// when a `_free` races a concurrent op. See module docs for the
/// soundness story (intentional box leak) and the per-handle
/// recipe.
#[cfg(any(
    all(feature = "netdb", feature = "redex-disk"),
    feature = "net",
    feature = "redis",
))]
pub mod handle_guard;

/// C FFI for CortEX / NetDb / RedexFile. Requires `netdb` (for the
/// unified facade) and `redex-disk` (for persistent storage paths on
/// `Redex` / `RedexFile`). Go / cgo consumers target this surface.
///
/// `missing_docs` is suppressed on this module: these are extern "C"
/// shims over already-documented Rust adapters, and the per-function
/// contract is documented in the binding-side READMEs (Go / TS / Py).
/// Re-documenting each shim would duplicate with drift risk.
#[cfg(all(feature = "netdb", feature = "redex-disk"))]
#[allow(missing_docs)]
pub mod cortex;
163
164/// C FFI for the Dataforts Phase 3 blob surface. Exposes the
165/// BlobRef wire codec, the global adapter registry, and the
166/// `publish_blob` / `resolve_payload` helpers for cgo / native
167/// consumers.
168#[cfg(feature = "dataforts")]
169#[allow(missing_docs)]
170pub mod blob;
171
172/// Stub definitions for the `net_mesh_blob_adapter_*` symbols
173/// when the `dataforts / netdb / redex-disk` feature triple is
174/// off. cgo / dlsym consumers link these symbols unconditionally
175/// (see `bindings/go/blob.go`), so a libnet built without the
176/// triple must still satisfy them — each stub returns
177/// `NET_ERR_FEATURE_NOT_BUILT` (or null) so Go programs route to
178/// a clean error rather than fail at program load. The module is
179/// empty when the feature triple is on (the real impls in
180/// `ffi::blob` cover the same symbol names).
181#[allow(missing_docs)]
182pub mod blob_stubs;
183
184/// C FFI for the encrypted-UDP mesh transport + channels. Requires
185/// the `net` feature (which brings in the crypto + transport). Go /
186/// cgo consumers target this surface alongside `ffi::cortex`. See
187/// the `ffi::cortex` note for why `missing_docs` is suppressed here.
188#[cfg(feature = "net")]
189#[allow(missing_docs)]
190pub mod mesh;
191
192/// C FFI for stateless predicate evaluation (Phase 9c of
193/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — no handles,
194/// no state. Mirrors the SDK-layer `evaluatePredicate` /
195/// `evaluate_predicate` surface every binding ships, exposed at
196/// the C ABI for raw consumers (C / C++ / Zig / Swift / etc.).
197#[cfg(feature = "net")]
198pub mod predicate;
199
200/// C FFI for stateless capability-set validation (Phase 9a of
201/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helper — `caps_json`
202/// in, `report_json` out. Mirrors the SDK-layer
203/// `validate_capabilities` surface, exposed at the C ABI for raw
204/// consumers.
205#[cfg(feature = "net")]
206pub mod schema;
207
208/// C FFI for predicate debug-session helpers (Phase 9d of
209/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — single-eval
210/// `evaluate_with_trace`, corpus-wide
211/// `aggregate_debug_report`, and host-side
212/// `redact_metadata_keys`. Mirror what every other binding
213/// ships at the SDK layer; exposed at the C ABI for raw
214/// consumers.
215#[cfg(feature = "net")]
216pub mod predicate_debug;
217
218/// C FFI for the Redis Streams consumer-side dedup helper. Mirrors
219/// the Rust `net::adapter::RedisStreamDedup` surface for Go / C / Zig
220/// consumers. See `ffi::redis_dedup` module docs for the wire
221/// shape and the dedup contract.
222#[cfg(feature = "redis")]
223pub mod redis_dedup;
224
225#[cfg(feature = "net")]
226use crate::adapter::net::{NetAdapterConfig, ReliabilityConfig, StaticKeypair};
227#[cfg(any(feature = "redis", feature = "jetstream", feature = "net"))]
228use crate::config::AdapterConfig;
229#[cfg(feature = "jetstream")]
230use crate::config::JetStreamAdapterConfig;
231#[cfg(feature = "redis")]
232use crate::config::RedisAdapterConfig;
233#[cfg(feature = "net")]
234use std::ffi::CString;
235
/// Opaque handle to an event bus instance.
///
/// This wraps the EventBus along with a Tokio runtime for async operations.
/// Instances are allocated by `create_with_config` and handed to C as a
/// raw pointer obtained from `Box::into_raw`.
///
/// # Lifetime / soundness
///
/// The handle storage is *intentionally leaked* on `net_shutdown` rather
/// than freed via `Box::from_raw`. Reasoning: every FFI entry point
/// dereferences the C-side `*mut NetHandle` to access the atomics that
/// gate shutdown. The previous Dekker-style SeqCst handshake between
/// `FfiOpGuard::try_enter` (which calls `fetch_add` on `active_ops`) and
/// `net_shutdown` (which loads `active_ops` then `Box::from_raw`s the
/// handle) was unsound: SeqCst orders the atomic operations only — the
/// non-atomic `Box::from_raw` could deallocate the storage between
/// shutdown's load and a concurrent FFI op's `fetch_add`, producing a
/// use-after-free on the freed atomic. By never freeing the box, the
/// atomic memory backing the handle is always valid; concurrent FFI ops
/// observe `shutting_down=true` after shutdown signals it and bail
/// before touching `bus`/`runtime`.
///
/// `bus` and `runtime` are stored in `ManuallyDrop` so that
/// `net_shutdown` can `take` them out (via `ptr::read`) in order to
/// call `bus.shutdown().await`. Because `shutting_down` is set first
/// and shutdown waits for `active_ops` to drop to zero before reading
/// these fields, no FFI op can be racing the read. If the wait times
/// out, the `ptr::read` is skipped and both fields are leaked along
/// with the box.
pub struct NetHandle {
    /// Owned `EventBus`. Read out via `ManuallyDrop::take` during
    /// shutdown once `active_ops` has drained to zero. After that
    /// point, `shutting_down` is `true` and no FFI op may access this
    /// field.
    bus: std::mem::ManuallyDrop<EventBus>,
    /// Owned tokio runtime. Same lifetime contract as `bus`.
    runtime: std::mem::ManuallyDrop<Runtime>,
    /// Set to `true` once `net_shutdown` begins. All other FFI
    /// functions check this flag and return `ShuttingDown` before
    /// touching `bus` / `runtime`. Starts out `false` (see
    /// `create_with_config`).
    shutting_down: std::sync::atomic::AtomicBool,
    /// Number of in-flight FFI operations (excluding shutdown itself).
    /// `net_shutdown` spins until this drops to zero (with a deadline)
    /// before reading `bus` / `runtime` to call shutdown.
    ///
    /// Incremented by `FfiOpGuard::try_enter` and decremented either
    /// immediately (when entry is refused) or when the guard is dropped.
    /// Starts at `0`.
    active_ops: std::sync::atomic::AtomicU32,
    /// Set to `true` after `net_shutdown` has consumed `bus` /
    /// `runtime` via `ManuallyDrop::take`. A second `net_shutdown`
    /// call observes this and returns `Success` without re-taking
    /// (which would be UB). FFI ops also check this before touching
    /// `bus` / `runtime`, defending against a contract-violating
    /// caller that races a post-shutdown call. Starts out `false`.
    bus_taken: std::sync::atomic::AtomicBool,
    /// Set to `true` after `bus.shutdown()` returns from the
    /// first `net_shutdown` call. A second/third concurrent
    /// `net_shutdown` caller spins until this flips before
    /// returning success — without this gate the second caller
    /// observed `bus_taken == true` and returned `Success` while
    /// the first caller was still mid-`block_on(bus.shutdown())`,
    /// falsely signaling completion of an in-progress shutdown.
    /// Starts out `false`.
    shutdown_completed: std::sync::atomic::AtomicBool,
}
295
/// Upper bound on how long `net_shutdown` waits for in-flight FFI
/// operations to drain. Once this elapses the bus is left in place
/// (leaked) instead of being read out of the handle. That is still
/// sound, because the handle box is leaked permanently anyway, but it
/// means the adapter's `flush()` / `shutdown()` never get to run.
const FFI_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_millis(5_000);
302
303/// RAII guard that increments `active_ops` on creation and decrements on drop.
304struct FfiOpGuard<'a> {
305    handle: &'a NetHandle,
306}
307
308impl<'a> FfiOpGuard<'a> {
309    /// Try to enter an FFI operation. Returns `None` if the handle is
310    /// shutting down or if `bus` / `runtime` have already been taken.
311    ///
312    /// Soundness rests on the fact that the box backing `handle` is
313    /// never freed (see `NetHandle` doc). The `fetch_add` is therefore
314    /// always on valid memory regardless of whether shutdown is in
315    /// progress. The subsequent loads decide whether the op is allowed
316    /// to proceed; if shutdown was signaled or `bus_taken` flipped
317    /// before our increment was visible, we bail without touching
318    /// `bus` / `runtime`. The `bus_taken` check defends against a
319    /// contract-violating caller that races a post-shutdown call: even
320    /// if `shutting_down` was reset somehow, an op that would touch the
321    /// already-taken `ManuallyDrop` fields is rejected.
322    fn try_enter(handle: &'a NetHandle) -> Option<Self> {
323        handle
324            .active_ops
325            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
326        if handle
327            .shutting_down
328            .load(std::sync::atomic::Ordering::SeqCst)
329            || handle.bus_taken.load(std::sync::atomic::Ordering::SeqCst)
330        {
331            handle
332                .active_ops
333                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
334            None
335        } else {
336            Some(Self { handle })
337        }
338    }
339}
340
341impl Drop for FfiOpGuard<'_> {
342    fn drop(&mut self) {
343        self.handle
344            .active_ops
345            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
346    }
347}
348
349/// Returns `true` when `handle` is non-null and aligned for
350/// `NetHandle`. Every `extern "C"` entry point that derefs the
351/// raw handle must gate on this — a misaligned pointer produced
352/// by an over-eager `void *` cast in a foreign caller would be
353/// immediate UB on `&*handle`, even before the `is_null` check.
354#[inline]
355fn handle_is_valid(handle: *const NetHandle) -> bool {
356    !handle.is_null() && (handle as usize).is_multiple_of(std::mem::align_of::<NetHandle>())
357}
358
/// Status codes handed back to C callers by the FFI entry points.
///
/// `0` means success; every failure is a distinct negative value. The
/// numeric values are part of the C ABI. Convert to the wire
/// representation with `c_int::from(..)` / `.into()`.
#[repr(C)]
pub enum NetError {
    /// The call completed without error.
    Success = 0,
    /// A required pointer argument was null.
    NullPointer = -1,
    /// The supplied bytes are not valid UTF-8.
    InvalidUtf8 = -2,
    /// The supplied text could not be parsed as the expected JSON.
    InvalidJson = -3,
    /// The bus could not be initialized.
    InitFailed = -4,
    /// The event was not accepted for ingestion (backpressure).
    IngestionFailed = -5,
    /// Polling for events failed.
    PollFailed = -6,
    /// The caller-provided output buffer cannot hold the result.
    BufferTooSmall = -7,
    /// The handle is shutting down and no longer accepts operations.
    ShuttingDown = -8,
    /// Integer overflow: the result cannot be represented in `c_int`.
    IntOverflow = -9,
    /// The stream handle was not created from the node handle it was
    /// presented with. Historically the send-family FFIs accepted any
    /// (stream, node) pairing unchecked, which let traffic silently
    /// cross sessions; this code reports such a mismatch.
    MismatchedHandles = -10,
    /// `CString::new` failed: the bytes are valid UTF-8 (a Rust
    /// `String` guarantees that) but contain an interior NUL, which a
    /// NUL-terminated C string cannot carry. This used to be reported
    /// as `InvalidUtf8`, which pointed bindings at the wrong cause,
    /// since the encoding is fine and only the embedded NUL is not.
    InteriorNul = -11,
    /// Any failure without a more specific code.
    Unknown = -99,
}

impl From<NetError> for c_int {
    /// Yields the variant's discriminant as the C-facing status code.
    fn from(code: NetError) -> c_int {
        code as c_int
    }
}
405
406/// Enter an FFI operation with lifetime protection. Returns an `FfiOpGuard`
407/// that prevents `net_shutdown` from deallocating the handle until the guard
408/// is dropped. Returns `Err` with the error code if shutdown is in progress.
409#[inline]
410fn enter_ffi_op(handle: &NetHandle) -> Result<FfiOpGuard<'_>, c_int> {
411    FfiOpGuard::try_enter(handle).ok_or(NetError::ShuttingDown.into())
412}
413
414/// Initialize a new event bus.
415///
416/// # Parameters
417///
418/// - `config_json`: JSON configuration string (UTF-8, null-terminated).
419///   Pass NULL or empty string for default configuration.
420///
421/// # Returns
422///
423/// Opaque handle to the event bus, or NULL on failure.
424/// The handle must be freed with `net_shutdown`.
425///
426/// # Example Configuration
427///
428/// ```json
429/// {
430///   "num_shards": 8,
431///   "ring_buffer_capacity": 1048576,
432///   "backpressure_mode": "DropOldest",
433///   "batch": {
434///     "min_size": 1000,
435///     "max_size": 10000,
436///     "max_delay_ms": 10
437///   }
438/// }
439/// ```
440#[unsafe(no_mangle)]
441pub unsafe extern "C" fn net_init(config_json: *const c_char) -> *mut NetHandle {
442    // Parse and validate the config BEFORE constructing the tokio
443    // runtime. Building the runtime first would let any subsequent
444    // early-return path (`CStr::to_str` Err, `parse_config_json`
445    // returning None, `EventBus::new` returning Err) drop the
446    // local `Runtime` on function return. Dropping a multi-thread
447    // tokio runtime from inside ANOTHER tokio runtime's worker
448    // thread panics with "Cannot drop a runtime in a context where
449    // blocking is not allowed", unwinding across this `extern "C"`
450    // boundary into a Python / Go-cgo / NAPI / PyO3 caller —
451    // undefined behaviour. By validating inputs first, the runtime
452    // is only built once we know it will be installed into the
453    // `NetHandle` and survive the call.
454    let config = if config_json.is_null() {
455        EventBusConfig::default()
456    } else {
457        let config_str = match unsafe { CStr::from_ptr(config_json) }.to_str() {
458            Ok("") => EventBusConfig::default(),
459            Ok(s) => match parse_config_json(s) {
460                Some(cfg) => cfg,
461                None => return ptr::null_mut(),
462            },
463            Err(_) => return ptr::null_mut(),
464        };
465        config_str
466    };
467
468    // Now construct the runtime — its lifetime is tied to the
469    // returned `NetHandle` (via `create_with_config`), so the only
470    // remaining drop is on `net_shutdown`, which already handles
471    // it via `runtime.block_on(...)` (see #74) outside any other
472    // tokio context.
473    let runtime = match Runtime::new() {
474        Ok(rt) => rt,
475        Err(_) => return ptr::null_mut(),
476    };
477
478    create_with_config(runtime, config)
479}
480
481/// Parse JSON configuration into EventBusConfig.
482///
483/// Supports:
484/// - `num_shards`: number of shards
485/// - `ring_buffer_capacity`: ring buffer size per shard
486/// - `backpressure_mode`: "DropNewest", "DropOldest", "FailProducer"
487fn parse_config_json(json_str: &str) -> Option<EventBusConfig> {
488    let value: serde_json::Value = serde_json::from_str(json_str).ok()?;
489
490    let mut builder = EventBusConfig::builder();
491
492    if let Some(num_shards) = value.get("num_shards").and_then(|v| v.as_u64()) {
493        let num_shards = u16::try_from(num_shards).ok()?;
494        builder = builder.num_shards(num_shards);
495    }
496
497    if let Some(capacity) = value.get("ring_buffer_capacity").and_then(|v| v.as_u64()) {
498        let capacity = usize::try_from(capacity).ok()?;
499        builder = builder.ring_buffer_capacity(capacity);
500    }
501
502    if let Some(bp_value) = value.get("backpressure_mode") {
503        let bp_mode = if let Some(mode) = bp_value.as_str() {
504            match mode {
505                "DropNewest" | "drop_newest" => crate::config::BackpressureMode::DropNewest,
506                "DropOldest" | "drop_oldest" => crate::config::BackpressureMode::DropOldest,
507                "FailProducer" | "fail_producer" => crate::config::BackpressureMode::FailProducer,
508                // Pre-fix every other string silently fell back to
509                // `DropNewest`. A typo (`"DropOldset"`) thus
510                // changed durability profile at deploy time with
511                // no error. Reject unknowns to match the contract
512                // already enforced by `parse_poll_request_json`.
513                _ => return None,
514            }
515        } else if let Some(obj) = bp_value.as_object() {
516            // Object form: `{"Sample": {"rate": N}}` for the
517            // sampling mode that has an associated value.
518            if let Some(sample) = obj.get("Sample").or_else(|| obj.get("sample")) {
519                let rate = sample.get("rate").and_then(|v| v.as_u64())?;
520                let rate = u32::try_from(rate).ok()?;
521                if rate == 0 {
522                    // Validated again by `EventBusConfig::validate`,
523                    // but reject earlier so the parser surface
524                    // matches the validator surface.
525                    return None;
526                }
527                crate::config::BackpressureMode::Sample { rate }
528            } else {
529                return None;
530            }
531        } else {
532            return None;
533        };
534        builder = builder.backpressure_mode(bp_mode);
535    }
536
537    // Parse Redis config
538    #[cfg(feature = "redis")]
539    if let Some(redis) = value.get("redis") {
540        if let Some(url) = redis.get("url").and_then(|v| v.as_str()) {
541            let mut redis_config = RedisAdapterConfig::new(url);
542
543            if let Some(prefix) = redis.get("prefix").and_then(|v| v.as_str()) {
544                redis_config = redis_config.with_prefix(prefix);
545            }
546            if let Some(max_len) = redis.get("max_stream_len").and_then(|v| v.as_u64()) {
547                let max_len = usize::try_from(max_len).ok()?;
548                redis_config = redis_config.with_max_stream_len(max_len);
549            }
550            if let Some(pipeline_size) = redis.get("pipeline_size").and_then(|v| v.as_u64()) {
551                let pipeline_size = usize::try_from(pipeline_size).ok()?;
552                redis_config = redis_config.with_pipeline_size(pipeline_size);
553            }
554
555            builder = builder.adapter(AdapterConfig::Redis(redis_config));
556        }
557    }
558
559    // Parse JetStream config
560    #[cfg(feature = "jetstream")]
561    if let Some(jetstream) = value.get("jetstream") {
562        if let Some(url) = jetstream.get("url").and_then(|v| v.as_str()) {
563            let mut js_config = JetStreamAdapterConfig::new(url);
564
565            if let Some(prefix) = jetstream.get("prefix").and_then(|v| v.as_str()) {
566                js_config = js_config.with_prefix(prefix);
567            }
568            if let Some(max_messages) = jetstream.get("max_messages").and_then(|v| v.as_i64()) {
569                js_config = js_config.with_max_messages(max_messages);
570            }
571            if let Some(replicas) = jetstream.get("replicas").and_then(|v| v.as_u64()) {
572                let replicas = usize::try_from(replicas).ok()?;
573                js_config = js_config.with_replicas(replicas);
574            }
575
576            builder = builder.adapter(AdapterConfig::JetStream(js_config));
577        }
578    }
579
580    // Parse Net config
581    #[cfg(feature = "net")]
582    if let Some(net) = value.get("net") {
583        let bind_addr: std::net::SocketAddr = net
584            .get("bind_addr")
585            .and_then(|v| v.as_str())
586            .and_then(|s| s.parse().ok())?;
587
588        let peer_addr: std::net::SocketAddr = net
589            .get("peer_addr")
590            .and_then(|v| v.as_str())
591            .and_then(|s| s.parse().ok())?;
592
593        let psk: [u8; 32] = net
594            .get("psk")
595            .and_then(|v| v.as_str())
596            .and_then(|s| hex::decode(s).ok())
597            .and_then(|v| v.try_into().ok())?;
598
599        let role = net
600            .get("role")
601            .and_then(|v| v.as_str())
602            .unwrap_or("initiator");
603
604        let mut net_config = match role {
605            "initiator" => {
606                let peer_pubkey: [u8; 32] = net
607                    .get("peer_public_key")
608                    .and_then(|v| v.as_str())
609                    .and_then(|s| hex::decode(s).ok())
610                    .and_then(|v| v.try_into().ok())?;
611                NetAdapterConfig::initiator(bind_addr, peer_addr, psk, peer_pubkey)
612            }
613            "responder" => {
614                let secret_key: [u8; 32] = net
615                    .get("secret_key")
616                    .and_then(|v| v.as_str())
617                    .and_then(|s| hex::decode(s).ok())
618                    .and_then(|v| v.try_into().ok())?;
619                let public_key: [u8; 32] = net
620                    .get("public_key")
621                    .and_then(|v| v.as_str())
622                    .and_then(|s| hex::decode(s).ok())
623                    .and_then(|v| v.try_into().ok())?;
624                let keypair = StaticKeypair::from_keys(secret_key, public_key);
625                NetAdapterConfig::responder(bind_addr, peer_addr, psk, keypair)
626            }
627            _ => return None,
628        };
629
630        // Apply optional settings
631        if let Some(reliability) = net.get("reliability").and_then(|v| v.as_str()) {
632            net_config = net_config.with_reliability(match reliability {
633                "light" => ReliabilityConfig::Light,
634                "full" => ReliabilityConfig::Full,
635                _ => ReliabilityConfig::None,
636            });
637        }
638
639        if let Some(pool_size) = net.get("packet_pool_size").and_then(|v| v.as_u64()) {
640            if let Ok(size) = usize::try_from(pool_size) {
641                net_config = net_config.with_pool_size(size);
642            }
643        }
644
645        // Reject `0` for `heartbeat_interval_ms` and
646        // `session_timeout_ms`. `EventBusConfig::validate` rejects
647        // zero `Duration`s for `cooldown`, `metrics_window`, etc.,
648        // but the Net adapter's JSON parser had no equivalent guard
649        // — a `0` here flowed through to `Duration::from_millis(0)`,
650        // which on the heartbeat path busy-loops the heartbeat task
651        // and saturates a CPU. Treat zero as a misconfig and refuse
652        // to build the bus, surfacing as `InvalidJson` so the FFI
653        // caller sees a typed failure rather than a hung daemon.
654        if let Some(interval_ms) = net.get("heartbeat_interval_ms").and_then(|v| v.as_u64()) {
655            if interval_ms == 0 {
656                return None;
657            }
658            net_config =
659                net_config.with_heartbeat_interval(std::time::Duration::from_millis(interval_ms));
660        }
661
662        if let Some(timeout_ms) = net.get("session_timeout_ms").and_then(|v| v.as_u64()) {
663            if timeout_ms == 0 {
664                return None;
665            }
666            net_config =
667                net_config.with_session_timeout(std::time::Duration::from_millis(timeout_ms));
668        }
669
670        if let Some(batched) = net.get("batched_io").and_then(|v| v.as_bool()) {
671            net_config = net_config.with_batched_io(batched);
672        }
673
674        builder = builder.adapter(AdapterConfig::Net(Box::new(net_config)));
675    }
676
677    builder.build().ok()
678}
679
680fn create_with_config(runtime: Runtime, config: EventBusConfig) -> *mut NetHandle {
681    let bus = match runtime.block_on(EventBus::new(config)) {
682        Ok(bus) => bus,
683        Err(_) => {
684            // Send the runtime off to a fresh OS thread for
685            // dropping. Dropping a multi-thread tokio `Runtime`
686            // from inside another tokio runtime's worker thread
687            // panics ("Cannot drop a runtime in a context where
688            // blocking is not allowed"); a panic here would unwind
689            // across this `extern "C"` frame. The fresh thread
690            // guarantees a non-tokio context, so the drop is sound
691            // regardless of the caller's runtime environment. We
692            // don't `join()` the thread — the drop completes on
693            // its own and the caller has already been told
694            // `net_init` failed (returning null).
695            std::thread::spawn(move || drop(runtime));
696            return ptr::null_mut();
697        }
698    };
699
700    let handle = Box::new(NetHandle {
701        bus: std::mem::ManuallyDrop::new(bus),
702        runtime: std::mem::ManuallyDrop::new(runtime),
703        shutting_down: std::sync::atomic::AtomicBool::new(false),
704        active_ops: std::sync::atomic::AtomicU32::new(0),
705        bus_taken: std::sync::atomic::AtomicBool::new(false),
706        shutdown_completed: std::sync::atomic::AtomicBool::new(false),
707    });
708
709    Box::into_raw(handle)
710}
711
712/// Ingest a single event.
713///
714/// # Parameters
715///
716/// - `handle`: Event bus handle from `net_init`.
717/// - `event_json`: JSON event string (UTF-8).
718/// - `len`: Length of the event string in bytes.
719///
720/// # Returns
721///
722/// - `0` on success
723/// - Negative error code on failure
724#[unsafe(no_mangle)]
725pub unsafe extern "C" fn net_ingest(
726    handle: *mut NetHandle,
727    event_json: *const c_char,
728    len: usize,
729) -> c_int {
730    if !handle_is_valid(handle) || event_json.is_null() {
731        return NetError::NullPointer.into();
732    }
733
734    let handle = unsafe { &*handle };
735    let _guard = match enter_ffi_op(handle) {
736        Ok(g) => g,
737        Err(err) => return err,
738    };
739
740    // `slice::from_raw_parts` requires `len <= isize::MAX`. A
741    // C caller passing a sign-extended `-1` (or any
742    // `len > isize::MAX as usize`) triggers immediate UB before
743    // any other validation runs. Reject such inputs explicitly
744    // — caller should never see this in practice; surfacing a
745    // typed error is safer than UB.
746    if len > isize::MAX as usize {
747        return NetError::InvalidJson.into();
748    }
749    // Parse event JSON
750    let json_bytes = unsafe { std::slice::from_raw_parts(event_json as *const u8, len) };
751    let json_str = match std::str::from_utf8(json_bytes) {
752        Ok(s) => s,
753        Err(_) => return NetError::InvalidUtf8.into(),
754    };
755
756    let event = match Event::from_str(json_str) {
757        Ok(e) => e,
758        Err(_) => return NetError::InvalidJson.into(),
759    };
760
761    // Ingest
762    match handle.bus.ingest(event) {
763        Ok(_) => NetError::Success.into(),
764        Err(_) => NetError::IngestionFailed.into(),
765    }
766}
767
768/// Ingest a raw JSON string (fastest path).
769///
770/// The JSON string is stored directly without parsing.
771/// This is the recommended method for high-throughput ingestion.
772///
773/// # Parameters
774///
775/// - `handle`: Event bus handle from `net_init`.
776/// - `json`: JSON string (UTF-8).
777/// - `len`: Length of the JSON string in bytes.
778///
779/// # Returns
780///
781/// - `0` on success
782/// - Negative error code on failure
783#[unsafe(no_mangle)]
784pub unsafe extern "C" fn net_ingest_raw(
785    handle: *mut NetHandle,
786    json: *const c_char,
787    len: usize,
788) -> c_int {
789    if !handle_is_valid(handle) || json.is_null() {
790        return NetError::NullPointer.into();
791    }
792
793    let handle = unsafe { &*handle };
794    let _guard = match enter_ffi_op(handle) {
795        Ok(g) => g,
796        Err(err) => return err,
797    };
798
799    // `slice::from_raw_parts` requires `len <= isize::MAX`.
800    if len > isize::MAX as usize {
801        return NetError::InvalidJson.into();
802    }
803    let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
804    let json_str = match std::str::from_utf8(json_bytes) {
805        Ok(s) => s,
806        Err(_) => return NetError::InvalidUtf8.into(),
807    };
808
809    let raw = RawEvent::from_str(json_str);
810
811    match handle.bus.ingest_raw(raw) {
812        Ok(_) => NetError::Success.into(),
813        Err(_) => NetError::IngestionFailed.into(),
814    }
815}
816
817/// Ingest multiple raw JSON strings (fastest batch path).
818///
819/// # Parameters
820///
821/// - `handle`: Event bus handle.
822/// - `jsons`: Array of pointers to JSON strings.
823/// - `lens`: Array of lengths for each JSON string.
824/// - `count`: Number of events in the arrays.
825///
826/// # Returns
827///
828/// Number of successfully ingested events, or negative error code.
829#[unsafe(no_mangle)]
830pub unsafe extern "C" fn net_ingest_raw_batch(
831    handle: *mut NetHandle,
832    jsons: *const *const c_char,
833    lens: *const usize,
834    count: usize,
835) -> c_int {
836    if !handle_is_valid(handle) || jsons.is_null() || lens.is_null() {
837        return NetError::NullPointer.into();
838    }
839    if count == 0 {
840        return 0;
841    }
842
843    let handle = unsafe { &*handle };
844    let _guard = match enter_ffi_op(handle) {
845        Ok(g) => g,
846        Err(err) => return err,
847    };
848    let mut events = Vec::with_capacity(count);
849    // Track per-entry drops so the caller's accounting can
850    // reconcile the returned count against the input count.
851    // Pre-fix per-entry rejects (null pointer, oversized length,
852    // invalid UTF-8) were silently `continue`-d and the caller
853    // saw `count - drops` accepted events without any signal as
854    // to which input indices were dropped. A binding that
855    // attributed the drop to back-pressure and retried got the
856    // wrong indices and double-published the good ones.
857    //
858    // The C-API contract is "returns count of accepted events";
859    // expanding it to take an out-param of dropped indices is
860    // an API addition, not a fix-in-place. Emit `tracing::warn!`
861    // with the offending index AND reason so operators
862    // observing the bus can correlate drop counts to specific
863    // inputs without changing the C surface. For high-volume
864    // bindings this should still be sized at one log line per
865    // dropped entry; if that ever matters in practice the
866    // `*_ex` follow-up can return the indices structurally.
867    let mut dropped_null = 0usize;
868    let mut dropped_oversize = 0usize;
869    let mut dropped_invalid_utf8 = 0usize;
870
871    for i in 0..count {
872        let json_ptr = unsafe { *jsons.add(i) };
873        let len = unsafe { *lens.add(i) };
874
875        if json_ptr.is_null() {
876            tracing::warn!(
877                index = i,
878                "net_ingest_raw_batch: dropping entry with null pointer"
879            );
880            dropped_null += 1;
881            continue;
882        }
883
884        // `slice::from_raw_parts` requires `len <= isize::MAX`.
885        // Skip pathological per-entry lengths rather than UB.
886        if len > isize::MAX as usize {
887            tracing::warn!(
888                index = i,
889                len,
890                "net_ingest_raw_batch: dropping entry with len > isize::MAX"
891            );
892            dropped_oversize += 1;
893            continue;
894        }
895        let json_bytes = unsafe { std::slice::from_raw_parts(json_ptr as *const u8, len) };
896        match std::str::from_utf8(json_bytes) {
897            Ok(json_str) => events.push(RawEvent::from_str(json_str)),
898            Err(_) => {
899                tracing::warn!(
900                    index = i,
901                    "net_ingest_raw_batch: dropping entry with invalid UTF-8"
902                );
903                dropped_invalid_utf8 += 1;
904            }
905        }
906    }
907    let total_dropped = dropped_null + dropped_oversize + dropped_invalid_utf8;
908    if total_dropped > 0 {
909        // Aggregate summary for log-pipeline filters that fold
910        // per-index lines.
911        tracing::warn!(
912            input_count = count,
913            dropped_null,
914            dropped_oversize,
915            dropped_invalid_utf8,
916            "net_ingest_raw_batch: {} of {} entries dropped before ingest",
917            total_dropped,
918            count,
919        );
920    }
921
922    let count = handle.bus.ingest_raw_batch(events);
923    // Returning `c_int::MAX` on overflow would be ambiguous with a real
924    // `INT_MAX` ingest. Signal overflow explicitly so callers doing
925    // accounting in high-throughput paths do not silently miscount.
926    c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
927}
928
929/// Ingest multiple events.
930///
931/// # Parameters
932///
933/// - `handle`: Event bus handle.
934/// - `events_json`: JSON array of events (UTF-8, null-terminated).
935///
936/// # Returns
937///
938/// Number of successfully ingested events, or negative error code.
939#[unsafe(no_mangle)]
940pub unsafe extern "C" fn net_ingest_batch(
941    handle: *mut NetHandle,
942    events_json: *const c_char,
943) -> c_int {
944    if !handle_is_valid(handle) || events_json.is_null() {
945        return NetError::NullPointer.into();
946    }
947
948    let handle = unsafe { &*handle };
949    let _guard = match enter_ffi_op(handle) {
950        Ok(g) => g,
951        Err(err) => return err,
952    };
953
954    let json_str = match unsafe { CStr::from_ptr(events_json) }.to_str() {
955        Ok(s) => s,
956        Err(_) => return NetError::InvalidUtf8.into(),
957    };
958
959    // Parse as JSON array
960    let array: Vec<serde_json::Value> = match serde_json::from_str(json_str) {
961        Ok(a) => a,
962        Err(_) => return NetError::InvalidJson.into(),
963    };
964
965    let events: Vec<Event> = array.into_iter().map(Event::new).collect();
966    let count = handle.bus.ingest_batch(events);
967
968    // Returning `c_int::MAX` on overflow would be ambiguous with a real
969    // `INT_MAX` ingest. Signal overflow explicitly — matches the
970    // `net_ingest_raw_batch` contract.
971    c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
972}
973
974/// Parse the JSON request body passed to `net_poll` into a
975/// `ConsumeRequest`. Returns the negative `NetError` code on parse
976/// failure so the caller can surface it back across FFI. Both `limit`
977/// and `cursor` are optional, but if either key is present with the
978/// wrong JSON type it is an explicit error — silently falling back to
979/// the default would hide caller bugs (e.g. the Go binding that
980/// previously serialized `cursor` but had it dropped server-side).
981fn parse_poll_request_json(json_str: &str) -> Result<ConsumeRequest, c_int> {
982    let value: serde_json::Value =
983        serde_json::from_str(json_str).map_err(|_| c_int::from(NetError::InvalidJson))?;
984
985    let limit = match value.get("limit") {
986        None | Some(serde_json::Value::Null) => 100usize,
987        Some(v) => match v.as_u64() {
988            // `as usize` would silently truncate on 32-bit targets for
989            // values above `usize::MAX`. Reject such inputs explicitly
990            // so a caller asking for e.g. 2^33 events on a wasm32
991            // build gets `InvalidJson` instead of a tiny wrap-around.
992            Some(n) => usize::try_from(n).map_err(|_| c_int::from(NetError::InvalidJson))?,
993            None => return Err(NetError::InvalidJson.into()),
994        },
995    };
996    let cursor = match value.get("cursor") {
997        None | Some(serde_json::Value::Null) => None,
998        Some(v) => match v.as_str() {
999            Some(s) => Some(s.to_owned()),
1000            None => return Err(NetError::InvalidJson.into()),
1001        },
1002    };
1003    let mut req = ConsumeRequest::new(limit);
1004    req.from_id = cursor;
1005    Ok(req)
1006}
1007
1008/// Poll events from the bus.
1009///
1010/// # Parameters
1011///
1012/// - `handle`: Event bus handle.
1013/// - `request_json`: JSON request string (UTF-8, null-terminated).
1014///   Example: `{"limit": 100, "ordering": "InsertionTs"}`
1015/// - `out_buffer`: Output buffer for JSON response.
1016/// - `buffer_len`: Size of the output buffer.
1017///
1018/// # Returns
1019///
1020/// - Number of bytes written to buffer on success
1021/// - Negative error code on failure
1022#[unsafe(no_mangle)]
1023pub unsafe extern "C" fn net_poll(
1024    handle: *mut NetHandle,
1025    request_json: *const c_char,
1026    out_buffer: *mut c_char,
1027    buffer_len: usize,
1028) -> c_int {
1029    if !handle_is_valid(handle) || out_buffer.is_null() {
1030        return NetError::NullPointer.into();
1031    }
1032
1033    let handle = unsafe { &*handle };
1034    let _guard = match enter_ffi_op(handle) {
1035        Ok(g) => g,
1036        Err(err) => return err,
1037    };
1038
1039    // Parse request
1040    let request = if request_json.is_null() {
1041        ConsumeRequest::new(100)
1042    } else {
1043        let json_str = match unsafe { CStr::from_ptr(request_json) }.to_str() {
1044            Ok(s) => s,
1045            Err(_) => return NetError::InvalidUtf8.into(),
1046        };
1047        match parse_poll_request_json(json_str) {
1048            Ok(req) => req,
1049            Err(code) => return code,
1050        }
1051    };
1052
1053    // Reject buffers too small to even hold an empty-response
1054    // JSON envelope. This catches the degenerate "tiny buffer"
1055    // case before we hit the adapter — `BufferTooSmall` returned
1056    // here means "no work was done, caller's cursor is unchanged."
1057    // 256 bytes comfortably fits the empty-response JSON below
1058    // even with a long echoed `next_id` cursor.
1059    const MIN_RESPONSE_BUFFER: usize = 256;
1060    if buffer_len < MIN_RESPONSE_BUFFER {
1061        return NetError::BufferTooSmall.into();
1062    }
1063
1064    // Stash the cursor before moving `request` into `poll()` so
1065    // the post-poll fallback can echo it back to the caller. On
1066    // overflow we write a minimal "no events delivered, cursor
1067    // unchanged" response so the caller's next poll re-fetches
1068    // the same range — events are not lost on idempotent
1069    // adapters (Redis XRANGE, JetStream direct_get).
1070    let cursor_snapshot = request.from_id.clone();
1071
1072    // Poll
1073    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
1074        Ok(r) => r,
1075        Err(_) => return NetError::PollFailed.into(),
1076    };
1077
1078    // Serialize response. Events that fail to parse are included as raw
1079    // strings so the caller can see all events and detect parse failures.
1080    let total_events = response.events.len();
1081    let mut parsed_events: Vec<serde_json::Value> = Vec::with_capacity(total_events);
1082    let mut parse_errors: usize = 0;
1083    for e in &response.events {
1084        match e.parse() {
1085            Ok(v) => parsed_events.push(v),
1086            Err(_) => {
1087                parse_errors += 1;
1088                // Include the raw bytes as a string so the caller doesn't silently lose events
1089                if let Ok(raw) = e.raw_str() {
1090                    parsed_events.push(serde_json::Value::String(raw.to_string()));
1091                }
1092            }
1093        }
1094    }
1095    let response_json = match serde_json::to_string(&serde_json::json!({
1096        "events": parsed_events,
1097        "next_id": response.next_id,
1098        "has_more": response.has_more,
1099        "count": parsed_events.len(),
1100        "parse_errors": parse_errors,
1101    })) {
1102        Ok(s) => s,
1103        Err(_) => return NetError::Unknown.into(),
1104    };
1105
1106    // Buffer overflow: emit a minimal fallback response that echoes
1107    // the caller's original cursor as `next_id`. The caller's next
1108    // poll runs against the same range and re-delivers the events
1109    // (idempotent on Redis XRANGE / JetStream direct_get). Without
1110    // this, a caller that trusts `next_id` blindly would advance
1111    // past the unread batch.
1112    if response_json.len() + 1 > buffer_len {
1113        let fallback = serde_json::to_string(&serde_json::json!({
1114            "events": [],
1115            "next_id": cursor_snapshot,
1116            "has_more": true,
1117            "count": 0,
1118            "parse_errors": 0,
1119            "buffer_too_small": true,
1120            "events_dropped": total_events,
1121        }))
1122        .unwrap_or_else(|_| String::from(
1123            r#"{"events":[],"next_id":null,"has_more":true,"count":0,"parse_errors":0,"buffer_too_small":true}"#
1124        ));
1125        if fallback.len() < buffer_len {
1126            unsafe {
1127                ptr::copy_nonoverlapping(
1128                    fallback.as_ptr() as *const c_char,
1129                    out_buffer,
1130                    fallback.len(),
1131                );
1132                *out_buffer.add(fallback.len()) = 0;
1133            }
1134        }
1135        return NetError::BufferTooSmall.into();
1136    }
1137
1138    // Copy to output buffer
1139    unsafe {
1140        ptr::copy_nonoverlapping(
1141            response_json.as_ptr() as *const c_char,
1142            out_buffer,
1143            response_json.len(),
1144        );
1145        *out_buffer.add(response_json.len()) = 0; // Null terminate
1146    }
1147
1148    // Data was already copied into the caller's buffer; a
1149    // `c_int` overflow here means the byte count exceeds c_int's
1150    // range, NOT that the buffer was too small. Returning
1151    // `BufferTooSmall` would tell the caller to "resize and retry"
1152    // when retrying can't fix the actual condition. `IntOverflow`
1153    // is the documented variant for this case.
1154    match c_int::try_from(response_json.len()) {
1155        Ok(n) => n,
1156        Err(_) => NetError::IntOverflow.into(),
1157    }
1158}
1159
1160/// Get event bus statistics.
1161///
1162/// # Parameters
1163///
1164/// - `handle`: Event bus handle.
1165/// - `out_buffer`: Output buffer for JSON statistics.
1166/// - `buffer_len`: Size of the output buffer.
1167///
1168/// # Returns
1169///
1170/// Number of bytes written, or negative error code.
1171#[unsafe(no_mangle)]
1172pub unsafe extern "C" fn net_stats(
1173    handle: *mut NetHandle,
1174    out_buffer: *mut c_char,
1175    buffer_len: usize,
1176) -> c_int {
1177    if !handle_is_valid(handle) || out_buffer.is_null() {
1178        return NetError::NullPointer.into();
1179    }
1180
1181    let handle = unsafe { &*handle };
1182    let _guard = match enter_ffi_op(handle) {
1183        Ok(g) => g,
1184        Err(err) => return err,
1185    };
1186    let stats = handle.bus.stats();
1187    let shard_stats = handle.bus.shard_stats();
1188
1189    let stats_json = match serde_json::to_string(&serde_json::json!({
1190        "events_ingested": stats.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
1191        "events_dropped": stats.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
1192        "batches_dispatched": stats.batches_dispatched.load(std::sync::atomic::Ordering::Relaxed),
1193        "shard_events_ingested": shard_stats.events_ingested,
1194        "shard_events_dropped": shard_stats.events_dropped,
1195        "shard_batches_dispatched": shard_stats.batches_dispatched,
1196    })) {
1197        Ok(s) => s,
1198        Err(_) => return NetError::Unknown.into(),
1199    };
1200
1201    if stats_json.len() + 1 > buffer_len {
1202        return NetError::BufferTooSmall.into();
1203    }
1204
1205    unsafe {
1206        ptr::copy_nonoverlapping(
1207            stats_json.as_ptr() as *const c_char,
1208            out_buffer,
1209            stats_json.len(),
1210        );
1211        *out_buffer.add(stats_json.len()) = 0;
1212    }
1213
1214    // See net_poll above — the data was already copied, so an
1215    // overflowing length is `IntOverflow`, not `BufferTooSmall`.
1216    match c_int::try_from(stats_json.len()) {
1217        Ok(n) => n,
1218        Err(_) => NetError::IntOverflow.into(),
1219    }
1220}
1221
1222/// Flush all pending batches to the adapter.
1223///
1224/// # Parameters
1225///
1226/// - `handle`: Event bus handle.
1227///
1228/// # Returns
1229///
1230/// - `0` on success
1231/// - Negative error code on failure
1232#[unsafe(no_mangle)]
1233pub unsafe extern "C" fn net_flush(handle: *mut NetHandle) -> c_int {
1234    if !handle_is_valid(handle) {
1235        return NetError::NullPointer.into();
1236    }
1237
1238    let handle = unsafe { &*handle };
1239    let _guard = match enter_ffi_op(handle) {
1240        Ok(g) => g,
1241        Err(err) => return err,
1242    };
1243
1244    match handle.runtime.block_on(handle.bus.flush()) {
1245        Ok(_) => NetError::Success.into(),
1246        Err(_) => NetError::Unknown.into(),
1247    }
1248}
1249
1250/// Shut down the event bus and free resources.
1251///
1252/// # Parameters
1253///
1254/// - `handle`: Event bus handle. After this call, the handle is invalid.
1255///
1256/// # Returns
1257///
1258/// - `0` on success
1259/// - Negative error code on failure (including `Unknown` if the
1260///   bounded wait for in-flight FFI operations expired before the bus
1261///   could be shut down cleanly)
1262///
1263/// # Notes
1264///
1265/// The handle's storage is intentionally leaked: the box is never
1266/// returned to the allocator. See `NetHandle`'s docs for why. This is
1267/// a one-time cost per shutdown — typically per-process, since most C
1268/// callers initialize the bus once and shut down once.
1269#[unsafe(no_mangle)]
1270pub unsafe extern "C" fn net_shutdown(handle: *mut NetHandle) -> c_int {
1271    if !handle_is_valid(handle) {
1272        return NetError::NullPointer.into();
1273    }
1274
1275    // Scope the `&NetHandle` borrow into an inner block so it is
1276    // verifiably out of scope before the
1277    // `ManuallyDrop::take(&mut (*handle).bus)` calls below.
1278    // Holding an `&NetHandle` in scope for the whole function
1279    // while taking a raw `&mut (*handle).bus` later would rely on
1280    // NLL ending the immutable borrow before the mutable take —
1281    // a pattern fragile under stacked/tree borrow models. The
1282    // block-scoped borrow makes the lifetime constraint explicit
1283    // and obvious to both the compiler and any future maintainer.
1284    let drained_and_taken = {
1285        // SAFETY: The C contract guarantees `handle` is valid here and that
1286        // `net_shutdown` is not called concurrently with itself. Future
1287        // dereferences of the box from concurrent FFI ops on other threads
1288        // are also sound because we never free the box (see below).
1289        let handle_ref = unsafe { &*handle };
1290
1291        // Signal shutdown so concurrent FFI calls bail before touching
1292        // `bus`/`runtime`. SeqCst pairs with `FfiOpGuard::try_enter`.
1293        handle_ref
1294            .shutting_down
1295            .store(true, std::sync::atomic::Ordering::SeqCst);
1296
1297        // Bounded wait for in-flight ops to drain. Without a deadline, a
1298        // hung concurrent operation (e.g. `net_flush` against a stalled
1299        // adapter) would pin a CPU at 100% inside this loop forever.
1300        //
1301        // `std::hint::spin_loop()` is a CPU pause hint, not a yield. On
1302        // a single-threaded executor (or any configuration where the FFI
1303        // caller's thread is the same one that needs to make progress on
1304        // the in-flight async work) the tight spin starves the very tokio
1305        // worker we're waiting for, *causing* the deadline to expire when
1306        // it otherwise wouldn't. `thread::yield_now` lets the OS schedule
1307        // whatever's blocked, and a 1ms `thread::sleep` between yields
1308        // prevents the loop from saturating a CPU on platforms where
1309        // `yield_now` is a
1310        // near-no-op under low contention. The drain we expect to take
1311        // milliseconds, so a millisecond-granularity poll is fine.
1312        let deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
1313        let mut drained = false;
1314        loop {
1315            if handle_ref
1316                .active_ops
1317                .load(std::sync::atomic::Ordering::SeqCst)
1318                == 0
1319            {
1320                drained = true;
1321                break;
1322            }
1323            if std::time::Instant::now() >= deadline {
1324                break;
1325            }
1326            std::thread::yield_now();
1327            std::thread::sleep(std::time::Duration::from_millis(1));
1328        }
1329
1330        if !drained {
1331            // In-flight ops may still be reading `bus`/`runtime`; reading
1332            // them out via `ManuallyDrop::take` would race those readers.
1333            // Leak both fields along with the box. Future ops still see
1334            // `shutting_down=true` and bail before touching either field,
1335            // so the leaked memory is never read again.
1336            return NetError::Unknown.into();
1337        }
1338
1339        // Idempotent shutdown: if a previous `net_shutdown` already
1340        // moved out the bus/runtime, do not call `ManuallyDrop::take`
1341        // a second time (that would be UB). The first call may still
1342        // be inside `runtime.block_on(bus.shutdown())` though — pre-
1343        // fix the second caller observed `bus_taken == true` and
1344        // returned `Success` immediately, falsely signaling
1345        // completion of an in-progress shutdown. Spin on
1346        // `shutdown_completed` (set by the first caller AFTER
1347        // `bus.shutdown()` returns) so subsequent callers wait for
1348        // the actual completion.
1349        if handle_ref
1350            .bus_taken
1351            .swap(true, std::sync::atomic::Ordering::SeqCst)
1352        {
1353            // Wait for the first caller to actually finish.
1354            // Bounded by the same FFI_SHUTDOWN_DEADLINE as the
1355            // `active_ops` drain — if the first caller is wedged
1356            // longer than that, we surface a Transient error rather
1357            // than block forever.
1358            let inner_deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
1359            while !handle_ref
1360                .shutdown_completed
1361                .load(std::sync::atomic::Ordering::Acquire)
1362            {
1363                if std::time::Instant::now() >= inner_deadline {
1364                    return NetError::Unknown.into();
1365                }
1366                std::thread::yield_now();
1367                std::thread::sleep(std::time::Duration::from_millis(1));
1368            }
1369            return NetError::Success.into();
1370        }
1371        drained
1372    };
1373    let _ = drained_and_taken;
1374
1375    // SAFETY: `active_ops` reached zero with `shutting_down=true`, so:
1376    //   - Every FFI op that started before shutdown has fully
1377    //     completed (decremented `active_ops` on guard drop).
1378    //   - Any future FFI op will observe `shutting_down=true` and
1379    //     bail in `try_enter` before touching `bus` / `runtime`.
1380    // Plus, `bus_taken` was just CAS'd from false → true, so no other
1381    // shutdown is concurrently moving the same fields out. The
1382    // immutable `handle_ref` borrow above has been dropped (block
1383    // scope ended), so the `&mut`-via-raw-pointer below is the
1384    // only live access — no stacked/tree-borrow race.
1385    //
1386    // We deliberately do NOT call `Box::from_raw` here. The box's
1387    // `shutting_down` / `active_ops` / `bus_taken` atomics must remain
1388    // valid memory because future FFI ops still dereference the
1389    // C-side pointer to check them. Leaking the box is the
1390    // correctness fix for the previous use-after-free; the per-handle
1391    // storage cost is a one-time overhead.
1392    let bus = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).bus) };
1393    let runtime = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).runtime) };
1394
1395    // Flush pending batches and gracefully shut down the adapter
1396    // before dropping the runtime. Without this, pending events in
1397    // ring buffers and batch workers would be silently lost.
1398    let result = runtime.block_on(bus.shutdown());
1399
1400    // `bus` and `runtime` go out of scope here and are dropped.
1401    // The leaked box keeps the atomics alive for any straggler ops.
1402
1403    // Signal completion to any second/third caller spinning on
1404    // `shutdown_completed` in the idempotent path above. Done
1405    // AFTER `bus.shutdown()` returns and AFTER the bus / runtime
1406    // drop, so subsequent callers can rely on this flag as a
1407    // hard "shutdown is fully done" barrier.
1408    unsafe { &*handle }
1409        .shutdown_completed
1410        .store(true, std::sync::atomic::Ordering::Release);
1411
1412    match result {
1413        Ok(()) => NetError::Success.into(),
1414        Err(_) => NetError::Unknown.into(),
1415    }
1416}
1417
1418/// Get the number of shards.
1419///
1420/// # Parameters
1421///
1422/// - `handle`: Event bus handle.
1423///
1424/// # Returns
1425///
1426/// Number of shards, or 0 if handle is null.
1427#[unsafe(no_mangle)]
1428pub unsafe extern "C" fn net_num_shards(handle: *mut NetHandle) -> u16 {
1429    if !handle_is_valid(handle) {
1430        return 0;
1431    }
1432    let handle = unsafe { &*handle };
1433    let _guard = match enter_ffi_op(handle) {
1434        Ok(g) => g,
1435        Err(_) => return 0,
1436    };
1437    handle.bus.num_shards()
1438}
1439
/// Return the library version as a C string.
///
/// # Returns
///
/// Pointer to a NUL-terminated version string with static storage. The
/// caller must not free or modify it.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_version() -> *const c_char {
    // Stored with an explicit trailing NUL so the pointer is a valid C string.
    static VERSION_CSTR: [u8; 6] = *b"0.8.0\0";
    VERSION_CSTR.as_ptr().cast::<c_char>()
}
1450
/// Generate a new Net keypair and return it as a JSON C string.
///
/// # Returns
///
/// JSON string with hex-encoded `public_key` and `secret_key`.
/// The caller must free the returned string with `net_free_string`.
/// Returns NULL if the JSON text cannot be converted to a C string, or
/// if the Net feature is not enabled.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    let keypair = StaticKeypair::generate();
    let body = serde_json::json!({
        "public_key": hex::encode(keypair.public_key()),
        "secret_key": hex::encode(keypair.secret_key()),
    })
    .to_string();

    // Ownership of the allocation passes to the caller via `into_raw`.
    CString::new(body).map_or(ptr::null_mut(), CString::into_raw)
}
1472
/// Release a string previously returned by a Net function.
///
/// A NULL pointer is accepted and ignored.
///
/// # Parameters
///
/// - `s`: String pointer returned by `net_generate_keypair` or similar.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if s.is_null() {
        return;
    }
    // Rebuild the `CString` that `into_raw` gave away; dropping it frees
    // the allocation.
    let reclaimed = unsafe { CString::from_raw(s) };
    drop(reclaimed);
}
1487
// `net.h` declares both `net_generate_keypair` and
// `net_free_string` unconditionally — a consumer linking against
// a cdylib built without the `net` feature would otherwise hit
// a load-time missing-symbol error despite the header advertising
// the symbols. Provide fallback definitions so both symbols are
// resolvable on every build configuration (`net_generate_keypair`
// returns NULL; `net_free_string` still frees a non-null pointer).
// Mirrors the `nat-traversal` cfg pattern in `mesh.rs`.
1495
/// Fallback `net_generate_keypair` for builds without the `net` feature.
///
/// `net.h` declares the function unconditionally, so the symbol has to
/// exist in every build configuration.
///
/// # Returns
///
/// Always NULL: keypair generation requires the `net` feature.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    ptr::null_mut::<c_char>()
}
1506
/// Fallback `net_free_string` for builds without the `net` feature.
///
/// Keeps the always-on signature from `net.h`. A NULL pointer is ignored;
/// a non-null pointer is reclaimed as a `CString` allocation and freed.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if s.is_null() {
        return;
    }
    // Rebuild the `CString` that `into_raw` gave away; dropping it frees
    // the allocation.
    let reclaimed = unsafe { std::ffi::CString::from_raw(s) };
    drop(reclaimed);
}
1520
1521// =========================================================================
1522// Structured (non-JSON) API — _ex variants
1523// =========================================================================
1524
/// Ingestion receipt for C consumers.
///
/// `#[repr(C)]` with fields in declaration order, so foreign bindings can
/// mirror the struct directly. Field order and types are part of the ABI:
/// reordering or adding fields changes the layout that bindings see.
#[repr(C)]
pub struct NetReceipt {
    /// Shard the event was assigned to.
    pub shard_id: u16,
    /// Insertion timestamp (nanoseconds).
    // NOTE(review): the epoch/clock this is measured against is not
    // visible here — confirm against the code that fills it in.
    pub timestamp: u64,
}
1533
1534// Pin layout invariants for `NetReceipt`. `#[repr(C)]` already
1535// gives C ABI compatibility per platform, but doesn't catch a
1536// future field-reorder or field-add — both would silently break
1537// any C/Go/Python binding that hard-codes the struct layout.
1538// Static asserts on 64-bit targets (the production deployment
1539// shape) trip CI before such a change reaches a binary release.
1540//
1541// 64-bit: `u16 (2) + 6 pad + u64 (8)` = 16 bytes, alignment 8.
1542#[cfg(target_pointer_width = "64")]
1543const _: () = assert!(
1544    std::mem::size_of::<NetReceipt>() == 16,
1545    "NetReceipt size changed on 64-bit; bindings hard-code 16. \
1546     If the change is intentional, bump the binding versions and \
1547     update this assertion."
1548);
1549#[cfg(target_pointer_width = "64")]
1550const _: () = assert!(
1551    std::mem::align_of::<NetReceipt>() == 8,
1552    "NetReceipt alignment changed on 64-bit; bindings expect 8."
1553);
1554
/// A single stored event for C consumers.
///
/// # Safety contract for callers
///
/// `id`/`id_len` and `raw`/`raw_len` are produced by Rust as a
/// `Box<[u8]>` whose fat-pointer length is reconstructed at free
/// time from `id_len` / `raw_len`. The fields are `pub` because
/// `#[repr(C)]` exposes them to C, **but they must be treated as
/// read-only** between the `net_poll_*` call that produced them
/// and the `net_free_poll_result` that consumes them.
///
/// Mutating `id_len` or `raw_len` (or copying the struct, replacing
/// the pointer, and then freeing) causes
/// `Box::from_raw(slice_from_raw_parts_mut(ptr, wrong_len))` to be
/// undefined behavior on free — the allocator records the
/// allocation size and any mismatch is UB.
#[repr(C)]
pub struct NetEvent {
    /// Event ID (not null-terminated, use `id_len`).
    /// Read-only after `net_poll_*`; do not mutate.
    pub id: *const c_char,
    /// Length of the event ID. Read-only after `net_poll_*`; do not
    /// mutate (mutation causes UB on free).
    pub id_len: usize,
    /// Raw JSON payload (not null-terminated, use `raw_len`).
    /// Read-only after `net_poll_*`; do not mutate.
    pub raw: *const c_char,
    /// Length of the raw JSON payload. Read-only after
    /// `net_poll_*`; do not mutate (mutation causes UB on free).
    pub raw_len: usize,
    /// Insertion timestamp (nanoseconds).
    pub insertion_ts: u64,
    /// Shard ID.
    pub shard_id: u16,
}

// Pin layout invariants for `NetEvent`. Bindings (C, Go, Python,
// Node) hard-code 48 bytes on 64-bit; an accidental reorder or new
// field would silently shift every offset.
//
// Size and alignment alone cannot detect a reorder: swapping `id`
// with `raw`, or `id_len` with `raw_len`, still yields 48 bytes /
// align 8. The per-field offset assertion below pins the order.
//
// 64-bit: `4 × 8 (ptrs/usize) + u64 (8) + u16 (2) + 6 trail` = 48.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::size_of::<NetEvent>() == 48,
    "NetEvent size changed on 64-bit; bindings hard-code 48. \
     If the change is intentional, bump the binding versions and \
     update this assertion."
);
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::align_of::<NetEvent>() == 8,
    "NetEvent alignment changed on 64-bit; bindings expect 8."
);
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::offset_of!(NetEvent, id) == 0
        && std::mem::offset_of!(NetEvent, id_len) == 8
        && std::mem::offset_of!(NetEvent, raw) == 16
        && std::mem::offset_of!(NetEvent, raw_len) == 24
        && std::mem::offset_of!(NetEvent, insertion_ts) == 32
        && std::mem::offset_of!(NetEvent, shard_id) == 40,
    "NetEvent field offsets changed on 64-bit; bindings expect \
     id@0, id_len@8, raw@16, raw_len@24, insertion_ts@32, \
     shard_id@40. If the change is intentional, bump the binding \
     versions and update this assertion."
);
1609
/// Poll result for C consumers.
///
/// The struct itself is caller-provided; `net_poll_ex` fills in all four
/// fields on success and leaves the struct untouched on every error
/// return. `net_free_poll_result` releases the `events` array and the
/// `next_id` string and then resets every field to NULL / 0, but it never
/// frees the struct itself.
#[repr(C)]
pub struct NetPollResult {
    /// Array of events. Free with `net_free_poll_result`.
    /// NULL when `count` is 0.
    pub events: *mut NetEvent,
    /// Number of events in the array.
    pub count: usize,
    /// Cursor for the next poll (null-terminated). NULL if no more.
    /// Released together with `events` by `net_free_poll_result`.
    pub next_id: *mut c_char,
    /// 1 if more events are available, 0 otherwise.
    pub has_more: c_int,
}
1622
/// Stats for C consumers.
///
/// Filled in by `net_stats_ex`, which loads each counter separately with
/// relaxed atomic ordering — the three values are therefore individual
/// readings rather than one atomic snapshot.
#[repr(C)]
pub struct NetStats {
    /// Total events ingested.
    pub events_ingested: u64,
    /// Events dropped due to backpressure.
    pub events_dropped: u64,
    /// Batches dispatched to the adapter.
    pub batches_dispatched: u64,
}
1633
1634/// Ingest raw JSON with structured receipt.
1635#[unsafe(no_mangle)]
1636pub unsafe extern "C" fn net_ingest_raw_ex(
1637    handle: *mut NetHandle,
1638    json: *const c_char,
1639    len: usize,
1640    out: *mut NetReceipt,
1641) -> c_int {
1642    if !handle_is_valid(handle) || json.is_null() {
1643        return NetError::NullPointer.into();
1644    }
1645
1646    let handle = unsafe { &*handle };
1647    let _guard = match enter_ffi_op(handle) {
1648        Ok(g) => g,
1649        Err(err) => return err,
1650    };
1651
1652    // `slice::from_raw_parts` requires `len <= isize::MAX`.
1653    if len > isize::MAX as usize {
1654        return NetError::InvalidJson.into();
1655    }
1656    let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
1657    let json_str = match std::str::from_utf8(json_bytes) {
1658        Ok(s) => s,
1659        Err(_) => return NetError::InvalidUtf8.into(),
1660    };
1661
1662    let raw = RawEvent::from_str(json_str);
1663
1664    match handle.bus.ingest_raw(raw) {
1665        Ok((shard_id, timestamp)) => {
1666            if !out.is_null() {
1667                unsafe {
1668                    (*out).shard_id = shard_id;
1669                    (*out).timestamp = timestamp;
1670                }
1671            }
1672            NetError::Success.into()
1673        }
1674        Err(_) => NetError::IngestionFailed.into(),
1675    }
1676}
1677
1678/// Poll events with structured result (no JSON overhead).
1679///
1680/// The caller must free the result with `net_free_poll_result`.
1681#[unsafe(no_mangle)]
1682pub unsafe extern "C" fn net_poll_ex(
1683    handle: *mut NetHandle,
1684    limit: usize,
1685    cursor: *const c_char,
1686    out: *mut NetPollResult,
1687) -> c_int {
1688    if !handle_is_valid(handle) || out.is_null() {
1689        return NetError::NullPointer.into();
1690    }
1691
1692    // Pre-validate `limit` BEFORE calling `bus.poll` — the bus
1693    // advances the consumer cursor before returning, so any
1694    // post-poll allocation failure (e.g. `Layout::array::<NetEvent>`
1695    // overflow on a pathological `count`, or `std::alloc::alloc`
1696    // returning null under memory pressure) would drop the response
1697    // and lose every event the cursor just stepped past. Reject
1698    // requests whose `count * size_of::<NetEvent>` would overflow
1699    // `isize::MAX` (the `Layout::array` cap) up front, so the
1700    // failure happens before the cursor moves.
1701    if limit > 0
1702        && (std::mem::size_of::<NetEvent>())
1703            .checked_mul(limit)
1704            .is_none_or(|v| v > isize::MAX as usize)
1705    {
1706        return NetError::IntOverflow.into();
1707    }
1708
1709    let handle = unsafe { &*handle };
1710    let _guard = match enter_ffi_op(handle) {
1711        Ok(g) => g,
1712        Err(err) => return err,
1713    };
1714
1715    let mut request = ConsumeRequest::new(limit);
1716    if !cursor.is_null() {
1717        if let Ok(s) = unsafe { CStr::from_ptr(cursor) }.to_str() {
1718            if !s.is_empty() {
1719                request = request.from(s);
1720            }
1721        }
1722    }
1723
1724    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
1725        Ok(r) => r,
1726        Err(_) => return NetError::PollFailed.into(),
1727    };
1728
1729    let count = response.events.len();
1730
1731    // Allocate events array.
1732    //
1733    // Each iteration allocates two boxed byte slices via
1734    // `Vec::to_vec().into_boxed_slice()`, which panic on OOM in
1735    // the global allocator. A panic across this `extern "C"`
1736    // body is UB — under the cgo/N-API/cffi unwind model the
1737    // panic propagates into a frame that doesn't expect it. Wrap
1738    // the per-event build in `catch_unwind`, track how many
1739    // events we've fully written, and on panic / mid-loop
1740    // failure free the partial array via `free_events_array`
1741    // so neither UB nor the partial allocations leak.
1742    let events_ptr = if count > 0 {
1743        let layout = match std::alloc::Layout::array::<NetEvent>(count) {
1744            Ok(l) => l,
1745            Err(_) => return NetError::Unknown.into(),
1746        };
1747        let ptr = unsafe { std::alloc::alloc(layout) as *mut NetEvent };
1748        if ptr.is_null() {
1749            return NetError::Unknown.into();
1750        }
1751
1752        // Shared counter so the outer scope can clean up partial
1753        // writes if any iteration panics.
1754        let completed = std::cell::Cell::new(0usize);
1755        let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1756            for (i, event) in response.events.iter().enumerate() {
1757                let id_bytes = event.id.as_bytes().to_vec().into_boxed_slice();
1758                let id_len = id_bytes.len();
1759                let id_ptr = Box::into_raw(id_bytes) as *const c_char;
1760
1761                let raw_bytes = event.raw.to_vec().into_boxed_slice();
1762                let raw_len = raw_bytes.len();
1763                let raw_ptr = Box::into_raw(raw_bytes) as *const c_char;
1764
1765                unsafe {
1766                    ptr.add(i).write(NetEvent {
1767                        id: id_ptr,
1768                        id_len,
1769                        raw: raw_ptr,
1770                        raw_len,
1771                        insertion_ts: event.insertion_ts,
1772                        shard_id: event.shard_id,
1773                    });
1774                }
1775                completed.set(i + 1);
1776            }
1777        }));
1778        if build_result.is_err() {
1779            // A panic landed mid-loop. Free fully-written events
1780            // (those past `completed.get()` were never written, so
1781            // the inner `id`/`raw` pointers aren't valid). The
1782            // events array was allocated for `count` NetEvent
1783            // slots, so the dealloc must use that same layout.
1784            free_events_array_partial(ptr, completed.get(), count);
1785            return NetError::Unknown.into();
1786        }
1787        ptr
1788    } else {
1789        ptr::null_mut()
1790    };
1791
1792    // Leak next_id if present.
1793    let next_id_ptr = match response.next_id {
1794        Some(ref s) => match std::ffi::CString::new(s.as_str()) {
1795            Ok(c) => c.into_raw(),
1796            Err(_) => {
1797                // Free already-allocated events before returning
1798                // error. `s.as_str()` is valid UTF-8 by `String`
1799                // invariant, so this is the interior-NUL path —
1800                // an upstream cursor id that contains `\0` cannot
1801                // round-trip through a C string. Pre-fix this
1802                // returned `InvalidUtf8`, which mis-described
1803                // the cause; bindings now see the more accurate
1804                // `InteriorNul`.
1805                free_events_array(events_ptr, count);
1806                return NetError::InteriorNul.into();
1807            }
1808        },
1809        None => ptr::null_mut(),
1810    };
1811
1812    unsafe {
1813        (*out).events = events_ptr;
1814        (*out).count = count;
1815        (*out).next_id = next_id_ptr;
1816        (*out).has_more = if response.has_more { 1 } else { 0 };
1817    }
1818
1819    NetError::Success.into()
1820}
1821
1822/// Free an events array and all its id/raw allocations.
1823///
1824/// `count` is the number of fully-written events (those whose
1825/// inner `id` / `raw` boxed slices were initialized). It must
1826/// also match the `Layout::array::<NetEvent>` used at allocation
1827/// time — every existing caller writes exactly `count` events
1828/// before invoking this function. For partial-cleanup paths
1829/// (e.g. panic mid-build), use [`free_events_array_partial`].
1830fn free_events_array(events: *mut NetEvent, count: usize) {
1831    free_events_array_partial(events, count, count);
1832}
1833
1834/// Free an events array where only `walk_count` entries have
1835/// fully-initialized `id`/`raw` allocations, but the array
1836/// itself was allocated for `alloc_count` slots. Per-event
1837/// boxes are freed for `0..walk_count`; the array is then
1838/// deallocated with the original `Layout::array::<NetEvent>(alloc_count)`
1839/// to match the allocation. Used by `net_poll_ex`'s panic-mid-loop
1840/// recovery path.
1841fn free_events_array_partial(events: *mut NetEvent, walk_count: usize, alloc_count: usize) {
1842    if events.is_null() || alloc_count == 0 {
1843        return;
1844    }
1845    for i in 0..walk_count {
1846        let event = unsafe { &*events.add(i) };
1847        if !event.id.is_null() {
1848            unsafe {
1849                let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
1850                    event.id as *mut u8,
1851                    event.id_len,
1852                ));
1853            }
1854        }
1855        if !event.raw.is_null() {
1856            unsafe {
1857                let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
1858                    event.raw as *mut u8,
1859                    event.raw_len,
1860                ));
1861            }
1862        }
1863    }
1864    if let Ok(layout) = std::alloc::Layout::array::<NetEvent>(alloc_count) {
1865        unsafe {
1866            std::alloc::dealloc(events as *mut u8, layout);
1867        }
1868    }
1869}
1870
1871/// Free the internal allocations of a poll result returned by `net_poll_ex`.
1872///
1873/// This frees the events array (including each event's `id` and `raw` buffers)
1874/// and the `next_id` string. It does **not** free the `NetPollResult` struct
1875/// itself, which is caller-provided (typically stack-allocated or managed by
1876/// the caller).
1877#[unsafe(no_mangle)]
1878pub unsafe extern "C" fn net_free_poll_result(result: *mut NetPollResult) {
1879    if result.is_null() {
1880        return;
1881    }
1882
1883    let result = unsafe { &mut *result };
1884
1885    // Free events array and all id/raw allocations.
1886    free_events_array(result.events, result.count);
1887
1888    // Free next_id.
1889    if !result.next_id.is_null() {
1890        unsafe {
1891            drop(std::ffi::CString::from_raw(result.next_id));
1892        }
1893    }
1894
1895    // Null the fields so a second `net_free_poll_result` on the
1896    // same struct is a safe no-op rather than a double-free. The
1897    // C header's contract just says "free a poll result"; without
1898    // this clear, a defensive caller calling free twice (or two
1899    // wrappers each calling free in their destructor) would
1900    // re-`Box::from_raw` an already-freed pointer.
1901    result.events = std::ptr::null_mut();
1902    result.count = 0;
1903    result.next_id = std::ptr::null_mut();
1904    result.has_more = 0;
1905}
1906
1907/// Get stats without JSON serialization.
1908#[unsafe(no_mangle)]
1909pub unsafe extern "C" fn net_stats_ex(handle: *mut NetHandle, out: *mut NetStats) -> c_int {
1910    if !handle_is_valid(handle) || out.is_null() {
1911        return NetError::NullPointer.into();
1912    }
1913
1914    let handle = unsafe { &*handle };
1915    let _guard = match enter_ffi_op(handle) {
1916        Ok(g) => g,
1917        Err(err) => return err,
1918    };
1919    let stats = handle.bus.stats();
1920
1921    unsafe {
1922        (*out).events_ingested = stats
1923            .events_ingested
1924            .load(std::sync::atomic::Ordering::Relaxed);
1925        (*out).events_dropped = stats
1926            .events_dropped
1927            .load(std::sync::atomic::Ordering::Relaxed);
1928        (*out).batches_dispatched = stats
1929            .batches_dispatched
1930            .load(std::sync::atomic::Ordering::Relaxed);
1931    }
1932
1933    NetError::Success.into()
1934}
1935
1936#[cfg(test)]
1937mod tests {
1938    use super::*;
1939
1940    #[test]
1941    fn test_parse_config_valid() {
1942        let config = parse_config_json(r#"{"num_shards": 8}"#);
1943        assert!(config.is_some());
1944    }
1945
1946    #[test]
1947    fn test_parse_config_num_shards_overflow() {
1948        // u16::MAX is 65535, so 65536 should fail
1949        let config = parse_config_json(r#"{"num_shards": 65536}"#);
1950        assert!(
1951            config.is_none(),
1952            "num_shards exceeding u16::MAX should fail"
1953        );
1954
1955        // Much larger value should also fail
1956        let config = parse_config_json(r#"{"num_shards": 100000}"#);
1957        assert!(
1958            config.is_none(),
1959            "num_shards exceeding u16::MAX should fail"
1960        );
1961    }
1962
1963    #[test]
1964    fn test_parse_config_num_shards_max_valid() {
1965        // u16::MAX (65535) should be valid
1966        let config = parse_config_json(r#"{"num_shards": 65535}"#);
1967        assert!(config.is_some(), "num_shards at u16::MAX should be valid");
1968    }
1969
1970    #[test]
1971    fn test_parse_config_invalid_json() {
1972        let config = parse_config_json(r#"{"num_shards": invalid}"#);
1973        assert!(config.is_none());
1974    }
1975
1976    #[test]
1977    fn test_parse_config_empty() {
1978        let config = parse_config_json(r#"{}"#);
1979        assert!(config.is_some(), "empty config should use defaults");
1980    }
1981
1982    /// Pin: known `backpressure_mode` strings round-trip; an
1983    /// unknown value (typo) is rejected with `None`, not silently
1984    /// downgraded to `DropNewest`. Pre-fix a deploy-time typo
1985    /// like `"DropOldset"` swapped the operator's intended
1986    /// durability for `DropNewest` with no diagnostic.
1987    #[test]
1988    fn parse_config_rejects_unknown_backpressure_mode() {
1989        // Known values still parse.
1990        for s in [
1991            "DropNewest",
1992            "drop_newest",
1993            "DropOldest",
1994            "drop_oldest",
1995            "FailProducer",
1996            "fail_producer",
1997        ] {
1998            let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
1999            assert!(cfg.is_some(), "known mode `{}` must parse", s);
2000        }
2001
2002        // Typos must fail.
2003        for s in ["DropOldset", "FailProduce", "drop_oldst", "garbage", ""] {
2004            let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
2005            assert!(
2006                cfg.is_none(),
2007                "unknown mode `{}` must reject (pre-fix this silently \
2008                 fell through to DropNewest)",
2009                s,
2010            );
2011        }
2012
2013        // Wrong JSON type also fails — pre-fix this hit the
2014        // `and_then(|v| v.as_str())` short-circuit and was
2015        // ignored entirely.
2016        let cfg = parse_config_json(r#"{"backpressure_mode": 42}"#);
2017        assert!(
2018            cfg.is_none(),
2019            "non-string non-object backpressure_mode must reject"
2020        );
2021        let cfg = parse_config_json(r#"{"backpressure_mode": true}"#);
2022        assert!(cfg.is_none(), "boolean backpressure_mode must reject");
2023    }
2024
2025    /// Pin: the `Sample { rate }` mode is reachable from JSON
2026    /// via `{"backpressure_mode": {"Sample": {"rate": N}}}`,
2027    /// and a zero rate is rejected (validator already rejects
2028    /// it; the parser must too, so the surface is consistent).
2029    #[test]
2030    fn parse_config_supports_sample_mode_with_validation() {
2031        let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 10}}}"#);
2032        assert!(cfg.is_some(), "Sample with non-zero rate must parse");
2033
2034        let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 0}}}"#);
2035        assert!(cfg.is_none(), "Sample with rate=0 must reject");
2036
2037        let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {}}}"#);
2038        assert!(cfg.is_none(), "Sample missing rate must reject");
2039    }
2040
2041    // Regression: the Go binding's `Poll(limit, cursor)` serializes a
2042    // `"cursor"` field that the FFI JSON path previously ignored —
2043    // cross-shard pagination silently broke. `parse_poll_request_json`
2044    // must round-trip the cursor into `ConsumeRequest.from_id`.
2045    #[test]
2046    fn test_parse_poll_request_preserves_cursor() {
2047        let req = parse_poll_request_json(r#"{"limit": 50, "cursor": "abc:123"}"#).unwrap();
2048        assert_eq!(req.limit, 50);
2049        assert_eq!(req.from_id.as_deref(), Some("abc:123"));
2050    }
2051
2052    #[test]
2053    fn test_parse_poll_request_no_cursor_defaults_to_none() {
2054        let req = parse_poll_request_json(r#"{"limit": 10}"#).unwrap();
2055        assert_eq!(req.limit, 10);
2056        assert_eq!(req.from_id, None);
2057    }
2058
2059    #[test]
2060    fn test_parse_poll_request_empty_uses_default_limit() {
2061        let req = parse_poll_request_json(r#"{}"#).unwrap();
2062        assert_eq!(req.limit, 100);
2063        assert_eq!(req.from_id, None);
2064    }
2065
2066    // Regression: a wrong-typed `"limit"` previously hit
2067    // `.as_u64().unwrap_or(100)` and silently defaulted. Caller bugs
2068    // (e.g. sending a string or a negative number) must surface as
2069    // `InvalidJson` instead.
2070    #[test]
2071    fn test_parse_poll_request_wrong_type_limit_errors() {
2072        let err = parse_poll_request_json(r#"{"limit": "50"}"#).unwrap_err();
2073        assert_eq!(err, c_int::from(NetError::InvalidJson));
2074        let err = parse_poll_request_json(r#"{"limit": -1}"#).unwrap_err();
2075        assert_eq!(err, c_int::from(NetError::InvalidJson));
2076    }
2077
2078    #[test]
2079    fn test_parse_poll_request_wrong_type_cursor_errors() {
2080        let err = parse_poll_request_json(r#"{"cursor": 123}"#).unwrap_err();
2081        assert_eq!(err, c_int::from(NetError::InvalidJson));
2082    }
2083
2084    #[test]
2085    fn test_parse_poll_request_null_fields_use_defaults() {
2086        let req = parse_poll_request_json(r#"{"limit": null, "cursor": null}"#).unwrap();
2087        assert_eq!(req.limit, 100);
2088        assert_eq!(req.from_id, None);
2089    }
2090
2091    /// `usize::MAX` is always a valid usize regardless of target
2092    /// pointer width, so it must parse successfully on both 32- and
2093    /// 64-bit builds. This pins the boundary case.
2094    #[test]
2095    fn test_parse_poll_request_limit_at_usize_max() {
2096        let json = format!(r#"{{"limit": {}}}"#, usize::MAX);
2097        let req = parse_poll_request_json(&json).unwrap();
2098        assert_eq!(req.limit, usize::MAX);
2099    }
2100
2101    /// Regression: `as usize` silently truncates on 32-bit targets
2102    /// for `u64` values above `usize::MAX`. The parser must return
2103    /// `InvalidJson` instead of wrapping. We only run this on 32-bit
2104    /// targets because on 64-bit `usize::MAX == u64::MAX`, leaving
2105    /// nothing that fits in u64 but not usize.
2106    #[cfg(target_pointer_width = "32")]
2107    #[test]
2108    fn test_parse_poll_request_limit_overflows_usize_on_32bit() {
2109        // 2^33 — fits in u64, but exceeds usize::MAX on a 32-bit build.
2110        let err = parse_poll_request_json(r#"{"limit": 8589934592}"#).unwrap_err();
2111        assert_eq!(err, c_int::from(NetError::InvalidJson));
2112    }
2113
2114    /// CR-22: pin parity between the Rust-side `NetError` enum and
2115    /// the two C-header copies. The Rust enum is the source of
2116    /// truth; C / Go consumers `errors.Is` against the named codes.
2117    /// Pre-CR-22 the headers were missing `-9` (IntOverflow) and
2118    /// `-10` (MismatchedHandles); a consumer receiving those values
2119    /// would fall into the unknown-code branch and lose actionable
2120    /// distinction.
2121    ///
2122    /// We extract every integer literal that appears as the
2123    /// right-hand side of an `= ` token in the file and check
2124    /// that each Rust-side value is present in BOTH headers. The
2125    /// test does NOT verify symbolic names; the sealing
2126    /// constraint is the numeric value alone.
2127    ///
2128    /// Both `include_str!` paths point inside `net/crates/net/`.
2129    /// `include/net.go.h` is a manually-synced mirror of the
2130    /// repo-root `go/net.h`. Reaching outside the crate root
2131    /// (`include_str!("../../../../../go/net.h")`) breaks
2132    /// `cargo publish` and any out-of-repo vendoring of this
2133    /// crate, so the in-crate copy is the supported source. A
2134    /// drift between the two surfaces here as a parity-test
2135    /// failure: one of them will be missing the new value.
2136    #[test]
2137    fn cr22_c_header_parity_with_rust_neterror() {
2138        let primary = include_str!("../../include/net.h");
2139        let go_copy = include_str!("../../include/net.go.h");
2140
2141        // The Rust enum's full set of values (mirrors `pub enum
2142        // NetError` above). When a new variant is added in the
2143        // Rust source, this list — AND both headers — must be
2144        // updated together. The asserts that follow then catch a
2145        // missing header update at the next CI run.
2146        let rust_values: &[i32] = &[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -99];
2147
2148        // Pull every numeric literal that looks like an enum-value
2149        // assignment (`= <number>` followed by `,` or whitespace).
2150        // Whitespace-tolerant: skips `= 0`, `=  0`, `= -10`, etc.
2151        fn extract_assigned_values(src: &str) -> Vec<i32> {
2152            let mut out = Vec::new();
2153            let mut chars = src.chars().peekable();
2154            while let Some(c) = chars.next() {
2155                if c != '=' {
2156                    continue;
2157                }
2158                // Skip whitespace.
2159                while let Some(&peek) = chars.peek() {
2160                    if peek == ' ' || peek == '\t' {
2161                        chars.next();
2162                    } else {
2163                        break;
2164                    }
2165                }
2166                // Optional sign.
2167                let mut buf = String::new();
2168                if let Some(&peek) = chars.peek() {
2169                    if peek == '-' || peek == '+' {
2170                        buf.push(peek);
2171                        chars.next();
2172                    }
2173                }
2174                // Digits.
2175                let mut had_digit = false;
2176                while let Some(&peek) = chars.peek() {
2177                    if peek.is_ascii_digit() {
2178                        buf.push(peek);
2179                        chars.next();
2180                        had_digit = true;
2181                    } else {
2182                        break;
2183                    }
2184                }
2185                if had_digit {
2186                    if let Ok(v) = buf.parse::<i32>() {
2187                        out.push(v);
2188                    }
2189                }
2190            }
2191            out
2192        }
2193
2194        let primary_vals = extract_assigned_values(primary);
2195        let go_vals = extract_assigned_values(go_copy);
2196
2197        for &v in rust_values {
2198            assert!(
2199                primary_vals.contains(&v),
2200                "CR-22 regression: include/net.h is missing the value {} \
2201                 (Rust NetError defines it). Add the matching `NET_ERR_*` \
2202                 enumerator before merging.",
2203                v
2204            );
2205            assert!(
2206                go_vals.contains(&v),
2207                "CR-22 regression: bindings/go/net/net.h is missing the value {} \
2208                 (Rust NetError defines it).",
2209                v
2210            );
2211        }
2212    }
2213
2214    /// CR-5: pin that `examples/capability.c` does not double-include
2215    /// `net.h` and `net.go.h`. Both files use the `NET_SDK_H` include
2216    /// guard, so when both are included in one TU the second is
2217    /// silently skipped — every `net_validate_capabilities` /
2218    /// `net_predicate_*` call the example makes becomes an
2219    /// implicit-declaration error on GCC 14+/Clang 16+, and a silent
2220    /// `int`-return miscompile on older toolchains. The deeper fix
2221    /// (renaming one guard so they compose cleanly) is tracked as
2222    /// CR-28; this test catches the example-level regression.
2223    #[test]
2224    fn cr5_example_does_not_double_include_net_headers() {
2225        let example = include_str!("../../examples/capability.c");
2226        let net_h_included = example.contains("#include \"../include/net.h\"");
2227        let net_go_h_included = example.contains("#include \"../include/net.go.h\"");
2228        assert!(
2229            net_go_h_included,
2230            "examples/capability.c must include net.go.h to declare \
2231             net_validate_capabilities + net_predicate_* symbols"
2232        );
2233        assert!(
2234            !net_h_included,
2235            "examples/capability.c must NOT also include net.h: \
2236             both headers share the NET_SDK_H guard, so the second \
2237             include is silently skipped, leaving the example's \
2238             net_predicate_* calls implicitly declared. Drop the \
2239             redundant include — net.go.h is a superset."
2240        );
2241    }
2242
2243    /// `handle_is_valid` rejects null and any pointer not aligned for
2244    /// `NetHandle`. A foreign caller producing a misaligned pointer
2245    /// (e.g. via an over-eager `void *` cast on a packed struct) hits
2246    /// `&*handle` UB before any other check fires; this gate is the
2247    /// pre-deref discriminator.
2248    #[test]
2249    fn handle_is_valid_rejects_null_and_misaligned() {
2250        // Null is rejected.
2251        assert!(
2252            !handle_is_valid(std::ptr::null::<NetHandle>()),
2253            "null pointer must not be considered a valid handle"
2254        );
2255
2256        // Aligned but non-null is accepted (we use a small backing
2257        // buffer to materialize a pointer without dereferencing it).
2258        // `align_of::<NetHandle>()` is the alignment we must match.
2259        let align = std::mem::align_of::<NetHandle>();
2260        let buf = vec![0u8; align * 2];
2261        let base = buf.as_ptr() as usize;
2262        let aligned = (base + align - 1) & !(align - 1);
2263        let aligned_ptr = aligned as *const NetHandle;
2264        assert!(
2265            handle_is_valid(aligned_ptr),
2266            "aligned non-null pointer must validate (align={align}, ptr={aligned_ptr:p})"
2267        );
2268
2269        // A pointer one byte past `aligned_ptr` is misaligned for any
2270        // type with align > 1, and `NetHandle` (containing `AtomicU32`,
2271        // `AtomicBool`, ManuallyDrop'd EventBus + Runtime) easily
2272        // exceeds 1.
2273        if align > 1 {
2274            let misaligned_ptr = (aligned + 1) as *const NetHandle;
2275            assert!(
2276                !handle_is_valid(misaligned_ptr),
2277                "misaligned pointer must be rejected (align={align}, ptr={misaligned_ptr:p})"
2278            );
2279        }
2280    }
2281
/// Regression guard: a `net` config whose `heartbeat_interval_ms` or
/// `session_timeout_ms` is `0` must be refused as a whole, i.e.
/// `parse_config_json` returns `None`.
///
/// Before the fix the parser passed `0` straight through to
/// `Duration::from_millis(0)`. On the Net adapter's heartbeat path
/// that becomes a busy-loop which pegs a CPU without emitting any
/// diagnostic, so the FFI caller observed a successful `net_init`
/// followed by a hung daemon. Cooldown / metrics_window are covered
/// by a validator-level guard, but nothing equivalent exists on the
/// Net-adapter side, which leaves the parser as the only place able
/// to refuse the build.
#[cfg(feature = "net")]
#[test]
fn parse_config_rejects_zero_heartbeat_and_session_timeout() {
    // `psk` and `peer_public_key` must each hex-decode to exactly a
    // [u8; 32], hence 64 hex characters apiece.
    let psk_hex = "0".repeat(64);
    let peer_key_hex = "1".repeat(64);

    // Renders the same `net` block for every case; only the two
    // timing fields under test vary.
    let net_config = |hb_ms: u64, timeout_ms: u64| -> String {
        format!(
            r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
                "psk":"{psk_hex}","peer_public_key":"{peer_key_hex}",
                "heartbeat_interval_ms":{hb_ms},"session_timeout_ms":{timeout_ms}}}}}"#
        )
    };

    // Control case: with both fields non-zero the config parses, so
    // the rejections below are attributable to the zero value rather
    // than to a required field missing from the surrounding `net`
    // block.
    let baseline = net_config(1000, 30000);
    assert!(
        parse_config_json(&baseline).is_some(),
        "baseline net config with non-zero heartbeat/session_timeout must parse"
    );

    // A zero heartbeat interval must be refused.
    let zero_heartbeat = net_config(0, 30000);
    assert!(
        parse_config_json(&zero_heartbeat).is_none(),
        "heartbeat_interval_ms=0 must reject (pre-fix this produced a CPU-pegging busy loop)"
    );

    // A zero session timeout must be refused.
    let zero_session_timeout = net_config(1000, 0);
    assert!(
        parse_config_json(&zero_session_timeout).is_none(),
        "session_timeout_ms=0 must reject"
    );
}
2337}