1use std::{
2 fmt::Debug,
3 sync::{Arc, Mutex, MutexGuard, RwLock},
4 time::Duration,
5};
6
7use serde::{Deserialize, Serialize};
8use mio_extras::channel as mio_channel;
9use byteorder::LittleEndian;
10#[allow(unused_imports)]
11use log::{debug, error, info, trace, warn};
12
13use crate::{
14 create_error_dropped, create_error_poisoned,
15 dds::{
16 adapters,
17 key::Keyed,
18 no_key,
19 no_key::{
20 datareader::DataReader as NoKeyDataReader, datawriter::DataWriter as NoKeyDataWriter,
21 },
22 participant::*,
23 qos::*,
24 result::{CreateError, CreateResult, WaitResult},
25 statusevents::{sync_status_channel, DataReaderStatus},
26 topic::*,
27 with_key,
28 with_key::{
29 datareader::DataReader as WithKeyDataReader, datawriter::DataWriter as WithKeyDataWriter,
30 },
31 },
32 discovery::{
33 discovery::DiscoveryCommand, discovery_db::DiscoveryDB, sedp_messages::DiscoveredWriterData,
34 },
35 mio_source,
36 rtps::{
37 reader::ReaderIngredients,
38 writer::{WriterCommand, WriterIngredients},
39 },
40 serialization::{CDRDeserializerAdapter, CDRSerializerAdapter},
41 structure::{
42 entity::RTPSEntity,
43 guid::{EntityId, EntityKind, GUID},
44 },
45};
46use super::{
47 helpers::try_send_timeout,
48 no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper},
49 with_key::simpledatareader::ReaderCommand,
50};
51#[cfg(feature = "security")]
52use crate::{
53 create_error_internal, create_error_not_allowed_by_security,
54 security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
55};
56#[cfg(not(feature = "security"))]
57use crate::no_security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo};
58
/// Handle used to create DataWriters.
///
/// The handle is cheap to clone: the derived `Clone` copies only the `Arc`,
/// so all clones share the same mutex-protected [`InnerPublisher`].
#[derive(Clone)]
pub struct Publisher {
  // Shared state; locked through `inner_lock()` for every operation.
  inner: Arc<Mutex<InnerPublisher>>,
}
93
94impl Publisher {
95 #[allow(clippy::too_many_arguments)]
96 pub(super) fn new(
97 dp: DomainParticipantWeak,
98 discovery_db: Arc<RwLock<DiscoveryDB>>,
99 qos: QosPolicies,
100 default_dw_qos: QosPolicies,
101 add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
102 remove_writer_sender: mio_channel::SyncSender<GUID>,
103 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
104 security_plugins_handle: Option<SecurityPluginsHandle>,
105 ) -> Self {
106 Self {
107 inner: Arc::new(Mutex::new(InnerPublisher::new(
108 dp,
109 discovery_db,
110 qos,
111 default_dw_qos,
112 add_writer_sender,
113 remove_writer_sender,
114 discovery_command,
115 security_plugins_handle,
116 ))),
117 }
118 }
119
120 fn inner_lock(&self) -> MutexGuard<'_, InnerPublisher> {
121 self
122 .inner
123 .lock()
124 .unwrap_or_else(|e| panic!("Inner publisher lock fail! {e:?}"))
125 }
126
127 pub fn create_datawriter<D, SA>(
161 &self,
162 topic: &Topic,
163 qos: Option<QosPolicies>,
164 ) -> CreateResult<WithKeyDataWriter<D, SA>>
165 where
166 D: Keyed,
167 SA: adapters::with_key::SerializerAdapter<D>,
168 {
169 self
170 .inner_lock()
171 .create_datawriter(self, None, topic, qos, false)
172 }
173
174 pub fn create_datawriter_cdr<D>(
177 &self,
178 topic: &Topic,
179 qos: Option<QosPolicies>,
180 ) -> CreateResult<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
181 where
182 D: Keyed + serde::Serialize,
183 <D as Keyed>::K: Serialize,
184 {
185 self.create_datawriter::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
186 }
187
188 pub fn create_datawriter_no_key<D, SA>(
215 &self,
216 topic: &Topic,
217 qos: Option<QosPolicies>,
218 ) -> CreateResult<NoKeyDataWriter<D, SA>>
219 where
220 SA: adapters::no_key::SerializerAdapter<D>,
221 {
222 self
223 .inner_lock()
224 .create_datawriter_no_key(self, None, topic, qos, false)
225 }
226
227 pub fn create_datawriter_no_key_cdr<D>(
228 &self,
229 topic: &Topic,
230 qos: Option<QosPolicies>,
231 ) -> CreateResult<NoKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
232 where
233 D: serde::Serialize,
234 {
235 self.create_datawriter_no_key::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
236 }
237
238 pub(crate) fn create_datawriter_with_entity_id_with_key<D, SA>(
241 &self,
242 entity_id: EntityId,
243 topic: &Topic,
244 qos: Option<QosPolicies>,
245 writer_like_stateless: bool, ) -> CreateResult<WithKeyDataWriter<D, SA>>
247 where
248 D: Keyed,
249 SA: adapters::with_key::SerializerAdapter<D>,
250 {
251 self
252 .inner_lock()
253 .create_datawriter(self, Some(entity_id), topic, qos, writer_like_stateless)
254 }
255
256 #[cfg(feature = "security")] pub(crate) fn create_datawriter_with_entity_id_no_key<D, SA>(
258 &self,
259 entity_id: EntityId,
260 topic: &Topic,
261 qos: Option<QosPolicies>,
262 writer_like_stateless: bool, ) -> CreateResult<NoKeyDataWriter<D, SA>>
264 where
265 SA: crate::no_key::SerializerAdapter<D>,
266 {
267 self.inner_lock().create_datawriter_no_key(
268 self,
269 Some(entity_id),
270 topic,
271 qos,
272 writer_like_stateless,
273 )
274 }
275
  /// Not implemented.
  ///
  /// # Panics
  ///
  /// Always panics via `unimplemented!()`.
  #[deprecated(note = "unimplemented")]
  pub fn suspend_publications(&self) {
    unimplemented!();
  }
