1use crate::store::{RetentionPolicy, Sample};
24use anyhow::Result;
25use std::sync::Arc;
26use std::time::Duration;
27
/// A data sample as returned by a [`DataReader`], together with its delivery metadata.
///
/// Convertible into the store's [`Sample`] through the `From` impl in this module.
#[derive(Debug, Clone)]
pub struct ReceivedSample {
    /// Topic the sample belongs to.
    pub topic: String,
    /// Name of the data type carried by the sample.
    pub type_name: String,
    /// Payload bytes; their encoding is not visible in this file.
    pub payload: Vec<u8>,
    /// 16-byte GUID of the writer; becomes `source_guid` when converted into a [`Sample`].
    pub writer_guid: [u8; 16],
    /// Sequence number of the sample.
    pub sequence: u64,
    /// Timestamp of the sample (nanoseconds going by the field name; the epoch is not shown
    /// here — TODO confirm).
    pub timestamp_ns: u64,
}
44
45impl From<ReceivedSample> for Sample {
46 fn from(rs: ReceivedSample) -> Self {
47 Sample {
48 topic: rs.topic,
49 type_name: rs.type_name,
50 payload: rs.payload,
51 timestamp_ns: rs.timestamp_ns,
52 sequence: rs.sequence,
53 source_guid: rs.writer_guid,
54 }
55 }
56}
57
/// Durability setting of a reader or writer.
///
/// The variants are ordered from weakest to strongest by [`DurabilityKind::rank`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityKind {
    /// Not durable; rank 0.
    Volatile,
    /// Durable; rank 1.
    TransientLocal,
    /// Durable; rank 2.
    Persistent,
}

impl DurabilityKind {
    /// Returns `true` for every kind except [`DurabilityKind::Volatile`].
    pub fn is_durable(self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::Volatile => false,
            Self::TransientLocal | Self::Persistent => true,
        }
    }

    /// Returns a numeric strength for the kind: `0` (volatile), `1` (transient-local)
    /// or `2` (persistent), so kinds can be compared by magnitude.
    pub fn rank(self) -> u8 {
        match self {
            Self::Persistent => 2,
            Self::TransientLocal => 1,
            Self::Volatile => 0,
        }
    }
}
81
/// Description of a reader reported through discovery.
///
/// Handed to [`DiscoveryCallback::on_reader_discovered`] and returned by
/// [`DdsInterface::discovered_readers`].
#[derive(Debug, Clone)]
pub struct DiscoveredReader {
    /// 16-byte GUID identifying the reader.
    pub guid: [u8; 16],
    /// Topic the reader is attached to.
    pub topic: String,
    /// Name of the data type the reader expects.
    pub type_name: String,
    /// Durability kind associated with the reader.
    pub durability: DurabilityKind,
}
94
/// Description of a writer reported through discovery.
///
/// Handed to [`DiscoveryCallback::on_writer_discovered`] and returned by
/// [`DdsInterface::discovered_writers`].
#[derive(Debug, Clone)]
pub struct DiscoveredWriter {
    /// 16-byte GUID identifying the writer.
    pub guid: [u8; 16],
    /// Topic the writer publishes on.
    pub topic: String,
    /// Name of the data type the writer publishes.
    pub type_name: String,
    /// Durability kind associated with the writer.
    pub durability: DurabilityKind,
    /// Optional retention policy attached to the writer; `None` when none is supplied.
    /// NOTE(review): presumably advisory for the store — confirm against `RetentionPolicy` users.
    pub retention_hint: Option<RetentionPolicy>,
}
109
/// Read-side handle bound to a single topic and type name.
///
/// Implementations must be `Send + Sync`.
pub trait DataReader: Send + Sync {
    /// Returns the currently available samples and removes them from the reader.
    ///
    /// NOTE(review): the consuming behaviour is taken from the mock implementation in this
    /// file, which drops the returned samples from its buffer — confirm for other backends.
    fn take(&self) -> Result<Vec<ReceivedSample>>;

