// File: hdds_persistence/dds_interface.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! DDS Interface Abstraction
5//!
6//! Provides an abstract interface for DDS operations that the persistence
7//! service uses. This allows the persistence service to work without direct
8//! dependency on the core hdds Participant implementation.
9//!
10//! # Integration
11//!
12//! To integrate with HDDS core, implement the `DdsInterface` trait:
13//!
14//! ```ignore
//! impl DdsInterface for HddsParticipant {
//!     fn create_reader(
//!         &self,
//!         topic: &str,
//!         type_name: &str,
//!         durability: DurabilityKind,
//!     ) -> Result<Box<dyn DataReader>> {
//!         // Create real DataReader...
//!     }
//!     // ...
//! }
21//! ```
22
23use crate::store::{RetentionPolicy, Sample};
24use anyhow::Result;
25use std::sync::Arc;
26use std::time::Duration;
27
/// A sample received from DDS, with its payload and the metadata describing
/// where it came from.
///
/// Can be converted into a `crate::store::Sample` through the `From` impl in
/// this module.
#[derive(Debug, Clone)]
pub struct ReceivedSample {
    /// Name of the topic the sample belongs to.
    pub topic: String,
    /// Name of the sample's data type.
    pub type_name: String,
    /// Serialized payload (CDR).
    pub payload: Vec<u8>,
    /// GUID of the source writer (16 bytes).
    pub writer_guid: [u8; 16],
    /// Sequence number of the sample.
    pub sequence: u64,
    /// Reception timestamp (Unix nanoseconds).
    pub timestamp_ns: u64,
}
44
45impl From<ReceivedSample> for Sample {
46    fn from(rs: ReceivedSample) -> Self {
47        Sample {
48            topic: rs.topic,
49            type_name: rs.type_name,
50            payload: rs.payload,
51            timestamp_ns: rs.timestamp_ns,
52            sequence: rs.sequence,
53            source_guid: rs.writer_guid,
54        }
55    }
56}
57
/// Durability policy exposed by discovery.
///
/// Variants are listed from weakest to strongest; see [`DurabilityKind::rank`]
/// for the numeric ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityKind {
    /// Not durable (rank 0).
    Volatile,
    /// Durable (rank 1).
    TransientLocal,
    /// Durable (rank 2).
    Persistent,
}

impl DurabilityKind {
    /// Reports whether this kind is durable, i.e. `TransientLocal` or
    /// `Persistent`.
    pub fn is_durable(self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Volatile => false,
            Self::TransientLocal | Self::Persistent => true,
        }
    }

    /// Position of this kind in the durability ordering used for QoS
    /// compatibility (`Volatile` = 0, `TransientLocal` = 1, `Persistent` = 2).
    pub fn rank(self) -> u8 {
        match self {
            Self::Persistent => 2,
            Self::TransientLocal => 1,
            Self::Volatile => 0,
        }
    }
}
81
/// Information about a reader reported by discovery.
#[derive(Debug, Clone)]
pub struct DiscoveredReader {
    /// GUID of the reader (16 bytes).
    pub guid: [u8; 16],
    /// Name of the topic the reader is attached to.
    pub topic: String,
    /// Name of the data type the reader expects.
    pub type_name: String,
    /// Durability requested by the reader.
    pub durability: DurabilityKind,
}
94
/// Information about a writer reported by discovery.
#[derive(Debug, Clone)]
pub struct DiscoveredWriter {
    /// GUID of the writer (16 bytes).
    pub guid: [u8; 16],
    /// Name of the topic the writer publishes on.
    pub topic: String,
    /// Name of the data type the writer publishes.
    pub type_name: String,
    /// Durability offered by the writer.
    pub durability: DurabilityKind,
    /// Optional retention hint derived from writer durability settings;
    /// `None` when no hint is available.
    pub retention_hint: Option<RetentionPolicy>,
}
109
/// Abstract data reader interface.
///
/// Implementations must be `Send + Sync`; readers are handed out as
/// `Box<dyn DataReader>` by `DdsInterface::create_reader`.
pub trait DataReader: Send + Sync {
    /// Take all available samples (removes them from the reader cache).
    ///
    /// Failures are reported through the returned `Result`.
    fn take(&self) -> Result<Vec<ReceivedSample>>;

