Skip to main content

cyclonedds/
entity.rs

1//! The base of the DDS entity hierarchy.
2//!
3//! Most DDS objects ([`Participant`](crate::Participant),
4//! [`Topic`](crate::Topic), [`Reader`](crate::Reader),
5//! [`Writer`](crate::Writer), and others) are entities. See the
6//! [implementors of `Entity`](Entity#implementors) for the full list. This
7//! module provides the [`Entity`] trait with the common methods available to
8//! all entities.
9
10use crate::internal::ffi;
11use crate::{Result, Status};
12
13/// A unique opaque handle identifying an instance.
14///
15/// For keyed topics this corresponds to a specific key value, but applications
16/// should treat it as an opaque DDS handle.
17#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
18pub struct InstanceHandle {
19    pub(crate) inner: cyclonedds_sys::dds_instance_handle_t,
20}
21
22/// A raw entity ID for an entity.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
24pub struct EntityId {
25    pub(crate) inner: cyclonedds_sys::dds_entity_t,
26}
27
28// TODO should the Entity trait be sealed?
29/// Common interface implemented by all members of the DDS entity hierarchy.
30///
31/// - [`Participant`](crate::Participant): the root entity representing
32///   membership in a domain.
33///   - [`WaitSet`](crate::WaitSet): blocks until one or more attached
34///     conditions are triggered.
35///   - [`GuardCondition`](crate::GuardCondition): a manually triggered
36///     condition for use with a [`WaitSet`](crate::WaitSet).
37///   - [`Topic<T>`](crate::Topic): names and types a data channel for a
38///     specific payload type `T`.
39///   - [`Publisher`](crate::Publisher): groups [`Writers`](crate::Writer) and
40///     controls their shared [`QoS`](crate::QoS).
41///     - [`Writer<T>`](crate::Writer): publishes samples of type `T` to a
42///       [`Topic`](crate::Topic).
43///   - [`Subscriber`](crate::Subscriber): groups [`Readers`](crate::Reader) and
44///     controls their shared [`QoS`](crate::QoS).
45///     - [`Reader<T>`](crate::Reader): receives samples of type `T` from a
46///       [`Topic`](crate::Topic).
47///       - [`ReadCondition<T>`](crate::ReadCondition): filters
48///         [`Reader`](crate::Reader) samples by
49///         [`sample`](crate::state::sample), [`view`](crate::state::view), and
50///         [`instance`](crate::state::instance) state.
51///       - [`QueryCondition<T, F>`](crate::QueryCondition): filters
52///         [`Reader`](crate::Reader) samples by [`sample state`](crate::State)
53///         and a predicate.
54pub trait Entity {
55    /// Returns the [`EntityId`] of this entity.
56    ///
57    /// # Examples
58    ///
59    /// ```
60    /// use cyclonedds::entity::Entity;
61    /// use cyclonedds::{Reader, Topic, Writer};
62    ///
63    /// # #[derive(
64    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
65    /// # )]
66    /// # struct Data {
67    /// #     x: i32,
68    /// # }
69    /// # let domain = cyclonedds::Domain::default();
70    /// # let participant = cyclonedds::Participant::new(&domain)?;
71    /// let topic = Topic::<Data>::new(&participant, "Example")?;
72    /// let reader = Reader::new(&topic)?;
73    /// let writer = Writer::new(&topic)?;
74    ///
75    /// // The reader and the writer have distinct IDs.
76    /// assert_ne!(reader.id(), writer.id());
77    ///
78    /// # Ok::<_, cyclonedds::Error>(())
79    /// ```
80    fn id(&self) -> EntityId;
81
82    /// Returns the [`InstanceHandle`] of this entity.
83    ///
84    /// # Errors
85    ///
86    /// Returns an [`Error`](crate::Error) specifying the reason if the instance
87    /// handle fails to be retrieved.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// use cyclonedds::entity::Entity;
93    /// use cyclonedds::{Reader, Topic, Writer};
94    ///
95    /// # #[derive(
96    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
97    /// # )]
98    /// # struct Data {
99    /// #     x: i32,
100    /// # }
101    /// # let domain = cyclonedds::Domain::default();
102    /// # let participant = cyclonedds::Participant::new(&domain)?;
103    /// let topic = Topic::<Data>::new(&participant, "Example")?;
104    /// let reader = Reader::new(&topic)?;
105    /// let writer = Writer::new(&topic)?;
106    ///
107    /// // The reader and the writer have distinct instance handles.
108    /// assert_ne!(reader.instance_handle()?, writer.instance_handle()?);
109    ///
110    /// // Instance handles can be used to identify entities across various API
111    /// // calls. For example, the writer's handle appears in the set of matched
112    /// // publications.
113    /// let matched = reader.matched_publications()?;
114    /// assert_eq!(matched[0], writer.instance_handle()?);
115    /// # Ok::<_, cyclonedds::Error>(())
116    /// ```
117    fn instance_handle(&self) -> Result<InstanceHandle> {
118        let entity = self.id();
119        let inner = ffi::dds_get_instance_handle(entity.inner)?;
120        Ok(InstanceHandle { inner })
121    }
122
123    /// Returns the set of status flags that have changed since they were last
124    /// [`read`](crate::Reader::read) or [`taken`](crate::Reader::take).
125    ///
126    /// # Errors
127    ///
128    /// - Returns an [`Error`](crate::Error) if the status bits of the
129    ///   corresponding entity could not be retrieved (e.g. the entity no longer
130    ///   exists).
131    ///
132    /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
133    ///   bits do not correspond to a valid [`Status`].
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// use cyclonedds::entity::Entity;
139    /// use cyclonedds::{Reader, Status, Topic, Writer};
140    ///
141    /// # #[derive(
142    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
143    /// # )]
144    /// # struct Data {
145    /// #     x: i32,
146    /// # }
147    /// # let domain = cyclonedds::Domain::default();
148    /// # let participant = cyclonedds::Participant::new(&domain)?;
149    /// let topic = Topic::<Data>::new(&participant, "Example")?;
150    /// let reader = Reader::new(&topic)?;
151    ///
152    /// // The reader has been created but nothing in particular has happened in
153    /// // terms of status changes.
154    /// let changed = reader.status_changes()?;
155    /// assert_eq!(changed, Status::empty());
156    ///
157    /// // The writer that is created will match with the reader.
158    /// let writer = Writer::new(&topic)?;
159    ///
160    /// // After a writer matches, the reader reports a status change.
161    /// let changed = reader.status_changes()?;
162    /// assert!(changed.contains(Status::SubscriptionMatched));
163    /// # Ok::<_, cyclonedds::Error>(())
164    /// ```
165    fn status_changes(&self) -> Result<Status> {
166        let entity = self.id();
167        let status = ffi::dds_get_status_changes(entity.inner)?;
168        Status::from_bits(status).ok_or(crate::error::Error::BadParameter)
169    }
170
171    /// Takes and clears the status flags matching `mask`, or all flags if
172    /// `mask` is `None`.
173    ///
174    /// Unlike [`read_status`](Entity::read_status), this clears the returned
175    /// flags on the entity.
176    ///
177    /// # Errors
178    ///
179    /// - Returns an [`Error`](crate::Error) if the status bits of the
180    ///   corresponding entity could not be retrieved (e.g. the entity no longer
181    ///   exists or the status mask contains entries that do not apply to the
182    ///   entity type).
183    ///
184    /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
185    ///   bits do not correspond to a valid [`Status`].
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use cyclonedds::entity::Entity;
191    /// use cyclonedds::{Reader, Status, Topic, Writer};
192    ///
193    /// # #[derive(
194    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
195    /// # )]
196    /// # struct Data {
197    /// #     x: i32,
198    /// # }
199    /// # let domain = cyclonedds::Domain::default();
200    /// # let participant = cyclonedds::Participant::new(&domain)?;
201    /// let topic = Topic::<Data>::new(&participant, "Example")?;
202    /// let reader = Reader::new(&topic)?;
203    /// let writer = Writer::new(&topic)?;
204    ///
205    /// // The reader has matched with the writer, so its status should have
206    /// // updated.
207    /// let status = reader.take_status(Some(Status::SubscriptionMatched))?;
208    /// assert!(status.contains(Status::SubscriptionMatched));
209    ///
210    /// // The flag has been cleared; a second take returns empty.
211    /// let cleared = reader.take_status(Some(Status::SubscriptionMatched))?;
212    /// assert!(cleared.is_empty());
213    /// # Ok::<_, cyclonedds::Error>(())
214    /// ```
215    fn take_status(&self, mask: Option<Status>) -> Result<Status> {
216        let entity = self.id();
217        let mask = mask.unwrap_or(Status::all()).bits();
218        let status = ffi::dds_take_status(entity.inner, mask)?;
219        Status::from_bits(status).ok_or(crate::error::Error::BadParameter)
220    }
221
222    /// Reads the status flags matching `mask` without clearing them, or all
223    /// flags if `mask` is `None`.
224    ///
225    /// # Errors
226    ///
227    /// - Returns an [`Error`](crate::Error) if the status bits of the
228    ///   corresponding entity could not be retrieved (e.g. the entity no longer
229    ///   exists).
230    ///
231    /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
232    ///   bits do not correspond to a valid [`Status`].
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// use cyclonedds::entity::Entity;
238    /// use cyclonedds::{Reader, Status, Topic, Writer};
239    ///
240    /// # #[derive(
241    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
242    /// # )]
243    /// # struct Data {
244    /// #     x: i32,
245    /// # }
246    /// # let domain = cyclonedds::Domain::default();
247    /// # let participant = cyclonedds::Participant::new(&domain)?;
248    /// let topic = Topic::<Data>::new(&participant, "Example")?;
249    /// let reader = Reader::new(&topic)?;
250    /// let writer = Writer::new(&topic)?;
251    ///
252    /// // The reader has matched with the writer, so its status should have
253    /// // updated.
254    /// let status = reader.read_status(Some(Status::SubscriptionMatched))?;
255    /// assert!(status.contains(Status::SubscriptionMatched));
256    ///
257    /// // The flag is preserved; a second read returns the same value.
258    /// let same = reader.read_status(Some(Status::SubscriptionMatched))?;
259    /// assert_eq!(status, same);
260    /// # Ok::<_, cyclonedds::Error>(())
261    /// ```
262    fn read_status(&self, mask: Option<Status>) -> Result<Status> {
263        let entity = self.id();
264        let mask = mask.unwrap_or(Status::all()).bits();
265        let status = ffi::dds_read_status(entity.inner, mask)?;
266        Status::from_bits(status).ok_or(crate::error::Error::BadParameter)
267    }
268
269    /// Returns the status mask enabled on the entity.
270    ///
271    /// # Errors
272    ///
273    /// - Returns an [`Error`](crate::Error) if the status mask of the
274    ///   corresponding entity could not be retrieved (e.g. the entity no longer
275    ///   exists).
276    ///
277    /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
278    ///   bits do not correspond to a valid [`Status`].
279    ///
280    /// # Examples
281    /// ```
282    /// use cyclonedds::entity::Entity;
283    /// use cyclonedds::{Status, Topic, Writer};
284    ///
285    /// # #[derive(
286    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
287    /// # )]
288    /// # struct Data {
289    /// #     x: i32,
290    /// # }
291    /// # let domain = cyclonedds::Domain::default();
292    /// # let participant = cyclonedds::Participant::new(&domain)?;
293    /// let topic = Topic::<Data>::new(&participant, "Example")?;
294    /// let writer = Writer::new(&topic)?;
295    ///
296    /// // Get the initial active status mask.
297    /// assert_eq!(
298    ///     writer.status_mask()?,
299    ///     Status::OfferedDeadlineMissed
300    ///         | Status::OfferedIncompatibleQoS
301    ///         | Status::LivelinessLost
302    ///         | Status::PublicationMatched
303    /// );
304    /// # Ok::<_, cyclonedds::Error>(())
305    /// ```
306    fn status_mask(&self) -> Result<Status> {
307        let entity = self.id();
308        let mask = ffi::dds_get_status_mask(entity.inner)?;
309        Status::from_bits(mask).ok_or(crate::error::Error::BadParameter)
310    }
311
312    /// Sets and enables a status mask on the entity.
313    ///
314    /// Only status flags included in `mask` will trigger listener callbacks or
315    /// be reported via [`status_changes`](Entity::status_changes).
316    ///
317    /// # Errors
318    ///
319    /// - Returns an [`Error`](crate::Error) if the status mask of the
320    ///   corresponding entity could not be set (e.g. the entity no longer
321    ///   exists).
322    ///
323    /// # Examples
324    /// ```
325    /// use cyclonedds::entity::Entity;
326    /// use cyclonedds::{Status, Topic, Writer};
327    ///
328    /// # #[derive(
329    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
330    /// # )]
331    /// # struct Data {
332    /// #     x: i32,
333    /// # }
334    /// # let domain = cyclonedds::Domain::default();
335    /// # let participant = cyclonedds::Participant::new(&domain)?;
336    /// let topic = Topic::<Data>::new(&participant, "Example")?;
337    /// let writer = Writer::new(&topic)?;
338    ///
339    /// // Set the active status mask.
340    /// writer.set_status_mask(Status::PublicationMatched)?;
341    /// // Get the active status mask.
342    /// assert_eq!(writer.status_mask()?, Status::PublicationMatched);
343    /// # Ok::<_, cyclonedds::Error>(())
344    /// ```
345    fn set_status_mask(&self, mask: Status) -> Result<()> {
346        let entity = self.id();
347        let mask = mask.bits();
348        ffi::dds_set_status_mask(entity.inner, mask)
349    }
350}
351
// Implements `Entity` for a wrapper type by exposing its `inner` handle as
// the `EntityId`.
//
// Two invocation forms are accepted:
// - `impl_entity!(Type)` for impls that need no generic parameters.
// - `impl_entity!(Type<T> where T: Bound, ...)`, where everything after
//   `where` is pasted verbatim into the `impl<...>` generic parameter list.
macro_rules! impl_entity {
    // No generic parameters: reuse the general arm with an empty list.
    ($target:ty) => {
        impl_entity!($target where);
    };
    ($target:ty where $($generics:tt)*) => {
        impl<$($generics)*> Entity for $target {
            fn id(&self) -> EntityId {
                EntityId { inner: self.inner }
            }
        }
    };
}
368
369impl_entity!(crate::Participant<'_>);
370impl_entity!(crate::Topic<'_, '_, T> where T: crate::Topicable);
371impl_entity!(crate::Publisher<'_, '_>);
372impl_entity!(crate::Subscriber<'_, '_>);
373impl_entity!(crate::Reader<'_, '_, '_, T> where T: crate::Topicable);
374impl_entity!(crate::Writer<'_, '_, '_, T> where T: crate::Topicable);
375impl_entity!(crate::ReadCondition<'_, '_, '_, '_, T> where T: crate::Topicable);
376impl_entity!(crate::QueryCondition<'_, '_, '_, '_, T, F> where T: crate::Topicable, F: Fn(&T) -> bool);
377impl_entity!(crate::GuardCondition<'_>);
378impl_entity!(crate::WaitSet<'_, '_, '_, A> where A);
379
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `result` failed with `Error::BadParameter`.
    #[track_caller]
    fn assert_bad_parameter<T: std::fmt::Debug>(result: crate::Result<T>) {
        assert_eq!(crate::Error::BadParameter, result.unwrap_err());
    }

    /// Asserts, for each named entity, that `id()` reports the entity's own
    /// `inner` handle.
    macro_rules! assert_id_is_inner {
        ($($entity:ident),+ $(,)?) => {
            $(assert_eq!($entity.id().inner, $entity.inner);)+
        };
    }

    /// Every entity type must report its raw handle through `Entity::id`.
    #[test]
    fn test_entity_id_all_entity_types() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let name = crate::tests::topic::unique_name();
        let topic = crate::Topic::<crate::tests::topic::Data>::new(&participant, &name).unwrap();
        let publisher = crate::Publisher::new(&participant).unwrap();
        let subscriber = crate::Subscriber::new(&participant).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let writer = crate::Writer::new(&topic).unwrap();
        let read_condition = crate::ReadCondition::new(&reader, crate::state::sample::Any).unwrap();
        let query_condition =
            crate::QueryCondition::new(&reader, crate::State::empty(), |_| true).unwrap();
        let guard_condition = crate::GuardCondition::new(&participant).unwrap();
        let waitset = crate::WaitSet::<()>::new(&participant).unwrap();

        assert_id_is_inner!(
            participant,
            topic,
            publisher,
            subscriber,
            reader,
            writer,
            read_condition,
            query_condition,
            guard_condition,
            waitset,
        );
    }

    /// Every default method must report `BadParameter` for an invalid handle.
    #[test]
    fn test_entity_methods_on_invalid_participant() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let mut participant = crate::Participant::new(&domain).unwrap();
        // Swap in an invalid handle, remembering the real one for later.
        let real_handle = std::mem::replace(&mut participant.inner, 0);

        assert_bad_parameter(participant.instance_handle());
        assert_bad_parameter(participant.status_changes());
        assert_bad_parameter(participant.take_status(None));
        assert_bad_parameter(participant.read_status(None));
        assert_bad_parameter(participant.status_mask());
        assert_bad_parameter(participant.set_status_mask(crate::Status::InconsistentTopic));

        // Put the real handle back before `participant` goes out of scope,
        // presumably so that cleanup acts on the actual entity.
        participant.inner = real_handle;
    }

    /// The default methods succeed on a freshly created participant.
    #[test]
    fn test_entity_methods_on_participant() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let nothing = crate::Status::empty();

        assert!(participant.instance_handle().is_ok());
        assert!(participant.status_changes().unwrap().is_empty());
        assert!(participant.set_status_mask(nothing).is_ok());
        assert_eq!(participant.status_mask().unwrap(), crate::Status::empty());
        assert!(participant.read_status(Some(nothing)).unwrap().is_empty());
        assert!(participant.take_status(Some(nothing)).unwrap().is_empty());
    }

    /// The default methods succeed on a reader that has no matched writer.
    #[test]
    fn test_entity_methods_on_reader() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = crate::Topic::<crate::tests::topic::Data>::new(&participant, &name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let matched = crate::Status::SubscriptionMatched;

        assert!(reader.instance_handle().is_ok());
        assert!(reader.status_changes().unwrap().is_empty());
        assert!(reader.set_status_mask(matched).is_ok());
        assert_eq!(
            reader.status_mask().unwrap(),
            crate::Status::SubscriptionMatched
        );
        assert!(reader.read_status(Some(matched)).unwrap().is_empty());
        assert!(reader.take_status(Some(matched)).unwrap().is_empty());
    }
}