net/ffi/mod.rs
1//! C FFI bindings for cross-language integration.
2//!
3//! This module provides a C-compatible API for using Net from
4//! other languages (Python, Node.js, Go, etc.).
5//!
6//! # Safety
7//!
8//! All public FFI functions in this module accept raw pointers from C code.
9//! Each is declared `pub unsafe extern "C" fn` so the unsafety is
10//! explicit at the type level; the module-wide contract callers
11//! must uphold is:
12//! - Pointers are valid and properly aligned
13//! - Opaque handle pointers (`*mut T`) were produced by this crate's
14//! matching constructor (`Box::into_raw` inside the FFI surface).
15//! Foreign-allocated pointers, even if valid and aligned, cause
16//! undefined behavior once the corresponding `_free` hands them to `Box::from_raw`.
17//! - String pointers point to valid UTF-8 data
18//! - Buffer sizes are accurate
19//! - Handles are not used after `net_shutdown`
20//!
21//! The per-function `# Safety` rustdoc is intentionally suppressed
22//! at the module level — every entry point shares the same contract
23//! and the module doc-comment above (plus `include/README.md`) is
24//! the source of truth. Adding individual `# Safety` blocks would
25//! duplicate the same wording 200 times without adding signal.
26//!
27//! # Thread Safety
28//!
29//! All FFI functions are thread-safe. The event bus handle can be shared
30//! across threads.
31//!
32#![allow(clippy::missing_safety_doc)]
33// The cross-cutting C-side safety contract for every `unsafe` block in
34// this module is documented in the `# Safety` section above:
35// caller-validated pointer / length / lifetime / handle-not-after-shutdown
36// invariants documented in `include/net.h`. Inlining `// SAFETY:` on each
37// block would add ~200 identical "see module preamble" comments without
38// adding any signal beyond what the preamble already says.
39#![expect(
40 clippy::undocumented_unsafe_blocks,
41 reason = "module-wide FFI safety contract documented in the # Safety preamble above"
42)]
43#![expect(
44 clippy::multiple_unsafe_ops_per_block,
45 reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract; splitting per-op would obscure the single boundary-cross"
46)]
47
48//! # Tokio runtime restriction
49//!
50//! Internal FFI ops (`net_poll`, `net_flush`, `net_shutdown`,
51//! `net_redex_*`, `net_mesh_new`, the cortex FFI, the mesh FFI)
52//! drive the bus's tokio runtime via `Runtime::block_on`. That
53//! function panics with "Cannot start a runtime from within a
54//! runtime" if the calling thread is already inside a tokio
55//! runtime context. The functions are `extern "C"`, so a panic
56//! unwinds across the FFI boundary into C / Go-cgo / Python /
57//! NAPI — undefined behavior.
58//!
59//! **The common-case C / Go / Python caller has no Rust tokio
60//! runtime, so this is unreachable for them.** The narrow path is:
61//!
62//! - A **Rust** caller loads the cdylib and calls these
63//! functions from inside its own `#[tokio::main]` (or any
64//! thread that has called `Runtime::enter()`).
65//! - A non-Rust caller embeds a Rust library that runs its own
66//! tokio runtime and forwards calls into this cdylib on the
67//! same thread.
68//!
69//! Both forms are unusual but reachable. **Do not call any FFI
70//! op from a thread that already holds a tokio runtime
71//! context.** If you must, spawn the FFI call on a fresh OS
72//! thread that doesn't carry a runtime guard, or wrap the call
73//! with `tokio::task::spawn_blocking(|| net_xxx(...))` to escape
74//! the worker pool.
75//!
76//! `net_init` (`mod.rs:284-316`) hardens against this for runtime
77//! *construction*; the steady-state ops do not, since the cost
78//! of a `Handle::try_current()` check on every poll would be
79//! measurable for the common path that doesn't hit the bug.
80//!
81//! # `catch_unwind` + caller-held locks
82//!
83//! Several FFI entries (`net_blob_publish`, `net_blob_resolve`,
84//! `net_*_wait_for_token`) wrap their body in
85//! `std::panic::catch_unwind(AssertUnwindSafe(...))` so a panic
86//! during the call returns a typed `NET_ERR_BLOB_PANIC` /
87//! `NET_ERR_PANIC` code rather than unwinding across the FFI
88//! boundary. That stops the substrate-side undefined behavior,
89//! but it does NOT make the wrapped code transparently panic-safe
90//! from the caller's perspective.
91//!
92//! **If the caller invokes an FFI op while holding an OS-level
93//! lock, a `sync.Mutex` (Go), a `threading.Lock` (Python), or any
94//! other caller-side mutex, and the FFI body panics, the caller
95//! gets an error code back while still holding that lock.** No
96//! poisoning signal accompanies it: Go's `sync.Mutex` and
97//! Python's `threading.Lock` do not poison at all, and Rust's
98//! `std::sync::Mutex` poisons only when a panic unwinds through
99//! the guard. The caller has observed a return value that may
100//! not reflect the state of the FFI op.
101//!
102//! Recommended caller pattern: **do not hold a caller-side lock
103//! across an FFI call**. Acquire the lock, prepare the inputs,
104//! release the lock, then call the FFI. Re-acquire if you need
105//! to update caller state with the result.
106//!
107//! The hazard is documented per-binding in:
108//! - Python: `bindings/python/README.md` (caller-mutex notes)
109//! - Node: `bindings/node/README.md`
110//! - Go: `bindings/go/net/redex.go` lifecycle docs
111//! - C: `include/net.h` (every wait-family declaration)
112//!
113//! # Memory Management
114//!
115//! - Handles returned by `net_init` must be freed with `net_shutdown`
116//! - String buffers passed to `net_poll` are owned by the caller
117//! - Error codes are returned as integers (0 = success, negative = error)
118//!
119//! # Example (C)
120//!
121//! ```c
122//! #include "net.h"
123//!
124//! int main() {
125//! // Initialize with default config
126//! void* bus = net_init("{\"num_shards\": 4}");
127//! if (!bus) return 1;
128//!
129//! // Ingest an event
130//! int result = net_ingest(bus, "{\"token\": \"hello\"}", 18);
131//! if (result < 0) { /* handle error */ }
132//!
133//! // Poll events
134//! char buffer[65536];
135//! result = net_poll(bus, "{\"limit\": 100}", buffer, sizeof(buffer));
136//!
137//! // Shutdown
138//! net_shutdown(bus);
139//! return 0;
140//! }
141//! ```
142
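// Illustrative sketch, not part of this module. The `catch_unwind` section
// above says a panic is converted to an error code and recommends that
// callers release their own lock before calling in. The block below shows
// both ideas with std only. `guarded_op`, `call_without_holding_lock`, and
// `SKETCH_ERR_PANIC` are hypothetical names; the real codes live in
// `include/net.h`.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Mutex;

/// Hypothetical error code for this sketch only.
const SKETCH_ERR_PANIC: i32 = -100;

/// Run `body` and turn a panic into an integer code instead of letting it
/// unwind into the caller.
fn guarded_op(body: impl FnOnce() -> i32) -> i32 {
    match catch_unwind(AssertUnwindSafe(body)) {
        Ok(code) => code,
        Err(_) => SKETCH_ERR_PANIC,
    }
}

/// Caller-side discipline: copy inputs out under the lock, release it,
/// make the call, then re-acquire to record the result.
fn call_without_holding_lock(state: &Mutex<Vec<i32>>, op: impl FnOnce(i32) -> i32) -> i32 {
    let guard = state.lock().unwrap();
    let input: i32 = guard.iter().sum();
    drop(guard); // release before crossing the boundary

    let code = guarded_op(|| op(input));

    state.lock().unwrap().push(code);
    code
}
```

// The lock is held only while inputs are copied and while the result is
// recorded, so the op never runs with the guard alive.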
143// Entry points here are `unsafe extern "C" fn`; the allow below covers any `ffi` function that
144// derefs a raw pointer argument without being `unsafe`. Safety is documented in the module-level docs.
145#![allow(clippy::not_unsafe_ptr_arg_deref)]
146
147use std::ffi::CStr;
148use std::os::raw::{c_char, c_int};
149use std::ptr;
150
151use tokio::runtime::Runtime;
152
153use crate::bus::EventBus;
154use crate::config::EventBusConfig;
155use crate::consumer::{ConsumeRequest, ConsumeResponse};
156use crate::event::{Event, RawEvent};
157
158/// Per-FFI-handle quiescing protocol shared by cortex / mesh
159/// handles to close the audit-#23/#24/#25 use-after-free hazards
160/// when a `_free` races a concurrent op. See module docs for the
161/// soundness story (intentional box leak) and the per-handle
162/// recipe.
163#[cfg(any(
164 all(feature = "netdb", feature = "redex-disk"),
165 feature = "net",
166 feature = "redis",
167))]
168pub mod handle_guard;
169
170/// C FFI for CortEX / NetDb / RedexFile. Requires `netdb` (for the
171/// unified facade) and `redex-disk` (for persistent storage paths on
172/// `Redex` / `RedexFile`). Go / cgo consumers target this surface.
173///
174/// `missing_docs` is suppressed on this module: these are extern "C"
175/// shims over already-documented Rust adapters, and the per-function
176/// contract is documented in the binding-side READMEs (Go / TS / Py).
177/// Re-documenting each shim would duplicate with drift risk.
178#[cfg(all(feature = "netdb", feature = "redex-disk"))]
179#[allow(missing_docs)]
180pub mod cortex;
181
182/// C FFI for the Dataforts Phase 3 blob surface. Exposes the
183/// BlobRef wire codec, the global adapter registry, and the
184/// `publish_blob` / `resolve_payload` helpers for cgo / native
185/// consumers.
186#[cfg(feature = "dataforts")]
187#[allow(missing_docs)]
188pub mod blob;
189
190/// Stub definitions for the `net_mesh_blob_adapter_*` symbols
191/// when the `dataforts / netdb / redex-disk` feature triple is
192/// off. cgo / dlsym consumers link these symbols unconditionally
193/// (see `bindings/go/blob.go`), so a libnet built without the
194/// triple must still satisfy them — each stub returns
195/// `NET_ERR_FEATURE_NOT_BUILT` (or null) so Go programs route to
196/// a clean error rather than fail at program load. The module is
197/// empty when the feature triple is on (the real impls in
198/// `ffi::blob` cover the same symbol names).
199#[allow(missing_docs)]
200pub mod blob_stubs;
201
202/// C FFI for the encrypted-UDP mesh transport + channels. Requires
203/// the `net` feature (which brings in the crypto + transport). Go /
204/// cgo consumers target this surface alongside `ffi::cortex`. See
205/// the `ffi::cortex` note for why `missing_docs` is suppressed here.
206#[cfg(feature = "net")]
207#[allow(missing_docs)]
208pub mod mesh;
209
210/// C FFI for the transport surface (blob + directory transfer over the
211/// fairscheduler stream transport — Transport SDK plan T-C). Drives the
212/// node's transfer engine via the existing `MeshNodeHandle` +
213/// `MeshBlobAdapterHandle`, so it rides `net` + the blob-adapter feature
214/// set (the adapter handle needs `netdb` + `redex-disk`). Feature-off
215/// stubs for builds missing the quad live in `transport_stubs` below.
216#[cfg(all(
217 feature = "net",
218 feature = "dataforts",
219 feature = "netdb",
220 feature = "redex-disk"
221))]
222#[allow(missing_docs)]
223pub mod transport;
224
225/// Feature-off stubs for the transport symbols
226/// (`net_serve_blob_transfer` / `net_fetch_blob*` / `net_store_dir` /
227/// `net_fetch_dir` / `net_dir_manifest_read` / `net_transport_free_buffer`)
228/// when the quad above is not fully built. The Go binding
229/// (`bindings/go/net/transport.go`) links these unconditionally, so a
230/// libnet without the quad must still satisfy them — each stub returns
231/// `NET_ERR_FEATURE_NOT_BUILT` (or null / no-op) so Go programs route to
232/// a clean error rather than fail at program load. Empty (compiled out)
233/// when the quad is on — the real impls in `ffi::transport` then own the
234/// symbol names. Mirrors `ffi::blob_stubs`.
235#[allow(missing_docs)]
236pub mod transport_stubs;
237
238/// C FFI for the `aggregator.registry` RPC client + channel
239/// visibility setter. Stage 5 of `SDK_AGGREGATOR_SUBNET_PLAN.md`.
240/// Rides the `net` feature alongside `ffi::mesh` because every
241/// op needs a `MeshNodeHandle`, and `cortex` because the
242/// underlying `behavior::aggregator` module's RPC surface is
243/// cortex-only (`mesh_rpc`, `cortex::rpc`, `postcard`).
244#[cfg(all(feature = "net", feature = "cortex"))]
245#[allow(missing_docs)]
246pub mod aggregator;
247
248/// C FFI for stateless predicate evaluation (Phase 9c of
249/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — no handles,
250/// no state. Mirrors the SDK-layer `evaluatePredicate` /
251/// `evaluate_predicate` surface every binding ships, exposed at
252/// the C ABI for raw consumers (C / C++ / Zig / Swift / etc.).
253#[cfg(feature = "net")]
254pub mod predicate;
255
256/// C FFI for stateless capability-set validation (Phase 9a of
257/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helper — `caps_json`
258/// in, `report_json` out. Mirrors the SDK-layer
259/// `validate_capabilities` surface, exposed at the C ABI for raw
260/// consumers.
261#[cfg(feature = "net")]
262pub mod schema;
263
264/// C FFI for predicate debug-session helpers (Phase 9d of
265/// `CAPABILITY_SYSTEM_SDK_PLAN.md`). Pure helpers — single-eval
266/// `evaluate_with_trace`, corpus-wide
267/// `aggregate_debug_report`, and host-side
268/// `redact_metadata_keys`. Mirror what every other binding
269/// ships at the SDK layer; exposed at the C ABI for raw
270/// consumers.
271#[cfg(feature = "net")]
272pub mod predicate_debug;
273
274/// C FFI for the Redis Streams consumer-side dedup helper. Mirrors
275/// the Rust `net::adapter::RedisStreamDedup` surface for Go / C / Zig
276/// consumers. See `ffi::redis_dedup` module docs for the wire
277/// shape and the dedup contract.
278#[cfg(feature = "redis")]
279pub mod redis_dedup;
280
281#[cfg(feature = "net")]
282use crate::adapter::net::{NetAdapterConfig, ReliabilityConfig, StaticKeypair};
283#[cfg(any(feature = "redis", feature = "jetstream", feature = "net"))]
284use crate::config::AdapterConfig;
285#[cfg(feature = "jetstream")]
286use crate::config::JetStreamAdapterConfig;
287#[cfg(feature = "redis")]
288use crate::config::RedisAdapterConfig;
289#[cfg(feature = "net")]
290use std::ffi::CString;
291
292/// Opaque handle to an event bus instance.
293///
294/// This wraps the EventBus along with a Tokio runtime for async operations.
295///
296/// # Lifetime / soundness
297///
298/// The handle storage is *intentionally leaked* on `net_shutdown` rather
299/// than freed via `Box::from_raw`. Reasoning: every FFI entry point
300/// dereferences the C-side `*mut NetHandle` to access the atomics that
301/// gate shutdown. The previous Dekker-style SeqCst handshake between
302/// `FfiOpGuard::try_enter` (which calls `fetch_add` on `active_ops`) and
303/// `net_shutdown` (which loads `active_ops` then `Box::from_raw`s the
304/// handle) was unsound: SeqCst orders the atomic operations only — the
305/// non-atomic `Box::from_raw` could deallocate the storage between
306/// shutdown's load and a concurrent FFI op's `fetch_add`, producing a
307/// use-after-free on the freed atomic. By never freeing the box, the
308/// atomic memory backing the handle is always valid; concurrent FFI ops
309/// observe `shutting_down=true` after shutdown signals it and bail
310/// before touching `bus`/`runtime`.
311///
312/// `bus` and `runtime` are stored in `ManuallyDrop` so that
313/// `net_shutdown` can `take` them out (via `ptr::read`) in order to
314/// call `bus.shutdown().await`. Because `shutting_down` is set first
315/// and shutdown waits for `active_ops` to drop to zero before reading
316/// these fields, no FFI op can be racing the read. If the wait times
317/// out, the `ptr::read` is skipped and both fields are leaked along
318/// with the box.
319pub struct NetHandle {
320 /// Owned `EventBus`. Read out via `ManuallyDrop::take` during
321 /// shutdown once `active_ops` has drained to zero. After that
322 /// point, `shutting_down` is `true` and no FFI op may access this
323 /// field.
324 bus: std::mem::ManuallyDrop<EventBus>,
325 /// Owned tokio runtime. Same lifetime contract as `bus`.
326 runtime: std::mem::ManuallyDrop<Runtime>,
327 /// Set to `true` once `net_shutdown` begins. All other FFI
328 /// functions check this flag and return `ShuttingDown` before
329 /// touching `bus` / `runtime`.
330 shutting_down: std::sync::atomic::AtomicBool,
331 /// Number of in-flight FFI operations (excluding shutdown itself).
332 /// `net_shutdown` spins until this drops to zero (with a deadline)
333 /// before reading `bus` / `runtime` to call shutdown.
334 active_ops: std::sync::atomic::AtomicU32,
335 /// Set to `true` after `net_shutdown` has consumed `bus` /
336 /// `runtime` via `ManuallyDrop::take`. A second `net_shutdown`
337 /// call observes this and returns `Success` without re-taking
338 /// (which would be UB). FFI ops also check this before touching
339 /// `bus` / `runtime`, defending against a contract-violating
340 /// caller that races a post-shutdown call.
341 bus_taken: std::sync::atomic::AtomicBool,
342 /// Set to `true` after `bus.shutdown()` returns from the
343 /// first `net_shutdown` call. A second/third concurrent
344 /// `net_shutdown` caller spins until this flips before
345 /// returning success — without this gate the second caller
346 /// observed `bus_taken == true` and returned `Success` while
347 /// the first caller was still mid-`block_on(bus.shutdown())`,
348 /// falsely signaling completion of an in-progress shutdown.
349 shutdown_completed: std::sync::atomic::AtomicBool,
350}
351
352/// Maximum time `net_shutdown` will wait for in-flight FFI operations
353/// to complete before giving up. If the deadline expires, the bus is
354/// leaked rather than read out — leaking is correct (the box is
355/// already leaked permanently for soundness reasons) but means the
356/// adapter's `flush()` / `shutdown()` won't run.
357const FFI_SHUTDOWN_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);
358
359/// RAII guard that increments `active_ops` on creation and decrements on drop.
360struct FfiOpGuard<'a> {
361 handle: &'a NetHandle,
362}
363
364impl<'a> FfiOpGuard<'a> {
365 /// Try to enter an FFI operation. Returns `None` if the handle is
366 /// shutting down or if `bus` / `runtime` have already been taken.
367 ///
368 /// Soundness rests on the fact that the box backing `handle` is
369 /// never freed (see `NetHandle` doc). The `fetch_add` is therefore
370 /// always on valid memory regardless of whether shutdown is in
371 /// progress. The subsequent loads decide whether the op is allowed
372 /// to proceed; if shutdown was signaled or `bus_taken` flipped
373 /// before our increment was visible, we bail without touching
374 /// `bus` / `runtime`. The `bus_taken` check defends against a
375 /// contract-violating caller that races a post-shutdown call: even
376 /// if `shutting_down` was reset somehow, an op that would touch the
377 /// already-taken `ManuallyDrop` fields is rejected.
378 fn try_enter(handle: &'a NetHandle) -> Option<Self> {
379 handle
380 .active_ops
381 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
382 if handle
383 .shutting_down
384 .load(std::sync::atomic::Ordering::SeqCst)
385 || handle.bus_taken.load(std::sync::atomic::Ordering::SeqCst)
386 {
387 handle
388 .active_ops
389 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
390 None
391 } else {
392 Some(Self { handle })
393 }
394 }
395}
396
397impl Drop for FfiOpGuard<'_> {
398 fn drop(&mut self) {
399 self.handle
400 .active_ops
401 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
402 }
403}
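
// Illustrative sketch, not part of this module: the increment-then-check
// entry used by `FfiOpGuard::try_enter`, reduced to two atomics on a leaked
// box. `SketchHandle`, `OpGuard`, and `leak_handle` are hypothetical names,
// and the `bus_taken` check is left out for brevity.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

/// Simplified stand-in for `NetHandle`: only the atomics that gate entry.
struct SketchHandle {
    shutting_down: AtomicBool,
    active_ops: AtomicU32,
}

/// RAII guard: holds one unit of `active_ops` until dropped.
struct OpGuard<'a> {
    handle: &'a SketchHandle,
}

impl<'a> OpGuard<'a> {
    /// Increment first, then check the flag. If shutdown was already
    /// signaled, undo the increment and refuse entry.
    fn try_enter(handle: &'a SketchHandle) -> Option<Self> {
        handle.active_ops.fetch_add(1, Ordering::SeqCst);
        if handle.shutting_down.load(Ordering::SeqCst) {
            handle.active_ops.fetch_sub(1, Ordering::AcqRel);
            None
        } else {
            Some(Self { handle })
        }
    }
}

impl<'a> Drop for OpGuard<'a> {
    fn drop(&mut self) {
        self.handle.active_ops.fetch_sub(1, Ordering::AcqRel);
    }
}

/// Leak the box so the atomics stay valid for the rest of the process.
fn leak_handle() -> &'static SketchHandle {
    Box::leak(Box::new(SketchHandle {
        shutting_down: AtomicBool::new(false),
        active_ops: AtomicU32::new(0),
    }))
}
```

// Because the storage is never freed, the `fetch_add` in `try_enter` always
// lands on valid memory, even when it races a shutdown.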
404
405/// Returns `true` when `handle` is non-null and aligned for
406/// `NetHandle`. Every `extern "C"` entry point that derefs the
407/// raw handle must gate on this: a misaligned pointer produced
408/// by an over-eager `void *` cast in a foreign caller is
409/// immediate UB on `&*handle`, even though it passes `is_null`.
410#[inline]
411fn handle_is_valid(handle: *const NetHandle) -> bool {
412 !handle.is_null() && (handle as usize).is_multiple_of(std::mem::align_of::<NetHandle>())
413}
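
// Illustrative sketch, not part of this module: the same null-and-alignment
// gate as `handle_is_valid`, written generically with `%` so it can be tried
// in isolation. `Aligned8` and `ptr_is_usable` are hypothetical names.

```rust
use std::mem::align_of;

/// Stand-in type with 8-byte alignment (an assumption of this sketch).
#[repr(align(8))]
struct Aligned8(u64);

/// Non-null and a multiple of the type's alignment. The pointer is only
/// inspected as an integer here, never dereferenced.
fn ptr_is_usable<T>(p: *const T) -> bool {
    !p.is_null() && (p as usize) % align_of::<T>() == 0
}
```

// The check says nothing about whether the pointer came from the matching
// constructor; that part of the contract stays with the caller.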
414
415/// Error codes returned by FFI functions.
416#[repr(C)]
417pub enum NetError {
418 /// Success (no error).
419 Success = 0,
420 /// Null pointer passed.
421 NullPointer = -1,
422 /// Invalid UTF-8 string.
423 InvalidUtf8 = -2,
424 /// Invalid JSON.
425 InvalidJson = -3,
426 /// Initialization failed.
427 InitFailed = -4,
428 /// Ingestion failed (backpressure).
429 IngestionFailed = -5,
430 /// Poll failed.
431 PollFailed = -6,
432 /// Buffer too small.
433 BufferTooSmall = -7,
434 /// Shutting down.
435 ShuttingDown = -8,
436 /// Integer overflow: result does not fit in `c_int`.
437 IntOverflow = -9,
438 /// Stream handle does not belong to the supplied node handle.
439 /// Previously the send-family FFIs accepted any (stream, node)
440 /// pair without verifying they were created from the same node,
441 /// allowing silent cross-session traffic.
442 MismatchedHandles = -10,
443 /// `CString::new` failure: the input bytes are valid UTF-8 by
444 /// Rust's `String` invariant but contain an interior NUL byte
445 /// — and the C ABI cannot represent that, since C strings are
446 /// NUL-terminated. Pre-fix this was reported as
447 /// `InvalidUtf8`, which was wrong: the input is UTF-8-valid;
448 /// it just has a NUL where C expects it not to. A binding
449 /// reading the typed error and seeing "invalid UTF-8" would
450 /// chase the wrong cause.
451 InteriorNul = -11,
452 /// Unknown error.
453 Unknown = -99,
454}
455
456impl From<NetError> for c_int {
457 fn from(e: NetError) -> Self {
458 e as c_int
459 }
460}
461
462/// Enter an FFI operation with lifetime protection. Returns an `FfiOpGuard`
463/// that keeps `net_shutdown` from taking `bus` / `runtime` out of the handle
464/// until the guard is dropped. Returns `Err` with the error code if shutdown is in progress or has completed.
465#[inline]
466fn enter_ffi_op(handle: &NetHandle) -> Result<FfiOpGuard<'_>, c_int> {
467 FfiOpGuard::try_enter(handle).ok_or(NetError::ShuttingDown.into())
468}
469
470/// Initialize a new event bus.
471///
472/// # Parameters
473///
474/// - `config_json`: JSON configuration string (UTF-8, null-terminated).
475/// Pass NULL or empty string for default configuration.
476///
477/// # Returns
478///
479/// Opaque handle to the event bus, or NULL on failure.
480/// The handle must be freed with `net_shutdown`.
481///
482/// # Example Configuration
483///
484/// ```json
485/// {
486/// "num_shards": 8,
487/// "ring_buffer_capacity": 1048576,
488/// "backpressure_mode": "DropOldest",
489/// "batch": {
490/// "min_size": 1000,
491/// "max_size": 10000,
492/// "max_delay_ms": 10
493/// }
494/// }
495/// ```
496#[unsafe(no_mangle)]
497pub unsafe extern "C" fn net_init(config_json: *const c_char) -> *mut NetHandle {
498 // Parse and validate the config BEFORE constructing the tokio
499 // runtime. Building the runtime first would let any subsequent
500 // early-return path (`CStr::to_str` Err, `parse_config_json`
501 // returning None, `EventBus::new` returning Err) drop the
502 // local `Runtime` on function return. Dropping a multi-thread
503 // tokio runtime from inside ANOTHER tokio runtime's worker
504 // thread panics with "Cannot drop a runtime in a context where
505 // blocking is not allowed", unwinding across this `extern "C"`
506 // boundary into a Python / Go-cgo / NAPI / PyO3 caller —
507 // undefined behaviour. By validating inputs first, the runtime
508 // is only built once we know it will be installed into the
509 // `NetHandle` and survive the call.
510 let config = if config_json.is_null() {
511 EventBusConfig::default()
512 } else {
513 let parsed = match unsafe { CStr::from_ptr(config_json) }.to_str() {
514 Ok("") => EventBusConfig::default(),
515 Ok(s) => match parse_config_json(s) {
516 Some(cfg) => cfg,
517 None => return ptr::null_mut(),
518 },
519 Err(_) => return ptr::null_mut(),
520 };
521 parsed
522 };
523
524 // Now construct the runtime — its lifetime is tied to the
525 // returned `NetHandle` (via `create_with_config`), so the only
526 // remaining drop is on `net_shutdown`, which already handles
527 // it via `runtime.block_on(...)` (see #74) outside any other
528 // tokio context.
529 let runtime = match Runtime::new() {
530 Ok(rt) => rt,
531 Err(_) => return ptr::null_mut(),
532 };
533
534 create_with_config(runtime, config)
535}
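
// Illustrative sketch, not part of this module: how a NULL pointer, an empty
// string, and invalid UTF-8 can be told apart before any expensive resource
// is built, following the order `net_init` uses. `ConfigSource` and
// `classify` are hypothetical names.

```rust
use std::ffi::CStr;
use std::os::raw::c_char;

#[derive(Debug, PartialEq)]
enum ConfigSource<'a> {
    /// NULL or "" was passed: use the default configuration.
    Default,
    /// A non-empty UTF-8 string that still has to be parsed as JSON.
    Json(&'a str),
}

/// Returns `None` when the bytes are not valid UTF-8.
///
/// Safety: if non-null, `ptr` must point to a NUL-terminated string that
/// stays valid for `'a`.
unsafe fn classify<'a>(ptr: *const c_char) -> Option<ConfigSource<'a>> {
    if ptr.is_null() {
        return Some(ConfigSource::Default);
    }
    match unsafe { CStr::from_ptr(ptr) }.to_str() {
        Ok("") => Some(ConfigSource::Default),
        Ok(s) => Some(ConfigSource::Json(s)),
        Err(_) => None,
    }
}
```

// Doing this classification first means every early return happens before
// a runtime exists, so there is nothing to drop on the failure paths.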
536
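537/// Parse JSON configuration into `EventBusConfig`.
538///
539/// Supports:
540/// - `num_shards`, `ring_buffer_capacity`: shard count and ring buffer size per shard
541/// - `backpressure_mode`: "DropNewest", "DropOldest", "FailProducer" (or snake_case), or `{"Sample": {"rate": N}}`
542/// - `redis`, `jetstream`, `net`: adapter sections, each parsed only when its feature is enabled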
543fn parse_config_json(json_str: &str) -> Option<EventBusConfig> {
544 let value: serde_json::Value = serde_json::from_str(json_str).ok()?;
545
546 let mut builder = EventBusConfig::builder();
547
548 if let Some(num_shards) = value.get("num_shards").and_then(|v| v.as_u64()) {
549 let num_shards = u16::try_from(num_shards).ok()?;
550 builder = builder.num_shards(num_shards);
551 }
552
553 if let Some(capacity) = value.get("ring_buffer_capacity").and_then(|v| v.as_u64()) {
554 let capacity = usize::try_from(capacity).ok()?;
555 builder = builder.ring_buffer_capacity(capacity);
556 }
557
558 if let Some(bp_value) = value.get("backpressure_mode") {
559 let bp_mode = if let Some(mode) = bp_value.as_str() {
560 match mode {
561 "DropNewest" | "drop_newest" => crate::config::BackpressureMode::DropNewest,
562 "DropOldest" | "drop_oldest" => crate::config::BackpressureMode::DropOldest,
563 "FailProducer" | "fail_producer" => crate::config::BackpressureMode::FailProducer,
564 // Pre-fix every other string silently fell back to
565 // `DropNewest`. A typo (`"DropOldset"`) thus
566 // changed durability profile at deploy time with
567 // no error. Reject unknowns to match the contract
568 // already enforced by `parse_poll_request_json`.
569 _ => return None,
570 }
571 } else if let Some(obj) = bp_value.as_object() {
572 // Object form: `{"Sample": {"rate": N}}` for the
573 // sampling mode that has an associated value.
574 if let Some(sample) = obj.get("Sample").or_else(|| obj.get("sample")) {
575 let rate = sample.get("rate").and_then(|v| v.as_u64())?;
576 let rate = u32::try_from(rate).ok()?;
577 if rate == 0 {
578 // Validated again by `EventBusConfig::validate`,
579 // but reject earlier so the parser surface
580 // matches the validator surface.
581 return None;
582 }
583 crate::config::BackpressureMode::Sample { rate }
584 } else {
585 return None;
586 }
587 } else {
588 return None;
589 };
590 builder = builder.backpressure_mode(bp_mode);
591 }
592
593 // Parse Redis config
594 #[cfg(feature = "redis")]
595 if let Some(redis) = value.get("redis") {
596 if let Some(url) = redis.get("url").and_then(|v| v.as_str()) {
597 let mut redis_config = RedisAdapterConfig::new(url);
598
599 if let Some(prefix) = redis.get("prefix").and_then(|v| v.as_str()) {
600 redis_config = redis_config.with_prefix(prefix);
601 }
602 if let Some(max_len) = redis.get("max_stream_len").and_then(|v| v.as_u64()) {
603 let max_len = usize::try_from(max_len).ok()?;
604 redis_config = redis_config.with_max_stream_len(max_len);
605 }
606 if let Some(pipeline_size) = redis.get("pipeline_size").and_then(|v| v.as_u64()) {
607 let pipeline_size = usize::try_from(pipeline_size).ok()?;
608 redis_config = redis_config.with_pipeline_size(pipeline_size);
609 }
610
611 builder = builder.adapter(AdapterConfig::Redis(redis_config));
612 }
613 }
614
615 // Parse JetStream config
616 #[cfg(feature = "jetstream")]
617 if let Some(jetstream) = value.get("jetstream") {
618 if let Some(url) = jetstream.get("url").and_then(|v| v.as_str()) {
619 let mut js_config = JetStreamAdapterConfig::new(url);
620
621 if let Some(prefix) = jetstream.get("prefix").and_then(|v| v.as_str()) {
622 js_config = js_config.with_prefix(prefix);
623 }
624 if let Some(max_messages) = jetstream.get("max_messages").and_then(|v| v.as_i64()) {
625 js_config = js_config.with_max_messages(max_messages);
626 }
627 if let Some(replicas) = jetstream.get("replicas").and_then(|v| v.as_u64()) {
628 let replicas = usize::try_from(replicas).ok()?;
629 js_config = js_config.with_replicas(replicas);
630 }
631
632 builder = builder.adapter(AdapterConfig::JetStream(js_config));
633 }
634 }
635
636 // Parse Net config
637 #[cfg(feature = "net")]
638 if let Some(net) = value.get("net") {
639 let bind_addr: std::net::SocketAddr = net
640 .get("bind_addr")
641 .and_then(|v| v.as_str())
642 .and_then(|s| s.parse().ok())?;
643
644 let peer_addr: std::net::SocketAddr = net
645 .get("peer_addr")
646 .and_then(|v| v.as_str())
647 .and_then(|s| s.parse().ok())?;
648
649 let psk: [u8; 32] = net
650 .get("psk")
651 .and_then(|v| v.as_str())
652 .and_then(|s| hex::decode(s).ok())
653 .and_then(|v| v.try_into().ok())?;
654
655 let role = net
656 .get("role")
657 .and_then(|v| v.as_str())
658 .unwrap_or("initiator");
659
660 let mut net_config = match role {
661 "initiator" => {
662 let peer_pubkey: [u8; 32] = net
663 .get("peer_public_key")
664 .and_then(|v| v.as_str())
665 .and_then(|s| hex::decode(s).ok())
666 .and_then(|v| v.try_into().ok())?;
667 NetAdapterConfig::initiator(bind_addr, peer_addr, psk, peer_pubkey)
668 }
669 "responder" => {
670 let secret_key: [u8; 32] = net
671 .get("secret_key")
672 .and_then(|v| v.as_str())
673 .and_then(|s| hex::decode(s).ok())
674 .and_then(|v| v.try_into().ok())?;
675 let public_key: [u8; 32] = net
676 .get("public_key")
677 .and_then(|v| v.as_str())
678 .and_then(|s| hex::decode(s).ok())
679 .and_then(|v| v.try_into().ok())?;
680 let keypair = StaticKeypair::from_keys(secret_key, public_key);
681 NetAdapterConfig::responder(bind_addr, peer_addr, psk, keypair)
682 }
683 _ => return None,
684 };
685
686 // Apply optional settings
687 if let Some(reliability) = net.get("reliability").and_then(|v| v.as_str()) {
688 net_config = net_config.with_reliability(match reliability {
689 "light" => ReliabilityConfig::Light,
690 "full" => ReliabilityConfig::Full,
691 _ => ReliabilityConfig::None,
692 });
693 }
694
695 if let Some(pool_size) = net.get("packet_pool_size").and_then(|v| v.as_u64()) {
696 if let Ok(size) = usize::try_from(pool_size) {
697 net_config = net_config.with_pool_size(size);
698 }
699 }
700
701 // Reject `0` for `heartbeat_interval_ms` and
702 // `session_timeout_ms`. `EventBusConfig::validate` rejects
703 // zero `Duration`s for `cooldown`, `metrics_window`, etc.,
704 // but the Net adapter's JSON parser had no equivalent guard
705 // — a `0` here flowed through to `Duration::from_millis(0)`,
706 // which on the heartbeat path busy-loops the heartbeat task
707 // and saturates a CPU. Treat zero as a misconfig and refuse
708 // to build the bus: `parse_config_json` returns `None`, so
709 // `net_init` returns NULL rather than starting a hung daemon.
710 if let Some(interval_ms) = net.get("heartbeat_interval_ms").and_then(|v| v.as_u64()) {
711 if interval_ms == 0 {
712 return None;
713 }
714 net_config =
715 net_config.with_heartbeat_interval(std::time::Duration::from_millis(interval_ms));
716 }
717
718 if let Some(timeout_ms) = net.get("session_timeout_ms").and_then(|v| v.as_u64()) {
719 if timeout_ms == 0 {
720 return None;
721 }
722 net_config =
723 net_config.with_session_timeout(std::time::Duration::from_millis(timeout_ms));
724 }
725
726 if let Some(batched) = net.get("batched_io").and_then(|v| v.as_bool()) {
727 net_config = net_config.with_batched_io(batched);
728 }
729
730 builder = builder.adapter(AdapterConfig::Net(Box::new(net_config)));
731 }
732
733 builder.build().ok()
734}
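
// Illustrative sketch, not part of this module: the two strictness rules
// `parse_config_json` applies, shown without `serde_json`. Unknown mode
// strings are rejected rather than defaulted, and integers are narrowed with
// a checked conversion. `Mode`, `parse_mode`, and `parse_shards` are
// hypothetical names.

```rust
use std::convert::TryFrom;

#[derive(Debug, PartialEq)]
enum Mode {
    DropNewest,
    DropOldest,
    FailProducer,
}

/// Accept both spellings and reject everything else, so a typo cannot
/// silently select a different mode.
fn parse_mode(s: &str) -> Option<Mode> {
    match s {
        "DropNewest" | "drop_newest" => Some(Mode::DropNewest),
        "DropOldest" | "drop_oldest" => Some(Mode::DropOldest),
        "FailProducer" | "fail_producer" => Some(Mode::FailProducer),
        _ => None,
    }
}

/// Checked narrowing: a value that does not fit in `u16` is an error,
/// not a truncation.
fn parse_shards(raw: u64) -> Option<u16> {
    u16::try_from(raw).ok()
}
```

// With `as u16` instead of `try_from`, 70_000 would wrap to 4_464 and the
// bus would start with a shard count nobody asked for.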
735
736fn create_with_config(runtime: Runtime, config: EventBusConfig) -> *mut NetHandle {
737 let bus = match runtime.block_on(EventBus::new(config)) {
738 Ok(bus) => bus,
739 Err(_) => {
740 // Send the runtime off to a fresh OS thread for
741 // dropping. Dropping a multi-thread tokio `Runtime`
742 // from inside another tokio runtime's worker thread
743 // panics ("Cannot drop a runtime in a context where
744 // blocking is not allowed"); a panic here would unwind
745 // across this `extern "C"` frame. The fresh thread
746 // guarantees a non-tokio context, so the drop is sound
747 // regardless of the caller's runtime environment. We
748 // don't `join()` the thread — the drop completes on
749 // its own and the caller has already been told
750 // `net_init` failed (returning null).
751 std::thread::spawn(move || drop(runtime));
752 return ptr::null_mut();
753 }
754 };
755
756 let handle = Box::new(NetHandle {
757 bus: std::mem::ManuallyDrop::new(bus),
758 runtime: std::mem::ManuallyDrop::new(runtime),
759 shutting_down: std::sync::atomic::AtomicBool::new(false),
760 active_ops: std::sync::atomic::AtomicU32::new(0),
761 bus_taken: std::sync::atomic::AtomicBool::new(false),
762 shutdown_completed: std::sync::atomic::AtomicBool::new(false),
763 });
764
765 Box::into_raw(handle)
766}
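
// Illustrative sketch, not part of this module: handing a value to a fresh
// OS thread so its destructor runs there, as the error path in
// `create_with_config` does with the runtime. `Resource` and
// `drop_elsewhere` are hypothetical names; the channel exists only so the
// sketch can observe where the drop happened.

```rust
use std::sync::mpsc::Sender;
use std::thread::{self, ThreadId};

/// Stand-in for a resource whose destructor must not run on the caller's
/// thread. It reports the thread it was dropped on.
struct Resource {
    dropped_on: Sender<ThreadId>,
}

impl Drop for Resource {
    fn drop(&mut self) {
        // Ignore send errors: the receiver may already be gone.
        let _ = self.dropped_on.send(thread::current().id());
    }
}

/// Move the resource to a new OS thread and drop it there, without joining.
fn drop_elsewhere(resource: Resource) {
    thread::spawn(move || drop(resource));
}
```

// Not joining keeps the failing call from blocking on the destructor; the
// trade-off is that the caller cannot tell when cleanup has finished.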
767
/// Ingest a single event.
///
/// # Parameters
///
/// - `handle`: Event bus handle from `net_init`.
/// - `event_json`: JSON event string (UTF-8).
/// - `len`: Length of the event string in bytes.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest(
    handle: *mut NetHandle,
    event_json: *const c_char,
    len: usize,
) -> c_int {
    if !handle_is_valid(handle) || event_json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // `slice::from_raw_parts` requires `len <= isize::MAX`. A
    // C caller passing a sign-extended `-1` (or any
    // `len > isize::MAX as usize`) would otherwise trigger
    // immediate UB when the slice is built. Reject such inputs
    // explicitly: a well-behaved caller never hits this path,
    // and surfacing a typed error is safer than UB.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    // Parse event JSON
    let json_bytes = unsafe { std::slice::from_raw_parts(event_json as *const u8, len) };
    let json_str = match std::str::from_utf8(json_bytes) {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    let event = match Event::from_str(json_str) {
        Ok(e) => e,
        Err(_) => return NetError::InvalidJson.into(),
    };

    // Ingest
    match handle.bus.ingest(event) {
        Ok(_) => NetError::Success.into(),
        Err(_) => NetError::IngestionFailed.into(),
    }
}

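/*
Illustrative sketch, not part of this module's API: the pointer and length
checks that `net_ingest` performs before parsing can be shown in isolation.
`SliceError` and `str_from_raw` are hypothetical names chosen for this
example; the real entry points return `NetError` codes instead.

```rust
use std::os::raw::c_char;

/// Hypothetical error type for this sketch (the module itself uses `NetError`).
#[derive(Debug, PartialEq)]
pub enum SliceError {
    Null,
    TooLong,
    InvalidUtf8,
}

/// Borrow `len` bytes at `ptr` as `&str`, checking everything that can be
/// checked before the unsafe slice construction.
///
/// # Safety
/// `ptr` must be valid for reads of `len` bytes for the lifetime `'a`.
pub unsafe fn str_from_raw<'a>(ptr: *const c_char, len: usize) -> Result<&'a str, SliceError> {
    if ptr.is_null() {
        return Err(SliceError::Null);
    }
    // `slice::from_raw_parts` requires `len <= isize::MAX`; a C caller
    // passing `(size_t)-1` has to be rejected before the slice is built.
    if len > isize::MAX as usize {
        return Err(SliceError::TooLong);
    }
    let bytes = unsafe { std::slice::from_raw_parts(ptr as *const u8, len) };
    std::str::from_utf8(bytes).map_err(|_| SliceError::InvalidUtf8)
}
```
*/
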
/// Ingest a raw JSON string (fastest path).
///
/// The JSON string is stored directly without parsing.
/// This is the recommended method for high-throughput ingestion.
///
/// # Parameters
///
/// - `handle`: Event bus handle from `net_init`.
/// - `json`: JSON string (UTF-8).
/// - `len`: Length of the JSON string in bytes.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw(
    handle: *mut NetHandle,
    json: *const c_char,
    len: usize,
) -> c_int {
    if !handle_is_valid(handle) || json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // `slice::from_raw_parts` requires `len <= isize::MAX`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
    let json_str = match std::str::from_utf8(json_bytes) {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    let raw = RawEvent::from_str(json_str);

    match handle.bus.ingest_raw(raw) {
        Ok(_) => NetError::Success.into(),
        Err(_) => NetError::IngestionFailed.into(),
    }
}

/// Ingest multiple raw JSON strings (fastest batch path).
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `jsons`: Array of pointers to JSON strings.
/// - `lens`: Array of lengths for each JSON string.
/// - `count`: Number of events in the arrays.
///
/// # Returns
///
/// Number of successfully ingested events, or negative error code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_raw_batch(
    handle: *mut NetHandle,
    jsons: *const *const c_char,
    lens: *const usize,
    count: usize,
) -> c_int {
    if !handle_is_valid(handle) || jsons.is_null() || lens.is_null() {
        return NetError::NullPointer.into();
    }
    if count == 0 {
        return 0;
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };
    let mut events = Vec::with_capacity(count);
    // Track per-entry drops so the caller's accounting can
    // reconcile the returned count against the input count.
    // Before this fix, per-entry rejects (null pointer, oversized
    // length, invalid UTF-8) were silently `continue`-d and the
    // caller saw `count - drops` accepted events with no signal as
    // to which input indices were dropped. A binding that
    // attributed the drop to back-pressure and retried got the
    // wrong indices and double-published the good ones.
    //
    // The C-API contract is "returns count of accepted events";
    // expanding it to take an out-param of dropped indices is
    // an API addition, not a fix-in-place. Emit `tracing::warn!`
    // with the offending index AND reason so operators
    // observing the bus can correlate drop counts to specific
    // inputs without changing the C surface. This costs one log
    // line per dropped entry, which high-volume bindings should
    // budget for; if that ever matters in practice, the `*_ex`
    // follow-up can return the indices structurally.
    let mut dropped_null = 0usize;
    let mut dropped_oversize = 0usize;
    let mut dropped_invalid_utf8 = 0usize;

    for i in 0..count {
        let json_ptr = unsafe { *jsons.add(i) };
        let len = unsafe { *lens.add(i) };

        if json_ptr.is_null() {
            tracing::warn!(
                index = i,
                "net_ingest_raw_batch: dropping entry with null pointer"
            );
            dropped_null += 1;
            continue;
        }

        // `slice::from_raw_parts` requires `len <= isize::MAX`.
        // Skip pathological per-entry lengths rather than UB.
        if len > isize::MAX as usize {
            tracing::warn!(
                index = i,
                len,
                "net_ingest_raw_batch: dropping entry with len > isize::MAX"
            );
            dropped_oversize += 1;
            continue;
        }
        let json_bytes = unsafe { std::slice::from_raw_parts(json_ptr as *const u8, len) };
        match std::str::from_utf8(json_bytes) {
            Ok(json_str) => events.push(RawEvent::from_str(json_str)),
            Err(_) => {
                tracing::warn!(
                    index = i,
                    "net_ingest_raw_batch: dropping entry with invalid UTF-8"
                );
                dropped_invalid_utf8 += 1;
            }
        }
    }
    let total_dropped = dropped_null + dropped_oversize + dropped_invalid_utf8;
    if total_dropped > 0 {
        // Aggregate summary for log-pipeline filters that fold
        // per-index lines.
        tracing::warn!(
            input_count = count,
            dropped_null,
            dropped_oversize,
            dropped_invalid_utf8,
            "net_ingest_raw_batch: {} of {} entries dropped before ingest",
            total_dropped,
            count,
        );
    }

    let count = handle.bus.ingest_raw_batch(events);
    // Returning `c_int::MAX` on overflow would be ambiguous with a real
    // `INT_MAX` ingest. Signal overflow explicitly so callers doing
    // accounting in high-throughput paths do not silently miscount.
    c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
}

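/*
Illustrative sketch, not part of this module's API: the count-to-`c_int`
conversion used by the batch entry points, shown on its own. `INT_OVERFLOW`
and its value are assumptions made for this example; the module returns
`NetError::IntOverflow` converted to `c_int`, whose numeric value is not
shown in this file section.

```rust
use std::os::raw::c_int;

/// Hypothetical sentinel for this sketch only.
pub const INT_OVERFLOW: c_int = -100;

/// Convert an accepted-event count into the `c_int` return value.
pub fn count_to_c_int(count: usize) -> c_int {
    // Saturating to `c_int::MAX` would be indistinguishable from a real
    // `INT_MAX`-sized batch, so overflow maps to a distinct negative code.
    c_int::try_from(count).unwrap_or(INT_OVERFLOW)
}
```

A caller can then treat any negative return as an error and any
non-negative return as an exact count.
*/
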
/// Ingest multiple events.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `events_json`: JSON array of events (UTF-8, null-terminated).
///
/// # Returns
///
/// Number of successfully ingested events, or negative error code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_ingest_batch(
    handle: *mut NetHandle,
    events_json: *const c_char,
) -> c_int {
    if !handle_is_valid(handle) || events_json.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    let json_str = match unsafe { CStr::from_ptr(events_json) }.to_str() {
        Ok(s) => s,
        Err(_) => return NetError::InvalidUtf8.into(),
    };

    // Parse as JSON array
    let array: Vec<serde_json::Value> = match serde_json::from_str(json_str) {
        Ok(a) => a,
        Err(_) => return NetError::InvalidJson.into(),
    };

    let events: Vec<Event> = array.into_iter().map(Event::new).collect();
    let count = handle.bus.ingest_batch(events);

    // Returning `c_int::MAX` on overflow would be ambiguous with a real
    // `INT_MAX` ingest. Signal overflow explicitly, matching the
    // `net_ingest_raw_batch` contract.
    c_int::try_from(count).unwrap_or_else(|_| NetError::IntOverflow.into())
}

/// Parse the JSON request body passed to `net_poll` into a
/// `ConsumeRequest`. Returns the negative `NetError` code on parse
/// failure so the caller can surface it back across FFI. Both `limit`
/// and `cursor` are optional, but if either key is present with the
/// wrong JSON type it is an explicit error: silently falling back to
/// the default would hide caller bugs (e.g. the Go binding that
/// previously serialized `cursor` but had it dropped server-side).
fn parse_poll_request_json(json_str: &str) -> Result<ConsumeRequest, c_int> {
    let value: serde_json::Value =
        serde_json::from_str(json_str).map_err(|_| c_int::from(NetError::InvalidJson))?;

    let limit = match value.get("limit") {
        None | Some(serde_json::Value::Null) => 100usize,
        Some(v) => match v.as_u64() {
            // `as usize` would silently truncate on 32-bit targets for
            // values above `usize::MAX`. Reject such inputs explicitly
            // so a caller asking for e.g. 2^33 events on a wasm32
            // build gets `InvalidJson` instead of a tiny wrap-around.
            Some(n) => usize::try_from(n).map_err(|_| c_int::from(NetError::InvalidJson))?,
            None => return Err(NetError::InvalidJson.into()),
        },
    };
    let cursor = match value.get("cursor") {
        None | Some(serde_json::Value::Null) => None,
        Some(v) => match v.as_str() {
            Some(s) => Some(s.to_owned()),
            None => return Err(NetError::InvalidJson.into()),
        },
    };
    let mut req = ConsumeRequest::new(limit);
    req.from_id = cursor;
    Ok(req)
}

/// Build the JSON envelope `net_poll` writes into the caller's buffer.
///
/// **PERF_AUDIT §1.6.** Pre-fix every event was deserialized into a
/// full `serde_json::Value` tree (per-event hashmap + recursive
/// allocations + UTF-8 re-validation) and then re-serialized when the
/// envelope was emitted. Per event that's ~1-5 µs/KB plus an
/// unbounded chain of small allocations for the tree shape, a real
/// load on the FFI/SDK consume hot path.
///
/// The new path uses `from_slice::<&RawValue>`: validates the bytes
/// as well-formed JSON, hands back a borrowed view into the original
/// buffer, and lets the outer serialize splice those bytes verbatim
/// into the envelope. Zero per-event allocation on the parse-OK fast
/// path; the rare invalid-JSON fallback (valid UTF-8 that does not
/// parse) still emits the bytes as a JSON string so the caller can
/// see what failed to parse. Events whose bytes are not even valid
/// UTF-8 are skipped entirely (only `parse_errors` records them),
/// same as the pre-fix path.
fn build_poll_envelope_json(response: &ConsumeResponse) -> Result<String, serde_json::Error> {
    use serde_json::value::RawValue;
    #[derive(serde::Serialize)]
    #[serde(untagged)]
    enum EventOut<'a> {
        Raw(&'a RawValue),
        Fallback(String),
    }
    let mut events_out: Vec<EventOut<'_>> = Vec::with_capacity(response.events.len());
    let mut parse_errors: usize = 0;
    for e in &response.events {
        match serde_json::from_slice::<&RawValue>(&e.raw) {
            Ok(rv) => events_out.push(EventOut::Raw(rv)),
            Err(_) => {
                parse_errors += 1;
                // Include the raw bytes as a string (when they are valid
                // UTF-8) so the caller doesn't silently lose events
                if let Ok(raw) = e.raw_str() {
                    events_out.push(EventOut::Fallback(raw.to_string()));
                }
            }
        }
    }
    #[derive(serde::Serialize)]
    struct EnvelopeBorrowed<'a> {
        events: &'a [EventOut<'a>],
        next_id: Option<&'a str>,
        has_more: bool,
        count: usize,
        parse_errors: usize,
    }
    serde_json::to_string(&EnvelopeBorrowed {
        events: &events_out,
        next_id: response.next_id.as_deref(),
        has_more: response.has_more,
        count: events_out.len(),
        parse_errors,
    })
}

/// Poll events from the bus.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `request_json`: JSON request string (UTF-8, null-terminated).
///   Both keys are optional. Example: `{"limit": 100, "cursor": "<next_id>"}`.
///   May be NULL, in which case a default limit of 100 is used.
/// - `out_buffer`: Output buffer for JSON response.
/// - `buffer_len`: Size of the output buffer.
///
/// # Returns
///
/// - Number of bytes written to buffer on success
/// - Negative error code on failure
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_poll(
    handle: *mut NetHandle,
    request_json: *const c_char,
    out_buffer: *mut c_char,
    buffer_len: usize,
) -> c_int {
    if !handle_is_valid(handle) || out_buffer.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    // Parse request
    let request = if request_json.is_null() {
        ConsumeRequest::new(100)
    } else {
        let json_str = match unsafe { CStr::from_ptr(request_json) }.to_str() {
            Ok(s) => s,
            Err(_) => return NetError::InvalidUtf8.into(),
        };
        match parse_poll_request_json(json_str) {
            Ok(req) => req,
            Err(code) => return code,
        }
    };

    // Reject buffers too small to even hold an empty-response
    // JSON envelope. This catches the degenerate "tiny buffer"
    // case before we hit the adapter: `BufferTooSmall` returned
    // here means "no work was done, caller's cursor is unchanged."
    // 256 bytes comfortably fits the empty-response JSON below
    // even with a long echoed `next_id` cursor.
    const MIN_RESPONSE_BUFFER: usize = 256;
    if buffer_len < MIN_RESPONSE_BUFFER {
        return NetError::BufferTooSmall.into();
    }

    // Stash the cursor before moving `request` into `poll()` so
    // the post-poll fallback can echo it back to the caller. On
    // overflow we write a minimal "no events delivered, cursor
    // unchanged" response so the caller's next poll re-fetches
    // the same range; events are not lost on idempotent
    // adapters (Redis XRANGE, JetStream direct_get).
    let cursor_snapshot = request.from_id.clone();

    // Poll
    let response = match handle.runtime.block_on(handle.bus.poll(request)) {
        Ok(r) => r,
        Err(_) => return NetError::PollFailed.into(),
    };

    // Serialize response. Events that fail to parse as JSON are included
    // as raw strings (when valid UTF-8) so the caller can detect parse
    // failures.
    let total_events = response.events.len();
    let response_json = match build_poll_envelope_json(&response) {
        Ok(s) => s,
        Err(_) => return NetError::Unknown.into(),
    };

    // Buffer overflow: emit a minimal fallback response that echoes
    // the caller's original cursor as `next_id`. The caller's next
    // poll runs against the same range and re-delivers the events
    // (idempotent on Redis XRANGE / JetStream direct_get). Without
    // this, a caller that trusts `next_id` blindly would advance
    // past the unread batch.
    if response_json.len() + 1 > buffer_len {
        let fallback = serde_json::to_string(&serde_json::json!({
            "events": [],
            "next_id": cursor_snapshot,
            "has_more": true,
            "count": 0,
            "parse_errors": 0,
            "buffer_too_small": true,
            "events_dropped": total_events,
        }))
        .unwrap_or_else(|_| String::from(
            r#"{"events":[],"next_id":null,"has_more":true,"count":0,"parse_errors":0,"buffer_too_small":true}"#
        ));
        if fallback.len() < buffer_len {
            unsafe {
                ptr::copy_nonoverlapping(
                    fallback.as_ptr() as *const c_char,
                    out_buffer,
                    fallback.len(),
                );
                *out_buffer.add(fallback.len()) = 0;
            }
        }
        return NetError::BufferTooSmall.into();
    }

    // Copy to output buffer
    unsafe {
        ptr::copy_nonoverlapping(
            response_json.as_ptr() as *const c_char,
            out_buffer,
            response_json.len(),
        );
        *out_buffer.add(response_json.len()) = 0; // Null terminate
    }

    // Data was already copied into the caller's buffer; a
    // `c_int` overflow here means the byte count exceeds c_int's
    // range, NOT that the buffer was too small. Returning
    // `BufferTooSmall` would tell the caller to "resize and retry"
    // when retrying can't fix the actual condition. `IntOverflow`
    // is the documented variant for this case.
    match c_int::try_from(response_json.len()) {
        Ok(n) => n,
        Err(_) => NetError::IntOverflow.into(),
    }
}

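/*
Illustrative sketch, not part of this module's API: the "check for room,
copy, then NUL-terminate" step that `net_poll` and `net_stats` apply to the
caller's buffer. `write_c_string` is a hypothetical helper written for this
example; it returns `None` where the real functions return
`NetError::BufferTooSmall`.

```rust
use std::os::raw::c_char;

/// Copy `s` plus a trailing NUL into `out`. Returns the number of bytes
/// written excluding the NUL, or `None` if `out_len` cannot hold
/// `s.len() + 1` bytes.
///
/// # Safety
/// `out` must be valid for writes of `out_len` bytes.
pub unsafe fn write_c_string(s: &str, out: *mut c_char, out_len: usize) -> Option<usize> {
    // `s.len() >= out_len` is the overflow-free form of `s.len() + 1 > out_len`.
    if out.is_null() || s.len() >= out_len {
        return None;
    }
    unsafe {
        std::ptr::copy_nonoverlapping(s.as_ptr() as *const c_char, out, s.len());
        *out.add(s.len()) = 0; // terminator goes right after the payload
    }
    Some(s.len())
}
```
*/
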
/// Get event bus statistics.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
/// - `out_buffer`: Output buffer for JSON statistics.
/// - `buffer_len`: Size of the output buffer.
///
/// # Returns
///
/// Number of bytes written, or negative error code.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_stats(
    handle: *mut NetHandle,
    out_buffer: *mut c_char,
    buffer_len: usize,
) -> c_int {
    if !handle_is_valid(handle) || out_buffer.is_null() {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };
    let stats = handle.bus.stats();
    let shard_stats = handle.bus.shard_stats();

    let stats_json = match serde_json::to_string(&serde_json::json!({
        "events_ingested": stats.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
        "events_dropped": stats.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
        "batches_dispatched": stats.batches_dispatched.load(std::sync::atomic::Ordering::Relaxed),
        "shard_events_ingested": shard_stats.events_ingested,
        "shard_events_dropped": shard_stats.events_dropped,
        "shard_batches_dispatched": shard_stats.batches_dispatched,
    })) {
        Ok(s) => s,
        Err(_) => return NetError::Unknown.into(),
    };

    if stats_json.len() + 1 > buffer_len {
        return NetError::BufferTooSmall.into();
    }

    unsafe {
        ptr::copy_nonoverlapping(
            stats_json.as_ptr() as *const c_char,
            out_buffer,
            stats_json.len(),
        );
        *out_buffer.add(stats_json.len()) = 0;
    }

    // See net_poll above: the data was already copied, so an
    // overflowing length is `IntOverflow`, not `BufferTooSmall`.
    match c_int::try_from(stats_json.len()) {
        Ok(n) => n,
        Err(_) => NetError::IntOverflow.into(),
    }
}

/// Flush all pending batches to the adapter.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_flush(handle: *mut NetHandle) -> c_int {
    if !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }

    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(err) => return err,
    };

    match handle.runtime.block_on(handle.bus.flush()) {
        Ok(_) => NetError::Success.into(),
        Err(_) => NetError::Unknown.into(),
    }
}

/// Shut down the event bus and free resources.
///
/// # Parameters
///
/// - `handle`: Event bus handle. After this call, the handle is invalid.
///
/// # Returns
///
/// - `0` on success
/// - Negative error code on failure (including `Unknown` if the
///   bounded wait for in-flight FFI operations expired before the bus
///   could be shut down cleanly)
///
/// # Notes
///
/// The handle's storage is intentionally leaked: the box is never
/// returned to the allocator. See `NetHandle`'s docs for why. This is
/// a one-time cost per shutdown, typically per-process, since most C
/// callers initialize the bus once and shut down once.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_shutdown(handle: *mut NetHandle) -> c_int {
    if !handle_is_valid(handle) {
        return NetError::NullPointer.into();
    }

    // Scope the `&NetHandle` borrow into an inner block so it is
    // verifiably out of scope before the
    // `ManuallyDrop::take(&mut (*handle).bus)` calls below.
    // Holding an `&NetHandle` in scope for the whole function
    // while taking a raw `&mut (*handle).bus` later would rely on
    // NLL ending the immutable borrow before the mutable take,
    // a pattern fragile under stacked/tree borrow models. The
    // block-scoped borrow makes the lifetime constraint explicit
    // and obvious to both the compiler and any future maintainer.
    let drained_and_taken = {
        // SAFETY: The C contract guarantees `handle` is valid here and that
        // `net_shutdown` is not called concurrently with itself. Future
        // dereferences of the box from concurrent FFI ops on other threads
        // are also sound because we never free the box (see below).
        let handle_ref = unsafe { &*handle };

        // Signal shutdown so concurrent FFI calls bail before touching
        // `bus`/`runtime`. SeqCst pairs with `FfiOpGuard::try_enter`.
        handle_ref
            .shutting_down
            .store(true, std::sync::atomic::Ordering::SeqCst);

        // Bounded wait for in-flight ops to drain. Without a deadline, a
        // hung concurrent operation (e.g. `net_flush` against a stalled
        // adapter) would pin a CPU at 100% inside this loop forever.
        //
        // `std::hint::spin_loop()` is a CPU pause hint, not a yield. On
        // a single-threaded executor (or any configuration where the FFI
        // caller's thread is the same one that needs to make progress on
        // the in-flight async work) a tight spin starves the very tokio
        // worker we're waiting for, *causing* the deadline to expire when
        // it otherwise wouldn't. `thread::yield_now` lets the OS schedule
        // whatever's blocked, and a 1ms `thread::sleep` between yields
        // prevents the loop from saturating a CPU on platforms where
        // `yield_now` is a near-no-op under low contention. We expect the
        // drain to take milliseconds, so a millisecond-granularity poll
        // is fine.
        let deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
        let mut drained = false;
        loop {
            if handle_ref
                .active_ops
                .load(std::sync::atomic::Ordering::SeqCst)
                == 0
            {
                drained = true;
                break;
            }
            if std::time::Instant::now() >= deadline {
                break;
            }
            std::thread::yield_now();
            std::thread::sleep(std::time::Duration::from_millis(1));
        }

        if !drained {
            // In-flight ops may still be reading `bus`/`runtime`; reading
            // them out via `ManuallyDrop::take` would race those readers.
            // Leak both fields along with the box. Future ops still see
            // `shutting_down=true` and bail before touching either field,
            // so the leaked memory is never read again.
            return NetError::Unknown.into();
        }

        // Idempotent shutdown: if a previous `net_shutdown` already
        // moved out the bus/runtime, do not call `ManuallyDrop::take`
        // a second time (that would be UB). The first call may still
        // be inside `runtime.block_on(bus.shutdown())` though. Pre-fix,
        // the second caller observed `bus_taken == true` and
        // returned `Success` immediately, falsely signaling
        // completion of an in-progress shutdown. Poll
        // `shutdown_completed` (set by the first caller AFTER
        // `bus.shutdown()` returns) so subsequent callers wait for
        // the actual completion.
        if handle_ref
            .bus_taken
            .swap(true, std::sync::atomic::Ordering::SeqCst)
        {
            // Wait for the first caller to actually finish.
            // Bounded by the same FFI_SHUTDOWN_DEADLINE as the
            // `active_ops` drain: if the first caller is wedged
            // longer than that, we return `Unknown` rather
            // than block forever.
            let inner_deadline = std::time::Instant::now() + FFI_SHUTDOWN_DEADLINE;
            while !handle_ref
                .shutdown_completed
                .load(std::sync::atomic::Ordering::Acquire)
            {
                if std::time::Instant::now() >= inner_deadline {
                    return NetError::Unknown.into();
                }
                std::thread::yield_now();
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            return NetError::Success.into();
        }
        drained
    };
    let _ = drained_and_taken;

    // SAFETY: `active_ops` reached zero with `shutting_down=true`, so:
    //   - Every FFI op that started before shutdown has fully
    //     completed (decremented `active_ops` on guard drop).
    //   - Any future FFI op will observe `shutting_down=true` and
    //     bail in `try_enter` before touching `bus` / `runtime`.
    // Plus, `bus_taken` was just swapped from false to true, so no other
    // shutdown is concurrently moving the same fields out. The
    // immutable `handle_ref` borrow above has been dropped (block
    // scope ended), so the `&mut`-via-raw-pointer below is the
    // only live access, with no stacked/tree-borrow race.
    //
    // We deliberately do NOT call `Box::from_raw` here. The box's
    // `shutting_down` / `active_ops` / `bus_taken` atomics must remain
    // valid memory because future FFI ops still dereference the
    // C-side pointer to check them. Leaking the box is the
    // correctness fix for the previous use-after-free; the per-handle
    // storage cost is a one-time overhead.
    let bus = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).bus) };
    let runtime = unsafe { std::mem::ManuallyDrop::take(&mut (*handle).runtime) };

    // Flush pending batches and gracefully shut down the adapter
    // before dropping the runtime. Without this, pending events in
    // ring buffers and batch workers would be silently lost.
    let result = runtime.block_on(bus.shutdown());

    // `bus` and `runtime` go out of scope here and are dropped.
    // The leaked box keeps the atomics alive for any straggler ops.

    // Signal completion to any second/third caller polling
    // `shutdown_completed` in the idempotent path above. Done
    // AFTER `bus.shutdown()` returns and AFTER the bus / runtime
    // drop, so subsequent callers can rely on this flag as a
    // hard "shutdown is fully done" barrier.
    unsafe { &*handle }
        .shutdown_completed
        .store(true, std::sync::atomic::Ordering::Release);

    match result {
        Ok(()) => NetError::Success.into(),
        Err(_) => NetError::Unknown.into(),
    }
}

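/*
Illustrative sketch, not part of this module's API: the bounded
"yield, then sleep 1 ms" drain loop that `net_shutdown` runs against
`active_ops`, extracted into a standalone function. `wait_for_drain` is a
hypothetical name, and the timeout is passed in where the real code uses
`FFI_SHUTDOWN_DEADLINE`.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};

/// Wait until `active_ops` reaches zero or `timeout` elapses.
/// Returns `true` if the counter drained, `false` on timeout.
pub fn wait_for_drain(active_ops: &AtomicU32, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if active_ops.load(Ordering::SeqCst) == 0 {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Yield, then sleep briefly, so the waiter neither starves the
        // threads it is waiting on nor saturates a core.
        std::thread::yield_now();
        std::thread::sleep(Duration::from_millis(1));
    }
}
```

On `false` the caller has to assume in-flight operations may still be
touching shared state, which is why the shutdown path above leaks the
fields in that case.
*/
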
/// Get the number of shards.
///
/// # Parameters
///
/// - `handle`: Event bus handle.
///
/// # Returns
///
/// Number of shards, or 0 if the handle is null or the operation
/// could not be entered (for example, because the bus is shutting down).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_num_shards(handle: *mut NetHandle) -> u16 {
    if !handle_is_valid(handle) {
        return 0;
    }
    let handle = unsafe { &*handle };
    let _guard = match enter_ffi_op(handle) {
        Ok(g) => g,
        Err(_) => return 0,
    };
    handle.bus.num_shards()
}

/// Get the library version.
///
/// # Returns
///
/// Version string (static, do not free).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_version() -> *const c_char {
    static VERSION: &[u8] = b"0.8.0\0";
    VERSION.as_ptr() as *const c_char
}

/// Generate a new Net keypair.
///
/// # Returns
///
/// JSON string with hex-encoded public_key and secret_key.
/// The caller must free the returned string with `net_free_string`.
/// Returns NULL if Net feature is not enabled.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    let keypair = StaticKeypair::generate();
    let json = serde_json::json!({
        "public_key": hex::encode(keypair.public_key()),
        "secret_key": hex::encode(keypair.secret_key()),
    });

    match CString::new(json.to_string()) {
        Ok(s) => s.into_raw(),
        Err(_) => ptr::null_mut(),
    }
}

/// Free a string returned by Net functions.
///
/// # Parameters
///
/// - `s`: String pointer returned by `net_generate_keypair` or similar.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if !s.is_null() {
        unsafe {
            drop(CString::from_raw(s));
        }
    }
}

// `net.h` declares both `net_generate_keypair` and
// `net_free_string` unconditionally; a consumer linking against
// a cdylib built without the `net` feature would otherwise hit
// a load-time missing-symbol error despite the header advertising
// the symbol. Provide fallback definitions so both symbols are
// resolvable on every build configuration. Mirrors the
// `nat-traversal` cfg pattern in `mesh.rs`.

/// Stub for builds without the `net` feature.
///
/// `net.h` declares `net_generate_keypair` unconditionally, so
/// the symbol must be resolvable on every build configuration.
/// Returns NULL since keypair generation requires the net feature.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_generate_keypair() -> *mut c_char {
    ptr::null_mut()
}

/// Fallback for builds without the `net` feature.
///
/// Mirrors the always-on signature in `net.h`. Reclaims a
/// CString-allocated pointer if non-null.
#[cfg(not(feature = "net"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_free_string(s: *mut c_char) {
    if !s.is_null() {
        unsafe {
            drop(std::ffi::CString::from_raw(s));
        }
    }
}

// =========================================================================
// Structured (non-JSON) API: _ex variants
// =========================================================================

/// Ingestion receipt for C consumers.
#[repr(C)]
pub struct NetReceipt {
    /// Shard the event was assigned to.
    pub shard_id: u16,
    /// Insertion timestamp (nanoseconds).
    pub timestamp: u64,
}

// Pin layout invariants for `NetReceipt`. `#[repr(C)]` already
// gives C ABI compatibility per platform, but doesn't catch a
// future field-reorder or field-add; both would silently break
// any C/Go/Python binding that hard-codes the struct layout.
// Static asserts on 64-bit targets (the production deployment
// shape) trip CI before such a change reaches a binary release.
//
// 64-bit: `u16 (2) + 6 pad + u64 (8)` = 16 bytes, alignment 8.
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::size_of::<NetReceipt>() == 16,
    "NetReceipt size changed on 64-bit; bindings hard-code 16. \
     If the change is intentional, bump the binding versions and \
     update this assertion."
);
#[cfg(target_pointer_width = "64")]
const _: () = assert!(
    std::mem::align_of::<NetReceipt>() == 8,
    "NetReceipt alignment changed on 64-bit; bindings expect 8."
);

1647/// A single stored event for C consumers.
1648///
1649/// # Safety contract for callers
1650///
1651/// `id`/`id_len` and `raw`/`raw_len` are produced by Rust as a
1652/// `Box<[u8]>` whose fat-pointer length is reconstructed at free
1653/// time from `id_len` / `raw_len`. The fields are `pub` because
1654/// `#[repr(C)]` exposes them to C, **but they must be treated as
1655/// read-only** between the `net_poll_*` call that produced them
1656/// and the `net_free_poll_result` that consumes them.
1657///
1658/// Mutating `id_len` or `raw_len` (or copying the struct, replacing
1659/// the pointer, and then freeing) causes
1660/// `Box::from_raw(slice_from_raw_parts_mut(ptr, wrong_len))` to be
1661/// undefined behavior on free — the allocator records the
1662/// allocation size and any mismatch is UB.
1663#[repr(C)]
1664pub struct NetEvent {
1665 /// Event ID (not null-terminated, use `id_len`).
1666 /// Read-only after `net_poll_*`; do not mutate.
1667 pub id: *const c_char,
1668 /// Length of the event ID. Read-only after `net_poll_*`; do not
1669 /// mutate (mutation causes UB on free).
1670 pub id_len: usize,
1671 /// Raw JSON payload (not null-terminated, use `raw_len`).
1672 /// Read-only after `net_poll_*`; do not mutate.
1673 pub raw: *const c_char,
1674 /// Length of the raw JSON payload. Read-only after
1675 /// `net_poll_*`; do not mutate (mutation causes UB on free).
1676 pub raw_len: usize,
1677 /// Insertion timestamp (nanoseconds).
1678 pub insertion_ts: u64,
1679 /// Shard ID.
1680 pub shard_id: u16,
1681}
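
/*
Sketch of the ownership round trip that the `NetEvent` safety contract
describes: a `Box<[u8]>` is split into a thin pointer plus a length, and the
box can only be rebuilt from that exact pair. `export_bytes` and
`reclaim_bytes` are hypothetical helper names used for illustration only.

```rust
// Hypothetical helper: hand a byte buffer to foreign code as (pointer, length).
pub fn export_bytes(bytes: &[u8]) -> (*mut u8, usize) {
    let boxed: Box<[u8]> = bytes.to_vec().into_boxed_slice();
    let len = boxed.len();
    // The fat `*mut [u8]` is narrowed to a thin pointer; the length travels separately.
    (Box::into_raw(boxed) as *mut u8, len)
}

// Hypothetical helper: rebuild the box from the pair.
//
// Safety: `data` and `len` must be exactly the pair returned by
// `export_bytes`, and the pair must be reclaimed at most once.
pub unsafe fn reclaim_bytes(data: *mut u8, len: usize) -> Box<[u8]> {
    unsafe { Box::from_raw(std::ptr::slice_from_raw_parts_mut(data, len)) }
}
```

Passing any other length to `reclaim_bytes` would hand the allocator a layout
that does not match the allocation, which is why the length fields must stay
untouched between poll and free.
*/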
1682
1683// Pin layout invariants for `NetEvent`. See `NetReceipt`'s
1684// asserts for rationale. Bindings (C, Go, Python, Node) hard-
1685// code 48 bytes on 64-bit; an accidental reorder or new field
1686// would silently shift every offset.
1687//
1688// 64-bit: `4 × 8 (ptrs/usize) + u64 (8) + u16 (2) + 6 trail` = 48.
1689#[cfg(target_pointer_width = "64")]
1690const _: () = assert!(
1691 std::mem::size_of::<NetEvent>() == 48,
1692 "NetEvent size changed on 64-bit; bindings hard-code 48. \
1693 If the change is intentional, bump the binding versions and \
1694 update this assertion."
1695);
1696#[cfg(target_pointer_width = "64")]
1697const _: () = assert!(
1698 std::mem::align_of::<NetEvent>() == 8,
1699 "NetEvent alignment changed on 64-bit; bindings expect 8."
1700);
1701
1702/// Poll result for C consumers.
1703#[repr(C)]
1704pub struct NetPollResult {
1705 /// Array of events. Free with `net_free_poll_result`.
1706 pub events: *mut NetEvent,
1707 /// Number of events in the array.
1708 pub count: usize,
1709 /// Cursor for the next poll (null-terminated). NULL if no more.
1710 pub next_id: *mut c_char,
1711 /// 1 if more events are available, 0 otherwise.
1712 pub has_more: c_int,
1713}
1714
1715/// Stats for C consumers.
1716#[repr(C)]
1717pub struct NetStats {
1718 /// Total events ingested.
1719 pub events_ingested: u64,
1720 /// Events dropped due to backpressure.
1721 pub events_dropped: u64,
1722 /// Batches dispatched to the adapter.
1723 pub batches_dispatched: u64,
1724}
1725
1726/// Ingest raw JSON with structured receipt.
1727#[unsafe(no_mangle)]
1728pub unsafe extern "C" fn net_ingest_raw_ex(
1729 handle: *mut NetHandle,
1730 json: *const c_char,
1731 len: usize,
1732 out: *mut NetReceipt,
1733) -> c_int {
1734 if !handle_is_valid(handle) || json.is_null() {
1735 return NetError::NullPointer.into();
1736 }
1737
1738 let handle = unsafe { &*handle };
1739 let _guard = match enter_ffi_op(handle) {
1740 Ok(g) => g,
1741 Err(err) => return err,
1742 };
1743
1744 // `slice::from_raw_parts` requires `len <= isize::MAX`.
1745 if len > isize::MAX as usize {
1746 return NetError::InvalidJson.into();
1747 }
1748 let json_bytes = unsafe { std::slice::from_raw_parts(json as *const u8, len) };
1749 let json_str = match std::str::from_utf8(json_bytes) {
1750 Ok(s) => s,
1751 Err(_) => return NetError::InvalidUtf8.into(),
1752 };
1753
1754 let raw = RawEvent::from_str(json_str);
1755
1756 match handle.bus.ingest_raw(raw) {
1757 Ok((shard_id, timestamp)) => {
1758 if !out.is_null() {
1759 unsafe {
1760 (*out).shard_id = shard_id;
1761 (*out).timestamp = timestamp;
1762 }
1763 }
1764 NetError::Success.into()
1765 }
1766 Err(_) => NetError::IngestionFailed.into(),
1767 }
1768}
1769
1770/// Poll events with structured result (no JSON overhead).
1771///
1772/// The caller must free the result with `net_free_poll_result`.
1773#[unsafe(no_mangle)]
1774pub unsafe extern "C" fn net_poll_ex(
1775 handle: *mut NetHandle,
1776 limit: usize,
1777 cursor: *const c_char,
1778 out: *mut NetPollResult,
1779) -> c_int {
1780 if !handle_is_valid(handle) || out.is_null() {
1781 return NetError::NullPointer.into();
1782 }
1783
    // Pre-validate `limit` BEFORE calling `bus.poll`. The bus
    // advances the consumer cursor before returning, so any
    // post-poll allocation failure (e.g. `Layout::array::<NetEvent>`
    // overflow on a pathological `count`, or `std::alloc::alloc`
    // returning null under memory pressure) would drop the response
    // and lose every event the cursor just stepped past. Reject
    // requests whose `limit * size_of::<NetEvent>()` would exceed
    // `isize::MAX` (the `Layout::array` cap) up front, so that
    // failure happens before the cursor moves.
1793 if limit > 0
1794 && (std::mem::size_of::<NetEvent>())
1795 .checked_mul(limit)
1796 .is_none_or(|v| v > isize::MAX as usize)
1797 {
1798 return NetError::IntOverflow.into();
1799 }
1800
1801 let handle = unsafe { &*handle };
1802 let _guard = match enter_ffi_op(handle) {
1803 Ok(g) => g,
1804 Err(err) => return err,
1805 };
1806
1807 let mut request = ConsumeRequest::new(limit);
1808 if !cursor.is_null() {
1809 if let Ok(s) = unsafe { CStr::from_ptr(cursor) }.to_str() {
1810 if !s.is_empty() {
1811 request = request.from(s);
1812 }
1813 }
1814 }
1815
1816 let response = match handle.runtime.block_on(handle.bus.poll(request)) {
1817 Ok(r) => r,
1818 Err(_) => return NetError::PollFailed.into(),
1819 };
1820
1821 let count = response.events.len();
1822
1823 // Allocate events array.
1824 //
    // Each iteration allocates two boxed byte slices via
    // `to_vec().into_boxed_slice()`, which may panic (for example
    // on capacity overflow). A panic must not unwind out of this
    // `extern "C"` body: the cgo/N-API/cffi caller frames do not
    // expect a Rust unwind. Wrap the per-event build in
    // `catch_unwind`, track how many events have been fully
    // written, and on panic free the partial array via
    // `free_events_array_partial`, so the unwind is contained and
    // the partial allocations do not leak.
1834 let events_ptr = if count > 0 {
1835 let layout = match std::alloc::Layout::array::<NetEvent>(count) {
1836 Ok(l) => l,
1837 Err(_) => return NetError::Unknown.into(),
1838 };
1839 let ptr = unsafe { std::alloc::alloc(layout) as *mut NetEvent };
1840 if ptr.is_null() {
1841 return NetError::Unknown.into();
1842 }
1843
1844 // Shared counter so the outer scope can clean up partial
1845 // writes if any iteration panics.
1846 let completed = std::cell::Cell::new(0usize);
1847 let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1848 for (i, event) in response.events.iter().enumerate() {
1849 let id_bytes = event.id.as_bytes().to_vec().into_boxed_slice();
1850 let id_len = id_bytes.len();
1851 let id_ptr = Box::into_raw(id_bytes) as *const c_char;
1852
1853 let raw_bytes = event.raw.to_vec().into_boxed_slice();
1854 let raw_len = raw_bytes.len();
1855 let raw_ptr = Box::into_raw(raw_bytes) as *const c_char;
1856
1857 unsafe {
1858 ptr.add(i).write(NetEvent {
1859 id: id_ptr,
1860 id_len,
1861 raw: raw_ptr,
1862 raw_len,
1863 insertion_ts: event.insertion_ts,
1864 shard_id: event.shard_id,
1865 });
1866 }
1867 completed.set(i + 1);
1868 }
1869 }));
1870 if build_result.is_err() {
1871 // A panic landed mid-loop. Free fully-written events
1872 // (those past `completed.get()` were never written, so
1873 // the inner `id`/`raw` pointers aren't valid). The
1874 // events array was allocated for `count` NetEvent
1875 // slots, so the dealloc must use that same layout.
1876 free_events_array_partial(ptr, completed.get(), count);
1877 return NetError::Unknown.into();
1878 }
1879 ptr
1880 } else {
1881 ptr::null_mut()
1882 };
1883
    // Hand `next_id` (if present) to the caller as a raw C string;
    // `net_free_poll_result` reclaims it.
1885 let next_id_ptr = match response.next_id {
1886 Some(ref s) => match std::ffi::CString::new(s.as_str()) {
1887 Ok(c) => c.into_raw(),
1888 Err(_) => {
1889 // Free already-allocated events before returning
1890 // error. `s.as_str()` is valid UTF-8 by `String`
1891 // invariant, so this is the interior-NUL path —
1892 // an upstream cursor id that contains `\0` cannot
1893 // round-trip through a C string. Pre-fix this
1894 // returned `InvalidUtf8`, which mis-described
1895 // the cause; bindings now see the more accurate
1896 // `InteriorNul`.
1897 free_events_array(events_ptr, count);
1898 return NetError::InteriorNul.into();
1899 }
1900 },
1901 None => ptr::null_mut(),
1902 };
1903
1904 unsafe {
1905 (*out).events = events_ptr;
1906 (*out).count = count;
1907 (*out).next_id = next_id_ptr;
1908 (*out).has_more = if response.has_more { 1 } else { 0 };
1909 }
1910
1911 NetError::Success.into()
1912}
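
/*
Sketch of the up-front `limit` check used in `net_poll_ex`, pulled out as a
standalone function. `array_fits` is a hypothetical name; the `isize::MAX`
bound follows the comment in `net_poll_ex` about the `Layout::array` cap.

```rust
// Hypothetical helper: true when `count` elements of `elem_size` bytes
// multiply without overflow and stay within `isize::MAX` bytes.
pub fn array_fits(elem_size: usize, count: usize) -> bool {
    elem_size
        .checked_mul(count)
        .is_some_and(|bytes| bytes <= isize::MAX as usize)
}
```

Running a check like this before any state changes means a rejected request
leaves the consumer cursor where it was.
*/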
1913
1914/// Free an events array and all its id/raw allocations.
1915///
1916/// `count` is the number of fully-written events (those whose
1917/// inner `id` / `raw` boxed slices were initialized). It must
1918/// also match the `Layout::array::<NetEvent>` used at allocation
1919/// time — every existing caller writes exactly `count` events
1920/// before invoking this function. For partial-cleanup paths
1921/// (e.g. panic mid-build), use [`free_events_array_partial`].
1922fn free_events_array(events: *mut NetEvent, count: usize) {
1923 free_events_array_partial(events, count, count);
1924}
1925
1926/// Free an events array where only `walk_count` entries have
1927/// fully-initialized `id`/`raw` allocations, but the array
1928/// itself was allocated for `alloc_count` slots. Per-event
1929/// boxes are freed for `0..walk_count`; the array is then
1930/// deallocated with the original `Layout::array::<NetEvent>(alloc_count)`
1931/// to match the allocation. Used by `net_poll_ex`'s panic-mid-loop
1932/// recovery path.
1933fn free_events_array_partial(events: *mut NetEvent, walk_count: usize, alloc_count: usize) {
1934 if events.is_null() || alloc_count == 0 {
1935 return;
1936 }
1937 for i in 0..walk_count {
1938 let event = unsafe { &*events.add(i) };
1939 if !event.id.is_null() {
1940 unsafe {
1941 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
1942 event.id as *mut u8,
1943 event.id_len,
1944 ));
1945 }
1946 }
1947 if !event.raw.is_null() {
1948 unsafe {
1949 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
1950 event.raw as *mut u8,
1951 event.raw_len,
1952 ));
1953 }
1954 }
1955 }
1956 if let Ok(layout) = std::alloc::Layout::array::<NetEvent>(alloc_count) {
1957 unsafe {
1958 std::alloc::dealloc(events as *mut u8, layout);
1959 }
1960 }
1961}
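
/*
Sketch of the partial-cleanup pattern above in isolation: a raw array is
filled inside `catch_unwind`, a `Cell` counts fully written slots, and on
panic only those slots are walked while the array itself is released with its
original capacity. `Slot`, `free_slots`, `build_slots`, and the `panic_at`
parameter are hypothetical and exist only to make the failure reproducible.

```rust
use std::alloc::{alloc, dealloc, Layout};
use std::cell::Cell;
use std::panic::{catch_unwind, AssertUnwindSafe};

// Hypothetical element: owns one boxed byte slice, like `NetEvent::raw`.
pub struct Slot {
    pub data: *mut u8,
    pub len: usize,
}

// Frees `written` initialized slots, then the array allocated for `cap` slots.
pub unsafe fn free_slots(slots: *mut Slot, written: usize, cap: usize) {
    if slots.is_null() || cap == 0 {
        return;
    }
    for i in 0..written {
        unsafe {
            let slot = &*slots.add(i);
            drop(Box::from_raw(std::ptr::slice_from_raw_parts_mut(slot.data, slot.len)));
        }
    }
    // Deallocate with the same layout that was used to allocate.
    if let Ok(layout) = Layout::array::<Slot>(cap) {
        unsafe { dealloc(slots as *mut u8, layout) };
    }
}

// Returns the array on success, or the number of slots written before a panic.
pub fn build_slots(inputs: &[&[u8]], panic_at: Option<usize>) -> Result<*mut Slot, usize> {
    if inputs.is_empty() {
        return Ok(std::ptr::null_mut());
    }
    let layout = Layout::array::<Slot>(inputs.len()).expect("small sketch input");
    let slots = unsafe { alloc(layout) as *mut Slot };
    assert!(!slots.is_null(), "allocation failed");

    let completed = Cell::new(0usize);
    let outcome = catch_unwind(AssertUnwindSafe(|| {
        for (i, bytes) in inputs.iter().enumerate() {
            if panic_at == Some(i) {
                panic!("simulated failure at slot {i}");
            }
            let boxed = bytes.to_vec().into_boxed_slice();
            let len = boxed.len();
            let data = Box::into_raw(boxed) as *mut u8;
            unsafe { slots.add(i).write(Slot { data, len }) };
            completed.set(i + 1);
        }
    }));
    if outcome.is_err() {
        let written = completed.get();
        unsafe { free_slots(slots, written, inputs.len()) };
        return Err(written);
    }
    Ok(slots)
}
```

The sketch assumes the default unwinding panic strategy; with `panic = "abort"`
the process terminates and `catch_unwind` never observes the failure.
*/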
1962
1963/// Free the internal allocations of a poll result returned by `net_poll_ex`.
1964///
1965/// This frees the events array (including each event's `id` and `raw` buffers)
1966/// and the `next_id` string. It does **not** free the `NetPollResult` struct
1967/// itself, which is caller-provided (typically stack-allocated or managed by
1968/// the caller).
1969#[unsafe(no_mangle)]
1970pub unsafe extern "C" fn net_free_poll_result(result: *mut NetPollResult) {
1971 if result.is_null() {
1972 return;
1973 }
1974
1975 let result = unsafe { &mut *result };
1976
1977 // Free events array and all id/raw allocations.
1978 free_events_array(result.events, result.count);
1979
1980 // Free next_id.
1981 if !result.next_id.is_null() {
1982 unsafe {
1983 drop(std::ffi::CString::from_raw(result.next_id));
1984 }
1985 }
1986
1987 // Null the fields so a second `net_free_poll_result` on the
1988 // same struct is a safe no-op rather than a double-free. The
1989 // C header's contract just says "free a poll result"; without
1990 // this clear, a defensive caller calling free twice (or two
1991 // wrappers each calling free in their destructor) would
1992 // re-`Box::from_raw` an already-freed pointer.
1993 result.events = std::ptr::null_mut();
1994 result.count = 0;
1995 result.next_id = std::ptr::null_mut();
1996 result.has_more = 0;
1997}
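
/*
Sketch of the null-after-free pattern used by `net_free_poll_result`, reduced
to a single owned buffer. `Owned` and `release` are hypothetical names.

```rust
use std::ptr;

// Hypothetical caller-visible result owning one boxed byte slice.
pub struct Owned {
    pub buf: *mut u8,
    pub len: usize,
}

// Frees the buffer once; later calls see a null pointer and do nothing.
pub fn release(owned: &mut Owned) {
    if !owned.buf.is_null() {
        // Sketch assumption: `buf`/`len` came from `Box::into_raw` on a `Box<[u8]>`.
        unsafe { drop(Box::from_raw(ptr::slice_from_raw_parts_mut(owned.buf, owned.len))) };
    }
    owned.buf = ptr::null_mut();
    owned.len = 0;
}
```

Clearing the fields only protects repeated frees through the same struct. A
bitwise copy taken before the first free still holds the stale pointer, so
copies must not be freed independently.
*/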
1998
1999/// Get stats without JSON serialization.
2000#[unsafe(no_mangle)]
2001pub unsafe extern "C" fn net_stats_ex(handle: *mut NetHandle, out: *mut NetStats) -> c_int {
2002 if !handle_is_valid(handle) || out.is_null() {
2003 return NetError::NullPointer.into();
2004 }
2005
2006 let handle = unsafe { &*handle };
2007 let _guard = match enter_ffi_op(handle) {
2008 Ok(g) => g,
2009 Err(err) => return err,
2010 };
2011 let stats = handle.bus.stats();
2012
2013 unsafe {
2014 (*out).events_ingested = stats
2015 .events_ingested
2016 .load(std::sync::atomic::Ordering::Relaxed);
2017 (*out).events_dropped = stats
2018 .events_dropped
2019 .load(std::sync::atomic::Ordering::Relaxed);
2020 (*out).batches_dispatched = stats
2021 .batches_dispatched
2022 .load(std::sync::atomic::Ordering::Relaxed);
2023 }
2024
2025 NetError::Success.into()
2026}
2027
2028#[cfg(test)]
2029mod tests {
2030 use super::*;
2031
2032 /// Build a `ConsumeResponse` with the given raw event payloads —
2033 /// shared fixture for the PERF_AUDIT §1.6 envelope tests.
2034 fn poll_response_with_raw(raws: Vec<bytes::Bytes>) -> ConsumeResponse {
2035 let events = raws
2036 .into_iter()
2037 .enumerate()
2038 .map(|(i, raw)| crate::event::StoredEvent::new(format!("ev-{i}"), raw, i as u64, 0))
2039 .collect();
2040 ConsumeResponse {
2041 events,
2042 next_id: Some("cursor-42".to_string()),
2043 has_more: true,
2044 truncated_at_per_shard_cap: false,
2045 stalled_shards: Vec::new(),
2046 failed_shards: Vec::new(),
2047 }
2048 }
2049
2050 /// PERF_AUDIT §1.6 — the poll envelope must splice valid event
2051 /// bytes through verbatim (no `Value` round-trip). Key order and
2052 /// number formatting are the two observable fingerprints of the
2053 /// old parse→re-serialize path: a BTreeMap-backed `Value` would
2054 /// re-order `z` before `a` alphabetically and could rewrite
2055 /// `1.0`. The envelope's shape fields must match the pre-fix
2056 /// `json!` output: same keys, same value types.
2057 #[test]
2058 fn poll_envelope_splices_raw_event_bytes_verbatim() {
2059 let raw = br#"{"z":1.0,"a":2}"#;
2060 let response = poll_response_with_raw(vec![bytes::Bytes::from_static(raw)]);
2061 let json = build_poll_envelope_json(&response).unwrap();
2062
2063 assert!(
2064 json.contains(std::str::from_utf8(raw).unwrap()),
2065 "event bytes must appear verbatim in the envelope; got {json}"
2066 );
2067 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
2068 assert_eq!(v["count"], 1);
2069 assert_eq!(v["parse_errors"], 0);
2070 assert_eq!(v["has_more"], true);
2071 assert_eq!(v["next_id"], "cursor-42");
2072 assert_eq!(v["events"][0]["z"], 1.0);
2073 assert_eq!(v["events"][0]["a"], 2);
2074 }
2075
2076 /// PERF_AUDIT §1.6 — invalid JSON (but valid UTF-8) must fail
2077 /// closed into the string fallback: the event shows up as a JSON
2078 /// string (caller can see what got skipped), `parse_errors`
2079 /// counts it, and the envelope is still valid JSON. No panic, no
2080 /// silent loss.
2081 #[test]
2082 fn poll_envelope_invalid_json_falls_back_to_string() {
2083 let response = poll_response_with_raw(vec![
2084 bytes::Bytes::from_static(br#"{"ok":true}"#),
2085 bytes::Bytes::from_static(b"not valid json"),
2086 ]);
2087 let json = build_poll_envelope_json(&response).unwrap();
2088 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
2089
2090 assert_eq!(v["count"], 2, "fallback event still counts toward count");
2091 assert_eq!(v["parse_errors"], 1);
2092 assert_eq!(v["events"][0]["ok"], true);
2093 assert_eq!(
2094 v["events"][1], "not valid json",
2095 "invalid-JSON event must be emitted as a JSON string"
2096 );
2097 }
2098
2099 /// PERF_AUDIT §1.6 — invalid UTF-8 can't be emitted even as a
2100 /// string: the event is skipped from `events` (so `count`
2101 /// excludes it) while `parse_errors` still records it. Mirrors
2102 /// the pre-fix `e.parse()` + `raw_str()` behavior exactly.
2103 #[test]
2104 fn poll_envelope_invalid_utf8_is_skipped_but_counted_as_error() {
2105 let response = poll_response_with_raw(vec![
2106 bytes::Bytes::from_static(&[0xFF, 0xFE, 0xFD]),
2107 bytes::Bytes::from_static(br#"{"ok":1}"#),
2108 ]);
2109 let json = build_poll_envelope_json(&response).unwrap();
2110 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
2111
2112 assert_eq!(
2113 v["count"], 1,
2114 "invalid-UTF-8 event is skipped from the events array"
2115 );
2116 assert_eq!(v["parse_errors"], 1);
2117 assert_eq!(v["events"].as_array().unwrap().len(), 1);
2118 assert_eq!(v["events"][0]["ok"], 1);
2119 }
2120
2121 #[test]
2122 fn test_parse_config_valid() {
2123 let config = parse_config_json(r#"{"num_shards": 8}"#);
2124 assert!(config.is_some());
2125 }
2126
2127 #[test]
2128 fn test_parse_config_num_shards_overflow() {
2129 // u16::MAX is 65535, so 65536 should fail
2130 let config = parse_config_json(r#"{"num_shards": 65536}"#);
2131 assert!(
2132 config.is_none(),
2133 "num_shards exceeding u16::MAX should fail"
2134 );
2135
2136 // Much larger value should also fail
2137 let config = parse_config_json(r#"{"num_shards": 100000}"#);
2138 assert!(
2139 config.is_none(),
2140 "num_shards exceeding u16::MAX should fail"
2141 );
2142 }
2143
2144 #[test]
2145 fn test_parse_config_num_shards_max_valid() {
2146 // u16::MAX (65535) should be valid
2147 let config = parse_config_json(r#"{"num_shards": 65535}"#);
2148 assert!(config.is_some(), "num_shards at u16::MAX should be valid");
2149 }
2150
2151 #[test]
2152 fn test_parse_config_invalid_json() {
2153 let config = parse_config_json(r#"{"num_shards": invalid}"#);
2154 assert!(config.is_none());
2155 }
2156
2157 #[test]
2158 fn test_parse_config_empty() {
2159 let config = parse_config_json(r#"{}"#);
2160 assert!(config.is_some(), "empty config should use defaults");
2161 }
2162
2163 /// Pin: known `backpressure_mode` strings round-trip; an
2164 /// unknown value (typo) is rejected with `None`, not silently
2165 /// downgraded to `DropNewest`. Pre-fix a deploy-time typo
2166 /// like `"DropOldset"` swapped the operator's intended
2167 /// durability for `DropNewest` with no diagnostic.
2168 #[test]
2169 fn parse_config_rejects_unknown_backpressure_mode() {
2170 // Known values still parse.
2171 for s in [
2172 "DropNewest",
2173 "drop_newest",
2174 "DropOldest",
2175 "drop_oldest",
2176 "FailProducer",
2177 "fail_producer",
2178 ] {
2179 let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
2180 assert!(cfg.is_some(), "known mode `{}` must parse", s);
2181 }
2182
2183 // Typos must fail.
2184 for s in ["DropOldset", "FailProduce", "drop_oldst", "garbage", ""] {
2185 let cfg = parse_config_json(&format!(r#"{{"backpressure_mode": "{}"}}"#, s));
2186 assert!(
2187 cfg.is_none(),
2188 "unknown mode `{}` must reject (pre-fix this silently \
2189 fell through to DropNewest)",
2190 s,
2191 );
2192 }
2193
2194 // Wrong JSON type also fails — pre-fix this hit the
2195 // `and_then(|v| v.as_str())` short-circuit and was
2196 // ignored entirely.
2197 let cfg = parse_config_json(r#"{"backpressure_mode": 42}"#);
2198 assert!(
2199 cfg.is_none(),
2200 "non-string non-object backpressure_mode must reject"
2201 );
2202 let cfg = parse_config_json(r#"{"backpressure_mode": true}"#);
2203 assert!(cfg.is_none(), "boolean backpressure_mode must reject");
2204 }
2205
2206 /// Pin: the `Sample { rate }` mode is reachable from JSON
2207 /// via `{"backpressure_mode": {"Sample": {"rate": N}}}`,
2208 /// and a zero rate is rejected (validator already rejects
2209 /// it; the parser must too, so the surface is consistent).
2210 #[test]
2211 fn parse_config_supports_sample_mode_with_validation() {
2212 let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 10}}}"#);
2213 assert!(cfg.is_some(), "Sample with non-zero rate must parse");
2214
2215 let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {"rate": 0}}}"#);
2216 assert!(cfg.is_none(), "Sample with rate=0 must reject");
2217
2218 let cfg = parse_config_json(r#"{"backpressure_mode": {"Sample": {}}}"#);
2219 assert!(cfg.is_none(), "Sample missing rate must reject");
2220 }
2221
2222 // Regression: the Go binding's `Poll(limit, cursor)` serializes a
2223 // `"cursor"` field that the FFI JSON path previously ignored —
2224 // cross-shard pagination silently broke. `parse_poll_request_json`
2225 // must round-trip the cursor into `ConsumeRequest.from_id`.
2226 #[test]
2227 fn test_parse_poll_request_preserves_cursor() {
2228 let req = parse_poll_request_json(r#"{"limit": 50, "cursor": "abc:123"}"#).unwrap();
2229 assert_eq!(req.limit, 50);
2230 assert_eq!(req.from_id.as_deref(), Some("abc:123"));
2231 }
2232
2233 #[test]
2234 fn test_parse_poll_request_no_cursor_defaults_to_none() {
2235 let req = parse_poll_request_json(r#"{"limit": 10}"#).unwrap();
2236 assert_eq!(req.limit, 10);
2237 assert_eq!(req.from_id, None);
2238 }
2239
2240 #[test]
2241 fn test_parse_poll_request_empty_uses_default_limit() {
2242 let req = parse_poll_request_json(r#"{}"#).unwrap();
2243 assert_eq!(req.limit, 100);
2244 assert_eq!(req.from_id, None);
2245 }
2246
2247 // Regression: a wrong-typed `"limit"` previously hit
2248 // `.as_u64().unwrap_or(100)` and silently defaulted. Caller bugs
2249 // (e.g. sending a string or a negative number) must surface as
2250 // `InvalidJson` instead.
2251 #[test]
2252 fn test_parse_poll_request_wrong_type_limit_errors() {
2253 let err = parse_poll_request_json(r#"{"limit": "50"}"#).unwrap_err();
2254 assert_eq!(err, c_int::from(NetError::InvalidJson));
2255 let err = parse_poll_request_json(r#"{"limit": -1}"#).unwrap_err();
2256 assert_eq!(err, c_int::from(NetError::InvalidJson));
2257 }
2258
2259 #[test]
2260 fn test_parse_poll_request_wrong_type_cursor_errors() {
2261 let err = parse_poll_request_json(r#"{"cursor": 123}"#).unwrap_err();
2262 assert_eq!(err, c_int::from(NetError::InvalidJson));
2263 }
2264
2265 #[test]
2266 fn test_parse_poll_request_null_fields_use_defaults() {
2267 let req = parse_poll_request_json(r#"{"limit": null, "cursor": null}"#).unwrap();
2268 assert_eq!(req.limit, 100);
2269 assert_eq!(req.from_id, None);
2270 }
2271
2272 /// `usize::MAX` is always a valid usize regardless of target
2273 /// pointer width, so it must parse successfully on both 32- and
2274 /// 64-bit builds. This pins the boundary case.
2275 #[test]
2276 fn test_parse_poll_request_limit_at_usize_max() {
2277 let json = format!(r#"{{"limit": {}}}"#, usize::MAX);
2278 let req = parse_poll_request_json(&json).unwrap();
2279 assert_eq!(req.limit, usize::MAX);
2280 }
2281
2282 /// Regression: `as usize` silently truncates on 32-bit targets
2283 /// for `u64` values above `usize::MAX`. The parser must return
2284 /// `InvalidJson` instead of wrapping. We only run this on 32-bit
2285 /// targets because on 64-bit `usize::MAX == u64::MAX`, leaving
2286 /// nothing that fits in u64 but not usize.
2287 #[cfg(target_pointer_width = "32")]
2288 #[test]
2289 fn test_parse_poll_request_limit_overflows_usize_on_32bit() {
2290 // 2^33 — fits in u64, but exceeds usize::MAX on a 32-bit build.
2291 let err = parse_poll_request_json(r#"{"limit": 8589934592}"#).unwrap_err();
2292 assert_eq!(err, c_int::from(NetError::InvalidJson));
2293 }
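
/*
Sketch of the truncation hazard the regression test above guards against.
`u32` stands in for a 32-bit `usize` so the effect is visible on a 64-bit
host; all three function names are hypothetical.

```rust
// `as` wraps silently: only the low 32 bits survive.
pub fn wrapping_to_u32(value: u64) -> u32 {
    value as u32
}

// `try_from` reports the overflow instead of wrapping.
pub fn checked_to_u32(value: u64) -> Option<u32> {
    u32::try_from(value).ok()
}

// The same conversion for the real target type.
pub fn limit_from_u64(value: u64) -> Option<usize> {
    usize::try_from(value).ok()
}
```

With the checked form, a parser can map `None` to an error code rather than
continuing with a wrapped limit.
*/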
2294
2295 /// CR-22: pin parity between the Rust-side `NetError` enum and
2296 /// the two C-header copies. The Rust enum is the source of
2297 /// truth; C / Go consumers `errors.Is` against the named codes.
2298 /// Pre-CR-22 the headers were missing `-9` (IntOverflow) and
2299 /// `-10` (MismatchedHandles); a consumer receiving those values
2300 /// would fall into the unknown-code branch and lose actionable
2301 /// distinction.
2302 ///
2303 /// We extract every integer literal that appears as the
2304 /// right-hand side of an `= ` token in the file and check
2305 /// that each Rust-side value is present in BOTH headers. The
2306 /// test does NOT verify symbolic names; the sealing
2307 /// constraint is the numeric value alone.
2308 ///
2309 /// Both `include_str!` paths point inside `net/crates/net/`.
2310 /// `include/net.go.h` is a manually-synced mirror of the
2311 /// repo-root `go/net.h`. Reaching outside the crate root
2312 /// (`include_str!("../../../../../go/net.h")`) breaks
2313 /// `cargo publish` and any out-of-repo vendoring of this
2314 /// crate, so the in-crate copy is the supported source. A
2315 /// drift between the two surfaces here as a parity-test
2316 /// failure: one of them will be missing the new value.
2317 #[test]
2318 fn cr22_c_header_parity_with_rust_neterror() {
2319 let primary = include_str!("../../include/net.h");
2320 let go_copy = include_str!("../../include/net.go.h");
2321
2322 // The Rust enum's full set of values (mirrors `pub enum
2323 // NetError` above). When a new variant is added in the
2324 // Rust source, this list — AND both headers — must be
2325 // updated together. The asserts that follow then catch a
2326 // missing header update at the next CI run.
2327 let rust_values: &[i32] = &[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -99];
2328
        // Pull every numeric literal that looks like an enum-value
        // assignment: `=`, optional spaces or tabs, an optional sign,
        // then one or more digits. Whatever follows the digits is
        // not checked. Accepts `=0`, `= 0`, `= -10`, etc.
2332 fn extract_assigned_values(src: &str) -> Vec<i32> {
2333 let mut out = Vec::new();
2334 let mut chars = src.chars().peekable();
2335 while let Some(c) = chars.next() {
2336 if c != '=' {
2337 continue;
2338 }
2339 // Skip whitespace.
2340 while let Some(&peek) = chars.peek() {
2341 if peek == ' ' || peek == '\t' {
2342 chars.next();
2343 } else {
2344 break;
2345 }
2346 }
2347 // Optional sign.
2348 let mut buf = String::new();
2349 if let Some(&peek) = chars.peek() {
2350 if peek == '-' || peek == '+' {
2351 buf.push(peek);
2352 chars.next();
2353 }
2354 }
2355 // Digits.
2356 let mut had_digit = false;
2357 while let Some(&peek) = chars.peek() {
2358 if peek.is_ascii_digit() {
2359 buf.push(peek);
2360 chars.next();
2361 had_digit = true;
2362 } else {
2363 break;
2364 }
2365 }
2366 if had_digit {
2367 if let Ok(v) = buf.parse::<i32>() {
2368 out.push(v);
2369 }
2370 }
2371 }
2372 out
2373 }
2374
2375 let primary_vals = extract_assigned_values(primary);
2376 let go_vals = extract_assigned_values(go_copy);
2377
2378 for &v in rust_values {
2379 assert!(
2380 primary_vals.contains(&v),
2381 "CR-22 regression: include/net.h is missing the value {} \
2382 (Rust NetError defines it). Add the matching `NET_ERR_*` \
2383 enumerator before merging.",
2384 v
2385 );
2386 assert!(
2387 go_vals.contains(&v),
                "CR-22 regression: include/net.go.h is missing the value {} \
                 (Rust NetError defines it).",
2390 v
2391 );
2392 }
2393 }
2394
2395 /// CR-5: pin that `examples/capability.c` does not double-include
2396 /// `net.h` and `net.go.h`. Both files use the `NET_SDK_H` include
2397 /// guard, so when both are included in one TU the second is
2398 /// silently skipped — every `net_validate_capabilities` /
2399 /// `net_predicate_*` call the example makes becomes an
2400 /// implicit-declaration error on GCC 14+/Clang 16+, and a silent
2401 /// `int`-return miscompile on older toolchains. The deeper fix
2402 /// (renaming one guard so they compose cleanly) is tracked as
2403 /// CR-28; this test catches the example-level regression.
2404 #[test]
2405 fn cr5_example_does_not_double_include_net_headers() {
2406 let example = include_str!("../../examples/capability.c");
2407 let net_h_included = example.contains("#include \"../include/net.h\"");
2408 let net_go_h_included = example.contains("#include \"../include/net.go.h\"");
2409 assert!(
2410 net_go_h_included,
2411 "examples/capability.c must include net.go.h to declare \
2412 net_validate_capabilities + net_predicate_* symbols"
2413 );
2414 assert!(
2415 !net_h_included,
2416 "examples/capability.c must NOT also include net.h: \
2417 both headers share the NET_SDK_H guard, so the second \
2418 include is silently skipped, leaving the example's \
2419 net_predicate_* calls implicitly declared. Drop the \
2420 redundant include — net.go.h is a superset."
2421 );
2422 }
2423
2424 /// `handle_is_valid` rejects null and any pointer not aligned for
2425 /// `NetHandle`. A foreign caller producing a misaligned pointer
2426 /// (e.g. via an over-eager `void *` cast on a packed struct) hits
2427 /// `&*handle` UB before any other check fires; this gate is the
2428 /// pre-deref discriminator.
2429 #[test]
2430 fn handle_is_valid_rejects_null_and_misaligned() {
2431 // Null is rejected.
2432 assert!(
2433 !handle_is_valid(std::ptr::null::<NetHandle>()),
2434 "null pointer must not be considered a valid handle"
2435 );
2436
2437 // Aligned but non-null is accepted (we use a small backing
2438 // buffer to materialize a pointer without dereferencing it).
2439 // `align_of::<NetHandle>()` is the alignment we must match.
2440 let align = std::mem::align_of::<NetHandle>();
2441 let buf = vec![0u8; align * 2];
2442 let base = buf.as_ptr() as usize;
2443 let aligned = (base + align - 1) & !(align - 1);
2444 let aligned_ptr = aligned as *const NetHandle;
2445 assert!(
2446 handle_is_valid(aligned_ptr),
2447 "aligned non-null pointer must validate (align={align}, ptr={aligned_ptr:p})"
2448 );
2449
2450 // A pointer one byte past `aligned_ptr` is misaligned for any
2451 // type with align > 1, and `NetHandle` (containing `AtomicU32`,
2452 // `AtomicBool`, ManuallyDrop'd EventBus + Runtime) easily
2453 // exceeds 1.
2454 if align > 1 {
2455 let misaligned_ptr = (aligned + 1) as *const NetHandle;
2456 assert!(
2457 !handle_is_valid(misaligned_ptr),
2458 "misaligned pointer must be rejected (align={align}, ptr={misaligned_ptr:p})"
2459 );
2460 }
2461 }
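
/*
Sketch of a null-and-alignment pre-check of the kind the test above
exercises. The real `handle_is_valid` is defined elsewhere in this module and
may differ; `plausible_ptr` is a hypothetical stand-in.

```rust
use std::mem::align_of;

// Hypothetical pre-dereference check: non-null and aligned for `T`.
// It cannot tell whether the pointer came from the matching constructor.
pub fn plausible_ptr<T>(p: *const T) -> bool {
    !p.is_null() && (p as usize) % align_of::<T>() == 0
}
```

A check like this only rules out two cheap-to-detect failure modes; a
dangling or foreign-allocated pointer that happens to be aligned still passes.
*/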
2462
2463 /// Pin: zero values for `heartbeat_interval_ms` and
2464 /// `session_timeout_ms` must reject the entire config (parser
2465 /// returns `None`). Pre-fix the parser threaded `0` through
2466 /// to `Duration::from_millis(0)`, which on the Net adapter's
2467 /// heartbeat path results in a busy-loop that pegs a CPU and
2468 /// produces no diagnostic — the FFI caller saw a successful
2469 /// `net_init` followed by a hung daemon. The validator-level
2470 /// guard for cooldown / metrics_window has no equivalent on
2471 /// the Net-adapter side, so the parser is the only place that
2472 /// can refuse the build.
2473 #[cfg(feature = "net")]
2474 #[test]
2475 fn parse_config_rejects_zero_heartbeat_and_session_timeout() {
        // 64-character hex strings, so `hex::decode` produces
        // exactly the 32 bytes (`[u8; 32]`) the parser requires for
        // `psk` and `peer_public_key`.
2479 let psk = "0".repeat(64);
2480 let peer_pk = "1".repeat(64);
2481
2482 // Sanity: a config with both fields *non-zero* must parse
2483 // successfully — proves the rejection in the negative
2484 // cases below is caused by the zero, not a missing
2485 // required field on the surrounding `net` block.
2486 let baseline = format!(
2487 r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
2488 "psk":"{psk}","peer_public_key":"{peer_pk}",
2489 "heartbeat_interval_ms":1000,"session_timeout_ms":30000}}}}"#
2490 );
2491 assert!(
2492 parse_config_json(&baseline).is_some(),
2493 "baseline net config with non-zero heartbeat/session_timeout must parse"
2494 );
2495
2496 // heartbeat_interval_ms = 0 → reject.
2497 let zero_hb = format!(
2498 r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
2499 "psk":"{psk}","peer_public_key":"{peer_pk}",
2500 "heartbeat_interval_ms":0,"session_timeout_ms":30000}}}}"#
2501 );
2502 assert!(
2503 parse_config_json(&zero_hb).is_none(),
2504 "heartbeat_interval_ms=0 must reject (pre-fix this produced a CPU-pegging busy loop)"
2505 );
2506
2507 // session_timeout_ms = 0 → reject.
2508 let zero_to = format!(
2509 r#"{{"net":{{"bind_addr":"127.0.0.1:9000","peer_addr":"127.0.0.1:9001",
2510 "psk":"{psk}","peer_public_key":"{peer_pk}",
2511 "heartbeat_interval_ms":1000,"session_timeout_ms":0}}}}"#
2512 );
2513 assert!(
2514 parse_config_json(&zero_to).is_none(),
2515 "session_timeout_ms=0 must reject"
2516 );
2517 }
2518}