1extern crate alloc;
18use alloc::boxed::Box;
19use alloc::string::ToString;
20use alloc::sync::Arc;
21use alloc::vec::Vec;
22use core::marker::PhantomData;
23
24#[cfg(feature = "std")]
25use std::sync::Mutex;
26#[cfg(feature = "std")]
27use std::sync::mpsc;
28
29use crate::dds_type::DdsType;
30use crate::entity::StatusMask;
31use crate::error::{DdsError, Result};
32#[cfg(feature = "std")]
33use crate::instance_handle::{HANDLE_NIL, InstanceHandle};
34#[cfg(feature = "std")]
35use crate::instance_tracker::InstanceTracker;
36use crate::listener::{ArcDataReaderListener, ArcSubscriberListener};
37use crate::qos::{DataReaderQos, SubscriberQos};
38#[cfg(feature = "std")]
39use crate::sample::Sample;
40#[cfg(feature = "std")]
41use crate::sample_info::{
42 InstanceStateKind, SampleInfo, SampleStateKind, ViewStateKind, instance_state_mask,
43 sample_state_mask, view_state_mask,
44};
45#[cfg(feature = "std")]
46use crate::time::{Time, get_current_time};
47use crate::topic::Topic;
48
49#[cfg(feature = "std")]
50use crate::runtime::DcpsRuntime;
51#[cfg(feature = "std")]
52use zerodds_qos::ReliabilityKind;
53#[cfg(feature = "std")]
54use zerodds_rtps::wire_types::EntityId;
55
56#[derive(Debug)]
58pub struct Subscriber {
59 pub(crate) inner: Arc<SubscriberInner>,
60}
61
62pub(crate) struct SubscriberInner {
63 #[cfg(feature = "std")]
64 pub(crate) qos: std::sync::Mutex<SubscriberQos>,
65 #[cfg(not(feature = "std"))]
66 #[allow(dead_code)]
67 pub(crate) qos: SubscriberQos,
68 pub(crate) entity_state: alloc::sync::Arc<crate::entity::EntityState>,
69 #[cfg(feature = "std")]
70 pub(crate) runtime: Option<Arc<DcpsRuntime>>,
71 #[cfg(feature = "std")]
74 pub(crate) listener: std::sync::Mutex<Option<(ArcSubscriberListener, StatusMask)>>,
75 #[cfg(feature = "std")]
78 pub(crate) participant:
79 std::sync::Mutex<Option<alloc::sync::Weak<crate::participant::ParticipantInner>>>,
80 pub(crate) access_scope: Arc<crate::coherent_set::GroupAccessScope>,
83 #[cfg(feature = "std")]
87 pub(crate) datareaders:
88 std::sync::Mutex<alloc::vec::Vec<crate::instance_handle::InstanceHandle>>,
89}
90
91impl core::fmt::Debug for SubscriberInner {
92 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
93 let listener_present = self.listener.lock().map(|s| s.is_some()).unwrap_or(false);
94 f.debug_struct("SubscriberInner")
95 .field("entity_state", &self.entity_state)
96 .field("listener_present", &listener_present)
97 .finish_non_exhaustive()
98 }
99}
100
101impl Subscriber {
102 #[cfg(feature = "std")]
103 pub(crate) fn new(qos: SubscriberQos, runtime: Option<Arc<DcpsRuntime>>) -> Self {
104 Self {
105 inner: Arc::new(SubscriberInner {
106 qos: std::sync::Mutex::new(qos),
107 entity_state: crate::entity::EntityState::new(),
108 runtime,
109 listener: std::sync::Mutex::new(None),
110 participant: std::sync::Mutex::new(None),
111 access_scope: crate::coherent_set::GroupAccessScope::new(),
112 datareaders: std::sync::Mutex::new(alloc::vec::Vec::new()),
113 }),
114 }
115 }
116
117 #[cfg(feature = "std")]
120 #[must_use]
121 pub fn contains_reader(&self, handle: crate::instance_handle::InstanceHandle) -> bool {
122 self.inner
123 .datareaders
124 .lock()
125 .map(|v| v.contains(&handle))
126 .unwrap_or(false)
127 }
128
129 #[cfg(feature = "std")]
130 fn track_reader(&self, handle: crate::instance_handle::InstanceHandle) {
131 if let Ok(mut list) = self.inner.datareaders.lock() {
132 list.push(handle);
133 }
134 if let Ok(slot) = self.inner.participant.lock() {
136 if let Some(weak) = slot.as_ref() {
137 if let Some(p_inner) = weak.upgrade() {
138 if let Ok(mut drs) = p_inner.datareaders.lock() {
139 drs.push(handle);
140 }
141 }
142 }
143 }
144 }
145 #[cfg(not(feature = "std"))]
146 pub(crate) fn new(qos: SubscriberQos) -> Self {
147 Self {
148 inner: Arc::new(SubscriberInner {
149 qos,
150 entity_state: crate::entity::EntityState::new(),
151 access_scope: crate::coherent_set::GroupAccessScope::new(),
152 }),
153 }
154 }
155
156 pub fn begin_access(&self) {
161 self.inner.access_scope.begin();
162 }
163
164 pub fn end_access(&self) -> Result<()> {
170 self.inner.access_scope.end()
171 }
172
173 #[must_use]
175 pub fn is_access_open(&self) -> bool {
176 self.inner.access_scope.is_active()
177 }
178
179 #[cfg(feature = "std")]
182 pub fn set_listener(&self, listener: Option<ArcSubscriberListener>, mask: StatusMask) {
183 if let Ok(mut slot) = self.inner.listener.lock() {
184 *slot = listener.map(|l| (l, mask));
185 }
186 self.inner.entity_state.set_listener_mask(mask);
187 }
188
189 #[cfg(feature = "std")]
191 #[must_use]
192 pub fn get_listener(&self) -> Option<ArcSubscriberListener> {
193 self.inner
194 .listener
195 .lock()
196 .ok()
197 .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
198 }
199
200 #[cfg(feature = "std")]
202 pub(crate) fn attach_participant(
203 &self,
204 participant: alloc::sync::Weak<crate::participant::ParticipantInner>,
205 ) {
206 if let Ok(mut slot) = self.inner.participant.lock() {
207 *slot = Some(participant);
208 }
209 }
210
211 #[cfg(feature = "std")]
214 #[must_use]
215 pub(crate) fn snapshot_reader_chain(
216 &self,
217 reader_listener: Option<(ArcDataReaderListener, StatusMask)>,
218 ) -> crate::listener_dispatch::ReaderListenerChain {
219 let subscriber = self
220 .inner
221 .listener
222 .lock()
223 .ok()
224 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
225 let participant = {
226 let weak = self.inner.participant.lock().ok().and_then(|s| s.clone());
227 weak.and_then(|w| w.upgrade()).and_then(|inner| {
228 inner
229 .listener
230 .lock()
231 .ok()
232 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)))
233 })
234 };
235 crate::listener_dispatch::ReaderListenerChain {
236 reader: reader_listener,
237 subscriber,
238 participant,
239 }
240 }
241
242 pub fn create_datareader<T: DdsType + Send + 'static>(
247 &self,
248 topic: &Topic<T>,
249 qos: DataReaderQos,
250 ) -> Result<DataReader<T>> {
251 if topic.type_name() != T::TYPE_NAME {
252 return Err(DdsError::BadParameter {
253 what: "topic.type_name mismatch",
254 });
255 }
256 #[cfg(feature = "std")]
257 if let Some(rt) = self.inner.runtime.as_ref() {
258 let reliable = qos.reliability.kind == ReliabilityKind::Reliable;
259 let (eid, rx) = rt.register_user_reader(crate::runtime::UserReaderConfig {
260 topic_name: topic.name().into(),
261 type_name: T::TYPE_NAME.into(),
262 reliable,
263 durability: qos.durability.kind,
264 deadline: qos.deadline,
265 liveliness: qos.liveliness,
266 ownership: qos.ownership.kind,
267 partition: qos.partition.names.clone(),
268 user_data: qos.user_data.value.clone(),
269 topic_data: qos.topic_data.value.clone(),
270 group_data: qos.group_data.value.clone(),
271 type_identifier: T::TYPE_IDENTIFIER.clone(),
273 type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
274 data_representation_offer: None,
278 })?;
279 let dr = DataReader::new_live(
280 topic.clone(),
281 qos,
282 self.inner.clone(),
283 Arc::clone(rt),
284 eid,
285 rx,
286 );
287 self.track_reader(dr.entity_state.instance_handle());
288 return Ok(dr);
289 }
290 let dr = DataReader::new_offline(topic.clone(), qos, self.inner.clone());
291 #[cfg(feature = "std")]
292 self.track_reader(dr.entity_state.instance_handle());
293 Ok(dr)
294 }
295}
296
297#[cfg(feature = "std")]
302impl crate::entity::Entity for Subscriber {
303 type Qos = SubscriberQos;
304
305 fn get_qos(&self) -> Self::Qos {
306 self.inner.qos.lock().map(|q| q.clone()).unwrap_or_default()
307 }
308
309 fn set_qos(&self, qos: Self::Qos) -> Result<()> {
310 if let Ok(mut current) = self.inner.qos.lock() {
313 *current = qos;
314 }
315 Ok(())
316 }
317
318 fn enable(&self) -> Result<()> {
319 self.inner.entity_state.enable();
320 Ok(())
321 }
322
323 fn entity_state(&self) -> alloc::sync::Arc<crate::entity::EntityState> {
324 alloc::sync::Arc::clone(&self.inner.entity_state)
325 }
326}
327
328pub struct DataReader<T: DdsType> {
334 topic: Topic<T>,
335 qos: Mutex<DataReaderQos>,
336 entity_state: Arc<crate::entity::EntityState>,
338 subscriber: Arc<SubscriberInner>,
341 #[cfg(feature = "std")]
343 listener: Mutex<Option<(ArcDataReaderListener, StatusMask)>>,
344 #[cfg(feature = "std")]
347 last_match_count: std::sync::atomic::AtomicI64,
348 #[cfg(feature = "std")]
350 last_requested_deadline_missed: std::sync::atomic::AtomicU64,
351 #[cfg(feature = "std")]
353 last_liveliness_alive: std::sync::atomic::AtomicI64,
354 #[cfg(feature = "std")]
356 last_liveliness_not_alive: std::sync::atomic::AtomicI64,
357 #[cfg(feature = "std")]
359 last_requested_incompatible_qos: std::sync::atomic::AtomicI64,
360 #[cfg(feature = "std")]
362 last_sample_lost: std::sync::atomic::AtomicU64,
363 #[cfg(feature = "std")]
365 last_sample_rejected: std::sync::atomic::AtomicI64,
366 inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
371 #[cfg(feature = "std")]
372 #[allow(dead_code)]
373 runtime: Option<Arc<DcpsRuntime>>,
374 #[cfg(feature = "std")]
375 #[allow(dead_code)]
376 entity_id: Option<EntityId>,
377 #[cfg(feature = "std")]
379 rx: Option<Mutex<mpsc::Receiver<crate::runtime::UserSample>>>,
380 #[allow(clippy::type_complexity)]
390 filter: Option<Arc<dyn Fn(&T) -> bool + Send + Sync>>,
391 #[cfg(feature = "std")]
393 instances: InstanceTracker,
394 #[cfg(feature = "std")]
398 cache: Arc<Mutex<Vec<CachedSample>>>,
399 #[cfg(all(feature = "std", feature = "flatdata-integration"))]
404 #[allow(clippy::type_complexity)]
405 pub(crate) flat_backend: Mutex<
406 Option<(
407 Arc<dyn zerodds_flatdata::SlotBackend>,
408 u8, std::sync::atomic::AtomicU32,
410 )>,
411 >,
412 _t: PhantomData<fn() -> T>,
413}
414
415#[cfg(feature = "std")]
421#[derive(Debug)]
422pub(crate) struct CachedSample {
423 pub bytes: Option<Vec<u8>>,
424 pub info: SampleInfo,
425}
426
427impl<T: DdsType> core::fmt::Debug for DataReader<T> {
428 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
429 f.debug_struct("DataReader")
430 .field("topic", &self.topic.name())
431 .field("type", &T::TYPE_NAME)
432 .field("qos", &self.qos)
433 .finish_non_exhaustive()
434 }
435}
436
437impl<T: DdsType> DataReader<T> {
438 #[cfg(feature = "std")]
439 fn new_offline(topic: Topic<T>, qos: DataReaderQos, subscriber: Arc<SubscriberInner>) -> Self {
440 Self {
441 topic,
442 qos: Mutex::new(qos),
443 entity_state: crate::entity::EntityState::new(),
444 subscriber,
445 listener: Mutex::new(None),
446 last_match_count: std::sync::atomic::AtomicI64::new(-1),
447 last_requested_deadline_missed: std::sync::atomic::AtomicU64::new(0),
448 last_liveliness_alive: std::sync::atomic::AtomicI64::new(-1),
449 last_liveliness_not_alive: std::sync::atomic::AtomicI64::new(-1),
450 last_requested_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
451 last_sample_lost: std::sync::atomic::AtomicU64::new(0),
452 last_sample_rejected: std::sync::atomic::AtomicI64::new(-1),
453 inbox: Arc::new(Mutex::new(Vec::new())),
454 runtime: None,
455 entity_id: None,
456 rx: None,
457 filter: None,
458 instances: InstanceTracker::new(),
459 cache: Arc::new(Mutex::new(Vec::new())),
460 #[cfg(feature = "flatdata-integration")]
461 flat_backend: Mutex::new(None),
462 _t: PhantomData,
463 }
464 }
465
466 #[cfg(feature = "std")]
467 fn new_live(
468 topic: Topic<T>,
469 qos: DataReaderQos,
470 subscriber: Arc<SubscriberInner>,
471 runtime: Arc<DcpsRuntime>,
472 entity_id: EntityId,
473 rx: mpsc::Receiver<crate::runtime::UserSample>,
474 ) -> Self {
475 Self {
476 topic,
477 qos: Mutex::new(qos),
478 entity_state: crate::entity::EntityState::new(),
479 subscriber,
480 listener: Mutex::new(None),
481 last_match_count: std::sync::atomic::AtomicI64::new(-1),
482 last_requested_deadline_missed: std::sync::atomic::AtomicU64::new(0),
483 last_liveliness_alive: std::sync::atomic::AtomicI64::new(-1),
484 last_liveliness_not_alive: std::sync::atomic::AtomicI64::new(-1),
485 last_requested_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
486 last_sample_lost: std::sync::atomic::AtomicU64::new(0),
487 last_sample_rejected: std::sync::atomic::AtomicI64::new(-1),
488 inbox: Arc::new(Mutex::new(Vec::new())),
489 runtime: Some(runtime),
490 entity_id: Some(entity_id),
491 rx: Some(Mutex::new(rx)),
492 filter: None,
493 instances: InstanceTracker::new(),
494 cache: Arc::new(Mutex::new(Vec::new())),
495 #[cfg(feature = "flatdata-integration")]
496 flat_backend: Mutex::new(None),
497 _t: PhantomData,
498 }
499 }
500
501 #[cfg(not(feature = "std"))]
502 fn new(topic: Topic<T>, qos: DataReaderQos, subscriber: Arc<SubscriberInner>) -> Self {
503 Self {
504 topic,
505 qos,
506 subscriber,
507 inbox: Arc::new(Mutex::new(Vec::new())),
508 filter: None,
509 _t: PhantomData,
510 }
511 }
512
513 #[cfg(feature = "std")]
520 pub(crate) fn new_builtin(
521 topic: Topic<T>,
522 qos: DataReaderQos,
523 subscriber: Arc<SubscriberInner>,
524 inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
525 ) -> Self {
526 Self {
527 topic,
528 qos: Mutex::new(qos),
529 entity_state: crate::entity::EntityState::new(),
530 subscriber,
531 listener: Mutex::new(None),
532 last_match_count: std::sync::atomic::AtomicI64::new(-1),
533 last_requested_deadline_missed: std::sync::atomic::AtomicU64::new(0),
534 last_liveliness_alive: std::sync::atomic::AtomicI64::new(-1),
535 last_liveliness_not_alive: std::sync::atomic::AtomicI64::new(-1),
536 last_requested_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
537 last_sample_lost: std::sync::atomic::AtomicU64::new(0),
538 last_sample_rejected: std::sync::atomic::AtomicI64::new(-1),
539 inbox,
540 runtime: None,
541 entity_id: None,
542 rx: None,
543 filter: None,
544 instances: InstanceTracker::new(),
545 cache: Arc::new(Mutex::new(Vec::new())),
546 #[cfg(feature = "flatdata-integration")]
547 flat_backend: Mutex::new(None),
548 _t: PhantomData,
549 }
550 }
551
552 #[must_use]
560 pub fn with_filter<F>(mut self, filter: F) -> Self
561 where
562 F: Fn(&T) -> bool + Send + Sync + 'static,
563 {
564 self.filter = Some(Arc::new(filter));
565 self
566 }
567
568 #[must_use]
570 pub fn topic(&self) -> &Topic<T> {
571 &self.topic
572 }
573
574 #[must_use]
578 pub fn subscription_handle(&self) -> crate::instance_handle::InstanceHandle {
579 self.entity_state.instance_handle()
580 }
581
582 #[cfg(feature = "std")]
585 pub fn set_listener(&self, listener: Option<ArcDataReaderListener>, mask: StatusMask) {
586 if let Ok(mut slot) = self.listener.lock() {
587 *slot = listener.map(|l| (l, mask));
588 }
589 self.entity_state.set_listener_mask(mask);
590 }
591
592 #[cfg(feature = "std")]
594 #[must_use]
595 pub fn get_listener(&self) -> Option<ArcDataReaderListener> {
596 self.listener
597 .lock()
598 .ok()
599 .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
600 }
601
602 #[cfg(feature = "std")]
605 #[must_use]
606 pub(crate) fn listener_chain(&self) -> crate::listener_dispatch::ReaderListenerChain {
607 let reader = self
608 .listener
609 .lock()
610 .ok()
611 .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
612 let sub_handle = Subscriber {
613 inner: Arc::clone(&self.subscriber),
614 };
615 sub_handle.snapshot_reader_chain(reader)
616 }
617
618 #[must_use]
620 pub fn qos(&self) -> DataReaderQos {
621 self.qos.lock().map(|q| q.clone()).unwrap_or_default()
622 }
623
624 pub fn take(&self) -> Result<Vec<T>> {
631 #[cfg(feature = "std")]
634 {
635 let now = get_current_time();
636 let mut empty: Vec<CachedSample> = Vec::new();
637 self.run_reader_autopurge(now, &mut empty);
638 }
639 #[cfg(feature = "std")]
642 if let Some(rx_mu) = self.rx.as_ref() {
643 let mut out = Vec::new();
644 let min_sep_nanos = {
647 let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
648 qos.time_based_filter.minimum_separation.to_nanos()
649 };
650 let staged = {
651 let mut inbox = self
652 .inbox
653 .lock()
654 .map_err(|_| DdsError::PreconditionNotMet {
655 reason: "datareader inbox poisoned",
656 })?;
657 core::mem::take(&mut *inbox)
658 };
659 for staged_item in staged {
660 match staged_item {
661 crate::runtime::UserSample::Alive {
662 payload: bytes,
663 writer_guid,
664 writer_strength,
665 } => {
666 let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
667 message: e.to_string(),
668 })?;
669 if !self.sample_passes_filter(&sample) {
670 continue;
671 }
672 if !self.live_mode_time_based_filter_pass(&sample, min_sep_nanos) {
673 continue;
674 }
675 if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
677 continue;
678 }
679 out.push(sample);
680 }
681 crate::runtime::UserSample::Lifecycle { .. } => {
682 }
687 }
688 }
689 let rx = rx_mu.lock().map_err(|_| DdsError::PreconditionNotMet {
690 reason: "datareader rx poisoned",
691 })?;
692 while let Ok(item) = rx.try_recv() {
693 match item {
694 crate::runtime::UserSample::Alive {
695 payload: bytes,
696 writer_guid,
697 writer_strength,
698 } => {
699 let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
700 message: e.to_string(),
701 })?;
702 if !self.sample_passes_filter(&sample) {
703 continue;
704 }
705 if !self.live_mode_time_based_filter_pass(&sample, min_sep_nanos) {
706 continue;
707 }
708 if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
710 continue;
711 }
712 out.push(sample);
713 }
714 crate::runtime::UserSample::Lifecycle { key_hash, kind } => {
715 let mut holder_bytes = Vec::with_capacity(16);
718 holder_bytes.extend_from_slice(&key_hash);
719 let lc_kind = match kind {
720 zerodds_rtps::history_cache::ChangeKind::NotAliveDisposed
721 | zerodds_rtps::history_cache::ChangeKind::NotAliveDisposedUnregistered => {
722 crate::sample_info::InstanceStateKind::NotAliveDisposed
723 }
724 zerodds_rtps::history_cache::ChangeKind::NotAliveUnregistered => {
725 crate::sample_info::InstanceStateKind::NotAliveNoWriters
726 }
727 _ => crate::sample_info::InstanceStateKind::Alive,
728 };
729 let _ = self.__push_lifecycle(key_hash, holder_bytes, lc_kind);
730 }
731 }
732 }
733 return Ok(out);
734 }
735 let raw = {
737 let mut inbox = self
738 .inbox
739 .lock()
740 .map_err(|_| DdsError::PreconditionNotMet {
741 reason: "datareader inbox poisoned",
742 })?;
743 core::mem::take(&mut *inbox)
744 };
745 let mut out = Vec::with_capacity(raw.len());
746 for staged_item in raw {
747 let crate::runtime::UserSample::Alive {
748 payload: bytes,
749 writer_guid,
750 writer_strength,
751 } = staged_item
752 else {
753 continue;
754 };
755 let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
756 message: e.to_string(),
757 })?;
758 if !self.sample_passes_filter(&sample) {
759 continue;
760 }
761 if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
766 continue;
767 }
768 out.push(sample);
769 }
770 Ok(out)
771 }
772
773 fn sample_passes_filter(&self, sample: &T) -> bool {
775 match &self.filter {
776 Some(f) => f(sample),
777 None => true,
778 }
779 }
780
781 #[cfg(feature = "std")]
791 fn passes_exclusive_ownership(
792 &self,
793 sample: &T,
794 writer_guid: [u8; 16],
795 writer_strength: i32,
796 ) -> bool {
797 let kind = {
798 let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
799 qos.ownership.kind
800 };
801 if kind != zerodds_qos::OwnershipKind::Exclusive {
802 return true;
803 }
804 let (kh, key_bytes) = if T::HAS_KEY {
808 let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
809 sample.encode_key_holder_be(&mut holder);
810 let kb = holder.as_bytes().to_vec();
811 let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
812 (crate::dds_type::compute_key_hash(&kb, max), kb)
813 } else {
814 ([0u8; 16], Vec::new())
815 };
816 let _ = self.instances.observe_sample(kh, key_bytes, None);
820 self.instances
821 .should_accept_sample_under_exclusive_ownership(&kh, writer_guid, writer_strength)
822 }
823
824 #[cfg(feature = "std")]
832 fn live_mode_time_based_filter_pass(&self, sample: &T, min_sep_nanos: u128) -> bool {
833 if min_sep_nanos == 0 || !T::HAS_KEY {
834 return true;
835 }
836 let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
837 sample.encode_key_holder_be(&mut holder);
838 let key_bytes = holder.as_bytes().to_vec();
839 let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
840 let kh = crate::dds_type::compute_key_hash(&key_bytes, max);
841 let now = get_current_time();
842 if !self
843 .instances
844 .should_deliver_under_time_based_filter(&kh, now, min_sep_nanos)
845 {
846 return false;
847 }
848 let _ = self.instances.observe_sample(kh, key_bytes, Some(now));
849 self.instances.record_delivery(&kh, now);
850 true
851 }
852
853 pub fn read(&self) -> Result<Vec<T>> {
860 let raw = {
861 let inbox = self
862 .inbox
863 .lock()
864 .map_err(|_| DdsError::PreconditionNotMet {
865 reason: "datareader inbox poisoned",
866 })?;
867 inbox.clone()
868 };
869 let mut out = Vec::with_capacity(raw.len());
870 for staged_item in raw {
871 let crate::runtime::UserSample::Alive {
872 payload: bytes,
873 writer_guid,
874 writer_strength,
875 } = staged_item
876 else {
877 continue;
878 };
879 let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
880 message: e.to_string(),
881 })?;
882 if !self.sample_passes_filter(&sample) {
883 continue;
884 }
885 if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
890 continue;
891 }
892 out.push(sample);
893 }
894 Ok(out)
895 }
896
897 #[must_use]
905 pub fn matched_publication_count(&self) -> usize {
906 #[cfg(feature = "std")]
907 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
908 let n = rt.user_reader_matched_count(eid);
909 self.poll_subscription_matched(n);
910 return n;
911 }
912 0
913 }
914
915 #[cfg(feature = "std")]
917 pub(crate) fn poll_subscription_matched(&self, current: usize) {
918 let curr = current as i64;
919 let prev = self
920 .last_match_count
921 .swap(curr, std::sync::atomic::Ordering::AcqRel);
922 if prev == curr {
923 return;
924 }
925 let total = if curr > prev.max(0) {
926 curr
927 } else {
928 prev.max(0)
929 };
930 let delta = curr - prev.max(0);
931 let status = crate::status::SubscriptionMatchedStatus {
932 total_count: total as i32,
933 total_count_change: delta.max(0) as i32,
934 current_count: curr as i32,
935 current_count_change: delta as i32,
936 last_publication_handle: crate::instance_handle::HANDLE_NIL,
937 };
938 let chain = self.listener_chain();
939 crate::listener_dispatch::dispatch_subscription_matched(
940 &chain,
941 self.entity_state.instance_handle(),
942 status,
943 );
944 }
945
946 #[cfg(feature = "std")]
949 pub(crate) fn poll_requested_deadline_missed(&self, current: u64) {
950 let prev = self
951 .last_requested_deadline_missed
952 .swap(current, std::sync::atomic::Ordering::AcqRel);
953 if current == prev {
954 return;
955 }
956 let total_change = current.saturating_sub(prev);
957 let status = crate::status::RequestedDeadlineMissedStatus {
958 total_count: current as i32,
959 total_count_change: total_change as i32,
960 last_instance_handle: crate::instance_handle::HANDLE_NIL,
961 };
962 let chain = self.listener_chain();
963 crate::listener_dispatch::dispatch_requested_deadline_missed(
964 &chain,
965 self.entity_state.instance_handle(),
966 status,
967 );
968 }
969
970 #[cfg(feature = "std")]
974 pub(crate) fn poll_liveliness_changed(&self, alive_count: u64, not_alive_count: u64) {
975 let curr_alive = alive_count as i64;
976 let curr_not = not_alive_count as i64;
977 let prev_alive = self
978 .last_liveliness_alive
979 .swap(curr_alive, std::sync::atomic::Ordering::AcqRel);
980 let prev_not = self
981 .last_liveliness_not_alive
982 .swap(curr_not, std::sync::atomic::Ordering::AcqRel);
983 let alive_changed = if prev_alive < 0 {
986 curr_alive != 0
987 } else {
988 prev_alive != curr_alive
989 };
990 let not_changed = if prev_not < 0 {
991 curr_not != 0
992 } else {
993 prev_not != curr_not
994 };
995 if !alive_changed && !not_changed {
996 return;
997 }
998 let alive_delta = if prev_alive < 0 {
999 curr_alive
1000 } else {
1001 curr_alive - prev_alive
1002 };
1003 let not_delta = if prev_not < 0 {
1004 curr_not
1005 } else {
1006 curr_not - prev_not
1007 };
1008 let status = crate::status::LivelinessChangedStatus {
1009 alive_count: curr_alive as i32,
1010 not_alive_count: curr_not as i32,
1011 alive_count_change: alive_delta as i32,
1012 not_alive_count_change: not_delta as i32,
1013 last_publication_handle: crate::instance_handle::HANDLE_NIL,
1014 };
1015 let chain = self.listener_chain();
1016 crate::listener_dispatch::dispatch_liveliness_changed(
1017 &chain,
1018 self.entity_state.instance_handle(),
1019 status,
1020 );
1021 }
1022
1023 #[cfg(feature = "std")]
1026 pub(crate) fn poll_requested_incompatible_qos(
1027 &self,
1028 snapshot: crate::status::RequestedIncompatibleQosStatus,
1029 ) {
1030 let curr = i64::from(snapshot.total_count);
1031 let prev = self
1032 .last_requested_incompatible_qos
1033 .swap(curr, std::sync::atomic::Ordering::AcqRel);
1034 if curr == prev {
1035 return;
1036 }
1037 let delta = curr - prev.max(0);
1038 let status = crate::status::RequestedIncompatibleQosStatus {
1039 total_count: curr as i32,
1040 total_count_change: delta.max(0) as i32,
1041 last_policy_id: snapshot.last_policy_id,
1042 policies: snapshot.policies,
1043 };
1044 let chain = self.listener_chain();
1045 crate::listener_dispatch::dispatch_requested_incompatible_qos(
1046 &chain,
1047 self.entity_state.instance_handle(),
1048 status,
1049 );
1050 }
1051
1052 #[cfg(feature = "std")]
1054 pub(crate) fn poll_sample_lost(&self, current: u64) {
1055 let prev = self
1056 .last_sample_lost
1057 .swap(current, std::sync::atomic::Ordering::AcqRel);
1058 if current == prev {
1059 return;
1060 }
1061 let delta = current.saturating_sub(prev);
1062 let status = crate::status::SampleLostStatus {
1063 total_count: current as i32,
1064 total_count_change: delta as i32,
1065 };
1066 let chain = self.listener_chain();
1067 crate::listener_dispatch::dispatch_sample_lost(
1068 &chain,
1069 self.entity_state.instance_handle(),
1070 status,
1071 );
1072 }
1073
1074 #[cfg(feature = "std")]
1076 pub(crate) fn poll_sample_rejected(&self, snapshot: crate::status::SampleRejectedStatus) {
1077 let curr = i64::from(snapshot.total_count);
1078 let prev = self
1079 .last_sample_rejected
1080 .swap(curr, std::sync::atomic::Ordering::AcqRel);
1081 if curr == prev {
1082 return;
1083 }
1084 let delta = curr - prev.max(0);
1085 let status = crate::status::SampleRejectedStatus {
1086 total_count: curr as i32,
1087 total_count_change: delta.max(0) as i32,
1088 last_reason: snapshot.last_reason,
1089 last_instance_handle: snapshot.last_instance_handle,
1090 };
1091 let chain = self.listener_chain();
1092 crate::listener_dispatch::dispatch_sample_rejected(
1093 &chain,
1094 self.entity_state.instance_handle(),
1095 status,
1096 );
1097 }
1098
1099 #[cfg(feature = "std")]
1108 pub fn wait_for_matched_publication(
1109 &self,
1110 min_count: usize,
1111 timeout: core::time::Duration,
1112 ) -> Result<()> {
1113 let deadline = std::time::Instant::now() + timeout;
1114 loop {
1115 if self.matched_publication_count() >= min_count {
1116 return Ok(());
1117 }
1118 let now = std::time::Instant::now();
1119 if now >= deadline {
1120 return Err(DdsError::Timeout);
1121 }
1122 if let Some(rt) = self.runtime.as_ref() {
1125 let _ = rt.wait_match_event(deadline - now);
1126 } else {
1127 std::thread::sleep(core::time::Duration::from_millis(20));
1129 }
1130 }
1131 }
1132
1133 #[must_use]
1140 pub fn requested_deadline_missed_count(&self) -> u64 {
1141 #[cfg(feature = "std")]
1142 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1143 let n = rt.user_reader_requested_deadline_missed(eid);
1144 self.poll_requested_deadline_missed(n);
1145 return n;
1146 }
1147 0
1148 }
1149
1150 #[must_use]
1153 pub fn requested_incompatible_qos_status(
1154 &self,
1155 ) -> crate::status::RequestedIncompatibleQosStatus {
1156 #[cfg(feature = "std")]
1157 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1158 let s = rt.user_reader_requested_incompatible_qos(eid);
1159 self.poll_requested_incompatible_qos(s.clone());
1160 return s;
1161 }
1162 crate::status::RequestedIncompatibleQosStatus::default()
1163 }
1164
1165 #[must_use]
1167 pub fn sample_lost_count(&self) -> u64 {
1168 #[cfg(feature = "std")]
1169 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1170 let n = rt.user_reader_sample_lost(eid);
1171 self.poll_sample_lost(n);
1172 return n;
1173 }
1174 0
1175 }
1176
1177 #[must_use]
1179 pub fn sample_rejected_status(&self) -> crate::status::SampleRejectedStatus {
1180 #[cfg(feature = "std")]
1181 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1182 let s = rt.user_reader_sample_rejected(eid);
1183 self.poll_sample_rejected(s);
1184 return s;
1185 }
1186 crate::status::SampleRejectedStatus::default()
1187 }
1188
1189 #[cfg(feature = "std")]
1192 pub fn drive_listeners(&self) {
1193 let _ = self.matched_publication_count();
1194 let _ = self.requested_deadline_missed_count();
1195 let (_, alive, not_alive) = self.liveliness_changed_status();
1196 self.poll_liveliness_changed(alive, not_alive);
1197 let _ = self.requested_incompatible_qos_status();
1198 let _ = self.sample_lost_count();
1199 let _ = self.sample_rejected_status();
1200 }
1201
1202 #[must_use]
1213 pub fn liveliness_changed_status(&self) -> (bool, u64, u64) {
1214 #[cfg(feature = "std")]
1215 if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1216 let triple = rt.user_reader_liveliness_status(eid);
1217 self.poll_liveliness_changed(triple.1, triple.2);
1219 return triple;
1220 }
1221 (false, 0, 0)
1222 }
1223
1224 #[cfg(feature = "std")]
1237 pub fn wait_for_data(&self, timeout: core::time::Duration) -> Result<()> {
1238 let Some(rx_mu) = self.rx.as_ref() else {
1239 let inbox_has = self.inbox.lock().map(|i| !i.is_empty()).unwrap_or(false);
1241 if inbox_has {
1242 return Ok(());
1243 }
1244 return Err(DdsError::Timeout);
1245 };
1246
1247 {
1249 let inbox = self
1250 .inbox
1251 .lock()
1252 .map_err(|_| DdsError::PreconditionNotMet {
1253 reason: "datareader inbox poisoned",
1254 })?;
1255 if !inbox.is_empty() {
1256 return Ok(());
1257 }
1258 }
1259
1260 let rx = rx_mu.lock().map_err(|_| DdsError::PreconditionNotMet {
1261 reason: "datareader rx poisoned",
1262 })?;
1263 let result = match rx.recv_timeout(timeout) {
1264 Ok(item) => {
1265 match item {
1266 sample @ crate::runtime::UserSample::Alive { .. } => {
1267 let mut inbox =
1268 self.inbox
1269 .lock()
1270 .map_err(|_| DdsError::PreconditionNotMet {
1271 reason: "datareader inbox poisoned",
1272 })?;
1273 inbox.push(sample);
1274 }
1275 crate::runtime::UserSample::Lifecycle { key_hash, kind } => {
1276 let lc_kind = match kind {
1277 zerodds_rtps::history_cache::ChangeKind::NotAliveDisposed
1278 | zerodds_rtps::history_cache::ChangeKind::NotAliveDisposedUnregistered => {
1279 crate::sample_info::InstanceStateKind::NotAliveDisposed
1280 }
1281 zerodds_rtps::history_cache::ChangeKind::NotAliveUnregistered => {
1282 crate::sample_info::InstanceStateKind::NotAliveNoWriters
1283 }
1284 _ => crate::sample_info::InstanceStateKind::Alive,
1285 };
1286 let mut holder_bytes = Vec::with_capacity(16);
1287 holder_bytes.extend_from_slice(&key_hash);
1288 let _ = self.__push_lifecycle(key_hash, holder_bytes, lc_kind);
1289 }
1290 }
1291 Ok(())
1292 }
1293 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(DdsError::Timeout),
1294 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1295 Err(DdsError::PreconditionNotMet {
1296 reason: "datareader rx disconnected",
1297 })
1298 }
1299 };
1300 drop(rx);
1303 if result.is_ok() {
1304 self.notify_data_arrived();
1305 }
1306 result
1307 }
1308
1309 #[doc(hidden)]
1312 #[cfg(feature = "std")]
1313 pub fn __inbox_handle(&self) -> Arc<Mutex<Vec<crate::runtime::UserSample>>> {
1314 Arc::clone(&self.inbox)
1315 }
1316
1317 #[doc(hidden)]
1325 pub fn __push_raw(&self, bytes: Vec<u8>) -> Result<()> {
1326 self.__push_raw_with_writer(bytes, [0u8; 16], 0)
1327 }
1328
1329 #[doc(hidden)]
1333 pub fn __push_raw_with_writer(
1334 &self,
1335 bytes: Vec<u8>,
1336 writer_guid: [u8; 16],
1337 writer_strength: i32,
1338 ) -> Result<()> {
1339 {
1340 let mut inbox = self
1341 .inbox
1342 .lock()
1343 .map_err(|_| DdsError::PreconditionNotMet {
1344 reason: "datareader inbox poisoned",
1345 })?;
1346 inbox.push(crate::runtime::UserSample::Alive {
1347 payload: bytes,
1348 writer_guid,
1349 writer_strength,
1350 });
1351 }
1352 self.notify_data_arrived();
1355 Ok(())
1356 }
1357
1358 #[cfg(feature = "std")]
1366 pub(crate) fn notify_data_arrived(&self) {
1367 let chain = self.listener_chain();
1368 let reader_handle = self.entity_state.instance_handle();
1369 crate::listener_dispatch::dispatch_data_on_readers(&chain, reader_handle);
1370 crate::listener_dispatch::dispatch_data_available(&chain, reader_handle);
1371 }
1372
1373 #[cfg(feature = "std")]
1381 #[must_use]
1382 pub fn instance_tracker(&self) -> InstanceTracker {
1383 self.instances.clone()
1384 }
1385
1386 #[doc(hidden)]
1390 #[cfg(feature = "std")]
1391 pub fn runtime_handle(
1392 &self,
1393 ) -> Option<(alloc::sync::Arc<crate::runtime::DcpsRuntime>, EntityId)> {
1394 match (&self.runtime, self.entity_id) {
1395 (Some(rt), Some(eid)) => Some((alloc::sync::Arc::clone(rt), eid)),
1396 _ => None,
1397 }
1398 }
1399
1400 #[must_use]
1411 pub fn notify_writer_liveliness_lost(&self, writer_guid: [u8; 16]) -> usize {
1412 self.instances.clear_owner_for_writer(writer_guid)
1413 }
1414
1415 #[must_use]
1419 pub fn notify_participant_liveliness_lost(&self, prefix: [u8; 12]) -> usize {
1420 self.instances.clear_owner_for_writer_prefix(prefix)
1421 }
1422
1423 #[cfg(feature = "std")]
1427 #[must_use]
1428 pub fn lookup_instance(&self, instance: &T) -> InstanceHandle {
1429 if !T::HAS_KEY {
1430 return HANDLE_NIL;
1431 }
1432 let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
1433 instance.encode_key_holder_be(&mut holder);
1434 let bytes = holder.as_bytes();
1435 let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
1436 let kh = crate::dds_type::compute_key_hash(bytes, max);
1437 self.instances.lookup(&kh).unwrap_or(HANDLE_NIL)
1438 }
1439
1440 #[cfg(feature = "std")]
1448 pub fn get_key_value(&self, handle: InstanceHandle) -> Result<T> {
1449 let Some(bytes) = self.instances.get_key_holder(handle) else {
1450 return Err(DdsError::BadParameter {
1451 what: "unknown instance handle",
1452 });
1453 };
1454 T::decode(&bytes).map_err(|e| DdsError::WireError {
1455 message: alloc::string::ToString::to_string(&e),
1456 })
1457 }
1458
1459 #[cfg(feature = "std")]
1467 fn ingest_into_cache(&self) -> Result<()> {
1468 let mut raw: Vec<(Vec<u8>, [u8; 16], i32)> = Vec::new();
1472 {
1473 let mut inbox = self
1474 .inbox
1475 .lock()
1476 .map_err(|_| DdsError::PreconditionNotMet {
1477 reason: "datareader inbox poisoned",
1478 })?;
1479 for item in inbox.drain(..) {
1480 if let crate::runtime::UserSample::Alive {
1481 payload,
1482 writer_guid,
1483 writer_strength,
1484 } = item
1485 {
1486 raw.push((payload, writer_guid, writer_strength));
1487 }
1488 }
1489 }
1490 let mut lifecycle_pending: Vec<(
1493 crate::instance_tracker::KeyHash,
1494 crate::sample_info::InstanceStateKind,
1495 )> = Vec::new();
1496 if let Some(rx_mu) = self.rx.as_ref() {
1497 let rx = rx_mu.lock().map_err(|_| DdsError::PreconditionNotMet {
1498 reason: "datareader rx poisoned",
1499 })?;
1500 while let Ok(item) = rx.try_recv() {
1501 match item {
1502 crate::runtime::UserSample::Alive {
1503 payload: bytes,
1504 writer_guid,
1505 writer_strength,
1506 } => raw.push((bytes, writer_guid, writer_strength)),
1507 crate::runtime::UserSample::Lifecycle { key_hash, kind } => {
1508 let lc_kind = match kind {
1509 zerodds_rtps::history_cache::ChangeKind::NotAliveDisposed
1510 | zerodds_rtps::history_cache::ChangeKind::NotAliveDisposedUnregistered => {
1511 crate::sample_info::InstanceStateKind::NotAliveDisposed
1512 }
1513 zerodds_rtps::history_cache::ChangeKind::NotAliveUnregistered => {
1514 crate::sample_info::InstanceStateKind::NotAliveNoWriters
1515 }
1516 _ => crate::sample_info::InstanceStateKind::Alive,
1517 };
1518 lifecycle_pending.push((key_hash, lc_kind));
1519 }
1520 }
1521 }
1522 }
1523 for (kh, lc_kind) in lifecycle_pending {
1526 let mut holder_bytes = Vec::with_capacity(16);
1527 holder_bytes.extend_from_slice(&kh);
1528 let _ = self.__push_lifecycle(kh, holder_bytes, lc_kind);
1529 }
1530 let now = get_current_time();
1531 let mut cache = self
1532 .cache
1533 .lock()
1534 .map_err(|_| DdsError::PreconditionNotMet {
1535 reason: "datareader cache poisoned",
1536 })?;
1537 if raw.is_empty() {
1538 self.run_reader_autopurge(now, &mut cache);
1541 return Ok(());
1542 }
1543 for (bytes, writer_guid, writer_strength) in raw {
1544 let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
1547 message: alloc::string::ToString::to_string(&e),
1548 })?;
1549 if !self.sample_passes_filter(&sample) {
1550 continue;
1551 }
1552 if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
1555 continue;
1556 }
1557 let info = if T::HAS_KEY {
1558 let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
1559 sample.encode_key_holder_be(&mut holder);
1560 let key_bytes = holder.as_bytes().to_vec();
1561 let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
1562 let kh = crate::dds_type::compute_key_hash(&key_bytes, max);
1563 let (min_sep_nanos, by_source_ts) = {
1566 let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
1567 (
1568 qos.time_based_filter.minimum_separation.to_nanos(),
1569 qos.destination_order.kind
1570 == zerodds_qos::DestinationOrderKind::BySourceTimestamp,
1571 )
1572 };
1573 if !self
1577 .instances
1578 .should_deliver_under_time_based_filter(&kh, now, min_sep_nanos)
1579 {
1580 continue;
1581 }
1582 if !self
1586 .instances
1587 .should_deliver_under_destination_order(&kh, now, by_source_ts)
1588 {
1589 continue;
1590 }
1591 let (handle, _) = self.instances.observe_sample(kh, key_bytes, Some(now));
1592 self.instances.record_delivery(&kh, now);
1593 let state = match self.instances.get_by_handle(handle) {
1594 Some(s) => s,
1595 None => continue, };
1597 SampleInfo {
1598 sample_state: SampleStateKind::NotRead,
1599 view_state: if state.reader_view_new {
1600 ViewStateKind::New
1601 } else {
1602 ViewStateKind::NotNew
1603 },
1604 instance_state: state.kind,
1605 disposed_generation_count: state.disposed_generation_count,
1606 no_writers_generation_count: state.no_writers_generation_count,
1607 source_timestamp: now,
1608 instance_handle: handle,
1609 valid_data: true,
1610 ..SampleInfo::default()
1611 }
1612 } else {
1613 SampleInfo {
1618 sample_state: SampleStateKind::NotRead,
1619 view_state: ViewStateKind::NotNew,
1620 instance_handle: HANDLE_NIL,
1621 source_timestamp: now,
1622 valid_data: true,
1623 ..SampleInfo::default()
1624 }
1625 };
1626 cache.push(CachedSample {
1627 bytes: Some(bytes),
1628 info,
1629 });
1630 }
1631 self.run_reader_autopurge(now, &mut cache);
1635 Ok(())
1636 }
1637
1638 #[cfg(feature = "std")]
1642 fn run_reader_autopurge(&self, now: Time, cache: &mut Vec<CachedSample>) {
1643 let (purge_disp, purge_now) = {
1644 let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
1645 (
1646 qos.reader_data_lifecycle
1647 .autopurge_disposed_samples_delay
1648 .to_nanos(),
1649 qos.reader_data_lifecycle
1650 .autopurge_nowriter_samples_delay
1651 .to_nanos(),
1652 )
1653 };
1654 if purge_disp == u128::MAX && purge_now == u128::MAX {
1655 return;
1656 }
1657 let purged = self.instances.autopurge(now, purge_disp, purge_now);
1658 if purged > 0 {
1659 cache.retain(|s| {
1660 s.info.instance_handle.is_nil()
1661 || self
1662 .instances
1663 .get_by_handle(s.info.instance_handle)
1664 .is_some()
1665 });
1666 }
1667 }
1668
1669 #[cfg(feature = "std")]
1673 #[doc(hidden)]
1674 pub fn __push_lifecycle(
1675 &self,
1676 keyhash: crate::instance_tracker::KeyHash,
1677 key_holder: Vec<u8>,
1678 kind: InstanceStateKind,
1679 ) -> Result<()> {
1680 let now = get_current_time();
1681 let (handle, _) = self
1684 .instances
1685 .observe_sample(keyhash, key_holder, Some(now));
1686 match kind {
1687 InstanceStateKind::NotAliveDisposed => {
1688 self.instances.dispose(handle, Some(now));
1689 }
1690 InstanceStateKind::NotAliveNoWriters => {
1691 self.instances.unregister(handle, Some(now));
1692 }
1693 InstanceStateKind::Alive => {}
1694 }
1695 let Some(state) = self.instances.get_by_handle(handle) else {
1696 return Ok(()); };
1698 let info = SampleInfo {
1699 source_timestamp: now,
1700 valid_data: false,
1701 instance_handle: handle,
1702 instance_state: state.kind,
1703 disposed_generation_count: state.disposed_generation_count,
1704 no_writers_generation_count: state.no_writers_generation_count,
1705 view_state: if state.reader_view_new {
1706 ViewStateKind::New
1707 } else {
1708 ViewStateKind::NotNew
1709 },
1710 ..SampleInfo::default()
1711 };
1712 let mut cache = self
1713 .cache
1714 .lock()
1715 .map_err(|_| DdsError::PreconditionNotMet {
1716 reason: "datareader cache poisoned",
1717 })?;
1718 cache.push(CachedSample { bytes: None, info });
1719 Ok(())
1720 }
1721
1722 #[cfg(feature = "std")]
1729 pub fn take_with_info(&self) -> Result<Vec<Sample<T>>> {
1730 self.take_filtered(
1731 sample_state_mask::ANY,
1732 view_state_mask::ANY,
1733 instance_state_mask::ANY,
1734 )
1735 }
1736
1737 #[cfg(feature = "std")]
1743 pub fn read_with_info(&self) -> Result<Vec<Sample<T>>> {
1744 self.read_filtered(
1745 sample_state_mask::ANY,
1746 view_state_mask::ANY,
1747 instance_state_mask::ANY,
1748 )
1749 }
1750
1751 #[cfg(feature = "std")]
1756 pub fn take_filtered(
1757 &self,
1758 sample_mask: u32,
1759 view_mask: u32,
1760 instance_mask: u32,
1761 ) -> Result<Vec<Sample<T>>> {
1762 self.ingest_into_cache()?;
1763 let mut cache = self
1764 .cache
1765 .lock()
1766 .map_err(|_| DdsError::PreconditionNotMet {
1767 reason: "datareader cache poisoned",
1768 })?;
1769 let mut out = Vec::new();
1770 let mut keep = Vec::with_capacity(cache.len());
1771 for s in cache.drain(..) {
1772 if s.info.matches_states(sample_mask, view_mask, instance_mask) {
1773 let sample = self.materialize(s)?;
1774 self.instances.mark_view_seen(sample.info.instance_handle);
1775 if sample.info.instance_handle != HANDLE_NIL {
1776 self.instances.drain_samples(sample.info.instance_handle, 1);
1777 }
1778 out.push(sample);
1779 } else {
1780 keep.push(s);
1781 }
1782 }
1783 *cache = keep;
1784 Ok(out)
1785 }
1786
1787 #[cfg(feature = "std")]
1792 pub fn read_filtered(
1793 &self,
1794 sample_mask: u32,
1795 view_mask: u32,
1796 instance_mask: u32,
1797 ) -> Result<Vec<Sample<T>>> {
1798 self.ingest_into_cache()?;
1799 let mut cache = self
1800 .cache
1801 .lock()
1802 .map_err(|_| DdsError::PreconditionNotMet {
1803 reason: "datareader cache poisoned",
1804 })?;
1805 let mut out = Vec::with_capacity(cache.len());
1806 for s in cache.iter_mut() {
1807 if !s.info.matches_states(sample_mask, view_mask, instance_mask) {
1808 continue;
1809 }
1810 let snapshot = Sample::new(
1812 self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?,
1813 s.info,
1814 );
1815 s.info.sample_state = SampleStateKind::Read;
1817 self.instances.mark_view_seen(s.info.instance_handle);
1818 out.push(snapshot);
1819 }
1820 Ok(out)
1821 }
1822
1823 #[cfg(feature = "std")]
1830 pub fn read_w_condition(
1831 &self,
1832 condition: &Arc<crate::condition::QueryCondition>,
1833 ) -> Result<Vec<Sample<T>>> {
1834 let base = condition.base();
1835 let sample_mask = base.get_sample_state_mask();
1836 let view_mask = base.get_view_state_mask();
1837 let instance_mask = base.get_instance_state_mask();
1838
1839 self.ingest_into_cache()?;
1840 let mut cache = self
1841 .cache
1842 .lock()
1843 .map_err(|_| DdsError::PreconditionNotMet {
1844 reason: "datareader cache poisoned",
1845 })?;
1846 let mut out = Vec::with_capacity(cache.len());
1847 for s in cache.iter_mut() {
1848 if !s.info.matches_states(sample_mask, view_mask, instance_mask) {
1849 continue;
1850 }
1851 let decoded = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
1852 let row = crate::dds_type::DdsTypeRow::new(&decoded);
1853 if !condition.evaluate(&row).unwrap_or(false) {
1857 continue;
1858 }
1859 let snapshot = Sample::new(decoded, s.info);
1860 s.info.sample_state = SampleStateKind::Read;
1861 self.instances.mark_view_seen(s.info.instance_handle);
1862 out.push(snapshot);
1863 }
1864 Ok(out)
1865 }
1866
1867 #[cfg(feature = "std")]
1873 pub fn take_w_condition(
1874 &self,
1875 condition: &Arc<crate::condition::QueryCondition>,
1876 ) -> Result<Vec<Sample<T>>> {
1877 let base = condition.base();
1878 let sample_mask = base.get_sample_state_mask();
1879 let view_mask = base.get_view_state_mask();
1880 let instance_mask = base.get_instance_state_mask();
1881
1882 self.ingest_into_cache()?;
1883 let mut cache = self
1884 .cache
1885 .lock()
1886 .map_err(|_| DdsError::PreconditionNotMet {
1887 reason: "datareader cache poisoned",
1888 })?;
1889 let mut out = Vec::new();
1890 let mut keep = Vec::with_capacity(cache.len());
1891 for s in cache.drain(..) {
1892 if !s.info.matches_states(sample_mask, view_mask, instance_mask) {
1893 keep.push(s);
1894 continue;
1895 }
1896 let decoded = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
1897 let row = crate::dds_type::DdsTypeRow::new(&decoded);
1898 if !condition.evaluate(&row).unwrap_or(false) {
1899 keep.push(s);
1900 continue;
1901 }
1902 let sample = Sample::new(decoded, s.info);
1903 self.instances.mark_view_seen(sample.info.instance_handle);
1904 if sample.info.instance_handle != HANDLE_NIL {
1905 self.instances.drain_samples(sample.info.instance_handle, 1);
1906 }
1907 out.push(sample);
1908 }
1909 *cache = keep;
1910 Ok(out)
1911 }
1912
1913 #[cfg(feature = "std")]
1919 pub fn read_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>> {
1920 if handle.is_nil() {
1921 return Err(DdsError::BadParameter {
1922 what: "read_instance with HANDLE_NIL",
1923 });
1924 }
1925 self.ingest_into_cache()?;
1926 let mut cache = self
1927 .cache
1928 .lock()
1929 .map_err(|_| DdsError::PreconditionNotMet {
1930 reason: "datareader cache poisoned",
1931 })?;
1932 let mut out = Vec::new();
1933 for s in cache.iter_mut() {
1934 if s.info.instance_handle != handle {
1935 continue;
1936 }
1937 let snap = Sample::new(
1938 self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?,
1939 s.info,
1940 );
1941 s.info.sample_state = SampleStateKind::Read;
1942 self.instances.mark_view_seen(handle);
1943 out.push(snap);
1944 }
1945 Ok(out)
1946 }
1947
1948 #[cfg(feature = "std")]
1953 pub fn take_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>> {
1954 if handle.is_nil() {
1955 return Err(DdsError::BadParameter {
1956 what: "take_instance with HANDLE_NIL",
1957 });
1958 }
1959 self.ingest_into_cache()?;
1960 let mut cache = self
1961 .cache
1962 .lock()
1963 .map_err(|_| DdsError::PreconditionNotMet {
1964 reason: "datareader cache poisoned",
1965 })?;
1966 let mut out = Vec::new();
1967 let mut keep = Vec::with_capacity(cache.len());
1968 for s in cache.drain(..) {
1969 if s.info.instance_handle == handle {
1970 out.push(self.materialize(s)?);
1971 } else {
1972 keep.push(s);
1973 }
1974 }
1975 *cache = keep;
1976 if !out.is_empty() {
1977 self.instances.mark_view_seen(handle);
1978 self.instances.drain_samples(handle, out.len() as u32);
1979 }
1980 Ok(out)
1981 }
1982
1983 #[cfg(feature = "std")]
1992 pub fn read_next_instance(&self, previous: InstanceHandle) -> Result<Vec<Sample<T>>> {
1993 let Some(next) = self.instances.next_handle_after(previous) else {
1994 return Ok(Vec::new());
1995 };
1996 self.read_instance(next)
1997 }
1998
1999 #[cfg(feature = "std")]
2004 pub fn take_next_instance(&self, previous: InstanceHandle) -> Result<Vec<Sample<T>>> {
2005 let Some(next) = self.instances.next_handle_after(previous) else {
2006 return Ok(Vec::new());
2007 };
2008 self.take_instance(next)
2009 }
2010
2011 #[cfg(feature = "std")]
2016 fn materialize(&self, s: CachedSample) -> Result<Sample<T>> {
2017 let data = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
2018 #[cfg(feature = "metrics")]
2019 crate::metrics::add_samples_read(self.topic.name(), 1);
2020 Ok(Sample::new(data, s.info))
2021 }
2022
2023 #[cfg(feature = "std")]
2027 fn decode_or_keyholder(&self, bytes: Option<&[u8]>, handle: InstanceHandle) -> Result<T> {
2028 if let Some(b) = bytes {
2029 return T::decode(b).map_err(|e| DdsError::WireError {
2030 message: alloc::string::ToString::to_string(&e),
2031 });
2032 }
2033 if let Some(holder) = self.instances.get_key_holder(handle) {
2034 return T::decode(&holder).map_err(|e| DdsError::WireError {
2035 message: alloc::string::ToString::to_string(&e),
2036 });
2037 }
2038 T::decode(&[]).map_err(|e| DdsError::WireError {
2039 message: alloc::string::ToString::to_string(&e),
2040 })
2041 }
2042}
2043
2044#[cfg(feature = "std")]
2045impl<T: DdsType> crate::entity::Entity for DataReader<T> {
2046 type Qos = DataReaderQos;
2047
2048 fn get_qos(&self) -> Self::Qos {
2049 self.qos.lock().map(|q| q.clone()).unwrap_or_default()
2050 }
2051
2052 fn set_qos(&self, qos: Self::Qos) -> Result<()> {
2055 let enabled = self.entity_state.is_enabled();
2056 if let Ok(mut current) = self.qos.lock() {
2057 if enabled {
2058 if current.durability != qos.durability {
2059 return Err(crate::entity::immutable_if_enabled("DURABILITY"));
2060 }
2061 if current.reliability != qos.reliability {
2062 return Err(crate::entity::immutable_if_enabled("RELIABILITY"));
2063 }
2064 if current.history != qos.history {
2065 return Err(crate::entity::immutable_if_enabled("HISTORY"));
2066 }
2067 if current.resource_limits != qos.resource_limits {
2068 return Err(crate::entity::immutable_if_enabled("RESOURCE_LIMITS"));
2069 }
2070 if current.ownership != qos.ownership {
2071 return Err(crate::entity::immutable_if_enabled("OWNERSHIP"));
2072 }
2073 if current.liveliness != qos.liveliness {
2074 return Err(crate::entity::immutable_if_enabled("LIVELINESS"));
2075 }
2076 }
2077 *current = qos;
2078 }
2079 Ok(())
2080 }
2081
2082 fn enable(&self) -> Result<()> {
2083 self.entity_state.enable();
2084 Ok(())
2085 }
2086
2087 fn entity_state(&self) -> Arc<crate::entity::EntityState> {
2088 Arc::clone(&self.entity_state)
2089 }
2090}
2091
2092#[allow(dead_code)]
2094pub(crate) trait AnyDataReader: Send + Sync + core::fmt::Debug {
2095 fn topic_name(&self) -> &str;
2096 fn type_name(&self) -> &'static str;
2097}
2098
2099impl<T: DdsType + Send + 'static> AnyDataReader for DataReader<T>
2100where
2101 T: Send + Sync,
2102{
2103 fn topic_name(&self) -> &str {
2104 self.topic.name()
2105 }
2106 fn type_name(&self) -> &'static str {
2107 T::TYPE_NAME
2108 }
2109}
2110
2111#[allow(dead_code)]
2112pub(crate) fn boxed_any_reader<T: DdsType + Send + Sync + 'static>(
2113 r: DataReader<T>,
2114) -> Box<dyn AnyDataReader> {
2115 Box::new(r)
2116}
2117
2118#[cfg(test)]
2119#[allow(clippy::expect_used, clippy::unwrap_used)]
2120mod tests {
2121 use super::*;
2122 use crate::dds_type::RawBytes;
2123 use crate::factory::DomainParticipantFactory;
2124 use crate::qos::{DomainParticipantQos, TopicQos};
2125
2126 fn mk_topic() -> Topic<RawBytes> {
2127 let p = DomainParticipantFactory::instance()
2128 .create_participant_offline(0, DomainParticipantQos::default());
2129 Topic::new("Chatter".into(), TopicQos::default(), p)
2130 }
2131
2132 #[test]
2133 fn subscriber_creates_datareader_for_matching_type() {
2134 let s = Subscriber::new(SubscriberQos::default(), None);
2135 let r = s
2136 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2137 .unwrap();
2138 assert_eq!(r.topic().name(), "Chatter");
2139 }
2140
2141 #[test]
2142 fn datareader_take_returns_decoded_samples() {
2143 let s = Subscriber::new(SubscriberQos::default(), None);
2144 let r = s
2145 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2146 .unwrap();
2147 r.__push_raw(vec![1, 2, 3]).unwrap();
2148 r.__push_raw(vec![4, 5]).unwrap();
2149 let samples = r.take().unwrap();
2150 assert_eq!(samples.len(), 2);
2151 assert_eq!(samples[0].data, vec![1, 2, 3]);
2152 assert_eq!(samples[1].data, vec![4, 5]);
2153 let again = r.take().unwrap();
2155 assert!(again.is_empty());
2156 }
2157
2158 #[test]
2159 fn datareader_read_preserves_samples() {
2160 let s = Subscriber::new(SubscriberQos::default(), None);
2161 let r = s
2162 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2163 .unwrap();
2164 r.__push_raw(vec![0xAA]).unwrap();
2165 let first = r.read().unwrap();
2166 let second = r.read().unwrap();
2167 assert_eq!(first.len(), 1);
2168 assert_eq!(second.len(), 1);
2169 }
2170
2171 use core::sync::atomic::{AtomicU32, Ordering};
2174
2175 #[test]
2176 fn datareader_set_listener_stores_arc_and_mask() {
2177 struct L;
2178 impl crate::listener::DataReaderListener for L {}
2179 let s = Subscriber::new(SubscriberQos::default(), None);
2180 let r = s
2181 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2182 .unwrap();
2183 assert!(r.get_listener().is_none());
2184 r.set_listener(Some(Arc::new(L)), crate::psm_constants::status::ANY);
2185 assert!(r.get_listener().is_some());
2186 assert_eq!(
2187 r.entity_state.listener_mask(),
2188 crate::psm_constants::status::ANY
2189 );
2190 }
2191
2192 #[test]
2193 fn poll_subscription_matched_fires_on_count_increase() {
2194 struct Cnt(AtomicU32);
2195 impl crate::listener::DataReaderListener for Cnt {
2196 fn on_subscription_matched(
2197 &self,
2198 _r: crate::InstanceHandle,
2199 _s: crate::status::SubscriptionMatchedStatus,
2200 ) {
2201 self.0.fetch_add(1, Ordering::Relaxed);
2202 }
2203 }
2204 let s = Subscriber::new(SubscriberQos::default(), None);
2205 let r = s
2206 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2207 .unwrap();
2208 let cnt = Arc::new(Cnt(AtomicU32::new(0)));
2209 r.set_listener(Some(cnt.clone()), crate::psm_constants::status::ANY);
2210
2211 r.poll_subscription_matched(0);
2212 assert_eq!(cnt.0.load(Ordering::Relaxed), 1);
2213 r.poll_subscription_matched(1);
2214 assert_eq!(cnt.0.load(Ordering::Relaxed), 2);
2215 r.poll_subscription_matched(1);
2216 assert_eq!(cnt.0.load(Ordering::Relaxed), 2);
2217 r.poll_subscription_matched(0);
2218 assert_eq!(cnt.0.load(Ordering::Relaxed), 3);
2219 }
2220
2221 #[test]
2222 fn poll_subscription_matched_with_no_listener_is_noop() {
2223 let s = Subscriber::new(SubscriberQos::default(), None);
2224 let r = s
2225 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2226 .unwrap();
2227 r.poll_subscription_matched(0);
2228 r.poll_subscription_matched(3);
2229 }
2230
2231 #[test]
2232 fn notify_data_arrived_fires_data_available_and_data_on_readers() {
2233 struct ReadCnt(AtomicU32, AtomicU32);
2234 impl crate::listener::DataReaderListener for ReadCnt {
2235 fn on_data_available(&self, _r: crate::InstanceHandle) {
2236 self.0.fetch_add(1, Ordering::Relaxed);
2237 }
2238 fn on_subscription_matched(
2239 &self,
2240 _r: crate::InstanceHandle,
2241 _s: crate::status::SubscriptionMatchedStatus,
2242 ) {
2243 self.1.fetch_add(1, Ordering::Relaxed);
2244 }
2245 }
2246 let s = Subscriber::new(SubscriberQos::default(), None);
2247 let r = s
2248 .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
2249 .unwrap();
2250 let rc = Arc::new(ReadCnt(AtomicU32::new(0), AtomicU32::new(0)));
2251 r.set_listener(Some(rc.clone()), crate::psm_constants::status::ANY);
2252 r.notify_data_arrived();
2253 assert_eq!(rc.0.load(Ordering::Relaxed), 1);
2254 assert_eq!(rc.1.load(Ordering::Relaxed), 0);
2256 }
2257
2258 #[test]
2261 fn subscriber_begin_end_access_roundtrip() {
2262 let s = Subscriber::new(SubscriberQos::default(), None);
2263 assert!(!s.is_access_open());
2264 s.begin_access();
2265 assert!(s.is_access_open());
2266 s.end_access().unwrap();
2267 assert!(!s.is_access_open());
2268 }
2269
2270 #[test]
2271 fn subscriber_end_access_without_begin_returns_precondition_not_met() {
2272 let s = Subscriber::new(SubscriberQos::default(), None);
2274 let res = s.end_access();
2275 assert!(matches!(
2276 res,
2277 Err(crate::error::DdsError::PreconditionNotMet { .. })
2278 ));
2279 }
2280
2281 #[test]
2282 fn subscriber_begin_access_is_nestable() {
2283 let s = Subscriber::new(SubscriberQos::default(), None);
2286 s.begin_access();
2287 s.begin_access();
2288 assert!(s.is_access_open());
2289 s.end_access().unwrap();
2290 assert!(s.is_access_open());
2292 s.end_access().unwrap();
2293 assert!(!s.is_access_open());
2295 }
2296
2297 #[test]
2298 fn subscriber_too_many_ends_after_balanced_returns_error() {
2299 let s = Subscriber::new(SubscriberQos::default(), None);
2302 s.begin_access();
2303 s.end_access().unwrap();
2304 let res = s.end_access();
2305 assert!(matches!(
2306 res,
2307 Err(crate::error::DdsError::PreconditionNotMet { .. })
2308 ));
2309 }
2310}