use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec::Vec;
use std::sync::{Mutex, mpsc};
use zerodds_dcps::factory::DomainParticipantFactoryQos;
use zerodds_dcps::instance_tracker::InstanceTracker;
use zerodds_dcps::participant::DomainParticipant;
use zerodds_dcps::qos::{
DataReaderQos, DataWriterQos, DomainParticipantQos, OwnershipKind, PublisherQos, SubscriberQos,
TopicQos,
};
use zerodds_dcps::runtime::{DcpsRuntime, UserSample};
use zerodds_rtps::wire_types::EntityId;
pub struct ZeroDdsDomainParticipantFactory {
pub default_participant_qos: Mutex<DomainParticipantQos>,
pub factory_qos: Mutex<DomainParticipantFactoryQos>,
pub participants: Mutex<Vec<*mut ZeroDdsDomainParticipant>>,
}
unsafe impl Send for ZeroDdsDomainParticipantFactory {}
unsafe impl Sync for ZeroDdsDomainParticipantFactory {}
impl ZeroDdsDomainParticipantFactory {
pub fn instance() -> &'static Self {
use std::sync::OnceLock;
static FACTORY: OnceLock<ZeroDdsDomainParticipantFactory> = OnceLock::new();
FACTORY.get_or_init(|| Self {
default_participant_qos: Mutex::new(DomainParticipantQos::default()),
factory_qos: Mutex::new(DomainParticipantFactoryQos::default()),
participants: Mutex::new(Vec::new()),
})
}
}
pub struct ZeroDdsDomainParticipant {
pub dp: DomainParticipant,
pub rt: Option<Arc<DcpsRuntime>>,
pub domain_id: u32,
pub default_topic_qos: Mutex<TopicQos>,
pub default_publisher_qos: Mutex<PublisherQos>,
pub default_subscriber_qos: Mutex<SubscriberQos>,
pub topics: Mutex<Vec<*mut ZeroDdsTopic>>,
pub publishers: Mutex<Vec<*mut ZeroDdsPublisher>>,
pub subscribers: Mutex<Vec<*mut ZeroDdsSubscriber>>,
pub default_pub_partition_out: Mutex<crate::qos_ffi::PartitionOutCache>,
pub default_sub_partition_out: Mutex<crate::qos_ffi::PartitionOutCache>,
}
unsafe impl Send for ZeroDdsDomainParticipant {}
unsafe impl Sync for ZeroDdsDomainParticipant {}
pub struct ZeroDdsTopic {
pub participant: *mut ZeroDdsDomainParticipant,
pub name: String,
pub type_name: String,
pub qos: Mutex<TopicQos>,
}
unsafe impl Send for ZeroDdsTopic {}
unsafe impl Sync for ZeroDdsTopic {}
pub struct ZeroDdsContentFilteredTopic {
pub participant: *mut ZeroDdsDomainParticipant,
pub related_topic: *mut ZeroDdsTopic,
pub name: String,
pub filter_expression: String,
pub parameters: Mutex<Vec<String>>,
pub schema: Mutex<Vec<CftField>>,
pub extensibility: Mutex<CftExtensibility>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CftExtensibility {
#[default]
Final,
Appendable,
}
unsafe impl Send for ZeroDdsContentFilteredTopic {}
unsafe impl Sync for ZeroDdsContentFilteredTopic {}
pub struct ZeroDdsPublisher {
pub participant: *mut ZeroDdsDomainParticipant,
pub qos: Mutex<PublisherQos>,
pub default_dw_qos: Mutex<DataWriterQos>,
pub datawriters: Mutex<Vec<*mut ZeroDdsDataWriter>>,
pub suspended: Mutex<bool>,
pub partition_out: Mutex<crate::qos_ffi::PartitionOutCache>,
}
unsafe impl Send for ZeroDdsPublisher {}
unsafe impl Sync for ZeroDdsPublisher {}
pub struct ZeroDdsSubscriber {
pub participant: *mut ZeroDdsDomainParticipant,
pub qos: Mutex<SubscriberQos>,
pub default_dr_qos: Mutex<DataReaderQos>,
pub datareaders: Mutex<Vec<*mut ZeroDdsDataReader>>,
pub partition_out: Mutex<crate::qos_ffi::PartitionOutCache>,
}
unsafe impl Send for ZeroDdsSubscriber {}
unsafe impl Sync for ZeroDdsSubscriber {}
pub struct ZeroDdsDataWriter {
pub publisher: *mut ZeroDdsPublisher,
pub topic: *mut ZeroDdsTopic,
pub rt: Arc<DcpsRuntime>,
pub eid: EntityId,
pub qos: Mutex<DataWriterQos>,
pub partition_out: Mutex<crate::qos_ffi::PartitionOutCache>,
}
unsafe impl Send for ZeroDdsDataWriter {}
unsafe impl Sync for ZeroDdsDataWriter {}
pub struct ZeroDdsDataReader {
pub subscriber: *mut ZeroDdsSubscriber,
pub topic: *mut ZeroDdsTopic,
pub rt: Arc<DcpsRuntime>,
pub eid: EntityId,
pub qos: Mutex<DataReaderQos>,
pub rx: Mutex<mpsc::Receiver<UserSample>>,
pub read_cache: Mutex<Vec<(UserSample, ReadSampleState)>>,
pub cft_filter: Option<CftFilter>,
pub ownership: OwnershipKind,
pub instances: InstanceTracker,
pub partition_out: Mutex<crate::qos_ffi::PartitionOutCache>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CftFieldKind {
Bool,
Int32,
Int64,
Float32,
Float64,
StringField,
}
#[derive(Debug, Clone)]
pub struct CftField {
pub name: String,
pub kind: CftFieldKind,
}
pub struct CftFilter {
pub expr: zerodds_sql_filter::Expr,
pub params: Vec<zerodds_sql_filter::Value>,
pub schema: Vec<CftField>,
pub extensibility: CftExtensibility,
}
struct CdrRow<'a> {
payload: &'a [u8],
schema: &'a [CftField],
extensibility: CftExtensibility,
}
impl zerodds_sql_filter::RowAccess for CdrRow<'_> {
fn get(&self, path: &str) -> Option<zerodds_sql_filter::Value> {
use zerodds_cdr::{BufferReader, Endianness};
use zerodds_sql_filter::Value;
let mut r = BufferReader::new(self.payload, Endianness::Little).xcdr2();
if matches!(self.extensibility, CftExtensibility::Appendable) {
r.read_u32().ok()?;
}
for field in self.schema {
let want = field.name == path;
match field.kind {
CftFieldKind::Bool => {
let v = r.read_u8().ok()?;
if want {
return Some(Value::Bool(v != 0));
}
}
CftFieldKind::Int32 => {
let v = r.read_u32().ok()? as i32;
if want {
return Some(Value::Int(v as i64));
}
}
CftFieldKind::Int64 => {
let v = r.read_u64().ok()? as i64;
if want {
return Some(Value::Int(v));
}
}
CftFieldKind::Float32 => {
let v = f32::from_bits(r.read_u32().ok()?);
if want {
return Some(Value::Float(v as f64));
}
}
CftFieldKind::Float64 => {
let v = f64::from_bits(r.read_u64().ok()?);
if want {
return Some(Value::Float(v));
}
}
CftFieldKind::StringField => {
let v = r.read_string().ok()?;
if want {
return Some(Value::String(v));
}
}
}
}
None
}
}
impl CftFilter {
pub fn evaluate(&self, payload: &[u8]) -> bool {
if self.schema.is_empty() {
struct EmptyRow;
impl zerodds_sql_filter::RowAccess for EmptyRow {
fn get(&self, _path: &str) -> Option<zerodds_sql_filter::Value> {
None
}
}
return self.expr.evaluate(&EmptyRow, &self.params).unwrap_or(true);
}
let row = CdrRow {
payload,
schema: &self.schema,
extensibility: self.extensibility,
};
self.expr.evaluate(&row, &self.params).unwrap_or(false)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadSampleState {
NotRead,
Read,
}
unsafe impl Send for ZeroDdsDataReader {}
unsafe impl Sync for ZeroDdsDataReader {}
pub unsafe fn handle_ref<T>(p: *mut T) -> Option<&'static T> {
if p.is_null() {
None
} else {
Some(unsafe { &*p })
}
}