Skip to main content

hdds_persistence/
hdds_interface.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! HDDS-backed DDS interface implementation.
5
6use crate::dds_interface::{
7    DataReader, DataWriter, DdsInterface, DiscoveredReader, DiscoveredWriter, DiscoveryCallback,
8    DurabilityKind, ReceivedSample,
9};
10use crate::store::RetentionPolicy;
11use anyhow::{anyhow, Result};
12use hdds::core::discovery::multicast::{DiscoveryListener, EndpointInfo, EndpointKind};
13use hdds::dds::qos::{Durability, DurabilityService, History};
14use hdds::{Participant, QoS, RawDataReader, RawDataWriter};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18/// HDDS-backed implementation of the persistence DDS interface.
19pub struct HddsDdsInterface {
20    participant: Arc<Participant>,
21    discovery: Arc<hdds::core::discovery::multicast::DiscoveryFsm>,
22}
23
24impl HddsDdsInterface {
25    /// Create a new HDDS-backed DDS interface.
26    pub fn new(participant: Arc<Participant>) -> Result<Self> {
27        let discovery = participant
28            .discovery()
29            .ok_or_else(|| anyhow!("Discovery not initialized"))?;
30
31        Ok(Self {
32            participant,
33            discovery,
34        })
35    }
36}
37
38struct HddsDataReader {
39    inner: RawDataReader,
40    topic: String,
41    type_name: String,
42}
43
44impl DataReader for HddsDataReader {
45    fn take(&self) -> Result<Vec<ReceivedSample>> {
46        let samples = self
47            .inner
48            .try_take_raw()
49            .map_err(|e| anyhow!("RawDataReader::try_take_raw failed: {:?}", e))?;
50
51        Ok(samples
52            .into_iter()
53            .map(|sample| ReceivedSample {
54                topic: self.topic.clone(),
55                type_name: self.type_name.clone(),
56                payload: sample.payload,
57                writer_guid: sample.writer_guid.as_bytes(),
58                sequence: sample.sequence_number.unwrap_or(0),
59                timestamp_ns: system_time_to_ns(sample.reception_timestamp),
60            })
61            .collect())
62    }
63
64    fn read(&self) -> Result<Vec<ReceivedSample>> {
65        self.take()
66    }
67
68    fn topic(&self) -> &str {
69        &self.topic
70    }
71
72    fn type_name(&self) -> &str {
73        &self.type_name
74    }
75}
76
77struct HddsDataWriter {
78    inner: Mutex<RawDataWriter>,
79    topic: String,
80    type_name: String,
81}
82
83impl DataWriter for HddsDataWriter {
84    fn write(&self, payload: &[u8]) -> Result<()> {
85        let writer = match self.inner.lock() {
86            Ok(guard) => guard,
87            Err(poisoned) => poisoned.into_inner(),
88        };
89        writer
90            .write_raw(payload)
91            .map_err(|e| anyhow!("RawDataWriter::write_raw failed: {:?}", e))
92    }
93
94    fn write_with_timestamp(&self, payload: &[u8], _timestamp_ns: u64) -> Result<()> {
95        self.write(payload)
96    }
97
98    fn topic(&self) -> &str {
99        &self.topic
100    }
101
102    fn type_name(&self) -> &str {
103        &self.type_name
104    }
105}
106
107struct HddsDiscoveryBridge {
108    callback: Arc<dyn DiscoveryCallback>,
109}
110
111impl DiscoveryListener for HddsDiscoveryBridge {
112    fn on_endpoint_discovered(&self, endpoint: EndpointInfo) {
113        let durability = durability_kind_from_qos(endpoint.qos.durability);
114        match endpoint.kind {
115            EndpointKind::Reader => {
116                self.callback.on_reader_discovered(DiscoveredReader {
117                    guid: endpoint.endpoint_guid.as_bytes(),
118                    topic: endpoint.topic_name,
119                    type_name: endpoint.type_name,
120                    durability,
121                });
122            }
123            EndpointKind::Writer => {
124                self.callback.on_writer_discovered(DiscoveredWriter {
125                    guid: endpoint.endpoint_guid.as_bytes(),
126                    topic: endpoint.topic_name,
127                    type_name: endpoint.type_name,
128                    durability,
129                    retention_hint: retention_hint_from_qos(&endpoint.qos),
130                });
131            }
132        }
133    }
134}
135
136impl DdsInterface for HddsDdsInterface {
137    fn create_reader(
138        &self,
139        topic: &str,
140        type_name: &str,
141        durability: DurabilityKind,
142    ) -> Result<Box<dyn DataReader>> {
143        let qos = qos_for_durability(durability);
144
145        let reader = self
146            .participant
147            .create_raw_reader_with_type(topic, type_name, Some(qos), None)
148            .map_err(|e| anyhow!("create_raw_reader_with_type failed: {:?}", e))?;
149
150        Ok(Box::new(HddsDataReader {
151            inner: reader,
152            topic: topic.to_string(),
153            type_name: type_name.to_string(),
154        }))
155    }
156
157    fn create_writer(
158        &self,
159        topic: &str,
160        type_name: &str,
161        durability: DurabilityKind,
162    ) -> Result<Box<dyn DataWriter>> {
163        let qos = qos_for_durability(durability);
164
165        let writer = self
166            .participant
167            .create_raw_writer_with_type(topic, type_name, Some(qos), None)
168            .map_err(|e| anyhow!("create_raw_writer_with_type failed: {:?}", e))?;
169
170        Ok(Box::new(HddsDataWriter {
171            inner: Mutex::new(writer),
172            topic: topic.to_string(),
173            type_name: type_name.to_string(),
174        }))
175    }
176
177    fn discovered_readers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredReader>> {
178        let topics = self.discovery.get_all_topics();
179        let mut readers = Vec::new();
180
181        for (topic, (_writers, discovered_readers)) in topics {
182            if !crate::dds_interface::topic_matches(topic_pattern, &topic) {
183                continue;
184            }
185            for reader in discovered_readers {
186                readers.push(DiscoveredReader {
187                    guid: reader.endpoint_guid.as_bytes(),
188                    topic: reader.topic_name,
189                    type_name: reader.type_name,
190                    durability: durability_kind_from_qos(reader.qos.durability),
191                });
192            }
193        }
194
195        Ok(readers)
196    }
197
198    fn discovered_writers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredWriter>> {
199        let topics = self.discovery.get_all_topics();
200        let mut writers = Vec::new();
201
202        for (topic, (discovered_writers, _readers)) in topics {
203            if !crate::dds_interface::topic_matches(topic_pattern, &topic) {
204                continue;
205            }
206            for writer in discovered_writers {
207                writers.push(DiscoveredWriter {
208                    guid: writer.endpoint_guid.as_bytes(),
209                    topic: writer.topic_name,
210                    type_name: writer.type_name,
211                    durability: durability_kind_from_qos(writer.qos.durability),
212                    retention_hint: retention_hint_from_qos(&writer.qos),
213                });
214            }
215        }
216
217        Ok(writers)
218    }
219
220    fn wait_for_discovery(&self, timeout: Duration) -> Result<bool> {
221        std::thread::sleep(timeout);
222        Ok(false)
223    }
224
225    fn register_discovery_callback(&self, callback: Arc<dyn DiscoveryCallback>) -> Result<()> {
226        let listener = Arc::new(HddsDiscoveryBridge { callback });
227        self.discovery.register_listener(listener);
228        Ok(())
229    }
230
231    fn guid(&self) -> [u8; 16] {
232        self.participant.guid().as_bytes()
233    }
234}
235
236fn system_time_to_ns(time: SystemTime) -> u64 {
237    match time.duration_since(UNIX_EPOCH) {
238        Ok(duration) => duration.as_nanos() as u64,
239        Err(_) => 0,
240    }
241}
242
243fn qos_for_durability(durability: DurabilityKind) -> QoS {
244    match durability {
245        DurabilityKind::Volatile => QoS::reliable().volatile().keep_all(),
246        DurabilityKind::TransientLocal => QoS::reliable().transient_local().keep_all(),
247        DurabilityKind::Persistent => QoS::reliable().persistent().keep_all(),
248    }
249}
250
251fn durability_kind_from_qos(durability: Durability) -> DurabilityKind {
252    match durability {
253        Durability::Volatile => DurabilityKind::Volatile,
254        Durability::TransientLocal => DurabilityKind::TransientLocal,
255        Durability::Persistent => DurabilityKind::Persistent,
256    }
257}
258
259fn retention_hint_from_qos(qos: &hdds::dds::qos::QoS) -> Option<RetentionPolicy> {
260    if !matches!(
261        qos.durability,
262        Durability::TransientLocal | Durability::Persistent
263    ) {
264        return None;
265    }
266
267    let mut keep_count: Option<usize> = None;
268    let mut apply_limit = |limit: usize| {
269        if limit == 0 {
270            return;
271        }
272        keep_count = Some(match keep_count {
273            Some(existing) => existing.min(limit),
274            None => limit,
275        });
276    };
277
278    match qos.history {
279        History::KeepLast(depth) if depth > 0 => apply_limit(depth as usize),
280        History::KeepAll => {
281            if qos.resource_limits.max_samples > 0 {
282                apply_limit(qos.resource_limits.max_samples);
283            }
284        }
285        _ => {}
286    }
287
288    if qos.resource_limits.max_samples > 0 {
289        apply_limit(qos.resource_limits.max_samples);
290    }
291
292    if qos.durability_service != DurabilityService::default() {
293        if qos.durability_service.history_depth > 0 {
294            apply_limit(qos.durability_service.history_depth as usize);
295        }
296        if qos.durability_service.max_samples > 0 {
297            apply_limit(qos.durability_service.max_samples as usize);
298        }
299    }
300
301    keep_count.map(|keep_count| RetentionPolicy {
302        keep_count,
303        max_age_ns: None,
304        max_bytes: None,
305    })
306}