1use std::{
2 fmt::Debug,
3 sync::{Arc, Mutex, MutexGuard, RwLock},
4 time::Duration,
5};
6
7use serde::{Deserialize, Serialize};
8use mio_extras::channel as mio_channel;
9use byteorder::LittleEndian;
10#[allow(unused_imports)]
11use log::{debug, error, info, trace, warn};
12
13use crate::{
14 create_error_dropped, create_error_internal, create_error_poisoned,
15 dds::{
16 adapters,
17 key::Keyed,
18 no_key,
19 no_key::{
20 datareader::DataReader as NoKeyDataReader, datawriter::DataWriter as NoKeyDataWriter,
21 },
22 participant::*,
23 qos::*,
24 result::{CreateError, CreateResult, WaitResult},
25 statusevents::{sync_status_channel, DataReaderStatus},
26 topic::*,
27 with_key,
28 with_key::{
29 datareader::DataReader as WithKeyDataReader, datawriter::DataWriter as WithKeyDataWriter,
30 },
31 },
32 discovery::{
33 discovery::DiscoveryCommand, discovery_db::DiscoveryDB, sedp_messages::DiscoveredWriterData,
34 },
35 mio_source,
36 rtps::{
37 reader::ReaderIngredients,
38 writer::{WriterCommand, WriterIngredients},
39 },
40 serialization::{CDRDeserializerAdapter, CDRSerializerAdapter},
41 structure::{
42 entity::RTPSEntity,
43 guid::{EntityId, EntityKind, GUID},
44 },
45};
46use super::{
47 helpers::try_send_timeout,
48 no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper},
49 with_key::simpledatareader::ReaderCommand,
50};
51#[cfg(feature = "security")]
52use crate::{
53 create_error_not_allowed_by_security,
54 security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
55};
56#[cfg(not(feature = "security"))]
57use crate::no_security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo};
58
59#[derive(Clone)]
90pub struct Publisher {
91 inner: Arc<Mutex<InnerPublisher>>,
92}
93
94impl Publisher {
95 #[allow(clippy::too_many_arguments)]
96 pub(super) fn new(
97 dp: DomainParticipantWeak,
98 discovery_db: Arc<RwLock<DiscoveryDB>>,
99 qos: QosPolicies,
100 default_dw_qos: QosPolicies,
101 add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
102 remove_writer_sender: mio_channel::SyncSender<GUID>,
103 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
104 security_plugins_handle: Option<SecurityPluginsHandle>,
105 ) -> Self {
106 Self {
107 inner: Arc::new(Mutex::new(InnerPublisher::new(
108 dp,
109 discovery_db,
110 qos,
111 default_dw_qos,
112 add_writer_sender,
113 remove_writer_sender,
114 discovery_command,
115 security_plugins_handle,
116 ))),
117 }
118 }
119
120 fn inner_lock(&self) -> MutexGuard<'_, InnerPublisher> {
121 self
122 .inner
123 .lock()
124 .unwrap_or_else(|e| panic!("Inner publisher lock fail! {e:?}"))
125 }
126
127 pub fn create_datawriter<D, SA>(
161 &self,
162 topic: &Topic,
163 qos: Option<QosPolicies>,
164 ) -> CreateResult<WithKeyDataWriter<D, SA>>
165 where
166 D: Keyed,
167 SA: adapters::with_key::SerializerAdapter<D>,
168 {
169 self
170 .inner_lock()
171 .create_datawriter(self, None, topic, qos, false)
172 }
173
174 pub fn create_datawriter_cdr<D>(
177 &self,
178 topic: &Topic,
179 qos: Option<QosPolicies>,
180 ) -> CreateResult<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
181 where
182 D: Keyed + serde::Serialize,
183 <D as Keyed>::K: Serialize,
184 {
185 self.create_datawriter::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
186 }
187
188 pub fn create_datawriter_no_key<D, SA>(
215 &self,
216 topic: &Topic,
217 qos: Option<QosPolicies>,
218 ) -> CreateResult<NoKeyDataWriter<D, SA>>
219 where
220 SA: adapters::no_key::SerializerAdapter<D>,
221 {
222 self
223 .inner_lock()
224 .create_datawriter_no_key(self, None, topic, qos, false)
225 }
226
227 pub fn create_datawriter_no_key_cdr<D>(
228 &self,
229 topic: &Topic,
230 qos: Option<QosPolicies>,
231 ) -> CreateResult<NoKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
232 where
233 D: serde::Serialize,
234 {
235 self.create_datawriter_no_key::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
236 }
237
238 pub(crate) fn create_datawriter_with_entity_id_with_key<D, SA>(
241 &self,
242 entity_id: EntityId,
243 topic: &Topic,
244 qos: Option<QosPolicies>,
245 writer_like_stateless: bool, ) -> CreateResult<WithKeyDataWriter<D, SA>>
247 where
248 D: Keyed,
249 SA: adapters::with_key::SerializerAdapter<D>,
250 {
251 self
252 .inner_lock()
253 .create_datawriter(self, Some(entity_id), topic, qos, writer_like_stateless)
254 }
255
256 #[cfg(feature = "security")] pub(crate) fn create_datawriter_with_entity_id_no_key<D, SA>(
258 &self,
259 entity_id: EntityId,
260 topic: &Topic,
261 qos: Option<QosPolicies>,
262 writer_like_stateless: bool, ) -> CreateResult<NoKeyDataWriter<D, SA>>
264 where
265 SA: crate::no_key::SerializerAdapter<D>,
266 {
267 self.inner_lock().create_datawriter_no_key(
268 self,
269 Some(entity_id),
270 topic,
271 qos,
272 writer_like_stateless,
273 )
274 }
275
276 #[deprecated(note = "unimplemented")]
287 pub fn suspend_publications(&self) {
288 unimplemented!();
289 }
290
291 #[deprecated(note = "unimplemented")]
293 pub fn resume_publications(&self) {
294 unimplemented!();
295 }
296
297 #[deprecated(note = "unimplemented")]
303 pub fn begin_coherent_changes(&self) {}
304
305 #[deprecated(note = "unimplemented")]
308 pub fn end_coherent_changes(&self) {}
309
310 #[deprecated(note = "unimplemented")]
314 pub fn wait_for_acknowledgments(&self, _max_wait: Duration) -> WaitResult<()> {
315 unimplemented!();
316 }
317
318 pub fn participant(&self) -> Option<DomainParticipant> {
335 self.inner_lock().domain_participant.clone().upgrade()
336 }
337
338 pub fn get_default_datawriter_qos(&self) -> QosPolicies {
355 self.inner_lock().get_default_datawriter_qos().clone()
356 }
357
358 pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
377 self.inner_lock().set_default_datawriter_qos(q);
378 }
379
380 pub(crate) fn remove_writer(&self, guid: GUID) {
382 self.inner_lock().remove_writer(guid);
383 }
384} impl PartialEq for Publisher {
387 fn eq(&self, other: &Self) -> bool {
388 let id_self = { self.inner_lock().identity() };
389 let id_other = { other.inner_lock().identity() };
390 id_self == id_other
391 }
392}
393
394impl Debug for Publisher {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 self.inner_lock().fmt(f)
397 }
398}
399
400#[derive(Clone)]
403struct InnerPublisher {
404 id: EntityId,
405 domain_participant: DomainParticipantWeak,
406 discovery_db: Arc<RwLock<DiscoveryDB>>,
407 my_qos_policies: QosPolicies,
408 default_datawriter_qos: QosPolicies, add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
410 remove_writer_sender: mio_channel::SyncSender<GUID>,
411 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
412 security_plugins_handle: Option<SecurityPluginsHandle>,
413}
414
415impl InnerPublisher {
417 #[allow(clippy::too_many_arguments)]
418 fn new(
419 dp: DomainParticipantWeak,
420 discovery_db: Arc<RwLock<DiscoveryDB>>,
421 qos: QosPolicies,
422 default_dw_qos: QosPolicies,
423 add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
424 remove_writer_sender: mio_channel::SyncSender<GUID>,
425 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
426 security_plugins_handle: Option<SecurityPluginsHandle>,
427 ) -> Self {
428 let id = EntityId::MAX;
431 Self {
434 id,
435 domain_participant: dp,
436 discovery_db,
437 my_qos_policies: qos,
438 default_datawriter_qos: default_dw_qos,
439 add_writer_sender,
440 remove_writer_sender,
441 discovery_command,
442 security_plugins_handle,
443 }
444 }
445
446 pub fn create_datawriter<D, SA>(
447 &self,
448 outer: &Publisher,
449 entity_id_opt: Option<EntityId>,
450 topic: &Topic,
451 optional_qos: Option<QosPolicies>,
452 writer_like_stateless: bool, ) -> CreateResult<WithKeyDataWriter<D, SA>>
454 where
455 D: Keyed,
456 SA: adapters::with_key::SerializerAdapter<D>,
457 {
458 let (dwcc_upload, hccc_download) = mio_channel::sync_channel::<WriterCommand>(16);
460 let writer_waker = Arc::new(Mutex::new(None));
461 let (status_sender, status_receiver) = sync_status_channel(4)?;
463
464 let writer_qos = self
472 .default_datawriter_qos
473 .modify_by(&topic.qos())
474 .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
475
476 let entity_id =
477 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_WITH_KEY_USER_DEFINED);
478 let dp = self
479 .participant()
480 .ok_or("upgrade fail")
481 .or_else(|e| create_error_dropped!("Where is my DomainParticipant? {}", e))?;
482
483 let guid = GUID::new_with_prefix_and_id(dp.guid().prefix, entity_id);
484
485 #[cfg(feature = "security")]
486 if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
487 let check_res = sec_handle.get_plugins().check_create_datawriter(
490 guid.prefix,
491 dp.domain_id(),
492 topic.name(),
493 &writer_qos,
494 );
495 match check_res {
496 Ok(check_passed) => {
497 if !check_passed {
498 return create_error_not_allowed_by_security!(
499 "Not allowed to create a DataWriter to topic {}",
500 topic.name()
501 );
502 }
503 }
504 Err(e) => {
505 return create_error_internal!(
507 "Failed to check DataWriter rights from Access control: {}",
508 e.msg
509 );
510 }
511 };
512
513 if let Err(e) = {
515 let writer_security_attributes = sec_handle
516 .get_plugins()
517 .get_writer_sec_attributes(guid, topic.name()); writer_security_attributes.and_then(|attributes| {
519 sec_handle
520 .get_plugins()
521 .register_local_writer(guid, writer_qos.property(), attributes)
522 })
523 } {
524 return create_error_internal!(
525 "Failed to register writer to crypto plugin: {} . GUID: {:?}",
526 e,
527 guid
528 );
529 } else {
530 info!("Registered local writer to crypto plugin. GUID: {guid:?}");
531 }
532 }
533
534 let new_writer = WriterIngredients {
535 guid,
536 writer_command_receiver: hccc_download,
537 writer_command_receiver_waker: Arc::clone(&writer_waker),
538 topic_name: topic.name(),
539 like_stateless: writer_like_stateless,
540 qos_policies: writer_qos.clone(),
541 status_sender,
542 security_plugins: self.security_plugins_handle.clone(),
543 };
544
545 self
548 .add_writer_sender
549 .send(new_writer)
550 .or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;
551
552 let data_writer = WithKeyDataWriter::<D, SA>::new(
553 outer.clone(),
554 topic.clone(),
555 writer_qos,
556 guid,
557 dwcc_upload,
558 writer_waker,
559 self.discovery_command.clone(),
560 status_receiver,
561 )?;
562
563 let mut db = self
565 .discovery_db
566 .write()
567 .map_err(|e| CreateError::Poisoned {
568 reason: format!("Discovery DB: {e}"),
569 })?;
570
571 #[cfg(not(feature = "security"))]
572 let security_info = None;
573 #[cfg(feature = "security")]
574 let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
575 if guid.entity_id.entity_kind.is_user_defined() {
577 match sec_handle
578 .get_plugins()
579 .get_writer_sec_attributes(guid, topic.name())
580 {
581 Ok(attr) => EndpointSecurityInfo::from(attr).into(),
582 Err(e) => {
583 return create_error_internal!(
584 "Failed to get security info for writer: {}. Guid: {:?}",
585 e,
586 guid
587 );
588 }
589 }
590 } else {
591 None }
593 } else {
594 None
596 };
597
598 let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
600 db.update_local_topic_writer(dwd);
601 db.update_topic_data_p(topic);
602
603 if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
604 topic_name: topic.name(),
605 }) {
606 error!(
609 "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
610 topic.name(),
611 e
612 );
613 }
614
615 let writer_guid = self.domain_participant.guid().from_prefix(entity_id);
617 self
618 .discovery_command
619 .try_send(DiscoveryCommand::AddLocalWriter { guid: writer_guid })
620 .or_else(|e| {
621 create_error_internal!(
622 "Cannot inform Discovery about the new writer {writer_guid:?}. Error: {}",
623 e
624 )
625 })?;
626
627 Ok(data_writer)
629 }
630
631 pub fn create_datawriter_no_key<D, SA>(
632 &self,
633 outer: &Publisher,
634 entity_id_opt: Option<EntityId>,
635 topic: &Topic,
636 qos: Option<QosPolicies>,
637 writer_like_stateless: bool, ) -> CreateResult<NoKeyDataWriter<D, SA>>
639 where
640 SA: adapters::no_key::SerializerAdapter<D>,
641 {
642 let entity_id =
643 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_NO_KEY_USER_DEFINED);
644 let d = self.create_datawriter::<NoKeyWrapper<D>, SAWrapper<SA>>(
645 outer,
646 Some(entity_id),
647 topic,
648 qos,
649 writer_like_stateless,
650 )?;
651 Ok(NoKeyDataWriter::<D, SA>::from_keyed(d))
652 }
653
654 pub fn participant(&self) -> Option<DomainParticipant> {
655 self.domain_participant.clone().upgrade()
656 }
657
658 pub fn get_default_datawriter_qos(&self) -> &QosPolicies {
659 &self.default_datawriter_qos
660 }
661
662 pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
663 self.default_datawriter_qos = q.clone();
664 }
665
666 fn unwrap_or_new_entity_id(
667 &self,
668 entity_id_opt: Option<EntityId>,
669 entity_kind: EntityKind,
670 ) -> EntityId {
671 entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
674 }
675
676 pub(crate) fn remove_writer(&self, guid: GUID) {
677 try_send_timeout(&self.remove_writer_sender, guid, None)
678 .unwrap_or_else(|e| error!("Cannot remove Writer {guid:?} : {e:?}"));
679 }
680
681 pub(crate) fn identity(&self) -> EntityId {
682 self.id
683 }
684}
685
686impl Debug for InnerPublisher {
687 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688 f.write_fmt(format_args!("{:?}", self.participant()))?;
689 f.write_fmt(format_args!("Publisher QoS: {:?}", self.my_qos_policies))?;
690 f.write_fmt(format_args!(
691 "Publishers default Writer QoS: {:?}",
692 self.default_datawriter_qos
693 ))
694 }
695}
696
697#[derive(Clone)]
723pub struct Subscriber {
724 inner: Arc<InnerSubscriber>,
725}
726
727impl Subscriber {
728 pub(super) fn new(
729 domain_participant: DomainParticipantWeak,
730 discovery_db: Arc<RwLock<DiscoveryDB>>,
731 qos: QosPolicies,
732 sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
733 sender_remove_reader: mio_channel::SyncSender<GUID>,
734 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
735 security_plugins_handle: Option<SecurityPluginsHandle>,
736 ) -> Self {
737 Self {
738 inner: Arc::new(InnerSubscriber::new(
739 domain_participant,
740 discovery_db,
741 qos,
742 sender_add_reader,
743 sender_remove_reader,
744 discovery_command,
745 security_plugins_handle,
746 )),
747 }
748 }
749
750 pub fn create_datareader<D, SA>(
787 &self,
788 topic: &Topic,
789 qos: Option<QosPolicies>,
790 ) -> CreateResult<WithKeyDataReader<D, SA>>
791 where
792 D: 'static + Keyed,
793 SA: adapters::with_key::DeserializerAdapter<D>,
794 {
795 self.inner.create_datareader(self, topic, None, qos, false)
796 }
797
798 pub fn create_datareader_cdr<D>(
799 &self,
800 topic: &Topic,
801 qos: Option<QosPolicies>,
802 ) -> CreateResult<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
803 where
804 D: 'static + serde::de::DeserializeOwned + Keyed,
805 for<'de> <D as Keyed>::K: Deserialize<'de>,
806 {
807 self.create_datareader::<D, CDRDeserializerAdapter<D>>(topic, qos)
808 }
809
810 pub fn create_datareader_no_key<D, SA>(
840 &self,
841 topic: &Topic,
842 qos: Option<QosPolicies>,
843 ) -> CreateResult<NoKeyDataReader<D, SA>>
844 where
845 D: 'static,
846 SA: adapters::no_key::DeserializerAdapter<D>,
847 {
848 self
849 .inner
850 .create_datareader_no_key(self, topic, None, qos, false)
851 }
852
853 pub fn create_simple_datareader_no_key<D, DA>(
854 &self,
855 topic: &Topic,
856 qos: Option<QosPolicies>,
857 ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
858 where
859 D: 'static,
860 DA: 'static + adapters::no_key::DeserializerAdapter<D>,
861 {
862 self
863 .inner
864 .create_simple_datareader_no_key(self, topic, None, qos)
865 }
866
867 pub fn create_datareader_no_key_cdr<D>(
868 &self,
869 topic: &Topic,
870 qos: Option<QosPolicies>,
871 ) -> CreateResult<NoKeyDataReader<D, CDRDeserializerAdapter<D>>>
872 where
873 D: 'static + serde::de::DeserializeOwned,
874 {
875 self.create_datareader_no_key::<D, CDRDeserializerAdapter<D>>(topic, qos)
876 }
877
878 pub(crate) fn create_datareader_with_entity_id_with_key<D, SA>(
881 &self,
882 topic: &Topic,
883 entity_id: EntityId,
884 qos: Option<QosPolicies>,
885 reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
887 where
888 D: 'static + Keyed,
889 SA: adapters::with_key::DeserializerAdapter<D>,
890 {
891 self
892 .inner
893 .create_datareader(self, topic, Some(entity_id), qos, reader_like_stateless)
894 }
895
896 #[cfg(feature = "security")] pub(crate) fn create_datareader_with_entity_id_no_key<D, SA>(
898 &self,
899 topic: &Topic,
900 entity_id: EntityId,
901 qos: Option<QosPolicies>,
902 reader_like_stateless: bool, ) -> CreateResult<NoKeyDataReader<D, SA>>
904 where
905 D: 'static,
906 SA: adapters::no_key::DeserializerAdapter<D>,
907 {
908 self
909 .inner
910 .create_datareader_no_key(self, topic, Some(entity_id), qos, reader_like_stateless)
911 }
912
913 pub fn participant(&self) -> Option<DomainParticipant> {
946 self.inner.participant()
947 }
948
949 pub(crate) fn remove_reader(&self, guid: GUID) {
950 self.inner.remove_reader(guid);
951 }
952}
953
954#[derive(Clone)]
955pub struct InnerSubscriber {
956 domain_participant: DomainParticipantWeak,
957 discovery_db: Arc<RwLock<DiscoveryDB>>,
958 qos: QosPolicies,
959 sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
960 sender_remove_reader: mio_channel::SyncSender<GUID>,
961 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
962 security_plugins_handle: Option<SecurityPluginsHandle>,
963}
964
965impl InnerSubscriber {
966 pub(super) fn new(
967 domain_participant: DomainParticipantWeak,
968 discovery_db: Arc<RwLock<DiscoveryDB>>,
969 qos: QosPolicies,
970 sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
971 sender_remove_reader: mio_channel::SyncSender<GUID>,
972 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
973 security_plugins_handle: Option<SecurityPluginsHandle>,
974 ) -> Self {
975 Self {
976 domain_participant,
977 discovery_db,
978 qos,
979 sender_add_reader,
980 sender_remove_reader,
981 discovery_command,
982 security_plugins_handle,
983 }
984 }
985
986 fn create_datareader_internal<D, SA>(
987 &self,
988 outer: &Subscriber,
989 entity_id_opt: Option<EntityId>,
990 topic: &Topic,
991 optional_qos: Option<QosPolicies>,
992 reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
994 where
995 D: 'static + Keyed,
996 SA: adapters::with_key::DeserializerAdapter<D>,
997 {
998 let simple_dr = self.create_simple_datareader_internal(
999 outer,
1000 entity_id_opt,
1001 topic,
1002 optional_qos,
1003 reader_like_stateless,
1004 )?;
1005 Ok(with_key::DataReader::<D, SA>::from_simple_data_reader(
1006 simple_dr,
1007 ))
1008 }
1009
1010 fn create_simple_datareader_internal<D, SA>(
1011 &self,
1012 outer: &Subscriber,
1013 entity_id_opt: Option<EntityId>,
1014 topic: &Topic,
1015 optional_qos: Option<QosPolicies>,
1016 reader_like_stateless: bool, ) -> CreateResult<with_key::SimpleDataReader<D, SA>>
1018 where
1019 D: 'static + Keyed,
1020 SA: adapters::with_key::DeserializerAdapter<D>,
1021 {
1022 let (send, rec) = mio_channel::sync_channel::<()>(4);
1024 let (status_sender, status_receiver) = sync_status_channel::<DataReaderStatus>(4)?;
1026
1027 let (reader_command_sender, reader_command_receiver) =
1029 mio_channel::sync_channel::<ReaderCommand>(0);
1030 let qos = self
1038 .qos
1039 .modify_by(&topic.qos())
1040 .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
1041
1042 let entity_id =
1043 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_WITH_KEY_USER_DEFINED);
1044
1045 let dp = match self.participant() {
1046 Some(dp) => dp,
1047 None => return create_error_dropped!("DomainParticipant doesn't exist anymore."),
1048 };
1049
1050 let topic_cache_handle = match dp.dds_cache().read() {
1052 Ok(dds_cache) => dds_cache.get_existing_topic_cache(&topic.name())?,
1053 Err(e) => return create_error_poisoned!("Cannot lock DDScache. Error: {}", e),
1054 };
1055 match topic_cache_handle.lock() {
1057 Ok(mut tc) => tc.update_keep_limits(&qos),
1058 Err(e) => return create_error_poisoned!("Cannot lock topic cache. Error: {}", e),
1059 };
1060
1061 let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), entity_id);
1062
1063 #[cfg(feature = "security")]
1064 if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1065 let check_res = sec_handle.get_plugins().check_create_datareader(
1068 reader_guid.prefix,
1069 dp.domain_id(),
1070 topic.name(),
1071 &qos,
1072 );
1073 match check_res {
1074 Ok(check_passed) => {
1075 if !check_passed {
1076 return create_error_not_allowed_by_security!(
1077 "Not allowed to create a DataReader to topic {}",
1078 topic.name()
1079 );
1080 }
1081 }
1082 Err(e) => {
1083 return create_error_internal!(
1085 "Failed to check DataReader rights from Access control: {}",
1086 e.msg
1087 );
1088 }
1089 };
1090
1091 if let Err(e) = {
1093 let reader_security_attributes = sec_handle
1094 .get_plugins()
1095 .get_reader_sec_attributes(reader_guid, topic.name()); reader_security_attributes.and_then(|attributes| {
1097 sec_handle
1098 .get_plugins()
1099 .register_local_reader(reader_guid, qos.property(), attributes)
1100 })
1101 } {
1102 return create_error_internal!(
1103 "Failed to register reader to crypto plugin: {} . GUID: {:?}",
1104 e,
1105 reader_guid
1106 );
1107 } else {
1108 info!("Registered local reader to crypto plugin. GUID: {reader_guid:?}");
1109 }
1110 }
1111
1112 let data_reader_waker = Arc::new(Mutex::new(None));
1113
1114 let (poll_event_source, poll_event_sender) = mio_source::make_poll_channel()?;
1115
1116 let new_reader = ReaderIngredients {
1117 guid: reader_guid,
1118 notification_sender: send,
1119 status_sender,
1120 topic_name: topic.name(),
1121 topic_cache_handle: topic_cache_handle.clone(),
1122 like_stateless: reader_like_stateless,
1123 qos_policy: qos.clone(),
1124 data_reader_command_receiver: reader_command_receiver,
1125 data_reader_waker: data_reader_waker.clone(),
1126 poll_event_sender,
1127 security_plugins: self.security_plugins_handle.clone(),
1128 };
1129
1130 #[cfg(not(feature = "security"))]
1131 let security_info: Option<EndpointSecurityInfo> = None;
1132 #[cfg(feature = "security")]
1133 let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1134 if reader_guid.entity_id.entity_kind.is_user_defined() {
1136 match sec_handle
1137 .get_plugins()
1138 .get_reader_sec_attributes(reader_guid, topic.name())
1139 {
1140 Ok(attr) => EndpointSecurityInfo::from(attr).into(),
1141 Err(e) => {
1142 return create_error_internal!(
1143 "Failed to get security info for reader: {}. Guid: {:?}",
1144 e,
1145 reader_guid
1146 );
1147 }
1148 }
1149 } else {
1150 None }
1152 } else {
1153 None
1155 };
1156
1157 {
1159 let mut db = self
1160 .discovery_db
1161 .write()
1162 .or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?;
1163 db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
1164 db.update_topic_data_p(topic);
1165
1166 if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
1167 topic_name: topic.name(),
1168 }) {
1169 error!(
1172 "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
1173 topic.name(),
1174 e
1175 );
1176 }
1177 }
1178
1179 let datareader = with_key::SimpleDataReader::<D, SA>::new(
1180 outer.clone(),
1181 entity_id,
1182 topic.clone(),
1183 qos,
1184 rec,
1185 topic_cache_handle,
1186 self.discovery_command.clone(),
1187 status_receiver,
1188 reader_command_sender,
1189 data_reader_waker,
1190 poll_event_source,
1191 )?;
1192
1193 self
1196 .sender_add_reader
1197 .try_send(new_reader)
1198 .or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;
1199
1200 let reader_guid = self.domain_participant.guid().from_prefix(entity_id);
1202 self
1203 .discovery_command
1204 .try_send(DiscoveryCommand::AddLocalReader { guid: reader_guid })
1205 .or_else(|e| {
1206 create_error_internal!(
1207 "Cannot inform Discovery about the new reader {reader_guid:?}. Error: {}",
1208 e
1209 )
1210 })?;
1211
1212 Ok(datareader)
1214 }
1215
1216 pub fn create_datareader<D, SA>(
1217 &self,
1218 outer: &Subscriber,
1219 topic: &Topic,
1220 entity_id: Option<EntityId>,
1221 qos: Option<QosPolicies>,
1222 reader_like_stateless: bool, ) -> CreateResult<WithKeyDataReader<D, SA>>
1224 where
1225 D: 'static + Keyed,
1226 SA: adapters::with_key::DeserializerAdapter<D>,
1227 {
1228 if topic.kind() != TopicKind::WithKey {
1229 return Err(CreateError::TopicKind(TopicKind::WithKey));
1230 }
1231 self.create_datareader_internal(outer, entity_id, topic, qos, reader_like_stateless)
1232 }
1233
1234 pub fn create_datareader_no_key<D: 'static, SA>(
1235 &self,
1236 outer: &Subscriber,
1237 topic: &Topic,
1238 entity_id_opt: Option<EntityId>,
1239 qos: Option<QosPolicies>,
1240 reader_like_stateless: bool, ) -> CreateResult<NoKeyDataReader<D, SA>>
1242 where
1243 SA: adapters::no_key::DeserializerAdapter<D>,
1244 {
1245 if topic.kind() != TopicKind::NoKey {
1246 return Err(CreateError::TopicKind(TopicKind::NoKey));
1247 }
1248
1249 let entity_id =
1250 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1251
1252 let d = self.create_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1253 outer,
1254 Some(entity_id),
1255 topic,
1256 qos,
1257 reader_like_stateless,
1258 )?;
1259
1260 Ok(NoKeyDataReader::<D, SA>::from_keyed(d))
1261 }
1262
1263 pub fn create_simple_datareader_no_key<D: 'static, SA>(
1264 &self,
1265 outer: &Subscriber,
1266 topic: &Topic,
1267 entity_id_opt: Option<EntityId>,
1268 qos: Option<QosPolicies>,
1269 ) -> CreateResult<no_key::SimpleDataReader<D, SA>>
1270 where
1271 SA: adapters::no_key::DeserializerAdapter<D> + 'static,
1272 {
1273 if topic.kind() != TopicKind::NoKey {
1274 return Err(CreateError::TopicKind(TopicKind::NoKey));
1275 }
1276
1277 let entity_id =
1278 self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1279
1280 let d = self.create_simple_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1281 outer,
1282 Some(entity_id),
1283 topic,
1284 qos,
1285 false,
1286 )?;
1287
1288 Ok(no_key::SimpleDataReader::<D, SA>::from_keyed(d))
1289 }
1290
1291 pub fn participant(&self) -> Option<DomainParticipant> {
1292 self.domain_participant.clone().upgrade()
1293 }
1294
1295 pub(crate) fn remove_reader(&self, guid: GUID) {
1296 try_send_timeout(&self.sender_remove_reader, guid, None)
1297 .unwrap_or_else(|e| error!("Cannot remove Reader {guid:?} : {e:?}"));
1298 }
1299
1300 fn unwrap_or_new_entity_id(
1301 &self,
1302 entity_id_opt: Option<EntityId>,
1303 entity_kind: EntityKind,
1304 ) -> EntityId {
1305 entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
1308 }
1309}
1310
// Test module placeholder: compiled only for test builds and currently empty.
#[cfg(test)]
mod tests {}