1use crate::dds_interface::{
7 DataReader, DataWriter, DdsInterface, DiscoveredReader, DiscoveredWriter, DiscoveryCallback,
8 DurabilityKind, ReceivedSample,
9};
10use crate::store::RetentionPolicy;
11use anyhow::{anyhow, Result};
12use hdds::core::discovery::multicast::{DiscoveryListener, EndpointInfo, EndpointKind};
13use hdds::dds::qos::{Durability, DurabilityService, History};
14use hdds::{Participant, QoS, RawDataReader, RawDataWriter};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18pub struct HddsDdsInterface {
20 participant: Arc<Participant>,
21 discovery: Arc<hdds::core::discovery::multicast::DiscoveryFsm>,
22}
23
24impl HddsDdsInterface {
25 pub fn new(participant: Arc<Participant>) -> Result<Self> {
27 let discovery = participant
28 .discovery()
29 .ok_or_else(|| anyhow!("Discovery not initialized"))?;
30
31 Ok(Self {
32 participant,
33 discovery,
34 })
35 }
36}
37
38struct HddsDataReader {
39 inner: RawDataReader,
40 topic: String,
41 type_name: String,
42}
43
44impl DataReader for HddsDataReader {
45 fn take(&self) -> Result<Vec<ReceivedSample>> {
46 let samples = self
47 .inner
48 .try_take_raw()
49 .map_err(|e| anyhow!("RawDataReader::try_take_raw failed: {:?}", e))?;
50
51 Ok(samples
52 .into_iter()
53 .map(|sample| ReceivedSample {
54 topic: self.topic.clone(),
55 type_name: self.type_name.clone(),
56 payload: sample.payload,
57 writer_guid: sample.writer_guid.as_bytes(),
58 sequence: sample.sequence_number.unwrap_or(0),
59 timestamp_ns: system_time_to_ns(sample.reception_timestamp),
60 })
61 .collect())
62 }
63
64 fn read(&self) -> Result<Vec<ReceivedSample>> {
65 self.take()
66 }
67
68 fn topic(&self) -> &str {
69 &self.topic
70 }
71
72 fn type_name(&self) -> &str {
73 &self.type_name
74 }
75}
76
77struct HddsDataWriter {
78 inner: Mutex<RawDataWriter>,
79 topic: String,
80 type_name: String,
81}
82
83impl DataWriter for HddsDataWriter {
84 fn write(&self, payload: &[u8]) -> Result<()> {
85 let writer = match self.inner.lock() {
86 Ok(guard) => guard,
87 Err(poisoned) => poisoned.into_inner(),
88 };
89 writer
90 .write_raw(payload)
91 .map_err(|e| anyhow!("RawDataWriter::write_raw failed: {:?}", e))
92 }
93
94 fn write_with_timestamp(&self, payload: &[u8], _timestamp_ns: u64) -> Result<()> {
95 self.write(payload)
96 }
97
98 fn topic(&self) -> &str {
99 &self.topic
100 }
101
102 fn type_name(&self) -> &str {
103 &self.type_name
104 }
105}
106
107struct HddsDiscoveryBridge {
108 callback: Arc<dyn DiscoveryCallback>,
109}
110
111impl DiscoveryListener for HddsDiscoveryBridge {
112 fn on_endpoint_discovered(&self, endpoint: EndpointInfo) {
113 let durability = durability_kind_from_qos(endpoint.qos.durability);
114 match endpoint.kind {
115 EndpointKind::Reader => {
116 self.callback.on_reader_discovered(DiscoveredReader {
117 guid: endpoint.endpoint_guid.as_bytes(),
118 topic: endpoint.topic_name,
119 type_name: endpoint.type_name,
120 durability,
121 });
122 }
123 EndpointKind::Writer => {
124 self.callback.on_writer_discovered(DiscoveredWriter {
125 guid: endpoint.endpoint_guid.as_bytes(),
126 topic: endpoint.topic_name,
127 type_name: endpoint.type_name,
128 durability,
129 retention_hint: retention_hint_from_qos(&endpoint.qos),
130 });
131 }
132 }
133 }
134}
135
136impl DdsInterface for HddsDdsInterface {
137 fn create_reader(
138 &self,
139 topic: &str,
140 type_name: &str,
141 durability: DurabilityKind,
142 ) -> Result<Box<dyn DataReader>> {
143 let qos = qos_for_durability(durability);
144
145 let reader = self
146 .participant
147 .create_raw_reader_with_type(topic, type_name, Some(qos), None)
148 .map_err(|e| anyhow!("create_raw_reader_with_type failed: {:?}", e))?;
149
150 Ok(Box::new(HddsDataReader {
151 inner: reader,
152 topic: topic.to_string(),
153 type_name: type_name.to_string(),
154 }))
155 }
156
157 fn create_writer(
158 &self,
159 topic: &str,
160 type_name: &str,
161 durability: DurabilityKind,
162 ) -> Result<Box<dyn DataWriter>> {
163 let qos = qos_for_durability(durability);
164
165 let writer = self
166 .participant
167 .create_raw_writer_with_type(topic, type_name, Some(qos), None)
168 .map_err(|e| anyhow!("create_raw_writer_with_type failed: {:?}", e))?;
169
170 Ok(Box::new(HddsDataWriter {
171 inner: Mutex::new(writer),
172 topic: topic.to_string(),
173 type_name: type_name.to_string(),
174 }))
175 }
176
177 fn discovered_readers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredReader>> {
178 let topics = self.discovery.get_all_topics();
179 let mut readers = Vec::new();
180
181 for (topic, (_writers, discovered_readers)) in topics {
182 if !crate::dds_interface::topic_matches(topic_pattern, &topic) {
183 continue;
184 }
185 for reader in discovered_readers {
186 readers.push(DiscoveredReader {
187 guid: reader.endpoint_guid.as_bytes(),
188 topic: reader.topic_name,
189 type_name: reader.type_name,
190 durability: durability_kind_from_qos(reader.qos.durability),
191 });
192 }
193 }
194
195 Ok(readers)
196 }
197
198 fn discovered_writers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredWriter>> {
199 let topics = self.discovery.get_all_topics();
200 let mut writers = Vec::new();
201
202 for (topic, (discovered_writers, _readers)) in topics {
203 if !crate::dds_interface::topic_matches(topic_pattern, &topic) {
204 continue;
205 }
206 for writer in discovered_writers {
207 writers.push(DiscoveredWriter {
208 guid: writer.endpoint_guid.as_bytes(),
209 topic: writer.topic_name,
210 type_name: writer.type_name,
211 durability: durability_kind_from_qos(writer.qos.durability),
212 retention_hint: retention_hint_from_qos(&writer.qos),
213 });
214 }
215 }
216
217 Ok(writers)
218 }
219
220 fn wait_for_discovery(&self, timeout: Duration) -> Result<bool> {
221 std::thread::sleep(timeout);
222 Ok(false)
223 }
224
225 fn register_discovery_callback(&self, callback: Arc<dyn DiscoveryCallback>) -> Result<()> {
226 let listener = Arc::new(HddsDiscoveryBridge { callback });
227 self.discovery.register_listener(listener);
228 Ok(())
229 }
230
231 fn guid(&self) -> [u8; 16] {
232 self.participant.guid().as_bytes()
233 }
234}
235
/// Converts a wall-clock timestamp to whole nanoseconds since the Unix epoch.
///
/// Returns `0` for timestamps before the epoch. Timestamps too far in the
/// future to fit in a `u64` nanosecond count saturate at `u64::MAX` instead of
/// silently wrapping around.
fn system_time_to_ns(time: SystemTime) -> u64 {
    match time.duration_since(UNIX_EPOCH) {
        // `as_nanos` yields a `u128`; clamp first so the narrowing cast below
        // is lossless.
        Ok(elapsed) => elapsed.as_nanos().min(u128::from(u64::MAX)) as u64,
        Err(_) => 0,
    }
}
242
243fn qos_for_durability(durability: DurabilityKind) -> QoS {
244 match durability {
245 DurabilityKind::Volatile => QoS::reliable().volatile().keep_all(),
246 DurabilityKind::TransientLocal => QoS::reliable().transient_local().keep_all(),
247 DurabilityKind::Persistent => QoS::reliable().persistent().keep_all(),
248 }
249}
250
251fn durability_kind_from_qos(durability: Durability) -> DurabilityKind {
252 match durability {
253 Durability::Volatile => DurabilityKind::Volatile,
254 Durability::TransientLocal => DurabilityKind::TransientLocal,
255 Durability::Persistent => DurabilityKind::Persistent,
256 }
257}
258
259fn retention_hint_from_qos(qos: &hdds::dds::qos::QoS) -> Option<RetentionPolicy> {
260 if !matches!(
261 qos.durability,
262 Durability::TransientLocal | Durability::Persistent
263 ) {
264 return None;
265 }
266
267 let mut keep_count: Option<usize> = None;
268 let mut apply_limit = |limit: usize| {
269 if limit == 0 {
270 return;
271 }
272 keep_count = Some(match keep_count {
273 Some(existing) => existing.min(limit),
274 None => limit,
275 });
276 };
277
278 match qos.history {
279 History::KeepLast(depth) if depth > 0 => apply_limit(depth as usize),
280 History::KeepAll => {
281 if qos.resource_limits.max_samples > 0 {
282 apply_limit(qos.resource_limits.max_samples);
283 }
284 }
285 _ => {}
286 }
287
288 if qos.resource_limits.max_samples > 0 {
289 apply_limit(qos.resource_limits.max_samples);
290 }
291
292 if qos.durability_service != DurabilityService::default() {
293 if qos.durability_service.history_depth > 0 {
294 apply_limit(qos.durability_service.history_depth as usize);
295 }
296 if qos.durability_service.max_samples > 0 {
297 apply_limit(qos.durability_service.max_samples as usize);
298 }
299 }
300
301 keep_count.map(|keep_count| RetentionPolicy {
302 keep_count,
303 max_age_ns: None,
304 max_bytes: None,
305 })
306}