use std::time::Duration;
use mio_06::Evented;
use crate::{
dds::{
adapters::no_key::SerializerAdapter,
pubsub::Publisher,
qos::{HasQoSPolicy, QosPolicies},
result::{unwrap_no_key_write_error, WriteResult},
statusevents::{DataWriterStatus, StatusReceiverStream},
topic::Topic,
with_key::datawriter as datawriter_with_key,
},
discovery::sedp_messages::SubscriptionBuiltinTopicData,
serialization::CDRSerializerAdapter,
structure::{entity::RTPSEntity, rpc::SampleIdentity, time::Timestamp},
StatusEvented, GUID,
};
use super::wrappers::{NoKeyWrapper, SAWrapper};
/// Convenience alias: a no-key [`DataWriter`] for `D` that uses
/// [`CDRSerializerAdapter<D>`] as its serializer adapter.
pub type DataWriterCdr<D> = DataWriter<D, CDRSerializerAdapter<D>>;
/// Writer for data type `D` on a topic without a key.
///
/// This type is a thin shell around the keyed
/// [`datawriter_with_key::DataWriter`]: samples are wrapped in
/// [`NoKeyWrapper`] and the serializer adapter `SA` in [`SAWrapper`], and
/// every operation is forwarded to that inner writer.
///
/// `SA` defaults to [`CDRSerializerAdapter<D>`].
pub struct DataWriter<D, SA = CDRSerializerAdapter<D>>
where
    SA: SerializerAdapter<D>,
{
    /// The keyed writer that performs the actual work.
    keyed_datawriter: datawriter_with_key::DataWriter<NoKeyWrapper<D>, SAWrapper<SA>>,
}
impl<D, SA> DataWriter<D, SA>
where
SA: SerializerAdapter<D>,
{
pub(crate) fn from_keyed(
keyed: datawriter_with_key::DataWriter<NoKeyWrapper<D>, SAWrapper<SA>>,
) -> Self {
Self {
keyed_datawriter: keyed,
}
}
pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> WriteResult<(), D> {
self
.keyed_datawriter
.write(NoKeyWrapper::<D> { d: data }, source_timestamp)
.map_err(unwrap_no_key_write_error)
}
pub fn write_with_options(
&self,
data: D,
write_options: datawriter_with_key::WriteOptions,
) -> WriteResult<SampleIdentity, D> {
self
.keyed_datawriter
.write_with_options(NoKeyWrapper::<D> { d: data }, write_options)
.map_err(unwrap_no_key_write_error)
}
pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> WriteResult<bool, ()> {
self.keyed_datawriter.wait_for_acknowledgments(max_wait)
}
pub fn topic(&self) -> &Topic {
self.keyed_datawriter.topic()
}
pub fn publisher(&self) -> &Publisher {
self.keyed_datawriter.publisher()
}
pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
self.keyed_datawriter.assert_liveliness()
}
pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
self.keyed_datawriter.get_matched_subscriptions()
}
}
/// Status-event access. Each method hands the request to the wrapped keyed
/// writer, which supplies the actual event sources.
impl<'a, D, SA: SerializerAdapter<D>>
    StatusEvented<'a, DataWriterStatus, StatusReceiverStream<'a, DataWriterStatus>>
    for DataWriter<D, SA>
{
    /// Returns the keyed writer's status source as a `mio_06` [`Evented`].
    fn as_status_evented(&mut self) -> &dyn Evented {
        let inner = &mut self.keyed_datawriter;
        inner.as_status_evented()
    }

    /// Returns the keyed writer's status source as a `mio_08` event source.
    fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
        let inner = &mut self.keyed_datawriter;
        inner.as_status_source()
    }

    /// Returns the keyed writer's asynchronous status stream.
    fn as_async_status_stream(&'a self) -> StatusReceiverStream<'a, DataWriterStatus> {
        let inner = &self.keyed_datawriter;
        inner.as_async_status_stream()
    }

    /// Returns whatever the keyed writer's `try_recv_status` yields.
    fn try_recv_status(&self) -> Option<DataWriterStatus> {
        let inner = &self.keyed_datawriter;
        inner.try_recv_status()
    }
}
/// The no-key writer exposes the identity of the keyed writer it wraps.
impl<D, SA> RTPSEntity for DataWriter<D, SA>
where
    SA: SerializerAdapter<D>,
{
    /// Returns the [`GUID`] reported by the keyed writer.
    fn guid(&self) -> GUID {
        let inner = &self.keyed_datawriter;
        inner.guid()
    }
}
/// QoS access is delegated to the wrapped keyed writer.
impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
where
    SA: SerializerAdapter<D>,
{
    /// Returns the [`QosPolicies`] reported by the keyed writer.
    fn qos(&self) -> QosPolicies {
        let inner = &self.keyed_datawriter;
        inner.qos()
    }
}
/// Asynchronous API. Each method awaits the matching `async_*` method of the
/// wrapped keyed writer.
impl<D, SA: SerializerAdapter<D>> DataWriter<D, SA> {
    /// Asynchronously writes one sample.
    ///
    /// `data` is wrapped in a [`NoKeyWrapper`] and passed, together with
    /// `source_timestamp`, to the keyed writer's `async_write`.
    ///
    /// # Errors
    ///
    /// An error from the keyed writer is converted with
    /// `unwrap_no_key_write_error` to match the `WriteResult<(), D>` error
    /// type.
    pub async fn async_write(
        &self,
        data: D,
        source_timestamp: Option<Timestamp>,
    ) -> WriteResult<(), D> {
        let wrapped = NoKeyWrapper::<D> { d: data };
        let outcome = self
            .keyed_datawriter
            .async_write(wrapped, source_timestamp)
            .await;
        outcome.map_err(unwrap_no_key_write_error)
    }

