use alloc::boxed::Box;
use alloc::vec::Vec;
use core::ffi::c_int;
use core::ptr;
use core::slice;
use std::sync::{
Mutex,
atomic::{AtomicBool, AtomicU32, Ordering},
};
use std::time::{Duration, Instant};
use crate::ZeroDdsStatus;
use crate::entities::{ZeroDdsDataReader, ZeroDdsDataWriter};
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConditionKind {
Guard = 1,
Status = 2,
Read = 3,
Query = 4,
}
#[repr(C)]
struct ConditionHeader {
kind: ConditionKind,
}
#[repr(C)]
pub struct ZeroDdsGuardCondition {
#[allow(dead_code)]
header: ConditionHeader,
trigger: AtomicBool,
}
#[repr(C)]
pub struct ZeroDdsStatusCondition {
#[allow(dead_code)]
header: ConditionHeader,
#[allow(dead_code)]
entity: *mut core::ffi::c_void,
enabled_statuses: AtomicU32,
}
unsafe impl Send for ZeroDdsStatusCondition {}
unsafe impl Sync for ZeroDdsStatusCondition {}
#[repr(C)]
pub struct ZeroDdsReadCondition {
#[allow(dead_code)]
header: ConditionHeader,
reader: *mut ZeroDdsDataReader,
sample_states: u32,
view_states: u32,
instance_states: u32,
}
unsafe impl Send for ZeroDdsReadCondition {}
unsafe impl Sync for ZeroDdsReadCondition {}
#[repr(C)]
pub struct ZeroDdsQueryCondition {
#[allow(dead_code)]
header: ConditionHeader,
reader: *mut ZeroDdsDataReader,
sample_states: u32,
view_states: u32,
instance_states: u32,
expression: alloc::string::String,
parameters: Vec<alloc::string::String>,
}
unsafe impl Send for ZeroDdsQueryCondition {}
unsafe impl Sync for ZeroDdsQueryCondition {}
fn drain_channel_into_cache(drr: &ZeroDdsDataReader) {
use zerodds_dcps::runtime::UserSample;
if let (Ok(rx), Ok(mut cache)) = (drr.rx.lock(), drr.read_cache.lock()) {
while let Ok(s) = rx.try_recv() {
let pass = if let Some(filter) = &drr.cft_filter {
match &s {
UserSample::Alive { payload, .. } => filter.evaluate(payload),
UserSample::Lifecycle { .. } => true,
}
} else {
true
};
if pass {
cache.push((s, crate::entities::ReadSampleState::NotRead));
}
}
}
}
fn cache_has_matching(
drr: &ZeroDdsDataReader,
sample_states: u32,
view_states: u32,
instance_states: u32,
) -> bool {
let cache = match drr.read_cache.lock() {
Ok(g) => g,
Err(_) => return false,
};
cache.iter().any(|(_sample, st)| {
let s_bit = match st {
crate::entities::ReadSampleState::Read => 1u32,
crate::entities::ReadSampleState::NotRead => 2u32,
};
let s_ok = sample_states == 0 || (sample_states & s_bit) != 0;
let v_ok = view_states == 0 || (view_states & 1u32) != 0; let i_ok = instance_states == 0 || (instance_states & 1u32) != 0; s_ok && v_ok && i_ok
})
}
fn cache_has_matching_query(
drr: &ZeroDdsDataReader,
sample_states: u32,
view_states: u32,
instance_states: u32,
expression: &str,
params: &[alloc::string::String],
) -> bool {
use zerodds_dcps::runtime::UserSample;
if !cache_has_matching(drr, sample_states, view_states, instance_states) {
return false;
}
let expr = match zerodds_sql_filter::parse(expression) {
Ok(e) => e,
Err(_) => return false,
};
let values: Vec<zerodds_sql_filter::Value> = params
.iter()
.map(|p| zerodds_sql_filter::Value::String(p.clone()))
.collect();
let cache = match drr.read_cache.lock() {
Ok(g) => g,
Err(_) => return false,
};
struct EmptyRow;
impl zerodds_sql_filter::RowAccess for EmptyRow {
fn get(&self, _path: &str) -> Option<zerodds_sql_filter::Value> {
None
}
}
cache.iter().any(|(s, _)| match s {
UserSample::Alive { .. } => expr.evaluate(&EmptyRow, &values).unwrap_or(true),
UserSample::Lifecycle { .. } => true,
})
}
pub unsafe fn condition_state_masks(c: *const core::ffi::c_void) -> Option<(u32, u32, u32)> {
unsafe {
let kind = condition_kind(c)?;
match kind {
ConditionKind::Read => {
let r = &*(c as *const ZeroDdsReadCondition);
Some((r.sample_states, r.view_states, r.instance_states))
}
ConditionKind::Query => {
let q = &*(c as *const ZeroDdsQueryCondition);
Some((q.sample_states, q.view_states, q.instance_states))
}
_ => None,
}
}
}
unsafe fn condition_kind(p: *const core::ffi::c_void) -> Option<ConditionKind> {
if p.is_null() {
return None;
}
let hdr = unsafe { &*(p as *const ConditionHeader) };
Some(hdr.kind)
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_condition_get_trigger_value(c: *const core::ffi::c_void) -> bool {
unsafe {
let kind = match condition_kind(c) {
Some(k) => k,
None => return false,
};
match kind {
ConditionKind::Guard => (*(c as *const ZeroDdsGuardCondition))
.trigger
.load(Ordering::SeqCst),
ConditionKind::Status => {
(*(c as *const ZeroDdsStatusCondition))
.enabled_statuses
.load(Ordering::SeqCst)
!= 0
}
ConditionKind::Read => {
let r = &*(c as *const ZeroDdsReadCondition);
if r.reader.is_null() {
return false;
}
let drr = &*r.reader;
drain_channel_into_cache(drr);
cache_has_matching(drr, r.sample_states, r.view_states, r.instance_states)
}
ConditionKind::Query => {
let q = &*(c as *const ZeroDdsQueryCondition);
if q.reader.is_null() {
return false;
}
let drr = &*q.reader;
drain_channel_into_cache(drr);
cache_has_matching_query(
drr,
q.sample_states,
q.view_states,
q.instance_states,
&q.expression,
&q.parameters,
)
}
}
}
}
#[unsafe(no_mangle)]
pub extern "C" fn zerodds_guardcondition_create() -> *mut ZeroDdsGuardCondition {
Box::into_raw(Box::new(ZeroDdsGuardCondition {
header: ConditionHeader {
kind: ConditionKind::Guard,
},
trigger: AtomicBool::new(false),
}))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_guardcondition_destroy(g: *mut ZeroDdsGuardCondition) {
if !g.is_null() {
let _ = unsafe { Box::from_raw(g) };
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_guardcondition_set_trigger_value(
g: *mut ZeroDdsGuardCondition,
v: bool,
) -> c_int {
if g.is_null() {
return ZeroDdsStatus::BadHandle as c_int;
}
unsafe { (*g).trigger.store(v, Ordering::SeqCst) };
ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_entity_get_statuscondition(
entity: *mut core::ffi::c_void,
) -> *mut ZeroDdsStatusCondition {
Box::into_raw(Box::new(ZeroDdsStatusCondition {
header: ConditionHeader {
kind: ConditionKind::Status,
},
entity,
enabled_statuses: AtomicU32::new(0xFFFF_FFFF),
}))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_statuscondition_set_enabled_statuses(
c: *mut ZeroDdsStatusCondition,
mask: u32,
) -> c_int {
if c.is_null() {
return ZeroDdsStatus::BadHandle as c_int;
}
unsafe { (*c).enabled_statuses.store(mask, Ordering::SeqCst) };
ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_statuscondition_get_enabled_statuses(
c: *mut ZeroDdsStatusCondition,
) -> u32 {
if c.is_null() {
return 0;
}
unsafe { (*c).enabled_statuses.load(Ordering::SeqCst) }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_statuscondition_destroy(c: *mut ZeroDdsStatusCondition) {
if !c.is_null() {
let _ = unsafe { Box::from_raw(c) };
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_create_readcondition(
dr: *mut ZeroDdsDataReader,
sample_states: u32,
view_states: u32,
instance_states: u32,
) -> *mut ZeroDdsReadCondition {
if dr.is_null() {
return ptr::null_mut();
}
Box::into_raw(Box::new(ZeroDdsReadCondition {
header: ConditionHeader {
kind: ConditionKind::Read,
},
reader: dr,
sample_states,
view_states,
instance_states,
}))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_create_querycondition(
dr: *mut ZeroDdsDataReader,
sample_states: u32,
view_states: u32,
instance_states: u32,
expr: *const core::ffi::c_char,
params: *const *const core::ffi::c_char,
param_count: usize,
) -> *mut ZeroDdsQueryCondition {
if dr.is_null() || expr.is_null() {
return ptr::null_mut();
}
unsafe {
let cs = std::ffi::CStr::from_ptr(expr);
let expression = match cs.to_str() {
Ok(s) => s.to_string(),
Err(_) => return ptr::null_mut(),
};
let mut parameters: Vec<alloc::string::String> = Vec::with_capacity(param_count);
if !params.is_null() && param_count > 0 {
let slc = slice::from_raw_parts(params, param_count);
for &p in slc {
if p.is_null() {
continue;
}
let cs = std::ffi::CStr::from_ptr(p);
if let Ok(s) = cs.to_str() {
parameters.push(s.to_string());
}
}
}
Box::into_raw(Box::new(ZeroDdsQueryCondition {
header: ConditionHeader {
kind: ConditionKind::Query,
},
reader: dr,
sample_states,
view_states,
instance_states,
expression,
parameters,
}))
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_delete_readcondition(
_dr: *mut ZeroDdsDataReader,
c: *mut core::ffi::c_void,
) -> c_int {
unsafe {
let kind = match condition_kind(c) {
Some(k) => k,
None => return ZeroDdsStatus::BadHandle as c_int,
};
match kind {
ConditionKind::Read => {
let _ = Box::from_raw(c as *mut ZeroDdsReadCondition);
}
ConditionKind::Query => {
let _ = Box::from_raw(c as *mut ZeroDdsQueryCondition);
}
_ => return ZeroDdsStatus::PreconditionNotMet as c_int,
}
}
ZeroDdsStatus::Ok as c_int
}
pub struct ZeroDdsWaitSet {
conditions: Mutex<Vec<*mut core::ffi::c_void>>,
}
unsafe impl Send for ZeroDdsWaitSet {}
unsafe impl Sync for ZeroDdsWaitSet {}
#[unsafe(no_mangle)]
pub extern "C" fn zerodds_waitset_create() -> *mut ZeroDdsWaitSet {
Box::into_raw(Box::new(ZeroDdsWaitSet {
conditions: Mutex::new(Vec::new()),
}))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_destroy(w: *mut ZeroDdsWaitSet) {
if !w.is_null() {
let _ = unsafe { Box::from_raw(w) };
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_attach_condition(
w: *mut ZeroDdsWaitSet,
cond: *mut core::ffi::c_void,
) -> c_int {
if w.is_null() || cond.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
unsafe {
if let Ok(mut g) = (*w).conditions.lock() {
if !g.contains(&cond) {
g.push(cond);
}
}
}
ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_detach_condition(
w: *mut ZeroDdsWaitSet,
cond: *mut core::ffi::c_void,
) -> c_int {
if w.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
unsafe {
if let Ok(mut g) = (*w).conditions.lock() {
let n = g.len();
g.retain(|c| *c != cond);
if g.len() == n {
return ZeroDdsStatus::PreconditionNotMet as c_int;
}
}
}
ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_wait(
w: *mut ZeroDdsWaitSet,
out_active: *mut *mut core::ffi::c_void,
cap: usize,
out_count: *mut usize,
timeout_sec: i32,
timeout_nanosec: u32,
) -> c_int {
if w.is_null() || out_active.is_null() || out_count.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let timeout = if timeout_sec == i32::MAX && timeout_nanosec == u32::MAX {
Duration::from_secs(60 * 60 * 24 * 365 * 100) } else {
Duration::new(timeout_sec.max(0) as u64, timeout_nanosec)
};
let deadline = Instant::now() + timeout;
unsafe {
let ws = &*w;
loop {
let conds: Vec<*mut core::ffi::c_void> =
ws.conditions.lock().map(|g| g.clone()).unwrap_or_default();
let mut active: Vec<*mut core::ffi::c_void> = Vec::new();
for &c in conds.iter() {
if zerodds_condition_get_trigger_value(c as *const _) {
active.push(c);
}
}
if !active.is_empty() {
let n = active.len().min(cap);
let dst = slice::from_raw_parts_mut(out_active, n);
dst.copy_from_slice(&active[..n]);
*out_count = n;
return ZeroDdsStatus::Ok as c_int;
}
if Instant::now() >= deadline {
*out_count = 0;
return ZeroDdsStatus::Timeout as c_int;
}
std::thread::sleep(Duration::from_millis(5));
}
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_get_conditions(
w: *mut ZeroDdsWaitSet,
out: *mut *mut core::ffi::c_void,
cap: usize,
out_count: *mut usize,
) -> c_int {
if w.is_null() || out.is_null() || out_count.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
unsafe {
let conds: Vec<*mut core::ffi::c_void> = (*w)
.conditions
.lock()
.map(|g| g.clone())
.unwrap_or_default();
let n = conds.len().min(cap);
let dst = slice::from_raw_parts_mut(out, n);
dst.copy_from_slice(&conds[..n]);
*out_count = n;
}
ZeroDdsStatus::Ok as c_int
}
#[allow(dead_code)]
fn _suppress(_: *mut ZeroDdsDataWriter) {}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn condition_header_at_offset_zero() {
assert_eq!(core::mem::offset_of!(ZeroDdsGuardCondition, header), 0);
assert_eq!(core::mem::offset_of!(ZeroDdsStatusCondition, header), 0);
assert_eq!(core::mem::offset_of!(ZeroDdsReadCondition, header), 0);
assert_eq!(core::mem::offset_of!(ZeroDdsQueryCondition, header), 0);
}
#[test]
fn guardcondition_lifecycle_and_trigger() {
let g = zerodds_guardcondition_create();
assert!(!g.is_null());
unsafe {
assert!(!zerodds_condition_get_trigger_value(g as *const _));
let _ = zerodds_guardcondition_set_trigger_value(g, true);
assert!(zerodds_condition_get_trigger_value(g as *const _));
zerodds_guardcondition_destroy(g);
}
}
#[test]
fn statuscondition_lifecycle_and_mask() {
unsafe {
let sc = zerodds_entity_get_statuscondition(ptr::null_mut());
assert!(!sc.is_null());
let _ = zerodds_statuscondition_set_enabled_statuses(sc, 0x1234);
assert_eq!(zerodds_statuscondition_get_enabled_statuses(sc), 0x1234);
zerodds_statuscondition_destroy(sc);
}
}
#[test]
fn waitset_attach_detach() {
let ws = zerodds_waitset_create();
let g = zerodds_guardcondition_create();
let mut buf: [*mut core::ffi::c_void; 4] = [ptr::null_mut(); 4];
let mut count: usize = 0;
unsafe {
let rc = zerodds_waitset_attach_condition(ws, g as *mut core::ffi::c_void);
assert_eq!(rc, ZeroDdsStatus::Ok as c_int);
let rc = zerodds_waitset_get_conditions(ws, buf.as_mut_ptr(), 4, &mut count);
assert_eq!(rc, ZeroDdsStatus::Ok as c_int);
assert_eq!(count, 1);
let rc = zerodds_waitset_detach_condition(ws, g as *mut core::ffi::c_void);
assert_eq!(rc, ZeroDdsStatus::Ok as c_int);
zerodds_waitset_destroy(ws);
zerodds_guardcondition_destroy(g);
}
}
#[test]
fn waitset_wait_returns_active_guard() {
let ws = zerodds_waitset_create();
let g = zerodds_guardcondition_create();
let mut buf: [*mut core::ffi::c_void; 4] = [ptr::null_mut(); 4];
let mut count: usize = 0;
unsafe {
zerodds_guardcondition_set_trigger_value(g, true);
zerodds_waitset_attach_condition(ws, g as *mut core::ffi::c_void);
let rc = zerodds_waitset_wait(ws, buf.as_mut_ptr(), 4, &mut count, 1, 0);
assert_eq!(rc, ZeroDdsStatus::Ok as c_int);
assert_eq!(count, 1);
assert_eq!(buf[0], g as *mut core::ffi::c_void);
zerodds_waitset_destroy(ws);
zerodds_guardcondition_destroy(g);
}
}
#[test]
fn waitset_wait_timeout_no_active() {
let ws = zerodds_waitset_create();
let g = zerodds_guardcondition_create();
let mut buf: [*mut core::ffi::c_void; 4] = [ptr::null_mut(); 4];
let mut count: usize = 0;
unsafe {
zerodds_waitset_attach_condition(ws, g as *mut core::ffi::c_void);
let rc = zerodds_waitset_wait(ws, buf.as_mut_ptr(), 4, &mut count, 0, 50_000_000);
assert_eq!(rc, ZeroDdsStatus::Timeout as c_int);
assert_eq!(count, 0);
zerodds_waitset_destroy(ws);
zerodds_guardcondition_destroy(g);
}
}
}