// zerodds_dcps/publisher.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors
//! Publisher + DataWriter — the sending end of the DCPS API.
//!
//! Spec reference: OMG DDS 1.4 §2.2.2.4 `Publisher`, §2.2.2.4.2
//! `DataWriter`.
//!
//! # Scope v1.2
//!
//! - `Publisher::create_datawriter<T>(topic, qos)` → `DataWriter<T>`.
//! - `DataWriter::write(&sample)` encodes via `T::encode` and hands the
//!   bytes to an **in-memory queue** (wiring to the ReliableWriter
//!   happens in the Runtime).
//! - No matching against remote readers yet.
//! - No QoS-conflict check yet.
//!
//! # Thread safety
//!
//! `DataWriter` is `Send`+`Sync` via `Arc<Mutex<_>>`. Multiple
//! application threads may call `write()` concurrently.

22extern crate alloc;
23use alloc::boxed::Box;
24use alloc::string::ToString;
25use alloc::sync::Arc;
26use alloc::vec::Vec;
27use core::marker::PhantomData;
28
29#[cfg(feature = "std")]
30use std::sync::Mutex;
31
32use crate::dds_type::DdsType;
33use crate::entity::StatusMask;
34use crate::error::{DdsError, Result};
35#[cfg(feature = "std")]
36use crate::instance_handle::{HANDLE_NIL, InstanceHandle};
37#[cfg(feature = "std")]
38use crate::instance_tracker::InstanceTracker;
39use crate::listener::{ArcDataWriterListener, ArcPublisherListener};
40use crate::qos::{DataWriterQos, PublisherQos};
41#[cfg(feature = "std")]
42use crate::time::{Time, get_current_time};
43use crate::topic::Topic;
44
45#[cfg(feature = "std")]
46use crate::runtime::DcpsRuntime;
47#[cfg(feature = "std")]
48use zerodds_qos::ReliabilityKind;
49#[cfg(feature = "std")]
50use zerodds_rtps::wire_types::EntityId;
51
/// Publisher — entity group for DataWriters.
///
/// In DDS 1.4 the Publisher carries its own QoS (Partition, Group-Data,
/// Presentation). v1.2 implements only the API shape, without
/// partition matching.
#[derive(Debug)]
pub struct Publisher {
    pub(crate) inner: Arc<PublisherInner>,
}
61
pub(crate) struct PublisherInner {
    /// Mutable QoS. Entity-lifecycle rule: `set_qos` checks policy
    /// immutability after `enable()`.
    #[cfg(feature = "std")]
    pub(crate) qos: std::sync::Mutex<PublisherQos>,
    #[cfg(not(feature = "std"))]
    #[allow(dead_code)]
    pub(crate) qos: PublisherQos,
    /// Entity lifecycle (DCPS §2.2.2.1).
    pub(crate) entity_state: alloc::sync::Arc<crate::entity::EntityState>,
    /// Runtime handle (when the Publisher was created by a live
    /// participant). `None` in offline mode → DataWriters fall back
    /// to the in-memory queue.
    #[cfg(feature = "std")]
    pub(crate) runtime: Option<Arc<DcpsRuntime>>,
    /// Optional [`ArcPublisherListener`] + [`StatusMask`]
    /// (spec §2.2.2.4.3.x set_listener / bubble-up §2.2.4.2.3).
    #[cfg(feature = "std")]
    pub(crate) listener: std::sync::Mutex<Option<(ArcPublisherListener, StatusMask)>>,
    /// Weak back-pointer to the participant — used to bubble events up
    /// from the Publisher to the participant. Set by
    /// `DomainParticipant::create_publisher`. `Weak` avoids a
    /// refcount cycle participant↔publisher.
    #[cfg(feature = "std")]
    pub(crate) participant:
        std::sync::Mutex<Option<alloc::sync::Weak<crate::participant::ParticipantInner>>>,
    /// `suspend_publications` flag (spec §2.2.2.4.1.10). When `true`,
    /// the Publisher has hinted that writes may be buffered — the
    /// writer can use this as an optimization hint (e.g. coalescing).
    /// Not enforced internally, because the spec explicitly defines it
    /// as a "hint to the Service".
    suspended: core::sync::atomic::AtomicBool,
    /// DataWriter handles (tracked by `create_datawriter`) for the
    /// recursive `DomainParticipant::contains_entity`
    /// (spec §2.2.2.2.1.10).
    #[cfg(feature = "std")]
    pub(crate) datawriters:
        std::sync::Mutex<alloc::vec::Vec<crate::instance_handle::InstanceHandle>>,
}
101
102// Manueller Debug-Impl, weil `dyn PublisherListener` kein Debug
103// implementiert. Wir geben nur "Some/None" und die Mask aus.
104impl core::fmt::Debug for PublisherInner {
105    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
106        let listener_present = self.listener.lock().map(|s| s.is_some()).unwrap_or(false);
107        f.debug_struct("PublisherInner")
108            .field("entity_state", &self.entity_state)
109            .field("listener_present", &listener_present)
110            .finish_non_exhaustive()
111    }
112}
113
114impl Publisher {
115    #[cfg(feature = "std")]
116    pub(crate) fn new(qos: PublisherQos, runtime: Option<Arc<DcpsRuntime>>) -> Self {
117        Self {
118            inner: Arc::new(PublisherInner {
119                qos: std::sync::Mutex::new(qos),
120                entity_state: crate::entity::EntityState::new(),
121                runtime,
122                listener: std::sync::Mutex::new(None),
123                participant: std::sync::Mutex::new(None),
124                suspended: core::sync::atomic::AtomicBool::new(false),
125                datawriters: std::sync::Mutex::new(alloc::vec::Vec::new()),
126            }),
127        }
128    }
129
130    #[cfg(not(feature = "std"))]
131    pub(crate) fn new(qos: PublisherQos) -> Self {
132        Self {
133            inner: Arc::new(PublisherInner {
134                qos,
135                entity_state: crate::entity::EntityState::new(),
136                suspended: core::sync::atomic::AtomicBool::new(false),
137            }),
138        }
139    }
140
141    /// Spec §2.2.2.2.1.10 — `true` wenn `handle` ein DataWriter ist,
142    /// der ueber diesen Publisher erzeugt wurde.
143    #[cfg(feature = "std")]
144    #[must_use]
145    pub fn contains_writer(&self, handle: crate::instance_handle::InstanceHandle) -> bool {
146        self.inner
147            .datawriters
148            .lock()
149            .map(|v| v.contains(&handle))
150            .unwrap_or(false)
151    }
152
153    /// setzt den `PublisherListener` + StatusMask. `None`
154    /// loescht den Slot. Spec §2.2.2.4.3.x.
155    #[cfg(feature = "std")]
156    pub fn set_listener(&self, listener: Option<ArcPublisherListener>, mask: StatusMask) {
157        if let Ok(mut slot) = self.inner.listener.lock() {
158            *slot = listener.map(|l| (l, mask));
159        }
160        self.inner.entity_state.set_listener_mask(mask);
161    }
162
163    /// aktueller Listener-Klon, sofern vorhanden.
164    #[cfg(feature = "std")]
165    #[must_use]
166    pub fn get_listener(&self) -> Option<ArcPublisherListener> {
167        self.inner
168            .listener
169            .lock()
170            .ok()
171            .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
172    }
173
174    /// Setzt den schwachen Back-Pointer auf den Participant. Wird
175    /// vom `DomainParticipant::create_publisher` aufgerufen.
176    #[cfg(feature = "std")]
177    pub(crate) fn attach_participant(
178        &self,
179        participant: alloc::sync::Weak<crate::participant::ParticipantInner>,
180    ) {
181        if let Ok(mut slot) = self.inner.participant.lock() {
182            *slot = Some(participant);
183        }
184    }
185
186    /// Liefert die [`crate::listener_dispatch::WriterListenerChain`]
187    /// fuer einen Writer dieses Publishers — Reader-Pfad-Pendant in
188    /// Subscriber. Klont alle drei Listener-Stufen unter ihren
189    /// Mutexen und gibt das Bundle frei zurueck (Lock-Discipline).
190    #[cfg(feature = "std")]
191    #[must_use]
192    pub(crate) fn snapshot_writer_chain(
193        &self,
194        writer_listener: Option<(ArcDataWriterListener, StatusMask)>,
195    ) -> crate::listener_dispatch::WriterListenerChain {
196        let publisher = self
197            .inner
198            .listener
199            .lock()
200            .ok()
201            .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
202        let participant = {
203            let weak = self.inner.participant.lock().ok().and_then(|s| s.clone());
204            weak.and_then(|w| w.upgrade()).and_then(|inner| {
205                inner
206                    .listener
207                    .lock()
208                    .ok()
209                    .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)))
210            })
211        };
212        crate::listener_dispatch::WriterListenerChain {
213            writer: writer_listener,
214            publisher,
215            participant,
216        }
217    }
218
219    /// Spec §2.2.2.4.1.10 `suspend_publications` — Hint an die Service,
220    /// dass nachfolgende `write()`-Aufrufe gepuffert werden duerfen
221    /// (z.B. fuer Coalescing). Hat keine Pflicht-Semantik fuer den
222    /// Caller; der Flag ist via `is_suspended()` lesbar fuer die
223    /// Writer-Implementation.
224    ///
225    /// Idempotent: ein zweites `suspend_publications()` ohne
226    /// `resume_publications()` dazwischen ist erlaubt.
227    pub fn suspend_publications(&self) {
228        self.inner
229            .suspended
230            .store(true, core::sync::atomic::Ordering::Release);
231    }
232
233    /// Spec §2.2.2.4.1.11 `resume_publications` — Gegenstueck zu
234    /// `suspend_publications`. Bei aktivem Suspend-Flag ist das
235    /// Verhalten "Service can stop coalescing"; bei inaktivem Flag ist
236    /// das ein No-Op.
237    pub fn resume_publications(&self) {
238        self.inner
239            .suspended
240            .store(false, core::sync::atomic::Ordering::Release);
241    }
242
243    /// `true` wenn `suspend_publications()` aktiv ist und
244    /// `resume_publications()` noch nicht gerufen wurde. Wird vom
245    /// Writer-Send-Pfad als Coalescing-Hint gelesen.
246    #[must_use]
247    pub fn is_suspended(&self) -> bool {
248        self.inner
249            .suspended
250            .load(core::sync::atomic::Ordering::Acquire)
251    }
252
253    /// Spec §2.2.2.4.1.13 `copy_from_topic_qos` — kopiert die zwischen
254    /// Topic- und DataWriter-Qos teilbaren Policies aus `topic_qos`
255    /// nach `dw_qos`. Spec-Liste der gemeinsamen Policies (DCPS 1.4
256    /// §2.2.2.4.1.13): DURABILITY, DEADLINE, LATENCY_BUDGET, LIVELINESS,
257    /// RELIABILITY, DESTINATION_ORDER, HISTORY, RESOURCE_LIMITS,
258    /// TRANSPORT_PRIORITY, LIFESPAN, OWNERSHIP.
259    ///
260    /// # Errors
261    /// `DdsError::BadParameter` wenn das Resultat eine inkonsistente
262    /// QoS-Kombination ergibt — wird vom QoS-Compatibility-Check des
263    /// Caller-DataWriter validiert (analog `set_qos`).
264    pub fn copy_from_topic_qos(
265        dw_qos: &mut DataWriterQos,
266        topic_qos: &crate::qos::TopicQos,
267    ) -> Result<()> {
268        // Die folgenden Felder sind in beiden QoS-Strukturen vorhanden
269        // und werden 1:1 ueberschrieben. DataWriter-only Policies
270        // (OWNERSHIP_STRENGTH, PARTITION, RESOURCE_LIMITS, HISTORY,
271        // LIFESPAN, DEADLINE, LIVELINESS, OWNERSHIP) bleiben
272        // unangetastet, weil TopicQos sie aktuell nicht traegt.
273        // Wenn TopicQos um eines dieser Felder erweitert wird, MUSS
274        // diese Liste mit-erweitert werden — Spec §2.2.2.4.1.13.
275        dw_qos.durability = topic_qos.durability;
276        dw_qos.reliability = topic_qos.reliability;
277        Ok(())
278    }
279
280    /// Erzeugt einen typed `DataWriter<T>`. Spec §2.2.2.4.1.5
281    /// `create_datawriter`.
282    ///
283    /// # Errors
284    /// - `BadParameter` wenn `topic.type_name() != T::TYPE_NAME`
285    ///   (sollte statisch garantiert sein, aber defensiv pruefen).
286    pub fn create_datawriter<T: DdsType + Send + 'static>(
287        &self,
288        topic: &Topic<T>,
289        qos: DataWriterQos,
290    ) -> Result<DataWriter<T>> {
291        if topic.type_name() != T::TYPE_NAME {
292            return Err(DdsError::BadParameter {
293                what: "topic.type_name mismatch",
294            });
295        }
296        #[cfg(feature = "std")]
297        if let Some(rt) = self.inner.runtime.as_ref() {
298            // Live-Mode: registriere einen echten User-Writer bei
299            // der Runtime. Matching und User-Data-Flow laufen ab
300            // jetzt ueber SEDP + UDP.
301            let reliable = qos.reliability.kind == ReliabilityKind::Reliable;
302            let eid = rt.register_user_writer(crate::runtime::UserWriterConfig {
303                topic_name: topic.name().into(),
304                type_name: T::TYPE_NAME.into(),
305                reliable,
306                durability: qos.durability.kind,
307                deadline: qos.deadline,
308                lifespan: qos.lifespan,
309                liveliness: qos.liveliness,
310                ownership: qos.ownership.kind,
311                ownership_strength: qos.ownership_strength.value,
312                partition: qos.partition.names.clone(),
313                user_data: qos.user_data.value.clone(),
314                topic_data: qos.topic_data.value.clone(),
315                group_data: qos.group_data.value.clone(),
316                // F-TYPES-3: Topic-Type-Identifier weitergeben.
317                type_identifier: T::TYPE_IDENTIFIER.clone(),
318                // D.5g — `None` = nutze Runtime-Default. Per-Writer-
319                // Override via QoS-Policy ist TBD (`DataWriterQos::
320                // representation`); die DataRepresentationQosPolicy
321                // ist noch nicht in DataWriterQos modelliert.
322                data_representation_offer: None,
323            })?;
324            let dw =
325                DataWriter::new_live(topic.clone(), qos, self.inner.clone(), Arc::clone(rt), eid);
326            self.track_writer(dw.entity_state.instance_handle());
327            return Ok(dw);
328        }
329        let dw = DataWriter::new_offline(topic.clone(), qos, self.inner.clone());
330        #[cfg(feature = "std")]
331        self.track_writer(dw.entity_state.instance_handle());
332        Ok(dw)
333    }
334
335    #[cfg(feature = "std")]
336    fn track_writer(&self, handle: crate::instance_handle::InstanceHandle) {
337        if let Ok(mut list) = self.inner.datawriters.lock() {
338            list.push(handle);
339        }
340        // Propagiere zum Participant fuer rekursives contains_entity.
341        if let Ok(slot) = self.inner.participant.lock() {
342            if let Some(weak) = slot.as_ref() {
343                if let Some(p_inner) = weak.upgrade() {
344                    if let Ok(mut dws) = p_inner.datawriters.lock() {
345                        dws.push(handle);
346                    }
347                }
348            }
349        }
350    }
351}
352
353// ============================================================================
354// Entity-Trait (DCPS §2.2.2.1) —
355// ============================================================================
356
#[cfg(feature = "std")]
impl crate::entity::Entity for Publisher {
    type Qos = PublisherQos;

