1extern crate alloc;
23use alloc::boxed::Box;
24use alloc::string::ToString;
25use alloc::sync::Arc;
26use alloc::vec::Vec;
27use core::marker::PhantomData;
28
29#[cfg(feature = "std")]
30use std::sync::Mutex;
31
32use crate::dds_type::DdsType;
33use crate::entity::StatusMask;
34use crate::error::{DdsError, Result};
35#[cfg(feature = "std")]
36use crate::instance_handle::{HANDLE_NIL, InstanceHandle};
37#[cfg(feature = "std")]
38use crate::instance_tracker::InstanceTracker;
39use crate::listener::{ArcDataWriterListener, ArcPublisherListener};
40use crate::qos::{DataWriterQos, PublisherQos};
41#[cfg(feature = "std")]
42use crate::time::{Time, get_current_time};
43use crate::topic::Topic;
44
45#[cfg(feature = "std")]
46use crate::runtime::DcpsRuntime;
47#[cfg(feature = "std")]
48use zerodds_qos::ReliabilityKind;
49#[cfg(feature = "std")]
50use zerodds_rtps::wire_types::EntityId;
51
/// Publisher handle.
///
/// The handle itself only holds a reference-counted pointer to the shared
/// [`PublisherInner`] state.
#[derive(Debug)]
pub struct Publisher {
    /// Shared publisher state.
    pub(crate) inner: Arc<PublisherInner>,
}
61
/// Shared state behind a [`Publisher`] handle.
///
/// Most fields only exist when the `std` feature is enabled.
pub(crate) struct PublisherInner {
    /// Publisher QoS; mutex-protected in `std` builds.
    #[cfg(feature = "std")]
    pub(crate) qos: std::sync::Mutex<PublisherQos>,
    /// Publisher QoS; stored as a plain value without `std`.
    #[cfg(not(feature = "std"))]
    #[allow(dead_code)]
    pub(crate) qos: PublisherQos,
    /// Entity state (enable flag, listener mask, instance handle).
    pub(crate) entity_state: alloc::sync::Arc<crate::entity::EntityState>,
    /// Optional runtime; when present, created writers are registered with it.
    #[cfg(feature = "std")]
    pub(crate) runtime: Option<Arc<DcpsRuntime>>,
    /// Installed publisher listener together with its status mask.
    #[cfg(feature = "std")]
    pub(crate) listener: std::sync::Mutex<Option<(ArcPublisherListener, StatusMask)>>,
    /// Weak back-reference to the owning participant, set by
    /// `Publisher::attach_participant`.
    #[cfg(feature = "std")]
    pub(crate) participant:
        std::sync::Mutex<Option<alloc::sync::Weak<crate::participant::ParticipantInner>>>,
    /// Flag toggled by `suspend_publications` / `resume_publications`.
    suspended: core::sync::atomic::AtomicBool,
    /// Instance handles of the data writers created through this publisher.
    #[cfg(feature = "std")]
    pub(crate) datawriters:
        std::sync::Mutex<alloc::vec::Vec<crate::instance_handle::InstanceHandle>>,
}
101
102impl core::fmt::Debug for PublisherInner {
105 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
106 let listener_present = self.listener.lock().map(|s| s.is_some()).unwrap_or(false);
107 f.debug_struct("PublisherInner")
108 .field("entity_state", &self.entity_state)
109 .field("listener_present", &listener_present)
110 .finish_non_exhaustive()
111 }
112}
113
114impl Publisher {
115 #[cfg(feature = "std")]
116 pub(crate) fn new(qos: PublisherQos, runtime: Option<Arc<DcpsRuntime>>) -> Self {
117 Self {
118 inner: Arc::new(PublisherInner {
119 qos: std::sync::Mutex::new(qos),
120 entity_state: crate::entity::EntityState::new(),
121 runtime,
122 listener: std::sync::Mutex::new(None),
123 participant: std::sync::Mutex::new(None),
124 suspended: core::sync::atomic::AtomicBool::new(false),
125 datawriters: std::sync::Mutex::new(alloc::vec::Vec::new()),
126 }),
127 }
128 }
129
130 #[cfg(not(feature = "std"))]
131 pub(crate) fn new(qos: PublisherQos) -> Self {
132 Self {
133 inner: Arc::new(PublisherInner {
134 qos,
135 entity_state: crate::entity::EntityState::new(),
136 suspended: core::sync::atomic::AtomicBool::new(false),
137 }),
138 }
139 }
140
141 #[cfg(feature = "std")]
144 #[must_use]
145 pub fn contains_writer(&self, handle: crate::instance_handle::InstanceHandle) -> bool {
146 self.inner
147 .datawriters
148 .lock()
149 .map(|v| v.contains(&handle))
150 .unwrap_or(false)
151 }
152
153 #[cfg(feature = "std")]
156 pub fn set_listener(&self, listener: Option<ArcPublisherListener>, mask: StatusMask) {
157 if let Ok(mut slot) = self.inner.listener.lock() {
158 *slot = listener.map(|l| (l, mask));
159 }
160 self.inner.entity_state.set_listener_mask(mask);
161 }
162
163 #[cfg(feature = "std")]
165 #[must_use]
166 pub fn get_listener(&self) -> Option<ArcPublisherListener> {
167 self.inner
168 .listener
169 .lock()
170 .ok()
171 .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
172 }
173
174 #[cfg(feature = "std")]
177 pub(crate) fn attach_participant(
178 &self,
179 participant: alloc::sync::Weak<crate::participant::ParticipantInner>,
180 ) {
181 if let Ok(mut slot) = self.inner.participant.lock() {
182 *slot = Some(participant);
183 }
184 }
185
186 #[cfg(feature = "std")]
191 #[must_use]
192 pub(crate) fn snapshot_writer_chain(
193 &self,
194 writer_listener: Option<(ArcDataWriterListener, StatusMask)>,
195 ) -> crate::listener_dispatch::WriterListenerChain {
196 let publisher = self
197 .inner
198 .listener
199 .lock()
200 .ok()
201 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
202 let participant = {
203 let weak = self.inner.participant.lock().ok().and_then(|s| s.clone());
204 weak.and_then(|w| w.upgrade()).and_then(|inner| {
205 inner
206 .listener
207 .lock()
208 .ok()
209 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)))
210 })
211 };
212 crate::listener_dispatch::WriterListenerChain {
213 writer: writer_listener,
214 publisher,
215 participant,
216 }
217 }
218
219 pub fn suspend_publications(&self) {
228 self.inner
229 .suspended
230 .store(true, core::sync::atomic::Ordering::Release);
231 }
232
233 pub fn resume_publications(&self) {
238 self.inner
239 .suspended
240 .store(false, core::sync::atomic::Ordering::Release);
241 }
242
243 #[must_use]
247 pub fn is_suspended(&self) -> bool {
248 self.inner
249 .suspended
250 .load(core::sync::atomic::Ordering::Acquire)
251 }
252
    /// Copies the durability and reliability policies from `topic_qos` into
    /// `dw_qos`, leaving every other writer policy untouched.
    ///
    /// Always returns `Ok(())`.
    pub fn copy_from_topic_qos(
        dw_qos: &mut DataWriterQos,
        topic_qos: &crate::qos::TopicQos,
    ) -> Result<()> {
        dw_qos.durability = topic_qos.durability;
        dw_qos.reliability = topic_qos.reliability;
        Ok(())
    }