    /// Read the available samples without removing them from the cache.
    ///
    /// Failures are reported through the returned `Result`.
    fn read(&self) -> Result<Vec<ReceivedSample>>;

    /// Name of the topic this reader is attached to.
    fn topic(&self) -> &str;

    /// Name of the data type this reader handles.
    fn type_name(&self) -> &str;
}
124
/// Abstract data writer interface.
///
/// Implementations must be `Send + Sync`; writers are handed out as
/// `Box<dyn DataWriter>` by `DdsInterface::create_writer`.
pub trait DataWriter: Send + Sync {
    /// Write a sample given its serialized payload.
    ///
    /// Failures are reported through the returned `Result`.
    fn write(&self, payload: &[u8]) -> Result<()>;

    /// Write a sample with an explicit timestamp.
    ///
    /// NOTE(review): `timestamp_ns` is presumably Unix nanoseconds, matching
    /// `ReceivedSample::timestamp_ns` — confirm against implementations.
    fn write_with_timestamp(&self, payload: &[u8], timestamp_ns: u64) -> Result<()>;

    /// Name of the topic this writer publishes on.
    fn topic(&self) -> &str;

    /// Name of the data type this writer publishes.
    fn type_name(&self) -> &str;
}
139
/// Abstract DDS interface for the persistence service.
///
/// This trait abstracts DDS operations so the persistence service
/// doesn't depend on a concrete HDDS implementation. All fallible
/// operations report failure through the returned `Result`.
pub trait DdsInterface: Send + Sync {
    /// Create a data reader for a topic.
    ///
    /// The reader should be configured with appropriate QoS for durability:
    /// - RELIABLE reliability
    /// - Durability matching the requested policy
    ///
    /// `topic` and `type_name` identify what to subscribe to; `durability`
    /// is the requested durability policy.
    fn create_reader(
        &self,
        topic: &str,
        type_name: &str,
        durability: DurabilityKind,
    ) -> Result<Box<dyn DataReader>>;

    /// Create a data writer for a topic.
    ///
    /// The writer should be configured with appropriate QoS:
    /// - RELIABLE reliability
    /// - Durability matching the replay target
    ///
    /// `topic` and `type_name` identify what to publish; `durability` is the
    /// durability policy to offer.
    fn create_writer(
        &self,
        topic: &str,
        type_name: &str,
        durability: DurabilityKind,
    ) -> Result<Box<dyn DataWriter>>;

    /// Get the list of discovered readers whose topic matches `topic_pattern`.
    ///
    /// NOTE(review): the pattern syntax is implementation-defined at this
    /// level; the mock in this module uses `topic_matches`.
    fn discovered_readers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredReader>>;

    /// Get the list of discovered writers whose topic matches `topic_pattern`.
    fn discovered_writers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredWriter>>;

    /// Wait for discovery events.
    ///
    /// Returns when new readers/writers are discovered or `timeout` expires.
    ///
    /// NOTE(review): the meaning of the returned `bool` is not specified
    /// here — presumably `true` when something was discovered and `false` on
    /// timeout; confirm against implementations.
    fn wait_for_discovery(&self, timeout: Duration) -> Result<bool>;

    /// Register a discovery callback for endpoint events.
    fn register_discovery_callback(&self, callback: Arc<dyn DiscoveryCallback>) -> Result<()>;

    /// Get the participant GUID (16 bytes).
    fn guid(&self) -> [u8; 16];
}
186
/// Callback-based sample receiver.
///
/// Alternative to polling a `DataReader`: samples are delivered by calling
/// `on_sample`. Implementations must be `Send + Sync`.
pub trait SampleCallback: Send + Sync {
    /// Called when a new sample is received; the sample is passed by value.
    fn on_sample(&self, sample: ReceivedSample);
}
194
/// Callback-based discovery listener.
///
/// Registered through `DdsInterface::register_discovery_callback`.
/// Implementations must be `Send + Sync`.
pub trait DiscoveryCallback: Send + Sync {
    /// Called when a new reader is discovered.
    fn on_reader_discovered(&self, reader: DiscoveredReader);