    fn get_qos(&self) -> Self::Qos {
        // Poisoned lock degrades to the default QoS rather than
        // propagating a panic.
        match self.inner.qos.lock() {
            Ok(guard) => guard.clone(),
            Err(_) => Self::Qos::default(),
        }
    }

    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
        // Every PublisherQos policy (Partition / GroupData /
        // Presentation) is Changeable=YES per DDS spec §2.2.3, so the
        // new value can be adopted both pre- and post-enable.
        if let Ok(mut guard) = self.inner.qos.lock() {
            *guard = qos;
        }
        Ok(())
    }

    fn enable(&self) -> Result<()> {
        self.inner.entity_state.enable();
        Ok(())
    }

    fn entity_state(&self) -> alloc::sync::Arc<crate::entity::EntityState> {
        alloc::sync::Arc::clone(&self.inner.entity_state)
    }
}
384
/// Typed DataWriter — sends samples to all matched readers of the topic.
///
/// Two modes:
/// - **Live** (`runtime: Some`, `entity_id: Some`): write() delegates
///   to the runtime → ReliableWriter → UDP.
/// - **Offline** (offline fallback, runtime=None): write() queues
///   in memory; for unit tests without networking.
pub struct DataWriter<T: DdsType> {
    topic: Topic<T>,
    qos: Mutex<DataWriterQos>,
    /// Entity lifecycle (DCPS §2.2.2.1).
    entity_state: Arc<crate::entity::EntityState>,
    /// Parent publisher (clone of the `Arc`) — for bubbling events up
    /// to the publisher and participant listeners.
    publisher: Arc<PublisherInner>,
    /// Optional `DataWriterListener` + `StatusMask`.
    #[cfg(feature = "std")]
    listener: Mutex<Option<(ArcDataWriterListener, StatusMask)>>,
    /// Last observed number of matched readers. Used by
    /// `poll_status_changes` (called lazily from public API paths) to
    /// drive delta detection for `on_publication_matched` —
    /// spec §2.2.4.2.4.4.
    #[cfg(feature = "std")]
    last_match_count: std::sync::atomic::AtomicI64,
    /// Last observed offered_deadline_missed counter.
    #[cfg(feature = "std")]
    last_offered_deadline_missed: std::sync::atomic::AtomicU64,
    /// Last observed liveliness_lost counter.
    #[cfg(feature = "std")]
    last_liveliness_lost: std::sync::atomic::AtomicU64,
    /// Last observed offered_incompatible_qos.total_count.
    #[cfg(feature = "std")]
    last_offered_incompatible_qos: std::sync::atomic::AtomicI64,
    /// Offline fallback queue.
    queue: Arc<Mutex<Vec<Vec<u8>>>>,
    /// Drain-notify pair (spec §2.2.3.19 RESOURCE_LIMITS reliable
    /// block). `write()` blocks on the condvar when the queue is full
    /// + RELIABLE + `max_blocking_time > 0`; `__drain_pending`
    /// notifies all waiting writer threads.
    #[cfg(feature = "std")]
    drain_signal: Arc<std::sync::Condvar>,
    #[cfg(feature = "std")]
    runtime: Option<Arc<DcpsRuntime>>,
    #[cfg(feature = "std")]
    entity_id: Option<EntityId>,
    /// Instance bookkeeping.
    #[cfg(feature = "std")]
    instances: InstanceTracker,
    /// Local publication handle — written into
    /// `SampleInfo.publication_handle` on the reader side once the
    /// live wiring is in place.
    #[cfg(feature = "std")]
    publication_handle: InstanceHandle,
    /// Spec §2.2.3.5 DurabilityServiceQosPolicy: with
    /// Durability=Transient/Persistent the writer additionally stores
    /// samples in a backend so late-joining readers can fetch them
    /// even after writer-history cleanup.
    #[cfg(feature = "std")]
    durability_backend: Option<Arc<dyn crate::durability_service::DurabilityBackend>>,
    /// Monotonically increasing writer sequence for durability-backend
    /// storage (DDS 1.4 §2.2.3.5 + backend replay order).
    #[cfg(feature = "std")]
    durability_seq: std::sync::atomic::AtomicU64,
    /// Optionally configured flatdata SlotBackend for the same-host
    /// zero-copy path (`zerodds-flatdata-1.0` §4.1 + §8.1). Set via
    /// `set_flat_backend` (see the `flatdata_integration` module);
    /// with `None`, `write_flat()` falls back to the classic UDP path.
    #[cfg(all(feature = "std", feature = "flatdata-integration"))]
    pub(crate) flat_backend: Mutex<
        Option<(
            Arc<dyn zerodds_flatdata::SlotBackend>,
            u32, // active_readers_mask
        )>,
    >,
    _t: PhantomData<fn() -> T>,
}
461
462impl<T: DdsType> core::fmt::Debug for DataWriter<T> {
463    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
464        f.debug_struct("DataWriter")
465            .field("topic", &self.topic.name())
466            .field("type", &T::TYPE_NAME)
467            .field("qos", &self.qos)
468            .finish_non_exhaustive()
469    }
470}
471
472impl<T: DdsType> DataWriter<T> {
    /// Constructs an offline (queue-only) writer: no runtime, no
    /// entity id — `write()` falls back to the in-memory queue.
    #[cfg(feature = "std")]
    fn new_offline(topic: Topic<T>, qos: DataWriterQos, publisher: Arc<PublisherInner>) -> Self {
        let tracker = InstanceTracker::new();
        // Fixed sentinel publication handle for offline writers.
        let pub_handle = InstanceHandle::from_raw(0xFFFF_0000_0000_0001);
        let backend = Self::build_durability_backend(&qos);
        Self {
            topic,
            qos: Mutex::new(qos),
            entity_state: crate::entity::EntityState::new(),
            publisher,
            listener: Mutex::new(None),
            // -1 sentinel: the first observed match count always
            // triggers the publication-matched delta detection.
            last_match_count: std::sync::atomic::AtomicI64::new(-1),
            last_offered_deadline_missed: std::sync::atomic::AtomicU64::new(0),
            last_liveliness_lost: std::sync::atomic::AtomicU64::new(0),
            last_offered_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
            queue: Arc::new(Mutex::new(Vec::new())),
            // NOTE(review): this inner cfg is redundant — the whole fn
            // is already gated on `feature = "std"`.
            #[cfg(feature = "std")]
            drain_signal: Arc::new(std::sync::Condvar::new()),
            runtime: None,
            entity_id: None,
            instances: tracker,
            publication_handle: pub_handle,
            durability_backend: backend,
            durability_seq: std::sync::atomic::AtomicU64::new(1),
            #[cfg(feature = "flatdata-integration")]
            flat_backend: Mutex::new(None),
            _t: PhantomData,
        }
    }
502
503    /// Liefert das Durability-Backend (None bei Volatile/TransientLocal).
504    /// Test-/Inspektions-Hilfsfunktion — Spec §2.2.3.5.
505    #[doc(hidden)]
506    #[cfg(feature = "std")]
507    #[must_use]
508    pub fn durability_backend(
509        &self,
510    ) -> Option<Arc<dyn crate::durability_service::DurabilityBackend>> {
511        self.durability_backend.clone()
512    }
513
514    /// Spec §2.2.3.5: bei Durability=Transient legt der Writer ein
515    /// In-Memory-Backend an. Persistent ohne Root-Pfad wird nicht
516    /// auto-konfiguriert — Caller muss `set_durability_backend`
517    /// explizit aufrufen, weil der Pfad applikationsabhaengig ist.
518    #[cfg(feature = "std")]
519    fn build_durability_backend(
520        qos: &DataWriterQos,
521    ) -> Option<Arc<dyn crate::durability_service::DurabilityBackend>> {
522        match qos.durability.kind {
523            zerodds_qos::DurabilityKind::Transient => Some(Arc::new(
524                crate::durability_service::InMemoryDurabilityBackend::new(qos.durability_service),
525            )),
526            _ => None,
527        }
528    }
529
    /// Constructs a live writer bound to `runtime` under `entity_id`;
    /// `write()` delegates to the runtime from here on.
    #[cfg(feature = "std")]
    fn new_live(
        topic: Topic<T>,
        qos: DataWriterQos,
        publisher: Arc<PublisherInner>,
        runtime: Arc<DcpsRuntime>,
        entity_id: EntityId,
    ) -> Self {
        let tracker = InstanceTracker::new();
        // Derive the publication handle from the EntityId — this makes
        // it reproducible across test runs and avoids a pool collision
        // with instance handles. The spec only requires an opaque u64
        // anyway.
        let key = entity_id.entity_key;
        let pub_handle = InstanceHandle::from_raw(
            0xFFFF_0000_0000_0000
                | (u64::from(key[0]) << 16)
                | (u64::from(key[1]) << 8)
                | u64::from(key[2]),
        );
        let backend = Self::build_durability_backend(&qos);
        Self {
            topic,
            qos: Mutex::new(qos),
            entity_state: crate::entity::EntityState::new(),
            publisher,
            listener: Mutex::new(None),
            // -1 sentinel: the first observed match count always
            // triggers the publication-matched delta detection.
            last_match_count: std::sync::atomic::AtomicI64::new(-1),
            last_offered_deadline_missed: std::sync::atomic::AtomicU64::new(0),
            last_liveliness_lost: std::sync::atomic::AtomicU64::new(0),
            last_offered_incompatible_qos: std::sync::atomic::AtomicI64::new(-1),
            queue: Arc::new(Mutex::new(Vec::new())),
            #[cfg(feature = "std")]
            drain_signal: Arc::new(std::sync::Condvar::new()),
            runtime: Some(runtime),
            entity_id: Some(entity_id),
            instances: tracker,
            publication_handle: pub_handle,
            durability_backend: backend,
            durability_seq: std::sync::atomic::AtomicU64::new(1),
            #[cfg(feature = "flatdata-integration")]
            flat_backend: Mutex::new(None),
            _t: PhantomData,
        }
    }
575
    /// No-std constructor (queue-only writer).
    ///
    /// NOTE(review): this initializer looks bit-rotted — it neither
    /// initializes the un-gated `entity_state` field nor wraps `qos`
    /// in the `Mutex` the struct declares; verify that the `no_std`
    /// configuration of this module actually compiles.
    #[cfg(not(feature = "std"))]
    fn new(topic: Topic<T>, qos: DataWriterQos, publisher: Arc<PublisherInner>) -> Self {
        Self {
            topic,
            qos,
            publisher,
            queue: Arc::new(Mutex::new(Vec::new())),
            #[cfg(feature = "std")]
            drain_signal: Arc::new(std::sync::Condvar::new()),
            _t: PhantomData,
        }
    }
588
    /// The topic this writer publishes to.
    #[must_use]
    pub fn topic(&self) -> &Topic<T> {
        &self.topic
    }
594
595    /// setzt den `DataWriterListener` + StatusMask. `None`
596    /// loescht den Slot. Spec §2.2.2.4.2.x set_listener.
597    #[cfg(feature = "std")]
598    pub fn set_listener(&self, listener: Option<ArcDataWriterListener>, mask: StatusMask) {
599        if let Ok(mut slot) = self.listener.lock() {
600            *slot = listener.map(|l| (l, mask));
601        }
602        self.entity_state.set_listener_mask(mask);
603    }
604
605    /// aktueller Listener-Klon, sofern vorhanden.
606    #[cfg(feature = "std")]
607    #[must_use]
608    pub fn get_listener(&self) -> Option<ArcDataWriterListener> {
609        self.listener
610            .lock()
611            .ok()
612            .and_then(|s| s.as_ref().map(|(l, _)| Arc::clone(l)))
613    }
614
615    /// Snapshot der Bubble-Up-Kette (Writer → Publisher → Participant).
616    /// Fuer Hot-Path-Listener-Dispatch genutzt.
617    #[cfg(feature = "std")]
618    #[must_use]
619    pub(crate) fn listener_chain(&self) -> crate::listener_dispatch::WriterListenerChain {
620        let writer = self
621            .listener
622            .lock()
623            .ok()
624            .and_then(|s| s.as_ref().map(|(l, m)| (Arc::clone(l), *m)));
625        // Wir reichen den Writer-Klon an den Publisher-Snapshot weiter,
626        // der Publisher- und Participant-Stage befuellt.
627        let pub_handle = Publisher {
628            inner: Arc::clone(&self.publisher),
629        };
630        pub_handle.snapshot_writer_chain(writer)
631    }
632
633    /// Aktuelle QoS (cloned, .1).
634    #[must_use]
635    pub fn qos(&self) -> DataWriterQos {
636        self.qos.lock().map(|q| q.clone()).unwrap_or_default()
637    }
638
    /// Sends a sample to all matched readers.
    ///
    /// **Spec §2.2.3.19 RESOURCE_LIMITS reliable block:** if the local
    /// writer cache has reached `max_samples` AND Reliability=RELIABLE
    /// AND `max_blocking_time > 0`, `write()` blocks until a reader
    /// ACK frees a slot or the timeout expires. In best-effort mode,
    /// or with `max_blocking_time = 0`, `write()` fails immediately
    /// with `OutOfResources`.
    ///
    /// # Errors
    /// - `WireError` if `T::encode` fails.
    /// - `OutOfResources` if the queue is full and the writer is
    ///   best-effort or has no blocking time configured.
    /// - `Timeout` if the blocking wait expired before a drain freed a
    ///   slot.
    /// - `PreconditionNotMet` on lock poisoning.
    pub fn write(&self, sample: &T) -> Result<()> {
        let mut buf = Vec::new();
        sample.encode(&mut buf).map_err(|e| DdsError::WireError {
            message: e.to_string(),
        })?;
        #[cfg(feature = "metrics")]
        crate::metrics::inc_sample_written(self.topic.name());
        #[cfg(feature = "metrics")]
        crate::metrics::record_sample_size(self.topic.name(), buf.len());
        // Spec §2.2.3.5 DurabilityServiceQosPolicy: additionally store
        // the sample in the backend (Transient/Persistent) so
        // late-joining readers can still fetch it after writer-history
        // cleanup.
        #[cfg(feature = "std")]
        if let Some(backend) = self.durability_backend.as_ref() {
            let key_bytes = Self::keyhash_and_holder(sample)
                .map(|(kh, _)| kh)
                .unwrap_or([0u8; 16]);
            // Monotone writer sequence for backend replay order
            // (DDS 1.4 §2.2.3.5).
            let seq = self
                .durability_seq
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            let _ = backend.store(crate::durability_service::DurabilitySample {
                topic: self.topic.name().to_string(),
                instance_key: key_bytes,
                sequence: seq,
                payload: buf.clone(),
                created_at: std::time::SystemTime::now(),
            });
        }
        // Live mode: delegate to runtime → ReliableWriter → UDP.
        #[cfg(feature = "std")]
        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
            return rt.write_user_sample(eid, buf);
        }
        // Offline fallback: in-memory queue with RESOURCE_LIMITS
        // blocking.
        #[cfg(feature = "std")]
        {
            let qos = self.qos.lock().map(|q| q.clone()).unwrap_or_default();
            let max_samples = qos.resource_limits.max_samples;
            let reliable = qos.reliability.kind == ReliabilityKind::Reliable;
            let max_block = qos.reliability.max_blocking_time;
            // NOTE(review): `fraction` is added as if it were
            // nanoseconds — confirm the Duration type here stores
            // nanos rather than DDS 1/2^32-second fractions.
            let max_block_dur = core::time::Duration::from_nanos(
                u64::try_from(max_block.seconds).unwrap_or(0) * 1_000_000_000
                    + u64::from(max_block.fraction),
            );

            let mut q = self
                .queue
                .lock()
                .map_err(|_| DdsError::PreconditionNotMet {
                    reason: "datawriter queue poisoned",
                })?;
            if max_samples > 0 && q.len() >= max_samples as usize {
                if !reliable || max_block_dur.is_zero() {
                    return Err(DdsError::OutOfResources {
                        what: "datawriter queue full (best-effort or no max_blocking_time)",
                    });
                }
                // Reliable + max_blocking_time > 0 → wait_timeout on
                // the drain condvar until a slot frees up or the
                // deadline passes.
                let deadline = std::time::Instant::now() + max_block_dur;
                loop {
                    let now = std::time::Instant::now();
                    if now >= deadline {
                        return Err(DdsError::Timeout);
                    }
                    let remaining = deadline - now;
                    let (g, _) = self.drain_signal.wait_timeout(q, remaining).map_err(|_| {
                        DdsError::PreconditionNotMet {
                            reason: "datawriter queue poisoned",
                        }
                    })?;
                    q = g;
                    if q.len() < max_samples as usize {
                        break;
                    }
                    // otherwise spurious wakeup → keep waiting.
                }
            }
            q.push(buf);
            Ok(())
        }
        #[cfg(not(feature = "std"))]
        {
            let mut q = self
                .queue
                .lock()
                .map_err(|_| DdsError::PreconditionNotMet {
                    reason: "datawriter queue poisoned",
                })?;
            q.push(buf);
            Ok(())
        }
    }
