Skip to main content

zerodds_dcps/
subscriber.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Subscriber + DataReader — das Empfangs-Ende der DCPS-API.
4//!
5//! Spec-Referenz: OMG DDS 1.4 §2.2.2.5 `Subscriber`, §2.2.2.5.2
6//! `DataReader`.
7//!
8//! # Scope v1.2
9//!
10//! - `Subscriber::create_datareader<T>(topic, qos)` → `DataReader<T>`.
11//! - `DataReader::take()` entnimmt alle zwischengespeicherten Samples.
12//! - `DataReader::read()` peekt ohne zu entfernen (Offline: identisch
13//!   zu take, kein Statement-Wechsel — Spec §2.2.2.5.3.4 sample-state
14//!   wird in Live-Mode implementiert).
15//! - Listener / WaitSet: Live-Mode.
16
17extern crate alloc;
18use alloc::boxed::Box;
19use alloc::string::ToString;
20use alloc::sync::Arc;
21use alloc::vec::Vec;
22use core::marker::PhantomData;
23
24#[cfg(feature = "std")]
25use std::sync::Mutex;
26#[cfg(feature = "std")]
27use std::sync::mpsc;
28
29use crate::dds_type::DdsType;
30use crate::entity::StatusMask;
31use crate::error::{DdsError, Result};
32#[cfg(feature = "std")]
33use crate::instance_handle::{HANDLE_NIL, InstanceHandle};
34#[cfg(feature = "std")]
35use crate::instance_tracker::InstanceTracker;
36use crate::listener::{ArcDataReaderListener, ArcSubscriberListener};
37use crate::qos::{DataReaderQos, SubscriberQos};
38#[cfg(feature = "std")]
39use crate::sample::Sample;
40#[cfg(feature = "std")]
41use crate::sample_info::{
42    InstanceStateKind, SampleInfo, SampleStateKind, ViewStateKind, instance_state_mask,
43    sample_state_mask, view_state_mask,
44};
45#[cfg(feature = "std")]
46use crate::time::{Time, get_current_time};
47use crate::topic::Topic;
48
49#[cfg(feature = "std")]
50use crate::runtime::DcpsRuntime;
51#[cfg(feature = "std")]
52use zerodds_qos::ReliabilityKind;
53#[cfg(feature = "std")]
54use zerodds_rtps::wire_types::EntityId;
55
/// Subscriber — the entity that groups `DataReader`s.
///
/// This is a thin handle around a reference-counted [`SubscriberInner`];
/// readers created through `create_datareader` receive a clone of that
/// `Arc` as their parent link.
#[derive(Debug)]
pub struct Subscriber {
    /// Shared subscriber state.
    pub(crate) inner: Arc<SubscriberInner>,
}
61
/// Shared state behind a [`Subscriber`] handle.
///
/// Most fields only exist when the `std` feature is enabled; without it
/// only `qos`, `entity_state` and `access_scope` are present.
pub(crate) struct SubscriberInner {
    /// Subscriber QoS; behind a mutex with `std` so it can be replaced
    /// through a shared reference.
    #[cfg(feature = "std")]
    pub(crate) qos: std::sync::Mutex<SubscriberQos>,
    /// Subscriber QoS (plain value in builds without `std`).
    #[cfg(not(feature = "std"))]
    #[allow(dead_code)]
    pub(crate) qos: SubscriberQos,
    /// Entity lifecycle state of the subscriber.
    pub(crate) entity_state: alloc::sync::Arc<crate::entity::EntityState>,
    /// Runtime used to register live readers; `None` means readers are
    /// created in offline mode.
    #[cfg(feature = "std")]
    pub(crate) runtime: Option<Arc<DcpsRuntime>>,
    /// Optional `SubscriberListener` plus its StatusMask.
    /// Bubble-up target for reader events.
    #[cfg(feature = "std")]
    pub(crate) listener: std::sync::Mutex<Option<(ArcSubscriberListener, StatusMask)>>,
    /// Weak back-pointer to the participant (bubble-up; `Weak` avoids a
    /// reference cycle).
    #[cfg(feature = "std")]
    pub(crate) participant:
        std::sync::Mutex<Option<alloc::sync::Weak<crate::participant::ParticipantInner>>>,
    /// Group access scope for §2.2.2.5.2.8/.9 begin/end_access.
    /// Counter based (recursively nestable per the spec).
    pub(crate) access_scope: Arc<crate::coherent_set::GroupAccessScope>,
    /// DataReader handles (tracked by `create_datareader`) used for the
    /// recursive `DomainParticipant::contains_entity`
    /// (spec §2.2.2.2.1.10).
    #[cfg(feature = "std")]
    pub(crate) datareaders:
        std::sync::Mutex<alloc::vec::Vec<crate::instance_handle::InstanceHandle>>,
}
90
91impl core::fmt::Debug for SubscriberInner {
92    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
93        let listener_present = self.listener.lock().map(|s| s.is_some()).unwrap_or(false);
94        f.debug_struct("SubscriberInner")
95            .field("entity_state", &self.entity_state)
96            .field("listener_present", &listener_present)
97            .finish_non_exhaustive()
98    }
99}
100
101impl Subscriber {
102    #[cfg(feature = "std")]
103    pub(crate) fn new(qos: SubscriberQos, runtime: Option<Arc<DcpsRuntime>>) -> Self {
104        Self {
105            inner: Arc::new(SubscriberInner {
106                qos: std::sync::Mutex::new(qos),
107                entity_state: crate::entity::EntityState::new(),
108                runtime,
109                listener: std::sync::Mutex::new(None),
110                participant: std::sync::Mutex::new(None),
111                access_scope: crate::coherent_set::GroupAccessScope::new(),
112                datareaders: std::sync::Mutex::new(alloc::vec::Vec::new()),
113            }),
114        }
115    }
116
117    /// Spec §2.2.2.2.1.10 — `true` wenn `handle` ein DataReader ist,
118    /// der ueber diesen Subscriber erzeugt wurde.
119    #[cfg(feature = "std")]
120    #[must_use]
121    pub fn contains_reader(&self, handle: crate::instance_handle::InstanceHandle) -> bool {
122        self.inner
123            .datareaders
124            .lock()
125            .map(|v| v.contains(&handle))
126            .unwrap_or(false)
127    }
128
129    #[cfg(feature = "std")]
130    fn track_reader(&self, handle: crate::instance_handle::InstanceHandle) {
131        if let Ok(mut list) = self.inner.datareaders.lock() {
132            list.push(handle);
133        }
134        // Propagiere zum Participant fuer rekursives contains_entity.
135        if let Ok(slot) = self.inner.participant.lock() {
136            if let Some(weak) = slot.as_ref() {
137                if let Some(p_inner) = weak.upgrade() {
138                    if let Ok(mut drs) = p_inner.datareaders.lock() {
139                        drs.push(handle);
140                    }
141                }
142            }
143        }
144    }
145    #[cfg(not(feature = "std"))]
146    pub(crate) fn new(qos: SubscriberQos) -> Self {
147        Self {
148            inner: Arc::new(SubscriberInner {
149                qos,
150                entity_state: crate::entity::EntityState::new(),
151                access_scope: crate::coherent_set::GroupAccessScope::new(),
152            }),
153        }
154    }
155
156    /// Spec §2.2.2.5.2.8 `begin_access` — markiert den Beginn eines
157    /// kohaerenten Read-Sets. Verschachtelung ist erlaubt; jeder
158    /// Aufruf erhoeht einen internen Counter, jedes `end_access`
159    /// erniedrigt ihn.
160    pub fn begin_access(&self) {
161        self.inner.access_scope.begin();
162    }
163
164    /// Spec §2.2.2.5.2.9 `end_access` — Gegenstueck zu `begin_access`.
165    ///
166    /// # Errors
167    /// `DdsError::PreconditionNotMet` wenn `end_access` ohne
168    /// vorhergehendes `begin_access` gerufen wird.
169    pub fn end_access(&self) -> Result<()> {
170        self.inner.access_scope.end()
171    }
172
173    /// `true` wenn aktuell ein Group-Access offen ist.
174    #[must_use]
175    pub fn is_access_open(&self) -> bool {
176        self.inner.access_scope.is_active()
177    }
178
179    /// setzt den `SubscriberListener` + StatusMask. `None`
180    /// loescht den Slot. Spec §2.2.2.5.6.x set_listener.
181    #[cfg(feature = "std")]
182    pub fn set_listener(&self, listener: Option<ArcSubscriberListener>, mask: StatusMask) {
183        if let Ok(mut slot) = self.inner.listener.lock() {
184            *slot = listener.map(|l| (l, mask));
185        }
186        self.inner.entity_state.set_listener_mask(mask);
187    }
188
189    /// aktueller Listener-Klon.
190    #[cfg(feature = "std")]
191    #[must_use]
192    pub fn get_listener(&self) -> Option<ArcSubscriberListener> {
193        self.inner
194            .listener
195            .lock()
196            .ok()
197            .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
198    }
199
200    /// Setzt den schwachen Back-Pointer auf den Participant.
201    #[cfg(feature = "std")]
202    pub(crate) fn attach_participant(
203        &self,
204        participant: alloc::sync::Weak<crate::participant::ParticipantInner>,
205    ) {
206        if let Ok(mut slot) = self.inner.participant.lock() {
207            *slot = Some(participant);
208        }
209    }
210
211    /// Snapshot der Reader-Bubble-Up-Kette: gegebenes
212    /// `reader_listener`-Tupel + Subscriber-Stage + Participant-Stage.
213    #[cfg(feature = "std")]
214    #[must_use]
215    pub(crate) fn snapshot_reader_chain(
216        &self,
217        reader_listener: Option<(ArcDataReaderListener, StatusMask)>,
218    ) -> crate::listener_dispatch::ReaderListenerChain {
219        let subscriber = self
220            .inner
221            .listener
222            .lock()
223            .ok()
224            .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
225        let participant = {
226            let weak = self.inner.participant.lock().ok().and_then(|s| s.clone());
227            weak.and_then(|w| w.upgrade()).and_then(|inner| {
228                inner
229                    .listener
230                    .lock()
231                    .ok()
232                    .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)))
233            })
234        };
235        crate::listener_dispatch::ReaderListenerChain {
236            reader: reader_listener,
237            subscriber,
238            participant,
239        }
240    }
241
242    /// Erzeugt einen typed `DataReader<T>`.
243    ///
244    /// # Errors
245    /// `BadParameter` bei Type-Name-Mismatch.
246    pub fn create_datareader<T: DdsType + Send + 'static>(
247        &self,
248        topic: &Topic<T>,
249        qos: DataReaderQos,
250    ) -> Result<DataReader<T>> {
251        if topic.type_name() != T::TYPE_NAME {
252            return Err(DdsError::BadParameter {
253                what: "topic.type_name mismatch",
254            });
255        }
256        #[cfg(feature = "std")]
257        if let Some(rt) = self.inner.runtime.as_ref() {
258            let reliable = qos.reliability.kind == ReliabilityKind::Reliable;
259            let (eid, rx) = rt.register_user_reader(crate::runtime::UserReaderConfig {
260                topic_name: topic.name().into(),
261                type_name: T::TYPE_NAME.into(),
262                reliable,
263                durability: qos.durability.kind,
264                deadline: qos.deadline,
265                liveliness: qos.liveliness,
266                ownership: qos.ownership.kind,
267                partition: qos.partition.names.clone(),
268                user_data: qos.user_data.value.clone(),
269                topic_data: qos.topic_data.value.clone(),
270                group_data: qos.group_data.value.clone(),
271                // F-TYPES-3: Topic-Type-Identifier + TCE-QoS weitergeben.
272                type_identifier: T::TYPE_IDENTIFIER.clone(),
273                type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
274                // D.5g — Per-Reader-Override TBD (DataReaderQos::
275                // representation noch nicht modelliert). Default
276                // `None` = Runtime-Default.
277                data_representation_offer: None,
278            })?;
279            let dr = DataReader::new_live(
280                topic.clone(),
281                qos,
282                self.inner.clone(),
283                Arc::clone(rt),
284                eid,
285                rx,
286            );
287            self.track_reader(dr.entity_state.instance_handle());
288            return Ok(dr);
289        }
290        let dr = DataReader::new_offline(topic.clone(), qos, self.inner.clone());
291        #[cfg(feature = "std")]
292        self.track_reader(dr.entity_state.instance_handle());
293        Ok(dr)
294    }
295}
296
297// ============================================================================
298// Entity-Trait (DCPS §2.2.2.1) —
299// ============================================================================
300
/// `Entity` implementation (DCPS §2.2.2.1) for [`Subscriber`].
///
/// QoS access recovers from a poisoned mutex instead of silently dropping
/// the update or returning a default value: the QoS is a plain value that is
/// replaced as a whole, so the stored data is still usable after a panic in
/// another thread. This matches how `DataReader` reads its own QoS in
/// `take()`.
#[cfg(feature = "std")]
impl crate::entity::Entity for Subscriber {
    type Qos = SubscriberQos;

