Skip to main content

zerodds_dcps_async/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! AsyncDataWriter — write/dispose/unregister/wait_for_matched async.
4
5use alloc::sync::Arc;
6use core::time::Duration;
7
8use zerodds_dcps::{DataWriter, DataWriterQos, DdsType, InstanceHandle, Result};
9
10/// Async-Wrapper um `DataWriter<T>`.
11///
12/// Hot-Path: `write()` ist eine Future-Form ueber dem sync-Pfad mit
13/// einer yield-basierten Retry-Schleife fuer
14/// `OutOfResources`-Backpressure (Spec §5.1
15/// `zerodds-async-1.0`). Statt eines Thread-Block-`Condvar::wait_timeout`
16/// fallen Caller-Tasks per `yield_for` aus dem Executor und bleiben
17/// cancelable. Andere DCPS-Methoden delegieren synchron — sie sind
18/// ohnehin nicht blockierend.
19pub struct AsyncDataWriter<T: DdsType + Send + Sync + 'static> {
20    inner: Arc<DataWriter<T>>,
21}
22
23impl<T: DdsType + Send + Sync + 'static> Clone for AsyncDataWriter<T> {
24    fn clone(&self) -> Self {
25        Self {
26            inner: Arc::clone(&self.inner),
27        }
28    }
29}
30
31impl<T: DdsType + Send + Sync + 'static> AsyncDataWriter<T> {
32    pub(crate) fn from_sync(inner: DataWriter<T>) -> Self {
33        Self {
34            inner: Arc::new(inner),
35        }
36    }
37
38    /// Schreibt einen Sample. Spec §2.1.1.
39    ///
40    /// # Errors
41    /// Wie `DataWriter::write` — `OutOfResources` nach
42    /// `max_blocking_time`-Timeout, sonst alle anderen Errors
43    /// transparent durchgereicht.
44    ///
45    /// Spec §5.1 zerodds-async-1.0: bei `OutOfResources` suspendiert
46    /// der Future via `yield_for` und retried, bis entweder ein Drain
47    /// passiert oder die `reliability.max_blocking_time` abgelaufen
48    /// ist. Im Sync-Pfad wuerde hier ein `Condvar::wait_timeout`
49    /// blockieren — async-Pfad nutzt yield-retry-Loop ohne
50    /// Thread-Block.
51    pub async fn write(&self, sample: &T) -> Result<()>
52    where
53        T: Clone,
54    {
55        let max_block = self.inner.qos().reliability.max_blocking_time;
56        let max_block_nanos = max_block.to_nanos();
57        // INFINITE → unsere Retry-Loop hat trotzdem einen safety-cap
58        // (~1 s polling) damit Caller die Caller-side cancellation
59        // sieht. Spec erlaubt das.
60        let safety_cap = core::time::Duration::from_secs(1);
61        let deadline = if max_block_nanos == u128::MAX {
62            None
63        } else {
64            #[allow(clippy::cast_possible_truncation)]
65            let secs = (max_block_nanos / 1_000_000_000) as u64;
66            #[allow(clippy::cast_possible_truncation)]
67            let nanos = (max_block_nanos % 1_000_000_000) as u32;
68            Some(std::time::Instant::now() + core::time::Duration::new(secs, nanos))
69        };
70
71        let s = sample.clone();
72        loop {
73            match self.inner.write(&s) {
74                Ok(()) => return Ok(()),
75                Err(zerodds_dcps::DdsError::OutOfResources { .. }) => {
76                    // Drain abwarten.
77                    if let Some(d) = deadline {
78                        if std::time::Instant::now() >= d {
79                            return Err(zerodds_dcps::DdsError::Timeout);
80                        }
81                    }
82                    crate::yield_for(core::time::Duration::from_millis(2)).await;
83                }
84                Err(other) => return Err(other),
85            }
86            if deadline.is_none() {
87                // INFINITE: nach 1 s safety-yield, damit der Caller
88                // mindestens ein await-point sieht und canceln kann.
89                let _ = safety_cap;
90            }
91        }
92    }
93
94    /// Spec §2.1.2 register_instance.
95    ///
96    /// # Errors
97    /// Wie sync.
98    pub async fn register_instance(&self, sample: &T) -> Result<InstanceHandle> {
99        self.inner.register_instance(sample)
100    }
101
102    /// Spec §2.1.3 dispose. Loest Wire-Lifecycle DISPOSED.
103    ///
104    /// # Errors
105    /// Wie sync.
106    pub async fn dispose(&self, sample: &T, handle: InstanceHandle) -> Result<()> {
107        self.inner.dispose(sample, handle)
108    }
109
110    /// Spec §2.1.4 unregister_instance.
111    ///
112    /// # Errors
113    /// Wie sync.
114    pub async fn unregister_instance(&self, sample: &T, handle: InstanceHandle) -> Result<()> {
115        self.inner.unregister_instance(sample, handle)
116    }
117
118    /// Spec §2.1.5 wait_for_matched_subscription. Async-Polling-
119    /// Schleife mit 10 ms Tick.
120    ///
121    /// # Errors
122    /// Wie sync — `Timeout` wenn `min_count` nicht in `timeout` erreicht.
123    pub async fn wait_for_matched_subscription(
124        &self,
125        min_count: usize,
126        timeout: Duration,
127    ) -> Result<()> {
128        let deadline = std::time::Instant::now() + timeout;
129        loop {
130            if self.inner.matched_subscription_count() >= min_count {
131                return Ok(());
132            }
133            if std::time::Instant::now() >= deadline {
134                return Err(zerodds_dcps::DdsError::Timeout);
135            }
136            // Async-sleep ohne tokio-Hard-Dep: yield via futures-Helper.
137            crate::yield_for(Duration::from_millis(10)).await;
138        }
139    }
140
141    /// Spec §2.1.6 matched_subscription_count (synchron).
142    #[must_use]
143    pub fn matched_subscription_count(&self) -> usize {
144        self.inner.matched_subscription_count()
145    }
146
147    /// Liefert die zugrundeliegende sync-Variante.
148    #[must_use]
149    pub fn as_sync(&self) -> &DataWriter<T> {
150        &self.inner
151    }
152
153    /// Liefert die DataWriterQos.
154    #[must_use]
155    pub fn qos(&self) -> DataWriterQos {
156        self.inner.qos()
157    }
158}