#![allow(clippy::missing_safety_doc)]
#![expect(
clippy::undocumented_unsafe_blocks,
reason = "module-wide FFI safety contract documented in the # Safety preamble above"
)]
#![expect(
clippy::multiple_unsafe_ops_per_block,
reason = "FFI entry points routinely deref + write to multiple out-parameter fields under the same caller contract; splitting per-op would obscure the single boundary-cross"
)]
use std::ffi::{c_char, c_int, CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::c_void;
use std::ptr;
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::runtime::Runtime;
use tokio::sync::Mutex as TokioMutex;
use crate::adapter::net::channel::ChannelName;
use crate::adapter::net::cortex::memories::{
MemoriesAdapter as InnerMemoriesAdapter, MemoriesFilter, MemoriesWatcher, Memory,
OrderBy as MemoriesOrderBy,
};
use crate::adapter::net::cortex::tasks::{
OrderBy as TasksOrderBy, Task, TaskStatus, TasksAdapter as InnerTasksAdapter, TasksFilter,
TasksWatcher,
};
use crate::adapter::net::cortex::WaitForTokenError as InnerWaitForTokenError;
use crate::adapter::net::netdb::{NetDbError as InnerNetDbError, NetDbSnapshot};
use crate::adapter::net::redex::{
FsyncPolicy, Redex as InnerRedex, RedexError, RedexEvent, RedexFile as InnerRedexFile,
RedexFileConfig, WriteToken as InnerWriteToken,
};
use super::handle_guard::{HandleGuard, FFI_HANDLE_FREE_DEADLINE};
use super::NetError;
// Status codes returned as `c_int` by the cortex / RedEX FFI entry points in this file.
// `0` is success. Most codes are negative; `NET_ERR_TIMEOUT` and `NET_ERR_STREAM_ENDED`
// are the only positive values.

/// Returned by `net_tasks_adapter_close` when the adapter's `close()` call fails.
pub(crate) const NET_ERR_CORTEX_CLOSED: c_int = -100;
/// Returned when opening a tasks adapter or applying a task mutation fails.
pub(crate) const NET_ERR_CORTEX_FOLD: c_int = -101;
/// Not referenced in the visible part of this file; presumably reserved for NetDB
/// failures — TODO confirm against the rest of the module.
pub(crate) const NET_ERR_NETDB: c_int = -102;
/// Generic RedEX failure (invalid channel name or config, or an error from the inner RedEX call).
pub(crate) const NET_ERR_REDEX: c_int = -103;
/// Positive status: the operation did not complete within the caller-supplied timeout.
pub(crate) const NET_ERR_TIMEOUT: c_int = 1;
/// Positive status: the tail stream has ended and will yield no further events.
pub(crate) const NET_ERR_STREAM_ENDED: c_int = 2;
/// The write token was rejected by the adapter with a `WrongOrigin` error.
pub(crate) const NET_ERR_WRONG_ORIGIN: c_int = -104;
/// `wait_for_token` reported `QueueFull`.
pub(crate) const NET_ERR_QUEUE_FULL: c_int = -105;
/// The adapter reported that its fold has stopped while waiting on a sequence or token.
pub(crate) const NET_ERR_FOLD_STOPPED: c_int = -106;
/// Returned by the stub entry points compiled when the `dataforts` feature is off.
/// In the visible code it is only used by those stubs, hence the `dead_code` allowance.
#[allow(dead_code)] pub(crate) const NET_ERR_FEATURE_NOT_BUILT: c_int = -107;
/// Returned when a panic was caught inside an FFI call instead of unwinding to the caller.
pub(crate) const NET_ERR_PANIC: c_int = -108;
/// Calls `poll_for_token` on the tasks adapter and converts the outcome into an FFI
/// status code: `0` on success, `NET_ERR_WRONG_ORIGIN` / `NET_ERR_FOLD_STOPPED` for
/// those specific errors, and `NET_ERR_TIMEOUT` for every other error.
fn tasks_poll_for_token(adapter: &Arc<InnerTasksAdapter>, token: InnerWriteToken) -> c_int {
    let Err(err) = adapter.poll_for_token(token) else {
        return 0;
    };
    match err {
        InnerWaitForTokenError::WrongOrigin { .. } => NET_ERR_WRONG_ORIGIN,
        InnerWaitForTokenError::FoldStopped { .. } => NET_ERR_FOLD_STOPPED,
        // Any remaining error is reported to the caller as "not ready yet".
        _ => NET_ERR_TIMEOUT,
    }
}
/// Calls `poll_for_token` on the memories adapter and converts the outcome into an FFI
/// status code: `0` on success, `NET_ERR_WRONG_ORIGIN` / `NET_ERR_FOLD_STOPPED` for
/// those specific errors, and `NET_ERR_TIMEOUT` for every other error.
fn memories_poll_for_token(adapter: &Arc<InnerMemoriesAdapter>, token: InnerWriteToken) -> c_int {
    let failure = adapter.poll_for_token(token).err();
    match failure {
        None => 0,
        Some(InnerWaitForTokenError::WrongOrigin { .. }) => NET_ERR_WRONG_ORIGIN,
        Some(InnerWaitForTokenError::FoldStopped { .. }) => NET_ERR_FOLD_STOPPED,
        // Any remaining error is reported to the caller as "not ready yet".
        Some(_) => NET_ERR_TIMEOUT,
    }
}
/// Returns the process-wide tokio runtime used by the blocking FFI entry points,
/// building it on first use.
///
/// If the runtime cannot be built the process is aborted (after printing a message to
/// stderr) rather than panicking across the FFI boundary.
fn runtime() -> &'static Arc<Runtime> {
    use std::sync::OnceLock;
    static RT: OnceLock<Arc<Runtime>> = OnceLock::new();

    /// Builds the multi-threaded runtime or aborts the process on failure.
    fn build_or_abort() -> Arc<Runtime> {
        let built = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build();
        match built {
            Ok(rt) => Arc::new(rt),
            Err(e) => {
                eprintln!(
                    "FATAL: cortex FFI tokio runtime build failure ({e:?}); aborting to avoid panic across the FFI boundary"
                );
                std::process::abort();
            }
        }
    }

    RT.get_or_init(build_or_abort)
}
/// Drives `future` to completion on the shared FFI runtime and returns its output.
///
/// If the calling thread is already inside a tokio runtime context the process is
/// aborted, because blocking there would otherwise panic across the FFI boundary.
fn block_on<F>(future: F) -> F::Output
where
    F: std::future::Future,
{
    let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
    if !inside_runtime {
        return runtime().block_on(future);
    }
    eprintln!(
        "FATAL: cortex FFI called from inside a tokio runtime context; \
         aborting to avoid runtime-in-runtime panic across the FFI boundary"
    );
    std::process::abort()
}
/// Copies a NUL-terminated C string into an owned Rust `String`.
///
/// Returns `None` when `p` is null or when the bytes are not valid UTF-8.
///
/// # Safety
///
/// If `p` is non-null it must point to a NUL-terminated byte sequence that is valid
/// for reads up to and including the terminator for the duration of the call.
unsafe fn c_str_to_owned(p: *const c_char) -> Option<String> {
    if p.is_null() {
        return None;
    }
    // SAFETY: `p` is non-null (checked above) and the caller guarantees it points to a
    // NUL-terminated string. The explicit block keeps the unsafe operation scoped, in
    // line with the other unsafe fns in this file.
    let c_str = unsafe { CStr::from_ptr(p) };
    c_str.to_str().ok().map(str::to_owned)
}
/// Serializes `value` to JSON and hands it to the caller through `out_ptr` / `out_len`.
///
/// On success `*out_ptr` receives a NUL-terminated string produced by
/// `CString::into_raw`, `*out_len` receives the JSON byte length (terminator excluded),
/// and `0` is returned. If either out-pointer is null, `NetError::NullPointer` is
/// returned and nothing is written. If serialization fails or the JSON contains an
/// interior NUL, both outputs are cleared and `NetError::Unknown` is returned.
fn write_json_out<T: Serialize>(
    value: &T,
    out_ptr: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if out_ptr.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    // Both fallible steps collapse into one Option so the outputs are written in one place.
    let encoded = serde_json::to_string(value).ok().and_then(|json| {
        let byte_len = json.len();
        CString::new(json).ok().map(|c_json| (c_json, byte_len))
    });
    match encoded {
        Some((c_json, byte_len)) => {
            unsafe {
                *out_ptr = c_json.into_raw();
                *out_len = byte_len;
            }
            0
        }
        None => {
            unsafe {
                *out_ptr = ptr::null_mut();
                *out_len = 0;
            }
            NetError::Unknown.into()
        }
    }
}
/// Resets a JSON out-parameter pair to "no result": `*out_ptr = NULL`, `*out_len = 0`.
///
/// Each pointer is checked independently, so a null argument is simply skipped.
fn zero_out_json(out_ptr: *mut *mut c_char, out_len: *mut usize) {
    if let Some(ptr_slot) = unsafe { out_ptr.as_mut() } {
        *ptr_slot = ptr::null_mut();
    }
    if let Some(len_slot) = unsafe { out_len.as_mut() } {
        *len_slot = 0;
    }
}
// Compile-time check only: this closure is never called. It fails to compile if any of
// the types stored inside the FFI handles below stops being `Send + Sync`.
const _: fn() = || {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<InnerRedex>();
assert_send_sync::<InnerRedexFile>();
assert_send_sync::<InnerTasksAdapter>();
assert_send_sync::<InnerMemoriesAdapter>();
// Stream slots as stored by the tail / watcher handles.
assert_send_sync::<
TokioMutex<Option<BoxStream<'static, std::result::Result<RedexEvent, RedexError>>>>,
>();
assert_send_sync::<TokioMutex<Option<BoxStream<'static, Vec<Task>>>>>();
assert_send_sync::<TokioMutex<Option<BoxStream<'static, Vec<Memory>>>>>();
};
/// Opaque FFI handle wrapping a shared `Redex` instance.
///
/// Created by `net_redex_new` and released with `net_redex_free`.
pub struct RedexHandle {
// Wrapped in `ManuallyDrop` so `net_redex_free` decides explicitly when (and whether)
// the inner `Arc` is dropped.
inner: ManuallyDrop<Arc<InnerRedex>>,
// Tracks in-flight FFI calls; entry points call `try_enter` and free calls `begin_free`.
guard: HandleGuard,
}
impl RedexHandle {
    /// Returns a new strong reference to the wrapped `Redex` instance.
    #[allow(dead_code)]
    pub(crate) fn redex_arc(&self) -> Arc<InnerRedex> {
        Arc::clone(&self.inner)
    }
}
/// Creates a new RedEX instance and returns an owning handle.
///
/// `persistent_dir` may be NULL, in which case the instance is created without a
/// persistent directory. When non-NULL it must be a NUL-terminated UTF-8 path, which is
/// passed to `with_persistent_dir`.
///
/// Returns NULL if `persistent_dir` is non-NULL but not valid UTF-8. The caller asked
/// for persistence, so quietly handing back a non-persistent instance would lose data
/// without any signal.
///
/// The returned handle must be released with `net_redex_free`.
///
/// # Safety
///
/// `persistent_dir` must be NULL or point to a NUL-terminated string that stays valid
/// for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_new(persistent_dir: *const c_char) -> *mut RedexHandle {
    let inner = if persistent_dir.is_null() {
        InnerRedex::new()
    } else {
        // A non-null path that cannot be decoded is a caller error, not a request for
        // an in-memory instance.
        let Some(dir) = (unsafe { c_str_to_owned(persistent_dir) }) else {
            return std::ptr::null_mut();
        };
        InnerRedex::new().with_persistent_dir(dir)
    };
    Box::into_raw(Box::new(RedexHandle {
        inner: ManuallyDrop::new(Arc::new(inner)),
        guard: HandleGuard::new(),
    }))
}
/// Releases the `Redex` instance owned by `handle`. A NULL `handle` is a no-op.
///
/// The inner `Arc` is dropped only if `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the inner value is
/// intentionally leaked. This function does not deallocate the `RedexHandle` box itself.
///
/// # Safety
///
/// `handle` must be NULL or a pointer to a live `RedexHandle`.
/// NOTE(review): behaviour when called twice on the same handle depends on
/// `HandleGuard::begin_free`, which is not visible here — confirm.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_free(handle: *mut RedexHandle) {
if handle.is_null() {
return;
}
let h: &RedexHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// Move the Arc out of its ManuallyDrop slot and drop it here.
let inner = ManuallyDrop::take(&mut (*handle).inner);
drop(inner);
}
} else {
tracing::warn!(
"net_redex_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// Owning wrapper around a raw `*mut Arc<MeshNode>` received over FFI.
///
/// Dropping the wrapper frees the boxed `Arc` unless ownership was moved out first
/// with `take`, so early-return paths in the entry points do not leak the allocation.
struct MeshArcOwned {
// Null once ownership has been moved out via `take`.
ptr: *mut Arc<crate::adapter::net::MeshNode>,
}
impl MeshArcOwned {
/// Wraps `ptr` without inspecting it.
///
/// # Safety
///
/// `Drop` and `take` pass `ptr` to `Box::from_raw`, so it must be a pointer for which
/// that is valid and that nothing else will free.
unsafe fn new(ptr: *mut Arc<crate::adapter::net::MeshNode>) -> Self {
Self { ptr }
}
/// Moves the `Arc` out of the box and disarms the destructor.
///
/// # Safety
///
/// The wrapped pointer must still satisfy the contract given to `new`.
unsafe fn take(mut self) -> Arc<crate::adapter::net::MeshNode> {
// Null the field first so the `Drop` that runs when `self` goes out of scope is a no-op.
let ptr = std::mem::replace(&mut self.ptr, std::ptr::null_mut());
unsafe { *Box::from_raw(ptr) }
}
}
impl Drop for MeshArcOwned {
fn drop(&mut self) {
// Frees the boxed Arc only if `take` has not already moved it out.
if !self.ptr.is_null() {
unsafe { drop(Box::from_raw(self.ptr)) };
}
}
}
/// Enables replication on a RedEX instance using the supplied mesh node.
///
/// Ownership of `mesh_arc` passes to this function on every path: a non-NULL
/// `mesh_arc` is either handed to the RedEX instance or freed before returning.
///
/// Returns `0` on success, `NetError::NullPointer` if either argument is NULL, and
/// `NetError::ShuttingDown` if the handle no longer accepts new operations.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`; `mesh_arc` must be NULL or a
/// boxed `Arc<MeshNode>` pointer that the caller will not use again.
#[cfg(feature = "net")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_enable_replication(
    redex: *mut RedexHandle,
    mesh_arc: *mut Arc<crate::adapter::net::MeshNode>,
) -> c_int {
    if mesh_arc.is_null() {
        return NetError::NullPointer.into();
    }
    if redex.is_null() {
        // The mesh Arc is ours regardless, so release it before reporting the error.
        unsafe { drop(Box::from_raw(mesh_arc)) };
        return NetError::NullPointer.into();
    }
    let owned_mesh = unsafe { MeshArcOwned::new(mesh_arc) };
    let handle = unsafe { &*redex };
    let Some(_op) = handle.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let mesh = unsafe { owned_mesh.take() };
    handle.inner.enable_replication(mesh);
    0
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_replication_runtime_count(redex: *const RedexHandle) -> u32 {
let Some(h) = (unsafe { redex.as_ref() }) else {
return 0;
};
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return 0,
};
h.inner.replication_runtime_count() as u32
}
/// Returns the replication metrics text of the RedEX instance as a newly allocated C
/// string produced by `CString::into_raw`.
///
/// Returns NULL when `redex` is NULL or the handle no longer accepts new operations.
/// If the text contains an interior NUL byte an empty string is returned instead.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_replication_prometheus_text(
    redex: *const RedexHandle,
) -> *mut c_char {
    if redex.is_null() {
        return std::ptr::null_mut();
    }
    let h = unsafe { &*redex };
    let Some(_op) = h.guard.try_enter() else {
        return std::ptr::null_mut();
    };
    let text = h.inner.replication_prometheus_text();
    // `CString::default()` is the empty string, used when `text` has an interior NUL.
    CString::new(text).unwrap_or_default().into_raw()
}
/// JSON shape accepted by `net_redex_enable_greedy_dataforts`.
///
/// Every field is optional; an absent field leaves the corresponding `GreedyConfig`
/// setting at whatever `GreedyConfig::new()` provides.
#[cfg(feature = "dataforts")]
#[derive(serde::Deserialize, Default)]
struct RedexGreedyConfigJson {
// Each entry is wrapped in a `ScopeLabel`.
scopes: Option<Vec<String>>,
// Converted to a `Duration` in milliseconds.
proximity_max_rtt_ms: Option<u64>,
per_channel_cap_bytes: Option<u64>,
total_cap_bytes: Option<u64>,
bandwidth_budget_fraction: Option<f32>,
nic_peak_bytes_per_s: Option<u64>,
observer_inflight_cap: Option<u64>,
// One of "disabled", "any_of_local_capabilities", "strict".
intent_match: Option<String>,
// One of "ignore", "soft_preference", "strict_required".
colocation_policy: Option<String>,
}
#[cfg(feature = "dataforts")]
impl RedexGreedyConfigJson {
    /// Converts the parsed JSON into a `GreedyConfig`.
    ///
    /// Starts from `GreedyConfig::new()` and applies only the fields that are present.
    ///
    /// # Errors
    ///
    /// Returns a static message when `intent_match` or `colocation_policy` holds a
    /// string that is not one of the recognised policy names.
    fn into_config(self) -> Result<crate::adapter::net::dataforts::GreedyConfig, &'static str> {
        use crate::adapter::net::dataforts::{
            ColocationPolicy, GreedyConfig, IntentMatchPolicy, ScopeLabel,
        };
        let mut cfg = GreedyConfig::new();
        if let Some(scopes) = self.scopes {
            cfg = cfg.with_scopes(scopes.into_iter().map(ScopeLabel::new).collect());
        }
        if let Some(ms) = self.proximity_max_rtt_ms {
            cfg = cfg.with_proximity_max_rtt(std::time::Duration::from_millis(ms));
        }
        if let Some(bytes) = self.per_channel_cap_bytes {
            cfg = cfg.with_per_channel_cap_bytes(bytes);
        }
        if let Some(bytes) = self.total_cap_bytes {
            cfg = cfg.with_total_cap_bytes(bytes);
        }
        if let Some(fraction) = self.bandwidth_budget_fraction {
            cfg = cfg.with_bandwidth_budget_fraction(fraction);
        }
        if let Some(peak) = self.nic_peak_bytes_per_s {
            cfg = cfg.with_nic_peak_bytes_per_s(Some(peak));
        }
        if let Some(cap) = self.observer_inflight_cap {
            // Saturate rather than truncate: on a 32-bit target `cap as usize` would
            // turn a very large cap into an arbitrary small one.
            cfg = cfg.with_observer_inflight_cap(usize::try_from(cap).unwrap_or(usize::MAX));
        }
        if let Some(policy) = self.intent_match {
            let parsed = match policy.as_str() {
                "disabled" => IntentMatchPolicy::Disabled,
                "any_of_local_capabilities" => IntentMatchPolicy::AnyOfLocalCapabilities,
                "strict" => IntentMatchPolicy::Strict,
                _ => return Err("unknown intent_match"),
            };
            cfg = cfg.with_intent_match(parsed);
        }
        if let Some(policy) = self.colocation_policy {
            let parsed = match policy.as_str() {
                "ignore" => ColocationPolicy::Ignore,
                "soft_preference" => ColocationPolicy::SoftPreference,
                "strict_required" => ColocationPolicy::StrictRequired,
                _ => return Err("unknown colocation_policy"),
            };
            cfg = cfg.with_colocation_policy(parsed);
        }
        Ok(cfg)
    }
}
/// Enables greedy dataforts on a RedEX instance.
///
/// `config_json` may be NULL or an empty string, in which case all-default settings
/// are used; otherwise it must be UTF-8 JSON matching `RedexGreedyConfigJson`.
/// Ownership of `mesh_arc` passes to this function on every path: a non-NULL
/// `mesh_arc` is either handed to the RedEX instance or freed before returning.
///
/// Returns `0` on success, `NetError::NullPointer` if `redex` or `mesh_arc` is NULL,
/// `NetError::ShuttingDown` if the handle no longer accepts operations,
/// `NetError::InvalidUtf8` / `NetError::InvalidJson` for an undecodable config, and
/// `NET_ERR_REDEX` if the config is rejected or the inner call fails.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`; `mesh_arc` must be NULL or a
/// boxed `Arc<MeshNode>` pointer the caller will not use again; `config_json` must be
/// NULL or a NUL-terminated string.
#[cfg(all(feature = "net", feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_enable_greedy_dataforts(
    redex: *mut RedexHandle,
    mesh_arc: *mut Arc<crate::adapter::net::MeshNode>,
    config_json: *const c_char,
) -> c_int {
    if mesh_arc.is_null() {
        return NetError::NullPointer.into();
    }
    if redex.is_null() {
        // The mesh Arc is ours regardless, so release it before reporting the error.
        unsafe { drop(Box::from_raw(mesh_arc)) };
        return NetError::NullPointer.into();
    }
    // From here on, every early return frees the mesh Arc through this guard's Drop.
    let owned_mesh = unsafe { MeshArcOwned::new(mesh_arc) };
    let handle = unsafe { &*redex };
    let Some(_op) = handle.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let parsed: RedexGreedyConfigJson = if config_json.is_null() {
        RedexGreedyConfigJson::default()
    } else {
        let Some(raw) = (unsafe { c_str_to_owned(config_json) }) else {
            return NetError::InvalidUtf8.into();
        };
        if raw.is_empty() {
            RedexGreedyConfigJson::default()
        } else {
            let Ok(value) = serde_json::from_str::<RedexGreedyConfigJson>(&raw) else {
                return NetError::InvalidJson.into();
            };
            value
        }
    };
    let Ok(cfg) = parsed.into_config() else {
        return NET_ERR_REDEX;
    };
    let mesh = unsafe { owned_mesh.take() };
    let local_caps = Arc::new(crate::adapter::net::behavior::capability::CapabilitySet::default());
    let registry = crate::adapter::net::behavior::placement::IntentRegistry::defaults();
    let enabled = handle
        .inner
        .enable_greedy_dataforts(mesh, cfg, local_caps, registry);
    if enabled.is_ok() {
        0
    } else {
        NET_ERR_REDEX
    }
}
/// Disables greedy dataforts on a RedEX instance.
///
/// Returns `0` on success, `NetError::NullPointer` if `redex` is NULL, and
/// `NetError::ShuttingDown` if the handle no longer accepts operations.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`.
#[cfg(feature = "dataforts")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_disable_greedy_dataforts(redex: *mut RedexHandle) -> c_int {
    if redex.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*redex };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    h.inner.disable_greedy_dataforts();
    0
}
/// Returns the cached channel count reported by the greedy runtime.
///
/// Returns `0` when `redex` is NULL, the handle no longer accepts operations, or no
/// greedy runtime is present. A count that does not fit in `u32` saturates at
/// `u32::MAX` instead of wrapping.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`.
#[cfg(feature = "dataforts")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_greedy_cached_channel_count(redex: *const RedexHandle) -> u32 {
    let Some(h) = (unsafe { redex.as_ref() }) else {
        return 0;
    };
    let Some(_op) = h.guard.try_enter() else {
        return 0;
    };
    h.inner
        .greedy_runtime()
        // Saturate: a wrapped count would be indistinguishable from a small real one.
        .map(|r| u32::try_from(r.cached_channel_count()).unwrap_or(u32::MAX))
        .unwrap_or(0)
}
/// Returns the greedy runtime's metrics snapshot as a newly allocated C string
/// produced by `CString::into_raw`.
///
/// Returns NULL when `redex` is NULL or the handle no longer accepts operations. When
/// no greedy runtime is present, or the text contains an interior NUL, the result is
/// an empty string.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`.
#[cfg(feature = "dataforts")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_greedy_prometheus_text(
    redex: *const RedexHandle,
) -> *mut c_char {
    if redex.is_null() {
        return std::ptr::null_mut();
    }
    let h = unsafe { &*redex };
    let Some(_op) = h.guard.try_enter() else {
        return std::ptr::null_mut();
    };
    let text = match h.inner.greedy_runtime() {
        Some(r) => r.metrics().snapshot().prometheus_text(),
        None => Default::default(),
    };
    // `CString::default()` is the empty string, used when `text` has an interior NUL.
    CString::new(text).unwrap_or_default().into_raw()
}
/// JSON shape accepted by `net_redex_enable_gravity_for_greedy`. Every field is optional.
#[cfg(feature = "dataforts")]
#[derive(serde::Deserialize, Default)]
struct RedexGravityConfigJson {
// Treated as `true` when absent.
enabled: Option<bool>,
emit_threshold_ratio: Option<f32>,
// Converted to a `Duration` in seconds.
decay_half_life_secs: Option<u64>,
// Converted to a `Duration` in milliseconds; 500 when absent.
tick_interval_ms: Option<u64>,
normalization_reference_rate: Option<f32>,
}
#[cfg(feature = "dataforts")]
impl RedexGravityConfigJson {
    /// Splits the parsed JSON into a `DataGravityPolicy` and the tick interval.
    ///
    /// `enabled` defaults to `true` and the tick interval to 500 ms; every other
    /// setting is applied only when present in the JSON.
    fn into_policy_and_tick(
        self,
    ) -> (
        crate::adapter::net::dataforts::DataGravityPolicy,
        std::time::Duration,
    ) {
        use crate::adapter::net::dataforts::DataGravityPolicy;
        use std::time::Duration;

        let Self {
            enabled,
            emit_threshold_ratio,
            decay_half_life_secs,
            tick_interval_ms,
            normalization_reference_rate,
        } = self;

        let mut policy = DataGravityPolicy::new().with_enabled(enabled.unwrap_or(true));
        if let Some(ratio) = emit_threshold_ratio {
            policy = policy.with_emit_threshold_ratio(ratio);
        }
        if let Some(half_life_secs) = decay_half_life_secs {
            policy = policy.with_decay_half_life(Duration::from_secs(half_life_secs));
        }
        if let Some(rate) = normalization_reference_rate {
            policy = policy.with_normalization_reference_rate(rate);
        }
        (policy, Duration::from_millis(tick_interval_ms.unwrap_or(500)))
    }
}
/// Enables data gravity for the greedy runtime of a RedEX instance.
///
/// `config_json` may be NULL or an empty string, in which case all-default settings
/// are used; otherwise it must be UTF-8 JSON matching `RedexGravityConfigJson`.
/// Ownership of `mesh_arc` passes to this function on every path: a non-NULL
/// `mesh_arc` is either handed to the RedEX instance or freed before returning.
///
/// Returns `0` on success, `NetError::NullPointer` if `redex` or `mesh_arc` is NULL,
/// `NetError::ShuttingDown` if the handle no longer accepts operations,
/// `NetError::InvalidUtf8` / `NetError::InvalidJson` for an undecodable config, and
/// `NET_ERR_REDEX` if the inner call fails.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`; `mesh_arc` must be NULL or a
/// boxed `Arc<MeshNode>` pointer the caller will not use again; `config_json` must be
/// NULL or a NUL-terminated string.
#[cfg(all(feature = "net", feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_enable_gravity_for_greedy(
    redex: *mut RedexHandle,
    mesh_arc: *mut Arc<crate::adapter::net::MeshNode>,
    config_json: *const c_char,
) -> c_int {
    if mesh_arc.is_null() {
        return NetError::NullPointer.into();
    }
    if redex.is_null() {
        // The mesh Arc is ours regardless, so release it before reporting the error.
        unsafe { drop(Box::from_raw(mesh_arc)) };
        return NetError::NullPointer.into();
    }
    // From here on, every early return frees the mesh Arc through this guard's Drop.
    let owned_mesh = unsafe { MeshArcOwned::new(mesh_arc) };
    let handle = unsafe { &*redex };
    let Some(_op) = handle.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let parsed: RedexGravityConfigJson = if config_json.is_null() {
        RedexGravityConfigJson::default()
    } else {
        let Some(raw) = (unsafe { c_str_to_owned(config_json) }) else {
            return NetError::InvalidUtf8.into();
        };
        if raw.is_empty() {
            RedexGravityConfigJson::default()
        } else {
            let Ok(value) = serde_json::from_str::<RedexGravityConfigJson>(&raw) else {
                return NetError::InvalidJson.into();
            };
            value
        }
    };
    let (policy, tick) = parsed.into_policy_and_tick();
    let mesh = unsafe { owned_mesh.take() };
    let enabled = handle.inner.enable_gravity_for_greedy(mesh, policy, tick);
    if enabled.is_ok() {
        0
    } else {
        NET_ERR_REDEX
    }
}
/// Disables data gravity for the greedy runtime of a RedEX instance.
///
/// Returns `0` on success, `NetError::NullPointer` if `redex` is NULL, and
/// `NetError::ShuttingDown` if the handle no longer accepts operations.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`.
#[cfg(feature = "dataforts")]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_disable_gravity_for_greedy(redex: *mut RedexHandle) -> c_int {
    if redex.is_null() {
        return NetError::NullPointer.into();
    }
    let h = unsafe { &*redex };
    let Some(_op) = h.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    h.inner.disable_gravity_for_greedy();
    0
}
/// Stub compiled when the `dataforts` feature is off: always returns
/// `NET_ERR_FEATURE_NOT_BUILT`.
///
/// A non-NULL `mesh_arc` is still freed, so the caller's hand-off of ownership holds
/// even though the feature is unavailable.
///
/// NOTE(review): the real implementation is gated on `all(net, dataforts)` while this
/// stub is gated on `not(dataforts)`; with `dataforts` on and `net` off neither would
/// be compiled — confirm that `dataforts` implies `net`.
///
/// # Safety
///
/// `mesh_arc` must be NULL or a boxed `Arc<MeshNode>` pointer the caller will not use again.
#[cfg(not(feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_enable_greedy_dataforts(
_redex: *mut RedexHandle,
mesh_arc: *mut Arc<crate::adapter::net::MeshNode>,
_config_json: *const c_char,
) -> c_int {
if !mesh_arc.is_null() {
unsafe { drop(Box::from_raw(mesh_arc)) };
}
NET_ERR_FEATURE_NOT_BUILT
}
/// Stub compiled when the `dataforts` feature is off: ignores its argument and always
/// returns `NET_ERR_FEATURE_NOT_BUILT`.
#[cfg(not(feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_disable_greedy_dataforts(_redex: *mut RedexHandle) -> c_int {
NET_ERR_FEATURE_NOT_BUILT
}
/// Stub compiled when the `dataforts` feature is off: ignores its argument and always
/// returns `0`.
#[cfg(not(feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_greedy_cached_channel_count(_redex: *const RedexHandle) -> u32 {
0
}
/// Stub compiled when the `dataforts` feature is off: ignores its argument and always
/// returns NULL.
#[cfg(not(feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_greedy_prometheus_text(
_redex: *const RedexHandle,
) -> *mut c_char {
std::ptr::null_mut()
}
/// Stub compiled when the `dataforts` feature is off: always returns
/// `NET_ERR_FEATURE_NOT_BUILT`.
///
/// A non-NULL `mesh_arc` is still freed, so the caller's hand-off of ownership holds
/// even though the feature is unavailable.
///
/// # Safety
///
/// `mesh_arc` must be NULL or a boxed `Arc<MeshNode>` pointer the caller will not use again.
#[cfg(not(feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_enable_gravity_for_greedy(
_redex: *mut RedexHandle,
mesh_arc: *mut Arc<crate::adapter::net::MeshNode>,
_config_json: *const c_char,
) -> c_int {
if !mesh_arc.is_null() {
unsafe { drop(Box::from_raw(mesh_arc)) };
}
NET_ERR_FEATURE_NOT_BUILT
}
/// Stub compiled when the `dataforts` feature is off: ignores its argument and always
/// returns `NET_ERR_FEATURE_NOT_BUILT`.
#[cfg(not(feature = "dataforts"))]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_disable_gravity_for_greedy(_redex: *mut RedexHandle) -> c_int {
NET_ERR_FEATURE_NOT_BUILT
}
/// JSON shape accepted by `net_redex_open_file` for per-file configuration.
#[derive(Deserialize, Default)]
struct RedexFileConfigJson {
// Copied to `RedexFileConfig::persistent`; `false` when absent.
#[serde(default)]
persistent: bool,
// Selects `FsyncPolicy::EveryN`. Must be non-zero and cannot be combined with `fsync_interval_ms`.
fsync_every_n: Option<u64>,
// Selects `FsyncPolicy::Interval`. Must be non-zero and cannot be combined with `fsync_every_n`.
fsync_interval_ms: Option<u64>,
// The three retention limits are optional; an explicit 0 is rejected by `net_redex_open_file`.
retention_max_events: Option<u64>,
retention_max_bytes: Option<u64>,
// Milliseconds; converted to nanoseconds (saturating) when applied.
retention_max_age_ms: Option<u64>,
// Optional replication settings, validated through `RedexReplicationConfigJson::into_config`.
replication: Option<RedexReplicationConfigJson>,
}
/// JSON shape of the `replication` object inside `RedexFileConfigJson`. Every field is optional.
#[derive(Deserialize, Default)]
struct RedexReplicationConfigJson {
factor: Option<u8>,
heartbeat_ms: Option<u64>,
// "standard" (default when absent), "colocation_strict" / "colocation-strict", or "pinned".
placement: Option<String>,
// Required and non-empty when `placement` is "pinned".
pinned_nodes: Option<Vec<u64>>,
leader_pinned: Option<u64>,
// "withdraw" (default when absent) or "evict_oldest" / "evict-oldest".
on_under_capacity: Option<String>,
replication_budget_fraction: Option<f32>,
}
impl RedexReplicationConfigJson {
    /// Converts the parsed JSON into a validated `ReplicationConfig`.
    ///
    /// Starts from `ReplicationConfig::new()`, applies the fields that are present, and
    /// finishes with `validate()`.
    ///
    /// # Errors
    ///
    /// Returns a static message when the placement or under-capacity name is unknown,
    /// when "pinned" placement lacks a non-empty `pinned_nodes`, or when validation fails.
    fn into_config(self) -> Result<crate::adapter::net::redex::ReplicationConfig, &'static str> {
        use crate::adapter::net::redex::{PlacementStrategy, ReplicationConfig, UnderCapacity};

