use dust_dds::{
domain::domain_participant_factory::DomainParticipantFactory,
infrastructure::{
qos::{DataReaderQos, DataWriterQos, QosKind},
qos_policy::{ReliabilityQosPolicy, ReliabilityQosPolicyKind},
sample_info::{ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE},
status::{NO_STATUS, StatusKind},
time::{Duration, DurationKind},
type_support::DdsType,
},
listener::NO_LISTENER,
wait_set::{Condition, WaitSet},
};
use tracing::Level;
use tracing_subscriber::{FmtSubscriber, fmt::format::FmtSpan};
#[derive(Debug, Clone, PartialEq, DdsType)]
struct Data {
#[dust_dds(key)]
id: u8,
value: Vec<u8>,
}
fn main() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG)
.with_span_events(FmtSpan::ACTIVE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let domain_id = 0;
let participant = DomainParticipantFactory::get_instance()
.create_participant(domain_id, QosKind::Default, NO_LISTENER, NO_STATUS)
.unwrap();
let topic = participant
.create_topic::<Data>(
"DataTopic",
"Data",
QosKind::Default,
NO_LISTENER,
NO_STATUS,
)
.unwrap();
let publisher = participant
.create_publisher(QosKind::Default, NO_LISTENER, NO_STATUS)
.unwrap();
let writer_qos = DataWriterQos {
reliability: ReliabilityQosPolicy {
kind: ReliabilityQosPolicyKind::BestEffort,
max_blocking_time: DurationKind::Finite(Duration::new(1, 0)),
},
..Default::default()
};
let writer = publisher
.create_datawriter(
&topic,
QosKind::Specific(writer_qos),
NO_LISTENER,
NO_STATUS,
)
.unwrap();
let subscriber = participant
.create_subscriber(QosKind::Default, NO_LISTENER, NO_STATUS)
.unwrap();
let reader_qos = DataReaderQos {
reliability: ReliabilityQosPolicy {
kind: ReliabilityQosPolicyKind::BestEffort,
max_blocking_time: DurationKind::Finite(Duration::new(1, 0)),
},
..Default::default()
};
let reader = subscriber
.create_datareader::<Data>(
&topic,
QosKind::Specific(reader_qos),
NO_LISTENER,
NO_STATUS,
)
.unwrap();
let cond = writer.get_statuscondition();
cond.set_enabled_statuses(&[StatusKind::PublicationMatched])
.unwrap();
let mut wait_set = WaitSet::new();
wait_set
.attach_condition(Condition::StatusCondition(cond))
.unwrap();
wait_set.wait(Duration::new(10, 0)).unwrap();
let data = Data {
id: 1,
value: vec![8; 15000],
};
writer.write(data.clone(), None).unwrap();
let cond = reader.get_statuscondition();
cond.set_enabled_statuses(&[StatusKind::DataAvailable])
.unwrap();
let mut reader_wait_set = WaitSet::new();
reader_wait_set
.attach_condition(Condition::StatusCondition(cond))
.unwrap();
reader_wait_set.wait(Duration::new(10, 0)).unwrap();
let samples = reader
.take(3, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE)
.unwrap();
assert_eq!(samples.len(), 1);
assert_eq!(samples[0].data.as_ref().unwrap(), &data);
}