Skip to main content

zerodds_dcps/
durability_service.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Durability-Service Storage-Backend (Spec §2.2.3.5 + §2.2.3.4
4//! TRANSIENT/PERSISTENT-Pfad).
5//!
6//! In DDS sind die Durability-Stufen wie folgt charakterisiert
7//! (Spec §2.2.3.4 Tab. 16):
8//! * `VOLATILE`: keine History fuer Late-Joiner.
9//! * `TRANSIENT_LOCAL`: Writer-History bis Disconnect.
10//! * `TRANSIENT`: separater Storage-Service haelt die History
11//!   ueber Writer-Lifetime; ueberlebt aber NICHT den Service-Crash.
12//! * `PERSISTENT`: wie TRANSIENT, aber Disk-persistent.
13//!
14//! Dieses Modul implementiert die Backend-Abstraktion fuer
15//! TRANSIENT + PERSISTENT. Der RTPS-Pfad fuettert es im
16//! Writer-DataPath (`runtime.rs::handle_user_publish`) und
17//! konsumiert es beim Late-Joiner-Match (zusaetzlich zur
18//! Writer-eigenen TRANSIENT_LOCAL-History).
19//!
20//! Architektur:
21//!
//! 1. [`DurabilityBackend`] — Trait mit `store`, `replay_for_topic`,
//!    `unregister_instance`, `cleanup`.
24//! 2. [`InMemoryDurabilityBackend`] — Default fuer TRANSIENT-Kind.
25//! 3. [`OnDiskDurabilityBackend`] — fuer PERSISTENT-Kind, persistiert
26//!    auf einer Verzeichnis-Hierarchie (eine Datei pro `(topic,
27//!    instance, sequence)`).
28//!
29//! Beide Backends respektieren `DurabilityServiceQosPolicy`:
30//! `service_cleanup_delay` (Wartezeit nach `unregister` bevor die
31//! Instance entfernt wird), `history_kind`/`history_depth` (Cap pro
32//! Instance), `max_samples`/`max_instances`/`max_samples_per_instance`.
33
34extern crate alloc;
35
36use alloc::collections::BTreeMap;
37use alloc::string::{String, ToString};
38use alloc::vec::Vec;
39use core::time::Duration as CoreDuration;
40use std::path::PathBuf;
41use std::sync::Mutex;
42use std::time::SystemTime;
43
44use zerodds_qos::DurabilityKind;
45use zerodds_qos::policies::durability_service::DurabilityServiceQosPolicy;
46use zerodds_qos::policies::history::HistoryKind;
47use zerodds_qos::policies::resource_limits::LENGTH_UNLIMITED;
48
49use crate::error::{DdsError, Result};
50
/// A single stored sample slot in the durability history.
///
/// This is the unit that backends persist via `store` and hand back via
/// `replay_for_topic`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurabilitySample {
    /// Topic name.
    pub topic: String,
    /// Instance key hash.
    pub instance_key: [u8; 16],
    /// Sequence number (assigned by the writer, monotonic).
    pub sequence: u64,
    /// Encoded payload (XCDR2 body).
    pub payload: Vec<u8>,
    /// Creation time (wall clock).
    ///
    /// Note: the on-disk backend does not persist this field; on replay it
    /// reconstructs the value from the sample file's modification time.
    pub created_at: SystemTime,
}
65
/// Trait for durability storage backends. Both implementations
/// (in-memory + on-disk) satisfy the same interface so that the
/// runtime path can swap them.
pub trait DurabilityBackend: Send + Sync {
    /// Persists a sample.
    ///
    /// # Errors
    /// `OutOfResources` when the `max_samples` caps are exceeded;
    /// I/O errors for on-disk backends.
    fn store(&self, sample: DurabilitySample) -> Result<()>;

    /// Returns all stored samples for a topic in the order of their
    /// sequence number (sorted per instance).
    ///
    /// # Errors
    /// I/O errors for on-disk backends.
    fn replay_for_topic(&self, topic: &str) -> Result<Vec<DurabilitySample>>;

    /// Marks an instance as `unregister`ed so that its samples are
    /// removed after `service_cleanup_delay`. `now` is the current
    /// wall clock; the actual deletion happens later via
    /// [`Self::cleanup`].
    ///
    /// # Errors
    /// I/O errors for on-disk backends.
    fn unregister_instance(
        &self,
        topic: &str,
        instance_key: [u8; 16],
        now: SystemTime,
    ) -> Result<()>;