748
749    /// Anzahl bisher geschriebener Samples. Test-Hilfsfunktion,
750    /// in Runtime durch echte HistoryCache-Counter ersetzt.
751    #[must_use]
752    pub fn samples_pending(&self) -> usize {
753        self.queue.lock().map(|q| q.len()).unwrap_or(0)
754    }
755
756    /// Anzahl matched Remote-Reader. Im Offline-Mode immer 0.
757    ///
758    /// Spec: OMG DDS 1.4 §2.2.2.4.2.11 `get_matched_subscriptions`.
759    /// Dort wird eine Liste zurueckgegeben; liefert nur
760    /// den Count, die volle Liste kommt mit Listener-Callbacks.
761    ///
762    /// Seiteneffekt — bei einer Aenderung des Matched-Count
763    /// gegenueber dem letzten Aufruf wird `on_publication_matched`
764    /// via Bubble-Up-Kette gefeuert (Spec §2.2.4.2.4.4).
765    #[must_use]
766    pub fn matched_subscription_count(&self) -> usize {
767        #[cfg(feature = "std")]
768        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
769            let n = rt.user_writer_matched_count(eid);
770            self.poll_publication_matched(n);
771            return n;
772        }
773        0
774    }
775
776    /// vergleicht den `current`-Count mit
777    /// `last_match_count` und feuert `on_publication_matched` wenn
778    /// sich der Wert geaendert hat. Initial ist `last_match_count == -1`,
779    /// d.h. der erste Aufruf mit n>=0 triggert immer.
780    #[cfg(feature = "std")]
781    pub(crate) fn poll_publication_matched(&self, current: usize) {
782        let curr = current as i64;
783        let prev = self
784            .last_match_count
785            .swap(curr, std::sync::atomic::Ordering::AcqRel);
786        if prev == curr {
787            return;
788        }
789        let total = if curr > prev.max(0) {
790            curr
791        } else {
792            prev.max(0)
793        };
794        let delta = curr - prev.max(0);
795        let status = crate::status::PublicationMatchedStatus {
796            total_count: total as i32,
797            total_count_change: delta.max(0) as i32,
798            current_count: curr as i32,
799            current_count_change: delta as i32,
800            last_subscription_handle: crate::instance_handle::HANDLE_NIL,
801        };
802        let chain = self.listener_chain();
803        crate::listener_dispatch::dispatch_publication_matched(
804            &chain,
805            self.entity_state.instance_handle(),
806            status,
807        );
808    }
809
810    /// Delta-Detect fuer `on_offered_deadline_missed`.
811    /// Liest den Counter aus der Runtime und feuert den Listener bei
812    /// Delta. Spec §2.2.4.2.4.1.
813    #[cfg(feature = "std")]
814    pub(crate) fn poll_offered_deadline_missed(&self, current: u64) {
815        let prev = self
816            .last_offered_deadline_missed
817            .swap(current, std::sync::atomic::Ordering::AcqRel);
818        if current == prev {
819            return;
820        }
821        let total_change = current.saturating_sub(prev);
822        let status = crate::status::OfferedDeadlineMissedStatus {
823            total_count: current as i32,
824            total_count_change: total_change as i32,
825            last_instance_handle: crate::instance_handle::HANDLE_NIL,
826        };
827        let chain = self.listener_chain();
828        crate::listener_dispatch::dispatch_offered_deadline_missed(
829            &chain,
830            self.entity_state.instance_handle(),
831            status,
832        );
833    }
834
835    /// Delta-Detect fuer `on_liveliness_lost`. Spec §2.2.4.2.4.3.
836    #[cfg(feature = "std")]
837    pub(crate) fn poll_liveliness_lost(&self, current: u64) {
838        let prev = self
839            .last_liveliness_lost
840            .swap(current, std::sync::atomic::Ordering::AcqRel);
841        if current == prev {
842            return;
843        }
844        let total_change = current.saturating_sub(prev);
845        let status = crate::status::LivelinessLostStatus {
846            total_count: current as i32,
847            total_count_change: total_change as i32,
848        };
849        let chain = self.listener_chain();
850        crate::listener_dispatch::dispatch_liveliness_lost(
851            &chain,
852            self.entity_state.instance_handle(),
853            status,
854        );
855    }
856
857    /// Delta-Detect fuer `on_offered_incompatible_qos`.
858    /// Spec §2.2.4.2.4.2.
859    #[cfg(feature = "std")]
860    pub(crate) fn poll_offered_incompatible_qos(
861        &self,
862        snapshot: crate::status::OfferedIncompatibleQosStatus,
863    ) {
864        let curr = i64::from(snapshot.total_count);
865        let prev = self
866            .last_offered_incompatible_qos
867            .swap(curr, std::sync::atomic::Ordering::AcqRel);
868        if curr == prev {
869            return;
870        }
871        let delta = curr - prev.max(0);
872        let status = crate::status::OfferedIncompatibleQosStatus {
873            total_count: curr as i32,
874            total_count_change: delta.max(0) as i32,
875            last_policy_id: snapshot.last_policy_id,
876            policies: snapshot.policies,
877        };
878        let chain = self.listener_chain();
879        crate::listener_dispatch::dispatch_offered_incompatible_qos(
880            &chain,
881            self.entity_state.instance_handle(),
882            status,
883        );
884    }
885
886    /// Blockiert, bis mindestens `min_count` Remote-Reader matched
887    /// sind oder `timeout` verstreicht. Event-driven via Runtime-Condvar
888    /// (D.5e Phase-1) — wakup direkt wenn SEDP einen Match propagiert,
889    /// kein 20-ms-Polling mehr.
890    ///
891    /// Verwandt zu OMG DDS 1.4 §2.2.2.4.2.22 `wait_for_acknowledgments`,
892    /// aber fokussiert auf Matching statt ACK. Deckt den typischen
893    /// Producer-Pattern "erst Writer anlegen, dann auf Subscriber warten,
894    /// dann schreiben" ab.
895    ///
896    /// # Errors
897    /// [`DdsError::Timeout`] wenn `min_count` im Zeitfenster nicht
898    /// erreicht wird.
899    #[cfg(feature = "std")]
900    pub fn wait_for_matched_subscription(
901        &self,
902        min_count: usize,
903        timeout: core::time::Duration,
904    ) -> Result<()> {
905        let deadline = std::time::Instant::now() + timeout;
906        loop {
907            if self.matched_subscription_count() >= min_count {
908                return Ok(());
909            }
910            let now = std::time::Instant::now();
911            if now >= deadline {
912                return Err(DdsError::Timeout);
913            }
914            if let Some(rt) = self.runtime.as_ref() {
915                let _ = rt.wait_match_event(deadline - now);
916            } else {
917                std::thread::sleep(core::time::Duration::from_millis(20));
918            }
919        }
920    }
921
922    /// Counter fuer offered-Deadline-Verletzungen (Spec
923    /// §2.2.4.2.9 `OFFERED_DEADLINE_MISSED_STATUS`). Monoton steigend;
924    /// steigt um 1 pro abgelaufenem Deadline-Fenster ohne Write.
925    /// Im Offline-Mode oder bei `deadline=INFINITE` immer 0.
926    ///
927    /// feuert ggf. `on_offered_deadline_missed` ueber die
928    /// Bubble-Up-Kette bei Delta gegenueber dem letzten Aufruf.
929    #[must_use]
930    pub fn offered_deadline_missed_count(&self) -> u64 {
931        #[cfg(feature = "std")]
932        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
933            let n = rt.user_writer_offered_deadline_missed(eid);
934            self.poll_offered_deadline_missed(n);
935            return n;
936        }
937        0
938    }
939
940    /// Counter fuer LivelinessLost-Detections (Spec §2.2.4.2.10).
941    /// Triggert ggf. `on_liveliness_lost` via Bubble-Up.
942    #[must_use]
943    pub fn liveliness_lost_count(&self) -> u64 {
944        #[cfg(feature = "std")]
945        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
946            let n = rt.user_writer_liveliness_lost(eid);
947            self.poll_liveliness_lost(n);
948            return n;
949        }
950        0
951    }
952
953    /// aktueller `OfferedIncompatibleQosStatus` (Spec
954    /// §2.2.4.2.4.2). Triggert ggf. `on_offered_incompatible_qos`.
955    #[must_use]
956    pub fn offered_incompatible_qos_status(&self) -> crate::status::OfferedIncompatibleQosStatus {
957        #[cfg(feature = "std")]
958        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
959            let s = rt.user_writer_offered_incompatible_qos(eid);
960            self.poll_offered_incompatible_qos(s.clone());
961            return s;
962        }
963        crate::status::OfferedIncompatibleQosStatus::default()
964    }
965
    /// Polls every status once and fires any pending listeners.
    /// Convenience helper for tests and periodic tick drivers.
    /// Each getter below performs its own delta detection and fires
    /// the matching listener callback when the status changed.
    #[cfg(feature = "std")]
    pub fn drive_listeners(&self) {
        let _ = self.matched_subscription_count();
        let _ = self.offered_deadline_missed_count();
        let _ = self.liveliness_lost_count();
        let _ = self.offered_incompatible_qos_status();
    }
