cyclonedds/reader.rs
1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Result, Subscriber, Topic};
4
5/// A data reader for topic type [`T`](crate::Topicable).
6///
7/// A `Reader` receives samples of type [`T`](crate::Topicable) from a named
8/// [`Topic`](crate::Topic). Samples are retrieved via [`read`](Reader::read),
9/// [`take`](Reader::take), or [`peek`](Reader::peek). Matched
10/// [`Writers`](crate::Writer) on the same topic deliver samples subject to
11/// [`QoS`](crate::QoS) compatibility.
12///
13/// Use [`Reader::new`] for simple construction or [`Reader::builder`] for
14/// [`QoS`](crate::QoS) and [`listener`](crate::listener::ReaderListener)
15/// configuration.
16///
17/// # `peek` vs `read` vs `take`
18///
19/// | Method | Behavior | Cache effect | Read state effect |
20/// |------------------------|------------------------------------------------------------------------------------------|--------------------------------------------|--------------------------------|
21/// | [`peek`](Reader::peek) | Returns samples without consuming them. Useful for checking whether data is available. | Samples remain in the reader cache. | Stays unread. |
22/// | [`read`](Reader::read) | Returns samples and marks them as read (but leaves them available for subsequent reads). | Samples remain in the reader cache. | Marked as read. |
23/// | [`take`](Reader::take) | Returns samples and removes them (making them unavailable for subsequent reads). | Samples are removed from the reader cache. | Consumed and no longer cached. |
24#[derive(Debug, PartialEq, Eq)]
25pub struct Reader<'domain, 'participant, 'topic, T>
26where
27 T: crate::Topicable,
28{
29 pub(crate) inner: cyclonedds_sys::dds_entity_t,
30 phantom_topic: std::marker::PhantomData<&'topic Topic<'domain, 'participant, T>>,
31}
32
33/// Builder for [`Reader<T>`] (accessible via [`Reader::builder`]).
34#[derive(Debug)]
35pub struct ReaderBuilder<'domain, 'participant, 'topic, 'qos, T>
36where
37 T: crate::Topicable,
38{
39 subscriber: Option<&'participant Subscriber<'domain, 'participant>>,
40 topic: &'topic Topic<'domain, 'participant, T>,
41 qos: Option<&'qos crate::QoS>,
42 listener: Option<crate::ReaderListener<T>>,
43}
44
45impl<'d, 'p, 't, 'q, T> ReaderBuilder<'d, 'p, 't, 'q, T>
46where
47 T: crate::Topicable,
48{
49 /// Creates a new [`ReaderBuilder`] for the given [`Topic`].
50 ///
51 /// # Examples
52 ///
53 /// ```
54 /// use cyclonedds::builder::ReaderBuilder;
55 /// use cyclonedds::{Domain, Participant, Topic};
56 /// # #[derive(
57 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
58 /// # )]
59 /// # struct Data {
60 /// # x: i32,
61 /// # }
62 ///
63 /// let domain = Domain::default();
64 /// let participant = Participant::new(&domain)?;
65 /// let topic = Topic::new(&participant, "MyTopic")?;
66 /// let reader_builder = ReaderBuilder::<Data>::new(&topic);
67 /// # Ok::<_, cyclonedds::Error>(())
68 /// ```
69 #[must_use]
70 pub const fn new(topic: &'t Topic<'d, 'p, T>) -> Self {
71 Self {
72 subscriber: None,
73 topic,
74 qos: None,
75 listener: None,
76 }
77 }
78
79 /// Sets the [`QoS`](crate::QoS) for this reader builder.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use cyclonedds::builder::ReaderBuilder;
85 /// use cyclonedds::qos::policy;
86 /// use cyclonedds::{Duration, QoS};
87 /// # use cyclonedds::{Domain, Participant, Topic};
88 /// # let domain = Domain::default();
89 /// # let participant = Participant::new(&domain)?;
90 /// # #[derive(
91 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
92 /// # )]
93 /// # struct Data {
94 /// # x: i32,
95 /// # }
96 /// # let topic = Topic::new(&participant, "MyTopic")?;
97 ///
98 /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
99 /// max_blocking_time: Duration::from_millis(100),
100 /// });
101 /// let reader_builder = ReaderBuilder::<Data>::new(&topic).with_qos(&qos);
102 /// # Ok::<_, cyclonedds::Error>(())
103 /// ```
104 #[must_use]
105 pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
106 self.qos = Some(qos);
107 self
108 }
109
110 /// Sets the [`Subscriber`](crate::Subscriber) on this reader builder.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// use cyclonedds::ReaderListener;
116 /// use cyclonedds::Subscriber;
117 /// use cyclonedds::builder::ReaderBuilder;
118 /// # use cyclonedds::{Domain, Participant, Topic};
119 /// # let domain = Domain::default();
120 /// # let participant = Participant::new(&domain)?;
121 /// # #[derive(
122 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
123 /// # )]
124 /// # struct Data {
125 /// # x: i32,
126 /// # }
127 /// # let topic = Topic::new(&participant, "MyTopic")?;
128 ///
129 /// let subscriber = Subscriber::new(&participant)?;
130 ///
131 /// let reader_builder = ReaderBuilder::<Data>::new(&topic).with_subscriber(&subscriber);
132 /// # Ok::<_, cyclonedds::Error>(())
133 /// ```
134 #[must_use]
135 pub const fn with_subscriber(mut self, subscriber: &'p Subscriber<'d, 'p>) -> Self {
136 self.subscriber = Some(subscriber);
137 self
138 }
139
140 /// Sets the [`Listener`](crate::Listener) on this reader builder.
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// use cyclonedds::ReaderListener;
146 /// use cyclonedds::builder::ReaderBuilder;
147 /// # use cyclonedds::{Domain, Participant, Topic};
148 /// # let domain = Domain::default();
149 /// # let participant = Participant::new(&domain)?;
150 /// # #[derive(
151 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
152 /// # )]
153 /// # struct Data {
154 /// # x: i32,
155 /// # }
156 /// # let topic = Topic::new(&participant, "MyTopic")?;
157 ///
158 /// let reader_builder = ReaderBuilder::<Data>::new(&topic).with_listener(ReaderListener::new());
159 /// # Ok::<_, cyclonedds::Error>(())
160 /// ```
161 #[must_use]
162 pub fn with_listener<L>(mut self, listener: L) -> Self
163 where
164 L: AsRef<crate::ReaderListener<T>>,
165 {
166 self.listener = Some(listener.as_ref().clone());
167 self
168 }
169
170 /// Builds the [`Reader`].
171 ///
172 /// # Errors
173 ///
174 /// Returns an [`Error`](crate::Error) if the reader failed to create.
175 ///
176 /// # Examples
177 ///
178 /// ```
179 /// use cyclonedds::QoS;
180 /// use cyclonedds::builder::ReaderBuilder;
181 /// use cyclonedds::qos::policy;
182 /// # use cyclonedds::{Domain, Participant, Topic};
183 /// # let domain = Domain::default();
184 /// # let participant = Participant::new(&domain)?;
185 /// # #[derive(
186 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
187 /// # )]
188 /// # struct Data {
189 /// # x: i32,
190 /// # }
191 /// # let topic = Topic::new(&participant, "MyTopic")?;
192 ///
193 /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
194 /// let reader = ReaderBuilder::<Data>::new(&topic).with_qos(&qos).build()?;
195 /// # Ok::<_, cyclonedds::Error>(())
196 /// ```
197 pub fn build(self) -> Result<Reader<'d, 'p, 't, T>> {
198 // NOTE: using `and_then` to avoid ? branch on the listener for coverage
199 // since the C lib currently panics on OOM rather than returning null.
200 self.listener
201 .map(|listener| listener.as_ffi())
202 .transpose()
203 .and_then(|listener| {
204 Ok(Reader {
205 inner: ffi::dds_create_reader(
206 self.subscriber
207 .map_or(ffi::dds_get_participant(self.topic.inner)?, |subscriber| {
208 subscriber.inner
209 }),
210 self.topic.inner,
211 self.qos.map(|qos| &qos.inner),
212 listener.as_ref(),
213 )?,
214 phantom_topic: std::marker::PhantomData,
215 })
216 })
217 }
218}
219
220impl<'d, 'p, 't, T> Reader<'d, 'p, 't, T>
221where
222 T: crate::Topicable,
223{
224 /// Creates a new `Reader` for the given [`Topic`](crate::Topic) with
225 /// default [`QoS`](crate::QoS) and no
226 /// [`listener`](crate::listener::ReaderListener).
227 ///
228 /// # Errors
229 ///
230 /// Returns an [`Error`](crate::Error) if the reader fails to create.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use cyclonedds::Reader;
236 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
237 /// # let domain = Domain::default();
238 /// # let participant = Participant::new(&domain)?;
239 /// # #[derive(
240 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
241 /// # )]
242 /// # struct Data {
243 /// # x: i32,
244 /// # }
245 ///
246 /// let topic = Topic::<Data>::new(&participant, "Example")?;
247 /// let reader = Reader::new(&topic)?;
248 /// # Ok::<_, cyclonedds::Error>(())
249 /// ```
250 pub fn new(topic: &'t Topic<'d, 'p, T>) -> Result<Self> {
251 Self::builder(topic).build()
252 }
253
254 /// Returns a [`ReaderBuilder`](crate::builder::ReaderBuilder) for
255 /// constructing a reader with custom [`QoS`](crate::QoS) or a
256 /// [`listener`](crate::listener::ReaderListener).
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use cyclonedds::{QoS, Reader, qos::policy::History};
262 ///
263 /// # use cyclonedds::{Domain, Participant, Topic};
264 /// # let domain = Domain::default();
265 /// # let participant = Participant::new(&domain)?;
266 /// # #[derive(
267 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
268 /// # )]
269 /// # struct Data {
270 /// # x: i32,
271 /// # }
272 ///
273 /// let topic = Topic::<Data>::new(&participant, "Example")?;
274 /// let qos = QoS::new().with_history(History::KeepAll);
275 /// let reader = Reader::builder(&topic).with_qos(&qos).build()?;
276 /// # Ok::<_, cyclonedds::Error>(())
277 /// ```
278 #[must_use]
279 pub const fn builder<'q>(topic: &'t Topic<'d, 'p, T>) -> ReaderBuilder<'d, 'p, 't, 'q, T> {
280 ReaderBuilder::new(topic)
281 }
282
283 /// Removes and returns all available samples from the reader cache.
284 ///
285 /// Each call to `take` consumes the returned samples so they will not be
286 /// returned by subsequent calls. See [`read`](Reader::read) to leave
287 /// samples in the cache.
288 ///
289 /// # Errors
290 ///
291 /// Returns an [`Error`](crate::Error) if the reader fails to take samples.
292 ///
293 /// # Examples
294 ///
295 /// ```
296 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
297 /// # let domain = Domain::default();
298 /// # let participant = Participant::new(&domain)?;
299 /// # #[derive(
300 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
301 /// # )]
302 /// # struct Data {
303 /// # x: i32,
304 /// # }
305 /// let topic = Topic::<Data>::new(&participant, "Example")?;
306 /// let reader = Reader::new(&topic)?;
307 /// let writer = Writer::new(&topic)?;
308 ///
309 /// writer.write(&Data::default())?;
310 /// let samples = reader.take()?;
311 /// assert_eq!(samples.len(), 1);
312 ///
313 /// // Samples have been consumed.
314 /// assert!(reader.take()?.is_empty());
315 /// # Ok::<_, cyclonedds::Error>(())
316 /// ```
317 pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
318 where
319 T: std::clone::Clone,
320 {
321 ffi::dds_take(self.inner)
322 }
323
324 /// Returns all available samples from the reader cache without removing
325 /// them.
326 ///
327 /// Samples returned by `read` remain in the cache and will be returned
328 /// again by subsequent calls, marked as read in their
329 /// [`Info`](crate::sample::Info) state. See [`take`](Reader::take) to
330 /// consume samples.
331 ///
332 /// # Errors
333 ///
334 /// Returns an [`Error`](crate::Error) if the reader fails to read samples.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
340 /// # let domain = Domain::default();
341 /// # let participant = Participant::new(&domain)?;
342 /// # #[derive(
343 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
344 /// # )]
345 /// # struct Data {
346 /// # x: i32,
347 /// # }
348 /// let topic = Topic::<Data>::new(&participant, "Example")?;
349 /// let reader = Reader::new(&topic)?;
350 /// let writer = Writer::new(&topic)?;
351 ///
352 /// writer.write(&Data::default())?;
353 /// let samples = reader.read()?;
354 /// assert_eq!(samples.len(), 1);
355 ///
356 /// // Samples are still in the cache.
357 /// assert_eq!(reader.read()?.len(), 1);
358 /// # Ok::<_, cyclonedds::Error>(())
359 /// ```
360 pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
361 where
362 T: std::clone::Clone,
363 {
364 ffi::dds_read(self.inner)
365 }
366
367 /// Returns all available samples without marking them as read or removing
368 /// them from the cache.
369 ///
370 /// Useful for checking whether data is available without affecting the
371 /// read state of samples. Subsequent calls to [`read`](Reader::read) or
372 /// [`take`](Reader::take) will still return the same samples as unread.
373 ///
374 /// # Errors
375 ///
376 /// Returns an [`Error`](crate::Error) if the reader fails to peek.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
382 /// # let domain = Domain::default();
383 /// # let participant = Participant::new(&domain)?;
384 /// # #[derive(
385 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
386 /// # )]
387 /// # struct Data {
388 /// # x: i32,
389 /// # }
390 /// let topic = Topic::<Data>::new(&participant, "Example")?;
391 /// let reader = Reader::new(&topic)?;
392 /// let writer = Writer::new(&topic)?;
393 ///
394 /// writer.write(&Data::default())?;
395 /// assert_eq!(reader.peek()?.len(), 1);
396 ///
397 /// // Samples are unaffected.
398 /// assert_eq!(reader.take()?.len(), 1);
399 /// # Ok::<_, cyclonedds::Error>(())
400 /// ```
401 pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
402 where
403 T: std::clone::Clone,
404 {
405 ffi::dds_peek(self.inner)
406 }
407
408 /// Returns the instance handles of all writers currently matched with
409 /// this reader.
410 ///
411 /// The returned handles can be compared against
412 /// [`InstanceHandle`](crate::entity::InstanceHandle) values from writer
413 /// entities to identify specific matched writers.
414 ///
415 /// # Errors
416 ///
417 /// Returns an [`Error`](crate::Error) if the reader fails to retrieve the
418 /// matched publications.
419 ///
420 /// # Examples
421 ///
422 /// ```
423 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
424 /// # let domain = Domain::default();
425 /// # let participant = Participant::new(&domain)?;
426 /// # #[derive(
427 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
428 /// # )]
429 /// # struct Data {
430 /// # x: i32,
431 /// # }
432 /// use cyclonedds::entity::Entity;
433 ///
434 /// let topic = Topic::<Data>::new(&participant, "Example")?;
435 /// let reader = Reader::new(&topic)?;
436 /// let writer = Writer::new(&topic)?;
437 ///
438 /// let matched = reader.matched_publications()?;
439 /// assert_eq!(matched[0], writer.instance_handle()?);
440 /// # Ok::<_, cyclonedds::Error>(())
441 /// ```
442 pub fn matched_publications(&self) -> Result<Vec<crate::entity::InstanceHandle>> {
443 let matched = ffi::dds_get_matched_publications(self.inner)?;
444 let matched = matched
445 .iter()
446 .map(|&inner| crate::entity::InstanceHandle { inner })
447 .collect();
448 Ok(matched)
449 }
450
451 /// Blocks until all historical data available from matched writers with
452 /// [`TransientLocal`](crate::qos::policy::Durability::TransientLocal) or
453 /// higher durability has been received, or until `timeout` elapses.
454 ///
455 /// # Errors
456 ///
457 /// Returns an [`Error`](crate::Error) if the timeout elapses before
458 /// historical data is received or if the reader returns an error.
459 ///
460 /// # Examples
461 ///
462 /// ```
463 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
464 /// # let domain = Domain::default();
465 /// # let participant = Participant::new(&domain)?;
466 /// # #[derive(
467 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
468 /// # )]
469 /// # struct Data {
470 /// # x: i32,
471 /// # }
472 /// use cyclonedds::Duration;
473 ///
474 /// let topic = Topic::<Data>::new(&participant, "Example")?;
475 /// let reader = Reader::new(&topic)?;
476 /// reader.wait_for_historical_data(Duration::from_secs(1))?;
477 /// # Ok::<_, cyclonedds::Error>(())
478 /// ```
479 pub fn wait_for_historical_data(&self, timeout: crate::Duration) -> Result<()> {
480 ffi::dds_reader_wait_for_historical_data(self.inner, timeout.inner)
481 }
482
483 pub(crate) const fn from_existing(
484 inner: cyclonedds_sys::dds_entity_t,
485 ) -> std::mem::ManuallyDrop<Self> {
486 std::mem::ManuallyDrop::new(Self {
487 inner,
488 phantom_topic: std::marker::PhantomData,
489 })
490 }
491
492 /// Sets the [`ReaderListener`](crate::ReaderListener) on this reader,
493 /// replacing any previously set listener.
494 ///
495 /// # Errors
496 ///
497 /// Returns an [`Error`](crate::Error) if the reader fails to set the
498 /// listener.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
504 /// # let domain = Domain::default();
505 /// # let participant = Participant::new(&domain)?;
506 /// # #[derive(
507 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
508 /// # )]
509 /// # struct Data {
510 /// # x: i32,
511 /// # }
512 /// use cyclonedds::listener::ReaderListener;
513 ///
514 /// let topic = Topic::<Data>::new(&participant, "Example")?;
515 /// let mut reader = Reader::new(&topic)?;
516 /// reader.set_listener(
517 /// ReaderListener::new().with_subscription_matched(|_, status| {
518 /// println!("matched writers: {}", status.current.count);
519 /// }),
520 /// )?;
521 /// # Ok::<_, cyclonedds::Error>(())
522 /// ```
523 pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
524 where
525 L: AsRef<crate::ReaderListener<T>>,
526 {
527 listener
528 .as_ref()
529 .as_ffi()
530 .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
531 }
532
533 /// Removes the listener from this reader.
534 ///
535 /// # Errors
536 ///
537 /// Returns an [`Error`](crate::Error) if the reader fails to unset the
538 /// listener.
539 ///
540 /// # Examples
541 ///
542 /// ```
543 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
544 /// # let domain = Domain::default();
545 /// # let participant = Participant::new(&domain)?;
546 /// # #[derive(
547 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
548 /// # )]
549 /// # struct Data {
550 /// # x: i32,
551 /// # }
552 /// let topic = Topic::<Data>::new(&participant, "Example")?;
553 /// let mut reader = Reader::new(&topic)?;
554 /// reader.unset_listener()?;
555 /// # Ok::<_, cyclonedds::Error>(())
556 /// ```
557 pub fn unset_listener(&mut self) -> Result<()> {
558 ffi::dds_set_listener(self.inner, None)?;
559 Ok(())
560 }
561
562 /// Sets the [`ReaderListener`](crate::ReaderListener) on this reader,
563 /// consuming and returning `self`.
564 ///
565 /// # Errors
566 ///
567 /// Returns an [`Error`](crate::Error) if the reader fails to set the
568 /// listener.
569 ///
570 /// # Examples
571 ///
572 /// ```
573 /// use cyclonedds::listener::ReaderListener;
574 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
575 /// # let domain = Domain::default();
576 /// # let participant = Participant::new(&domain)?;
577 /// # #[derive(
578 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
579 /// # )]
580 /// # struct Data {
581 /// # x: i32,
582 /// # }
583 ///
584 /// let topic = Topic::<Data>::new(&participant, "Example")?;
585 /// let reader = Reader::new(&topic)?.with_listener(ReaderListener::new())?;
586 /// # Ok::<_, cyclonedds::Error>(())
587 /// ```
588 pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
589 where
590 L: AsRef<crate::ReaderListener<T>>,
591 {
592 self.set_listener(listener).map(|()| self)
593 }
594}
595
596impl<T> Drop for Reader<'_, '_, '_, T>
597where
598 T: crate::Topicable,
599{
600 fn drop(&mut self) {
601 let result = ffi::dds_delete(self.inner);
602 debug_assert!(
603 result.is_ok(),
604 "unable to delete {self:?}: failed with {result:?}"
605 );
606 }
607}
608
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entity::Entity;

    /// Readers can be created through every supported construction path.
    #[test]
    fn test_reader_create() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let qos = crate::QoS::new();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let subscriber = crate::Subscriber::new(&participant).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let listener = crate::ReaderListener::new();

        let _ = Reader::new(&topic).unwrap();
        let _ = Reader::builder(&topic).with_qos(&qos).build().unwrap();
        let _ = Reader::builder(&topic)
            .with_subscriber(&subscriber)
            .build()
            .unwrap();
        let _ = Reader::builder(&topic)
            .with_qos(&qos)
            .with_subscriber(&subscriber)
            .with_listener(listener)
            .build()
            .unwrap();
    }

    /// Creating a reader on a topic with an invalid handle is rejected.
    #[test]
    fn test_reader_create_with_invalid_topic() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let qos = crate::QoS::new();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        // Temporarily invalidate the handle; it is restored below so the
        // topic can still be deleted cleanly on drop.
        let topic_id = topic.inner;
        topic.inner = 0;
        let result = Reader::new(&topic).unwrap_err();
        assert_eq!(result, crate::Error::BadParameter);
        let result = Reader::builder(&topic).with_qos(&qos).build().unwrap_err();
        assert_eq!(result, crate::Error::BadParameter);
        topic.inner = topic_id;
    }

    /// Creating a reader under a subscriber with an invalid handle is rejected.
    #[test]
    fn test_reader_create_with_invalid_subscriber() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let mut subscriber = crate::Subscriber::new(&participant).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        // Temporarily invalidate the handle; restored below for a clean drop.
        let subscriber_id = subscriber.inner;
        subscriber.inner = 0;
        let result = Reader::builder(&topic)
            .with_subscriber(&subscriber)
            .build()
            .unwrap_err();
        assert_eq!(result, crate::Error::BadParameter);
        subscriber.inner = subscriber_id;
    }

    /// Reading from a fresh reader succeeds and yields no samples.
    #[test]
    fn test_reader_empty_read() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        let samples = reader.read().unwrap();
        assert!(samples.is_empty());
    }

    /// Taking from a fresh reader succeeds and yields no samples.
    #[test]
    fn test_reader_empty_take() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        let samples = reader.take().unwrap();
        assert!(samples.is_empty());
    }

    /// Peeking at a fresh reader succeeds and yields no samples.
    #[test]
    fn test_reader_empty_peek() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        let samples = reader.peek().unwrap();
        assert!(samples.is_empty());
    }

    /// `from_existing` wraps the given handle without taking ownership of it.
    #[test]
    fn test_reader_create_from_existing() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let reader_01 = Reader::new(&topic).unwrap();
        let reader_02 = Reader::<crate::tests::topic::Data>::from_existing(reader_01.inner);
        assert_eq!(reader_01.inner, reader_02.inner);
    }

    /// Waiting for historical data on a reader succeeds.
    #[test]
    fn test_reader_wait_for_historical_data() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let reader = Reader::new(&topic).unwrap();

        reader
            .wait_for_historical_data(crate::Duration::INFINITE)
            .unwrap();
    }

    /// The matched publications track the writers created on the same topic.
    #[test]
    fn test_reader_matched_publications() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        let matched = reader.matched_publications().unwrap();
        assert_eq!(matched.len(), 0);

        let writer = crate::Writer::new(&topic).unwrap();

        let matched = reader.matched_publications().unwrap();

        assert_eq!(matched.len(), 1);
        let expected = writer.instance_handle().unwrap();
        let actual = matched[0];
        assert_eq!(expected, actual);
    }

    /// Querying matched publications on an invalid reader handle is rejected.
    #[test]
    fn test_reader_matched_publications_on_invalid_reader() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        // Temporarily invalidate the handle; restored below for a clean drop.
        let mut reader = Reader::new(&topic).unwrap();
        let reader_id = reader.inner;
        reader.inner = 0;

        let result = reader.matched_publications().unwrap_err();
        assert_eq!(result, crate::Error::BadParameter);
        reader.inner = reader_id;
    }

    /// Listeners can be attached by value-chaining, set in place, and unset.
    #[test]
    fn test_reader_with_listener() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let listener = crate::ReaderListener::new()
            .with_data_available(|_| ())
            .with_liveliness_changed(|_, _| ())
            .with_requested_deadline_missed(|_, _| ())
            .with_requested_incompatible_qos(|_, _| ())
            .with_sample_lost(|_, _| ())
            .with_sample_rejected(|_, _| ())
            .with_subscription_matched(|_, _| ());

        let _ = Reader::new(&topic)
            .unwrap()
            .with_listener(&listener)
            .unwrap();

        let mut reader = Reader::new(&topic).unwrap();
        reader.set_listener(&listener).unwrap();
        reader.unset_listener().unwrap();
    }

    /// Setting or unsetting a listener on an invalid reader handle is rejected.
    #[test]
    fn test_reader_with_listener_on_invalid_reader() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

        let listener = crate::ReaderListener::new();

        // Temporarily invalidate the handle; restored below for a clean drop.
        let mut reader = Reader::new(&topic).unwrap();
        let reader_id = reader.inner;
        reader.inner = 0;
        let result = reader.set_listener(&listener).unwrap_err();
        assert_eq!(result, crate::Error::BadParameter);
        let result = reader.unset_listener().unwrap_err();
        assert_eq!(result, crate::Error::BadParameter);
        reader.inner = reader_id;
    }
}
825}