279
280 pub fn create_datawriter<T: DdsType + Send + 'static>(
287 &self,
288 topic: &Topic<T>,
289 qos: DataWriterQos,
290 ) -> Result<DataWriter<T>> {
291 if topic.type_name() != T::TYPE_NAME {
292 return Err(DdsError::BadParameter {
293 what: "topic.type_name mismatch",
294 });
295 }
296 #[cfg(feature = "std")]
297 if let Some(rt) = self.inner.runtime.as_ref() {
298 let reliable = qos.reliability.kind == ReliabilityKind::Reliable;
302 let eid = rt.register_user_writer(crate::runtime::UserWriterConfig {
303 topic_name: topic.name().into(),
304 type_name: T::TYPE_NAME.into(),
305 reliable,
306 durability: qos.durability.kind,
307 deadline: qos.deadline,
308 lifespan: qos.lifespan,
309 liveliness: qos.liveliness,
310 ownership: qos.ownership.kind,
311 ownership_strength: qos.ownership_strength.value,
312 partition: qos.partition.names.clone(),
313 user_data: qos.user_data.value.clone(),
314 topic_data: qos.topic_data.value.clone(),
315 group_data: qos.group_data.value.clone(),
316 type_identifier: T::TYPE_IDENTIFIER.clone(),
318 data_representation_offer: None,
323 })?;
324 let dw =
325 DataWriter::new_live(topic.clone(), qos, self.inner.clone(), Arc::clone(rt), eid);
326 self.track_writer(dw.entity_state.instance_handle());
327 return Ok(dw);
328 }
329 let dw = DataWriter::new_offline(topic.clone(), qos, self.inner.clone());
330 #[cfg(feature = "std")]
331 self.track_writer(dw.entity_state.instance_handle());
332 Ok(dw)
333 }
334
335 #[cfg(feature = "std")]
336 fn track_writer(&self, handle: crate::instance_handle::InstanceHandle) {
337 if let Ok(mut list) = self.inner.datawriters.lock() {
338 list.push(handle);
339 }
340 if let Ok(slot) = self.inner.participant.lock() {
342 if let Some(weak) = slot.as_ref() {
343 if let Some(p_inner) = weak.upgrade() {
344 if let Ok(mut dws) = p_inner.datawriters.lock() {
345 dws.push(handle);
346 }
347 }
348 }
349 }
350 }
351}
352
#[cfg(feature = "std")]
impl crate::entity::Entity for Publisher {
    type Qos = PublisherQos;

    /// Returns a copy of the current QoS, or the default QoS if the lock is
    /// poisoned.
    fn get_qos(&self) -> Self::Qos {
        match self.inner.qos.lock() {
            Ok(current) => current.clone(),
            Err(_) => Self::Qos::default(),
        }
    }