290
  /// Not implemented.
  ///
  /// # Panics
  ///
  /// Always panics via `unimplemented!()`.
  #[deprecated(note = "unimplemented")]
  pub fn resume_publications(&self) {
    unimplemented!();
  }
296
  /// Not implemented. The body is empty, so calling this does nothing
  /// (it does not panic, unlike the other unimplemented methods here).
  #[deprecated(note = "unimplemented")]
  pub fn begin_coherent_changes(&self) {}
304
  /// Not implemented. The body is empty, so calling this does nothing
  /// (it does not panic, unlike the other unimplemented methods here).
  #[deprecated(note = "unimplemented")]
  pub fn end_coherent_changes(&self) {}
309
  /// Not implemented; `_max_wait` is ignored.
  ///
  /// # Panics
  ///
  /// Always panics via `unimplemented!()`.
  #[deprecated(note = "unimplemented")]
  pub fn wait_for_acknowledgments(&self, _max_wait: Duration) -> WaitResult<()> {
    unimplemented!();
  }
317
318 pub fn participant(&self) -> Option<DomainParticipant> {
335 self.inner_lock().domain_participant.clone().upgrade()
336 }
337
338 pub fn get_default_datawriter_qos(&self) -> QosPolicies {
355 self.inner_lock().get_default_datawriter_qos().clone()
356 }
357
358 pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
377 self.inner_lock().set_default_datawriter_qos(q);
378 }
379
380 pub(crate) fn remove_writer(&self, guid: GUID) {
382 self.inner_lock().remove_writer(guid);
383 }
}

