cdds-util 0.1.1

Cyclone DDS and some cyclone utils low-level Rust bindings.
Documentation
use std::mem::MaybeUninit;
use cdds_util::*;
use std::ffi::{CStr, CString};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;


const MAX_SAMPLES: usize = 32;

#[derive(Debug)]
pub enum MatchedEntity {
    DiscoveredPublication {
        topic_name: String,
        type_name: String,
        partition: Option<String>,
        qos: Arc<*mut dds_qos_t>
    },
    UndiscoveredPublication {
        topic_name: String,
        type_name: String,
        partition: Option<String>,
    },
    DiscoveredSubscription {
        topic_name: String,
        type_name: String,
        partition: Option<String>,
        qos: Arc<*mut dds_qos_t>
    },
    UndiscoveredSubscription {
        topic_name: String,
        type_name: String,
        partition: Option<String>,
    },
}

unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
    let btx = Box::from_raw(arg as *mut (bool, Sender<MatchedEntity>));

    #[allow(clippy::uninit_assumed_init)]
    let mut si: [dds_sample_info_t; MAX_SAMPLES] = { MaybeUninit::uninit().assume_init() };
    let mut samples: [*mut ::std::os::raw::c_void; MAX_SAMPLES] =
        [std::ptr::null_mut(); MAX_SAMPLES as usize];
    samples[0] = std::ptr::null_mut();

    let n = dds_take(
        dr,
        samples.as_mut_ptr() as *mut *mut libc::c_void,
        si.as_mut_ptr() as *mut dds_sample_info_t,
        MAX_SAMPLES as u64,
        MAX_SAMPLES as u32,
    );
    for i in 0..n {
        if si[i as usize].valid_data {
            let sample = samples[i as usize] as *mut dds_builtintopic_endpoint_t;
            let topic_name = CStr::from_ptr((*sample).topic_name).to_str().unwrap();
            let type_name = CStr::from_ptr((*sample).type_name).to_str().unwrap();
            let mut n = 0u32;
            let mut ps: *mut *mut ::std::os::raw::c_char = std::ptr::null_mut();
            let qos = dds_create_qos();
            dds_copy_qos(qos, (*sample).qos);
            let bqos = Arc::new(qos);

            let _ = dds_qget_partition(
                (*sample).qos,
                &mut n as *mut u32,
                &mut ps as *mut *mut *mut ::std::os::raw::c_char,
            );
            if n > 0 {
                for k in 0..n {
                    let p = CStr::from_ptr(*(ps.offset(k as isize))).to_str().unwrap();
                    if si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE {
                        if btx.0 {
                            (btx.1)
                                .send(MatchedEntity::DiscoveredPublication {
                                    topic_name: String::from(topic_name),
                                    type_name: String::from(type_name),
                                    partition: Some(String::from(p)),
                                    qos: bqos.clone()
                                })
                                .unwrap();
                        } else {
                            (btx.1)
                                .send(MatchedEntity::DiscoveredSubscription {
                                    topic_name: String::from(topic_name),
                                    type_name: String::from(type_name),
                                    partition: Some(String::from(p)),
                                    qos: bqos.clone()
                                })
                                .unwrap();
                        }
                    } else if btx.0 {
                        (btx.1)
                            .send(MatchedEntity::UndiscoveredPublication {
                                topic_name: String::from(topic_name),
                                type_name: String::from(type_name),
                                partition: Some(String::from(p)),
                            })
                            .unwrap();
                    } else {
                        (btx.1)
                            .send(MatchedEntity::UndiscoveredSubscription {
                                topic_name: String::from(topic_name),
                                type_name: String::from(type_name),
                                partition: Some(String::from(p)),
                            })
                            .unwrap();
                    }
                }
            } else if si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE {
                if btx.0 {
                    (btx.1)
                        .send(MatchedEntity::DiscoveredPublication {
                            topic_name: String::from(topic_name),
                            type_name: String::from(type_name),
                            partition: None,
                            qos: bqos.clone()
                        })
                        .unwrap();
                } else {
                    (btx.1)
                        .send(MatchedEntity::DiscoveredSubscription {
                            topic_name: String::from(topic_name),
                            type_name: String::from(type_name),
                            partition: None,
                            qos: bqos.clone()
                        })
                        .unwrap();
                }
            } else if btx.0 {
                (btx.1)
                    .send(MatchedEntity::UndiscoveredPublication {
                        topic_name: String::from(topic_name),
                        type_name: String::from(type_name),
                        partition: None,
                    })
                    .unwrap();
            } else {
                (btx.1)
                    .send(MatchedEntity::UndiscoveredSubscription {
                        topic_name: String::from(topic_name),
                        type_name: String::from(type_name),
                        partition: None,
                    })
                    .unwrap();
            }
        }
    }
    dds_return_loan(
        dr,
        samples.as_mut_ptr() as *mut *mut libc::c_void,
        MAX_SAMPLES as i32,
    );
    Box::into_raw(btx);
}
fn main() {
    unsafe {
        let (tx, rx): (Sender<MatchedEntity>, Receiver<MatchedEntity>) = channel();
        let ptx = Box::new((true, tx.clone()));
        let stx = Box::new((false, tx));
        let dp = dds_create_participant(DDS_DOMAIN_DEFAULT, std::ptr::null(), std::ptr::null());
        let pub_listener = dds_create_listener(Box::into_raw(ptx) as *mut std::os::raw::c_void);
        dds_lset_data_available(pub_listener, Some(on_data));

        let _pr = dds_create_reader(
            dp,
            DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
            std::ptr::null(),
            pub_listener,
        );

        let sub_listener = dds_create_listener(Box::into_raw(stx) as *mut std::os::raw::c_void);
        dds_lset_data_available(sub_listener, Some(on_data));
        let _sr = dds_create_reader(
            dp,
            DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION,
            std::ptr::null(),
            sub_listener,
        );
        while let Ok(me) = rx.recv() {
            println!("{:?}", me);
        }
    }
}