use std::{
fmt::Debug,
sync::{Arc, Mutex, MutexGuard, RwLock},
time::Duration,
};
use serde::{Deserialize, Serialize};
use mio_extras::channel as mio_channel;
use byteorder::LittleEndian;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
create_error_dropped, create_error_poisoned,
dds::{
adapters,
key::Keyed,
no_key,
no_key::{
datareader::DataReader as NoKeyDataReader, datawriter::DataWriter as NoKeyDataWriter,
},
participant::*,
qos::*,
result::{CreateError, CreateResult, WaitResult},
statusevents::{sync_status_channel, DataReaderStatus},
topic::*,
with_key,
with_key::{
datareader::DataReader as WithKeyDataReader, datawriter::DataWriter as WithKeyDataWriter,
},
},
discovery::{
discovery::DiscoveryCommand, discovery_db::DiscoveryDB, sedp_messages::DiscoveredWriterData,
},
mio_source,
rtps::{
reader::ReaderIngredients,
writer::{WriterCommand, WriterIngredients},
},
serialization::{CDRDeserializerAdapter, CDRSerializerAdapter},
structure::{
entity::RTPSEntity,
guid::{EntityId, EntityKind, GUID},
},
};
use super::{
helpers::try_send_timeout,
no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper},
with_key::simpledatareader::ReaderCommand,
};
#[cfg(feature = "security")]
use crate::{
create_error_internal, create_error_not_allowed_by_security,
security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
};
#[cfg(not(feature = "security"))]
use crate::no_security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo};
#[derive(Clone)]
pub struct Publisher {
inner: Arc<Mutex<InnerPublisher>>,
}
impl Publisher {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
dp: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
default_dw_qos: QosPolicies,
add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
security_plugins_handle: Option<SecurityPluginsHandle>,
) -> Self {
Self {
inner: Arc::new(Mutex::new(InnerPublisher::new(
dp,
discovery_db,
qos,
default_dw_qos,
add_writer_sender,
remove_writer_sender,
discovery_command,
security_plugins_handle,
))),
}
}
fn inner_lock(&self) -> MutexGuard<'_, InnerPublisher> {
self
.inner
.lock()
.unwrap_or_else(|e| panic!("Inner publisher lock fail! {e:?}"))
}
pub fn create_datawriter<D, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<WithKeyDataWriter<D, SA>>
where
D: Keyed,
SA: adapters::with_key::SerializerAdapter<D>,
{
self
.inner_lock()
.create_datawriter(self, None, topic, qos, false)
}
pub fn create_datawriter_cdr<D>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
where
D: Keyed + serde::Serialize,
<D as Keyed>::K: Serialize,
{
self.create_datawriter::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
}
pub fn create_datawriter_no_key<D, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<NoKeyDataWriter<D, SA>>
where
SA: adapters::no_key::SerializerAdapter<D>,
{
self
.inner_lock()
.create_datawriter_no_key(self, None, topic, qos, false)
}
pub fn create_datawriter_no_key_cdr<D>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<NoKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
where
D: serde::Serialize,
{
self.create_datawriter_no_key::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
}
pub(crate) fn create_datawriter_with_entity_id_with_key<D, SA>(
&self,
entity_id: EntityId,
topic: &Topic,
qos: Option<QosPolicies>,
writer_like_stateless: bool, ) -> CreateResult<WithKeyDataWriter<D, SA>>
where
D: Keyed,
SA: adapters::with_key::SerializerAdapter<D>,
{
self
.inner_lock()
.create_datawriter(self, Some(entity_id), topic, qos, writer_like_stateless)
}
#[cfg(feature = "security")] pub(crate) fn create_datawriter_with_entity_id_no_key<D, SA>(
&self,
entity_id: EntityId,
topic: &Topic,
qos: Option<QosPolicies>,
writer_like_stateless: bool, ) -> CreateResult<NoKeyDataWriter<D, SA>>
where
SA: crate::no_key::SerializerAdapter<D>,
{
self.inner_lock().create_datawriter_no_key(
self,
Some(entity_id),
topic,
qos,
writer_like_stateless,
)
}
#[deprecated(note = "unimplemented")]
pub fn suspend_publications(&self) {
unimplemented!();
}
#[deprecated(note = "unimplemented")]
pub fn resume_publications(&self) {
unimplemented!();
}
#[deprecated(note = "unimplemented")]
pub fn begin_coherent_changes(&self) {}
#[deprecated(note = "unimplemented")]
pub fn end_coherent_changes(&self) {}
#[deprecated(note = "unimplemented")]
pub fn wait_for_acknowledgments(&self, _max_wait: Duration) -> WaitResult<()> {
unimplemented!();
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.inner_lock().domain_participant.clone().upgrade()
}
pub fn get_default_datawriter_qos(&self) -> QosPolicies {
self.inner_lock().get_default_datawriter_qos().clone()
}
pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
self.inner_lock().set_default_datawriter_qos(q);
}
pub(crate) fn remove_writer(&self, guid: GUID) {
self.inner_lock().remove_writer(guid);
}
}
impl PartialEq for Publisher {
fn eq(&self, other: &Self) -> bool {
let id_self = { self.inner_lock().identity() };
let id_other = { other.inner_lock().identity() };
id_self == id_other
}
}
impl Debug for Publisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner_lock().fmt(f)
}
}
#[derive(Clone)]
struct InnerPublisher {
id: EntityId,
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
my_qos_policies: QosPolicies,
default_datawriter_qos: QosPolicies, add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
security_plugins_handle: Option<SecurityPluginsHandle>,
}
impl InnerPublisher {
#[allow(clippy::too_many_arguments)]
fn new(
dp: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
default_dw_qos: QosPolicies,
add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
security_plugins_handle: Option<SecurityPluginsHandle>,
) -> Self {
let id = EntityId::MAX;
Self {
id,
domain_participant: dp,
discovery_db,
my_qos_policies: qos,
default_datawriter_qos: default_dw_qos,
add_writer_sender,
remove_writer_sender,
discovery_command,
security_plugins_handle,
}
}
pub fn create_datawriter<D, SA>(
&self,
outer: &Publisher,
entity_id_opt: Option<EntityId>,
topic: &Topic,
optional_qos: Option<QosPolicies>,
writer_like_stateless: bool, ) -> CreateResult<WithKeyDataWriter<D, SA>>
where
D: Keyed,
SA: adapters::with_key::SerializerAdapter<D>,
{
let (dwcc_upload, hccc_download) = mio_channel::sync_channel::<WriterCommand>(16);
let writer_waker = Arc::new(Mutex::new(None));
let (status_sender, status_receiver) = sync_status_channel(4)?;
let writer_qos = self
.default_datawriter_qos
.modify_by(&topic.qos())
.modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_WITH_KEY_USER_DEFINED);
let dp = self
.participant()
.ok_or("upgrade fail")
.or_else(|e| create_error_dropped!("Where is my DomainParticipant? {}", e))?;
let guid = GUID::new_with_prefix_and_id(dp.guid().prefix, entity_id);
#[cfg(feature = "security")]
if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
let check_res = sec_handle.get_plugins().check_create_datawriter(
guid.prefix,
dp.domain_id(),
topic.name(),
&writer_qos,
);
match check_res {
Ok(check_passed) => {
if !check_passed {
return create_error_not_allowed_by_security!(
"Not allowed to create a DataWriter to topic {}",
topic.name()
);
}
}
Err(e) => {
return create_error_internal!(
"Failed to check DataWriter rights from Access control: {}",
e.msg
);
}
};
if let Err(e) = {
let writer_security_attributes = sec_handle
.get_plugins()
.get_writer_sec_attributes(guid, topic.name()); writer_security_attributes.and_then(|attributes| {
sec_handle
.get_plugins()
.register_local_writer(guid, writer_qos.property(), attributes)
})
} {
return create_error_internal!(
"Failed to register writer to crypto plugin: {} . GUID: {:?}",
e,
guid
);
} else {
info!("Registered local writer to crypto plugin. GUID: {guid:?}");
}
}
let data_writer = WithKeyDataWriter::<D, SA>::new(
outer.clone(),
topic.clone(),
writer_qos.clone(),
guid,
dwcc_upload,
Arc::clone(&writer_waker),
self.discovery_command.clone(),
status_receiver,
)?;
#[cfg(not(feature = "security"))]
let security_info = None;
#[cfg(feature = "security")]
let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
if guid.entity_id.entity_kind.is_user_defined() {
match sec_handle
.get_plugins()
.get_writer_sec_attributes(guid, topic.name())
{
Ok(attr) => EndpointSecurityInfo::from(attr).into(),
Err(e) => {
return create_error_internal!(
"Failed to get security info for writer: {}. Guid: {:?}",
e,
guid
);
}
}
} else {
None }
} else {
None
};
let mut db = self
.discovery_db
.write()
.map_err(|e| CreateError::Poisoned {
reason: format!("Discovery DB: {e}"),
})?;
let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
db.update_local_topic_writer(dwd);
db.update_topic_data_p(topic);
if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
topic_name: topic.name(),
}) {
error!(
"Failed send DiscoveryCommand::AddTopic about topic {}: {}",
topic.name(),
e
);
}
let new_writer = WriterIngredients {
guid,
writer_command_receiver: hccc_download,
writer_command_receiver_waker: writer_waker,
topic_name: topic.name(),
like_stateless: writer_like_stateless,
qos_policies: writer_qos,
status_sender,
security_plugins: self.security_plugins_handle.clone(),
};
self
.add_writer_sender
.send(new_writer)
.or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;
Ok(data_writer)
}
pub fn create_datawriter_no_key<D, SA>(
&self,
outer: &Publisher,
entity_id_opt: Option<EntityId>,
topic: &Topic,
qos: Option<QosPolicies>,
writer_like_stateless: bool, ) -> CreateResult<NoKeyDataWriter<D, SA>>
where
SA: adapters::no_key::SerializerAdapter<D>,
{
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_NO_KEY_USER_DEFINED);
let d = self.create_datawriter::<NoKeyWrapper<D>, SAWrapper<SA>>(
outer,
Some(entity_id),
topic,
qos,
writer_like_stateless,
)?;
Ok(NoKeyDataWriter::<D, SA>::from_keyed(d))
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.domain_participant.clone().upgrade()
}
pub fn get_default_datawriter_qos(&self) -> &QosPolicies {
&self.default_datawriter_qos
}
pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
self.default_datawriter_qos = q.clone();
}
fn unwrap_or_new_entity_id(
&self,
entity_id_opt: Option<EntityId>,
entity_kind: EntityKind,
) -> EntityId {
entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
}
pub(crate) fn remove_writer(&self, guid: GUID) {
try_send_timeout(&self.remove_writer_sender, guid, None)
.unwrap_or_else(|e| error!("Cannot remove Writer {guid:?} : {e:?}"));
}
pub(crate) fn identity(&self) -> EntityId {
self.id
}
}
impl Debug for InnerPublisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{:?}", self.participant()))?;
f.write_fmt(format_args!("Publisher QoS: {:?}", self.my_qos_policies))?;
f.write_fmt(format_args!(
"Publishers default Writer QoS: {:?}",
self.default_datawriter_qos
))
}
}
#[derive(Clone)]
pub struct Subscriber {
inner: Arc<InnerSubscriber>,
}
impl Subscriber {
pub(super) fn new(
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
security_plugins_handle: Option<SecurityPluginsHandle>,
) -> Self {
Self {
inner: Arc::new(InnerSubscriber::new(
domain_participant,
discovery_db,
qos,
sender_add_reader,
sender_remove_reader,
discovery_command,
security_plugins_handle,
)),
}
}
pub fn create_datareader<D, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<WithKeyDataReader<D, SA>>
where
D: 'static + Keyed,
SA: adapters::with_key::DeserializerAdapter<D>,
{
self.inner.create_datareader(self, topic, None, qos, false)
}
pub fn create_datareader_cdr<D>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
where
D: 'static + serde::de::DeserializeOwned + Keyed,
for<'de> <D as Keyed>::K: Deserialize<'de>,
{
self.create_datareader::<D, CDRDeserializerAdapter<D>>(topic, qos)
}
pub fn create_datareader_no_key<D, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<NoKeyDataReader<D, SA>>
where
D: 'static,
SA: adapters::no_key::DeserializerAdapter<D>,
{
self
.inner
.create_datareader_no_key(self, topic, None, qos, false)
}
pub fn create_simple_datareader_no_key<D, DA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<no_key::SimpleDataReader<D, DA>>
where
D: 'static,
DA: 'static + adapters::no_key::DeserializerAdapter<D>,
{
self
.inner
.create_simple_datareader_no_key(self, topic, None, qos)
}
pub fn create_datareader_no_key_cdr<D>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<NoKeyDataReader<D, CDRDeserializerAdapter<D>>>
where
D: 'static + serde::de::DeserializeOwned,
{
self.create_datareader_no_key::<D, CDRDeserializerAdapter<D>>(topic, qos)
}
pub(crate) fn create_datareader_with_entity_id_with_key<D, SA>(
&self,
topic: &Topic,
entity_id: EntityId,
qos: Option<QosPolicies>,
reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
where
D: 'static + Keyed,
SA: adapters::with_key::DeserializerAdapter<D>,
{
self
.inner
.create_datareader(self, topic, Some(entity_id), qos, reader_like_stateless)
}
#[cfg(feature = "security")] pub(crate) fn create_datareader_with_entity_id_no_key<D, SA>(
&self,
topic: &Topic,
entity_id: EntityId,
qos: Option<QosPolicies>,
reader_like_stateless: bool, ) -> CreateResult<NoKeyDataReader<D, SA>>
where
D: 'static,
SA: adapters::no_key::DeserializerAdapter<D>,
{
self
.inner
.create_datareader_no_key(self, topic, Some(entity_id), qos, reader_like_stateless)
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.inner.participant()
}
pub(crate) fn remove_reader(&self, guid: GUID) {
self.inner.remove_reader(guid);
}
}
#[derive(Clone)]
pub struct InnerSubscriber {
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
security_plugins_handle: Option<SecurityPluginsHandle>,
}
impl InnerSubscriber {
pub(super) fn new(
domain_participant: DomainParticipantWeak,
discovery_db: Arc<RwLock<DiscoveryDB>>,
qos: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
security_plugins_handle: Option<SecurityPluginsHandle>,
) -> Self {
Self {
domain_participant,
discovery_db,
qos,
sender_add_reader,
sender_remove_reader,
discovery_command,
security_plugins_handle,
}
}
fn create_datareader_internal<D, SA>(
&self,
outer: &Subscriber,
entity_id_opt: Option<EntityId>,
topic: &Topic,
optional_qos: Option<QosPolicies>,
reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
where
D: 'static + Keyed,
SA: adapters::with_key::DeserializerAdapter<D>,
{
let simple_dr = self.create_simple_datareader_internal(
outer,
entity_id_opt,
topic,
optional_qos,
reader_like_stateless,
)?;
Ok(with_key::DataReader::<D, SA>::from_simple_data_reader(
simple_dr,
))
}
fn create_simple_datareader_internal<D, SA>(
&self,
outer: &Subscriber,
entity_id_opt: Option<EntityId>,
topic: &Topic,
optional_qos: Option<QosPolicies>,
reader_like_stateless: bool, ) -> CreateResult<with_key::SimpleDataReader<D, SA>>
where
D: 'static + Keyed,
SA: adapters::with_key::DeserializerAdapter<D>,
{
let (send, rec) = mio_channel::sync_channel::<()>(4);
let (status_sender, status_receiver) = sync_status_channel::<DataReaderStatus>(4)?;
let (reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(0);
let qos = self
.qos
.modify_by(&topic.qos())
.modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_WITH_KEY_USER_DEFINED);
let dp = match self.participant() {
Some(dp) => dp,
None => return create_error_dropped!("DomainParticipant doesn't exist anymore."),
};
let topic_cache_handle = match dp.dds_cache().read() {
Ok(dds_cache) => dds_cache.get_existing_topic_cache(&topic.name())?,
Err(e) => return create_error_poisoned!("Cannot lock DDScache. Error: {}", e),
};
match topic_cache_handle.lock() {
Ok(mut tc) => tc.update_keep_limits(&qos),
Err(e) => return create_error_poisoned!("Cannot lock topic cache. Error: {}", e),
};
let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), entity_id);
#[cfg(feature = "security")]
if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
let check_res = sec_handle.get_plugins().check_create_datareader(
reader_guid.prefix,
dp.domain_id(),
topic.name(),
&qos,
);
match check_res {
Ok(check_passed) => {
if !check_passed {
return create_error_not_allowed_by_security!(
"Not allowed to create a DataReader to topic {}",
topic.name()
);
}
}
Err(e) => {
return create_error_internal!(
"Failed to check DataReader rights from Access control: {}",
e.msg
);
}
};
if let Err(e) = {
let reader_security_attributes = sec_handle
.get_plugins()
.get_reader_sec_attributes(reader_guid, topic.name()); reader_security_attributes.and_then(|attributes| {
sec_handle
.get_plugins()
.register_local_reader(reader_guid, qos.property(), attributes)
})
} {
return create_error_internal!(
"Failed to register reader to crypto plugin: {} . GUID: {:?}",
e,
reader_guid
);
} else {
info!("Registered local reader to crypto plugin. GUID: {reader_guid:?}");
}
}
let data_reader_waker = Arc::new(Mutex::new(None));
let (poll_event_source, poll_event_sender) = mio_source::make_poll_channel()?;
let new_reader = ReaderIngredients {
guid: reader_guid,
notification_sender: send,
status_sender,
topic_name: topic.name(),
topic_cache_handle: topic_cache_handle.clone(),
like_stateless: reader_like_stateless,
qos_policy: qos.clone(),
data_reader_command_receiver: reader_command_receiver,
data_reader_waker: data_reader_waker.clone(),
poll_event_sender,
security_plugins: self.security_plugins_handle.clone(),
};
#[cfg(not(feature = "security"))]
let security_info: Option<EndpointSecurityInfo> = None;
#[cfg(feature = "security")]
let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
if reader_guid.entity_id.entity_kind.is_user_defined() {
match sec_handle
.get_plugins()
.get_reader_sec_attributes(reader_guid, topic.name())
{
Ok(attr) => EndpointSecurityInfo::from(attr).into(),
Err(e) => {
return create_error_internal!(
"Failed to get security info for reader: {}. Guid: {:?}",
e,
reader_guid
);
}
}
} else {
None }
} else {
None
};
{
let mut db = self
.discovery_db
.write()
.or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?;
db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
db.update_topic_data_p(topic);
if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
topic_name: topic.name(),
}) {
error!(
"Failed send DiscoveryCommand::AddTopic about topic {}: {}",
topic.name(),
e
);
}
}
let datareader = with_key::SimpleDataReader::<D, SA>::new(
outer.clone(),
entity_id,
topic.clone(),
qos,
rec,
topic_cache_handle,
self.discovery_command.clone(),
status_receiver,
reader_command_sender,
data_reader_waker,
poll_event_source,
)?;
self
.sender_add_reader
.try_send(new_reader)
.or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;
Ok(datareader)
}
pub fn create_datareader<D, SA>(
&self,
outer: &Subscriber,
topic: &Topic,
entity_id: Option<EntityId>,
qos: Option<QosPolicies>,
reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
where
D: 'static + Keyed,
SA: adapters::with_key::DeserializerAdapter<D>,
{
if topic.kind() != TopicKind::WithKey {
return Err(CreateError::TopicKind(TopicKind::WithKey));
}
self.create_datareader_internal(outer, entity_id, topic, qos, reader_like_stateless)
}
pub fn create_datareader_no_key<D: 'static, SA>(
&self,
outer: &Subscriber,
topic: &Topic,
entity_id_opt: Option<EntityId>,
qos: Option<QosPolicies>,
reader_like_stateless: bool, ) -> CreateResult<NoKeyDataReader<D, SA>>
where
SA: adapters::no_key::DeserializerAdapter<D>,
{
if topic.kind() != TopicKind::NoKey {
return Err(CreateError::TopicKind(TopicKind::NoKey));
}
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
let d = self.create_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
outer,
Some(entity_id),
topic,
qos,
reader_like_stateless,
)?;
Ok(NoKeyDataReader::<D, SA>::from_keyed(d))
}
pub fn create_simple_datareader_no_key<D: 'static, SA>(
&self,
outer: &Subscriber,
topic: &Topic,
entity_id_opt: Option<EntityId>,
qos: Option<QosPolicies>,
) -> CreateResult<no_key::SimpleDataReader<D, SA>>
where
SA: adapters::no_key::DeserializerAdapter<D> + 'static,
{
if topic.kind() != TopicKind::NoKey {
return Err(CreateError::TopicKind(TopicKind::NoKey));
}
let entity_id =
self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
let d = self.create_simple_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
outer,
Some(entity_id),
topic,
qos,
false,
)?;
Ok(no_key::SimpleDataReader::<D, SA>::from_keyed(d))
}
pub fn participant(&self) -> Option<DomainParticipant> {
self.domain_participant.clone().upgrade()
}
pub(crate) fn remove_reader(&self, guid: GUID) {
try_send_timeout(&self.sender_remove_reader, guid, None)
.unwrap_or_else(|e| error!("Cannot remove Reader {guid:?} : {e:?}"));
}
fn unwrap_or_new_entity_id(
&self,
entity_id_opt: Option<EntityId>,
entity_kind: EntityKind,
) -> EntityId {
entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
}
}
#[cfg(test)]
mod tests {}