975
976    /// Manual-Liveliness-Assert. Spec §2.2.2.4.2.20
977    /// `assert_liveliness`. Setzt den `last_liveliness_assert`-Timestamp;
978    /// bei Automatic-Liveliness no-op (jeder write asserts ohnehin).
979    #[cfg(feature = "std")]
980    pub fn assert_liveliness(&self) {
981        if let (Some(rt), Some(eid)) = (&self.runtime, self.entity_id) {
982            rt.assert_writer_liveliness_eid(eid);
983        }
984    }
985
986    /// Blockiert, bis alle matched Remote-Reader alle bis jetzt
987    /// geschriebenen Samples acknowledgt haben, oder `timeout` abläuft.
988    ///
989    /// Spec: OMG DDS 1.4 §2.2.2.4.2.22 `wait_for_acknowledgments`.
990    /// Im Offline-Mode und ohne gematchte Reader sofort `Ok(())`.
991    ///
992    /// # Errors
993    /// [`DdsError::Timeout`] wenn nicht alle Samples im Zeitfenster
994    /// acknowledgt sind.
995    #[cfg(feature = "std")]
996    pub fn wait_for_acknowledgments(&self, timeout: core::time::Duration) -> Result<()> {
997        let deadline = std::time::Instant::now() + timeout;
998        loop {
999            let all_acked = match (&self.runtime, self.entity_id) {
1000                (Some(rt), Some(eid)) => rt.user_writer_all_acknowledged(eid),
1001                _ => true, // offline: nichts zu bestaetigen
1002            };
1003            if all_acked {
1004                return Ok(());
1005            }
1006            let now = std::time::Instant::now();
1007            if now >= deadline {
1008                return Err(DdsError::Timeout);
1009            }
1010            // D.5e Phase-1: event-driven via Runtime-ack-event-Cvar.
1011            if let Some(rt) = self.runtime.as_ref() {
1012                let _ = rt.wait_ack_event(deadline - now);
1013            } else {
1014                std::thread::sleep(core::time::Duration::from_millis(20));
1015            }
1016        }
1017    }
1018
1019    /// Nimmt alle pending Samples aus der Offline-Queue heraus. Nur
1020    /// fuer Tests; wird mit Live-Mode-Wiring entfernt.
1021    #[doc(hidden)]
1022    pub fn __drain_pending(&self) -> Vec<Vec<u8>> {
1023        let drained = self
1024            .queue
1025            .lock()
1026            .map(|mut q| core::mem::take(&mut *q))
1027            .unwrap_or_default();
1028        // Spec §2.2.3.19: Drain-Signal an wartende `write()`-Threads.
1029        #[cfg(feature = "std")]
1030        self.drain_signal.notify_all();
1031        drained
1032    }
1033
1034    // ========================================================================
1035    // Instance-API.4 / DDS 1.4 §2.2.2.4.2.{5,7,10,13,14}
1036    // ========================================================================
1037
    /// Local publication handle of this DataWriter (Spec §2.2.2.5.1.11).
    /// Returned in the `publication_handle` field of the `SampleInfo`.
    /// **Note**: this is NOT the same handle as the entity
    /// `InstanceHandle` (Spec §2.2.2.1.1) — see [`Self::instance_handle`].
    #[cfg(feature = "std")]
    #[must_use]
    pub fn publication_handle(&self) -> InstanceHandle {
        self.publication_handle
    }
