1pub mod config;
40pub mod dds_interface;
41pub mod hdds_interface;
42pub mod publisher;
43pub mod sqlite;
44pub mod store;
45pub mod subscriber;
46
47pub use config::Config;
48pub use dds_interface::{
49 DataReader, DataWriter, DdsInterface, DiscoveredReader, DiscoveredWriter, DurabilityKind,
50 MockDdsInterface, ReceivedSample,
51};
52pub use hdds_interface::HddsDdsInterface;
53pub use publisher::{LateJoinerPublisher, PublisherStats, StandalonePublisher};
54pub use sqlite::SqliteStore;
55pub use store::{PersistenceStore, Sample};
56pub use subscriber::{DurabilitySubscriber, StandaloneSubscriber, SubscriberStats};
57
58use anyhow::Result;
59use std::sync::Arc;
60use tokio::sync::RwLock;
61
62pub struct PersistenceService<S: PersistenceStore, D: DdsInterface> {
72 config: Config,
73 store: Arc<RwLock<S>>,
74 dds: Arc<D>,
75}
76
77impl<S: PersistenceStore + Send + Sync + 'static, D: DdsInterface + 'static>
78 PersistenceService<S, D>
79{
80 pub fn new(config: Config, store: S, dds: D) -> Self {
82 Self {
83 config,
84 store: Arc::new(RwLock::new(store)),
85 dds: Arc::new(dds),
86 }
87 }
88
89 pub async fn run(self) -> Result<()> {
93 tracing::info!("Starting HDDS Persistence Service");
94 tracing::info!(" Topics: {}", self.config.topic_filter);
95 tracing::info!(" Retention: {} samples", self.config.retention_count);
96
97 let subscriber = DurabilitySubscriber::new(
98 self.config.clone(),
99 Arc::clone(&self.store),
100 Arc::clone(&self.dds),
101 );
102
103 let publisher = LateJoinerPublisher::new(
104 self.config.clone(),
105 Arc::clone(&self.store),
106 Arc::clone(&self.dds),
107 );
108
109 tokio::try_join!(subscriber.run(), publisher.run(),)?;
111
112 Ok(())
113 }
114}
115
116pub struct StandalonePersistenceService<S: PersistenceStore> {
120 config: Config,
121 store: Arc<RwLock<S>>,
122}
123
124impl<S: PersistenceStore + Send + Sync + 'static> StandalonePersistenceService<S> {
125 pub fn new(config: Config, store: S) -> Self {
127 Self {
128 config,
129 store: Arc::new(RwLock::new(store)),
130 }
131 }
132
133 pub fn store(&self) -> Arc<RwLock<S>> {
135 Arc::clone(&self.store)
136 }
137
138 pub fn create_subscriber(
140 &self,
141 ) -> (StandaloneSubscriber<S>, tokio::sync::mpsc::Sender<Sample>) {
142 StandaloneSubscriber::new(self.config.clone(), Arc::clone(&self.store))
143 }
144
145 pub fn create_publisher(&self) -> StandalonePublisher<S> {
147 StandalonePublisher::new(self.config.clone(), Arc::clone(&self.store))
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// A full service can be constructed from an in-memory SQLite store and the
    /// mock DDS interface.
    #[test]
    fn test_persistence_service_creation() {
        let builder = Config::builder().topic_filter("test/*").retention_count(100);
        let cfg = builder.build();

        // Arguments are evaluated left to right: store first, then the mock DDS.
        let _svc = PersistenceService::new(
            cfg,
            SqliteStore::new_in_memory().unwrap(),
            MockDdsInterface::new(),
        );
    }

    /// A standalone service can be constructed and can hand out both a
    /// subscriber (with its sender) and a publisher.
    #[test]
    fn test_standalone_service() {
        let cfg = Config::builder().topic_filter("State/*").build();

        let standalone =
            StandalonePersistenceService::new(cfg, SqliteStore::new_in_memory().unwrap());

        let (_sub, _sender) = standalone.create_subscriber();
        let _pub = standalone.create_publisher();
    }
}