        let Self {
            factor,
            heartbeat_ms,
            placement,
            pinned_nodes,
            leader_pinned,
            on_under_capacity,
            replication_budget_fraction,
        } = self;

        let mut cfg = ReplicationConfig::new();
        if let Some(factor) = factor {
            cfg = cfg.with_factor(factor);
        }
        if let Some(heartbeat) = heartbeat_ms {
            cfg = cfg.with_heartbeat_ms(heartbeat);
        }

        // An absent placement means "standard".
        let strategy = match placement.as_deref().unwrap_or("standard") {
            "standard" => PlacementStrategy::Standard,
            "colocation_strict" | "colocation-strict" => PlacementStrategy::ColocationStrict,
            "pinned" => match pinned_nodes {
                None => return Err("pinned placement requires pinned_nodes"),
                Some(nodes) if nodes.is_empty() => {
                    return Err("pinned placement requires non-empty pinned_nodes");
                }
                Some(nodes) => PlacementStrategy::Pinned(nodes),
            },
            _ => return Err("unknown placement strategy"),
        };
        cfg = cfg.with_placement(strategy);

        if let Some(leader) = leader_pinned {
            cfg = cfg.with_leader_pinned(Some(leader));
        }

        // An absent under-capacity policy means "withdraw".
        let under_capacity = match on_under_capacity.as_deref().unwrap_or("withdraw") {
            "withdraw" => UnderCapacity::Withdraw,
            "evict_oldest" | "evict-oldest" => UnderCapacity::EvictOldest,
            _ => return Err("unknown on_under_capacity policy"),
        };
        cfg = cfg.with_on_under_capacity(under_capacity);

