Skip to main content

net/ffi/
blob.rs

1//! C FFI for Dataforts Phase 3 blob storage.
2//!
3//! Exposes:
4//!
5//! - `net_blob_register_fs_adapter` / `net_blob_unregister_adapter` —
6//!   registry lifecycle for a Rust-backed FileSystemAdapter.
7//! - `net_blob_adapter_registered` — probe.
8//! - `net_blob_publish` — content → encoded BlobRef bytes (caller
9//!   frees).
10//! - `net_blob_resolve` — payload bytes → resolved content (caller
11//!   frees).
12//!
13//! Returned buffers are heap-owned by Rust and MUST be freed via
14//! `net_blob_free_buffer`. Errors use the same `c_int` discipline
15//! as the rest of the FFI surface; the blob-specific extended
16//! codes are in the `-110..` range to stay below the cortex
17//! surface's `-100..-109` band.
18//!
19//! # Safety
20//!
21//! Every entry point is `unsafe extern "C"` and inherits the same
22//! caller-side contract as the rest of the FFI surface (see
23//! `ffi/mod.rs` and `include/net.h`): valid + aligned pointers,
24//! opaque handles produced by this crate's matching constructor
25//! (`Box::into_raw` inside the FFI surface — foreign-allocated
26//! pointers will UB when consumed by `Box::from_raw`),
27//! NUL-terminated UTF-8 strings, accurate buffer/length pairs,
28//! out-parameter pointers writable for the call's lifetime, and
29//! Rust-allocated buffers freed via `net_blob_free_buffer`.
30#![allow(clippy::missing_safety_doc)]
31#![expect(
32    clippy::undocumented_unsafe_blocks,
33    reason = "module-wide FFI safety contract documented in the # Safety preamble above"
34)]
35#![expect(
36    clippy::multiple_unsafe_ops_per_block,
37    reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract"
38)]
39
40use std::ffi::{c_char, c_int, CStr};
41use std::os::raw::c_void;
42use std::path::PathBuf;
43use std::ptr;
44use std::sync::Arc;
45
46use tokio::runtime::Runtime;
47
48#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
49use crate::adapter::net::behavior::TopologyScope;
50use crate::adapter::net::dataforts::{
51    global_blob_adapter_registry, publish_blob, resolve_payload, BlobAdapter,
52    BlobError as InnerBlobError, FileSystemAdapter,
53};
54// `InnerBlobRef` is only decoded inside the `MeshBlobAdapter`
55// store/fetch/exists entry points, which themselves require the
56// `dataforts + netdb + redex-disk` triple. Without the triple,
57// the import is unused and `-D warnings` fails CI.
58#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
59use crate::adapter::net::dataforts::{
60    BlobRef as InnerBlobRef, MeshBlobAdapter as InnerMeshBlobAdapter,
61    OverflowConfig as InnerOverflowConfig,
62};
63
64use super::NetError;
65
/// BlobRef decode failed (truncated / unsupported version).
/// `err_to_code` produces this for both `BlobError::Decode` and
/// `BlobError::UnsupportedVersion`.
pub const NET_ERR_BLOB_DECODE: c_int = -110;
/// Adapter registry: adapter id already registered.
pub const NET_ERR_BLOB_DUPLICATE_ID: c_int = -111;
/// Adapter registry: adapter id not found. Returned directly by
/// `net_blob_publish` / `net_blob_resolve` when the registry lookup
/// for the supplied `adapter_id` misses.
pub const NET_ERR_BLOB_NOT_REGISTERED: c_int = -112;
/// Adapter returned `NotFound` for the requested URI.
pub const NET_ERR_BLOB_NOT_FOUND: c_int = -113;
/// Substrate-side hash verification rejected the fetched bytes.
pub const NET_ERR_BLOB_HASH_MISMATCH: c_int = -114;
/// Adapter returned a non-classifiable backend error. `err_to_code`
/// also uses this code for `BlobError::Cancelled` and
/// `BlobError::ShortChunk`.
pub const NET_ERR_BLOB_BACKEND: c_int = -115;
/// `BlobError::UnsupportedScheme` — used for both "unknown URI scheme"
/// and "channel pointing at an unregistered adapter id".
pub const NET_ERR_BLOB_UNSUPPORTED_SCHEME: c_int = -116;
/// Channel has no `blob_adapter_id` configured.
pub const NET_ERR_BLOB_ADAPTER_NOT_CONFIGURED: c_int = -118;
/// Configured `blob_adapter_id` is not in the registry. This is the
/// code for `BlobError::AdapterNotRegistered`; the direct registry
/// miss on the publish / resolve entry points reports
/// `NET_ERR_BLOB_NOT_REGISTERED` instead.
pub const NET_ERR_BLOB_ADAPTER_NOT_REGISTERED: c_int = -119;
/// Panic surfaced from inside a user-installed adapter callback
/// (or anywhere on the FFI body). The substrate catches it with
/// `catch_unwind` and reports this code rather than unwinding
/// across the FFI boundary (which is undefined behaviour for the
/// C / cgo / Python callers).
///
/// Declared out of numeric order: the value sits between
/// `NET_ERR_BLOB_UNSUPPORTED_SCHEME` (-116) and
/// `NET_ERR_BLOB_ADAPTER_NOT_CONFIGURED` (-118).
pub const NET_ERR_BLOB_PANIC: c_int = -117;
/// Auth gate rejected the blob op: AuthGuard ACL miss, or no
/// guard configured for an op that requires one. Distinct from
/// `NET_ERR_BLOB_BACKEND` so bindings can route 401-style hits
/// without parsing the error string.
pub const NET_ERR_BLOB_UNAUTHORIZED: c_int = -120;
96
97fn runtime() -> &'static Arc<Runtime> {
98    use std::sync::OnceLock;
99    static RT: OnceLock<Arc<Runtime>> = OnceLock::new();
100    RT.get_or_init(|| {
101        match tokio::runtime::Builder::new_multi_thread()
102            .enable_all()
103            .build()
104        {
105            Ok(rt) => Arc::new(rt),
106            Err(e) => {
107                eprintln!("FATAL: blob FFI tokio runtime build failure ({e:?}); aborting");
108                std::process::abort();
109            }
110        }
111    })
112}
113
114fn block_on<F: std::future::Future>(future: F) -> F::Output {
115    if tokio::runtime::Handle::try_current().is_ok() {
116        eprintln!("FATAL: blob FFI called from inside a tokio runtime context; aborting");
117        std::process::abort();
118    }
119    runtime().block_on(future)
120}
121
/// Copies a NUL-terminated C string into an owned `String`.
///
/// Returns `None` when `p` is null or when the bytes are not valid
/// UTF-8.
///
/// # Safety
/// If non-null, `p` must point to a NUL-terminated byte sequence that
/// stays readable for the duration of the call.
unsafe fn c_str_to_owned(p: *const c_char) -> Option<String> {
    if p.is_null() {
        return None;
    }
    let borrowed = unsafe { CStr::from_ptr(p) };
    match borrowed.to_str() {
        Ok(text) => Some(String::from(text)),
        Err(_) => None,
    }
}
128
129fn err_to_code(e: &InnerBlobError) -> c_int {
130    match e {
131        InnerBlobError::HashMismatch { .. } => NET_ERR_BLOB_HASH_MISMATCH,
132        InnerBlobError::NotFound(_) => NET_ERR_BLOB_NOT_FOUND,
133        InnerBlobError::Backend(_) => NET_ERR_BLOB_BACKEND,
134        InnerBlobError::Cancelled => NET_ERR_BLOB_BACKEND,
135        InnerBlobError::UnsupportedScheme(_) => NET_ERR_BLOB_UNSUPPORTED_SCHEME,
136        InnerBlobError::UnsupportedVersion(_) => NET_ERR_BLOB_DECODE,
137        InnerBlobError::Decode(_) => NET_ERR_BLOB_DECODE,
138        InnerBlobError::AdapterNotConfigured => NET_ERR_BLOB_ADAPTER_NOT_CONFIGURED,
139        InnerBlobError::AdapterNotRegistered(_) => NET_ERR_BLOB_ADAPTER_NOT_REGISTERED,
140        InnerBlobError::Unauthorized(_) => NET_ERR_BLOB_UNAUTHORIZED,
141        // `ShortChunk` is a size disagreement (backend truncated
142        // the chunk); route through `NET_ERR_BLOB_BACKEND` rather
143        // than `NET_ERR_BLOB_HASH_MISMATCH` so retry logic that
144        // distinguishes truncation from content divergence keeps
145        // the existing classifier intact. A dedicated code can be
146        // added later when a binding consumer needs to fork on the
147        // distinction at the FFI surface.
148        InnerBlobError::ShortChunk { .. } => NET_ERR_BLOB_BACKEND,
149    }
150}
151
152/// Register a filesystem-backed BlobAdapter under `adapter_id`.
153/// Both `adapter_id` and `root` are null-terminated UTF-8 strings.
154/// Returns `0` on success, `NET_ERR_BLOB_DUPLICATE_ID` if the id
155/// already exists, or `NetError::InvalidUtf8` / `NullPointer` for
156/// malformed input.
157///
158/// # Safety
159/// `adapter_id` and `root` must each point to a valid null-terminated
160/// UTF-8 byte sequence and remain valid for the duration of this
161/// call. Either may be null, in which case the function returns
162/// `NetError::InvalidUtf8`.
163#[unsafe(no_mangle)]
164pub unsafe extern "C" fn net_blob_register_fs_adapter(
165    adapter_id: *const c_char,
166    root: *const c_char,
167) -> c_int {
168    let id = match c_str_to_owned(adapter_id) {
169        Some(s) => s,
170        None => return NetError::InvalidUtf8.into(),
171    };
172    let root = match c_str_to_owned(root) {
173        Some(s) => s,
174        None => return NetError::InvalidUtf8.into(),
175    };
176    let adapter: Arc<dyn BlobAdapter> =
177        Arc::new(FileSystemAdapter::new(id.clone(), PathBuf::from(root)));
178    match global_blob_adapter_registry().register(adapter) {
179        Ok(()) => 0,
180        Err(_) => NET_ERR_BLOB_DUPLICATE_ID,
181    }
182}
183
184/// Remove an adapter registration. Returns `1` if an adapter was
185/// removed, `0` if no adapter was registered under that id.
186///
187/// # Safety
188/// `adapter_id` must point to a valid null-terminated UTF-8 byte
189/// sequence and remain valid for the call. Null returns
190/// `NetError::InvalidUtf8`.
191#[unsafe(no_mangle)]
192pub unsafe extern "C" fn net_blob_unregister_adapter(adapter_id: *const c_char) -> c_int {
193    let id = match c_str_to_owned(adapter_id) {
194        Some(s) => s,
195        None => return NetError::InvalidUtf8.into(),
196    };
197    if global_blob_adapter_registry().unregister(&id).is_some() {
198        1
199    } else {
200        0
201    }
202}
203
204/// Returns `1` if `adapter_id` resolves to a registered adapter,
205/// `0` otherwise.
206///
207/// # Safety
208/// `adapter_id` must point to a valid null-terminated UTF-8 byte
209/// sequence and remain valid for the call.
210#[unsafe(no_mangle)]
211pub unsafe extern "C" fn net_blob_adapter_registered(adapter_id: *const c_char) -> c_int {
212    let id = match c_str_to_owned(adapter_id) {
213        Some(s) => s,
214        None => return NetError::InvalidUtf8.into(),
215    };
216    if global_blob_adapter_registry().get(&id).is_some() {
217        1
218    } else {
219        0
220    }
221}
222
223/// Publish `data` (len `data_len` bytes) to the adapter registered
224/// under `adapter_id`. On success returns `0` and writes a freshly-
225/// allocated Rust-owned buffer pointer into `*out_payload` /
226/// `*out_payload_len` containing the wire-encoded BlobRef. Caller
227/// MUST free via [`net_blob_free_buffer`].
228///
229/// On error returns a negative code and leaves the out-params at
230/// `(null, 0)`.
231///
232/// # Safety
233/// - `adapter_id` and `uri` must each point to a valid null-
234///   terminated UTF-8 byte sequence.
235/// - `data` must point to a readable region of at least `data_len`
236///   bytes (or be null when `data_len == 0`).
237/// - `out_payload` and `out_payload_len` must each point to writable
238///   `*mut u8` / `usize` storage; the function writes through both.
239#[unsafe(no_mangle)]
240pub unsafe extern "C" fn net_blob_publish(
241    adapter_id: *const c_char,
242    uri: *const c_char,
243    data: *const u8,
244    data_len: usize,
245    out_payload: *mut *mut u8,
246    out_payload_len: *mut usize,
247) -> c_int {
248    if out_payload.is_null() || out_payload_len.is_null() {
249        return NetError::NullPointer.into();
250    }
251    *out_payload = ptr::null_mut();
252    *out_payload_len = 0;
253
254    let id = match c_str_to_owned(adapter_id) {
255        Some(s) => s,
256        None => return NetError::InvalidUtf8.into(),
257    };
258    let uri = match c_str_to_owned(uri) {
259        Some(s) => s,
260        None => return NetError::InvalidUtf8.into(),
261    };
262    if data.is_null() && data_len > 0 {
263        return NetError::NullPointer.into();
264    }
265    // `slice::from_raw_parts` requires `len <= isize::MAX`.
266    if data_len > isize::MAX as usize {
267        return NetError::InvalidJson.into();
268    }
269    let data_slice = if data_len == 0 {
270        &[][..]
271    } else {
272        std::slice::from_raw_parts(data, data_len)
273    };
274
275    let adapter = match global_blob_adapter_registry().get(&id) {
276        Some(a) => a,
277        None => return NET_ERR_BLOB_NOT_REGISTERED,
278    };
279    // Wrap the body in catch_unwind so a panic in a user-
280    // installed adapter callback (or anywhere downstream) cannot
281    // unwind across the FFI boundary into the C / cgo / Python
282    // caller — that's undefined behaviour.
283    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
284        block_on(async move { publish_blob(adapter.as_ref(), uri, data_slice).await })
285    }));
286    let bytes = match result {
287        Ok(Ok(b)) => b,
288        Ok(Err(e)) => return err_to_code(&e),
289        Err(_) => return NET_ERR_BLOB_PANIC,
290    };
291
292    write_bytes_out(&bytes, out_payload, out_payload_len)
293}
294
295/// Resolve a payload to its content bytes. Inline payloads round-
296/// trip; encoded-BlobRef payloads fetch + verify through the
297/// adapter registered under `adapter_id`.
298///
299/// Returns `0` and writes a freshly-allocated Rust-owned buffer
300/// into `*out_content` / `*out_content_len`. Caller MUST free via
301/// [`net_blob_free_buffer`]. On error returns a negative code and
302/// leaves the out-params at `(null, 0)`.
303///
304/// # Safety
305/// - `adapter_id` must point to a valid null-terminated UTF-8 byte
306///   sequence.
307/// - `payload` must point to a readable region of at least
308///   `payload_len` bytes (or be null when `payload_len == 0`).
309/// - `out_content` and `out_content_len` must each point to writable
310///   `*mut u8` / `usize` storage.
311#[unsafe(no_mangle)]
312pub unsafe extern "C" fn net_blob_resolve(
313    adapter_id: *const c_char,
314    payload: *const u8,
315    payload_len: usize,
316    out_content: *mut *mut u8,
317    out_content_len: *mut usize,
318) -> c_int {
319    if out_content.is_null() || out_content_len.is_null() {
320        return NetError::NullPointer.into();
321    }
322    *out_content = ptr::null_mut();
323    *out_content_len = 0;
324
325    let id = match c_str_to_owned(adapter_id) {
326        Some(s) => s,
327        None => return NetError::InvalidUtf8.into(),
328    };
329    if payload.is_null() && payload_len > 0 {
330        return NetError::NullPointer.into();
331    }
332    // `slice::from_raw_parts` requires `len <= isize::MAX`.
333    if payload_len > isize::MAX as usize {
334        return NetError::InvalidJson.into();
335    }
336    let payload_slice = if payload_len == 0 {
337        &[][..]
338    } else {
339        std::slice::from_raw_parts(payload, payload_len)
340    };
341
342    let adapter = match global_blob_adapter_registry().get(&id) {
343        Some(a) => a,
344        None => return NET_ERR_BLOB_NOT_REGISTERED,
345    };
346    // Same catch_unwind protection as net_blob_publish.
347    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
348        block_on(async move { resolve_payload(payload_slice, adapter.as_ref()).await })
349    }));
350    let bytes = match result {
351        Ok(Ok(b)) => b,
352        Ok(Err(e)) => return err_to_code(&e),
353        Err(_) => return NET_ERR_BLOB_PANIC,
354    };
355
356    write_bytes_out(&bytes, out_content, out_content_len)
357}
358
359/// Allocate a Rust-owned buffer with an explicit `Layout::array::<u8>(len)`,
360/// copy `src` into it, and write `(ptr, len)` to the caller's out-pointers.
361/// Pairs with [`net_blob_free_buffer`], which deallocates with the matching
362/// layout. Pre-fix this path went `Vec → into_boxed_slice → Box::into_raw`,
363/// freed via `Box::from_raw(slice_from_raw_parts_mut(ptr, len))`. That
364/// worked because `into_boxed_slice` happens to shrink-to-fit today, but
365/// relied on a `Vec` / `Box<[u8]>` allocator-internals coincidence. A
366/// future refactor to `Vec::leak` (which does NOT shrink) would have
367/// silently mismatched the dealloc layout. Using an explicit
368/// `Layout::array::<u8>` on both sides makes the contract self-evident.
369///
370/// # Safety
371/// `out_ptr` and `out_len` must be writable.
372unsafe fn write_bytes_out(src: &[u8], out_ptr: *mut *mut u8, out_len: *mut usize) -> c_int {
373    let len = src.len();
374    if len == 0 {
375        unsafe {
376            *out_ptr = ptr::null_mut();
377            *out_len = 0;
378        }
379        return 0;
380    }
381    let layout = match std::alloc::Layout::array::<u8>(len) {
382        Ok(l) => l,
383        // `Layout::array::<u8>` only fails when `len > isize::MAX`.
384        // The publish/resolve paths have already rejected that range
385        // (slice::from_raw_parts shares the same cap), so this is
386        // unreachable from the existing call sites — but defending it
387        // here keeps `write_bytes_out` safe to reuse from any future
388        // caller. Returning a typed code beats panicking across the
389        // surrounding `extern "C"` frame.
390        Err(_) => return NetError::InvalidJson.into(),
391    };
392    let alloc_ptr = unsafe { std::alloc::alloc(layout) };
393    if alloc_ptr.is_null() {
394        std::alloc::handle_alloc_error(layout);
395    }
396    unsafe {
397        std::ptr::copy_nonoverlapping(src.as_ptr(), alloc_ptr, len);
398        *out_ptr = alloc_ptr;
399        *out_len = len;
400    }
401    0
402}
403
/// Release a buffer handed out by [`net_blob_publish`] or
/// [`net_blob_resolve`]. A call with `(null, _)` or `(_, 0)` does
/// nothing.
///
/// # Safety
/// `ptr` MUST be a buffer that the substrate previously returned
/// from `net_blob_publish` or `net_blob_resolve` (or null), and
/// `len` MUST match the corresponding `*out_*_len` value from
/// that call. Calling with any other `(ptr, len)` is undefined
/// behaviour.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_blob_free_buffer(ptr: *mut u8, len: usize) {
    if len == 0 || ptr.is_null() {
        return;
    }
    // Rebuild the `Layout::array::<u8>(len)` that `write_bytes_out`
    // allocated with. A `len` above `isize::MAX` cannot have come from
    // the allocating side (it would have rejected the same layout), so
    // in that case the free is skipped rather than risking an unwind
    // across the FFI boundary.
    if let Ok(layout) = std::alloc::Layout::array::<u8>(len) {
        unsafe { std::alloc::dealloc(ptr, layout) };
    }
}
429
// Keeps the `c_void` / `ptr` imports referenced so the unused-import
// lint stays quiet under feature gates that drop one of these
// surfaces (currently all of them are callable). The function itself
// is never called, hence the `dead_code` allowance.
#[allow(dead_code)]
fn _force_use() -> *mut c_void {
    ptr::null_mut::<c_void>()
}
436
437// =========================================================================
438// C-side callback adapter — register a function-pointer-table from
439// a cgo / native caller and let the substrate dispatch BlobAdapter
440// calls into it. The substrate wraps the table as a `dyn BlobAdapter`
441// and stores it in the global registry under the supplied id.
442// =========================================================================
443
444use std::ops::Range;
445
446use async_trait::async_trait;
447use bytes::Bytes;
448
/// `store` function pointer. The callback allocates nothing for the
/// substrate; it returns `0` on success or a non-zero `c_int` on
/// failure (the substrate maps the code through `code_to_err`).
///
/// Arguments as passed by the substrate's callback adapter:
/// - `ctx`: the opaque pointer supplied at registration, unchanged.
/// - `uri`: NUL-terminated URI of the blob.
/// - `hash`: pointer to the blob's 32-byte hash.
/// - `size`: the size recorded in the blob reference.
/// - `data` / `data_len`: the bytes to store.
///
/// `uri`, `hash` and `data` point at substrate-owned memory that is
/// released once the callback returns, so they must not be retained.
pub type NetBlobAdapterStoreFn = unsafe extern "C" fn(
    ctx: *mut c_void,
    uri: *const c_char,
    hash: *const u8, // exactly 32 bytes
    size: u64,
    data: *const u8,
    data_len: usize,
) -> c_int;
459
/// `fetch` function pointer. The callback allocates the return
/// buffer and writes its pointer + length into `out_data` /
/// `out_len`. The substrate copies the bytes and then releases the
/// buffer via the vtable's `free_buffer`.
///
/// Returns `0` on success or a non-zero `c_int` on failure. On
/// success, `(null, 0)` is accepted as an empty result; a null
/// pointer with a non-zero length is treated as a backend error.
///
/// `ctx`, `uri`, `hash` (32 bytes) and `size` carry the same values
/// as for the `store` callback.
pub type NetBlobAdapterFetchFn = unsafe extern "C" fn(
    ctx: *mut c_void,
    uri: *const c_char,
    hash: *const u8,
    size: u64,
    out_data: *mut *mut u8,
    out_len: *mut usize,
) -> c_int;
472
/// `fetch_range` function pointer. Same buffer hand-off as the
/// `fetch` callback: the callback allocates the buffer, writes
/// `out_data` / `out_len`, and the substrate releases it via the
/// vtable's `free_buffer` after copying.
///
/// `range_start` / `range_end` are the `start` and `end` of the Rust
/// `Range<u64>` the substrate was asked for (so `range_end` is
/// exclusive).
///
/// Returns `0` on success or a non-zero `c_int` on failure.
pub type NetBlobAdapterFetchRangeFn = unsafe extern "C" fn(
    ctx: *mut c_void,
    uri: *const c_char,
    hash: *const u8,
    size: u64,
    range_start: u64,
    range_end: u64,
    out_data: *mut *mut u8,
    out_len: *mut usize,
) -> c_int;
484
/// `exists` function pointer. On success (return value `0`) the
/// callback writes a boolean into `out_exists`; the substrate treats
/// any non-zero value as "exists". `out_exists` is initialised to `0`
/// before the call. A non-zero return value reports a failure.
pub type NetBlobAdapterExistsFn = unsafe extern "C" fn(
    ctx: *mut c_void,
    uri: *const c_char,
    hash: *const u8,
    size: u64,
    out_exists: *mut c_int,
) -> c_int;
494
/// Frees a buffer that the caller's `fetch` / `fetch_range`
/// allocated. The substrate calls this after copying the returned
/// bytes, passing back the same `data` pointer and `len` the callback
/// reported. It is not called when the callback returned a null
/// pointer.
pub type NetBlobAdapterFreeFn = unsafe extern "C" fn(ctx: *mut c_void, data: *mut u8, len: usize);
499
/// Function-pointer-table the C-side caller passes to
/// [`net_blob_register_callback_adapter`]. The struct is `#[repr(C)]`
/// for cross-ABI stability, so the field order below is part of the
/// C layout. It is `Copy`: each adapter call takes its own copy of
/// the table before dispatching.
///
/// Every field is a non-nullable function pointer; all five must be
/// populated.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct NetBlobAdapterVtable {
    /// `store(ctx, uri, hash, size, data, data_len) -> c_int`
    pub store: NetBlobAdapterStoreFn,
    /// `fetch(ctx, uri, hash, size, &out_data, &out_len) -> c_int`
    pub fetch: NetBlobAdapterFetchFn,
    /// `fetch_range(ctx, uri, hash, size, start, end, &out_data, &out_len) -> c_int`
    pub fetch_range: NetBlobAdapterFetchRangeFn,
    /// `exists(ctx, uri, hash, size, &out_exists) -> c_int`
    pub exists: NetBlobAdapterExistsFn,
    /// `free_buffer(ctx, data, len)` — substrate calls this after
    /// consuming a buffer the caller returned via `fetch` /
    /// `fetch_range`.
    pub free_buffer: NetBlobAdapterFreeFn,
}
519
/// Opaque caller-context pointer.
///
/// # Concurrency contract (caller MUST uphold)
///
/// The substrate dispatches every vtable call from a
/// `tokio::task::spawn_blocking` worker, which means the same
/// `ctx` pointer is observed from **multiple OS threads over the
/// lifetime of the registration** and may be observed
/// **concurrently** if two events for the same adapter are
/// in-flight. `Send + Sync` are asserted unconditionally because
/// the substrate has no visibility into what the pointer
/// references — the C-side registrant is the trust boundary.
///
/// In practical terms, this means a registrant **MUST** pass a
/// `ctx` whose pointee is:
///
/// - **`Send` across threads**: any per-thread state (e.g. a
///   thread-local OS handle, a goroutine-local pointer, a
///   Python `PyObject*` held without the GIL) is unsafe.
/// - **`Sync` for concurrent dispatch**: any state mutated
///   inside vtable callbacks must be protected against
///   data races by the registrant (lock, atomic, etc.).
///
/// Wrappers that cannot meet the `Sync` requirement (e.g. a
/// Python adapter that uses the GIL) MUST serialize their own
/// dispatch behind a `Mutex` before passing control to the
/// language runtime.
struct OpaqueCtx(*mut c_void);