    /// Returns the currently available samples without removing them.
    ///
    /// NOTE(review): non-consuming behaviour is likewise taken from the mock implementation.
    fn read(&self) -> Result<Vec<ReceivedSample>>;

    /// Topic this reader is bound to.
    fn topic(&self) -> &str;

    /// Type name this reader is bound to.
    fn type_name(&self) -> &str;
}
124
/// Write-side handle bound to a single topic and type name.
///
/// Implementations must be `Send + Sync`.
pub trait DataWriter: Send + Sync {
    /// Writes `payload` on the writer's topic.
    ///
    /// How the sample timestamp is chosen is up to the implementation and not visible here.
    fn write(&self, payload: &[u8]) -> Result<()>;

    /// Writes `payload` on the writer's topic with a caller-supplied timestamp.
    ///
    /// `timestamp_ns` is in nanoseconds going by its name; the epoch is not shown in this
    /// file — TODO confirm.
    fn write_with_timestamp(&self, payload: &[u8], timestamp_ns: u64) -> Result<()>;

    /// Topic this writer is bound to.
    fn topic(&self) -> &str;

    /// Type name this writer is bound to.
    fn type_name(&self) -> &str;
}
139
/// Abstraction over a DDS participant: creates readers and writers and exposes discovery.
///
/// Implementations must be `Send + Sync`. [`MockDdsInterface`] in this module is an
/// in-memory implementation.
pub trait DdsInterface: Send + Sync {
    /// Creates a reader for `topic` carrying `type_name` with the requested durability.
    ///
    /// # Errors
    /// Returns an error when the implementation cannot create the reader.
    fn create_reader(
        &self,
        topic: &str,
        type_name: &str,
        durability: DurabilityKind,
    ) -> Result<Box<dyn DataReader>>;

    /// Creates a writer for `topic` carrying `type_name` with the requested durability.
    ///
    /// # Errors
    /// Returns an error when the implementation cannot create the writer.
    fn create_writer(
        &self,
        topic: &str,
        type_name: &str,
        durability: DurabilityKind,
    ) -> Result<Box<dyn DataWriter>>;

    /// Lists the discovered readers whose topic matches `topic_pattern`.
    ///
    /// The mock in this file interprets the pattern with `topic_matches` (`"*"`,
    /// `"<prefix>/*"` or an exact topic); other implementations may differ — TODO confirm.
    fn discovered_readers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredReader>>;

    /// Lists the discovered writers whose topic matches `topic_pattern`.
    ///
    /// Pattern handling is the same as for [`DdsInterface::discovered_readers`].
    fn discovered_writers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredWriter>>;

    /// Waits up to `timeout` for discovery.
    ///
    /// NOTE(review): the meaning of the returned `bool` is not defined in this file
    /// (presumably "discovery completed before the timeout"); the mock always returns
    /// `Ok(true)`.
    fn wait_for_discovery(&self, timeout: Duration) -> Result<bool>;

    /// Registers `callback` to be notified about discovery events.
    fn register_discovery_callback(&self, callback: Arc<dyn DiscoveryCallback>) -> Result<()>;

    /// 16-byte GUID of this interface.
    fn guid(&self) -> [u8; 16];
}
186
/// Receiver for samples delivered by value.
///
/// NOTE(review): nothing in this file registers or invokes a `SampleCallback`; the delivery
/// thread and ordering guarantees are defined by whoever calls it.
pub trait SampleCallback: Send + Sync {
    /// Called with each delivered sample; ownership of the sample passes to the callback.
    fn on_sample(&self, sample: ReceivedSample);
}
194
/// Receiver for discovery events, registered through
/// [`DdsInterface::register_discovery_callback`].
///
/// The mock in this file only emits the two `*_discovered` events (from `add_reader` and
/// `add_writer`); it never calls the `*_removed` methods.
pub trait DiscoveryCallback: Send + Sync {
    /// Called when a reader has been discovered.
    fn on_reader_discovered(&self, reader: DiscoveredReader);