    /// Replaces the stored QoS. A poisoned lock leaves the QoS unchanged;
    /// the call still returns `Ok(())`.
    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
        let Ok(mut current) = self.inner.qos.lock() else {
            return Ok(());
        };
        *current = qos;
        Ok(())
    }

    /// Enables the underlying entity state.
    fn enable(&self) -> Result<()> {
        self.inner.entity_state.enable();
        Ok(())
    }

    /// Returns a shared pointer to the entity state.
    fn entity_state(&self) -> alloc::sync::Arc<crate::entity::EntityState> {
        let state = &self.inner.entity_state;
        alloc::sync::Arc::clone(state)
    }
}
384
/// Typed data writer for samples of type `T`.
///
/// A writer is either "live" (it has a runtime and an entity id, and samples
/// are handed to the runtime) or "offline" (encoded samples are collected in
/// `queue`).
pub struct DataWriter<T: DdsType> {
    /// Topic this writer was created for.
    topic: Topic<T>,
    /// Current writer QoS.
    qos: Mutex<DataWriterQos>,
    /// Entity state (enable flag, listener mask, instance handle).
    entity_state: Arc<crate::entity::EntityState>,
    /// State of the publisher that created this writer.
    publisher: Arc<PublisherInner>,
    /// Installed writer listener together with its status mask.
    #[cfg(feature = "std")]
    listener: Mutex<Option<(ArcDataWriterListener, StatusMask)>>,
    /// Matched count seen by the last `poll_publication_matched`; starts at -1.
    #[cfg(feature = "std")]
    last_match_count: std::sync::atomic::AtomicI64,
    /// Counter value seen by the last `poll_offered_deadline_missed`.
    #[cfg(feature = "std")]
    last_offered_deadline_missed: std::sync::atomic::AtomicU64,
    /// Counter value seen by the last `poll_liveliness_lost`.
    #[cfg(feature = "std")]
    last_liveliness_lost: std::sync::atomic::AtomicU64,
    /// Total count seen by the last `poll_offered_incompatible_qos`; starts at -1.
    #[cfg(feature = "std")]
    last_offered_incompatible_qos: std::sync::atomic::AtomicI64,
    /// Encoded samples waiting to be drained (used when no runtime is attached).
    queue: Arc<Mutex<Vec<Vec<u8>>>>,
    /// Notified by `__drain_pending`; `write` waits on it while the queue is full.
    #[cfg(feature = "std")]
    drain_signal: Arc<std::sync::Condvar>,
    /// Runtime of a live writer; `None` for offline writers.
    #[cfg(feature = "std")]
    runtime: Option<Arc<DcpsRuntime>>,
    /// Entity id assigned by the runtime; `None` for offline writers.
    #[cfg(feature = "std")]
    entity_id: Option<EntityId>,
    /// Tracker for registered instances.
    #[cfg(feature = "std")]
    instances: InstanceTracker,
    /// Handle returned by `publication_handle`.
    #[cfg(feature = "std")]
    publication_handle: InstanceHandle,
    /// Durability store; built by `build_durability_backend` from the QoS.
    #[cfg(feature = "std")]
    durability_backend: Option<Arc<dyn crate::durability_service::DurabilityBackend>>,
    /// Next sequence number for samples stored in the durability backend.
    #[cfg(feature = "std")]
    durability_seq: std::sync::atomic::AtomicU64,
    /// Optional flat-data slot backend paired with a `u32`.
    // NOTE(review): the meaning of the `u32` is not visible here — confirm.
    #[cfg(all(feature = "std", feature = "flatdata-integration"))]
    pub(crate) flat_backend: Mutex<
        Option<(
            Arc<dyn zerodds_flatdata::SlotBackend>,
            u32,
        )>,
    >,
    /// Ties the writer to `T` without storing a `T`.
    _t: PhantomData<fn() -> T>,
}
461
462impl<T: DdsType> core::fmt::Debug for DataWriter<T> {
463 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
464 f.debug_struct("DataWriter")
465 .field("topic", &self.topic.name())
466 .field("type", &T::TYPE_NAME)
467 .field("qos", &self.qos)
468 .finish_non_exhaustive()
469 }
470}
471
472impl<T: DdsType> DataWriter<T> {
473 #[cfg(feature = "std")]
474 fn new_offline(topic: Topic<T>, qos: DataWriterQos, publisher: Arc<PublisherInner>) -> Self {
475 let tracker = InstanceTracker::new();
476 let pub_handle = InstanceHandle::from_raw(0xFFFF_0000_0000_0001);
477 let backend = Self::build_durability_backend(&qos);
478 Self {
479 topic,
480 qos: Mutex::new(qos),
481 entity_state: crate::entity::EntityState::new(),
482 publisher,
483 listener: Mutex::new(None),
484 last_match_count: std::sync::atomic::AtomicI64::new(-1),
485 last_offered_deadline_missed: std::sync::atomic::AtomicU64::new(0),
486 last_liveliness_lost: std::sync::atomic::AtomicU64::new(0),
487 last_offered_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
488 queue: Arc::new(Mutex::new(Vec::new())),
489 #[cfg(feature = "std")]
490 drain_signal: Arc::new(std::sync::Condvar::new()),
491 runtime: None,
492 entity_id: None,
493 instances: tracker,
494 publication_handle: pub_handle,
495 durability_backend: backend,
496 durability_seq: std::sync::atomic::AtomicU64::new(1),
497 #[cfg(feature = "flatdata-integration")]
498 flat_backend: Mutex::new(None),
499 _t: PhantomData,
500 }
501 }
502
503 #[doc(hidden)]
506 #[cfg(feature = "std")]
507 #[must_use]
508 pub fn durability_backend(
509 &self,
510 ) -> Option<Arc<dyn crate::durability_service::DurabilityBackend>> {
511 self.durability_backend.clone()
512 }
513
514 #[cfg(feature = "std")]
519 fn build_durability_backend(
520 qos: &DataWriterQos,
521 ) -> Option<Arc<dyn crate::durability_service::DurabilityBackend>> {
522 match qos.durability.kind {
523 zerodds_qos::DurabilityKind::Transient => Some(Arc::new(
524 crate::durability_service::InMemoryDurabilityBackend::new(qos.durability_service),
525 )),
526 _ => None,
527 }
528 }
529
530 #[cfg(feature = "std")]
531 fn new_live(
532 topic: Topic<T>,
533 qos: DataWriterQos,
534 publisher: Arc<PublisherInner>,
535 runtime: Arc<DcpsRuntime>,
536 entity_id: EntityId,
537 ) -> Self {
538 let tracker = InstanceTracker::new();
539 let key = entity_id.entity_key;
544 let pub_handle = InstanceHandle::from_raw(
545 0xFFFF_0000_0000_0000
546 | (u64::from(key[0]) << 16)
547 | (u64::from(key[1]) << 8)
548 | u64::from(key[2]),
549 );
550 let backend = Self::build_durability_backend(&qos);
551 Self {
552 topic,
553 qos: Mutex::new(qos),
554 entity_state: crate::entity::EntityState::new(),
555 publisher,
556 listener: Mutex::new(None),
557 last_match_count: std::sync::atomic::AtomicI64::new(-1),
558 last_offered_deadline_missed: std::sync::atomic::AtomicU64::new(0),
559 last_liveliness_lost: std::sync::atomic::AtomicU64::new(0),
560 last_offered_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
561 queue: Arc::new(Mutex::new(Vec::new())),
562 #[cfg(feature = "std")]
563 drain_signal: Arc::new(std::sync::Condvar::new()),
564 runtime: Some(runtime),
565 entity_id: Some(entity_id),
566 instances: tracker,
567 publication_handle: pub_handle,
568 durability_backend: backend,
569 durability_seq: std::sync::atomic::AtomicU64::new(1),
570 #[cfg(feature = "flatdata-integration")]
571 flat_backend: Mutex::new(None),
572 _t: PhantomData,
573 }
574 }
575
576 #[cfg(not(feature = "std"))]
577 fn new(topic: Topic<T>, qos: DataWriterQos, publisher: Arc<PublisherInner>) -> Self {
578 Self {
579 topic,
580 qos,
581 publisher,
582 queue: Arc::new(Mutex::new(Vec::new())),
583 #[cfg(feature = "std")]
584 drain_signal: Arc::new(std::sync::Condvar::new()),
585 _t: PhantomData,
586 }
587 }
588
    /// Returns the topic this writer was created for.
    #[must_use]
    pub fn topic(&self) -> &Topic<T> {
        &self.topic
    }
594
595 #[cfg(feature = "std")]
598 pub fn set_listener(&self, listener: Option<ArcDataWriterListener>, mask: StatusMask) {
599 if let Ok(mut slot) = self.listener.lock() {
600 *slot = listener.map(|l| (l, mask));
601 }
602 self.entity_state.set_listener_mask(mask);
603 }
604
605 #[cfg(feature = "std")]
607 #[must_use]
608 pub fn get_listener(&self) -> Option<ArcDataWriterListener> {
609 self.listener
610 .lock()
611 .ok()
612 .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
613 }
614
615 #[cfg(feature = "std")]
618 #[must_use]
619 pub(crate) fn listener_chain(&self) -> crate::listener_dispatch::WriterListenerChain {
620 let writer = self
621 .listener
622 .lock()
623 .ok()
624 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
625 let pub_handle = Publisher {
628 inner: Arc::clone(&self.publisher),
629 };
630 pub_handle.snapshot_writer_chain(writer)
631 }
632
633 #[must_use]
635 pub fn qos(&self) -> DataWriterQos {
636 self.qos.lock().map(|q| q.clone()).unwrap_or_default()
637 }
638
639 pub fn write(&self, sample: &T) -> Result<()> {
655 let mut buf = Vec::new();
656 sample.encode(&mut buf).map_err(|e| DdsError::WireError {
657 message: e.to_string(),
658 })?;
659 #[cfg(feature = "metrics")]
660 crate::metrics::inc_sample_written(self.topic.name());
661 #[cfg(feature = "metrics")]
662 crate::metrics::record_sample_size(self.topic.name(), buf.len());
663 #[cfg(feature = "std")]
667 if let Some(backend) = self.durability_backend.as_ref() {
668 let key_bytes = Self::keyhash_and_holder(sample)
669 .map(|(kh, _)| kh)
670 .unwrap_or([0u8; 16]);
671 let seq = self
674 .durability_seq
675 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
676 let _ = backend.store(crate::durability_service::DurabilitySample {
677 topic: self.topic.name().to_string(),
678 instance_key: key_bytes,
679 sequence: seq,
680 payload: buf.clone(),
681 created_at: std::time::SystemTime::now(),
682 });
683 }
684 #[cfg(feature = "std")]
686 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
687 return rt.write_user_sample(eid, buf);
688 }
689 #[cfg(feature = "std")]
691 {
692 let qos = self.qos.lock().map(|q| q.clone()).unwrap_or_default();
693 let max_samples = qos.resource_limits.max_samples;
694 let reliable = qos.reliability.kind == ReliabilityKind::Reliable;
695 let max_block = qos.reliability.max_blocking_time;
696 let max_block_dur = core::time::Duration::from_nanos(
697 u64::try_from(max_block.seconds).unwrap_or(0) * 1_000_000_000
698 + u64::from(max_block.fraction),
699 );
700
701 let mut q = self
702 .queue
703 .lock()
704 .map_err(|_| DdsError::PreconditionNotMet {
705 reason: "datawriter queue poisoned",
706 })?;
707 if max_samples > 0 && q.len() >= max_samples as usize {
708 if !reliable || max_block_dur.is_zero() {
709 return Err(DdsError::OutOfResources {
710 what: "datawriter queue full (best-effort or no max_blocking_time)",
711 });
712 }
713 let deadline = std::time::Instant::now() + max_block_dur;
715 loop {
716 let now = std::time::Instant::now();
717 if now >= deadline {
718 return Err(DdsError::Timeout);
719 }
720 let remaining = deadline - now;
721 let (g, _) = self.drain_signal.wait_timeout(q, remaining).map_err(|_| {
722 DdsError::PreconditionNotMet {
723 reason: "datawriter queue poisoned",
724 }
725 })?;
726 q = g;
727 if q.len() < max_samples as usize {
728 break;
729 }
730 }
732 }
733 q.push(buf);
734 Ok(())
735 }
736 #[cfg(not(feature = "std"))]
737 {
738 let mut q = self
739 .queue
740 .lock()
741 .map_err(|_| DdsError::PreconditionNotMet {
742 reason: "datawriter queue poisoned",
743 })?;
744 q.push(buf);
745 Ok(())
746 }
747 }
748
749 #[must_use]
752 pub fn samples_pending(&self) -> usize {
753 self.queue.lock().map(|q| q.len()).unwrap_or(0)
754 }
755
756 #[must_use]
766 pub fn matched_subscription_count(&self) -> usize {
767 #[cfg(feature = "std")]
768 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
769 let n = rt.user_writer_matched_count(eid);
770 self.poll_publication_matched(n);
771 return n;
772 }
773 0
774 }
775
776 #[cfg(feature = "std")]
781 pub(crate) fn poll_publication_matched(&self, current: usize) {
782 let curr = current as i64;
783 let prev = self
784 .last_match_count
785 .swap(curr, std::sync::atomic::Ordering::AcqRel);
786 if prev == curr {
787 return;
788 }
789 let total = if curr > prev.max(0) {
790 curr
791 } else {
792 prev.max(0)
793 };
794 let delta = curr - prev.max(0);
795 let status = crate::status::PublicationMatchedStatus {
796 total_count: total as i32,
797 total_count_change: delta.max(0) as i32,
798 current_count: curr as i32,
799 current_count_change: delta as i32,
800 last_subscription_handle: crate::instance_handle::HANDLE_NIL,
801 };
802 let chain = self.listener_chain();
803 crate::listener_dispatch::dispatch_publication_matched(
804 &chain,
805 self.entity_state.instance_handle(),
806 status,
807 );
808 }
809
810 #[cfg(feature = "std")]
814 pub(crate) fn poll_offered_deadline_missed(&self, current: u64) {
815 let prev = self
816 .last_offered_deadline_missed
817 .swap(current, std::sync::atomic::Ordering::AcqRel);
818 if current == prev {
819 return;
820 }
821 let total_change = current.saturating_sub(prev);
822 let status = crate::status::OfferedDeadlineMissedStatus {
823 total_count: current as i32,
824 total_count_change: total_change as i32,
825 last_instance_handle: crate::instance_handle::HANDLE_NIL,
826 };
827 let chain = self.listener_chain();
828 crate::listener_dispatch::dispatch_offered_deadline_missed(
829 &chain,
830 self.entity_state.instance_handle(),
831 status,
832 );
833 }
834
835 #[cfg(feature = "std")]
837 pub(crate) fn poll_liveliness_lost(&self, current: u64) {
838 let prev = self
839 .last_liveliness_lost
840 .swap(current, std::sync::atomic::Ordering::AcqRel);
841 if current == prev {
842 return;
843 }
844 let total_change = current.saturating_sub(prev);
845 let status = crate::status::LivelinessLostStatus {
846 total_count: current as i32,
847 total_count_change: total_change as i32,
848 };
849 let chain = self.listener_chain();
850 crate::listener_dispatch::dispatch_liveliness_lost(
851 &chain,
852 self.entity_state.instance_handle(),
853 status,
854 );
855 }
856
857 #[cfg(feature = "std")]
860 pub(crate) fn poll_offered_incompatible_qos(
861 &self,
862 snapshot: crate::status::OfferedIncompatibleQosStatus,
863 ) {
864 let curr = i64::from(snapshot.total_count);
865 let prev = self
866 .last_offered_incompatible_qos
867 .swap(curr, std::sync::atomic::Ordering::AcqRel);
868 if curr == prev {
869 return;
870 }
871 let delta = curr - prev.max(0);
872 let status = crate::status::OfferedIncompatibleQosStatus {
873 total_count: curr as i32,
874 total_count_change: delta.max(0) as i32,
875 last_policy_id: snapshot.last_policy_id,
876 policies: snapshot.policies,
877 };
878 let chain = self.listener_chain();
879 crate::listener_dispatch::dispatch_offered_incompatible_qos(
880 &chain,
881 self.entity_state.instance_handle(),
882 status,
883 );
884 }
885
886 #[cfg(feature = "std")]
900 pub fn wait_for_matched_subscription(
901 &self,
902 min_count: usize,
903 timeout: core::time::Duration,
904 ) -> Result<()> {
905 let deadline = std::time::Instant::now() + timeout;
906 loop {
907 if self.matched_subscription_count() >= min_count {
908 return Ok(());
909 }
910 let now = std::time::Instant::now();
911 if now >= deadline {
912 return Err(DdsError::Timeout);
913 }
914 if let Some(rt) = self.runtime.as_ref() {
915 let _ = rt.wait_match_event(deadline - now);
916 } else {
917 std::thread::sleep(core::time::Duration::from_millis(20));
918 }
919 }
920 }
921
922 #[must_use]
930 pub fn offered_deadline_missed_count(&self) -> u64 {
931 #[cfg(feature = "std")]
932 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
933 let n = rt.user_writer_offered_deadline_missed(eid);
934 self.poll_offered_deadline_missed(n);
935 return n;
936 }
937 0
938 }
939
940 #[must_use]
943 pub fn liveliness_lost_count(&self) -> u64 {
944 #[cfg(feature = "std")]
945 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
946 let n = rt.user_writer_liveliness_lost(eid);
947 self.poll_liveliness_lost(n);
948 return n;
949 }
950 0
951 }
952
953 #[must_use]
956 pub fn offered_incompatible_qos_status(&self) -> crate::status::OfferedIncompatibleQosStatus {
957 #[cfg(feature = "std")]
958 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
959 let s = rt.user_writer_offered_incompatible_qos(eid);
960 self.poll_offered_incompatible_qos(s.clone());
961 return s;
962 }
963 crate::status::OfferedIncompatibleQosStatus::default()
964 }
965
    /// Queries all four writer statuses once and discards the results.
    ///
    /// Each getter polls its status as a side effect, so for live writers
    /// this dispatches listener callbacks for any status that changed.
    #[cfg(feature = "std")]
    pub fn drive_listeners(&self) {
        let _ = self.matched_subscription_count();
        let _ = self.offered_deadline_missed_count();
        let _ = self.liveliness_lost_count();
        let _ = self.offered_incompatible_qos_status();
    }