        if let Some(fraction) = replication_budget_fraction {
            cfg = cfg.with_replication_budget_fraction(fraction);
        }

        if cfg.validate().is_err() {
            return Err("replication config invalid");
        }
        Ok(cfg)
    }
}
/// Opaque FFI handle wrapping a shared RedEX file.
///
/// Created by `net_redex_open_file` and released with `net_redex_file_free`.
pub struct RedexFileHandle {
// Wrapped in `ManuallyDrop` so `net_redex_file_free` decides explicitly when (and
// whether) the inner `Arc` is dropped.
inner: ManuallyDrop<Arc<InnerRedexFile>>,
// Tracks in-flight FFI calls; entry points call `try_enter` and free calls `begin_free`.
guard: HandleGuard,
}
/// Opens (or creates) a RedEX file on channel `name` and returns a handle through `out_handle`.
///
/// `config_json` may be NULL for the default configuration; otherwise it must be UTF-8
/// JSON matching `RedexFileConfigJson`. `*out_handle` is set to NULL as soon as the
/// pointer arguments have been validated and is overwritten only on success.
///
/// Returns `0` on success; `NetError::NullPointer` if `redex`, `name` or `out_handle`
/// is NULL; `NetError::ShuttingDown` if the RedEX handle no longer accepts operations;
/// `NetError::InvalidUtf8` / `NetError::InvalidJson` for undecodable strings; and
/// `NET_ERR_REDEX` for an invalid channel name, an invalid configuration, or a failure
/// of the inner `open_file` call.
///
/// NOTE(review): an empty `config_json` string is passed to the JSON parser here and so
/// yields `InvalidJson`, whereas the greedy / gravity entry points treat an empty string
/// as "use defaults" — confirm whether that difference is intended.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`; `name` and `config_json` must
/// be NULL or NUL-terminated strings; `out_handle` must be NULL or valid for a pointer write.
#[unsafe(no_mangle)]
#[allow(clippy::field_reassign_with_default)]
pub unsafe extern "C" fn net_redex_open_file(
redex: *mut RedexHandle,
name: *const c_char,
config_json: *const c_char,
out_handle: *mut *mut RedexFileHandle,
) -> c_int {
if redex.is_null() || name.is_null() || out_handle.is_null() {
return NetError::NullPointer.into();
}
// Clear the out-parameter up front so every failure path leaves it NULL.
unsafe {
*out_handle = std::ptr::null_mut();
}
let redex = unsafe { &*redex };
let _op = match redex.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let Some(name_str) = (unsafe { c_str_to_owned(name) }) else {
return NetError::InvalidUtf8.into();
};
let Ok(channel) = ChannelName::new(&name_str) else {
return NET_ERR_REDEX;
};
let cfg_json: RedexFileConfigJson = if config_json.is_null() {
RedexFileConfigJson::default()
} else {
let Some(s) = (unsafe { c_str_to_owned(config_json) }) else {
return NetError::InvalidUtf8.into();
};
match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return NetError::InvalidJson.into(),
}
};
let mut cfg = RedexFileConfig::default();
cfg.persistent = cfg_json.persistent;
// The two fsync knobs are mutually exclusive and neither may be zero; with neither
// set, the default fsync policy is left untouched.
match (cfg_json.fsync_every_n, cfg_json.fsync_interval_ms) {
(Some(_), Some(_)) | (Some(0), _) | (_, Some(0)) => return NET_ERR_REDEX,
(Some(n), None) => cfg.fsync_policy = FsyncPolicy::EveryN(n),
(None, Some(ms)) => {
cfg.fsync_policy = FsyncPolicy::Interval(std::time::Duration::from_millis(ms))
}
_ => {}
}
// An explicit zero for any retention limit is rejected rather than interpreted.
if matches!(cfg_json.retention_max_events, Some(0))
|| matches!(cfg_json.retention_max_bytes, Some(0))
|| matches!(cfg_json.retention_max_age_ms, Some(0))
{
return NET_ERR_REDEX;
}
cfg.retention_max_events = cfg_json.retention_max_events;
cfg.retention_max_bytes = cfg_json.retention_max_bytes;
if let Some(ms) = cfg_json.retention_max_age_ms {
// Milliseconds to nanoseconds, saturating instead of overflowing.
cfg.retention_max_age_ns = Some(ms.saturating_mul(1_000_000));
}
if let Some(rep_json) = cfg_json.replication {
match rep_json.into_config() {
Ok(rep) => cfg.replication = Some(rep),
Err(_) => return NET_ERR_REDEX,
}
}
match redex.inner.open_file(&channel, cfg) {
Ok(file) => {
let handle = Box::new(RedexFileHandle {
inner: ManuallyDrop::new(Arc::new(file)),
guard: HandleGuard::new(),
});
unsafe {
*out_handle = Box::into_raw(handle);
}
0
}
Err(_) => NET_ERR_REDEX,
}
}
/// Releases the RedEX file owned by `handle`. A NULL `handle` is a no-op.
///
/// The inner `Arc` is dropped only if `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the inner value is
/// intentionally leaked. This function does not deallocate the `RedexFileHandle` box itself.
///
/// # Safety
///
/// `handle` must be NULL or a pointer to a live `RedexFileHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_free(handle: *mut RedexFileHandle) {
if handle.is_null() {
return;
}
let h: &RedexFileHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// Move the Arc out of its ManuallyDrop slot and drop it here.
let inner = ManuallyDrop::take(&mut (*handle).inner);
drop(inner);
}
} else {
tracing::warn!(
"net_redex_file_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// Appends `len` bytes starting at `payload` to the file and stores the assigned
/// sequence number in `*out_seq`.
///
/// Returns `0` on success; `NetError::NullPointer` if any pointer argument is NULL;
/// `NetError::ShuttingDown` if the handle no longer accepts operations;
/// `NetError::InvalidJson` if `len` exceeds `isize::MAX`; and `NET_ERR_REDEX` if the
/// inner append fails. `*out_seq` is written only on success.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `RedexFileHandle`; `payload` must be NULL
/// or valid for reads of `len` bytes; `out_seq` must be NULL or valid for a `u64` write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_append(
    handle: *mut RedexFileHandle,
    payload: *const u8,
    len: usize,
    out_seq: *mut u64,
) -> c_int {
    let all_present = !(handle.is_null() || payload.is_null() || out_seq.is_null());
    if !all_present {
        return NetError::NullPointer.into();
    }
    let file = unsafe { &*handle };
    let Some(_op) = file.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // `slice::from_raw_parts` requires the total size to fit in `isize`.
    if len > isize::MAX as usize {
        return NetError::InvalidJson.into();
    }
    let bytes = unsafe { std::slice::from_raw_parts(payload, len) };
    let Ok(seq) = file.inner.append(bytes) else {
        return NET_ERR_REDEX;
    };
    unsafe {
        *out_seq = seq;
    }
    0
}
/// JSON representation of a single RedEX event as returned to FFI callers.
#[derive(Serialize)]
struct RedexEventJson {
// Sequence number taken from the event's entry.
seq: u64,
// Event payload encoded as lowercase hexadecimal, two characters per byte.
payload_hex: String,
// Value of `entry.checksum()`.
checksum: u32,
// Value of `entry.is_inline()`.
is_inline: bool,
}
impl From<RedexEvent> for RedexEventJson {
    /// Builds the JSON view of an event: entry metadata plus the hex-encoded payload.
    fn from(ev: RedexEvent) -> Self {
        let payload_hex = hex_encode(&ev.payload);
        let entry = &ev.entry;
        Self {
            seq: entry.seq,
            payload_hex,
            checksum: entry.checksum(),
            is_inline: entry.is_inline(),
        }
    }
}
/// Encodes `bytes` as a lowercase hexadecimal string, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        let high = usize::from(byte >> 4);
        let low = usize::from(byte & 0x0f);
        encoded.extend([char::from(DIGITS[high]), char::from(DIGITS[low])]);
    }
    encoded
}
/// Returns the value of `len()` on the file.
///
/// Returns `0` when `handle` is NULL or the handle no longer accepts operations.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `RedexFileHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_len(handle: *mut RedexFileHandle) -> u64 {
    let Some(file) = (unsafe { handle.as_ref() }) else {
        return 0;
    };
    let Some(_op) = file.guard.try_enter() else {
        return 0;
    };
    file.inner.len() as u64
}
/// Reads the events returned by `read_range(start, end)` and returns them as a JSON
/// array of `RedexEventJson` objects through `out_json` / `out_len`.
///
/// Returns `0` on success; `NetError::NullPointer` if any pointer argument is NULL;
/// `NetError::ShuttingDown` if the handle no longer accepts operations; otherwise
/// whatever `write_json_out` reports.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `RedexFileHandle`; `out_json` and
/// `out_len` must be NULL or valid for writes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_read_range(
    handle: *mut RedexFileHandle,
    start: u64,
    end: u64,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    let file = unsafe { &*handle };
    let Some(_op) = file.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let mut events: Vec<RedexEventJson> = Vec::new();
    for event in file.inner.read_range(start, end) {
        events.push(RedexEventJson::from(event));
    }
    write_json_out(&events, out_json, out_len)
}
/// Calls `sync()` on the file.
///
/// Returns `0` on success, `NetError::NullPointer` if `handle` is NULL,
/// `NetError::ShuttingDown` if the handle no longer accepts operations, and
/// `NET_ERR_REDEX` if the sync fails.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `RedexFileHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_sync(handle: *mut RedexFileHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let file = unsafe { &*handle };
    let Some(_op) = file.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    if file.inner.sync().is_ok() {
        0
    } else {
        NET_ERR_REDEX
    }
}
/// Calls `close()` on the file. The handle itself still has to be released with
/// `net_redex_file_free`.
///
/// Returns `0` on success, `NetError::NullPointer` if `handle` is NULL,
/// `NetError::ShuttingDown` if the handle no longer accepts operations, and
/// `NET_ERR_REDEX` if the close fails.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `RedexFileHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_close(handle: *mut RedexFileHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let file = unsafe { &*handle };
    let Some(_op) = file.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    if file.inner.close().is_err() {
        return NET_ERR_REDEX;
    }
    0
}
// Storage for a tail cursor's event stream: an async mutex around an optional boxed
// stream. `net_redex_tail_next` sets the slot to `None` once the stream has ended or
// failed. `ManuallyDrop` leaves the decision to drop it to `net_redex_tail_free`.
type RedexTailStream = ManuallyDrop<
TokioMutex<Option<BoxStream<'static, std::result::Result<RedexEvent, RedexError>>>>,
>;
/// Opaque FFI cursor over a RedEX file tail.
///
/// Created by `net_redex_file_tail`, advanced with `net_redex_tail_next`, and released
/// with `net_redex_tail_free`.
pub struct RedexTailHandle {
stream: RedexTailStream,
// Tracks in-flight FFI calls; entry points call `try_enter` and free calls `begin_free`.
guard: HandleGuard,
}
/// Creates a tail cursor over the file via `tail(from_seq)` and returns it through
/// `out_cursor`.
///
/// `*out_cursor` is set to NULL as soon as the pointer arguments have been validated
/// and is overwritten on success.
///
/// Returns `0` on success, `NetError::NullPointer` if `handle` or `out_cursor` is
/// NULL, and `NetError::ShuttingDown` if the file handle no longer accepts operations.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `RedexFileHandle`; `out_cursor` must be
/// NULL or valid for a pointer write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_file_tail(
    handle: *mut RedexFileHandle,
    from_seq: u64,
    out_cursor: *mut *mut RedexTailHandle,
) -> c_int {
    if handle.is_null() || out_cursor.is_null() {
        return NetError::NullPointer.into();
    }
    // Clear the out-parameter up front so the failure path leaves it NULL.
    unsafe {
        *out_cursor = std::ptr::null_mut();
    }
    let file = unsafe { &*handle };
    let Some(_op) = file.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let events: BoxStream<'static, std::result::Result<RedexEvent, RedexError>> =
        file.inner.tail(from_seq).boxed();
    let raw_cursor = Box::into_raw(Box::new(RedexTailHandle {
        stream: ManuallyDrop::new(TokioMutex::new(Some(events))),
        guard: HandleGuard::new(),
    }));
    unsafe {
        *out_cursor = raw_cursor;
    }
    0
}
/// Blocks until the tail cursor yields its next event and returns it as JSON through
/// `out_json` / `out_len`.
///
/// `timeout_ms == 0` waits without a timeout; any other value bounds the wait for the
/// next event. The outputs are cleared before anything else happens, so they are
/// NULL / 0 on every non-success return.
///
/// Returns `0` with a serialized `RedexEventJson` on success; `NET_ERR_TIMEOUT` if the
/// timeout elapsed (the stream stays usable); `NET_ERR_STREAM_ENDED` if the stream has
/// finished, reported `RedexError::Closed`, or was already marked ended by an earlier
/// call; `NET_ERR_REDEX` on any other stream error; `NetError::NullPointer` if a
/// pointer argument is NULL; and `NetError::ShuttingDown` if the cursor no longer
/// accepts operations.
///
/// # Safety
///
/// `cursor` must be NULL or point to a live `RedexTailHandle`; `out_json` and `out_len`
/// must be NULL or valid for writes.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_tail_next(
cursor: *mut RedexTailHandle,
timeout_ms: u32,
out_json: *mut *mut c_char,
out_len: *mut usize,
) -> c_int {
if cursor.is_null() || out_json.is_null() || out_len.is_null() {
return NetError::NullPointer.into();
}
zero_out_json(out_json, out_len);
let cursor = unsafe { &*cursor };
let _op = match cursor.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
block_on(async move {
// Concurrent callers on the same cursor queue up on this async mutex.
let mut guard = cursor.stream.lock().await;
// A `None` slot means an earlier call already observed end-of-stream or an error.
let Some(stream) = guard.as_mut() else {
return NET_ERR_STREAM_ENDED;
};
let next_fut = stream.next();
let outcome = if timeout_ms == 0 {
next_fut.await
} else {
match tokio::time::timeout(
std::time::Duration::from_millis(timeout_ms as u64),
next_fut,
)
.await
{
Ok(v) => v,
Err(_) => return NET_ERR_TIMEOUT,
}
};
match outcome {
Some(Ok(ev)) => {
// Release the stream lock before serializing the event.
drop(guard);
let js = RedexEventJson::from(ev);
write_json_out(&js, out_json, out_len)
}
Some(Err(RedexError::Closed)) | None => {
// Drop the stream so later calls return STREAM_ENDED without polling it.
*guard = None;
NET_ERR_STREAM_ENDED
}
Some(Err(_)) => {
// After any other error the stream is discarded as well.
*guard = None;
NET_ERR_REDEX
}
}
})
}
/// Releases the stream owned by a tail cursor. A NULL `cursor` is a no-op.
///
/// The stream is dropped only if `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the stream is
/// intentionally leaked. This function does not deallocate the `RedexTailHandle` box itself.
///
/// # Safety
///
/// `cursor` must be NULL or a pointer to a live `RedexTailHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redex_tail_free(cursor: *mut RedexTailHandle) {
if cursor.is_null() {
return;
}
let h: &RedexTailHandle = unsafe { &*cursor };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// Move the mutex-wrapped stream out of its ManuallyDrop slot and drop it here.
let stream = ManuallyDrop::take(&mut (*cursor).stream);
drop(stream);
}
} else {
tracing::warn!(
"net_redex_tail_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// Opaque FFI handle wrapping a shared tasks adapter.
///
/// Created by `net_tasks_adapter_open` and released with `net_tasks_adapter_free`.
pub struct TasksAdapterHandle {
// Wrapped in `ManuallyDrop` so `net_tasks_adapter_free` decides explicitly when (and
// whether) the inner `Arc` is dropped.
inner: ManuallyDrop<Arc<InnerTasksAdapter>>,
// Tracks in-flight FFI calls; entry points call `try_enter` and free calls `begin_free`.
guard: HandleGuard,
}
/// Opens a tasks adapter on the given RedEX instance and returns a handle through
/// `out_handle`.
///
/// A non-zero `persistent` opens the adapter with a persistent file configuration;
/// zero uses the default configuration. `*out_handle` is set to NULL as soon as the
/// pointer arguments have been validated and is overwritten only on success. This
/// matches `net_redex_open_file` and `net_redex_file_tail`, and means a caller that
/// ignores the return code never sees a stale or uninitialised pointer.
///
/// Returns `0` on success, `NetError::NullPointer` if `redex` or `out_handle` is NULL,
/// `NetError::ShuttingDown` if the RedEX handle no longer accepts operations, and
/// `NET_ERR_CORTEX_FOLD` if opening the adapter fails.
///
/// # Safety
///
/// `redex` must be NULL or point to a live `RedexHandle`; `out_handle` must be NULL or
/// valid for a pointer write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_adapter_open(
    redex: *mut RedexHandle,
    origin_hash: u64,
    persistent: c_int,
    out_handle: *mut *mut TasksAdapterHandle,
) -> c_int {
    if redex.is_null() || out_handle.is_null() {
        return NetError::NullPointer.into();
    }
    // Clear the out-parameter up front so every failure path leaves it NULL.
    unsafe {
        *out_handle = std::ptr::null_mut();
    }
    let redex = unsafe { &*redex };
    let Some(_op) = redex.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let cfg = if persistent != 0 {
        RedexFileConfig::default().with_persistent(true)
    } else {
        RedexFileConfig::default()
    };
    // The async block needs its own strong reference to the RedEX instance.
    let redex_inner: Arc<InnerRedex> = Arc::clone(&redex.inner);
    let result = block_on(async move {
        InnerTasksAdapter::open_with_config(&redex_inner, origin_hash, cfg).await
    });
    match result {
        Ok(adapter) => {
            let handle = Box::new(TasksAdapterHandle {
                inner: ManuallyDrop::new(Arc::new(adapter)),
                guard: HandleGuard::new(),
            });
            unsafe {
                *out_handle = Box::into_raw(handle);
            }
            0
        }
        Err(_) => NET_ERR_CORTEX_FOLD,
    }
}
/// Calls `close()` on the tasks adapter. The handle itself still has to be released
/// with `net_tasks_adapter_free`.
///
/// Returns `0` on success, `NetError::NullPointer` if `handle` is NULL,
/// `NetError::ShuttingDown` if the handle no longer accepts operations, and
/// `NET_ERR_CORTEX_CLOSED` if the close fails.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_adapter_close(handle: *mut TasksAdapterHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    if tasks.inner.close().is_err() {
        return NET_ERR_CORTEX_CLOSED;
    }
    0
}
/// Releases the tasks adapter owned by `handle`. A NULL `handle` is a no-op.
///
/// The inner `Arc` is dropped only if `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the inner value is
/// intentionally leaked. This function does not deallocate the `TasksAdapterHandle` box itself.
///
/// # Safety
///
/// `handle` must be NULL or a pointer to a live `TasksAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_adapter_free(handle: *mut TasksAdapterHandle) {
if handle.is_null() {
return;
}
let h: &TasksAdapterHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
// Move the Arc out of its ManuallyDrop slot and drop it here.
let inner = ManuallyDrop::take(&mut (*handle).inner);
drop(inner);
}
} else {
tracing::warn!(
"net_tasks_adapter_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
}
/// JSON representation of a task as returned to FFI callers.
#[derive(Serialize)]
struct TaskJson {
id: u64,
title: String,
// Either "pending" or "completed".
status: &'static str,
// Copied unchanged from the task's `created_ns` / `updated_ns` fields.
created_ns: u64,
updated_ns: u64,
}
impl From<Task> for TaskJson {
    /// Builds the JSON view of a task, rendering its status as a lowercase string.
    fn from(t: Task) -> Self {
        let status = match t.status {
            TaskStatus::Pending => "pending",
            TaskStatus::Completed => "completed",
        };
        Self {
            id: t.id,
            title: t.title,
            status,
            created_ns: t.created_ns,
            updated_ns: t.updated_ns,
        }
    }
}
/// Creates a task with the given `id`, `title` and timestamp, storing the resulting
/// sequence number in `*out_seq`.
///
/// Returns `0` on success; `NetError::NullPointer` if any pointer argument is NULL;
/// `NetError::ShuttingDown` if the handle no longer accepts operations;
/// `NetError::InvalidUtf8` if `title` is not valid UTF-8; and `NET_ERR_CORTEX_FOLD` if
/// the adapter rejects the operation. `*out_seq` is written only on success.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`; `title` must be NULL
/// or a NUL-terminated string; `out_seq` must be NULL or valid for a `u64` write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_create(
    handle: *mut TasksAdapterHandle,
    id: u64,
    title: *const c_char,
    now_ns: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || title.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(title) = (unsafe { c_str_to_owned(title) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(seq) = tasks.inner.create(id, title, now_ns) else {
        return NET_ERR_CORTEX_FOLD;
    };
    unsafe {
        *out_seq = seq;
    }
    0
}
/// Renames task `id` to `new_title`, storing the resulting sequence number in `*out_seq`.
///
/// Returns `0` on success; `NetError::NullPointer` if any pointer argument is NULL;
/// `NetError::ShuttingDown` if the handle no longer accepts operations;
/// `NetError::InvalidUtf8` if `new_title` is not valid UTF-8; and `NET_ERR_CORTEX_FOLD`
/// if the adapter rejects the operation. `*out_seq` is written only on success.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`; `new_title` must be
/// NULL or a NUL-terminated string; `out_seq` must be NULL or valid for a `u64` write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_rename(
    handle: *mut TasksAdapterHandle,
    id: u64,
    new_title: *const c_char,
    now_ns: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || new_title.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(title) = (unsafe { c_str_to_owned(new_title) }) else {
        return NetError::InvalidUtf8.into();
    };
    let Ok(seq) = tasks.inner.rename(id, title, now_ns) else {
        return NET_ERR_CORTEX_FOLD;
    };
    unsafe {
        *out_seq = seq;
    }
    0
}
/// Marks task `id` as completed at `now_ns`, storing the resulting sequence number in
/// `*out_seq`.
///
/// Returns `0` on success; `NetError::NullPointer` if `handle` or `out_seq` is NULL;
/// `NetError::ShuttingDown` if the handle no longer accepts operations; and
/// `NET_ERR_CORTEX_FOLD` if the adapter rejects the operation. `*out_seq` is written
/// only on success.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`; `out_seq` must be
/// NULL or valid for a `u64` write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_complete(
    handle: *mut TasksAdapterHandle,
    id: u64,
    now_ns: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Ok(seq) = tasks.inner.complete(id, now_ns) else {
        return NET_ERR_CORTEX_FOLD;
    };
    unsafe {
        *out_seq = seq;
    }
    0
}
/// Deletes task `id`, storing the resulting sequence number in `*out_seq`.
///
/// Returns `0` on success; `NetError::NullPointer` if `handle` or `out_seq` is NULL;
/// `NetError::ShuttingDown` if the handle no longer accepts operations; and
/// `NET_ERR_CORTEX_FOLD` if the adapter rejects the operation. `*out_seq` is written
/// only on success.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`; `out_seq` must be
/// NULL or valid for a `u64` write.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_delete(
    handle: *mut TasksAdapterHandle,
    id: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Ok(seq) = tasks.inner.delete(id) else {
        return NET_ERR_CORTEX_FOLD;
    };
    unsafe {
        *out_seq = seq;
    }
    0
}
/// Blocks until the adapter's `wait_for_seq(seq)` completes.
///
/// `timeout_ms == 0` waits without a timeout; any other value bounds the wait.
///
/// Returns `0` on success, `NET_ERR_TIMEOUT` if the timeout elapsed first,
/// `NET_ERR_FOLD_STOPPED` if the wait itself failed, `NetError::NullPointer` if
/// `handle` is NULL, and `NetError::ShuttingDown` if the handle no longer accepts
/// operations.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_wait_for_seq(
    handle: *mut TasksAdapterHandle,
    seq: u64,
    timeout_ms: u32,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let adapter: Arc<InnerTasksAdapter> = Arc::clone(&tasks.inner);
    block_on(async move {
        let wait = adapter.wait_for_seq(seq);
        let folded = if timeout_ms == 0 {
            wait.await
        } else {
            let limit = std::time::Duration::from_millis(timeout_ms as u64);
            let Ok(finished) = tokio::time::timeout(limit, wait).await else {
                return NET_ERR_TIMEOUT;
            };
            finished
        };
        match folded {
            Ok(()) => 0,
            Err(_) => NET_ERR_FOLD_STOPPED,
        }
    })
}
/// Waits for the write token `(origin_hash, seq)` on the tasks adapter.
///
/// With `timeout_ms == 0` the adapter is polled once through `tasks_poll_for_token`
/// and the call returns immediately. Otherwise the call blocks on
/// `wait_for_token` with `timeout_ms` as its deadline.
///
/// Returns `0` on success, `NET_ERR_TIMEOUT`, `NET_ERR_WRONG_ORIGIN`,
/// `NET_ERR_QUEUE_FULL` or `NET_ERR_FOLD_STOPPED` for the matching adapter error,
/// `NET_ERR_PANIC` if a panic was caught during the blocking wait,
/// `NetError::NullPointer` if `handle` is NULL, and `NetError::ShuttingDown` if the
/// handle no longer accepts operations.
///
/// # Safety
///
/// `handle` must be NULL or point to a live `TasksAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_wait_for_token(
    handle: *mut TasksAdapterHandle,
    origin_hash: u64,
    seq: u64,
    timeout_ms: u32,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    let tasks = unsafe { &*handle };
    let Some(_op) = tasks.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let adapter: Arc<InnerTasksAdapter> = Arc::clone(&tasks.inner);
    let token = InnerWriteToken::new(origin_hash, seq);
    if timeout_ms == 0 {
        return tasks_poll_for_token(&adapter, token);
    }
    let deadline = std::time::Duration::from_millis(timeout_ms as u64);
    // Catch panics from the blocking wait so they become a status code rather than
    // unwinding into the caller.
    let waited = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        block_on(async move {
            let Err(err) = adapter.wait_for_token(token, deadline).await else {
                return 0;
            };
            match err {
                InnerWaitForTokenError::Timeout => NET_ERR_TIMEOUT,
                InnerWaitForTokenError::WrongOrigin { .. } => NET_ERR_WRONG_ORIGIN,
                InnerWaitForTokenError::QueueFull => NET_ERR_QUEUE_FULL,
                InnerWaitForTokenError::FoldStopped { .. } => NET_ERR_FOLD_STOPPED,
            }
        })
    }));
    match waited {
        Ok(code) => code,
        Err(_) => NET_ERR_PANIC,
    }
}
/// JSON shape of the optional filter accepted by the tasks list / watch entry
/// points. Every field is optional; an absent field applies no constraint.
#[derive(Deserialize, Default)]
struct TasksFilterJson {
/// Either `"pending"` or `"completed"`; the filter builders reject any other value.
status: Option<String>,
/// Forwarded to the query's `title_contains`.
title_contains: Option<String>,
/// Forwarded to `created_after`.
created_after_ns: Option<u64>,
/// Forwarded to `created_before`.
created_before_ns: Option<u64>,
/// Forwarded to `updated_after`.
updated_after_ns: Option<u64>,
/// Forwarded to `updated_before`.
updated_before_ns: Option<u64>,
/// One of `id_asc`, `id_desc`, `created_asc`, `created_desc`, `updated_asc`,
/// `updated_desc`; the filter builders reject any other value.
order_by: Option<String>,
/// Result cap; converted to `usize` before use.
limit: Option<u32>,
}
fn build_tasks_watcher(
adapter: &InnerTasksAdapter,
filter_json: *const c_char,
) -> Result<TasksWatcher, c_int> {
let mut w = adapter.watch();
if filter_json.is_null() {
return Ok(w);
}
let Some(s) = (unsafe { c_str_to_owned(filter_json) }) else {
return Err(NetError::InvalidUtf8.into());
};
let f: TasksFilterJson = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return Err(NetError::InvalidJson.into()),
};
w = match f.status.as_deref() {
Some("pending") => w.where_status(TaskStatus::Pending),
Some("completed") => w.where_status(TaskStatus::Completed),
Some(_) => return Err(NetError::InvalidJson.into()),
None => w,
};
if let Some(s) = f.title_contains {
w = w.title_contains(s);
}
if let Some(ns) = f.created_after_ns {
w = w.created_after(ns);
}
if let Some(ns) = f.created_before_ns {
w = w.created_before(ns);
}
if let Some(ns) = f.updated_after_ns {
w = w.updated_after(ns);
}
if let Some(ns) = f.updated_before_ns {
w = w.updated_before(ns);
}
if let Some(o) = f.order_by.as_deref() {
w = match o {
"id_asc" => w.order_by(TasksOrderBy::IdAsc),
"id_desc" => w.order_by(TasksOrderBy::IdDesc),
"created_asc" => w.order_by(TasksOrderBy::CreatedAsc),
"created_desc" => w.order_by(TasksOrderBy::CreatedDesc),
"updated_asc" => w.order_by(TasksOrderBy::UpdatedAsc),
"updated_desc" => w.order_by(TasksOrderBy::UpdatedDesc),
_ => return Err(NetError::InvalidJson.into()),
};
}
if let Some(l) = f.limit {
w = w.limit(l as usize);
}
Ok(w)
}
#[allow(clippy::field_reassign_with_default)]
fn build_tasks_list_filter(filter_json: *const c_char) -> Result<TasksFilter, c_int> {
if filter_json.is_null() {
return Ok(TasksFilter::default());
}
let Some(s) = (unsafe { c_str_to_owned(filter_json) }) else {
return Err(NetError::InvalidUtf8.into());
};
let f: TasksFilterJson = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return Err(NetError::InvalidJson.into()),
};
let mut out = TasksFilter::default();
out.status = match f.status.as_deref() {
Some("pending") => Some(TaskStatus::Pending),
Some("completed") => Some(TaskStatus::Completed),
Some(_) => return Err(NetError::InvalidJson.into()),
None => None,
};
out.title_contains = f.title_contains;
out.created_after_ns = f.created_after_ns;
out.created_before_ns = f.created_before_ns;
out.updated_after_ns = f.updated_after_ns;
out.updated_before_ns = f.updated_before_ns;
out.order_by = match f.order_by.as_deref() {
None => None,
Some("id_asc") => Some(TasksOrderBy::IdAsc),
Some("id_desc") => Some(TasksOrderBy::IdDesc),
Some("created_asc") => Some(TasksOrderBy::CreatedAsc),
Some("created_desc") => Some(TasksOrderBy::CreatedDesc),
Some("updated_asc") => Some(TasksOrderBy::UpdatedAsc),
Some("updated_desc") => Some(TasksOrderBy::UpdatedDesc),
Some(_) => return Err(NetError::InvalidJson.into()),
};
out.limit = f.limit.map(|l| l as usize);
Ok(out)
}
/// Runs a one-shot query against the adapter's current state, applying every
/// constraint present in `filter`, and collects the matching tasks.
///
/// The state's read guard is held until the query has been collected.
fn run_tasks_list(tasks: &InnerTasksAdapter, filter: &TasksFilter) -> Vec<Task> {
    let state = tasks.state();
    let snapshot = state.read();
    let query = snapshot.query();
    let query = match filter.status {
        Some(status) => query.where_status(status),
        None => query,
    };
    let query = match &filter.title_contains {
        Some(needle) => query.title_contains(needle.clone()),
        None => query,
    };
    let query = match filter.created_after_ns {
        Some(bound) => query.created_after(bound),
        None => query,
    };
    let query = match filter.created_before_ns {
        Some(bound) => query.created_before(bound),
        None => query,
    };
    let query = match filter.updated_after_ns {
        Some(bound) => query.updated_after(bound),
        None => query,
    };
    let query = match filter.updated_before_ns {
        Some(bound) => query.updated_before(bound),
        None => query,
    };
    let query = match filter.order_by {
        Some(ordering) => query.order_by(ordering),
        None => query,
    };
    let query = match filter.limit {
        Some(cap) => query.limit(cap),
        None => query,
    };
    query.collect()
}
/// Lists tasks matching an optional JSON filter and hands the result back as
/// JSON through `out_json` / `out_len`.
///
/// The outputs are passed to `zero_out_json` before any other work. Returns
/// `NetError::NullPointer` when `handle`, `out_json` or `out_len` is null,
/// `NetError::ShuttingDown` when the handle guard refuses the operation, the
/// error code from `build_tasks_list_filter` for a bad filter, and otherwise
/// whatever `write_json_out` returns.
///
/// # Safety
/// `handle` must point to a live `TasksAdapterHandle`; `filter_json` must be
/// null or a valid C string; `out_json` and `out_len` must be writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_list(
    handle: *mut TasksAdapterHandle,
    filter_json: *const c_char,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if handle.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    zero_out_json(out_json, out_len);
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let tasks_handle = unsafe { &*handle };
    let Some(_op) = tasks_handle.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    build_tasks_list_filter(filter_json).map_or_else(
        |code| code,
        |filter| {
            let rows = run_tasks_list(&tasks_handle.inner, &filter);
            let payload: Vec<TaskJson> = rows.into_iter().map(TaskJson::from).collect();
            write_json_out(&payload, out_json, out_len)
        },
    )
}
/// Opaque cursor handed to C callers by `net_tasks_snapshot_and_watch`.
pub struct TasksWatchHandle {
/// The live watch stream. `net_tasks_watch_next` replaces it with `None` once
/// the stream ends; `net_tasks_watch_free` takes it out of the `ManuallyDrop`
/// and drops it.
stream: ManuallyDrop<TokioMutex<Option<BoxStream<'static, Vec<Task>>>>>,
/// Gate entered by each operation on the cursor and consulted by the free path.
guard: HandleGuard,
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_snapshot_and_watch(
handle: *mut TasksAdapterHandle,
filter_json: *const c_char,
out_snapshot: *mut *mut c_char,
out_snapshot_len: *mut usize,
out_cursor: *mut *mut TasksWatchHandle,
) -> c_int {
if handle.is_null()
|| out_snapshot.is_null()
|| out_snapshot_len.is_null()
|| out_cursor.is_null()
{
return NetError::NullPointer.into();
}
let tasks = unsafe { &*handle };
let _op = match tasks.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let watcher = match build_tasks_watcher(&tasks.inner, filter_json) {
Ok(w) => w,
Err(code) => return code,
};
let adapter: Arc<InnerTasksAdapter> = Arc::clone(&tasks.inner);
let (snapshot, stream) = block_on(async move { adapter.snapshot_and_watch(watcher) });
let snapshot_json: Vec<TaskJson> = snapshot.into_iter().map(TaskJson::from).collect();
let code = write_json_out(&snapshot_json, out_snapshot, out_snapshot_len);
if code != 0 {
return code;
}
let handle = Box::new(TasksWatchHandle {
stream: ManuallyDrop::new(TokioMutex::new(Some(stream))),
guard: HandleGuard::new(),
});
unsafe {
*out_cursor = Box::into_raw(handle);
}
0
}
/// Blocks until the watch cursor yields its next batch of tasks and writes it
/// as JSON through `out_json` / `out_len`.
///
/// `timeout_ms == 0` waits with no timeout. Returns the result of
/// `write_json_out` when a batch arrives, `NET_ERR_TIMEOUT` when the timeout
/// elapses first, and `NET_ERR_STREAM_ENDED` when the stream has finished
/// (the stored stream is then replaced with `None`, so later calls return the
/// same code immediately). Also returns `NetError::NullPointer` when any
/// pointer is null and `NetError::ShuttingDown` when the cursor guard refuses
/// the operation.
///
/// # Safety
/// `cursor` must point to a live `TasksWatchHandle`; `out_json` and `out_len`
/// must be writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_watch_next(
    cursor: *mut TasksWatchHandle,
    timeout_ms: u32,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if cursor.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let cursor = unsafe { &*cursor };
    let Some(_op) = cursor.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    block_on(async move {
        let mut slot = cursor.stream.lock().await;
        let Some(stream) = slot.as_mut() else {
            return NET_ERR_STREAM_ENDED;
        };
        let polled = if timeout_ms == 0 {
            stream.next().await
        } else {
            let limit = std::time::Duration::from_millis(u64::from(timeout_ms));
            let Ok(item) = tokio::time::timeout(limit, stream.next()).await else {
                return NET_ERR_TIMEOUT;
            };
            item
        };
        let Some(batch) = polled else {
            // Remember exhaustion so later calls return immediately.
            *slot = None;
            return NET_ERR_STREAM_ENDED;
        };
        let payload: Vec<TaskJson> = batch.into_iter().map(TaskJson::from).collect();
        write_json_out(&payload, out_json, out_len)
    })
}
/// Releases the stream owned by a tasks watch cursor. A null `cursor` is a no-op.
///
/// The stream is dropped only when `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the stream
/// is deliberately left in place.
///
/// # Safety
/// `cursor` must be null or a pointer previously returned through
/// `net_tasks_snapshot_and_watch`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_tasks_watch_free(cursor: *mut TasksWatchHandle) {
    if cursor.is_null() {
        return;
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let watch: &TasksWatchHandle = unsafe { &*cursor };
    if !watch.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
        tracing::warn!(
            "net_tasks_watch_free: in-flight ops did not drain within deadline; \
             leaking inner to avoid use-after-free"
        );
        return;
    }
    // SAFETY: reached only after `begin_free` returned true; the stream is
    // taken out of its `ManuallyDrop` here and dropped.
    unsafe {
        drop(ManuallyDrop::take(&mut (*cursor).stream));
    }
}
/// Opaque handle to a memories adapter handed to C callers.
pub struct MemoriesAdapterHandle {
/// Shared adapter; wrapped in `ManuallyDrop` so `net_memories_adapter_free`
/// decides whether it is dropped.
inner: ManuallyDrop<Arc<InnerMemoriesAdapter>>,
/// Gate entered by each operation on the handle and consulted by the free path.
guard: HandleGuard,
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_adapter_open(
redex: *mut RedexHandle,
origin_hash: u64,
persistent: c_int,
out_handle: *mut *mut MemoriesAdapterHandle,
) -> c_int {
if redex.is_null() || out_handle.is_null() {
return NetError::NullPointer.into();
}
let redex = unsafe { &*redex };
let _op = match redex.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let cfg = if persistent != 0 {
RedexFileConfig::default().with_persistent(true)
} else {
RedexFileConfig::default()
};
let redex_inner: Arc<InnerRedex> = Arc::clone(&redex.inner);
let result = block_on(async move {
InnerMemoriesAdapter::open_with_config(&redex_inner, origin_hash, cfg).await
});
match result {
Ok(adapter) => {
let handle = Box::new(MemoriesAdapterHandle {
inner: ManuallyDrop::new(Arc::new(adapter)),
guard: HandleGuard::new(),
});
unsafe {
*out_handle = Box::into_raw(handle);
}
0
}
Err(_) => NET_ERR_CORTEX_FOLD,
}
}
/// Calls `close()` on the inner memories adapter.
///
/// Returns `0` on success, `NET_ERR_CORTEX_CLOSED` when `close()` fails,
/// `NetError::NullPointer` for a null `handle`, and
/// `NetError::ShuttingDown` when the handle guard refuses the operation.
///
/// # Safety
/// `handle` must point to a live `MemoriesAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_adapter_close(handle: *mut MemoriesAdapterHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories = unsafe { &*handle };
    let Some(_op) = memories.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    memories.inner.close().map_or(NET_ERR_CORTEX_CLOSED, |()| 0)
}
/// Releases the adapter reference held by a memories adapter handle. A null
/// `handle` is a no-op.
///
/// The inner `Arc` is dropped only when `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the inner
/// value is deliberately left in place.
///
/// # Safety
/// `handle` must be null or a pointer previously returned by this module as a
/// `MemoriesAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_adapter_free(handle: *mut MemoriesAdapterHandle) {
    if handle.is_null() {
        return;
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories: &MemoriesAdapterHandle = unsafe { &*handle };
    if !memories.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
        tracing::warn!(
            "net_memories_adapter_free: in-flight ops did not drain within deadline; \
             leaking inner to avoid use-after-free"
        );
        return;
    }
    // SAFETY: reached only after `begin_free` returned true; the inner `Arc`
    // is taken out of its `ManuallyDrop` here and dropped.
    unsafe {
        drop(ManuallyDrop::take(&mut (*handle).inner));
    }
}
/// Serialisable mirror of `Memory`, used for the JSON returned by the
/// memories list / watch entry points. Fields are copied one-to-one from
/// `Memory` by the `From` impls.
#[derive(Serialize)]
struct MemoryJson {
id: u64,
content: String,
tags: Vec<String>,
source: String,
created_ns: u64,
updated_ns: u64,
pinned: bool,
}
impl From<Memory> for MemoryJson {
fn from(m: Memory) -> Self {
MemoryJson {
id: m.id,
content: m.content,
tags: m.tags,
source: m.source,
created_ns: m.created_ns,
updated_ns: m.updated_ns,
pinned: m.pinned,
}
}
}
impl From<std::sync::Arc<Memory>> for MemoryJson {
fn from(m: std::sync::Arc<Memory>) -> Self {
let owned = std::sync::Arc::try_unwrap(m).unwrap_or_else(|arc| (*arc).clone());
owned.into()
}
}
/// JSON payload accepted by `net_memories_store`. The fields are passed, in
/// declaration order, to the adapter's `store` call. No field is optional or
/// carries `#[serde(default)]`.
#[derive(Deserialize)]
struct MemoryStoreInput {
id: u64,
content: String,
tags: Vec<String>,
source: String,
now_ns: u64,
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_store(
handle: *mut MemoriesAdapterHandle,
input_json: *const c_char,
out_seq: *mut u64,
) -> c_int {
if handle.is_null() || input_json.is_null() || out_seq.is_null() {
return NetError::NullPointer.into();
}
let mem = unsafe { &*handle };
let _op = match mem.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let Some(s) = (unsafe { c_str_to_owned(input_json) }) else {
return NetError::InvalidUtf8.into();
};
let input: MemoryStoreInput = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return NetError::InvalidJson.into(),
};
match mem.inner.store(
input.id,
input.content,
input.tags,
input.source,
input.now_ns,
) {
Ok(seq) => {
unsafe {
*out_seq = seq;
}
0
}
Err(_) => NET_ERR_CORTEX_FOLD,
}
}
/// JSON payload accepted by `net_memories_retag`. The fields are passed, in
/// declaration order, to the adapter's `retag` call. No field is optional or
/// carries `#[serde(default)]`.
#[derive(Deserialize)]
struct MemoryRetagInput {
id: u64,
tags: Vec<String>,
now_ns: u64,
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_retag(
handle: *mut MemoriesAdapterHandle,
input_json: *const c_char,
out_seq: *mut u64,
) -> c_int {
if handle.is_null() || input_json.is_null() || out_seq.is_null() {
return NetError::NullPointer.into();
}
let mem = unsafe { &*handle };
let _op = match mem.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let Some(s) = (unsafe { c_str_to_owned(input_json) }) else {
return NetError::InvalidUtf8.into();
};
let input: MemoryRetagInput = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return NetError::InvalidJson.into(),
};
match mem.inner.retag(input.id, input.tags, input.now_ns) {
Ok(seq) => {
unsafe {
*out_seq = seq;
}
0
}
Err(_) => NET_ERR_CORTEX_FOLD,
}
}
/// Forwards `pin(id, now_ns)` to the inner memories adapter and reports the
/// sequence number it returns.
///
/// Returns `0` after writing the sequence to `*out_seq`,
/// `NetError::NullPointer` for a null `handle` / `out_seq`,
/// `NetError::ShuttingDown` when the handle guard refuses the operation, and
/// `NET_ERR_CORTEX_FOLD` when the adapter call fails.
///
/// # Safety
/// `handle` must point to a live `MemoriesAdapterHandle` and `out_seq` must
/// be valid for writing one `u64`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_pin(
    handle: *mut MemoriesAdapterHandle,
    id: u64,
    now_ns: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories = unsafe { &*handle };
    let Some(_op) = memories.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Ok(seq) = memories.inner.pin(id, now_ns) else {
        return NET_ERR_CORTEX_FOLD;
    };
    // SAFETY: `out_seq` is non-null and writable per the caller's contract.
    unsafe { *out_seq = seq };
    0
}
/// Forwards `unpin(id, now_ns)` to the inner memories adapter and reports the
/// sequence number it returns.
///
/// Returns `0` after writing the sequence to `*out_seq`,
/// `NetError::NullPointer` for a null `handle` / `out_seq`,
/// `NetError::ShuttingDown` when the handle guard refuses the operation, and
/// `NET_ERR_CORTEX_FOLD` when the adapter call fails.
///
/// # Safety
/// `handle` must point to a live `MemoriesAdapterHandle` and `out_seq` must
/// be valid for writing one `u64`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_unpin(
    handle: *mut MemoriesAdapterHandle,
    id: u64,
    now_ns: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories = unsafe { &*handle };
    let Some(_op) = memories.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Ok(seq) = memories.inner.unpin(id, now_ns) else {
        return NET_ERR_CORTEX_FOLD;
    };
    // SAFETY: `out_seq` is non-null and writable per the caller's contract.
    unsafe { *out_seq = seq };
    0
}
/// Forwards `delete(id)` to the inner memories adapter and reports the
/// sequence number it returns.
///
/// Returns `0` after writing the sequence to `*out_seq`,
/// `NetError::NullPointer` for a null `handle` / `out_seq`,
/// `NetError::ShuttingDown` when the handle guard refuses the operation, and
/// `NET_ERR_CORTEX_FOLD` when the adapter call fails.
///
/// # Safety
/// `handle` must point to a live `MemoriesAdapterHandle` and `out_seq` must
/// be valid for writing one `u64`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_delete(
    handle: *mut MemoriesAdapterHandle,
    id: u64,
    out_seq: *mut u64,
) -> c_int {
    if handle.is_null() || out_seq.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories = unsafe { &*handle };
    let Some(_op) = memories.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Ok(seq) = memories.inner.delete(id) else {
        return NET_ERR_CORTEX_FOLD;
    };
    // SAFETY: `out_seq` is non-null and writable per the caller's contract.
    unsafe { *out_seq = seq };
    0
}
/// Blocks the calling thread until the memories adapter's `wait_for_seq(seq)`
/// future resolves, optionally bounded by `timeout_ms`.
///
/// `timeout_ms == 0` waits with no timeout. Returns `0` on success,
/// `NET_ERR_FOLD_STOPPED` when the wait itself reports an error,
/// `NET_ERR_TIMEOUT` when the timeout elapses first,
/// `NetError::NullPointer` for a null `handle`, and
/// `NetError::ShuttingDown` when the handle guard refuses the operation.
///
/// # Safety
/// `handle` must point to a live `MemoriesAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_wait_for_seq(
    handle: *mut MemoriesAdapterHandle,
    seq: u64,
    timeout_ms: u32,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories = unsafe { &*handle };
    let Some(_op) = memories.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let adapter: Arc<InnerMemoriesAdapter> = Arc::clone(&memories.inner);
    block_on(async move {
        let pending = adapter.wait_for_seq(seq);
        // Normalise both paths to `Result<wait result, elapsed>` so a single
        // match maps every outcome to a status code.
        let waited = if timeout_ms == 0 {
            Ok(pending.await)
        } else {
            let limit = std::time::Duration::from_millis(u64::from(timeout_ms));
            tokio::time::timeout(limit, pending).await
        };
        match waited {
            Ok(Ok(())) => 0,
            Ok(Err(_)) => NET_ERR_FOLD_STOPPED,
            Err(_) => NET_ERR_TIMEOUT,
        }
    })
}
/// Waits for the write token `(origin_hash, seq)` on the memories adapter.
///
/// `timeout_ms == 0` delegates to `memories_poll_for_token` and returns its
/// result directly. Otherwise the adapter's `wait_for_token` is driven with a
/// deadline of `timeout_ms` milliseconds and its outcome is mapped to a code:
/// `0`, `NET_ERR_TIMEOUT`, `NET_ERR_WRONG_ORIGIN`, `NET_ERR_QUEUE_FULL` or
/// `NET_ERR_FOLD_STOPPED`. A panic raised while waiting is caught and
/// reported as `NET_ERR_PANIC`.
///
/// Also returns `NetError::NullPointer` for a null `handle` and
/// `NetError::ShuttingDown` when the handle guard refuses the operation.
///
/// # Safety
/// `handle` must point to a live `MemoriesAdapterHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_wait_for_token(
    handle: *mut MemoriesAdapterHandle,
    origin_hash: u64,
    seq: u64,
    timeout_ms: u32,
) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let memories = unsafe { &*handle };
    let Some(_op) = memories.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let adapter: Arc<InnerMemoriesAdapter> = Arc::clone(&memories.inner);
    let token = InnerWriteToken::new(origin_hash, seq);
    if timeout_ms == 0 {
        return memories_poll_for_token(&adapter, token);
    }
    let deadline = std::time::Duration::from_millis(u64::from(timeout_ms));
    let wait = move || {
        block_on(async move {
            adapter.wait_for_token(token, deadline).await.map_or_else(
                |err| match err {
                    InnerWaitForTokenError::Timeout => NET_ERR_TIMEOUT,
                    InnerWaitForTokenError::WrongOrigin { .. } => NET_ERR_WRONG_ORIGIN,
                    InnerWaitForTokenError::QueueFull => NET_ERR_QUEUE_FULL,
                    InnerWaitForTokenError::FoldStopped { .. } => NET_ERR_FOLD_STOPPED,
                },
                |()| 0,
            )
        })
    };
    // Turn a panic inside the wait into an error code instead of unwinding
    // out of this `extern "C"` function.
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(wait)).unwrap_or(NET_ERR_PANIC)
}
/// JSON shape of the optional filter accepted by the memories list / watch
/// entry points. Every field is optional; an absent field applies no
/// constraint.
#[derive(Deserialize, Default)]
struct MemoriesFilterJson {
/// Forwarded to `where_source`.
source: Option<String>,
/// Forwarded to `content_contains`.
content_contains: Option<String>,
/// Forwarded to `where_tag`.
tag: Option<String>,
/// Forwarded to `where_any_tag`.
any_tag: Option<Vec<String>>,
/// Forwarded to `where_all_tags`.
all_tags: Option<Vec<String>>,
/// Forwarded to `where_pinned`.
pinned: Option<bool>,
/// Forwarded to `created_after`.
created_after_ns: Option<u64>,
/// Forwarded to `created_before`.
created_before_ns: Option<u64>,
/// Forwarded to `updated_after`.
updated_after_ns: Option<u64>,
/// Forwarded to `updated_before`.
updated_before_ns: Option<u64>,
/// Must be a string `parse_memories_order_by` recognises; other values are rejected.
order_by: Option<String>,
/// Result cap; converted to `usize` before use.
limit: Option<u32>,
}
/// Maps the textual ordering key used in filter JSON to `MemoriesOrderBy`.
///
/// Recognised keys are `id_asc`, `id_desc`, `created_asc`, `created_desc`,
/// `updated_asc` and `updated_desc`; any other string yields `None`.
fn parse_memories_order_by(s: &str) -> Option<MemoriesOrderBy> {
    let ordering = match s {
        "id_asc" => MemoriesOrderBy::IdAsc,
        "id_desc" => MemoriesOrderBy::IdDesc,
        "created_asc" => MemoriesOrderBy::CreatedAsc,
        "created_desc" => MemoriesOrderBy::CreatedDesc,
        "updated_asc" => MemoriesOrderBy::UpdatedAsc,
        "updated_desc" => MemoriesOrderBy::UpdatedDesc,
        _ => return None,
    };
    Some(ordering)
}
fn build_memories_watcher(
adapter: &InnerMemoriesAdapter,
filter_json: *const c_char,
) -> Result<MemoriesWatcher, c_int> {
let mut w = adapter.watch();
if filter_json.is_null() {
return Ok(w);
}
let Some(s) = (unsafe { c_str_to_owned(filter_json) }) else {
return Err(NetError::InvalidUtf8.into());
};
let f: MemoriesFilterJson = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return Err(NetError::InvalidJson.into()),
};
if let Some(s) = f.source {
w = w.where_source(s);
}
if let Some(s) = f.content_contains {
w = w.content_contains(s);
}
if let Some(t) = f.tag {
w = w.where_tag(t);
}
if let Some(tags) = f.any_tag {
w = w.where_any_tag(tags);
}
if let Some(tags) = f.all_tags {
w = w.where_all_tags(tags);
}
if let Some(p) = f.pinned {
w = w.where_pinned(p);
}
if let Some(ns) = f.created_after_ns {
w = w.created_after(ns);
}
if let Some(ns) = f.created_before_ns {
w = w.created_before(ns);
}
if let Some(ns) = f.updated_after_ns {
w = w.updated_after(ns);
}
if let Some(ns) = f.updated_before_ns {
w = w.updated_before(ns);
}
if let Some(o) = f.order_by.as_deref() {
if let Some(ob) = parse_memories_order_by(o) {
w = w.order_by(ob);
} else {
return Err(NetError::InvalidJson.into());
}
}
if let Some(l) = f.limit {
w = w.limit(l as usize);
}
Ok(w)
}
#[allow(clippy::field_reassign_with_default)]
fn build_memories_list_filter(filter_json: *const c_char) -> Result<MemoriesFilter, c_int> {
if filter_json.is_null() {
return Ok(MemoriesFilter::default());
}
let Some(s) = (unsafe { c_str_to_owned(filter_json) }) else {
return Err(NetError::InvalidUtf8.into());
};
let f: MemoriesFilterJson = match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => return Err(NetError::InvalidJson.into()),
};
let mut out = MemoriesFilter::default();
out.source = f.source;
out.content_contains = f.content_contains;
out.tag = f.tag;
out.any_tag = f.any_tag;
out.all_tags = f.all_tags;
out.pinned = f.pinned;
out.created_after_ns = f.created_after_ns;
out.created_before_ns = f.created_before_ns;
out.updated_after_ns = f.updated_after_ns;
out.updated_before_ns = f.updated_before_ns;
out.order_by = match f.order_by.as_deref() {
None => None,
Some(o) => match parse_memories_order_by(o) {
Some(ob) => Some(ob),
None => return Err(NetError::InvalidJson.into()),
},
};
out.limit = f.limit.map(|l| l as usize);
Ok(out)
}
/// Runs a one-shot query against the adapter's current state, applying every
/// constraint present in `filter`, and collects the matching memories.
///
/// The state's read guard is held until the query has been collected.
fn run_memories_list(
    mem: &InnerMemoriesAdapter,
    filter: &MemoriesFilter,
) -> Vec<std::sync::Arc<Memory>> {
    let state = mem.state();
    let snapshot = state.read();
    let query = snapshot.query();
    let query = match &filter.source {
        Some(origin) => query.where_source(origin.clone()),
        None => query,
    };
    let query = match &filter.content_contains {
        Some(needle) => query.content_contains(needle.clone()),
        None => query,
    };
    let query = match &filter.tag {
        Some(single) => query.where_tag(single.clone()),
        None => query,
    };
    let query = match &filter.any_tag {
        Some(candidates) => query.where_any_tag(candidates.clone()),
        None => query,
    };
    let query = match &filter.all_tags {
        Some(required) => query.where_all_tags(required.clone()),
        None => query,
    };
    let query = match filter.pinned {
        Some(flag) => query.where_pinned(flag),
        None => query,
    };
    let query = match filter.created_after_ns {
        Some(bound) => query.created_after(bound),
        None => query,
    };
    let query = match filter.created_before_ns {
        Some(bound) => query.created_before(bound),
        None => query,
    };
    let query = match filter.updated_after_ns {
        Some(bound) => query.updated_after(bound),
        None => query,
    };
    let query = match filter.updated_before_ns {
        Some(bound) => query.updated_before(bound),
        None => query,
    };
    let query = match filter.order_by {
        Some(ordering) => query.order_by(ordering),
        None => query,
    };
    let query = match filter.limit {
        Some(cap) => query.limit(cap),
        None => query,
    };
    query.collect()
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_list(
handle: *mut MemoriesAdapterHandle,
filter_json: *const c_char,
out_json: *mut *mut c_char,
out_len: *mut usize,
) -> c_int {
if handle.is_null() || out_json.is_null() || out_len.is_null() {
return NetError::NullPointer.into();
}
let mem = unsafe { &*handle };
let _op = match mem.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let filter = match build_memories_list_filter(filter_json) {
Ok(f) => f,
Err(code) => return code,
};
let items: Vec<MemoryJson> = run_memories_list(&mem.inner, &filter)
.into_iter()
.map(MemoryJson::from)
.collect();
write_json_out(&items, out_json, out_len)
}
/// Boxed stream of memory batches produced by the adapter's watch API.
type MemoryWatchStream = BoxStream<'static, Vec<std::sync::Arc<Memory>>>;
/// Opaque cursor handed to C callers by `net_memories_snapshot_and_watch`.
pub struct MemoriesWatchHandle {
/// The live watch stream. `net_memories_watch_next` replaces it with `None`
/// once the stream ends; `net_memories_watch_free` takes it out of the
/// `ManuallyDrop` and drops it.
stream: ManuallyDrop<TokioMutex<Option<MemoryWatchStream>>>,
/// Gate entered by each operation on the cursor and consulted by the free path.
guard: HandleGuard,
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_snapshot_and_watch(
handle: *mut MemoriesAdapterHandle,
filter_json: *const c_char,
out_snapshot: *mut *mut c_char,
out_snapshot_len: *mut usize,
out_cursor: *mut *mut MemoriesWatchHandle,
) -> c_int {
if handle.is_null()
|| out_snapshot.is_null()
|| out_snapshot_len.is_null()
|| out_cursor.is_null()
{
return NetError::NullPointer.into();
}
let mem = unsafe { &*handle };
let _op = match mem.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let watcher = match build_memories_watcher(&mem.inner, filter_json) {
Ok(w) => w,
Err(code) => return code,
};
let adapter: Arc<InnerMemoriesAdapter> = Arc::clone(&mem.inner);
let (snapshot, stream) = block_on(async move { adapter.snapshot_and_watch(watcher) });
let snapshot_json: Vec<MemoryJson> = snapshot.into_iter().map(MemoryJson::from).collect();
let code = write_json_out(&snapshot_json, out_snapshot, out_snapshot_len);
if code != 0 {
return code;
}
let handle = Box::new(MemoriesWatchHandle {
stream: ManuallyDrop::new(TokioMutex::new(Some(stream))),
guard: HandleGuard::new(),
});
unsafe {
*out_cursor = Box::into_raw(handle);
}
0
}
/// Blocks until the watch cursor yields its next batch of memories and writes
/// it as JSON through `out_json` / `out_len`.
///
/// `timeout_ms == 0` waits with no timeout. Returns the result of
/// `write_json_out` when a batch arrives, `NET_ERR_TIMEOUT` when the timeout
/// elapses first, and `NET_ERR_STREAM_ENDED` when the stream has finished
/// (the stored stream is then replaced with `None`, so later calls return the
/// same code immediately). Also returns `NetError::NullPointer` when any
/// pointer is null and `NetError::ShuttingDown` when the cursor guard refuses
/// the operation.
///
/// # Safety
/// `cursor` must point to a live `MemoriesWatchHandle`; `out_json` and
/// `out_len` must be writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_watch_next(
    cursor: *mut MemoriesWatchHandle,
    timeout_ms: u32,
    out_json: *mut *mut c_char,
    out_len: *mut usize,
) -> c_int {
    if cursor.is_null() || out_json.is_null() || out_len.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let cursor = unsafe { &*cursor };
    let Some(_op) = cursor.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    block_on(async move {
        let mut slot = cursor.stream.lock().await;
        let Some(stream) = slot.as_mut() else {
            return NET_ERR_STREAM_ENDED;
        };
        let polled = if timeout_ms == 0 {
            stream.next().await
        } else {
            let limit = std::time::Duration::from_millis(u64::from(timeout_ms));
            let Ok(item) = tokio::time::timeout(limit, stream.next()).await else {
                return NET_ERR_TIMEOUT;
            };
            item
        };
        let Some(batch) = polled else {
            // Remember exhaustion so later calls return immediately.
            *slot = None;
            return NET_ERR_STREAM_ENDED;
        };
        let payload: Vec<MemoryJson> = batch.into_iter().map(MemoryJson::from).collect();
        write_json_out(&payload, out_json, out_len)
    })
}
/// Releases the stream owned by a memories watch cursor. A null `cursor` is a
/// no-op.
///
/// The stream is dropped only when `begin_free` reports success within
/// `FFI_HANDLE_FREE_DEADLINE`; otherwise a warning is logged and the stream
/// is deliberately left in place.
///
/// # Safety
/// `cursor` must be null or a pointer previously returned through
/// `net_memories_snapshot_and_watch`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_memories_watch_free(cursor: *mut MemoriesWatchHandle) {
    if cursor.is_null() {
        return;
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let watch: &MemoriesWatchHandle = unsafe { &*cursor };
    if !watch.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
        tracing::warn!(
            "net_memories_watch_free: in-flight ops did not drain within deadline; \
             leaking inner to avoid use-after-free"
        );
        return;
    }
    // SAFETY: reached only after `begin_free` returned true; the stream is
    // taken out of its `ManuallyDrop` here and dropped.
    unsafe {
        drop(ManuallyDrop::take(&mut (*cursor).stream));
    }
}
/// JSON configuration accepted by `net_netdb_open` and
/// `net_netdb_open_from_snapshot`.
#[derive(Deserialize)]
struct NetDbOpenConfigJson {
/// Passed to each adapter's open call. The only field without a serde default.
origin_hash: u64,
/// Selects `with_persistent(true)` on the redex file config; defaults to `false`.
#[serde(default)]
persistent: bool,
/// Open a tasks adapter as part of the handle; defaults to `false`.
#[serde(default)]
with_tasks: bool,
/// Open a memories adapter as part of the handle; defaults to `false`.
#[serde(default)]
with_memories: bool,
}
/// Opaque handle bundling a redex instance with the optional tasks and
/// memories adapters opened on it. The `ManuallyDrop` wrappers are released
/// explicitly by `net_netdb_free`.
pub struct NetDbHandle {
// Not read by the functions visible in this part of the file (hence
// `dead_code`); it is dropped last in `net_netdb_free`.
#[allow(dead_code)] redex: ManuallyDrop<Arc<InnerRedex>>,
/// Present when the handle was opened with `with_tasks`.
tasks: Option<ManuallyDrop<Arc<InnerTasksAdapter>>>,
/// Present when the handle was opened with `with_memories`.
memories: Option<ManuallyDrop<Arc<InnerMemoriesAdapter>>>,
/// Gate entered by each operation on the handle and consulted by the free path.
guard: HandleGuard,
}
/// Reads and parses the netdb open configuration from a C string.
///
/// # Errors
/// `NetError::NullPointer` for a null `config_json`,
/// `NetError::InvalidUtf8` when `c_str_to_owned` yields `None`, and
/// `NetError::InvalidJson` when the text does not deserialise into
/// `NetDbOpenConfigJson`.
fn parse_netdb_config(
    config_json: *const c_char,
) -> std::result::Result<NetDbOpenConfigJson, c_int> {
    if config_json.is_null() {
        return Err(NetError::NullPointer.into());
    }
    // SAFETY: `config_json` is non-null; the FFI caller supplies a C string.
    let Some(raw) = (unsafe { c_str_to_owned(config_json) }) else {
        return Err(NetError::InvalidUtf8.into());
    };
    match serde_json::from_str(&raw) {
        Ok(config) => Ok(config),
        Err(_) => Err(NetError::InvalidJson.into()),
    }
}
/// Returns the default `RedexFileConfig`, with `with_persistent(true)`
/// applied when `persistent` is set.
fn netdb_redex_config(persistent: bool) -> RedexFileConfig {
    let base = RedexFileConfig::default();
    if !persistent {
        return base;
    }
    base.with_persistent(true)
}
/// Values produced by a successful netdb open: the shared redex instance plus
/// the optional tasks and memories adapters, in the order
/// `build_netdb_handle` takes them.
type NetDbBuildOutcome = (
Arc<InnerRedex>,
Option<Arc<InnerTasksAdapter>>,
Option<Arc<InnerMemoriesAdapter>>,
);
/// Boxes a new `NetDbHandle` around the given parts and returns the raw
/// pointer that is handed to the C caller.
fn build_netdb_handle(
    redex_arc: Arc<InnerRedex>,
    tasks: Option<Arc<InnerTasksAdapter>>,
    memories: Option<Arc<InnerMemoriesAdapter>>,
) -> *mut NetDbHandle {
    let redex = ManuallyDrop::new(redex_arc);
    let tasks = tasks.map(ManuallyDrop::new);
    let memories = memories.map(ManuallyDrop::new);
    Box::into_raw(Box::new(NetDbHandle {
        redex,
        tasks,
        memories,
        guard: HandleGuard::new(),
    }))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_open(
redex: *mut RedexHandle,
config_json: *const c_char,
out_handle: *mut *mut NetDbHandle,
) -> c_int {
if redex.is_null() || out_handle.is_null() {
return NetError::NullPointer.into();
}
unsafe {
*out_handle = ptr::null_mut();
}
let cfg = match parse_netdb_config(config_json) {
Ok(c) => c,
Err(rc) => return rc,
};
let redex_ref = unsafe { &*redex };
let _op = match redex_ref.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let redex_arc: Arc<InnerRedex> = Arc::clone(&redex_ref.inner);
let file_cfg = netdb_redex_config(cfg.persistent);
let result = block_on(async move {
let tasks = if cfg.with_tasks {
match InnerTasksAdapter::open_with_config(&redex_arc, cfg.origin_hash, file_cfg.clone())
.await
{
Ok(t) => Some(Arc::new(t)),
Err(e) => return Err((redex_arc, e.to_string())),
}
} else {
None
};
let memories = if cfg.with_memories {
match InnerMemoriesAdapter::open_with_config(&redex_arc, cfg.origin_hash, file_cfg)
.await
{
Ok(m) => Some(Arc::new(m)),
Err(e) => {
if let Some(t) = &tasks {
let _ = t.close();
}
return Err((redex_arc, e.to_string()));
}
}
} else {
None
};
Ok((redex_arc, tasks, memories))
});
match result {
Ok((redex_arc, tasks, memories)) => {
let h = build_netdb_handle(redex_arc, tasks, memories);
unsafe {
*out_handle = h;
}
0
}
Err(_) => NET_ERR_NETDB,
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_open_from_snapshot(
redex: *mut RedexHandle,
config_json: *const c_char,
bundle: *const u8,
bundle_len: usize,
out_handle: *mut *mut NetDbHandle,
) -> c_int {
if redex.is_null() || out_handle.is_null() {
return NetError::NullPointer.into();
}
unsafe {
*out_handle = ptr::null_mut();
}
if bundle.is_null() && bundle_len != 0 {
return NetError::NullPointer.into();
}
let cfg = match parse_netdb_config(config_json) {
Ok(c) => c,
Err(rc) => return rc,
};
let snapshot: Option<NetDbSnapshot> = if bundle_len == 0 {
None
} else {
if bundle_len > isize::MAX as usize {
return NetError::InvalidJson.into();
}
let slice = unsafe { std::slice::from_raw_parts(bundle, bundle_len) };
match NetDbSnapshot::decode(slice) {
Ok(s) => Some(s),
Err(_) => return NET_ERR_NETDB,
}
};
let redex_ref = unsafe { &*redex };
let _op = match redex_ref.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let redex_arc: Arc<InnerRedex> = Arc::clone(&redex_ref.inner);
let file_cfg = netdb_redex_config(cfg.persistent);
let result: std::result::Result<NetDbBuildOutcome, String> = block_on(async move {
let tasks = match (
cfg.with_tasks,
snapshot.as_ref().and_then(|s| s.tasks.as_ref()),
) {
(true, Some((bytes, last_seq))) => Some(Arc::new(
InnerTasksAdapter::open_from_snapshot_with_config(
&redex_arc,
cfg.origin_hash,
file_cfg.clone(),
bytes,
*last_seq,
)
.await
.map_err(|e| e.to_string())?,
)),
(true, None) => Some(Arc::new(
InnerTasksAdapter::open_with_config(&redex_arc, cfg.origin_hash, file_cfg.clone())
.await
.map_err(|e| e.to_string())?,
)),
(false, _) => None,
};
let memories = match (
cfg.with_memories,
snapshot.as_ref().and_then(|s| s.memories.as_ref()),
) {
(true, Some((bytes, last_seq))) => {
match InnerMemoriesAdapter::open_from_snapshot_with_config(
&redex_arc,
cfg.origin_hash,
file_cfg,
bytes,
*last_seq,
)
.await
{
Ok(m) => Some(Arc::new(m)),
Err(e) => {
if let Some(t) = &tasks {
let _ = t.close();
}
return Err(e.to_string());
}
}
}
(true, None) => {
match InnerMemoriesAdapter::open_with_config(&redex_arc, cfg.origin_hash, file_cfg)
.await
{
Ok(m) => Some(Arc::new(m)),
Err(e) => {
if let Some(t) = &tasks {
let _ = t.close();
}
return Err(e.to_string());
}
}
}
(false, _) => None,
};
Ok((redex_arc, tasks, memories))
});
match result {
Ok((redex_arc, tasks, memories)) => {
let h = build_netdb_handle(redex_arc, tasks, memories);
unsafe {
*out_handle = h;
}
0
}
Err(_) => NET_ERR_NETDB,
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_snapshot(
handle: *mut NetDbHandle,
out_bytes: *mut *mut u8,
out_len: *mut usize,
) -> c_int {
if handle.is_null() || out_bytes.is_null() || out_len.is_null() {
return NetError::NullPointer.into();
}
unsafe {
*out_bytes = ptr::null_mut();
*out_len = 0;
}
let netdb = unsafe { &*handle };
let _op = match netdb.guard.try_enter() {
Some(op) => op,
None => return NetError::ShuttingDown.into(),
};
let tasks_snap = match &netdb.tasks {
Some(t) => match t.snapshot() {
Ok(s) => Some(s),
Err(_) => return NET_ERR_NETDB,
},
None => None,
};
let mem_snap = match &netdb.memories {
Some(m) => match m.snapshot() {
Ok(s) => Some(s),
Err(_) => return NET_ERR_NETDB,
},
None => None,
};
let bundle = NetDbSnapshot {
tasks: tasks_snap,
memories: mem_snap,
};
let encoded: Vec<u8> = match bundle.encode() {
Ok(v) => v,
Err(_) => return NET_ERR_NETDB,
};
let boxed: Box<[u8]> = encoded.into_boxed_slice();
let len = boxed.len();
let slice_ptr: *mut [u8] = Box::into_raw(boxed);
unsafe {
*out_bytes = slice_ptr as *mut u8;
*out_len = len;
}
0
}
/// Frees a byte buffer of `len` bytes by rebuilding the `Box<[u8]>` it came
/// from. A null pointer or a zero length is a no-op.
///
/// # Safety
/// When non-null with a non-zero `len`, `bytes` / `len` must describe exactly
/// one boxed byte slice previously leaked with `Box::into_raw` (as
/// `net_netdb_snapshot` does), and it must not be freed twice.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_free_bundle(bytes: *mut u8, len: usize) {
    if !bytes.is_null() && len != 0 {
        let raw: *mut [u8] = ptr::slice_from_raw_parts_mut(bytes, len);
        // SAFETY: per the caller's contract `raw` is the pointer/length pair
        // of a leaked `Box<[u8]>`, so ownership can be reclaimed here.
        drop(unsafe { Box::from_raw(raw) });
    }
}
/// Creates a new `TasksAdapterHandle` sharing the tasks adapter held by a
/// netdb handle.
///
/// `*out_handle` is set to null on entry and only overwritten on success.
/// Returns `0` on success, `NetError::NullPointer` when either pointer is
/// null, `NetError::ShuttingDown` when the handle guard refuses the
/// operation, and `NET_ERR_NETDB` when the netdb handle has no tasks adapter.
///
/// # Safety
/// `handle` must point to a live `NetDbHandle` and `out_handle` must be
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_tasks(
    handle: *mut NetDbHandle,
    out_handle: *mut *mut TasksAdapterHandle,
) -> c_int {
    if handle.is_null() || out_handle.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: `out_handle` is non-null and writable per the caller's contract.
    unsafe {
        *out_handle = ptr::null_mut();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let netdb = unsafe { &*handle };
    let Some(_op) = netdb.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(shared) = &netdb.tasks else {
        return NET_ERR_NETDB;
    };
    let inner = Arc::clone(shared);
    let adapter_handle = Box::into_raw(Box::new(TasksAdapterHandle {
        inner: ManuallyDrop::new(inner),
        guard: HandleGuard::new(),
    }));
    // SAFETY: `out_handle` is non-null and writable per the caller's contract.
    unsafe {
        *out_handle = adapter_handle;
    }
    0
}
/// Creates a new `MemoriesAdapterHandle` sharing the memories adapter held by
/// a netdb handle.
///
/// `*out_handle` is set to null on entry and only overwritten on success.
/// Returns `0` on success, `NetError::NullPointer` when either pointer is
/// null, `NetError::ShuttingDown` when the handle guard refuses the
/// operation, and `NET_ERR_NETDB` when the netdb handle has no memories
/// adapter.
///
/// # Safety
/// `handle` must point to a live `NetDbHandle` and `out_handle` must be
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_memories(
    handle: *mut NetDbHandle,
    out_handle: *mut *mut MemoriesAdapterHandle,
) -> c_int {
    if handle.is_null() || out_handle.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: `out_handle` is non-null and writable per the caller's contract.
    unsafe {
        *out_handle = ptr::null_mut();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let netdb = unsafe { &*handle };
    let Some(_op) = netdb.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    let Some(shared) = &netdb.memories else {
        return NET_ERR_NETDB;
    };
    let inner = Arc::clone(shared);
    let adapter_handle = Box::into_raw(Box::new(MemoriesAdapterHandle {
        inner: ManuallyDrop::new(inner),
        guard: HandleGuard::new(),
    }));
    // SAFETY: `out_handle` is non-null and writable per the caller's contract.
    unsafe {
        *out_handle = adapter_handle;
    }
    0
}
/// Closes the tasks and memories adapters held by a netdb handle.
///
/// Both adapters are always asked to close, even when the first one fails.
/// Returns `0` when every present adapter closed cleanly and `NET_ERR_NETDB`
/// when at least one failed. Every failure is logged at warn level, because
/// the C return code cannot carry the error detail. Also returns
/// `NetError::NullPointer` for a null `handle` and `NetError::ShuttingDown`
/// when the handle guard refuses the operation.
///
/// # Safety
/// `handle` must point to a live `NetDbHandle`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_close(handle: *mut NetDbHandle) -> c_int {
    if handle.is_null() {
        return NetError::NullPointer.into();
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let netdb = unsafe { &*handle };
    let Some(_op) = netdb.guard.try_enter() else {
        return NetError::ShuttingDown.into();
    };
    // Two separate statements: memories is closed regardless of how the
    // tasks close went.
    let tasks_err = netdb.tasks.as_ref().and_then(|t| t.close().err());
    let mem_err = netdb.memories.as_ref().and_then(|m| m.close().err());
    match (tasks_err, mem_err) {
        (None, None) => 0,
        (Some(t), None) => {
            tracing::warn!(tasks_error = %t, "net_netdb_close: tasks adapter failed to close");
            NET_ERR_NETDB
        }
        (None, Some(m)) => {
            tracing::warn!(
                memories_error = %m,
                "net_netdb_close: memories adapter failed to close",
            );
            NET_ERR_NETDB
        }
        (Some(t), Some(m)) => {
            tracing::warn!(
                tasks_error = %t,
                memories_error = %m,
                "net_netdb_close: both adapters failed; surfacing tasks and logging memories",
            );
            NET_ERR_NETDB
        }
    }
}
/// Releases the adapters and redex reference held by a netdb handle. A null
/// `handle` is a no-op.
///
/// The inner values are dropped (tasks, then memories, then redex) only when
/// `begin_free` reports success within `FFI_HANDLE_FREE_DEADLINE`; otherwise
/// a warning is logged and they are deliberately left in place.
///
/// # Safety
/// `handle` must be null or a pointer previously returned by
/// `net_netdb_open` / `net_netdb_open_from_snapshot`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_netdb_free(handle: *mut NetDbHandle) {
    if handle.is_null() {
        return;
    }
    // SAFETY: non-null was checked above; pointee validity is the caller's contract.
    let netdb: &NetDbHandle = unsafe { &*handle };
    if !netdb.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
        tracing::warn!(
            "net_netdb_free: in-flight ops did not drain within deadline; \
             leaking inner to avoid use-after-free"
        );
        return;
    }
    // SAFETY: reached only after `begin_free` returned true; each
    // `ManuallyDrop` field is dropped exactly once here.
    unsafe {
        if let Some(tasks) = (*handle).tasks.as_mut() {
            ManuallyDrop::drop(tasks);
        }
        if let Some(memories) = (*handle).memories.as_mut() {
            ManuallyDrop::drop(memories);
        }
        ManuallyDrop::drop(&mut (*handle).redex);
    }
}
/// Identity function over [`InnerNetDbError`]; returns its argument unchanged.
///
/// NOTE(review): presumably exists only so the `InnerNetDbError` import stays
/// referenced; confirm before removing.
#[allow(dead_code)]
fn _netdb_error_keep_alive(err: InnerNetDbError) -> InnerNetDbError {
    err
}
/// Always returns a null `*mut c_void`.
///
/// NOTE(review): the name suggests this is a link-time anchor for the module;
/// confirm against callers.
#[doc(hidden)]
pub fn _ffi_cortex_keep_alive() -> *mut c_void {
    let nothing: *mut c_void = ptr::null_mut();
    nothing
}
#[cfg(test)]
mod tests {
use super::*;
use std::ffi::CString;
use std::ptr;
use std::sync::Arc;
use std::sync::Barrier;
use std::thread;
/// Test helper: creates a Redex handle by calling `net_redex_new` with a null
/// config pointer.
fn redex() -> *mut RedexHandle {
    let no_config = ptr::null();
    unsafe { net_redex_new(no_config) }
}
/// Test helper: opens `name` on `redex` with an optional JSON config and
/// returns the status code from `net_redex_open_file`.
///
/// When the open succeeds and yields a non-null handle, that handle is freed
/// immediately; callers only observe the return code.
fn open_file(redex: *mut RedexHandle, name: &str, cfg_json: Option<&str>) -> c_int {
    let c_name = CString::new(name).unwrap();
    // Keep the owned CString alive for as long as its pointer is in use.
    let c_cfg = cfg_json.map(|json| CString::new(json).unwrap());
    let cfg_ptr = match &c_cfg {
        Some(owned) => owned.as_ptr(),
        None => ptr::null(),
    };

    let mut file: *mut RedexFileHandle = ptr::null_mut();
    let status = unsafe { net_redex_open_file(redex, c_name.as_ptr(), cfg_ptr, &mut file) };
    if status == 0 && !file.is_null() {
        unsafe { net_redex_file_free(file) };
    }
    status
}
/// Fsync configs that set both knobs, or set either knob to zero, must be
/// rejected with `NET_ERR_REDEX`; the default and single-knob configs open.
#[test]
fn redex_open_file_rejects_conflicting_or_zero_fsync_config() {
    let r = redex();

    // Accepted configurations: no config at all, or exactly one non-zero knob.
    let accepted: [(&str, Option<&str>); 3] = [
        ("ok-default", None),
        ("ok-everyn", Some(r#"{"fsync_every_n":4}"#)),
        ("ok-interval", Some(r#"{"fsync_interval_ms":50}"#)),
    ];
    for (file_name, cfg_json) in accepted {
        assert_eq!(open_file(r, file_name, cfg_json), 0);
    }

    let rejected = [
        ("both-set", r#"{"fsync_every_n":4,"fsync_interval_ms":50}"#),
        ("zero-everyn", r#"{"fsync_every_n":0}"#),
        ("zero-interval", r#"{"fsync_interval_ms":0}"#),
        ("both-zero", r#"{"fsync_every_n":0,"fsync_interval_ms":0}"#),
        (
            "everyn-set-interval-zero",
            r#"{"fsync_every_n":4,"fsync_interval_ms":0}"#,
        ),
    ];
    for &(name, cfg) in &rejected {
        let rc = open_file(r, name, Some(cfg));
        assert_eq!(
            rc, NET_ERR_REDEX,
            "config {name:?} ({cfg}) should be rejected with NET_ERR_REDEX (got {rc})"
        );
    }

    unsafe { net_redex_free(r) };
}
/// Any retention limit set to zero must be rejected with `NET_ERR_REDEX`;
/// non-zero limits and an explicit `null` are accepted.
#[test]
fn redex_open_file_rejects_zero_retention() {
    let r = redex();

    let rejected = [
        ("zero-events", r#"{"retention_max_events":0}"#),
        ("zero-bytes", r#"{"retention_max_bytes":0}"#),
        ("zero-age", r#"{"retention_max_age_ms":0}"#),
        (
            "any-zero-among-many",
            r#"{"retention_max_events":1000,"retention_max_bytes":0}"#,
        ),
    ];
    for &(name, cfg) in &rejected {
        let rc = open_file(r, name, Some(cfg));
        assert_eq!(
            rc, NET_ERR_REDEX,
            "config {name:?} ({cfg}) must be rejected with NET_ERR_REDEX (got {rc})"
        );
    }

    let accepted = [
        ("non-zero-events", r#"{"retention_max_events":10000}"#),
        ("non-zero-bytes", r#"{"retention_max_bytes":1048576}"#),
        ("non-zero-age", r#"{"retention_max_age_ms":60000}"#),
        ("null-retention", r#"{"retention_max_events":null}"#),
    ];
    for &(name, cfg) in &accepted {
        let rc = open_file(r, name, Some(cfg));
        assert_eq!(
            rc, 0,
            "valid config {name:?} ({cfg}) should succeed (got {rc})"
        );
    }

    unsafe { net_redex_free(r) };
}
/// A failing `net_redex_open_file` must overwrite a non-null out-pointer with
/// null rather than leave the caller's previous value in place.
#[test]
fn redex_open_file_zeroes_out_handle_on_error() {
    let r = redex();
    let file_name = CString::new("bad-json").unwrap();
    let malformed_cfg = CString::new("not-json {").unwrap();

    // Seed the out-param with a recognisable garbage value; it is never
    // dereferenced, only compared against null afterwards.
    let mut handle = 0xDEAD_BEEF_usize as *mut RedexFileHandle;
    let rc = unsafe {
        net_redex_open_file(r, file_name.as_ptr(), malformed_cfg.as_ptr(), &mut handle)
    };

    assert_eq!(rc, NetError::InvalidJson as c_int);
    assert!(
        handle.is_null(),
        "out_handle must be null after rc != 0; got {handle:?}"
    );
    unsafe { net_redex_free(r) };
}
/// `net_redex_file_tail` on an already-freed file handle must return
/// `ShuttingDown` and null out the cursor out-parameter.
#[test]
fn redex_file_tail_zeroes_out_cursor_on_error() {
    let r = redex();
    let file_name = CString::new("tail-zero").unwrap();
    let mut file: *mut RedexFileHandle = ptr::null_mut();

    let open_rc =
        unsafe { net_redex_open_file(r, file_name.as_ptr(), ptr::null(), &mut file) };
    assert_eq!(open_rc, 0);
    // Free first so the tail call below is forced down its error path.
    unsafe { net_redex_file_free(file) };

    // Garbage sentinel: only compared against null, never dereferenced.
    let mut cursor = 0xDEAD_BEEF_usize as *mut RedexTailHandle;
    let rc = unsafe { net_redex_file_tail(file, 0, &mut cursor) };

    assert_eq!(rc, NetError::ShuttingDown as c_int);
    assert!(
        cursor.is_null(),
        "out_cursor must be null after rc != 0; got {cursor:?}"
    );
    unsafe { net_redex_free(r) };
}
/// A config string that is not valid JSON yields `InvalidJson` and no handle.
#[test]
fn redex_open_file_rejects_non_json_config() {
    let r = redex();
    let file_name = CString::new("bad-json").unwrap();
    let malformed_cfg = CString::new("not-json {").unwrap();
    let mut file: *mut RedexFileHandle = ptr::null_mut();

    let status = unsafe {
        net_redex_open_file(r, file_name.as_ptr(), malformed_cfg.as_ptr(), &mut file)
    };

    assert_eq!(status, NetError::InvalidJson as c_int);
    assert!(file.is_null());
    unsafe { net_redex_free(r) };
}
/// A tail cursor opened before the file is closed must report
/// `NET_ERR_STREAM_ENDED` on its next poll and write no payload.
#[test]
fn redex_tail_cursor_observes_close_with_stream_ended() {
    let r = redex();
    let file_name = CString::new("tail-close").unwrap();
    let mut file: *mut RedexFileHandle = ptr::null_mut();
    let mut cursor: *mut RedexTailHandle = ptr::null_mut();

    // Open the file, attach a cursor, then close the file underneath it.
    unsafe {
        assert_eq!(
            net_redex_open_file(r, file_name.as_ptr(), ptr::null(), &mut file),
            0
        );
        assert_eq!(net_redex_file_tail(file, 0, &mut cursor), 0);
        assert_eq!(net_redex_file_close(file), 0);
    }

    let mut payload: *mut c_char = ptr::null_mut();
    let mut payload_len: usize = 0;
    let rc = unsafe { net_redex_tail_next(cursor, 1_000, &mut payload, &mut payload_len) };
    assert_eq!(
        rc, NET_ERR_STREAM_ENDED,
        "expected STREAM_ENDED after file close (got {rc})"
    );
    assert!(payload.is_null(), "no event payload should be written");

    unsafe {
        net_redex_tail_free(cursor);
        net_redex_file_free(file);
        net_redex_free(r);
    }
}
/// After `net_redex_file_free`, every operation on the same pointer must
/// report `ShuttingDown` (or `0` for `len`) instead of touching freed state.
#[test]
fn redex_file_free_blocks_subsequent_ops_with_shutting_down() {
    let shutting_down = NetError::ShuttingDown as c_int;

    let r = redex();
    let file_name = CString::new("free-then-op").unwrap();
    let mut file: *mut RedexFileHandle = ptr::null_mut();
    let open_rc =
        unsafe { net_redex_open_file(r, file_name.as_ptr(), ptr::null(), &mut file) };
    assert_eq!(open_rc, 0);
    assert!(!file.is_null());

    unsafe { net_redex_file_free(file) };

    // Append after free: rejected, and the sequence out-param is untouched.
    let bytes = b"x";
    let mut out_seq: u64 = 0;
    let rc = unsafe { net_redex_file_append(file, bytes.as_ptr(), bytes.len(), &mut out_seq) };
    assert_eq!(
        rc,
        NetError::ShuttingDown as c_int,
        "post-free append must surface ShuttingDown (got {rc})",
    );
    assert_eq!(out_seq, 0, "no seq must be assigned to a post-free append");

    assert_eq!(unsafe { net_redex_file_len(file) }, 0);

    let mut range_json: *mut c_char = ptr::null_mut();
    let mut range_len: usize = 0;
    unsafe {
        let read_rc = net_redex_file_read_range(file, 0, 1, &mut range_json, &mut range_len);
        assert_eq!(read_rc, shutting_down);
        assert_eq!(net_redex_file_sync(file), shutting_down);
        assert_eq!(net_redex_file_close(file), shutting_down);
        net_redex_free(r);
    }
}
/// Calling `net_redex_file_free` twice on the same pointer must not crash.
#[test]
fn redex_file_free_is_idempotent() {
    let r = redex();
    let file_name = CString::new("free-twice").unwrap();
    let mut file: *mut RedexFileHandle = ptr::null_mut();

    let open_rc =
        unsafe { net_redex_open_file(r, file_name.as_ptr(), ptr::null(), &mut file) };
    assert_eq!(open_rc, 0);

    unsafe {
        // Two frees in a row: the second is the one under test.
        for _ in 0..2 {
            net_redex_file_free(file);
        }
        net_redex_free(r);
    }
}
/// Races `net_redex_file_free` on the main thread against an append issued
/// from a worker thread. The append may succeed or report `ShuttingDown`;
/// the test passes as long as neither side crashes or hangs.
#[test]
fn redex_file_free_waits_for_inflight_append() {
    use std::sync::atomic::{AtomicBool, Ordering};

    let r = redex();
    let file_name = CString::new("free-races-append").unwrap();
    let mut file: *mut RedexFileHandle = ptr::null_mut();
    let open_rc =
        unsafe { net_redex_open_file(r, file_name.as_ptr(), ptr::null(), &mut file) };
    assert_eq!(open_rc, 0);

    // Raw pointers are not `Send`; ship the address as an integer instead.
    let file_addr = file as usize;
    let started = Arc::new(AtomicBool::new(false));
    let done = Arc::new(AtomicBool::new(false));

    let worker = {
        let started_flag = Arc::clone(&started);
        let done_flag = Arc::clone(&done);
        thread::spawn(move || {
            started_flag.store(true, Ordering::SeqCst);
            let bytes = b"hello";
            let mut out_seq: u64 = 0;
            let target = file_addr as *mut RedexFileHandle;
            thread::sleep(std::time::Duration::from_millis(30));
            let rc = unsafe {
                net_redex_file_append(target, bytes.as_ptr(), bytes.len(), &mut out_seq)
            };
            done_flag.store(true, Ordering::SeqCst);
            assert!(
                rc == 0 || rc == NetError::ShuttingDown as c_int,
                "post-fix append after begin_free must EITHER succeed (op got there first) \
                 OR return ShuttingDown — never UAF. Got rc={rc}, out_seq={out_seq}",
            );
        })
    };

    // Spin until the worker is running, then free while it is still asleep
    // or mid-append.
    loop {
        if started.load(Ordering::SeqCst) {
            break;
        }
        thread::yield_now();
    }
    unsafe { net_redex_file_free(file) };

    worker.join().unwrap();
    assert!(
        done.load(Ordering::SeqCst),
        "worker must have completed; the test would otherwise hang \
         past the watchdog if free's begin_free deadlocked",
    );
    unsafe { net_redex_free(r) };
}
/// Sixteen threads released together by a barrier must all get the same
/// runtime pointer back from `runtime()`.
#[test]
fn runtime_first_call_returns_same_instance_under_concurrency() {
    const THREADS: usize = 16;

    let barrier = Arc::new(Barrier::new(THREADS));
    // Spawn every thread before joining any: the barrier only opens once all
    // `THREADS` participants have arrived.
    let workers: Vec<_> = (0..THREADS)
        .map(|_| {
            let gate = Arc::clone(&barrier);
            thread::spawn(move || {
                gate.wait();
                let shared_rt = runtime();
                Arc::as_ptr(shared_rt) as usize
            })
        })
        .collect();

    // A set collapses equal addresses, leaving one entry per distinct runtime.
    let ptrs: std::collections::BTreeSet<usize> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    assert_eq!(
        ptrs.len(),
        1,
        "concurrent first-callers observed {} distinct runtimes (must be exactly 1)",
        ptrs.len()
    );
}
/// A fresh Redex with replication never enabled reports zero runtimes.
#[test]
fn replication_runtime_count_zero_when_not_enabled() {
    let r = redex();
    let runtimes = unsafe { net_redex_replication_runtime_count(r) };
    assert_eq!(runtimes, 0);
    unsafe { net_redex_free(r) };
}
/// With replication never enabled, the Prometheus export is a non-null,
/// empty C string.
#[test]
fn replication_prometheus_text_empty_when_not_enabled() {
    let r = redex();
    let text_ptr = unsafe { net_redex_replication_prometheus_text(r) };
    assert!(!text_ptr.is_null());

    // Borrow the text only long enough to compare it, then release it
    // through the FFI string deallocator.
    let text = unsafe { CStr::from_ptr(text_ptr) }.to_str().unwrap();
    assert_eq!(text, "");

    unsafe {
        crate::ffi::net_free_string(text_ptr);
        net_redex_free(r);
    }
}
/// A null Redex handle yields a null Prometheus text pointer.
#[test]
fn replication_prometheus_text_null_handle_returns_null() {
    let text_ptr = unsafe { net_redex_replication_prometheus_text(ptr::null()) };
    assert!(text_ptr.is_null());
}
/// Requesting replication in a file config, on a Redex where replication was
/// never enabled, is rejected with `NET_ERR_REDEX`.
#[test]
fn open_file_with_replication_without_enable_fails() {
    let r = redex();
    let replicated_cfg = r#"{"replication":{"factor":3,"heartbeat_ms":500}}"#;
    let status = open_file(r, "ffi/repl_unconfigured", Some(replicated_cfg));
    assert_eq!(status, NET_ERR_REDEX);
    unsafe { net_redex_free(r) };
}
/// Each replication config below is rejected with `NET_ERR_REDEX`.
#[test]
fn open_file_with_invalid_replication_config_rejected() {
    let r = redex();

    // (file name, replication config) pairs, checked in this order.
    let cases = [
        (
            "ffi/repl_invalid_placement",
            r#"{"replication":{"placement":"impossible"}}"#,
        ),
        (
            "ffi/repl_pinned_no_nodes",
            r#"{"replication":{"placement":"pinned"}}"#,
        ),
        (
            "ffi/repl_invalid_policy",
            r#"{"replication":{"on_under_capacity":"impossible"}}"#,
        ),
    ];
    for (file_name, cfg_json) in cases {
        let status = open_file(r, file_name, Some(cfg_json));
        assert_eq!(status, NET_ERR_REDEX);
    }

    unsafe { net_redex_free(r) };
}
/// The replication query functions tolerate a null Redex handle: the count
/// is zero and the Prometheus text pointer is null.
#[test]
fn replication_functions_idempotent_on_null_redex() {
    let runtimes = unsafe { net_redex_replication_runtime_count(ptr::null()) };
    assert_eq!(runtimes, 0);

    let text_ptr = unsafe { net_redex_replication_prometheus_text(ptr::null()) };
    assert!(text_ptr.is_null());
}
/// `net_redex_enable_replication` takes ownership of the boxed mesh `Arc`
/// even when it fails: with a null Redex it must return `NullPointer` and the
/// strong count must fall back to its pre-call value.
#[cfg(feature = "net")]
#[test]
fn enable_replication_drops_mesh_arc_on_null_redex() {
    use crate::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::sync::Arc;

    let tokio_rt = tokio::runtime::Runtime::new().unwrap();
    let mesh = tokio_rt.block_on(async {
        let keypair = EntityKeypair::generate();
        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let node_cfg = MeshNodeConfig::new(bind_addr, [0u8; 32]);
        Arc::new(MeshNode::new(keypair, node_cfg).await.unwrap())
    });

    // Box a clone of the Arc; the FFI call is expected to consume this box.
    let baseline = Arc::strong_count(&mesh);
    let boxed_arc: *mut Arc<MeshNode> = Box::into_raw(Box::new(Arc::clone(&mesh)));
    assert_eq!(Arc::strong_count(&mesh), baseline + 1);

    let rc = unsafe { net_redex_enable_replication(ptr::null_mut(), boxed_arc) };

    let null_pointer: c_int = NetError::NullPointer.into();
    assert_eq!(rc, null_pointer);
    assert_eq!(
        Arc::strong_count(&mesh),
        baseline,
        "net_redex_enable_replication must drop the boxed Arc on error paths"
    );
}
/// `net_redex_enable_greedy_dataforts` takes ownership of the boxed mesh
/// `Arc` even when it fails: with a null Redex it must return `NullPointer`
/// and the strong count must fall back to its pre-call value.
#[cfg(all(feature = "net", feature = "dataforts"))]
#[test]
fn enable_greedy_drops_mesh_arc_on_null_redex() {
    use crate::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::sync::Arc;

    let tokio_rt = tokio::runtime::Runtime::new().unwrap();
    let mesh = tokio_rt.block_on(async {
        let keypair = EntityKeypair::generate();
        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let node_cfg = MeshNodeConfig::new(bind_addr, [0u8; 32]);
        Arc::new(MeshNode::new(keypair, node_cfg).await.unwrap())
    });

    // Box a clone of the Arc; the FFI call is expected to consume this box.
    let baseline = Arc::strong_count(&mesh);
    let boxed_arc: *mut Arc<MeshNode> = Box::into_raw(Box::new(Arc::clone(&mesh)));
    assert_eq!(Arc::strong_count(&mesh), baseline + 1);

    let rc = unsafe {
        net_redex_enable_greedy_dataforts(ptr::null_mut(), boxed_arc, ptr::null())
    };

    let null_pointer: c_int = NetError::NullPointer.into();
    assert_eq!(rc, null_pointer);
    assert_eq!(
        Arc::strong_count(&mesh),
        baseline,
        "net_redex_enable_greedy_dataforts must drop the boxed Arc on error paths"
    );
}
/// Enables greedy dataforts on a real mesh node, checks the cached channel
/// count and Prometheus output, then disables it and checks the Prometheus
/// output is empty again.
#[cfg(all(feature = "net", feature = "dataforts"))]
#[test]
fn greedy_enable_disable_round_trip() {
    use crate::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig};
    use std::ffi::CString;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::sync::Arc;

    let tokio_rt = tokio::runtime::Runtime::new().unwrap();
    let mesh = tokio_rt.block_on(async {
        let keypair = EntityKeypair::generate();
        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let node_cfg = MeshNodeConfig::new(bind_addr, [0u8; 32]);
        Arc::new(MeshNode::new(keypair, node_cfg).await.unwrap())
    });

    let r = redex();

    // Fetches the greedy Prometheus text, copies it into an owned String and
    // releases the FFI-allocated buffer.
    let scrape = || -> String {
        let raw = unsafe { net_redex_greedy_prometheus_text(r) };
        assert!(!raw.is_null());
        let owned = unsafe { std::ffi::CStr::from_ptr(raw) }
            .to_string_lossy()
            .into_owned();
        unsafe { super::super::net_free_string(raw) };
        owned
    };

    let boxed_arc: *mut Arc<MeshNode> = Box::into_raw(Box::new(Arc::clone(&mesh)));
    let cfg_json = CString::new(r#"{"intent_match":"disabled"}"#).unwrap();
    let rc = unsafe { net_redex_enable_greedy_dataforts(r, boxed_arc, cfg_json.as_ptr()) };
    assert_eq!(rc, 0, "enable must succeed");
    assert_eq!(unsafe { net_redex_greedy_cached_channel_count(r) }, 0);

    let text = scrape();
    assert!(
        text.contains("dataforts_greedy_admit_rejected_total"),
        "Prometheus text must include the admit-rejected metric family"
    );

    assert_eq!(unsafe { net_redex_disable_greedy_dataforts(r) }, 0);

    let after_text = scrape();
    assert!(
        after_text.is_empty(),
        "post-disable Prometheus text must be empty; got {after_text:?}"
    );

    unsafe { net_redex_free(r) };
}
/// Test helper: opens a NetDb on `r` with both the tasks and memories models
/// enabled, asserting that the open succeeds and yields a non-null handle.
fn open_full_netdb(r: *mut RedexHandle, origin: u64, persistent: bool) -> *mut NetDbHandle {
    // `{origin}` and `{persistent}` are captured straight from the arguments.
    let cfg_json = CString::new(format!(
        r#"{{"origin_hash":{origin},"persistent":{persistent},"with_tasks":true,"with_memories":true}}"#
    ))
    .unwrap();

    let mut db: *mut NetDbHandle = ptr::null_mut();
    let rc = unsafe { net_netdb_open(r, cfg_json.as_ptr(), &mut db) };
    assert_eq!(rc, 0, "net_netdb_open should succeed (rc={rc})");
    assert!(!db.is_null());
    db
}
/// A failing `net_netdb_open` must overwrite a non-null out-pointer with null.
#[test]
fn netdb_open_zeroes_out_handle_on_error() {
    let r = redex();
    let malformed_cfg = CString::new("not json").unwrap();

    // Garbage sentinel: only compared against null, never dereferenced.
    let mut h = 0xDEAD_BEEF_usize as *mut NetDbHandle;
    let status = unsafe { net_netdb_open(r, malformed_cfg.as_ptr(), &mut h) };

    assert_eq!(status, NetError::InvalidJson as c_int);
    assert!(h.is_null(), "expected null on error, got {h:?}");
    unsafe { net_redex_free(r) };
}
/// With only tasks enabled, `net_netdb_tasks` succeeds while
/// `net_netdb_memories` returns `NET_ERR_NETDB` and nulls its out-pointer.
#[test]
fn netdb_accessor_rejects_unenabled_model() {
    let r = redex();
    let tasks_only_cfg =
        CString::new(r#"{"origin_hash":42,"with_tasks":true,"with_memories":false}"#).unwrap();
    let mut db: *mut NetDbHandle = ptr::null_mut();
    let open_rc = unsafe { net_netdb_open(r, tasks_only_cfg.as_ptr(), &mut db) };
    assert_eq!(open_rc, 0);

    // Enabled model: the accessor hands out a usable handle.
    let mut tasks: *mut TasksAdapterHandle = ptr::null_mut();
    let tasks_rc = unsafe { net_netdb_tasks(db, &mut tasks) };
    assert_eq!(tasks_rc, 0);
    assert!(!tasks.is_null());
    unsafe { net_tasks_adapter_free(tasks) };

    // Disabled model: error code, and the garbage sentinel is replaced by null.
    let mut m = 0xDEAD_BEEF_usize as *mut MemoriesAdapterHandle;
    let memories_rc = unsafe { net_netdb_memories(db, &mut m) };
    assert_eq!(memories_rc, NET_ERR_NETDB);
    assert!(m.is_null(), "expected null on error, got {m:?}");

    unsafe {
        net_netdb_free(db);
        net_redex_free(r);
    }
}
/// Seeds one task, snapshots the NetDb, closes it, restores a second NetDb
/// from the snapshot bytes and checks the seeded task title is listed.
#[test]
fn netdb_snapshot_roundtrips_through_ffi() {
    let r = redex();
    let db = open_full_netdb(r, 0xDEAD_BEEF, false);

    // Seed a single task and wait until its sequence number is visible.
    let title = CString::new("first").unwrap();
    let mut tasks: *mut TasksAdapterHandle = ptr::null_mut();
    let mut seq: u64 = 0;
    unsafe {
        assert_eq!(net_netdb_tasks(db, &mut tasks), 0);
        assert_eq!(
            net_tasks_create(tasks, 1, title.as_ptr(), 1_000_000, &mut seq),
            0
        );
        assert_eq!(net_tasks_wait_for_seq(tasks, seq, 500), 0);
        net_tasks_adapter_free(tasks);
    }

    // Capture the snapshot bundle, then tear the original database down.
    let mut bytes: *mut u8 = ptr::null_mut();
    let mut len: usize = 0;
    let snapshot_rc = unsafe { net_netdb_snapshot(db, &mut bytes, &mut len) };
    assert_eq!(snapshot_rc, 0);
    assert!(!bytes.is_null());
    assert!(len > 0, "snapshot bundle should not be empty");
    unsafe {
        let _ = net_netdb_close(db);
        net_netdb_free(db);
    }

    // Restore into a second handle; 3735928559 is 0xDEAD_BEEF in decimal.
    let restore_cfg =
        CString::new(r#"{"origin_hash":3735928559,"with_tasks":true,"with_memories":true}"#)
            .unwrap();
    let mut db2: *mut NetDbHandle = ptr::null_mut();
    let rc =
        unsafe { net_netdb_open_from_snapshot(r, restore_cfg.as_ptr(), bytes, len, &mut db2) };
    assert_eq!(rc, 0, "restore should succeed (rc={rc})");
    assert!(!db2.is_null());

    // List every task in the restored database.
    let match_all = CString::new("{}").unwrap();
    let mut restored_tasks: *mut TasksAdapterHandle = ptr::null_mut();
    let mut list_json: *mut c_char = ptr::null_mut();
    let mut list_len: usize = 0;
    unsafe {
        assert_eq!(net_netdb_tasks(db2, &mut restored_tasks), 0);
        assert_eq!(
            net_tasks_list(
                restored_tasks,
                match_all.as_ptr(),
                &mut list_json,
                &mut list_len
            ),
            0
        );
    }
    // Copy the JSON out before releasing the FFI-owned string.
    let list = unsafe { CStr::from_ptr(list_json) }
        .to_string_lossy()
        .into_owned();
    unsafe { super::super::net_free_string(list_json) };
    assert!(
        list.contains("\"first\""),
        "restored task list should contain the seeded title; got {list}"
    );

    unsafe {
        net_tasks_adapter_free(restored_tasks);
        net_netdb_free_bundle(bytes, len);
        net_netdb_free(db2);
        net_redex_free(r);
    }
}
/// `net_netdb_free_bundle` must not crash on a null pointer (with zero or
/// non-zero length) or on a non-null pointer paired with length zero.
#[test]
fn netdb_free_bundle_is_null_safe() {
    // Null pointer, both with and without a claimed length.
    for claimed_len in [0usize, 16] {
        unsafe { net_netdb_free_bundle(ptr::null_mut(), claimed_len) };
    }

    // Non-null pointer to memory the test still owns, with length zero.
    let mut buf: Vec<u8> = vec![0u8; 4];
    unsafe { net_netdb_free_bundle(buf.as_mut_ptr(), 0) };
}
/// Passing a null, zero-length snapshot to `net_netdb_open_from_snapshot`
/// still succeeds and yields a non-null handle.
#[test]
fn netdb_open_from_empty_snapshot_opens_from_scratch() {
    let r = redex();
    let tasks_only_cfg =
        CString::new(r#"{"origin_hash":1,"with_tasks":true,"with_memories":false}"#).unwrap();
    let mut db: *mut NetDbHandle = ptr::null_mut();

    let status = unsafe {
        net_netdb_open_from_snapshot(r, tasks_only_cfg.as_ptr(), ptr::null(), 0, &mut db)
    };

    assert_eq!(status, 0);
    assert!(!db.is_null());
    unsafe {
        net_netdb_free(db);
        net_redex_free(r);
    }
}
}