use std::{
marker::PhantomData,
sync::{Arc, RwLock},
time::Duration,
};
use mio::Evented;
use mio_extras::channel::{self as mio_channel, Receiver};
use serde::Serialize;
use log::{error, warn};
use crate::{
discovery::discovery::DiscoveryCommand, serialization::CDRSerializerAdapter,
dds::qos::policy::Liveliness, structure::time::Timestamp,
};
use crate::structure::entity::{RTPSEntity};
use crate::structure::{
dds_cache::DDSCache,
guid::{GUID, EntityId},
topic_kind::TopicKind,
};
use crate::dds::pubsub::Publisher;
use crate::dds::topic::Topic;
use crate::log_and_err_precondition_not_met;
use crate::dds::values::result::{ Result, Error, };
use crate::dds::statusevents::*;
use crate::dds::traits::dds_entity::DDSEntity;
use crate::dds::traits::key::*;
use crate::dds::traits::TopicDescription;
use crate::dds::qos::{
HasQoSPolicy, QosPolicies,
policy::{Reliability},
};
use crate::dds::traits::serde_adapters::SerializerAdapter;
use crate::dds::with_key::datasample::DataSample;
use crate::{discovery::data_types::topic_data::SubscriptionBuiltinTopicData, dds::ddsdata::DDSData};
use super::super::{datasample_cache::DataSampleCache, writer::WriterCommand, };
/// A DDS (with_key) DataWriter: the application-facing handle for publishing
/// samples of type `D` to a Topic. Samples are serialized by the adapter
/// type `SA`, which defaults to CDR serialization.
pub struct DataWriter<D: Keyed + Serialize, SA: SerializerAdapter<D> = CDRSerializerAdapter<D>>
{
  // The Publisher that created this writer.
  my_publisher: Publisher,
  // The Topic this writer publishes to.
  my_topic: Topic,
  // QoS snapshot taken from the Topic at creation time (see `new`).
  qos_policy: QosPolicies,
  // This writer's own RTPS GUID.
  my_guid: GUID,
  // Channel towards the RTPS Writer; carries WriterCommand::DDSData payloads.
  cc_upload: mio_channel::SyncSender<WriterCommand>,
  // Channel towards Discovery; used for liveliness assertions and
  // writer removal on drop.
  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
  // Shared topic/data cache; this writer registers its topic here in `new`.
  dds_cache: Arc<RwLock<DDSCache>>,
  // Per-writer sample cache, constructed from the QoS snapshot.
  datasample_cache: DataSampleCache<D>,
  // Ties the serializer adapter type parameter to this struct; no `SA`
  // value is actually stored.
  phantom: PhantomData<SA>,
  // Receiver side for DataWriterStatus events; exposed via StatusEvented.
  status_receiver: StatusReceiver<DataWriterStatus>,
}
impl<D, SA> Drop for DataWriter<D, SA>
where
D: Keyed + Serialize,
SA: SerializerAdapter<D>,
{
fn drop(&mut self) {
match self
.discovery_command
.send(DiscoveryCommand::REMOVE_LOCAL_WRITER {
guid: self.get_guid(),
}) {
Ok(_) => {}
Err(e) => error!(
"Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand. {:?}",
e
),
}
}
}
impl<D, SA> DataWriter<D, SA>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
SA: SerializerAdapter<D>,
{
pub(crate) fn new(
publisher: Publisher,
topic: Topic,
guid: Option<GUID>,
cc_upload: mio_channel::SyncSender<WriterCommand>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
dds_cache: Arc<RwLock<DDSCache>>,
status_receiver_rec: Receiver<DataWriterStatus>,
) -> Result<DataWriter<D, SA>> {
let entity_id = match guid {
Some(g) => g.entityId.clone(),
None => EntityId::ENTITYID_UNKNOWN,
};
let dp = match publisher.get_participant() {
Some(dp) => dp,
None => return
log_and_err_precondition_not_met!("Cannot create new DataWriter, DomainParticipant doesn't exist.") ,
};
let my_guid = GUID::new_with_prefix_and_id(dp.get_guid_prefix().clone(), entity_id );
match dds_cache.write() {
Ok(mut cache) => cache.add_new_topic(
&String::from(topic.get_name()),
TopicKind::NoKey,
topic.get_type(),
),
Err(e) => panic!("DDSCache is poisoned. {:?}", e),
};
match topic.get_qos().liveliness {
Some(lv) => match lv {
Liveliness::Automatic { lease_duration: _ } => (),
Liveliness::ManualByParticipant { lease_duration: _ } => {
match discovery_command.send(DiscoveryCommand::MANUAL_ASSERT_LIVELINESS) {
Ok(_) => (),
Err(e) => {
error!("Failed to send DiscoveryCommand - Refresh. {:?}", e);
}
}
}
Liveliness::ManualByTopic { lease_duration: _ } => (),
},
None => (),
};
let qos = topic.get_qos().clone();
Ok(DataWriter {
my_publisher: publisher,
my_topic: topic,
qos_policy: qos.clone(),
my_guid,
cc_upload,
discovery_command,
dds_cache,
datasample_cache: DataSampleCache::new(qos),
phantom: PhantomData,
status_receiver: StatusReceiver::new(status_receiver_rec),
})
}
pub fn refresh_manual_liveliness(&self) {
match self.get_qos().liveliness {
Some(lv) => match lv {
Liveliness::Automatic { lease_duration: _ } => (),
Liveliness::ManualByParticipant { lease_duration: _ } => {
match self
.discovery_command
.send(DiscoveryCommand::MANUAL_ASSERT_LIVELINESS)
{
Ok(_) => (),
Err(e) => {
error!("Failed to send DiscoveryCommand - Refresh. {:?}", e);
}
}
}
Liveliness::ManualByTopic { lease_duration: _ } => (),
},
None => (),
};
}
pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> Result<()> {
let mut ddsdata = DDSData::from(&data, source_timestamp);
ddsdata.value_key_hash = data.get_key().into_hash_key();
let _data_sample = match source_timestamp {
Some(t) => DataSample::new_deprecated(t, data, self.get_guid()),
None => DataSample::new_deprecated(Timestamp::now(), data, self.get_guid()),
};
match self
.cc_upload
.try_send(WriterCommand::DDSData { data: ddsdata })
{
Ok(_) => {
self.refresh_manual_liveliness();
Ok(())
}
Err(e) => {
warn!("Failed to write new data. {:?}", e);
Err(Error::OutOfResources)
}
}
}
pub fn wait_for_acknowledgments(&self, _max_wait: Duration) -> Result<()> {
match &self.qos_policy.reliability {
Some(rel) => match rel {
Reliability::BestEffort => return Ok(()),
Reliability::Reliable {
max_blocking_time: _,
} =>
{
()
}
},
None => return Ok(()),
};
return Err(Error::Unsupported);
}
pub fn get_topic(&self) -> &Topic {
&self.my_topic
}
pub fn get_publisher(&self) -> &Publisher {
&self.my_publisher
}
pub fn assert_liveliness(&self) -> Result<()> {
self.refresh_manual_liveliness();
match self.get_qos().liveliness {
Some(Liveliness::ManualByTopic { lease_duration: _ }) => {
self.discovery_command
.send(DiscoveryCommand::ASSERT_TOPIC_LIVELINESS {
writer_guid: self.get_guid(),
manual_assertion: true, })
.unwrap_or_else( |e| error!("assert_liveness - Failed to send DiscoveryCommand. {:?}", e))
}
_other => (),
}
Ok(())
}
pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
todo!()
}
pub fn dispose(&self, key: <D as Keyed>::K, source_timestamp: Option<Timestamp>) -> Result<()> {
let mut ddsdata = DDSData::from_dispose::<D>(key.clone(), source_timestamp);
ddsdata.value_key_hash = key.into_hash_key();
let _data_sample: DataSample<D> = match source_timestamp {
Some(t) => DataSample::<D>::new_disposed::<<D as Keyed>::K>(t, key, self.get_guid()),
None => DataSample::new_disposed::<<D as Keyed>::K>(Timestamp::now(), key, self.get_guid()),
};
match self
.cc_upload
.try_send(WriterCommand::DDSData { data: ddsdata })
{
Ok(_) => {
self.refresh_manual_liveliness();
Ok(())
}
Err(huh) => {
warn!("Error: {:?}", huh);
Err(Error::OutOfResources)
}
}
}
}
// Lets the application poll this DataWriter for status changes via mio,
// and fetch queued DataWriterStatus events without blocking.
impl <D,SA> StatusEvented<DataWriterStatus> for DataWriter<D,SA>
where
  D: Keyed + Serialize,
  SA: SerializerAdapter<D>,
{
  // Exposes the underlying status receiver as a mio Evented source, so it
  // can be registered in a mio poll loop.
  fn as_status_evented(&mut self) -> &dyn Evented {
    self.status_receiver.as_status_evented()
  }
  // Non-blocking: returns the next pending status event, or None.
  fn try_recv_status(&self) -> Option<DataWriterStatus> {
    self.status_receiver.try_recv_status()
  }
}
// Identifies this DataWriter as an RTPS entity with its own GUID.
impl<D, SA> RTPSEntity for DataWriter<D, SA>
where
  D: Keyed + Serialize,
  SA: SerializerAdapter<D>,
{
  // Returns this writer's GUID by value (GUID is returned without clone,
  // so it is a Copy type).
  fn get_guid(&self) -> GUID {
    self.my_guid
  }
}
// Exposes the QoS policies this writer was created with.
impl<D, SA> DataWriter<D, SA> as HasQoSPolicy would be implemented below; see get_qos.
impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
where
  D: Keyed + Serialize,
  SA: SerializerAdapter<D>,
{
  // Returns a clone of the QoS snapshot taken at construction time.
  fn get_qos(&self) -> QosPolicies {
    self.qos_policy.clone()
  }
}
// Marker trait implementation: DataWriter is a DDS Entity.
// DDSEntity has no required methods, so the impl body is empty.
impl<D, SA> DDSEntity for DataWriter<D, SA>
where
  D: Keyed + Serialize,
  SA: SerializerAdapter<D>,
{
}
#[cfg(test)]
mod tests {
  use super::*;
  use crate::dds::participant::DomainParticipant;
  use crate::test::random_data::*;
  use std::thread;
  use crate::dds::traits::key::Keyed;
  use crate::serialization::cdr_serializer::CDRSerializerAdapter;
  use byteorder::LittleEndian;
  use log::info;

