1#![allow(clippy::missing_safety_doc)]
31#![expect(
32 clippy::undocumented_unsafe_blocks,
33 reason = "module-wide FFI safety contract documented in the # Safety preamble above"
34)]
35#![expect(
36 clippy::multiple_unsafe_ops_per_block,
37 reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract"
38)]
39
40use std::ffi::{c_char, c_int, CStr};
41use std::os::raw::c_void;
42use std::path::PathBuf;
43use std::ptr;
44use std::sync::Arc;
45
46use tokio::runtime::Runtime;
47
48#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
49use crate::adapter::net::behavior::TopologyScope;
50use crate::adapter::net::dataforts::{
51 global_blob_adapter_registry, publish_blob, resolve_payload, BlobAdapter,
52 BlobError as InnerBlobError, FileSystemAdapter,
53};
54#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
59use crate::adapter::net::dataforts::{
60 BlobRef as InnerBlobRef, MeshBlobAdapter as InnerMeshBlobAdapter,
61 OverflowConfig as InnerOverflowConfig,
62};
63
64use super::NetError;
65
66pub const NET_ERR_BLOB_DECODE: c_int = -110;
68pub const NET_ERR_BLOB_DUPLICATE_ID: c_int = -111;
70pub const NET_ERR_BLOB_NOT_REGISTERED: c_int = -112;
72pub const NET_ERR_BLOB_NOT_FOUND: c_int = -113;
74pub const NET_ERR_BLOB_HASH_MISMATCH: c_int = -114;
76pub const NET_ERR_BLOB_BACKEND: c_int = -115;
78pub const NET_ERR_BLOB_UNSUPPORTED_SCHEME: c_int = -116;
81pub const NET_ERR_BLOB_ADAPTER_NOT_CONFIGURED: c_int = -118;
83pub const NET_ERR_BLOB_ADAPTER_NOT_REGISTERED: c_int = -119;
85pub const NET_ERR_BLOB_PANIC: c_int = -117;
91pub const NET_ERR_BLOB_UNAUTHORIZED: c_int = -120;
96
97fn runtime() -> &'static Arc<Runtime> {
98 use std::sync::OnceLock;
99 static RT: OnceLock<Arc<Runtime>> = OnceLock::new();
100 RT.get_or_init(|| {
101 match tokio::runtime::Builder::new_multi_thread()
102 .enable_all()
103 .build()
104 {
105 Ok(rt) => Arc::new(rt),
106 Err(e) => {
107 eprintln!("FATAL: blob FFI tokio runtime build failure ({e:?}); aborting");
108 std::process::abort();
109 }
110 }
111 })
112}
113
114fn block_on<F: std::future::Future>(future: F) -> F::Output {
115 if tokio::runtime::Handle::try_current().is_ok() {
116 eprintln!("FATAL: blob FFI called from inside a tokio runtime context; aborting");
117 std::process::abort();
118 }
119 runtime().block_on(future)
120}
121
122unsafe fn c_str_to_owned(p: *const c_char) -> Option<String> {
123 if p.is_null() {
124 return None;
125 }
126 CStr::from_ptr(p).to_str().ok().map(|s| s.to_owned())
127}
128
129fn err_to_code(e: &InnerBlobError) -> c_int {
130 match e {
131 InnerBlobError::HashMismatch { .. } => NET_ERR_BLOB_HASH_MISMATCH,
132 InnerBlobError::NotFound(_) => NET_ERR_BLOB_NOT_FOUND,
133 InnerBlobError::Backend(_) => NET_ERR_BLOB_BACKEND,
134 InnerBlobError::Cancelled => NET_ERR_BLOB_BACKEND,
135 InnerBlobError::UnsupportedScheme(_) => NET_ERR_BLOB_UNSUPPORTED_SCHEME,
136 InnerBlobError::UnsupportedVersion(_) => NET_ERR_BLOB_DECODE,
137 InnerBlobError::Decode(_) => NET_ERR_BLOB_DECODE,
138 InnerBlobError::AdapterNotConfigured => NET_ERR_BLOB_ADAPTER_NOT_CONFIGURED,
139 InnerBlobError::AdapterNotRegistered(_) => NET_ERR_BLOB_ADAPTER_NOT_REGISTERED,
140 InnerBlobError::Unauthorized(_) => NET_ERR_BLOB_UNAUTHORIZED,
141 InnerBlobError::ShortChunk { .. } => NET_ERR_BLOB_BACKEND,
149 }
150}
151
152#[unsafe(no_mangle)]
164pub unsafe extern "C" fn net_blob_register_fs_adapter(
165 adapter_id: *const c_char,
166 root: *const c_char,
167) -> c_int {
168 let id = match c_str_to_owned(adapter_id) {
169 Some(s) => s,
170 None => return NetError::InvalidUtf8.into(),
171 };
172 let root = match c_str_to_owned(root) {
173 Some(s) => s,
174 None => return NetError::InvalidUtf8.into(),
175 };
176 let adapter: Arc<dyn BlobAdapter> =
177 Arc::new(FileSystemAdapter::new(id.clone(), PathBuf::from(root)));
178 match global_blob_adapter_registry().register(adapter) {
179 Ok(()) => 0,
180 Err(_) => NET_ERR_BLOB_DUPLICATE_ID,
181 }
182}
183
184#[unsafe(no_mangle)]
192pub unsafe extern "C" fn net_blob_unregister_adapter(adapter_id: *const c_char) -> c_int {
193 let id = match c_str_to_owned(adapter_id) {
194 Some(s) => s,
195 None => return NetError::InvalidUtf8.into(),
196 };
197 if global_blob_adapter_registry().unregister(&id).is_some() {
198 1
199 } else {
200 0
201 }
202}
203
204#[unsafe(no_mangle)]
211pub unsafe extern "C" fn net_blob_adapter_registered(adapter_id: *const c_char) -> c_int {
212 let id = match c_str_to_owned(adapter_id) {
213 Some(s) => s,
214 None => return NetError::InvalidUtf8.into(),
215 };
216 if global_blob_adapter_registry().get(&id).is_some() {
217 1
218 } else {
219 0
220 }
221}
222
223#[unsafe(no_mangle)]
240pub unsafe extern "C" fn net_blob_publish(
241 adapter_id: *const c_char,
242 uri: *const c_char,
243 data: *const u8,
244 data_len: usize,
245 out_payload: *mut *mut u8,
246 out_payload_len: *mut usize,
247) -> c_int {
248 if out_payload.is_null() || out_payload_len.is_null() {
249 return NetError::NullPointer.into();
250 }
251 *out_payload = ptr::null_mut();
252 *out_payload_len = 0;
253
254 let id = match c_str_to_owned(adapter_id) {
255 Some(s) => s,
256 None => return NetError::InvalidUtf8.into(),
257 };
258 let uri = match c_str_to_owned(uri) {
259 Some(s) => s,
260 None => return NetError::InvalidUtf8.into(),
261 };
262 if data.is_null() && data_len > 0 {
263 return NetError::NullPointer.into();
264 }
265 if data_len > isize::MAX as usize {
267 return NetError::InvalidJson.into();
268 }
269 let data_slice = if data_len == 0 {
270 &[][..]
271 } else {
272 std::slice::from_raw_parts(data, data_len)
273 };
274
275 let adapter = match global_blob_adapter_registry().get(&id) {
276 Some(a) => a,
277 None => return NET_ERR_BLOB_NOT_REGISTERED,
278 };
279 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
284 block_on(async move { publish_blob(adapter.as_ref(), uri, data_slice).await })
285 }));
286 let bytes = match result {
287 Ok(Ok(b)) => b,
288 Ok(Err(e)) => return err_to_code(&e),
289 Err(_) => return NET_ERR_BLOB_PANIC,
290 };
291
292 write_bytes_out(&bytes, out_payload, out_payload_len)
293}
294
295#[unsafe(no_mangle)]
312pub unsafe extern "C" fn net_blob_resolve(
313 adapter_id: *const c_char,
314 payload: *const u8,
315 payload_len: usize,
316 out_content: *mut *mut u8,
317 out_content_len: *mut usize,
318) -> c_int {
319 if out_content.is_null() || out_content_len.is_null() {
320 return NetError::NullPointer.into();
321 }
322 *out_content = ptr::null_mut();
323 *out_content_len = 0;
324
325 let id = match c_str_to_owned(adapter_id) {
326 Some(s) => s,
327 None => return NetError::InvalidUtf8.into(),
328 };
329 if payload.is_null() && payload_len > 0 {
330 return NetError::NullPointer.into();
331 }
332 if payload_len > isize::MAX as usize {
334 return NetError::InvalidJson.into();
335 }
336 let payload_slice = if payload_len == 0 {
337 &[][..]
338 } else {
339 std::slice::from_raw_parts(payload, payload_len)
340 };
341
342 let adapter = match global_blob_adapter_registry().get(&id) {
343 Some(a) => a,
344 None => return NET_ERR_BLOB_NOT_REGISTERED,
345 };
346 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
348 block_on(async move { resolve_payload(payload_slice, adapter.as_ref()).await })
349 }));
350 let bytes = match result {
351 Ok(Ok(b)) => b,
352 Ok(Err(e)) => return err_to_code(&e),
353 Err(_) => return NET_ERR_BLOB_PANIC,
354 };
355
356 write_bytes_out(&bytes, out_content, out_content_len)
357}
358
359unsafe fn write_bytes_out(src: &[u8], out_ptr: *mut *mut u8, out_len: *mut usize) -> c_int {
373 let len = src.len();
374 if len == 0 {
375 unsafe {
376 *out_ptr = ptr::null_mut();
377 *out_len = 0;
378 }
379 return 0;
380 }
381 let layout = match std::alloc::Layout::array::<u8>(len) {
382 Ok(l) => l,
383 Err(_) => return NetError::InvalidJson.into(),
391 };
392 let alloc_ptr = unsafe { std::alloc::alloc(layout) };
393 if alloc_ptr.is_null() {
394 std::alloc::handle_alloc_error(layout);
395 }
396 unsafe {
397 std::ptr::copy_nonoverlapping(src.as_ptr(), alloc_ptr, len);
398 *out_ptr = alloc_ptr;
399 *out_len = len;
400 }
401 0
402}
403
404#[unsafe(no_mangle)]
414pub unsafe extern "C" fn net_blob_free_buffer(ptr: *mut u8, len: usize) {
415 if ptr.is_null() || len == 0 {
416 return;
417 }
418 let layout = match std::alloc::Layout::array::<u8>(len) {
424 Ok(l) => l,
425 Err(_) => return,
426 };
427 std::alloc::dealloc(ptr, layout);
428}
429
430#[allow(dead_code)]
433fn _force_use() -> *mut c_void {
434 ptr::null_mut()
435}
436
437use std::ops::Range;
445
446use async_trait::async_trait;
447use bytes::Bytes;
448
449pub type NetBlobAdapterStoreFn = unsafe extern "C" fn(
452 ctx: *mut c_void,
453 uri: *const c_char,
454 hash: *const u8, size: u64,
456 data: *const u8,
457 data_len: usize,
458) -> c_int;
459
460pub type NetBlobAdapterFetchFn = unsafe extern "C" fn(
465 ctx: *mut c_void,
466 uri: *const c_char,
467 hash: *const u8,
468 size: u64,
469 out_data: *mut *mut u8,
470 out_len: *mut usize,
471) -> c_int;
472
473pub type NetBlobAdapterFetchRangeFn = unsafe extern "C" fn(
475 ctx: *mut c_void,
476 uri: *const c_char,
477 hash: *const u8,
478 size: u64,
479 range_start: u64,
480 range_end: u64,
481 out_data: *mut *mut u8,
482 out_len: *mut usize,
483) -> c_int;
484
485pub type NetBlobAdapterExistsFn = unsafe extern "C" fn(
488 ctx: *mut c_void,
489 uri: *const c_char,
490 hash: *const u8,
491 size: u64,
492 out_exists: *mut c_int,
493) -> c_int;
494
495pub type NetBlobAdapterFreeFn = unsafe extern "C" fn(ctx: *mut c_void, data: *mut u8, len: usize);
499
500#[repr(C)]
504#[derive(Clone, Copy)]
505pub struct NetBlobAdapterVtable {
506 pub store: NetBlobAdapterStoreFn,
508 pub fetch: NetBlobAdapterFetchFn,
510 pub fetch_range: NetBlobAdapterFetchRangeFn,
512 pub exists: NetBlobAdapterExistsFn,
514 pub free_buffer: NetBlobAdapterFreeFn,
518}
519
520struct OpaqueCtx(*mut c_void);
548
549unsafe impl Send for OpaqueCtx {}
554unsafe impl Sync for OpaqueCtx {}
555
556impl OpaqueCtx {
557 fn new(ptr: *mut c_void) -> Self {
558 Self(ptr)
559 }
560 fn get(&self) -> *mut c_void {
561 self.0
562 }
563}
564
565struct CallbackBlobAdapter {
572 id: String,
573 vtable: NetBlobAdapterVtable,
574 ctx: Arc<OpaqueCtx>,
575}
576
577unsafe impl Send for CallbackBlobAdapter {}
578unsafe impl Sync for CallbackBlobAdapter {}
579
580fn code_to_err(code: c_int, label: &str) -> InnerBlobError {
581 match code {
582 NET_ERR_BLOB_NOT_FOUND => InnerBlobError::NotFound(label.into()),
583 NET_ERR_BLOB_HASH_MISMATCH => InnerBlobError::Backend(format!(
584 "{}: substrate hash mismatch (caller returned wrong bytes)",
585 label
586 )),
587 NET_ERR_BLOB_UNSUPPORTED_SCHEME => InnerBlobError::UnsupportedScheme(label.into()),
588 NET_ERR_BLOB_DECODE => InnerBlobError::Decode(label.into()),
589 _ => InnerBlobError::Backend(format!("{}: code {}", label, code)),
590 }
591}
592
593fn expect_small_for_ffi(
601 blob_ref: &crate::adapter::net::dataforts::BlobRef,
602) -> std::result::Result<(String, [u8; 32], u64), InnerBlobError> {
603 match blob_ref {
604 crate::adapter::net::dataforts::BlobRef::Small {
605 uri, hash, size, ..
606 } => Ok((uri.clone(), *hash, *size)),
607 crate::adapter::net::dataforts::BlobRef::Manifest { .. }
608 | crate::adapter::net::dataforts::BlobRef::Tree { .. } => Err(InnerBlobError::Backend(
609 "CallbackBlobAdapter operates on Small blobs only; \
610 chunked blobs are dispatched at the substrate above"
611 .to_owned(),
612 )),
613 }
614}
615
616#[async_trait]
617impl BlobAdapter for CallbackBlobAdapter {
618 fn adapter_id(&self) -> &str {
619 &self.id
620 }
621
622 async fn store(
623 &self,
624 blob_ref: &crate::adapter::net::dataforts::BlobRef,
625 bytes: &[u8],
626 ) -> std::result::Result<(), InnerBlobError> {
627 let vtable = self.vtable;
628 let ctx = self.ctx.clone();
629 let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
630 let uri = match std::ffi::CString::new(uri_str) {
631 Ok(c) => c,
632 Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
633 };
634 let data = bytes.to_vec();
635 tokio::task::spawn_blocking(move || -> std::result::Result<(), InnerBlobError> {
636 let code = unsafe {
637 (vtable.store)(
638 ctx.get(),
639 uri.as_ptr(),
640 hash.as_ptr(),
641 size,
642 data.as_ptr(),
643 data.len(),
644 )
645 };
646 if code == 0 {
647 Ok(())
648 } else {
649 Err(code_to_err(code, "store"))
650 }
651 })
652 .await
653 .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
654 }
655
656 async fn fetch(
657 &self,
658 blob_ref: &crate::adapter::net::dataforts::BlobRef,
659 ) -> std::result::Result<Bytes, InnerBlobError> {
660 let vtable = self.vtable;
661 let ctx = self.ctx.clone();
662 let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
663 let uri = match std::ffi::CString::new(uri_str) {
664 Ok(c) => c,
665 Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
666 };
667 tokio::task::spawn_blocking(move || -> std::result::Result<Bytes, InnerBlobError> {
668 let mut out_data: *mut u8 = ptr::null_mut();
669 let mut out_len: usize = 0;
670 let code = unsafe {
671 (vtable.fetch)(
672 ctx.get(),
673 uri.as_ptr(),
674 hash.as_ptr(),
675 size,
676 &mut out_data,
677 &mut out_len,
678 )
679 };
680 if code != 0 {
681 return Err(code_to_err(code, "fetch"));
682 }
683 if out_data.is_null() {
684 if out_len == 0 {
685 return Ok(Bytes::new());
686 }
687 return Err(InnerBlobError::Backend(
688 "fetch: caller returned null pointer with non-zero len".into(),
689 ));
690 }
691 let buf = unsafe { std::slice::from_raw_parts(out_data, out_len).to_vec() };
700 unsafe { (vtable.free_buffer)(ctx.get(), out_data, out_len) };
701 Ok(Bytes::from(buf))
702 })
703 .await
704 .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
705 }
706
707 async fn fetch_range(
708 &self,
709 blob_ref: &crate::adapter::net::dataforts::BlobRef,
710 range: Range<u64>,
711 ) -> std::result::Result<Bytes, InnerBlobError> {
712 let vtable = self.vtable;
713 let ctx = self.ctx.clone();
714 let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
715 let uri = match std::ffi::CString::new(uri_str) {
716 Ok(c) => c,
717 Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
718 };
719 let start = range.start;
720 let end = range.end;
721 tokio::task::spawn_blocking(move || -> std::result::Result<Bytes, InnerBlobError> {
722 let mut out_data: *mut u8 = ptr::null_mut();
723 let mut out_len: usize = 0;
724 let code = unsafe {
725 (vtable.fetch_range)(
726 ctx.get(),
727 uri.as_ptr(),
728 hash.as_ptr(),
729 size,
730 start,
731 end,
732 &mut out_data,
733 &mut out_len,
734 )
735 };
736 if code != 0 {
737 return Err(code_to_err(code, "fetch_range"));
738 }
739 if out_data.is_null() {
740 if out_len == 0 {
741 return Ok(Bytes::new());
742 }
743 return Err(InnerBlobError::Backend(
744 "fetch_range: caller returned null pointer with non-zero len".into(),
745 ));
746 }
747 let buf = unsafe { std::slice::from_raw_parts(out_data, out_len).to_vec() };
748 unsafe { (vtable.free_buffer)(ctx.get(), out_data, out_len) };
749 Ok(Bytes::from(buf))
750 })
751 .await
752 .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
753 }
754
755 async fn exists(
756 &self,
757 blob_ref: &crate::adapter::net::dataforts::BlobRef,
758 ) -> std::result::Result<bool, InnerBlobError> {
759 let vtable = self.vtable;
760 let ctx = self.ctx.clone();
761 let (uri_str, hash, size) = expect_small_for_ffi(blob_ref)?;
762 let uri = match std::ffi::CString::new(uri_str) {
763 Ok(c) => c,
764 Err(e) => return Err(InnerBlobError::Backend(format!("uri NUL: {}", e))),
765 };
766 tokio::task::spawn_blocking(move || -> std::result::Result<bool, InnerBlobError> {
767 let mut out_exists: c_int = 0;
768 let code = unsafe {
769 (vtable.exists)(
770 ctx.get(),
771 uri.as_ptr(),
772 hash.as_ptr(),
773 size,
774 &mut out_exists,
775 )
776 };
777 if code != 0 {
778 return Err(code_to_err(code, "exists"));
779 }
780 Ok(out_exists != 0)
781 })
782 .await
783 .map_err(|e| InnerBlobError::Backend(format!("spawn_blocking join: {}", e)))?
784 }
785}
786
787#[unsafe(no_mangle)]
827pub unsafe extern "C" fn net_blob_register_callback_adapter(
828 adapter_id: *const c_char,
829 vtable: *const NetBlobAdapterVtable,
830 ctx: *mut c_void,
831) -> c_int {
832 if vtable.is_null() {
833 return NetError::NullPointer.into();
834 }
835 let id = match c_str_to_owned(adapter_id) {
836 Some(s) => s,
837 None => return NetError::InvalidUtf8.into(),
838 };
839 {
846 let raw = vtable as *const c_void as *const *const c_void;
847 for i in 0..5 {
852 let field = unsafe { *raw.add(i) };
853 if field.is_null() {
854 return NET_ERR_BLOB_BACKEND;
855 }
856 }
857 }
858 let vtable = unsafe { *vtable };
859 let adapter: Arc<dyn BlobAdapter> = Arc::new(CallbackBlobAdapter {
860 id: id.clone(),
861 vtable,
862 ctx: Arc::new(OpaqueCtx::new(ctx)),
863 });
864 match global_blob_adapter_registry().register(adapter) {
865 Ok(()) => 0,
866 Err(_) => NET_ERR_BLOB_DUPLICATE_ID,
867 }
868}
869
870#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
886pub struct MeshBlobAdapterHandle {
887 inner: ManuallyDrop<Arc<InnerMeshBlobAdapter>>,
888}
889
890#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
891use std::mem::ManuallyDrop;
892
893#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
898#[derive(serde::Deserialize, serde::Serialize)]
899struct OverflowConfigJson {
900 #[serde(default)]
901 enabled: bool,
902 #[serde(default)]
903 high_water_ratio: Option<f64>,
904 #[serde(default)]
905 low_water_ratio: Option<f64>,
906 #[serde(default)]
907 max_pushes_per_tick: Option<u64>,
908 #[serde(default)]
909 scope: Option<String>,
910 #[serde(default)]
911 tick_interval_ms: Option<u64>,
912}
913
914#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
915fn parse_overflow_json(s: &str) -> Result<InnerOverflowConfig, c_int> {
916 if s.is_empty() {
917 return Ok(InnerOverflowConfig::default());
918 }
919 let raw: OverflowConfigJson =
920 serde_json::from_str(s).map_err(|_| -> c_int { NetError::InvalidJson.into() })?;
921 let mut cfg = InnerOverflowConfig {
922 enabled: raw.enabled,
923 ..InnerOverflowConfig::default()
924 };
925 if let Some(v) = raw.high_water_ratio {
926 cfg.high_water_ratio = v;
927 }
928 if let Some(v) = raw.low_water_ratio {
929 cfg.low_water_ratio = v;
930 }
931 if let Some(v) = raw.max_pushes_per_tick {
932 cfg.max_pushes_per_tick = v as usize;
933 }
934 if let Some(s) = raw.scope {
935 cfg.scope = match s.to_ascii_lowercase().as_str() {
936 "node" => TopologyScope::Node,
937 "zone" => TopologyScope::Zone,
938 "region" => TopologyScope::Region,
939 "mesh" => TopologyScope::Mesh,
940 _ => {
941 let code: c_int = NetError::InvalidJson.into();
942 return Err(code);
943 }
944 };
945 }
946 if let Some(v) = raw.tick_interval_ms {
947 cfg.tick_interval_ms = v;
948 }
949 Ok(cfg)
950}
951
952#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
953fn overflow_to_json(cfg: InnerOverflowConfig) -> String {
954 let scope = match cfg.scope {
955 TopologyScope::Node => "node",
956 TopologyScope::Zone => "zone",
957 TopologyScope::Region => "region",
958 TopologyScope::Mesh => "mesh",
959 };
960 let raw = OverflowConfigJson {
961 enabled: cfg.enabled,
962 high_water_ratio: Some(cfg.high_water_ratio),
963 low_water_ratio: Some(cfg.low_water_ratio),
964 max_pushes_per_tick: Some(cfg.max_pushes_per_tick as u64),
965 scope: Some(scope.to_string()),
966 tick_interval_ms: Some(cfg.tick_interval_ms),
967 };
968 serde_json::to_string(&raw).unwrap_or_else(|_| "{}".to_string())
969}
970
971#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
993#[unsafe(no_mangle)]
994pub unsafe extern "C" fn net_mesh_blob_adapter_new(
995 redex: *mut super::cortex::RedexHandle,
996 adapter_id: *const c_char,
997 persistent: c_int,
998 overflow_json: *const c_char,
999) -> *mut MeshBlobAdapterHandle {
1000 if redex.is_null() {
1001 return ptr::null_mut();
1002 }
1003 let id = match unsafe { c_str_to_owned(adapter_id) } {
1004 Some(s) => s,
1005 None => return ptr::null_mut(),
1006 };
1007 let overflow_str = if overflow_json.is_null() {
1008 String::new()
1009 } else {
1010 match unsafe { c_str_to_owned(overflow_json) } {
1011 Some(s) => s,
1012 None => return ptr::null_mut(),
1013 }
1014 };
1015 let overflow_cfg = match parse_overflow_json(&overflow_str) {
1016 Ok(c) => c,
1017 Err(_) => return ptr::null_mut(),
1018 };
1019 let redex_inner = unsafe { (*redex).redex_arc() };
1020 let mut builder = InnerMeshBlobAdapter::new(id, redex_inner).with_persistent(persistent != 0);
1021 if !overflow_str.is_empty() {
1022 builder = builder.with_overflow(overflow_cfg);
1023 }
1024 Box::into_raw(Box::new(MeshBlobAdapterHandle {
1025 inner: ManuallyDrop::new(Arc::new(builder)),
1026 }))
1027}
1028
1029#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1036#[unsafe(no_mangle)]
1037pub unsafe extern "C" fn net_mesh_blob_adapter_free(handle: *mut MeshBlobAdapterHandle) {
1038 if handle.is_null() {
1039 return;
1040 }
1041 let mut boxed = unsafe { Box::from_raw(handle) };
1042 unsafe { ManuallyDrop::drop(&mut boxed.inner) };
1043}
1044
1045#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1058#[unsafe(no_mangle)]
1059pub unsafe extern "C" fn net_mesh_blob_adapter_store(
1060 handle: *const MeshBlobAdapterHandle,
1061 blob_ref_bytes: *const u8,
1062 blob_ref_len: usize,
1063 data: *const u8,
1064 data_len: usize,
1065) -> c_int {
1066 if handle.is_null() || blob_ref_bytes.is_null() {
1067 return NetError::NullPointer.into();
1068 }
1069 if blob_ref_len > isize::MAX as usize || data_len > isize::MAX as usize {
1071 return NetError::InvalidJson.into();
1072 }
1073 let blob_slice = unsafe { std::slice::from_raw_parts(blob_ref_bytes, blob_ref_len) };
1074 let blob_ref = match InnerBlobRef::decode(blob_slice) {
1075 Ok(Some(b)) => b,
1076 _ => return NET_ERR_BLOB_DECODE,
1077 };
1078 let data_slice = if data.is_null() {
1079 &[]
1080 } else {
1081 unsafe { std::slice::from_raw_parts(data, data_len) }
1082 };
1083 let adapter = unsafe { (*handle).inner.clone() };
1084 let data_owned = data_slice.to_vec();
1085 let result = block_on(async move { (*adapter).store(&blob_ref, &data_owned).await });
1086 match result {
1087 Ok(()) => 0,
1088 Err(e) => err_to_code(&e),
1089 }
1090}
1091
1092#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1101#[unsafe(no_mangle)]
1102pub unsafe extern "C" fn net_mesh_blob_adapter_fetch(
1103 handle: *const MeshBlobAdapterHandle,
1104 blob_ref_bytes: *const u8,
1105 blob_ref_len: usize,
1106 out_data: *mut *mut u8,
1107 out_len: *mut usize,
1108) -> c_int {
1109 if handle.is_null() || blob_ref_bytes.is_null() || out_data.is_null() || out_len.is_null() {
1110 return NetError::NullPointer.into();
1111 }
1112 if blob_ref_len > isize::MAX as usize {
1114 return NetError::InvalidJson.into();
1115 }
1116 let blob_slice = unsafe { std::slice::from_raw_parts(blob_ref_bytes, blob_ref_len) };
1117 let blob_ref = match InnerBlobRef::decode(blob_slice) {
1118 Ok(Some(b)) => b,
1119 _ => return NET_ERR_BLOB_DECODE,
1120 };
1121 let adapter = unsafe { (*handle).inner.clone() };
1122 let result = block_on(async move { (*adapter).fetch(&blob_ref).await });
1123 match result {
1124 Ok(bytes) => unsafe { write_bytes_out(&bytes, out_data, out_len) },
1130 Err(e) => err_to_code(&e),
1131 }
1132}
1133
1134#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1141#[unsafe(no_mangle)]
1142pub unsafe extern "C" fn net_mesh_blob_adapter_exists(
1143 handle: *const MeshBlobAdapterHandle,
1144 blob_ref_bytes: *const u8,
1145 blob_ref_len: usize,
1146 out_exists: *mut c_int,
1147) -> c_int {
1148 if handle.is_null() || blob_ref_bytes.is_null() || out_exists.is_null() {
1149 return NetError::NullPointer.into();
1150 }
1151 if blob_ref_len > isize::MAX as usize {
1153 return NetError::InvalidJson.into();
1154 }
1155 let blob_slice = unsafe { std::slice::from_raw_parts(blob_ref_bytes, blob_ref_len) };
1156 let blob_ref = match InnerBlobRef::decode(blob_slice) {
1157 Ok(Some(b)) => b,
1158 _ => return NET_ERR_BLOB_DECODE,
1159 };
1160 let adapter = unsafe { (*handle).inner.clone() };
1161 let result = block_on(async move { (*adapter).exists(&blob_ref).await });
1162 match result {
1163 Ok(present) => {
1164 unsafe { *out_exists = if present { 1 } else { 0 } };
1165 0
1166 }
1167 Err(e) => err_to_code(&e),
1168 }
1169}
1170
1171#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1179#[unsafe(no_mangle)]
1180pub unsafe extern "C" fn net_mesh_blob_adapter_prometheus_text(
1181 handle: *const MeshBlobAdapterHandle,
1182) -> *mut c_char {
1183 if handle.is_null() {
1184 return ptr::null_mut();
1185 }
1186 let adapter = unsafe { (*handle).inner.clone() };
1187 let body = (*adapter).prometheus_text();
1188 match std::ffi::CString::new(body) {
1189 Ok(s) => s.into_raw(),
1190 Err(_) => ptr::null_mut(),
1191 }
1192}
1193
1194#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1202#[unsafe(no_mangle)]
1203pub unsafe extern "C" fn net_mesh_blob_adapter_overflow_enabled(
1204 handle: *const MeshBlobAdapterHandle,
1205) -> c_int {
1206 if handle.is_null() {
1207 return NetError::NullPointer.into();
1208 }
1209 let adapter = unsafe { (*handle).inner.clone() };
1210 if (*adapter).overflow_enabled() {
1211 1
1212 } else {
1213 0
1214 }
1215}
1216
1217#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1223#[unsafe(no_mangle)]
1224pub unsafe extern "C" fn net_mesh_blob_adapter_overflow_active(
1225 handle: *const MeshBlobAdapterHandle,
1226) -> c_int {
1227 if handle.is_null() {
1228 return NetError::NullPointer.into();
1229 }
1230 let adapter = unsafe { (*handle).inner.clone() };
1231 if (*adapter).overflow_active() {
1232 1
1233 } else {
1234 0
1235 }
1236}
1237
1238#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1245#[unsafe(no_mangle)]
1246pub unsafe extern "C" fn net_mesh_blob_adapter_overflow_config(
1247 handle: *const MeshBlobAdapterHandle,
1248) -> *mut c_char {
1249 if handle.is_null() {
1250 return ptr::null_mut();
1251 }
1252 let adapter = unsafe { (*handle).inner.clone() };
1253 let cfg = (*adapter).overflow_config();
1254 let json = overflow_to_json(cfg);
1255 match std::ffi::CString::new(json) {
1256 Ok(s) => s.into_raw(),
1257 Err(_) => ptr::null_mut(),
1258 }
1259}
1260
1261#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1267#[unsafe(no_mangle)]
1268pub unsafe extern "C" fn net_mesh_blob_adapter_set_overflow_enabled(
1269 handle: *const MeshBlobAdapterHandle,
1270 enabled: c_int,
1271) -> c_int {
1272 if handle.is_null() {
1273 return NetError::NullPointer.into();
1274 }
1275 let adapter = unsafe { (*handle).inner.clone() };
1276 (*adapter).set_overflow_enabled(enabled != 0);
1277 0
1278}
1279
1280#[cfg(all(feature = "dataforts", feature = "netdb", feature = "redex-disk"))]
1288#[unsafe(no_mangle)]
1289pub unsafe extern "C" fn net_mesh_blob_adapter_set_overflow_config(
1290 handle: *const MeshBlobAdapterHandle,
1291 config_json: *const c_char,
1292) -> c_int {
1293 if handle.is_null() || config_json.is_null() {
1294 return NetError::NullPointer.into();
1295 }
1296 let s = match unsafe { c_str_to_owned(config_json) } {
1297 Some(s) => s,
1298 None => return NetError::InvalidUtf8.into(),
1299 };
1300 let cfg = match parse_overflow_json(&s) {
1301 Ok(c) => c,
1302 Err(code) => return code,
1303 };
1304 let adapter = unsafe { (*handle).inner.clone() };
1305 (*adapter).set_overflow_config(cfg);
1306 0
1307}
1308
1309#[cfg(test)]
1310mod tests {
1311 #![allow(
1312 clippy::disallowed_methods,
1313 reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1314 )]
1315 use super::*;
1316 use std::ffi::CString;
1317 use std::sync::atomic::{AtomicU64, Ordering};
1318
1319 fn unique_id(prefix: &str) -> String {
1320 static N: AtomicU64 = AtomicU64::new(0);
1321 let n = N.fetch_add(1, Ordering::Relaxed);
1322 format!("{}-{}-{}", prefix, std::process::id(), n)
1323 }
1324
1325 #[test]
1328 fn ffi_publish_resolve_round_trip() {
1329 let id = unique_id("ffi-blob");
1330 let root = std::env::temp_dir().join(format!("net-ffi-blob-{}", id));
1331 let id_c = CString::new(id.clone()).unwrap();
1332 let root_c = CString::new(root.to_string_lossy().as_ref()).unwrap();
1333 let uri_c = CString::new("file:///ffi-round-trip").unwrap();
1334
1335 unsafe {
1336 assert_eq!(
1337 net_blob_register_fs_adapter(id_c.as_ptr(), root_c.as_ptr()),
1338 0
1339 );
1340 assert_eq!(net_blob_adapter_registered(id_c.as_ptr()), 1);
1341
1342 let payload = b"end-to-end ffi blob round trip";
1343 let mut out_buf: *mut u8 = std::ptr::null_mut();
1344 let mut out_len: usize = 0;
1345 let rc = net_blob_publish(
1346 id_c.as_ptr(),
1347 uri_c.as_ptr(),
1348 payload.as_ptr(),
1349 payload.len(),
1350 &mut out_buf,
1351 &mut out_len,
1352 );
1353 assert_eq!(rc, 0);
1354 assert!(!out_buf.is_null());
1355 let encoded = std::slice::from_raw_parts(out_buf, out_len);
1357 assert_eq!(
1358 &encoded[..4],
1359 &crate::adapter::net::dataforts::BLOB_REF_MAGIC,
1360 );
1361
1362 let mut content_buf: *mut u8 = std::ptr::null_mut();
1364 let mut content_len: usize = 0;
1365 let rc = net_blob_resolve(
1366 id_c.as_ptr(),
1367 out_buf,
1368 out_len,
1369 &mut content_buf,
1370 &mut content_len,
1371 );
1372 assert_eq!(rc, 0);
1373 let resolved = std::slice::from_raw_parts(content_buf, content_len);
1374 assert_eq!(resolved, payload);
1375
1376 net_blob_free_buffer(out_buf, out_len);
1377 net_blob_free_buffer(content_buf, content_len);
1378 assert_eq!(net_blob_unregister_adapter(id_c.as_ptr()), 1);
1379 }
1380 let _ = std::fs::remove_dir_all(&root);
1381 }
1382
1383 #[test]
1384 fn ffi_resolve_returns_not_registered_for_unknown_adapter() {
1385 let id_c = CString::new("never-registered").unwrap();
1386 let payload = b"any";
1387 let mut out_buf: *mut u8 = std::ptr::null_mut();
1388 let mut out_len: usize = 0;
1389 let rc = unsafe {
1390 net_blob_resolve(
1391 id_c.as_ptr(),
1392 payload.as_ptr(),
1393 payload.len(),
1394 &mut out_buf,
1395 &mut out_len,
1396 )
1397 };
1398 assert_eq!(rc, NET_ERR_BLOB_NOT_REGISTERED);
1399 assert!(out_buf.is_null());
1400 assert_eq!(out_len, 0);
1401 }
1402
1403 mod callback_adapter_round_trip {
1409 use super::*;
1410 use std::collections::HashMap;
1411 use std::sync::Mutex;
1412
1413 struct CallbackCtx {
1414 store: Mutex<HashMap<[u8; 32], Vec<u8>>>,
1415 }
1416
1417 unsafe extern "C" fn cb_store(
1418 ctx: *mut c_void,
1419 _uri: *const c_char,
1420 hash: *const u8,
1421 _size: u64,
1422 data: *const u8,
1423 data_len: usize,
1424 ) -> c_int {
1425 let ctx = &*(ctx as *const CallbackCtx);
1426 let mut h = [0u8; 32];
1427 h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1428 let buf = if data_len == 0 {
1429 Vec::new()
1430 } else {
1431 std::slice::from_raw_parts(data, data_len).to_vec()
1432 };
1433 ctx.store.lock().unwrap().insert(h, buf);
1434 0
1435 }
1436
1437 unsafe extern "C" fn cb_fetch(
1438 ctx: *mut c_void,
1439 _uri: *const c_char,
1440 hash: *const u8,
1441 _size: u64,
1442 out_data: *mut *mut u8,
1443 out_len: *mut usize,
1444 ) -> c_int {
1445 let ctx = &*(ctx as *const CallbackCtx);
1446 let mut h = [0u8; 32];
1447 h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1448 let store = ctx.store.lock().unwrap();
1449 match store.get(&h) {
1450 Some(bytes) => {
1451 let boxed = bytes.clone().into_boxed_slice();
1452 let len = boxed.len();
1453 let ptr = Box::into_raw(boxed) as *mut u8;
1454 *out_data = ptr;
1455 *out_len = len;
1456 0
1457 }
1458 None => NET_ERR_BLOB_NOT_FOUND,
1459 }
1460 }
1461
1462 unsafe extern "C" fn cb_fetch_range(
1463 ctx: *mut c_void,
1464 _uri: *const c_char,
1465 hash: *const u8,
1466 _size: u64,
1467 range_start: u64,
1468 range_end: u64,
1469 out_data: *mut *mut u8,
1470 out_len: *mut usize,
1471 ) -> c_int {
1472 let ctx = &*(ctx as *const CallbackCtx);
1473 let mut h = [0u8; 32];
1474 h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1475 let store = ctx.store.lock().unwrap();
1476 match store.get(&h) {
1477 Some(bytes) => {
1478 let s = range_start as usize;
1479 let e = range_end as usize;
1480 if s > e || e > bytes.len() {
1481 return NET_ERR_BLOB_BACKEND;
1482 }
1483 let slice = bytes[s..e].to_vec().into_boxed_slice();
1484 let len = slice.len();
1485 *out_data = Box::into_raw(slice) as *mut u8;
1486 *out_len = len;
1487 0
1488 }
1489 None => NET_ERR_BLOB_NOT_FOUND,
1490 }
1491 }
1492
1493 unsafe extern "C" fn cb_exists(
1494 ctx: *mut c_void,
1495 _uri: *const c_char,
1496 hash: *const u8,
1497 _size: u64,
1498 out_exists: *mut c_int,
1499 ) -> c_int {
1500 let ctx = &*(ctx as *const CallbackCtx);
1501 let mut h = [0u8; 32];
1502 h.copy_from_slice(std::slice::from_raw_parts(hash, 32));
1503 *out_exists = if ctx.store.lock().unwrap().contains_key(&h) {
1504 1
1505 } else {
1506 0
1507 };
1508 0
1509 }
1510
1511 unsafe extern "C" fn cb_free(_ctx: *mut c_void, data: *mut u8, len: usize) {
1512 if data.is_null() {
1513 return;
1514 }
1515 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(data, len));
1516 }
1517
1518 #[test]
1519 fn callback_adapter_publish_resolve_round_trip() {
1520 let ctx = Box::new(CallbackCtx {
1521 store: Mutex::new(HashMap::new()),
1522 });
1523 let ctx_ptr = Box::into_raw(ctx) as *mut c_void;
1524 let vtable = NetBlobAdapterVtable {
1525 store: cb_store,
1526 fetch: cb_fetch,
1527 fetch_range: cb_fetch_range,
1528 exists: cb_exists,
1529 free_buffer: cb_free,
1530 };
1531
1532 let id_c = std::ffi::CString::new("ffi-cb-roundtrip").unwrap();
1533 let uri_c = std::ffi::CString::new("cb://round-trip").unwrap();
1534 unsafe {
1535 assert_eq!(
1536 net_blob_register_callback_adapter(id_c.as_ptr(), &vtable, ctx_ptr),
1537 0
1538 );
1539
1540 let payload = b"vtable round-trip payload";
1541 let mut out_buf: *mut u8 = std::ptr::null_mut();
1542 let mut out_len: usize = 0;
1543 let rc = net_blob_publish(
1544 id_c.as_ptr(),
1545 uri_c.as_ptr(),
1546 payload.as_ptr(),
1547 payload.len(),
1548 &mut out_buf,
1549 &mut out_len,
1550 );
1551 assert_eq!(rc, 0);
1552
1553 let mut content_buf: *mut u8 = std::ptr::null_mut();
1554 let mut content_len: usize = 0;
1555 let rc = net_blob_resolve(
1556 id_c.as_ptr(),
1557 out_buf,
1558 out_len,
1559 &mut content_buf,
1560 &mut content_len,
1561 );
1562 assert_eq!(rc, 0);
1563 let resolved = std::slice::from_raw_parts(content_buf, content_len);
1564 assert_eq!(resolved, payload);
1565
1566 net_blob_free_buffer(out_buf, out_len);
1567 net_blob_free_buffer(content_buf, content_len);
1568 assert_eq!(net_blob_unregister_adapter(id_c.as_ptr()), 1);
1569
1570 drop(Box::from_raw(ctx_ptr as *mut CallbackCtx));
1572 }
1573 }
1574 }
1575
1576 #[test]
1577 fn ffi_duplicate_registration_rejected() {
1578 let id = unique_id("ffi-dup");
1579 let root = std::env::temp_dir().join(format!("net-ffi-blob-{}", id));
1580 let id_c = CString::new(id.clone()).unwrap();
1581 let root_c = CString::new(root.to_string_lossy().as_ref()).unwrap();
1582 unsafe {
1583 assert_eq!(
1584 net_blob_register_fs_adapter(id_c.as_ptr(), root_c.as_ptr()),
1585 0
1586 );
1587 assert_eq!(
1588 net_blob_register_fs_adapter(id_c.as_ptr(), root_c.as_ptr()),
1589 NET_ERR_BLOB_DUPLICATE_ID
1590 );
1591 assert_eq!(net_blob_unregister_adapter(id_c.as_ptr()), 1);
1592 }
1593 let _ = std::fs::remove_dir_all(&root);
1594 }
1595}