    /// Asynchronously writes one sample using explicit `write_options`.
    ///
    /// Forwards to the keyed writer's `async_write_with_options` and returns
    /// the [`SampleIdentity`] it produces.
    ///
    /// # Errors
    ///
    /// An error from the keyed writer is converted with
    /// `unwrap_no_key_write_error`.
    pub async fn async_write_with_options(
        &self,
        data: D,
        write_options: datawriter_with_key::WriteOptions,
    ) -> WriteResult<SampleIdentity, D> {
        let wrapped = NoKeyWrapper::<D> { d: data };
        let outcome = self
            .keyed_datawriter
            .async_write_with_options(wrapped, write_options)
            .await;
        outcome.map_err(unwrap_no_key_write_error)
    }

    /// Awaits the keyed writer's `async_wait_for_acknowledgments`.
    ///
    /// Unlike the synchronous `wait_for_acknowledgments`, this method takes
    /// no timeout argument.
    pub async fn async_wait_for_acknowledgments(&self) -> WriteResult<bool, ()> {
        let inner = &self.keyed_datawriter;
        inner.async_wait_for_acknowledgments().await
    }
}
#[cfg(test)]
mod tests {
    use byteorder::LittleEndian;

    use super::*;
    use crate::{
        dds::{participant::DomainParticipant, topic::TopicKind},
        serialization::*,
        test::random_data::*,
    };

    /// Concrete writer type exercised by the tests in this module.
    type LeWriter = DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>;

    /// Writes two samples through a no-key writer: one without and one with
    /// an explicit source timestamp. Both writes must succeed.
    #[test]
    fn dw_write_test() {
        let participant = DomainParticipant::new(0).expect("Failed to create participant");
        let qos = QosPolicies::qos_none();
        let _default_dw_qos = QosPolicies::qos_none();

        let publisher = participant
            .create_publisher(&qos)
            .expect("Failed to create publisher");
        let topic = participant
            .create_topic(
                String::from("Aasii"),
                String::from("Huh?"),
                &qos,
                TopicKind::NoKey,
            )
            .expect("Failed to create topic");
        let writer: LeWriter = publisher
            .create_datawriter_no_key(&topic, None)
            .expect("Failed to create datawriter");

        let first = RandomData {
            a: 4,
            b: String::from("Fobar"),
        };
        // Same payload as `first`, except for the `a` field.
        let second = RandomData {
            a: 5,
            ..first.clone()
        };

        writer.write(first, None).expect("Unable to write data");

        // The timestamp is taken only after the first write has returned.
        let now: Timestamp = Timestamp::now();
        writer
            .write(second, Some(now))
            .expect("Unable to write data with timestamp");
    }

    /// Writes one sample and then expects `wait_for_acknowledgments` to
    /// return `Ok(true)` when given a two second limit.
    #[test]
    fn dw_wait_for_ack_test() {
        let participant = DomainParticipant::new(0).expect("Failed to create participant");
        let qos = QosPolicies::qos_none();

        let publisher = participant
            .create_publisher(&qos)
            .expect("Failed to create publisher");
        let topic = participant
            .create_topic(
                String::from("Aasii"),
                String::from("Huh?"),
                &qos,
                TopicKind::NoKey,
            )
            .expect("Failed to create topic");
        let writer: LeWriter = publisher
            .create_datawriter_no_key(&topic, None)
            .expect("Failed to create datawriter");

        let sample = RandomData {
            a: 4,
            b: String::from("Fobar"),
        };
        writer.write(sample, None).expect("Unable to write data");

        let limit = Duration::from_secs(2);
        let acked = writer.wait_for_acknowledgments(limit).unwrap();
        assert!(acked);
    }
}