rustdds/dds/
pubsub.rs

1use std::{
2  fmt::Debug,
3  sync::{Arc, Mutex, MutexGuard, RwLock},
4  time::Duration,
5};
6
7use serde::{Deserialize, Serialize};
8use mio_extras::channel as mio_channel;
9use byteorder::LittleEndian;
10#[allow(unused_imports)]
11use log::{debug, error, info, trace, warn};
12
13use crate::{
14  create_error_dropped, create_error_poisoned,
15  dds::{
16    adapters,
17    key::Keyed,
18    no_key,
19    no_key::{
20      datareader::DataReader as NoKeyDataReader, datawriter::DataWriter as NoKeyDataWriter,
21    },
22    participant::*,
23    qos::*,
24    result::{CreateError, CreateResult, WaitResult},
25    statusevents::{sync_status_channel, DataReaderStatus},
26    topic::*,
27    with_key,
28    with_key::{
29      datareader::DataReader as WithKeyDataReader, datawriter::DataWriter as WithKeyDataWriter,
30    },
31  },
32  discovery::{
33    discovery::DiscoveryCommand, discovery_db::DiscoveryDB, sedp_messages::DiscoveredWriterData,
34  },
35  mio_source,
36  rtps::{
37    reader::ReaderIngredients,
38    writer::{WriterCommand, WriterIngredients},
39  },
40  serialization::{CDRDeserializerAdapter, CDRSerializerAdapter},
41  structure::{
42    entity::RTPSEntity,
43    guid::{EntityId, EntityKind, GUID},
44  },
45};
46use super::{
47  helpers::try_send_timeout,
48  no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper},
49  with_key::simpledatareader::ReaderCommand,
50};
51#[cfg(feature = "security")]
52use crate::{
53  create_error_internal, create_error_not_allowed_by_security,
54  security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
55};
56#[cfg(not(feature = "security"))]
57use crate::no_security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo};
58
59// -------------------------------------------------------------------
60
/// DDS Publisher
///
/// The Publisher and Subscriber structures are collections of DataWriters
/// and, respectively, DataReaders. They can contain DataWriters or DataReaders
/// of different types, and attached to different Topics.
///
/// They can act as a domain of sample ordering or atomicity, if such QoS
/// policies are used. For example, DDS participants could agree via QoS
/// policies that data samples must be presented to readers in the same order as
/// writers have written them, and the ordering applies also between several
/// writers/readers, but within one publisher/subscriber. Analogous arrangement
/// can be set up w.r.t. coherency: All the samples in a transaction are
/// delivered to the readers, or none are. The transaction can span several
/// readers, writers, and topics in a single publisher/subscriber.
///
/// A `Publisher` is a cheap handle: cloning it clones only the shared pointer,
/// so all clones operate on the same underlying publisher state.
///
/// # Examples
///
/// ```
/// # use rustdds::DomainParticipant;
/// # use rustdds::qos::QosPolicyBuilder;
/// use rustdds::Publisher;
///
/// let domain_participant = DomainParticipant::new(0).unwrap();
/// let qos = QosPolicyBuilder::new().build();
///
/// let publisher = domain_participant.create_publisher(&qos);
/// ```
#[derive(Clone)]
pub struct Publisher {
  // Shared, mutex-guarded state; every clone of this handle points at the same
  // `InnerPublisher`.
  inner: Arc<Mutex<InnerPublisher>>,
}
93
94impl Publisher {
95  #[allow(clippy::too_many_arguments)]
96  pub(super) fn new(
97    dp: DomainParticipantWeak,
98    discovery_db: Arc<RwLock<DiscoveryDB>>,
99    qos: QosPolicies,
100    default_dw_qos: QosPolicies,
101    add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
102    remove_writer_sender: mio_channel::SyncSender<GUID>,
103    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
104    security_plugins_handle: Option<SecurityPluginsHandle>,
105  ) -> Self {
106    Self {
107      inner: Arc::new(Mutex::new(InnerPublisher::new(
108        dp,
109        discovery_db,
110        qos,
111        default_dw_qos,
112        add_writer_sender,
113        remove_writer_sender,
114        discovery_command,
115        security_plugins_handle,
116      ))),
117    }
118  }
119
120  fn inner_lock(&self) -> MutexGuard<'_, InnerPublisher> {
121    self
122      .inner
123      .lock()
124      .unwrap_or_else(|e| panic!("Inner publisher lock fail! {e:?}"))
125  }
126
127  /// Creates DDS [DataWriter](struct.With_Key_DataWriter.html) for Keyed topic
128  ///
129  /// # Arguments
130  ///
131  /// * `entity_id` - Custom entity id if necessary for the user to define it
132  /// * `topic` - Reference to DDS Topic this writer is created to
133  /// * `qos` - Not currently in use
134  ///
135  /// # Examples
136  ///
137  /// ```
138  /// # use rustdds::*;
139  /// use rustdds::serialization::CDRSerializerAdapter;
140  /// use serde::Serialize;
141  ///
142  /// let domain_participant = DomainParticipant::new(0).unwrap();
143  /// let qos = QosPolicyBuilder::new().build();
144  ///
145  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
146  ///
147  /// #[derive(Serialize)]
148  /// struct SomeType { a: i32 }
149  /// impl Keyed for SomeType {
150  ///   type K = i32;
151  ///
152  ///   fn key(&self) -> Self::K {
153  ///     self.a
154  ///   }
155  /// }
156  ///
157  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
158  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None);
159  /// ```
160  pub fn create_datawriter<D, SA>(
161    &self,
162    topic: &Topic,
163    qos: Option<QosPolicies>,
164  ) -> CreateResult<WithKeyDataWriter<D, SA>>
165  where
166    D: Keyed,
167    SA: adapters::with_key::SerializerAdapter<D>,
168  {
169    self
170      .inner_lock()
171      .create_datawriter(self, None, topic, qos, false)
172  }
173
174  /// Shorthand for crate_datawriter with Common Data Representation Little
175  /// Endian
176  pub fn create_datawriter_cdr<D>(
177    &self,
178    topic: &Topic,
179    qos: Option<QosPolicies>,
180  ) -> CreateResult<WithKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
181  where
182    D: Keyed + serde::Serialize,
183    <D as Keyed>::K: Serialize,
184  {
185    self.create_datawriter::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
186  }
187
188  /// Creates DDS [DataWriter](struct.DataWriter.html) for Nokey Topic
189  ///
190  /// # Arguments
191  ///
192  /// * `entity_id` - Custom entity id if necessary for the user to define it
193  /// * `topic` - Reference to DDS Topic this writer is created to
194  /// * `qos` - QoS policies for this DataWriter
195  ///
196  /// # Examples
197  ///
198  /// ```
199  /// # use rustdds::*;
200  /// use rustdds::serialization::CDRSerializerAdapter;
201  /// use serde::Serialize;
202  ///
203  /// let domain_participant = DomainParticipant::new(0).unwrap();
204  /// let qos = QosPolicyBuilder::new().build();
205  ///
206  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
207  ///
208  /// #[derive(Serialize)]
209  /// struct SomeType {}
210  ///
211  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
212  /// let data_writer = publisher.create_datawriter_no_key::<SomeType, CDRSerializerAdapter<_>>(&topic, None);
213  /// ```
214  pub fn create_datawriter_no_key<D, SA>(
215    &self,
216    topic: &Topic,
217    qos: Option<QosPolicies>,
218  ) -> CreateResult<NoKeyDataWriter<D, SA>>
219  where
220    SA: adapters::no_key::SerializerAdapter<D>,
221  {
222    self
223      .inner_lock()
224      .create_datawriter_no_key(self, None, topic, qos, false)
225  }
226
227  pub fn create_datawriter_no_key_cdr<D>(
228    &self,
229    topic: &Topic,
230    qos: Option<QosPolicies>,
231  ) -> CreateResult<NoKeyDataWriter<D, CDRSerializerAdapter<D, LittleEndian>>>
232  where
233    D: serde::Serialize,
234  {
235    self.create_datawriter_no_key::<D, CDRSerializerAdapter<D, LittleEndian>>(topic, qos)
236  }
237
  // Versions with caller-specified EntityId. These are for Discovery use only.
