use std::{
marker::PhantomData,
sync::{
atomic::{AtomicI64, Ordering},
Arc, RwLock,
},
time::Duration,
};
use mio::{Evented, Events, Poll, PollOpt, Ready, Token};
use mio_extras::channel::{self as mio_channel, Receiver, SendError};
use serde::Serialize;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{
ddsdata::DDSData,
helpers::*,
pubsub::Publisher,
qos::{
policy::{Liveliness, Reliability},
HasQoSPolicy, QosPolicies,
},
statusevents::*,
topic::Topic,
traits::{
dds_entity::DDSEntity, key::*, serde_adapters::with_key::SerializerAdapter, TopicDescription,
},
values::result::{Error, Result},
},
discovery::{data_types::topic_data::SubscriptionBuiltinTopicData, discovery::DiscoveryCommand},
log_and_err_internal,
messages::submessages::submessage_elements::serialized_payload::SerializedPayload,
serialization::CDRSerializerAdapter,
structure::{
cache_change::ChangeKind, dds_cache::DDSCache, entity::RTPSEntity, guid::GUID,
rpc::SampleIdentity, sequence_number::SequenceNumber, time::Timestamp,
},
};
use super::super::writer::WriterCommand;
/// Builder for [`WriteOptions`], used with `DataWriter::write_with_options`.
/// All options default to `None`.
#[derive(Debug, Default)]
pub struct WriteOptionsBuilder {
// Identity of a related sample (e.g. the request this sample replies to,
// in RPC-over-DDS usage).
related_sample_identity: Option<SampleIdentity>,
// Application-supplied source timestamp; `None` means no explicit timestamp.
source_timestamp: Option<Timestamp>,
}
impl WriteOptionsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn build(self) -> WriteOptions {
WriteOptions {
related_sample_identity: self.related_sample_identity,
source_timestamp: self.source_timestamp,
}
}
#[must_use]
pub fn related_sample_identity(mut self, related_sample_identity: SampleIdentity) -> Self {
self.related_sample_identity = Some(related_sample_identity);
self
}
#[must_use]
pub fn related_sample_identity_opt(
mut self,
related_sample_identity_opt: Option<SampleIdentity>,
) -> Self {
self.related_sample_identity = related_sample_identity_opt;
self
}
#[must_use]
pub fn source_timestamp(mut self, source_timestamp: Timestamp) -> Self {
self.source_timestamp = Some(source_timestamp);
self
}
}
/// Per-sample options attached to a write; see `DataWriter::write_with_options`.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
pub struct WriteOptions {
// Identity of a related sample (request/reply correlation), if any.
pub(crate) related_sample_identity: Option<SampleIdentity>,
// Application-supplied source timestamp; `None` means no explicit timestamp.
pub(crate) source_timestamp: Option<Timestamp>,
}
/// Convenience conversion: an optional source timestamp and nothing else.
impl From<Option<Timestamp>> for WriteOptions {
  fn from(source_timestamp: Option<Timestamp>) -> Self {
    // All other options keep their default (None) values.
    Self {
      source_timestamp,
      ..Self::default()
    }
  }
}
/// Shorthand for a `DataWriter` using the default CDR serializer.
pub type DataWriterCdr<D> = DataWriter<D, CDRSerializerAdapter<D>>;
/// DDS DataWriter: publishes samples of type `D` on a single Topic,
/// serialized by adapter `SA` (CDR by default).
pub struct DataWriter<D: Keyed + Serialize, SA: SerializerAdapter<D> = CDRSerializerAdapter<D>> {
// Zero-sized markers tying the generic parameters to this instance.
data_phantom: PhantomData<D>,
ser_phantom: PhantomData<SA>,
// Publisher that created this writer; notified on drop.
my_publisher: Publisher,
// Topic this writer publishes to.
my_topic: Topic,
// QoS captured from the Topic at creation time.
qos_policy: QosPolicies,
// RTPS GUID identifying this writer.
my_guid: GUID,
// Channel to the RTPS Writer thread (data and control commands).
cc_upload: mio_channel::SyncSender<WriterCommand>,
// Channel to the Discovery thread (liveliness, writer removal).
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
// Incoming status events (publication matched, etc.).
status_receiver: StatusReceiver<DataWriterStatus>,
// Next outgoing sequence number; starts at 1 (see `new`).
available_sequence_number: AtomicI64,
}
impl<D, SA> Drop for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
fn drop(&mut self) {
self.my_publisher.remove_writer(self.my_guid);
match self
.discovery_command
.send(DiscoveryCommand::RemoveLocalWriter { guid: self.guid() })
{
Ok(_) => {}
Err(SendError::Disconnected(_cmd)) => {
debug!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand: Disconnected.");
}
Err(e) => error!(
"Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand. {:?}",
e
),
}
}
}
impl<D, SA> DataWriter<D, SA>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
SA: SerializerAdapter<D>,
{
/// Constructs a DataWriter. Crate-internal: applications create writers via
/// `Publisher::create_datawriter`.
///
/// Registers the topic in the shared `DDSCache` and, when the topic QoS
/// requests `ManualByParticipant` liveliness, asserts liveliness once via
/// the Discovery thread.
///
/// # Panics
/// Panics if the shared `DDSCache` lock is poisoned.
pub(crate) fn new(
publisher: Publisher,
topic: Topic,
guid: GUID,
cc_upload: mio_channel::SyncSender<WriterCommand>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
dds_cache: &Arc<RwLock<DDSCache>>, status_receiver_rec: Receiver<DataWriterStatus>,
) -> Result<Self> {
// Make the cache aware of this topic before any samples are written.
match dds_cache.write() {
Ok(mut cache) => cache.add_new_topic(topic.name(), topic.get_type()),
Err(e) => panic!("DDSCache is poisoned. {:?}", e),
};
// ManualByParticipant liveliness: assert once at creation.
// Automatic and ManualByTopic need no action here.
if let Some(lv) = topic.qos().liveliness {
match lv {
Liveliness::Automatic { .. } | Liveliness::ManualByTopic { .. } => (),
Liveliness::ManualByParticipant { .. } => {
if let Err(e) = discovery_command.send(DiscoveryCommand::ManualAssertLiveliness) {
error!("Failed to send DiscoveryCommand - Refresh. {:?}", e);
}
}
}
};
let qos = topic.qos();
Ok(Self {
data_phantom: PhantomData,
ser_phantom: PhantomData,
my_publisher: publisher,
my_topic: topic,
qos_policy: qos,
my_guid: guid,
cc_upload,
discovery_command,
status_receiver: StatusReceiver::new(status_receiver_rec),
// Sequence numbering starts from 1.
available_sequence_number: AtomicI64::new(1), })
}
/// Allocates the next outgoing sequence number (post-increments the counter).
fn next_sequence_number(&self) -> SequenceNumber {
SequenceNumber::from(
self
.available_sequence_number
.fetch_add(1, Ordering::Relaxed),
)
}
/// Rolls the sequence-number counter back after a failed send so the
/// number can be reused by the next write.
// NOTE(review): fetch_sub only undoes the *most recent* allocation. If
// another thread allocated a number between this thread's fetch_add and
// this rollback, a sequence number could be handed out twice. Confirm
// whether concurrent writes on a single DataWriter are supported.
fn undo_sequence_number(&self) {
self
.available_sequence_number
.fetch_sub(1, Ordering::Relaxed);
}
/// Re-asserts liveliness via Discovery, but only when QoS liveliness is
/// `ManualByParticipant`; otherwise a no-op. Send failures are logged.
pub fn refresh_manual_liveliness(&self) {
if let Some(lv) = self.qos().liveliness {
match lv {
Liveliness::Automatic { .. } | Liveliness::ManualByTopic { .. } => (),
Liveliness::ManualByParticipant { .. } => {
if let Err(e) = self
.discovery_command
.send(DiscoveryCommand::ManualAssertLiveliness)
{
error!("Failed to send DiscoveryCommand - Refresh. {:?}", e);
}
}
}
};
}
/// Writes one sample. `source_timestamp` of `None` means no explicit
/// timestamp is attached. Convenience wrapper over `write_with_options`
/// that discards the returned `SampleIdentity`.
pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> Result<()> {
self.write_with_options(data, WriteOptions::from(source_timestamp))?;
Ok(())
}
/// Serializes `data` with `SA` and hands it to the RTPS Writer thread.
///
/// With `Reliable` QoS the handoff may block up to `max_blocking_time`;
/// otherwise it does not block. On success returns the sample's identity
/// (writer GUID + sequence number). On failure the allocated sequence
/// number is rolled back and `Error::OutOfResources` is returned
/// (regardless of the underlying send error; see the warn log for details).
pub fn write_with_options(&self, data: D, write_options: WriteOptions) -> Result<SampleIdentity> {
let send_buffer = SA::to_bytes(&data)?;
let ddsdata = DDSData::new(SerializedPayload::new_from_bytes(
SA::output_encoding(),
send_buffer,
));
// Sequence number is reserved before the send and undone on failure.
let sequence_number = self.next_sequence_number();
let writer_command = WriterCommand::DDSData {
data: ddsdata,
write_options,
sequence_number,
};
// Reliable QoS allows blocking up to max_blocking_time; BestEffort and
// unspecified reliability never block.
let timeout = match self.qos().reliability() {
Some(Reliability::Reliable { max_blocking_time }) => Some(max_blocking_time),
_ => None,
};
match try_send_timeout(&self.cc_upload, writer_command, timeout) {
Ok(_) => {
self.refresh_manual_liveliness();
Ok(SampleIdentity {
writer_guid: self.my_guid,
sequence_number,
})
}
Err(e) => {
warn!(
"Failed to write new data: topic={:?} reason={:?} timeout={:?}",
self.my_topic.name(),
e,
timeout,
);
self.undo_sequence_number();
Err(Error::OutOfResources)
}
}
}
/// Blocks until all previously written samples are acknowledged by matched
/// Reliable readers, or until `max_wait` elapses.
///
/// Returns `Ok(true)` when acknowledged — trivially so for BestEffort or
/// unspecified reliability — and `Ok(false)` on timeout.
pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> Result<bool> {
match &self.qos_policy.reliability {
// No reliability protocol: nothing to wait for.
None | Some(Reliability::BestEffort) => Ok(true),
Some(Reliability::Reliable { .. }) => {
// The Writer thread signals completion on this one-shot channel.
let (acked_sender, acked_receiver) = mio_channel::sync_channel::<()>(1);
let poll = Poll::new()?;
poll.register(
&acked_receiver,
Token(0),
Ready::readable(),
PollOpt::edge(),
)?;
self
.cc_upload
.try_send(WriterCommand::WaitForAcknowledgments {
all_acked: acked_sender,
})?;
let mut events = Events::with_capacity(1);
poll.poll(&mut events, Some(max_wait))?;
if let Some(_event) = events.iter().next() {
// Drain the signal; an empty channel here means a spurious wakeup.
let _ = acked_receiver
.try_recv()
.or_else(|_e| log_and_err_internal!("wait_for_acknowledgments - Spurious poll event?"));
Ok(true)
} else {
// poll() returned with no events => max_wait elapsed.
Ok(false)
}
}
} }
/// The Topic this writer publishes to.
pub fn topic(&self) -> &Topic {
&self.my_topic
}
/// The Publisher that created this writer.
pub fn publisher(&self) -> &Publisher {
&self.my_publisher
}
/// Manually asserts liveliness of this writer.
///
/// Refreshes participant-level liveliness (effective for
/// `ManualByParticipant` QoS) and, for `ManualByTopic` QoS, additionally
/// sends a per-writer AssertTopicLiveliness command to Discovery.
pub fn assert_liveliness(&self) -> Result<()> {
self.refresh_manual_liveliness();
match self.qos().liveliness {
Some(Liveliness::ManualByTopic { lease_duration: _ }) => {
self
.discovery_command
.send(DiscoveryCommand::AssertTopicLiveliness {
writer_guid: self.guid(),
manual_assertion: true, })
.unwrap_or_else(|e| error!("assert_liveness - Failed to send DiscoveryCommand. {:?}", e));
}
_other => (),
}
Ok(())
}
/// Returns data about matched subscriptions. Not yet implemented.
///
/// # Panics
/// Always panics (`todo!`).
pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
todo!()
}
/// Disposes the instance identified by `key`: matched readers will observe
/// it as `NotAliveDisposed`. Only the key (not a data sample) is serialized.
// NOTE(review): unlike write_with_options, this uses a blocking `send`
// with no timeout — confirm that unbounded blocking is intended here.
pub fn dispose(&self, key: &<D as Keyed>::K, source_timestamp: Option<Timestamp>) -> Result<()> {
let send_buffer = SA::key_to_bytes(key)?;
let ddsdata = DDSData::new_disposed_by_key(
ChangeKind::NotAliveDisposed,
SerializedPayload::new_from_bytes(SA::output_encoding(), send_buffer),
);
self
.cc_upload
.send(WriterCommand::DDSData {
data: ddsdata,
write_options: WriteOptions::from(source_timestamp),
sequence_number: self.next_sequence_number(),
})
.or_else(|huh| {
// Roll back the reserved sequence number on failure.
self.undo_sequence_number();
log_and_err_internal!("Cannot send dispose command: {:?}", huh)
})?;
self.refresh_manual_liveliness();
Ok(())
}
}
// Exposes the writer's status-event channel (publication matched, offered
// incompatible QoS, liveliness lost, ...) for mio-based polling.
impl<D, SA> StatusEvented<DataWriterStatus> for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
/// Returns a mio-pollable handle that becomes readable when status
/// events are available.
fn as_status_evented(&mut self) -> &dyn Evented {
self.status_receiver.as_status_evented()
}
/// Non-blocking read of the next pending status event, if any.
fn try_recv_status(&self) -> Option<DataWriterStatus> {
self.status_receiver.try_recv_status()
}
}
impl<D, SA> RTPSEntity for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
/// Returns the RTPS GUID assigned to this writer at creation.
fn guid(&self) -> GUID {
self.my_guid
}
}
impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
/// Returns the QoS captured from the Topic at creation time.
// The trait returns an owned QosPolicies, so a clone per call is required.
fn qos(&self) -> QosPolicies {
self.qos_policy.clone()
}
}
// Marker impl: DDSEntity has no required methods; it just tags DataWriter
// as a DDS entity.
impl<D, SA> DDSEntity for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
}
#[cfg(test)]
mod tests {
  use std::thread;