975
976 #[cfg(feature = "std")]
980 pub fn assert_liveliness(&self) {
981 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
982 rt.assert_writer_liveliness_eid(eid);
983 }
984 }
985
986 #[cfg(feature = "std")]
996 pub fn wait_for_acknowledgments(&self, timeout: core::time::Duration) -> Result<()> {
997 let deadline = std::time::Instant::now() + timeout;
998 loop {
999 let all_acked = match (&self.runtime, self.entity_id) {
1000 (Some(rt), Some(eid)) => rt.user_writer_all_acknowledged(eid),
1001 _ => true, };
1003 if all_acked {
1004 return Ok(());
1005 }
1006 let now = std::time::Instant::now();
1007 if now >= deadline {
1008 return Err(DdsError::Timeout);
1009 }
1010 if let Some(rt) = self.runtime.as_ref() {
1012 let _ = rt.wait_ack_event(deadline - now);
1013 } else {
1014 std::thread::sleep(core::time::Duration::from_millis(20));
1015 }
1016 }
1017 }
1018
1019 #[doc(hidden)]
1022 pub fn __drain_pending(&self) -> Vec<Vec<u8>> {
1023 let drained = self
1024 .queue
1025 .lock()
1026 .map(|mut q| core::mem::take(&mut *q))
1027 .unwrap_or_default();
1028 #[cfg(feature = "std")]
1030 self.drain_signal.notify_all();
1031 drained
1032 }
1033
    /// Returns the publication handle assigned when the writer was built.
    #[cfg(feature = "std")]
    #[must_use]
    pub fn publication_handle(&self) -> InstanceHandle {
        self.publication_handle
    }