/// Two `Publisher` handles are equal when they share the same inner state,
/// i.e. one is a clone of the other.
impl PartialEq for Publisher {
  fn eq(&self, other: &Self) -> bool {
    // `InnerPublisher::identity()` cannot tell publishers apart: every
    // `InnerPublisher` is constructed with the same id (`EntityId::MAX`), so
    // comparing identities would report any two publishers as equal.
    // Comparing the shared allocation distinguishes separately created
    // publishers and needs no locking.
    Arc::ptr_eq(&self.inner, &other.inner)
  }
}
393
394impl Debug for Publisher {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 self.inner_lock().fmt(f)
397 }
398}
399
// State shared by all clones of a `Publisher`; reached only through the
// mutex held in `Publisher::inner`.
#[derive(Clone)]
struct InnerPublisher {
  // Value returned by `identity()`.
  id: EntityId,
  // Weak handle to the owning participant; upgraded on demand by `participant()`.
  domain_participant: DomainParticipantWeak,
  // Discovery database; new local writers are recorded here.
  discovery_db: Arc<RwLock<DiscoveryDB>>,
  // QoS of the publisher itself (only read by the `Debug` impl in this file).
  my_qos_policies: QosPolicies,
  // Base QoS for new DataWriters, before topic and per-writer QoS are applied.
  default_datawriter_qos: QosPolicies,
  // Channel on which `WriterIngredients` of newly created writers are sent.
  add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
  // Channel on which GUIDs of writers to be removed are sent.
  remove_writer_sender: mio_channel::SyncSender<GUID>,
  // Channel for `DiscoveryCommand`s; also cloned into each created DataWriter.
  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
  // Security plugins, if any; cloned into the `WriterIngredients`.
  security_plugins_handle: Option<SecurityPluginsHandle>,
}
414
415impl InnerPublisher {
417 #[allow(clippy::too_many_arguments)]
418 fn new(
419 dp: DomainParticipantWeak,
420 discovery_db: Arc<RwLock<DiscoveryDB>>,
421 qos: QosPolicies,
422 default_dw_qos: QosPolicies,
423 add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
424 remove_writer_sender: mio_channel::SyncSender<GUID>,
425 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
426 security_plugins_handle: Option<SecurityPluginsHandle>,
427 ) -> Self {
428 let id = EntityId::MAX;
431 Self {
434 id,
435 domain_participant: dp,
436 discovery_db,
437 my_qos_policies: qos,
438 default_datawriter_qos: default_dw_qos,
439 add_writer_sender,
440 remove_writer_sender,
441 discovery_command,
442 security_plugins_handle,
443 }
444 }
445
446 pub fn create_datawriter<D, SA>(
447 &self,
448 outer: &Publisher,
449 entity_id_opt: Option<EntityId>,
450 topic: &Topic,
451 optional_qos: Option<QosPolicies>,
452 writer_like_stateless: bool, ) -> CreateResult<WithKeyDataWriter<D, SA>>
454 where
455 D: Keyed,
456 SA: adapters::with_key::SerializerAdapter<D>,
457 {
458 let (dwcc_upload, hccc_download) = mio_channel::sync_channel::<WriterCommand>(16);
460 let writer_waker = Arc::new(Mutex::new(None));
461 let (status_sender, status_receiver) = sync_status_channel(4)?;
463
464 let writer_qos = self
472 .default_datawriter_qos
473 .modify_by(&topic.qos())
474 .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
475
476 let entity_id =
477 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_WITH_KEY_USER_DEFINED);
478 let dp = self
479 .participant()
480 .ok_or("upgrade fail")
481 .or_else(|e| create_error_dropped!("Where is my DomainParticipant? {}", e))?;
482
483 let guid = GUID::new_with_prefix_and_id(dp.guid().prefix, entity_id);
484
485 #[cfg(feature = "security")]
486 if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
487 let check_res = sec_handle.get_plugins().check_create_datawriter(
490 guid.prefix,
491 dp.domain_id(),
492 topic.name(),
493 &writer_qos,
494 );
495 match check_res {
496 Ok(check_passed) => {
497 if !check_passed {
498 return create_error_not_allowed_by_security!(
499 "Not allowed to create a DataWriter to topic {}",
500 topic.name()
501 );
502 }
503 }
504 Err(e) => {
505 return create_error_internal!(
507 "Failed to check DataWriter rights from Access control: {}",
508 e.msg
509 );
510 }
511 };
512
513 if let Err(e) = {
515 let writer_security_attributes = sec_handle
516 .get_plugins()
517 .get_writer_sec_attributes(guid, topic.name()); writer_security_attributes.and_then(|attributes| {
519 sec_handle
520 .get_plugins()
521 .register_local_writer(guid, writer_qos.property(), attributes)
522 })
523 } {
524 return create_error_internal!(
525 "Failed to register writer to crypto plugin: {} . GUID: {:?}",
526 e,
527 guid
528 );
529 } else {
530 info!("Registered local writer to crypto plugin. GUID: {guid:?}");
531 }
532 }
533
534 let data_writer = WithKeyDataWriter::<D, SA>::new(
536 outer.clone(),
537 topic.clone(),
538 writer_qos.clone(),
539 guid,
540 dwcc_upload,
541 Arc::clone(&writer_waker),
542 self.discovery_command.clone(),
543 status_receiver,
544 )?;
545
546 #[cfg(not(feature = "security"))]
548 let security_info = None;
549 #[cfg(feature = "security")]
550 let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
551 if guid.entity_id.entity_kind.is_user_defined() {
553 match sec_handle
554 .get_plugins()
555 .get_writer_sec_attributes(guid, topic.name())
556 {
557 Ok(attr) => EndpointSecurityInfo::from(attr).into(),
558 Err(e) => {
559 return create_error_internal!(
560 "Failed to get security info for writer: {}. Guid: {:?}",
561 e,
562 guid
563 );
564 }
565 }
566 } else {
567 None }
569 } else {
570 None
572 };
573
574 let mut db = self
576 .discovery_db
577 .write()
578 .map_err(|e| CreateError::Poisoned {
579 reason: format!("Discovery DB: {e}"),
580 })?;
581
582 let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
583 db.update_local_topic_writer(dwd);
584 db.update_topic_data_p(topic);
585
586 if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
588 topic_name: topic.name(),
589 }) {
590 error!(
593 "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
594 topic.name(),
595 e
596 );
597 }
598
599 let new_writer = WriterIngredients {
606 guid,
607 writer_command_receiver: hccc_download,
608 writer_command_receiver_waker: writer_waker,
609 topic_name: topic.name(),
610 like_stateless: writer_like_stateless,
611 qos_policies: writer_qos,
612 status_sender,
613 security_plugins: self.security_plugins_handle.clone(),
614 };
615
616 self
617 .add_writer_sender
618 .send(new_writer)
619 .or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;
620
621 Ok(data_writer)
623 }
624
625 pub fn create_datawriter_no_key<D, SA>(
626 &self,
627 outer: &Publisher,
628 entity_id_opt: Option<EntityId>,
629 topic: &Topic,
630 qos: Option<QosPolicies>,
631 writer_like_stateless: bool, ) -> CreateResult<NoKeyDataWriter<D, SA>>
633 where
634 SA: adapters::no_key::SerializerAdapter<D>,
635 {
636 let entity_id =
637 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_NO_KEY_USER_DEFINED);
638 let d = self.create_datawriter::<NoKeyWrapper<D>, SAWrapper<SA>>(
639 outer,
640 Some(entity_id),
641 topic,
642 qos,
643 writer_like_stateless,
644 )?;
645 Ok(NoKeyDataWriter::<D, SA>::from_keyed(d))
646 }
647
648 pub fn participant(&self) -> Option<DomainParticipant> {
649 self.domain_participant.clone().upgrade()
650 }
651
  /// Borrows the QoS that new DataWriters start from.
  pub fn get_default_datawriter_qos(&self) -> &QosPolicies {
    &self.default_datawriter_qos
  }