239
240  pub(crate) fn create_datawriter_with_entity_id_with_key<D, SA>(
241    &self,
242    entity_id: EntityId,
243    topic: &Topic,
244    qos: Option<QosPolicies>,
245    writer_like_stateless: bool, // Create a stateless-like RTPS writer?
246  ) -> CreateResult<WithKeyDataWriter<D, SA>>
247  where
248    D: Keyed,
249    SA: adapters::with_key::SerializerAdapter<D>,
250  {
251    self
252      .inner_lock()
253      .create_datawriter(self, Some(entity_id), topic, qos, writer_like_stateless)
254  }
255
256  #[cfg(feature = "security")] // to avoid "never used" warning
257  pub(crate) fn create_datawriter_with_entity_id_no_key<D, SA>(
258    &self,
259    entity_id: EntityId,
260    topic: &Topic,
261    qos: Option<QosPolicies>,
262    writer_like_stateless: bool, // Create a stateless-like RTPS writer?
263  ) -> CreateResult<NoKeyDataWriter<D, SA>>
264  where
265    SA: crate::no_key::SerializerAdapter<D>,
266  {
267    self.inner_lock().create_datawriter_no_key(
268      self,
269      Some(entity_id),
270      topic,
271      qos,
272      writer_like_stateless,
273    )
274  }
275
276  // delete_datawriter should not be needed. The DataWriter object itself should
277  // be deleted to accomplish this.
278
279  // lookup datawriter: maybe not necessary? App should remember datawriters it
280  // has created.
281
  // Suspend and resume publications are performance optimization methods.
  // The minimal correct implementation is to do nothing. See DDS spec 2.2.2.4.1.8
  // and .9
  /// **NOT IMPLEMENTED. DO NOT USE**
  ///
  /// # Panics
  ///
  /// Always panics: the body is `unimplemented!()`.
  #[deprecated(note = "unimplemented")]
  pub fn suspend_publications(&self) {
    unimplemented!();
  }
290
  /// **NOT IMPLEMENTED. DO NOT USE**
  ///
  /// # Panics
  ///
  /// Always panics: the body is `unimplemented!()`.
  #[deprecated(note = "unimplemented")]
  pub fn resume_publications(&self) {
    unimplemented!();
  }
296
  // Coherent change sets.
  // In case such QoS is not supported, these should be no-ops.
  // TODO: Implement these when coherent change-sets are supported.
  // Coherent sets are not implemented; this method currently does nothing.
  /// **NOT IMPLEMENTED. DO NOT USE**
  ///
  /// Currently a no-op: the body is empty.
  #[deprecated(note = "unimplemented")]
  pub fn begin_coherent_changes(&self) {}
304
  // Coherent sets are not implemented; this method currently does nothing.
  /// **NOT IMPLEMENTED. DO NOT USE**
  ///
  /// Currently a no-op: the body is empty.
  #[deprecated(note = "unimplemented")]
  pub fn end_coherent_changes(&self) {}
309
  // Intended behavior: wait until all matched reliable DataReaders have
  // acknowledged the data written so far, or until the timeout expires.
  /// **NOT IMPLEMENTED. DO NOT USE**
  ///
  /// # Panics
  ///
  /// Always panics: the body is `unimplemented!()`. `_max_wait` is ignored.
  #[deprecated(note = "unimplemented")]
  pub fn wait_for_acknowledgments(&self, _max_wait: Duration) -> WaitResult<()> {
    unimplemented!();
  }
