// rustdds/dds/pubsub.rs

1use std::{
2  fmt::Debug,
3  sync::{Arc, Mutex, MutexGuard, RwLock},
4  time::Duration,
5};
6
7use serde::{Deserialize, Serialize};
8use mio_extras::channel as mio_channel;
9use byteorder::LittleEndian;
10#[allow(unused_imports)]
11use log::{debug, error, info, trace, warn};
12
13use crate::{
14  create_error_dropped, create_error_internal, create_error_poisoned,
15  dds::{
16    adapters,
17    key::Keyed,
18    no_key,
19    no_key::{
20      datareader::DataReader as NoKeyDataReader, datawriter::DataWriter as NoKeyDataWriter,
21    },
22    participant::*,
23    qos::*,
24    result::{CreateError, CreateResult, WaitResult},
25    statusevents::{sync_status_channel, DataReaderStatus},
26    topic::*,
27    with_key,
28    with_key::{
29      datareader::DataReader as WithKeyDataReader, datawriter::DataWriter as WithKeyDataWriter,
30    },
31  },
32  discovery::{
33    discovery::DiscoveryCommand, discovery_db::DiscoveryDB, sedp_messages::DiscoveredWriterData,
34  },
35  mio_source,
36  rtps::{
37    reader::ReaderIngredients,
38    writer::{WriterCommand, WriterIngredients},
39  },
40  serialization::{CDRDeserializerAdapter, CDRSerializerAdapter},
41  structure::{
42    entity::RTPSEntity,
43    guid::{EntityId, EntityKind, GUID},
44  },
45};
46use super::{
47  helpers::try_send_timeout,
48  no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper},
49  with_key::simpledatareader::ReaderCommand,
50};
51#[cfg(feature = "security")]
52use crate::{
53  create_error_not_allowed_by_security,
54  security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
55};
56#[cfg(not(feature = "security"))]
57use crate::no_security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo};
58
59// -------------------------------------------------------------------
60
61/// DDS Publisher
62///
63/// The Publisher and Subscriber structures are collections of DataWriters
64/// and, respectively, DataReaders. They can contain DataWriters or DataReaders
65/// of different types, and attached to different Topics.
66///
67/// They can act as a domain of sample ordering or atomicity, if such QoS
68/// policies are used. For example, DDS participants could agree via QoS
69/// policies that data samples must be presented to readers in the same order as
70/// writers have written them, and the ordering applies also between several
71/// writers/readers, but within one publisher/subscriber. Analogous arrangement
72/// can be set up w.r.t. coherency: All the samples in a transaction are
73/// delivered to the readers, or none are. The transaction can span several
74/// readers, writers, and topics in a single publisher/subscriber.
75///
76///
77/// # Examples
78///
79/// ```
80/// # use rustdds::DomainParticipant;
81/// # use rustdds::qos::QosPolicyBuilder;
82/// use rustdds::Publisher;
83///
84/// let domain_participant = DomainParticipant::new(0).unwrap();
85/// let qos = QosPolicyBuilder::new().build();
86///
87/// let publisher = domain_participant.create_publisher(&qos);
88/// ```
89#[derive(Clone)]
90pub struct Publisher {
91  inner: Arc<Mutex<InnerPublisher>>,
92}
93
94impl Publisher {
95  #[allow(clippy::too_many_arguments)]
96  pub(super) fn new(
97    dp: DomainParticipantWeak,
98    discovery_db: Arc<RwLock<DiscoveryDB>>,
99    qos: QosPolicies,
100    default_dw_qos: QosPolicies,
101    add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
102    remove_writer_sender: mio_channel::SyncSender<GUID>,
103    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
104    security_plugins_handle: Option<SecurityPluginsHandle>,
105  ) -> Self {
106    Self {
107      inner: Arc::new(Mutex::new(InnerPublisher::new(
108        dp,
109        discovery_db,
110        qos,
111        default_dw_qos,
112        add_writer_sender,
113        remove_writer_sender,
114        discovery_command,
115        security_plugins_handle,
116      ))),
117    }
118  }
119
120  fn inner_lock(&self) -> MutexGuard<'_, InnerPublisher> {
121    self
122      .inner
123      .lock()
124      .unwrap_or_else(|e| panic!("Inner publisher lock fail! {e:?}"))
125  }
126
127  /// Creates DDS [DataWriter](struct.With_Key_DataWriter.html) for Keyed topic
128  ///
129  /// # Arguments
130  ///
131  /// * `entity_id` - Custom entity id if necessary for the user to define it
132  /// * `topic` - Reference to DDS Topic this writer is created to
133  /// * `qos` - Not currently in use
134  ///
135  /// # Examples
136  ///
137  /// ```
138  /// # use rustdds::*;
139  /// use rustdds::serialization::CDRSerializerAdapter;
140  /// use serde::Serialize;
141  ///
142  /// let domain_participant = DomainParticipant::new(0).unwrap();
143  /// let qos = QosPolicyBuilder::new().build();
144  ///
145  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
146  ///
147  /// #[derive(Serialize)]
148  /// struct SomeType { a: i32 }
149  /// impl Keyed for SomeType {
150  ///   type K = i32;
151  ///
152  ///   fn key(&self) -> Self::K {
153  ///     self.a
154  ///   }
155  /// }
156  ///
157  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
158  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None);
159  /// ```
160  pub fn create_datawriter<D, SA>(
161    &self,
162    topic: &Topic,
163    qos: Option<QosPolicies>,
164  ) -> CreateResult<WithKeyDataWriter<D, SA>>
165  where
166    D: Keyed,
167    SA: adapters::with_key::SerializerAdapter<D>,
168  {
169    self
170      .inner_lock()
171      .create_datawriter(self, None, topic, qos, false)
172  }
173
174  /// Shorthand for crate_datawriter with Common Data Representation Little
175  /// Endian
176  pub fn create_datawriter_cdr<D>(
177    &self,
178    topic: &Topic,
179    qos: Option<QosPolicies>,
180  ) -> CreateResult<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
181  where
182    D: Keyed + serde::Serialize,
183    <D as Keyed>::K: Serialize,
184  {
185    self.create_datawriter::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
186  }
187
188  /// Creates DDS [DataWriter](struct.DataWriter.html) for Nokey Topic
189  ///
190  /// # Arguments
191  ///
192  /// * `entity_id` - Custom entity id if necessary for the user to define it
193  /// * `topic` - Reference to DDS Topic this writer is created to
194  /// * `qos` - QoS policies for this DataWriter
195  ///
196  /// # Examples
197  ///
198  /// ```
199  /// # use rustdds::*;
200  /// use rustdds::serialization::CDRSerializerAdapter;
201  /// use serde::Serialize;
202  ///
203  /// let domain_participant = DomainParticipant::new(0).unwrap();
204  /// let qos = QosPolicyBuilder::new().build();
205  ///
206  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
207  ///
208  /// #[derive(Serialize)]
209  /// struct SomeType {}
210  ///
211  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
212  /// let data_writer = publisher.create_datawriter_no_key::<SomeType, CDRSerializerAdapter<_>>(&topic, None);
213  /// ```
214  pub fn create_datawriter_no_key<D, SA>(
215    &self,
216    topic: &Topic,
217    qos: Option<QosPolicies>,
218  ) -> CreateResult<NoKeyDataWriter<D, SA>>
219  where
220    SA: adapters::no_key::SerializerAdapter<D>,
221  {
222    self
223      .inner_lock()
224      .create_datawriter_no_key(self, None, topic, qos, false)
225  }
226
227  pub fn create_datawriter_no_key_cdr<D>(
228    &self,
229    topic: &Topic,
230    qos: Option<QosPolicies>,
231  ) -> CreateResult<NoKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
232  where
233    D: serde::Serialize,
234  {
235    self.create_datawriter_no_key::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
236  }
237
238  // Versions with callee-specified EntityId. These are for Discovery use only.
239
240  pub(crate) fn create_datawriter_with_entity_id_with_key<D, SA>(
241    &self,
242    entity_id: EntityId,
243    topic: &Topic,
244    qos: Option<QosPolicies>,
245    writer_like_stateless: bool, // Create a stateless-like RTPS writer?
246  ) -> CreateResult<WithKeyDataWriter<D, SA>>
247  where
248    D: Keyed,
249    SA: adapters::with_key::SerializerAdapter<D>,
250  {
251    self
252      .inner_lock()
253      .create_datawriter(self, Some(entity_id), topic, qos, writer_like_stateless)
254  }
255
256  #[cfg(feature = "security")] // to avoid "never used" warning
257  pub(crate) fn create_datawriter_with_entity_id_no_key<D, SA>(
258    &self,
259    entity_id: EntityId,
260    topic: &Topic,
261    qos: Option<QosPolicies>,
262    writer_like_stateless: bool, // Create a stateless-like RTPS writer?
263  ) -> CreateResult<NoKeyDataWriter<D, SA>>
264  where
265    SA: crate::no_key::SerializerAdapter<D>,
266  {
267    self.inner_lock().create_datawriter_no_key(
268      self,
269      Some(entity_id),
270      topic,
271      qos,
272      writer_like_stateless,
273    )
274  }
275
276  // delete_datawriter should not be needed. The DataWriter object itself should
277  // be deleted to accomplish this.
278
279  // lookup datawriter: maybe not necessary? App should remember datawriters it
280  // has created.
281
282  // Suspend and resume publications are performance optimization methods.
283  // The minimal correct implementation is to do nothing. See DDS spec 2.2.2.4.1.8
284  // and .9
285  /// **NOT IMPLEMENTED. DO NOT USE**
286  #[deprecated(note = "unimplemented")]
287  pub fn suspend_publications(&self) {
288    unimplemented!();
289  }
290
291  /// **NOT IMPLEMENTED. DO NOT USE**
292  #[deprecated(note = "unimplemented")]
293  pub fn resume_publications(&self) {
294    unimplemented!();
295  }
296
297  // coherent change set
298  // In case such QoS is not supported, these should be no-ops.
299  // TODO: Implement these when coherent change-sets are supported.
300  // Coherent set not implemented and currently does nothing
301  /// **NOT IMPLEMENTED. DO NOT USE**
302  #[deprecated(note = "unimplemented")]
303  pub fn begin_coherent_changes(&self) {}
304
305  // Coherent set not implemented and currently does nothing
306  /// **NOT IMPLEMENTED. DO NOT USE**
307  #[deprecated(note = "unimplemented")]
308  pub fn end_coherent_changes(&self) {}
309
310  // Wait for all matched reliable DataReaders acknowledge data written so far,
311  // or timeout.
312  /// **NOT IMPLEMENTED. DO NOT USE**
313  #[deprecated(note = "unimplemented")]
314  pub fn wait_for_acknowledgments(&self, _max_wait: Duration) -> WaitResult<()> {
315    unimplemented!();
316  }
317
318  // What is the use case for this? (is it useful in Rust style of programming?
319  // Should it be public?)
320  /// Gets [DomainParticipant](struct.DomainParticipant.html) if it has not
321  /// disappeared from all scopes.
322  ///
323  /// # Example
324  ///
325  /// ```
326  /// # use rustdds::*;
327  ///
328  /// let domain_participant = DomainParticipant::new(0).unwrap();
329  /// let qos = QosPolicyBuilder::new().build();
330  ///
331  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
332  /// assert_eq!(domain_participant, publisher.participant().unwrap());
333  /// ```
334  pub fn participant(&self) -> Option<DomainParticipant> {
335    self.inner_lock().domain_participant.clone().upgrade()
336  }
337
338  // delete_contained_entities: We should not need this. Contained DataWriters
339  // should dispose themselves and notify publisher.
340
341  /// Returns default DataWriter qos.
342  ///
343  /// # Example
344  ///
345  /// ```
346  /// # use rustdds::*;
347  ///
348  /// let domain_participant = DomainParticipant::new(0).unwrap();
349  /// let qos = QosPolicyBuilder::new().build();
350  ///
351  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
352  /// assert_eq!(qos, publisher.get_default_datawriter_qos());
353  /// ```
354  pub fn get_default_datawriter_qos(&self) -> QosPolicies {
355    self.inner_lock().get_default_datawriter_qos().clone()
356  }
357
358  /// Sets default DataWriter qos.
359  ///
360  /// # Example
361  ///
362  /// ```
363  /// # use rustdds::*;
364  ///
365  /// let domain_participant = DomainParticipant::new(0).unwrap();
366  /// let qos = QosPolicyBuilder::new().build();
367  ///
368  /// let mut publisher = domain_participant.create_publisher(&qos).unwrap();
369  /// let qos2 =
370  /// QosPolicyBuilder::new().durability(policy::Durability::Transient).build();
371  /// publisher.set_default_datawriter_qos(&qos2);
372  ///
373  /// assert_ne!(qos, publisher.get_default_datawriter_qos());
374  /// assert_eq!(qos2, publisher.get_default_datawriter_qos());
375  /// ```
376  pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
377    self.inner_lock().set_default_datawriter_qos(q);
378  }
379
380  // This is used on DataWriter .drop()
381  pub(crate) fn remove_writer(&self, guid: GUID) {
382    self.inner_lock().remove_writer(guid);
383  }
384} // impl
385
386impl PartialEq for Publisher {
387  fn eq(&self, other: &Self) -> bool {
388    let id_self = { self.inner_lock().identity() };
389    let id_other = { other.inner_lock().identity() };
390    id_self == id_other
391  }
392}
393
394impl Debug for Publisher {
395  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396    self.inner_lock().fmt(f)
397  }
398}
399
400// "Inner" struct
401
402#[derive(Clone)]
403struct InnerPublisher {
404  id: EntityId,
405  domain_participant: DomainParticipantWeak,
406  discovery_db: Arc<RwLock<DiscoveryDB>>,
407  my_qos_policies: QosPolicies,
408  default_datawriter_qos: QosPolicies, // used when creating a new DataWriter
409  add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
410  remove_writer_sender: mio_channel::SyncSender<GUID>,
411  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
412  security_plugins_handle: Option<SecurityPluginsHandle>,
413}
414
415// public interface for Publisher
416impl InnerPublisher {
417  #[allow(clippy::too_many_arguments)]
418  fn new(
419    dp: DomainParticipantWeak,
420    discovery_db: Arc<RwLock<DiscoveryDB>>,
421    qos: QosPolicies,
422    default_dw_qos: QosPolicies,
423    add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
424    remove_writer_sender: mio_channel::SyncSender<GUID>,
425    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
426    security_plugins_handle: Option<SecurityPluginsHandle>,
427  ) -> Self {
428    // We generate an arbitrary but unique id to distinguish Publishers from each
429    // other. EntityKind is just some value, since we do not show it to anyone.
430    let id = EntityId::MAX;
431    // dp.clone().upgrade().unwrap().new_entity_id(EntityKind::UNKNOWN_BUILT_IN);
432
433    Self {
434      id,
435      domain_participant: dp,
436      discovery_db,
437      my_qos_policies: qos,
438      default_datawriter_qos: default_dw_qos,
439      add_writer_sender,
440      remove_writer_sender,
441      discovery_command,
442      security_plugins_handle,
443    }
444  }
445
446  pub fn create_datawriter<D, SA>(
447    &self,
448    outer: &Publisher,
449    entity_id_opt: Option<EntityId>,
450    topic: &Topic,
451    optional_qos: Option<QosPolicies>,
452    writer_like_stateless: bool, // Create a stateless-like RTPS writer? Usually false
453  ) -> CreateResult<WithKeyDataWriter<D, SA>>
454  where
455    D: Keyed,
456    SA: adapters::with_key::SerializerAdapter<D>,
457  {
458    // Data samples from DataWriter to HistoryCache
459    let (dwcc_upload, hccc_download) = mio_channel::sync_channel::<WriterCommand>(16);
460    let writer_waker = Arc::new(Mutex::new(None));
461    // Status reports back from Writer to DataWriter.
462    let (status_sender, status_receiver) = sync_status_channel(4)?;
463
464    // DDS Spec 2.2.2.4.1.5 create_datawriter:
465    // If no QoS is specified, we should take the Publisher default
466    // QoS, modify it to match any QoS settings (that are set) in the
467    // Topic QoS and use that.
468
469    // Use Publisher QoS as basis, modify by Topic settings, and modify by specified
470    // QoS.
471    let writer_qos = self
472      .default_datawriter_qos
473      .modify_by(&topic.qos())
474      .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
475
476    let entity_id =
477      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_WITH_KEY_USER_DEFINED);
478    let dp = self
479      .participant()
480      .ok_or("upgrade fail")
481      .or_else(|e| create_error_dropped!("Where is my DomainParticipant? {}", e))?;
482
483    let guid = GUID::new_with_prefix_and_id(dp.guid().prefix, entity_id);
484
485    #[cfg(feature = "security")]
486    if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
487      // Security is enabled.
488      // Check are we allowed to create the DataWriter from Access control
489      let check_res = sec_handle.get_plugins().check_create_datawriter(
490        guid.prefix,
491        dp.domain_id(),
492        topic.name(),
493        &writer_qos,
494      );
495      match check_res {
496        Ok(check_passed) => {
497          if !check_passed {
498            return create_error_not_allowed_by_security!(
499              "Not allowed to create a DataWriter to topic {}",
500              topic.name()
501            );
502          }
503        }
504        Err(e) => {
505          // Something went wrong in the check
506          return create_error_internal!(
507            "Failed to check DataWriter rights from Access control: {}",
508            e.msg
509          );
510        }
511      };
512
513      // Register Writer to crypto plugin
514      if let Err(e) = {
515        let writer_security_attributes = sec_handle
516          .get_plugins()
517          .get_writer_sec_attributes(guid, topic.name()); // Release lock
518        writer_security_attributes.and_then(|attributes| {
519          sec_handle
520            .get_plugins()
521            .register_local_writer(guid, writer_qos.property(), attributes)
522        })
523      } {
524        return create_error_internal!(
525          "Failed to register writer to crypto plugin: {} . GUID: {:?}",
526          e,
527          guid
528        );
529      } else {
530        info!("Registered local writer to crypto plugin. GUID: {guid:?}");
531      }
532    }
533
534    let new_writer = WriterIngredients {
535      guid,
536      writer_command_receiver: hccc_download,
537      writer_command_receiver_waker: Arc::clone(&writer_waker),
538      topic_name: topic.name(),
539      like_stateless: writer_like_stateless,
540      qos_policies: writer_qos.clone(),
541      status_sender,
542      security_plugins: self.security_plugins_handle.clone(),
543    };
544
545    // Send writer ingredients to DP event loop, where the actual writer will be
546    // constructed
547    self
548      .add_writer_sender
549      .send(new_writer)
550      .or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;
551
552    let data_writer = WithKeyDataWriter::<D, SA>::new(
553      outer.clone(),
554      topic.clone(),
555      writer_qos,
556      guid,
557      dwcc_upload,
558      writer_waker,
559      self.discovery_command.clone(),
560      status_receiver,
561    )?;
562
563    // notify Discovery DB
564    let mut db = self
565      .discovery_db
566      .write()
567      .map_err(|e| CreateError::Poisoned {
568        reason: format!("Discovery DB: {e}"),
569      })?;
570
571    #[cfg(not(feature = "security"))]
572    let security_info = None;
573    #[cfg(feature = "security")]
574    let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
575      // Security enabled
576      if guid.entity_id.entity_kind.is_user_defined() {
577        match sec_handle
578          .get_plugins()
579          .get_writer_sec_attributes(guid, topic.name())
580        {
581          Ok(attr) => EndpointSecurityInfo::from(attr).into(),
582          Err(e) => {
583            return create_error_internal!(
584              "Failed to get security info for writer: {}. Guid: {:?}",
585              e,
586              guid
587            );
588          }
589        }
590      } else {
591        None // For the built-in topics
592      }
593    } else {
594      // No security enabled
595      None
596    };
597
598    // Update topic to DiscoveryDB & inform Discovery about it
599    let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
600    db.update_local_topic_writer(dwd);
601    db.update_topic_data_p(topic);
602
603    if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
604      topic_name: topic.name(),
605    }) {
606      // Log the error but don't quit, failing to inform Discovery about the topic
607      // shouldn't be that serious
608      error!(
609        "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
610        topic.name(),
611        e
612      );
613    }
614
615    // Inform Discovery about the new writer
616    let writer_guid = self.domain_participant.guid().from_prefix(entity_id);
617    self
618      .discovery_command
619      .try_send(DiscoveryCommand::AddLocalWriter { guid: writer_guid })
620      .or_else(|e| {
621        create_error_internal!(
622          "Cannot inform Discovery about the new writer {writer_guid:?}. Error: {}",
623          e
624        )
625      })?;
626
627    // Return the DataWriter to user
628    Ok(data_writer)
629  }
630
631  pub fn create_datawriter_no_key<D, SA>(
632    &self,
633    outer: &Publisher,
634    entity_id_opt: Option<EntityId>,
635    topic: &Topic,
636    qos: Option<QosPolicies>,
637    writer_like_stateless: bool, // Create a stateless-like RTPS writer? Usually false
638  ) -> CreateResult<NoKeyDataWriter<D, SA>>
639  where
640    SA: adapters::no_key::SerializerAdapter<D>,
641  {
642    let entity_id =
643      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_NO_KEY_USER_DEFINED);
644    let d = self.create_datawriter::<NoKeyWrapper<D>, SAWrapper<SA>>(
645      outer,
646      Some(entity_id),
647      topic,
648      qos,
649      writer_like_stateless,
650    )?;
651    Ok(NoKeyDataWriter::<D, SA>::from_keyed(d))
652  }
653
654  pub fn participant(&self) -> Option<DomainParticipant> {
655    self.domain_participant.clone().upgrade()
656  }
657
658  pub fn get_default_datawriter_qos(&self) -> &QosPolicies {
659    &self.default_datawriter_qos
660  }
661
662  pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
663    self.default_datawriter_qos = q.clone();
664  }
665
666  fn unwrap_or_new_entity_id(
667    &self,
668    entity_id_opt: Option<EntityId>,
669    entity_kind: EntityKind,
670  ) -> EntityId {
671    // If the entity_id is given, then just use that. If not, then pull an arbitrary
672    // number out of participant's hat.
673    entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
674  }
675
676  pub(crate) fn remove_writer(&self, guid: GUID) {
677    try_send_timeout(&self.remove_writer_sender, guid, None)
678      .unwrap_or_else(|e| error!("Cannot remove Writer {guid:?} : {e:?}"));
679  }
680
681  pub(crate) fn identity(&self) -> EntityId {
682    self.id
683  }
684}
685
686impl Debug for InnerPublisher {
687  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688    f.write_fmt(format_args!("{:?}", self.participant()))?;
689    f.write_fmt(format_args!("Publisher QoS: {:?}", self.my_qos_policies))?;
690    f.write_fmt(format_args!(
691      "Publishers default Writer QoS: {:?}",
692      self.default_datawriter_qos
693    ))
694  }
695}
696
697// -------------------------------------------------------------------
698// -------------------------------------------------------------------
699// -------------------------------------------------------------------
700// -------------------------------------------------------------------
701// -------------------------------------------------------------------
702// -------------------------------------------------------------------
703// -------------------------------------------------------------------
704// -------------------------------------------------------------------
705// -------------------------------------------------------------------
706// -------------------------------------------------------------------
707
708/// DDS Subscriber
709///
710/// See overview at [`Publisher`].
711///
712/// # Examples
713///
714/// ```
715/// # use rustdds::*;
716///
717/// let domain_participant = DomainParticipant::new(0).unwrap();
718/// let qos = QosPolicyBuilder::new().build();
719///
720/// let subscriber = domain_participant.create_subscriber(&qos);
721/// ```
722#[derive(Clone)]
723pub struct Subscriber {
724  inner: Arc<InnerSubscriber>,
725}
726
727impl Subscriber {
728  pub(super) fn new(
729    domain_participant: DomainParticipantWeak,
730    discovery_db: Arc<RwLock<DiscoveryDB>>,
731    qos: QosPolicies,
732    sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
733    sender_remove_reader: mio_channel::SyncSender<GUID>,
734    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
735    security_plugins_handle: Option<SecurityPluginsHandle>,
736  ) -> Self {
737    Self {
738      inner: Arc::new(InnerSubscriber::new(
739        domain_participant,
740        discovery_db,
741        qos,
742        sender_add_reader,
743        sender_remove_reader,
744        discovery_command,
745        security_plugins_handle,
746      )),
747    }
748  }
749
750  /// Creates DDS DataReader for keyed Topics
751  ///
752  /// # Arguments
753  ///
754  /// * `topic` - Reference to the DDS [Topic](struct.Topic.html) this reader
755  ///   reads from
756  /// * `entity_id` - Optional [EntityId](data_types/struct.EntityId.html) if
757  ///   necessary for DDS communication (random if None)
758  /// * `qos` - Not in use
759  ///
760  /// # Examples
761  ///
762  /// ```
763  /// # use rustdds::*;
764  /// use serde::Deserialize;
765  /// use rustdds::serialization::CDRDeserializerAdapter;
766  /// #
767  /// # let domain_participant = DomainParticipant::new(0).unwrap();
768  /// # let qos = QosPolicyBuilder::new().build();
769  /// #
770  ///
771  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
772  ///
773  /// #[derive(Deserialize)]
774  /// struct SomeType { a: i32 }
775  /// impl Keyed for SomeType {
776  ///   type K = i32;
777  ///
778  ///   fn key(&self) -> Self::K {
779  ///     self.a
780  ///   }
781  /// }
782  ///
783  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
784  /// let data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
785  /// ```
786  pub fn create_datareader<D, SA>(
787    &self,
788    topic: &Topic,
789    qos: Option<QosPolicies>,
790  ) -> CreateResult<WithKeyDataReader<D, SA>>
791  where
792    D: 'static + Keyed,
793    SA: adapters::with_key::DeserializerAdapter<D>,
794  {
795    self.inner.create_datareader(self, topic, None, qos, false)
796  }
797
798  pub fn create_datareader_cdr<D>(
799    &self,
800    topic: &Topic,
801    qos: Option<QosPolicies>,
802  ) -> CreateResult<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
803  where
804    D: 'static + serde::de::DeserializeOwned + Keyed,
805    for<'de> <D as Keyed>::K: Deserialize<'de>,
806  {
807    self.create_datareader::<D, CDRDeserializerAdapter<D>>(topic, qos)
808  }
809
810  /// Create DDS DataReader for non keyed Topics
811  ///
812  /// # Arguments
813  ///
814  /// * `topic` - Reference to the DDS [Topic](struct.Topic.html) this reader
815  ///   reads from
816  /// * `entity_id` - Optional [EntityId](data_types/struct.EntityId.html) if
817  ///   necessary for DDS communication (random if None)
818  /// * `qos` - Not in use
819  ///
820  /// # Examples
821  ///
822  /// ```
823  /// # use rustdds::*;
824  /// use serde::Deserialize;
825  /// use rustdds::serialization::CDRDeserializerAdapter;
826  /// #
827  /// # let domain_participant = DomainParticipant::new(0).unwrap();
828  /// # let qos = QosPolicyBuilder::new().build();
829  /// #
830  ///
831  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
832  ///
833  /// #[derive(Deserialize)]
834  /// struct SomeType {}
835  ///
836  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
837  /// let data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
838  /// ```
839  pub fn create_datareader_no_key<D, SA>(
840    &self,
841    topic: &Topic,
842    qos: Option<QosPolicies>,
843  ) -> CreateResult<NoKeyDataReader<D, SA>>
844  where
845    D: 'static,
846    SA: adapters::no_key::DeserializerAdapter<D>,
847  {
848    self
849      .inner
850      .create_datareader_no_key(self, topic, None, qos, false)
851  }
852
853  pub fn create_simple_datareader_no_key<D, DA>(
854    &self,
855    topic: &Topic,
856    qos: Option<QosPolicies>,
857  ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
858  where
859    D: 'static,
860    DA: 'static + adapters::no_key::DeserializerAdapter<D>,
861  {
862    self
863      .inner
864      .create_simple_datareader_no_key(self, topic, None, qos)
865  }
866
867  pub fn create_datareader_no_key_cdr<D>(
868    &self,
869    topic: &Topic,
870    qos: Option<QosPolicies>,
871  ) -> CreateResult<NoKeyDataReader<D, CDRDeserializerAdapter<D>>>
872  where
873    D: 'static + serde::de::DeserializeOwned,
874  {
875    self.create_datareader_no_key::<D, CDRDeserializerAdapter<D>>(topic, qos)
876  }
877
878  // versions with callee-specified EntityId. These are for Discovery use only.
879
880  pub(crate) fn create_datareader_with_entity_id_with_key<D, SA>(
881    &self,
882    topic: &Topic,
883    entity_id: EntityId,
884    qos: Option<QosPolicies>,
885    reader_like_stateless: bool, // Create a stateless-like RTPS reader?
886  ) -> CreateResult<WithKeyDataReader<D, SA>>
887  where
888    D: 'static + Keyed,
889    SA: adapters::with_key::DeserializerAdapter<D>,
890  {
891    self
892      .inner
893      .create_datareader(self, topic, Some(entity_id), qos, reader_like_stateless)
894  }
895
896  #[cfg(feature = "security")] // to avoid "never used" warning
897  pub(crate) fn create_datareader_with_entity_id_no_key<D, SA>(
898    &self,
899    topic: &Topic,
900    entity_id: EntityId,
901    qos: Option<QosPolicies>,
902    reader_like_stateless: bool, // Create a stateless-like RTPS reader?
903  ) -> CreateResult<NoKeyDataReader<D, SA>>
904  where
905    D: 'static,
906    SA: adapters::no_key::DeserializerAdapter<D>,
907  {
908    self
909      .inner
910      .create_datareader_no_key(self, topic, Some(entity_id), qos, reader_like_stateless)
911  }
912
913  // Retrieves a previously created DataReader belonging to the Subscriber.
914  // TODO: Is this even possible. Would probably need to return reference and
915  // store references on creation
916  /*
917  pub(crate) fn lookup_datareader<D, SA>(
918    &self,
919    _topic_name: &str,
920  ) -> Option<WithKeyDataReader<D, SA>>
921  where
922    D: Keyed + DeserializeOwned,
923    SA: DeserializerAdapter<D>,
924  {
925    todo!()
926  // TO think: Is this really necessary? Because the caller would have to know
927    // types D and SA. Should we just trust whoever creates DataReaders to also remember them?
928  }
929  */
930
931  /// Returns [DomainParticipant](struct.DomainParticipant.html) if it is sill
932  /// alive.
933  ///
934  /// # Example
935  ///
936  /// ```
937  /// # use rustdds::*;
938  /// #
939  /// let domain_participant = DomainParticipant::new(0).unwrap();
940  /// let qos = QosPolicyBuilder::new().build();
941  ///
942  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
943  /// assert_eq!(domain_participant, subscriber.participant().unwrap());
944  /// ```
945  pub fn participant(&self) -> Option<DomainParticipant> {
946    self.inner.participant()
947  }
948
949  pub(crate) fn remove_reader(&self, guid: GUID) {
950    self.inner.remove_reader(guid);
951  }
952}
953
/// Shared state behind a Subscriber: the handles and channels needed to
/// create and remove DataReaders.
#[derive(Clone)]
pub struct InnerSubscriber {
  // Weak handle to the owning participant; upgraded in `participant()`.
  domain_participant: DomainParticipantWeak,
  // Written when a reader is created (local reader and topic data).
  discovery_db: Arc<RwLock<DiscoveryDB>>,
  // Subscriber-level QoS; used as the basis for each new reader's QoS.
  qos: QosPolicies,
  // Carries the ingredients of a new reader when one is created.
  sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
  // Carries the GUID of a reader that should be removed.
  sender_remove_reader: mio_channel::SyncSender<GUID>,
  // Used to notify Discovery (AddTopic, AddLocalReader); also cloned into
  // each created reader.
  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
  // `Some` when security plugins are in use; cloned into ReaderIngredients.
  security_plugins_handle: Option<SecurityPluginsHandle>,
}
964
965impl InnerSubscriber {
966  pub(super) fn new(
967    domain_participant: DomainParticipantWeak,
968    discovery_db: Arc<RwLock<DiscoveryDB>>,
969    qos: QosPolicies,
970    sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
971    sender_remove_reader: mio_channel::SyncSender<GUID>,
972    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
973    security_plugins_handle: Option<SecurityPluginsHandle>,
974  ) -> Self {
975    Self {
976      domain_participant,
977      discovery_db,
978      qos,
979      sender_add_reader,
980      sender_remove_reader,
981      discovery_command,
982      security_plugins_handle,
983    }
984  }
985
986  fn create_datareader_internal<D, SA>(
987    &self,
988    outer: &Subscriber,
989    entity_id_opt: Option<EntityId>,
990    topic: &Topic,
991    optional_qos: Option<QosPolicies>,
992    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
993  ) -> CreateResult<WithKeyDataReader<D, SA>>
994  where
995    D: 'static + Keyed,
996    SA: adapters::with_key::DeserializerAdapter<D>,
997  {
998    let simple_dr = self.create_simple_datareader_internal(
999      outer,
1000      entity_id_opt,
1001      topic,
1002      optional_qos,
1003      reader_like_stateless,
1004    )?;
1005    Ok(with_key::DataReader::<D, SA>::from_simple_data_reader(
1006      simple_dr,
1007    ))
1008  }
1009
1010  fn create_simple_datareader_internal<D, SA>(
1011    &self,
1012    outer: &Subscriber,
1013    entity_id_opt: Option<EntityId>,
1014    topic: &Topic,
1015    optional_qos: Option<QosPolicies>,
1016    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
1017  ) -> CreateResult<with_key::SimpleDataReader<D, SA>>
1018  where
1019    D: 'static + Keyed,
1020    SA: adapters::with_key::DeserializerAdapter<D>,
1021  {
1022    // incoming data notification channel from Reader to DataReader
1023    let (send, rec) = mio_channel::sync_channel::<()>(4);
1024    // status change channel from Reader to DataReader
1025    let (status_sender, status_receiver) = sync_status_channel::<DataReaderStatus>(4)?;
1026
1027    // reader command channel from Datareader to Reader
1028    let (reader_command_sender, reader_command_receiver) =
1029      mio_channel::sync_channel::<ReaderCommand>(0);
1030    // The buffer length is zero, i.e. sender and receiver must rendezvous at
1031    // send/receive. This is needed to synchronize sending of wakers from
1032    // DataReader to Reader. If the capacity is increased, then some data
1033    // available for reading notifications may be missed.
1034
1035    // Use subscriber QoS as basis, modify by Topic settings, and modify by
1036    // specified QoS.
1037    let qos = self
1038      .qos
1039      .modify_by(&topic.qos())
1040      .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
1041
1042    let entity_id =
1043      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_WITH_KEY_USER_DEFINED);
1044
1045    let dp = match self.participant() {
1046      Some(dp) => dp,
1047      None => return create_error_dropped!("DomainParticipant doesn't exist anymore."),
1048    };
1049
1050    // Get a handle to the topic cache
1051    let topic_cache_handle = match dp.dds_cache().read() {
1052      Ok(dds_cache) => dds_cache.get_existing_topic_cache(&topic.name())?,
1053      Err(e) => return create_error_poisoned!("Cannot lock DDScache. Error: {}", e),
1054    };
1055    // Update topic cache with DataReader's Qos
1056    match topic_cache_handle.lock() {
1057      Ok(mut tc) => tc.update_keep_limits(&qos),
1058      Err(e) => return create_error_poisoned!("Cannot lock topic cache. Error: {}", e),
1059    };
1060
1061    let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), entity_id);
1062
1063    #[cfg(feature = "security")]
1064    if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1065      // Security is enabled.
1066      // Check are we allowed to create the DataReader from Access control
1067      let check_res = sec_handle.get_plugins().check_create_datareader(
1068        reader_guid.prefix,
1069        dp.domain_id(),
1070        topic.name(),
1071        &qos,
1072      );
1073      match check_res {
1074        Ok(check_passed) => {
1075          if !check_passed {
1076            return create_error_not_allowed_by_security!(
1077              "Not allowed to create a DataReader to topic {}",
1078              topic.name()
1079            );
1080          }
1081        }
1082        Err(e) => {
1083          // Something went wrong in the check
1084          return create_error_internal!(
1085            "Failed to check DataReader rights from Access control: {}",
1086            e.msg
1087          );
1088        }
1089      };
1090
1091      // Register Reader to crypto plugin
1092      if let Err(e) = {
1093        let reader_security_attributes = sec_handle
1094          .get_plugins()
1095          .get_reader_sec_attributes(reader_guid, topic.name()); // Release lock
1096        reader_security_attributes.and_then(|attributes| {
1097          sec_handle
1098            .get_plugins()
1099            .register_local_reader(reader_guid, qos.property(), attributes)
1100        })
1101      } {
1102        return create_error_internal!(
1103          "Failed to register reader to crypto plugin: {} . GUID: {:?}",
1104          e,
1105          reader_guid
1106        );
1107      } else {
1108        info!("Registered local reader to crypto plugin. GUID: {reader_guid:?}");
1109      }
1110    }
1111
1112    let data_reader_waker = Arc::new(Mutex::new(None));
1113
1114    let (poll_event_source, poll_event_sender) = mio_source::make_poll_channel()?;
1115
1116    let new_reader = ReaderIngredients {
1117      guid: reader_guid,
1118      notification_sender: send,
1119      status_sender,
1120      topic_name: topic.name(),
1121      topic_cache_handle: topic_cache_handle.clone(),
1122      like_stateless: reader_like_stateless,
1123      qos_policy: qos.clone(),
1124      data_reader_command_receiver: reader_command_receiver,
1125      data_reader_waker: data_reader_waker.clone(),
1126      poll_event_sender,
1127      security_plugins: self.security_plugins_handle.clone(),
1128    };
1129
1130    #[cfg(not(feature = "security"))]
1131    let security_info: Option<EndpointSecurityInfo> = None;
1132    #[cfg(feature = "security")]
1133    let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1134      // Security enabled
1135      if reader_guid.entity_id.entity_kind.is_user_defined() {
1136        match sec_handle
1137          .get_plugins()
1138          .get_reader_sec_attributes(reader_guid, topic.name())
1139        {
1140          Ok(attr) => EndpointSecurityInfo::from(attr).into(),
1141          Err(e) => {
1142            return create_error_internal!(
1143              "Failed to get security info for reader: {}. Guid: {:?}",
1144              e,
1145              reader_guid
1146            );
1147          }
1148        }
1149      } else {
1150        None // For the built-in topics
1151      }
1152    } else {
1153      // No security enabled
1154      None
1155    };
1156
1157    // Update topic to DiscoveryDB & inform Discovery about it
1158    {
1159      let mut db = self
1160        .discovery_db
1161        .write()
1162        .or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?;
1163      db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
1164      db.update_topic_data_p(topic);
1165
1166      if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
1167        topic_name: topic.name(),
1168      }) {
1169        // Log the error but don't quit, failing to inform Discovery about the topic
1170        // shouldn't be that serious
1171        error!(
1172          "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
1173          topic.name(),
1174          e
1175        );
1176      }
1177    }
1178
1179    let datareader = with_key::SimpleDataReader::<D, SA>::new(
1180      outer.clone(),
1181      entity_id,
1182      topic.clone(),
1183      qos,
1184      rec,
1185      topic_cache_handle,
1186      self.discovery_command.clone(),
1187      status_receiver,
1188      reader_command_sender,
1189      data_reader_waker,
1190      poll_event_source,
1191    )?;
1192
1193    // Send reader ingredients to DP event loop, where the actual reader will be
1194    // constructed
1195    self
1196      .sender_add_reader
1197      .try_send(new_reader)
1198      .or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;
1199
1200    // Inform Discovery about the new reader
1201    let reader_guid = self.domain_participant.guid().from_prefix(entity_id);
1202    self
1203      .discovery_command
1204      .try_send(DiscoveryCommand::AddLocalReader { guid: reader_guid })
1205      .or_else(|e| {
1206        create_error_internal!(
1207          "Cannot inform Discovery about the new reader {reader_guid:?}. Error: {}",
1208          e
1209        )
1210      })?;
1211
1212    // Return the DataReader to user
1213    Ok(datareader)
1214  }
1215
1216  pub fn create_datareader<D, SA>(
1217    &self,
1218    outer: &Subscriber,
1219    topic: &Topic,
1220    entity_id: Option<EntityId>,
1221    qos: Option<QosPolicies>,
1222    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
1223  ) -> CreateResult<WithKeyDataReader<D, SA>>
1224  where
1225    D: 'static + Keyed,
1226    SA: adapters::with_key::DeserializerAdapter<D>,
1227  {
1228    if topic.kind() != TopicKind::WithKey {
1229      return Err(CreateError::TopicKind(TopicKind::WithKey));
1230    }
1231    self.create_datareader_internal(outer, entity_id, topic, qos, reader_like_stateless)
1232  }
1233
1234  pub fn create_datareader_no_key<D: 'static, SA>(
1235    &self,
1236    outer: &Subscriber,
1237    topic: &Topic,
1238    entity_id_opt: Option<EntityId>,
1239    qos: Option<QosPolicies>,
1240    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
1241  ) -> CreateResult<NoKeyDataReader<D, SA>>
1242  where
1243    SA: adapters::no_key::DeserializerAdapter<D>,
1244  {
1245    if topic.kind() != TopicKind::NoKey {
1246      return Err(CreateError::TopicKind(TopicKind::NoKey));
1247    }
1248
1249    let entity_id =
1250      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1251
1252    let d = self.create_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1253      outer,
1254      Some(entity_id),
1255      topic,
1256      qos,
1257      reader_like_stateless,
1258    )?;
1259
1260    Ok(NoKeyDataReader::<D, SA>::from_keyed(d))
1261  }
1262
1263  pub fn create_simple_datareader_no_key<D: 'static, SA>(
1264    &self,
1265    outer: &Subscriber,
1266    topic: &Topic,
1267    entity_id_opt: Option<EntityId>,
1268    qos: Option<QosPolicies>,
1269  ) -> CreateResult<no_key::SimpleDataReader<D, SA>>
1270  where
1271    SA: adapters::no_key::DeserializerAdapter<D> + 'static,
1272  {
1273    if topic.kind() != TopicKind::NoKey {
1274      return Err(CreateError::TopicKind(TopicKind::NoKey));
1275    }
1276
1277    let entity_id =
1278      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1279
1280    let d = self.create_simple_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1281      outer,
1282      Some(entity_id),
1283      topic,
1284      qos,
1285      false,
1286    )?;
1287
1288    Ok(no_key::SimpleDataReader::<D, SA>::from_keyed(d))
1289  }
1290
1291  pub fn participant(&self) -> Option<DomainParticipant> {
1292    self.domain_participant.clone().upgrade()
1293  }
1294
1295  pub(crate) fn remove_reader(&self, guid: GUID) {
1296    try_send_timeout(&self.sender_remove_reader, guid, None)
1297      .unwrap_or_else(|e| error!("Cannot remove Reader {guid:?} : {e:?}"));
1298  }
1299
1300  fn unwrap_or_new_entity_id(
1301    &self,
1302    entity_id_opt: Option<EntityId>,
1303    entity_kind: EntityKind,
1304  ) -> EntityId {
1305    // If the entity_id is given, then just use that. If not, then pull an arbitrary
1306    // number out of participant's hat.
1307    entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
1308  }
1309}
1310
1311// -------------------------------------------------------------------
1312
// Placeholder: no unit tests are defined in this module yet.
#[cfg(test)]
mod tests {}