use alloc::boxed::Box;
use alloc::vec::Vec;
use core::ffi::c_int;
use core::ptr;
use core::slice;
use std::sync::{
Mutex,
atomic::{AtomicBool, AtomicU32, Ordering},
};
use std::time::{Duration, Instant};
use crate::ZeroDdsStatus;
use crate::entities::{ZeroDdsDataReader, ZeroDdsDataWriter};
/// Type tag stored in every condition's `ConditionHeader`.
///
/// The tag lets the C API accept any condition as an untyped `void*` and
/// recover the concrete struct behind it. `#[repr(u32)]` fixes the tag to a
/// 32-bit integer carrying the explicit values below.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConditionKind {
/// Tag written by `zerodds_guardcondition_create`.
Guard = 1,
/// Tag written by `zerodds_entity_get_statuscondition`.
Status = 2,
/// Tag written by `zerodds_dr_create_readcondition`.
Read = 3,
/// Tag written by `zerodds_dr_create_querycondition`.
Query = 4,
}
/// Common prefix embedded as the `header` field of every condition struct.
///
/// `condition_kind` casts an untyped condition pointer to
/// `*const ConditionHeader` and reads `kind` from it to find out which
/// concrete condition type the pointer refers to.
#[repr(C)]
struct ConditionHeader {
// Discriminates Guard / Status / Read / Query conditions.
kind: ConditionKind,
}
/// Guard condition: a flag that the application sets and clears by hand via
/// `zerodds_guardcondition_set_trigger_value`.
///
/// `#[repr(C)]` is required for soundness: `condition_kind` reinterprets a
/// pointer to this struct as `*const ConditionHeader`, which is only valid if
/// `header` is guaranteed to live at offset 0. The default Rust layout leaves
/// field order unspecified.
#[repr(C)]
pub struct ZeroDdsGuardCondition {
    /// Type tag; must remain the first field (see the struct-level note).
    #[allow(dead_code)]
    header: ConditionHeader,
    /// Current trigger value, read by `zerodds_condition_get_trigger_value`.
    trigger: AtomicBool,
}
/// Status condition created by `zerodds_entity_get_statuscondition`.
///
/// `#[repr(C)]` is required for soundness: `condition_kind` reinterprets a
/// pointer to this struct as `*const ConditionHeader`, which is only valid if
/// `header` is guaranteed to live at offset 0. With the default Rust layout
/// the compiler is free to place the pointer-aligned `entity` field first.
#[repr(C)]
pub struct ZeroDdsStatusCondition {
    /// Type tag; must remain the first field (see the struct-level note).
    #[allow(dead_code)]
    header: ConditionHeader,
    /// Opaque handle supplied by the caller at creation time. It is stored
    /// only; nothing in the visible code dereferences it.
    #[allow(dead_code)]
    entity: *mut core::ffi::c_void,
    /// Bit mask of enabled statuses. The condition reports itself as
    /// triggered whenever this mask is non-zero.
    enabled_statuses: AtomicU32,
}
// SAFETY (NOTE(review)): the only non-thread-safe member is the raw `entity`
// pointer, which the visible code never dereferences; the mask is atomic.
// Confirm no other module dereferences `entity` without synchronisation.
unsafe impl Send for ZeroDdsStatusCondition {}
unsafe impl Sync for ZeroDdsStatusCondition {}
/// Read condition created by `zerodds_dr_create_readcondition`.
///
/// `#[repr(C)]` is required for soundness: `condition_kind` reinterprets a
/// pointer to this struct as `*const ConditionHeader`, which is only valid if
/// `header` is guaranteed to live at offset 0. With the default Rust layout
/// the compiler is free to place the pointer-aligned `reader` field first.
#[repr(C)]
pub struct ZeroDdsReadCondition {
    /// Type tag; must remain the first field (see the struct-level note).
    #[allow(dead_code)]
    header: ConditionHeader,
    /// Reader whose cache is inspected when the trigger value is evaluated.
    /// Borrowed, not owned: the condition never frees it.
    reader: *mut ZeroDdsDataReader,
    /// Sample-state mask; `0` means "any".
    sample_states: u32,
    /// View-state mask; `0` means "any".
    view_states: u32,
    /// Instance-state mask; `0` means "any".
    instance_states: u32,
}
// SAFETY (NOTE(review)): sharing across threads relies on the reader behind
// `reader` being safe to access concurrently; the visible code only touches
// it through `lock()`-guarded fields. Confirm against `ZeroDdsDataReader`.
unsafe impl Send for ZeroDdsReadCondition {}
unsafe impl Sync for ZeroDdsReadCondition {}
/// Query condition created by `zerodds_dr_create_querycondition`: a read
/// condition plus a filter expression and its string parameters.
///
/// `#[repr(C)]` is required for soundness: `condition_kind` reinterprets a
/// pointer to this struct as `*const ConditionHeader`, which is only valid if
/// `header` is guaranteed to live at offset 0. The default Rust layout leaves
/// field order unspecified. The struct is only ever handed to C as an opaque
/// pointer, so the non-C-compatible `String`/`Vec` fields are fine.
#[repr(C)]
pub struct ZeroDdsQueryCondition {
    /// Type tag; must remain the first field (see the struct-level note).
    #[allow(dead_code)]
    header: ConditionHeader,
    /// Reader whose cache is inspected when the trigger value is evaluated.
    /// Borrowed, not owned: the condition never frees it.
    reader: *mut ZeroDdsDataReader,
    /// Sample-state mask; `0` means "any".
    sample_states: u32,
    /// View-state mask; `0` means "any".
    view_states: u32,
    /// Instance-state mask; `0` means "any".
    instance_states: u32,
    /// Filter expression text; parsed each time the trigger is evaluated.
    expression: alloc::string::String,
    /// Expression parameters, passed to the filter as string values.
    parameters: Vec<alloc::string::String>,
}
// SAFETY (NOTE(review)): sharing across threads relies on the reader behind
// `reader` being safe to access concurrently; the visible code only touches
// it through `lock()`-guarded fields. Confirm against `ZeroDdsDataReader`.
unsafe impl Send for ZeroDdsQueryCondition {}
unsafe impl Sync for ZeroDdsQueryCondition {}
/// Moves every sample currently queued on the reader's channel into its read
/// cache, marking each one as not yet read.
///
/// When the reader has a content filter, `Alive` samples are kept only if the
/// filter accepts their payload; `Lifecycle` samples always pass. If either
/// `lock()` call fails, nothing is drained.
fn drain_channel_into_cache(drr: &ZeroDdsDataReader) {
    use zerodds_dcps::runtime::UserSample;

    // Channel lock first, cache lock second — same acquisition order as ever.
    let (Ok(receiver), Ok(mut cached)) = (drr.rx.lock(), drr.read_cache.lock()) else {
        return;
    };

    // `try_recv` never blocks: the loop ends as soon as the channel is empty.
    while let Ok(sample) = receiver.try_recv() {
        let accepted = match (&drr.cft_filter, &sample) {
            (Some(filter), UserSample::Alive { payload, .. }) => filter.evaluate(payload),
            // No filter installed, or a lifecycle notification.
            _ => true,
        };
        if accepted {
            cached.push((sample, crate::entities::ReadSampleState::NotRead));
        }
    }
}
/// Returns `true` if at least one cached sample satisfies the three state
/// masks. A mask of `0` matches anything.
///
/// Only the sample state is tracked per sample (`Read` = bit 1, `NotRead` =
/// bit 2). View and instance state are not tracked: every sample is treated
/// as carrying bit `1` in both, so those masks accept or reject the whole
/// cache at once. Returns `false` if the cache lock cannot be taken.
fn cache_has_matching(
    drr: &ZeroDdsDataReader,
    sample_states: u32,
    view_states: u32,
    instance_states: u32,
) -> bool {
    const READ_BIT: u32 = 1;
    const NOT_READ_BIT: u32 = 2;
    // The single view/instance bit every cached sample is assumed to carry.
    const ASSUMED_VIEW_BIT: u32 = 1;
    const ASSUMED_INSTANCE_BIT: u32 = 1;

    let Ok(cache) = drr.read_cache.lock() else {
        return false;
    };

    // These two checks do not depend on the individual sample.
    let view_ok = view_states == 0 || (view_states & ASSUMED_VIEW_BIT) != 0;
    let instance_ok = instance_states == 0 || (instance_states & ASSUMED_INSTANCE_BIT) != 0;
    if !(view_ok && instance_ok) {
        return false;
    }

    cache.iter().any(|(_, state)| {
        let bit = match state {
            crate::entities::ReadSampleState::Read => READ_BIT,
            crate::entities::ReadSampleState::NotRead => NOT_READ_BIT,
        };
        sample_states == 0 || (sample_states & bit) != 0
    })
}
/// Query-condition variant of `cache_has_matching`.
///
/// Evaluation happens in stages, each of which can short-circuit to `false`:
/// the state masks must match some cached sample, `expression` must parse,
/// and the cache lock must be obtainable. After that, the result is `true`
/// if any cached sample is a `Lifecycle` sample, or an `Alive` sample for
/// which the expression evaluates to `true` (or fails to evaluate).
///
/// NOTE(review): the expression is evaluated against a row that exposes no
/// fields, so it cannot actually look at sample contents yet.
fn cache_has_matching_query(
    drr: &ZeroDdsDataReader,
    sample_states: u32,
    view_states: u32,
    instance_states: u32,
    expression: &str,
    params: &[alloc::string::String],
) -> bool {
    use zerodds_dcps::runtime::UserSample;

    /// Row accessor that knows no fields at all.
    struct EmptyRow;
    impl zerodds_sql_filter::RowAccess for EmptyRow {
        fn get(&self, _path: &str) -> Option<zerodds_sql_filter::Value> {
            None
        }
    }

    if !cache_has_matching(drr, sample_states, view_states, instance_states) {
        return false;
    }

    let Ok(parsed) = zerodds_sql_filter::parse(expression) else {
        return false;
    };

    // Every parameter is handed to the filter as a string value.
    let bound: Vec<zerodds_sql_filter::Value> = params
        .iter()
        .cloned()
        .map(zerodds_sql_filter::Value::String)
        .collect();

    let Ok(cache) = drr.read_cache.lock() else {
        return false;
    };

    cache.iter().any(|(sample, _)| {
        if let UserSample::Alive { .. } = sample {
            // An evaluation error counts as a match.
            parsed.evaluate(&EmptyRow, &bound).unwrap_or(true)
        } else {
            true
        }
    })
}
/// Returns the `(sample_states, view_states, instance_states)` masks stored
/// in a read or query condition.
///
/// Yields `None` for a null pointer and for guard/status conditions, which
/// carry no masks.
///
/// # Safety
/// `c` must be null or point to a live condition object created by this
/// module.
pub unsafe fn condition_state_masks(c: *const core::ffi::c_void) -> Option<(u32, u32, u32)> {
    // SAFETY: forwarded from this function's own contract.
    match unsafe { condition_kind(c) }? {
        ConditionKind::Read => {
            // SAFETY: the tag says this object is a `ZeroDdsReadCondition`.
            let read = unsafe { &*c.cast::<ZeroDdsReadCondition>() };
            Some((read.sample_states, read.view_states, read.instance_states))
        }
        ConditionKind::Query => {
            // SAFETY: the tag says this object is a `ZeroDdsQueryCondition`.
            let query = unsafe { &*c.cast::<ZeroDdsQueryCondition>() };
            Some((query.sample_states, query.view_states, query.instance_states))
        }
        ConditionKind::Guard | ConditionKind::Status => None,
    }
}
unsafe fn condition_kind(p: *const core::ffi::c_void) -> Option<ConditionKind> {
if p.is_null() {
return None;
}
let hdr = unsafe { &*(p as *const ConditionHeader) };
Some(hdr.kind)
}
/// Evaluates the trigger value of any condition handle.
///
/// * Guard: the flag last stored via the guard-condition setter.
/// * Status: `true` while the enabled-status mask is non-zero.
/// * Read / Query: pending channel samples are first drained into the
///   reader's cache, then the cache is checked against the condition's masks
///   (and, for queries, its expression). A null reader yields `false`.
///
/// A null handle yields `false`.
///
/// # Safety
/// `c` must be null or point to a live condition object created by this
/// module; for read/query conditions the referenced reader must still be
/// alive.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_condition_get_trigger_value(c: *const core::ffi::c_void) -> bool {
    // SAFETY: forwarded from this function's own contract.
    let tag = unsafe { condition_kind(c) };
    let Some(tag) = tag else {
        return false;
    };

    match tag {
        ConditionKind::Guard => {
            // SAFETY: the tag identifies the object as a guard condition.
            let guard = unsafe { &*c.cast::<ZeroDdsGuardCondition>() };
            guard.trigger.load(Ordering::SeqCst)
        }
        ConditionKind::Status => {
            // SAFETY: the tag identifies the object as a status condition.
            let status = unsafe { &*c.cast::<ZeroDdsStatusCondition>() };
            status.enabled_statuses.load(Ordering::SeqCst) != 0
        }
        ConditionKind::Read => {
            // SAFETY: the tag identifies the object as a read condition.
            let read = unsafe { &*c.cast::<ZeroDdsReadCondition>() };
            // SAFETY: the reader pointer is null or live per the contract.
            match unsafe { read.reader.as_ref() } {
                None => false,
                Some(reader) => {
                    drain_channel_into_cache(reader);
                    cache_has_matching(
                        reader,
                        read.sample_states,
                        read.view_states,
                        read.instance_states,
                    )
                }
            }
        }
        ConditionKind::Query => {
            // SAFETY: the tag identifies the object as a query condition.
            let query = unsafe { &*c.cast::<ZeroDdsQueryCondition>() };
            // SAFETY: the reader pointer is null or live per the contract.
            match unsafe { query.reader.as_ref() } {
                None => false,
                Some(reader) => {
                    drain_channel_into_cache(reader);
                    cache_has_matching_query(
                        reader,
                        query.sample_states,
                        query.view_states,
                        query.instance_states,
                        &query.expression,
                        &query.parameters,
                    )
                }
            }
        }
    }
}
/// Allocates a guard condition whose trigger value starts out `false`.
///
/// The returned pointer owns the allocation; release it with
/// `zerodds_guardcondition_destroy`.
#[unsafe(no_mangle)]
pub extern "C" fn zerodds_guardcondition_create() -> *mut ZeroDdsGuardCondition {
    let guard = ZeroDdsGuardCondition {
        header: ConditionHeader {
            kind: ConditionKind::Guard,
        },
        trigger: AtomicBool::new(false),
    };
    Box::into_raw(Box::new(guard))
}
/// Frees a guard condition. Passing null is a no-op.
///
/// # Safety
/// `g` must be null or a pointer obtained from
/// `zerodds_guardcondition_create` that has not been destroyed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_guardcondition_destroy(g: *mut ZeroDdsGuardCondition) {
    if g.is_null() {
        return;
    }
    // SAFETY: non-null and, per the contract, a live `Box` allocation.
    drop(unsafe { Box::from_raw(g) });
}
/// Stores a new trigger value in a guard condition.
///
/// Returns `BadHandle` for a null pointer, `Ok` otherwise.
///
/// # Safety
/// `g` must be null or point to a live guard condition.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_guardcondition_set_trigger_value(
    g: *mut ZeroDdsGuardCondition,
    v: bool,
) -> c_int {
    // SAFETY: the pointer is null or live per this function's contract.
    match unsafe { g.as_ref() } {
        None => ZeroDdsStatus::BadHandle as c_int,
        Some(guard) => {
            guard.trigger.store(v, Ordering::SeqCst);
            ZeroDdsStatus::Ok as c_int
        }
    }
}
/// Allocates a status condition bound to `entity`, with every status bit
/// enabled.
///
/// Each call produces a fresh allocation; release it with
/// `zerodds_statuscondition_destroy`. `entity` is stored as-is and may be
/// null.
///
/// # Safety
/// No pointer is dereferenced here; the function is `unsafe` only as part of
/// the C API surface.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_entity_get_statuscondition(
    entity: *mut core::ffi::c_void,
) -> *mut ZeroDdsStatusCondition {
    let condition = ZeroDdsStatusCondition {
        header: ConditionHeader {
            kind: ConditionKind::Status,
        },
        entity,
        // All 32 status bits start enabled.
        enabled_statuses: AtomicU32::new(u32::MAX),
    };
    Box::into_raw(Box::new(condition))
}
/// Replaces the enabled-status mask of a status condition.
///
/// Returns `BadHandle` for a null pointer, `Ok` otherwise.
///
/// # Safety
/// `c` must be null or point to a live status condition.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_statuscondition_set_enabled_statuses(
    c: *mut ZeroDdsStatusCondition,
    mask: u32,
) -> c_int {
    // SAFETY: the pointer is null or live per this function's contract.
    match unsafe { c.as_ref() } {
        None => ZeroDdsStatus::BadHandle as c_int,
        Some(condition) => {
            condition.enabled_statuses.store(mask, Ordering::SeqCst);
            ZeroDdsStatus::Ok as c_int
        }
    }
}
/// Returns the enabled-status mask of a status condition, or `0` for a null
/// pointer.
///
/// # Safety
/// `c` must be null or point to a live status condition.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_statuscondition_get_enabled_statuses(
    c: *mut ZeroDdsStatusCondition,
) -> u32 {
    // SAFETY: the pointer is null or live per this function's contract.
    match unsafe { c.as_ref() } {
        Some(condition) => condition.enabled_statuses.load(Ordering::SeqCst),
        None => 0,
    }
}
/// Frees a status condition. Passing null is a no-op.
///
/// # Safety
/// `c` must be null or a pointer obtained from
/// `zerodds_entity_get_statuscondition` that has not been destroyed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_statuscondition_destroy(c: *mut ZeroDdsStatusCondition) {
    if c.is_null() {
        return;
    }
    // SAFETY: non-null and, per the contract, a live `Box` allocation.
    drop(unsafe { Box::from_raw(c) });
}
/// Allocates a read condition on `dr` with the given state masks.
///
/// Returns null if `dr` is null. The masks are stored unchanged. Release the
/// condition with `zerodds_dr_delete_readcondition`.
///
/// # Safety
/// `dr` is only stored here, not dereferenced; it must stay valid for as long
/// as the condition's trigger value may be evaluated.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_create_readcondition(
    dr: *mut ZeroDdsDataReader,
    sample_states: u32,
    view_states: u32,
    instance_states: u32,
) -> *mut ZeroDdsReadCondition {
    if dr.is_null() {
        return ptr::null_mut();
    }
    let condition = ZeroDdsReadCondition {
        header: ConditionHeader {
            kind: ConditionKind::Read,
        },
        reader: dr,
        sample_states,
        view_states,
        instance_states,
    };
    Box::into_raw(Box::new(condition))
}
/// Allocates a query condition on `dr` with the given state masks, filter
/// expression and expression parameters.
///
/// Returns null if `dr` or `expr` is null, or if `expr` is not valid UTF-8.
/// `params` may be null, in which case `param_count` is ignored and the
/// condition gets no parameters. Release the condition with
/// `zerodds_dr_delete_readcondition`.
///
/// NOTE(review): parameter entries that are null or not valid UTF-8 are
/// skipped rather than rejected, which shifts the position of every later
/// parameter. Kept for compatibility; consider rejecting such input instead.
///
/// # Safety
/// * `expr` must be null or a NUL-terminated C string.
/// * `params` must be null or point to `param_count` readable entries, each
///   null or a NUL-terminated C string.
/// * `dr` is only stored here; it must stay valid for as long as the
///   condition's trigger value may be evaluated.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_create_querycondition(
    dr: *mut ZeroDdsDataReader,
    sample_states: u32,
    view_states: u32,
    instance_states: u32,
    expr: *const core::ffi::c_char,
    params: *const *const core::ffi::c_char,
    param_count: usize,
) -> *mut ZeroDdsQueryCondition {
    if dr.is_null() || expr.is_null() {
        return ptr::null_mut();
    }

    // SAFETY: `expr` is non-null and NUL-terminated per the contract.
    let expression = match unsafe { std::ffi::CStr::from_ptr(expr) }.to_str() {
        Ok(text) => text.to_string(),
        Err(_) => return ptr::null_mut(),
    };

    // Only trust `param_count` when there is an array to go with it. Sizing
    // the vector from the count alone would panic (and abort across the FFI
    // boundary) for `params == NULL` combined with a huge or garbage count.
    let parameters: Vec<alloc::string::String> = if params.is_null() || param_count == 0 {
        Vec::new()
    } else {
        // SAFETY: `params` is non-null and holds `param_count` entries per
        // the contract.
        let entries = unsafe { slice::from_raw_parts(params, param_count) };
        let mut collected = Vec::with_capacity(param_count);
        for &entry in entries {
            if entry.is_null() {
                continue;
            }
            // SAFETY: non-null entries are NUL-terminated per the contract.
            if let Ok(text) = unsafe { std::ffi::CStr::from_ptr(entry) }.to_str() {
                collected.push(text.to_string());
            }
        }
        collected
    };

    Box::into_raw(Box::new(ZeroDdsQueryCondition {
        header: ConditionHeader {
            kind: ConditionKind::Query,
        },
        reader: dr,
        sample_states,
        view_states,
        instance_states,
        expression,
        parameters,
    }))
}
/// Frees a read or query condition.
///
/// Returns `BadHandle` for a null condition, `PreconditionNotMet` if the
/// handle is a guard or status condition (nothing is freed in that case), and
/// `Ok` after a successful free. The reader argument is not inspected.
///
/// # Safety
/// `c` must be null or point to a live condition created by this module that
/// has not been deleted yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_delete_readcondition(
    _dr: *mut ZeroDdsDataReader,
    c: *mut core::ffi::c_void,
) -> c_int {
    // SAFETY: forwarded from this function's own contract.
    let tag = unsafe { condition_kind(c) };
    let Some(tag) = tag else {
        return ZeroDdsStatus::BadHandle as c_int;
    };

    let outcome = match tag {
        ConditionKind::Read => {
            // SAFETY: the tag identifies a boxed `ZeroDdsReadCondition`.
            drop(unsafe { Box::from_raw(c as *mut ZeroDdsReadCondition) });
            ZeroDdsStatus::Ok
        }
        ConditionKind::Query => {
            // SAFETY: the tag identifies a boxed `ZeroDdsQueryCondition`.
            drop(unsafe { Box::from_raw(c as *mut ZeroDdsQueryCondition) });
            ZeroDdsStatus::Ok
        }
        ConditionKind::Guard | ConditionKind::Status => ZeroDdsStatus::PreconditionNotMet,
    };
    outcome as c_int
}
/// Wait-set: a mutex-protected list of attached condition handles.
///
/// The handles are stored as untyped pointers and are not owned by the
/// wait-set; dropping the wait-set does not free the conditions.
pub struct ZeroDdsWaitSet {
// Attached conditions, in attachment order. `attach` refuses duplicates.
conditions: Mutex<Vec<*mut core::ffi::c_void>>,
}
// NOTE(review): Send/Sync are asserted manually because raw pointers are
// neither. The list itself is guarded by the mutex; whether the pointed-to
// conditions may be used from several threads depends on their own types.
unsafe impl Send for ZeroDdsWaitSet {}
unsafe impl Sync for ZeroDdsWaitSet {}
/// Allocates an empty wait-set.
///
/// The returned pointer owns the allocation; release it with
/// `zerodds_waitset_destroy`.
#[unsafe(no_mangle)]
pub extern "C" fn zerodds_waitset_create() -> *mut ZeroDdsWaitSet {
    let waitset = ZeroDdsWaitSet {
        conditions: Mutex::new(Vec::new()),
    };
    Box::into_raw(Box::new(waitset))
}
/// Frees a wait-set. Passing null is a no-op. Attached conditions are not
/// freed.
///
/// # Safety
/// `w` must be null or a pointer obtained from `zerodds_waitset_create` that
/// has not been destroyed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_destroy(w: *mut ZeroDdsWaitSet) {
    if w.is_null() {
        return;
    }
    // SAFETY: non-null and, per the contract, a live `Box` allocation.
    drop(unsafe { Box::from_raw(w) });
}
/// Attaches a condition to a wait-set. Attaching the same handle twice is a
/// no-op.
///
/// Returns `BadParameter` if either pointer is null, `Ok` otherwise.
///
/// NOTE(review): if the internal lock cannot be taken, the condition is not
/// attached but `Ok` is still returned.
///
/// # Safety
/// `w` must be null or point to a live wait-set. `cond` is only stored here,
/// not dereferenced.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_attach_condition(
    w: *mut ZeroDdsWaitSet,
    cond: *mut core::ffi::c_void,
) -> c_int {
    if w.is_null() || cond.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `w` is non-null and live per this function's contract.
    let waitset = unsafe { &*w };
    if let Ok(mut attached) = waitset.conditions.lock() {
        let already_present = attached.iter().any(|&existing| existing == cond);
        if !already_present {
            attached.push(cond);
        }
    }
    ZeroDdsStatus::Ok as c_int
}
/// Detaches a condition from a wait-set.
///
/// Returns `BadParameter` if `w` is null, `PreconditionNotMet` if `cond` is
/// not currently attached, and `Ok` after removing it.
///
/// NOTE(review): if the internal lock cannot be taken, nothing is removed but
/// `Ok` is still returned.
///
/// # Safety
/// `w` must be null or point to a live wait-set. `cond` is only compared,
/// not dereferenced.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_detach_condition(
    w: *mut ZeroDdsWaitSet,
    cond: *mut core::ffi::c_void,
) -> c_int {
    if w.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `w` is non-null and live per this function's contract.
    let waitset = unsafe { &*w };
    let Ok(mut attached) = waitset.conditions.lock() else {
        return ZeroDdsStatus::Ok as c_int;
    };
    if !attached.contains(&cond) {
        return ZeroDdsStatus::PreconditionNotMet as c_int;
    }
    attached.retain(|&existing| existing != cond);
    ZeroDdsStatus::Ok as c_int
}
/// Blocks until at least one attached condition is triggered or the timeout
/// expires.
///
/// The attached conditions are polled at a fixed interval. On success the
/// triggered handles are written to `out_active` (at most `cap` of them),
/// their number is stored in `*out_count`, and `Ok` is returned. On expiry
/// `*out_count` is set to `0` and `Timeout` is returned. Any null pointer
/// argument yields `BadParameter`.
///
/// Timeout encoding: `timeout_sec == i32::MAX` together with
/// `timeout_nanosec == u32::MAX` means "wait forever"; a negative
/// `timeout_sec` is treated as `0`.
///
/// # Safety
/// * `w` must be null or point to a live wait-set whose attached conditions
///   are all still alive.
/// * `out_active` must be null or valid for writing `cap` pointers.
/// * `out_count` must be null or valid for writing one `usize`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_wait(
    w: *mut ZeroDdsWaitSet,
    out_active: *mut *mut core::ffi::c_void,
    cap: usize,
    out_count: *mut usize,
    timeout_sec: i32,
    timeout_nanosec: u32,
) -> c_int {
    /// Pause between two polls of the attached conditions.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);

    if w.is_null() || out_active.is_null() || out_count.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `w` is non-null and live per this function's contract.
    let ws = unsafe { &*w };

    // `None` means "no deadline". `checked_add` is used instead of `+`
    // because `Instant + Duration` may panic on overflow, and a panic inside
    // an `extern "C"` function aborts the process. An unrepresentable
    // deadline is simply treated as infinite.
    let deadline = if timeout_sec == i32::MAX && timeout_nanosec == u32::MAX {
        None
    } else {
        let timeout = Duration::new(timeout_sec.max(0) as u64, timeout_nanosec);
        Instant::now().checked_add(timeout)
    };

    loop {
        // Work on a snapshot so the lock is not held while evaluating
        // conditions or sleeping.
        let conds: Vec<*mut core::ffi::c_void> =
            ws.conditions.lock().map(|g| g.clone()).unwrap_or_default();
        let active: Vec<*mut core::ffi::c_void> = conds
            .iter()
            .copied()
            // SAFETY: attached conditions are live per this function's contract.
            .filter(|&c| unsafe { zerodds_condition_get_trigger_value(c as *const _) })
            .collect();

        if !active.is_empty() {
            let n = active.len().min(cap);
            // SAFETY: `out_active` is valid for `cap >= n` writes.
            let dst = unsafe { slice::from_raw_parts_mut(out_active, n) };
            dst.copy_from_slice(&active[..n]);
            // SAFETY: `out_count` is non-null and writable per the contract.
            unsafe { *out_count = n };
            return ZeroDdsStatus::Ok as c_int;
        }

        // Never sleep past the deadline: a timeout shorter than the poll
        // interval must not be stretched to a full interval.
        let nap = match deadline {
            None => POLL_INTERVAL,
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    // SAFETY: `out_count` is non-null and writable per the contract.
                    unsafe { *out_count = 0 };
                    return ZeroDdsStatus::Timeout as c_int;
                }
                remaining.min(POLL_INTERVAL)
            }
        };
        std::thread::sleep(nap);
    }
}
/// Copies the handles of the attached conditions into a caller buffer.
///
/// At most `cap` handles are written to `out`, in attachment order, and the
/// number written is stored in `*out_count`. Returns `BadParameter` if any
/// pointer argument is null, `Ok` otherwise. If the internal lock cannot be
/// taken, the set is reported as empty.
///
/// # Safety
/// * `w` must be null or point to a live wait-set.
/// * `out` must be null or valid for writing `cap` pointers.
/// * `out_count` must be null or valid for writing one `usize`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_waitset_get_conditions(
    w: *mut ZeroDdsWaitSet,
    out: *mut *mut core::ffi::c_void,
    cap: usize,
    out_count: *mut usize,
) -> c_int {
    if w.is_null() || out.is_null() || out_count.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `w` is non-null and live per this function's contract.
    let waitset = unsafe { &*w };
    let snapshot: Vec<*mut core::ffi::c_void> = match waitset.conditions.lock() {
        Ok(attached) => attached.clone(),
        Err(_) => Vec::new(),
    };

    let written = cap.min(snapshot.len());
    // SAFETY: `out` is valid for `cap >= written` writes per the contract.
    let slots = unsafe { slice::from_raw_parts_mut(out, written) };
    for (slot, &handle) in slots.iter_mut().zip(snapshot.iter()) {
        *slot = handle;
    }
    // SAFETY: `out_count` is non-null and writable per the contract.
    unsafe { *out_count = written };
    ZeroDdsStatus::Ok as c_int
}
// Never called. Its signature mentions `ZeroDdsDataWriter`, presumably so
// that the import at the top of the file is not reported as unused —
// TODO confirm and drop the import instead if nothing else needs it.
#[allow(dead_code)]
fn _suppress(_: *mut ZeroDdsDataWriter) {}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Untyped handle, as the C API sees conditions.
    type Handle = *mut core::ffi::c_void;

    /// Erases the concrete condition type of a pointer.
    fn erase<T>(typed: *mut T) -> Handle {
        typed.cast()
    }

    /// A fresh guard is untriggered; setting the flag makes it triggered.
    #[test]
    fn guardcondition_lifecycle_and_trigger() {
        let guard = zerodds_guardcondition_create();
        assert!(!guard.is_null());

        let before = unsafe { zerodds_condition_get_trigger_value(erase(guard) as *const _) };
        assert!(!before);

        let _ = unsafe { zerodds_guardcondition_set_trigger_value(guard, true) };
        let after = unsafe { zerodds_condition_get_trigger_value(erase(guard) as *const _) };
        assert!(after);

        unsafe { zerodds_guardcondition_destroy(guard) };
    }

    /// The enabled-status mask round-trips through setter and getter.
    #[test]
    fn statuscondition_lifecycle_and_mask() {
        let status = unsafe { zerodds_entity_get_statuscondition(ptr::null_mut()) };
        assert!(!status.is_null());

        let _ = unsafe { zerodds_statuscondition_set_enabled_statuses(status, 0x1234) };
        let mask = unsafe { zerodds_statuscondition_get_enabled_statuses(status) };
        assert_eq!(mask, 0x1234);

        unsafe { zerodds_statuscondition_destroy(status) };
    }

    /// Attach, list and detach a single guard condition.
    #[test]
    fn waitset_attach_detach() {
        let waitset = zerodds_waitset_create();
        let guard = zerodds_guardcondition_create();

        let attached = unsafe { zerodds_waitset_attach_condition(waitset, erase(guard)) };
        assert_eq!(attached, ZeroDdsStatus::Ok as c_int);

        let mut listed: [Handle; 4] = [ptr::null_mut(); 4];
        let mut listed_len: usize = 0;
        let queried = unsafe {
            zerodds_waitset_get_conditions(waitset, listed.as_mut_ptr(), 4, &mut listed_len)
        };
        assert_eq!(queried, ZeroDdsStatus::Ok as c_int);
        assert_eq!(listed_len, 1);

        let detached = unsafe { zerodds_waitset_detach_condition(waitset, erase(guard)) };
        assert_eq!(detached, ZeroDdsStatus::Ok as c_int);

        unsafe { zerodds_waitset_destroy(waitset) };
        unsafe { zerodds_guardcondition_destroy(guard) };
    }

    /// An already-triggered guard makes `wait` return at once with that guard.
    #[test]
    fn waitset_wait_returns_active_guard() {
        let waitset = zerodds_waitset_create();
        let guard = zerodds_guardcondition_create();
        unsafe { zerodds_guardcondition_set_trigger_value(guard, true) };
        unsafe { zerodds_waitset_attach_condition(waitset, erase(guard)) };

        let mut active: [Handle; 4] = [ptr::null_mut(); 4];
        let mut active_len: usize = 0;
        let status = unsafe {
            zerodds_waitset_wait(waitset, active.as_mut_ptr(), 4, &mut active_len, 1, 0)
        };
        assert_eq!(status, ZeroDdsStatus::Ok as c_int);
        assert_eq!(active_len, 1);
        assert_eq!(active[0], erase(guard));

        unsafe { zerodds_waitset_destroy(waitset) };
        unsafe { zerodds_guardcondition_destroy(guard) };
    }

    /// With nothing triggered, `wait` gives up after the 50 ms timeout.
    #[test]
    fn waitset_wait_timeout_no_active() {
        let waitset = zerodds_waitset_create();
        let guard = zerodds_guardcondition_create();
        unsafe { zerodds_waitset_attach_condition(waitset, erase(guard)) };

        let mut active: [Handle; 4] = [ptr::null_mut(); 4];
        let mut active_len: usize = 0;
        let status = unsafe {
            zerodds_waitset_wait(
                waitset,
                active.as_mut_ptr(),
                4,
                &mut active_len,
                0,
                50_000_000,
            )
        };
        assert_eq!(status, ZeroDdsStatus::Timeout as c_int);
        assert_eq!(active_len, 0);

        unsafe { zerodds_waitset_destroy(waitset) };
        unsafe { zerodds_guardcondition_destroy(guard) };
    }
}