#![allow(clippy::missing_safety_doc)]
#![allow(clippy::not_unsafe_ptr_arg_deref)]
#![expect(
clippy::undocumented_unsafe_blocks,
reason = "module-wide FFI safety contract documented in the # Safety preamble above"
)]
#![expect(
clippy::multiple_unsafe_ops_per_block,
reason = "FFI entry points deref input pointers together with out-parameter writes under the same caller contract"
)]
use parking_lot::Mutex;
use std::ffi::CStr;
use std::mem::ManuallyDrop;
use std::os::raw::{c_char, c_int};
use std::panic::{catch_unwind, AssertUnwindSafe};
use super::handle_guard::{HandleGuard, FFI_HANDLE_FREE_DEADLINE};
pub struct RedisStreamDedupHandle {
inner: ManuallyDrop<Mutex<crate::adapter::RedisStreamDedup>>,
guard: HandleGuard,
}
#[inline]
fn ffi_guard<R>(name: &'static str, fallback: R, f: impl FnOnce() -> R) -> R {
match catch_unwind(AssertUnwindSafe(f)) {
Ok(v) => v,
Err(_) => {
tracing::error!(
ffi_function = name,
"panic caught in net_redis_dedup FFI; returning fallback to avoid \
UB across the C boundary",
);
fallback
}
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_new(capacity: usize) -> *mut RedisStreamDedupHandle {
ffi_guard("net_redis_dedup_new", std::ptr::null_mut(), || {
let inner = if capacity == 0 {
crate::adapter::RedisStreamDedup::new()
} else {
crate::adapter::RedisStreamDedup::with_capacity(capacity)
};
Box::into_raw(Box::new(RedisStreamDedupHandle {
inner: ManuallyDrop::new(Mutex::new(inner)),
guard: HandleGuard::new(),
}))
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_free(handle: *mut RedisStreamDedupHandle) {
ffi_guard("net_redis_dedup_free", (), || {
if handle.is_null() {
return;
}
let h: &RedisStreamDedupHandle = unsafe { &*handle };
if h.guard.begin_free(FFI_HANDLE_FREE_DEADLINE) {
unsafe {
let inner = ManuallyDrop::take(&mut (*handle).inner);
drop(inner);
}
} else {
tracing::warn!(
"net_redis_dedup_free: in-flight ops did not drain within deadline; \
leaking inner to avoid use-after-free"
);
}
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_is_duplicate(
handle: *mut RedisStreamDedupHandle,
dedup_id: *const c_char,
) -> c_int {
ffi_guard("net_redis_dedup_is_duplicate", -1, || {
if handle.is_null() || dedup_id.is_null() {
return -1;
}
let id = unsafe { CStr::from_ptr(dedup_id) };
let Ok(id_str) = id.to_str() else {
return -2;
};
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return -1,
};
let mut guard = h.inner.lock();
if guard.is_duplicate(id_str) {
1
} else {
0
}
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_len(handle: *mut RedisStreamDedupHandle) -> usize {
ffi_guard("net_redis_dedup_len", 0, || {
if handle.is_null() {
return 0;
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return 0,
};
let guard = h.inner.lock();
guard.len()
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_capacity(handle: *mut RedisStreamDedupHandle) -> usize {
ffi_guard("net_redis_dedup_capacity", 0, || {
if handle.is_null() {
return 0;
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return 0,
};
let guard = h.inner.lock();
guard.capacity()
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_is_empty(handle: *mut RedisStreamDedupHandle) -> c_int {
ffi_guard("net_redis_dedup_is_empty", -1, || {
if handle.is_null() {
return -1;
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return -1,
};
let guard = h.inner.lock();
if guard.is_empty() {
1
} else {
0
}
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn net_redis_dedup_clear(handle: *mut RedisStreamDedupHandle) {
ffi_guard("net_redis_dedup_clear", (), || {
if handle.is_null() {
return;
}
let h = unsafe { &*handle };
let _op = match h.guard.try_enter() {
Some(op) => op,
None => return,
};
let mut guard = h.inner.lock();
guard.clear();
})
}
#[cfg(test)]
mod tests {
use super::*;
use std::ffi::CString;
use std::ptr;
#[test]
fn ffi_guard_catches_panic_and_returns_fallback() {
let v = ffi_guard("test_guard", -42i32, || {
panic!("intentional FFI panic");
});
assert_eq!(v, -42);
let v = ffi_guard("test_guard", 0i32, || 7);
assert_eq!(v, 7);
}
#[test]
fn null_handle_returns_negative() {
let id = CString::new("anything").unwrap();
assert_eq!(
unsafe { net_redis_dedup_is_duplicate(ptr::null_mut(), id.as_ptr()) },
-1,
);
assert_eq!(unsafe { net_redis_dedup_len(ptr::null_mut()) }, 0);
assert_eq!(unsafe { net_redis_dedup_capacity(ptr::null_mut()) }, 0);
assert_eq!(unsafe { net_redis_dedup_is_empty(ptr::null_mut()) }, -1);
unsafe { net_redis_dedup_free(ptr::null_mut()) };
unsafe { net_redis_dedup_clear(ptr::null_mut()) };
}
#[test]
fn null_dedup_id_returns_negative() {
let h = unsafe { net_redis_dedup_new(8) };
assert_eq!(unsafe { net_redis_dedup_is_duplicate(h, ptr::null()) }, -1);
unsafe { net_redis_dedup_free(h) };
}
#[test]
fn lifecycle_round_trip_filters_duplicates() {
let h = unsafe { net_redis_dedup_new(0) }; assert_eq!(unsafe { net_redis_dedup_capacity(h) }, 4096);
assert_eq!(unsafe { net_redis_dedup_is_empty(h) }, 1);
let id_a = CString::new("deadbeef:0:0:0").unwrap();
let id_b = CString::new("deadbeef:0:0:1").unwrap();
assert_eq!(unsafe { net_redis_dedup_is_duplicate(h, id_a.as_ptr()) }, 0);
assert_eq!(unsafe { net_redis_dedup_is_duplicate(h, id_b.as_ptr()) }, 0);
assert_eq!(unsafe { net_redis_dedup_len(h) }, 2);
assert_eq!(unsafe { net_redis_dedup_is_empty(h) }, 0);
assert_eq!(unsafe { net_redis_dedup_is_duplicate(h, id_a.as_ptr()) }, 1);
assert_eq!(unsafe { net_redis_dedup_is_duplicate(h, id_b.as_ptr()) }, 1);
unsafe { net_redis_dedup_clear(h) };
assert_eq!(unsafe { net_redis_dedup_len(h) }, 0);
assert_eq!(unsafe { net_redis_dedup_is_empty(h) }, 1);
unsafe { net_redis_dedup_free(h) };
}
#[test]
fn capacity_zero_is_clamped_to_default() {
let h = unsafe { net_redis_dedup_new(0) };
assert_eq!(unsafe { net_redis_dedup_capacity(h) }, 4096);
unsafe { net_redis_dedup_free(h) };
}
#[test]
fn explicit_capacity_round_trips() {
let h = unsafe { net_redis_dedup_new(8192) };
assert_eq!(unsafe { net_redis_dedup_capacity(h) }, 8192);
unsafe { net_redis_dedup_free(h) };
}
#[test]
fn invalid_utf8_dedup_id_returns_minus_two() {
use std::ffi::CString;
let h = unsafe { net_redis_dedup_new(8) };
let bad: CString = unsafe { CString::from_vec_unchecked(vec![0xC0, 0x41]) };
let rc = unsafe { net_redis_dedup_is_duplicate(h, bad.as_ptr()) };
assert_eq!(rc, -2, "invalid UTF-8 dedup_id must return -2, got {rc}");
assert_eq!(unsafe { net_redis_dedup_len(h) }, 0);
unsafe { net_redis_dedup_free(h) };
}
#[test]
fn concurrent_threads_on_one_handle_serialize_safely() {
use std::ffi::CString;
use std::sync::Arc;
use std::thread;
const THREADS: usize = 8;
const PER_THREAD: usize = 100;
const TOTAL: usize = THREADS * PER_THREAD;
let h = unsafe { net_redis_dedup_new(TOTAL) };
struct HandleSend(*mut RedisStreamDedupHandle);
unsafe impl Send for HandleSend {}
unsafe impl Sync for HandleSend {}
let shared = Arc::new(HandleSend(h));
let mut handles = Vec::with_capacity(THREADS);
for tid in 0..THREADS {
let shared = shared.clone();
handles.push(thread::spawn(move || {
for i in 0..PER_THREAD {
let id = CString::new(format!("t{tid}-id{i}")).unwrap();
let rc = unsafe { net_redis_dedup_is_duplicate(shared.0, id.as_ptr()) };
assert!(
rc == 0 || rc == 1,
"thread {tid} id {i}: rc {rc} ∉ {{0, 1}} — \
concurrent FFI call returned an error code; \
Mutex serialization may be broken",
);
assert_eq!(
rc, 0,
"thread {tid} id {i}: expected new (0), got duplicate (1)"
);
}
}));
}
for h in handles {
h.join().expect("test thread panicked");
}
assert_eq!(
unsafe { net_redis_dedup_len(h) },
TOTAL,
"expected {TOTAL} ids tracked after concurrent inserts; \
missing ids → concurrent calls dropped mutations",
);
unsafe { net_redis_dedup_free(h) };
}
}