cyclonedds/writer.rs
1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Publisher, Result, Topic};
4
5/// A data writer for topic type [`T`](crate::Topicable).
6///
7/// A `Writer` publishes samples of type `T` to a named [`Topic`](crate::Topic).
8/// Matched [`Readers`](crate::Reader) on the same topic receive the samples
9/// subject to their [`QoS`](crate::QoS) compatibility.
10///
11/// Use [`Writer::new`] for simple construction or [`Writer::builder`] for
12/// [`QoS`](crate::QoS), [`listener`](crate::listener::WriterListener), and
13/// [`publisher`](Publisher) configuration.
14///
15/// # Instance lifecycle
16///
17/// For keyed topics, each unique key value identifies a distinct instance.
18/// Writers can explicitly manage instance lifecycle through
19/// [`register_instance`](Writer::register_instance),
20/// [`unregister_instance`](Writer::unregister_instance), and
21/// [`dispose`](Writer::dispose). Unkeyed topics (where
22/// [`T::Key`](crate::Topicable::Key) is [`()`](primitive@unit)) have
23/// a single instance shared by all samples.
24#[derive(Debug, PartialEq, Eq)]
25pub struct Writer<'domain, 'participant, 'topic, T>
26where
27 T: crate::Topicable,
28{
29 pub(crate) inner: cyclonedds_sys::dds_entity_t,
30 phantom_topic: std::marker::PhantomData<&'topic Topic<'domain, 'participant, T>>,
31}
32
33/// Builder for [`Writer<T>`] (accessible via [`Writer::builder`]).
34#[derive(Debug)]
35pub struct WriterBuilder<'domain, 'participant, 'topic, 'qos, T>
36where
37 T: crate::Topicable,
38{
39 publisher: Option<&'participant Publisher<'domain, 'participant>>,
40 topic: &'topic Topic<'domain, 'participant, T>,
41 qos: Option<&'qos crate::QoS>,
42 listener: Option<crate::WriterListener<T>>,
43}
44
45impl<'d, 'p, 't, 'q, T> WriterBuilder<'d, 'p, 't, 'q, T>
46where
47 T: crate::Topicable,
48{
49 /// Creates a new [`WriterBuilder`] for the given [`Topic`].
50 ///
51 /// # Examples
52 ///
53 /// ```
54 /// use cyclonedds::builder::WriterBuilder;
55 /// use cyclonedds::{Domain, Participant, Topic};
56 /// # #[derive(
57 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
58 /// # )]
59 /// # struct Data {
60 /// # x: i32,
61 /// # }
62 ///
63 /// let domain = Domain::default();
64 /// let participant = Participant::new(&domain)?;
65 /// let topic = Topic::new(&participant, "MyTopic")?;
66 /// let writer_builder = WriterBuilder::<Data>::new(&topic);
67 /// # Ok::<_, cyclonedds::Error>(())
68 /// ```
69 #[must_use]
70 pub const fn new(topic: &'t Topic<'d, 'p, T>) -> Self {
71 Self {
72 publisher: None,
73 topic,
74 qos: None,
75 listener: None,
76 }
77 }
78
79 /// Sets the [`QoS`](crate::QoS) for this writer builder.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use cyclonedds::builder::WriterBuilder;
85 /// use cyclonedds::qos::policy;
86 /// use cyclonedds::{Duration, QoS};
87 /// # use cyclonedds::{Domain, Participant, Topic};
88 /// # let domain = Domain::default();
89 /// # let participant = Participant::new(&domain)?;
90 /// # #[derive(
91 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
92 /// # )]
93 /// # struct Data {
94 /// # x: i32,
95 /// # }
96 /// # let topic = Topic::new(&participant, "MyTopic")?;
97 ///
98 /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
99 /// max_blocking_time: Duration::from_millis(100),
100 /// });
101 /// let writer_builder = WriterBuilder::<Data>::new(&topic).with_qos(&qos);
102 /// # Ok::<_, cyclonedds::Error>(())
103 /// ```
104 #[must_use]
105 pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
106 self.qos = Some(qos);
107 self
108 }
109
110 /// Sets the [`Publisher`](crate::Publisher) on this writer builder.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// use cyclonedds::Publisher;
116 /// use cyclonedds::WriterListener;
117 /// use cyclonedds::builder::WriterBuilder;
118 /// # use cyclonedds::{Domain, Participant, Topic};
119 /// # let domain = Domain::default();
120 /// # let participant = Participant::new(&domain)?;
121 /// # #[derive(
122 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
123 /// # )]
124 /// # struct Data {
125 /// # x: i32,
126 /// # }
127 /// # let topic = Topic::new(&participant, "MyTopic")?;
128 ///
129 /// let publisher = Publisher::new(&participant)?;
130 ///
131 /// let writer_builder = WriterBuilder::<Data>::new(&topic).with_publisher(&publisher);
132 /// # Ok::<_, cyclonedds::Error>(())
133 /// ```
134 #[must_use]
135 pub const fn with_publisher(mut self, publisher: &'p Publisher<'d, 'p>) -> Self {
136 self.publisher = Some(publisher);
137 self
138 }
139
140 /// Sets the [`Listener`](crate::Listener) on this writer builder.
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// use cyclonedds::WriterListener;
146 /// use cyclonedds::builder::WriterBuilder;
147 /// # use cyclonedds::{Domain, Participant, Topic};
148 /// # let domain = Domain::default();
149 /// # let participant = Participant::new(&domain)?;
150 /// # #[derive(
151 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
152 /// # )]
153 /// # struct Data {
154 /// # x: i32,
155 /// # }
156 /// # let topic = Topic::new(&participant, "MyTopic")?;
157 ///
158 /// let writer_builder = WriterBuilder::<Data>::new(&topic).with_listener(WriterListener::new());
159 /// # Ok::<_, cyclonedds::Error>(())
160 /// ```
161 #[must_use]
162 pub fn with_listener<L>(mut self, listener: L) -> Self
163 where
164 L: AsRef<crate::WriterListener<T>>,
165 {
166 self.listener = Some(listener.as_ref().clone());
167 self
168 }
169
170 /// Builds the [`Writer`].
171 ///
172 /// # Errors
173 ///
174 /// Returns an [`Error`](crate::Error) if the writer failed to create.
175 ///
176 /// # Examples
177 ///
178 /// ```
179 /// use cyclonedds::QoS;
180 /// use cyclonedds::builder::WriterBuilder;
181 /// use cyclonedds::qos::policy;
182 /// # use cyclonedds::{Domain, Participant, Topic};
183 /// # let domain = Domain::default();
184 /// # let participant = Participant::new(&domain)?;
185 /// # #[derive(
186 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
187 /// # )]
188 /// # struct Data {
189 /// # x: i32,
190 /// # }
191 /// # let topic = Topic::new(&participant, "MyTopic")?;
192 ///
193 /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
194 /// let writer = WriterBuilder::<Data>::new(&topic).with_qos(&qos).build()?;
195 /// # Ok::<_, cyclonedds::Error>(())
196 /// ```
197 pub fn build(self) -> Result<Writer<'d, 'p, 't, T>> {
198 // NOTE: using `and_then` to avoid ? branch on the listener for coverage
199 // since the C lib currently panics on OOM rather than returning null.
200 self.listener
201 .map(|listener| listener.as_ffi())
202 .transpose()
203 .and_then(|listener| {
204 Ok(Writer {
205 inner: ffi::dds_create_writer(
206 self.publisher
207 .map_or(ffi::dds_get_participant(self.topic.inner)?, |publisher| {
208 publisher.inner
209 }),
210 self.topic.inner,
211 self.qos.map(|qos| &qos.inner),
212 listener.as_ref(),
213 )?,
214 phantom_topic: std::marker::PhantomData,
215 })
216 })
217 }
218}
219
220impl<'d, 'p, 't, T> Writer<'d, 'p, 't, T>
221where
222 T: crate::Topicable,
223{
224 /// Creates a new `Writer` for the given [`Topic`](crate::Topic) with
225 /// default [`QoS`](crate::QoS) and no
226 /// [`listener`](crate::listener::WriterListener).
227 ///
228 /// # Errors
229 ///
230 /// Returns an [`Error`](crate::Error) if the writer fails to create.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use cyclonedds::Writer;
236 /// # use cyclonedds::{Domain, Participant, Topic};
237 /// # let domain = Domain::default();
238 /// # let participant = Participant::new(&domain)?;
239 /// # #[derive(
240 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
241 /// # )]
242 /// # struct Data {
243 /// # #[dds(key)]
244 /// # x: i32,
245 /// # #[dds(key)]
246 /// # y: i32,
247 /// # }
248 ///
249 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
250 /// let writer = Writer::new(&topic)?;
251 /// # Ok::<_, cyclonedds::Error>(())
252 /// ```
253 pub fn new(topic: &'t Topic<'d, 'p, T>) -> Result<Self> {
254 Self::builder(topic).build()
255 }
256
257 /// Returns a [`WriterBuilder`](crate::builder::WriterBuilder) for
258 /// constructing a writer with custom [`QoS`](crate::QoS) or a
259 /// [`listener`](crate::listener::WriterListener).
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// use cyclonedds::{Duration, QoS, Writer, qos::policy::Reliability};
265 /// # use cyclonedds::{Domain, Participant, Topic};
266 /// # let domain = Domain::default();
267 /// # let participant = Participant::new(&domain)?;
268 /// # #[derive(
269 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
270 /// # )]
271 /// # struct Data {
272 /// # #[dds(key)]
273 /// # x: i32,
274 /// # #[dds(key)]
275 /// # y: i32,
276 /// # }
277 ///
278 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
279 /// let qos = QoS::new().with_reliability(Reliability::Reliable {
280 /// max_blocking_time: Duration::from_millis(100),
281 /// });
282 /// let writer = Writer::builder(&topic).with_qos(&qos).build()?;
283 /// # Ok::<_, cyclonedds::Error>(())
284 /// ```
285 #[must_use]
286 pub const fn builder<'q>(topic: &'t Topic<'d, 'p, T>) -> WriterBuilder<'d, 'p, 't, 'q, T> {
287 WriterBuilder::new(topic)
288 }
289
290 /// Writes a sample to the topic.
291 ///
292 /// # Errors
293 ///
294 /// Returns an [`Error`](crate::Error) if the writers fails to write the
295 /// sample.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
301 /// # let domain = Domain::default();
302 /// # let participant = Participant::new(&domain)?;
303 /// # #[derive(
304 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
305 /// # )]
306 /// # struct Data {
307 /// # #[dds(key)]
308 /// # x: i32,
309 /// # #[dds(key)]
310 /// # y: i32,
311 /// # }
312 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
313 /// let writer = Writer::new(&topic)?;
314 /// writer.write(&Data { x: 1, y: 2 })?;
315 /// # Ok::<_, cyclonedds::Error>(())
316 /// ```
317 pub fn write(&self, sample: &T) -> Result<()> {
318 ffi::dds_write(self.inner, sample)
319 }
320
321 /// Writes a sample with an explicit source timestamp.
322 ///
323 /// Use this when the write timestamp should reflect the time the data was
324 /// generated rather than the time it was written.
325 ///
326 /// # Errors
327 ///
328 /// Returns an [`Error`](crate::Error) if the writer fails to write the
329 /// sample.
330 ///
331 /// # Examples
332 ///
333 /// ```
334 /// use cyclonedds::Time;
335 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
336 /// # let domain = Domain::default();
337 /// # let participant = Participant::new(&domain)?;
338 /// # #[derive(
339 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
340 /// # )]
341 /// # struct Data {
342 /// # #[dds(key)]
343 /// # x: i32,
344 /// # #[dds(key)]
345 /// # y: i32,
346 /// # }
347 ///
348 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
349 /// let writer = Writer::new(&topic)?;
350 /// writer.write_with_timestamp(&Data::default(), Time::from_secs(1))?;
351 /// # Ok::<_, cyclonedds::Error>(())
352 /// ```
353 pub fn write_with_timestamp(&self, sample: &T, timestamp: crate::Time) -> Result<()> {
354 ffi::dds_write_with_timestamp(self.inner, sample, timestamp.inner)
355 }
356
357 /// Flushes batched samples to the network.
358 ///
359 /// Only relevant when write batching is enabled in the domain
360 /// configuration. Has no effect otherwise.
361 ///
362 /// # Errors
363 ///
364 /// Returns an [`Error`](crate::Error) if the writer fails to flush.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
370 /// # let domain = Domain::default();
371 /// # let participant = Participant::new(&domain)?;
372 /// # #[derive(
373 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
374 /// # )]
375 /// # struct Data {
376 /// # #[dds(key)]
377 /// # x: i32,
378 /// # #[dds(key)]
379 /// # y: i32,
380 /// # }
381 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
382 /// let writer = Writer::new(&topic)?;
383 /// writer.write(&Data::default())?;
384 /// writer.flush()?;
385 /// # Ok::<_, cyclonedds::Error>(())
386 /// ```
387 pub fn flush(&self) -> Result<()> {
388 ffi::dds_write_flush(self.inner)
389 }
390
391 /// Blocks until all written samples have been acknowledged by all matched
392 /// reliable readers, or until `timeout` elapses.
393 ///
394 /// # Errors
395 ///
396 /// Returns an [`Error`](crate::Error) if the timeout elapses before all
397 /// acknowledgements are received or if the writer encounters an unexpected
398 /// error.
399 ///
400 /// # Examples
401 ///
402 /// ```
403 /// use cyclonedds::Duration;
404 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
405 /// # let domain = Domain::default();
406 /// # let participant = Participant::new(&domain)?;
407 /// # #[derive(
408 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
409 /// # )]
410 /// # struct Data {
411 /// # #[dds(key)]
412 /// # x: i32,
413 /// # #[dds(key)]
414 /// # y: i32,
415 /// # }
416 ///
417 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
418 /// let writer = Writer::new(&topic)?;
419 /// writer.write(&Data::default())?;
420 /// writer.wait_for_acks(Duration::from_secs(1))?;
421 /// # Ok::<_, cyclonedds::Error>(())
422 /// ```
423 pub fn wait_for_acks(&self, timeout: crate::Duration) -> Result<()> {
424 ffi::dds_wait_for_acks(self.inner, timeout.inner)
425 }
426
427 /// Returns the instance handles of all readers currently matched with
428 /// this writer.
429 ///
430 /// The returned handles can be compared against
431 /// [`InstanceHandle`](crate::entity::InstanceHandle) values from reader
432 /// entities to identify specific matched readers.
433 ///
434 /// # Errors
435 ///
436 /// Returns an [`Error`](crate::Error) if the writer fails to retrieve the
437 /// matched subscriptions.
438 ///
439 /// # Examples
440 ///
441 /// ```
442 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Reader};
443 /// # let domain = Domain::default();
444 /// # let participant = Participant::new(&domain)?;
445 /// # #[derive(
446 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
447 /// # )]
448 /// # struct Data {
449 /// # #[dds(key)]
450 /// # x: i32,
451 /// # #[dds(key)]
452 /// # y: i32,
453 /// # }
454 /// use cyclonedds::entity::Entity;
455 ///
456 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
457 /// let writer = Writer::new(&topic)?;
458 /// let reader = Reader::new(&topic)?;
459 ///
460 /// let matched = writer.matched_subscriptions()?;
461 /// assert_eq!(matched[0], reader.instance_handle()?);
462 /// # Ok::<_, cyclonedds::Error>(())
463 /// ```
464 pub fn matched_subscriptions(&self) -> Result<Vec<crate::entity::InstanceHandle>> {
465 ffi::dds_get_matched_subscriptions(self.inner).map(|matched| {
466 matched
467 .iter()
468 .map(|&inner| crate::entity::InstanceHandle { inner })
469 .collect()
470 })
471 }
472
473 /// Registers an instance identified by `key` with this writer.
474 ///
475 /// Registration is optional but allows for the pre-allocation of resources
476 /// for the instance. Returns the
477 /// [`InstanceHandle`](crate::entity::InstanceHandle) assigned to the
478 /// instance.
479 ///
480 /// # Errors
481 ///
482 /// Returns an [`Error`](crate::Error) if the writer fails to register the
483 /// instance.
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
489 /// # let domain = Domain::default();
490 /// # let participant = Participant::new(&domain)?;
491 /// # #[derive(
492 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
493 /// # )]
494 /// # struct Data {
495 /// # #[dds(key)]
496 /// # x: i32,
497 /// # #[dds(key)]
498 /// # y: i32,
499 /// # }
500 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
501 /// let writer = Writer::new(&topic)?;
502 /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
503 /// # Ok::<_, cyclonedds::Error>(())
504 /// ```
505 pub fn register_instance(&self, key: &T::Key) -> Result<crate::entity::InstanceHandle> {
506 ffi::dds_register_instance::<T>(self.inner, key)
507 .map(|inner| crate::entity::InstanceHandle { inner })
508 }
509
510 /// Unregisters an instance identified by `key` from this writer.
511 ///
512 /// Notifies matched readers that this writer will no longer publish
513 /// samples for the given instance.
514 ///
515 /// # Errors
516 ///
517 /// Returns an [`Error`](crate::Error) if the writer fails to unregister
518 /// the instance.
519 ///
520 /// # Examples
521 ///
522 /// ```
523 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
524 /// # let domain = Domain::default();
525 /// # let participant = Participant::new(&domain)?;
526 /// # #[derive(
527 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
528 /// # )]
529 /// # struct Data {
530 /// # #[dds(key)]
531 /// # x: i32,
532 /// # #[dds(key)]
533 /// # y: i32,
534 /// # }
535 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
536 /// let writer = Writer::new(&topic)?;
537 /// writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
538 /// writer.unregister_instance(&Key::<Data> { x: 1, y: 2 })?;
539 /// # Ok::<_, cyclonedds::Error>(())
540 /// ```
541 pub fn unregister_instance(&self, key: &T::Key) -> Result<()> {
542 ffi::dds_unregister_instance::<T>(self.inner, key)
543 }
544
545 /// Unregisters an instance identified by its
546 /// [`InstanceHandle`](crate::entity::InstanceHandle).
547 ///
548 /// # Errors
549 ///
550 /// Returns an [`Error`](crate::Error) if the writer fails to unregister
551 /// the instance.
552 ///
553 /// # Examples
554 ///
555 /// ```
556 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
557 /// # let domain = Domain::default();
558 /// # let participant = Participant::new(&domain)?;
559 /// # #[derive(
560 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
561 /// # )]
562 /// # struct Data {
563 /// # #[dds(key)]
564 /// # x: i32,
565 /// # #[dds(key)]
566 /// # y: i32,
567 /// # }
568 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
569 /// let writer = Writer::new(&topic)?;
570 /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
571 /// writer.unregister_instance_by_handle(handle)?;
572 /// # Ok::<_, cyclonedds::Error>(())
573 /// ```
574 pub fn unregister_instance_by_handle(
575 &self,
576 instance_handle: crate::entity::InstanceHandle,
577 ) -> Result<()> {
578 ffi::dds_unregister_instance_by_handle(self.inner, instance_handle.inner)
579 }
580
581 /// Unregisters an instance identified by `key` with an explicit timestamp.
582 ///
583 /// # Errors
584 ///
585 /// Returns an [`Error`](crate::Error) if the writer fails to unregister
586 /// the instance.
587 ///
588 /// # Examples
589 ///
590 /// ```
591 /// use cyclonedds::Time;
592 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
593 /// # let domain = Domain::default();
594 /// # let participant = Participant::new(&domain)?;
595 /// # #[derive(
596 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
597 /// # )]
598 /// # struct Data {
599 /// # #[dds(key)]
600 /// # x: i32,
601 /// # #[dds(key)]
602 /// # y: i32,
603 /// # }
604 ///
605 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
606 /// let writer = Writer::new(&topic)?;
607 /// writer.unregister_instance_with_timestamp(&Key::<Data> { x: 1, y: 2 }, Time::from_secs(1))?;
608 /// # Ok::<_, cyclonedds::Error>(())
609 /// ```
610 pub fn unregister_instance_with_timestamp(
611 &self,
612 key: &T::Key,
613 timestamp: crate::Time,
614 ) -> Result<()> {
615 ffi::dds_unregister_instance_with_timestamp::<T>(self.inner, key, timestamp.inner)
616 }
617
618 /// Unregisters an instance identified by its
619 /// [`InstanceHandle`](crate::entity::InstanceHandle) with an explicit
620 /// timestamp.
621 ///
622 /// # Errors
623 ///
624 /// Returns an [`Error`](crate::Error) if the writer fails to unregister
625 /// the instance.
626 ///
627 /// # Examples
628 ///
629 /// ```
630 /// use cyclonedds::Time;
631 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
632 /// # let domain = Domain::default();
633 /// # let participant = Participant::new(&domain)?;
634 /// # #[derive(
635 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
636 /// # )]
637 /// # struct Data {
638 /// # #[dds(key)]
639 /// # x: i32,
640 /// # #[dds(key)]
641 /// # y: i32,
642 /// # }
643 ///
644 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
645 /// let writer = Writer::new(&topic)?;
646 /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
647 /// writer.unregister_instance_by_handle_with_timestamp(handle, Time::from_secs(1))?;
648 /// # Ok::<_, cyclonedds::Error>(())
649 /// ```
650 pub fn unregister_instance_by_handle_with_timestamp(
651 &self,
652 instance_handle: crate::entity::InstanceHandle,
653 timestamp: crate::Time,
654 ) -> Result<()> {
655 ffi::dds_unregister_instance_by_handle_with_timestamp(
656 self.inner,
657 instance_handle.inner,
658 timestamp.inner,
659 )
660 }
661
662 /// Returns the [`InstanceHandle`](crate::entity::InstanceHandle) for the
663 /// instance identified by `key`, or `None` if the instance is not
664 /// registered.
665 ///
666 /// # Examples
667 ///
668 /// ```
669 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
670 /// # let domain = Domain::default();
671 /// # let participant = Participant::new(&domain)?;
672 /// # #[derive(
673 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
674 /// # )]
675 /// # struct Data {
676 /// # #[dds(key)]
677 /// # x: i32,
678 /// # #[dds(key)]
679 /// # y: i32,
680 /// # }
681 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
682 /// let writer = Writer::new(&topic)?;
683 /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
684 /// assert_eq!(
685 /// writer.lookup_instance(&Key::<Data> { x: 1, y: 2 }),
686 /// Some(handle)
687 /// );
688 /// assert_eq!(writer.lookup_instance(&Key::<Data> { x: 9, y: 9 }), None);
689 /// # Ok::<_, cyclonedds::Error>(())
690 /// ```
691 pub fn lookup_instance(&self, key: &T::Key) -> Option<crate::entity::InstanceHandle> {
692 ffi::dds_lookup_instance::<T>(self.inner, key)
693 .map(|inner| crate::entity::InstanceHandle { inner })
694 }
695
696 /// Writes a sample and immediately disposes the instance.
697 ///
698 /// Equivalent to calling [`write`](Writer::write) followed by
699 /// [`dispose`](Writer::dispose) but in a single operation.
700 ///
701 /// # Errors
702 ///
703 /// Returns an [`Error`](crate::Error) if the writer fails to write or
704 /// dispose.
705 ///
706 /// # Examples
707 ///
708 /// ```
709 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
710 /// # let domain = Domain::default();
711 /// # let participant = Participant::new(&domain)?;
712 /// # #[derive(
713 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
714 /// # )]
715 /// # struct Data {
716 /// # #[dds(key)]
717 /// # x: i32,
718 /// # #[dds(key)]
719 /// # y: i32,
720 /// # }
721 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
722 /// let writer = Writer::new(&topic)?;
723 /// writer.write_dispose(&Data { x: 1, y: 2 })?;
724 /// # Ok::<_, cyclonedds::Error>(())
725 /// ```
726 pub fn write_dispose(&self, data: &T) -> Result<()> {
727 ffi::dds_write_dispose(self.inner, data)
728 }
729
730 /// Writes a sample and immediately disposes the instance with an explicit
731 /// timestamp.
732 ///
733 /// # Errors
734 ///
735 /// Returns an [`Error`](crate::Error) if the writer fails to write or
736 /// dispose.
737 ///
738 /// # Examples
739 ///
740 /// ```
741 /// use cyclonedds::Time;
742 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
743 /// # let domain = Domain::default();
744 /// # let participant = Participant::new(&domain)?;
745 /// # #[derive(
746 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
747 /// # )]
748 /// # struct Data {
749 /// # #[dds(key)]
750 /// # x: i32,
751 /// # #[dds(key)]
752 /// # y: i32,
753 /// # }
754 ///
755 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
756 /// let writer = Writer::new(&topic)?;
757 /// writer.write_dispose_with_timestamp(&Data::default(), Time::from_secs(1))?;
758 /// # Ok::<_, cyclonedds::Error>(())
759 /// ```
760 pub fn write_dispose_with_timestamp(&self, data: &T, timestamp: crate::Time) -> Result<()> {
761 ffi::dds_write_dispose_with_timestamp(self.inner, data, timestamp.inner)
762 }
763
764 /// Disposes the instance identified by `key`.
765 ///
766 /// Notifies matched readers that the instance is no longer valid. The
767 /// instance remains known but its state transitions to disposed.
768 ///
769 /// # Errors
770 ///
771 /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
772 /// instance.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
778 /// # let domain = Domain::default();
779 /// # let participant = Participant::new(&domain)?;
780 /// # #[derive(
781 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
782 /// # )]
783 /// # struct Data {
784 /// # #[dds(key)]
785 /// # x: i32,
786 /// # #[dds(key)]
787 /// # y: i32,
788 /// # }
789 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
790 /// let writer = Writer::new(&topic)?;
791 /// writer.write(&Data { x: 1, y: 2 })?;
792 /// writer.dispose(&Key::<Data> { x: 1, y: 2 })?;
793 /// # Ok::<_, cyclonedds::Error>(())
794 /// ```
795 pub fn dispose(&self, key: &T::Key) -> Result<()> {
796 ffi::dds_dispose::<T>(self.inner, key)
797 }
798
799 /// Disposes the instance identified by `key` with an explicit timestamp.
800 ///
801 /// # Errors
802 ///
803 /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
804 /// instance.
805 ///
806 /// # Examples
807 ///
808 /// ```
809 /// use cyclonedds::Time;
810 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
811 /// # let domain = Domain::default();
812 /// # let participant = Participant::new(&domain)?;
813 /// # #[derive(
814 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
815 /// # )]
816 /// # struct Data {
817 /// # #[dds(key)]
818 /// # x: i32,
819 /// # #[dds(key)]
820 /// # y: i32,
821 /// # }
822 ///
823 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
824 /// let writer = Writer::new(&topic)?;
825 /// writer.dispose_with_timestamp(&Key::<Data> { x: 1, y: 2 }, Time::from_secs(1))?;
826 /// # Ok::<_, cyclonedds::Error>(())
827 /// ```
828 pub fn dispose_with_timestamp(&self, key: &T::Key, timestamp: crate::Time) -> Result<()> {
829 ffi::dds_dispose_with_timestamp::<T>(self.inner, key, timestamp.inner)
830 }
831
832 /// Disposes the instance identified by its
833 /// [`InstanceHandle`](crate::entity::InstanceHandle).
834 ///
835 /// # Errors
836 ///
837 /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
838 /// instance.
839 ///
840 /// # Examples
841 ///
842 /// ```
843 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
844 /// # let domain = Domain::default();
845 /// # let participant = Participant::new(&domain)?;
846 /// # #[derive(
847 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
848 /// # )]
849 /// # struct Data {
850 /// # #[dds(key)]
851 /// # x: i32,
852 /// # #[dds(key)]
853 /// # y: i32,
854 /// # }
855 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
856 /// let writer = Writer::new(&topic)?;
857 /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
858 /// writer.dispose_instance_by_handle(handle)?;
859 /// # Ok::<_, cyclonedds::Error>(())
860 /// ```
861 pub fn dispose_instance_by_handle(
862 &self,
863 instance_handle: crate::entity::InstanceHandle,
864 ) -> Result<()> {
865 ffi::dds_dispose_instance_by_handle(self.inner, instance_handle.inner)
866 }
867
868 /// Disposes the instance identified by its
869 /// [`InstanceHandle`](crate::entity::InstanceHandle) with an explicit
870 /// timestamp.
871 ///
872 /// # Errors
873 ///
874 /// Returns an [`Error`](crate::Error) if the writer fails to dispose the
875 /// instance.
876 ///
877 /// # Examples
878 ///
879 /// ```
880 /// use cyclonedds::Time;
881 /// # use cyclonedds::{Domain, Participant, Topic, Writer, Key};
882 /// # let domain = Domain::default();
883 /// # let participant = Participant::new(&domain)?;
884 /// # #[derive(
885 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
886 /// # )]
887 /// # struct Data {
888 /// # #[dds(key)]
889 /// # x: i32,
890 /// # #[dds(key)]
891 /// # y: i32,
892 /// # }
893 ///
894 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
895 /// let writer = Writer::new(&topic)?;
896 /// let handle = writer.register_instance(&Key::<Data> { x: 1, y: 2 })?;
897 /// writer.dispose_instance_by_handle_with_timestamp(handle, Time::from_secs(1))?;
898 /// # Ok::<_, cyclonedds::Error>(())
899 /// ```
900 pub fn dispose_instance_by_handle_with_timestamp(
901 &self,
902 instance_handle: crate::entity::InstanceHandle,
903 timestamp: crate::Time,
904 ) -> Result<()> {
905 ffi::dds_dispose_instance_by_handle_with_timestamp(
906 self.inner,
907 instance_handle.inner,
908 timestamp.inner,
909 )
910 }
911
912 pub(crate) const fn from_existing(
913 inner: cyclonedds_sys::dds_entity_t,
914 ) -> std::mem::ManuallyDrop<Self> {
915 std::mem::ManuallyDrop::new(Self {
916 inner,
917 phantom_topic: std::marker::PhantomData,
918 })
919 }
920
921 /// Sets the [`WriterListener`](crate::WriterListener) on this writer,
922 /// replacing any previously set listener.
923 ///
924 /// # Errors
925 ///
926 /// Returns an [`Error`](crate::Error) if the writer fails to set the
927 /// listener.
928 ///
929 /// # Examples
930 ///
931 /// ```
932 /// use cyclonedds::listener::WriterListener;
933 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
934 /// # let domain = Domain::default();
935 /// # let participant = Participant::new(&domain)?;
936 /// # #[derive(
937 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
938 /// # )]
939 /// # struct Data {
940 /// # x: i32,
941 /// # }
942 ///
943 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
944 /// let mut writer = Writer::new(&topic)?;
945 /// writer.set_listener(WriterListener::new().with_publication_matched(|_, status| {
946 /// println!("matched readers: {}", status.current.count);
947 /// }))?;
948 /// # Ok::<_, cyclonedds::Error>(())
949 /// ```
950 pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
951 where
952 L: AsRef<crate::WriterListener<T>>,
953 {
954 listener
955 .as_ref()
956 .as_ffi()
957 .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
958 }
959
960 /// Removes the listener from this writer.
961 ///
962 /// # Errors
963 ///
964 /// Returns an [`Error`](crate::Error) if the writer fails to unset the
965 /// listener.
966 ///
967 /// # Examples
968 ///
969 /// ```
970 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
971 /// # let domain = Domain::default();
972 /// # let participant = Participant::new(&domain)?;
973 /// # #[derive(
974 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
975 /// # )]
976 /// # struct Data {
977 /// # x: i32,
978 /// # }
979 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
980 /// let mut writer = Writer::new(&topic)?;
981 /// writer.unset_listener()?;
982 /// # Ok::<_, cyclonedds::Error>(())
983 /// ```
984 pub fn unset_listener(&mut self) -> Result<()> {
985 ffi::dds_set_listener(self.inner, None)?;
986 Ok(())
987 }
988
989 /// Sets the [`WriterListener`](crate::WriterListener) on this writer,
990 /// consuming and returning `self`.
991 ///
992 /// # Errors
993 ///
994 /// Returns an [`Error`](crate::Error) if the writer fails to set the
995 /// listener.
996 ///
997 /// # Examples
998 ///
999 /// ```
1000 /// use cyclonedds::listener::WriterListener;
1001 /// # use cyclonedds::{Domain, Participant, Topic, Writer};
1002 /// # let domain = Domain::default();
1003 /// # let participant = Participant::new(&domain)?;
1004 /// # #[derive(
1005 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
1006 /// # )]
1007 /// # struct Data {
1008 /// # x: i32,
1009 /// # }
1010 ///
1011 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
1012 /// let writer = Writer::new(&topic)?.with_listener(WriterListener::new())?;
1013 /// # Ok::<_, cyclonedds::Error>(())
1014 /// ```
1015 pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
1016 where
1017 L: AsRef<crate::WriterListener<T>>,
1018 {
1019 self.set_listener(listener).map(|()| self)
1020 }
1021}
1022
1023impl<T> Drop for Writer<'_, '_, '_, T>
1024where
1025 T: crate::Topicable,
1026{
1027 fn drop(&mut self) {
1028 let result = ffi::dds_delete(self.inner);
1029 debug_assert!(
1030 result.is_ok(),
1031 "unable to delete {self:?}, failed with: {result:?}"
1032 );
1033 }
1034}
1035
1036#[cfg(test)]
1037mod tests {
1038 use super::*;
1039 use crate::Topicable;
1040
1041 #[test]
1042 fn test_writer_create() {
1043 let domain_id = crate::tests::domain::unique_id();
1044 let domain = crate::Domain::new(domain_id).unwrap();
1045 let qos = crate::QoS::new();
1046 let topic_name = crate::tests::topic::unique_name();
1047 let participant = crate::Participant::new(&domain).unwrap();
1048 let publisher = crate::Publisher::new(&participant).unwrap();
1049 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1050 let listener = crate::WriterListener::new();
1051
1052 let _ = Writer::new(&topic).unwrap();
1053 let _ = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1054 let _ = Writer::builder(&topic)
1055 .with_publisher(&publisher)
1056 .build()
1057 .unwrap();
1058 let _ = Writer::builder(&topic)
1059 .with_qos(&qos)
1060 .with_publisher(&publisher)
1061 .with_listener(listener)
1062 .build()
1063 .unwrap();
1064 }
1065
1066 #[test]
1067 fn test_writer_create_with_invalid_topic() {
1068 let domain_id = crate::tests::domain::unique_id();
1069 let domain = crate::Domain::new(domain_id).unwrap();
1070 let qos = crate::QoS::new();
1071 let topic_name = crate::tests::topic::unique_name();
1072 let participant = crate::Participant::new(&domain).unwrap();
1073 let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1074
1075 let topic_id = topic.inner;
1076 topic.inner = 0;
1077 let result = Writer::new(&topic).unwrap_err();
1078 assert_eq!(result, crate::Error::BadParameter);
1079 let result = Writer::builder(&topic).with_qos(&qos).build().unwrap_err();
1080 assert_eq!(result, crate::Error::BadParameter);
1081 topic.inner = topic_id;
1082 }
1083
1084 #[test]
1085 fn test_writer_create_with_invalid_publisher() {
1086 let domain_id = crate::tests::domain::unique_id();
1087 let domain = crate::Domain::new(domain_id).unwrap();
1088 let topic_name = crate::tests::topic::unique_name();
1089 let participant = crate::Participant::new(&domain).unwrap();
1090 let mut publisher = crate::Publisher::new(&participant).unwrap();
1091 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1092
1093 let publisher_id = publisher.inner;
1094 publisher.inner = 0;
1095 let result = Writer::builder(&topic)
1096 .with_publisher(&publisher)
1097 .build()
1098 .unwrap_err();
1099 assert_eq!(result, crate::Error::BadParameter);
1100 publisher.inner = publisher_id;
1101 }
1102
1103 #[test]
1104 fn test_writer_write() {
1105 let domain_id = crate::tests::domain::unique_id();
1106 let domain = crate::Domain::new(domain_id).unwrap();
1107 let topic_name = crate::tests::topic::unique_name();
1108 let participant = crate::Participant::new(&domain).unwrap();
1109 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1110 let writer = Writer::new(&topic).unwrap();
1111 writer.write(&crate::tests::topic::Data::default()).unwrap();
1112 }
1113
1114 #[test]
1115 fn test_writer_write_with_timestamp() {
1116 let domain_id = crate::tests::domain::unique_id();
1117 let domain = crate::Domain::new(domain_id).unwrap();
1118 let topic_name = crate::tests::topic::unique_name();
1119 let participant = crate::Participant::new(&domain).unwrap();
1120 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1121 let writer = Writer::new(&topic).unwrap();
1122 let timestamp = crate::Time::from_nanos(10001);
1123 writer
1124 .write_with_timestamp(&crate::tests::topic::Data::default(), timestamp)
1125 .unwrap();
1126 }
1127
1128 #[test]
1129 fn test_writer_operations_on_invalid_writer() {
1130 let domain_id = crate::tests::domain::unique_id();
1131 let domain = crate::Domain::new(domain_id).unwrap();
1132 let topic_name = crate::tests::topic::unique_name();
1133 let participant = crate::Participant::new(&domain).unwrap();
1134 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1135 let mut writer = Writer::new(&topic).unwrap();
1136
1137 let sample = crate::tests::topic::Data::default();
1138 let key = sample.as_key();
1139 let timestamp = crate::Time::from_nanos(10001);
1140 let instance_handle = crate::entity::InstanceHandle { inner: 0 };
1141
1142 let writer_id = writer.inner;
1143 writer.inner = 0;
1144
1145 let result = writer.write(&sample).unwrap_err();
1146 assert_eq!(result, crate::Error::BadParameter);
1147
1148 let result = writer.write_dispose(&sample).unwrap_err();
1149 assert_eq!(result, crate::Error::BadParameter);
1150
1151 let result = writer.write_with_timestamp(&sample, timestamp).unwrap_err();
1152 assert_eq!(result, crate::Error::BadParameter);
1153
1154 let result = writer
1155 .write_dispose_with_timestamp(&sample, timestamp)
1156 .unwrap_err();
1157 assert_eq!(result, crate::Error::BadParameter);
1158
1159 let result = writer.unregister_instance(&key).unwrap_err();
1160 assert_eq!(result, crate::Error::BadParameter);
1161
1162 let result = writer
1163 .unregister_instance_with_timestamp(&key, timestamp)
1164 .unwrap_err();
1165 assert_eq!(result, crate::Error::BadParameter);
1166
1167 let result = writer
1168 .unregister_instance_by_handle(instance_handle)
1169 .unwrap_err();
1170 assert_eq!(result, crate::Error::BadParameter);
1171
1172 let result = writer
1173 .unregister_instance_by_handle_with_timestamp(instance_handle, timestamp)
1174 .unwrap_err();
1175 assert_eq!(result, crate::Error::BadParameter);
1176
1177 let result = writer.dispose(&key).unwrap_err();
1178 assert_eq!(result, crate::Error::BadParameter);
1179
1180 let result = writer.dispose_with_timestamp(&key, timestamp).unwrap_err();
1181 assert_eq!(result, crate::Error::BadParameter);
1182
1183 let result = writer
1184 .dispose_instance_by_handle(instance_handle)
1185 .unwrap_err();
1186 assert_eq!(result, crate::Error::BadParameter);
1187
1188 let result = writer
1189 .dispose_instance_by_handle_with_timestamp(instance_handle, timestamp)
1190 .unwrap_err();
1191 assert_eq!(result, crate::Error::BadParameter);
1192
1193 let result = writer.flush().unwrap_err();
1194 assert_eq!(result, crate::Error::BadParameter);
1195
1196 let result = writer.register_instance(&key).unwrap_err();
1197 assert_eq!(result, crate::Error::BadParameter);
1198
1199 let result = writer.matched_subscriptions().unwrap_err();
1200 assert_eq!(result, crate::Error::BadParameter);
1201
1202 assert_eq!(result, crate::Error::BadParameter);
1203 writer.inner = writer_id;
1204 }
1205
1206 #[test]
1207 fn test_writer_create_from_existing() {
1208 let domain_id = crate::tests::domain::unique_id();
1209 let domain = crate::Domain::new(domain_id).unwrap();
1210 let topic_name = crate::tests::topic::unique_name();
1211 let participant = crate::Participant::new(&domain).unwrap();
1212 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1213
1214 let writer_01 = Writer::new(&topic).unwrap();
1215 let writer_02 = Writer::<crate::tests::topic::Data>::from_existing(writer_01.inner);
1216 assert_eq!(writer_01.inner, writer_02.inner);
1217 }
1218
1219 #[test]
1220 fn test_writer_register_unregister_instance() {
1221 use crate::state;
1222
1223 let domain_id = crate::tests::domain::unique_id();
1224 let domain = crate::Domain::new(domain_id).unwrap();
1225 let topic_name = crate::tests::topic::unique_name();
1226 let participant = crate::Participant::new(&domain).unwrap();
1227 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1228
1229 let writer = Writer::new(&topic).unwrap();
1230 let reader = crate::Reader::new(&topic).unwrap();
1231
1232 let sample = crate::tests::topic::Data {
1233 x: 0,
1234 y: 1,
1235 message: String::from("initial"),
1236 };
1237 writer.write(&sample).unwrap();
1238 let sample = crate::tests::topic::Data {
1239 x: 1,
1240 y: 2,
1241 message: String::from("registered"),
1242 };
1243 writer.write(&sample).unwrap();
1244 let registered_handle = writer.register_instance(&sample.as_key()).unwrap();
1245 let sample = crate::tests::topic::Data {
1246 x: 2,
1247 y: 3,
1248 message: String::from("unregistered"),
1249 };
1250 writer.write(&sample).unwrap();
1251 writer.unregister_instance(&sample.as_key()).unwrap();
1252
1253 writer.write(&crate::tests::topic::Data::default()).unwrap();
1254
1255 let samples = reader.take().unwrap();
1256 assert_eq!(samples.len(), 4);
1257
1258 for sample in samples {
1259 assert!(sample.is_sample());
1260 match sample.message.as_ref() {
1261 "initial" => {
1262 assert_eq!(
1263 (sample.x, sample.y, sample.message.as_ref()),
1264 (0, 1, "initial")
1265 );
1266
1267 assert_eq!(
1268 sample.info().state,
1269 state::sample::Fresh | state::view::New | state::instance::Alive
1270 );
1271 }
1272 "registered" => {
1273 assert_eq!(
1274 (sample.x, sample.y, sample.message.as_ref()),
1275 (1, 2, "registered")
1276 );
1277 let info = sample.info();
1278 assert_eq!(info.instance_handle, registered_handle);
1279 assert_eq!(
1280 info.state,
1281 state::sample::Fresh | state::view::New | state::instance::Alive
1282 );
1283 }
1284 "unregistered" => {
1285 assert_eq!(
1286 (sample.x, sample.y, sample.message.as_ref()),
1287 (2, 3, "unregistered")
1288 );
1289 let info = sample.info();
1290 assert_eq!(
1291 info.state,
1292 state::sample::Fresh | state::view::New | state::instance::Disposed
1293 );
1294 }
1295 _ => {
1296 assert_eq!(*sample, crate::tests::topic::Data::default());
1297 assert_eq!(
1298 sample.info().state,
1299 state::sample::Fresh | state::view::New | state::instance::Alive
1300 );
1301 }
1302 }
1303 }
1304
1305 let sample = crate::tests::topic::Data {
1306 x: 4,
1307 y: 5,
1308 message: String::from("registered"),
1309 };
1310 let key = sample.as_key();
1311 let registered_handle = writer.register_instance(&key).unwrap();
1312 writer.write(&sample).unwrap();
1313 let lookup_handle = writer.lookup_instance(&key).unwrap();
1314 assert_eq!(registered_handle, lookup_handle);
1315 writer
1316 .unregister_instance_by_handle(registered_handle)
1317 .unwrap();
1318 let received_sample = &reader.read().unwrap()[0];
1319 assert_eq!(**received_sample, sample);
1320 assert_eq!(
1321 received_sample.info().state,
1322 state::sample::Fresh | state::view::New | state::instance::Disposed
1323 );
1324 }
1325
1326 // TODO this test doesn't really validate the flushing side.
1327 #[test]
1328 fn test_writer_flush() {
1329 let domain_id = crate::tests::domain::unique_id();
1330 let domain = crate::Domain::new(domain_id).unwrap();
1331 let topic_name = crate::tests::topic::unique_name();
1332 let participant = crate::Participant::new(&domain).unwrap();
1333 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1334
1335 let writer_01 = Writer::new(&topic).unwrap();
1336 let writer_02 = Writer::new(&topic).unwrap();
1337
1338 let sample = crate::tests::topic::Data::default();
1339
1340 writer_01.write(&sample).unwrap();
1341 writer_01.write(&sample).unwrap();
1342 writer_01.write(&sample).unwrap();
1343 writer_01.write(&sample).unwrap();
1344 writer_01.flush().unwrap();
1345
1346 writer_02.write(&sample).unwrap();
1347 writer_02.write(&sample).unwrap();
1348 writer_02.write(&sample).unwrap();
1349 writer_02.write(&sample).unwrap();
1350 writer_02.flush().unwrap();
1351 }
1352
1353 #[test]
1354 fn test_writer_wait_for_acks() {
1355 let domain_id = crate::tests::domain::unique_id();
1356 let domain = crate::Domain::new(domain_id).unwrap();
1357 let topic_name = crate::tests::topic::unique_name();
1358 let participant = crate::Participant::new(&domain).unwrap();
1359 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1360
1361 let writer = Writer::new(&topic).unwrap();
1362 let _reader = crate::Reader::builder(&topic)
1363 .with_qos(&crate::QoS::new().with_reliability(
1364 crate::qos::policy::Reliability::Reliable {
1365 max_blocking_time: crate::Duration::INFINITE,
1366 },
1367 ))
1368 .build()
1369 .unwrap();
1370
1371 writer.write(&crate::tests::topic::Data::default()).unwrap();
1372 writer
1373 .wait_for_acks(crate::Duration::from_nanos(100))
1374 .unwrap();
1375 }
1376
1377 #[test]
1378 fn test_writer_matched_subscriptions() {
1379 use crate::entity::Entity;
1380
1381 let domain_id = crate::tests::domain::unique_id();
1382 let domain = crate::Domain::new(domain_id).unwrap();
1383 let topic_name = crate::tests::topic::unique_name();
1384 let participant = crate::Participant::new(&domain).unwrap();
1385 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1386
1387 let writer = Writer::new(&topic).unwrap();
1388 let matched = writer.matched_subscriptions().unwrap();
1389 assert!(matched.is_empty(), "{matched:#?}");
1390 let reader = crate::Reader::new(&topic).unwrap();
1391 let matched = writer.matched_subscriptions().unwrap();
1392 assert_eq!(matched, vec![reader.instance_handle().unwrap()]);
1393 }
1394
1395 #[test]
1396 fn test_writer_lookup_instance() {
1397 let domain_id = crate::tests::domain::unique_id();
1398 let domain = crate::Domain::new(domain_id).unwrap();
1399 let topic_name = crate::tests::topic::unique_name();
1400 let participant = crate::Participant::new(&domain).unwrap();
1401 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1402
1403 let writer = Writer::new(&topic).unwrap();
1404
1405 let sample = crate::tests::topic::Data::default();
1406 let key = sample.as_key();
1407
1408 let result = writer.lookup_instance(&key);
1409 assert_eq!(result, None);
1410
1411 let registered_handle = writer.register_instance(&key).unwrap();
1412 let result = writer.lookup_instance(&key);
1413 assert_eq!(result, Some(registered_handle));
1414 }
1415
1416 #[test]
1417 fn test_writer_unregister() {
1418 use crate::entity::Entity;
1419 use crate::state;
1420
1421 let domain_id = crate::tests::domain::unique_id();
1422 let domain = crate::Domain::new(domain_id).unwrap();
1423 let topic_name = crate::tests::topic::unique_name();
1424 let participant = crate::Participant::new(&domain).unwrap();
1425 let qos = crate::QoS::new()
1426 .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1427 let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1428 .with_qos(&qos)
1429 .build()
1430 .unwrap();
1431
1432 let qos = qos
1433 .with_reliability(crate::qos::policy::Reliability::Reliable {
1434 max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1435 })
1436 .with_resource_limits(crate::qos::policy::ResourceLimits {
1437 max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1438 max_instances: crate::qos::policy::ResourceLimit::Limited(3),
1439 max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1440 });
1441
1442 let reader = crate::Reader::builder(&topic)
1443 .with_qos(&qos)
1444 .build()
1445 .unwrap();
1446
1447 let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1448 autodispose_unregistered_instances: false,
1449 });
1450 let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1451
1452 // Sync writer to reader.
1453 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1454 writer
1455 .set_status_mask(crate::Status::PublicationMatched)
1456 .unwrap();
1457 waitset.attach(&writer, Some(&writer)).unwrap();
1458 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1459 assert_eq!(result[0], &writer);
1460 waitset.detach(&writer).unwrap();
1461
1462 // Sync reader to writer.
1463 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1464 reader
1465 .set_status_mask(crate::Status::SubscriptionMatched)
1466 .unwrap();
1467 waitset.attach(&reader, Some(&reader)).unwrap();
1468 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1469 assert_eq!(result[0], &reader);
1470 waitset.detach(&reader).unwrap();
1471
1472 for i in 0..3 {
1473 let sample = crate::tests::topic::Data {
1474 x: i,
1475 y: i.cast_signed() + 1,
1476 ..crate::tests::topic::Data::default()
1477 };
1478 writer.write(&sample).unwrap();
1479 }
1480
1481 let key_01 = crate::tests::topic::Data {
1482 x: 0,
1483 y: 1,
1484 ..crate::tests::topic::Data::default()
1485 }
1486 .as_key();
1487 let handle = writer.lookup_instance(&key_01).unwrap();
1488 writer.unregister_instance_by_handle(handle).unwrap();
1489
1490 let key_02 = crate::tests::topic::Data {
1491 x: 1,
1492 y: 2,
1493 ..crate::tests::topic::Data::default()
1494 }
1495 .as_key();
1496 writer.unregister_instance(&key_02).unwrap();
1497 let samples = reader.read().unwrap();
1498 assert_eq!(samples.len(), 3);
1499
1500 for sample in samples {
1501 let key = sample.as_key();
1502
1503 if key == key_01 || key == key_02 {
1504 assert_eq!(*sample, crate::tests::topic::Data::from_key(&key));
1505 assert!(sample.is_sample());
1506 assert_eq!(
1507 sample.info().state,
1508 state::sample::Fresh | state::view::New | state::instance::Unregistered
1509 );
1510 } else {
1511 assert_eq!(
1512 *sample,
1513 crate::tests::topic::Data {
1514 x: 2,
1515 y: 3,
1516 ..crate::tests::topic::Data::default()
1517 }
1518 );
1519 assert!(sample.is_sample());
1520 assert_eq!(
1521 sample.info().state,
1522 state::sample::Fresh | state::view::New | state::instance::Alive
1523 );
1524 }
1525 }
1526 }
1527
1528 #[test]
1529 fn test_writer_unregister_with_timestamp() {
1530 use crate::entity::Entity;
1531 use crate::state;
1532
1533 let domain_id = crate::tests::domain::unique_id();
1534 let domain = crate::Domain::new(domain_id).unwrap();
1535 let topic_name = crate::tests::topic::unique_name();
1536 let participant = crate::Participant::new(&domain).unwrap();
1537 let qos = crate::QoS::new()
1538 .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1539 let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1540 .with_qos(&qos)
1541 .build()
1542 .unwrap();
1543
1544 let qos = qos
1545 .with_reliability(crate::qos::policy::Reliability::Reliable {
1546 max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1547 })
1548 .with_resource_limits(crate::qos::policy::ResourceLimits {
1549 max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1550 max_instances: crate::qos::policy::ResourceLimit::Limited(3),
1551 max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1552 });
1553
1554 let reader = crate::Reader::builder(&topic)
1555 .with_qos(&qos)
1556 .build()
1557 .unwrap();
1558
1559 let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1560 autodispose_unregistered_instances: false,
1561 });
1562 let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1563
1564 // Sync writer to reader.
1565 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1566 writer
1567 .set_status_mask(crate::Status::PublicationMatched)
1568 .unwrap();
1569 waitset.attach(&writer, Some(&writer)).unwrap();
1570 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1571 assert_eq!(result[0], &writer);
1572 waitset.detach(&writer).unwrap();
1573
1574 // Sync reader to writer.
1575 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1576 reader
1577 .set_status_mask(crate::Status::SubscriptionMatched)
1578 .unwrap();
1579 waitset.attach(&reader, Some(&reader)).unwrap();
1580 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1581 assert_eq!(result[0], &reader);
1582 waitset.detach(&reader).unwrap();
1583
1584 for i in 0..3 {
1585 let sample = crate::tests::topic::Data {
1586 x: i,
1587 y: i.cast_signed() + 1,
1588 ..Default::default()
1589 };
1590 writer.write(&sample).unwrap();
1591 }
1592
1593 let time = std::time::SystemTime::now().try_into().unwrap();
1594
1595 let key_01 = crate::tests::topic::Data {
1596 x: 0,
1597 y: 1,
1598 ..Default::default()
1599 }
1600 .as_key();
1601 let handle = writer.lookup_instance(&key_01).unwrap();
1602 writer
1603 .unregister_instance_by_handle_with_timestamp(handle, time)
1604 .unwrap();
1605
1606 let key_02 = crate::tests::topic::Data {
1607 x: 1,
1608 y: 2,
1609 ..Default::default()
1610 }
1611 .as_key();
1612 writer
1613 .unregister_instance_with_timestamp(&key_02, time)
1614 .unwrap();
1615 let samples = reader.read().unwrap();
1616 assert_eq!(samples.len(), 3);
1617
1618 for sample in samples {
1619 let key = sample.as_key();
1620
1621 if key == key_01 || key == key_02 {
1622 assert_eq!(*sample, crate::tests::topic::Data::from_key(&key));
1623 assert!(sample.is_sample());
1624 assert_eq!(
1625 sample.info().state,
1626 state::sample::Fresh | state::view::New | state::instance::Unregistered
1627 );
1628 } else {
1629 assert_eq!(
1630 *sample,
1631 crate::tests::topic::Data {
1632 x: 2,
1633 y: 3,
1634 ..Default::default()
1635 }
1636 );
1637 assert!(sample.is_sample());
1638 assert_eq!(
1639 sample.info().state,
1640 state::sample::Fresh | state::view::New | state::instance::Alive
1641 );
1642 }
1643 }
1644 }
1645
1646 #[test]
1647 fn test_writer_write_dispose() {
1648 use crate::entity::Entity;
1649 use crate::state;
1650
1651 let domain_id = crate::tests::domain::unique_id();
1652 let domain = crate::Domain::new(domain_id).unwrap();
1653 let topic_name = crate::tests::topic::unique_name();
1654 let participant = crate::Participant::new(&domain).unwrap();
1655 let qos = crate::QoS::new()
1656 .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1657 let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1658 .with_qos(&qos)
1659 .build()
1660 .unwrap();
1661
1662 let qos = qos
1663 .with_reliability(crate::qos::policy::Reliability::Reliable {
1664 max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1665 })
1666 .with_resource_limits(crate::qos::policy::ResourceLimits {
1667 max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1668 max_instances: crate::qos::policy::ResourceLimit::Limited(4),
1669 max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1670 });
1671
1672 let reader = crate::Reader::builder(&topic)
1673 .with_qos(&qos)
1674 .build()
1675 .unwrap();
1676
1677 let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1678 autodispose_unregistered_instances: false,
1679 });
1680 let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1681
1682 // Sync writer to reader.
1683 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1684 writer
1685 .set_status_mask(crate::Status::PublicationMatched)
1686 .unwrap();
1687 waitset.attach(&writer, Some(&writer)).unwrap();
1688 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1689 assert_eq!(result[0], &writer);
1690 waitset.detach(&writer).unwrap();
1691
1692 // Sync reader to writer.
1693 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1694 reader
1695 .set_status_mask(crate::Status::SubscriptionMatched)
1696 .unwrap();
1697 waitset.attach(&reader, Some(&reader)).unwrap();
1698 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1699 assert_eq!(result[0], &reader);
1700 waitset.detach(&reader).unwrap();
1701
1702 let time = std::time::SystemTime::now().try_into().unwrap();
1703 for i in 0..4 {
1704 let sample = crate::tests::topic::Data {
1705 x: i,
1706 y: i.cast_signed() + 1,
1707 ..Default::default()
1708 };
1709 if sample.x.is_multiple_of(2) {
1710 if sample.x < 2 {
1711 writer.write(&sample).unwrap();
1712 } else {
1713 writer.write_with_timestamp(&sample, time).unwrap();
1714 }
1715 } else if sample.x < 2 {
1716 writer.write_dispose(&sample).unwrap();
1717 } else {
1718 writer.write_dispose_with_timestamp(&sample, time).unwrap();
1719 }
1720 }
1721
1722 let samples = reader.read().unwrap();
1723 assert_eq!(samples.len(), 4);
1724
1725 for sample in samples {
1726 assert_eq!(
1727 *sample,
1728 crate::tests::topic::Data {
1729 x: sample.x,
1730 y: sample.x.cast_signed() + 1,
1731 ..Default::default()
1732 }
1733 );
1734 if sample.x % 2 == 0 {
1735 assert!(sample.is_sample());
1736 assert_eq!(
1737 sample.info().state,
1738 state::sample::Fresh | state::view::New | state::instance::Alive
1739 );
1740 } else {
1741 assert!(sample.is_sample());
1742 assert_eq!(
1743 sample.info().state,
1744 state::sample::Fresh | state::view::New | state::instance::Disposed
1745 );
1746 }
1747 }
1748 }
1749
1750 #[test]
1751 fn test_writer_write_and_then_dispose() {
1752 use crate::entity::Entity;
1753 use crate::state;
1754
1755 let domain_id = crate::tests::domain::unique_id();
1756 let domain = crate::Domain::new(domain_id).unwrap();
1757 let topic_name = crate::tests::topic::unique_name();
1758 let participant = crate::Participant::new(&domain).unwrap();
1759 let qos = crate::QoS::new()
1760 .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1761 let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1762 .with_qos(&qos)
1763 .build()
1764 .unwrap();
1765
1766 let qos = qos
1767 .with_reliability(crate::qos::policy::Reliability::Reliable {
1768 max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1769 })
1770 .with_resource_limits(crate::qos::policy::ResourceLimits {
1771 max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1772 max_instances: crate::qos::policy::ResourceLimit::Limited(4),
1773 max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1774 });
1775
1776 let reader = crate::Reader::builder(&topic)
1777 .with_qos(&qos)
1778 .build()
1779 .unwrap();
1780
1781 let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1782 autodispose_unregistered_instances: false,
1783 });
1784 let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1785
1786 // Sync writer to reader.
1787 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1788 writer
1789 .set_status_mask(crate::Status::PublicationMatched)
1790 .unwrap();
1791 waitset.attach(&writer, Some(&writer)).unwrap();
1792 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1793 assert_eq!(result[0], &writer);
1794 waitset.detach(&writer).unwrap();
1795
1796 // Sync reader to writer.
1797 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1798 reader
1799 .set_status_mask(crate::Status::SubscriptionMatched)
1800 .unwrap();
1801 waitset.attach(&reader, Some(&reader)).unwrap();
1802 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1803 assert_eq!(result[0], &reader);
1804 waitset.detach(&reader).unwrap();
1805
1806 let time = std::time::SystemTime::now().try_into().unwrap();
1807 for i in 0..4 {
1808 let sample = crate::tests::topic::Data {
1809 x: i,
1810 y: i.cast_signed() + 1,
1811 ..Default::default()
1812 };
1813 if sample.x.is_multiple_of(2) {
1814 if sample.x < 2 {
1815 writer.write(&sample).unwrap();
1816 } else {
1817 writer.write_with_timestamp(&sample, time).unwrap();
1818 }
1819 } else {
1820 let key = sample.as_key();
1821 if sample.x < 2 {
1822 writer.write(&sample).unwrap();
1823 writer.dispose(&key).unwrap();
1824 } else {
1825 writer.write_with_timestamp(&sample, time).unwrap();
1826 writer.dispose_with_timestamp(&key, time).unwrap();
1827 }
1828 }
1829 }
1830
1831 let samples = reader.read().unwrap();
1832 assert_eq!(samples.len(), 4);
1833
1834 for sample in samples {
1835 assert_eq!(
1836 *sample,
1837 crate::tests::topic::Data {
1838 x: sample.x,
1839 y: sample.x.cast_signed() + 1,
1840 ..Default::default()
1841 }
1842 );
1843 if sample.x.is_multiple_of(2) {
1844 assert!(sample.is_sample());
1845 assert_eq!(
1846 sample.info().state,
1847 state::sample::Fresh | state::view::New | state::instance::Alive
1848 );
1849 } else {
1850 assert!(sample.is_sample());
1851 assert_eq!(
1852 sample.info().state,
1853 state::sample::Fresh | state::view::New | state::instance::Disposed
1854 );
1855 }
1856 }
1857 }
1858
1859 #[test]
1860 fn test_writer_write_and_then_dispose_by_instance_handle() {
1861 use crate::entity::Entity;
1862 use crate::state;
1863
1864 let domain_id = crate::tests::domain::unique_id();
1865 let domain = crate::Domain::new(domain_id).unwrap();
1866 let topic_name = crate::tests::topic::unique_name();
1867 let participant = crate::Participant::new(&domain).unwrap();
1868 let qos = crate::QoS::new()
1869 .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1870 let topic = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1871 .with_qos(&qos)
1872 .build()
1873 .unwrap();
1874
1875 let qos = qos
1876 .with_reliability(crate::qos::policy::Reliability::Reliable {
1877 max_blocking_time: std::time::Duration::from_millis(100).try_into().unwrap(),
1878 })
1879 .with_resource_limits(crate::qos::policy::ResourceLimits {
1880 max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1881 max_instances: crate::qos::policy::ResourceLimit::Limited(4),
1882 max_samples_per_instance: crate::qos::policy::ResourceLimit::Limited(1),
1883 });
1884
1885 let reader = crate::Reader::builder(&topic)
1886 .with_qos(&qos)
1887 .build()
1888 .unwrap();
1889
1890 let qos = qos.with_writer_data_lifecycle(crate::qos::policy::WriterDataLifecycle {
1891 autodispose_unregistered_instances: false,
1892 });
1893 let writer = Writer::builder(&topic).with_qos(&qos).build().unwrap();
1894
1895 // Sync writer to reader.
1896 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1897 writer
1898 .set_status_mask(crate::Status::PublicationMatched)
1899 .unwrap();
1900 waitset.attach(&writer, Some(&writer)).unwrap();
1901 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1902 assert_eq!(result[0], &writer);
1903 waitset.detach(&writer).unwrap();
1904
1905 // Sync reader to writer.
1906 let mut waitset = crate::WaitSet::new(&participant).unwrap();
1907 reader
1908 .set_status_mask(crate::Status::SubscriptionMatched)
1909 .unwrap();
1910 waitset.attach(&reader, Some(&reader)).unwrap();
1911 let result = waitset.wait(crate::Duration::INFINITE).unwrap();
1912 assert_eq!(result[0], &reader);
1913 waitset.detach(&reader).unwrap();
1914
1915 let time = std::time::SystemTime::now().try_into().unwrap();
1916 for i in 0..4 {
1917 let sample = crate::tests::topic::Data {
1918 x: i,
1919 y: i.cast_signed() + 1,
1920 ..Default::default()
1921 };
1922 if sample.x.is_multiple_of(2) {
1923 if sample.x < 2 {
1924 writer.write(&sample).unwrap();
1925 } else {
1926 writer.write_with_timestamp(&sample, time).unwrap();
1927 }
1928 } else {
1929 let key = sample.as_key();
1930 if sample.x < 2 {
1931 writer.write(&sample).unwrap();
1932 let instance_handle = writer.lookup_instance(&key).unwrap();
1933 writer.dispose_instance_by_handle(instance_handle).unwrap();
1934 } else {
1935 writer.write_with_timestamp(&sample, time).unwrap();
1936 let instance_handle = writer.lookup_instance(&key).unwrap();
1937 writer
1938 .dispose_instance_by_handle_with_timestamp(instance_handle, time)
1939 .unwrap();
1940 }
1941 }
1942 }
1943
1944 let samples = reader.read().unwrap();
1945 assert_eq!(samples.len(), 4);
1946
1947 for sample in samples {
1948 assert_eq!(
1949 *sample,
1950 crate::tests::topic::Data {
1951 x: sample.x,
1952 y: sample.x.cast_signed() + 1,
1953 ..Default::default()
1954 }
1955 );
1956 if sample.x % 2 == 0 {
1957 assert!(sample.is_sample());
1958 assert_eq!(
1959 sample.info().state,
1960 state::sample::Fresh | state::view::New | state::instance::Alive
1961 );
1962 } else {
1963 assert!(sample.is_sample());
1964 assert_eq!(
1965 sample.info().state,
1966 state::sample::Fresh | state::view::New | state::instance::Disposed
1967 );
1968 }
1969 }
1970 }
1971
1972 #[test]
1973 fn test_writer_with_listener() {
1974 let domain_id = crate::tests::domain::unique_id();
1975 let domain = crate::Domain::new(domain_id).unwrap();
1976 let topic_name = crate::tests::topic::unique_name();
1977 let participant = crate::Participant::new(&domain).unwrap();
1978 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1979
1980 let listener = crate::WriterListener::new()
1981 .with_liveliness_lost(|_, _| ())
1982 .with_offered_deadline_missed(|_, _| ())
1983 .with_offered_incompatible_qos(|_, _| ())
1984 .with_publication_matched(|_, _| ());
1985
1986 let _ = Writer::new(&topic)
1987 .unwrap()
1988 .with_listener(&listener)
1989 .unwrap();
1990
1991 let mut writer = Writer::new(&topic).unwrap();
1992 writer.set_listener(&listener).unwrap();
1993 writer.unset_listener().unwrap();
1994 }
1995
1996 #[test]
1997 fn test_writer_with_listener_on_invalid_writer() {
1998 let domain_id = crate::tests::domain::unique_id();
1999 let domain = crate::Domain::new(domain_id).unwrap();
2000 let topic_name = crate::tests::topic::unique_name();
2001 let participant = crate::Participant::new(&domain).unwrap();
2002 let topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
2003
2004 let listener = crate::WriterListener::new();
2005
2006 let mut writer = Writer::new(&topic).unwrap();
2007 let writer_id = writer.inner;
2008 writer.inner = 0;
2009 let result = writer.set_listener(&listener).unwrap_err();
2010 assert_eq!(result, crate::Error::BadParameter);
2011 let result = writer.unset_listener().unwrap_err();
2012 assert_eq!(result, crate::Error::BadParameter);
2013 writer.inner = writer_id;
2014 }
2015}