317
318  // What is the use case for this? (is it useful in Rust style of programming?
319  // Should it be public?)
320  /// Gets [DomainParticipant](struct.DomainParticipant.html) if it has not
321  /// disappeared from all scopes.
322  ///
323  /// # Example
324  ///
325  /// ```
326  /// # use rustdds::*;
327  ///
328  /// let domain_participant = DomainParticipant::new(0).unwrap();
329  /// let qos = QosPolicyBuilder::new().build();
330  ///
331  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
332  /// assert_eq!(domain_participant, publisher.participant().unwrap());
333  /// ```
334  pub fn participant(&self) -> Option<DomainParticipant> {
335    self.inner_lock().domain_participant.clone().upgrade()
336  }
337
338  // delete_contained_entities: We should not need this. Contained DataWriters
339  // should dispose themselves and notify publisher.
340
341  /// Returns default DataWriter qos.
342  ///
343  /// # Example
344  ///
345  /// ```
346  /// # use rustdds::*;
347  ///
348  /// let domain_participant = DomainParticipant::new(0).unwrap();
349  /// let qos = QosPolicyBuilder::new().build();
350  ///
351  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
352  /// assert_eq!(qos, publisher.get_default_datawriter_qos());
353  /// ```
354  pub fn get_default_datawriter_qos(&self) -> QosPolicies {
355    self.inner_lock().get_default_datawriter_qos().clone()
356  }
357
358  /// Sets default DataWriter qos.
359  ///
360  /// # Example
361  ///
362  /// ```
363  /// # use rustdds::*;
364  ///
365  /// let domain_participant = DomainParticipant::new(0).unwrap();
366  /// let qos = QosPolicyBuilder::new().build();
367  ///
368  /// let mut publisher = domain_participant.create_publisher(&qos).unwrap();
369  /// let qos2 =
370  /// QosPolicyBuilder::new().durability(policy::Durability::Transient).build();
371  /// publisher.set_default_datawriter_qos(&qos2);
372  ///
373  /// assert_ne!(qos, publisher.get_default_datawriter_qos());
374  /// assert_eq!(qos2, publisher.get_default_datawriter_qos());
375  /// ```
376  pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
377    self.inner_lock().set_default_datawriter_qos(q);
378  }
379
380  // This is used on DataWriter .drop()
381  pub(crate) fn remove_writer(&self, guid: GUID) {
382    self.inner_lock().remove_writer(guid);
383  }
384} // impl
385
386impl PartialEq for Publisher {
387  fn eq(&self, other: &Self) -> bool {
388    let id_self = { self.inner_lock().identity() };
389    let id_other = { other.inner_lock().identity() };
390    id_self == id_other
391  }
392}
393
394impl Debug for Publisher {
395  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396    self.inner_lock().fmt(f)
397  }
398}
399
400// "Inner" struct
401
/// State shared by all clones of a `Publisher` handle.
#[derive(Clone)]
struct InnerPublisher {
  // Value returned by `identity()`.
  id: EntityId,
  // Weak handle to the owning DomainParticipant; upgraded on demand.
  domain_participant: DomainParticipantWeak,
  // Discovery database; new local writers and their topics are recorded here.
  discovery_db: Arc<RwLock<DiscoveryDB>>,
  // QoS of the Publisher itself.
  my_qos_policies: QosPolicies,
  default_datawriter_qos: QosPolicies, // used when creating a new DataWriter
  // Channel for sending the ingredients of a newly created writer.
  add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
  // Channel for sending the GUID of a writer that is to be removed.
  remove_writer_sender: mio_channel::SyncSender<GUID>,
  // Channel for sending commands to Discovery.
  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
  // Present when security is enabled; cloned into each writer's ingredients.
  security_plugins_handle: Option<SecurityPluginsHandle>,
}
414
415// public interface for Publisher
416impl InnerPublisher {
417  #[allow(clippy::too_many_arguments)]
418  fn new(
419    dp: DomainParticipantWeak,
420    discovery_db: Arc<RwLock<DiscoveryDB>>,
421    qos: QosPolicies,
422    default_dw_qos: QosPolicies,
423    add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
424    remove_writer_sender: mio_channel::SyncSender<GUID>,
425    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
426    security_plugins_handle: Option<SecurityPluginsHandle>,
427  ) -> Self {
428    // We generate an arbitrary but unique id to distinguish Publishers from each
429    // other. EntityKind is just some value, since we do not show it to anyone.
430    let id = EntityId::MAX;
431    // dp.clone().upgrade().unwrap().new_entity_id(EntityKind::UNKNOWN_BUILT_IN);
432
433    Self {
434      id,
435      domain_participant: dp,
436      discovery_db,
437      my_qos_policies: qos,
438      default_datawriter_qos: default_dw_qos,
439      add_writer_sender,
440      remove_writer_sender,
441      discovery_command,
442      security_plugins_handle,
443    }
444  }
445
446  pub fn create_datawriter<D, SA>(
447    &self,
448    outer: &Publisher,
449    entity_id_opt: Option<EntityId>,
450    topic: &Topic,
451    optional_qos: Option<QosPolicies>,
452    writer_like_stateless: bool, // Create a stateless-like RTPS writer? Usually false
453  ) -> CreateResult<WithKeyDataWriter<D, SA>>
454  where
455    D: Keyed,
456    SA: adapters::with_key::SerializerAdapter<D>,
457  {
458    // Data samples from DataWriter to HistoryCache
459    let (dwcc_upload, hccc_download) = mio_channel::sync_channel::<WriterCommand>(16);
460    let writer_waker = Arc::new(Mutex::new(None));
461    // Status reports back from Writer to DataWriter.
462    let (status_sender, status_receiver) = sync_status_channel(4)?;
463
464    // DDS Spec 2.2.2.4.1.5 create_datawriter:
465    // If no QoS is specified, we should take the Publisher default
466    // QoS, modify it to match any QoS settings (that are set) in the
467    // Topic QoS and use that.
468
469    // Use Publisher QoS as basis, modify by Topic settings, and modify by specified
470    // QoS.
471    let writer_qos = self
472      .default_datawriter_qos
473      .modify_by(&topic.qos())
474      .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
475
476    let entity_id =
477      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_WITH_KEY_USER_DEFINED);
478    let dp = self
479      .participant()
480      .ok_or("upgrade fail")
481      .or_else(|e| create_error_dropped!("Where is my DomainParticipant? {}", e))?;
482
483    let guid = GUID::new_with_prefix_and_id(dp.guid().prefix, entity_id);
484
485    #[cfg(feature = "security")]
486    if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
487      // Security is enabled.
488      // Check are we allowed to create the DataWriter from Access control
489      let check_res = sec_handle.get_plugins().check_create_datawriter(
490        guid.prefix,
491        dp.domain_id(),
492        topic.name(),
493        &writer_qos,
494      );
495      match check_res {
496        Ok(check_passed) => {
497          if !check_passed {
498            return create_error_not_allowed_by_security!(
499              "Not allowed to create a DataWriter to topic {}",
500              topic.name()
501            );
502          }
503        }
504        Err(e) => {
505          // Something went wrong in the check
506          return create_error_internal!(
507            "Failed to check DataWriter rights from Access control: {}",
508            e.msg
509          );
510        }
511      };
512
513      // Register Writer to crypto plugin
514      if let Err(e) = {
515        let writer_security_attributes = sec_handle
516          .get_plugins()
517          .get_writer_sec_attributes(guid, topic.name()); // Release lock
518        writer_security_attributes.and_then(|attributes| {
519          sec_handle
520            .get_plugins()
521            .register_local_writer(guid, writer_qos.property(), attributes)
522        })
523      } {
524        return create_error_internal!(
525          "Failed to register writer to crypto plugin: {} . GUID: {:?}",
526          e,
527          guid
528        );
529      } else {
530        info!("Registered local writer to crypto plugin. GUID: {guid:?}");
531      }
532    }
533
534    // Construct the data writer
535    let data_writer = WithKeyDataWriter::<D, SA>::new(
536      outer.clone(),
537      topic.clone(),
538      writer_qos.clone(),
539      guid,
540      dwcc_upload,
541      Arc::clone(&writer_waker),
542      self.discovery_command.clone(),
543      status_receiver,
544    )?;
545
546    // Construct security info if needed
547    #[cfg(not(feature = "security"))]
548    let security_info = None;
549    #[cfg(feature = "security")]
550    let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
551      // Security enabled
552      if guid.entity_id.entity_kind.is_user_defined() {
553        match sec_handle
554          .get_plugins()
555          .get_writer_sec_attributes(guid, topic.name())
556        {
557          Ok(attr) => EndpointSecurityInfo::from(attr).into(),
558          Err(e) => {
559            return create_error_internal!(
560              "Failed to get security info for writer: {}. Guid: {:?}",
561              e,
562              guid
563            );
564          }
565        }
566      } else {
567        None // For the built-in topics
568      }
569    } else {
570      // No security enabled
571      None
572    };
573
574    // Add the topic & writer to Discovery DB
575    let mut db = self
576      .discovery_db
577      .write()
578      .map_err(|e| CreateError::Poisoned {
579        reason: format!("Discovery DB: {e}"),
580      })?;
581
582    let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
583    db.update_local_topic_writer(dwd);
584    db.update_topic_data_p(topic);
585
586    // Inform Discovery about the topic
587    if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
588      topic_name: topic.name(),
589    }) {
590      // Log the error but don't quit, failing to inform Discovery about the topic
591      // shouldn't be that serious
592      error!(
593        "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
594        topic.name(),
595        e
596      );
597    }
598
599    // Note: notifying Discovery about the new writer is no longer done here.
600    // Instead, it's done by the DP event loop once it has actually created the new
601    // writer. This is done to avoid data races.
602
603    // Send writer ingredients to DP event loop, where the actual writer will be
604    // constructed
605    let new_writer = WriterIngredients {
606      guid,
607      writer_command_receiver: hccc_download,
608      writer_command_receiver_waker: writer_waker,
609      topic_name: topic.name(),
610      like_stateless: writer_like_stateless,
611      qos_policies: writer_qos,
612      status_sender,
613      security_plugins: self.security_plugins_handle.clone(),
614    };
615
616    self
617      .add_writer_sender
618      .send(new_writer)
619      .or_else(|e| create_error_poisoned!("Adding a new writer failed: {}", e))?;
620
621    // Return the DataWriter to user
622    Ok(data_writer)
623  }
624
625  pub fn create_datawriter_no_key<D, SA>(
626    &self,
627    outer: &Publisher,
628    entity_id_opt: Option<EntityId>,
629    topic: &Topic,
630    qos: Option<QosPolicies>,
631    writer_like_stateless: bool, // Create a stateless-like RTPS writer? Usually false
632  ) -> CreateResult<NoKeyDataWriter<D, SA>>
633  where
634    SA: adapters::no_key::SerializerAdapter<D>,
635  {
636    let entity_id =
637      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::WRITER_NO_KEY_USER_DEFINED);
638    let d = self.create_datawriter::<NoKeyWrapper<D>, SAWrapper<SA>>(
639      outer,
640      Some(entity_id),
641      topic,
642      qos,
643      writer_like_stateless,
644    )?;
645    Ok(NoKeyDataWriter::<D, SA>::from_keyed(d))
646  }
647
648  pub fn participant(&self) -> Option<DomainParticipant> {
649    self.domain_participant.clone().upgrade()
650  }
651
  /// Borrows the QoS that is used as the basis when creating a new DataWriter.
  pub fn get_default_datawriter_qos(&self) -> &QosPolicies {
    &self.default_datawriter_qos
  }
