use std::time::Duration;
use serde::Serialize;
use crate::{
dds::{
data_types::GUID,
pubsub::Publisher,
qos::{HasQoSPolicy, QosPolicies},
topic::Topic,
traits::{dds_entity::DDSEntity, serde_adapters::no_key::SerializerAdapter},
values::result::Result,
with_key::datawriter as datawriter_with_key,
},
discovery::data_types::topic_data::SubscriptionBuiltinTopicData,
serialization::CDRSerializerAdapter,
structure::{entity::RTPSEntity, rpc::SampleIdentity, time::Timestamp},
};
use super::wrappers::{NoKeyWrapper, SAWrapper};
/// Convenience alias for a [`DataWriter`] whose serializer adapter is
/// `CDRSerializerAdapter<D>`.
pub type DataWriterCdr<D> = DataWriter<D, CDRSerializerAdapter<D>>;
/// Writer for samples of type `D` built on the `no_key` flavor of `SerializerAdapter`.
///
/// The struct holds nothing but a with-key `DataWriter` instantiated over
/// `NoKeyWrapper<D>` and `SAWrapper<SA>`; the methods and trait impls shown in this
/// file all forward to that inner writer.
///
/// `SA` selects the serializer adapter and defaults to `CDRSerializerAdapter<D>`.
pub struct DataWriter<D, SA = CDRSerializerAdapter<D>>
where
    D: Serialize,
    SA: SerializerAdapter<D>,
{
    // The wrapped with-key writer that every operation is delegated to.
    keyed_datawriter: datawriter_with_key::DataWriter<NoKeyWrapper<D>, SAWrapper<SA>>,
}
impl<D, SA> DataWriter<D, SA>
where
D: Serialize,
SA: SerializerAdapter<D>,
{
pub(crate) fn from_keyed(
keyed: datawriter_with_key::DataWriter<NoKeyWrapper<D>, SAWrapper<SA>>,
) -> Self {
Self {
keyed_datawriter: keyed,
}
}
pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> Result<()> {
self
.keyed_datawriter
.write(NoKeyWrapper::<D> { d: data }, source_timestamp)
}
pub fn write_with_options(
&self,
data: D,
write_options: datawriter_with_key::WriteOptions,
) -> Result<SampleIdentity> {
self
.keyed_datawriter
.write_with_options(NoKeyWrapper::<D> { d: data }, write_options)
}
pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> Result<bool> {
self.keyed_datawriter.wait_for_acknowledgments(max_wait)
}
pub fn topic(&self) -> &Topic {
self.keyed_datawriter.topic()
}
pub fn publisher(&self) -> &Publisher {
self.keyed_datawriter.publisher()
}
pub fn assert_liveliness(&self) -> Result<()> {
self.keyed_datawriter.assert_liveliness()
}
pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
self.keyed_datawriter.get_matched_subscriptions()
}
}
/// The GUID of this writer is the GUID of the with-key writer it wraps.
impl<D, SA> RTPSEntity for DataWriter<D, SA>
where
    D: Serialize,
    SA: SerializerAdapter<D>,
{
    /// Returns the value of the inner writer's `guid()`.
    fn guid(&self) -> GUID {
        let keyed = &self.keyed_datawriter;
        keyed.guid()
    }
}
/// The QoS policies of this writer are those of the with-key writer it wraps.
impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
where
    D: Serialize,
    SA: SerializerAdapter<D>,
{
    /// Returns the value of the inner writer's `qos()`.
    fn qos(&self) -> QosPolicies {
        let keyed = &self.keyed_datawriter;
        keyed.qos()
    }
}
// `DDSEntity` is implemented with an empty body: nothing is overridden here, so any
// behavior comes from whatever the trait itself provides.
impl<D: Serialize, SA: SerializerAdapter<D>> DDSEntity for DataWriter<D, SA> {}
#[cfg(test)]
mod tests {
    use byteorder::LittleEndian;

    use super::*;
    use crate::{
        dds::{participant::DomainParticipant, topic::TopicKind},
        serialization::cdr_serializer::*,
        test::random_data::*,
    };

    /// Creates a participant, publisher, `NoKey` topic and no-key writer, then
    /// writes two samples: one without and one with an explicit source timestamp.
    /// The test passes when every step returns `Ok`.
    #[test]
    fn dw_write_test() {
        let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
        let qos = QosPolicies::qos_none();

        let publisher = domain_participant
            .create_publisher(&qos)
            .expect("Failed to create publisher");
        let topic = domain_participant
            .create_topic(
                "Aasii".to_string(),
                "Huh?".to_string(),
                &qos,
                TopicKind::NoKey,
            )
            .expect("Failed to create topic");

        let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
            publisher
                .create_datawriter_no_key(&topic, None)
                .expect("Failed to create datawriter");

        let mut data = RandomData {
            a: 4,
            b: "Fobar".to_string(),
        };

        // First sample: no source timestamp supplied by the caller.
        data_writer
            .write(data.clone(), None)
            .expect("Unable to write data");

        // Second sample: modified payload with an explicit source timestamp.
        data.a = 5;
        let timestamp: Timestamp = Timestamp::now();
        data_writer
            .write(data, Some(timestamp))
            .expect("Unable to write data with timestamp");
    }

    /// Writes a single sample and then checks that `wait_for_acknowledgments`
    /// returns `Ok(true)` when given a two second limit.
    #[test]
    fn dw_wait_for_ack_test() {
        let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
        let qos = QosPolicies::qos_none();

        let publisher = domain_participant
            .create_publisher(&qos)
            .expect("Failed to create publisher");
        let topic = domain_participant
            .create_topic(
                "Aasii".to_string(),
                "Huh?".to_string(),
                &qos,
                TopicKind::NoKey,
            )
            .expect("Failed to create topic");

        let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
            publisher
                .create_datawriter_no_key(&topic, None)
                .expect("Failed to create datawriter");

        let data = RandomData {
            a: 4,
            b: "Fobar".to_string(),
        };
        data_writer.write(data, None).expect("Unable to write data");

        let res = data_writer
            .wait_for_acknowledgments(Duration::from_secs(2))
            .unwrap();
        // NOTE(review): this test creates no reader, so presumably there is nothing
        // to wait for and the call reports success — confirm against the writer.
        assert!(
            res,
            "wait_for_acknowledgments returned false within the 2 second limit"
        );
    }
}