1047
    /// Spec §2.2.2.1.1 `get_instance_handle` — entity identifier of
    /// this DataWriter, for comparisons via
    /// `DomainParticipant::contains_entity`.
    #[must_use]
    pub fn instance_handle(&self) -> InstanceHandle {
        self.entity_state.instance_handle()
    }
1055
1056    /// Liefert den aktuellen [`InstanceTracker`] (geteilt mit der
1057    /// internen Buchhaltung). Hauptsaechlich fuer Tests / Inspection.
1058    #[cfg(feature = "std")]
1059    #[must_use]
1060    /// Liefert (Runtime, EntityId), wenn der Writer im Live-Mode laeuft.
1061    /// Cross-Crate-Hook fuer FFI-Layer (zerodds-c-api), die
1062    /// rt.write_user_lifecycle direkt aufrufen muessen.
1063    #[doc(hidden)]
1064    #[cfg(feature = "std")]
1065    pub fn runtime_handle(&self) -> Option<(Arc<DcpsRuntime>, EntityId)> {
1066        match (&self.runtime, self.entity_id) {
1067            (Some(rt), Some(eid)) => Some((Arc::clone(rt), eid)),
1068            _ => None,
1069        }
1070    }
1071
1072    /// Liefert den geteilten Instance-Tracker des Writers (Test- und
1073    /// Inspection-Helper, Spec §2.2.2.4.2.5+ Lifecycle-Buchhaltung).
1074    pub fn instance_tracker(&self) -> InstanceTracker {
1075        self.instances.clone()
1076    }
1077
1078    /// Berechnet den KeyHash + den PLAIN_CDR2-BE-Key-Holder fuer ein
1079    /// Sample. Liefert `None` fuer non-keyed Topics.
1080    #[cfg(feature = "std")]
1081    fn keyhash_and_holder(sample: &T) -> Option<(crate::instance_tracker::KeyHash, Vec<u8>)> {
1082        if !T::HAS_KEY {
1083            return None;
1084        }
1085        let mut holder = crate::dds_type::PlainCdr2BeKeyHolder::new();
1086        sample.encode_key_holder_be(&mut holder);
1087        let bytes = holder.as_bytes().to_vec();
1088        let max = T::KEY_HOLDER_MAX_SIZE.unwrap_or(usize::MAX);
1089        let kh = crate::dds_type::compute_key_hash(&bytes, max);
1090        Some((kh, bytes))
1091    }
1092
    /// Registers an instance with the DataWriter and returns its stable
    /// [`InstanceHandle`]. Spec §2.2.2.4.2.5 `register_instance`.
    ///
    /// For non-keyed topics the call returns [`HANDLE_NIL`] (every
    /// sample is its own "instance"; the spec explicitly states that
    /// register/unregister/dispose are optional there).
    ///
    /// # Errors
    /// Currently the call cannot fail. Later (live mode) resource
    /// limits may produce an `OutOfResources` error here.
    #[cfg(feature = "std")]
    pub fn register_instance(&self, instance: &T) -> Result<InstanceHandle> {
        self.register_instance_w_timestamp(instance, get_current_time())
    }
