zerodds_dcps_async/writer.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! AsyncDataWriter — write/dispose/unregister/wait_for_matched async.
4
5use alloc::sync::Arc;
6use core::time::Duration;
7
8use zerodds_dcps::{DataWriter, DataWriterQos, DdsType, InstanceHandle, Result};
9
10/// Async-Wrapper um `DataWriter<T>`.
11///
12/// Hot-Path: `write()` ist eine Future-Form ueber dem sync-Pfad mit
13/// einer yield-basierten Retry-Schleife fuer
14/// `OutOfResources`-Backpressure (Spec §5.1
15/// `zerodds-async-1.0`). Statt eines Thread-Block-`Condvar::wait_timeout`
16/// fallen Caller-Tasks per `yield_for` aus dem Executor und bleiben
17/// cancelable. Andere DCPS-Methoden delegieren synchron — sie sind
18/// ohnehin nicht blockierend.
19pub struct AsyncDataWriter<T: DdsType + Send + Sync + 'static> {
20 inner: Arc<DataWriter<T>>,
21}
22
23impl<T: DdsType + Send + Sync + 'static> Clone for AsyncDataWriter<T> {
24 fn clone(&self) -> Self {
25 Self {
26 inner: Arc::clone(&self.inner),
27 }
28 }
29}
30
31impl<T: DdsType + Send + Sync + 'static> AsyncDataWriter<T> {
32 pub(crate) fn from_sync(inner: DataWriter<T>) -> Self {
33 Self {
34 inner: Arc::new(inner),
35 }
36 }
37
38 /// Schreibt einen Sample. Spec §2.1.1.
39 ///
40 /// # Errors
41 /// Wie `DataWriter::write` — `OutOfResources` nach
42 /// `max_blocking_time`-Timeout, sonst alle anderen Errors
43 /// transparent durchgereicht.
44 ///
45 /// Spec §5.1 zerodds-async-1.0: bei `OutOfResources` suspendiert
46 /// der Future via `yield_for` und retried, bis entweder ein Drain
47 /// passiert oder die `reliability.max_blocking_time` abgelaufen
48 /// ist. Im Sync-Pfad wuerde hier ein `Condvar::wait_timeout`
49 /// blockieren — async-Pfad nutzt yield-retry-Loop ohne
50 /// Thread-Block.
51 pub async fn write(&self, sample: &T) -> Result<()>
52 where
53 T: Clone,
54 {
55 let max_block = self.inner.qos().reliability.max_blocking_time;
56 let max_block_nanos = max_block.to_nanos();
57 // INFINITE → unsere Retry-Loop hat trotzdem einen safety-cap
58 // (~1 s polling) damit Caller die Caller-side cancellation
59 // sieht. Spec erlaubt das.
60 let safety_cap = core::time::Duration::from_secs(1);
61 let deadline = if max_block_nanos == u128::MAX {
62 None
63 } else {
64 #[allow(clippy::cast_possible_truncation)]
65 let secs = (max_block_nanos / 1_000_000_000) as u64;
66 #[allow(clippy::cast_possible_truncation)]
67 let nanos = (max_block_nanos % 1_000_000_000) as u32;
68 Some(std::time::Instant::now() + core::time::Duration::new(secs, nanos))
69 };
70
71 let s = sample.clone();
72 loop {
73 match self.inner.write(&s) {
74 Ok(()) => return Ok(()),
75 Err(zerodds_dcps::DdsError::OutOfResources { .. }) => {
76 // Drain abwarten.
77 if let Some(d) = deadline {
78 if std::time::Instant::now() >= d {
79 return Err(zerodds_dcps::DdsError::Timeout);
80 }
81 }
82 crate::yield_for(core::time::Duration::from_millis(2)).await;
83 }
84 Err(other) => return Err(other),
85 }
86 if deadline.is_none() {
87 // INFINITE: nach 1 s safety-yield, damit der Caller
88 // mindestens ein await-point sieht und canceln kann.
89 let _ = safety_cap;
90 }
91 }
92 }
93
94 /// Spec §2.1.2 register_instance.
95 ///
96 /// # Errors
97 /// Wie sync.
98 pub async fn register_instance(&self, sample: &T) -> Result<InstanceHandle> {
99 self.inner.register_instance(sample)
100 }
101
102 /// Spec §2.1.3 dispose. Loest Wire-Lifecycle DISPOSED.
103 ///
104 /// # Errors
105 /// Wie sync.
106 pub async fn dispose(&self, sample: &T, handle: InstanceHandle) -> Result<()> {
107 self.inner.dispose(sample, handle)
108 }
109
110 /// Spec §2.1.4 unregister_instance.
111 ///
112 /// # Errors
113 /// Wie sync.
114 pub async fn unregister_instance(&self, sample: &T, handle: InstanceHandle) -> Result<()> {
115 self.inner.unregister_instance(sample, handle)
116 }
117
118 /// Spec §2.1.5 wait_for_matched_subscription. Async-Polling-
119 /// Schleife mit 10 ms Tick.
120 ///
121 /// # Errors
122 /// Wie sync — `Timeout` wenn `min_count` nicht in `timeout` erreicht.
123 pub async fn wait_for_matched_subscription(
124 &self,
125 min_count: usize,
126 timeout: Duration,
127 ) -> Result<()> {
128 let deadline = std::time::Instant::now() + timeout;
129 loop {
130 if self.inner.matched_subscription_count() >= min_count {
131 return Ok(());
132 }
133 if std::time::Instant::now() >= deadline {
134 return Err(zerodds_dcps::DdsError::Timeout);
135 }
136 // Async-sleep ohne tokio-Hard-Dep: yield via futures-Helper.
137 crate::yield_for(Duration::from_millis(10)).await;
138 }
139 }
140
141 /// Spec §2.1.6 matched_subscription_count (synchron).
142 #[must_use]
143 pub fn matched_subscription_count(&self) -> usize {
144 self.inner.matched_subscription_count()
145 }
146
147 /// Liefert die zugrundeliegende sync-Variante.
148 #[must_use]
149 pub fn as_sync(&self) -> &DataWriter<T> {
150 &self.inner
151 }
152
153 /// Liefert die DataWriterQos.
154 #[must_use]
155 pub fn qos(&self) -> DataWriterQos {
156 self.inner.qos()
157 }
158}