use alloc::boxed::Box;
use alloc::string::ToString;
use alloc::vec::Vec;
use core::ffi::c_int;
use core::ptr;
use core::slice;
use std::sync::{Mutex, mpsc};
use std::time::Duration;
use zerodds_dcps::qos::{DataReaderQos, ReliabilityKind};
use zerodds_dcps::runtime::{UserReaderConfig, UserSample};
use crate::ZeroDdsStatus;
use crate::entities::{
ZeroDdsDataReader, ZeroDdsDomainParticipant, ZeroDdsSubscriber, ZeroDdsTopic,
};
use crate::qos_ffi::{ZeroDdsDataReaderQos, dr_qos_from_c};
/// Returns the participant handle stored on the subscriber.
///
/// Yields a null pointer when `sub` itself is null.
///
/// # Safety
/// `sub` must be null or point to a live `ZeroDdsSubscriber`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_get_participant(
    sub: *mut ZeroDdsSubscriber,
) -> *mut ZeroDdsDomainParticipant {
    // SAFETY: the caller guarantees `sub` is null or valid; `as_ref` handles null.
    match unsafe { sub.as_ref() } {
        Some(subscriber) => subscriber.participant,
        None => ptr::null_mut(),
    }
}
/// Begins coherent access on the subscriber.
///
/// Currently only validates the handle: `BadHandle` for a null `sub`,
/// `Ok` otherwise. The pointer is never dereferenced.
///
/// # Safety
/// Part of the C ABI; `sub` is only compared against null.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_begin_access(sub: *mut ZeroDdsSubscriber) -> c_int {
    let status = if sub.is_null() {
        ZeroDdsStatus::BadHandle
    } else {
        ZeroDdsStatus::Ok
    };
    status as c_int
}
/// Ends coherent access on the subscriber.
///
/// Currently only validates the handle: `BadHandle` for a null `sub`,
/// `Ok` otherwise. The pointer is never dereferenced.
///
/// # Safety
/// Part of the C ABI; `sub` is only compared against null.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_end_access(sub: *mut ZeroDdsSubscriber) -> c_int {
    let status = if sub.is_null() {
        ZeroDdsStatus::BadHandle
    } else {
        ZeroDdsStatus::Ok
    };
    status as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_get_datareaders(
sub: *mut ZeroDdsSubscriber,
out: *mut *mut ZeroDdsDataReader,
out_count: *mut usize,
cap: usize,
) -> c_int {
if sub.is_null() || out.is_null() || out_count.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let sb = unsafe { &*sub };
let drs = sb.datareaders.lock().map(|g| g.clone()).unwrap_or_default();
let n = drs.len().min(cap);
let dst = unsafe { slice::from_raw_parts_mut(out, n) };
dst.copy_from_slice(&drs[..n]);
unsafe { *out_count = n };
ZeroDdsStatus::Ok as c_int
}
/// Notifies the subscriber's data readers.
///
/// Currently only validates the handle: `BadHandle` for a null `sub`,
/// `Ok` otherwise. No reader is touched.
///
/// # Safety
/// Part of the C ABI; `sub` is only compared against null.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_notify_datareaders(sub: *mut ZeroDdsSubscriber) -> c_int {
    let status = if sub.is_null() {
        ZeroDdsStatus::BadHandle
    } else {
        ZeroDdsStatus::Ok
    };
    status as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_create_datareader(
sub: *mut ZeroDdsSubscriber,
topic: *mut ZeroDdsTopic,
qos: *const ZeroDdsDataReaderQos,
) -> *mut ZeroDdsDataReader {
if sub.is_null() || topic.is_null() {
return ptr::null_mut();
}
let sb = unsafe { &*sub };
let tt = unsafe { &*topic };
let dp_handle = sb.participant;
if dp_handle.is_null() {
return ptr::null_mut();
}
let dp = unsafe { &*dp_handle };
let rt = match dp.rt.as_ref() {
Some(r) => r.clone(),
None => return ptr::null_mut(),
};
let qos: DataReaderQos = if qos.is_null() {
sb.default_dr_qos
.lock()
.map(|g| g.clone())
.unwrap_or_default()
} else {
unsafe { dr_qos_from_c(qos) }
};
let cfg = UserReaderConfig {
topic_name: tt.name.to_string(),
type_name: tt.type_name.to_string(),
reliable: matches!(qos.reliability.kind, ReliabilityKind::Reliable),
durability: qos.durability.kind,
deadline: qos.deadline.clone(),
liveliness: qos.liveliness.clone(),
ownership: qos.ownership.kind,
partition: Vec::new(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_identifier: zerodds_types::TypeIdentifier::default(),
type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
data_representation_offer: None,
};
let (eid, rx) = match rt.register_user_reader(cfg) {
Ok(pair) => pair,
Err(_) => return ptr::null_mut(),
};
let dr = Box::new(ZeroDdsDataReader {
subscriber: sub,
topic,
rt,
eid,
qos: Mutex::new(qos),
rx: Mutex::new(rx),
read_cache: Mutex::new(Vec::new()),
cft_filter: None,
});
let dr_ptr = Box::into_raw(dr);
if let Ok(mut list) = sb.datareaders.lock() {
list.push(dr_ptr);
}
dr_ptr
}
/// Creates a data reader bound to a content-filtered topic.
///
/// The filter expression is parsed with `zerodds_sql_filter::parse` and the
/// CFT parameters are captured as string values before the reader is created
/// on the CFT's related topic via `zerodds_sub_create_datareader`. The parsed
/// filter is then attached to the new reader.
///
/// Returns null when `sub` or `cft` is null, the CFT has no related topic,
/// the expression fails to parse, or reader creation fails.
///
/// # Safety
/// `sub` and `cft` must be null or point to live entities; `qos` must be null
/// or point to a valid `ZeroDdsDataReaderQos`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_create_datareader_with_cft(
    sub: *mut ZeroDdsSubscriber,
    cft: *mut crate::entities::ZeroDdsContentFilteredTopic,
    qos: *const ZeroDdsDataReaderQos,
) -> *mut ZeroDdsDataReader {
    if sub.is_null() || cft.is_null() {
        return ptr::null_mut();
    }
    // SAFETY: `cft` was checked for null; validity is the caller's contract.
    let filtered_topic = unsafe { &*cft };
    let base_topic = filtered_topic.related_topic;
    if base_topic.is_null() {
        return ptr::null_mut();
    }

    // Reject invalid filter syntax before any reader is registered.
    let Ok(expr) = zerodds_sql_filter::parse(&filtered_topic.filter_expression) else {
        return ptr::null_mut();
    };
    let params: Vec<zerodds_sql_filter::Value> = match filtered_topic.parameters.lock() {
        Ok(guard) => guard
            .iter()
            .map(|param| zerodds_sql_filter::Value::String(param.clone()))
            .collect(),
        Err(_) => Vec::new(),
    };

    // SAFETY: arguments are forwarded under the same contract as this function.
    let reader = unsafe { zerodds_sub_create_datareader(sub, base_topic, qos) };
    if reader.is_null() {
        return ptr::null_mut();
    }
    // SAFETY: `reader` was just produced by `Box::into_raw` in the callee.
    unsafe {
        (*reader).cft_filter = Some(crate::entities::CftFilter { expr, params });
    }
    reader
}
/// Removes `dr` from the subscriber's reader list and frees it.
///
/// Returns `BadHandle` when a pointer is null or the reader is not in the
/// list, and `PreconditionNotMet` when the reader belongs to a different
/// subscriber. If the list lock is poisoned the list is left untouched and
/// the reader is still freed.
///
/// # Safety
/// `sub` and `dr` must be null or point to live entities; `dr` must have been
/// created by this library and must not be used after a successful call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_delete_datareader(
    sub: *mut ZeroDdsSubscriber,
    dr: *mut ZeroDdsDataReader,
) -> c_int {
    if sub.is_null() || dr.is_null() {
        return ZeroDdsStatus::BadHandle as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let owner = unsafe { (*dr).subscriber };
    if owner != sub {
        return ZeroDdsStatus::PreconditionNotMet as c_int;
    }
    // SAFETY: `sub` was checked for null; validity is the caller's contract.
    let subscriber = unsafe { &*sub };
    if let Ok(mut readers) = subscriber.datareaders.lock() {
        if !readers.contains(&dr) {
            return ZeroDdsStatus::BadHandle as c_int;
        }
        readers.retain(|&entry| entry != dr);
    }
    // SAFETY: the reader was allocated with `Box::into_raw` at creation time.
    drop(unsafe { Box::from_raw(dr) });
    ZeroDdsStatus::Ok as c_int
}
/// Finds the first data reader of `sub` whose topic name equals `topic_name`.
///
/// Returns null when an argument is null, the name is not valid UTF-8, the
/// reader list lock is poisoned, or no reader matches. Null entries and
/// readers with a null topic are skipped.
///
/// # Safety
/// `sub` must be null or point to a live subscriber; `topic_name` must be
/// null or a NUL-terminated C string.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_lookup_datareader(
    sub: *mut ZeroDdsSubscriber,
    topic_name: *const core::ffi::c_char,
) -> *mut ZeroDdsDataReader {
    if sub.is_null() || topic_name.is_null() {
        return ptr::null_mut();
    }
    // SAFETY: `topic_name` is non-null and NUL-terminated per the caller's contract.
    let Ok(name) = unsafe { std::ffi::CStr::from_ptr(topic_name) }.to_str() else {
        return ptr::null_mut();
    };
    // SAFETY: `sub` was checked for null; validity is the caller's contract.
    let subscriber = unsafe { &*sub };
    let Ok(readers) = subscriber.datareaders.lock() else {
        return ptr::null_mut();
    };
    let hit = readers.iter().copied().find(|&candidate| {
        if candidate.is_null() {
            return false;
        }
        // SAFETY: non-null entries of the list are live readers.
        let reader = unsafe { &*candidate };
        if reader.topic.is_null() {
            return false;
        }
        // SAFETY: non-null topic handle stored on a live reader.
        let topic = unsafe { &*reader.topic };
        topic.name == name
    });
    hit.unwrap_or(ptr::null_mut())
}
/// Frees every data reader tracked by the subscriber and empties its list.
///
/// Returns `BadHandle` for a null `sub`. A poisoned list lock is treated as
/// an empty list, so nothing is freed in that case.
///
/// # Safety
/// `sub` must be null or point to a live subscriber; all reader handles
/// obtained from it are invalid after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_sub_delete_contained_entities(
    sub: *mut ZeroDdsSubscriber,
) -> c_int {
    if sub.is_null() {
        return ZeroDdsStatus::BadHandle as c_int;
    }
    // SAFETY: `sub` was checked for null; validity is the caller's contract.
    let subscriber = unsafe { &*sub };
    // Detach the handles first so the lock is released before freeing them.
    let detached: Vec<*mut ZeroDdsDataReader> = match subscriber.datareaders.lock() {
        Ok(mut guard) => core::mem::take(&mut *guard),
        Err(_) => Vec::new(),
    };
    detached
        .into_iter()
        .filter(|reader| !reader.is_null())
        // SAFETY: every tracked reader was allocated with `Box::into_raw`.
        .for_each(|reader| drop(unsafe { Box::from_raw(reader) }));
    ZeroDdsStatus::Ok as c_int
}
/// Returns the topic handle stored on the data reader, or null for a null `dr`.
///
/// # Safety
/// `dr` must be null or point to a live `ZeroDdsDataReader`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_topicdescription(
    dr: *mut ZeroDdsDataReader,
) -> *mut ZeroDdsTopic {
    // SAFETY: the caller guarantees `dr` is null or valid; `as_ref` handles null.
    match unsafe { dr.as_ref() } {
        Some(reader) => reader.topic,
        None => ptr::null_mut(),
    }
}
/// Returns the subscriber handle stored on the data reader, or null for a null `dr`.
///
/// # Safety
/// `dr` must be null or point to a live `ZeroDdsDataReader`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_subscriber(
    dr: *mut ZeroDdsDataReader,
) -> *mut ZeroDdsSubscriber {
    // SAFETY: the caller guarantees `dr` is null or valid; `as_ref` handles null.
    match unsafe { dr.as_ref() } {
        Some(reader) => reader.subscriber,
        None => ptr::null_mut(),
    }
}
/// Per-sample metadata handed to C callers alongside each payload buffer.
///
/// The values noted below are those written by `LoanMemory::from_state` in
/// this file; fields not mentioned there are always set to zero.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsSampleInfo {
/// Sample-state bit: `1` for an already-read sample, `2` for a not-yet-read one.
pub sample_state: u32,
/// View-state bit; always `1` as produced by this file.
pub view_state: u32,
/// Instance-state bit: `1` alive, `2` disposed, `4` unregistered.
pub instance_state: u32,
/// Always `0` as produced by this file.
pub disposed_generation_count: i32,
/// Always `0` as produced by this file.
pub no_writers_generation_count: i32,
/// Always `0` as produced by this file.
pub sample_rank: i32,
/// Always `0` as produced by this file.
pub generation_rank: i32,
/// Always `0` as produced by this file.
pub absolute_generation_rank: i32,
/// Seconds part of the source timestamp; always `0` as produced by this file.
pub source_timestamp_sec: i32,
/// Nanoseconds part of the source timestamp; always `0` as produced by this file.
pub source_timestamp_nanosec: u32,
/// Instance handle; always `0` as produced by this file.
pub instance_handle: u64,
/// 64-bit hash of the writer GUID for alive samples, `0` for lifecycle samples.
pub publication_handle: u64,
/// `true` when the sample carries a payload, `false` for lifecycle samples.
pub valid_data: bool,
}
/// C-layout view of a batch of loaned samples.
///
/// `buffers`, `lengths` and `infos` are parallel arrays of `count` elements.
/// When filled by `finalize_loan`, they point into the allocation identified
/// by `loan_token`, which `zerodds_dr_return_loan` frees.
#[repr(C)]
pub struct ZeroDdsSampleArray {
/// One payload pointer per sample.
pub buffers: *mut *mut u8,
/// Payload length in bytes for each entry of `buffers`.
pub lengths: *mut usize,
/// Metadata for each sample.
pub infos: *mut ZeroDdsSampleInfo,
/// Number of valid entries in the three arrays.
pub count: usize,
/// Opaque handle to the backing allocation; null when nothing is loaned.
pub loan_token: *mut core::ffi::c_void,
}
/// Owns the storage behind a `ZeroDdsSampleArray` loan.
///
/// `buffers[i]` points into `payloads[i]`, and `lengths[i]` is that payload's
/// length, so the payload vectors must not be reallocated while the loan is out.
struct LoanMemory {
// Owned payload bytes, one vector per sample (empty for lifecycle samples).
payloads: Vec<Vec<u8>>,
// Raw pointers to the start of each payload, exposed to C.
buffers: Vec<*mut u8>,
// Length of each payload, exposed to C.
lengths: Vec<usize>,
// Per-sample metadata, exposed to C.
infos: Vec<ZeroDdsSampleInfo>,
}
impl LoanMemory {
fn new(samples: Vec<UserSample>) -> Box<Self> {
let with_state: Vec<(UserSample, crate::entities::ReadSampleState)> = samples
.into_iter()
.map(|s| (s, crate::entities::ReadSampleState::NotRead))
.collect();
Self::from_state(with_state)
}
fn from_state(samples: Vec<(UserSample, crate::entities::ReadSampleState)>) -> Box<Self> {
let mut payloads: Vec<Vec<u8>> = Vec::with_capacity(samples.len());
let mut infos: Vec<ZeroDdsSampleInfo> = Vec::with_capacity(samples.len());
for (s, state) in samples {
let sample_state_bit = match state {
crate::entities::ReadSampleState::Read => 1u32, crate::entities::ReadSampleState::NotRead => 2u32, };
match s {
UserSample::Alive {
payload,
writer_guid,
..
} => {
let pub_handle = u64_from_guid(writer_guid);
infos.push(ZeroDdsSampleInfo {
sample_state: sample_state_bit,
view_state: 1, instance_state: 1, disposed_generation_count: 0,
no_writers_generation_count: 0,
sample_rank: 0,
generation_rank: 0,
absolute_generation_rank: 0,
source_timestamp_sec: 0,
source_timestamp_nanosec: 0,
instance_handle: 0,
publication_handle: pub_handle,
valid_data: true,
});
payloads.push(payload);
}
UserSample::Lifecycle { kind, .. } => {
use zerodds_rtps::history_cache::ChangeKind;
let inst_state = match kind {
ChangeKind::NotAliveDisposed | ChangeKind::NotAliveDisposedUnregistered => {
2
}
ChangeKind::NotAliveUnregistered => 4,
_ => 1,
};
infos.push(ZeroDdsSampleInfo {
sample_state: sample_state_bit,
view_state: 1,
instance_state: inst_state,
disposed_generation_count: 0,
no_writers_generation_count: 0,
sample_rank: 0,
generation_rank: 0,
absolute_generation_rank: 0,
source_timestamp_sec: 0,
source_timestamp_nanosec: 0,
instance_handle: 0,
publication_handle: 0,
valid_data: false,
});
payloads.push(Vec::new());
}
}
}
let mut buffers: Vec<*mut u8> = Vec::with_capacity(payloads.len());
let mut lengths: Vec<usize> = Vec::with_capacity(payloads.len());
for v in payloads.iter_mut() {
buffers.push(v.as_mut_ptr());
lengths.push(v.len());
}
Box::new(LoanMemory {
payloads,
buffers,
lengths,
infos,
})
}
}
/// Folds a 16-byte GUID into a 64-bit handle.
///
/// Starts from `0xcbf29ce484222325` and, for every byte, XORs it in and then
/// multiplies (wrapping) by `0x100000001b3`.
fn u64_from_guid(g: [u8; 16]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    g.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
/// Takes (removes) up to `max_samples` samples from the reader and loans them
/// to the caller through `out`.
///
/// Samples previously parked in the read cache are taken first, followed by
/// samples still queued on the receive channel. `max_samples == 0` means
/// "no limit". When the reader has a content filter, alive samples that do
/// not pass it are dropped after collection; lifecycle samples always pass.
/// The three state-mask arguments are currently ignored.
///
/// Returns `BadParameter` for null arguments, `NoData` (with `out` cleared)
/// when nothing remains, and `Ok` otherwise. A successful loan must be
/// released with `zerodds_dr_return_loan`.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_take(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsSampleArray,
    max_samples: usize,
    _sample_states: u32,
    _view_states: u32,
    _instance_states: u32,
) -> c_int {
    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let drr = unsafe { &*dr };
    let limit = if max_samples == 0 {
        usize::MAX
    } else {
        max_samples
    };
    let mut collected: Vec<(UserSample, crate::entities::ReadSampleState)> = Vec::new();
    if let Ok(mut cache) = drr.read_cache.lock() {
        // Remove the oldest `n` cached entries in one pass; popping the front
        // element repeatedly would shift the remainder each time (O(n²)).
        let n = cache.len().min(limit);
        collected.extend(cache.drain(..n));
    }
    if let Ok(rx) = drr.rx.lock() {
        while collected.len() < limit {
            match rx.try_recv() {
                Ok(s) => collected.push((s, crate::entities::ReadSampleState::NotRead)),
                Err(_) => break,
            }
        }
    }
    if let Some(filter) = &drr.cft_filter {
        collected.retain(|(s, _)| match s {
            UserSample::Alive { payload, .. } => filter.evaluate(payload),
            // Lifecycle samples carry no payload to evaluate and always pass.
            UserSample::Lifecycle { .. } => true,
        });
    }
    if collected.is_empty() {
        // SAFETY: `out` was checked for null and is writable per the contract.
        unsafe {
            (*out).buffers = ptr::null_mut();
            (*out).lengths = ptr::null_mut();
            (*out).infos = ptr::null_mut();
            (*out).count = 0;
            (*out).loan_token = ptr::null_mut();
        }
        return ZeroDdsStatus::NoData as c_int;
    }
    finalize_loan(out, collected)
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_read(
dr: *mut ZeroDdsDataReader,
out: *mut ZeroDdsSampleArray,
max_samples: usize,
_sample_states: u32,
_view_states: u32,
_instance_states: u32,
) -> c_int {
if dr.is_null() || out.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let drr = unsafe { &*dr };
let limit = if max_samples == 0 {
usize::MAX
} else {
max_samples
};
if let (Ok(rx), Ok(mut cache)) = (drr.rx.lock(), drr.read_cache.lock()) {
while let Ok(s) = rx.try_recv() {
let pass = if let Some(filter) = &drr.cft_filter {
match &s {
UserSample::Alive { payload, .. } => filter.evaluate(payload),
UserSample::Lifecycle { .. } => true,
}
} else {
true
};
if pass {
cache.push((s, crate::entities::ReadSampleState::NotRead));
}
}
}
let collected: Vec<(UserSample, crate::entities::ReadSampleState)> =
if let Ok(mut cache) = drr.read_cache.lock() {
let n = cache.len().min(limit);
let out_collected: Vec<_> = cache
.iter()
.take(n)
.map(|(s, st)| (s.clone(), *st))
.collect();
for entry in cache.iter_mut().take(n) {
entry.1 = crate::entities::ReadSampleState::Read;
}
out_collected
} else {
Vec::new()
};
if collected.is_empty() {
unsafe {
(*out).buffers = ptr::null_mut();
(*out).lengths = ptr::null_mut();
(*out).infos = ptr::null_mut();
(*out).count = 0;
(*out).loan_token = ptr::null_mut();
}
return ZeroDdsStatus::NoData as c_int;
}
finalize_loan(out, collected)
}
fn finalize_loan(
out: *mut ZeroDdsSampleArray,
collected: Vec<(UserSample, crate::entities::ReadSampleState)>,
) -> c_int {
let mut loan = LoanMemory::from_state(collected);
let buffers_ptr = loan.buffers.as_mut_ptr();
let lengths_ptr = loan.lengths.as_mut_ptr();
let infos_ptr = loan.infos.as_mut_ptr();
let count = loan.payloads.len();
let token = Box::into_raw(loan) as *mut core::ffi::c_void;
unsafe {
(*out).buffers = buffers_ptr;
(*out).lengths = lengths_ptr;
(*out).infos = infos_ptr;
(*out).count = count;
(*out).loan_token = token;
}
ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_take_next_sample(
dr: *mut ZeroDdsDataReader,
out_buf: *mut *mut u8,
out_len: *mut usize,
out_info: *mut ZeroDdsSampleInfo,
) -> c_int {
if dr.is_null() || out_buf.is_null() || out_len.is_null() || out_info.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let drr = unsafe { &*dr };
let s = match drr.rx.lock().ok().and_then(|rx| rx.try_recv().ok()) {
Some(s) => s,
None => return ZeroDdsStatus::NoData as c_int,
};
let loan = LoanMemory::new(alloc::vec![s]);
unsafe {
*out_buf = loan.buffers[0];
*out_len = loan.lengths[0];
*out_info = loan.infos[0];
}
let _ = Box::into_raw(loan);
ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_read_next_sample(
dr: *mut ZeroDdsDataReader,
out_buf: *mut *mut u8,
out_len: *mut usize,
out_info: *mut ZeroDdsSampleInfo,
) -> c_int {
unsafe { zerodds_dr_take_next_sample(dr, out_buf, out_len, out_info) }
}
/// Compacts `arr` in place so only samples whose `instance_handle` equals
/// `handle` remain, preserving their order, and updates `count`.
///
/// Returns `BadParameter` for a null `arr`, `Ok` otherwise. An empty array or
/// one with any null inner array is left untouched.
///
/// NOTE(review): this is a safe `fn` that dereferences raw pointers; callers
/// must pass an array whose inner pointers are valid for `count` elements.
pub fn sample_array_filter_instance(arr: *mut ZeroDdsSampleArray, handle: u64) -> c_int {
    if arr.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `arr` was checked for null; the fields are plain copies.
    let (total, buffers, lengths, infos) =
        unsafe { ((*arr).count, (*arr).buffers, (*arr).lengths, (*arr).infos) };
    if total == 0 || buffers.is_null() || lengths.is_null() || infos.is_null() {
        return ZeroDdsStatus::Ok as c_int;
    }

    let mut kept: usize = 0;
    for src in 0..total {
        // SAFETY: `src < total` and the arrays hold `total` elements.
        let info = unsafe { infos.add(src).read() };
        if info.instance_handle != handle {
            continue;
        }
        if kept != src {
            // SAFETY: `kept < src < total`, so both slots are in bounds.
            unsafe {
                buffers.add(kept).write(buffers.add(src).read());
                lengths.add(kept).write(lengths.add(src).read());
                infos.add(kept).write(info);
            }
        }
        kept += 1;
    }
    // SAFETY: `arr` is non-null (checked above).
    unsafe { (*arr).count = kept };
    ZeroDdsStatus::Ok as c_int
}
/// Narrows `arr` to the samples of the "next" instance after `prev_handle`.
///
/// The next instance is the smallest `instance_handle` in the array that is
/// strictly greater than `prev_handle`. If one exists the array is filtered
/// to it via `sample_array_filter_instance`; otherwise `count` is set to 0.
///
/// Returns `BadParameter` for a null `arr`, `Ok` for an empty array or a null
/// `infos` pointer (array left untouched).
///
/// NOTE(review): this is a safe `fn` that dereferences raw pointers; callers
/// must pass an array whose inner pointers are valid for `count` elements.
pub fn sample_array_filter_next_instance(arr: *mut ZeroDdsSampleArray, prev_handle: u64) -> c_int {
    if arr.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `arr` was checked for null; the fields are plain copies.
    let (total, infos) = unsafe { ((*arr).count, (*arr).infos) };
    if total == 0 || infos.is_null() {
        return ZeroDdsStatus::Ok as c_int;
    }

    let successor = (0..total)
        // SAFETY: `i < total` and `infos` holds `total` elements.
        .map(|i| unsafe { (*infos.add(i)).instance_handle })
        .filter(|&candidate| candidate > prev_handle)
        .min();

    if let Some(next) = successor {
        return sample_array_filter_instance(arr, next);
    }
    // SAFETY: `arr` is non-null (checked above).
    unsafe { (*arr).count = 0 };
    ZeroDdsStatus::Ok as c_int
}
/// Compacts `arr` in place, keeping only samples that match all three state
/// masks, and updates `count`.
///
/// A mask of `0` accepts every value; otherwise a sample matches when its
/// state shares at least one bit with the mask. Order is preserved.
///
/// Returns `BadParameter` for a null `arr`, `Ok` otherwise. An empty array or
/// one with any null inner array is left untouched.
///
/// NOTE(review): this is a safe `fn` that dereferences raw pointers; callers
/// must pass an array whose inner pointers are valid for `count` elements.
pub fn sample_array_filter_states(
    arr: *mut ZeroDdsSampleArray,
    sample_states: u32,
    view_states: u32,
    instance_states: u32,
) -> c_int {
    if arr.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `arr` was checked for null; the fields are plain copies.
    let (total, buffers, lengths, infos) =
        unsafe { ((*arr).count, (*arr).buffers, (*arr).lengths, (*arr).infos) };
    if total == 0 || buffers.is_null() || lengths.is_null() || infos.is_null() {
        return ZeroDdsStatus::Ok as c_int;
    }

    // A zero mask is a wildcard; otherwise any shared bit is a match.
    let accepts = |mask: u32, state: u32| mask == 0 || (mask & state) != 0;

    let mut kept = 0usize;
    for src in 0..total {
        // SAFETY: `src < total` and the arrays hold `total` elements.
        let info = unsafe { infos.add(src).read() };
        let matches_all = accepts(sample_states, info.sample_state)
            && accepts(view_states, info.view_state)
            && accepts(instance_states, info.instance_state);
        if !matches_all {
            continue;
        }
        if kept != src {
            // SAFETY: `kept < src < total`, so both slots are in bounds.
            unsafe {
                buffers.add(kept).write(buffers.add(src).read());
                lengths.add(kept).write(lengths.add(src).read());
                infos.add(kept).write(info);
            }
        }
        kept += 1;
    }
    // SAFETY: `arr` is non-null (checked above).
    unsafe { (*arr).count = kept };
    ZeroDdsStatus::Ok as c_int
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_return_loan(
_dr: *mut ZeroDdsDataReader,
arr: *mut ZeroDdsSampleArray,
) -> c_int {
if arr.is_null() {
return ZeroDdsStatus::BadParameter as c_int;
}
let token = unsafe { (*arr).loan_token };
if !token.is_null() {
let _ = unsafe { Box::from_raw(token as *mut LoanMemory) };
}
unsafe {
(*arr).buffers = ptr::null_mut();
(*arr).lengths = ptr::null_mut();
(*arr).infos = ptr::null_mut();
(*arr).count = 0;
(*arr).loan_token = ptr::null_mut();
}
ZeroDdsStatus::Ok as c_int
}
/// Blocks until the reader has at least `min` matched writers or
/// `timeout_ms` milliseconds have elapsed.
///
/// The matched count is polled from the runtime at most every 20 ms. Returns
/// `BadHandle` for a null `dr`, `Ok` once the count reaches `min`, and
/// `Timeout` when the deadline passes first. The count is always checked at
/// least once, so `timeout_ms == 0` performs a single non-blocking check.
///
/// A timeout too large to be represented as a point in time is treated as
/// "wait indefinitely" instead of panicking across the C boundary.
///
/// # Safety
/// `dr` must be null or point to a live reader for the whole duration of the
/// call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_wait_for_matched(
    dr: *mut ZeroDdsDataReader,
    min: i32,
    timeout_ms: u64,
) -> c_int {
    if dr.is_null() {
        return ZeroDdsStatus::BadHandle as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let drr = unsafe { &*dr };
    let poll_interval = Duration::from_millis(20);
    // `Instant + Duration` panics when the result is unrepresentable;
    // `checked_add` yields `None` instead, which we treat as "no deadline".
    let deadline = std::time::Instant::now().checked_add(Duration::from_millis(timeout_ms));
    loop {
        let n = drr.rt.user_reader_matched_count(drr.eid) as i32;
        if n >= min {
            return ZeroDdsStatus::Ok as c_int;
        }
        let nap = match deadline {
            Some(limit) => {
                let remaining = limit.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    return ZeroDdsStatus::Timeout as c_int;
                }
                // Never sleep past the deadline.
                remaining.min(poll_interval)
            }
            None => poll_interval,
        };
        std::thread::sleep(nap);
    }
}
/// C-layout liveliness-changed status of a data reader.
///
/// As filled by `zerodds_dr_get_liveliness_changed_status` in this file, only
/// the two counts carry runtime data; the other fields are always zero.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsLivelinessChangedStatus {
/// Alive count reported by the runtime.
pub alive_count: i32,
/// Not-alive count reported by the runtime.
pub not_alive_count: i32,
/// Always `0` as produced by this file.
pub alive_count_change: i32,
/// Always `0` as produced by this file.
pub not_alive_count_change: i32,
/// Always `0` as produced by this file.
pub last_publication_handle: u64,
}
/// C-layout subscription-matched status of a data reader.
///
/// As filled by `zerodds_dr_get_subscription_matched_status` in this file,
/// both counts hold the runtime's current matched count and the remaining
/// fields are always zero.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsSubscriptionMatchedStatus {
/// Set to the current matched count by this file.
pub total_count: i32,
/// Always `0` as produced by this file.
pub total_count_change: i32,
/// Current matched count reported by the runtime.
pub current_count: i32,
/// Always `0` as produced by this file.
pub current_count_change: i32,
/// Always `0` as produced by this file.
pub last_publication_handle: u64,
}
/// C-layout requested-deadline-missed status of a data reader.
///
/// As filled by `zerodds_dr_get_requested_deadline_missed_status` in this
/// file, only `total_count` carries runtime data.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsRequestedDeadlineMissedStatus {
/// Missed-deadline count reported by the runtime.
pub total_count: i32,
/// Always `0` as produced by this file.
pub total_count_change: i32,
/// Always `0` as produced by this file.
pub last_instance_handle: u64,
}
/// C-layout requested-incompatible-QoS status of a data reader.
///
/// `zerodds_dr_get_requested_incompatible_qos_status` copies all three fields
/// from the runtime's status value.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsRequestedIncompatibleQosStatus {
/// Total count copied from the runtime status.
pub total_count: i32,
/// Count change copied from the runtime status.
pub total_count_change: i32,
/// Last policy id copied from the runtime status.
pub last_policy_id: u32,
}
/// C-layout sample-lost status of a data reader.
///
/// As filled by `zerodds_dr_get_sample_lost_status` in this file, only
/// `total_count` carries runtime data.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsSampleLostStatus {
/// Lost-sample count reported by the runtime.
pub total_count: i32,
/// Always `0` as produced by this file.
pub total_count_change: i32,
}
/// C-layout sample-rejected status of a data reader.
///
/// Filled by `zerodds_dr_get_sample_rejected_status` in this file.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ZeroDdsSampleRejectedStatus {
/// Total count copied from the runtime status.
pub total_count: i32,
/// Count change copied from the runtime status.
pub total_count_change: i32,
/// Rejection reason: `0` not rejected, `1` instances limit, `2` samples
/// limit, `3` samples-per-instance limit.
pub last_reason: u32,
/// Always `0` as produced by this file.
pub last_instance_handle: u64,
}
/// Writes the reader's liveliness-changed status into `out`.
///
/// The alive and not-alive counts come from the runtime; the change counters
/// and the publication handle are always reported as zero. Returns
/// `BadParameter` for null arguments, `Ok` otherwise.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_liveliness_changed_status(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsLivelinessChangedStatus,
) -> c_int {
    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let reader = unsafe { &*dr };
    // The first tuple element is not exposed through the C struct.
    let (_alive, alive_total, not_alive_total) =
        reader.rt.user_reader_liveliness_status(reader.eid);
    let status = ZeroDdsLivelinessChangedStatus {
        alive_count: alive_total as i32,
        not_alive_count: not_alive_total as i32,
        alive_count_change: 0,
        not_alive_count_change: 0,
        last_publication_handle: 0,
    };
    // SAFETY: `out` was checked for null and is writable per the contract.
    unsafe { out.write(status) };
    ZeroDdsStatus::Ok as c_int
}
/// Writes the reader's subscription-matched status into `out`.
///
/// Both `total_count` and `current_count` are set to the runtime's current
/// matched count; the remaining fields are zero. Returns `BadParameter` for
/// null arguments, `Ok` otherwise.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_subscription_matched_status(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsSubscriptionMatchedStatus,
) -> c_int {
    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let reader = unsafe { &*dr };
    let matched = reader.rt.user_reader_matched_count(reader.eid) as i32;
    let status = ZeroDdsSubscriptionMatchedStatus {
        total_count: matched,
        total_count_change: 0,
        current_count: matched,
        current_count_change: 0,
        last_publication_handle: 0,
    };
    // SAFETY: `out` was checked for null and is writable per the contract.
    unsafe { out.write(status) };
    ZeroDdsStatus::Ok as c_int
}
/// Writes the reader's requested-deadline-missed status into `out`.
///
/// `total_count` comes from the runtime; the change counter and instance
/// handle are always zero. Returns `BadParameter` for null arguments, `Ok`
/// otherwise.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_requested_deadline_missed_status(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsRequestedDeadlineMissedStatus,
) -> c_int {
    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let reader = unsafe { &*dr };
    let missed = reader.rt.user_reader_requested_deadline_missed(reader.eid);
    let status = ZeroDdsRequestedDeadlineMissedStatus {
        total_count: missed as i32,
        total_count_change: 0,
        last_instance_handle: 0,
    };
    // SAFETY: `out` was checked for null and is writable per the contract.
    unsafe { out.write(status) };
    ZeroDdsStatus::Ok as c_int
}
/// Writes the reader's requested-incompatible-QoS status into `out`.
///
/// All three fields are copied from the runtime's status value. Returns
/// `BadParameter` for null arguments, `Ok` otherwise.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_requested_incompatible_qos_status(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsRequestedIncompatibleQosStatus,
) -> c_int {
    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let reader = unsafe { &*dr };
    let runtime_status = reader.rt.user_reader_requested_incompatible_qos(reader.eid);
    let status = ZeroDdsRequestedIncompatibleQosStatus {
        total_count: runtime_status.total_count,
        total_count_change: runtime_status.total_count_change,
        last_policy_id: runtime_status.last_policy_id,
    };
    // SAFETY: `out` was checked for null and is writable per the contract.
    unsafe { out.write(status) };
    ZeroDdsStatus::Ok as c_int
}
/// Writes the reader's sample-lost status into `out`.
///
/// `total_count` comes from the runtime; the change counter is always zero.
/// Returns `BadParameter` for null arguments, `Ok` otherwise.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_sample_lost_status(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsSampleLostStatus,
) -> c_int {
    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let reader = unsafe { &*dr };
    let lost = reader.rt.user_reader_sample_lost(reader.eid);
    let status = ZeroDdsSampleLostStatus {
        total_count: lost as i32,
        total_count_change: 0,
    };
    // SAFETY: `out` was checked for null and is writable per the contract.
    unsafe { out.write(status) };
    ZeroDdsStatus::Ok as c_int
}
/// Writes the reader's sample-rejected status into `out`.
///
/// Counts are copied from the runtime status. The rejection reason is mapped
/// to `0` (not rejected), `1` (instances limit), `2` (samples limit) or `3`
/// (samples-per-instance limit). The instance handle is always zero.
/// Returns `BadParameter` for null arguments, `Ok` otherwise.
///
/// # Safety
/// `dr` must be null or point to a live reader; `out` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn zerodds_dr_get_sample_rejected_status(
    dr: *mut ZeroDdsDataReader,
    out: *mut ZeroDdsSampleRejectedStatus,
) -> c_int {
    use zerodds_dcps::status::SampleRejectedStatusKind as Reason;

    if dr.is_null() || out.is_null() {
        return ZeroDdsStatus::BadParameter as c_int;
    }
    // SAFETY: `dr` was checked for null; validity is the caller's contract.
    let reader = unsafe { &*dr };
    let runtime_status = reader.rt.user_reader_sample_rejected(reader.eid);
    let last_reason: u32 = match runtime_status.last_reason {
        Reason::NotRejected => 0,
        Reason::RejectedByInstancesLimit => 1,
        Reason::RejectedBySamplesLimit => 2,
        Reason::RejectedBySamplesPerInstanceLimit => 3,
    };
    let status = ZeroDdsSampleRejectedStatus {
        total_count: runtime_status.total_count,
        total_count_change: runtime_status.total_count_change,
        last_reason,
        last_instance_handle: 0,
    };
    // SAFETY: `out` was checked for null and is writable per the contract.
    unsafe { out.write(status) };
    ZeroDdsStatus::Ok as c_int
}
// Never called. Its signature mentions `DataReaderQos` and `mpsc::Receiver`,
// presumably so those imports count as used — TODO confirm before removing.
#[allow(dead_code)]
fn _suppress(_: DataReaderQos, _: mpsc::Receiver<UserSample>) {}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::factory_ffi::{
        zerodds_dpf_create_participant, zerodds_dpf_delete_participant, zerodds_dpf_get_instance,
    };
    use crate::participant_ffi::{
        zerodds_dp_create_subscriber, zerodds_dp_create_topic, zerodds_dp_delete_contained_entities,
    };

    /// Creates a participant on `domain` with one subscriber and the topic
    /// `SubT` of type `SubTy`.
    fn mk(
        domain: u32,
    ) -> (
        *mut ZeroDdsDomainParticipant,
        *mut ZeroDdsSubscriber,
        *mut ZeroDdsTopic,
    ) {
        let factory = zerodds_dpf_get_instance();
        let topic_name = c"SubT";
        let type_name = c"SubTy";
        let participant = unsafe { zerodds_dpf_create_participant(factory, domain, ptr::null()) };
        let subscriber = unsafe { zerodds_dp_create_subscriber(participant, ptr::null()) };
        let topic = unsafe {
            zerodds_dp_create_topic(
                participant,
                topic_name.as_ptr(),
                type_name.as_ptr(),
                ptr::null(),
            )
        };
        (participant, subscriber, topic)
    }

    /// Deletes everything the participant contains, then the participant.
    fn cleanup(p: *mut ZeroDdsDomainParticipant) {
        let factory = zerodds_dpf_get_instance();
        unsafe {
            zerodds_dp_delete_contained_entities(p);
            zerodds_dpf_delete_participant(factory, p);
        }
    }

    /// A sample array with all pointers null and a count of zero.
    fn empty_sample_array() -> ZeroDdsSampleArray {
        ZeroDdsSampleArray {
            buffers: ptr::null_mut(),
            lengths: ptr::null_mut(),
            infos: ptr::null_mut(),
            count: 0,
            loan_token: ptr::null_mut(),
        }
    }

    #[test]
    fn create_delete_datareader() {
        let (p, sub, t) = mk(51);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        assert!(!reader.is_null());
        let status = unsafe { zerodds_sub_delete_datareader(sub, reader) };
        assert_eq!(status, ZeroDdsStatus::Ok as c_int);
        cleanup(p);
    }

    #[test]
    fn lookup_datareader_finds_existing() {
        let (p, sub, t) = mk(52);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        let topic_name = c"SubT";
        let found = unsafe { zerodds_sub_lookup_datareader(sub, topic_name.as_ptr()) };
        assert_eq!(found, reader);
        cleanup(p);
    }

    #[test]
    fn take_on_empty_returns_no_data() {
        let (p, sub, t) = mk(53);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        let mut arr = empty_sample_array();
        let status = unsafe { zerodds_dr_take(reader, &mut arr, 10, 0, 0, 0) };
        assert_eq!(status, ZeroDdsStatus::NoData as c_int);
        assert_eq!(arr.count, 0);
        cleanup(p);
    }

    #[test]
    fn return_loan_clears_array() {
        let (p, sub, t) = mk(54);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        // A stale count with no token must still be reset by return_loan.
        let mut arr = ZeroDdsSampleArray {
            count: 7,
            ..empty_sample_array()
        };
        let status = unsafe { zerodds_dr_return_loan(reader, &mut arr) };
        assert_eq!(status, ZeroDdsStatus::Ok as c_int);
        assert_eq!(arr.count, 0);
        cleanup(p);
    }

    #[test]
    fn statuses_default_ok() {
        let (p, sub, t) = mk(55);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        let ok = ZeroDdsStatus::Ok as c_int;

        let mut liveliness = ZeroDdsLivelinessChangedStatus::default();
        let mut matched = ZeroDdsSubscriptionMatchedStatus::default();
        let mut deadline = ZeroDdsRequestedDeadlineMissedStatus::default();
        let mut incompatible = ZeroDdsRequestedIncompatibleQosStatus::default();
        let mut lost = ZeroDdsSampleLostStatus::default();
        let mut rejected = ZeroDdsSampleRejectedStatus::default();

        assert_eq!(
            unsafe { zerodds_dr_get_liveliness_changed_status(reader, &mut liveliness) },
            ok
        );
        assert_eq!(
            unsafe { zerodds_dr_get_subscription_matched_status(reader, &mut matched) },
            ok
        );
        assert_eq!(
            unsafe { zerodds_dr_get_requested_deadline_missed_status(reader, &mut deadline) },
            ok
        );
        assert_eq!(
            unsafe { zerodds_dr_get_requested_incompatible_qos_status(reader, &mut incompatible) },
            ok
        );
        assert_eq!(
            unsafe { zerodds_dr_get_sample_lost_status(reader, &mut lost) },
            ok
        );
        assert_eq!(
            unsafe { zerodds_dr_get_sample_rejected_status(reader, &mut rejected) },
            ok
        );
        cleanup(p);
    }

    #[test]
    fn cft_filter_active_passes_untyped_samples() {
        use crate::participant_ffi::{
            zerodds_dp_create_contentfilteredtopic, zerodds_dp_delete_contentfilteredtopic,
        };
        let (p, sub, t) = mk(58);
        let cft_name = c"FilteredCFT";
        let expr = c"name = 'foo'";
        let cft = unsafe {
            zerodds_dp_create_contentfilteredtopic(
                p,
                cft_name.as_ptr(),
                t,
                expr.as_ptr(),
                ptr::null(),
                0,
            )
        };
        assert!(!cft.is_null());
        let reader = unsafe { zerodds_sub_create_datareader_with_cft(sub, cft, ptr::null()) };
        assert!(!reader.is_null(), "CFT-bound reader must be created");
        let mut arr = empty_sample_array();
        let status = unsafe { zerodds_dr_take(reader, &mut arr, 10, 0, 0, 0) };
        assert_eq!(status, ZeroDdsStatus::NoData as c_int);
        unsafe { zerodds_dp_delete_contentfilteredtopic(p, cft) };
        cleanup(p);
    }

    #[test]
    fn cft_with_invalid_expression_returns_null() {
        use crate::participant_ffi::zerodds_dp_create_contentfilteredtopic;
        let (p, sub, t) = mk(59);
        let cft_name = c"BadCFT";
        let bad_expr = c"$$invalid syntax";
        let cft = unsafe {
            zerodds_dp_create_contentfilteredtopic(
                p,
                cft_name.as_ptr(),
                t,
                bad_expr.as_ptr(),
                ptr::null(),
                0,
            )
        };
        assert!(!cft.is_null());
        let reader = unsafe { zerodds_sub_create_datareader_with_cft(sub, cft, ptr::null()) };
        assert!(reader.is_null(), "invalid filter syntax must reject");
        cleanup(p);
    }

    #[test]
    fn read_on_empty_returns_no_data() {
        let (p, sub, t) = mk(57);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        let mut arr = empty_sample_array();
        let status = unsafe { zerodds_dr_read(reader, &mut arr, 10, 0, 0, 0) };
        assert_eq!(status, ZeroDdsStatus::NoData as c_int);
        cleanup(p);
    }

    #[test]
    fn dr_get_topicdescription_subscriber_roundtrip() {
        let (p, sub, t) = mk(56);
        let reader = unsafe { zerodds_sub_create_datareader(sub, t, ptr::null()) };
        assert_eq!(unsafe { zerodds_dr_get_topicdescription(reader) }, t);
        assert_eq!(unsafe { zerodds_dr_get_subscriber(reader) }, sub);
        cleanup(p);
    }
}