    /// Deletes all instances whose `unregister` time plus cleanup
    /// delay lies before `now`. Returns the number of removed instances.
    ///
    /// # Errors
    /// I/O errors for on-disk backends.
    fn cleanup(&self, now: SystemTime) -> Result<usize>;
}
105
106/// Helper: Service-Cleanup-Delay aus QoS in `core::time::Duration`.
107///
108/// QoS-Duration speichert `fraction` als `2^-32 s`, nicht als Nanos.
109fn cleanup_delay(qos: &DurabilityServiceQosPolicy) -> CoreDuration {
110    let secs = u64::try_from(qos.service_cleanup_delay.seconds.max(0)).unwrap_or(0);
111    // fraction (2^-32 s) → Nanosekunden
112    let frac = u64::from(qos.service_cleanup_delay.fraction);
113    let nanos = (frac.saturating_mul(1_000_000_000) >> 32) as u32;
114    CoreDuration::new(secs, nanos)
115}
116
/// Per-instance slot with a ring-buffer-like history.
#[derive(Debug, Default, Clone)]
struct InstanceSlot {
    /// Samples retained for this instance, in insertion order.
    samples: Vec<DurabilitySample>,
    /// `unregister` time; when `Some(t)`, the instance is removed from the
    /// backend after `t + service_cleanup_delay`.
    unregistered_at: Option<SystemTime>,
}
125
126impl InstanceSlot {
127    fn push(
128        &mut self,
129        s: DurabilitySample,
130        history_kind: HistoryKind,
131        history_depth: i32,
132        max_samples_per_instance: i32,
133    ) -> Result<()> {
134        // History-Kind-Cap.
135        match history_kind {
136            HistoryKind::KeepLast => {
137                let depth_unsigned = if history_depth <= 0 {
138                    1
139                } else {
140                    history_depth as usize
141                };
142                while self.samples.len() >= depth_unsigned {
143                    self.samples.remove(0);
144                }
145            }
146            HistoryKind::KeepAll => {
147                if max_samples_per_instance != LENGTH_UNLIMITED
148                    && self.samples.len() >= max_samples_per_instance as usize
149                {
150                    return Err(DdsError::OutOfResources {
151                        what: "durability backend: max_samples_per_instance reached",
152                    });
153                }
154            }
155        }
156        self.samples.push(s);
157        Ok(())
158    }
159}
160
/// Lookup key: `(topic, instance_key)`.
type Key = (String, [u8; 16]);
163
/// Mutable state of the in-memory backend; guarded by the backend's mutex.
#[derive(Debug, Default)]
struct InMemoryState {
    /// Instance slots keyed by `(topic, instance_key)`.
    by_key: BTreeMap<Key, InstanceSlot>,
    /// Running total of stored samples across all slots; checked against
    /// `max_samples` in `store` and reduced in `cleanup`.
    total_samples: usize,
}
169
/// In-memory durability backend (default for DurabilityKind::Transient).
pub struct InMemoryDurabilityBackend {
    /// Durability-service QoS that supplies the history policy, the resource
    /// limits and the cleanup delay used by this backend.
    qos: DurabilityServiceQosPolicy,
    /// All stored instances plus the sample counter, behind a mutex.
    state: Mutex<InMemoryState>,
}
175
176impl InMemoryDurabilityBackend {
177    /// Konstruktor.
178    #[must_use]
179    pub fn new(qos: DurabilityServiceQosPolicy) -> Self {
180        Self {
181            qos,
182            state: Mutex::new(InMemoryState::default()),
183        }
184    }
185
186    /// Anzahl gespeicherter Samples (Diagnose / Tests).
187    #[must_use]
188    pub fn len(&self) -> usize {
189        self.state.lock().map(|s| s.total_samples).unwrap_or(0)
190    }
191
192    /// True wenn keine Samples gespeichert sind.
193    #[must_use]
194    pub fn is_empty(&self) -> bool {
195        self.len() == 0
196    }
197}
198
199impl DurabilityBackend for InMemoryDurabilityBackend {
200    fn store(&self, sample: DurabilitySample) -> Result<()> {
201        let mut g = self
202            .state
203            .lock()
204            .map_err(|_| DdsError::PreconditionNotMet {
205                reason: "in-memory durability backend poisoned",
206            })?;
207        // Cap auf max_samples / max_instances.
208        if self.qos.max_samples != LENGTH_UNLIMITED
209            && g.total_samples >= self.qos.max_samples as usize
210        {
211            return Err(DdsError::OutOfResources {
212                what: "durability backend: max_samples reached",
213            });
214        }
215        let key = (sample.topic.clone(), sample.instance_key);
216        let new_instance = !g.by_key.contains_key(&key);
217        if new_instance
218            && self.qos.max_instances != LENGTH_UNLIMITED
219            && g.by_key.len() >= self.qos.max_instances as usize
220        {
221            return Err(DdsError::OutOfResources {
222                what: "durability backend: max_instances reached",
223            });
224        }
225        let slot = g.by_key.entry(key).or_default();
226        let before = slot.samples.len();
227        slot.push(
228            sample,
229            self.qos.history_kind,
230            self.qos.history_depth,
231            self.qos.max_samples_per_instance,
232        )?;
233        let delta = slot.samples.len() as isize - before as isize;
234        g.total_samples = (g.total_samples as isize + delta).max(0) as usize;
235        Ok(())
236    }
237
238    fn replay_for_topic(&self, topic: &str) -> Result<Vec<DurabilitySample>> {
239        let g = self
240            .state
241            .lock()
242            .map_err(|_| DdsError::PreconditionNotMet {
243                reason: "in-memory durability backend poisoned",
244            })?;
245        let mut out = Vec::new();
246        for ((t, _), slot) in g.by_key.iter() {
247            if t == topic {
248                out.extend(slot.samples.iter().cloned());
249            }
250        }
251        out.sort_by_key(|s| (s.instance_key, s.sequence));
252        Ok(out)
253    }
254
255    fn unregister_instance(
256        &self,
257        topic: &str,
258        instance_key: [u8; 16],
259        now: SystemTime,
260    ) -> Result<()> {
261        let mut g = self
262            .state
263            .lock()
264            .map_err(|_| DdsError::PreconditionNotMet {
265                reason: "in-memory durability backend poisoned",
266            })?;
267        if let Some(slot) = g.by_key.get_mut(&(topic.to_string(), instance_key)) {
268            slot.unregistered_at = Some(now);
269        }
270        Ok(())
271    }
272
273    fn cleanup(&self, now: SystemTime) -> Result<usize> {
274        let delay = cleanup_delay(&self.qos);
275        let mut g = self
276            .state
277            .lock()
278            .map_err(|_| DdsError::PreconditionNotMet {
279                reason: "in-memory durability backend poisoned",
280            })?;
281        let to_remove: Vec<Key> = g
282            .by_key
283            .iter()
284            .filter_map(|(k, slot)| {
285                slot.unregistered_at.and_then(|ts| {
286                    let due = ts.checked_add(delay)?;
287                    if now >= due { Some(k.clone()) } else { None }
288                })
289            })
290            .collect();
291        let removed = to_remove.len();
292        for k in to_remove {
293            if let Some(slot) = g.by_key.remove(&k) {
294                g.total_samples = g.total_samples.saturating_sub(slot.samples.len());
295            }
296        }
297        Ok(removed)
298    }
299}
300
301// --------------------- On-Disk-Backend (PERSISTENT) ---------------------
302
/// On-disk durability backend (DurabilityKind::Persistent).
///
/// Layout: `<root>/<topic>/<hex(instance_key)>/<sequence>.bin` holds the
/// encoded payload; `<root>/<topic>/<hex(instance_key)>/.unregistered`
/// marks the instance's unregister time (Unix nanoseconds as ASCII).
pub struct OnDiskDurabilityBackend {
    /// Durability-service QoS that supplies the history policy, the resource
    /// limits and the cleanup delay used by this backend.
    qos: DurabilityServiceQosPolicy,
    /// Root directory under which all topic directories live.
    root: PathBuf,
}
312
313impl OnDiskDurabilityBackend {
314    /// Konstruktor — erzeugt den Root-Pfad falls nicht vorhanden.
315    ///
316    /// # Errors
317    /// Filesystem-Fehler beim Anlegen des Roots.
318    pub fn new<P: Into<PathBuf>>(root: P, qos: DurabilityServiceQosPolicy) -> Result<Self> {
319        let root = root.into();
320        std::fs::create_dir_all(&root).map_err(|e| DdsError::PreconditionNotMet {
321            reason: io_static_msg(&e, "durability backend: cannot create root"),
322        })?;
323        Ok(Self { qos, root })
324    }
325
326    fn instance_dir(&self, topic: &str, key: &[u8; 16]) -> PathBuf {
327        let mut p = self.root.join(sanitize_topic(topic));
328        p.push(hex16(key));
329        p
330    }
331
332    fn unregister_marker(&self, topic: &str, key: &[u8; 16]) -> PathBuf {
333        self.instance_dir(topic, key).join(".unregistered")
334    }
335
336    fn count_total_samples(&self) -> Result<usize> {
337        let mut total = 0usize;
338        let topics = match std::fs::read_dir(&self.root) {
339            Ok(d) => d,
340            Err(_) => return Ok(0),
341        };
342        for topic_dir in topics.flatten() {
343            if !topic_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
344                continue;
345            }
346            let instances = match std::fs::read_dir(topic_dir.path()) {
347                Ok(d) => d,
348                Err(_) => continue,
349            };
350            for inst_dir in instances.flatten() {
351                if !inst_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
352                    continue;
353                }
354                if let Ok(samples) = std::fs::read_dir(inst_dir.path()) {
355                    for s in samples.flatten() {
356                        if s.file_name() != ".unregistered" {
357                            total += 1;
358                        }
359                    }
360                }
361            }
362        }
363        Ok(total)
364    }
365
366    fn count_instances_for_topic(&self, topic: &str) -> usize {
367        let topic_dir = self.root.join(sanitize_topic(topic));
368        match std::fs::read_dir(&topic_dir) {
369            Ok(d) => d
370                .flatten()
371                .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
372                .count(),
373            Err(_) => 0,
374        }
375    }
376}
377
/// Picks the static message used as the error reason for a failed I/O call.
///
/// The `std::io::Error` itself is accepted but not used; only `label` is
/// returned.
fn io_static_msg(_cause: &std::io::Error, label: &'static str) -> &'static str {
    label
}
381
/// Maps a topic name to a string that is safe to use as ONE directory name.
///
/// Every character outside `[A-Za-z0-9_.-]` is replaced by `'_'`. Because
/// `'.'` is allowed, the names `"."` and `".."` would otherwise survive and
/// make `root.join(..)` point at the root itself or at its parent directory;
/// an empty name would likewise resolve to the root. These three special
/// cases are therefore rewritten (`""`/`"."` -> `"_"`, `".."` -> `"__"`).
/// All other names are mapped exactly as before.
fn sanitize_topic(topic: &str) -> String {
    let cleaned: String = topic
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '-') {
                c
            } else {
                '_'
            }
        })
        .collect();
    // Never hand out a component that the filesystem interprets specially.
    match cleaned.as_str() {
        "" | "." => String::from("_"),
        ".." => String::from("__"),
        _ => cleaned,
    }
}
395
/// Renders a 16-byte key as 32 lowercase hexadecimal characters
/// (high nibble first for every byte).
fn hex16(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut rendered = String::with_capacity(32);
    for &byte in b.iter() {
        rendered.push(char::from(DIGITS[usize::from(byte >> 4)]));
        rendered.push(char::from(DIGITS[usize::from(byte & 0x0F)]));
    }
    rendered
}
406
407impl DurabilityBackend for OnDiskDurabilityBackend {
408    fn store(&self, sample: DurabilitySample) -> Result<()> {
409        // Caps vorpruefen.
410        if self.qos.max_samples != LENGTH_UNLIMITED {
411            let total = self.count_total_samples()?;
412            if total >= self.qos.max_samples as usize {
413                return Err(DdsError::OutOfResources {
414                    what: "durability backend: max_samples reached",
415                });
416            }
417        }
418        let inst_dir = self.instance_dir(&sample.topic, &sample.instance_key);
419        let new_instance = !inst_dir.exists();
420        if new_instance && self.qos.max_instances != LENGTH_UNLIMITED {
421            let count = self.count_instances_for_topic(&sample.topic);
422            if count >= self.qos.max_instances as usize {
423                return Err(DdsError::OutOfResources {
424                    what: "durability backend: max_instances reached",
425                });
426            }
427        }
428        std::fs::create_dir_all(&inst_dir).map_err(|e| DdsError::PreconditionNotMet {
429            reason: io_static_msg(&e, "durability backend: mkdir failed"),
430        })?;
431        // History-Kind-Cap.
432        match self.qos.history_kind {
433            HistoryKind::KeepLast => {
434                let depth = if self.qos.history_depth <= 0 {
435                    1
436                } else {
437                    self.qos.history_depth as usize
438                };
439                let mut existing: Vec<(u64, std::path::PathBuf)> = std::fs::read_dir(&inst_dir)
440                    .map_err(|e| DdsError::PreconditionNotMet {
441                        reason: io_static_msg(&e, "durability backend: readdir failed"),
442                    })?
443                    .flatten()
444                    .filter_map(|e| {
445                        let name = e.file_name().to_string_lossy().to_string();
446                        if name == ".unregistered" {
447                            return None;
448                        }
449                        let stem = name.strip_suffix(".bin")?;
450                        let seq = stem.parse::<u64>().ok()?;
451                        Some((seq, e.path()))
452                    })
453                    .collect();
454                existing.sort_by_key(|(seq, _)| *seq);
455                while existing.len() >= depth {
456                    let (_, p) = existing.remove(0);
457                    let _ = std::fs::remove_file(&p);
458                }
459            }
460            HistoryKind::KeepAll => {
461                if self.qos.max_samples_per_instance != LENGTH_UNLIMITED {
462                    let count = std::fs::read_dir(&inst_dir)
463                        .map_err(|e| DdsError::PreconditionNotMet {
464                            reason: io_static_msg(&e, "durability backend: readdir failed"),
465                        })?
466                        .flatten()
467                        .filter(|e| e.file_name() != ".unregistered")
468                        .count();
469                    if count >= self.qos.max_samples_per_instance as usize {
470                        return Err(DdsError::OutOfResources {
471                            what: "durability backend: max_samples_per_instance reached",
472                        });
473                    }
474                }
475            }
476        }
477        let path = inst_dir.join(alloc::format!("{}.bin", sample.sequence));
478        std::fs::write(&path, &sample.payload).map_err(|e| DdsError::PreconditionNotMet {
479            reason: io_static_msg(&e, "durability backend: write failed"),
480        })?;
481        Ok(())
482    }
483
484    fn replay_for_topic(&self, topic: &str) -> Result<Vec<DurabilitySample>> {
485        let topic_dir = self.root.join(sanitize_topic(topic));
486        let mut out = Vec::new();
487        let dirs = match std::fs::read_dir(&topic_dir) {
488            Ok(d) => d,
489            Err(_) => return Ok(Vec::new()),
490        };
491        for inst_entry in dirs.flatten() {
492            let inst_path = inst_entry.path();
493            let key = match parse_hex16(&inst_entry.file_name().to_string_lossy()) {
494                Some(k) => k,
495                None => continue,
496            };
497            let samples = match std::fs::read_dir(&inst_path) {
498                Ok(s) => s,
499                Err(_) => continue,
500            };
501            for entry in samples.flatten() {
502                let name = entry.file_name().to_string_lossy().to_string();
503                if name == ".unregistered" {
504                    continue;
505                }
506                let Some(stem) = name.strip_suffix(".bin") else {
507                    continue;
508                };
509                let Ok(seq) = stem.parse::<u64>() else {
510                    continue;
511                };
512                let payload =
513                    std::fs::read(entry.path()).map_err(|e| DdsError::PreconditionNotMet {
514                        reason: io_static_msg(&e, "durability backend: read failed"),
515                    })?;
516                let created_at = entry
517                    .metadata()
518                    .and_then(|m| m.modified())
519                    .unwrap_or(SystemTime::UNIX_EPOCH);
520                out.push(DurabilitySample {
521                    topic: topic.to_string(),
522                    instance_key: key,
523                    sequence: seq,
524                    payload,
525                    created_at,
526                });
527            }
528        }
529        out.sort_by_key(|s| (s.instance_key, s.sequence));
530        Ok(out)
531    }
532
533    fn unregister_instance(
534        &self,
535        topic: &str,
536        instance_key: [u8; 16],
537        now: SystemTime,
538    ) -> Result<()> {
539        let dir = self.instance_dir(topic, &instance_key);
540        if !dir.exists() {
541            return Ok(());
542        }
543        let nanos = now
544            .duration_since(SystemTime::UNIX_EPOCH)
545            .unwrap_or_default()
546            .as_nanos();
547        let marker = self.unregister_marker(topic, &instance_key);
548        std::fs::write(&marker, nanos.to_string().as_bytes()).map_err(|e| {
549            DdsError::PreconditionNotMet {
550                reason: io_static_msg(&e, "durability backend: marker write failed"),
551            }
552        })?;
553        Ok(())
554    }
555
556    fn cleanup(&self, now: SystemTime) -> Result<usize> {
557        let delay = cleanup_delay(&self.qos);
558        let mut removed = 0usize;
559        let topics = match std::fs::read_dir(&self.root) {
560            Ok(d) => d,
561            Err(_) => return Ok(0),
562        };
563        for topic_dir in topics.flatten() {
564            if !topic_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
565                continue;
566            }
567            let instances = match std::fs::read_dir(topic_dir.path()) {
568                Ok(d) => d,
569                Err(_) => continue,
570            };
571            for inst_dir in instances.flatten() {
572                if !inst_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) {
573                    continue;
574                }
575                let marker = inst_dir.path().join(".unregistered");
576                let Ok(content) = std::fs::read_to_string(&marker) else {
577                    continue;
578                };
579                let Ok(nanos) = content.trim().parse::<u128>() else {
580                    continue;
581                };
582                let unreg = SystemTime::UNIX_EPOCH + CoreDuration::from_nanos(nanos as u64);
583                let due = unreg.checked_add(delay).unwrap_or(SystemTime::UNIX_EPOCH);
584                if now >= due && std::fs::remove_dir_all(inst_dir.path()).is_ok() {
585                    removed += 1;
586                }
587            }
588        }
589        Ok(removed)
590    }
591}
592
/// Parses exactly 32 hexadecimal characters (either case) into a 16-byte key.
///
/// Returns `None` when the input is not 32 bytes long or contains a byte that
/// is not a hex digit.
fn parse_hex16(s: &str) -> Option<[u8; 16]> {
    let raw = s.as_bytes();
    if raw.len() != 32 {
        return None;
    }
    let mut key = [0u8; 16];
    for (slot, pair) in key.iter_mut().zip(raw.chunks_exact(2)) {
        let high = char::from(pair[0]).to_digit(16)?;
        let low = char::from(pair[1]).to_digit(16)?;
        // Both nibbles are < 16, so the combined value always fits in a u8.
        *slot = (high * 16 + low) as u8;
    }
    Some(key)
}
605
606/// Factory: erzeugt das passende Backend fuer die uebergebene
607/// Durability-Stufe. `root` wird nur fuer Persistent benoetigt.
608///
609/// # Errors
610/// Filesystem-Fehler bei On-Disk-Backend-Initialisierung;
611/// `BadParameter` wenn `kind == Volatile/TransientLocal` (kein
612/// Durability-Service noetig) oder `Persistent` ohne `root`.
613pub fn make_backend(
614    kind: DurabilityKind,
615    qos: DurabilityServiceQosPolicy,
616    root: Option<PathBuf>,
617) -> Result<alloc::boxed::Box<dyn DurabilityBackend>> {
618    match kind {
619        DurabilityKind::Volatile | DurabilityKind::TransientLocal => Err(DdsError::BadParameter {
620            what: "durability backend: kind does not need a service",
621        }),
622        DurabilityKind::Transient => {
623            Ok(alloc::boxed::Box::new(InMemoryDurabilityBackend::new(qos)))
624        }
625        DurabilityKind::Persistent => {
626            let root = root.ok_or(DdsError::BadParameter {
627                what: "durability backend: Persistent kind requires root path",
628            })?;
629            Ok(alloc::boxed::Box::new(OnDiskDurabilityBackend::new(
630                root, qos,
631            )?))
632        }
633    }
634}
635
636#[cfg(test)]
637#[allow(clippy::expect_used, clippy::unwrap_used)]
638mod tests {
639    use super::*;
640    use std::time::Duration as StdDuration;
641
642    fn sample(topic: &str, key_byte: u8, seq: u64, payload: &[u8]) -> DurabilitySample {
643        DurabilitySample {
644            topic: topic.to_string(),
645            instance_key: [key_byte; 16],
646            sequence: seq,
647            payload: payload.to_vec(),
648            created_at: SystemTime::now(),
649        }
650    }
651
652    fn keep_all_qos() -> DurabilityServiceQosPolicy {
653        DurabilityServiceQosPolicy {
654            history_kind: HistoryKind::KeepAll,
655            history_depth: -1,
656            ..DurabilityServiceQosPolicy::default()
657        }
658    }
659
660    #[test]
661    fn in_memory_store_and_replay_returns_sorted_samples() {
662        let b = InMemoryDurabilityBackend::new(keep_all_qos());
663        b.store(sample("T", 1, 2, b"b")).unwrap();
664        b.store(sample("T", 1, 1, b"a")).unwrap();
665        b.store(sample("T", 2, 1, b"c")).unwrap();
666        let out = b.replay_for_topic("T").unwrap();
667        assert_eq!(out.len(), 3);
668        // sort by (instance, seq)
669        assert_eq!(out[0].sequence, 1);
670        assert_eq!(out[0].instance_key[0], 1);
671        assert_eq!(out[1].sequence, 2);
672        assert_eq!(out[1].instance_key[0], 1);
673        assert_eq!(out[2].instance_key[0], 2);
674    }
675
676    #[test]
677    fn in_memory_keeplast_caps_history_at_depth() {
678        let qos = DurabilityServiceQosPolicy {
679            history_kind: HistoryKind::KeepLast,
680            history_depth: 2,
681            ..DurabilityServiceQosPolicy::default()
682        };
683        let b = InMemoryDurabilityBackend::new(qos);
684        for i in 1u64..=5 {
685            b.store(sample("T", 1, i, &i.to_le_bytes())).unwrap();
686        }
687        let out = b.replay_for_topic("T").unwrap();
688        // Depth=2 → nur letzte 2 sequences
689        assert_eq!(out.len(), 2);
690        assert_eq!(out[0].sequence, 4);
691        assert_eq!(out[1].sequence, 5);
692    }
693
694    #[test]
695    fn in_memory_keepall_max_samples_per_instance_returns_oor() {
696        let qos = DurabilityServiceQosPolicy {
697            history_kind: HistoryKind::KeepAll,
698            history_depth: -1,
699            max_samples_per_instance: 2,
700            ..DurabilityServiceQosPolicy::default()
701        };
702        let b = InMemoryDurabilityBackend::new(qos);
703        b.store(sample("T", 1, 1, b"a")).unwrap();
704        b.store(sample("T", 1, 2, b"b")).unwrap();
705        let r = b.store(sample("T", 1, 3, b"c"));
706        assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
707    }
708
709    #[test]
710    fn in_memory_max_samples_globally_returns_oor() {
711        let qos = DurabilityServiceQosPolicy {
712            history_kind: HistoryKind::KeepAll,
713            history_depth: -1,
714            max_samples: 2,
715            ..DurabilityServiceQosPolicy::default()
716        };
717        let b = InMemoryDurabilityBackend::new(qos);
718        b.store(sample("T", 1, 1, b"a")).unwrap();
719        b.store(sample("T", 2, 1, b"b")).unwrap();
720        let r = b.store(sample("T", 3, 1, b"c"));
721        assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
722    }
723
724    #[test]
725    fn in_memory_max_instances_returns_oor() {
726        let qos = DurabilityServiceQosPolicy {
727            history_kind: HistoryKind::KeepAll,
728            history_depth: -1,
729            max_instances: 1,
730            ..DurabilityServiceQosPolicy::default()
731        };
732        let b = InMemoryDurabilityBackend::new(qos);
733        b.store(sample("T", 1, 1, b"a")).unwrap();
734        let r = b.store(sample("T", 2, 1, b"b"));
735        assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
736    }
737
738    #[test]
739    fn in_memory_unregister_then_cleanup_removes_after_delay() {
740        let qos = DurabilityServiceQosPolicy {
741            service_cleanup_delay: zerodds_qos::Duration::from_millis(100),
742            history_kind: HistoryKind::KeepAll,
743            history_depth: -1,
744            ..DurabilityServiceQosPolicy::default()
745        };
746        let b = InMemoryDurabilityBackend::new(qos);
747        let t0 = SystemTime::now();
748        b.store(sample("T", 1, 1, b"a")).unwrap();
749        b.unregister_instance("T", [1u8; 16], t0).unwrap();
750        // Vor Delay: cleanup tut nichts.
751        assert_eq!(b.cleanup(t0 + StdDuration::from_millis(50)).unwrap(), 0);
752        assert_eq!(b.replay_for_topic("T").unwrap().len(), 1);
753        // Nach Delay: cleanup entfernt.
754        assert_eq!(b.cleanup(t0 + StdDuration::from_millis(150)).unwrap(), 1);
755        assert!(b.replay_for_topic("T").unwrap().is_empty());
756    }
757
758    #[test]
759    fn in_memory_replay_filters_by_topic() {
760        let b = InMemoryDurabilityBackend::new(keep_all_qos());
761        b.store(sample("A", 1, 1, b"a1")).unwrap();
762        b.store(sample("B", 1, 1, b"b1")).unwrap();
763        let a = b.replay_for_topic("A").unwrap();
764        let bb = b.replay_for_topic("B").unwrap();
765        assert_eq!(a.len(), 1);
766        assert_eq!(bb.len(), 1);
767        assert_eq!(a[0].topic, "A");
768        assert_eq!(bb[0].topic, "B");
769    }
770
771    #[test]
772    fn in_memory_unknown_topic_returns_empty() {
773        let b = InMemoryDurabilityBackend::new(keep_all_qos());
774        assert!(b.replay_for_topic("nope").unwrap().is_empty());
775    }
776
777    #[test]
778    fn make_backend_rejects_volatile_and_transient_local() {
779        let r1 = make_backend(
780            DurabilityKind::Volatile,
781            DurabilityServiceQosPolicy::default(),
782            None,
783        );
784        let r2 = make_backend(
785            DurabilityKind::TransientLocal,
786            DurabilityServiceQosPolicy::default(),
787            None,
788        );
789        assert!(matches!(r1, Err(DdsError::BadParameter { .. })));
790        assert!(matches!(r2, Err(DdsError::BadParameter { .. })));
791    }
792
793    #[test]
794    fn make_backend_persistent_requires_root() {
795        let r = make_backend(
796            DurabilityKind::Persistent,
797            DurabilityServiceQosPolicy::default(),
798            None,
799        );
800        assert!(matches!(r, Err(DdsError::BadParameter { .. })));
801    }
802
803    #[test]
804    fn make_backend_transient_returns_in_memory() {
805        let b = make_backend(
806            DurabilityKind::Transient,
807            DurabilityServiceQosPolicy::default(),
808            None,
809        )
810        .unwrap();
811        b.store(sample("T", 1, 1, b"a")).unwrap();
812        assert_eq!(b.replay_for_topic("T").unwrap().len(), 1);
813    }
814
815    fn tmp_dir(prefix: &str) -> PathBuf {
816        let mut p = std::env::temp_dir();
817        let nanos = SystemTime::now()
818            .duration_since(SystemTime::UNIX_EPOCH)
819            .unwrap()
820            .as_nanos();
821        p.push(alloc::format!("zerodds-dur-{prefix}-{nanos}"));
822        p
823    }
824
825    #[test]
826    fn on_disk_store_and_replay_roundtrip() {
827        let root = tmp_dir("rt");
828        let b = OnDiskDurabilityBackend::new(&root, keep_all_qos()).unwrap();
829        b.store(sample("PersTopic", 7, 1, b"hello")).unwrap();
830        b.store(sample("PersTopic", 7, 2, b"world")).unwrap();
831        let out = b.replay_for_topic("PersTopic").unwrap();
832        assert_eq!(out.len(), 2);
833        assert_eq!(out[0].sequence, 1);
834        assert_eq!(out[0].payload, b"hello");
835        assert_eq!(out[1].sequence, 2);
836        assert_eq!(out[1].payload, b"world");
837        let _ = std::fs::remove_dir_all(&root);
838    }
839
840    #[test]
841    fn on_disk_keeplast_replaces_old_files() {
842        let root = tmp_dir("kl");
843        let qos = DurabilityServiceQosPolicy {
844            history_kind: HistoryKind::KeepLast,
845            history_depth: 2,
846            ..DurabilityServiceQosPolicy::default()
847        };
848        let b = OnDiskDurabilityBackend::new(&root, qos).unwrap();
849        for i in 1u64..=5 {
850            b.store(sample("T", 1, i, &i.to_le_bytes())).unwrap();
851        }
852        let out = b.replay_for_topic("T").unwrap();
853        assert_eq!(out.len(), 2);
854        assert_eq!(out[0].sequence, 4);
855        assert_eq!(out[1].sequence, 5);
856        let _ = std::fs::remove_dir_all(&root);
857    }
858
859    #[test]
860    fn on_disk_persistent_survives_backend_drop() {
861        let root = tmp_dir("survive");
862        {
863            let b = OnDiskDurabilityBackend::new(&root, keep_all_qos()).unwrap();
864            b.store(sample("Pers", 9, 42, b"alive")).unwrap();
865        } // drop
866        // Neuer Backend mit gleicher Root-Pfad sieht das Sample.
867        let b2 = OnDiskDurabilityBackend::new(&root, keep_all_qos()).unwrap();
868        let out = b2.replay_for_topic("Pers").unwrap();
869        assert_eq!(out.len(), 1);
870        assert_eq!(out[0].payload, b"alive");
871        let _ = std::fs::remove_dir_all(&root);
872    }
873
874    #[test]
875    fn on_disk_unregister_and_cleanup_removes_directory() {
876        let root = tmp_dir("cleanup");
877        let qos = DurabilityServiceQosPolicy {
878            service_cleanup_delay: zerodds_qos::Duration::from_millis(50),
879            history_kind: HistoryKind::KeepAll,
880            history_depth: -1,
881            ..DurabilityServiceQosPolicy::default()
882        };
883        let b = OnDiskDurabilityBackend::new(&root, qos).unwrap();
884        let t0 = SystemTime::now();
885        b.store(sample("CT", 5, 1, b"v")).unwrap();
886        b.unregister_instance("CT", [5u8; 16], t0).unwrap();
887        assert_eq!(b.cleanup(t0 + StdDuration::from_millis(10)).unwrap(), 0);
888        assert_eq!(b.cleanup(t0 + StdDuration::from_millis(100)).unwrap(), 1);
889        assert!(b.replay_for_topic("CT").unwrap().is_empty());
890        let _ = std::fs::remove_dir_all(&root);
891    }
892
893    #[test]
894    fn on_disk_max_samples_per_instance_returns_oor() {
895        let root = tmp_dir("oor");
896        let qos = DurabilityServiceQosPolicy {
897            history_kind: HistoryKind::KeepAll,
898            history_depth: -1,
899            max_samples_per_instance: 2,
900            ..DurabilityServiceQosPolicy::default()
901        };
902        let b = OnDiskDurabilityBackend::new(&root, qos).unwrap();
903        b.store(sample("T", 1, 1, b"a")).unwrap();
904        b.store(sample("T", 1, 2, b"b")).unwrap();
905        let r = b.store(sample("T", 1, 3, b"c"));
906        assert!(matches!(r, Err(DdsError::OutOfResources { .. })));
907        let _ = std::fs::remove_dir_all(&root);
908    }
909
910    #[test]
911    fn hex16_roundtrip() {
912        let key = [0xAB, 0xCD, 0xEF, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
913        let h = hex16(&key);
914        assert_eq!(h.len(), 32);
915        assert_eq!(parse_hex16(&h).unwrap(), key);
916    }
917
918    #[test]
919    fn parse_hex16_rejects_wrong_length_and_invalid_chars() {
920        assert!(parse_hex16("abc").is_none());
921        assert!(parse_hex16(&"x".repeat(32)).is_none());
922    }
923
924    #[test]
925    fn sanitize_topic_replaces_path_chars() {
926        assert_eq!(sanitize_topic("Topic/With:Path"), "Topic_With_Path");
927        assert_eq!(sanitize_topic("ok-name.v1"), "ok-name.v1");
928    }
929}