  use byteorder::LittleEndian;
  use log::info;

  use super::*;
  use crate::{
    dds::{participant::DomainParticipant, traits::key::Keyed},
    serialization::cdr_serializer::CDRSerializerAdapter,
    structure::topic_kind::TopicKind,
    test::random_data::*,
  };

  #[test]
  fn dw_write_test() {
    let dp = DomainParticipant::new(0).expect("Publisher creation failed!");
    let qos = QosPolicies::qos_none();
    let _default_dw_qos = QosPolicies::qos_none();

    let publisher = dp
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = dp
      .create_topic(
        "Aasii".to_string(),
        "Huh?".to_string(),
        &qos,
        TopicKind::WithKey,
      )
      .expect("Failed to create topic");
    let writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> = publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

    let mut sample = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };

    // Write without an explicit source timestamp.
    writer
      .write(sample.clone(), None)
      .expect("Unable to write data");

    // Mutate the sample and write again, this time with a timestamp.
    sample.a = 5;
    let now = Timestamp::now();
    writer
      .write(sample, Some(now))
      .expect("Unable to write data with timestamp");
  }

  #[test]
  fn dw_dispose_test() {
    let dp = DomainParticipant::new(0).expect("Publisher creation failed!");
    let qos = QosPolicies::qos_none();

    let publisher = dp
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = dp
      .create_topic(
        "Aasii".to_string(),
        "Huh?".to_string(),
        &qos,
        TopicKind::WithKey,
      )
      .expect("Failed to create topic");
    let writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> = publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

    let sample = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };
    let key = &sample.key().hash_key();
    info!("key: {:?}", key);

    writer
      .write(sample.clone(), None)
      .expect("Unable to write data");

    // Give the write a moment before disposing the same instance.
    thread::sleep(Duration::from_millis(100));
    writer
      .dispose(&sample.key(), None)
      .expect("Unable to dispose data");
  }

  #[test]
  fn dw_wait_for_ack_test() {
    let dp = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();

    let publisher = dp
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = dp
      .create_topic(
        "Aasii".to_string(),
        "Huh?".to_string(),
        &qos,
        TopicKind::WithKey,
      )
      .expect("Failed to create topic");
    let writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> = publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

    let sample = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };
    writer.write(sample, None).expect("Unable to write data");

    // qos_none implies no Reliable QoS, so this should succeed immediately.
    let acked = writer
      .wait_for_acknowledgments(Duration::from_secs(2))
      .unwrap();
    assert!(acked);
  }
}