Skip to main content

hdds_persistence/
lib.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! HDDS Persistence Service
5//!
6//! Provides TRANSIENT and PERSISTENT durability QoS support for DDS topics.
7//!
8//! # Features
9//!
10//! - **SQLite Backend** -- Zero-dependency, production-ready persistent storage
11//! - **RocksDB Backend** -- High-performance embedded database (feature flag)
12//! - **Late-joiner Support** -- Replay historical samples to new readers
13//! - **Retention Policies** -- Time-based, count-based, and size-based limits
14//!
15//! # Architecture
16//!
17//! ```text
18//! PersistenceService
19//! +-- DurabilitySubscriber  (listens to TRANSIENT/PERSISTENT topics)
20//! +-- LateJoinerPublisher   (replays history to new readers)
21//! +-- PersistenceStore      (SQLite or RocksDB backend)
22//! ```
23//!
24//! # Example
25//!
26//! ```ignore
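//! use hdds_persistence::{Config, MockDdsInterface, PersistenceService, SqliteStore};
//!
//! let config = Config::builder()
//!     .topic_filter("State/*")
//!     .retention_count(1000)
//!     .build();
//!
//! let store = SqliteStore::new("hdds_persist.db")?;
//! // `PersistenceService::new` takes a DDS interface as its third argument.
//! // The mock is used here for illustration; pass a real `DdsInterface`
//! // implementation in a deployed service.
//! let dds = MockDdsInterface::new();
//! let service = PersistenceService::new(config, store, dds);
//! service.run().await?;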
37//! ```
38
39pub mod config;
40pub mod dds_interface;
41pub mod hdds_interface;
42pub mod publisher;
43pub mod sqlite;
44pub mod store;
45pub mod subscriber;
46
47pub use config::Config;
48pub use dds_interface::{
49    DataReader, DataWriter, DdsInterface, DiscoveredReader, DiscoveredWriter, DurabilityKind,
50    MockDdsInterface, ReceivedSample,
51};
52pub use hdds_interface::HddsDdsInterface;
53pub use publisher::{LateJoinerPublisher, PublisherStats, StandalonePublisher};
54pub use sqlite::SqliteStore;
55pub use store::{PersistenceStore, Sample};
56pub use subscriber::{DurabilitySubscriber, StandaloneSubscriber, SubscriberStats};
57
58use anyhow::Result;
59use std::sync::Arc;
60use tokio::sync::RwLock;
61
62/// Persistence Service
63///
64/// Combines durability subscriber and late-joiner publisher to provide
65/// TRANSIENT/PERSISTENT QoS support.
66///
67/// # Type Parameters
68///
69/// - `S` -- Storage backend (e.g., `SqliteStore`)
70/// - `D` -- DDS interface implementation
71pub struct PersistenceService<S: PersistenceStore, D: DdsInterface> {
72    config: Config,
73    store: Arc<RwLock<S>>,
74    dds: Arc<D>,
75}
76
77impl<S: PersistenceStore + Send + Sync + 'static, D: DdsInterface + 'static>
78    PersistenceService<S, D>
79{
80    /// Create a new persistence service
81    pub fn new(config: Config, store: S, dds: D) -> Self {
82        Self {
83            config,
84            store: Arc::new(RwLock::new(store)),
85            dds: Arc::new(dds),
86        }
87    }
88
89    /// Run the persistence service
90    ///
91    /// Starts durability subscriber and late-joiner publisher in parallel.
92    pub async fn run(self) -> Result<()> {
93        tracing::info!("Starting HDDS Persistence Service");
94        tracing::info!("  Topics: {}", self.config.topic_filter);
95        tracing::info!("  Retention: {} samples", self.config.retention_count);
96
97        let subscriber = DurabilitySubscriber::new(
98            self.config.clone(),
99            Arc::clone(&self.store),
100            Arc::clone(&self.dds),
101        );
102
103        let publisher = LateJoinerPublisher::new(
104            self.config.clone(),
105            Arc::clone(&self.store),
106            Arc::clone(&self.dds),
107        );
108
109        // Run subscriber and publisher concurrently
110        tokio::try_join!(subscriber.run(), publisher.run(),)?;
111
112        Ok(())
113    }
114}
115
116/// Standalone Persistence Service (without real DDS)
117///
118/// Uses channels for sample input/output, useful for testing and CLI tools.
119pub struct StandalonePersistenceService<S: PersistenceStore> {
120    config: Config,
121    store: Arc<RwLock<S>>,
122}
123
124impl<S: PersistenceStore + Send + Sync + 'static> StandalonePersistenceService<S> {
125    /// Create a new standalone persistence service
126    pub fn new(config: Config, store: S) -> Self {
127        Self {
128            config,
129            store: Arc::new(RwLock::new(store)),
130        }
131    }
132
133    /// Get the store for direct access
134    pub fn store(&self) -> Arc<RwLock<S>> {
135        Arc::clone(&self.store)
136    }
137
138    /// Create a subscriber that accepts samples via channel
139    pub fn create_subscriber(
140        &self,
141    ) -> (StandaloneSubscriber<S>, tokio::sync::mpsc::Sender<Sample>) {
142        StandaloneSubscriber::new(self.config.clone(), Arc::clone(&self.store))
143    }
144
145    /// Create a publisher that replays via callback
146    pub fn create_publisher(&self) -> StandalonePublisher<S> {
147        StandalonePublisher::new(self.config.clone(), Arc::clone(&self.store))
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a full service can be assembled from an in-memory SQLite
    /// store and the mock DDS interface.
    #[test]
    fn test_persistence_service_creation() {
        let cfg = Config::builder()
            .topic_filter("test/*")
            .retention_count(100)
            .build();
        let backend = SqliteStore::new_in_memory().unwrap();
        let mock_dds = MockDdsInterface::new();

        let _service = PersistenceService::new(cfg, backend, mock_dds);
    }

    /// Smoke test: the standalone service can hand out both a channel-fed
    /// subscriber and a publisher over the same in-memory store.
    #[test]
    fn test_standalone_service() {
        let cfg = Config::builder().topic_filter("State/*").build();
        let backend = SqliteStore::new_in_memory().unwrap();

        let standalone = StandalonePersistenceService::new(cfg, backend);

        let (_subscriber, _tx) = standalone.create_subscriber();
        let _publisher = standalone.create_publisher();
    }
}