  /// Writing succeeds both without and with an explicit source timestamp.
  #[test]
  fn dw_write_test() {
    let domain_participant = DomainParticipant::new(0);
    let qos = QosPolicies::qos_none();
    let _default_dw_qos = QosPolicies::qos_none();
    let publisher = domain_participant
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = domain_participant
      .create_topic("Aasii", "Huh?", &qos, TopicKind::WithKey)
      .expect("Failed to create topic");
    let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
      publisher
        .create_datawriter(None, topic, None)
        .expect("Failed to create datawriter");

    let mut data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };
    data_writer
      .write(data.clone(), None)
      .expect("Unable to write data");

    data.a = 5;
    let timestamp = Timestamp::now();
    data_writer
      .write(data, Some(timestamp))
      .expect("Unable to write data with timestamp");
  }

  /// A written instance can subsequently be disposed by key.
  #[test]
  fn dw_dispose_test() {
    let domain_participant = DomainParticipant::new(0);
    let qos = QosPolicies::qos_none();
    let publisher = domain_participant
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = domain_participant
      .create_topic("Aasii", "Huh?", &qos, TopicKind::WithKey)
      .expect("Failed to create topic");
    let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
      publisher
        .create_datawriter(None, topic, None)
        .expect("Failed to create datawriter");

    let data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };
    let key = data.get_key().into_hash_key();
    info!("key: {:?}", key);

    data_writer
      .write(data.clone(), None)
      .expect("Unable to write data");
    // Give the write a moment to propagate before disposing.
    thread::sleep(Duration::from_millis(100));
    data_writer
      .dispose(data.get_key(), None)
      .expect("Unable to dispose data");
  }

  /// With no Reliability QoS set, wait_for_acknowledgments returns Ok
  /// immediately.
  #[test]
  fn dw_wait_for_ack_test() {
    let domain_participant = DomainParticipant::new(0);
    let qos = QosPolicies::qos_none();
    let publisher = domain_participant
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = domain_participant
      .create_topic("Aasii", "Huh?", &qos, TopicKind::WithKey)
      .expect("Failed to create topic");
    let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
      publisher
        .create_datawriter(None, topic, None)
        .expect("Failed to create datawriter");

    let data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };
    data_writer.write(data, None).expect("Unable to write data");
    // qos_none has no Reliability policy, so this must succeed at once.
    // (The previous `assert_eq!(res, ())` compared against unit and
    // asserted nothing.)
    data_writer
      .wait_for_acknowledgments(Duration::from_secs(5))
      .expect("wait_for_acknowledgments failed");
  }
}