    /// Returns a clone of the currently stored QoS.
    fn get_qos(&self) -> Self::Qos {
        self.inner
            .qos
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }

    /// Replaces the stored QoS. Always returns `Ok(())`.
    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
        // SubscriberQos: Partition / GroupData / Presentation are all
        // Changeable=YES per spec §2.2.3 — no immutability check needed.
        let mut current = self
            .inner
            .qos
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *current = qos;
        Ok(())
    }

    /// Marks the entity as enabled. Always returns `Ok(())`.
    fn enable(&self) -> Result<()> {
        self.inner.entity_state.enable();
        Ok(())
    }

    /// Returns a shared handle to the entity lifecycle state.
    fn entity_state(&self) -> alloc::sync::Arc<crate::entity::EntityState> {
        alloc::sync::Arc::clone(&self.inner.entity_state)
    }
}
327
/// Typed DataReader — hands out the samples that the RTPS reader received
/// for the topic.
///
/// Live mode: `rx: Some` delivers samples from the runtime mpsc channel.
/// Offline mode: in-memory `inbox` for unit tests.
pub struct DataReader<T: DdsType> {
    /// Topic this reader is bound to.
    topic: Topic<T>,
    /// Reader QoS, behind a mutex so it can be read through `&self`.
    qos: Mutex<DataReaderQos>,
    /// Entity lifecycle (DCPS §2.2.2.1).
    entity_state: Arc<crate::entity::EntityState>,
    /// Parent subscriber — used for bubble-up to the subscriber and
    /// participant listeners.
    subscriber: Arc<SubscriberInner>,
    /// Optional `DataReaderListener` plus its StatusMask.
    #[cfg(feature = "std")]
    listener: Mutex<Option<(ArcDataReaderListener, StatusMask)>>,
    /// Last seen number of matched writers (for delta detection in
    /// poll_subscription_matched). Initialised to -1 by the constructors.
    #[cfg(feature = "std")]
    last_match_count: std::sync::atomic::AtomicI64,
    /// Last seen requested_deadline_missed counter.
    #[cfg(feature = "std")]
    last_requested_deadline_missed: std::sync::atomic::AtomicU64,
    /// Last seen liveliness alive counter (the not_alive counter is kept in
    /// the next field).
    #[cfg(feature = "std")]
    last_liveliness_alive: std::sync::atomic::AtomicI64,
    /// Last seen liveliness not_alive counter.
    #[cfg(feature = "std")]
    last_liveliness_not_alive: std::sync::atomic::AtomicI64,
    /// Last seen requested_incompatible_qos.total_count.
    #[cfg(feature = "std")]
    last_requested_incompatible_qos: std::sync::atomic::AtomicI64,
    /// Last seen sample_lost counter.
    #[cfg(feature = "std")]
    last_sample_lost: std::sync::atomic::AtomicU64,
    /// Last seen sample_rejected.total_count.
    #[cfg(feature = "std")]
    last_sample_rejected: std::sync::atomic::AtomicI64,
    /// Offline fallback inbox. Stores full [`UserSample`] values
    /// (including writer_guid + writer_strength for alive samples) so that
    /// `take()`/`read()` can apply the exclusive-ownership filter as the
    /// spec requires.
    inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
    /// Runtime the reader is registered with (live mode only).
    #[cfg(feature = "std")]
    #[allow(dead_code)]
    runtime: Option<Arc<DcpsRuntime>>,
    /// Entity id returned by the runtime registration (live mode only).
    #[cfg(feature = "std")]
    #[allow(dead_code)]
    entity_id: Option<EntityId>,
    /// Runtime channel for incoming samples (live mode).
    #[cfg(feature = "std")]
    rx: Option<Mutex<mpsc::Receiver<crate::runtime::UserSample>>>,
    /// Optional content-filter closure. It is applied in `take()` to every
    /// sample after decoding; `true` → the sample is delivered, `false` →
    /// the sample is discarded.
    ///
    /// Spec reference: OMG DDS 1.4 §2.2.2.5.4 `ContentFilteredTopic`.
    /// This Rust-closure variant is more idiomatic than the spec's SQL
    /// expression syntax and is sufficient for all in-process use cases.
    /// The SQL parser and cross-vendor SEDP propagation are planned
    /// separately.
    #[allow(clippy::type_complexity)]
    filter: Option<Arc<dyn Fn(&T) -> bool + Send + Sync>>,
    /// Instance bookkeeping (spec §2.2.2.5.1).
    #[cfg(feature = "std")]
    instances: InstanceTracker,
    /// Sample cache with resolved [`SampleInfo`]. The cache is filled on
    /// ingest via `ingest_bytes`; `take`/`read`/`take_with_info`/
    /// `read_with_info` read from it.
    #[cfg(feature = "std")]
    cache: Arc<Mutex<Vec<CachedSample>>>,
    /// Optionally configured flatdata SlotBackend for the same-host
    /// zero-copy read path (`zerodds-flatdata-1.0` §4.1 + §9.1). Set via
    /// `set_flat_backend`; `read_flat()` falls back to the classic `take()`
    /// when this is `None`.
    #[cfg(all(feature = "std", feature = "flatdata-integration"))]
    #[allow(clippy::type_complexity)]
    pub(crate) flat_backend: Mutex<
        Option<(
            Arc<dyn zerodds_flatdata::SlotBackend>,
            u8, // reader_index (0..31)
            std::sync::atomic::AtomicU32,
        )>,
    >,
    /// Ties the reader to `T` without storing a value of that type.
    _t: PhantomData<fn() -> T>,
}
414
/// Internal: one sample in the reader cache.
///
/// The cache carries the bytes (instead of `T`) so that it is not tied to
/// `T: Clone` and so that `T::decode` can happen lazily.
/// Lifecycle markers (dispose/unregister) have `bytes == None`.
#[cfg(feature = "std")]
#[derive(Debug)]
pub(crate) struct CachedSample {
    /// Encoded payload; `None` for lifecycle markers.
    pub bytes: Option<Vec<u8>>,
    /// Sample metadata associated with this entry.
    pub info: SampleInfo,
}
426
427impl<T: DdsType> core::fmt::Debug for DataReader<T> {
428    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
429        f.debug_struct("DataReader")
430            .field("topic", &self.topic.name())
431            .field("type", &T::TYPE_NAME)
432            .field("qos", &self.qos)
433            .finish_non_exhaustive()
434    }
435}
436
437impl<T: DdsType> DataReader<T> {
438    #[cfg(feature = "std")]
439    fn new_offline(topic: Topic<T>, qos: DataReaderQos, subscriber: Arc<SubscriberInner>) -> Self {
440        Self {
441            topic,
442            qos: Mutex::new(qos),
443            entity_state: crate::entity::EntityState::new(),
444            subscriber,
445            listener: Mutex::new(None),
446            last_match_count: std::sync::atomic::AtomicI64::new(-1),
447            last_requested_deadline_missed: std::sync::atomic::AtomicU64::new(0),
448            last_liveliness_alive: std::sync::atomic::AtomicI64::new(-1),
449            last_liveliness_not_alive: std::sync::atomic::AtomicI64::new(-1),
450            last_requested_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
451            last_sample_lost: std::sync::atomic::AtomicU64::new(0),
452            last_sample_rejected: std::sync::atomic::AtomicI64::new(-1),
453            inbox: Arc::new(Mutex::new(Vec::new())),
454            runtime: None,
455            entity_id: None,
456            rx: None,
457            filter: None,
458            instances: InstanceTracker::new(),
459            cache: Arc::new(Mutex::new(Vec::new())),
460            #[cfg(feature = "flatdata-integration")]
461            flat_backend: Mutex::new(None),
462            _t: PhantomData,
463        }
464    }
465
466    #[cfg(feature = "std")]
467    fn new_live(
468        topic: Topic<T>,
469        qos: DataReaderQos,
470        subscriber: Arc<SubscriberInner>,
471        runtime: Arc<DcpsRuntime>,
472        entity_id: EntityId,
473        rx: mpsc::Receiver<crate::runtime::UserSample>,
474    ) -> Self {
475        Self {
476            topic,
477            qos: Mutex::new(qos),
478            entity_state: crate::entity::EntityState::new(),
479            subscriber,
480            listener: Mutex::new(None),
481            last_match_count: std::sync::atomic::AtomicI64::new(-1),
482            last_requested_deadline_missed: std::sync::atomic::AtomicU64::new(0),
483            last_liveliness_alive: std::sync::atomic::AtomicI64::new(-1),
484            last_liveliness_not_alive: std::sync::atomic::AtomicI64::new(-1),
485            last_requested_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
486            last_sample_lost: std::sync::atomic::AtomicU64::new(0),
487            last_sample_rejected: std::sync::atomic::AtomicI64::new(-1),
488            inbox: Arc::new(Mutex::new(Vec::new())),
489            runtime: Some(runtime),
490            entity_id: Some(entity_id),
491            rx: Some(Mutex::new(rx)),
492            filter: None,
493            instances: InstanceTracker::new(),
494            cache: Arc::new(Mutex::new(Vec::new())),
495            #[cfg(feature = "flatdata-integration")]
496            flat_backend: Mutex::new(None),
497            _t: PhantomData,
498        }
499    }
500
501    #[cfg(not(feature = "std"))]
502    fn new(topic: Topic<T>, qos: DataReaderQos, subscriber: Arc<SubscriberInner>) -> Self {
503        Self {
504            topic,
505            qos,
506            subscriber,
507            inbox: Arc::new(Mutex::new(Vec::new())),
508            filter: None,
509            _t: PhantomData,
510        }
511    }
512
513    /// Konstruktor fuer Builtin-Topic-Reader.
514    ///
515    /// Anders als `new_offline` teilt sich dieser Reader die Inbox mit
516    /// dem `DcpsRuntime`-Discovery-Hook: SPDP-/SEDP-Receive pusht
517    /// ueber denselben `Arc<Mutex<Vec<crate::runtime::UserSample>>>` ein encoded Sample,
518    /// das hier per `take()`/`read()` ausgelesen wird.
519    #[cfg(feature = "std")]
520    pub(crate) fn new_builtin(
521        topic: Topic<T>,
522        qos: DataReaderQos,
523        subscriber: Arc<SubscriberInner>,
524        inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
525    ) -> Self {
526        Self {
527            topic,
528            qos: Mutex::new(qos),
529            entity_state: crate::entity::EntityState::new(),
530            subscriber,
531            listener: Mutex::new(None),
532            last_match_count: std::sync::atomic::AtomicI64::new(-1),
533            last_requested_deadline_missed: std::sync::atomic::AtomicU64::new(0),
534            last_liveliness_alive: std::sync::atomic::AtomicI64::new(-1),
535            last_liveliness_not_alive: std::sync::atomic::AtomicI64::new(-1),
536            last_requested_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
537            last_sample_lost: std::sync::atomic::AtomicU64::new(0),
538            last_sample_rejected: std::sync::atomic::AtomicI64::new(-1),
539            inbox,
540            runtime: None,
541            entity_id: None,
542            rx: None,
543            filter: None,
544            instances: InstanceTracker::new(),
545            cache: Arc::new(Mutex::new(Vec::new())),
546            #[cfg(feature = "flatdata-integration")]
547            flat_backend: Mutex::new(None),
548            _t: PhantomData,
549        }
550    }
551
552    /// Setzt einen Content-Filter, der auf jedem Sample im `take()`-
553    /// Pfad evaluiert wird. Rueckgabe `false` verwirft das Sample.
554    ///
555    /// Builder-Stil: `reader.with_filter(|s| s.value > 0)`.
556    ///
557    /// .7a — SQL-Expression-Syntax via `set_filter_expression`
558    /// folgt in .
559    #[must_use]
560    pub fn with_filter<F>(mut self, filter: F) -> Self
561    where
562        F: Fn(&T) -> bool + Send + Sync + 'static,
563    {
564        self.filter = Some(Arc::new(filter));
565        self
566    }
567
568    /// Topic, von dem gelesen wird.
569    #[must_use]
570    pub fn topic(&self) -> &Topic<T> {
571        &self.topic
572    }
573
574    /// Spec §2.2.2.5.3.6 / §2.2.2.1.1 — `InstanceHandle` dieses
575    /// DataReaders. Stabile Identitaet fuer
576    /// `DomainParticipant::contains_entity`.
577    #[must_use]
578    pub fn subscription_handle(&self) -> crate::instance_handle::InstanceHandle {
579        self.entity_state.instance_handle()
580    }
581
582    /// setzt den `DataReaderListener` + StatusMask. `None`
583    /// loescht den Slot. Spec §2.2.2.5.7.x set_listener.
584    #[cfg(feature = "std")]
585    pub fn set_listener(&self, listener: Option<ArcDataReaderListener>, mask: StatusMask) {
586        if let Ok(mut slot) = self.listener.lock() {
587            *slot = listener.map(|l| (l, mask));
588        }
589        self.entity_state.set_listener_mask(mask);
590    }
591
592    /// aktueller Listener-Klon, sofern vorhanden.
593    #[cfg(feature = "std")]
594    #[must_use]
595    pub fn get_listener(&self) -> Option<ArcDataReaderListener> {
596        self.listener
597            .lock()
598            .ok()
599            .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
600    }
601
602    /// Snapshot der Bubble-Up-Kette (Reader → Subscriber → Participant)
603    /// fuer Hot-Path-Listener-Dispatch.
604    #[cfg(feature = "std")]
605    #[must_use]
606    pub(crate) fn listener_chain(&self) -> crate::listener_dispatch::ReaderListenerChain {
607        let reader = self
608            .listener
609            .lock()
610            .ok()
611            .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
612        let sub_handle = Subscriber {
613            inner: Arc::clone(&self.subscriber),
614        };
615        sub_handle.snapshot_reader_chain(reader)
616    }
617
618    /// Aktuelle QoS (cloned, .1).
619    #[must_use]
620    pub fn qos(&self) -> DataReaderQos {
621        self.qos.lock().map(|q| q.clone()).unwrap_or_default()
622    }
623
624    /// Nimmt alle zwischengespeicherten Samples und entfernt sie aus
625    /// der Inbox. Liefert leeren Vec wenn nichts da ist.
626    ///
627    /// # Errors
628    /// - `WireError` wenn ein gespeicherter Payload sich nicht mehr
629    ///   decoden laesst (type-eval mismatch).
630    pub fn take(&self) -> Result<Vec<T>> {
631        // Spec §2.2.3.22 ReaderDataLifecycle.autopurge — bei jedem read/take
632        // pruefen, ob abgelaufene Instanzen aus dem Tracker zu entfernen sind.
633        #[cfg(feature = "std")]
634        {
635            let now = get_current_time();
636            let mut empty: Vec<CachedSample> = Vec::new();
637            self.run_reader_autopurge(now, &mut empty);
638        }
639        // Live-Mode: zuerst Staging-Inbox (gefuellt von wait_for_data)
640        // drainen, dann alle noch unpollten Samples aus mpsc ziehen.
641        #[cfg(feature = "std")]
642        if let Some(rx_mu) = self.rx.as_ref() {
643            let mut out = Vec::new();
644            // TimeBasedFilter (Spec §2.2.3.13) min_separation aus QoS lesen,
645            // damit Live-Mode dieselbe Filterung wie ingest_into_cache anwendet.
646            let min_sep_nanos = {
647                let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
648                qos.time_based_filter.minimum_separation.to_nanos()
649            };
650            let staged = {
651                let mut inbox = self
652                    .inbox
653                    .lock()
654                    .map_err(|_| DdsError::PreconditionNotMet {
655                        reason: "datareader inbox poisoned",
656                    })?;
657                core::mem::take(&mut *inbox)
658            };
659            for staged_item in staged {
660                match staged_item {
661                    crate::runtime::UserSample::Alive {
662                        payload: bytes,
663                        writer_guid,
664                        writer_strength,
665                    } => {
666                        let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
667                            message: e.to_string(),
668                        })?;
669                        if !self.sample_passes_filter(&sample) {
670                            continue;
671                        }
672                        if !self.live_mode_time_based_filter_pass(&sample, min_sep_nanos) {
673                            continue;
674                        }
675                        // §2.2.3.23 Exclusive-Ownership-Filter.
676                        if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
677                            continue;
678                        }
679                        out.push(sample);
680                    }
681                    crate::runtime::UserSample::Lifecycle { .. } => {
682                        // Lifecycle in der Staging-Inbox: in der
683                        // Live-Mode-take()-Schleife wird sie sofort
684                        // unten via __push_lifecycle behandelt — hier
685                        // einfach uebergehen; sie kommt naechste Runde.
686                    }
687                }
688            }
689            let rx = rx_mu.lock().map_err(|_| DdsError::PreconditionNotMet {
690                reason: "datareader rx poisoned",
691            })?;
692            while let Ok(item) = rx.try_recv() {
693                match item {
694                    crate::runtime::UserSample::Alive {
695                        payload: bytes,
696                        writer_guid,
697                        writer_strength,
698                    } => {
699                        let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
700                            message: e.to_string(),
701                        })?;
702                        if !self.sample_passes_filter(&sample) {
703                            continue;
704                        }
705                        if !self.live_mode_time_based_filter_pass(&sample, min_sep_nanos) {
706                            continue;
707                        }
708                        // §2.2.3.23 Exclusive-Ownership-Filter.
709                        if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
710                            continue;
711                        }
712                        out.push(sample);
713                    }
714                    crate::runtime::UserSample::Lifecycle { key_hash, kind } => {
715                        // Lifecycle-Marker via __push_lifecycle in den
716                        // Tracker fuettern (Spec §8.2.1.2).
717                        let mut holder_bytes = Vec::with_capacity(16);
718                        holder_bytes.extend_from_slice(&key_hash);
719                        let lc_kind = match kind {
720                            zerodds_rtps::history_cache::ChangeKind::NotAliveDisposed
721                            | zerodds_rtps::history_cache::ChangeKind::NotAliveDisposedUnregistered => {
722                                crate::sample_info::InstanceStateKind::NotAliveDisposed
723                            }
724                            zerodds_rtps::history_cache::ChangeKind::NotAliveUnregistered => {
725                                crate::sample_info::InstanceStateKind::NotAliveNoWriters
726                            }
727                            _ => crate::sample_info::InstanceStateKind::Alive,
728                        };
729                        let _ = self.__push_lifecycle(key_hash, holder_bytes, lc_kind);
730                    }
731                }
732            }
733            return Ok(out);
734        }
735        // Offline-Fallback.
736        let raw = {
737            let mut inbox = self
738                .inbox
739                .lock()
740                .map_err(|_| DdsError::PreconditionNotMet {
741                    reason: "datareader inbox poisoned",
742                })?;
743            core::mem::take(&mut *inbox)
744        };
745        let mut out = Vec::with_capacity(raw.len());
746        for staged_item in raw {
747            let crate::runtime::UserSample::Alive {
748                payload: bytes,
749                writer_guid,
750                writer_strength,
751            } = staged_item
752            else {
753                continue;
754            };
755            let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
756                message: e.to_string(),
757            })?;
758            if !self.sample_passes_filter(&sample) {
759                continue;
760            }
761            // §2.2.3.23 Exclusive-Ownership-Filter (auch im Offline-
762            // Fallback). Builtin-Inject-Pfad nutzt writer_guid=[0;16]
763            // mit Shared-Ownership-Default; passes_exclusive_ownership
764            // returnt dann immer `true`.
765            if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
766                continue;
767            }
768            out.push(sample);
769        }
770        Ok(out)
771    }
772
773    /// Hilfsfunktion — evaluiert den Content-Filter wenn gesetzt.
774    fn sample_passes_filter(&self, sample: &T) -> bool {
775        match &self.filter {
776            Some(f) => f(sample),
777            None => true,
778        }
779    }
780
781    /// Spec §2.2.3.23 / §2.2.2.5.5 — Exclusive-Ownership-Filter.
782    ///
783    /// Gibt `true` zurueck wenn das Sample geliefert werden darf:
784    /// - Reader-Ownership-QoS = Shared → immer `true` (kein Filter).
785    /// - Keyless Topic → immer `true` (keine Per-Instance-Owner-State).
786    /// - Sonst: berechnet KeyHash und konsultiert
787    ///   [`instance_tracker::InstanceTracker::should_accept_sample_under_exclusive_ownership`]
788    ///   das pro Instanz den (writer_guid, writer_strength) der bisher
789    ///   gewinnenden Source haelt und Samples schwaecherer Writer rejectet.
790    #[cfg(feature = "std")]
791    fn passes_exclusive_ownership(
792        &self,
793        sample: &T,
794        writer_guid: [u8; 16],
795        writer_strength: i32,
796    ) -> bool {
797        let kind = {
798            let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
799            qos.ownership.kind
800        };
801        if kind != zerodds_qos::OwnershipKind::Exclusive {
802            return true;
803        }
804        // Spec §2.2.3.23: Ownership-Resolution greift per-Instanz; bei
805        // keyless Topics behandeln wir das Topic als einzige Instanz mit
806        // synthetischem all-zero KeyHash.
807        let (kh, key_bytes) = if T::HAS_KEY {
808            let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
809            sample.encode_key_holder_be(&mut holder);
810            let kb = holder.as_bytes().to_vec();
811            let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
812            (crate::dds_type::compute_key_hash(&kb, max), kb)
813        } else {
814            ([0u8; 16], Vec::new())
815        };
816        // Instance muss registriert sein, damit der Owner-Tracker den
817        // Slot anlegen kann (`should_accept` returnt sonst `true` bei
818        // unbekannter Instance, was die Filterung umgeht).
819        let _ = self.instances.observe_sample(kh, key_bytes, None);
820        self.instances
821            .should_accept_sample_under_exclusive_ownership(&kh, writer_guid, writer_strength)
822    }
823
824    /// Spec §2.2.3.13 TIME_BASED_FILTER fuer den Live-Mode-Pfad.
825    /// Gibt `true` zurueck, wenn das Sample geliefert werden darf.
826    /// Bei keyless Types oder min_separation=0 immer `true`.
827    /// Bei keyed Types: keyhash via `encode_key_holder_be` berechnen,
828    /// gegen instance_tracker pruefen, und bei `true` direkt
829    /// `record_delivery` aufrufen, damit nachfolgende Samples derselben
830    /// Instanz richtig gefiltert werden.
831    #[cfg(feature = "std")]
832    fn live_mode_time_based_filter_pass(&self, sample: &T, min_sep_nanos: u128) -> bool {
833        if min_sep_nanos == 0 || !T::HAS_KEY {
834            return true;
835        }
836        let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
837        sample.encode_key_holder_be(&mut holder);
838        let key_bytes = holder.as_bytes().to_vec();
839        let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
840        let kh = crate::dds_type::compute_key_hash(&key_bytes, max);
841        let now = get_current_time();
842        if !self
843            .instances
844            .should_deliver_under_time_based_filter(&kh, now, min_sep_nanos)
845        {
846            return false;
847        }
848        let _ = self.instances.observe_sample(kh, key_bytes, Some(now));
849        self.instances.record_delivery(&kh, now);
850        true
851    }
852
853    /// Liest alle Samples ohne sie zu entfernen. aktuell identisch
854    /// zu `take` minus entfernen. Sample-State (`ReadCondition`
855    /// §2.2.2.5.8) folgt im Wire-Up.
856    ///
857    /// # Errors
858    /// Wie `take`.
859    pub fn read(&self) -> Result<Vec<T>> {
860        let raw = {
861            let inbox = self
862                .inbox
863                .lock()
864                .map_err(|_| DdsError::PreconditionNotMet {
865                    reason: "datareader inbox poisoned",
866                })?;
867            inbox.clone()
868        };
869        let mut out = Vec::with_capacity(raw.len());
870        for staged_item in raw {
871            let crate::runtime::UserSample::Alive {
872                payload: bytes,
873                writer_guid,
874                writer_strength,
875            } = staged_item
876            else {
877                continue;
878            };
879            let sample = T::decode(&bytes).map_err(|e| DdsError::WireError {
880                message: e.to_string(),
881            })?;
882            if !self.sample_passes_filter(&sample) {
883                continue;
884            }
885            // §2.2.3.23 Exclusive-Ownership-Filter (auch im Offline-
886            // Fallback). Builtin-Inject-Pfad nutzt writer_guid=[0;16]
887            // mit Shared-Ownership-Default; passes_exclusive_ownership
888            // returnt dann immer `true`.
889            if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
890                continue;
891            }
892            out.push(sample);
893        }
894        Ok(out)
895    }
896
897    /// Anzahl matched Remote-Writer. Im Offline-Mode immer 0.
898    ///
899    /// Spec: OMG DDS 1.4 §2.2.2.5.3.15 `get_matched_publications`.
900    ///
901    /// Seiteneffekt — bei einer Aenderung des Matched-Count
902    /// gegenueber dem letzten Aufruf wird `on_subscription_matched`
903    /// via Bubble-Up-Kette gefeuert (Spec §2.2.4.2.6.7).
904    #[must_use]
905    pub fn matched_publication_count(&self) -> usize {
906        #[cfg(feature = "std")]
907        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
908            let n = rt.user_reader_matched_count(eid);
909            self.poll_subscription_matched(n);
910            return n;
911        }
912        0
913    }
914
915    /// Delta-Detect-Helper fuer `on_subscription_matched`.
916    #[cfg(feature = "std")]
917    pub(crate) fn poll_subscription_matched(&self, current: usize) {
918        let curr = current as i64;
919        let prev = self
920            .last_match_count
921            .swap(curr, std::sync::atomic::Ordering::AcqRel);
922        if prev == curr {
923            return;
924        }
925        let total = if curr > prev.max(0) {
926            curr
927        } else {
928            prev.max(0)
929        };
930        let delta = curr - prev.max(0);
931        let status = crate::status::SubscriptionMatchedStatus {
932            total_count: total as i32,
933            total_count_change: delta.max(0) as i32,
934            current_count: curr as i32,
935            current_count_change: delta as i32,
936            last_publication_handle: crate::instance_handle::HANDLE_NIL,
937        };
938        let chain = self.listener_chain();
939        crate::listener_dispatch::dispatch_subscription_matched(
940            &chain,
941            self.entity_state.instance_handle(),
942            status,
943        );
944    }
945
946    /// Delta-Detect fuer `on_requested_deadline_missed`.
947    /// Spec §2.2.4.2.6.4.
948    #[cfg(feature = "std")]
949    pub(crate) fn poll_requested_deadline_missed(&self, current: u64) {
950        let prev = self
951            .last_requested_deadline_missed
952            .swap(current, std::sync::atomic::Ordering::AcqRel);
953        if current == prev {
954            return;
955        }
956        let total_change = current.saturating_sub(prev);
957        let status = crate::status::RequestedDeadlineMissedStatus {
958            total_count: current as i32,
959            total_count_change: total_change as i32,
960            last_instance_handle: crate::instance_handle::HANDLE_NIL,
961        };
962        let chain = self.listener_chain();
963        crate::listener_dispatch::dispatch_requested_deadline_missed(
964            &chain,
965            self.entity_state.instance_handle(),
966            status,
967        );
968    }
969
970    /// Delta-Detect fuer `on_liveliness_changed`. Spec
971    /// §2.2.4.2.6.6. Beachtet beide Counter (alive + not_alive); jeder
972    /// Wechsel triggert genau einmal.
973    #[cfg(feature = "std")]
974    pub(crate) fn poll_liveliness_changed(&self, alive_count: u64, not_alive_count: u64) {
975        let curr_alive = alive_count as i64;
976        let curr_not = not_alive_count as i64;
977        let prev_alive = self
978            .last_liveliness_alive
979            .swap(curr_alive, std::sync::atomic::Ordering::AcqRel);
980        let prev_not = self
981            .last_liveliness_not_alive
982            .swap(curr_not, std::sync::atomic::Ordering::AcqRel);
983        // Erste Beobachtung (prev == -1) zaehlt nur wenn der Counter
984        // ungleich 0 ist; sonst kein triggern.
985        let alive_changed = if prev_alive < 0 {
986            curr_alive != 0
987        } else {
988            prev_alive != curr_alive
989        };
990        let not_changed = if prev_not < 0 {
991            curr_not != 0
992        } else {
993            prev_not != curr_not
994        };
995        if !alive_changed && !not_changed {
996            return;
997        }
998        let alive_delta = if prev_alive < 0 {
999            curr_alive
1000        } else {
1001            curr_alive - prev_alive
1002        };
1003        let not_delta = if prev_not < 0 {
1004            curr_not
1005        } else {
1006            curr_not - prev_not
1007        };
1008        let status = crate::status::LivelinessChangedStatus {
1009            alive_count: curr_alive as i32,
1010            not_alive_count: curr_not as i32,
1011            alive_count_change: alive_delta as i32,
1012            not_alive_count_change: not_delta as i32,
1013            last_publication_handle: crate::instance_handle::HANDLE_NIL,
1014        };
1015        let chain = self.listener_chain();
1016        crate::listener_dispatch::dispatch_liveliness_changed(
1017            &chain,
1018            self.entity_state.instance_handle(),
1019            status,
1020        );
1021    }
1022
1023    /// Delta-Detect fuer `on_requested_incompatible_qos`.
1024    /// Spec §2.2.4.2.6.5.
1025    #[cfg(feature = "std")]
1026    pub(crate) fn poll_requested_incompatible_qos(
1027        &self,
1028        snapshot: crate::status::RequestedIncompatibleQosStatus,
1029    ) {
1030        let curr = i64::from(snapshot.total_count);
1031        let prev = self
1032            .last_requested_incompatible_qos
1033            .swap(curr, std::sync::atomic::Ordering::AcqRel);
1034        if curr == prev {
1035            return;
1036        }
1037        let delta = curr - prev.max(0);
1038        let status = crate::status::RequestedIncompatibleQosStatus {
1039            total_count: curr as i32,
1040            total_count_change: delta.max(0) as i32,
1041            last_policy_id: snapshot.last_policy_id,
1042            policies: snapshot.policies,
1043        };
1044        let chain = self.listener_chain();
1045        crate::listener_dispatch::dispatch_requested_incompatible_qos(
1046            &chain,
1047            self.entity_state.instance_handle(),
1048            status,
1049        );
1050    }
1051
1052    /// Delta-Detect fuer `on_sample_lost`. Spec §2.2.4.2.6.2.
1053    #[cfg(feature = "std")]
1054    pub(crate) fn poll_sample_lost(&self, current: u64) {
1055        let prev = self
1056            .last_sample_lost
1057            .swap(current, std::sync::atomic::Ordering::AcqRel);
1058        if current == prev {
1059            return;
1060        }
1061        let delta = current.saturating_sub(prev);
1062        let status = crate::status::SampleLostStatus {
1063            total_count: current as i32,
1064            total_count_change: delta as i32,
1065        };
1066        let chain = self.listener_chain();
1067        crate::listener_dispatch::dispatch_sample_lost(
1068            &chain,
1069            self.entity_state.instance_handle(),
1070            status,
1071        );
1072    }
1073
1074    /// Delta-Detect fuer `on_sample_rejected`. Spec §2.2.4.2.6.3.
1075    #[cfg(feature = "std")]
1076    pub(crate) fn poll_sample_rejected(&self, snapshot: crate::status::SampleRejectedStatus) {
1077        let curr = i64::from(snapshot.total_count);
1078        let prev = self
1079            .last_sample_rejected
1080            .swap(curr, std::sync::atomic::Ordering::AcqRel);
1081        if curr == prev {
1082            return;
1083        }
1084        let delta = curr - prev.max(0);
1085        let status = crate::status::SampleRejectedStatus {
1086            total_count: curr as i32,
1087            total_count_change: delta.max(0) as i32,
1088            last_reason: snapshot.last_reason,
1089            last_instance_handle: snapshot.last_instance_handle,
1090        };
1091        let chain = self.listener_chain();
1092        crate::listener_dispatch::dispatch_sample_rejected(
1093            &chain,
1094            self.entity_state.instance_handle(),
1095            status,
1096        );
1097    }
1098
1099    /// Blockiert, bis mindestens `min_count` Remote-Writer matched
1100    /// sind oder `timeout` verstreicht. Event-driven via Runtime-Condvar
1101    /// (D.5e Phase-1) — wakup direkt wenn SEDP einen Match propagiert,
1102    /// kein 20-ms-Polling mehr.
1103    ///
1104    /// # Errors
1105    /// [`DdsError::Timeout`] wenn `min_count` im Zeitfenster nicht
1106    /// erreicht wird.
1107    #[cfg(feature = "std")]
1108    pub fn wait_for_matched_publication(
1109        &self,
1110        min_count: usize,
1111        timeout: core::time::Duration,
1112    ) -> Result<()> {
1113        let deadline = std::time::Instant::now() + timeout;
1114        loop {
1115            if self.matched_publication_count() >= min_count {
1116                return Ok(());
1117            }
1118            let now = std::time::Instant::now();
1119            if now >= deadline {
1120                return Err(DdsError::Timeout);
1121            }
1122            // Live-Mode: park auf Runtime-match-event. Spurious wake-ups
1123            // sind fine — wir checken den count auf naechster iteration.
1124            if let Some(rt) = self.runtime.as_ref() {
1125                let _ = rt.wait_match_event(deadline - now);
1126            } else {
1127                // Offline-Mode: keine Match-Events, sleep-fallback.
1128                std::thread::sleep(core::time::Duration::from_millis(20));
1129            }
1130        }
1131    }
1132
1133    /// Counter fuer requested-Deadline-Verletzungen (Spec
1134    /// §2.2.4.2.11 `REQUESTED_DEADLINE_MISSED_STATUS`). Monoton steigend;
1135    /// steigt um 1 pro abgelaufenem Deadline-Fenster ohne empfangenes
1136    /// Sample. Offline / INFINITE → 0.
1137    ///
1138    /// feuert ggf. `on_requested_deadline_missed`.
1139    #[must_use]
1140    pub fn requested_deadline_missed_count(&self) -> u64 {
1141        #[cfg(feature = "std")]
1142        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1143            let n = rt.user_reader_requested_deadline_missed(eid);
1144            self.poll_requested_deadline_missed(n);
1145            return n;
1146        }
1147        0
1148    }
1149
1150    /// aktueller `RequestedIncompatibleQosStatus`. Spec
1151    /// §2.2.4.2.6.5. Triggert ggf. `on_requested_incompatible_qos`.
1152    #[must_use]
1153    pub fn requested_incompatible_qos_status(
1154        &self,
1155    ) -> crate::status::RequestedIncompatibleQosStatus {
1156        #[cfg(feature = "std")]
1157        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1158            let s = rt.user_reader_requested_incompatible_qos(eid);
1159            self.poll_requested_incompatible_qos(s.clone());
1160            return s;
1161        }
1162        crate::status::RequestedIncompatibleQosStatus::default()
1163    }
1164
1165    /// SampleLost-Counter. Spec §2.2.4.2.6.2.
1166    #[must_use]
1167    pub fn sample_lost_count(&self) -> u64 {
1168        #[cfg(feature = "std")]
1169        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1170            let n = rt.user_reader_sample_lost(eid);
1171            self.poll_sample_lost(n);
1172            return n;
1173        }
1174        0
1175    }
1176
1177    /// SampleRejected-Status. Spec §2.2.4.2.6.3.
1178    #[must_use]
1179    pub fn sample_rejected_status(&self) -> crate::status::SampleRejectedStatus {
1180        #[cfg(feature = "std")]
1181        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1182            let s = rt.user_reader_sample_rejected(eid);
1183            self.poll_sample_rejected(s);
1184            return s;
1185        }
1186        crate::status::SampleRejectedStatus::default()
1187    }
1188
1189    /// pollt alle Reader-Statuses einmal und feuert pending
1190    /// Listener. Convenience-Helper fuer Tests + periodische Tick-Aufrufer.
1191    #[cfg(feature = "std")]
1192    pub fn drive_listeners(&self) {
1193        let _ = self.matched_publication_count();
1194        let _ = self.requested_deadline_missed_count();
1195        let (_, alive, not_alive) = self.liveliness_changed_status();
1196        self.poll_liveliness_changed(alive, not_alive);
1197        let _ = self.requested_incompatible_qos_status();
1198        let _ = self.sample_lost_count();
1199        let _ = self.sample_rejected_status();
1200    }
1201
1202    /// Liveliness-Status des matched Writers (Spec §2.2.4.2.14
1203    /// `LIVELINESS_CHANGED_STATUS`): `(alive, alive_count, not_alive_count)`.
1204    ///
1205    /// * `alive`: aktueller Zustand (true = Writer hat Sample innerhalb
1206    ///   seiner Lease-Duration geliefert).
1207    /// * `alive_count`: Zaehler der "not_alive → alive"-Transitions.
1208    /// * `not_alive_count`: Zaehler der "alive → not_alive"-Transitions.
1209    ///
1210    /// Offline / INFINITE-Lease → `(false, 0, 0)` / `(true, 0, 0)` je
1211    /// nach Init. Fuer v1.3 wird nur `LivelinessKind::Automatic` ueberwacht.
1212    #[must_use]
1213    pub fn liveliness_changed_status(&self) -> (bool, u64, u64) {
1214        #[cfg(feature = "std")]
1215        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
1216            let triple = rt.user_reader_liveliness_status(eid);
1217            // Listener-Trigger via Delta-Detection.
1218            self.poll_liveliness_changed(triple.1, triple.2);
1219            return triple;
1220        }
1221        (false, 0, 0)
1222    }
1223
1224    /// Blockiert, bis mindestens ein Sample verfuegbar ist oder der
1225    /// Timeout abgelaufen ist. Das Sample wird dabei nicht entnommen —
1226    /// es wird in einen Staging-Buffer gelegt, den der naechste `take()`
1227    /// ausliest. Damit bleibt `wait_for_data` + `take()` der kanonische
1228    /// Subscriber-Loop, statt busy-polling im Application-Code.
1229    ///
1230    /// Spec-Analog: OMG DDS 1.4 §2.2.2.5.8 `ReadCondition` + `WaitSet`.
1231    /// Diese API liefert die wichtigste Semantik (wake-on-data) ohne die
1232    /// komplette WaitSet/Condition-Infrastruktur.
1233    ///
1234    /// # Errors
1235    /// [`DdsError::Timeout`] wenn im Zeitfenster nichts ankommt.
1236    #[cfg(feature = "std")]
1237    pub fn wait_for_data(&self, timeout: core::time::Duration) -> Result<()> {
1238        let Some(rx_mu) = self.rx.as_ref() else {
1239            // Offline-Mode: wenn inbox schon was hat, OK, sonst Timeout.
1240            let inbox_has = self.inbox.lock().map(|i| !i.is_empty()).unwrap_or(false);
1241            if inbox_has {
1242                return Ok(());
1243            }
1244            return Err(DdsError::Timeout);
1245        };
1246
1247        // Schon was in der Staging-Inbox?
1248        {
1249            let inbox = self
1250                .inbox
1251                .lock()
1252                .map_err(|_| DdsError::PreconditionNotMet {
1253                    reason: "datareader inbox poisoned",
1254                })?;
1255            if !inbox.is_empty() {
1256                return Ok(());
1257            }
1258        }
1259
1260        let rx = rx_mu.lock().map_err(|_| DdsError::PreconditionNotMet {
1261            reason: "datareader rx poisoned",
1262        })?;
1263        let result = match rx.recv_timeout(timeout) {
1264            Ok(item) => {
1265                match item {
1266                    sample @ crate::runtime::UserSample::Alive { .. } => {
1267                        let mut inbox =
1268                            self.inbox
1269                                .lock()
1270                                .map_err(|_| DdsError::PreconditionNotMet {
1271                                    reason: "datareader inbox poisoned",
1272                                })?;
1273                        inbox.push(sample);
1274                    }
1275                    crate::runtime::UserSample::Lifecycle { key_hash, kind } => {
1276                        let lc_kind = match kind {
1277                            zerodds_rtps::history_cache::ChangeKind::NotAliveDisposed
1278                            | zerodds_rtps::history_cache::ChangeKind::NotAliveDisposedUnregistered => {
1279                                crate::sample_info::InstanceStateKind::NotAliveDisposed
1280                            }
1281                            zerodds_rtps::history_cache::ChangeKind::NotAliveUnregistered => {
1282                                crate::sample_info::InstanceStateKind::NotAliveNoWriters
1283                            }
1284                            _ => crate::sample_info::InstanceStateKind::Alive,
1285                        };
1286                        let mut holder_bytes = Vec::with_capacity(16);
1287                        holder_bytes.extend_from_slice(&key_hash);
1288                        let _ = self.__push_lifecycle(key_hash, holder_bytes, lc_kind);
1289                    }
1290                }
1291                Ok(())
1292            }
1293            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(DdsError::Timeout),
1294            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1295                Err(DdsError::PreconditionNotMet {
1296                    reason: "datareader rx disconnected",
1297                })
1298            }
1299        };
1300        // Lock zuerst freigeben, dann Listener feuern (
1301        // Lock-Discipline).
1302        drop(rx);
1303        if result.is_ok() {
1304            self.notify_data_arrived();
1305        }
1306        result
1307    }
1308
1309    /// Builtin-Topic-Helper: gibt den Arc auf die geteilte Inbox
1310    /// zurueck (Reader-Klone teilen sich denselben Buffer).
1311    #[doc(hidden)]
1312    #[cfg(feature = "std")]
1313    pub fn __inbox_handle(&self) -> Arc<Mutex<Vec<crate::runtime::UserSample>>> {
1314        Arc::clone(&self.inbox)
1315    }
1316
1317    /// Test-Helper: fuegt einen encoded Payload in die Inbox ein.
1318    /// In Runtime wird das durch den ReliableReader-Delivery-Pfad
1319    /// ersetzt.
1320    ///
1321    /// triggert die Listener-Bubble-Up-Kette
1322    /// `on_data_on_readers` (Subscriber-Stage) und `on_data_available`
1323    /// (Reader-Stage). Spec §2.2.4.2.7.1 / §2.2.4.2.6.1.
1324    #[doc(hidden)]
1325    pub fn __push_raw(&self, bytes: Vec<u8>) -> Result<()> {
1326        self.__push_raw_with_writer(bytes, [0u8; 16], 0)
1327    }
1328
1329    /// Test-Hook: pusht ein Sample mit explizitem Writer-GUID und
1330    /// `ownership_strength` in die Inbox. Wird vom Cyclone-Interop-
1331    /// Harness und den Exclusive-Ownership-Tests benutzt.
1332    #[doc(hidden)]
1333    pub fn __push_raw_with_writer(
1334        &self,
1335        bytes: Vec<u8>,
1336        writer_guid: [u8; 16],
1337        writer_strength: i32,
1338    ) -> Result<()> {
1339        {
1340            let mut inbox = self
1341                .inbox
1342                .lock()
1343                .map_err(|_| DdsError::PreconditionNotMet {
1344                    reason: "datareader inbox poisoned",
1345                })?;
1346            inbox.push(crate::runtime::UserSample::Alive {
1347                payload: bytes,
1348                writer_guid,
1349                writer_strength,
1350            });
1351        }
1352        // Listener-Notify ausserhalb des Inbox-Locks, um Re-Entrancy
1353        // zu vermeiden.
1354        self.notify_data_arrived();
1355        Ok(())
1356    }
1357
1358    /// ruft die `on_data_on_readers`- und
1359    /// `on_data_available`-Bubble-Up-Pfade. Spec §2.2.4.1: pro
1360    /// neuem Sample wird `data_on_readers` (Subscriber-Level) und
1361    /// `data_available` (Reader-Level) als unabhaengige Statuses
1362    /// gesetzt; wenn der Subscriber `data_on_readers` konsumiert
1363    /// hat, soll `data_available` *nicht* unterdrueckt werden — die
1364    /// beiden Status sind getrennte Bits in der Mask.
1365    #[cfg(feature = "std")]
1366    pub(crate) fn notify_data_arrived(&self) {
1367        let chain = self.listener_chain();
1368        let reader_handle = self.entity_state.instance_handle();
1369        crate::listener_dispatch::dispatch_data_on_readers(&chain, reader_handle);
1370        crate::listener_dispatch::dispatch_data_available(&chain, reader_handle);
1371    }
1372
1373    // ========================================================================
1374    // SampleInfo-Statechart + Instance-Lifecycle.
1375    // Spec §2.2.2.5.1, §2.2.2.5.3.{5,27,28}.
1376    // ========================================================================
1377
1378    /// Liefert den aktuellen [`InstanceTracker`] (geteilt mit der
1379    /// internen Buchhaltung). Hauptsaechlich fuer Tests / Inspection.
1380    #[cfg(feature = "std")]
1381    #[must_use]
1382    pub fn instance_tracker(&self) -> InstanceTracker {
1383        self.instances.clone()
1384    }
1385
1386    /// Liefert (Runtime, EntityId), wenn der Reader im Live-Mode laeuft.
1387    /// Cross-Crate-Hook fuer Async-Layer (dcps-async), der den Waker-
1388    /// Slot direkt registrieren muss.
1389    #[doc(hidden)]
1390    #[cfg(feature = "std")]
1391    pub fn runtime_handle(
1392        &self,
1393    ) -> Option<(alloc::sync::Arc<crate::runtime::DcpsRuntime>, EntityId)> {
1394        match (&self.runtime, self.entity_id) {
1395            (Some(rt), Some(eid)) => Some((alloc::sync::Arc::clone(rt), eid)),
1396            _ => None,
1397        }
1398    }
1399
1400    /// Spec §2.2.3.23 — Hook fuer "Writer X hat Liveliness verloren".
1401    /// Macht zwei Dinge:
1402    ///   1. clear OWNERSHIP=EXCLUSIVE-Owner fuer alle Instanzen, deren
1403    ///      Owner dieser Writer war (so dass der naechste Sample eines
1404    ///      anderen Writers via `should_accept_sample_under_exclusive_ownership`
1405    ///      neu gewinnen kann);
1406    ///   2. liefert die Anzahl betroffener Instanzen zurueck.
1407    ///
1408    /// Wird aus dem WLP-Pfad gerufen, sobald ein Writer-Lease abgelaufen
1409    /// ist (siehe `wlp::WlpEndpoint::lost_peers`).
1410    #[must_use]
1411    pub fn notify_writer_liveliness_lost(&self, writer_guid: [u8; 16]) -> usize {
1412        self.instances.clear_owner_for_writer(writer_guid)
1413    }
1414
1415    /// Wie [`Self::notify_writer_liveliness_lost`], aber Match nur ueber
1416    /// die ersten 12 Bytes (GuidPrefix). Erlaubt Failover, wenn nur die
1417    /// Participant-Identitaet (z.B. bei SPDP-Lease-Expiry) bekannt ist.
1418    #[must_use]
1419    pub fn notify_participant_liveliness_lost(&self, prefix: [u8; 12]) -> usize {
1420        self.instances.clear_owner_for_writer_prefix(prefix)
1421    }
1422
1423    /// Macht aus einem Sample-Wert den dazugehoerigen lokalen
1424    /// [`InstanceHandle`], oder [`HANDLE_NIL`] wenn unbekannt /
1425    /// non-keyed. Spec §2.2.2.5.3.26 `lookup_instance` (Reader-Variante).
1426    #[cfg(feature = "std")]
1427    #[must_use]
1428    pub fn lookup_instance(&self, instance: &T) -> InstanceHandle {
1429        if !T::HAS_KEY {
1430            return HANDLE_NIL;
1431        }
1432        let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
1433        instance.encode_key_holder_be(&mut holder);
1434        let bytes = holder.as_bytes();
1435        let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
1436        let kh = crate::dds_type::compute_key_hash(bytes, max);
1437        self.instances.lookup(&kh).unwrap_or(HANDLE_NIL)
1438    }
1439
1440    /// Spec §2.2.2.5.3.25 `get_key_value`. Liefert den Sample-Wert mit
1441    /// nur den `@key`-Feldern befuellt (rekonstruiert aus dem
1442    /// gespeicherten Key-Holder via `T::decode`).
1443    ///
1444    /// # Errors
1445    /// `BadParameter` wenn `handle` unbekannt; `WireError` wenn
1446    /// `T::decode` den Key-Stream nicht rekonstruieren kann.
1447    #[cfg(feature = "std")]
1448    pub fn get_key_value(&self, handle: InstanceHandle) -> Result<T> {
1449        let Some(bytes) = self.instances.get_key_holder(handle) else {
1450            return Err(DdsError::BadParameter {
1451                what: "unknown instance handle",
1452            });
1453        };
1454        T::decode(&bytes).map_err(|e| DdsError::WireError {
1455            message: alloc::string::ToString::to_string(&e),
1456        })
1457    }
1458
1459    /// Drainiert alle pending Bytes aus rx + inbox in den internen
1460    /// Sample-Cache. Dabei wird pro Sample der KeyHash berechnet, die
1461    /// Instanz registriert (falls neu) und ein passendes [`SampleInfo`]
1462    /// erzeugt.
1463    ///
1464    /// Wird automatisch von den `*_with_info`/`*_instance`-APIs
1465    /// aufgerufen.
///
/// # Errors
/// `PreconditionNotMet` if the inbox, live-channel or cache mutex is
/// poisoned. `WireError` if at least one payload could not be decoded;
/// the remaining payloads of the batch are still ingested and the first
/// decode error is reported once the batch has been processed.
#[cfg(feature = "std")]
fn ingest_into_cache(&self) -> Result<()> {
    // Step 1: collect all pending alive samples. `raw` carries
    // (bytes, writer_guid, writer_strength) so that the exclusive-
    // ownership filter (DDS 1.4 §2.2.3.23) can be applied below.
    let mut raw: Vec<(Vec<u8>, [u8; 16], i32)> = Vec::new();
    {
        let mut inbox = self
            .inbox
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "datareader inbox poisoned",
            })?;
        for item in inbox.drain(..) {
            // Only alive samples are taken over from the inbox; every
            // other variant found there is discarded.
            if let crate::runtime::UserSample::Alive {
                payload,
                writer_guid,
                writer_strength,
            } = item
            {
                raw.push((payload, writer_guid, writer_strength));
            }
        }
    }
    // Live-mode channel: alive samples are queued in `raw`, lifecycle
    // markers are remembered and applied through `__push_lifecycle`.
    let mut lifecycle_pending: Vec<(
        crate::instance_tracker::KeyHash,
        crate::sample_info::InstanceStateKind,
    )> = Vec::new();
    if let Some(rx_mu) = self.rx.as_ref() {
        let rx = rx_mu.lock().map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader rx poisoned",
        })?;
        while let Ok(item) = rx.try_recv() {
            match item {
                crate::runtime::UserSample::Alive {
                    payload: bytes,
                    writer_guid,
                    writer_strength,
                } => raw.push((bytes, writer_guid, writer_strength)),
                crate::runtime::UserSample::Lifecycle { key_hash, kind } => {
                    let lc_kind = match kind {
                        zerodds_rtps::history_cache::ChangeKind::NotAliveDisposed
                        | zerodds_rtps::history_cache::ChangeKind::NotAliveDisposedUnregistered => {
                            crate::sample_info::InstanceStateKind::NotAliveDisposed
                        }
                        zerodds_rtps::history_cache::ChangeKind::NotAliveUnregistered => {
                            crate::sample_info::InstanceStateKind::NotAliveNoWriters
                        }
                        _ => crate::sample_info::InstanceStateKind::Alive,
                    };
                    lifecycle_pending.push((key_hash, lc_kind));
                }
            }
        }
    }
    // Lifecycle markers are applied only AFTER the drain so that the
    // channel lock is released first (`__push_lifecycle` takes its own
    // locks).
    // NOTE(review): this applies the markers ahead of the alive samples
    // drained in the same call, so a dispose that arrived after a sample
    // ends up in front of it — confirm this reordering is intended.
    for (kh, lc_kind) in lifecycle_pending {
        let mut holder_bytes = Vec::with_capacity(16);
        holder_bytes.extend_from_slice(&kh);
        // Best effort: the only error `__push_lifecycle` reports is a
        // poisoned cache mutex, which the cache lock below reports too.
        let _ = self.__push_lifecycle(kh, holder_bytes, lc_kind);
    }
    let now = get_current_time();
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    if raw.is_empty() {
        // Autopurge has to run even without new bytes, otherwise
        // disposed/no-writer instances would only expire while samples
        // keep flowing in.
        self.run_reader_autopurge(now, &mut cache);
        return Ok(());
    }
    // The payloads have already been removed from the inbox/channel, so
    // returning at the first undecodable payload would silently lose the
    // rest of the batch. Remember the first error and keep going.
    let mut first_decode_error: Option<DdsError> = None;
    for (bytes, writer_guid, writer_strength) in raw {
        // Decode T in order to (a) evaluate the filter and (b) compute
        // the key hash.
        let sample = match T::decode(&bytes) {
            Ok(sample) => sample,
            Err(e) => {
                if first_decode_error.is_none() {
                    first_decode_error = Some(DdsError::WireError {
                        message: alloc::string::ToString::to_string(&e),
                    });
                }
                continue;
            }
        };
        if !self.sample_passes_filter(&sample) {
            continue;
        }
        // §2.2.3.23 exclusive-ownership filter: reject samples of weaker
        // writers before they reach the cache.
        if !self.passes_exclusive_ownership(&sample, writer_guid, writer_strength) {
            continue;
        }
        let info = if T::HAS_KEY {
            let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
            sample.encode_key_holder_be(&mut holder);
            let key_bytes = holder.as_bytes().to_vec();
            let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
            let kh = crate::dds_type::compute_key_hash(&key_bytes, max);
            // QoS filters run BEFORE observe_sample so that rejected
            // samples do not influence the instance state.
            let (min_sep_nanos, by_source_ts) = {
                let qos = self.qos.lock().unwrap_or_else(|e| e.into_inner());
                (
                    qos.time_based_filter.minimum_separation.to_nanos(),
                    qos.destination_order.kind
                        == zerodds_qos::DestinationOrderKind::BySourceTimestamp,
                )
            };
            // Spec §2.2.3.13 TIME_BASED_FILTER: drop the sample if less
            // than minimum_separation has passed since the last sample
            // delivered for this instance.
            if !self
                .instances
                .should_deliver_under_time_based_filter(&kh, now, min_sep_nanos)
            {
                continue;
            }
            // Spec §2.2.3.18 DESTINATION_ORDER: with BY_SOURCE_TIMESTAMP
            // only samples with a strictly greater source timestamp are
            // delivered. The reception time `now` is what gets passed as
            // the timestamp here.
            if !self
                .instances
                .should_deliver_under_destination_order(&kh, now, by_source_ts)
            {
                continue;
            }
            let (handle, _) = self.instances.observe_sample(kh, key_bytes, Some(now));
            self.instances.record_delivery(&kh, now);
            let state = match self.instances.get_by_handle(handle) {
                Some(s) => s,
                None => continue, // should never happen — defensive
            };
            SampleInfo {
                sample_state: SampleStateKind::NotRead,
                view_state: if state.reader_view_new {
                    ViewStateKind::New
                } else {
                    ViewStateKind::NotNew
                },
                instance_state: state.kind,
                disposed_generation_count: state.disposed_generation_count,
                no_writers_generation_count: state.no_writers_generation_count,
                source_timestamp: now,
                instance_handle: handle,
                valid_data: true,
                ..SampleInfo::default()
            }
        } else {
            // Non-keyed topics: a pseudo handle per sample would be
            // overkill, so HANDLE_NIL is used (the original author cites
            // spec §2.2.2.5.1.10: non-keyed topics are formally a single
            // instance).
            SampleInfo {
                sample_state: SampleStateKind::NotRead,
                view_state: ViewStateKind::NotNew,
                instance_handle: HANDLE_NIL,
                source_timestamp: now,
                valid_data: true,
                ..SampleInfo::default()
            }
        };
        cache.push(CachedSample {
            bytes: Some(bytes),
            info,
        });
    }
    // Spec §2.2.3.22 ReaderDataLifecycle: instances that stayed in
    // NotAlive-Disposed / NotAlive-NoWriters longer than
    // autopurge_*_samples_delay are removed from tracker and cache.
    self.run_reader_autopurge(now, &mut cache);
    match first_decode_error {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
1637
/// Applies `ReaderDataLifecycle.autopurge_*`: expired instances are
/// removed from the tracker and their samples from `cache`. Called by
/// `ingest_into_cache`, both with and without newly arrived bytes.
///
/// `cache` is the already locked sample cache of this reader.
#[cfg(feature = "std")]
fn run_reader_autopurge(&self, now: Time, cache: &mut Vec<CachedSample>) {
    // Read both delays under one short QoS lock; a poisoned lock is
    // recovered instead of being reported.
    let disposed_delay;
    let nowriter_delay;
    {
        let qos = self.qos.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        disposed_delay = qos
            .reader_data_lifecycle
            .autopurge_disposed_samples_delay
            .to_nanos();
        nowriter_delay = qos
            .reader_data_lifecycle
            .autopurge_nowriter_samples_delay
            .to_nanos();
    }
    // Both delays at u128::MAX: nothing can ever expire (presumably the
    // encoding of an infinite duration — confirm), so skip the tracker.
    let purging_disabled = disposed_delay == u128::MAX && nowriter_delay == u128::MAX;
    if purging_disabled {
        return;
    }
    let anything_purged = self.instances.autopurge(now, disposed_delay, nowriter_delay) > 0;
    if anything_purged {
        // Keep samples without an instance handle and samples whose
        // instance is still known to the tracker.
        cache.retain(|entry| {
            let handle = entry.info.instance_handle;
            handle.is_nil() || self.instances.get_by_handle(handle).is_some()
        });
    }
}
1668
/// Pushes a pure lifecycle marker (dispose / unregister) into the cache.
/// According to the original note it is called by the runtime as soon as
/// a writer sends `dispose`/`unregister_instance`; within this file it is
/// also used by `ingest_into_cache`.
///
/// The marker is cached without payload (`bytes == None`) and with
/// `valid_data == false`.
///
/// # Errors
/// `PreconditionNotMet` if the cache mutex is poisoned.
#[cfg(feature = "std")]
#[doc(hidden)]
pub fn __push_lifecycle(
    &self,
    keyhash: crate::instance_tracker::KeyHash,
    key_holder: Vec<u8>,
    kind: InstanceStateKind,
) -> Result<()> {
    let now = get_current_time();
    // Bring the instance into the right state in the tracker first.
    // Per the original comment, observe_sample registers the instance if
    // necessary and makes it alive.
    let (handle, _) = self
        .instances
        .observe_sample(keyhash, key_holder, Some(now));
    match kind {
        InstanceStateKind::Alive => {}
        InstanceStateKind::NotAliveDisposed => self.instances.dispose(handle, Some(now)),
        InstanceStateKind::NotAliveNoWriters => self.instances.unregister(handle, Some(now)),
    }
    let state = match self.instances.get_by_handle(handle) {
        Some(state) => state,
        None => return Ok(()), // should never happen — defensive
    };
    let view_state = if state.reader_view_new {
        ViewStateKind::New
    } else {
        ViewStateKind::NotNew
    };
    let marker = CachedSample {
        bytes: None,
        info: SampleInfo {
            source_timestamp: now,
            valid_data: false,
            instance_handle: handle,
            instance_state: state.kind,
            disposed_generation_count: state.disposed_generation_count,
            no_writers_generation_count: state.no_writers_generation_count,
            view_state,
            ..SampleInfo::default()
        },
    };
    self.cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?
        .push(marker);
    Ok(())
}
1721
/// `take` with full [`SampleInfo`] (spec §2.2.2.5.3.5 `take`). The
/// samples are consumed from the cache, so no `NOT_READ → READ`
/// transition is needed.
///
/// Equivalent to [`Self::take_filtered`] with all three masks set to
/// `ANY`.
///
/// # Errors
/// Same as [`Self::take`].
#[cfg(feature = "std")]
pub fn take_with_info(&self) -> Result<Vec<Sample<T>>> {
    let any_sample = sample_state_mask::ANY;
    let any_view = view_state_mask::ANY;
    let any_instance = instance_state_mask::ANY;
    self.take_filtered(any_sample, any_view, any_instance)
}
1736
/// `read` with full [`SampleInfo`]. Does not consume — the samples are
/// only marked as `READ` (spec §2.2.2.5.3.4).
///
/// Equivalent to [`Self::read_filtered`] with all three masks set to
/// `ANY`.
///
/// # Errors
/// Same as [`Self::read`].
#[cfg(feature = "std")]
pub fn read_with_info(&self) -> Result<Vec<Sample<T>>> {
    let any_sample = sample_state_mask::ANY;
    let any_view = view_state_mask::ANY;
    let any_instance = instance_state_mask::ANY;
    self.read_filtered(any_sample, any_view, any_instance)
}
1750
/// `take` restricted by state masks (the original cites spec
/// §2.2.2.5.3.6 `take_w_condition`).
///
/// Pending samples are ingested first; every cached sample whose
/// [`SampleInfo`] matches all three masks is removed from the cache and
/// returned, the others stay cached in their original order.
///
/// The call is all-or-nothing: if one selected sample cannot be decoded
/// the cache is left exactly as it was.
///
/// # Errors
/// `PreconditionNotMet` on a poisoned lock, `WireError` if a selected
/// sample cannot be decoded, plus whatever ingestion reports.
#[cfg(feature = "std")]
pub fn take_filtered(
    &self,
    sample_mask: u32,
    view_mask: u32,
    instance_mask: u32,
) -> Result<Vec<Sample<T>>> {
    self.ingest_into_cache()?;
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    // Phase 1 (read-only): decode every selected sample. Bailing out with
    // `?` while `cache.drain(..)` is running would drop the drain and
    // with it every sample still cached, so decoding happens before
    // anything is removed.
    let decoded = cache
        .iter()
        .map(|s| {
            if s.info.matches_states(sample_mask, view_mask, instance_mask) {
                self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)
                    .map(Some)
            } else {
                Ok(None)
            }
        })
        .collect::<Result<Vec<Option<T>>>>()?;
    // Phase 2 (infallible): split the cache into taken and kept samples.
    let mut out = Vec::new();
    let mut keep = Vec::with_capacity(cache.len());
    for (s, data) in cache.drain(..).zip(decoded) {
        let Some(data) = data else {
            keep.push(s);
            continue;
        };
        #[cfg(feature = "metrics")]
        crate::metrics::add_samples_read(self.topic.name(), 1);
        let sample = Sample::new(data, s.info);
        self.instances.mark_view_seen(sample.info.instance_handle);
        if sample.info.instance_handle != HANDLE_NIL {
            self.instances.drain_samples(sample.info.instance_handle, 1);
        }
        out.push(sample);
    }
    *cache = keep;
    Ok(out)
}
1786
/// `read` restricted by state masks (the original cites spec
/// §2.2.2.5.3.3 `read_w_condition`).
///
/// Pending samples are ingested first. Every cached sample whose
/// [`SampleInfo`] matches all three masks is returned as a snapshot
/// taken before the transition, then marked `READ`. Nothing is removed
/// from the cache.
///
/// The state transitions are applied only after every selected sample
/// has been decoded, so a failing call does not mark samples as `READ`
/// that were never handed to the caller.
///
/// # Errors
/// `PreconditionNotMet` on a poisoned lock, `WireError` if a selected
/// sample cannot be decoded, plus whatever ingestion reports.
#[cfg(feature = "std")]
pub fn read_filtered(
    &self,
    sample_mask: u32,
    view_mask: u32,
    instance_mask: u32,
) -> Result<Vec<Sample<T>>> {
    self.ingest_into_cache()?;
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    // Phase 1 (read-only): build the snapshots with the sample-state
    // view as it is before this call.
    let mut out = Vec::with_capacity(cache.len());
    let mut delivered: Vec<usize> = Vec::new();
    for (idx, s) in cache.iter().enumerate() {
        if !s.info.matches_states(sample_mask, view_mask, instance_mask) {
            continue;
        }
        let data = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
        out.push(Sample::new(data, s.info));
        delivered.push(idx);
    }
    // Phase 2: sample-state transition NOT_READ → READ (spec
    // §2.2.2.5.3.4) for exactly the samples that are returned.
    for idx in delivered {
        if let Some(s) = cache.get_mut(idx) {
            s.info.sample_state = SampleStateKind::Read;
            self.instances.mark_view_seen(s.info.instance_handle);
        }
    }
    Ok(out)
}
1822
/// `read_w_condition` (spec §2.2.2.5.3.7) — in addition to the state
/// masks of the condition, its query filter is evaluated per sample.
/// Samples stay in the cache (sample state `NOT_READ → READ`).
///
/// A filter evaluation error is treated like "filter expression false":
/// the sample is skipped and no error is reported. State transitions are
/// applied only after every selected sample has been decoded, so a
/// failing call leaves the cache untouched.
///
/// # Errors
/// `PreconditionNotMet` on a poisoned lock, `WireError` if a
/// state-matching sample cannot be decoded, plus whatever ingestion
/// reports.
#[cfg(feature = "std")]
pub fn read_w_condition(
    &self,
    condition: &Arc<crate::condition::QueryCondition>,
) -> Result<Vec<Sample<T>>> {
    let base = condition.base();
    let sample_mask = base.get_sample_state_mask();
    let view_mask = base.get_view_state_mask();
    let instance_mask = base.get_instance_state_mask();

    self.ingest_into_cache()?;
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    // Phase 1 (read-only): select and snapshot.
    let mut out = Vec::with_capacity(cache.len());
    let mut delivered: Vec<usize> = Vec::new();
    for (idx, s) in cache.iter().enumerate() {
        if !s.info.matches_states(sample_mask, view_mask, instance_mask) {
            continue;
        }
        let decoded = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
        let row = crate::dds_type::DdsTypeRow::new(&decoded);
        // Evaluation errors reject the sample instead of failing the call.
        if !condition.evaluate(&row).unwrap_or(false) {
            continue;
        }
        out.push(Sample::new(decoded, s.info));
        delivered.push(idx);
    }
    // Phase 2: mark exactly the returned samples as READ.
    for idx in delivered {
        if let Some(s) = cache.get_mut(idx) {
            s.info.sample_state = SampleStateKind::Read;
            self.instances.mark_view_seen(s.info.instance_handle);
        }
    }
    Ok(out)
}
1866
/// `take_w_condition` (spec §2.2.2.5.3.8) — like `read_w_condition`, but
/// the selected samples are consumed (removed from the cache).
///
/// A filter evaluation error is treated like "filter expression false":
/// the sample stays cached and no error is reported. The call is
/// all-or-nothing: if one state-matching sample cannot be decoded the
/// cache is left exactly as it was.
///
/// # Errors
/// `PreconditionNotMet` on a poisoned lock, `WireError` if a
/// state-matching sample cannot be decoded, plus whatever ingestion
/// reports.
#[cfg(feature = "std")]
pub fn take_w_condition(
    &self,
    condition: &Arc<crate::condition::QueryCondition>,
) -> Result<Vec<Sample<T>>> {
    let base = condition.base();
    let sample_mask = base.get_sample_state_mask();
    let view_mask = base.get_view_state_mask();
    let instance_mask = base.get_instance_state_mask();

    self.ingest_into_cache()?;
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    // Phase 1 (read-only): decide per cached sample whether it is taken.
    // Returning with `?` while `cache.drain(..)` is running would drop
    // the drain and thereby every sample still cached, so all fallible
    // work happens before anything is removed.
    let mut decoded: Vec<Option<T>> = Vec::with_capacity(cache.len());
    for s in cache.iter() {
        let mut selected = None;
        if s.info.matches_states(sample_mask, view_mask, instance_mask) {
            let data = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
            let row = crate::dds_type::DdsTypeRow::new(&data);
            if condition.evaluate(&row).unwrap_or(false) {
                selected = Some(data);
            }
        }
        decoded.push(selected);
    }
    // Phase 2 (infallible): split the cache into taken and kept samples.
    // NOTE(review): unlike `take_filtered`, this path does not feed the
    // `metrics` sample counter — confirm whether that is intended.
    let mut out = Vec::new();
    let mut keep = Vec::with_capacity(cache.len());
    for (s, data) in cache.drain(..).zip(decoded) {
        let Some(data) = data else {
            keep.push(s);
            continue;
        };
        let sample = Sample::new(data, s.info);
        self.instances.mark_view_seen(sample.info.instance_handle);
        if sample.info.instance_handle != HANDLE_NIL {
            self.instances.drain_samples(sample.info.instance_handle, 1);
        }
        out.push(sample);
    }
    *cache = keep;
    Ok(out)
}
1912
/// `read_instance` (spec §2.2.2.5.3.27). Returns only the samples of the
/// given instance and marks them `READ`; nothing is removed from the
/// cache.
///
/// The state transitions are applied only after every sample of the
/// instance has been decoded, so a failing call leaves the cache
/// untouched.
///
/// # Errors
/// `BadParameter` if `handle == HANDLE_NIL`, `PreconditionNotMet` on a
/// poisoned lock, `WireError` if a sample of the instance cannot be
/// decoded, plus whatever ingestion reports.
#[cfg(feature = "std")]
pub fn read_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>> {
    if handle.is_nil() {
        return Err(DdsError::BadParameter {
            what: "read_instance with HANDLE_NIL",
        });
    }
    self.ingest_into_cache()?;
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    // Phase 1 (read-only): snapshot every sample of the instance.
    let mut out = Vec::new();
    for s in cache.iter().filter(|s| s.info.instance_handle == handle) {
        let data = self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)?;
        out.push(Sample::new(data, s.info));
    }
    // Phase 2: mark exactly those samples as READ.
    for s in cache
        .iter_mut()
        .filter(|s| s.info.instance_handle == handle)
    {
        s.info.sample_state = SampleStateKind::Read;
        self.instances.mark_view_seen(handle);
    }
    Ok(out)
}
1947
/// `take_instance` (spec §2.2.2.5.3.27, take variant). Removes all
/// samples of the given instance from the cache and returns them.
///
/// The call is all-or-nothing: if one sample of the instance cannot be
/// decoded the cache is left exactly as it was.
///
/// # Errors
/// `BadParameter` if `handle == HANDLE_NIL`, `PreconditionNotMet` on a
/// poisoned lock, `WireError` if a sample of the instance cannot be
/// decoded, plus whatever ingestion reports.
#[cfg(feature = "std")]
pub fn take_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>> {
    if handle.is_nil() {
        return Err(DdsError::BadParameter {
            what: "take_instance with HANDLE_NIL",
        });
    }
    self.ingest_into_cache()?;
    let mut cache = self
        .cache
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "datareader cache poisoned",
        })?;
    // Phase 1 (read-only): decode the samples of the instance. Returning
    // with `?` while `cache.drain(..)` is running would drop the drain
    // and thereby every sample still cached.
    let decoded = cache
        .iter()
        .map(|s| {
            if s.info.instance_handle == handle {
                self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)
                    .map(Some)
            } else {
                Ok(None)
            }
        })
        .collect::<Result<Vec<Option<T>>>>()?;
    // Phase 2 (infallible): split the cache into taken and kept samples.
    let mut out = Vec::new();
    let mut keep = Vec::with_capacity(cache.len());
    for (s, data) in cache.drain(..).zip(decoded) {
        match data {
            Some(data) => {
                #[cfg(feature = "metrics")]
                crate::metrics::add_samples_read(self.topic.name(), 1);
                out.push(Sample::new(data, s.info));
            }
            None => keep.push(s),
        }
    }
    *cache = keep;
    if !out.is_empty() {
        self.instances.mark_view_seen(handle);
        // Saturate instead of silently truncating the sample count.
        let taken = u32::try_from(out.len()).unwrap_or(u32::MAX);
        self.instances.drain_samples(handle, taken);
    }
    Ok(out)
}
1982
/// `read_next_instance` (spec §2.2.2.5.3.28). Returns the samples of the
/// **next** instance (in the tracker's sort order) after `previous`.
///
/// `previous == HANDLE_NIL` starts at the first handle. An empty vector
/// is returned when there is no further instance.
///
/// Pending samples are ingested before the next handle is looked up.
/// Otherwise instances that so far exist only in the inbox / live
/// channel would be invisible, and a reader with no registered instance
/// would return empty without ever ingesting.
///
/// # Errors
/// Same as `read`.
#[cfg(feature = "std")]
pub fn read_next_instance(&self, previous: InstanceHandle) -> Result<Vec<Sample<T>>> {
    self.ingest_into_cache()?;
    match self.instances.next_handle_after(previous) {
        Some(next) => self.read_instance(next),
        None => Ok(Vec::new()),
    }
}
1998
/// `take_next_instance` (spec §2.2.2.5.3.28), take variant: consumes the
/// samples of the next instance after `previous`.
///
/// `previous == HANDLE_NIL` starts at the first handle. An empty vector
/// is returned when there is no further instance.
///
/// Pending samples are ingested before the next handle is looked up.
/// Otherwise instances that so far exist only in the inbox / live
/// channel would be invisible, and a reader with no registered instance
/// would return empty without ever ingesting.
///
/// # Errors
/// Same as `take`.
#[cfg(feature = "std")]
pub fn take_next_instance(&self, previous: InstanceHandle) -> Result<Vec<Sample<T>>> {
    self.ingest_into_cache()?;
    match self.instances.next_handle_after(previous) {
        Some(next) => self.take_instance(next),
        None => Ok(Vec::new()),
    }
}
2010
/// Helper: turns a `CachedSample` into a `Sample<T>`.
///
/// For lifecycle markers (`bytes == None`) `T` is reconstructed through
/// `decode_or_keyholder`, i.e. from the stored key holder (the original
/// cites spec §2.2.2.5.1.13: `data` then only carries the key part).
/// With the `metrics` feature, each successfully materialized sample is
/// counted for the reader's topic.
#[cfg(feature = "std")]
fn materialize(&self, s: CachedSample) -> Result<Sample<T>> {
    self.decode_or_keyholder(s.bytes.as_deref(), s.info.instance_handle)
        .map(|data| {
            #[cfg(feature = "metrics")]
            crate::metrics::add_samples_read(self.topic.name(), 1);
            Sample::new(data, s.info)
        })
}
2022
/// Decode helper. The input for `T::decode` is chosen in this order:
///
/// 1. `bytes`, if present (regular sample);
/// 2. the key holder stored for `handle` (lifecycle marker);
/// 3. an empty slice, if no key holder is available either.
///
/// Any decode failure is reported as `WireError` carrying the error's
/// string form.
#[cfg(feature = "std")]
fn decode_or_keyholder(&self, bytes: Option<&[u8]>, handle: InstanceHandle) -> Result<T> {
    let decode = |raw: &[u8]| -> Result<T> {
        T::decode(raw).map_err(|err| DdsError::WireError {
            message: alloc::string::ToString::to_string(&err),
        })
    };
    match bytes {
        Some(payload) => decode(payload),
        None => match self.instances.get_key_holder(handle) {
            Some(holder) => decode(&holder),
            None => decode(&[]),
        },
    }
}
2042}
2043
/// DCPS `Entity` operations of a [`DataReader`]: QoS access and enabling.
#[cfg(feature = "std")]
impl<T: DdsType> crate::entity::Entity for DataReader<T> {
    type Qos = DataReaderQos;