// SAFETY: opaque-pointer transport — see `OpaqueCtx` doc above.
// Cross-thread coherence of the pointee is the C-side caller's
// responsibility; the substrate only reads and forwards the
// same address verbatim.
unsafe impl Send for OpaqueCtx {}
// The same reasoning covers `Sync`: shared references only ever hand
// out a copy of the stored address (see `get`), never a dereference.
unsafe impl Sync for OpaqueCtx {}

impl OpaqueCtx {
    /// Wraps the raw caller-supplied context pointer. The pointer is
    /// stored as-is; it is neither dereferenced nor null-checked.
    fn new(ptr: *mut c_void) -> Self {
        Self(ptr)
    }
    /// Returns the stored address verbatim, for forwarding to a
    /// vtable callback.
    fn get(&self) -> *mut c_void {
        self.0
    }
}
564
565/// `BlobAdapter` impl that calls into a vtable of C function
566/// pointers. Each trait method translates the args into
567/// `*const c_char` / `*const u8` shapes, dispatches inside
568/// `tokio::task::spawn_blocking` so the tokio worker isn't
569/// blocked on synchronous C-side I/O, and maps the return code
570/// back into a `Result<_, BlobError>`.
571struct CallbackBlobAdapter {
572    id: String,
573    vtable: NetBlobAdapterVtable,
574    ctx: Arc<OpaqueCtx>,
575}
576
577unsafe impl Send for CallbackBlobAdapter {}
578unsafe impl Sync for CallbackBlobAdapter {}
579
580fn code_to_err(code: c_int, label: &str) -> InnerBlobError {
581    match code {
582        NET_ERR_BLOB_NOT_FOUND => InnerBlobError::NotFound(label.into()),
583        NET_ERR_BLOB_HASH_MISMATCH => InnerBlobError::Backend(format!(
584            "{}: substrate hash mismatch (caller returned wrong bytes)",
585            label
586        )),
587        NET_ERR_BLOB_UNSUPPORTED_SCHEME => InnerBlobError::UnsupportedScheme(label.into()),
588        NET_ERR_BLOB_DECODE => InnerBlobError::Decode(label.into()),
589        _ => InnerBlobError::Backend(format!("{}: code {}", label, code)),
590    }
591}
592
593/// Extract `(uri, hash, size)` from a [`BlobRef::Small`] for an FFI
594/// vtable call. The C vtable signature only supports single-hash
595/// blobs; chunked dispatch happens at the substrate's
596/// `MeshBlobAdapter` layer above this FFI shim. A
597/// [`BlobRef::Manifest`] passed here is a layering bug; surface
598/// `InnerBlobError::Backend` rather than silently truncating to the
599/// first chunk.
600fn expect_small_for_ffi(
601    blob_ref: &crate::adapter::net::dataforts::BlobRef,
602) -> std::result::Result<(String, [u8; 32], u64), InnerBlobError> {
603    match blob_ref {
604        crate::adapter::net::dataforts::BlobRef::Small {
605            uri, hash, size, ..
606        } => Ok((uri.clone(), *hash, *size)),
607        crate::adapter::net::dataforts::BlobRef::Manifest { .. }
608        | crate::adapter::net::dataforts::BlobRef::Tree { .. } => Err(InnerBlobError::Backend(
609            "CallbackBlobAdapter operates on Small blobs only; \
610                 chunked blobs are dispatched at the substrate above"
611                .to_owned(),
612        )),
613    }
614}
615
616#[async_trait]
617impl BlobAdapter for CallbackBlobAdapter {
618    fn adapter_id(&self) -> &str {
619        &self.id
620    }
621
622    async fn store(
623        &self,
624        blob_ref: &crate::adapter::net::dataforts::BlobRef,
625        bytes: &[u8],
626    ) -> std::result::Result<(), InnerBlobError> {
627        let vtable = self.vtable;
628        let ctx = self.ctx.clone();
629        let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
630        let uri = match std::ffi::CString::new(uri_str) {
631            Ok(c) => c,
632            Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
633        };
634        let data = bytes.to_vec();
635        tokio::task::spawn_blocking(move || -> std::result::Result<(), InnerBlobError> {
636            let code = unsafe {
637                (vtable.store)(
638                    ctx.get(),
639                    uri.as_ptr(),
640                    hash.as_ptr(),
641                    size,
642                    data.as_ptr(),
643                    data.len(),
644                )
645            };
646            if code == 0 {
647                Ok(())
648            } else {
649                Err(code_to_err(code, "store"))
650            }
651        })
652        .await
653        .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
654    }
655
656    async fn fetch(
657        &self,
658        blob_ref: &crate::adapter::net::dataforts::BlobRef,
659    ) -> std::result::Result<Bytes, InnerBlobError> {
660        let vtable = self.vtable;
661        let ctx = self.ctx.clone();
662        let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
663        let uri = match std::ffi::CString::new(uri_str) {
664            Ok(c) => c,
665            Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
666        };
667        tokio::task::spawn_blocking(move || -> std::result::Result<Bytes, InnerBlobError> {
668            let mut out_data: *mut u8 = ptr::null_mut();
669            let mut out_len: usize = 0;
670            let code = unsafe {
671                (vtable.fetch)(
672                    ctx.get(),
673                    uri.as_ptr(),
674                    hash.as_ptr(),
675                    size,
676                    &mut out_data,
677                    &mut out_len,
678                )
679            };
680            if code != 0 {
681                return Err(code_to_err(code, "fetch"));
682            }
683            if out_data.is_null() {
684                if out_len == 0 {
685                    return Ok(Bytes::new());
686                }
687                return Err(InnerBlobError::Backend(
688                    "fetch: caller returned null pointer with non-zero len".into(),
689                ));
690            }
691            // Copy out before freeing — the FFI caller owns the
692            // buffer and frees it via free_buffer. We can't hand
693            // the FFI-owned pointer to `Bytes` because rust would
694            // assume Vec-style allocator ownership, so the copy
695            // is unavoidable here (per dataforts perf #184 — the
696            // savings the Bytes signature unlocks are inside the
697            // mesh/fs/noop adapters; FFI callbacks pay the copy
698            // at the boundary in either direction).
699            let buf = unsafe { std::slice::from_raw_parts(out_data, out_len).to_vec() };
700            unsafe { (vtable.free_buffer)(ctx.get(), out_data, out_len) };
701            Ok(Bytes::from(buf))
702        })
703        .await
704        .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
705    }
706
707    async fn fetch_range(
708        &self,
709        blob_ref: &crate::adapter::net::dataforts::BlobRef,
710        range: Range<u64>,
711    ) -> std::result::Result<Bytes, InnerBlobError> {
712        let vtable = self.vtable;
713        let ctx = self.ctx.clone();
714        let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
715        let uri = match std::ffi::CString::new(uri_str) {
716            Ok(c) => c,
717            Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
718        };
719        let start = range.start;
720        let end = range.end;
721        tokio::task::spawn_blocking(move || -> std::result::Result<Bytes, InnerBlobError> {
722            let mut out_data: *mut u8 = ptr::null_mut();
723            let mut out_len: usize = 0;
724            let code = unsafe {
725                (vtable.fetch_range)(
726                    ctx.get(),
727                    uri.as_ptr(),
728                    hash.as_ptr(),
729                    size,
730                    start,
731                    end,
732                    &mut out_data,
733                    &mut out_len,
734                )
735            };
736            if code != 0 {
737                return Err(code_to_err(code, "fetch_range"));
738            }
739            if out_data.is_null() {
740                if out_len == 0 {
741                    return Ok(Bytes::new());
742                }
743                return Err(InnerBlobError::Backend(
744                    "fetch_range: caller returned null pointer with non-zero len".into(),
745                ));
746            }
747            let buf = unsafe { std::slice::from_raw_parts(out_data, out_len).to_vec() };
748            unsafe { (vtable.free_buffer)(ctx.get(), out_data, out_len) };
749            Ok(Bytes::from(buf))
750        })
751        .await
752        .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
753    }
754
755    async fn exists(
756        &self,
757        blob_ref: &crate::adapter::net::dataforts::BlobRef,
758    ) -> std::result::Result<bool, InnerBlobError> {
759        let vtable = self.vtable;
760        let ctx = self.ctx.clone();
761        let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
762        let uri = match std::ffi::CString::new(uri_str) {
763            Ok(c) => c,
764            Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
765        };
766        tokio::task::spawn_blocking(move || -> std::result::Result<bool, InnerBlobError> {
767            let mut out_exists: c_int = 0;
768            let code = unsafe {
769                (vtable.exists)(
770                    ctx.get(),
771                    uri.as_ptr(),
772                    hash.as_ptr(),
773                    size,
774                    &mut out_exists,
775                )
776            };
777            if code != 0 {
778                return Err(code_to_err(code, "exists"));
779            }
780            Ok(out_exists != 0)
781        })
782        .await
783        .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
784    }
785}
786
787/// Register a C-side BlobAdapter implementation. The vtable is
788/// copied into the adapter; `ctx` is shuttled across every call as
789/// an opaque pointer (caller is responsible for thread-safety).
790///
791/// Returns `0` on success, `NET_ERR_BLOB_DUPLICATE_ID` if `id` is
792/// already registered, or `NetError::InvalidUtf8` / `NullPointer`
793/// for malformed input.
794///
795/// # Safety
796/// - `adapter_id` must point to a valid null-terminated UTF-8 byte
797///   sequence.
798/// - `vtable` must point to a fully-initialised `NetBlobAdapterVtable`
799///   whose function pointers remain valid for the lifetime of the
800///   registration (i.e. until `net_blob_unregister_adapter` returns
801///   AND any in-flight calls have completed).
802/// - `ctx` is an opaque pointer the substrate passes through unchanged
803///   to every vtable call; the caller is responsible for keeping the
804///   pointee alive for the same lifetime as `vtable`.
805///
806/// # Concurrency contract (caller MUST uphold)
807///
808/// The substrate dispatches every vtable call from a
809/// `tokio::task::spawn_blocking` worker. The same `ctx` will be
810/// observed from **multiple OS threads** over the lifetime of the
811/// registration and may be observed **concurrently** when two
812/// in-flight calls are dispatched to the same adapter.
813///
814/// The pointee of `ctx` therefore MUST be:
815/// - safely transferable across threads (`Send`-equivalent in the
816///   caller's runtime); and
817/// - safely accessed concurrently (`Sync`-equivalent), or guarded
818///   inside the vtable callbacks by a caller-owned lock.
819///
820/// Passing a thread-local pointer (an OS thread handle, a Go
821/// goroutine-local pointer, a Python `PyObject*` held outside the
822/// GIL, etc.) is **undefined behaviour**. Wrappers whose runtime
823/// cannot meet the `Sync` requirement MUST serialize vtable
824/// dispatch inside the callback before crossing into the
825/// language runtime.
826#[unsafe(no_mangle)]
827pub unsafe extern "C" fn net_blob_register_callback_adapter(
828    adapter_id: *const c_char,
829    vtable: *const NetBlobAdapterVtable,
830    ctx: *mut c_void,
831) -> c_int {
832    if vtable.is_null() {
833        return NetError::NullPointer.into();
834    }
835    let id = match c_str_to_owned(adapter_id) {
836        Some(s) => s,
837        None => return NetError::InvalidUtf8.into(),
838    };
839    // Validate every fn-ptr field is non-null BEFORE materialising
840    // the vtable as a value-typed `NetBlobAdapterVtable` — Rust's
841    // `unsafe extern "C" fn` type is non-nullable, so loading a
842    // struct whose C-side caller left any field NULL is immediate
843    // UB. Cast each field through a `*const ()` to read the raw
844    // bits without constructing a non-null fn-pointer value.
845    {
846        let raw = vtable as *const c_void as *const *const c_void;
847        // Five fn-ptr fields (store / fetch / fetch_range /
848        // exists / free_buffer). Reading them as *const c_void
849        // gives the raw address without invoking the fn-ptr type's
850        // non-null invariant.
851        for i in 0..5 {
852            let field = unsafe { *raw.add(i) };
853            if field.is_null() {
854                return NET_ERR_BLOB_BACKEND;
855            }
856        }
857    }
858    let vtable = unsafe { *vtable };
859    let adapter: Arc<dyn BlobAdapter> = Arc::new(CallbackBlobAdapter {
860        id: id.clone(),
861        vtable,
862        ctx: Arc::new(OpaqueCtx::new(ctx)),
863    });
864    match global_blob_adapter_registry().register(adapter) {
865        Ok(()) => 0,
866        Err(_) => NET_ERR_BLOB_DUPLICATE_ID,
867    }
868}
869
870// =========================================================================
871// MeshBlobAdapter — v0.2 substrate-owned blob CAS + v0.3 active overflow
872// =========================================================================
873//
874// Mirrors the Node + Python `MeshBlobAdapter` surface for the
875// Go binding via cgo. JSON-encoded configs at the FFI boundary
876// (matches the existing `net_redex_enable_greedy_dataforts` and
877// peers); the Go wrapper marshals from `struct{...}` into the
878// JSON shape before calling these.
879
/// Opaque handle to a `MeshBlobAdapter`. The Box owns an
/// `Arc<InnerMeshBlobAdapter>` so multiple handles can share
/// the adapter — but the FFI surface only ever hands out one
/// handle per `_new` call; the operator clones at the Go layer
/// if they want fan-out. Free with [`net_mesh_blob_adapter_free`].
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
pub struct MeshBlobAdapterHandle {
    // Released explicitly with `ManuallyDrop::drop` in
    // `net_mesh_blob_adapter_free`. Note that calling `.clone()` directly
    // on this field clones the `ManuallyDrop` wrapper (std derives `Clone`
    // for it), producing a value that never decrements the `Arc` on drop.
    inner: ManuallyDrop<Arc<InnerMeshBlobAdapter>>,
}
889
890#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
891use std::mem::ManuallyDrop;
892
/// JSON shape for the `overflow` config option passed to
/// [`net_mesh_blob_adapter_new`] + [`net_mesh_blob_adapter_set_overflow_config`].
/// Mirrors the typed `OverflowConfig` from the Rust crate;
/// `scope` is one of `"node" | "zone" | "region" | "mesh"`.
///
/// Every field is optional on the wire. `parse_overflow_json` starts from
/// `InnerOverflowConfig::default()` and only overrides the fields that are
/// present; `overflow_to_json` always emits all of them.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[derive(serde::Deserialize, serde::Serialize)]
struct OverflowConfigJson {
    // Missing key deserialises to `false`.
    #[serde(default)]
    enabled: bool,
    // Copied into `high_water_ratio` of the typed config when present.
    #[serde(default)]
    high_water_ratio: Option<f64>,
    // Copied into `low_water_ratio` of the typed config when present.
    #[serde(default)]
    low_water_ratio: Option<f64>,
    // Carried as `u64` on the wire; converted to the typed config's
    // integer width by `parse_overflow_json`.
    #[serde(default)]
    max_pushes_per_tick: Option<u64>,
    // Scope name; `parse_overflow_json` lower-cases it (ASCII) before
    // matching, and rejects unknown names.
    #[serde(default)]
    scope: Option<String>,
    // Copied into `tick_interval_ms` of the typed config when present.
    #[serde(default)]
    tick_interval_ms: Option<u64>,
}
913
/// Parse the JSON overflow config accepted at the FFI boundary.
///
/// An empty string yields `InnerOverflowConfig::default()`. Otherwise the
/// text is deserialised as `OverflowConfigJson`, and every field that is
/// present overrides the corresponding default. `scope` is matched
/// case-insensitively (ASCII) against `node` / `zone` / `region` / `mesh`.
///
/// # Errors
/// Returns the `NetError::InvalidJson` code when the text does not
/// deserialise into this shape or when `scope` is not a recognised name.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
fn parse_overflow_json(s: &str) -> Result<InnerOverflowConfig, c_int> {
    if s.is_empty() {
        return Ok(InnerOverflowConfig::default());
    }
    let raw: OverflowConfigJson =
        serde_json::from_str(s).map_err(|_| -> c_int { NetError::InvalidJson.into() })?;
    let mut cfg = InnerOverflowConfig {
        enabled: raw.enabled,
        ..InnerOverflowConfig::default()
    };
    if let Some(v) = raw.high_water_ratio {
        cfg.high_water_ratio = v;
    }
    if let Some(v) = raw.low_water_ratio {
        cfg.low_water_ratio = v;
    }
    if let Some(v) = raw.max_pushes_per_tick {
        // The wire value is a `u64`; on targets where `usize` is narrower
        // a plain `as` cast would silently wrap a huge cap into a small
        // one. Saturate instead so "very large" stays "very large".
        cfg.max_pushes_per_tick = usize::try_from(v).unwrap_or(usize::MAX);
    }
    if let Some(name) = raw.scope {
        cfg.scope = match name.to_ascii_lowercase().as_str() {
            "node" => TopologyScope::Node,
            "zone" => TopologyScope::Zone,
            "region" => TopologyScope::Region,
            "mesh" => TopologyScope::Mesh,
            _ => {
                let code: c_int = NetError::InvalidJson.into();
                return Err(code);
            }
        };
    }
    if let Some(v) = raw.tick_interval_ms {
        cfg.tick_interval_ms = v;
    }
    Ok(cfg)
}
951
/// Serialise an overflow configuration into the `OverflowConfigJson`
/// wire shape with every optional field populated. Falls back to the
/// literal `{}` if serialisation fails.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
fn overflow_to_json(cfg: InnerOverflowConfig) -> String {
    let scope_label = match cfg.scope {
        TopologyScope::Mesh => "mesh",
        TopologyScope::Region => "region",
        TopologyScope::Zone => "zone",
        TopologyScope::Node => "node",
    };
    let wire = OverflowConfigJson {
        enabled: cfg.enabled,
        high_water_ratio: Some(cfg.high_water_ratio),
        low_water_ratio: Some(cfg.low_water_ratio),
        max_pushes_per_tick: Some(cfg.max_pushes_per_tick as u64),
        scope: Some(String::from(scope_label)),
        tick_interval_ms: Some(cfg.tick_interval_ms),
    };
    match serde_json::to_string(&wire) {
        Ok(text) => text,
        Err(_) => "{}".to_string(),
    }
}
970
/// Construct a `MeshBlobAdapter` against `redex`.
///
/// - `redex` — pointer to a `RedexHandle` from `net_redex_new`. The
///   adapter clones the inner `Arc<Redex>`; the redex handle stays
///   valid after this call.
/// - `adapter_id` — null-terminated UTF-8 identity tag.
/// - `persistent` — `0` = in-memory chunks; non-zero = disk-backed
///   (requires the redex to have been opened with a `persistent_dir`).
/// - `overflow_json` — null OR null-terminated JSON for the v0.3
///   overflow config. Empty string / null = overflow off (the
///   v0.2 default).
///
/// Returns a non-null handle on success. On error (null `redex`,
/// unreadable `adapter_id` / `overflow_json`, or a config that fails to
/// parse) returns null and sets no errno-equivalent — operators check
/// for null + retry with a well-formed JSON config. Free with
/// `net_mesh_blob_adapter_free`.
///
/// # Safety
/// `redex` must be a valid `RedexHandle*` returned from `net_redex_new`
/// and not yet freed. `adapter_id` must be a valid null-terminated
/// UTF-8 string. `overflow_json` may be null or a valid
/// null-terminated UTF-8 JSON string.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_new(
    redex: *mut super::cortex::RedexHandle,
    adapter_id: *const c_char,
    persistent: c_int,
    overflow_json: *const c_char,
) -> *mut MeshBlobAdapterHandle {
    if redex.is_null() {
        return ptr::null_mut();
    }
    let Some(id) = (unsafe { c_str_to_owned(adapter_id) }) else {
        return ptr::null_mut();
    };
    // A null pointer is treated exactly like an empty config string.
    let overflow_text = if overflow_json.is_null() {
        String::new()
    } else {
        let Some(text) = (unsafe { c_str_to_owned(overflow_json) }) else {
            return ptr::null_mut();
        };
        text
    };
    let Ok(overflow_cfg) = parse_overflow_json(&overflow_text) else {
        return ptr::null_mut();
    };
    let redex_inner = unsafe { (*redex).redex_arc() };
    let base = InnerMeshBlobAdapter::new(id, redex_inner).with_persistent(persistent != 0);
    // Only attach an overflow config when the caller actually supplied one.
    let adapter = if overflow_text.is_empty() {
        base
    } else {
        base.with_overflow(overflow_cfg)
    };
    let handle = MeshBlobAdapterHandle {
        inner: ManuallyDrop::new(Arc::new(adapter)),
    };
    Box::into_raw(Box::new(handle))
}
1028
/// Release a handle produced by [`net_mesh_blob_adapter_new`]. Passing
/// null is a no-op.
///
/// # Safety
/// `handle` must be a pointer returned by `net_mesh_blob_adapter_new`
/// + not yet freed, or null.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_free(handle: *mut MeshBlobAdapterHandle) {
    if !handle.is_null() {
        // Reclaim the Box, then drop the `ManuallyDrop`-wrapped Arc by
        // hand; the Box allocation itself is released when `owned`
        // goes out of scope.
        let mut owned = unsafe { Box::from_raw(handle) };
        unsafe { ManuallyDrop::drop(&mut owned.inner) };
    }
}
1044
/// Store `data` of `data_len` bytes under the content address
/// declared by `blob_ref_bytes` (a previously-encoded `BlobRef`
/// wire blob from `net_blob_publish` or constructed externally).
///
/// Returns `0` on success, `NET_ERR_BLOB_*` on adapter-side error,
/// `NET_ERR_BLOB_DECODE` if `blob_ref_bytes` does not decode to a
/// `BlobRef`, or `NetError::NullPointer` when `handle` /
/// `blob_ref_bytes` is null or when `data` is null while `data_len`
/// is non-zero. A null `data` with `data_len == 0` stores an empty
/// payload. The substrate BLAKE3-verifies the bytes against the
/// BlobRef hash before persisting.
///
/// # Safety
/// `handle` is a valid `MeshBlobAdapterHandle*`. `blob_ref_bytes`
/// + `data` point to readable buffers of the supplied lengths.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_store(
    handle: *const MeshBlobAdapterHandle,
    blob_ref_bytes: *const u8,
    blob_ref_len: usize,
    data: *const u8,
    data_len: usize,
) -> c_int {
    if handle.is_null() || blob_ref_bytes.is_null() {
        return NetError::NullPointer.into();
    }
    // A null payload pointer can only mean "empty". Pairing it with a
    // non-zero length is a caller bug; report it instead of silently
    // storing zero bytes.
    if data.is_null() && data_len != 0 {
        return NetError::NullPointer.into();
    }
    // `slice::from_raw_parts` requires `len <= isize::MAX`.
    if blob_ref_len > isize::MAX as usize || data_len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let blob_slice = unsafe { std::slice::from_raw_parts(blob_ref_bytes, blob_ref_len) };
    let blob_ref = match InnerBlobRef::decode(blob_slice) {
        Ok(Some(b)) => b,
        _ => return NET_ERR_BLOB_DECODE,
    };
    // Copy the payload so the future below owns everything it touches.
    let data_owned: Vec<u8> = if data.is_null() {
        Vec::new()
    } else {
        unsafe { std::slice::from_raw_parts(data, data_len) }.to_vec()
    };
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call and keep the adapter alive
    // after `net_mesh_blob_adapter_free`.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    let result = block_on(async move { adapter.store(&blob_ref, &data_owned).await });
    match result {
        Ok(()) => 0,
        Err(e) => err_to_code(&e),
    }
}
1091
/// Fetch the content for `blob_ref_bytes`. On success writes a
/// heap-allocated buffer pointer to `*out_data` + length to
/// `*out_len` and returns `0`. The caller MUST free via
/// [`net_blob_free_buffer`].
///
/// `*out_data` / `*out_len` are reset to null / `0` before any work is
/// done, so once the null-pointer check has passed they hold a
/// well-defined value on every error return.
///
/// Returns `NetError::NullPointer` if any pointer argument is null,
/// `NET_ERR_BLOB_DECODE` if `blob_ref_bytes` does not decode to a
/// `BlobRef`, or the code produced by `err_to_code` for an adapter-side
/// failure.
///
/// # Safety
/// `handle`, `blob_ref_bytes`, `out_data`, `out_len` must all be
/// non-null and point to valid memory of the appropriate type.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_fetch(
    handle: *const MeshBlobAdapterHandle,
    blob_ref_bytes: *const u8,
    blob_ref_len: usize,
    out_data: *mut *mut u8,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || blob_ref_bytes.is_null() || out_data.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    // Give the out-parameters a defined "nothing returned" state so a
    // caller that frees unconditionally never sees a stale pointer.
    unsafe {
        *out_data = ptr::null_mut();
        *out_len = 0;
    }
    // `slice::from_raw_parts` requires `len <= isize::MAX`.
    if blob_ref_len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let blob_slice = unsafe { std::slice::from_raw_parts(blob_ref_bytes, blob_ref_len) };
    let blob_ref = match InnerBlobRef::decode(blob_slice) {
        Ok(Some(b)) => b,
        _ => return NET_ERR_BLOB_DECODE,
    };
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    let result = block_on(async move { adapter.fetch(&blob_ref).await });
    match result {
        // Allocate with the same explicit `Layout::array::<u8>(len)`
        // path that `net_blob_free_buffer` deallocates with, so the
        // pair is layout-symmetric regardless of any future
        // `Vec::leak` / `into_boxed_slice` refactor inside the
        // adapter.
        Ok(bytes) => unsafe { write_bytes_out(&bytes, out_data, out_len) },
        Err(e) => err_to_code(&e),
    }
}
1133
/// Probe local presence — writes `1` to `*out_exists` if the chunk
/// is locally reachable, `0` otherwise. Returns `0` on success or
/// a `NET_ERR_*` code on failure; `*out_exists` is only written on
/// success.
///
/// # Safety
/// `handle`, `blob_ref_bytes`, `out_exists` must all be non-null.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_exists(
    handle: *const MeshBlobAdapterHandle,
    blob_ref_bytes: *const u8,
    blob_ref_len: usize,
    out_exists: *mut c_int,
) -> c_int {
    if handle.is_null() || blob_ref_bytes.is_null() || out_exists.is_null() {
        return NetError::NullPointer.into();
    }
    // `slice::from_raw_parts` requires `len <= isize::MAX`.
    if blob_ref_len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let blob_slice = unsafe { std::slice::from_raw_parts(blob_ref_bytes, blob_ref_len) };
    let blob_ref = match InnerBlobRef::decode(blob_slice) {
        Ok(Some(b)) => b,
        _ => return NET_ERR_BLOB_DECODE,
    };
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    let result = block_on(async move { adapter.exists(&blob_ref).await });
    match result {
        Ok(present) => {
            unsafe { *out_exists = c_int::from(present) };
            0
        }
        Err(e) => err_to_code(&e),
    }
}
1170
/// Render the adapter's Prometheus text body. Returns a
/// `CString::into_raw`-allocated `*mut c_char` that the caller
/// MUST free via [`crate::ffi::net_free_string`]. Returns null when
/// `handle` is null or when the rendered body contains an interior
/// NUL byte and therefore cannot be represented as a C string.
///
/// # Safety
/// `handle` must be a valid `MeshBlobAdapterHandle*`.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_prometheus_text(
    handle: *const MeshBlobAdapterHandle,
) -> *mut c_char {
    if handle.is_null() {
        return ptr::null_mut();
    }
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    let body = adapter.prometheus_text();
    match std::ffi::CString::new(body) {
        Ok(s) => s.into_raw(),
        Err(_) => ptr::null_mut(),
    }
}
1193
1194// ---- v0.3 active-overflow surface ----
1195
/// True / false for `overflow_enabled` on the adapter. Returns
/// `1` / `0`; returns negative `NET_ERR_*` on null handle.
///
/// # Safety
/// `handle` must be a valid `MeshBlobAdapterHandle*`.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_overflow_enabled(
    handle: *const MeshBlobAdapterHandle,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    c_int::from(adapter.overflow_enabled())
}
1216
/// True / false for `overflow_active` (the hysteresis runtime
/// state). Returns `1` / `0`; returns negative `NET_ERR_*` on null
/// handle.
///
/// # Safety
/// `handle` must be a valid `MeshBlobAdapterHandle*`.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_overflow_active(
    handle: *const MeshBlobAdapterHandle,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    c_int::from(adapter.overflow_active())
}
1237
/// Snapshot the current overflow configuration as a JSON
/// string. Returns a `CString::into_raw`-allocated `*mut c_char`
/// the caller MUST free via [`crate::ffi::net_free_string`]. Returns
/// null when `handle` is null or when the JSON text cannot be turned
/// into a C string.
///
/// # Safety
/// `handle` must be a valid `MeshBlobAdapterHandle*`.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_overflow_config(
    handle: *const MeshBlobAdapterHandle,
) -> *mut c_char {
    if handle.is_null() {
        return ptr::null_mut();
    }
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    let cfg = adapter.overflow_config();
    let json = overflow_to_json(cfg);
    match std::ffi::CString::new(json) {
        Ok(s) => s.into_raw(),
        Err(_) => ptr::null_mut(),
    }
}
1260
/// Flip the overflow master switch: any non-zero `enabled` turns it
/// on, `0` turns it off. Returns `0` on success, `NET_ERR_*` on null
/// handle.
///
/// # Safety
/// `handle` must be a valid `MeshBlobAdapterHandle*`.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_set_overflow_enabled(
    handle: *const MeshBlobAdapterHandle,
    enabled: c_int,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    adapter.set_overflow_enabled(enabled != 0);
    0
}
1279
/// Replace the entire overflow configuration with the JSON
/// shape `config_json`. Returns `0` on success,
/// `NetError::NullPointer` if either argument is null,
/// `NetError::InvalidUtf8` if `config_json` cannot be read as a
/// string, or `NetError::InvalidJson` on malformed input.
///
/// # Safety
/// `handle` + `config_json` must be valid. `config_json` must be a
/// null-terminated UTF-8 JSON string.
#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_mesh_blob_adapter_set_overflow_config(
    handle: *const MeshBlobAdapterHandle,
    config_json: *const c_char,
) -> c_int {
    if handle.is_null() || config_json.is_null() {
        return NetError::NullPointer.into();
    }
    let s = match unsafe { c_str_to_owned(config_json) } {
        Some(s) => s,
        None => return NetError::InvalidUtf8.into(),
    };
    let cfg = match parse_overflow_json(&s) {
        Ok(c) => c,
        Err(code) => return code,
    };
    // Clone the inner `Arc`, not the `ManuallyDrop` wrapper: a cloned
    // `ManuallyDrop<Arc<_>>` never decrements the refcount on drop, which
    // would leak one strong reference per call.
    let shared: &Arc<InnerMeshBlobAdapter> = unsafe { &(*handle).inner };
    let adapter = Arc::clone(shared);
    adapter.set_overflow_config(cfg);
    0
}
1308
1309#[cfg(test)]
1310mod tests {
1311    #![allow(
1312        clippy::disallowed_methods,
1313        reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1314    )]
1315    use super::*;
1316    use std::ffi::CString;
1317    use std::sync::atomic::{AtomicU64, Ordering};
1318
1319    fn unique_id(prefix: &str) -> String {
1320        static N: AtomicU64 = AtomicU64::new(0);
1321        let n = N.fetch_add(1, Ordering::Relaxed);
1322        format!("{}-{}-{}", prefix, std::process::id(), n)
1323    }
1324
1325    /// End-to-end: register FS adapter, publish, resolve, free.
1326    /// Pins the contract on the symbols Go / C consumers will use.
1327    #[test]
1328    fn ffi_publish_resolve_round_trip() {
1329        let id = unique_id("ffi-blob");
1330        let root = std::env::temp_dir().join(format!("net-ffi-blob-{}", id));
1331        let id_c = CString::new(id.clone()).unwrap();
1332        let root_c = CString::new(root.to_string_lossy().as_ref()).unwrap();
1333        let uri_c = CString::new("file:///ffi-round-trip").unwrap();
1334
1335        unsafe {
1336            assert_eq!(
1337                net_blob_register_fs_adapter(id_c.as_ptr(), root_c.as_ptr()),
1338                0
1339            );
1340            assert_eq!(net_blob_adapter_registered(id_c.as_ptr()), 1);
1341
1342            let payload = b"end-to-end ffi blob round trip";
1343            let mut out_buf: *mut u8 = std::ptr::null_mut();
1344            let mut out_len: usize = 0;
1345            let rc = net_blob_publish(
1346                id_c.as_ptr(),
1347                uri_c.as_ptr(),
1348                payload.as_ptr(),
1349                payload.len(),
1350                &mut out_buf,
1351                &mut out_len,
1352            );
1353            assert_eq!(rc, 0);
1354            assert!(!out_buf.is_null());
1355            // First bytes are the BlobRef magic.
1356            let encoded = std::slice::from_raw_parts(out_buf, out_len);
1357            assert_eq!(
1358                &encoded[..4],
1359                &crate::adapter::net::dataforts::BLOB_REF_MAGIC,
1360            );
1361
1362            // Resolve back through the same adapter.
1363            let mut content_buf: *mut u8 = std::ptr::null_mut();
1364            let mut content_len: usize = 0;
1365            let rc = net_blob_resolve(
1366                id_c.as_ptr(),
1367                out_buf,
1368                out_len,
1369                &mut content_buf,
1370                &mut content_len,
1371            );
1372            assert_eq!(rc, 0);
1373            let resolved = std::slice::from_raw_parts(content_buf, content_len);
1374            assert_eq!(resolved, payload);
1375
1376            net_blob_free_buffer(out_buf, out_len);
1377            net_blob_free_buffer(content_buf, content_len);
1378            assert_eq!(net_blob_unregister_adapter(id_c.as_ptr()), 1);
1379        }
1380        let _ = std::fs::remove_dir_all(&root);
1381    }
1382
1383    #[test]
1384    fn ffi_resolve_returns_not_registered_for_unknown_adapter() {
1385        let id_c = CString::new("never-registered").unwrap();
1386        let payload = b"any";
1387        let mut out_buf: *mut u8 = std::ptr::null_mut();
1388        let mut out_len: usize = 0;
1389        let rc = unsafe {
1390            net_blob_resolve(
1391                id_c.as_ptr(),
1392                payload.as_ptr(),
1393                payload.len(),
1394                &mut out_buf,
1395                &mut out_len,
1396            )
1397        };
1398        assert_eq!(rc, NET_ERR_BLOB_NOT_REGISTERED);
1399        assert!(out_buf.is_null());
1400        assert_eq!(out_len, 0);
1401    }
1402
1403    /// Round-trip an `net_blob_register_callback_adapter`-registered
1404    /// adapter: publish bytes through the vtable, then resolve them
1405    /// back. The vtable's `fetch` returns bytes from a static map
1406    /// indexed by the BLAKE3 hash; the substrate-side hash check
1407    /// validates the round trip.
1408    mod callback_adapter_round_trip {
1409        use super::*;
1410        use std::collections::HashMap;
1411        use std::sync::Mutex;
1412
        /// State behind the opaque `ctx` pointer handed to the test
        /// vtable: an in-memory map from 32-byte hash to stored bytes,
        /// locked by each callback before it reads or writes.
        struct CallbackCtx {
            store: Mutex<HashMap<[u8; 32], Vec<u8>>>,
        }
1416
1417        unsafe extern "C" fn cb_store(
1418            ctx: *mut c_void,
1419            _uri: *const c_char,
1420            hash: *const u8,
1421            _size: u64,
1422            data: *const u8,
1423            data_len: usize,
1424        ) -> c_int {
1425            let ctx = &*(ctx as *const CallbackCtx);
1426            let mut h = [0u8; 32];
1427            h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1428            let buf = if data_len == 0 {
1429                Vec::new()
1430            } else {
1431                std::slice::from_raw_parts(data, data_len).to_vec()
1432            };
1433            ctx.store.lock().unwrap().insert(h, buf);
1434            0
1435        }
1436
1437        unsafe extern "C" fn cb_fetch(
1438            ctx: *mut c_void,
1439            _uri: *const c_char,
1440            hash: *const u8,
1441            _size: u64,
1442            out_data: *mut *mut u8,
1443            out_len: *mut usize,
1444        ) -> c_int {
1445            let ctx = &*(ctx as *const CallbackCtx);
1446            let mut h = [0u8; 32];
1447            h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1448            let store = ctx.store.lock().unwrap();
1449            match store.get(&h) {
1450                Some(bytes) => {
1451                    let boxed = bytes.clone().into_boxed_slice();
1452                    let len = boxed.len();
1453                    let ptr = Box::into_raw(boxed) as *mut u8;
1454                    *out_data = ptr;
1455                    *out_len = len;
1456                    0
1457                }
1458                None => NET_ERR_BLOB_NOT_FOUND,
1459            }
1460        }
1461
1462        unsafe extern "C" fn cb_fetch_range(
1463            ctx: *mut c_void,
1464            _uri: *const c_char,
1465            hash: *const u8,
1466            _size: u64,
1467            range_start: u64,
1468            range_end: u64,
1469            out_data: *mut *mut u8,
1470            out_len: *mut usize,
1471        ) -> c_int {
1472            let ctx = &*(ctx as *const CallbackCtx);
1473            let mut h = [0u8; 32];
1474            h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1475            let store = ctx.store.lock().unwrap();
1476            match store.get(&h) {
1477                Some(bytes) => {
1478                    let s = range_start as usize;
1479                    let e = range_end as usize;
1480                    if s > e || e > bytes.len() {
1481                        return NET_ERR_BLOB_BACKEND;
1482                    }
1483                    let slice = bytes[s..e].to_vec().into_boxed_slice();
1484                    let len = slice.len();
1485                    *out_data = Box::into_raw(slice) as *mut u8;
1486                    *out_len = len;
1487                    0
1488                }
1489                None => NET_ERR_BLOB_NOT_FOUND,
1490            }
1491        }
1492
1493        unsafe extern "C" fn cb_exists(
1494            ctx: *mut c_void,
1495            _uri: *const c_char,
1496            hash: *const u8,
1497            _size: u64,
1498            out_exists: *mut c_int,
1499        ) -> c_int {
1500            let ctx = &*(ctx as *const CallbackCtx);
1501            let mut h = [0u8; 32];
1502            h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1503            *out_exists = if ctx.store.lock().unwrap().contains_key(&h) {
1504                1
1505            } else {
1506                0
1507            };
1508            0
1509        }
1510
1511        unsafe extern "C" fn cb_free(_ctx: *mut c_void, data: *mut u8, len: usize) {
1512            if data.is_null() {
1513                return;
1514            }
1515            let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(data, len));
1516        }
1517
1518        #[test]
1519        fn callback_adapter_publish_resolve_round_trip() {
1520            let ctx = Box::new(CallbackCtx {
1521                store: Mutex::new(HashMap::new()),
1522            });
1523            let ctx_ptr = Box::into_raw(ctx) as *mut c_void;
1524            let vtable = NetBlobAdapterVtable {
1525                store: cb_store,
1526                fetch: cb_fetch,
1527                fetch_range: cb_fetch_range,
1528                exists: cb_exists,
1529                free_buffer: cb_free,
1530            };
1531
1532            let id_c = std::ffi::CString::new("ffi-cb-roundtrip").unwrap();
1533            let uri_c = std::ffi::CString::new("cb://round-trip").unwrap();
1534            unsafe {
1535                assert_eq!(
1536                    net_blob_register_callback_adapter(id_c.as_ptr(), &vtable, ctx_ptr),
1537                    0
1538                );
1539
1540                let payload = b"vtable round-trip payload";
1541                let mut out_buf: *mut u8 = std::ptr::null_mut();
1542                let mut out_len: usize = 0;
1543                let rc = net_blob_publish(
1544                    id_c.as_ptr(),
1545                    uri_c.as_ptr(),
1546                    payload.as_ptr(),
1547                    payload.len(),
1548                    &mut out_buf,
1549                    &mut out_len,
1550                );
1551                assert_eq!(rc, 0);
1552
1553                let mut content_buf: *mut u8 = std::ptr::null_mut();
1554                let mut content_len: usize = 0;
1555                let rc = net_blob_resolve(
1556                    id_c.as_ptr(),
1557                    out_buf,
1558                    out_len,
1559                    &mut content_buf,
1560                    &mut content_len,
1561                );
1562                assert_eq!(rc, 0);
1563                let resolved = std::slice::from_raw_parts(content_buf, content_len);
1564                assert_eq!(resolved, payload);
1565
1566                net_blob_free_buffer(out_buf, out_len);
1567                net_blob_free_buffer(content_buf, content_len);
1568                assert_eq!(net_blob_unregister_adapter(id_c.as_ptr()), 1);
1569
1570                // Reclaim the leaked ctx box.
1571                drop(Box::from_raw(ctx_ptr as *mut CallbackCtx));
1572            }
1573        }
1574    }
1575
1576    #[test]
1577    fn ffi_duplicate_registration_rejected() {
1578        let id = unique_id("ffi-dup");
1579        let root = std::env::temp_dir().join(format!("net-ffi-blob-{}", id));
1580        let id_c = CString::new(id.clone()).unwrap();
1581        let root_c = CString::new(root.to_string_lossy().as_ref()).unwrap();
1582        unsafe {
1583            assert_eq!(
1584                net_blob_register_fs_adapter(id_c.as_ptr(), root_c.as_ptr()),
1585                0
1586            );
1587            assert_eq!(
1588                net_blob_register_fs_adapter(id_c.as_ptr(), root_c.as_ptr()),
1589                NET_ERR_BLOB_DUPLICATE_ID
1590            );
1591            assert_eq!(net_blob_unregister_adapter(id_c.as_ptr()), 1);
1592        }
1593        let _ = std::fs::remove_dir_all(&root);
1594    }
1595}