1109
1110    /// Wie `register_instance`, aber mit explizitem Timestamp.
1111    /// Spec §2.2.2.4.2.6.
1112    #[cfg(feature = "std")]
1113    pub fn register_instance_w_timestamp(
1114        &self,
1115        instance: &T,
1116        timestamp: Time,
1117    ) -> Result<InstanceHandle> {
1118        let Some((kh, holder)) = Self::keyhash_and_holder(instance) else {
1119            return Ok(HANDLE_NIL);
1120        };
1121        Ok(self.instances.register(kh, holder, Some(timestamp)))
1122    }
1123
1124    /// Macht aus einem Sample-Wert den dazugehoerigen lokalen
1125    /// [`InstanceHandle`], oder [`HANDLE_NIL`] wenn unbekannt /
1126    /// non-keyed. Spec §2.2.2.4.2.14 `lookup_instance`.
1127    #[cfg(feature = "std")]
1128    #[must_use]
1129    pub fn lookup_instance(&self, instance: &T) -> InstanceHandle {
1130        let Some((kh, _)) = Self::keyhash_and_holder(instance) else {
1131            return HANDLE_NIL;
1132        };
1133        self.instances.lookup(&kh).unwrap_or(HANDLE_NIL)
1134    }
1135
    /// Removes the instance from the writer set (Spec §2.2.2.4.2.7).
    /// Moves the lifecycle state to `NOT_ALIVE_NO_WRITERS` once the
    /// last writer has unregistered.
    ///
    /// # Errors
    /// `BadParameter` when `handle` does not match the instance derived
    /// from `instance` (the spec requires this consistency check). When
    /// `handle == HANDLE_NIL`, the handle is derived from `instance`.
    #[cfg(feature = "std")]
    pub fn unregister_instance(&self, instance: &T, handle: InstanceHandle) -> Result<()> {
        self.unregister_instance_w_timestamp(instance, handle, get_current_time())
    }
1149
    /// Like `unregister_instance`, but with a timestamp. Spec §2.2.2.4.2.8.
    ///
    /// Spec §2.2.3.21 WriterDataLifecycle: when
    /// `autodispose_unregistered_instances=true` (the default), the
    /// instance is disposed in addition to being unregistered — readers
    /// then observe both `NOT_ALIVE_DISPOSED` and `NOT_ALIVE_NO_WRITERS`.
    #[cfg(feature = "std")]
    pub fn unregister_instance_w_timestamp(
        &self,
        instance: &T,
        handle: InstanceHandle,
        timestamp: Time,
    ) -> Result<()> {
        let resolved = self.resolve_handle(instance, handle)?;
        // On a poisoned QoS lock we fall back to the spec default (true).
        let autodispose = self
            .qos
            .lock()
            .map(|q| q.writer_data_lifecycle.autodispose_unregistered_instances)
            .unwrap_or(true);
        // Dispose first (when configured), then unregister — readers see
        // DISPOSED before NO_WRITERS.
        if autodispose && !self.instances.dispose(resolved, Some(timestamp)) {
            return Err(DdsError::BadParameter {
                what: "unknown instance handle",
            });
        }
        if !self.instances.unregister(resolved, Some(timestamp)) {
            return Err(DdsError::BadParameter {
                what: "unknown instance handle",
            });
        }
        // Wire side (Spec §9.6.3.9 PID_STATUS_INFO): send a lifecycle
        // marker to all matched readers. With autodispose=true both bits
        // are set, otherwise only UNREGISTERED.
        #[cfg(feature = "std")]
        if let (Some(rt), Some(eid), Some((kh, _))) = (
            &self.runtime,
            self.entity_id,
            Self::keyhash_and_holder(instance),
        ) {
            let mut bits = zerodds_rtps::inline_qos::status_info::UNREGISTERED;
            if autodispose {
                bits |= zerodds_rtps::inline_qos::status_info::DISPOSED;
            }
            let _ = rt.write_user_lifecycle(eid, kh, bits);
        }
        Ok(())
    }
1196
    /// Disposes an instance (Spec §2.2.2.4.2.10). Marks it as
    /// `NOT_ALIVE_DISPOSED`; readers then see a sample with
    /// `valid_data == false`.
    ///
    /// # Errors
    /// Same as `unregister_instance`.
    #[cfg(feature = "std")]
    pub fn dispose(&self, instance: &T, handle: InstanceHandle) -> Result<()> {
        self.dispose_w_timestamp(instance, handle, get_current_time())
    }
1207
1208    /// Wie `dispose`, aber mit Timestamp. Spec §2.2.2.4.2.11.
1209    #[cfg(feature = "std")]
1210    pub fn dispose_w_timestamp(
1211        &self,
1212        instance: &T,
1213        handle: InstanceHandle,
1214        timestamp: Time,
1215    ) -> Result<()> {
1216        let resolved = self.resolve_handle(instance, handle)?;
1217        if !self.instances.dispose(resolved, Some(timestamp)) {
1218            return Err(DdsError::BadParameter {
1219                what: "unknown instance handle",
1220            });
1221        }
1222        // Wire-Side (Spec §9.6.3.9 PID_STATUS_INFO).
1223        #[cfg(feature = "std")]
1224        if let (Some(rt), Some(eid), Some((kh, _))) = (
1225            &self.runtime,
1226            self.entity_id,
1227            Self::keyhash_and_holder(instance),
1228        ) {
1229            let _ =
1230                rt.write_user_lifecycle(eid, kh, zerodds_rtps::inline_qos::status_info::DISPOSED);
1231        }
1232        Ok(())
1233    }
1234
1235    /// Gibt den Sample-Wert mit nur den `@key`-Feldern befuellt zurueck
1236    /// (Spec §2.2.2.4.2.13 `get_key_value`). Implementierung: wir
1237    /// rekonstruieren `T` via `decode` aus dem gespeicherten
1238    /// PLAIN_CDR2-BE-Key-Holder. Damit das funktioniert, muss `T::decode`
1239    /// einen Key-only-Stream akzeptieren — fuer einfache Records ist das
1240    /// trivialerweise der Fall.
1241    ///
1242    /// # Errors
1243    /// * `BadParameter` wenn der Handle unbekannt ist.
1244    /// * `WireError` wenn der Key-Holder nicht via `T::decode`
1245    ///   rekonstruierbar ist.
1246    #[cfg(feature = "std")]
1247    pub fn get_key_value(&self, handle: InstanceHandle) -> Result<T> {
1248        let Some(bytes) = self.instances.get_key_holder(handle) else {
1249            return Err(DdsError::BadParameter {
1250                what: "unknown instance handle",
1251            });
1252        };
1253        T::decode(&bytes).map_err(|e| DdsError::WireError {
1254            message: alloc::string::ToString::to_string(&e),
1255        })
1256    }
1257
1258    /// Hilfsfunktion: `handle == HANDLE_NIL` → aus `instance` ableiten.
1259    /// Sonst: pruefen, dass `handle` zur Instanz von `instance` passt.
1260    #[cfg(feature = "std")]
1261    fn resolve_handle(&self, instance: &T, handle: InstanceHandle) -> Result<InstanceHandle> {
1262        let derived = self.lookup_instance(instance);
1263        if handle.is_nil() {
1264            if derived.is_nil() {
1265                return Err(DdsError::BadParameter {
1266                    what: "instance not registered",
1267                });
1268            }
1269            return Ok(derived);
1270        }
1271        if !derived.is_nil() && derived != handle {
1272            return Err(DdsError::BadParameter {
1273                what: "handle does not match instance key",
1274            });
1275        }
1276        Ok(handle)
1277    }
1278
    /// Writes a sample with an explicit timestamp (Spec §2.2.2.4.2.16
    /// `write_w_timestamp`) and updates the instance bookkeeping.
    ///
    /// # Errors
    /// Same as [`Self::write`].
    #[cfg(feature = "std")]
    pub fn write_w_timestamp(&self, sample: &T, timestamp: Time) -> Result<()> {
        // Auto-register: when the instance is not yet known, register it
        // implicitly (Spec §2.2.2.4.2.16 allows this).
        if let Some((kh, holder)) = Self::keyhash_and_holder(sample) {
            if self.instances.lookup(&kh).is_none() {
                self.instances.register(kh, holder, Some(timestamp));
            } else {
                // On re-activation after Dispose / NoWriters, `register`
                // bumps the generation counter but also adds a writer
                // count we do not want — so immediately decrement it
                // again via `unregister`.
                let prev = self.instances.get_by_keyhash(&kh);
                if let Some(state) = prev {
                    if !matches!(state.kind, crate::sample_info::InstanceStateKind::Alive) {
                        self.instances.register(kh, holder, Some(timestamp));
                        self.instances.unregister(state.handle, Some(timestamp));
                    }
                }
            }
        }
        self.write(sample)
    }
