use std::collections::HashMap;
use std::ffi::CString;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use zenrc_dds::{dds_attach_t, dds_domainid_t, dds_entity_t, RawMessageBridge, DDS_ANY_STATE};
use crate::dds::service;
use super::error::{check_entity, check_ret, DdsError, Result};
use super::publisher::Publisher;
use super::qos::Qos;
use super::subscriber::Subscription;
use super::topic::Topic;
pub const DOMAIN_DEFAULT: u32 = u32::MAX;
#[derive(Clone)]
pub struct DomainParticipant {
entity: dds_entity_t,
}
impl DomainParticipant {
pub fn new(domain_id: u32) -> Result<Self> {
Self::new_with_qos(domain_id, None)
}
pub fn new_with_qos(domain_id: u32, qos: Option<&Qos>) -> Result<Self> {
let qos_ptr = qos.map(|q| q.raw as *const _).unwrap_or(std::ptr::null());
let entity = unsafe {
zenrc_dds::dds_create_participant(domain_id as dds_domainid_t, qos_ptr, std::ptr::null())
};
let entity = check_entity(entity)?;
Ok(Self {
entity
})
}
pub fn domain_id(&self) -> Result<u32> {
let mut id: dds_domainid_t = 0;
super::error::check_ret(unsafe {
zenrc_dds::dds_get_domainid(self.entity, &mut id)
})?;
Ok(id)
}
pub fn create_topic<T: RawMessageBridge>(&self, name: &str) -> Result<Topic<T>> {
self.create_topic_with_qos(name, &Qos::default())
}
pub fn create_topic_with_qos<T: RawMessageBridge>(
&self,
name: &str,
qos: &Qos,
) -> Result<Topic<T>> {
let c_name = CString::new(name)?;
let entity = unsafe {
zenrc_dds::dds_create_topic(
self.entity,
T::descriptor(),
c_name.as_ptr(),
qos.raw as *const _,
std::ptr::null(),
)
};
let entity = check_entity(entity)?;
Ok(Topic::from_entity(entity))
}
pub fn create_publisher<T: RawMessageBridge>(
&self,
topic_name: &str,
qos: Qos,
) -> Result<Publisher<T>> {
let topic = self.create_topic_with_qos::<T>(topic_name, &qos)?;
let writer = unsafe {
zenrc_dds::dds_create_writer(
self.entity,
topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
};
let writer = check_entity(writer)?;
Ok(Publisher::new(writer, topic))
}
pub fn create_subscriber<T: RawMessageBridge>(
&self,
topic_name: &str,
qos: Qos,
) -> Result<Subscription<T>> {
let topic = self.create_topic_with_qos::<T>(topic_name, &qos)?;
let sub = unsafe {
zenrc_dds::dds_create_subscriber(
self.entity,
qos.raw as *const _,
std::ptr::null(),
)
};
let reader = unsafe {
zenrc_dds::dds_create_reader(
sub,
topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
};
let reader = check_entity(reader)?;
Ok(Subscription::new(reader, topic))
}
pub fn entity(&self) -> dds_entity_t {
self.entity
}
pub fn lookup_participants(domain_id: u32) -> Result<Vec<dds_entity_t>> {
const MAX: usize = 64;
let mut buf = vec![0i32; MAX];
let ret = unsafe {
zenrc_dds::dds_lookup_participant(domain_id as dds_domainid_t, buf.as_mut_ptr(), MAX)
};
let n = super::error::check_entity(ret)? as usize;
buf.truncate(n);
Ok(buf)
}
}
const WAKE_TOKEN: dds_attach_t = 0;
const POLL_TIMEOUT_NS: i64 = 100_000_000;
const MAX_TRIGGERS: usize = 64;
struct ReaderEntry {
readcond: dds_entity_t,
notify: Arc<tokio::sync::Notify>,
}
pub struct DdsContext {
participant: DomainParticipant,
waitset: dds_entity_t,
guard: dds_entity_t,
running: Arc<AtomicBool>,
pending: Arc<Mutex<Vec<(dds_entity_t, Arc<tokio::sync::Notify>)>>>,
thread: Option<thread::JoinHandle<()>>,
}
unsafe impl Send for DdsContext {}
unsafe impl Sync for DdsContext {}
impl DdsContext {
pub fn new(domain_id: u32) -> Result<Self> {
Self::new_with_qos(domain_id, None)
}
pub fn new_with_qos(domain_id: u32, qos: Option<&Qos>) -> Result<Self> {
let participant = DomainParticipant::new_with_qos(domain_id, qos)?;
let participant_entity = participant.entity();
let ws = check_entity(unsafe { zenrc_dds::dds_create_waitset(participant_entity) })?;
let guard = match check_entity(unsafe {
zenrc_dds::dds_create_guardcondition(participant_entity)
}) {
Ok(g) => g,
Err(e) => {
unsafe { zenrc_dds::dds_delete(ws) };
return Err(e);
}
};
if let Err(e) = check_ret(unsafe {
zenrc_dds::dds_waitset_attach(ws, guard, WAKE_TOKEN)
}) {
unsafe { zenrc_dds::dds_delete(guard) };
unsafe { zenrc_dds::dds_delete(ws) };
return Err(e);
}
let running = Arc::new(AtomicBool::new(true));
let pending = Arc::new(Mutex::new(Vec::<(dds_entity_t, Arc<tokio::sync::Notify>)>::new()));
let handle = {
let running = Arc::clone(&running);
let pending = Arc::clone(&pending);
thread::Builder::new()
.name("dds-context".into())
.spawn(move || {
context_loop(
ws,
guard,
running,
pending,
)
})
.map_err(|e| DdsError::RetCode(-1, format!("创建上下文线程失败: {e}")))?
};
Ok(Self {
participant,
waitset: ws,
guard,
running,
pending,
thread: Some(handle),
})
}
pub fn domain_id(&self) -> Result<u32> {
self.participant.domain_id()
}
pub fn entity(&self) -> dds_entity_t {
self.participant.entity()
}
pub fn lookup_participants(domain_id: u32) -> Result<Vec<dds_entity_t>> {
DomainParticipant::lookup_participants(domain_id)
}
pub fn create_topic<T: RawMessageBridge>(&self, name: &str) -> Result<Topic<T>> {
self.participant.create_topic(name)
}
pub fn create_topic_with_qos<T: RawMessageBridge>(
&self,
name: &str,
qos: &Qos,
) -> Result<Topic<T>> {
self.participant.create_topic_with_qos(name, qos)
}
pub fn create_publisher<T: RawMessageBridge>(
&self,
topic_name: &str,
qos: Qos,
) -> Result<Publisher<T>> {
self.participant.create_publisher(topic_name, qos)
}
pub fn create_subscriber<T: RawMessageBridge>(
&self,
topic_name: &str,
qos: Qos,
) -> Result<Subscription<T>> {
let topic = self.participant.create_topic_with_qos::<T>(topic_name, &qos)?;
let sub = unsafe {
zenrc_dds::dds_create_subscriber(
self.participant.entity(),
qos.raw as *const _,
std::ptr::null(),
)
};
let reader = check_entity(unsafe {
zenrc_dds::dds_create_reader(
sub,
topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
})?;
let mut subscription = Subscription::new(reader, topic);
subscription.with_context(self);
Ok(subscription)
}
pub fn create_service<Req, Res>(
&self,
service_name: &str,
qos: Qos,
) -> Result<super::service::ServiceServer<Req, Res>>
where
Req: RawMessageBridge,
Res: RawMessageBridge,
{
let participant_entity = self.participant.entity;
let bare = service_name.trim_start_matches('/');
let req_name = format!("rq/{}Request", bare);
let res_name = format!("rr/{}Reply", bare);
let req_topic = self.create_topic_with_qos::<Req>(&req_name, &qos)?;
let res_topic = self.create_topic_with_qos::<Res>(&res_name, &qos)?;
let reader = check_entity(unsafe {
zenrc_dds::dds_create_reader(
participant_entity,
req_topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
})?;
let writer = check_entity(unsafe {
zenrc_dds::dds_create_writer(
participant_entity,
res_topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
})?;
let mut service = service::ServiceServer::new(reader, writer, req_topic, res_topic);
service.with_context(self);
Ok(service)
}
pub fn create_client<Req: RawMessageBridge, Res: RawMessageBridge>(
&self,
service_name: &str,
qos: Qos,
) -> Result<super::service::ServiceClient<Req, Res>> {
let participant_entity = self.participant.entity();
let bare = service_name.trim_start_matches('/');
let req_name = format!("rq/{}Request", bare);
let res_name = format!("rr/{}Reply", bare);
let req_topic = self.create_topic_with_qos::<Req>(&req_name, &qos)?;
let res_topic = self.create_topic_with_qos::<Res>(&res_name, &qos)?;
let writer = check_entity(unsafe {
zenrc_dds::dds_create_writer(
participant_entity,
req_topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
})?;
let reader = check_entity(unsafe {
zenrc_dds::dds_create_reader(
participant_entity,
res_topic.entity,
qos.raw as *const _,
std::ptr::null(),
)
})?;
Ok(super::service::ServiceClient::new(
writer,
reader,
participant_entity,
req_topic,
res_topic,
))
}
pub(crate) fn attach(&self, reader: dds_entity_t) -> Arc<tokio::sync::Notify> {
let notify = Arc::new(tokio::sync::Notify::new());
self.pending.lock().unwrap().push((reader, Arc::clone(¬ify)));
unsafe { zenrc_dds::dds_set_guardcondition(self.guard, true) };
notify
}
}
impl Drop for DdsContext {
fn drop(&mut self) {
self.running.store(false, Ordering::Release);
unsafe { zenrc_dds::dds_set_guardcondition(self.guard, true) };
if let Some(handle) = self.thread.take() {
let _ = handle.join();
}
unsafe { zenrc_dds::dds_delete(self.waitset) };
unsafe { zenrc_dds::dds_delete(self.guard) };
}
}
fn context_loop(
waitset: dds_entity_t,
guard: dds_entity_t,
running: Arc<AtomicBool>,
pending: Arc<Mutex<Vec<(dds_entity_t, Arc<tokio::sync::Notify>)>>>,
) {
let mut readers: HashMap<isize, ReaderEntry> = HashMap::new();
while running.load(Ordering::Acquire) {
{
let new_readers: Vec<_> = pending.lock().unwrap().drain(..).collect();
for (reader, notify) in new_readers {
let token = reader as isize;
let readcond = match check_entity(unsafe {
zenrc_dds::dds_create_readcondition(reader, DDS_ANY_STATE)
}) {
Ok(rc) => rc,
Err(_) => continue,
};
if check_ret(unsafe {
zenrc_dds::dds_waitset_attach(waitset, readcond, token)
})
.is_err()
{
unsafe { zenrc_dds::dds_delete(readcond) };
continue;
}
readers.insert(token, ReaderEntry { readcond, notify });
}
}
{
let stale: Vec<isize> = readers
.iter()
.filter(|&(&token, _)| unsafe {
zenrc_dds::dds_get_parent(token as dds_entity_t) < 0
})
.map(|(&token, _)| token)
.collect();
for token in stale {
if let Some(entry) = readers.remove(&token) {
unsafe { zenrc_dds::dds_waitset_detach(waitset, entry.readcond) };
unsafe { zenrc_dds::dds_delete(entry.readcond) };
}
}
}
let mut xs: Vec<dds_attach_t> = vec![0; MAX_TRIGGERS];
let n = unsafe {
zenrc_dds::dds_waitset_wait(
waitset,
xs.as_mut_ptr(),
MAX_TRIGGERS,
POLL_TIMEOUT_NS,
)
};
if n < 0 {
break;
}
xs.truncate(n as usize);
for token in xs {
if token == WAKE_TOKEN {
unsafe { zenrc_dds::dds_set_guardcondition(guard, false) };
continue;
}
{
let notify = readers.get(&token).map(|e| Arc::clone(&e.notify));
if let Some(n) = notify {
n.notify_one();
}
}
}
}
for entry in readers.values() {
unsafe { zenrc_dds::dds_delete(entry.readcond) };
}
}