cyclonedds/entity.rs
1//! The base of the DDS entity hierarchy.
2//!
3//! Most DDS objects ([`Participant`](crate::Participant),
4//! [`Topic`](crate::Topic), [`Reader`](crate::Reader),
5//! [`Writer`](crate::Writer), and others) are entities. See the
6//! [implementors of `Entity`](Entity#implementors) for the full list. This
7//! module provides the [`Entity`] trait with the common methods available to
8//! all entities.
9
10use crate::internal::ffi;
11use crate::{Result, Status};
12
13/// A unique opaque handle identifying an instance.
14///
15/// For keyed topics this corresponds to a specific key value, but applications
16/// should treat it as an opaque DDS handle.
17#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
18pub struct InstanceHandle {
19 pub(crate) inner: cyclonedds_sys::dds_instance_handle_t,
20}
21
22/// A raw entity ID for an entity.
23#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
24pub struct EntityId {
25 pub(crate) inner: cyclonedds_sys::dds_entity_t,
26}
27
28// TODO should the Entity trait be sealed?
29/// Common interface implemented by all members of the DDS entity hierarchy.
30///
31/// - [`Participant`](crate::Participant): the root entity representing
32/// membership in a domain.
33/// - [`WaitSet`](crate::WaitSet): blocks until one or more attached
34/// conditions are triggered.
35/// - [`GuardCondition`](crate::GuardCondition): a manually triggered
36/// condition for use with a [`WaitSet`](crate::WaitSet).
37/// - [`Topic<T>`](crate::Topic): names and types a data channel for a
38/// specific payload type `T`.
39/// - [`Publisher`](crate::Publisher): groups [`Writers`](crate::Writer) and
40/// controls their shared [`QoS`](crate::QoS).
41/// - [`Writer<T>`](crate::Writer): publishes samples of type `T` to a
42/// [`Topic`](crate::Topic).
43/// - [`Subscriber`](crate::Subscriber): groups [`Readers`](crate::Reader) and
44/// controls their shared [`QoS`](crate::QoS).
45/// - [`Reader<T>`](crate::Reader): receives samples of type `T` from a
46/// [`Topic`](crate::Topic).
47/// - [`ReadCondition<T>`](crate::ReadCondition): filters
48/// [`Reader`](crate::Reader) samples by
49/// [`sample`](crate::state::sample), [`view`](crate::state::view), and
50/// [`instance`](crate::state::instance) state.
51/// - [`QueryCondition<T, F>`](crate::QueryCondition): filters
52/// [`Reader`](crate::Reader) samples by [`sample state`](crate::State)
53/// and a predicate.
54pub trait Entity {
55 /// Returns the [`EntityId`] of this entity.
56 ///
57 /// # Examples
58 ///
59 /// ```
60 /// use cyclonedds::entity::Entity;
61 /// use cyclonedds::{Reader, Topic, Writer};
62 ///
63 /// # #[derive(
64 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
65 /// # )]
66 /// # struct Data {
67 /// # x: i32,
68 /// # }
69 /// # let domain = cyclonedds::Domain::default();
70 /// # let participant = cyclonedds::Participant::new(&domain)?;
71 /// let topic = Topic::<Data>::new(&participant, "Example")?;
72 /// let reader = Reader::new(&topic)?;
73 /// let writer = Writer::new(&topic)?;
74 ///
75 /// // The reader and the writer have distinct IDs.
76 /// assert_ne!(reader.id(), writer.id());
77 ///
78 /// # Ok::<_, cyclonedds::Error>(())
79 /// ```
80 fn id(&self) -> EntityId;
81
82 /// Returns the [`InstanceHandle`] of this entity.
83 ///
84 /// # Errors
85 ///
86 /// Returns an [`Error`](crate::Error) specifying the reason if the instance
87 /// handle fails to be retrieved.
88 ///
89 /// # Examples
90 ///
91 /// ```
92 /// use cyclonedds::entity::Entity;
93 /// use cyclonedds::{Reader, Topic, Writer};
94 ///
95 /// # #[derive(
96 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
97 /// # )]
98 /// # struct Data {
99 /// # x: i32,
100 /// # }
101 /// # let domain = cyclonedds::Domain::default();
102 /// # let participant = cyclonedds::Participant::new(&domain)?;
103 /// let topic = Topic::<Data>::new(&participant, "Example")?;
104 /// let reader = Reader::new(&topic)?;
105 /// let writer = Writer::new(&topic)?;
106 ///
107 /// // The reader and the writer have distinct instance handles.
108 /// assert_ne!(reader.instance_handle()?, writer.instance_handle()?);
109 ///
110 /// // Instance handles can be used to identify entities across various API
111 /// // calls. For example, the writer's handle appears in the set of matched
112 /// // publications.
113 /// let matched = reader.matched_publications()?;
114 /// assert_eq!(matched[0], writer.instance_handle()?);
115 /// # Ok::<_, cyclonedds::Error>(())
116 /// ```
117 fn instance_handle(&self) -> Result<InstanceHandle> {
118 let entity = self.id();
119 let inner = ffi::dds_get_instance_handle(entity.inner)?;
120 Ok(InstanceHandle { inner })
121 }
122
123 /// Returns the set of status flags that have changed since they were last
124 /// [`read`](crate::Reader::read) or [`taken`](crate::Reader::take).
125 ///
126 /// # Errors
127 ///
128 /// - Returns an [`Error`](crate::Error) if the status bits of the
129 /// corresponding entity could not be retrieved (e.g. the entity no longer
130 /// exists).
131 ///
132 /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
133 /// bits do not correspond to a valid [`Status`].
134 ///
135 /// # Examples
136 ///
137 /// ```
138 /// use cyclonedds::entity::Entity;
139 /// use cyclonedds::{Reader, Status, Topic, Writer};
140 ///
141 /// # #[derive(
142 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
143 /// # )]
144 /// # struct Data {
145 /// # x: i32,
146 /// # }
147 /// # let domain = cyclonedds::Domain::default();
148 /// # let participant = cyclonedds::Participant::new(&domain)?;
149 /// let topic = Topic::<Data>::new(&participant, "Example")?;
150 /// let reader = Reader::new(&topic)?;
151 ///
152 /// // The reader has been created but nothing in particular has happened in
153 /// // terms of status changes.
154 /// let changed = reader.status_changes()?;
155 /// assert_eq!(changed, Status::empty());
156 ///
157 /// // The writer that is created will match with the reader.
158 /// let writer = Writer::new(&topic)?;
159 ///
160 /// // After a writer matches, the reader reports a status change.
161 /// let changed = reader.status_changes()?;
162 /// assert!(changed.contains(Status::SubscriptionMatched));
163 /// # Ok::<_, cyclonedds::Error>(())
164 /// ```
165 fn status_changes(&self) -> Result<Status> {
166 let entity = self.id();
167 let status = ffi::dds_get_status_changes(entity.inner)?;
168 Status::from_bits(status).ok_or(crate::error::Error::BadParameter)
169 }
170
171 /// Takes and clears the status flags matching `mask`, or all flags if
172 /// `mask` is `None`.
173 ///
174 /// Unlike [`read_status`](Entity::read_status), this clears the returned
175 /// flags on the entity.
176 ///
177 /// # Errors
178 ///
179 /// - Returns an [`Error`](crate::Error) if the status bits of the
180 /// corresponding entity could not be retrieved (e.g. the entity no longer
181 /// exists or the status mask contains entries that do not apply to the
182 /// entity type).
183 ///
184 /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
185 /// bits do not correspond to a valid [`Status`].
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// use cyclonedds::entity::Entity;
191 /// use cyclonedds::{Reader, Status, Topic, Writer};
192 ///
193 /// # #[derive(
194 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
195 /// # )]
196 /// # struct Data {
197 /// # x: i32,
198 /// # }
199 /// # let domain = cyclonedds::Domain::default();
200 /// # let participant = cyclonedds::Participant::new(&domain)?;
201 /// let topic = Topic::<Data>::new(&participant, "Example")?;
202 /// let reader = Reader::new(&topic)?;
203 /// let writer = Writer::new(&topic)?;
204 ///
205 /// // The reader has matched with the writer, so its status should have
206 /// // updated.
207 /// let status = reader.take_status(Some(Status::SubscriptionMatched))?;
208 /// assert!(status.contains(Status::SubscriptionMatched));
209 ///
210 /// // The flag has been cleared; a second take returns empty.
211 /// let cleared = reader.take_status(Some(Status::SubscriptionMatched))?;
212 /// assert!(cleared.is_empty());
213 /// # Ok::<_, cyclonedds::Error>(())
214 /// ```
215 fn take_status(&self, mask: Option<Status>) -> Result<Status> {
216 let entity = self.id();
217 let mask = mask.unwrap_or(Status::all()).bits();
218 let status = ffi::dds_take_status(entity.inner, mask)?;
219 Status::from_bits(status).ok_or(crate::error::Error::BadParameter)
220 }
221
222 /// Reads the status flags matching `mask` without clearing them, or all
223 /// flags if `mask` is `None`.
224 ///
225 /// # Errors
226 ///
227 /// - Returns an [`Error`](crate::Error) if the status bits of the
228 /// corresponding entity could not be retrieved (e.g. the entity no longer
229 /// exists).
230 ///
231 /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
232 /// bits do not correspond to a valid [`Status`].
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use cyclonedds::entity::Entity;
238 /// use cyclonedds::{Reader, Status, Topic, Writer};
239 ///
240 /// # #[derive(
241 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
242 /// # )]
243 /// # struct Data {
244 /// # x: i32,
245 /// # }
246 /// # let domain = cyclonedds::Domain::default();
247 /// # let participant = cyclonedds::Participant::new(&domain)?;
248 /// let topic = Topic::<Data>::new(&participant, "Example")?;
249 /// let reader = Reader::new(&topic)?;
250 /// let writer = Writer::new(&topic)?;
251 ///
252 /// // The reader has matched with the writer, so its status should have
253 /// // updated.
254 /// let status = reader.read_status(Some(Status::SubscriptionMatched))?;
255 /// assert!(status.contains(Status::SubscriptionMatched));
256 ///
257 /// // The flag is preserved; a second read returns the same value.
258 /// let same = reader.read_status(Some(Status::SubscriptionMatched))?;
259 /// assert_eq!(status, same);
260 /// # Ok::<_, cyclonedds::Error>(())
261 /// ```
262 fn read_status(&self, mask: Option<Status>) -> Result<Status> {
263 let entity = self.id();
264 let mask = mask.unwrap_or(Status::all()).bits();
265 let status = ffi::dds_read_status(entity.inner, mask)?;
266 Status::from_bits(status).ok_or(crate::error::Error::BadParameter)
267 }
268
269 /// Returns the status mask enabled on the entity.
270 ///
271 /// # Errors
272 ///
273 /// - Returns an [`Error`](crate::Error) if the status mask of the
274 /// corresponding entity could not be retrieved (e.g. the entity no longer
275 /// exists).
276 ///
277 /// - Returns [`BadParameter`](crate::Error::BadParameter) if the retrieved
278 /// bits do not correspond to a valid [`Status`].
279 ///
280 /// # Examples
281 /// ```
282 /// use cyclonedds::entity::Entity;
283 /// use cyclonedds::{Status, Topic, Writer};
284 ///
285 /// # #[derive(
286 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
287 /// # )]
288 /// # struct Data {
289 /// # x: i32,
290 /// # }
291 /// # let domain = cyclonedds::Domain::default();
292 /// # let participant = cyclonedds::Participant::new(&domain)?;
293 /// let topic = Topic::<Data>::new(&participant, "Example")?;
294 /// let writer = Writer::new(&topic)?;
295 ///
296 /// // Get the initial active status mask.
297 /// assert_eq!(
298 /// writer.status_mask()?,
299 /// Status::OfferedDeadlineMissed
300 /// | Status::OfferedIncompatibleQoS
301 /// | Status::LivelinessLost
302 /// | Status::PublicationMatched
303 /// );
304 /// # Ok::<_, cyclonedds::Error>(())
305 /// ```
306 fn status_mask(&self) -> Result<Status> {
307 let entity = self.id();
308 let mask = ffi::dds_get_status_mask(entity.inner)?;
309 Status::from_bits(mask).ok_or(crate::error::Error::BadParameter)
310 }
311
312 /// Sets and enables a status mask on the entity.
313 ///
314 /// Only status flags included in `mask` will trigger listener callbacks or
315 /// be reported via [`status_changes`](Entity::status_changes).
316 ///
317 /// # Errors
318 ///
319 /// - Returns an [`Error`](crate::Error) if the status mask of the
320 /// corresponding entity could not be set (e.g. the entity no longer
321 /// exists).
322 ///
323 /// # Examples
324 /// ```
325 /// use cyclonedds::entity::Entity;
326 /// use cyclonedds::{Status, Topic, Writer};
327 ///
328 /// # #[derive(
329 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
330 /// # )]
331 /// # struct Data {
332 /// # x: i32,
333 /// # }
334 /// # let domain = cyclonedds::Domain::default();
335 /// # let participant = cyclonedds::Participant::new(&domain)?;
336 /// let topic = Topic::<Data>::new(&participant, "Example")?;
337 /// let writer = Writer::new(&topic)?;
338 ///
339 /// // Set the active status mask.
340 /// writer.set_status_mask(Status::PublicationMatched)?;
341 /// // Get the active status mask.
342 /// assert_eq!(writer.status_mask()?, Status::PublicationMatched);
343 /// # Ok::<_, cyclonedds::Error>(())
344 /// ```
345 fn set_status_mask(&self, mask: Status) -> Result<()> {
346 let entity = self.id();
347 let mask = mask.bits();
348 ffi::dds_set_status_mask(entity.inner, mask)
349 }
350}
351
352macro_rules! impl_entity {
353 ($ty:ty) => {
354 impl Entity for $ty {
355 fn id(&self) -> EntityId {
356 EntityId { inner: self.inner }
357 }
358 }
359 };
360 ($ty:ty where $($bounds:tt)*) => {
361 impl<$($bounds)*> Entity for $ty {
362 fn id(&self) -> EntityId {
363 EntityId { inner: self.inner }
364 }
365 }
366 };
367}
368
369impl_entity!(crate::Participant<'_>);
370impl_entity!(crate::Topic<'_, '_, T> where T: crate::Topicable);
371impl_entity!(crate::Publisher<'_, '_>);
372impl_entity!(crate::Subscriber<'_, '_>);
373impl_entity!(crate::Reader<'_, '_, '_, T> where T: crate::Topicable);
374impl_entity!(crate::Writer<'_, '_, '_, T> where T: crate::Topicable);
375impl_entity!(crate::ReadCondition<'_, '_, '_, '_, T> where T: crate::Topicable);
376impl_entity!(crate::QueryCondition<'_, '_, '_, '_, T, F> where T: crate::Topicable, F: Fn(&T) -> bool);
377impl_entity!(crate::GuardCondition<'_>);
378impl_entity!(crate::WaitSet<'_, '_, '_, A> where A);
379
380#[cfg(test)]
381mod tests {
382 use super::*;
383
384 #[test]
385 fn test_entity_id_all_entity_types() {
386 let domain_id = crate::tests::domain::unique_id();
387 let domain = crate::Domain::new(domain_id).unwrap();
388 let participant = crate::Participant::new(&domain).unwrap();
389 let topic_name = crate::tests::topic::unique_name();
390 let topic =
391 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
392 let publisher = crate::Publisher::new(&participant).unwrap();
393 let subscriber = crate::Subscriber::new(&participant).unwrap();
394 let reader = crate::Reader::new(&topic).unwrap();
395 let writer = crate::Writer::new(&topic).unwrap();
396 let read_condition = crate::ReadCondition::new(&reader, crate::state::sample::Any).unwrap();
397 let query_condition =
398 crate::QueryCondition::new(&reader, crate::State::empty(), |_| true).unwrap();
399 let guard_condition = crate::GuardCondition::new(&participant).unwrap();
400 let waitset = crate::WaitSet::<()>::new(&participant).unwrap();
401
402 assert_eq!(participant.id().inner, participant.inner);
403 assert_eq!(topic.id().inner, topic.inner);
404 assert_eq!(publisher.id().inner, publisher.inner);
405 assert_eq!(subscriber.id().inner, subscriber.inner);
406 assert_eq!(reader.id().inner, reader.inner);
407 assert_eq!(writer.id().inner, writer.inner);
408 assert_eq!(read_condition.id().inner, read_condition.inner);
409 assert_eq!(query_condition.id().inner, query_condition.inner);
410 assert_eq!(guard_condition.id().inner, guard_condition.inner);
411 assert_eq!(waitset.id().inner, waitset.inner);
412 }
413
414 #[test]
415 fn test_entity_methods_on_invalid_participant() {
416 let domain_id = crate::tests::domain::unique_id();
417 let domain = crate::Domain::new(domain_id).unwrap();
418 let mut participant = crate::Participant::new(&domain).unwrap();
419 let participant_id = participant.inner;
420 participant.inner = 0;
421
422 assert_eq!(
423 crate::Error::BadParameter,
424 participant.instance_handle().unwrap_err()
425 );
426 assert_eq!(
427 crate::Error::BadParameter,
428 participant.status_changes().unwrap_err()
429 );
430 assert_eq!(
431 crate::Error::BadParameter,
432 participant.take_status(None).unwrap_err()
433 );
434 assert_eq!(
435 crate::Error::BadParameter,
436 participant.read_status(None).unwrap_err()
437 );
438 assert_eq!(
439 crate::Error::BadParameter,
440 participant.status_mask().unwrap_err()
441 );
442 assert_eq!(
443 crate::Error::BadParameter,
444 participant
445 .set_status_mask(crate::Status::InconsistentTopic)
446 .unwrap_err()
447 );
448
449 participant.inner = participant_id;
450 }
451
452 #[test]
453 fn test_entity_methods_on_participant() {
454 let domain_id = crate::tests::domain::unique_id();
455 let domain = crate::Domain::new(domain_id).unwrap();
456 let participant = crate::Participant::new(&domain).unwrap();
457
458 let result = participant.instance_handle();
459 assert!(result.is_ok());
460 let status_changes = participant.status_changes().unwrap();
461 assert!(status_changes.is_empty());
462 let result = participant.set_status_mask(crate::Status::empty());
463 assert!(result.is_ok());
464 let mask = participant.status_mask().unwrap();
465 assert_eq!(mask, crate::Status::empty());
466 let status = participant
467 .read_status(Some(crate::Status::empty()))
468 .unwrap();
469 assert!(status.is_empty());
470 let status = participant
471 .take_status(Some(crate::Status::empty()))
472 .unwrap();
473 assert!(status.is_empty());
474 }
475
476 #[test]
477 fn test_entity_methods_on_reader() {
478 let domain_id = crate::tests::domain::unique_id();
479 let domain = crate::Domain::new(domain_id).unwrap();
480 let topic_name = crate::tests::topic::unique_name();
481 let participant = crate::Participant::new(&domain).unwrap();
482 let topic =
483 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
484 let reader = crate::Reader::new(&topic).unwrap();
485
486 let result = reader.instance_handle();
487 assert!(result.is_ok());
488 let status_changes = reader.status_changes().unwrap();
489 assert!(status_changes.is_empty());
490 let result = reader.set_status_mask(crate::Status::SubscriptionMatched);
491 assert!(result.is_ok());
492 let mask = reader.status_mask().unwrap();
493 assert_eq!(mask, crate::Status::SubscriptionMatched);
494 let status = reader
495 .read_status(Some(crate::Status::SubscriptionMatched))
496 .unwrap();
497 assert!(status.is_empty());
498 let status = reader
499 .take_status(Some(crate::Status::SubscriptionMatched))
500 .unwrap();
501 assert!(status.is_empty());
502 }
503}