1use crate::config::Config;
15use crate::dds_interface::{
16 DataWriter, DdsInterface, DiscoveredReader, DiscoveredWriter, DiscoveryCallback, DurabilityKind,
17};
18use crate::store::{PersistenceStore, Sample};
19use anyhow::Result;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tokio::sync::mpsc;
23use tokio::sync::RwLock;
24
/// Replays persisted samples to durable readers that join the domain late,
/// i.e. after the original writers have published (and possibly gone away).
pub struct LateJoinerPublisher<S: PersistenceStore, D: DdsInterface> {
    // Runtime configuration; `topic_filter` selects which topics are served.
    config: Config,
    // Persistence backend holding the historical samples to replay.
    store: Arc<RwLock<S>>,
    // DDS abstraction used for discovery and for creating replay writers.
    dds: Arc<D>,
    // GUIDs of readers already handled (replayed or deliberately skipped),
    // so each discovered reader is processed at most once.
    replayed_readers: HashSet<[u8; 16]>,
    // One cached replay writer per topic, keyed by topic name.
    writers: HashMap<String, WriterState>,
    // Counters exposed read-only via `stats()`.
    stats: PublisherStats,
}
39
/// A cached replay writer together with the durability it was created with,
/// so `get_or_create_writer` can detect when a stronger durability is needed
/// and recreate the writer.
struct WriterState {
    durability: DurabilityKind,
    writer: Box<dyn DataWriter>,
}
44
/// Per-sample callback type used by `StandalonePublisher::on_replay`.
type ReplayCallback = Box<dyn Fn(&Sample) + Send + Sync>;
46
/// Counters describing discovery and replay activity.
#[derive(Debug, Default, Clone)]
pub struct PublisherStats {
    // Readers seen, including ones that were skipped without a replay.
    pub readers_discovered: u64,
    // Readers that actually received a replay.
    pub readers_replayed: u64,
    // Total samples successfully written (or dispatched) during replays.
    pub samples_replayed: u64,
    // Replay attempts that failed outright (not per-sample write failures).
    pub replay_errors: u64,
}
59
// Discovery events forwarded from the DDS callback thread to the async loop.
// `Writer` events are delivered but not currently consumed by `run()` (only
// `Reader` events trigger action), hence the dead_code allowance.
#[allow(dead_code)]
enum DiscoveryEvent {
    Reader(DiscoveredReader),
    Writer(DiscoveredWriter),
}
65
/// Bridges synchronous DDS discovery callbacks onto a bounded async mpsc
/// channel consumed by `LateJoinerPublisher::run`.
struct DiscoveryBridge {
    tx: mpsc::Sender<DiscoveryEvent>,
}
69
70impl DiscoveryCallback for DiscoveryBridge {
71 fn on_reader_discovered(&self, reader: DiscoveredReader) {
72 if self.tx.try_send(DiscoveryEvent::Reader(reader)).is_err() {
73 tracing::debug!("Dropping reader discovery event (channel full)");
74 }
75 }
76
77 fn on_reader_removed(&self, _guid: [u8; 16]) {}
78
79 fn on_writer_discovered(&self, writer: DiscoveredWriter) {
80 if self.tx.try_send(DiscoveryEvent::Writer(writer)).is_err() {
81 tracing::debug!("Dropping writer discovery event (channel full)");
82 }
83 }
84
85 fn on_writer_removed(&self, _guid: [u8; 16]) {}
86}
87
88impl<S: PersistenceStore + Send + Sync, D: DdsInterface> LateJoinerPublisher<S, D> {
89 pub fn new(config: Config, store: Arc<RwLock<S>>, dds: Arc<D>) -> Self {
91 Self {
92 config,
93 store,
94 dds,
95 replayed_readers: HashSet::new(),
96 writers: HashMap::new(),
97 stats: PublisherStats::default(),
98 }
99 }
100
101 pub fn stats(&self) -> &PublisherStats {
103 &self.stats
104 }
105
106 pub async fn run(mut self) -> Result<()> {
108 tracing::info!(
109 "LateJoinerPublisher started for topics: {}",
110 self.config.topic_filter
111 );
112
113 let (event_tx, mut event_rx) = mpsc::channel(128);
114 let bridge = Arc::new(DiscoveryBridge { tx: event_tx });
115 self.dds.register_discovery_callback(bridge)?;
116
117 self.discover_and_replay().await?;
119
120 loop {
121 tokio::select! {
122 Some(event) = event_rx.recv() => {
123 if let DiscoveryEvent::Reader(reader) = event {
124 if let Err(e) = self.handle_reader_discovered(reader).await {
125 tracing::error!("Discovery/replay error: {}", e);
126 }
127 }
128 }
129 }
130 }
131 }
132
133 async fn discover_and_replay(&mut self) -> Result<()> {
135 let readers = self.dds.discovered_readers(&self.config.topic_filter)?;
136
137 for reader in readers {
138 self.handle_reader_discovered(reader).await?;
139 }
140
141 Ok(())
142 }
143
144 async fn handle_reader_discovered(&mut self, reader: DiscoveredReader) -> Result<()> {
145 if self.replayed_readers.contains(&reader.guid) {
147 return Ok(());
148 }
149
150 self.stats.readers_discovered += 1;
151
152 if !self.should_replay(&reader)? {
153 self.replayed_readers.insert(reader.guid);
154 return Ok(());
155 }
156
157 tracing::info!("New durable reader discovered for topic: {}", reader.topic);
158
159 match self.replay_to_reader(&reader).await {
161 Ok(count) => {
162 self.stats.readers_replayed += 1;
163 self.stats.samples_replayed += count as u64;
164 tracing::info!(
165 "Replayed {} samples to reader {} for topic {}",
166 count,
167 hex(&reader.guid[0..4]),
168 reader.topic
169 );
170 }
171 Err(e) => {
172 self.stats.replay_errors += 1;
173 tracing::error!(
174 "Failed to replay to reader {}: {}",
175 hex(&reader.guid[0..4]),
176 e
177 );
178 }
179 }
180
181 self.replayed_readers.insert(reader.guid);
182 Ok(())
183 }
184
185 fn should_replay(&self, reader: &DiscoveredReader) -> Result<bool> {
186 match reader.durability {
187 DurabilityKind::Volatile => {
188 tracing::debug!(
189 "Skipping volatile reader {} for topic {}",
190 hex(&reader.guid[0..4]),
191 reader.topic
192 );
193 return Ok(false);
194 }
195 DurabilityKind::TransientLocal => {
196 tracing::debug!(
197 "Skipping transient-local reader {} for topic {} (writer handles replay)",
198 hex(&reader.guid[0..4]),
199 reader.topic
200 );
201 return Ok(false);
202 }
203 DurabilityKind::Persistent => {}
204 }
205
206 let writers = self.dds.discovered_writers(&reader.topic)?;
207 let has_viable_writer = writers
208 .iter()
209 .any(|writer| writer.durability.rank() >= reader.durability.rank());
210
211 if has_viable_writer {
212 tracing::debug!(
213 "Skipping persistence replay for topic {} (durable writer present)",
214 reader.topic
215 );
216 return Ok(false);
217 }
218
219 Ok(true)
220 }
221
222 async fn replay_to_reader(&mut self, reader: &DiscoveredReader) -> Result<usize> {
224 let samples = {
226 let store = self.store.read().await;
227 store.query_range(&reader.topic, 0, u64::MAX)?
228 };
229
230 if samples.is_empty() {
231 tracing::debug!("No historical samples for topic {}", reader.topic);
232 return Ok(0);
233 }
234
235 let writer =
237 self.get_or_create_writer(&reader.topic, &reader.type_name, reader.durability)?;
238
239 let mut replayed = 0;
241 for sample in &samples {
242 match writer.write_with_timestamp(&sample.payload, sample.timestamp_ns) {
243 Ok(()) => {
244 replayed += 1;
245 tracing::trace!(
246 "Replayed sample seq={} to topic {}",
247 sample.sequence,
248 sample.topic
249 );
250 }
251 Err(e) => {
252 tracing::warn!("Failed to replay sample seq={}: {}", sample.sequence, e);
253 }
254 }
255 }
256
257 Ok(replayed)
258 }
259
260 fn get_or_create_writer(
262 &mut self,
263 topic: &str,
264 type_name: &str,
265 durability: DurabilityKind,
266 ) -> Result<&dyn DataWriter> {
267 let entry = self.writers.entry(topic.to_string());
268 let state = match entry {
269 std::collections::hash_map::Entry::Vacant(slot) => {
270 let writer = self.dds.create_writer(topic, type_name, durability)?;
271 tracing::info!(
272 "Created writer for topic: {} (durability={:?})",
273 topic,
274 durability
275 );
276 slot.insert(WriterState { durability, writer })
277 }
278 std::collections::hash_map::Entry::Occupied(mut slot) => {
279 if slot.get().durability.rank() < durability.rank() {
280 let writer = self.dds.create_writer(topic, type_name, durability)?;
281 tracing::info!(
282 "Upgraded writer for topic: {} (durability={:?})",
283 topic,
284 durability
285 );
286 *slot.get_mut() = WriterState { durability, writer };
287 }
288 slot.into_mut()
289 }
290 };
291
292 Ok(state.writer.as_ref())
293 }
294
295 pub async fn replay_topic(&mut self, topic: &str, type_name: &str) -> Result<usize> {
297 let samples = {
299 let store = self.store.read().await;
300 store.load(topic)?
301 };
302
303 if samples.is_empty() {
304 return Ok(0);
305 }
306
307 let writer = self.get_or_create_writer(topic, type_name, DurabilityKind::TransientLocal)?;
309
310 let mut replayed = 0;
311 for sample in &samples {
312 if writer
313 .write_with_timestamp(&sample.payload, sample.timestamp_ns)
314 .is_ok()
315 {
316 replayed += 1;
317 }
318 }
319
320 self.stats.samples_replayed += replayed as u64;
321 Ok(replayed)
322 }
323}
324
/// Replays persisted samples through a user-supplied callback instead of DDS;
/// useful for export, inspection, or testing without a DDS participant.
pub struct StandalonePublisher<S: PersistenceStore> {
    // Runtime configuration; `topic_filter` drives `replay_all`.
    config: Config,
    // Persistence backend holding the historical samples to replay.
    store: Arc<RwLock<S>>,
    // Optional per-sample callback installed via the `on_replay` builder.
    on_replay: Option<ReplayCallback>,
    // Counters exposed read-only via `stats()`.
    stats: PublisherStats,
}
337
338impl<S: PersistenceStore + Send + Sync> StandalonePublisher<S> {
339 pub fn new(config: Config, store: Arc<RwLock<S>>) -> Self {
341 Self {
342 config,
343 store,
344 on_replay: None,
345 stats: PublisherStats::default(),
346 }
347 }
348
349 pub fn on_replay<F>(mut self, callback: F) -> Self
351 where
352 F: Fn(&Sample) + Send + Sync + 'static,
353 {
354 self.on_replay = Some(Box::new(callback));
355 self
356 }
357
358 pub fn stats(&self) -> &PublisherStats {
360 &self.stats
361 }
362
363 pub async fn replay_all(&mut self) -> Result<usize> {
365 let store = self.store.read().await;
366 let samples = store.query_range(&self.config.topic_filter, 0, u64::MAX)?;
367
368 let mut replayed = 0;
369 for sample in &samples {
370 if let Some(ref callback) = self.on_replay {
371 callback(sample);
372 }
373 replayed += 1;
374 self.stats.samples_replayed += 1;
375 }
376
377 tracing::info!(
378 "Replayed {} samples for pattern {}",
379 replayed,
380 self.config.topic_filter
381 );
382
383 Ok(replayed)
384 }
385
386 pub async fn replay_topic(&mut self, topic: &str) -> Result<usize> {
388 let store = self.store.read().await;
389 let samples = store.load(topic)?;
390
391 let mut replayed = 0;
392 for sample in &samples {
393 if let Some(ref callback) = self.on_replay {
394 callback(sample);
395 }
396 replayed += 1;
397 self.stats.samples_replayed += 1;
398 }
399
400 tracing::info!("Replayed {} samples for topic {}", replayed, topic);
401
402 Ok(replayed)
403 }
404
405 pub async fn replay_range(&mut self, topic: &str, start_ns: u64, end_ns: u64) -> Result<usize> {
407 let store = self.store.read().await;
408 let samples = store.query_range(topic, start_ns, end_ns)?;
409
410 let mut replayed = 0;
411 for sample in &samples {
412 if let Some(ref callback) = self.on_replay {
413 callback(sample);
414 }
415 replayed += 1;
416 self.stats.samples_replayed += 1;
417 }
418
419 tracing::info!(
420 "Replayed {} samples for topic {} in range [{}, {}]",
421 replayed,
422 topic,
423 start_ns,
424 end_ns
425 );
426
427 Ok(replayed)
428 }
429}
430
/// Renders a byte slice as lowercase hex with no separators
/// (e.g. `[0xde, 0xad]` -> `"dead"`).
fn hex(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let mut out = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a String cannot fail.
        let _ = write!(out, "{:02x}", byte);
    }
    out
}
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dds_interface::{DiscoveredReader, DurabilityKind, MockDdsInterface};
    use crate::sqlite::SqliteStore;

    // Construction alone must leave every counter at zero.
    #[test]
    fn test_late_joiner_publisher_creation() {
        let config = Config::builder().topic_filter("State/*").build();

        let store = SqliteStore::new_in_memory().unwrap();
        let store = Arc::new(RwLock::new(store));

        let dds = Arc::new(MockDdsInterface::new());

        let publisher = LateJoinerPublisher::new(config, store, dds);
        assert_eq!(publisher.stats.readers_discovered, 0);
    }

    // A persistent reader with stored history (and no live durable writer on
    // the mock) should receive all 5 samples during the initial sweep.
    #[tokio::test]
    async fn test_discover_and_replay() {
        let config = Config::builder().topic_filter("State/*").build();

        let store = SqliteStore::new_in_memory().unwrap();

        // Seed 5 samples on one topic with 1ms-spaced timestamps.
        for i in 0..5 {
            let sample = Sample {
                topic: "State/Temperature".to_string(),
                type_name: "Temperature".to_string(),
                payload: vec![i as u8],
                timestamp_ns: i * 1000,
                sequence: i,
                source_guid: [0xAA; 16],
            };
            store.save(&sample).unwrap();
        }

        let store = Arc::new(RwLock::new(store));
        let dds = Arc::new(MockDdsInterface::new());

        // Pre-register a persistent reader so the initial sweep finds it.
        dds.add_reader(DiscoveredReader {
            guid: [0x01; 16],
            topic: "State/Temperature".to_string(),
            type_name: "Temperature".to_string(),
            durability: DurabilityKind::Persistent,
        });

        let mut publisher = LateJoinerPublisher::new(config, store, dds);

        publisher.discover_and_replay().await.unwrap();

        assert_eq!(publisher.stats.readers_discovered, 1);
        assert_eq!(publisher.stats.readers_replayed, 1);
        assert_eq!(publisher.stats.samples_replayed, 5);
    }

    // Volatile readers must be counted as discovered but never replayed to.
    #[tokio::test]
    async fn test_skip_volatile_readers() {
        let config = Config::builder().topic_filter("*").build();

        let store = SqliteStore::new_in_memory().unwrap();
        let store = Arc::new(RwLock::new(store));
        let dds = Arc::new(MockDdsInterface::new());

        dds.add_reader(DiscoveredReader {
            guid: [0x02; 16],
            topic: "test/topic".to_string(),
            type_name: "Test".to_string(),
            durability: DurabilityKind::Volatile,
        });

        let mut publisher = LateJoinerPublisher::new(config, store, dds);
        publisher.discover_and_replay().await.unwrap();

        assert_eq!(publisher.stats.readers_discovered, 1);
        // Replayed stays zero: volatile readers are skipped by should_replay.
        assert_eq!(publisher.stats.readers_replayed, 0);
    }

    // StandalonePublisher should deliver every stored sample to the callback,
    // in any order, and report the count.
    #[tokio::test]
    async fn test_standalone_publisher() {
        let config = Config::builder().topic_filter("*").build();

        let store = SqliteStore::new_in_memory().unwrap();

        // Seed 3 samples on one topic.
        for i in 0..3 {
            let sample = Sample {
                topic: "test/topic".to_string(),
                type_name: "Test".to_string(),
                payload: vec![i as u8],
                timestamp_ns: i * 1000,
                sequence: i,
                source_guid: [0xBB; 16],
            };
            store.save(&sample).unwrap();
        }

        let store = Arc::new(RwLock::new(store));

        // Collect the replayed sequence numbers through the callback.
        let replayed_samples = Arc::new(std::sync::Mutex::new(Vec::new()));
        let replayed_clone = Arc::clone(&replayed_samples);

        let mut publisher = StandalonePublisher::new(config, store).on_replay(move |sample| {
            replayed_clone.lock().unwrap().push(sample.sequence);
        });

        let count = publisher.replay_all().await.unwrap();
        assert_eq!(count, 3);

        let replayed = replayed_samples.lock().unwrap();
        assert_eq!(replayed.len(), 3);
        // Order is not asserted, only that each sequence arrived exactly once.
        assert!(replayed.contains(&0));
        assert!(replayed.contains(&1));
        assert!(replayed.contains(&2));
    }

    // Range replay: timestamps 2000..=5000 cover 4 of the 10 stored samples,
    // which implies the store's query_range treats both endpoints inclusively.
    #[tokio::test]
    async fn test_replay_range() {
        let config = Config::builder().topic_filter("*").build();

        let store = SqliteStore::new_in_memory().unwrap();

        // Seed 10 samples at timestamps 0, 1000, ..., 9000.
        for i in 0..10 {
            let sample = Sample {
                topic: "test/topic".to_string(),
                type_name: "Test".to_string(),
                payload: vec![i as u8],
                timestamp_ns: i * 1000,
                sequence: i,
                source_guid: [0xCC; 16],
            };
            store.save(&sample).unwrap();
        }

        let store = Arc::new(RwLock::new(store));

        let mut publisher = StandalonePublisher::new(config, store);

        let count = publisher
            .replay_range("test/topic", 2000, 5000)
            .await
            .unwrap();
        assert_eq!(count, 4); }

    // The hex helper formats bytes as lowercase, zero-padded pairs.
    #[test]
    fn test_hex() {
        assert_eq!(hex(&[0xde, 0xad, 0xbe, 0xef]), "deadbeef");
        assert_eq!(hex(&[0x01, 0x02]), "0102");
    }
}