    /// Called when a reader is removed; `guid` identifies the removed reader.
    fn on_reader_removed(&self, guid: [u8; 16]);

    /// Called when a new writer is discovered.
    fn on_writer_discovered(&self, writer: DiscoveredWriter);

    /// Called when a writer is removed; `guid` identifies the removed writer.
    fn on_writer_removed(&self, guid: [u8; 16]);
}
209
210// ============================================================================
211// Mock Implementation for Testing
212// ============================================================================
213
/// Mock DDS interface for testing without real HDDS.
///
/// All state lives behind mutexes so the mock can be driven through `&self`.
pub struct MockDdsInterface {
    /// Participant GUID returned by `guid()`.
    guid: [u8; 16],
    /// Readers registered through `add_reader`.
    readers: std::sync::Mutex<Vec<DiscoveredReader>>,
    /// Writers registered through `add_writer`.
    writers: std::sync::Mutex<Vec<DiscoveredWriter>>,
    /// Samples queued through `add_sample`; copied into readers at creation.
    samples: std::sync::Mutex<Vec<ReceivedSample>>,
    /// Discovery callbacks notified by `add_reader` / `add_writer`.
    callbacks: std::sync::Mutex<Vec<Arc<dyn DiscoveryCallback>>>,
}
222
223impl MockDdsInterface {
224    /// Create a new mock interface
225    pub fn new() -> Self {
226        Self {
227            guid: [0x42; 16],
228            readers: std::sync::Mutex::new(Vec::new()),
229            writers: std::sync::Mutex::new(Vec::new()),
230            samples: std::sync::Mutex::new(Vec::new()),
231            callbacks: std::sync::Mutex::new(Vec::new()),
232        }
233    }
234
235    /// Add a mock discovered reader
236    pub fn add_reader(&self, reader: DiscoveredReader) {
237        let mut readers = match self.readers.lock() {
238            Ok(guard) => guard,
239            Err(poisoned) => poisoned.into_inner(),
240        };
241        readers.push(reader.clone());
242        drop(readers);
243
244        let callbacks = match self.callbacks.lock() {
245            Ok(guard) => guard,
246            Err(poisoned) => poisoned.into_inner(),
247        };
248        for callback in callbacks.iter() {
249            callback.on_reader_discovered(reader.clone());
250        }
251    }
252
253    /// Add a mock discovered writer
254    pub fn add_writer(&self, writer: DiscoveredWriter) {
255        let mut writers = match self.writers.lock() {
256            Ok(guard) => guard,
257            Err(poisoned) => poisoned.into_inner(),
258        };
259        writers.push(writer.clone());
260        drop(writers);
261
262        let callbacks = match self.callbacks.lock() {
263            Ok(guard) => guard,
264            Err(poisoned) => poisoned.into_inner(),
265        };
266        for callback in callbacks.iter() {
267            callback.on_writer_discovered(writer.clone());
268        }
269    }
270
271    /// Add a mock sample to be returned by readers
272    pub fn add_sample(&self, sample: ReceivedSample) {
273        self.samples.lock().unwrap().push(sample);
274    }
275
276    /// Clear all mock data
277    pub fn clear(&self) {
278        let mut readers = match self.readers.lock() {
279            Ok(guard) => guard,
280            Err(poisoned) => poisoned.into_inner(),
281        };
282        readers.clear();
283        drop(readers);
284
285        let mut writers = match self.writers.lock() {
286            Ok(guard) => guard,
287            Err(poisoned) => poisoned.into_inner(),
288        };
289        writers.clear();
290        drop(writers);
291
292        let mut samples = match self.samples.lock() {
293            Ok(guard) => guard,
294            Err(poisoned) => poisoned.into_inner(),
295        };
296        samples.clear();
297    }
298}
299
impl Default for MockDdsInterface {
    /// Equivalent to [`MockDdsInterface::new`].
    fn default() -> Self {
        Self::new()
    }
}
305
/// Mock data reader backed by an in-memory list of samples.
struct MockDataReader {
    /// Topic this reader is attached to; `take`/`read` only return samples
    /// whose topic equals it.
    topic: String,
    /// Type name reported by `type_name()`.
    type_name: String,
    /// The reader's sample cache.
    samples: std::sync::Arc<std::sync::Mutex<Vec<ReceivedSample>>>,
}
312
313impl DataReader for MockDataReader {
314    fn take(&self) -> Result<Vec<ReceivedSample>> {
315        let mut samples = self.samples.lock().unwrap();
316        let matching: Vec<_> = samples
317            .iter()
318            .filter(|s| s.topic == self.topic)
319            .cloned()
320            .collect();
321        samples.retain(|s| s.topic != self.topic);
322        Ok(matching)
323    }
324
325    fn read(&self) -> Result<Vec<ReceivedSample>> {
326        let samples = self.samples.lock().unwrap();
327        Ok(samples
328            .iter()
329            .filter(|s| s.topic == self.topic)
330            .cloned()
331            .collect())
332    }
333
334    fn topic(&self) -> &str {
335        &self.topic
336    }
337
338    fn type_name(&self) -> &str {
339        &self.type_name
340    }
341}
342
/// Mock data writer; it only remembers the topic and type it was created for.
struct MockDataWriter {
    /// Topic reported by `topic()` and included in the debug log messages.
    topic: String,
    /// Type name reported by `type_name()`.
    type_name: String,
}
348
349impl DataWriter for MockDataWriter {
350    fn write(&self, _payload: &[u8]) -> Result<()> {
351        tracing::debug!("MockDataWriter: write to {}", self.topic);
352        Ok(())
353    }
354
355    fn write_with_timestamp(&self, _payload: &[u8], _timestamp_ns: u64) -> Result<()> {
356        tracing::debug!("MockDataWriter: write_with_timestamp to {}", self.topic);
357        Ok(())
358    }
359
360    fn topic(&self) -> &str {
361        &self.topic
362    }
363
364    fn type_name(&self) -> &str {
365        &self.type_name
366    }
367}
368
369impl DdsInterface for MockDdsInterface {
370    fn create_reader(
371        &self,
372        topic: &str,
373        type_name: &str,
374        _durability: DurabilityKind,
375    ) -> Result<Box<dyn DataReader>> {
376        Ok(Box::new(MockDataReader {
377            topic: topic.to_string(),
378            type_name: type_name.to_string(),
379            samples: std::sync::Arc::new(self.samples.lock().unwrap().clone().into()),
380        }))
381    }
382
383    fn create_writer(
384        &self,
385        topic: &str,
386        type_name: &str,
387        _durability: DurabilityKind,
388    ) -> Result<Box<dyn DataWriter>> {
389        Ok(Box::new(MockDataWriter {
390            topic: topic.to_string(),
391            type_name: type_name.to_string(),
392        }))
393    }
394
395    fn discovered_readers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredReader>> {
396        let readers = self.readers.lock().unwrap();
397        Ok(readers
398            .iter()
399            .filter(|r| topic_matches(topic_pattern, &r.topic))
400            .cloned()
401            .collect())
402    }
403
404    fn discovered_writers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredWriter>> {
405        let writers = self.writers.lock().unwrap();
406        Ok(writers
407            .iter()
408            .filter(|w| topic_matches(topic_pattern, &w.topic))
409            .cloned()
410            .collect())
411    }
412
413    fn wait_for_discovery(&self, _timeout: Duration) -> Result<bool> {
414        // Mock: always return true immediately
415        Ok(true)
416    }
417
418    fn register_discovery_callback(&self, callback: Arc<dyn DiscoveryCallback>) -> Result<()> {
419        let mut callbacks = match self.callbacks.lock() {
420            Ok(guard) => guard,
421            Err(poisoned) => poisoned.into_inner(),
422        };
423        callbacks.push(callback);
424        Ok(())
425    }
426
427    fn guid(&self) -> [u8; 16] {
428        self.guid
429    }
430}
431
/// Check if a topic matches a pattern (supports wildcards).
///
/// Supported patterns:
/// - `"*"` matches every topic.
/// - `"prefix/*"` matches any topic that starts with `prefix/` and has at
///   least one character after that slash (deeper levels such as
///   `prefix/a/b` match too).
/// - Anything else must equal the topic exactly.
///
/// Returns `true` when `topic` matches `pattern`.
pub(crate) fn topic_matches(pattern: &str, topic: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix("/*") {
        // The '/' separator must follow the prefix directly; a bare
        // `starts_with(prefix)` would let "State/*" match "Statement".
        // The remainder must also be non-empty, so "State/" does not match.
        return matches!(
            topic
                .strip_prefix(prefix)
                .and_then(|rest| rest.strip_prefix('/')),
            Some(tail) if !tail.is_empty()
        );
    }
    pattern == topic
}
442
// Unit tests for the topic matcher, the mock DDS interface and the
// `ReceivedSample` -> `Sample` conversion.
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the three pattern forms: `*`, `prefix/*` and exact match.
    #[test]
    fn test_topic_matches() {
        assert!(topic_matches("*", "any/topic"));
        assert!(topic_matches("State/*", "State/Temperature"));
        assert!(topic_matches("State/*", "State/Pressure"));
        assert!(!topic_matches("State/*", "Command/Set"));
        assert!(!topic_matches("State/*", "State")); // Must have something after /
        assert!(topic_matches("exact/topic", "exact/topic"));
        assert!(!topic_matches("exact/topic", "other/topic"));
    }

    /// Readers added to the mock are returned by `discovered_readers`,
    /// filtered by topic pattern.
    #[test]
    fn test_mock_dds_interface() {
        let mock = MockDdsInterface::new();

        mock.add_reader(DiscoveredReader {
            guid: [0x01; 16],
            topic: "State/Temperature".to_string(),
            type_name: "Temperature".to_string(),
            durability: DurabilityKind::TransientLocal,
        });

        mock.add_reader(DiscoveredReader {
            guid: [0x02; 16],
            topic: "Command/Set".to_string(),
            type_name: "Command".to_string(),
            durability: DurabilityKind::Volatile,
        });

        // Only the reader under "State/" matches the prefix pattern.
        let state_readers = mock.discovered_readers("State/*").unwrap();
        assert_eq!(state_readers.len(), 1);
        assert_eq!(state_readers[0].topic, "State/Temperature");

        // The catch-all pattern returns both readers.
        let all_readers = mock.discovered_readers("*").unwrap();
        assert_eq!(all_readers.len(), 2);
    }

    /// A sample queued before the reader is created is visible through
    /// `read()`.
    #[test]
    fn test_mock_data_reader() {
        let mock = MockDdsInterface::new();

        mock.add_sample(ReceivedSample {
            topic: "test/topic".to_string(),
            type_name: "TestType".to_string(),
            payload: vec![1, 2, 3],
            writer_guid: [0xAA; 16],
            sequence: 1,
            timestamp_ns: 1000,
        });

        let reader = mock
            .create_reader("test/topic", "TestType", DurabilityKind::Volatile)
            .unwrap();
        let samples = reader.read().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].sequence, 1);
    }

    /// Conversion keeps topic and sequence and maps `writer_guid` to
    /// `source_guid`.
    #[test]
    fn test_received_sample_to_sample() {
        let rs = ReceivedSample {
            topic: "test".to_string(),
            type_name: "Type".to_string(),
            payload: vec![42],
            writer_guid: [0xFF; 16],
            sequence: 99,
            timestamp_ns: 123456,
        };

        let sample: Sample = rs.into();
        assert_eq!(sample.topic, "test");
        assert_eq!(sample.sequence, 99);
        assert_eq!(sample.source_guid, [0xFF; 16]);
    }
}