Skip to main content

cyclonedds/
writer.rs

1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Publisher, Result, Topic};
4
5/// A data writer for topic type [`T`](crate::Topicable).
6///
7/// A `Writer` publishes samples of type `T` to a named [`Topic`](crate::Topic).
8/// Matched [`Readers`](crate::Reader) on the same topic receive the samples
9/// subject to their [`QoS`](crate::QoS) compatibility.
10///
11/// Use [`Writer::new`] for simple construction or [`Writer::builder`] for
12/// [`QoS`](crate::QoS), [`listener`](crate::listener::WriterListener), and
13/// [`publisher`](Publisher) configuration.
14///
15/// # Instance lifecycle
16///
17/// For keyed topics, each unique key value identifies a distinct instance.
18/// Writers can explicitly manage instance lifecycle through
19/// [`register_instance`](Writer::register_instance),
20/// [`unregister_instance`](Writer::unregister_instance), and
21/// [`dispose`](Writer::dispose). Unkeyed topics (where
22/// [`T::Key`](crate::Topicable::Key) is [`()`](primitive@unit)) have
23/// a single instance shared by all samples.
24#[derive(Debug, PartialEq, Eq)]
25pub struct Writer<'domain, 'participant, 'topic, T>
26where
27    T: crate::Topicable,
28{
29    pub(crate) inner: cyclonedds_sys::dds_entity_t,
30    phantom_topic: std::marker::PhantomData<&'topic Topic<'domain, 'participant, T>>,
31}
32
33/// Builder for [`Writer<T>`] (accessible via [`Writer::builder`]).
34#[derive(Debug)]
35pub struct WriterBuilder<'domain, 'participant, 'topic, 'qos, T>
36where
37    T: crate::Topicable,
38{
39    publisher: Option<&'participant Publisher<'domain, 'participant>>,
40    topic: &'topic Topic<'domain, 'participant, T>,
41    qos: Option<&'qos crate::QoS>,
42    listener: Option<crate::WriterListener<T>>,
43}
44
45impl<'d, 'p, 't, 'q, T> WriterBuilder<'d, 'p, 't, 'q, T>
46where
47    T: crate::Topicable,
48{
49    /// Creates a new [`WriterBuilder`] for the given [`Topic`].
50    ///
51    /// # Examples
52    ///
53    /// ```
54    /// use cyclonedds::builder::WriterBuilder;
55    /// use cyclonedds::{Domain, Participant, Topic};
56    /// # #[derive(
57    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
58    /// # )]
59    /// # struct Data {
60    /// #     x: i32,
61    /// # }
62    ///
63    /// let domain = Domain::default();
64    /// let participant = Participant::new(&domain)?;
65    /// let topic = Topic::new(&participant, "MyTopic")?;
66    /// let writer_builder = WriterBuilder::<Data>::new(&topic);
67    /// # Ok::<_, cyclonedds::Error>(())
68    /// ```
69    #[must_use]
70    pub const fn new(topic: &'t Topic<'d, 'p, T>) -> Self {
71        Self {
72            publisher: None,
73            topic,
74            qos: None,
75            listener: None,
76        }
77    }
78
79    /// Sets the [`QoS`](crate::QoS) for this writer builder.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use cyclonedds::builder::WriterBuilder;
85    /// use cyclonedds::qos::policy;
86    /// use cyclonedds::{Duration, QoS};
87    /// # use cyclonedds::{Domain, Participant, Topic};
88    /// # let domain = Domain::default();
89    /// # let participant = Participant::new(&domain)?;
90    /// # #[derive(
91    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
92    /// # )]
93    /// # struct Data {
94    /// #     x: i32,
95    /// # }
96    /// # let topic = Topic::new(&participant, "MyTopic")?;
97    ///
98    /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
99    ///     max_blocking_time: Duration::from_millis(100),
100    /// });
101    /// let writer_builder = WriterBuilder::<Data>::new(&topic).with_qos(&qos);
102    /// # Ok::<_, cyclonedds::Error>(())
103    /// ```
104    #[must_use]
105    pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
106        self.qos = Some(qos);
107        self
108    }
109
110    /// Sets the [`Publisher`](crate::Publisher) on this writer builder.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// use cyclonedds::Publisher;
116    /// use cyclonedds::WriterListener;
117    /// use cyclonedds::builder::WriterBuilder;
118    /// # use cyclonedds::{Domain, Participant, Topic};
119    /// # let domain = Domain::default();
120    /// # let participant = Participant::new(&domain)?;
121    /// # #[derive(
122    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
123    /// # )]
124    /// # struct Data {
125    /// #     x: i32,
126    /// # }
127    /// # let topic = Topic::new(&participant, "MyTopic")?;
128    ///
129    /// let publisher = Publisher::new(&participant)?;
130    ///
131    /// let writer_builder = WriterBuilder::<Data>::new(&topic).with_publisher(&publisher);
132    /// # Ok::<_, cyclonedds::Error>(())
133    /// ```
134    #[must_use]
135    pub const fn with_publisher(mut self, publisher: &'p Publisher<'d, 'p>) -> Self {
136        self.publisher = Some(publisher);
137        self
138    }
139
140    /// Sets the [`Listener`](crate::Listener) on this writer builder.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use cyclonedds::WriterListener;
146    /// use cyclonedds::builder::WriterBuilder;
147    /// # use cyclonedds::{Domain, Participant, Topic};
148    /// # let domain = Domain::default();
149    /// # let participant = Participant::new(&domain)?;
150    /// # #[derive(
151    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
152    /// # )]
153    /// # struct Data {
154    /// #     x: i32,
155    /// # }
156    /// # let topic = Topic::new(&participant, "MyTopic")?;
157    ///
158    /// let writer_builder = WriterBuilder::<Data>::new(&topic).with_listener(WriterListener::new());
159    /// # Ok::<_, cyclonedds::Error>(())
160    /// ```
161    #[must_use]
162    pub fn with_listener<L>(mut self, listener: L) -> Self
163    where
164        L: AsRef<crate::WriterListener<T>>,
165    {
166        self.listener = Some(listener.as_ref().clone());
167        self
168    }
169
170    /// Builds the [`Writer`].
171    ///
172    /// # Errors
173    ///
174    /// Returns an [`Error`](crate::Error) if the writer failed to create.
175    ///
176    /// # Examples
177    ///
178    /// ```
179    /// use cyclonedds::QoS;
180    /// use cyclonedds::builder::WriterBuilder;
181    /// use cyclonedds::qos::policy;
182    /// # use cyclonedds::{Domain, Participant, Topic};
183    /// # let domain = Domain::default();
184    /// # let participant = Participant::new(&domain)?;
185    /// # #[derive(
186    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
187    /// # )]
188    /// # struct Data {
189    /// #     x: i32,
190    /// # }
191    /// # let topic = Topic::new(&participant, "MyTopic")?;
192    ///
193    /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
194    /// let writer = WriterBuilder::<Data>::new(&topic).with_qos(&qos).build()?;
195    /// # Ok::<_, cyclonedds::Error>(())
196    /// ```
197    pub fn build(self) -> Result<Writer<'d, 'p, 't, T>> {
198        // NOTE: using `and_then` to avoid ? branch on the listener for coverage
199        // since the C lib currently panics on OOM rather than returning null.
200        self.listener
201            .map(|listener| listener.as_ffi())
202            .transpose()
203            .and_then(|listener| {
204                Ok(Writer {
205                    inner: ffi::dds_create_writer(
206                        self.publisher
207                            .map_or(ffi::dds_get_participant(self.topic.inner)?, |publisher| {
208                                publisher.inner
209                            }),
210                        self.topic.inner,
211                        self.qos.map(|qos| &qos.inner),
212                        listener.as_ref(),
213                    )?,
214                    phantom_topic: std::marker::PhantomData,
215                })
216            })
217    }
218}
219
220impl<'d, 'p, 't, T> Writer<'d, 'p, 't, T>
221where
222    T: crate::Topicable,
223{
224    /// Creates a new `Writer` for the given [`Topic`](crate::Topic) with
225    /// default [`QoS`](crate::QoS) and no
226    /// [`listener`](crate::listener::WriterListener).
227    ///
228    /// # Errors
229    ///
230    /// Returns an [`Error`](crate::Error) if the writer fails to create.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use cyclonedds::Writer;
236    /// # use cyclonedds::{Domain, Participant, Topic};
237    /// # let domain = Domain::default();
238    /// # let participant = Participant::new(&domain)?;
239    /// # #[derive(
240    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
241    /// # )]
242    /// # struct Data {
243    /// #     #[dds(key)]
244    /// #     x: i32,
245    /// #     #[dds(key)]
246    /// #     y: i32,
247    /// # }
248    ///
249    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
250    /// let writer = Writer::new(&topic)?;
251    /// # Ok::<_, cyclonedds::Error>(())
252    /// ```
253    pub fn new(topic: &'t Topic<'d, 'p, T>) -> Result<Self> {
254        Self::builder(topic).build()
255    }
256
257    /// Returns a [`WriterBuilder`](crate::builder::WriterBuilder) for
258    /// constructing a writer with custom [`QoS`](crate::QoS) or a
259    /// [`listener`](crate::listener::WriterListener).
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// use cyclonedds::{Duration, QoS, Writer, qos::policy::Reliability};
265    /// # use cyclonedds::{Domain, Participant, Topic};
266    /// # let domain = Domain::default();
267    /// # let participant = Participant::new(&domain)?;
268    /// # #[derive(
269    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
270    /// # )]
271    /// # struct Data {
272    /// #     #[dds(key)]
273    /// #     x: i32,
274    /// #     #[dds(key)]
275    /// #     y: i32,
276    /// # }
277    ///
278    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
279    /// let qos = QoS::new().with_reliability(Reliability::Reliable {
280    ///     max_blocking_time: Duration::from_millis(100),
281    /// });
282    /// let writer = Writer::builder(&topic).with_qos(&qos).build()?;
283    /// # Ok::<_, cyclonedds::Error>(())
284    /// ```
285    #[must_use]
286    pub const fn builder<'q>(topic: &'t Topic<'d, 'p, T>) -> WriterBuilder<'d, 'p, 't, 'q, T> {
287        WriterBuilder::new(topic)
288    }
289
290    /// Writes a sample to the topic.
291    ///
292    /// # Errors
293    ///
294    /// Returns an [`Error`](crate::Error) if the writers fails to write the
295    /// sample.
296    ///
297    /// # Examples
298    ///
299    /// ```
300    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
301    /// # let domain = Domain::default();
302    /// # let participant = Participant::new(&domain)?;
303    /// # #[derive(
304    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
305    /// # )]
306    /// # struct Data {
307    /// #     #[dds(key)]
308    /// #     x: i32,
309    /// #     #[dds(key)]
310    /// #     y: i32,
311    /// # }
312    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
313    /// let writer = Writer::new(&topic)?;
314    /// writer.write(&Data { x: 1, y: 2 })?;
315    /// # Ok::<_, cyclonedds::Error>(())
316    /// ```
317    pub fn write(&self, sample: &T) -> Result<()> {
318        ffi::dds_write(self.inner, sample)
319    }
320
321    /// Writes a sample with an explicit source timestamp.
322    ///
323    /// Use this when the write timestamp should reflect the time the data was
324    /// generated rather than the time it was written.
325    ///
326    /// # Errors
327    ///
328    /// Returns an [`Error`](crate::Error) if the writer fails to write the
329    /// sample.
330    ///
331    /// # Examples
332    ///
333    /// ```
334    /// use cyclonedds::Time;
335    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
336    /// # let domain = Domain::default();
337    /// # let participant = Participant::new(&domain)?;
338    /// # #[derive(
339    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
340    /// # )]
341    /// # struct Data {
342    /// #     #[dds(key)]
343    /// #     x: i32,
344    /// #     #[dds(key)]
345    /// #     y: i32,
346    /// # }
347    ///
348    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
349    /// let writer = Writer::new(&topic)?;
350    /// writer.write_with_timestamp(&Data::default(), Time::from_secs(1))?;
351    /// # Ok::<_, cyclonedds::Error>(())
352    /// ```
353    pub fn write_with_timestamp(&self, sample: &T, timestamp: crate::Time) -> Result<()> {
354        ffi::dds_write_with_timestamp(self.inner, sample, timestamp.inner)
355    }
356
357    /// Flushes batched samples to the network.
358    ///
359    /// Only relevant when write batching is enabled in the domain
360    /// configuration. Has no effect otherwise.
361    ///
362    /// # Errors
363    ///
364    /// Returns an [`Error`](crate::Error) if the writer fails to flush.
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
370    /// # let domain = Domain::default();
371    /// # let participant = Participant::new(&domain)?;
372    /// # #[derive(
373    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
374    /// # )]
375    /// # struct Data {
376    /// #     #[dds(key)]
377    /// #     x: i32,
378    /// #     #[dds(key)]
379    /// #     y: i32,
380    /// # }
381    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
382    /// let writer = Writer::new(&topic)?;
383    /// writer.write(&Data::default())?;
384    /// writer.flush()?;
385    /// # Ok::<_, cyclonedds::Error>(())
386    /// ```
387    pub fn flush(&self) -> Result<()> {
388        ffi::dds_write_flush(self.inner)
389    }
390
391    /// Blocks until all written samples have been acknowledged by all matched
392    /// reliable readers, or until `timeout` elapses.
393    ///
394    /// # Errors
395    ///
396    /// Returns an [`Error`](crate::Error) if the timeout elapses before all
397    /// acknowledgements are received or if the writer encounters an unexpected
398    /// error.
399    ///
400    /// # Examples
401    ///
402    /// ```
403    /// use cyclonedds::Duration;
404    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
405    /// # let domain = Domain::default();
406    /// # let participant = Participant::new(&domain)?;
407    /// # #[derive(
408    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
409    /// # )]
410    /// # struct Data {
411    /// #     #[dds(key)]
412    /// #     x: i32,
413    /// #     #[dds(key)]
414    /// #     y: i32,
415    /// # }
416    ///
417    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
418    /// let writer = Writer::new(&topic)?;
419    /// writer.write(&Data::default())?;
420    /// writer.wait_for_acks(Duration::from_secs(1))?;
421    /// # Ok::<_, cyclonedds::Error>(())
422    /// ```
423    pub fn wait_for_acks(&self, timeout: crate::Duration) -> Result<()> {
424        ffi::dds_wait_for_acks(self.inner, timeout.inner)
425    }
426
427    /// Returns the instance handles of all readers currently matched with
428    /// this writer.
429    ///
430    /// The returned handles can be compared against
431    /// [`InstanceHandle`](crate::entity::InstanceHandle) values from reader
432    /// entities to identify specific matched readers.
433    ///
434    /// # Errors
435    ///
436    /// Returns an [`Error`](crate::Error) if the writer fails to retrieve the
437    /// matched subscriptions.
438    ///
439    /// # Examples
440    ///
441    /// ```
442    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
443    /// # let domain = Domain::default();
444    /// # let participant = Participant::new(&domain)?;
445    /// # #[derive(
446    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
447    /// # )]
448    /// # struct Data {
449    /// #     #[dds(key)]
450    /// #     x: i32,
451    /// #     #[dds(key)]
452    /// #     y: i32,
453    /// # }
454    /// use cyclonedds::entity::Entity;
455    ///
456    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
457    /// let writer = Writer::new(&topic)?;
458    /// let reader = Reader::new(&topic)?;
459    ///
460    /// let matched = writer.matched_subscriptions()?;
461    /// assert_eq!(matched[0], reader.instance_handle()?);
462    /// # Ok::<_, cyclonedds::Error>(())
463    /// ```
464    pub fn matched_subscriptions(&self) -> Result<Vec<crate::entity::InstanceHandle>> {
465        ffi::dds_get_matched_subscriptions(self.inner).map(|matched| {
466            matched
467                .iter()
468                .map(|&inner| crate::entity::InstanceHandle { inner })
469                .collect()
470        })
471    }
472
473    /// Registers an instance identified by `key` with this writer.
474    ///
475    /// Registration is optional but allows for the pre-allocation of resources
476    /// for the instance. Returns the
477    /// [`InstanceHandle`](crate::entity::InstanceHandle) assigned to the
478    /// instance.
479    ///
480    /// # Errors
481    ///
482    /// Returns an [`Error`](crate::Error) if the writer fails to register the
483    /// instance.
484    ///
485    /// # Examples
486    ///
487    /// ```
488    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
489    /// # let domain = Domain::default();
490    /// # let participant = Participant::new(&domain)?;
491    /// # #[derive(
492    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
493    /// # )]
494    /// # struct Data {
495    /// #     #[dds(key)]
496    /// #     x: i32,
497    /// #     #[dds(key)]
498    /// #     y: i32,
499    /// # }
500    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
501    /// let writer = Writer::new(&topic)?;
502    /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
503    /// # Ok::<_, cyclonedds::Error>(())
504    /// ```
505    pub fn register_instance(&self, key: &T::Key) -> Result<crate::entity::InstanceHandle> {
506        ffi::dds_register_instance::<T>(self.inner, key)
507            .map(|inner| crate::entity::InstanceHandle { inner })
508    }
509
510    /// Unregisters an instance identified by `key` from this writer.
511    ///
512    /// Notifies matched readers that this writer will no longer publish
513    /// samples for the given instance.
514    ///
515    /// # Errors
516    ///
517    /// Returns an [`Error`](crate::Error) if the writer fails to unregister
518    /// the instance.
519    ///
520    /// # Examples
521    ///
522    /// ```
523    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
524    /// # let domain = Domain::default();
525    /// # let participant = Participant::new(&domain)?;
526    /// # #[derive(
527    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
528    /// # )]
529    /// # struct Data {
530    /// #     #[dds(key)]
531    /// #     x: i32,
532    /// #     #[dds(key)]
533    /// #     y: i32,
534    /// # }
535    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
536    /// let writer = Writer::new(&topic)?;
537    /// writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
538    /// writer.unregister_instance(&Key::<Data> { x: 1, y: 2 })?;
539    /// # Ok::<_, cyclonedds::Error>(())
540    /// ```
541    pub fn unregister_instance(&self, key: &T::Key) -> Result<()> {
542        ffi::dds_unregister_instance::<T>(self.inner, key)
543    }
544
545    /// Unregisters an instance identified by its
546    /// [`InstanceHandle`](crate::entity::InstanceHandle).
547    ///
548    /// # Errors
549    ///
550    /// Returns an [`Error`](crate::Error) if the writer fails to unregister
551    /// the instance.
552    ///
553    /// # Examples
554    ///
555    /// ```
556    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
557    /// # let domain = Domain::default();
558    /// # let participant = Participant::new(&domain)?;
559    /// # #[derive(
560    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
561    /// # )]
562    /// # struct Data {
563    /// #     #[dds(key)]
564    /// #     x: i32,
565    /// #     #[dds(key)]
566    /// #     y: i32,
567    /// # }
568    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
569    /// let writer = Writer::new(&topic)?;
570    /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
571    /// writer.unregister_instance_by_handle(handle)?;
572    /// # Ok::<_, cyclonedds::Error>(())
573    /// ```
574    pub fn unregister_instance_by_handle(
575        &self,
576        instance_handle: crate::entity::InstanceHandle,
577    ) -> Result<()> {
578        ffi::dds_unregister_instance_by_handle(self.inner, instance_handle.inner)
579    }
580
581    /// Unregisters an instance identified by `key` with an explicit timestamp.
582    ///
583    /// # Errors
584    ///
585    /// Returns an [`Error`](crate::Error) if the writer fails to unregister
586    /// the instance.
587    ///
588    /// # Examples
589    ///
590    /// ```
591    /// use cyclonedds::Time;
592    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
593    /// # let domain = Domain::default();
594    /// # let participant = Participant::new(&domain)?;
595    /// # #[derive(
596    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
597    /// # )]
598    /// # struct Data {
599    /// #     #[dds(key)]
600    /// #     x: i32,
601    /// #     #[dds(key)]
602    /// #     y: i32,
603    /// # }
604    ///
605    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
606    /// let writer = Writer::new(&topic)?;
607    /// writer.unregister_instance_with_timestamp(&Key::<Data> { x: 1, y: 2 }, Time::from_secs(1))?;
608    /// # Ok::<_, cyclonedds::Error>(())
609    /// ```
610    pub fn unregister_instance_with_timestamp(
611        &self,
612        key: &T::Key,
613        timestamp: crate::Time,
614    ) -> Result<()> {
615        ffi::dds_unregister_instance_with_timestamp::<T>(self.inner, key, timestamp.inner)
616    }
617
618    /// Unregisters an instance identified by its
619    /// [`InstanceHandle`](crate::entity::InstanceHandle) with an explicit
620    /// timestamp.
621    ///
622    /// # Errors
623    ///
624    /// Returns an [`Error`](crate::Error) if the writer fails to unregister
625    /// the instance.
626    ///
627    /// # Examples
628    ///
629    /// ```
630    /// use cyclonedds::Time;
631    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
632    /// # let domain = Domain::default();
633    /// # let participant = Participant::new(&domain)?;
634    /// # #[derive(
635    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
636    /// # )]
637    /// # struct Data {
638    /// #     #[dds(key)]
639    /// #     x: i32,
640    /// #     #[dds(key)]
641    /// #     y: i32,
642    /// # }
643    ///
644    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
645    /// let writer = Writer::new(&topic)?;
646    /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
647    /// writer.unregister_instance_by_handle_with_timestamp(handle, Time::from_secs(1))?;
648    /// # Ok::<_, cyclonedds::Error>(())
649    /// ```
650    pub fn unregister_instance_by_handle_with_timestamp(
651        &self,
652        instance_handle: crate::entity::InstanceHandle,
653        timestamp: crate::Time,
654    ) -> Result<()> {
655        ffi::dds_unregister_instance_by_handle_with_timestamp(
656            self.inner,
657            instance_handle.inner,
658            timestamp.inner,
659        )
660    }
661
662    /// Returns the [`InstanceHandle`](crate::entity::InstanceHandle) for the
663    /// instance identified by `key`, or `None` if the instance is not
664    /// registered.
665    ///
666    /// # Examples
667    ///
668    /// ```
669    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
670    /// # let domain = Domain::default();
671    /// # let participant = Participant::new(&domain)?;
672    /// # #[derive(
673    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
674    /// # )]
675    /// # struct Data {
676    /// #     #[dds(key)]
677    /// #     x: i32,
678    /// #     #[dds(key)]
679    /// #     y: i32,
680    /// # }
681    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
682    /// let writer = Writer::new(&topic)?;
683    /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
684    /// assert_eq!(
685    ///     writer.lookup_instance(&Key::<Data> { x: 1, y: 2 }),
686    ///     Some(handle)
687    /// );
688    /// assert_eq!(writer.lookup_instance(&Key::<Data> { x: 9, y: 9 }), None);
689    /// # Ok::<_, cyclonedds::Error>(())
690    /// ```
691    pub fn lookup_instance(&self, key: &T::Key) -> Option<crate::entity::InstanceHandle> {
692        ffi::dds_lookup_instance::<T>(self.inner, key)
693            .map(|inner| crate::entity::InstanceHandle { inner })
694    }
695
696    /// Writes a sample and immediately disposes the instance.
697    ///
698    /// Equivalent to calling [`write`](Writer::write) followed by
699    /// [`dispose`](Writer::dispose) but in a single operation.
700    ///
701    /// # Errors
702    ///
703    /// Returns an [`Error`](crate::Error) if the writer fails to write or
704    /// dispose.
705    ///
706    /// # Examples
707    ///
708    /// ```
709    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
710    /// # let domain = Domain::default();
711    /// # let participant = Participant::new(&domain)?;
712    /// # #[derive(
713    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
714    /// # )]
715    /// # struct Data {
716    /// #     #[dds(key)]
717    /// #     x: i32,
718    /// #     #[dds(key)]
719    /// #     y: i32,
720    /// # }
721    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
722    /// let writer = Writer::new(&topic)?;
723    /// writer.write_dispose(&Data { x: 1, y: 2 })?;
724    /// # Ok::<_, cyclonedds::Error>(())
725    /// ```
726    pub fn write_dispose(&self, data: &T) -> Result<()> {
727        ffi::dds_write_dispose(self.inner, data)
728    }
729
730    /// Writes a sample and immediately disposes the instance with an explicit
731    /// timestamp.
732    ///
733    /// # Errors
734    ///
735    /// Returns an [`Error`](crate::Error) if the writer fails to write or
736    /// dispose.
737    ///
738    /// # Examples
739    ///
740    /// ```
741    /// use cyclonedds::Time;
742    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
743    /// # let domain = Domain::default();
744    /// # let participant = Participant::new(&domain)?;
745    /// # #[derive(
746    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
747    /// # )]
748    /// # struct Data {
749    /// #     #[dds(key)]
750    /// #     x: i32,
751    /// #     #[dds(key)]
752    /// #     y: i32,
753    /// # }
754    ///
755    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
756    /// let writer = Writer::new(&topic)?;
757    /// writer.write_dispose_with_timestamp(&Data::default(), Time::from_secs(1))?;
758    /// # Ok::<_, cyclonedds::Error>(())
759    /// ```
760    pub fn write_dispose_with_timestamp(&self, data: &T, timestamp: crate::Time) -> Result<()> {
761        ffi::dds_write_dispose_with_timestamp(self.inner, data, timestamp.inner)
762    }
763
764    /// Disposes the instance identified by `key`.
765    ///
766    /// Notifies matched readers that the instance is no longer valid. The
767    /// instance remains known but its state transitions to disposed.
768    ///
769    /// # Errors
770    ///
771    /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
772    /// instance.
773    ///
774    /// # Examples
775    ///
776    /// ```
777    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
778    /// # let domain = Domain::default();
779    /// # let participant = Participant::new(&domain)?;
780    /// # #[derive(
781    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
782    /// # )]
783    /// # struct Data {
784    /// #     #[dds(key)]
785    /// #     x: i32,
786    /// #     #[dds(key)]
787    /// #     y: i32,
788    /// # }
789    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
790    /// let writer = Writer::new(&topic)?;
791    /// writer.write(&Data { x: 1, y: 2 })?;
792    /// writer.dispose(&Key::<Data> { x: 1, y: 2 })?;
793    /// # Ok::<_, cyclonedds::Error>(())
794    /// ```
795    pub fn dispose(&self, key: &T::Key) -> Result<()> {
796        ffi::dds_dispose::<T>(self.inner, key)
797    }
798
799    /// Disposes the instance identified by `key` with an explicit timestamp.
800    ///
801    /// # Errors
802    ///
803    /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
804    /// instance.
805    ///
806    /// # Examples
807    ///
808    /// ```
809    /// use cyclonedds::Time;
810    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
811    /// # let domain = Domain::default();
812    /// # let participant = Participant::new(&domain)?;
813    /// # #[derive(
814    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
815    /// # )]
816    /// # struct Data {
817    /// #     #[dds(key)]
818    /// #     x: i32,
819    /// #     #[dds(key)]
820    /// #     y: i32,
821    /// # }
822    ///
823    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
824    /// let writer = Writer::new(&topic)?;
825    /// writer.dispose_with_timestamp(&Key::<Data> { x: 1, y: 2 }, Time::from_secs(1))?;
826    /// # Ok::<_, cyclonedds::Error>(())
827    /// ```
828    pub fn dispose_with_timestamp(&self, key: &T::Key, timestamp: crate::Time) -> Result<()> {
829        ffi::dds_dispose_with_timestamp::<T>(self.inner, key, timestamp.inner)
830    }
831
832    /// Disposes the instance identified by its
833    /// [`InstanceHandle`](crate::entity::InstanceHandle).
834    ///
835    /// # Errors
836    ///
837    /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
838    /// instance.
839    ///
840    /// # Examples
841    ///
842    /// ```
843    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
844    /// # let domain = Domain::default();
845    /// # let participant = Participant::new(&domain)?;
846    /// # #[derive(
847    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
848    /// # )]
849    /// # struct Data {
850    /// #     #[dds(key)]
851    /// #     x: i32,
852    /// #     #[dds(key)]
853    /// #     y: i32,
854    /// # }
855    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
856    /// let writer = Writer::new(&topic)?;
857    /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
858    /// writer.dispose_instance_by_handle(handle)?;
859    /// # Ok::<_, cyclonedds::Error>(())
860    /// ```
861    pub fn dispose_instance_by_handle(
862        &self,
863        instance_handle: crate::entity::InstanceHandle,
864    ) -> Result<()> {
865        ffi::dds_dispose_instance_by_handle(self.inner, instance_handle.inner)
866    }
867
868    /// Disposes the instance identified by its
869    /// [`InstanceHandle`](crate::entity::InstanceHandle) with an explicit
870    /// timestamp.
871    ///
872    /// # Errors
873    ///
874    /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
875    /// instance.
876    ///
877    /// # Examples
878    ///
879    /// ```
880    /// use cyclonedds::Time;
881    /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
882    /// # let domain = Domain::default();
883    /// # let participant = Participant::new(&domain)?;
884    /// # #[derive(
885    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
886    /// # )]
887    /// # struct Data {
888    /// #     #[dds(key)]
889    /// #     x: i32,
890    /// #     #[dds(key)]
891    /// #     y: i32,
892    /// # }
893    ///
894    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
895    /// let writer = Writer::new(&topic)?;
896    /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
897    /// writer.dispose_instance_by_handle_with_timestamp(handle, Time::from_secs(1))?;
898    /// # Ok::<_, cyclonedds::Error>(())
899    /// ```
900    pub fn dispose_instance_by_handle_with_timestamp(
901        &self,
902        instance_handle: crate::entity::InstanceHandle,
903        timestamp: crate::Time,
904    ) -> Result<()> {
905        ffi::dds_dispose_instance_by_handle_with_timestamp(
906            self.inner,
907            instance_handle.inner,
908            timestamp.inner,
909        )
910    }
911
912    pub(crate) const fn from_existing(
913        inner: cyclonedds_sys::dds_entity_t,
914    ) -> std::mem::ManuallyDrop<Self> {
915        std::mem::ManuallyDrop::new(Self {
916            inner,
917            phantom_topic: std::marker::PhantomData,
918        })
919    }
920
921    /// Sets the [`WriterListener`](crate::WriterListener) on this writer,
922    /// replacing any previously set listener.
923    ///
924    /// # Errors
925    ///
926    /// Returns an [`Error`](crate::Error) if the writer fails to set the
927    /// listener.
928    ///
929    /// # Examples
930    ///
931    /// ```
932    /// use cyclonedds::listener::WriterListener;
933    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
934    /// # let domain = Domain::default();
935    /// # let participant = Participant::new(&domain)?;
936    /// # #[derive(
937    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
938    /// # )]
939    /// # struct Data {
940    /// #     x: i32,
941    /// # }
942    ///
943    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
944    /// let mut writer = Writer::new(&topic)?;
945    /// writer.set_listener(WriterListener::new().with_publication_matched(|_, status| {
946    ///     println!("matched readers: {}", status.current.count);
947    /// }))?;
948    /// # Ok::<_, cyclonedds::Error>(())
949    /// ```
950    pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
951    where
952        L: AsRef<crate::WriterListener<T>>,
953    {
954        listener
955            .as_ref()
956            .as_ffi()
957            .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
958    }
959
960    /// Removes the listener from this writer.
961    ///
962    /// # Errors
963    ///
964    /// Returns an [`Error`](crate::Error) if the writer fails to unset the
965    /// listener.
966    ///
967    /// # Examples
968    ///
969    /// ```
970    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
971    /// # let domain = Domain::default();
972    /// # let participant = Participant::new(&domain)?;
973    /// # #[derive(
974    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
975    /// # )]
976    /// # struct Data {
977    /// #     x: i32,
978    /// # }
979    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
980    /// let mut writer = Writer::new(&topic)?;
981    /// writer.unset_listener()?;
982    /// # Ok::<_, cyclonedds::Error>(())
983    /// ```
984    pub fn unset_listener(&mut self) -> Result<()> {
985        ffi::dds_set_listener(self.inner, None)?;
986        Ok(())
987    }
988
989    /// Sets the [`WriterListener`](crate::WriterListener) on this writer,
990    /// consuming and returning `self`.
991    ///
992    /// # Errors
993    ///
994    /// Returns an [`Error`](crate::Error) if the writer fails to set the
995    /// listener.
996    ///
997    /// # Examples
998    ///
999    /// ```
1000    /// use cyclonedds::listener::WriterListener;
1001    /// # use cyclonedds::{Domain, Participant, Topic, Writer};
1002    /// # let domain = Domain::default();
1003    /// # let participant = Participant::new(&domain)?;
1004    /// # #[derive(
1005    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
1006    /// # )]
1007    /// # struct Data {
1008    /// #     x: i32,
1009    /// # }
1010    ///
1011    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
1012    /// let writer = Writer::new(&topic)?.with_listener(WriterListener::new())?;
1013    /// # Ok::<_, cyclonedds::Error>(())
1014    /// ```
1015    pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
1016    where
1017        L: AsRef<crate::WriterListener<T>>,
1018    {
1019        self.set_listener(listener).map(|()| self)
1020    }
1021}
1022
1023impl<T> Drop for Writer<'_, '_, '_, T>
1024where
1025    T: crate::Topicable,
1026{
1027    fn drop(&mut self) {
1028        let result = ffi::dds_delete(self.inner);
1029        debug_assert!(
1030            result.is_ok(),
1031            "unable to delete {self:?}, failed with: {result:?}"
1032        );
1033    }
1034}
1035
1036#[cfg(test)]
1037mod tests {
1038    use super::*;
1039    use crate::Topicable;
1040
1041    #[test]
1042    fn test_writer_create() {
1043        let domain_id = crate::tests::domain::unique_id();
1044        let domain = crate::Domain::new(domain_id).unwrap();
1045        let qos = crate::QoS::new();
1046        let topic_name = crate::tests::topic::unique_name();
1047        let participant = crate::Participant::new(&domain).unwrap();
1048        let publisher = crate::Publisher::new(&participant).unwrap();
1049        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1050        let listener = crate::WriterListener::new();
1051
1052        let _ = Writer::new(&topic).unwrap();
1053        let _ = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1054        let _ = Writer::builder(&topic)
1055            .with_publisher(&publisher)
1056            .build()
1057            .unwrap();
1058        let _ = Writer::builder(&topic)
1059            .with_qos(&qos)
1060            .with_publisher(&publisher)
1061            .with_listener(listener)
1062            .build()
1063            .unwrap();
1064    }
1065
1066    #[test]
1067    fn test_writer_create_with_invalid_topic() {
1068        let domain_id = crate::tests::domain::unique_id();
1069        let domain = crate::Domain::new(domain_id).unwrap();
1070        let qos = crate::QoS::new();
1071        let topic_name = crate::tests::topic::unique_name();
1072        let participant = crate::Participant::new(&domain).unwrap();
1073        let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1074
1075        let topic_id = topic.inner;
1076        topic.inner = 0;
1077        let result = Writer::new(&topic).unwrap_err();
1078        assert_eq!(result, crate::Error::BadParameter);
1079        let result = Writer::builder(&topic).with_qos(&qos).build().unwrap_err();
1080        assert_eq!(result, crate::Error::BadParameter);
1081        topic.inner = topic_id;
1082    }
1083
1084    #[test]
1085    fn test_writer_create_with_invalid_publisher() {
1086        let domain_id = crate::tests::domain::unique_id();
1087        let domain = crate::Domain::new(domain_id).unwrap();
1088        let topic_name = crate::tests::topic::unique_name();
1089        let participant = crate::Participant::new(&domain).unwrap();
1090        let mut publisher = crate::Publisher::new(&participant).unwrap();
1091        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1092
1093        let publisher_id = publisher.inner;
1094        publisher.inner = 0;
1095        let result = Writer::builder(&topic)
1096            .with_publisher(&publisher)
1097            .build()
1098            .unwrap_err();
1099        assert_eq!(result, crate::Error::BadParameter);
1100        publisher.inner = publisher_id;
1101    }
1102
1103    #[test]
1104    fn test_writer_write() {
1105        let domain_id = crate::tests::domain::unique_id();
1106        let domain = crate::Domain::new(domain_id).unwrap();
1107        let topic_name = crate::tests::topic::unique_name();
1108        let participant = crate::Participant::new(&domain).unwrap();
1109        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1110        let writer = Writer::new(&topic).unwrap();
1111        writer.write(&crate::tests::topic::Data::default()).unwrap();
1112    }
1113
1114    #[test]
1115    fn test_writer_write_with_timestamp() {
1116        let domain_id = crate::tests::domain::unique_id();
1117        let domain = crate::Domain::new(domain_id).unwrap();
1118        let topic_name = crate::tests::topic::unique_name();
1119        let participant = crate::Participant::new(&domain).unwrap();
1120        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1121        let writer = Writer::new(&topic).unwrap();
1122        let timestamp = crate::Time::from_nanos(10001);
1123        writer
1124            .write_with_timestamp(&crate::tests::topic::Data::default(), timestamp)
1125            .unwrap();
1126    }
1127
1128    #[test]
1129    fn test_writer_operations_on_invalid_writer() {
1130        let domain_id = crate::tests::domain::unique_id();
1131        let domain = crate::Domain::new(domain_id).unwrap();
1132        let topic_name = crate::tests::topic::unique_name();
1133        let participant = crate::Participant::new(&domain).unwrap();
1134        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1135        let mut writer = Writer::new(&topic).unwrap();
1136
1137        let sample = crate::tests::topic::Data::default();
1138        let key = sample.as_key();
1139        let timestamp = crate::Time::from_nanos(10001);
1140        let instance_handle = crate::entity::InstanceHandle { inner: 0 };
1141
1142        let writer_id = writer.inner;
1143        writer.inner = 0;
1144
1145        let result = writer.write(&sample).unwrap_err();
1146        assert_eq!(result, crate::Error::BadParameter);
1147
1148        let result = writer.write_dispose(&sample).unwrap_err();
1149        assert_eq!(result, crate::Error::BadParameter);
1150
1151        let result = writer.write_with_timestamp(&sample, timestamp).unwrap_err();
1152        assert_eq!(result, crate::Error::BadParameter);
1153
1154        let result = writer
1155            .write_dispose_with_timestamp(&sample, timestamp)
1156            .unwrap_err();
1157        assert_eq!(result, crate::Error::BadParameter);
1158
1159        let result = writer.unregister_instance(&key).unwrap_err();
1160        assert_eq!(result, crate::Error::BadParameter);
1161
1162        let result = writer
1163            .unregister_instance_with_timestamp(&key, timestamp)
1164            .unwrap_err();
1165        assert_eq!(result, crate::Error::BadParameter);
1166
1167        let result = writer
1168            .unregister_instance_by_handle(instance_handle)
1169            .unwrap_err();
1170        assert_eq!(result, crate::Error::BadParameter);
1171
1172        let result = writer
1173            .unregister_instance_by_handle_with_timestamp(instance_handle, timestamp)
1174            .unwrap_err();
1175        assert_eq!(result, crate::Error::BadParameter);
1176
1177        let result = writer.dispose(&key).unwrap_err();
1178        assert_eq!(result, crate::Error::BadParameter);
1179
1180        let result = writer.dispose_with_timestamp(&key, timestamp).unwrap_err();
1181        assert_eq!(result, crate::Error::BadParameter);
1182
1183        let result = writer
1184            .dispose_instance_by_handle(instance_handle)
1185            .unwrap_err();
1186        assert_eq!(result, crate::Error::BadParameter);
1187
1188        let result = writer
1189            .dispose_instance_by_handle_with_timestamp(instance_handle, timestamp)
1190            .unwrap_err();
1191        assert_eq!(result, crate::Error::BadParameter);
1192
1193        let result = writer.flush().unwrap_err();
1194        assert_eq!(result, crate::Error::BadParameter);
1195
1196        let result = writer.register_instance(&key).unwrap_err();
1197        assert_eq!(result, crate::Error::BadParameter);
1198
1199        let result = writer.matched_subscriptions().unwrap_err();
1200        assert_eq!(result, crate::Error::BadParameter);
1201
1202        assert_eq!(result, crate::Error::BadParameter);
1203        writer.inner = writer_id;
1204    }
1205
1206    #[test]
1207    fn test_writer_create_from_existing() {
1208        let domain_id = crate::tests::domain::unique_id();
1209        let domain = crate::Domain::new(domain_id).unwrap();
1210        let topic_name = crate::tests::topic::unique_name();
1211        let participant = crate::Participant::new(&domain).unwrap();
1212        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1213
1214        let writer_01 = Writer::new(&topic).unwrap();
1215        let writer_02 = Writer::<crate::tests::topic::Data>::from_existing(writer_01.inner);
1216        assert_eq!(writer_01.inner, writer_02.inner);
1217    }
1218
1219    #[test]
1220    fn test_writer_register_unregister_instance() {
1221        use crate::state;
1222
1223        let domain_id = crate::tests::domain::unique_id();
1224        let domain = crate::Domain::new(domain_id).unwrap();
1225        let topic_name = crate::tests::topic::unique_name();
1226        let participant = crate::Participant::new(&domain).unwrap();
1227        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1228
1229        let writer = Writer::new(&topic).unwrap();
1230        let reader = crate::Reader::new(&topic).unwrap();
1231
1232        let sample = crate::tests::topic::Data {
1233            x: 0,
1234            y: 1,
1235            message: String::from("initial"),
1236        };
1237        writer.write(&sample).unwrap();
1238        let sample = crate::tests::topic::Data {
1239            x: 1,
1240            y: 2,
1241            message: String::from("registered"),
1242        };
1243        writer.write(&sample).unwrap();
1244        let registered_handle = writer.register_instance(&sample.as_key()).unwrap();
1245        let sample = crate::tests::topic::Data {
1246            x: 2,
1247            y: 3,
1248            message: String::from("unregistered"),
1249        };
1250        writer.write(&sample).unwrap();
1251        writer.unregister_instance(&sample.as_key()).unwrap();
1252
1253        writer.write(&crate::tests::topic::Data::default()).unwrap();
1254
1255        let samples = reader.take().unwrap();
1256        assert_eq!(samples.len(), 4);
1257
1258        for sample in samples {
1259            assert!(sample.is_sample());
1260            match sample.message.as_ref() {
1261                "initial" => {
1262                    assert_eq!(
1263                        (sample.x, sample.y, sample.message.as_ref()),
1264                        (0, 1, "initial")
1265                    );
1266
1267                    assert_eq!(
1268                        sample.info().state,
1269                        state::sample::Fresh | state::view::New | state::instance::Alive
1270                    );
1271                }
1272                "registered" => {
1273                    assert_eq!(
1274                        (sample.x, sample.y, sample.message.as_ref()),
1275                        (1, 2, "registered")
1276                    );
1277                    let info = sample.info();
1278                    assert_eq!(info.instance_handle, registered_handle);
1279                    assert_eq!(
1280                        info.state,
1281                        state::sample::Fresh | state::view::New | state::instance::Alive
1282                    );
1283                }
1284                "unregistered" => {
1285                    assert_eq!(
1286                        (sample.x, sample.y, sample.message.as_ref()),
1287                        (2, 3, "unregistered")
1288                    );
1289                    let info = sample.info();
1290                    assert_eq!(
1291                        info.state,
1292                        state::sample::Fresh | state::view::New | state::instance::Disposed
1293                    );
1294                }
1295                _ => {
1296                    assert_eq!(*sample, crate::tests::topic::Data::default());
1297                    assert_eq!(
1298                        sample.info().state,
1299                        state::sample::Fresh | state::view::New | state::instance::Alive
1300                    );
1301                }
1302            }
1303        }
1304
1305        let sample = crate::tests::topic::Data {
1306            x: 4,
1307            y: 5,
1308            message: String::from("registered"),
1309        };
1310        let key = sample.as_key();
1311        let registered_handle = writer.register_instance(&key).unwrap();
1312        writer.write(&sample).unwrap();
1313        let lookup_handle = writer.lookup_instance(&key).unwrap();
1314        assert_eq!(registered_handle, lookup_handle);
1315        writer
1316            .unregister_instance_by_handle(registered_handle)
1317            .unwrap();
1318        let received_sample = &reader.read().unwrap()[0];
1319        assert_eq!(**received_sample, sample);
1320        assert_eq!(
1321            received_sample.info().state,
1322            state::sample::Fresh | state::view::New | state::instance::Disposed
1323        );
1324    }
1325
1326    // TODO this test doesn't really validate the flushing side.
1327    #[test]
1328    fn test_writer_flush() {
1329        let domain_id = crate::tests::domain::unique_id();
1330        let domain = crate::Domain::new(domain_id).unwrap();
1331        let topic_name = crate::tests::topic::unique_name();
1332        let participant = crate::Participant::new(&domain).unwrap();
1333        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1334
1335        let writer_01 = Writer::new(&topic).unwrap();
1336        let writer_02 = Writer::new(&topic).unwrap();
1337
1338        let sample = crate::tests::topic::Data::default();
1339
1340        writer_01.write(&sample).unwrap();
1341        writer_01.write(&sample).unwrap();
1342        writer_01.write(&sample).unwrap();
1343        writer_01.write(&sample).unwrap();
1344        writer_01.flush().unwrap();
1345
1346        writer_02.write(&sample).unwrap();
1347        writer_02.write(&sample).unwrap();
1348        writer_02.write(&sample).unwrap();
1349        writer_02.write(&sample).unwrap();
1350        writer_02.flush().unwrap();
1351    }
1352
1353    #[test]
1354    fn test_writer_wait_for_acks() {
1355        let domain_id = crate::tests::domain::unique_id();
1356        let domain = crate::Domain::new(domain_id).unwrap();
1357        let topic_name = crate::tests::topic::unique_name();
1358        let participant = crate::Participant::new(&domain).unwrap();
1359        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1360
1361        let writer = Writer::new(&topic).unwrap();
1362        let _reader = crate::Reader::builder(&topic)
1363            .with_qos(&crate::QoS::new().with_reliability(
1364                crate::qos::policy::Reliability::Reliable {
1365                    max_blocking_time: crate::Duration::INFINITE,
1366                },
1367            ))
1368            .build()
1369            .unwrap();
1370
1371        writer.write(&crate::tests::topic::Data::default()).unwrap();
1372        writer
1373            .wait_for_acks(crate::Duration::from_nanos(100))
1374            .unwrap();
1375    }
1376
1377    #[test]
1378    fn test_writer_matched_subscriptions() {
1379        use crate::entity::Entity;
1380
1381        let domain_id = crate::tests::domain::unique_id();
1382        let domain = crate::Domain::new(domain_id).unwrap();
1383        let topic_name = crate::tests::topic::unique_name();
1384        let participant = crate::Participant::new(&domain).unwrap();
1385        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1386
1387        let writer = Writer::new(&topic).unwrap();
1388        let matched = writer.matched_subscriptions().unwrap();
1389        assert!(matched.is_empty(), "{matched:#?}");
1390        let reader = crate::Reader::new(&topic).unwrap();
1391        let matched = writer.matched_subscriptions().unwrap();
1392        assert_eq!(matched, vec![reader.instance_handle().unwrap()]);
1393    }
1394
1395    #[test]
1396    fn test_writer_lookup_instance() {
1397        let domain_id = crate::tests::domain::unique_id();
1398        let domain = crate::Domain::new(domain_id).unwrap();
1399        let topic_name = crate::tests::topic::unique_name();
1400        let participant = crate::Participant::new(&domain).unwrap();
1401        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1402
1403        let writer = Writer::new(&topic).unwrap();
1404
1405        let sample = crate::tests::topic::Data::default();
1406        let key = sample.as_key();
1407
1408        let result = writer.lookup_instance(&key);
1409        assert_eq!(result, None);
1410
1411        let registered_handle = writer.register_instance(&key).unwrap();
1412        let result = writer.lookup_instance(&key);
1413        assert_eq!(result, Some(registered_handle));
1414    }
1415
1416    #[test]
1417    fn test_writer_unregister() {
1418        use crate::entity::Entity;
1419        use crate::state;
1420
1421        let domain_id = crate::tests::domain::unique_id();
1422        let domain = crate::Domain::new(domain_id).unwrap();
1423        let topic_name = crate::tests::topic::unique_name();
1424        let participant = crate::Participant::new(&domain).unwrap();
1425        let qos = crate::QoS::new()
1426            .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1427        let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1428            .with_qos(&qos)
1429            .build()
1430            .unwrap();
1431
1432        let qos = qos
1433            .with_reliability(crate::qos::policy::Reliability::Reliable {
1434                max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1435            })
1436            .with_resource_limits(crate::qos::policy::ResourceLimits {
1437                max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1438                max_instances: crate::qos::policy::ResourceLimit::Limited(3),
1439                max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1440            });
1441
1442        let reader = crate::Reader::builder(&topic)
1443            .with_qos(&qos)
1444            .build()
1445            .unwrap();
1446
1447        let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1448            autodispose_unregistered_instances: false,
1449        });
1450        let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1451
1452        // Sync writer to reader.
1453        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1454        writer
1455            .set_status_mask(crate::Status::PublicationMatched)
1456            .unwrap();
1457        waitset.attach(&writer, Some(&writer)).unwrap();
1458        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1459        assert_eq!(result[0], &writer);
1460        waitset.detach(&writer).unwrap();
1461
1462        // Sync reader to writer.
1463        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1464        reader
1465            .set_status_mask(crate::Status::SubscriptionMatched)
1466            .unwrap();
1467        waitset.attach(&reader, Some(&reader)).unwrap();
1468        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1469        assert_eq!(result[0], &reader);
1470        waitset.detach(&reader).unwrap();
1471
1472        for i in 0..3 {
1473            let sample = crate::tests::topic::Data {
1474                x: i,
1475                y: i.cast_signed() + 1,
1476                ..crate::tests::topic::Data::default()
1477            };
1478            writer.write(&sample).unwrap();
1479        }
1480
1481        let key_01 = crate::tests::topic::Data {
1482            x: 0,
1483            y: 1,
1484            ..crate::tests::topic::Data::default()
1485        }
1486        .as_key();
1487        let handle = writer.lookup_instance(&key_01).unwrap();
1488        writer.unregister_instance_by_handle(handle).unwrap();
1489
1490        let key_02 = crate::tests::topic::Data {
1491            x: 1,
1492            y: 2,
1493            ..crate::tests::topic::Data::default()
1494        }
1495        .as_key();
1496        writer.unregister_instance(&key_02).unwrap();
1497        let samples = reader.read().unwrap();
1498        assert_eq!(samples.len(), 3);
1499
1500        for sample in samples {
1501            let key = sample.as_key();
1502
1503            if key == key_01 || key == key_02 {
1504                assert_eq!(*sample, crate::tests::topic::Data::from_key(&key));
1505                assert!(sample.is_sample());
1506                assert_eq!(
1507                    sample.info().state,
1508                    state::sample::Fresh | state::view::New | state::instance::Unregistered
1509                );
1510            } else {
1511                assert_eq!(
1512                    *sample,
1513                    crate::tests::topic::Data {
1514                        x: 2,
1515                        y: 3,
1516                        ..crate::tests::topic::Data::default()
1517                    }
1518                );
1519                assert!(sample.is_sample());
1520                assert_eq!(
1521                    sample.info().state,
1522                    state::sample::Fresh | state::view::New | state::instance::Alive
1523                );
1524            }
1525        }
1526    }
1527
1528    #[test]
1529    fn test_writer_unregister_with_timestamp() {
1530        use crate::entity::Entity;
1531        use crate::state;
1532
1533        let domain_id = crate::tests::domain::unique_id();
1534        let domain = crate::Domain::new(domain_id).unwrap();
1535        let topic_name = crate::tests::topic::unique_name();
1536        let participant = crate::Participant::new(&domain).unwrap();
1537        let qos = crate::QoS::new()
1538            .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1539        let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1540            .with_qos(&qos)
1541            .build()
1542            .unwrap();
1543
1544        let qos = qos
1545            .with_reliability(crate::qos::policy::Reliability::Reliable {
1546                max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1547            })
1548            .with_resource_limits(crate::qos::policy::ResourceLimits {
1549                max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1550                max_instances: crate::qos::policy::ResourceLimit::Limited(3),
1551                max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1552            });
1553
1554        let reader = crate::Reader::builder(&topic)
1555            .with_qos(&qos)
1556            .build()
1557            .unwrap();
1558
1559        let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1560            autodispose_unregistered_instances: false,
1561        });
1562        let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1563
1564        // Sync writer to reader.
1565        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1566        writer
1567            .set_status_mask(crate::Status::PublicationMatched)
1568            .unwrap();
1569        waitset.attach(&writer, Some(&writer)).unwrap();
1570        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1571        assert_eq!(result[0], &writer);
1572        waitset.detach(&writer).unwrap();
1573
1574        // Sync reader to writer.
1575        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1576        reader
1577            .set_status_mask(crate::Status::SubscriptionMatched)
1578            .unwrap();
1579        waitset.attach(&reader, Some(&reader)).unwrap();
1580        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1581        assert_eq!(result[0], &reader);
1582        waitset.detach(&reader).unwrap();
1583
1584        for i in 0..3 {
1585            let sample = crate::tests::topic::Data {
1586                x: i,
1587                y: i.cast_signed() + 1,
1588                ..Default::default()
1589            };
1590            writer.write(&sample).unwrap();
1591        }
1592
1593        let time = std::time::SystemTime::now().try_into().unwrap();
1594
1595        let key_01 = crate::tests::topic::Data {
1596            x: 0,
1597            y: 1,
1598            ..Default::default()
1599        }
1600        .as_key();
1601        let handle = writer.lookup_instance(&key_01).unwrap();
1602        writer
1603            .unregister_instance_by_handle_with_timestamp(handle, time)
1604            .unwrap();
1605
1606        let key_02 = crate::tests::topic::Data {
1607            x: 1,
1608            y: 2,
1609            ..Default::default()
1610        }
1611        .as_key();
1612        writer
1613            .unregister_instance_with_timestamp(&key_02, time)
1614            .unwrap();
1615        let samples = reader.read().unwrap();
1616        assert_eq!(samples.len(), 3);
1617
1618        for sample in samples {
1619            let key = sample.as_key();
1620
1621            if key == key_01 || key == key_02 {
1622                assert_eq!(*sample, crate::tests::topic::Data::from_key(&key));
1623                assert!(sample.is_sample());
1624                assert_eq!(
1625                    sample.info().state,
1626                    state::sample::Fresh | state::view::New | state::instance::Unregistered
1627                );
1628            } else {
1629                assert_eq!(
1630                    *sample,
1631                    crate::tests::topic::Data {
1632                        x: 2,
1633                        y: 3,
1634                        ..Default::default()
1635                    }
1636                );
1637                assert!(sample.is_sample());
1638                assert_eq!(
1639                    sample.info().state,
1640                    state::sample::Fresh | state::view::New | state::instance::Alive
1641                );
1642            }
1643        }
1644    }
1645
1646    #[test]
1647    fn test_writer_write_dispose() {
1648        use crate::entity::Entity;
1649        use crate::state;
1650
1651        let domain_id = crate::tests::domain::unique_id();
1652        let domain = crate::Domain::new(domain_id).unwrap();
1653        let topic_name = crate::tests::topic::unique_name();
1654        let participant = crate::Participant::new(&domain).unwrap();
1655        let qos = crate::QoS::new()
1656            .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1657        let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1658            .with_qos(&qos)
1659            .build()
1660            .unwrap();
1661
1662        let qos = qos
1663            .with_reliability(crate::qos::policy::Reliability::Reliable {
1664                max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1665            })
1666            .with_resource_limits(crate::qos::policy::ResourceLimits {
1667                max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1668                max_instances: crate::qos::policy::ResourceLimit::Limited(4),
1669                max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1670            });
1671
1672        let reader = crate::Reader::builder(&topic)
1673            .with_qos(&qos)
1674            .build()
1675            .unwrap();
1676
1677        let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1678            autodispose_unregistered_instances: false,
1679        });
1680        let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1681
1682        // Sync writer to reader.
1683        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1684        writer
1685            .set_status_mask(crate::Status::PublicationMatched)
1686            .unwrap();
1687        waitset.attach(&writer, Some(&writer)).unwrap();
1688        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1689        assert_eq!(result[0], &writer);
1690        waitset.detach(&writer).unwrap();
1691
1692        // Sync reader to writer.
1693        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1694        reader
1695            .set_status_mask(crate::Status::SubscriptionMatched)
1696            .unwrap();
1697        waitset.attach(&reader, Some(&reader)).unwrap();
1698        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1699        assert_eq!(result[0], &reader);
1700        waitset.detach(&reader).unwrap();
1701
1702        let time = std::time::SystemTime::now().try_into().unwrap();
1703        for i in 0..4 {
1704            let sample = crate::tests::topic::Data {
1705                x: i,
1706                y: i.cast_signed() + 1,
1707                ..Default::default()
1708            };
1709            if sample.x.is_multiple_of(2) {
1710                if sample.x < 2 {
1711                    writer.write(&sample).unwrap();
1712                } else {
1713                    writer.write_with_timestamp(&sample, time).unwrap();
1714                }
1715            } else if sample.x < 2 {
1716                writer.write_dispose(&sample).unwrap();
1717            } else {
1718                writer.write_dispose_with_timestamp(&sample, time).unwrap();
1719            }
1720        }
1721
1722        let samples = reader.read().unwrap();
1723        assert_eq!(samples.len(), 4);
1724
1725        for sample in samples {
1726            assert_eq!(
1727                *sample,
1728                crate::tests::topic::Data {
1729                    x: sample.x,
1730                    y: sample.x.cast_signed() + 1,
1731                    ..Default::default()
1732                }
1733            );
1734            if sample.x % 2 == 0 {
1735                assert!(sample.is_sample());
1736                assert_eq!(
1737                    sample.info().state,
1738                    state::sample::Fresh | state::view::New | state::instance::Alive
1739                );
1740            } else {
1741                assert!(sample.is_sample());
1742                assert_eq!(
1743                    sample.info().state,
1744                    state::sample::Fresh | state::view::New | state::instance::Disposed
1745                );
1746            }
1747        }
1748    }
1749
1750    #[test]
1751    fn test_writer_write_and_then_dispose() {
1752        use crate::entity::Entity;
1753        use crate::state;
1754
1755        let domain_id = crate::tests::domain::unique_id();
1756        let domain = crate::Domain::new(domain_id).unwrap();
1757        let topic_name = crate::tests::topic::unique_name();
1758        let participant = crate::Participant::new(&domain).unwrap();
1759        let qos = crate::QoS::new()
1760            .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1761        let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1762            .with_qos(&qos)
1763            .build()
1764            .unwrap();
1765
1766        let qos = qos
1767            .with_reliability(crate::qos::policy::Reliability::Reliable {
1768                max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1769            })
1770            .with_resource_limits(crate::qos::policy::ResourceLimits {
1771                max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1772                max_instances: crate::qos::policy::ResourceLimit::Limited(4),
1773                max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1774            });
1775
1776        let reader = crate::Reader::builder(&topic)
1777            .with_qos(&qos)
1778            .build()
1779            .unwrap();
1780
1781        let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1782            autodispose_unregistered_instances: false,
1783        });
1784        let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1785
1786        // Sync writer to reader.
1787        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1788        writer
1789            .set_status_mask(crate::Status::PublicationMatched)
1790            .unwrap();
1791        waitset.attach(&writer, Some(&writer)).unwrap();
1792        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1793        assert_eq!(result[0], &writer);
1794        waitset.detach(&writer).unwrap();
1795
1796        // Sync reader to writer.
1797        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1798        reader
1799            .set_status_mask(crate::Status::SubscriptionMatched)
1800            .unwrap();
1801        waitset.attach(&reader, Some(&reader)).unwrap();
1802        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1803        assert_eq!(result[0], &reader);
1804        waitset.detach(&reader).unwrap();
1805
1806        let time = std::time::SystemTime::now().try_into().unwrap();
1807        for i in 0..4 {
1808            let sample = crate::tests::topic::Data {
1809                x: i,
1810                y: i.cast_signed() + 1,
1811                ..Default::default()
1812            };
1813            if sample.x.is_multiple_of(2) {
1814                if sample.x < 2 {
1815                    writer.write(&sample).unwrap();
1816                } else {
1817                    writer.write_with_timestamp(&sample, time).unwrap();
1818                }
1819            } else {
1820                let key = sample.as_key();
1821                if sample.x < 2 {
1822                    writer.write(&sample).unwrap();
1823                    writer.dispose(&key).unwrap();
1824                } else {
1825                    writer.write_with_timestamp(&sample, time).unwrap();
1826                    writer.dispose_with_timestamp(&key, time).unwrap();
1827                }
1828            }
1829        }
1830
1831        let samples = reader.read().unwrap();
1832        assert_eq!(samples.len(), 4);
1833
1834        for sample in samples {
1835            assert_eq!(
1836                *sample,
1837                crate::tests::topic::Data {
1838                    x: sample.x,
1839                    y: sample.x.cast_signed() + 1,
1840                    ..Default::default()
1841                }
1842            );
1843            if sample.x.is_multiple_of(2) {
1844                assert!(sample.is_sample());
1845                assert_eq!(
1846                    sample.info().state,
1847                    state::sample::Fresh | state::view::New | state::instance::Alive
1848                );
1849            } else {
1850                assert!(sample.is_sample());
1851                assert_eq!(
1852                    sample.info().state,
1853                    state::sample::Fresh | state::view::New | state::instance::Disposed
1854                );
1855            }
1856        }
1857    }
1858
1859    #[test]
1860    fn test_writer_write_and_then_dispose_by_instance_handle() {
1861        use crate::entity::Entity;
1862        use crate::state;
1863
1864        let domain_id = crate::tests::domain::unique_id();
1865        let domain = crate::Domain::new(domain_id).unwrap();
1866        let topic_name = crate::tests::topic::unique_name();
1867        let participant = crate::Participant::new(&domain).unwrap();
1868        let qos = crate::QoS::new()
1869            .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1870        let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1871            .with_qos(&qos)
1872            .build()
1873            .unwrap();
1874
1875        let qos = qos
1876            .with_reliability(crate::qos::policy::Reliability::Reliable {
1877                max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1878            })
1879            .with_resource_limits(crate::qos::policy::ResourceLimits {
1880                max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1881                max_instances: crate::qos::policy::ResourceLimit::Limited(4),
1882                max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1883            });
1884
1885        let reader = crate::Reader::builder(&topic)
1886            .with_qos(&qos)
1887            .build()
1888            .unwrap();
1889
1890        let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1891            autodispose_unregistered_instances: false,
1892        });
1893        let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1894
1895        // Sync writer to reader.
1896        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1897        writer
1898            .set_status_mask(crate::Status::PublicationMatched)
1899            .unwrap();
1900        waitset.attach(&writer, Some(&writer)).unwrap();
1901        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1902        assert_eq!(result[0], &writer);
1903        waitset.detach(&writer).unwrap();
1904
1905        // Sync reader to writer.
1906        let mut waitset = crate::WaitSet::new(&participant).unwrap();
1907        reader
1908            .set_status_mask(crate::Status::SubscriptionMatched)
1909            .unwrap();
1910        waitset.attach(&reader, Some(&reader)).unwrap();
1911        let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1912        assert_eq!(result[0], &reader);
1913        waitset.detach(&reader).unwrap();
1914
1915        let time = std::time::SystemTime::now().try_into().unwrap();
1916        for i in 0..4 {
1917            let sample = crate::tests::topic::Data {
1918                x: i,
1919                y: i.cast_signed() + 1,
1920                ..Default::default()
1921            };
1922            if sample.x.is_multiple_of(2) {
1923                if sample.x < 2 {
1924                    writer.write(&sample).unwrap();
1925                } else {
1926                    writer.write_with_timestamp(&sample, time).unwrap();
1927                }
1928            } else {
1929                let key = sample.as_key();
1930                if sample.x < 2 {
1931                    writer.write(&sample).unwrap();
1932                    let instance_handle = writer.lookup_instance(&key).unwrap();
1933                    writer.dispose_instance_by_handle(instance_handle).unwrap();
1934                } else {
1935                    writer.write_with_timestamp(&sample, time).unwrap();
1936                    let instance_handle = writer.lookup_instance(&key).unwrap();
1937                    writer
1938                        .dispose_instance_by_handle_with_timestamp(instance_handle, time)
1939                        .unwrap();
1940                }
1941            }
1942        }
1943
1944        let samples = reader.read().unwrap();
1945        assert_eq!(samples.len(), 4);
1946
1947        for sample in samples {
1948            assert_eq!(
1949                *sample,
1950                crate::tests::topic::Data {
1951                    x: sample.x,
1952                    y: sample.x.cast_signed() + 1,
1953                    ..Default::default()
1954                }
1955            );
1956            if sample.x % 2 == 0 {
1957                assert!(sample.is_sample());
1958                assert_eq!(
1959                    sample.info().state,
1960                    state::sample::Fresh | state::view::New | state::instance::Alive
1961                );
1962            } else {
1963                assert!(sample.is_sample());
1964                assert_eq!(
1965                    sample.info().state,
1966                    state::sample::Fresh | state::view::New | state::instance::Disposed
1967                );
1968            }
1969        }
1970    }
1971
1972    #[test]
1973    fn test_writer_with_listener() {
1974        let domain_id = crate::tests::domain::unique_id();
1975        let domain = crate::Domain::new(domain_id).unwrap();
1976        let topic_name = crate::tests::topic::unique_name();
1977        let participant = crate::Participant::new(&domain).unwrap();
1978        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1979
1980        let listener = crate::WriterListener::new()
1981            .with_liveliness_lost(|_, _| ())
1982            .with_offered_deadline_missed(|_, _| ())
1983            .with_offered_incompatible_qos(|_, _| ())
1984            .with_publication_matched(|_, _| ());
1985
1986        let _ = Writer::new(&topic)
1987            .unwrap()
1988            .with_listener(&listener)
1989            .unwrap();
1990
1991        let mut writer = Writer::new(&topic).unwrap();
1992        writer.set_listener(&listener).unwrap();
1993        writer.unset_listener().unwrap();
1994    }
1995
1996    #[test]
1997    fn test_writer_with_listener_on_invalid_writer() {
1998        let domain_id = crate::tests::domain::unique_id();
1999        let domain = crate::Domain::new(domain_id).unwrap();
2000        let topic_name = crate::tests::topic::unique_name();
2001        let participant = crate::Participant::new(&domain).unwrap();
2002        let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
2003
2004        let listener = crate::WriterListener::new();
2005
2006        let mut writer = Writer::new(&topic).unwrap();
2007        let writer_id = writer.inner;
2008        writer.inner = 0;
2009        let result = writer.set_listener(&listener).unwrap_err();
2010        assert_eq!(result, crate::Error::BadParameter);
2011        let result = writer.unset_listener().unwrap_err();
2012        assert_eq!(result, crate::Error::BadParameter);
2013        writer.inner = writer_id;
2014    }
2015}