1047
1048 #[must_use]
1052 pub fn instance_handle(&self) -> InstanceHandle {
1053 self.entity_state.instance_handle()
1054 }
1055
1056 #[cfg(feature = "std")]
1059 #[must_use]
1060 #[doc(hidden)]
1064 #[cfg(feature = "std")]
1065 pub fn runtime_handle(&self) -> Option<(Arc<DcpsRuntime>, EntityId)> {
1066 match (&self.runtime, self.entity_id) {
1067 (Some(rt), Some(eid)) => Some((Arc::clone(rt), eid)),
1068 _ => None,
1069 }
1070 }
1071
1072 pub fn instance_tracker(&self) -> InstanceTracker {
1075 self.instances.clone()
1076 }
1077
1078 #[cfg(feature = "std")]
1081 fn keyhash_and_holder(sample: &T) -> Option<(crate::instance_tracker::KeyHash, Vec<u8>)> {
1082 if !T::HAS_KEY {
1083 return None;
1084 }
1085 let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
1086 sample.encode_key_holder_be(&mut holder);
1087 let bytes = holder.as_bytes().to_vec();
1088 let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
1089 let kh = crate::dds_type::compute_key_hash(&bytes, max);
1090 Some((kh, bytes))
1091 }
1092
    /// Registers `instance` using the current time as timestamp.
    ///
    /// Delegates to `register_instance_w_timestamp`.
    #[cfg(feature = "std")]
    pub fn register_instance(&self, instance: &T) -> Result<InstanceHandle> {
        self.register_instance_w_timestamp(instance, get_current_time())
    }
1109
1110 #[cfg(feature = "std")]
1113 pub fn register_instance_w_timestamp(
1114 &self,
1115 instance: &T,
1116 timestamp: Time,
1117 ) -> Result<InstanceHandle> {
1118 let Some((kh, holder)) = Self::keyhash_and_holder(instance) else {
1119 return Ok(HANDLE_NIL);
1120 };
1121 Ok(self.instances.register(kh, holder, Some(timestamp)))
1122 }
1123
1124 #[cfg(feature = "std")]
1128 #[must_use]
1129 pub fn lookup_instance(&self, instance: &T) -> InstanceHandle {
1130 let Some((kh, _)) = Self::keyhash_and_holder(instance) else {
1131 return HANDLE_NIL;
1132 };
1133 self.instances.lookup(&kh).unwrap_or(HANDLE_NIL)
1134 }
1135
    /// Unregisters `instance` using the current time as timestamp.
    ///
    /// Delegates to `unregister_instance_w_timestamp`.
    #[cfg(feature = "std")]
    pub fn unregister_instance(&self, instance: &T, handle: InstanceHandle) -> Result<()> {
        self.unregister_instance_w_timestamp(instance, handle, get_current_time())
    }
1149
1150 #[cfg(feature = "std")]
1157 pub fn unregister_instance_w_timestamp(
1158 &self,
1159 instance: &T,
1160 handle: InstanceHandle,
1161 timestamp: Time,
1162 ) -> Result<()> {
1163 let resolved = self.resolve_handle(instance, handle)?;
1164 let autodispose = self
1165 .qos
1166 .lock()
1167 .map(|q| q.writer_data_lifecycle.autodispose_unregistered_instances)
1168 .unwrap_or(true);
1169 if autodispose && !self.instances.dispose(resolved, Some(timestamp)) {
1170 return Err(DdsError::BadParameter {
1171 what: "unknown instance handle",
1172 });
1173 }
1174 if !self.instances.unregister(resolved, Some(timestamp)) {
1175 return Err(DdsError::BadParameter {
1176 what: "unknown instance handle",
1177 });
1178 }
1179 #[cfg(feature = "std")]
1183 if let (Some(rt), Some(eid), Some((kh, _))) = (
1184 &self.runtime,
1185 self.entity_id,
1186 Self::keyhash_and_holder(instance),
1187 ) {
1188 let mut bits = zerodds_rtps::inline_qos::status_info::UNREGISTERED;
1189 if autodispose {
1190 bits |= zerodds_rtps::inline_qos::status_info::DISPOSED;
1191 }
1192 let _ = rt.write_user_lifecycle(eid, kh, bits);
1193 }
1194 Ok(())
1195 }
1196
    /// Disposes `instance` using the current time as timestamp.
    ///
    /// Delegates to `dispose_w_timestamp`.
    #[cfg(feature = "std")]
    pub fn dispose(&self, instance: &T, handle: InstanceHandle) -> Result<()> {
        self.dispose_w_timestamp(instance, handle, get_current_time())
    }