1307}
1308
#[cfg(feature = "std")]
impl<T: DdsType> crate::entity::Entity for DataWriter<T> {
    type Qos = DataWriterQos;

    /// Snapshot of the current QoS; a poisoned lock yields the default.
    fn get_qos(&self) -> Self::Qos {
        match self.qos.lock() {
            Ok(guard) => guard.clone(),
            Err(_) => DataWriterQos::default(),
        }
    }

    /// Spec §2.2.3 / §2.2.2.4.2: DURABILITY, RELIABILITY, HISTORY,
    /// RESOURCE_LIMITS, OWNERSHIP, LIVELINESS are Changeable=NO post-enable.
    fn set_qos(&self, qos: Self::Qos) -> Result<()> {
        let enabled = self.entity_state.is_enabled();
        if let Ok(mut current) = self.qos.lock() {
            if enabled {
                // Immutable-after-enable policies, checked in spec order;
                // the first changed one is reported.
                let changed = [
                    ("DURABILITY", current.durability != qos.durability),
                    ("RELIABILITY", current.reliability != qos.reliability),
                    ("HISTORY", current.history != qos.history),
                    ("RESOURCE_LIMITS", current.resource_limits != qos.resource_limits),
                    ("OWNERSHIP", current.ownership != qos.ownership),
                    ("LIVELINESS", current.liveliness != qos.liveliness),
                ];
                for (policy, differs) in changed {
                    if differs {
                        return Err(crate::entity::immutable_if_enabled(policy));
                    }
                }
            }
            *current = qos;
        }
        Ok(())
    }

    fn enable(&self) -> Result<()> {
        self.entity_state.enable();
        Ok(())
    }