    /// Returns a copy of the current QoS.
    ///
    /// A poisoned QoS mutex is recovered (same policy as the other QoS
    /// reads in this file) instead of silently returning default QoS.
    fn get_qos(&self) -> Self::Qos {
        self.qos
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }

    /// Replaces the QoS.
    ///
    /// Once the entity is enabled, DURABILITY, RELIABILITY, HISTORY,
    /// RESOURCE_LIMITS, OWNERSHIP and LIVELINESS must not change (the
    /// original cites spec §2.2.3 / §2.2.2.5.3, Changeable = NO).
    ///
    /// A poisoned QoS mutex is recovered; previously the call returned
    /// `Ok(())` in that case although nothing had been stored.
    ///
    /// # Errors
    /// The error built by `crate::entity::immutable_if_enabled` for the
    /// first immutable policy that differs while the entity is enabled.
    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
        let enabled = self.entity_state.is_enabled();
        let mut current = self
            .qos
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if enabled {
            if current.durability != qos.durability {
                return Err(crate::entity::immutable_if_enabled("DURABILITY"));
            }
            if current.reliability != qos.reliability {
                return Err(crate::entity::immutable_if_enabled("RELIABILITY"));
            }
            if current.history != qos.history {
                return Err(crate::entity::immutable_if_enabled("HISTORY"));
            }
            if current.resource_limits != qos.resource_limits {
                return Err(crate::entity::immutable_if_enabled("RESOURCE_LIMITS"));
            }
            if current.ownership != qos.ownership {
                return Err(crate::entity::immutable_if_enabled("OWNERSHIP"));
            }
            if current.liveliness != qos.liveliness {
                return Err(crate::entity::immutable_if_enabled("LIVELINESS"));
            }
        }
        *current = qos;
        Ok(())
    }

    /// Marks the entity as enabled; always succeeds.
    fn enable(&self) -> Result<()> {
        self.entity_state.enable();
        Ok(())
    }

    /// Returns a new handle to the shared entity state.
    fn entity_state(&self) -> Arc<crate::entity::EntityState> {
        Arc::clone(&self.entity_state)
    }
}
2091
// ---- Boxed, type-erased variant for heterogeneous reader lists ----

