1extern crate alloc;
35
36use alloc::collections::BTreeMap;
37use alloc::string::{String, ToString};
38use alloc::vec::Vec;
39use core::time::Duration as CoreDuration;
40use std::path::PathBuf;
41use std::sync::Mutex;
42use std::time::SystemTime;
43
44use zerodds_qos::DurabilityKind;
45use zerodds_qos::policies::durability_service::DurabilityServiceQosPolicy;
46use zerodds_qos::policies::history::HistoryKind;
47use zerodds_qos::policies::resource_limits::LENGTH_UNLIMITED;
48
49use crate::error::{DdsError, Result};
50
51#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct DurabilitySample {
54 pub topic: String,
56 pub instance_key: [u8; 16],
58 pub sequence: u64,
60 pub payload: Vec<u8>,
62 pub created_at: SystemTime,
64}
65
66pub trait DurabilityBackend: Send + Sync {
70 fn store(&self, sample: DurabilitySample) -> Result<()>;
76
77 fn replay_for_topic(&self, topic: &str) -> Result<Vec<DurabilitySample>>;
83
84 fn unregister_instance(
92 &self,
93 topic: &str,
94 instance_key: [u8; 16],
95 now: SystemTime,
96 ) -> Result<()>;
97
98 fn cleanup(&self, now: SystemTime) -> Result<usize>;
104}
105
106fn cleanup_delay(qos: &DurabilityServiceQosPolicy) -> CoreDuration {
110 let secs = u64::try_from(qos.service_cleanup_delay.seconds.max(0)).unwrap_or(0);
111 let frac = u64::from(qos.service_cleanup_delay.fraction);
113 let nanos = (frac.saturating_mul(1_000_000_000) >> 32) as u32;
114 CoreDuration::new(secs, nanos)
115}
116
117#[derive(Debug, Default, Clone)]
119struct InstanceSlot {
120 samples: Vec<DurabilitySample>,
121 unregistered_at: Option<SystemTime>,
124}
125
126impl InstanceSlot {
127 fn push(
128 &mut self,
129 s: DurabilitySample,
130 history_kind: HistoryKind,
131 history_depth: i32,
132 max_samples_per_instance: i32,
133 ) -> Result<()> {
134 match history_kind {
136 HistoryKind::KeepLast => {
137 let depth_unsigned = if history_depth <= 0 {
138 1
139 } else {
140 history_depth as usize
141 };
142 while self.samples.len() >= depth_unsigned {
143 self.samples.remove(0);
144 }
145 }
146 HistoryKind::KeepAll => {
147 if max_samples_per_instance != LENGTH_UNLIMITED
148 && self.samples.len() >= max_samples_per_instance as usize
149 {
150 return Err(DdsError::OutOfResources {
151 what: "durability backend: max_samples_per_instance reached",
152 });
153 }
154 }
155 }
156 self.samples.push(s);
157 Ok(())
158 }
159}
160
161type Key = (String, [u8; 16]);
163
164#[derive(Debug, Default)]
165struct InMemoryState {
166 by_key: BTreeMap<Key, InstanceSlot>,
167 total_samples: usize,
168}
169
170pub struct InMemoryDurabilityBackend {
172 qos: DurabilityServiceQosPolicy,
173 state: Mutex<InMemoryState>,
174}
175
176impl InMemoryDurabilityBackend {
177 #[must_use]
179 pub fn new(qos: DurabilityServiceQosPolicy) -> Self {
180 Self {
181 qos,
182 state: Mutex::new(InMemoryState::default()),
183 }
184 }
185
186 #[must_use]
188 pub fn len(&self) -> usize {
189 self.state.lock().map(|s| s.total_samples).unwrap_or(0)
190 }
191
192 #[must_use]
194 pub fn is_empty(&self) -> bool {
195 self.len() == 0
196 }
197}
198
199impl DurabilityBackend for InMemoryDurabilityBackend {
200 fn store(&self, sample: DurabilitySample) -> Result<()> {
201 let mut g = self
202 .state
203 .lock()
204 .map_err(|_| DdsError::PreconditionNotMet {
205 reason: "in-memory durability backend poisoned",
206 })?;
207 if self.qos.max_samples != LENGTH_UNLIMITED
209 && g.total_samples >= self.qos.max_samples as usize
210 {
211 return Err(DdsError::OutOfResources {
212 what: "durability backend: max_samples reached",
213 });
214 }
215 let key = (sample.topic.clone(), sample.instance_key);
216 let new_instance = !g.by_key.contains_key(&key);
217 if new_instance
218 && self.qos.max_instances != LENGTH_UNLIMITED
219 && g.by_key.len() >= self.qos.max_instances as usize
220 {
221 return Err(DdsError::OutOfResources {
222 what: "durability backend: max_instances reached",
223 });
224 }
225 let slot = g.by_key.entry(key).or_default();
226 let before = slot.samples.len();
227 slot.push(
228 sample,
229 self.qos.history_kind,
230 self.qos.history_depth,
231 self.qos.max_samples_per_instance,
232 )?;
233 let delta = slot.samples.len() as isize - before as isize;
234 g.total_samples = (g.total_samples as isize + delta).max(0) as usize;
235 Ok(())
236 }
237
238 fn replay_for_topic(&self, topic: &str) -> Result<Vec<DurabilitySample>> {
239 let g = self
240 .state
241 .lock()
242 .map_err(|_| DdsError::PreconditionNotMet {
243 reason: "in-memory durability backend poisoned",
244 })?;
245 let mut out = Vec::new();
246 for ((t, _), slot) in g.by_key.iter() {
247 if t == topic {
248 out.extend(slot.samples.iter().cloned());
249 }
250 }
251 out.sort_by_key(|s| (s.instance_key, s.sequence));
252 Ok(out)
253 }
254
255 fn unregister_instance(
256 &self,
257 topic: &str,
258 instance_key: [u8; 16],
259 now: SystemTime,
260 ) -> Result<()> {
261 let mut g = self
262 .state
263 .lock()
264 .map_err(|_| DdsError::PreconditionNotMet {
265 reason: "in-memory durability backend poisoned",
266 })?;
267 if let Some(slot) = g.by_key.get_mut(&(topic.to_string(), instance_key)) {
268 slot.unregistered_at = Some(now);
269 }
270 Ok(())
271 }
272
273 fn cleanup(&self, now: SystemTime) -> Result<usize> {
274 let delay = cleanup_delay(&self.qos);
275 let mut g = self
276 .state
277 .lock()
278 .map_err(|_| DdsError::PreconditionNotMet {
279 reason: "in-memory durability backend poisoned",
280 })?;
281 let to_remove: Vec<Key> = g
282 .by_key
283 .iter()
284 .filter_map(|(k, slot)| {
285 slot.unregistered_at.and_then(|ts| {
286 let due = ts.checked_add(delay)?;
287 if now >= due { Some(k.clone()) } else { None }
288 })
289 })
290 .collect();
291 let removed = to_remove.len();
292 for k in to_remove {
293 if let Some(slot) = g.by_key.remove(&k) {
294 g.total_samples = g.total_samples.saturating_sub(slot.samples.len());
295 }
296 }
297 Ok(removed)
298 }
299}
300
301pub struct OnDiskDurabilityBackend {
309 qos: DurabilityServiceQosPolicy,
310 root: PathBuf,
311}
312
313impl OnDiskDurabilityBackend {
314 pub fn new<P: Into<PathBuf>>(root: P, qos: DurabilityServiceQosPolicy) -> Result<Self> {
319 let root = root.into();
320 std::fs::create_dir_all(&root).map_err(|e| DdsError::PreconditionNotMet {
321 reason: io_static_msg(&e, "durability backend: cannot create root"),
322 })?;
323 Ok(Self { qos, root })
324 }
325
326 fn instance_dir(&self, topic: &str, key: &[u8; 16]) -> PathBuf {
327 let mut p = self.root.join(sanitize_topic(topic));
328 p.push(hex16(key));
329 p
330 }
331
332 fn unregister_marker(&self, topic: &str, key: &[u8; 16]) -> PathBuf {
333 self.instance_dir(topic, key).join(".unregistered")
334 }
335
336 fn count_total_samples(&self) -> Result<usize> {
337 let mut total = 0usize;
338 let topics = match std::fs::read_dir(&self.root) {
339 Ok(d) => d,
340 Err(_) => return Ok(0),
341 };
342 for topic_dir in topics.flatten() {
343 if !topic_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
344 continue;
345 }
346 let instances = match std::fs::read_dir(topic_dir.path()) {
347 Ok(d) => d,
348 Err(_) => continue,
349 };
350 for inst_dir in instances.flatten() {
351 if !inst_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
352 continue;
353 }
354 if let Ok(samples) = std::fs::read_dir(inst_dir.path()) {
355 for s in samples.flatten() {
356 if s.file_name() != ".unregistered" {
357 total += 1;
358 }
359 }
360 }
361 }
362 }
363 Ok(total)
364 }
365
366 fn count_instances_for_topic(&self, topic: &str) -> usize {
367 let topic_dir = self.root.join(sanitize_topic(topic));
368 match std::fs::read_dir(&topic_dir) {
369 Ok(d) => d
370 .flatten()
371 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
372 .count(),
373 Err(_) => 0,
374 }
375 }
376}
377
378fn io_static_msg(_e: &std::io::Error, msg: &'static str) -> &'static str {
379 msg
380}
381
382fn sanitize_topic(topic: &str) -> String {
383 topic
385 .chars()
386 .map(|c| {
387 if c.is_ascii_alphanumeric() || c == '_' || c == '.' || c == '-' {
388 c
389 } else {
390 '_'
391 }
392 })
393 .collect()
394}
395
396fn hex16(b: &[u8; 16]) -> String {
397 let mut s = String::with_capacity(32);
398 for x in b {
399 let hi = (x >> 4) & 0xF;
400 let lo = x & 0xF;
401 s.push(core::char::from_digit(u32::from(hi), 16).unwrap_or('0'));
402 s.push(core::char::from_digit(u32::from(lo), 16).unwrap_or('0'));
403 }
404 s
405}
406
407impl DurabilityBackend for OnDiskDurabilityBackend {
408 fn store(&self, sample: DurabilitySample) -> Result<()> {
409 if self.qos.max_samples != LENGTH_UNLIMITED {
411 let total = self.count_total_samples()?;
412 if total >= self.qos.max_samples as usize {
413 return Err(DdsError::OutOfResources {
414 what: "durability backend: max_samples reached",
415 });
416 }
417 }
418 let inst_dir = self.instance_dir(&sample.topic, &sample.instance_key);
419 let new_instance = !inst_dir.exists();
420 if new_instance && self.qos.max_instances != LENGTH_UNLIMITED {
421 let count = self.count_instances_for_topic(&sample.topic);
422 if count >= self.qos.max_instances as usize {
423 return Err(DdsError::OutOfResources {
424 what: "durability backend: max_instances reached",
425 });
426 }
427 }
428 std::fs::create_dir_all(&inst_dir).map_err(|e| DdsError::PreconditionNotMet {
429 reason: io_static_msg(&e, "durability backend: mkdir failed"),
430 })?;
431 match self.qos.history_kind {
433 HistoryKind::KeepLast => {
434 let depth = if self.qos.history_depth <= 0 {
435 1
436 } else {
437 self.qos.history_depth as usize
438 };
439 let mut existing: Vec<(u64, std::path::PathBuf)> = std::fs::read_dir(&inst_dir)
440 .map_err(|e| DdsError::PreconditionNotMet {
441 reason: io_static_msg(&e, "durability backend: readdir failed"),
442 })?
443 .flatten()
444 .filter_map(|e| {
445 let name = e.file_name().to_string_lossy().to_string();
446 if name == ".unregistered" {
447 return None;
448 }
449 let stem = name.strip_suffix(".bin")?;
450 let seq = stem.parse::<u64>().ok()?;
451 Some((seq, e.path()))
452 })
453 .collect();
454 existing.sort_by_key(|(seq, _)| *seq);
455 while existing.len() >= depth {
456 let (_, p) = existing.remove(0);
457 let _ = std::fs::remove_file(&p);
458 }
459 }
460 HistoryKind::KeepAll => {
461 if self.qos.max_samples_per_instance != LENGTH_UNLIMITED {
462 let count = std::fs::read_dir(&inst_dir)
463 .map_err(|e| DdsError::PreconditionNotMet {
464 reason: io_static_msg(&e, "durability backend: readdir failed"),
465 })?
466 .flatten()
467 .filter(|e| e.file_name() != ".unregistered")
468 .count();
469 if count >= self.qos.max_samples_per_instance as usize {
470 return Err(DdsError::OutOfResources {
471 what: "durability backend: max_samples_per_instance reached",
472 });
473 }
474 }
475 }
476 }
477 let path = inst_dir.join(alloc::format!("{}.bin", sample.sequence));
478 std::fs::write(&path, &sample.payload).map_err(|e| DdsError::PreconditionNotMet {
479 reason: io_static_msg(&e, "durability backend: write failed"),
480 })?;
481 Ok(())
482 }
483
484 fn replay_for_topic(&self, topic: &str) -> Result<Vec<DurabilitySample>> {
485 let topic_dir = self.root.join(sanitize_topic(topic));
486 let mut out = Vec::new();
487 let dirs = match std::fs::read_dir(&topic_dir) {
488 Ok(d) => d,
489 Err(_) => return Ok(Vec::new()),
490 };
491 for inst_entry in dirs.flatten() {
492 let inst_path = inst_entry.path();
493 let key = match parse_hex16(&inst_entry.file_name().to_string_lossy()) {
494 Some(k) => k,
495 None => continue,
496 };
497 let samples = match std::fs::read_dir(&inst_path) {
498 Ok(s) => s,
499 Err(_) => continue,
500 };
501 for entry in samples.flatten() {
502 let name = entry.file_name().to_string_lossy().to_string();
503 if name == ".unregistered" {
504 continue;
505 }
506 let Some(stem) = name.strip_suffix(".bin") else {
507 continue;
508 };
509 let Ok(seq) = stem.parse::<u64>() else {
510 continue;
511 };
512 let payload =
513 std::fs::read(entry.path()).map_err(|e| DdsError::PreconditionNotMet {
514 reason: io_static_msg(&e, "durability backend: read failed"),
515 })?;
516 let created_at = entry
517 .metadata()
518 .and_then(|m| m.modified())
519 .unwrap_or(SystemTime::UNIX_EPOCH);
520 out.push(DurabilitySample {
521 topic: topic.to_string(),
522 instance_key: key,
523 sequence: seq,
524 payload,
525 created_at,
526 });
527 }
528 }
529 out.sort_by_key(|s| (s.instance_key, s.sequence));
530 Ok(out)
531 }
532
533 fn unregister_instance(
534 &self,
535 topic: &str,
536 instance_key: [u8; 16],
537 now: SystemTime,
538 ) -> Result<()> {
539 let dir = self.instance_dir(topic, &instance_key);
540 if !dir.exists() {
541 return Ok(());
542 }
543 let nanos = now
544 .duration_since(SystemTime::UNIX_EPOCH)
545 .unwrap_or_default()
546 .as_nanos();
547 let marker = self.unregister_marker(topic, &instance_key);
548 std::fs::write(&marker, nanos.to_string().as_bytes()).map_err(|e| {
549 DdsError::PreconditionNotMet {
550 reason: io_static_msg(&e, "durability backend: marker write failed"),
551 }
552 })?;
553 Ok(())
554 }
555
556 fn cleanup(&self, now: SystemTime) -> Result<usize> {
557 let delay = cleanup_delay(&self.qos);
558 let mut removed = 0usize;
559 let topics = match std::fs::read_dir(&self.root) {
560 Ok(d) => d,
561 Err(_) => return Ok(0),
562 };
563 for topic_dir in topics.flatten() {
564 if !topic_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
565 continue;
566 }
567 let instances = match std::fs::read_dir(topic_dir.path()) {
568 Ok(d) => d,
569 Err(_) => continue,
570 };
571 for inst_dir in instances.flatten() {
572 if !inst_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
573 continue;
574 }
575 let marker = inst_dir.path().join(".unregistered");
576 let Ok(content) = std::fs::read_to_string(&marker) else {
577 continue;
578 };
579 let Ok(nanos) = content.trim().parse::<u128>() else {
580 continue;
581 };
582 let unreg = SystemTime::UNIX_EPOCH + CoreDuration::from_nanos(nanos as u64);
583 let due = unreg.checked_add(delay).unwrap_or(SystemTime::UNIX_EPOCH);
584 if now >= due && std::fs::remove_dir_all(inst_dir.path()).is_ok() {
585 removed += 1;
586 }
587 }
588 }
589 Ok(removed)
590 }
591}
592
593fn parse_hex16(s: &str) -> Option<[u8; 16]> {
594 if s.len() != 32 {
595 return None;
596 }
597 let mut out = [0u8; 16];
598 for (i, chunk) in s.as_bytes().chunks(2).enumerate() {
599 let hi = (chunk[0] as char).to_digit(16)?;
600 let lo = (chunk[1] as char).to_digit(16)?;
601 out[i] = ((hi << 4) | lo) as u8;
602 }
603 Some(out)
604}
605
606pub fn make_backend(
614 kind: DurabilityKind,
615 qos: DurabilityServiceQosPolicy,
616 root: Option<PathBuf>,
617) -> Result<alloc::boxed::Box<dyn DurabilityBackend>> {
618 match kind {
619 DurabilityKind::Volatile | DurabilityKind::TransientLocal => Err(DdsError::BadParameter {
620 what: "durability backend: kind does not need a service",
621 }),
622 DurabilityKind::Transient => {
623 Ok(alloc::boxed::Box::new(InMemoryDurabilityBackend::new(qos)))
624 }
625 DurabilityKind::Persistent => {
626 let root = root.ok_or(DdsError::BadParameter {
627 what: "durability backend: Persistent kind requires root path",
628 })?;
629 Ok(alloc::boxed::Box::new(OnDiskDurabilityBackend::new(
630 root, qos,
631 )?))
632 }
633 }
634}
635
636#[cfg(test)]
637#[allow(clippy::expect_used, clippy::unwrap_used)]
638mod tests {
639 use super::*;
640 use std::time::Duration as StdDuration;
641
642 fn sample(topic: &str, key_byte: u8, seq: u64, payload: &[u8]) -> DurabilitySample {
643 DurabilitySample {
644 topic: topic.to_string(),
645 instance_key: [key_byte; 16],
646 sequence: seq,
647 payload: payload.to_vec(),
648 created_at: SystemTime::now(),
649 }
650 }
651
652 fn keep_all_qos() -> DurabilityServiceQosPolicy {
653 DurabilityServiceQosPolicy {
654 history_kind: HistoryKind::KeepAll,
655 history_depth: -1,
656 ..DurabilityServiceQosPolicy::default()
657 }
658 }
659
660 #[test]
661 fn in_memory_store_and_replay_returns_sorted_samples() {
662 let b = InMemoryDurabilityBackend::new(keep_all_qos());
663 b.store(sample("T", 1, 2, b"b")).unwrap();
664 b.store(sample("T", 1, 1, b"a")).unwrap();
665 b.store(sample("T", 2, 1, b"c")).unwrap();
666 let out = b.replay_for_topic("T").unwrap();
667 assert_eq!(out.len(), 3);
668 assert_eq!(out[0].sequence, 1);
670 assert_eq!(out[0].instance_key[0], 1);
671 assert_eq!(out[1].sequence, 2);
672 assert_eq!(out[1].instance_key[0], 1);
673 assert_eq!(out[2].instance_key[0], 2);
674 }
675
676 #[test]
677 fn in_memory_keeplast_caps_history_at_depth() {
678 let qos = DurabilityServiceQosPolicy {
679 history_kind: HistoryKind::KeepLast,
680 history_depth: 2,
681 ..DurabilityServiceQosPolicy::default()
682 };
683 let b = InMemoryDurabilityBackend::new(qos);
684 for i in 1u64..=5 {
685 b.store(sample("T", 1, i, &i.to_le_bytes())).unwrap();
686 }
687 let out = b.replay_for_topic("T").unwrap();
688 assert_eq!(out.len(), 2);
690 assert_eq!(out[0].sequence, 4);
691 assert_eq!(out[1].sequence, 5);
692 }
693
694 #[test]
695 fn in_memory_keepall_max_samples_per_instance_returns_oor() {
696 let qos = DurabilityServiceQosPolicy {
697 history_kind: HistoryKind::KeepAll,
698 history_depth: -1,
699 max_samples_per_instance: 2,
700 ..DurabilityServiceQosPolicy::default()
701 };
702 let b = InMemoryDurabilityBackend::new(qos);
703 b.store(sample("T", 1, 1, b"a")).unwrap();
704 b.store(sample("T", 1, 2, b"b")).unwrap();
705 let r = b.store(sample("T", 1, 3, b"c"));
706 assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
707 }
708
709 #[test]
710 fn in_memory_max_samples_globally_returns_oor() {
711 let qos = DurabilityServiceQosPolicy {
712 history_kind: HistoryKind::KeepAll,
713 history_depth: -1,
714 max_samples: 2,
715 ..DurabilityServiceQosPolicy::default()
716 };
717 let b = InMemoryDurabilityBackend::new(qos);
718 b.store(sample("T", 1, 1, b"a")).unwrap();
719 b.store(sample("T", 2, 1, b"b")).unwrap();
720 let r = b.store(sample("T", 3, 1, b"c"));
721 assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
722 }
723
724 #[test]
725 fn in_memory_max_instances_returns_oor() {
726 let qos = DurabilityServiceQosPolicy {
727 history_kind: HistoryKind::KeepAll,
728 history_depth: -1,
729 max_instances: 1,
730 ..DurabilityServiceQosPolicy::default()
731 };
732 let b = InMemoryDurabilityBackend::new(qos);
733 b.store(sample("T", 1, 1, b"a")).unwrap();
734 let r = b.store(sample("T", 2, 1, b"b"));
735 assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
736 }
737
738 #[test]
739 fn in_memory_unregister_then_cleanup_removes_after_delay() {
740 let qos = DurabilityServiceQosPolicy {
741 service_cleanup_delay: zerodds_qos::Duration::from_millis(100),
742 history_kind: HistoryKind::KeepAll,
743 history_depth: -1,
744 ..DurabilityServiceQosPolicy::default()
745 };
746 let b = InMemoryDurabilityBackend::new(qos);
747 let t0 = SystemTime::now();
748 b.store(sample("T", 1, 1, b"a")).unwrap();
749 b.unregister_instance("T", [1u8; 16], t0).unwrap();
750 assert_eq!(b.cleanup(t0 + StdDuration::from_millis(50)).unwrap(), 0);
752 assert_eq!(b.replay_for_topic("T").unwrap().len(), 1);
753 assert_eq!(b.cleanup(t0 + StdDuration::from_millis(150)).unwrap(), 1);
755 assert!(b.replay_for_topic("T").unwrap().is_empty());
756 }
757
758 #[test]
759 fn in_memory_replay_filters_by_topic() {
760 let b = InMemoryDurabilityBackend::new(keep_all_qos());
761 b.store(sample("A", 1, 1, b"a1")).unwrap();
762 b.store(sample("B", 1, 1, b"b1")).unwrap();
763 let a = b.replay_for_topic("A").unwrap();
764 let bb = b.replay_for_topic("B").unwrap();
765 assert_eq!(a.len(), 1);
766 assert_eq!(bb.len(), 1);
767 assert_eq!(a[0].topic, "A");
768 assert_eq!(bb[0].topic, "B");
769 }
770
771 #[test]
772 fn in_memory_unknown_topic_returns_empty() {
773 let b = InMemoryDurabilityBackend::new(keep_all_qos());
774 assert!(b.replay_for_topic("nope").unwrap().is_empty());
775 }
776
777 #[test]
778 fn make_backend_rejects_volatile_and_transient_local() {
779 let r1 = make_backend(
780 DurabilityKind::Volatile,
781 DurabilityServiceQosPolicy::default(),
782 None,
783 );
784 let r2 = make_backend(
785 DurabilityKind::TransientLocal,
786 DurabilityServiceQosPolicy::default(),
787 None,
788 );
789 assert!(matches!(r1, Err(DdsError::BadParameter { .. })));
790 assert!(matches!(r2, Err(DdsError::BadParameter { .. })));
791 }
792
793 #[test]
794 fn make_backend_persistent_requires_root() {
795 let r = make_backend(
796 DurabilityKind::Persistent,
797 DurabilityServiceQosPolicy::default(),
798 None,
799 );
800 assert!(matches!(r, Err(DdsError::BadParameter { .. })));
801 }
802
803 #[test]
804 fn make_backend_transient_returns_in_memory() {
805 let b = make_backend(
806 DurabilityKind::Transient,
807 DurabilityServiceQosPolicy::default(),
808 None,
809 )
810 .unwrap();
811 b.store(sample("T", 1, 1, b"a")).unwrap();
812 assert_eq!(b.replay_for_topic("T").unwrap().len(), 1);
813 }
814
815 fn tmp_dir(prefix: &str) -> PathBuf {
816 let mut p = std::env::temp_dir();
817 let nanos = SystemTime::now()
818 .duration_since(SystemTime::UNIX_EPOCH)
819 .unwrap()
820 .as_nanos();
821 p.push(alloc::format!("zerodds-dur-{prefix}-{nanos}"));
822 p
823 }
824
825 #[test]
826 fn on_disk_store_and_replay_roundtrip() {
827 let root = tmp_dir("rt");
828 let b = OnDiskDurabilityBackend::new(&root, keep_all_qos()).unwrap();
829 b.store(sample("PersTopic", 7, 1, b"hello")).unwrap();
830 b.store(sample("PersTopic", 7, 2, b"world")).unwrap();
831 let out = b.replay_for_topic("PersTopic").unwrap();
832 assert_eq!(out.len(), 2);
833 assert_eq!(out[0].sequence, 1);
834 assert_eq!(out[0].payload, b"hello");
835 assert_eq!(out[1].sequence, 2);
836 assert_eq!(out[1].payload, b"world");
837 let _ = std::fs::remove_dir_all(&root);
838 }
839
840 #[test]
841 fn on_disk_keeplast_replaces_old_files() {
842 let root = tmp_dir("kl");
843 let qos = DurabilityServiceQosPolicy {
844 history_kind: HistoryKind::KeepLast,
845 history_depth: 2,
846 ..DurabilityServiceQosPolicy::default()
847 };
848 let b = OnDiskDurabilityBackend::new(&root, qos).unwrap();
849 for i in 1u64..=5 {
850 b.store(sample("T", 1, i, &i.to_le_bytes())).unwrap();
851 }
852 let out = b.replay_for_topic("T").unwrap();
853 assert_eq!(out.len(), 2);
854 assert_eq!(out[0].sequence, 4);
855 assert_eq!(out[1].sequence, 5);
856 let _ = std::fs::remove_dir_all(&root);
857 }
858
859 #[test]
860 fn on_disk_persistent_survives_backend_drop() {
861 let root = tmp_dir("survive");
862 {
863 let b = OnDiskDurabilityBackend::new(&root, keep_all_qos()).unwrap();
864 b.store(sample("Pers", 9, 42, b"alive")).unwrap();
865 } let b2 = OnDiskDurabilityBackend::new(&root, keep_all_qos()).unwrap();
868 let out = b2.replay_for_topic("Pers").unwrap();
869 assert_eq!(out.len(), 1);
870 assert_eq!(out[0].payload, b"alive");
871 let _ = std::fs::remove_dir_all(&root);
872 }
873
874 #[test]
875 fn on_disk_unregister_and_cleanup_removes_directory() {
876 let root = tmp_dir("cleanup");
877 let qos = DurabilityServiceQosPolicy {
878 service_cleanup_delay: zerodds_qos::Duration::from_millis(50),
879 history_kind: HistoryKind::KeepAll,
880 history_depth: -1,
881 ..DurabilityServiceQosPolicy::default()
882 };
883 let b = OnDiskDurabilityBackend::new(&root, qos).unwrap();
884 let t0 = SystemTime::now();
885 b.store(sample("CT", 5, 1, b"v")).unwrap();
886 b.unregister_instance("CT", [5u8; 16], t0).unwrap();
887 assert_eq!(b.cleanup(t0 + StdDuration::from_millis(10)).unwrap(), 0);
888 assert_eq!(b.cleanup(t0 + StdDuration::from_millis(100)).unwrap(), 1);
889 assert!(b.replay_for_topic("CT").unwrap().is_empty());
890 let _ = std::fs::remove_dir_all(&root);
891 }
892
893 #[test]
894 fn on_disk_max_samples_per_instance_returns_oor() {
895 let root = tmp_dir("oor");
896 let qos = DurabilityServiceQosPolicy {
897 history_kind: HistoryKind::KeepAll,
898 history_depth: -1,
899 max_samples_per_instance: 2,
900 ..DurabilityServiceQosPolicy::default()
901 };
902 let b = OnDiskDurabilityBackend::new(&root, qos).unwrap();
903 b.store(sample("T", 1, 1, b"a")).unwrap();
904 b.store(sample("T", 1, 2, b"b")).unwrap();
905 let r = b.store(sample("T", 1, 3, b"c"));
906 assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
907 let _ = std::fs::remove_dir_all(&root);
908 }
909
910 #[test]
911 fn hex16_roundtrip() {
912 let key = [0xAB, 0xCD, 0xEF, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
913 let h = hex16(&key);
914 assert_eq!(h.len(), 32);
915 assert_eq!(parse_hex16(&h).unwrap(), key);
916 }
917
918 #[test]
919 fn parse_hex16_rejects_wrong_length_and_invalid_chars() {
920 assert!(parse_hex16("abc").is_none());
921 assert!(parse_hex16(&"x".repeat(32)).is_none());
922 }
923
924 #[test]
925 fn sanitize_topic_replaces_path_chars() {
926 assert_eq!(sanitize_topic("Topic/With:Path"), "Topic_With_Path");
927 assert_eq!(sanitize_topic("ok-name.v1"), "ok-name.v1");
928 }
929}