1207
1208 #[cfg(feature = "std")]
1210 pub fn dispose_w_timestamp(
1211 &self,
1212 instance: &T,
1213 handle: InstanceHandle,
1214 timestamp: Time,
1215 ) -> Result<()> {
1216 let resolved = self.resolve_handle(instance, handle)?;
1217 if !self.instances.dispose(resolved, Some(timestamp)) {
1218 return Err(DdsError::BadParameter {
1219 what: "unknown instance handle",
1220 });
1221 }
1222 #[cfg(feature = "std")]
1224 if let (Some(rt), Some(eid), Some((kh, _))) = (
1225 &self.runtime,
1226 self.entity_id,
1227 Self::keyhash_and_holder(instance),
1228 ) {
1229 let _ =
1230 rt.write_user_lifecycle(eid, kh, zerodds_rtps::inline_qos::status_info::DISPOSED);
1231 }
1232 Ok(())
1233 }
1234
1235 #[cfg(feature = "std")]
1247 pub fn get_key_value(&self, handle: InstanceHandle) -> Result<T> {
1248 let Some(bytes) = self.instances.get_key_holder(handle) else {
1249 return Err(DdsError::BadParameter {
1250 what: "unknown instance handle",
1251 });
1252 };
1253 T::decode(&bytes).map_err(|e| DdsError::WireError {
1254 message: alloc::string::ToString::to_string(&e),
1255 })
1256 }
1257
1258 #[cfg(feature = "std")]
1261 fn resolve_handle(&self, instance: &T, handle: InstanceHandle) -> Result<InstanceHandle> {
1262 let derived = self.lookup_instance(instance);
1263 if handle.is_nil() {
1264 if derived.is_nil() {
1265 return Err(DdsError::BadParameter {
1266 what: "instance not registered",
1267 });
1268 }
1269 return Ok(derived);
1270 }
1271 if !derived.is_nil() && derived != handle {
1272 return Err(DdsError::BadParameter {
1273 what: "handle does not match instance key",
1274 });
1275 }
1276 Ok(handle)
1277 }
1278
    /// Writes `sample`, first updating the instance tracker with `timestamp`.
    ///
    /// For keyed types the instance is registered if its key is unknown. If
    /// the key is known but the tracked state is not `Alive`, the instance is
    /// registered again and the previous handle is unregistered. The sample
    /// itself is then published through `write`, which does not take the
    /// timestamp.
    #[cfg(feature = "std")]
    pub fn write_w_timestamp(&self, sample: &T, timestamp: Time) -> Result<()> {
        if let Some((kh, holder)) = Self::keyhash_and_holder(sample) {
            if self.instances.lookup(&kh).is_none() {
                // First write for this key: implicit registration.
                self.instances.register(kh, holder, Some(timestamp));
            } else {
                let prev = self.instances.get_by_keyhash(&kh);
                if let Some(state) = prev {
                    if !matches!(state.kind, crate::sample_info::InstanceStateKind::Alive) {
                        // NOTE(review): register followed by unregister of the
                        // previous handle presumably revives the instance;
                        // the order matters — confirm against
                        // `InstanceTracker` semantics.
                        self.instances.register(kh, holder, Some(timestamp));
                        self.instances.unregister(state.handle, Some(timestamp));
                    }
                }
            }
        }
        self.write(sample)
    }
1307}
1308
#[cfg(feature = "std")]
impl<T: DdsType> crate::entity::Entity for DataWriter<T> {
    type Qos = DataWriterQos;

    /// Returns a copy of the current QoS, or the default QoS if the lock is
    /// poisoned.
    fn get_qos(&self) -> Self::Qos {
        match self.qos.lock() {
            Ok(current) => current.clone(),
            Err(_) => Self::Qos::default(),
        }
    }

    /// Replaces the stored QoS.
    ///
    /// Once the writer is enabled, a change to DURABILITY, RELIABILITY,
    /// HISTORY, RESOURCE_LIMITS, OWNERSHIP or LIVELINESS is rejected with the
    /// error built by `immutable_if_enabled` for the first differing policy
    /// (checked in that order). A poisoned lock leaves the QoS unchanged and
    /// still returns `Ok(())`.
    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
        let enabled = self.entity_state.is_enabled();
        let Ok(mut current) = self.qos.lock() else {
            return Ok(());
        };
        if enabled {
            let frozen: [(bool, &'static str); 6] = [
                (current.durability != qos.durability, "DURABILITY"),
                (current.reliability != qos.reliability, "RELIABILITY"),
                (current.history != qos.history, "HISTORY"),
                (
                    current.resource_limits != qos.resource_limits,
                    "RESOURCE_LIMITS",
                ),
                (current.ownership != qos.ownership, "OWNERSHIP"),
                (current.liveliness != qos.liveliness, "LIVELINESS"),
            ];
            if let Some(&(_, policy)) = frozen.iter().find(|(changed, _)| *changed) {
                return Err(crate::entity::immutable_if_enabled(policy));
            }
        }
        *current = qos;
        Ok(())
    }

    /// Enables the underlying entity state.
    fn enable(&self) -> Result<()> {
        self.entity_state.enable();
        Ok(())
    }