/// Type-erased view of a [`DataReader`], usable as `dyn AnyDataReader`
/// so that readers of different sample types can share one list.
// NOTE(review): `dead_code` is allowed presumably because nothing in the
// crate consumes the trait yet — confirm.
#[allow(dead_code)]
pub(crate) trait AnyDataReader: core::fmt::Debug + Send + Sync {
    /// Name of the topic the reader is attached to.
    fn topic_name(&self) -> &str;
    /// Static type name of the reader's sample type.
    fn type_name(&self) -> &'static str;
}
2098
2099impl<T: DdsType + Send + 'static> AnyDataReader for DataReader<T>
2100where
2101    T: Send + Sync,
2102{
2103    fn topic_name(&self) -> &str {
2104        self.topic.name()
2105    }
2106    fn type_name(&self) -> &'static str {
2107        T::TYPE_NAME
2108    }
2109}
2110
2111#[allow(dead_code)]
2112pub(crate) fn boxed_any_reader<T: DdsType + Send + Sync + 'static>(
2113    r: DataReader<T>,
2114) -> Box<dyn AnyDataReader> {
2115    Box::new(r)
2116}
2117
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use core::sync::atomic::{AtomicU32, Ordering};

    use super::*;
    use crate::dds_type::RawBytes;
    use crate::factory::DomainParticipantFactory;
    use crate::qos::{DomainParticipantQos, TopicQos};

    /// Builds a `Chatter` topic on a fresh offline participant (domain 0).
    fn mk_topic() -> Topic<RawBytes> {
        let participant = DomainParticipantFactory::instance()
            .create_participant_offline(0, DomainParticipantQos::default());
        Topic::new("Chatter".into(), TopicQos::default(), participant)
    }

    /// Default subscriber plus a default-QoS `RawBytes` reader on
    /// `Chatter`. The subscriber is returned as well so that it lives as
    /// long as the reader, exactly as in a hand-written test body.
    fn mk_reader() -> (Subscriber, DataReader<RawBytes>) {
        let sub = Subscriber::new(SubscriberQos::default(), None);
        let reader = sub
            .create_datareader::<RawBytes>(&mk_topic(), DataReaderQos::default())
            .unwrap();
        (sub, reader)
    }

    #[test]
    fn subscriber_creates_datareader_for_matching_type() {
        let (_sub, reader) = mk_reader();
        assert_eq!(reader.topic().name(), "Chatter");
    }

    #[test]
    fn datareader_take_returns_decoded_samples() {
        let (_sub, reader) = mk_reader();
        reader.__push_raw(vec![1, 2, 3]).unwrap();
        reader.__push_raw(vec![4, 5]).unwrap();
        let samples = reader.take().unwrap();
        assert_eq!(samples.len(), 2);
        assert_eq!(samples[0].data, vec![1, 2, 3]);
        assert_eq!(samples[1].data, vec![4, 5]);
        // Everything has been consumed: a second take yields nothing.
        let again = reader.take().unwrap();
        assert!(again.is_empty());
    }

    #[test]
    fn datareader_read_preserves_samples() {
        let (_sub, reader) = mk_reader();
        reader.__push_raw(vec![0xAA]).unwrap();
        let first = reader.read().unwrap();
        let second = reader.read().unwrap();
        assert_eq!(first.len(), 1);
        assert_eq!(second.len(), 1);
    }

    // poll_subscription_matched + listener slot API.

    #[test]
    fn datareader_set_listener_stores_arc_and_mask() {
        struct NoopListener;
        impl crate::listener::DataReaderListener for NoopListener {}

        let (_sub, reader) = mk_reader();
        assert!(reader.get_listener().is_none());
        reader.set_listener(
            Some(Arc::new(NoopListener)),
            crate::psm_constants::status::ANY,
        );
        assert!(reader.get_listener().is_some());
        assert_eq!(
            reader.entity_state.listener_mask(),
            crate::psm_constants::status::ANY
        );
    }

    #[test]
    fn poll_subscription_matched_fires_on_count_increase() {
        struct MatchCounter(AtomicU32);
        impl crate::listener::DataReaderListener for MatchCounter {
            fn on_subscription_matched(
                &self,
                _r: crate::InstanceHandle,
                _s: crate::status::SubscriptionMatchedStatus,
            ) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
        }

        let (_sub, reader) = mk_reader();
        let counter = Arc::new(MatchCounter(AtomicU32::new(0)));
        reader.set_listener(Some(counter.clone()), crate::psm_constants::status::ANY);

        // (polled match count, callbacks expected so far)
        for (polled, expected_calls) in [(0, 1), (1, 2), (1, 2), (0, 3)] {
            reader.poll_subscription_matched(polled);
            assert_eq!(counter.0.load(Ordering::Relaxed), expected_calls);
        }
    }

    #[test]
    fn poll_subscription_matched_with_no_listener_is_noop() {
        let (_sub, reader) = mk_reader();
        reader.poll_subscription_matched(0);
        reader.poll_subscription_matched(3);
    }

    #[test]
    fn notify_data_arrived_fires_data_available_and_data_on_readers() {
        /// `.0` counts data-available, `.1` subscription-matched callbacks.
        struct CallCounters(AtomicU32, AtomicU32);
        impl crate::listener::DataReaderListener for CallCounters {
            fn on_data_available(&self, _r: crate::InstanceHandle) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
            fn on_subscription_matched(
                &self,
                _r: crate::InstanceHandle,
                _s: crate::status::SubscriptionMatchedStatus,
            ) {
                self.1.fetch_add(1, Ordering::Relaxed);
            }
        }

        let (_sub, reader) = mk_reader();
        let counters = Arc::new(CallCounters(AtomicU32::new(0), AtomicU32::new(0)));
        reader.set_listener(Some(counters.clone()), crate::psm_constants::status::ANY);
        reader.notify_data_arrived();
        assert_eq!(counters.0.load(Ordering::Relaxed), 1);
        // The subscription-matched counter is untouched (different status bit).
        assert_eq!(counters.1.load(Ordering::Relaxed), 0);
    }

    // ---- §2.2.2.5.2.8/.9 begin/end_access ----

    #[test]
    fn subscriber_begin_end_access_roundtrip() {
        let sub = Subscriber::new(SubscriberQos::default(), None);
        assert!(!sub.is_access_open());
        sub.begin_access();
        assert!(sub.is_access_open());
        sub.end_access().unwrap();
        assert!(!sub.is_access_open());
    }

    #[test]
    fn subscriber_end_access_without_begin_returns_precondition_not_met() {
        // Spec §2.2.2.5.2.9 — end without begin violates the spec.
        let sub = Subscriber::new(SubscriberQos::default(), None);
        let outcome = sub.end_access();
        assert!(matches!(
            outcome,
            Err(DdsError::PreconditionNotMet { .. })
        ));
    }

    #[test]
    fn subscriber_begin_access_is_nestable() {
        // Spec §2.2.2.5.2.8 — nesting is allowed; every begin needs its
        // own end.
        let sub = Subscriber::new(SubscriberQos::default(), None);
        sub.begin_access();
        sub.begin_access();
        assert!(sub.is_access_open());
        sub.end_access().unwrap();
        // Still open after the first end (recursive nesting).
        assert!(sub.is_access_open());
        sub.end_access().unwrap();
        // Only the second end closes the scope again.
        assert!(!sub.is_access_open());
    }

    #[test]
    fn subscriber_too_many_ends_after_balanced_returns_error() {
        // Negative case: after a balanced begin/end the next end is an
        // underflow → PreconditionNotMet.
        let sub = Subscriber::new(SubscriberQos::default(), None);
        sub.begin_access();
        sub.end_access().unwrap();
        let outcome = sub.end_access();
        assert!(matches!(
            outcome,
            Err(DdsError::PreconditionNotMet { .. })
        ));
    }
}