    fn entity_state(&self) -> Arc<crate::entity::EntityState> {
        Arc::clone(&self.entity_state)
    }
}
1356
1357// ---- Boxed-typemapped variant, damit Publisher eine heterogene
1358// Writer-Liste halten kann (Live-Mode-Vorbereitung) ----
/// Type-erased writer interface so the publisher can hold a
/// heterogeneous writer list (live-mode preparation).
#[allow(dead_code)]
pub(crate) trait AnyDataWriter: Send + Sync + core::fmt::Debug {
    /// Name of the topic this writer publishes to.
    fn topic_name(&self) -> &str;
    /// Static DDS type name of the writer's payload type.
    fn type_name(&self) -> &'static str;
}
1364
1365impl<T: DdsType + Send + 'static> AnyDataWriter for DataWriter<T>
1366where
1367    T: Send + Sync,
1368{
1369    fn topic_name(&self) -> &str {
1370        self.topic.name()
1371    }
1372    fn type_name(&self) -> &'static str {
1373        T::TYPE_NAME
1374    }
1375}
1376
// Silence dead_code on Box<dyn AnyDataWriter> construction helper.
/// Boxes a concrete [`DataWriter<T>`] into the type-erased
/// [`AnyDataWriter`] trait object.
#[allow(dead_code)]
pub(crate) fn boxed_any_writer<T: DdsType + Send + Sync + 'static>(
    w: DataWriter<T>,
) -> Box<dyn AnyDataWriter> {
    Box::new(w)
}
1384
1385#[cfg(test)]
1386#[allow(clippy::expect_used, clippy::unwrap_used)]
1387mod tests {
1388    use super::*;
1389    use crate::dds_type::RawBytes;
1390    use crate::factory::DomainParticipantFactory;
1391    use crate::qos::{DomainParticipantQos, TopicQos};
1392
1393    fn mk_topic() -> Topic<RawBytes> {
1394        let p = DomainParticipantFactory::instance()
1395            .create_participant_offline(0, DomainParticipantQos::default());
1396        Topic::new("Chatter".into(), TopicQos::default(), p)
1397    }
1398
1399    #[test]
1400    fn publisher_creates_datawriter_for_matching_type() {
1401        let p = Publisher::new(PublisherQos::default(), None);
1402        let w = p
1403            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1404            .unwrap();
1405        assert_eq!(w.topic().name(), "Chatter");
1406    }
1407
1408    #[test]
1409    fn datawriter_write_queues_encoded_sample() {
1410        let p = Publisher::new(PublisherQos::default(), None);
1411        let w = p
1412            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1413            .unwrap();
1414        assert_eq!(w.samples_pending(), 0);
1415        w.write(&RawBytes::new(vec![1, 2, 3])).unwrap();
1416        assert_eq!(w.samples_pending(), 1);
1417        let drained = w.__drain_pending();
1418        assert_eq!(drained, vec![vec![1u8, 2, 3]]);
1419    }
1420
1421    // poll_publication_matched + Listener-Slot-API.
1422
1423    use core::sync::atomic::{AtomicU32, Ordering};
1424
1425    #[test]
1426    fn datawriter_set_listener_stores_arc_and_mask() {
1427        struct L;
1428        impl crate::listener::DataWriterListener for L {}
1429        let p = Publisher::new(PublisherQos::default(), None);
1430        let w = p
1431            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1432            .unwrap();
1433        assert!(w.get_listener().is_none());
1434        w.set_listener(Some(Arc::new(L)), crate::psm_constants::status::ANY);
1435        assert!(w.get_listener().is_some());
1436        // Mask wird nach EntityState gespiegelt.
1437        assert_eq!(
1438            w.entity_state.listener_mask(),
1439            crate::psm_constants::status::ANY
1440        );
1441    }
1442
1443    #[test]
1444    fn poll_publication_matched_fires_on_count_increase() {
1445        struct Cnt(AtomicU32);
1446        impl crate::listener::DataWriterListener for Cnt {
1447            fn on_publication_matched(
1448                &self,
1449                _w: crate::InstanceHandle,
1450                _s: crate::status::PublicationMatchedStatus,
1451            ) {
1452                self.0.fetch_add(1, Ordering::Relaxed);
1453            }
1454        }
1455        let p = Publisher::new(PublisherQos::default(), None);
1456        let w = p
1457            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1458            .unwrap();
1459        let cnt = Arc::new(Cnt(AtomicU32::new(0)));
1460        w.set_listener(Some(cnt.clone()), crate::psm_constants::status::ANY);
1461
1462        // 0 → 0 (Initial-Aufruf, AtomicI64 ist -1, also Delta da).
1463        w.poll_publication_matched(0);
1464        assert_eq!(cnt.0.load(Ordering::Relaxed), 1);
1465        // 0 → 1 (Aenderung).
1466        w.poll_publication_matched(1);
1467        assert_eq!(cnt.0.load(Ordering::Relaxed), 2);
1468        // 1 → 1 (kein Delta).
1469        w.poll_publication_matched(1);
1470        assert_eq!(cnt.0.load(Ordering::Relaxed), 2);
1471        // 1 → 2.
1472        w.poll_publication_matched(2);
1473        assert_eq!(cnt.0.load(Ordering::Relaxed), 3);
1474        // 2 → 1 (Reader weg).
1475        w.poll_publication_matched(1);
1476        assert_eq!(cnt.0.load(Ordering::Relaxed), 4);
1477    }
1478
1479    #[test]
1480    fn poll_publication_matched_with_no_listener_is_noop() {
1481        let p = Publisher::new(PublisherQos::default(), None);
1482        let w = p
1483            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1484            .unwrap();
1485        // Kein Listener gesetzt — darf weder panicken noch Delta-State
1486        // korrumpieren.
1487        w.poll_publication_matched(0);
1488        w.poll_publication_matched(5);
1489    }
1490
1491    #[test]
1492    fn poll_publication_matched_bubbles_to_publisher() {
1493        struct PubL(AtomicU32);
1494        impl crate::listener::PublisherListener for PubL {
1495            fn on_publication_matched(
1496                &self,
1497                _w: crate::InstanceHandle,
1498                _s: crate::status::PublicationMatchedStatus,
1499            ) {
1500                self.0.fetch_add(1, Ordering::Relaxed);
1501            }
1502        }
1503        let p = Publisher::new(PublisherQos::default(), None);
1504        let pl = Arc::new(PubL(AtomicU32::new(0)));
1505        p.set_listener(Some(pl.clone()), crate::psm_constants::status::ANY);
1506        let w = p
1507            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1508            .unwrap();
1509        // Kein Writer-Listener → Publisher empfaengt.
1510        w.poll_publication_matched(1);
1511        assert_eq!(pl.0.load(Ordering::Relaxed), 1);
1512    }
1513
1514    // ---- §2.2.2.4.1.10 / .11 suspend/resume_publications ----
1515
1516    #[test]
1517    fn suspend_publications_sets_flag() {
1518        let p = Publisher::new(PublisherQos::default(), None);
1519        assert!(!p.is_suspended());
1520        p.suspend_publications();
1521        assert!(p.is_suspended());
1522    }
1523
1524    #[test]
1525    fn resume_publications_clears_flag() {
1526        let p = Publisher::new(PublisherQos::default(), None);
1527        p.suspend_publications();
1528        p.resume_publications();
1529        assert!(!p.is_suspended());
1530    }
1531
1532    #[test]
1533    fn suspend_publications_is_idempotent() {
1534        let p = Publisher::new(PublisherQos::default(), None);
1535        p.suspend_publications();
1536        p.suspend_publications(); // zweiter Call ist No-Op
1537        assert!(p.is_suspended());
1538    }
1539
1540    #[test]
1541    fn resume_without_suspend_is_noop() {
1542        let p = Publisher::new(PublisherQos::default(), None);
1543        // Spec §2.2.2.4.1.11 — resume ohne aktivem suspend ist No-Op.
1544        p.resume_publications();
1545        assert!(!p.is_suspended());
1546    }
1547
1548    // ---- §2.2.2.4.1.13 copy_from_topic_qos ----
1549
1550    #[test]
1551    fn copy_from_topic_qos_copies_durability_and_reliability() {
1552        use crate::qos::{DurabilityKind, ReliabilityKind, TopicQos};
1553        let mut topic = TopicQos::default();
1554        topic.durability.kind = DurabilityKind::TransientLocal;
1555        topic.reliability.kind = ReliabilityKind::Reliable;
1556
1557        let mut dw = DataWriterQos::default();
1558        // Setze etwas anderes, damit wir die Aenderung sehen.
1559        dw.durability.kind = DurabilityKind::Volatile;
1560        Publisher::copy_from_topic_qos(&mut dw, &topic).unwrap();
1561        assert_eq!(dw.durability.kind, DurabilityKind::TransientLocal);
1562        assert_eq!(dw.reliability.kind, ReliabilityKind::Reliable);
1563    }
1564
1565    // ---- §2.2.3.19 RESOURCE_LIMITS Reliable-Block ----
1566
1567    #[test]
1568    fn write_blocks_until_drain_when_reliable_max_samples_reached() {
1569        use crate::qos::{HistoryQosPolicy, ResourceLimitsQosPolicy};
1570        let p = Publisher::new(PublisherQos::default(), None);
1571        let qos = DataWriterQos {
1572            resource_limits: ResourceLimitsQosPolicy {
1573                max_samples: 2,
1574                max_instances: -1,
1575                max_samples_per_instance: -1,
1576            },
1577            reliability: crate::qos::ReliabilityQosPolicy {
1578                kind: ReliabilityKind::Reliable,
1579                max_blocking_time: zerodds_qos::Duration::from_millis(500_i32),
1580            },
1581            ..DataWriterQos::default()
1582        };
1583        let _ = qos.history;
1584        let _ = HistoryQosPolicy::default();
1585        let w = p.create_datawriter::<RawBytes>(&mk_topic(), qos).unwrap();
1586        let s = RawBytes::new(b"x".to_vec());
1587        // Erst beide Slots fuellen (no block).
1588        w.write(&s).unwrap();
1589        w.write(&s).unwrap();
1590        assert_eq!(w.samples_pending(), 2);
1591
1592        // Dritter write blockt; in einem zweiten Thread drain wir nach 50ms.
1593        let w_clone_q = w.queue.clone();
1594        let w_clone_signal = w.drain_signal.clone();
1595        let drain_handle = std::thread::spawn(move || {
1596            std::thread::sleep(core::time::Duration::from_millis(50));
1597            if let Ok(mut q) = w_clone_q.lock() {
1598                let _ = core::mem::take(&mut *q);
1599            }
1600            w_clone_signal.notify_all();
1601        });
1602
1603        let start = std::time::Instant::now();
1604        let res = w.write(&s);
1605        let elapsed = start.elapsed();
1606        drain_handle.join().unwrap();
1607
1608        assert!(res.is_ok(), "write should succeed after drain, got {res:?}");
1609        assert!(
1610            elapsed >= core::time::Duration::from_millis(40)
1611                && elapsed < core::time::Duration::from_millis(450),
1612            "elapsed = {elapsed:?}, expected ~50ms"
1613        );
1614    }
1615
1616    #[test]
1617    fn write_returns_timeout_when_reliable_drain_too_slow() {
1618        use crate::qos::ResourceLimitsQosPolicy;
1619        let p = Publisher::new(PublisherQos::default(), None);
1620        let qos = DataWriterQos {
1621            resource_limits: ResourceLimitsQosPolicy {
1622                max_samples: 1,
1623                max_instances: -1,
1624                max_samples_per_instance: -1,
1625            },
1626            reliability: crate::qos::ReliabilityQosPolicy {
1627                kind: ReliabilityKind::Reliable,
1628                max_blocking_time: zerodds_qos::Duration::from_millis(50_i32),
1629            },
1630            ..DataWriterQos::default()
1631        };
1632        let w = p.create_datawriter::<RawBytes>(&mk_topic(), qos).unwrap();
1633        let s = RawBytes::new(b"x".to_vec());
1634        w.write(&s).unwrap();
1635        // Zweiter write hat Reliable + 50ms Block; ohne drain → Timeout.
1636        let res = w.write(&s);
1637        assert!(matches!(res, Err(DdsError::Timeout)));
1638    }
1639
1640    #[test]
1641    fn write_returns_oor_when_best_effort_queue_full() {
1642        use crate::qos::ResourceLimitsQosPolicy;
1643        let p = Publisher::new(PublisherQos::default(), None);
1644        let qos = DataWriterQos {
1645            resource_limits: ResourceLimitsQosPolicy {
1646                max_samples: 1,
1647                max_instances: -1,
1648                max_samples_per_instance: -1,
1649            },
1650            reliability: crate::qos::ReliabilityQosPolicy {
1651                kind: ReliabilityKind::BestEffort,
1652                max_blocking_time: zerodds_qos::Duration::from_millis(0_i32),
1653            },
1654            ..DataWriterQos::default()
1655        };
1656        let w = p.create_datawriter::<RawBytes>(&mk_topic(), qos).unwrap();
1657        let s = RawBytes::new(b"x".to_vec());
1658        w.write(&s).unwrap();
1659        let res = w.write(&s);
1660        assert!(matches!(res, Err(DdsError::OutOfResources { .. })));
1661    }
1662
1663    #[test]
1664    fn write_does_not_block_when_max_samples_unlimited() {
1665        // max_samples = -1 (LENGTH_UNLIMITED) → kein Cap, kein Block.
1666        let p = Publisher::new(PublisherQos::default(), None);
1667        let w = p
1668            .create_datawriter::<RawBytes>(&mk_topic(), DataWriterQos::default())
1669            .unwrap();
1670        let s = RawBytes::new(b"x".to_vec());
1671        for _ in 0..50 {
1672            w.write(&s).unwrap();
1673        }
1674        assert_eq!(w.samples_pending(), 50);
1675    }
1676
1677    #[test]
1678    fn copy_from_topic_qos_does_not_touch_writer_only_policies() {
1679        use crate::qos::TopicQos;
1680        let topic = TopicQos::default();
1681        let mut dw = DataWriterQos::default();
1682        // Setze ownership_strength auf einen konkreten Wert; sollte
1683        // nach copy unangetastet bleiben (kein TopicQos-Counterpart).
1684        dw.ownership_strength.value = 42;
1685        Publisher::copy_from_topic_qos(&mut dw, &topic).unwrap();
1686        assert_eq!(dw.ownership_strength.value, 42);
1687    }
1688}