    /// Called when the reader identified by `guid` is no longer present.
    fn on_reader_removed(&self, guid: [u8; 16]);

    /// Called when a writer has been discovered.
    fn on_writer_discovered(&self, writer: DiscoveredWriter);

    /// Called when the writer identified by `guid` is no longer present.
    fn on_writer_removed(&self, guid: [u8; 16]);
}
209
/// In-memory [`DdsInterface`] implementation backed by plain vectors.
///
/// Readers, writers and samples are injected with `add_reader`, `add_writer` and
/// `add_sample`; each collection sits behind its own mutex so the mock can be used through
/// `&self`.
pub struct MockDdsInterface {
    /// GUID reported by `guid()`.
    guid: [u8; 16],
    /// Readers returned by `discovered_readers`.
    readers: std::sync::Mutex<Vec<DiscoveredReader>>,
    /// Writers returned by `discovered_writers`.
    writers: std::sync::Mutex<Vec<DiscoveredWriter>>,
    /// Samples copied into readers created by `create_reader`.
    samples: std::sync::Mutex<Vec<ReceivedSample>>,
    /// Callbacks notified when a reader or writer is added.
    callbacks: std::sync::Mutex<Vec<Arc<dyn DiscoveryCallback>>>,
}
222
223impl MockDdsInterface {
224 pub fn new() -> Self {
226 Self {
227 guid: [0x42; 16],
228 readers: std::sync::Mutex::new(Vec::new()),
229 writers: std::sync::Mutex::new(Vec::new()),
230 samples: std::sync::Mutex::new(Vec::new()),
231 callbacks: std::sync::Mutex::new(Vec::new()),
232 }
233 }
234
235 pub fn add_reader(&self, reader: DiscoveredReader) {
237 let mut readers = match self.readers.lock() {
238 Ok(guard) => guard,
239 Err(poisoned) => poisoned.into_inner(),
240 };
241 readers.push(reader.clone());
242 drop(readers);
243
244 let callbacks = match self.callbacks.lock() {
245 Ok(guard) => guard,
246 Err(poisoned) => poisoned.into_inner(),
247 };
248 for callback in callbacks.iter() {
249 callback.on_reader_discovered(reader.clone());
250 }
251 }
252
253 pub fn add_writer(&self, writer: DiscoveredWriter) {
255 let mut writers = match self.writers.lock() {
256 Ok(guard) => guard,
257 Err(poisoned) => poisoned.into_inner(),
258 };
259 writers.push(writer.clone());
260 drop(writers);
261
262 let callbacks = match self.callbacks.lock() {
263 Ok(guard) => guard,
264 Err(poisoned) => poisoned.into_inner(),
265 };
266 for callback in callbacks.iter() {
267 callback.on_writer_discovered(writer.clone());
268 }
269 }
270
271 pub fn add_sample(&self, sample: ReceivedSample) {
273 self.samples.lock().unwrap().push(sample);
274 }
275
276 pub fn clear(&self) {
278 let mut readers = match self.readers.lock() {
279 Ok(guard) => guard,
280 Err(poisoned) => poisoned.into_inner(),
281 };
282 readers.clear();
283 drop(readers);
284
285 let mut writers = match self.writers.lock() {
286 Ok(guard) => guard,
287 Err(poisoned) => poisoned.into_inner(),
288 };
289 writers.clear();
290 drop(writers);
291
292 let mut samples = match self.samples.lock() {
293 Ok(guard) => guard,
294 Err(poisoned) => poisoned.into_inner(),
295 };
296 samples.clear();
297 }
298}
299
300impl Default for MockDdsInterface {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
/// [`DataReader`] handed out by `MockDdsInterface::create_reader`.
struct MockDataReader {
    /// Topic used to filter the buffered samples.
    topic: String,
    /// Type name reported by `type_name()`; not used for filtering.
    type_name: String,
    /// Buffered samples for all topics; `take` and `read` filter them by `topic`.
    samples: std::sync::Arc<std::sync::Mutex<Vec<ReceivedSample>>>,
}
312
313impl DataReader for MockDataReader {
314 fn take(&self) -> Result<Vec<ReceivedSample>> {
315 let mut samples = self.samples.lock().unwrap();
316 let matching: Vec<_> = samples
317 .iter()
318 .filter(|s| s.topic == self.topic)
319 .cloned()
320 .collect();
321 samples.retain(|s| s.topic != self.topic);
322 Ok(matching)
323 }
324
325 fn read(&self) -> Result<Vec<ReceivedSample>> {
326 let samples = self.samples.lock().unwrap();
327 Ok(samples
328 .iter()
329 .filter(|s| s.topic == self.topic)
330 .cloned()
331 .collect())
332 }
333
334 fn topic(&self) -> &str {
335 &self.topic
336 }
337
338 fn type_name(&self) -> &str {
339 &self.type_name
340 }
341}
342
/// [`DataWriter`] handed out by `MockDdsInterface::create_writer`; it stores no payloads.
struct MockDataWriter {
    /// Topic reported by `topic()` and included in the debug log lines.
    topic: String,
    /// Type name reported by `type_name()`.
    type_name: String,
}
348
349impl DataWriter for MockDataWriter {
350 fn write(&self, _payload: &[u8]) -> Result<()> {
351 tracing::debug!("MockDataWriter: write to {}", self.topic);
352 Ok(())
353 }
354
355 fn write_with_timestamp(&self, _payload: &[u8], _timestamp_ns: u64) -> Result<()> {
356 tracing::debug!("MockDataWriter: write_with_timestamp to {}", self.topic);
357 Ok(())
358 }
359
360 fn topic(&self) -> &str {
361 &self.topic
362 }
363
364 fn type_name(&self) -> &str {
365 &self.type_name
366 }
367}
368
369impl DdsInterface for MockDdsInterface {
370 fn create_reader(
371 &self,
372 topic: &str,
373 type_name: &str,
374 _durability: DurabilityKind,
375 ) -> Result<Box<dyn DataReader>> {
376 Ok(Box::new(MockDataReader {
377 topic: topic.to_string(),
378 type_name: type_name.to_string(),
379 samples: std::sync::Arc::new(self.samples.lock().unwrap().clone().into()),
380 }))
381 }
382
383 fn create_writer(
384 &self,
385 topic: &str,
386 type_name: &str,
387 _durability: DurabilityKind,
388 ) -> Result<Box<dyn DataWriter>> {
389 Ok(Box::new(MockDataWriter {
390 topic: topic.to_string(),
391 type_name: type_name.to_string(),
392 }))
393 }
394
395 fn discovered_readers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredReader>> {
396 let readers = self.readers.lock().unwrap();
397 Ok(readers
398 .iter()
399 .filter(|r| topic_matches(topic_pattern, &r.topic))
400 .cloned()
401 .collect())
402 }
403
404 fn discovered_writers(&self, topic_pattern: &str) -> Result<Vec<DiscoveredWriter>> {
405 let writers = self.writers.lock().unwrap();
406 Ok(writers
407 .iter()
408 .filter(|w| topic_matches(topic_pattern, &w.topic))
409 .cloned()
410 .collect())
411 }
412
413 fn wait_for_discovery(&self, _timeout: Duration) -> Result<bool> {
414 Ok(true)
416 }
417
418 fn register_discovery_callback(&self, callback: Arc<dyn DiscoveryCallback>) -> Result<()> {
419 let mut callbacks = match self.callbacks.lock() {
420 Ok(guard) => guard,
421 Err(poisoned) => poisoned.into_inner(),
422 };
423 callbacks.push(callback);
424 Ok(())
425 }
426
427 fn guid(&self) -> [u8; 16] {
428 self.guid
429 }
430}
431
/// Returns `true` when `topic` is selected by `pattern`.
///
/// Supported pattern forms:
/// - `"*"` matches every topic.
/// - `"<prefix>/*"` matches every topic that starts with `<prefix>/`. The separator is
///   required, so `"State/*"` matches `"State/Temperature"` but neither `"State"` nor
///   `"Statement/Temperature"`.
/// - Any other pattern must equal the topic exactly.
pub(crate) fn topic_matches(pattern: &str, topic: &str) -> bool {
    if pattern == "*" {
        return true;
    }

    match pattern.strip_suffix("/*") {
        // Comparing the bare prefix would also accept siblings that merely share leading
        // characters ("StateX"), so insist on the `/` right after the prefix.
        Some(prefix) => matches!(
            topic.strip_prefix(prefix),
            Some(rest) if rest.starts_with('/')
        ),
        None => pattern == topic,
    }
}
442
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a reader fixture whose GUID is `guid_byte` repeated 16 times.
    fn reader_fixture(
        guid_byte: u8,
        topic: &str,
        type_name: &str,
        durability: DurabilityKind,
    ) -> DiscoveredReader {
        DiscoveredReader {
            guid: [guid_byte; 16],
            topic: topic.to_string(),
            type_name: type_name.to_string(),
            durability,
        }
    }

    /// Pattern matching: global wildcard, prefix wildcard and exact comparison.
    #[test]
    fn test_topic_matches() {
        let cases = [
            ("*", "any/topic", true),
            ("State/*", "State/Temperature", true),
            ("State/*", "State/Pressure", true),
            ("State/*", "Command/Set", false),
            ("State/*", "State", false),
            ("exact/topic", "exact/topic", true),
            ("exact/topic", "other/topic", false),
        ];

        for &(pattern, topic, expected) in cases.iter() {
            assert_eq!(
                topic_matches(pattern, topic),
                expected,
                "pattern {:?} against topic {:?}",
                pattern,
                topic
            );
        }
    }

    /// Readers added to the mock are returned by pattern-filtered discovery queries.
    #[test]
    fn test_mock_dds_interface() {
        let mock = MockDdsInterface::new();

        mock.add_reader(reader_fixture(
            0x01,
            "State/Temperature",
            "Temperature",
            DurabilityKind::TransientLocal,
        ));
        mock.add_reader(reader_fixture(
            0x02,
            "Command/Set",
            "Command",
            DurabilityKind::Volatile,
        ));

        let state_readers = mock.discovered_readers("State/*").unwrap();
        assert_eq!(state_readers.len(), 1);
        assert_eq!(state_readers[0].topic, "State/Temperature");

        let all_readers = mock.discovered_readers("*").unwrap();
        assert_eq!(all_readers.len(), 2);
    }

    /// A reader created after a sample was added can read that sample.
    #[test]
    fn test_mock_data_reader() {
        let mock = MockDdsInterface::new();

        let injected = ReceivedSample {
            topic: "test/topic".to_string(),
            type_name: "TestType".to_string(),
            payload: vec![1, 2, 3],
            writer_guid: [0xAA; 16],
            sequence: 1,
            timestamp_ns: 1000,
        };
        mock.add_sample(injected);

        let reader = mock
            .create_reader("test/topic", "TestType", DurabilityKind::Volatile)
            .unwrap();

        let samples = reader.read().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0].sequence, 1);
    }

    /// Converting into a store `Sample` keeps topic and sequence and maps the writer GUID.
    #[test]
    fn test_received_sample_to_sample() {
        let received = ReceivedSample {
            topic: "test".to_string(),
            type_name: "Type".to_string(),
            payload: vec![42],
            writer_guid: [0xFF; 16],
            sequence: 99,
            timestamp_ns: 123456,
        };

        let sample = Sample::from(received);
        assert_eq!(sample.topic, "test");
        assert_eq!(sample.sequence, 99);
        assert_eq!(sample.source_guid, [0xFF; 16]);
    }
}