655
656  pub fn set_default_datawriter_qos(&mut self, q: &QosPolicies) {
657    self.default_datawriter_qos = q.clone();
658  }
659
660  fn unwrap_or_new_entity_id(
661    &self,
662    entity_id_opt: Option<EntityId>,
663    entity_kind: EntityKind,
664  ) -> EntityId {
665    // If the entity_id is given, then just use that. If not, then pull an arbitrary
666    // number out of participant's hat.
667    entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
668  }
669
670  pub(crate) fn remove_writer(&self, guid: GUID) {
671    try_send_timeout(&self.remove_writer_sender, guid, None)
672      .unwrap_or_else(|e| error!("Cannot remove Writer {guid:?} : {e:?}"));
673  }
674
  /// Returns the id stored at construction time.
  pub(crate) fn identity(&self) -> EntityId {
    self.id
  }
678}
679
680impl Debug for InnerPublisher {
681  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
682    f.write_fmt(format_args!("{:?}", self.participant()))?;
683    f.write_fmt(format_args!("Publisher QoS: {:?}", self.my_qos_policies))?;
684    f.write_fmt(format_args!(
685      "Publishers default Writer QoS: {:?}",
686      self.default_datawriter_qos
687    ))
688  }
689}
690
691// -------------------------------------------------------------------
692// -------------------------------------------------------------------
693// -------------------------------------------------------------------
694// -------------------------------------------------------------------
695// -------------------------------------------------------------------
696// -------------------------------------------------------------------
697// -------------------------------------------------------------------
698// -------------------------------------------------------------------
699// -------------------------------------------------------------------
700// -------------------------------------------------------------------
701
/// DDS Subscriber
///
/// See overview at [`Publisher`].
///
/// A `Subscriber` is a cheap handle: cloning it clones only the shared
/// pointer, so all clones refer to the same underlying subscriber state.
///
/// # Examples
///
/// ```
/// # use rustdds::*;
///
/// let domain_participant = DomainParticipant::new(0).unwrap();
/// let qos = QosPolicyBuilder::new().build();
///
/// let subscriber = domain_participant.create_subscriber(&qos);
/// ```
#[derive(Clone)]
pub struct Subscriber {
  // Shared state; note that, unlike in `Publisher`, there is no mutex here.
  inner: Arc<InnerSubscriber>,
}
720
721impl Subscriber {
722  pub(super) fn new(
723    domain_participant: DomainParticipantWeak,
724    discovery_db: Arc<RwLock<DiscoveryDB>>,
725    qos: QosPolicies,
726    sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
727    sender_remove_reader: mio_channel::SyncSender<GUID>,
728    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
729    security_plugins_handle: Option<SecurityPluginsHandle>,
730  ) -> Self {
731    Self {
732      inner: Arc::new(InnerSubscriber::new(
733        domain_participant,
734        discovery_db,
735        qos,
736        sender_add_reader,
737        sender_remove_reader,
738        discovery_command,
739        security_plugins_handle,
740      )),
741    }
742  }
743
744  /// Creates DDS DataReader for keyed Topics
745  ///
746  /// # Arguments
747  ///
748  /// * `topic` - Reference to the DDS [Topic](struct.Topic.html) this reader
749  ///   reads from
750  /// * `entity_id` - Optional [EntityId](data_types/struct.EntityId.html) if
751  ///   necessary for DDS communication (random if None)
752  /// * `qos` - Not in use
753  ///
754  /// # Examples
755  ///
756  /// ```
757  /// # use rustdds::*;
758  /// use serde::Deserialize;
759  /// use rustdds::serialization::CDRDeserializerAdapter;
760  /// #
761  /// # let domain_participant = DomainParticipant::new(0).unwrap();
762  /// # let qos = QosPolicyBuilder::new().build();
763  /// #
764  ///
765  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
766  ///
767  /// #[derive(Deserialize)]
768  /// struct SomeType { a: i32 }
769  /// impl Keyed for SomeType {
770  ///   type K = i32;
771  ///
772  ///   fn key(&self) -> Self::K {
773  ///     self.a
774  ///   }
775  /// }
776  ///
777  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
778  /// let data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
779  /// ```
780  pub fn create_datareader<D, SA>(
781    &self,
782    topic: &Topic,
783    qos: Option<QosPolicies>,
784  ) -> CreateResult<WithKeyDataReader<D, SA>>
785  where
786    D: 'static + Keyed,
787    SA: adapters::with_key::DeserializerAdapter<D>,
788  {
789    self.inner.create_datareader(self, topic, None, qos, false)
790  }
791
792  pub fn create_datareader_cdr<D>(
793    &self,
794    topic: &Topic,
795    qos: Option<QosPolicies>,
796  ) -> CreateResult<WithKeyDataReader<D, CDRDeserializerAdapter<D>>>
797  where
798    D: 'static + serde::de::DeserializeOwned + Keyed,
799    for<'de> <D as Keyed>::K: Deserialize<'de>,
800  {
801    self.create_datareader::<D, CDRDeserializerAdapter<D>>(topic, qos)
802  }
803
804  /// Create DDS DataReader for non keyed Topics
805  ///
806  /// # Arguments
807  ///
808  /// * `topic` - Reference to the DDS [Topic](struct.Topic.html) this reader
809  ///   reads from
810  /// * `entity_id` - Optional [EntityId](data_types/struct.EntityId.html) if
811  ///   necessary for DDS communication (random if None)
812  /// * `qos` - Not in use
813  ///
814  /// # Examples
815  ///
816  /// ```
817  /// # use rustdds::*;
818  /// use serde::Deserialize;
819  /// use rustdds::serialization::CDRDeserializerAdapter;
820  /// #
821  /// # let domain_participant = DomainParticipant::new(0).unwrap();
822  /// # let qos = QosPolicyBuilder::new().build();
823  /// #
824  ///
825  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
826  ///
827  /// #[derive(Deserialize)]
828  /// struct SomeType {}
829  ///
830  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
831  /// let data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
832  /// ```
833  pub fn create_datareader_no_key<D, SA>(
834    &self,
835    topic: &Topic,
836    qos: Option<QosPolicies>,
837  ) -> CreateResult<NoKeyDataReader<D, SA>>
838  where
839    D: 'static,
840    SA: adapters::no_key::DeserializerAdapter<D>,
841  {
842    self
843      .inner
844      .create_datareader_no_key(self, topic, None, qos, false)
845  }
846
847  pub fn create_simple_datareader_no_key<D, DA>(
848    &self,
849    topic: &Topic,
850    qos: Option<QosPolicies>,
851  ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
852  where
853    D: 'static,
854    DA: 'static + adapters::no_key::DeserializerAdapter<D>,
855  {
856    self
857      .inner
858      .create_simple_datareader_no_key(self, topic, None, qos)
859  }
860
861  pub fn create_datareader_no_key_cdr<D>(
862    &self,
863    topic: &Topic,
864    qos: Option<QosPolicies>,
865  ) -> CreateResult<NoKeyDataReader<D, CDRDeserializerAdapter<D>>>
866  where
867    D: 'static + serde::de::DeserializeOwned,
868  {
869    self.create_datareader_no_key::<D, CDRDeserializerAdapter<D>>(topic, qos)
870  }
871
  // Versions with caller-specified EntityId. These are for Discovery use only.
873
874  pub(crate) fn create_datareader_with_entity_id_with_key<D, SA>(
875    &self,
876    topic: &Topic,
877    entity_id: EntityId,
878    qos: Option<QosPolicies>,
879    reader_like_stateless: bool, // Create a stateless-like RTPS reader?
880  ) -> CreateResult<WithKeyDataReader<D, SA>>
881  where
882    D: 'static + Keyed,
883    SA: adapters::with_key::DeserializerAdapter<D>,
884  {
885    self
886      .inner
887      .create_datareader(self, topic, Some(entity_id), qos, reader_like_stateless)
888  }
889
890  #[cfg(feature = "security")] // to avoid "never used" warning
891  pub(crate) fn create_datareader_with_entity_id_no_key<D, SA>(
892    &self,
893    topic: &Topic,
894    entity_id: EntityId,
895    qos: Option<QosPolicies>,
896    reader_like_stateless: bool, // Create a stateless-like RTPS reader?
897  ) -> CreateResult<NoKeyDataReader<D, SA>>
898  where
899    D: 'static,
900    SA: adapters::no_key::DeserializerAdapter<D>,
901  {
902    self
903      .inner
904      .create_datareader_no_key(self, topic, Some(entity_id), qos, reader_like_stateless)
905  }
906
907  // Retrieves a previously created DataReader belonging to the Subscriber.
908  // TODO: Is this even possible. Would probably need to return reference and
909  // store references on creation
910  /*
911  pub(crate) fn lookup_datareader<D, SA>(
912    &self,
913    _topic_name: &str,
914  ) -> Option<WithKeyDataReader<D, SA>>
915  where
916    D: Keyed + DeserializeOwned,
917    SA: DeserializerAdapter<D>,
918  {
919    todo!()
920  // TO think: Is this really necessary? Because the caller would have to know
921    // types D and SA. Should we just trust whoever creates DataReaders to also remember them?
922  }
923  */
924
925  /// Returns [DomainParticipant](struct.DomainParticipant.html) if it is sill
926  /// alive.
927  ///
928  /// # Example
929  ///
930  /// ```
931  /// # use rustdds::*;
932  /// #
933  /// let domain_participant = DomainParticipant::new(0).unwrap();
934  /// let qos = QosPolicyBuilder::new().build();
935  ///
936  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
937  /// assert_eq!(domain_participant, subscriber.participant().unwrap());
938  /// ```
939  pub fn participant(&self) -> Option<DomainParticipant> {
940    self.inner.participant()
941  }
942
943  pub(crate) fn remove_reader(&self, guid: GUID) {
944    self.inner.remove_reader(guid);
945  }
946}
947
948#[derive(Clone)]
949pub struct InnerSubscriber {
950  domain_participant: DomainParticipantWeak,
951  discovery_db: Arc<RwLock<DiscoveryDB>>,
952  qos: QosPolicies,
953  sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
954  sender_remove_reader: mio_channel::SyncSender<GUID>,
955  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
956  security_plugins_handle: Option<SecurityPluginsHandle>,
957}
958
959impl InnerSubscriber {
960  pub(super) fn new(
961    domain_participant: DomainParticipantWeak,
962    discovery_db: Arc<RwLock<DiscoveryDB>>,
963    qos: QosPolicies,
964    sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
965    sender_remove_reader: mio_channel::SyncSender<GUID>,
966    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
967    security_plugins_handle: Option<SecurityPluginsHandle>,
968  ) -> Self {
969    Self {
970      domain_participant,
971      discovery_db,
972      qos,
973      sender_add_reader,
974      sender_remove_reader,
975      discovery_command,
976      security_plugins_handle,
977    }
978  }
979
980  fn create_datareader_internal<D, SA>(
981    &self,
982    outer: &Subscriber,
983    entity_id_opt: Option<EntityId>,
984    topic: &Topic,
985    optional_qos: Option<QosPolicies>,
986    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
987  ) -> CreateResult<WithKeyDataReader<D, SA>>
988  where
989    D: 'static + Keyed,
990    SA: adapters::with_key::DeserializerAdapter<D>,
991  {
992    let simple_dr = self.create_simple_datareader_internal(
993      outer,
994      entity_id_opt,
995      topic,
996      optional_qos,
997      reader_like_stateless,
998    )?;
999    Ok(with_key::DataReader::<D, SA>::from_simple_data_reader(
1000      simple_dr,
1001    ))
1002  }
1003
1004  fn create_simple_datareader_internal<D, SA>(
1005    &self,
1006    outer: &Subscriber,
1007    entity_id_opt: Option<EntityId>,
1008    topic: &Topic,
1009    optional_qos: Option<QosPolicies>,
1010    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
1011  ) -> CreateResult<with_key::SimpleDataReader<D, SA>>
1012  where
1013    D: 'static + Keyed,
1014    SA: adapters::with_key::DeserializerAdapter<D>,
1015  {
1016    // incoming data notification channel from Reader to DataReader
1017    let (send, rec) = mio_channel::sync_channel::<()>(4);
1018    // status change channel from Reader to DataReader
1019    let (status_sender, status_receiver) = sync_status_channel::<DataReaderStatus>(4)?;
1020
1021    // reader command channel from Datareader to Reader
1022    let (reader_command_sender, reader_command_receiver) =
1023      mio_channel::sync_channel::<ReaderCommand>(0);
1024    // The buffer length is zero, i.e. sender and receiver must rendezvous at
1025    // send/receive. This is needed to synchronize sending of wakers from
1026    // DataReader to Reader. If the capacity is increased, then some data
1027    // available for reading notifications may be missed.
1028
1029    // Use subscriber QoS as basis, modify by Topic settings, and modify by
1030    // specified QoS.
1031    let qos = self
1032      .qos
1033      .modify_by(&topic.qos())
1034      .modify_by(&optional_qos.unwrap_or_else(QosPolicies::qos_none));
1035
1036    let entity_id =
1037      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_WITH_KEY_USER_DEFINED);
1038
1039    let dp = match self.participant() {
1040      Some(dp) => dp,
1041      None => return create_error_dropped!("DomainParticipant doesn't exist anymore."),
1042    };
1043
1044    // Get a handle to the topic cache
1045    let topic_cache_handle = match dp.dds_cache().read() {
1046      Ok(dds_cache) => dds_cache.get_existing_topic_cache(&topic.name())?,
1047      Err(e) => return create_error_poisoned!("Cannot lock DDScache. Error: {}", e),
1048    };
1049    // Update topic cache with DataReader's Qos
1050    match topic_cache_handle.lock() {
1051      Ok(mut tc) => tc.update_keep_limits(&qos),
1052      Err(e) => return create_error_poisoned!("Cannot lock topic cache. Error: {}", e),
1053    };
1054
1055    let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), entity_id);
1056
1057    #[cfg(feature = "security")]
1058    if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1059      // Security is enabled.
1060      // Check are we allowed to create the DataReader from Access control
1061      let check_res = sec_handle.get_plugins().check_create_datareader(
1062        reader_guid.prefix,
1063        dp.domain_id(),
1064        topic.name(),
1065        &qos,
1066      );
1067      match check_res {
1068        Ok(check_passed) => {
1069          if !check_passed {
1070            return create_error_not_allowed_by_security!(
1071              "Not allowed to create a DataReader to topic {}",
1072              topic.name()
1073            );
1074          }
1075        }
1076        Err(e) => {
1077          // Something went wrong in the check
1078          return create_error_internal!(
1079            "Failed to check DataReader rights from Access control: {}",
1080            e.msg
1081          );
1082        }
1083      };
1084
1085      // Register Reader to crypto plugin
1086      if let Err(e) = {
1087        let reader_security_attributes = sec_handle
1088          .get_plugins()
1089          .get_reader_sec_attributes(reader_guid, topic.name()); // Release lock
1090        reader_security_attributes.and_then(|attributes| {
1091          sec_handle
1092            .get_plugins()
1093            .register_local_reader(reader_guid, qos.property(), attributes)
1094        })
1095      } {
1096        return create_error_internal!(
1097          "Failed to register reader to crypto plugin: {} . GUID: {:?}",
1098          e,
1099          reader_guid
1100        );
1101      } else {
1102        info!("Registered local reader to crypto plugin. GUID: {reader_guid:?}");
1103      }
1104    }
1105
1106    // Construct the ReaderIngredients
1107    let data_reader_waker = Arc::new(Mutex::new(None));
1108
1109    let (poll_event_source, poll_event_sender) = mio_source::make_poll_channel()?;
1110
1111    let new_reader = ReaderIngredients {
1112      guid: reader_guid,
1113      notification_sender: send,
1114      status_sender,
1115      topic_name: topic.name(),
1116      topic_cache_handle: topic_cache_handle.clone(),
1117      like_stateless: reader_like_stateless,
1118      qos_policy: qos.clone(),
1119      data_reader_command_receiver: reader_command_receiver,
1120      data_reader_waker: data_reader_waker.clone(),
1121      poll_event_sender,
1122      security_plugins: self.security_plugins_handle.clone(),
1123    };
1124
1125    // Construct security info if needed
1126    #[cfg(not(feature = "security"))]
1127    let security_info: Option<EndpointSecurityInfo> = None;
1128    #[cfg(feature = "security")]
1129    let security_info = if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
1130      // Security enabled
1131      if reader_guid.entity_id.entity_kind.is_user_defined() {
1132        match sec_handle
1133          .get_plugins()
1134          .get_reader_sec_attributes(reader_guid, topic.name())
1135        {
1136          Ok(attr) => EndpointSecurityInfo::from(attr).into(),
1137          Err(e) => {
1138            return create_error_internal!(
1139              "Failed to get security info for reader: {}. Guid: {:?}",
1140              e,
1141              reader_guid
1142            );
1143          }
1144        }
1145      } else {
1146        None // For the built-in topics
1147      }
1148    } else {
1149      // No security enabled
1150      None
1151    };
1152
1153    // Add the topic & reader to Discovery DB
1154    {
1155      let mut db = self
1156        .discovery_db
1157        .write()
1158        .or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?;
1159      db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
1160      db.update_topic_data_p(topic);
1161
1162      // Inform Discovery about the topic
1163      if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
1164        topic_name: topic.name(),
1165      }) {
1166        // Log the error but don't quit, failing to inform Discovery about the topic
1167        // shouldn't be that serious
1168        error!(
1169          "Failed send DiscoveryCommand::AddTopic about topic {}: {}",
1170          topic.name(),
1171          e
1172        );
1173      }
1174    }
1175
1176    // Note: notifying Discovery about the new reader is no longer done here.
1177    // Instead, it's done by the DP event loop once it has actually created the new
1178    // reader. This is done to avoid data races.
1179
1180    // Construct the data reader
1181    let datareader = with_key::SimpleDataReader::<D, SA>::new(
1182      outer.clone(),
1183      entity_id,
1184      topic.clone(),
1185      qos,
1186      rec,
1187      topic_cache_handle,
1188      self.discovery_command.clone(),
1189      status_receiver,
1190      reader_command_sender,
1191      data_reader_waker,
1192      poll_event_source,
1193    )?;
1194
1195    // Send reader ingredients to DP event loop, where the actual reader will be
1196    // constructed
1197    self
1198      .sender_add_reader
1199      .try_send(new_reader)
1200      .or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;
1201
1202    // Return the DataReader to user
1203    Ok(datareader)
1204  }
1205
1206  pub fn create_datareader<D, SA>(
1207    &self,
1208    outer: &Subscriber,
1209    topic: &Topic,
1210    entity_id: Option<EntityId>,
1211    qos: Option<QosPolicies>,
1212    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
1213  ) -> CreateResult<WithKeyDataReader<D, SA>>
1214  where
1215    D: 'static + Keyed,
1216    SA: adapters::with_key::DeserializerAdapter<D>,
1217  {
1218    if topic.kind() != TopicKind::WithKey {
1219      return Err(CreateError::TopicKind(TopicKind::WithKey));
1220    }
1221    self.create_datareader_internal(outer, entity_id, topic, qos, reader_like_stateless)
1222  }
1223
1224  pub fn create_datareader_no_key<D: 'static, SA>(
1225    &self,
1226    outer: &Subscriber,
1227    topic: &Topic,
1228    entity_id_opt: Option<EntityId>,
1229    qos: Option<QosPolicies>,
1230    reader_like_stateless: bool, // Create a stateless-like RTPS reader? Usually false
1231  ) -> CreateResult<NoKeyDataReader<D, SA>>
1232  where
1233    SA: adapters::no_key::DeserializerAdapter<D>,
1234  {
1235    if topic.kind() != TopicKind::NoKey {
1236      return Err(CreateError::TopicKind(TopicKind::NoKey));
1237    }
1238
1239    let entity_id =
1240      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1241
1242    let d = self.create_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1243      outer,
1244      Some(entity_id),
1245      topic,
1246      qos,
1247      reader_like_stateless,
1248    )?;
1249
1250    Ok(NoKeyDataReader::<D, SA>::from_keyed(d))
1251  }
1252
1253  pub fn create_simple_datareader_no_key<D: 'static, SA>(
1254    &self,
1255    outer: &Subscriber,
1256    topic: &Topic,
1257    entity_id_opt: Option<EntityId>,
1258    qos: Option<QosPolicies>,
1259  ) -> CreateResult<no_key::SimpleDataReader<D, SA>>
1260  where
1261    SA: adapters::no_key::DeserializerAdapter<D> + 'static,
1262  {
1263    if topic.kind() != TopicKind::NoKey {
1264      return Err(CreateError::TopicKind(TopicKind::NoKey));
1265    }
1266
1267    let entity_id =
1268      self.unwrap_or_new_entity_id(entity_id_opt, EntityKind::READER_NO_KEY_USER_DEFINED);
1269
1270    let d = self.create_simple_datareader_internal::<NoKeyWrapper<D>, DAWrapper<SA>>(
1271      outer,
1272      Some(entity_id),
1273      topic,
1274      qos,
1275      false,
1276    )?;
1277
1278    Ok(no_key::SimpleDataReader::<D, SA>::from_keyed(d))
1279  }
1280
1281  pub fn participant(&self) -> Option<DomainParticipant> {
1282    self.domain_participant.clone().upgrade()
1283  }
1284
1285  pub(crate) fn remove_reader(&self, guid: GUID) {
1286    try_send_timeout(&self.sender_remove_reader, guid, None)
1287      .unwrap_or_else(|e| error!("Cannot remove Reader {guid:?} : {e:?}"));
1288  }
1289
1290  fn unwrap_or_new_entity_id(
1291    &self,
1292    entity_id_opt: Option<EntityId>,
1293    entity_kind: EntityKind,
1294  ) -> EntityId {
1295    // If the entity_id is given, then just use that. If not, then pull an arbitrary
1296    // number out of participant's hat.
1297    entity_id_opt.unwrap_or_else(|| self.participant().unwrap().new_entity_id(entity_kind))
1298  }
1299}
1300
1301// -------------------------------------------------------------------
1302
// Unit test module for this file; it currently contains no tests.
#[cfg(test)]
mod tests {
}