    /// Returns a shared pointer to the entity state.
    fn entity_state(&self) -> Arc<crate::entity::EntityState> {
        let state = &self.entity_state;
        Arc::clone(state)
    }
}
1356
/// Type-erased view of a data writer, exposing only its topic and type names.
#[allow(dead_code)]
pub(crate) trait AnyDataWriter: Send + Sync + core::fmt::Debug {
    /// Name of the topic the writer publishes on.
    fn topic_name(&self) -> &str;
    /// Type name of the samples the writer publishes.
    fn type_name(&self) -> &'static str;
}
1364
1365impl<T: DdsType + Send + 'static> AnyDataWriter for DataWriter<T>
1366where
1367 T: Send + Sync,
1368{
1369 fn topic_name(&self) -> &str {
1370 self.topic.name()
1371 }
1372 fn type_name(&self) -> &'static str {
1373 T::TYPE_NAME
1374 }
1375}
1376
/// Boxes a typed writer as a type-erased [`AnyDataWriter`] trait object.
#[allow(dead_code)]
pub(crate) fn boxed_any_writer<T: DdsType + Send + Sync + 'static>(
    w: DataWriter<T>,
) -> Box<dyn AnyDataWriter> {
    Box::new(w)
}
1384
1385#[cfg(test)]
1386#[allow(clippy::expect_used, clippy::unwrap_used)]
1387mod tests {
1388 use super::*;
1389 use crate::dds_type::RawBytes;
1390 use crate::factory::DomainParticipantFactory;
1391 use crate::qos::{DomainParticipantQos, TopicQos};
1392
    /// Test helper: builds a `RawBytes` topic named "Chatter" on an offline
    /// participant in domain 0.
    fn mk_topic() -> Topic<RawBytes> {
        let p = DomainParticipantFactory::instance()
            .create_participant_offline(0, DomainParticipantQos::default());
        Topic::new("Chatter".into(), TopicQos::default(), p)
    }
1398
    /// A publisher without a runtime creates a writer bound to the given topic.
    #[test]
    fn publisher_creates_datawriter_for_matching_type() {
        let p = Publisher::new(PublisherQos::default(), None);
        let w = p
            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
            .unwrap();
        assert_eq!(w.topic().name(), "Chatter");
    }
1407
    /// An offline writer queues the encoded sample, and draining returns it.
    #[test]
    fn datawriter_write_queues_encoded_sample() {
        let p = Publisher::new(PublisherQos::default(), None);
        let w = p
            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
            .unwrap();
        assert_eq!(w.samples_pending(), 0);
        w.write(&RawBytes::new(vec![1, 2, 3])).unwrap();
        assert_eq!(w.samples_pending(), 1);
        let drained = w.__drain_pending();
        assert_eq!(drained, vec![vec![1u8, 2, 3]]);
    }
1420
1421 use core::sync::atomic::{AtomicU32, Ordering};
1424
/// `set_listener` makes the listener retrievable and records the status mask
/// on the writer's entity state.
#[test]
fn datawriter_set_listener_stores_arc_and_mask() {
    // Listener that relies entirely on the trait's default methods.
    struct NoopListener;
    impl crate::listener::DataWriterListener for NoopListener {}

    let publisher = Publisher::new(PublisherQos::default(), None);
    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
        .unwrap();
    assert!(writer.get_listener().is_none());

    writer.set_listener(
        Some(Arc::new(NoopListener)),
        crate::psm_constants::status::ANY,
    );

    assert!(writer.get_listener().is_some());
    assert_eq!(
        writer.entity_state.listener_mask(),
        crate::psm_constants::status::ANY
    );
}
1442
/// Tracks how often the writer listener's `on_publication_matched` runs
/// across a sequence of `poll_publication_matched` calls.
///
/// The asserted totals show that the first poll fires, a poll with an
/// unchanged count does not, and a poll with a different count (higher or
/// lower) does.
#[test]
fn poll_publication_matched_fires_on_count_increase() {
    struct CallCounter(AtomicU32);
    impl crate::listener::DataWriterListener for CallCounter {
        fn on_publication_matched(
            &self,
            _writer: crate::InstanceHandle,
            _status: crate::status::PublicationMatchedStatus,
        ) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    let publisher = Publisher::new(PublisherQos::default(), None);
    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
        .unwrap();
    let counter = Arc::new(CallCounter(AtomicU32::new(0)));
    writer.set_listener(Some(counter.clone()), crate::psm_constants::status::ANY);

    // Total number of callback invocations observed so far.
    let fired = || counter.0.load(Ordering::Relaxed);

    writer.poll_publication_matched(0);
    assert_eq!(fired(), 1);

    writer.poll_publication_matched(1);
    assert_eq!(fired(), 2);

    // Same count as the previous poll: the total stays put.
    writer.poll_publication_matched(1);
    assert_eq!(fired(), 2);

    writer.poll_publication_matched(2);
    assert_eq!(fired(), 3);

    // Dropping back to a lower count fires as well.
    writer.poll_publication_matched(1);
    assert_eq!(fired(), 4);
}
1478
/// Polling a writer that has no listener installed must simply return;
/// the test passes as long as neither call panics.
#[test]
fn poll_publication_matched_with_no_listener_is_noop() {
    let publisher = Publisher::new(PublisherQos::default(), None);
    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
        .unwrap();

    for matched in [0, 5] {
        writer.poll_publication_matched(matched);
    }
}
1490
/// With a listener installed on the publisher only, polling the writer
/// invokes the publisher listener's `on_publication_matched` once.
#[test]
fn poll_publication_matched_bubbles_to_publisher() {
    struct MatchCounter(AtomicU32);
    impl crate::listener::PublisherListener for MatchCounter {
        fn on_publication_matched(
            &self,
            _writer: crate::InstanceHandle,
            _status: crate::status::PublicationMatchedStatus,
        ) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    let publisher = Publisher::new(PublisherQos::default(), None);
    let publisher_listener = Arc::new(MatchCounter(AtomicU32::new(0)));
    publisher.set_listener(
        Some(publisher_listener.clone()),
        crate::psm_constants::status::ANY,
    );

    // The writer itself gets no listener of its own.
    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
        .unwrap();
    writer.poll_publication_matched(1);

    let bubbled = publisher_listener.0.load(Ordering::Relaxed);
    assert_eq!(bubbled, 1);
}
1513
/// A new publisher is not suspended; `suspend_publications` makes
/// `is_suspended` report `true`.
#[test]
fn suspend_publications_sets_flag() {
    let publisher = Publisher::new(PublisherQos::default(), None);

    let before = publisher.is_suspended();
    assert!(!before);

    publisher.suspend_publications();

    let after = publisher.is_suspended();
    assert!(after);
}
1523
/// `resume_publications` after `suspend_publications` leaves the publisher
/// not suspended.
#[test]
fn resume_publications_clears_flag() {
    let publisher = Publisher::new(PublisherQos::default(), None);

    publisher.suspend_publications();
    publisher.resume_publications();

    let still_suspended = publisher.is_suspended();
    assert!(!still_suspended);
}
1531
/// Suspending twice in a row still leaves the publisher suspended.
#[test]
fn suspend_publications_is_idempotent() {
    let publisher = Publisher::new(PublisherQos::default(), None);

    publisher.suspend_publications();
    publisher.suspend_publications();

    let suspended = publisher.is_suspended();
    assert!(suspended);
}
1539
/// Resuming a publisher that was never suspended leaves it not suspended.
#[test]
fn resume_without_suspend_is_noop() {
    let publisher = Publisher::new(PublisherQos::default(), None);

    publisher.resume_publications();

    let suspended = publisher.is_suspended();
    assert!(!suspended);
}
1547
/// `copy_from_topic_qos` overwrites the writer's durability and reliability
/// with the topic's values.
#[test]
fn copy_from_topic_qos_copies_durability_and_reliability() {
    use crate::qos::{DurabilityKind, ReliabilityKind, TopicQos};

    let mut topic = TopicQos::default();
    topic.durability.kind = DurabilityKind::TransientLocal;
    topic.reliability.kind = ReliabilityKind::Reliable;

    // Start the writer QoS from values that differ from the topic in BOTH
    // policies. Without the explicit reliability preset, the reliability
    // assertion below could pass merely because the writer default already
    // equals the topic value, without any copy taking place.
    let mut dw = DataWriterQos::default();
    dw.durability.kind = DurabilityKind::Volatile;
    dw.reliability.kind = ReliabilityKind::BestEffort;

    Publisher::copy_from_topic_qos(&mut dw, &topic).unwrap();

    assert_eq!(dw.durability.kind, DurabilityKind::TransientLocal);
    assert_eq!(dw.reliability.kind, ReliabilityKind::Reliable);
}
1564
/// A full reliable writer blocks in `write` until its queue is drained.
///
/// With `max_samples = 2`, two writes fill the queue. A helper thread empties
/// the queue after 50 ms and notifies `drain_signal`; the third `write` must
/// then succeed, having waited roughly that long and well under the 500 ms
/// `max_blocking_time`.
#[test]
fn write_blocks_until_drain_when_reliable_max_samples_reached() {
    use crate::qos::ResourceLimitsQosPolicy;

    let p = Publisher::new(PublisherQos::default(), None);
    let qos = DataWriterQos {
        resource_limits: ResourceLimitsQosPolicy {
            max_samples: 2,
            max_instances: -1,
            max_samples_per_instance: -1,
        },
        reliability: crate::qos::ReliabilityQosPolicy {
            kind: ReliabilityKind::Reliable,
            max_blocking_time: zerodds_qos::Duration::from_millis(500_i32),
        },
        ..DataWriterQos::default()
    };
    let w = p.create_datawriter::<RawBytes>(&mk_topic(), qos).unwrap();
    let s = RawBytes::new(b"x".to_vec());

    // Fill the queue up to `max_samples`.
    w.write(&s).unwrap();
    w.write(&s).unwrap();
    assert_eq!(w.samples_pending(), 2);

    // Drain from a second thread while this thread is blocked in `write`.
    let w_clone_q = w.queue.clone();
    let w_clone_signal = w.drain_signal.clone();
    let drain_handle = std::thread::spawn(move || {
        std::thread::sleep(core::time::Duration::from_millis(50));
        {
            // A poisoned lock is a failure in its own right: panic here
            // (surfaced by `join().unwrap()` below) instead of silently
            // skipping the drain and failing later with a misleading timeout.
            let mut q = w_clone_q.lock().expect("writer queue lock poisoned");
            let _ = core::mem::take(&mut *q);
        } // The guard is released before the blocked writer is woken.
        w_clone_signal.notify_all();
    });

    let start = std::time::Instant::now();
    let res = w.write(&s);
    let elapsed = start.elapsed();
    drain_handle.join().unwrap();

    assert!(res.is_ok(), "write should succeed after drain, got {res:?}");
    // Lower bound: the write really waited for the drain. Upper bound: it
    // returned because of the drain, not because the blocking time ran out.
    assert!(
        elapsed >= core::time::Duration::from_millis(40)
            && elapsed < core::time::Duration::from_millis(450),
        "elapsed = {elapsed:?}, expected ~50ms"
    );
}
1615
/// A reliable writer with `max_samples = 1` and a 50 ms `max_blocking_time`
/// returns `DdsError::Timeout` from a second `write` when nothing drains the
/// queue in between.
#[test]
fn write_returns_timeout_when_reliable_drain_too_slow() {
    use crate::qos::{ReliabilityQosPolicy, ResourceLimitsQosPolicy};

    let publisher = Publisher::new(PublisherQos::default(), None);

    let resource_limits = ResourceLimitsQosPolicy {
        max_samples: 1,
        max_instances: -1,
        max_samples_per_instance: -1,
    };
    let reliability = ReliabilityQosPolicy {
        kind: ReliabilityKind::Reliable,
        max_blocking_time: zerodds_qos::Duration::from_millis(50_i32),
    };
    let writer_qos = DataWriterQos {
        resource_limits,
        reliability,
        ..DataWriterQos::default()
    };

    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), writer_qos)
        .unwrap();
    let sample = RawBytes::new(b"x".to_vec());

    // The first write fills the single slot; the second has nowhere to go.
    writer.write(&sample).unwrap();
    let outcome = writer.write(&sample);
    assert!(matches!(outcome, Err(DdsError::Timeout)));
}
1639
/// A best-effort writer with `max_samples = 1` and zero blocking time returns
/// `DdsError::OutOfResources` from a second `write` on a full queue.
#[test]
fn write_returns_oor_when_best_effort_queue_full() {
    use crate::qos::{ReliabilityQosPolicy, ResourceLimitsQosPolicy};

    let publisher = Publisher::new(PublisherQos::default(), None);

    let resource_limits = ResourceLimitsQosPolicy {
        max_samples: 1,
        max_instances: -1,
        max_samples_per_instance: -1,
    };
    let reliability = ReliabilityQosPolicy {
        kind: ReliabilityKind::BestEffort,
        max_blocking_time: zerodds_qos::Duration::from_millis(0_i32),
    };
    let writer_qos = DataWriterQos {
        resource_limits,
        reliability,
        ..DataWriterQos::default()
    };

    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), writer_qos)
        .unwrap();
    let sample = RawBytes::new(b"x".to_vec());

    // The first write fills the single slot; the second must be rejected.
    writer.write(&sample).unwrap();
    let outcome = writer.write(&sample);
    assert!(matches!(outcome, Err(DdsError::OutOfResources { .. })));
}
1662
/// With default writer QoS, fifty consecutive writes all succeed and every
/// one of them is left pending.
#[test]
fn write_does_not_block_when_max_samples_unlimited() {
    let publisher = Publisher::new(PublisherQos::default(), None);
    let writer = publisher
        .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
        .unwrap();
    let sample = RawBytes::new(b"x".to_vec());

    let writes = 50;
    for _ in 0..writes {
        writer.write(&sample).unwrap();
    }

    assert_eq!(writer.samples_pending(), writes);
}
1676
/// `copy_from_topic_qos` leaves `ownership_strength` on the writer QoS
/// exactly as it was set before the copy.
#[test]
fn copy_from_topic_qos_does_not_touch_writer_only_policies() {
    use crate::qos::TopicQos;

    let topic_qos = TopicQos::default();
    let mut writer_qos = DataWriterQos::default();
    writer_qos.ownership_strength.value = 42;

    Publisher::copy_from_topic_qos(&mut writer_qos, &topic_qos).unwrap();

    let strength = writer_qos.ownership_strength.value;
    assert_eq!(strength, 42);
}
1688}