use std::{
fmt::Debug,
sync::{Arc, Mutex, MutexGuard, RwLock},
time::Duration,
};
use mio_extras::channel as mio_channel;
use serde::{de::DeserializeOwned, Serialize};
use byteorder::LittleEndian;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{
data_types::EntityKind,
no_key::{
datareader::DataReader as NoKeyDataReader, datawriter::DataWriter as NoKeyDataWriter,
},
participant::*,
qos::*,
reader::ReaderIngredients,
statusevents::DataReaderStatus,
topic::*,
traits::{
key::{Key, Keyed},
serde_adapters::{no_key, with_key},
},
values::result::{Error, Result},
with_key::{
datareader::DataReader as WithKeyDataReader, datawriter::DataWriter as WithKeyDataWriter,
},
writer::WriterIngredients,
},
discovery::{
data_types::topic_data::DiscoveredWriterData, discovery::DiscoveryCommand,
discovery_db::DiscoveryDB,
},
log_and_err_internal, log_and_err_precondition_not_met,
serialization::{cdr_deserializer::CDRDeserializerAdapter, cdr_serializer::CDRSerializerAdapter},
structure::{
entity::RTPSEntity,
guid::{EntityId, GUID},
topic_kind::TopicKind,
},
};
use super::{
no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper},
with_key::datareader::ReaderCommand,
writer::WriterCommand,
};
#[derive(Clone)]
pub struct Publisher {
inner: Arc<Mutex<InnerPublisher>>,
}
impl Publisher {
pub(super) fn new(
dp: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
default_dw_qos: QosPolicies,
add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> Self {
Self {
inner: Arc::new(Mutex::new(InnerPublisher::new(
dp,
discovery_db,
qos,
default_dw_qos,
add_writer_sender,
remove_writer_sender,
discovery_command,
))),
}
}
fn inner_lock(&self) -> MutexGuard<'_, InnerPublisher> {
self
.inner
.lock()
.unwrap_or_else(|e| panic!("Inner publisher lock fail! {:?}", e))
}
pub fn create_datawriter<D, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataWriter<D, SA>>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
SA: with_key::SerializerAdapter<D>,
{
self.inner_lock().create_datawriter(self, None, topic, qos)
}
pub fn create_datawriter_cdr<D>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
{
self.create_datawriter::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
}
pub(crate) fn create_datawriter_with_entityid<D, SA>(
&self,
entity_id: EntityId,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataWriter<D, SA>>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
SA: with_key::SerializerAdapter<D>,
{
self
.inner_lock()
.create_datawriter(self, Some(entity_id), topic, qos)
}
pub(crate) fn create_datawriter_cdr_with_entityid<D>(
&self,
entity_id: EntityId,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
{
self.create_datawriter_with_entityid::<D, CDRSerializerAdapter<D, LittleEndian>>(
entity_id, topic, qos,
)
}
pub fn create_datawriter_no_key<D, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<NoKeyDataWriter<D, SA>>
where
D: Serialize,
SA: no_key::SerializerAdapter<D>,
{
self
.inner_lock()
.create_datawriter_no_key(self, None, topic, qos)
}
pub fn create_datawriter_no_key_cdr<D>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<NoKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
where
D: Serialize,
{
self.create_datawriter_no_key::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
}
pub fn suspend_publications(&self) -> Result<()> {
self.inner_lock().suspend_publications()
}
pub fn resume_publications(&self) -> Result<()> {
self.inner_lock().resume_publications()
}
pub fn begin_coherent_changes(&self) -> Result<()> {
self.inner_lock().begin_coherent_changes()
}
pub fn end_coherent_changes(&self) -> Result<()> {
self.inner_lock().end_coherent_changes()
}
pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> Result<()> {
self.inner_lock().wait_for_acknowledgments(max_wait)
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.inner_lock().domain_participant.clone().upgrade()
}
pub fn get_default_datawriter_qos(&self) -> QosPolicies {
self.inner_lock().get_default_datawriter_qos().clone()
}
pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
self.inner_lock().set_default_datawriter_qos(q);
}
pub(crate) fn remove_writer(&self, guid: GUID) {
self.inner_lock().remove_writer(guid);
}
}
impl PartialEq for Publisher {
fn eq(&self, other: &Self) -> bool {
let id_self = { self.inner_lock().identity() };
let id_other = { other.inner_lock().identity() };
id_self == id_other
}
}
impl Debug for Publisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner_lock().fmt(f)
}
}
#[derive(Clone)]
struct InnerPublisher {
id: EntityId,
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
my_qos_policies: QosPolicies,
default_datawriter_qos: QosPolicies, add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
}
impl InnerPublisher {
fn new(
dp: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
default_dw_qos: QosPolicies,
add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> Self {
let id = EntityId::MAX;
Self {
id,
domain_participant: dp,
discovery_db,
my_qos_policies: qos,
default_datawriter_qos: default_dw_qos,
add_writer_sender,
remove_writer_sender,
discovery_command,
}
}
pub fn create_datawriter<D, SA>(
&self,
outer: &Publisher,
entity_id_opt: Option<EntityId>,
topic: &Topic,
optional_qos: Option<QosPolicies>,
) -> Result<WithKeyDataWriter<D, SA>>
where
D: Keyed + Serialize,
<D as Keyed>::K: Key,
SA: with_key::SerializerAdapter<D>,
{
let (dwcc_upload, hccc_download) = mio_channel::sync_channel::<WriterCommand>(16);
let (status_sender, status_receiver) = mio_channel::sync_channel(4);
let writer_qos = self
.default_datawriter_qos
.modify_by(&topic.qos())
.modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_WITH_KEY_USER_DEFINED);
let dp = self
.participant()
.ok_or("upgrade fail")
.or_else(|e| log_and_err_internal!("Where is my DomainParticipant? {}", e))?;
let guid = GUID::new_with_prefix_and_id(dp.guid().prefix, entity_id);
let new_writer = WriterIngredients {
guid,
writer_command_receiver: hccc_download,
topic_name: topic.name(),
qos_policies: writer_qos,
status_sender,
};
self
.add_writer_sender
.send(new_writer)
.or_else(|e| log_and_err_internal!("Adding a new writer failed: {}", e))?;
let data_writer = WithKeyDataWriter::<D, SA>::new(
outer.clone(),
topic.clone(),
guid,
dwcc_upload,
self.discovery_command.clone(),
&dp.dds_cache(),
status_receiver,
)?;
let mut db = self.discovery_db.write()?;
let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp);
db.update_local_topic_writer(dwd);
db.update_topic_data_p(topic);
Ok(data_writer)
}
pub fn create_datawriter_no_key<D, SA>(
&self,
outer: &Publisher,
entity_id_opt: Option<EntityId>,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<NoKeyDataWriter<D, SA>>
where
D: Serialize,
SA: no_key::SerializerAdapter<D>,
{
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_NO_KEY_USER_DEFINED);
let d = self.create_datawriter::<NoKeyWrapper<D>, SAWrapper<SA>>(
outer,
Some(entity_id),
topic,
qos,
)?;
Ok(NoKeyDataWriter::<D, SA>::from_keyed(d))
}
pub fn suspend_publications(&self) -> Result<()> {
Ok(())
}
pub fn resume_publications(&self) -> Result<()> {
Ok(())
}
pub fn begin_coherent_changes(&self) -> Result<()> {
Ok(())
}
pub fn end_coherent_changes(&self) -> Result<()> {
Ok(())
}
pub(crate) fn wait_for_acknowledgments(&self, _max_wait: Duration) -> Result<()> {
unimplemented!();
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.domain_participant.clone().upgrade()
}
pub fn get_default_datawriter_qos(&self) -> &QosPolicies {
&self.default_datawriter_qos
}
pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
self.default_datawriter_qos = q.clone();
}
fn unwrap_or_new_entity_id(
&self,
entity_id_opt: Option<EntityId>,
entity_kind: EntityKind,
) -> EntityId {
entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
}
pub(crate) fn remove_writer(&self, guid: GUID) {
self
.remove_writer_sender
.try_send(guid)
.unwrap_or_else(|e| error!("Cannot remove Writer {:?} : {:?}", guid, e));
}
pub(crate) fn identity(&self) -> EntityId {
self.id
}
}
impl Debug for InnerPublisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{:?}", self.participant()))?;
f.write_fmt(format_args!("Publisher QoS: {:?}", self.my_qos_policies))?;
f.write_fmt(format_args!(
"Publishers default Writer QoS: {:?}",
self.default_datawriter_qos
))
}
}
#[derive(Clone)]
pub struct Subscriber {
inner: Arc<InnerSubscriber>,
}
impl Subscriber {
pub(super) fn new(
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> Self {
Self {
inner: Arc::new(InnerSubscriber::new(
domain_participant,
discovery_db,
qos,
sender_add_reader,
sender_remove_reader,
discovery_command,
)),
}
}
pub fn create_datareader<D: 'static, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataReader<D, SA>>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
SA: with_key::DeserializerAdapter<D>,
{
self.inner.create_datareader(self, topic, None, qos)
}
pub fn create_datareader_cdr<D: 'static>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
{
self.create_datareader::<D, CDRDeserializerAdapter<D>>(topic, qos)
}
pub(crate) fn create_datareader_with_entityid<D: 'static, SA>(
&self,
topic: &Topic,
entity_id: EntityId,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataReader<D, SA>>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
SA: with_key::DeserializerAdapter<D>,
{
self
.inner
.create_datareader(self, topic, Some(entity_id), qos)
}
pub(crate) fn create_datareader_cdr_with_entityid<D: 'static>(
&self,
topic: &Topic,
entity_id: EntityId,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
{
self.create_datareader_with_entityid::<D, CDRDeserializerAdapter<D>>(topic, entity_id, qos)
}
pub fn create_datareader_no_key<D: 'static, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<NoKeyDataReader<D, SA>>
where
D: DeserializeOwned,
SA: no_key::DeserializerAdapter<D>,
{
self.inner.create_datareader_no_key(self, topic, None, qos)
}
pub fn create_datareader_no_key_cdr<D: 'static>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<NoKeyDataReader<D, CDRDeserializerAdapter<D>>>
where
D: DeserializeOwned,
{
self.create_datareader_no_key::<D, CDRDeserializerAdapter<D>>(topic, qos)
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.inner.participant()
}
pub(crate) fn remove_reader(&self, guid: GUID) {
self.inner.remove_reader(guid);
}
}
#[derive(Clone)]
pub struct InnerSubscriber {
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
}
impl InnerSubscriber {
pub(super) fn new(
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> Self {
Self {
domain_participant,
discovery_db,
qos,
sender_add_reader,
sender_remove_reader,
discovery_command,
}
}
fn create_datareader_internal<D: 'static, SA>(
&self,
outer: &Subscriber,
entity_id_opt: Option<EntityId>,
topic: &Topic,
optional_qos: Option<QosPolicies>,
) -> Result<WithKeyDataReader<D, SA>>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
SA: with_key::DeserializerAdapter<D>,
{
let (send, rec) = mio_channel::sync_channel::<()>(4);
let (status_sender, status_receiver) = mio_channel::sync_channel::<DataReaderStatus>(4);
let (reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(4);
let qos = self
.qos
.modify_by(&topic.qos())
.modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_WITH_KEY_USER_DEFINED);
let dp = match self.participant() {
Some(dp) => dp,
None => return log_and_err_precondition_not_met!("DomainParticipant doesn't exist anymore."),
};
let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), entity_id);
let new_reader = ReaderIngredients {
guid: reader_guid,
notification_sender: send,
status_sender,
topic_name: topic.name(),
qos_policy: qos.clone(),
data_reader_command_receiver: reader_command_receiver,
};
{
let mut db = self
.discovery_db
.write()
.or_else(|e| log_and_err_internal!("Cannot lock discovery_db. {}", e))?;
db.update_local_topic_reader(&dp, topic, &new_reader);
db.update_topic_data_p(topic);
}
let datareader = WithKeyDataReader::<D, SA>::new(
outer.clone(),
entity_id,
topic.clone(),
qos,
rec,
dp.dds_cache(),
self.discovery_command.clone(),
status_receiver,
reader_command_sender,
)?;
match dp.dds_cache().write() {
Ok(mut dds_cache) => {
dds_cache.add_new_topic(topic.name(), topic.get_type());
}
Err(e) => return log_and_err_internal!("Cannot lock DDScache. Error: {}", e),
}
self
.sender_add_reader
.try_send(new_reader)
.or_else(|e| log_and_err_internal!("Cannot add DataReader. Error: {}", e))?;
Ok(datareader)
}
pub fn create_datareader<D: 'static, SA>(
&self,
outer: &Subscriber,
topic: &Topic,
entity_id: Option<EntityId>,
qos: Option<QosPolicies>,
) -> Result<WithKeyDataReader<D, SA>>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
SA: with_key::DeserializerAdapter<D>,
{
if topic.kind() != TopicKind::WithKey {
return Error::precondition_not_met(
"Topic is NO_KEY, but attempted to create WITH_KEY Datareader",
);
}
self.create_datareader_internal(outer, entity_id, topic, qos)
}
pub fn create_datareader_no_key<D: 'static, SA>(
&self,
outer: &Subscriber,
topic: &Topic,
entity_id_opt: Option<EntityId>,
qos: Option<QosPolicies>,
) -> Result<NoKeyDataReader<D, SA>>
where
D: DeserializeOwned,
SA: no_key::DeserializerAdapter<D>,
{
if topic.kind() != TopicKind::NoKey {
return Error::precondition_not_met(
"Topic is WITH_KEY, but attempted to create NO_KEY Datareader",
);
}
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
let d = self.create_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
outer,
Some(entity_id),
topic,
qos,
)?;
Ok(NoKeyDataReader::<D, SA>::from_keyed(d))
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.domain_participant.clone().upgrade()
}
pub(crate) fn remove_reader(&self, guid: GUID) {
self
.sender_remove_reader
.try_send(guid)
.unwrap_or_else(|e| error!("Cannot remove Reader {:?} : {:?}", guid, e));
}
fn unwrap_or_new_entity_id(
&self,
entity_id_opt: Option<EntityId>,
entity_kind: EntityKind,
) -> EntityId {
entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
}
}
#[cfg(test)]
mod tests {}