use std::{
marker::PhantomData,
sync::{Arc, RwLock},
time::Duration,
};
use mio::{Poll, Events, Token, Ready, PollOpt, Evented};
use mio_extras::channel::{self as mio_channel, Receiver};
use serde::Serialize;
use log::{error, warn};
use crate::{
discovery::discovery::DiscoveryCommand, serialization::CDRSerializerAdapter,
dds::qos::policy::Liveliness, structure::time::Timestamp,
};
use crate::structure::entity::{RTPSEntity};
use crate::structure::{
dds_cache::DDSCache,
guid::{GUID, EntityId},
topic_kind::TopicKind,
cache_change::ChangeKind,
};
use crate::dds::pubsub::Publisher;
use crate::dds::topic::Topic;
use crate::log_and_err_precondition_not_met;
use crate::log_and_err_internal;
use crate::dds::values::result::{ Result, Error, };
use crate::dds::statusevents::*;
use crate::dds::traits::dds_entity::DDSEntity;
use crate::dds::traits::key::*;
use crate::dds::traits::TopicDescription;
use crate::dds::helpers::*;
use crate::dds::qos::{
HasQoSPolicy, QosPolicies,
policy::{Reliability},
};
use crate::dds::traits::serde_adapters::with_key::SerializerAdapter;
use crate::messages::submessages::submessage_elements::serialized_payload::SerializedPayload;
use crate::{discovery::data_types::topic_data::SubscriptionBuiltinTopicData, dds::ddsdata::DDSData};
use super::super::{datasample_cache::DataSampleCache, writer::WriterCommand, };
/// DDS DataWriter for keyed data of type `D`, serialized through the adapter `SA`
/// (`CDRSerializerAdapter<D>` unless another adapter is named).
///
/// Samples and control requests are handed over as `WriterCommand`s through
/// `cc_upload`. Dropping the writer sends `REMOVE_LOCAL_WRITER` to discovery.
pub struct DataWriter<D: Keyed + Serialize, SA: SerializerAdapter<D> = CDRSerializerAdapter<D>>
{
/// Publisher this writer was created from; returned by `get_publisher`.
my_publisher: Publisher,
/// Topic this writer publishes to; returned by `get_topic`.
my_topic: Topic,
/// QoS policies copied from the topic in `new`; `get_qos` returns a clone.
qos_policy: QosPolicies,
/// GUID of this writer; returned by `get_guid`.
my_guid: GUID,
/// Channel over which `WriterCommand`s (data, dispose, wait-for-ack) are sent.
cc_upload: mio_channel::SyncSender<WriterCommand>,
/// Channel over which `DiscoveryCommand`s (liveliness, writer removal) are sent.
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
/// Shared DDS cache; `new` registers this writer's topic in it.
dds_cache: Arc<RwLock<DDSCache>>,
/// Sample cache constructed from the topic's QoS in `new`.
datasample_cache: DataSampleCache<D>,
/// Zero-sized marker tying the adapter type `SA` to the writer without storing a value of it.
phantom: PhantomData<SA>,
/// Wraps the `Receiver<DataWriterStatus>` given to `new`; backs the `StatusEvented` impl.
status_receiver: StatusReceiver<DataWriterStatus>,
}
impl<D, SA> Drop for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
fn drop(&mut self) {
match self
.discovery_command
.send(DiscoveryCommand::REMOVE_LOCAL_WRITER {
guid: self.get_guid(),
}) {
Ok(_) => {}
Err(e) => error!(
"Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand. {:?}",
e
),
}
}
}
impl<D, SA> DataWriter<D, SA>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
SA: SerializerAdapter<D>,
{
/// Builds a `DataWriter` attached to `publisher` and `topic`.
///
/// * `guid` - only its entity id is used; when `None`, `EntityId::ENTITYID_UNKNOWN`
///   is used instead. The GUID prefix always comes from the publisher's participant.
/// * `cc_upload` - channel used to send `WriterCommand`s.
/// * `discovery_command` - channel used to send `DiscoveryCommand`s.
/// * `dds_cache` - shared cache in which the topic is registered.
/// * `status_receiver_rec` - receiver wrapped into the writer's `StatusReceiver`.
///
/// # Errors
/// Returns the error produced by `log_and_err_precondition_not_met!` when the
/// publisher has no `DomainParticipant`.
///
/// # Panics
/// Panics if the `dds_cache` lock is poisoned.
pub(crate) fn new(
    publisher: Publisher,
    topic: Topic,
    guid: Option<GUID>,
    cc_upload: mio_channel::SyncSender<WriterCommand>,
    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
    dds_cache: Arc<RwLock<DDSCache>>,
    status_receiver_rec: Receiver<DataWriterStatus>,
) -> Result<DataWriter<D, SA>> {
    // Without a caller-supplied GUID the entity id is "unknown".
    let entity_id = guid.map_or(EntityId::ENTITYID_UNKNOWN, |g| g.entityId.clone());

    let dp = if let Some(participant) = publisher.get_participant() {
        participant
    } else {
        return log_and_err_precondition_not_met!("Cannot create new DataWriter, DomainParticipant doesn't exist.");
    };

    let my_guid = GUID::new_with_prefix_and_id(dp.get_guid_prefix().clone(), entity_id);

    // Register the topic in the shared cache. The inner scope releases the
    // write guard before `dds_cache` is moved into the new writer.
    {
        let mut cache = dds_cache
            .write()
            .unwrap_or_else(|e| panic!("DDSCache is poisoned. {:?}", e));
        // NOTE(review): this is the keyed writer, yet the topic is registered as
        // `TopicKind::NoKey` - confirm this is intended.
        cache.add_new_topic(
            &String::from(topic.get_name()),
            TopicKind::NoKey,
            topic.get_type(),
        );
    }

    // Only ManualByParticipant liveliness triggers an assertion at creation time.
    if let Some(Liveliness::ManualByParticipant { .. }) = topic.get_qos().liveliness {
        if let Err(e) = discovery_command.send(DiscoveryCommand::MANUAL_ASSERT_LIVELINESS) {
            error!("Failed to send DiscoveryCommand - Refresh. {:?}", e);
        }
    }

    let qos = topic.get_qos().clone();
    Ok(DataWriter {
        my_publisher: publisher,
        my_topic: topic,
        qos_policy: qos.clone(),
        my_guid,
        cc_upload,
        discovery_command,
        dds_cache,
        datasample_cache: DataSampleCache::new(qos),
        phantom: PhantomData,
        status_receiver: StatusReceiver::new(status_receiver_rec),
    })
}
/// Sends `MANUAL_ASSERT_LIVELINESS` to discovery when this writer's liveliness
/// QoS is `ManualByParticipant`; does nothing for every other setting.
///
/// A failed send is logged and otherwise ignored.
pub fn refresh_manual_liveliness(&self) {
    if let Some(Liveliness::ManualByParticipant { .. }) = self.get_qos().liveliness {
        let sent = self
            .discovery_command
            .send(DiscoveryCommand::MANUAL_ASSERT_LIVELINESS);
        if let Err(e) = sent {
            error!("Failed to send DiscoveryCommand - Refresh. {:?}", e);
        }
    }
}
pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> Result<()> {
let send_buffer = SA::to_Bytes( &data )?;
let ddsdata = DDSData::new( SerializedPayload::new_from_Bytes( SA::output_encoding() , send_buffer) );
let writer_command = WriterCommand::DDSData { data: ddsdata , source_timestamp };
let timeout =
match self.get_qos().reliability() {
Some(Reliability::Reliable { max_blocking_time }) => Some(max_blocking_time),
_ => None,
};
match try_send_timeout(&self.cc_upload, writer_command, timeout)
{
Ok(_) => {
self.refresh_manual_liveliness();
Ok(())
}
Err(e) => {
warn!("Failed to write new data. {:?}", e);
Err(Error::OutOfResources)
}
}
}
/// Waits up to `max_wait` for acknowledgment of the data written so far.
///
/// Without a reliability policy, or with `BestEffort`, there is nothing to wait
/// for and `Ok(true)` is returned immediately. With `Reliable`, a
/// `WriterCommand::WaitForAcknowledgments` carrying a one-shot channel is sent
/// and this call polls that channel for a token.
/// NOTE(review): the receiving side is presumably expected to signal
/// `all_acked` once everything is acknowledged - that code is not visible here.
///
/// # Returns
/// * `Ok(true)` - a token was received on the acknowledgment channel.
/// * `Ok(false)` - the poll returned without any event (timeout).
///
/// # Errors
/// Propagates failures from creating, registering or polling the `Poll`, and
/// from sending the command. Also returns the error built by
/// `log_and_err_internal!` when the poll reports an event but no token can be
/// received. Previously that error was constructed and then discarded, so a
/// spurious wake-up was reported as a successful acknowledgment.
pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> Result<bool> {
    match &self.qos_policy.reliability {
        None | Some(Reliability::BestEffort) => Ok(true),
        Some(Reliability::Reliable { .. }) => {
            let (acked_sender, acked_receiver) = mio_channel::sync_channel::<()>(1);
            let poll = Poll::new()?;
            poll.register(&acked_receiver, Token(0), Ready::readable(), PollOpt::edge())?;
            self.cc_upload
                .try_send(WriterCommand::WaitForAcknowledgments { all_acked: acked_sender })?;

            let mut events = Events::with_capacity(1);
            poll.poll(&mut events, Some(max_wait))?;
            if events.iter().next().is_none() {
                // No event before the poll returned: not acknowledged in time.
                return Ok(false);
            }

            // An event alone is not proof of acknowledgment; require the token.
            acked_receiver.try_recv().or_else(|_e| {
                log_and_err_internal!("wait_for_acknowledgments - Spurious poll event?")
            })?;
            Ok(true)
        }
    }
}
/// Returns a reference to the `Topic` this writer was created with.
pub fn get_topic(&self) -> &Topic {
&self.my_topic
}
/// Returns a reference to the `Publisher` this writer was created with.
pub fn get_publisher(&self) -> &Publisher {
&self.my_publisher
}
pub fn assert_liveliness(&self) -> Result<()> {
self.refresh_manual_liveliness();
match self.get_qos().liveliness {
Some(Liveliness::ManualByTopic { lease_duration: _ }) => {
self.discovery_command
.send(DiscoveryCommand::ASSERT_TOPIC_LIVELINESS {
writer_guid: self.get_guid(),
manual_assertion: true, })
.unwrap_or_else( |e| error!("assert_liveness - Failed to send DiscoveryCommand. {:?}", e))
}
_other => (),
}
Ok(())
}
/// Intended to return data about the subscriptions matched with this writer.
///
/// # Panics
/// Not implemented yet: the body is `todo!()`, so every call panics.
pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
todo!()
}
/// Disposes the instance identified by `key`.
///
/// The key is serialized with `SA::key_to_Bytes` and sent as a
/// `WriterCommand::DDSData` whose change kind is `NOT_ALIVE_DISPOSED`.
/// After a successful send, manual liveliness is refreshed.
///
/// * `key` - key of the instance to dispose.
/// * `source_timestamp` - passed through unchanged in the command.
///
/// # Errors
/// Propagates key serialization errors, and returns the error built by
/// `log_and_err_internal!` when the command cannot be sent.
pub fn dispose(&self, key: <D as Keyed>::K, source_timestamp: Option<Timestamp>) -> Result<()> {
    let key_bytes = SA::key_to_Bytes(&key)?;
    let payload = SerializedPayload::new_from_Bytes(SA::output_encoding(), key_bytes);
    let command = WriterCommand::DDSData {
        data: DDSData::new_disposed_by_key(ChangeKind::NOT_ALIVE_DISPOSED, payload),
        source_timestamp,
    };

    self.cc_upload.send(command).or_else(|send_error| {
        log_and_err_internal!("Cannot send dispose command: {:?}", send_error)
    })?;

    self.refresh_manual_liveliness();
    Ok(())
}
}
// Status notifications are served by the embedded `status_receiver`.
impl<D: Keyed + Serialize, SA: SerializerAdapter<D>> StatusEvented<DataWriterStatus>
    for DataWriter<D, SA>
{
    /// Returns the `Evented` handle provided by the inner status receiver.
    fn as_status_evented(&mut self) -> &dyn Evented {
        self.status_receiver.as_status_evented()
    }

    /// Returns whatever the inner status receiver's `try_recv_status` yields.
    fn try_recv_status(&self) -> Option<DataWriterStatus> {
        self.status_receiver.try_recv_status()
    }
}
// Exposes the writer's GUID through the `RTPSEntity` trait.
impl<D: Keyed + Serialize, SA: SerializerAdapter<D>> RTPSEntity for DataWriter<D, SA> {
    /// Returns a copy of the GUID assigned to this writer in `new`.
    fn get_guid(&self) -> GUID {
        self.my_guid
    }
}
// Exposes the writer's QoS through the `HasQoSPolicy` trait.
impl<D: Keyed + Serialize, SA: SerializerAdapter<D>> HasQoSPolicy for DataWriter<D, SA> {
    /// Returns a clone of the QoS policies stored in the writer.
    fn get_qos(&self) -> QosPolicies {
        self.qos_policy.clone()
    }
}
// Marks `DataWriter` as a DDS entity; the impl adds no methods of its own.
impl<D: Keyed + Serialize, SA: SerializerAdapter<D>> DDSEntity for DataWriter<D, SA> {}
#[cfg(test)]
mod tests {
use super::*;
use crate::dds::participant::DomainParticipant;
use crate::test::random_data::*;
use std::thread;
use crate::dds::traits::key::Keyed;
use crate::serialization::cdr_serializer::CDRSerializerAdapter;
use byteorder::LittleEndian;
use log::info;
/// Writes two samples through a freshly created writer, one without and one
/// with an explicit source timestamp, and expects both writes to succeed.
#[test]
fn dw_write_test() {
    // The message now names what actually failed: participant creation.
    let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();

    let publisher = domain_participant
        .create_publisher(&qos)
        .expect("Failed to create publisher");
    let topic = domain_participant
        .create_topic("Aasii", "Huh?", &qos, TopicKind::WithKey)
        .expect("Failed to create topic");
    let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
        publisher
            .create_datawriter(topic, None)
            .expect("Failed to create datawriter");

    let mut data = RandomData {
        a: 4,
        b: "Fobar".to_string(),
    };
    data_writer
        .write(data.clone(), None)
        .expect("Unable to write data");

    data.a = 5;
    let timestamp = Timestamp::now();
    data_writer
        .write(data, Some(timestamp))
        .expect("Unable to write data with timestamp");
}
/// Writes one sample and then disposes its instance by key, expecting both
/// operations to succeed.
#[test]
fn dw_dispose_test() {
    // The message now names what actually failed: participant creation.
    let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();

    let publisher = domain_participant
        .create_publisher(&qos)
        .expect("Failed to create publisher");
    let topic = domain_participant
        .create_topic("Aasii", "Huh?", &qos, TopicKind::WithKey)
        .expect("Failed to create topic");
    let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
        publisher
            .create_datawriter(topic, None)
            .expect("Failed to create datawriter");

    let data = RandomData {
        a: 4,
        b: "Fobar".to_string(),
    };
    let key = &data.get_key().into_hash_key();
    info!("key: {:?}", key);

    data_writer
        .write(data.clone(), None)
        .expect("Unable to write data");

    // Pause between the write and the dispose.
    thread::sleep(Duration::from_millis(100));

    data_writer
        .dispose(data.get_key(), None)
        .expect("Unable to dispose data");
}
/// Writes one sample and checks that `wait_for_acknowledgments` reports success.
#[test]
fn dw_wait_for_ack_test() {
    let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();

    let publisher = domain_participant
        .create_publisher(&qos)
        .expect("Failed to create publisher");
    let topic = domain_participant
        .create_topic("Aasii", "Huh?", &qos, TopicKind::WithKey)
        .expect("Failed to create topic");
    let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
        publisher
            .create_datawriter(topic, None)
            .expect("Failed to create datawriter");

    let data = RandomData {
        a: 4,
        b: "Fobar".to_string(),
    };
    data_writer.write(data, None).expect("Unable to write data");

    // NOTE(review): `qos_none()` presumably carries no reliability policy, in
    // which case the call returns `Ok(true)` without waiting - confirm.
    let all_acked = data_writer
        .wait_for_acknowledgments(Duration::from_secs(2))
        .expect("wait_for_acknowledgments returned an error");
    assert!(all_acked, "expected all written data to be acknowledged");
}
}