655
656 pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
657 self.default_datawriter_qos = q.clone();
658 }
659
660 fn unwrap_or_new_entity_id(
661 &self,
662 entity_id_opt: Option<EntityId>,
663 entity_kind: EntityKind,
664 ) -> EntityId {
665 entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
668 }
669
670 pub(crate) fn remove_writer(&self, guid: GUID) {
671 try_send_timeout(&self.remove_writer_sender, guid, None)
672 .unwrap_or_else(|e| error!("Cannot remove Writer {guid:?} : {e:?}"));
673 }
674
  /// Returns the stored `id`. `new()` sets it to `EntityId::MAX` for every
  /// instance, so on its own it does not distinguish publishers.
  pub(crate) fn identity(&self) -> EntityId {
    self.id
  }
678}
679
680impl Debug for InnerPublisher {
681 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
682 f.write_fmt(format_args!("{:?}", self.participant()))?;
683 f.write_fmt(format_args!("Publisher QoS: {:?}", self.my_qos_policies))?;
684 f.write_fmt(format_args!(
685 "Publishers default Writer QoS: {:?}",
686 self.default_datawriter_qos
687 ))
688 }
689}
690
/// Handle used to create DataReaders.
///
/// The handle is cheap to clone: the derived `Clone` copies only the `Arc`,
/// so all clones share the same [`InnerSubscriber`].
#[derive(Clone)]
pub struct Subscriber {
  // Shared state; held in a plain `Arc` with no mutex around it.
  inner: Arc<InnerSubscriber>,
}
720
721impl Subscriber {
722 pub(super) fn new(
723 domain_participant: DomainParticipantWeak,
724 discovery_db: Arc<RwLock<DiscoveryDB>>,
725 qos: QosPolicies,
726 sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
727 sender_remove_reader: mio_channel::SyncSender<GUID>,
728 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
729 security_plugins_handle: Option<SecurityPluginsHandle>,
730 ) -> Self {
731 Self {
732 inner: Arc::new(InnerSubscriber::new(
733 domain_participant,
734 discovery_db,
735 qos,
736 sender_add_reader,
737 sender_remove_reader,
738 discovery_command,
739 security_plugins_handle,
740 )),
741 }
742 }
743
744 pub fn create_datareader<D, SA>(
781 &self,
782 topic: &Topic,
783 qos: Option<QosPolicies>,
784 ) -> CreateResult<WithKeyDataReader<D, SA>>
785 where
786 D: 'static + Keyed,
787 SA: adapters::with_key::DeserializerAdapter<D>,
788 {
789 self.inner.create_datareader(self, topic, None, qos, false)
790 }
791
792 pub fn create_datareader_cdr<D>(
793 &self,
794 topic: &Topic,
795 qos: Option<QosPolicies>,
796 ) -> CreateResult<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
797 where
798 D: 'static + serde::de::DeserializeOwned + Keyed,
799 for<'de> <D as Keyed>::K: Deserialize<'de>,
800 {
801 self.create_datareader::<D, CDRDeserializerAdapter<D>>(topic, qos)
802 }
803
804 pub fn create_datareader_no_key<D, SA>(
834 &self,
835 topic: &Topic,
836 qos: Option<QosPolicies>,
837 ) -> CreateResult<NoKeyDataReader<D, SA>>
838 where
839 D: 'static,
840 SA: adapters::no_key::DeserializerAdapter<D>,
841 {
842 self
843 .inner
844 .create_datareader_no_key(self, topic, None, qos, false)
845 }
846
847 pub fn create_simple_datareader_no_key<D, DA>(
848 &self,
849 topic: &Topic,
850 qos: Option<QosPolicies>,
851 ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
852 where
853 D: 'static,
854 DA: 'static + adapters::no_key::DeserializerAdapter<D>,
855 {
856 self
857 .inner
858 .create_simple_datareader_no_key(self, topic, None, qos)
859 }
860
861 pub fn create_datareader_no_key_cdr<D>(
862 &self,
863 topic: &Topic,
864 qos: Option<QosPolicies>,
865 ) -> CreateResult<NoKeyDataReader<D, CDRDeserializerAdapter<D>>>
866 where
867 D: 'static + serde::de::DeserializeOwned,
868 {
869 self.create_datareader_no_key::<D, CDRDeserializerAdapter<D>>(topic, qos)
870 }
871
872 pub(crate) fn create_datareader_with_entity_id_with_key<D, SA>(
875 &self,
876 topic: &Topic,
877 entity_id: EntityId,
878 qos: Option<QosPolicies>,
879 reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
881 where
882 D: 'static + Keyed,
883 SA: adapters::with_key::DeserializerAdapter<D>,
884 {
885 self
886 .inner
887 .create_datareader(self, topic, Some(entity_id), qos, reader_like_stateless)
888 }
889
890 #[cfg(feature = "security")] pub(crate) fn create_datareader_with_entity_id_no_key<D, SA>(
892 &self,
893 topic: &Topic,
894 entity_id: EntityId,
895 qos: Option<QosPolicies>,
896 reader_like_stateless: bool, ) -> CreateResult<NoKeyDataReader<D, SA>>
898 where
899 D: 'static,
900 SA: adapters::no_key::DeserializerAdapter<D>,
901 {
902 self
903 .inner
904 .create_datareader_no_key(self, topic, Some(entity_id), qos, reader_like_stateless)
905 }
906
907 pub fn participant(&self) -> Option<DomainParticipant> {
940 self.inner.participant()
941 }
942
943 pub(crate) fn remove_reader(&self, guid: GUID) {
944 self.inner.remove_reader(guid);
945 }
946}
947
/// State shared by all clones of a [`Subscriber`].
#[derive(Clone)]
pub struct InnerSubscriber {
  // Weak handle to the owning participant; upgraded on demand by `participant()`.
  domain_participant: DomainParticipantWeak,
  // Discovery database; new local readers are recorded here.
  discovery_db: Arc<RwLock<DiscoveryDB>>,
  // Base QoS for new readers, before topic and per-reader QoS are applied.
  qos: QosPolicies,
  // Channel on which `ReaderIngredients` of newly created readers are sent.
  sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
  // Channel on which GUIDs of readers to be removed are sent.
  sender_remove_reader: mio_channel::SyncSender<GUID>,
  // Channel for `DiscoveryCommand`s; also cloned into each created reader.
  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
  // Security plugins, if any; cloned into the `ReaderIngredients`.
  security_plugins_handle: Option<SecurityPluginsHandle>,
}
958
959impl InnerSubscriber {
  /// Stores the given handles and QoS in a new `InnerSubscriber`; performs no
  /// other work.
  pub(super) fn new(
    domain_participant: DomainParticipantWeak,
    discovery_db: Arc<RwLock<DiscoveryDB>>,
    qos: QosPolicies,
    sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
    sender_remove_reader: mio_channel::SyncSender<GUID>,
    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
    security_plugins_handle: Option<SecurityPluginsHandle>,
  ) -> Self {
    Self {
      domain_participant,
      discovery_db,
      qos,
      sender_add_reader,
      sender_remove_reader,
      discovery_command,
      security_plugins_handle,
    }
  }
979
980 fn create_datareader_internal<D, SA>(
981 &self,
982 outer: &Subscriber,
983 entity_id_opt: Option<EntityId>,
984 topic: &Topic,
985 optional_qos: Option<QosPolicies>,
986 reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
988 where
989 D: 'static + Keyed,
990 SA: adapters::with_key::DeserializerAdapter<D>,
991 {
992 let simple_dr = self.create_simple_datareader_internal(
993 outer,
994 entity_id_opt,
995 topic,
996 optional_qos,
997 reader_like_stateless,
998 )?;
999 Ok(with_key::DataReader::<D, SA>::from_simple_data_reader(
1000 simple_dr,
1001 ))
1002 }
1003
1004 fn create_simple_datareader_internal<D, SA>(
1005 &self,
1006 outer: &Subscriber,
1007 entity_id_opt: Option<EntityId>,
1008 topic: &Topic,
1009 optional_qos: Option<QosPolicies>,
1010 reader_like_stateless: bool, ) -> CreateResult<with_key::SimpleDataReader<D, SA>>
1012 where
1013 D: 'static + Keyed,
1014 SA: adapters::with_key::DeserializerAdapter<D>,
1015 {
1016 let (send, rec) = mio_channel::sync_channel::<()>(4);
1018 let (status_sender, status_receiver) = sync_status_channel::<DataReaderStatus>(4)?;
1020
1021 let (reader_command_sender, reader_command_receiver) =
1023 mio_channel::sync_channel::<ReaderCommand>(0);
1024 let qos = self
1032 .qos
1033 .modify_by(&topic.qos())
1034 .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
1035
1036 let entity_id =
1037 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_WITH_KEY_USER_DEFINED);
1038
1039 let dp = match self.participant() {
1040 Some(dp) => dp,
1041 None => return create_error_dropped!("DomainParticipant doesn't exist anymore."),
1042 };
1043
1044 let topic_cache_handle = match dp.dds_cache().read() {
1046 Ok(dds_cache) => dds_cache.get_existing_topic_cache(&topic.name())?,
1047 Err(e) => return create_error_poisoned!("Cannot lock DDScache. Error: {}", e),
1048 };
1049 match topic_cache_handle.lock() {
1051 Ok(mut tc) => tc.update_keep_limits(&qos),
1052 Err(e) => return create_error_poisoned!("Cannot lock topic cache. Error: {}", e),
1053 };
1054
1055 let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), entity_id);
1056
1057 #[cfg(feature = "security")]
1058 if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1059 let check_res = sec_handle.get_plugins().check_create_datareader(
1062 reader_guid.prefix,
1063 dp.domain_id(),
1064 topic.name(),
1065 &qos,
1066 );
1067 match check_res {
1068 Ok(check_passed) => {
1069 if !check_passed {
1070 return create_error_not_allowed_by_security!(
1071 "Not allowed to create a DataReader to topic {}",
1072 topic.name()
1073 );
1074 }
1075 }
1076 Err(e) => {
1077 return create_error_internal!(
1079 "Failed to check DataReader rights from Access control: {}",
1080 e.msg
1081 );
1082 }
1083 };
1084
1085 if let Err(e) = {
1087 let reader_security_attributes = sec_handle
1088 .get_plugins()
1089 .get_reader_sec_attributes(reader_guid, topic.name()); reader_security_attributes.and_then(|attributes| {
1091 sec_handle
1092 .get_plugins()
1093 .register_local_reader(reader_guid, qos.property(), attributes)
1094 })
1095 } {
1096 return create_error_internal!(
1097 "Failed to register reader to crypto plugin: {} . GUID: {:?}",
1098 e,
1099 reader_guid
1100 );
1101 } else {
1102 info!("Registered local reader to crypto plugin. GUID: {reader_guid:?}");
1103 }
1104 }
1105
1106 let data_reader_waker = Arc::new(Mutex::new(None));
1108
1109 let (poll_event_source, poll_event_sender) = mio_source::make_poll_channel()?;
1110
1111 let new_reader = ReaderIngredients {
1112 guid: reader_guid,
1113 notification_sender: send,
1114 status_sender,
1115 topic_name: topic.name(),
1116 topic_cache_handle: topic_cache_handle.clone(),
1117 like_stateless: reader_like_stateless,
1118 qos_policy: qos.clone(),
1119 data_reader_command_receiver: reader_command_receiver,
1120 data_reader_waker: data_reader_waker.clone(),
1121 poll_event_sender,
1122 security_plugins: self.security_plugins_handle.clone(),
1123 };
1124
1125 #[cfg(not(feature = "security"))]
1127 let security_info: Option<EndpointSecurityInfo> = None;
1128 #[cfg(feature = "security")]
1129 let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1130 if reader_guid.entity_id.entity_kind.is_user_defined() {
1132 match sec_handle
1133 .get_plugins()
1134 .get_reader_sec_attributes(reader_guid, topic.name())
1135 {
1136 Ok(attr) => EndpointSecurityInfo::from(attr).into(),
1137 Err(e) => {
1138 return create_error_internal!(
1139 "Failed to get security info for reader: {}. Guid: {:?}",
1140 e,
1141 reader_guid
1142 );
1143 }
1144 }
1145 } else {
1146 None }
1148 } else {
1149 None
1151 };
1152
1153 {
1155 let mut db = self
1156 .discovery_db
1157 .write()
1158 .or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?;
1159 db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
1160 db.update_topic_data_p(topic);
1161
1162 if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
1164 topic_name: topic.name(),
1165 }) {
1166 error!(
1169 "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
1170 topic.name(),
1171 e
1172 );
1173 }
1174 }
1175
1176 let datareader = with_key::SimpleDataReader::<D, SA>::new(
1182 outer.clone(),
1183 entity_id,
1184 topic.clone(),
1185 qos,
1186 rec,
1187 topic_cache_handle,
1188 self.discovery_command.clone(),
1189 status_receiver,
1190 reader_command_sender,
1191 data_reader_waker,
1192 poll_event_source,
1193 )?;
1194
1195 self
1198 .sender_add_reader
1199 .try_send(new_reader)
1200 .or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;
1201
1202 Ok(datareader)
1204 }
1205
1206 pub fn create_datareader<D, SA>(
1207 &self,
1208 outer: &Subscriber,
1209 topic: &Topic,
1210 entity_id: Option<EntityId>,
1211 qos: Option<QosPolicies>,
1212 reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
1214 where
1215 D: 'static + Keyed,
1216 SA: adapters::with_key::DeserializerAdapter<D>,
1217 {
1218 if topic.kind() != TopicKind::WithKey {
1219 return Err(CreateError::TopicKind(TopicKind::WithKey));
1220 }
1221 self.create_datareader_internal(outer, entity_id, topic, qos, reader_like_stateless)
1222 }
1223
1224 pub fn create_datareader_no_key<D: 'static, SA>(
1225 &self,
1226 outer: &Subscriber,
1227 topic: &Topic,
1228 entity_id_opt: Option<EntityId>,
1229 qos: Option<QosPolicies>,
1230 reader_like_stateless: bool, ) -> CreateResult<NoKeyDataReader<D, SA>>
1232 where
1233 SA: adapters::no_key::DeserializerAdapter<D>,
1234 {
1235 if topic.kind() != TopicKind::NoKey {
1236 return Err(CreateError::TopicKind(TopicKind::NoKey));
1237 }
1238
1239 let entity_id =
1240 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1241
1242 let d = self.create_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1243 outer,
1244 Some(entity_id),
1245 topic,
1246 qos,
1247 reader_like_stateless,
1248 )?;
1249
1250 Ok(NoKeyDataReader::<D, SA>::from_keyed(d))
1251 }
1252
1253 pub fn create_simple_datareader_no_key<D: 'static, SA>(
1254 &self,
1255 outer: &Subscriber,
1256 topic: &Topic,
1257 entity_id_opt: Option<EntityId>,
1258 qos: Option<QosPolicies>,
1259 ) -> CreateResult<no_key::SimpleDataReader<D, SA>>
1260 where
1261 SA: adapters::no_key::DeserializerAdapter<D> + 'static,
1262 {
1263 if topic.kind() != TopicKind::NoKey {
1264 return Err(CreateError::TopicKind(TopicKind::NoKey));
1265 }
1266
1267 let entity_id =
1268 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1269
1270 let d = self.create_simple_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1271 outer,
1272 Some(entity_id),
1273 topic,
1274 qos,
1275 false,
1276 )?;
1277
1278 Ok(no_key::SimpleDataReader::<D, SA>::from_keyed(d))
1279 }
1280
1281 pub fn participant(&self) -> Option<DomainParticipant> {
1282 self.domain_participant.clone().upgrade()
1283 }
1284
1285 pub(crate) fn remove_reader(&self, guid: GUID) {
1286 try_send_timeout(&self.sender_remove_reader, guid, None)
1287 .unwrap_or_else(|e| error!("Cannot remove Reader {guid:?} : {e:?}"));
1288 }
1289
1290 fn unwrap_or_new_entity_id(
1291 &self,
1292 entity_id_opt: Option<EntityId>,
1293 entity_kind: EntityKind,
1294 ) -> EntityId {
1295 entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
1298 }
1299}
1300
// Placeholder: the test module is empty, so no unit tests are defined here.
#[cfg(test)]
mod tests {}