Skip to main content

cyclonedds/
reader.rs

1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Result, Subscriber, Topic};
4
5/// A data reader for topic type [`T`](crate::Topicable).
6///
7/// A `Reader` receives samples of type [`T`](crate::Topicable) from a named
8/// [`Topic`](crate::Topic). Samples are retrieved via [`read`](Reader::read),
9/// [`take`](Reader::take), or [`peek`](Reader::peek). Matched
10/// [`Writers`](crate::Writer) on the same topic deliver samples subject to
11/// [`QoS`](crate::QoS) compatibility.
12///
13/// Use [`Reader::new`] for simple construction or [`Reader::builder`] for
14/// [`QoS`](crate::QoS) and [`listener`](crate::listener::ReaderListener)
15/// configuration.
16///
17/// # `peek` vs `read` vs `take`
18///
19/// | Method                 | Behavior                                                                                 | Cache effect                               | Read state effect              |
20/// |------------------------|------------------------------------------------------------------------------------------|--------------------------------------------|--------------------------------|
21/// | [`peek`](Reader::peek) | Returns samples without consuming them. Useful for checking whether data is available.   | Samples remain in the reader cache.        | Stays unread.                  |
22/// | [`read`](Reader::read) | Returns samples and marks them as read (but leaves them available for subsequent reads). | Samples remain in the reader cache.        | Marked as read.                |
23/// | [`take`](Reader::take) | Returns samples and removes them (making them unavailable for subsequent reads).         | Samples are removed from the reader cache. | Consumed and no longer cached. |
24#[derive(Debug, PartialEq, Eq)]
25pub struct Reader<'domain, 'participant, 'topic, T>
26where
27    T: crate::Topicable,
28{
29    pub(crate) inner: cyclonedds_sys::dds_entity_t,
30    phantom_topic: std::marker::PhantomData<&'topic Topic<'domain, 'participant, T>>,
31}
32
33/// Builder for [`Reader<T>`] (accessible via [`Reader::builder`]).
34#[derive(Debug)]
35pub struct ReaderBuilder<'domain, 'participant, 'topic, 'qos, T>
36where
37    T: crate::Topicable,
38{
39    subscriber: Option<&'participant Subscriber<'domain, 'participant>>,
40    topic: &'topic Topic<'domain, 'participant, T>,
41    qos: Option<&'qos crate::QoS>,
42    listener: Option<crate::ReaderListener<T>>,
43}
44
45impl<'d, 'p, 't, 'q, T> ReaderBuilder<'d, 'p, 't, 'q, T>
46where
47    T: crate::Topicable,
48{
49    /// Creates a new [`ReaderBuilder`] for the given [`Topic`].
50    ///
51    /// # Examples
52    ///
53    /// ```
54    /// use cyclonedds::builder::ReaderBuilder;
55    /// use cyclonedds::{Domain, Participant, Topic};
56    /// # #[derive(
57    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
58    /// # )]
59    /// # struct Data {
60    /// #     x: i32,
61    /// # }
62    ///
63    /// let domain = Domain::default();
64    /// let participant = Participant::new(&domain)?;
65    /// let topic = Topic::new(&participant, "MyTopic")?;
66    /// let reader_builder = ReaderBuilder::<Data>::new(&topic);
67    /// # Ok::<_, cyclonedds::Error>(())
68    /// ```
69    #[must_use]
70    pub const fn new(topic: &'t Topic<'d, 'p, T>) -> Self {
71        Self {
72            subscriber: None,
73            topic,
74            qos: None,
75            listener: None,
76        }
77    }
78
79    /// Sets the [`QoS`](crate::QoS) for this reader builder.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use cyclonedds::builder::ReaderBuilder;
85    /// use cyclonedds::qos::policy;
86    /// use cyclonedds::{Duration, QoS};
87    /// # use cyclonedds::{Domain, Participant, Topic};
88    /// # let domain = Domain::default();
89    /// # let participant = Participant::new(&domain)?;
90    /// # #[derive(
91    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
92    /// # )]
93    /// # struct Data {
94    /// #     x: i32,
95    /// # }
96    /// # let topic = Topic::new(&participant, "MyTopic")?;
97    ///
98    /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
99    ///     max_blocking_time: Duration::from_millis(100),
100    /// });
101    /// let reader_builder = ReaderBuilder::<Data>::new(&topic).with_qos(&qos);
102    /// # Ok::<_, cyclonedds::Error>(())
103    /// ```
104    #[must_use]
105    pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
106        self.qos = Some(qos);
107        self
108    }
109
110    /// Sets the [`Subscriber`](crate::Subscriber) on this reader builder.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// use cyclonedds::ReaderListener;
116    /// use cyclonedds::Subscriber;
117    /// use cyclonedds::builder::ReaderBuilder;
118    /// # use cyclonedds::{Domain, Participant, Topic};
119    /// # let domain = Domain::default();
120    /// # let participant = Participant::new(&domain)?;
121    /// # #[derive(
122    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
123    /// # )]
124    /// # struct Data {
125    /// #     x: i32,
126    /// # }
127    /// # let topic = Topic::new(&participant, "MyTopic")?;
128    ///
129    /// let subscriber = Subscriber::new(&participant)?;
130    ///
131    /// let reader_builder = ReaderBuilder::<Data>::new(&topic).with_subscriber(&subscriber);
132    /// # Ok::<_, cyclonedds::Error>(())
133    /// ```
134    #[must_use]
135    pub const fn with_subscriber(mut self, subscriber: &'p Subscriber<'d, 'p>) -> Self {
136        self.subscriber = Some(subscriber);
137        self
138    }
139
140    /// Sets the [`Listener`](crate::Listener) on this reader builder.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use cyclonedds::ReaderListener;
146    /// use cyclonedds::builder::ReaderBuilder;
147    /// # use cyclonedds::{Domain, Participant, Topic};
148    /// # let domain = Domain::default();
149    /// # let participant = Participant::new(&domain)?;
150    /// # #[derive(
151    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
152    /// # )]
153    /// # struct Data {
154    /// #     x: i32,
155    /// # }
156    /// # let topic = Topic::new(&participant, "MyTopic")?;
157    ///
158    /// let reader_builder = ReaderBuilder::<Data>::new(&topic).with_listener(ReaderListener::new());
159    /// # Ok::<_, cyclonedds::Error>(())
160    /// ```
161    #[must_use]
162    pub fn with_listener<L>(mut self, listener: L) -> Self
163    where
164        L: AsRef<crate::ReaderListener<T>>,
165    {
166        self.listener = Some(listener.as_ref().clone());
167        self
168    }
169
170    /// Builds the [`Reader`].
171    ///
172    /// # Errors
173    ///
174    /// Returns an [`Error`](crate::Error) if the reader failed to create.
175    ///
176    /// # Examples
177    ///
178    /// ```
179    /// use cyclonedds::QoS;
180    /// use cyclonedds::builder::ReaderBuilder;
181    /// use cyclonedds::qos::policy;
182    /// # use cyclonedds::{Domain, Participant, Topic};
183    /// # let domain = Domain::default();
184    /// # let participant = Participant::new(&domain)?;
185    /// # #[derive(
186    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
187    /// # )]
188    /// # struct Data {
189    /// #     x: i32,
190    /// # }
191    /// # let topic = Topic::new(&participant, "MyTopic")?;
192    ///
193    /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
194    /// let reader = ReaderBuilder::<Data>::new(&topic).with_qos(&qos).build()?;
195    /// # Ok::<_, cyclonedds::Error>(())
196    /// ```
197    pub fn build(self) -> Result<Reader<'d, 'p, 't, T>> {
198        // NOTE: using `and_then` to avoid ? branch on the listener for coverage
199        // since the C lib currently panics on OOM rather than returning null.
200        self.listener
201            .map(|listener| listener.as_ffi())
202            .transpose()
203            .and_then(|listener| {
204                Ok(Reader {
205                    inner: ffi::dds_create_reader(
206                        self.subscriber
207                            .map_or(ffi::dds_get_participant(self.topic.inner)?, |subscriber| {
208                                subscriber.inner
209                            }),
210                        self.topic.inner,
211                        self.qos.map(|qos| &qos.inner),
212                        listener.as_ref(),
213                    )?,
214                    phantom_topic: std::marker::PhantomData,
215                })
216            })
217    }
218}
219
220impl<'d, 'p, 't, T> Reader<'d, 'p, 't, T>
221where
222    T: crate::Topicable,
223{
224    /// Creates a new `Reader` for the given [`Topic`](crate::Topic) with
225    /// default [`QoS`](crate::QoS) and no
226    /// [`listener`](crate::listener::ReaderListener).
227    ///
228    /// # Errors
229    ///
230    /// Returns an [`Error`](crate::Error) if the reader fails to create.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use cyclonedds::Reader;
236    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
237    /// # let domain = Domain::default();
238    /// # let participant = Participant::new(&domain)?;
239    /// # #[derive(
240    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
241    /// # )]
242    /// # struct Data {
243    /// #     x: i32,
244    /// # }
245    ///
246    /// let topic = Topic::<Data>::new(&participant, "Example")?;
247    /// let reader = Reader::new(&topic)?;
248    /// # Ok::<_, cyclonedds::Error>(())
249    /// ```
250    pub fn new(topic: &'t Topic<'d, 'p, T>) -> Result<Self> {
251        Self::builder(topic).build()
252    }
253
254    /// Returns a [`ReaderBuilder`](crate::builder::ReaderBuilder) for
255    /// constructing a reader with custom [`QoS`](crate::QoS) or a
256    /// [`listener`](crate::listener::ReaderListener).
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use cyclonedds::{QoS, Reader, qos::policy::History};
262    ///
263    /// # use cyclonedds::{Domain, Participant, Topic};
264    /// # let domain = Domain::default();
265    /// # let participant = Participant::new(&domain)?;
266    /// # #[derive(
267    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
268    /// # )]
269    /// # struct Data {
270    /// #     x: i32,
271    /// # }
272    ///
273    /// let topic = Topic::<Data>::new(&participant, "Example")?;
274    /// let qos = QoS::new().with_history(History::KeepAll);
275    /// let reader = Reader::builder(&topic).with_qos(&qos).build()?;
276    /// # Ok::<_, cyclonedds::Error>(())
277    /// ```
278    #[must_use]
279    pub const fn builder<'q>(topic: &'t Topic<'d, 'p, T>) -> ReaderBuilder<'d, 'p, 't, 'q, T> {
280        ReaderBuilder::new(topic)
281    }
282
283    /// Removes and returns all available samples from the reader cache.
284    ///
285    /// Each call to `take` consumes the returned samples so they will not be
286    /// returned by subsequent calls. See [`read`](Reader::read) to leave
287    /// samples in the cache.
288    ///
289    /// # Errors
290    ///
291    /// Returns an [`Error`](crate::Error) if the reader fails to take samples.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
297    /// # let domain = Domain::default();
298    /// # let participant = Participant::new(&domain)?;
299    /// # #[derive(
300    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
301    /// # )]
302    /// # struct Data {
303    /// #     x: i32,
304    /// # }
305    /// let topic = Topic::<Data>::new(&participant, "Example")?;
306    /// let reader = Reader::new(&topic)?;
307    /// let writer = Writer::new(&topic)?;
308    ///
309    /// writer.write(&Data::default())?;
310    /// let samples = reader.take()?;
311    /// assert_eq!(samples.len(), 1);
312    ///
313    /// // Samples have been consumed.
314    /// assert!(reader.take()?.is_empty());
315    /// # Ok::<_, cyclonedds::Error>(())
316    /// ```
317    pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
318    where
319        T: std::clone::Clone,
320    {
321        ffi::dds_take(self.inner)
322    }
323
324    /// Returns all available samples from the reader cache without removing
325    /// them.
326    ///
327    /// Samples returned by `read` remain in the cache and will be returned
328    /// again by subsequent calls, marked as read in their
329    /// [`Info`](crate::sample::Info) state. See [`take`](Reader::take) to
330    /// consume samples.
331    ///
332    /// # Errors
333    ///
334    /// Returns an [`Error`](crate::Error) if the reader fails to read samples.
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
340    /// # let domain = Domain::default();
341    /// # let participant = Participant::new(&domain)?;
342    /// # #[derive(
343    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
344    /// # )]
345    /// # struct Data {
346    /// #     x: i32,
347    /// # }
348    /// let topic = Topic::<Data>::new(&participant, "Example")?;
349    /// let reader = Reader::new(&topic)?;
350    /// let writer = Writer::new(&topic)?;
351    ///
352    /// writer.write(&Data::default())?;
353    /// let samples = reader.read()?;
354    /// assert_eq!(samples.len(), 1);
355    ///
356    /// // Samples are still in the cache.
357    /// assert_eq!(reader.read()?.len(), 1);
358    /// # Ok::<_, cyclonedds::Error>(())
359    /// ```
360    pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
361    where
362        T: std::clone::Clone,
363    {
364        ffi::dds_read(self.inner)
365    }
366
367    /// Returns all available samples without marking them as read or removing
368    /// them from the cache.
369    ///
370    /// Useful for checking whether data is available without affecting the
371    /// read state of samples. Subsequent calls to [`read`](Reader::read) or
372    /// [`take`](Reader::take) will still return the same samples as unread.
373    ///
374    /// # Errors
375    ///
376    /// Returns an [`Error`](crate::Error) if the reader fails to peek.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
382    /// # let domain = Domain::default();
383    /// # let participant = Participant::new(&domain)?;
384    /// # #[derive(
385    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
386    /// # )]
387    /// # struct Data {
388    /// #     x: i32,
389    /// # }
390    /// let topic = Topic::<Data>::new(&participant, "Example")?;
391    /// let reader = Reader::new(&topic)?;
392    /// let writer = Writer::new(&topic)?;
393    ///
394    /// writer.write(&Data::default())?;
395    /// assert_eq!(reader.peek()?.len(), 1);
396    ///
397    /// // Samples are unaffected.
398    /// assert_eq!(reader.take()?.len(), 1);
399    /// # Ok::<_, cyclonedds::Error>(())
400    /// ```
401    pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
402    where
403        T: std::clone::Clone,
404    {
405        ffi::dds_peek(self.inner)
406    }
407
408    /// Returns the instance handles of all writers currently matched with
409    /// this reader.
410    ///
411    /// The returned handles can be compared against
412    /// [`InstanceHandle`](crate::entity::InstanceHandle) values from writer
413    /// entities to identify specific matched writers.
414    ///
415    /// # Errors
416    ///
417    /// Returns an [`Error`](crate::Error) if the reader fails to retrieve the
418    /// matched publications.
419    ///
420    /// # Examples
421    ///
422    /// ```
423    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
424    /// # let domain = Domain::default();
425    /// # let participant = Participant::new(&domain)?;
426    /// # #[derive(
427    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
428    /// # )]
429    /// # struct Data {
430    /// #     x: i32,
431    /// # }
432    /// use cyclonedds::entity::Entity;
433    ///
434    /// let topic = Topic::<Data>::new(&participant, "Example")?;
435    /// let reader = Reader::new(&topic)?;
436    /// let writer = Writer::new(&topic)?;
437    ///
438    /// let matched = reader.matched_publications()?;
439    /// assert_eq!(matched[0], writer.instance_handle()?);
440    /// # Ok::<_, cyclonedds::Error>(())
441    /// ```
442    pub fn matched_publications(&self) -> Result<Vec<crate::entity::InstanceHandle>> {
443        let matched = ffi::dds_get_matched_publications(self.inner)?;
444        let matched = matched
445            .iter()
446            .map(|&inner| crate::entity::InstanceHandle { inner })
447            .collect();
448        Ok(matched)
449    }
450
451    /// Blocks until all historical data available from matched writers with
452    /// [`TransientLocal`](crate::qos::policy::Durability::TransientLocal) or
453    /// higher durability has been received, or until `timeout` elapses.
454    ///
455    /// # Errors
456    ///
457    /// Returns an [`Error`](crate::Error) if the timeout elapses before
458    /// historical data is received or if the reader returns an error.
459    ///
460    /// # Examples
461    ///
462    /// ```
463    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
464    /// # let domain = Domain::default();
465    /// # let participant = Participant::new(&domain)?;
466    /// # #[derive(
467    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
468    /// # )]
469    /// # struct Data {
470    /// #     x: i32,
471    /// # }
472    /// use cyclonedds::Duration;
473    ///
474    /// let topic = Topic::<Data>::new(&participant, "Example")?;
475    /// let reader = Reader::new(&topic)?;
476    /// reader.wait_for_historical_data(Duration::from_secs(1))?;
477    /// # Ok::<_, cyclonedds::Error>(())
478    /// ```
479    pub fn wait_for_historical_data(&self, timeout: crate::Duration) -> Result<()> {
480        ffi::dds_reader_wait_for_historical_data(self.inner, timeout.inner)
481    }
482
483    pub(crate) const fn from_existing(
484        inner: cyclonedds_sys::dds_entity_t,
485    ) -> std::mem::ManuallyDrop<Self> {
486        std::mem::ManuallyDrop::new(Self {
487            inner,
488            phantom_topic: std::marker::PhantomData,
489        })
490    }
491
492    /// Sets the [`ReaderListener`](crate::ReaderListener) on this reader,
493    /// replacing any previously set listener.
494    ///
495    /// # Errors
496    ///
497    /// Returns an [`Error`](crate::Error) if the reader fails to set the
498    /// listener.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
504    /// # let domain = Domain::default();
505    /// # let participant = Participant::new(&domain)?;
506    /// # #[derive(
507    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
508    /// # )]
509    /// # struct Data {
510    /// #     x: i32,
511    /// # }
512    /// use cyclonedds::listener::ReaderListener;
513    ///
514    /// let topic = Topic::<Data>::new(&participant, "Example")?;
515    /// let mut reader = Reader::new(&topic)?;
516    /// reader.set_listener(
517    ///     ReaderListener::new().with_subscription_matched(|_, status| {
518    ///         println!("matched writers: {}", status.current.count);
519    ///     }),
520    /// )?;
521    /// # Ok::<_, cyclonedds::Error>(())
522    /// ```
523    pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
524    where
525        L: AsRef<crate::ReaderListener<T>>,
526    {
527        listener
528            .as_ref()
529            .as_ffi()
530            .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
531    }
532
533    /// Removes the listener from this reader.
534    ///
535    /// # Errors
536    ///
537    /// Returns an [`Error`](crate::Error) if the reader fails to unset the
538    /// listener.
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
544    /// # let domain = Domain::default();
545    /// # let participant = Participant::new(&domain)?;
546    /// # #[derive(
547    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
548    /// # )]
549    /// # struct Data {
550    /// #     x: i32,
551    /// # }
552    /// let topic = Topic::<Data>::new(&participant, "Example")?;
553    /// let mut reader = Reader::new(&topic)?;
554    /// reader.unset_listener()?;
555    /// # Ok::<_, cyclonedds::Error>(())
556    /// ```
557    pub fn unset_listener(&mut self) -> Result<()> {
558        ffi::dds_set_listener(self.inner, None)?;
559        Ok(())
560    }
561
562    /// Sets the [`ReaderListener`](crate::ReaderListener) on this reader,
563    /// consuming and returning `self`.
564    ///
565    /// # Errors
566    ///
567    /// Returns an [`Error`](crate::Error) if the reader fails to set the
568    /// listener.
569    ///
570    /// # Examples
571    ///
572    /// ```
573    /// use cyclonedds::listener::ReaderListener;
574    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
575    /// # let domain = Domain::default();
576    /// # let participant = Participant::new(&domain)?;
577    /// # #[derive(
578    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
579    /// # )]
580    /// # struct Data {
581    /// #     x: i32,
582    /// # }
583    ///
584    /// let topic = Topic::<Data>::new(&participant, "Example")?;
585    /// let reader = Reader::new(&topic)?.with_listener(ReaderListener::new())?;
586    /// # Ok::<_, cyclonedds::Error>(())
587    /// ```
588    pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
589    where
590        L: AsRef<crate::ReaderListener<T>>,
591    {
592        self.set_listener(listener).map(|()| self)
593    }
594}
595
596impl<T> Drop for Reader<'_, '_, '_, T>
597where
598    T: crate::Topicable,
599{
600    fn drop(&mut self) {
601        let result = ffi::dds_delete(self.inner);
602        debug_assert!(
603            result.is_ok(),
604            "unable to delete {self:?}: failed with {result:?}"
605        );
606    }
607}
608
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entity::Entity;
    use crate::{Domain, Duration, Error, Participant, QoS, ReaderListener, Writer};

    /// Payload type used by every topic created in these tests.
    type Data = crate::tests::topic::Data;

    /// Readers can be created through `new` and through every builder
    /// combination exercised here.
    #[test]
    fn test_reader_create() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let qos = QoS::new();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let subscriber = Subscriber::new(&participant).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();
        let listener = ReaderListener::new();

        // Each reader is dropped right away; only successful creation matters.
        drop(Reader::new(&topic).unwrap());
        drop(Reader::builder(&topic).with_qos(&qos).build().unwrap());
        drop(
            Reader::builder(&topic)
                .with_subscriber(&subscriber)
                .build()
                .unwrap(),
        );
        drop(
            Reader::builder(&topic)
                .with_qos(&qos)
                .with_subscriber(&subscriber)
                .with_listener(listener)
                .build()
                .unwrap(),
        );
    }

    /// Creating a reader on a topic with an invalid handle is rejected.
    #[test]
    fn test_reader_create_with_invalid_topic() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let qos = QoS::new();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let mut topic = Topic::<Data>::new(&participant, &name).unwrap();

        // Swap in an invalid handle, remembering the real one so it can be
        // restored before the topic is dropped.
        let valid_handle = std::mem::replace(&mut topic.inner, 0);
        assert_eq!(Reader::new(&topic).unwrap_err(), Error::BadParameter);
        assert_eq!(
            Reader::builder(&topic).with_qos(&qos).build().unwrap_err(),
            Error::BadParameter
        );
        topic.inner = valid_handle;
    }

    /// Creating a reader under a subscriber with an invalid handle is
    /// rejected.
    #[test]
    fn test_reader_create_with_invalid_subscriber() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let mut subscriber = Subscriber::new(&participant).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        // Restored below so the subscriber can be dropped normally.
        let valid_handle = std::mem::replace(&mut subscriber.inner, 0);
        let error = Reader::builder(&topic)
            .with_subscriber(&subscriber)
            .build()
            .unwrap_err();
        assert_eq!(error, Error::BadParameter);
        subscriber.inner = valid_handle;
    }

    /// `read` succeeds on a reader that has received nothing.
    #[test]
    fn test_reader_empty_read() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        drop(reader.read().unwrap());
    }

    /// `take` succeeds on a reader that has received nothing.
    #[test]
    fn test_reader_empty_take() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        drop(reader.take().unwrap());
    }

    /// `peek` succeeds on a reader that has received nothing.
    #[test]
    fn test_reader_empty_peek() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        drop(reader.peek().unwrap());
    }

    /// `from_existing` wraps the very handle it was given.
    #[test]
    fn test_reader_create_from_existing() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let owner = Reader::new(&topic).unwrap();
        let alias = Reader::<Data>::from_existing(owner.inner);
        assert_eq!(owner.inner, alias.inner);
    }

    /// Waiting for historical data returns successfully on a fresh reader.
    #[test]
    fn test_reader_wait_for_historical_data() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let reader = Reader::new(&topic).unwrap();

        reader.wait_for_historical_data(Duration::INFINITE).unwrap();
    }

    /// A writer created on the same topic shows up as a matched publication.
    #[test]
    fn test_reader_matched_publications() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let reader = Reader::new(&topic).unwrap();
        // No writer exists yet, so nothing can be matched.
        assert_eq!(reader.matched_publications().unwrap().len(), 0);

        let writer = Writer::new(&topic).unwrap();

        let matched = reader.matched_publications().unwrap();
        assert_eq!(matched.len(), 1);
        assert_eq!(writer.instance_handle().unwrap(), matched[0]);
    }

    /// Querying matched publications on an invalid reader handle fails.
    #[test]
    fn test_reader_matched_publications_on_invalid_reader() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let mut reader = Reader::new(&topic).unwrap();
        // Restored below so the reader can be deleted when dropped.
        let valid_handle = std::mem::replace(&mut reader.inner, 0);

        assert_eq!(
            reader.matched_publications().unwrap_err(),
            Error::BadParameter
        );
        reader.inner = valid_handle;
    }

    /// Listeners can be attached at construction, set later, and removed.
    #[test]
    fn test_reader_with_listener() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        // Register a no-op callback for every reader event.
        let listener = ReaderListener::new()
            .with_data_available(|_| ())
            .with_liveliness_changed(|_, _| ())
            .with_requested_deadline_missed(|_, _| ())
            .with_requested_incompatible_qos(|_, _| ())
            .with_sample_lost(|_, _| ())
            .with_sample_rejected(|_, _| ())
            .with_subscription_matched(|_, _| ());

        drop(
            Reader::new(&topic)
                .unwrap()
                .with_listener(&listener)
                .unwrap(),
        );

        let mut reader = Reader::new(&topic).unwrap();
        reader.set_listener(&listener).unwrap();
        reader.unset_listener().unwrap();
    }

    /// Setting or unsetting a listener on an invalid reader handle fails.
    #[test]
    fn test_reader_with_listener_on_invalid_reader() {
        let domain = Domain::new(crate::tests::domain::unique_id()).unwrap();
        let name = crate::tests::topic::unique_name();
        let participant = Participant::new(&domain).unwrap();
        let topic = Topic::<Data>::new(&participant, &name).unwrap();

        let listener = ReaderListener::new();

        let mut reader = Reader::new(&topic).unwrap();
        // Restored below so the reader can be deleted when dropped.
        let valid_handle = std::mem::replace(&mut reader.inner, 0);
        assert_eq!(
            reader.set_listener(&listener).unwrap_err(),
            Error::BadParameter
        );
        assert_eq!(reader.unset_listener().unwrap_err(), Error::BadParameter);
        reader.inner = valid_handle;
    }
}