// hdds_persistence/publisher.rs
1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! Late-joiner publisher
5//!
6//! Detects new readers and replays historical samples.
7//!
8//! # Operation
9//!
10//! 1. Monitor for new DataReaders via discovery
11//! 2. When a TRANSIENT_LOCAL reader joins, query store for historical samples
12//! 3. Replay historical samples to the new reader via DataWriter
13
14use crate::config::Config;
15use crate::dds_interface::{
16    DataWriter, DdsInterface, DiscoveredReader, DiscoveredWriter, DiscoveryCallback, DurabilityKind,
17};
18use crate::store::{PersistenceStore, Sample};
19use anyhow::Result;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tokio::sync::mpsc;
23use tokio::sync::RwLock;
24
25/// Late-joiner publisher
26///
27/// Monitors for new DataReaders and replays historical data to them.
pub struct LateJoinerPublisher<S: PersistenceStore, D: DdsInterface> {
    /// Runtime configuration; `topic_filter` selects which topics are served
    config: Config,
    /// Persistence backend holding the historical samples to replay
    store: Arc<RwLock<S>>,
    /// DDS abstraction used for discovery and writer creation
    dds: Arc<D>,
    /// Readers we've already replayed to (keyed by GUID, so each reader is served once)
    replayed_readers: HashSet<[u8; 16]>,
    /// Writers we've created (by topic)
    writers: HashMap<String, WriterState>,
    /// Statistics
    stats: PublisherStats,
}
39
/// Cached DataWriter plus the durability it was created with, so
/// `get_or_create_writer` can decide whether an upgrade is needed.
struct WriterState {
    durability: DurabilityKind,
    writer: Box<dyn DataWriter>,
}

/// Callback invoked by `StandalonePublisher` for each replayed sample.
type ReplayCallback = Box<dyn Fn(&Sample) + Send + Sync>;
46
47/// Publisher statistics
/// Publisher statistics
///
/// Monotonic counters updated as readers are discovered and samples replayed.
#[derive(Debug, Default, Clone)]
pub struct PublisherStats {
    /// Total readers discovered
    pub readers_discovered: u64,
    /// Readers that requested replay
    pub readers_replayed: u64,
    /// Total samples replayed
    pub samples_replayed: u64,
    /// Replay errors
    pub replay_errors: u64,
}
59
/// Event forwarded from the DDS discovery callback to the async run loop.
// Writer events are currently received but ignored by `run()`; kept for
// future use, hence the dead_code allowance.
#[allow(dead_code)]
enum DiscoveryEvent {
    Reader(DiscoveredReader),
    Writer(DiscoveredWriter),
}

/// Bridges the synchronous `DiscoveryCallback` trait onto an async mpsc
/// channel consumed by `LateJoinerPublisher::run`.
struct DiscoveryBridge {
    tx: mpsc::Sender<DiscoveryEvent>,
}
69
70impl DiscoveryCallback for DiscoveryBridge {
71    fn on_reader_discovered(&self, reader: DiscoveredReader) {
72        if self.tx.try_send(DiscoveryEvent::Reader(reader)).is_err() {
73            tracing::debug!("Dropping reader discovery event (channel full)");
74        }
75    }
76
77    fn on_reader_removed(&self, _guid: [u8; 16]) {}
78
79    fn on_writer_discovered(&self, writer: DiscoveredWriter) {
80        if self.tx.try_send(DiscoveryEvent::Writer(writer)).is_err() {
81            tracing::debug!("Dropping writer discovery event (channel full)");
82        }
83    }
84
85    fn on_writer_removed(&self, _guid: [u8; 16]) {}
86}
87
88impl<S: PersistenceStore + Send + Sync, D: DdsInterface> LateJoinerPublisher<S, D> {
89    /// Create a new late-joiner publisher
90    pub fn new(config: Config, store: Arc<RwLock<S>>, dds: Arc<D>) -> Self {
91        Self {
92            config,
93            store,
94            dds,
95            replayed_readers: HashSet::new(),
96            writers: HashMap::new(),
97            stats: PublisherStats::default(),
98        }
99    }
100
101    /// Get publisher statistics
102    pub fn stats(&self) -> &PublisherStats {
103        &self.stats
104    }
105
106    /// Run the publisher
107    pub async fn run(mut self) -> Result<()> {
108        tracing::info!(
109            "LateJoinerPublisher started for topics: {}",
110            self.config.topic_filter
111        );
112
113        let (event_tx, mut event_rx) = mpsc::channel(128);
114        let bridge = Arc::new(DiscoveryBridge { tx: event_tx });
115        self.dds.register_discovery_callback(bridge)?;
116
117        // Initial snapshot
118        self.discover_and_replay().await?;
119
120        loop {
121            tokio::select! {
122                Some(event) = event_rx.recv() => {
123                    if let DiscoveryEvent::Reader(reader) = event {
124                        if let Err(e) = self.handle_reader_discovered(reader).await {
125                            tracing::error!("Discovery/replay error: {}", e);
126                        }
127                    }
128                }
129            }
130        }
131    }
132
133    /// Discover new readers and replay history to them
134    async fn discover_and_replay(&mut self) -> Result<()> {
135        let readers = self.dds.discovered_readers(&self.config.topic_filter)?;
136
137        for reader in readers {
138            self.handle_reader_discovered(reader).await?;
139        }
140
141        Ok(())
142    }
143
144    async fn handle_reader_discovered(&mut self, reader: DiscoveredReader) -> Result<()> {
145        // Skip if we've already replayed to this reader
146        if self.replayed_readers.contains(&reader.guid) {
147            return Ok(());
148        }
149
150        self.stats.readers_discovered += 1;
151
152        if !self.should_replay(&reader)? {
153            self.replayed_readers.insert(reader.guid);
154            return Ok(());
155        }
156
157        tracing::info!("New durable reader discovered for topic: {}", reader.topic);
158
159        // Replay history
160        match self.replay_to_reader(&reader).await {
161            Ok(count) => {
162                self.stats.readers_replayed += 1;
163                self.stats.samples_replayed += count as u64;
164                tracing::info!(
165                    "Replayed {} samples to reader {} for topic {}",
166                    count,
167                    hex(&reader.guid[0..4]),
168                    reader.topic
169                );
170            }
171            Err(e) => {
172                self.stats.replay_errors += 1;
173                tracing::error!(
174                    "Failed to replay to reader {}: {}",
175                    hex(&reader.guid[0..4]),
176                    e
177                );
178            }
179        }
180
181        self.replayed_readers.insert(reader.guid);
182        Ok(())
183    }
184
185    fn should_replay(&self, reader: &DiscoveredReader) -> Result<bool> {
186        match reader.durability {
187            DurabilityKind::Volatile => {
188                tracing::debug!(
189                    "Skipping volatile reader {} for topic {}",
190                    hex(&reader.guid[0..4]),
191                    reader.topic
192                );
193                return Ok(false);
194            }
195            DurabilityKind::TransientLocal => {
196                tracing::debug!(
197                    "Skipping transient-local reader {} for topic {} (writer handles replay)",
198                    hex(&reader.guid[0..4]),
199                    reader.topic
200                );
201                return Ok(false);
202            }
203            DurabilityKind::Persistent => {}
204        }
205
206        let writers = self.dds.discovered_writers(&reader.topic)?;
207        let has_viable_writer = writers
208            .iter()
209            .any(|writer| writer.durability.rank() >= reader.durability.rank());
210
211        if has_viable_writer {
212            tracing::debug!(
213                "Skipping persistence replay for topic {} (durable writer present)",
214                reader.topic
215            );
216            return Ok(false);
217        }
218
219        Ok(true)
220    }
221
222    /// Replay historical samples to a specific reader
223    async fn replay_to_reader(&mut self, reader: &DiscoveredReader) -> Result<usize> {
224        // Query store for historical samples first
225        let samples = {
226            let store = self.store.read().await;
227            store.query_range(&reader.topic, 0, u64::MAX)?
228        };
229
230        if samples.is_empty() {
231            tracing::debug!("No historical samples for topic {}", reader.topic);
232            return Ok(0);
233        }
234
235        // Get or create writer for this topic
236        let writer =
237            self.get_or_create_writer(&reader.topic, &reader.type_name, reader.durability)?;
238
239        // Replay samples in order
240        let mut replayed = 0;
241        for sample in &samples {
242            match writer.write_with_timestamp(&sample.payload, sample.timestamp_ns) {
243                Ok(()) => {
244                    replayed += 1;
245                    tracing::trace!(
246                        "Replayed sample seq={} to topic {}",
247                        sample.sequence,
248                        sample.topic
249                    );
250                }
251                Err(e) => {
252                    tracing::warn!("Failed to replay sample seq={}: {}", sample.sequence, e);
253                }
254            }
255        }
256
257        Ok(replayed)
258    }
259
260    /// Get or create a DataWriter for a topic
261    fn get_or_create_writer(
262        &mut self,
263        topic: &str,
264        type_name: &str,
265        durability: DurabilityKind,
266    ) -> Result<&dyn DataWriter> {
267        let entry = self.writers.entry(topic.to_string());
268        let state = match entry {
269            std::collections::hash_map::Entry::Vacant(slot) => {
270                let writer = self.dds.create_writer(topic, type_name, durability)?;
271                tracing::info!(
272                    "Created writer for topic: {} (durability={:?})",
273                    topic,
274                    durability
275                );
276                slot.insert(WriterState { durability, writer })
277            }
278            std::collections::hash_map::Entry::Occupied(mut slot) => {
279                if slot.get().durability.rank() < durability.rank() {
280                    let writer = self.dds.create_writer(topic, type_name, durability)?;
281                    tracing::info!(
282                        "Upgraded writer for topic: {} (durability={:?})",
283                        topic,
284                        durability
285                    );
286                    *slot.get_mut() = WriterState { durability, writer };
287                }
288                slot.into_mut()
289            }
290        };
291
292        Ok(state.writer.as_ref())
293    }
294
295    /// Manually trigger replay for a topic (for testing)
296    pub async fn replay_topic(&mut self, topic: &str, type_name: &str) -> Result<usize> {
297        // Load samples first
298        let samples = {
299            let store = self.store.read().await;
300            store.load(topic)?
301        };
302
303        if samples.is_empty() {
304            return Ok(0);
305        }
306
307        // Get or create writer
308        let writer = self.get_or_create_writer(topic, type_name, DurabilityKind::TransientLocal)?;
309
310        let mut replayed = 0;
311        for sample in &samples {
312            if writer
313                .write_with_timestamp(&sample.payload, sample.timestamp_ns)
314                .is_ok()
315            {
316                replayed += 1;
317            }
318        }
319
320        self.stats.samples_replayed += replayed as u64;
321        Ok(replayed)
322    }
323}
324
325// ============================================================================
326// Standalone mode (without DDS - for testing and CLI)
327// ============================================================================
328
329/// Standalone late-joiner publisher that replays via callback
/// Standalone late-joiner publisher that replays via callback
pub struct StandalonePublisher<S: PersistenceStore> {
    /// Runtime configuration; `topic_filter` is used by `replay_all`
    config: Config,
    /// Persistence backend queried for historical samples
    store: Arc<RwLock<S>>,
    /// Callback for replayed samples (None means replay just counts samples)
    on_replay: Option<ReplayCallback>,
    /// Counters updated by the replay methods
    stats: PublisherStats,
}
337
338impl<S: PersistenceStore + Send + Sync> StandalonePublisher<S> {
339    /// Create a new standalone publisher
340    pub fn new(config: Config, store: Arc<RwLock<S>>) -> Self {
341        Self {
342            config,
343            store,
344            on_replay: None,
345            stats: PublisherStats::default(),
346        }
347    }
348
349    /// Set callback for replayed samples
350    pub fn on_replay<F>(mut self, callback: F) -> Self
351    where
352        F: Fn(&Sample) + Send + Sync + 'static,
353    {
354        self.on_replay = Some(Box::new(callback));
355        self
356    }
357
358    /// Get statistics
359    pub fn stats(&self) -> &PublisherStats {
360        &self.stats
361    }
362
363    /// Replay all samples matching topic filter
364    pub async fn replay_all(&mut self) -> Result<usize> {
365        let store = self.store.read().await;
366        let samples = store.query_range(&self.config.topic_filter, 0, u64::MAX)?;
367
368        let mut replayed = 0;
369        for sample in &samples {
370            if let Some(ref callback) = self.on_replay {
371                callback(sample);
372            }
373            replayed += 1;
374            self.stats.samples_replayed += 1;
375        }
376
377        tracing::info!(
378            "Replayed {} samples for pattern {}",
379            replayed,
380            self.config.topic_filter
381        );
382
383        Ok(replayed)
384    }
385
386    /// Replay samples for a specific topic
387    pub async fn replay_topic(&mut self, topic: &str) -> Result<usize> {
388        let store = self.store.read().await;
389        let samples = store.load(topic)?;
390
391        let mut replayed = 0;
392        for sample in &samples {
393            if let Some(ref callback) = self.on_replay {
394                callback(sample);
395            }
396            replayed += 1;
397            self.stats.samples_replayed += 1;
398        }
399
400        tracing::info!("Replayed {} samples for topic {}", replayed, topic);
401
402        Ok(replayed)
403    }
404
405    /// Replay samples in a time range
406    pub async fn replay_range(&mut self, topic: &str, start_ns: u64, end_ns: u64) -> Result<usize> {
407        let store = self.store.read().await;
408        let samples = store.query_range(topic, start_ns, end_ns)?;
409
410        let mut replayed = 0;
411        for sample in &samples {
412            if let Some(ref callback) = self.on_replay {
413                callback(sample);
414            }
415            replayed += 1;
416            self.stats.samples_replayed += 1;
417        }
418
419        tracing::info!(
420            "Replayed {} samples for topic {} in range [{}, {}]",
421            replayed,
422            topic,
423            start_ns,
424            end_ns
425        );
426
427        Ok(replayed)
428    }
429}
430
431/// Helper: format bytes as hex
/// Helper: format bytes as lowercase hex (two digits per byte).
fn hex(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        out.push_str(&format!("{:02x}", byte));
    }
    out
}
435
#[cfg(test)]
mod tests {
    //! Unit tests exercising both publishers against the in-memory SQLite
    //! store and the mock DDS interface.
    use super::*;
    use crate::dds_interface::{DiscoveredReader, DurabilityKind, MockDdsInterface};
    use crate::sqlite::SqliteStore;

    // Construction succeeds and stats start zeroed.
    #[test]
    fn test_late_joiner_publisher_creation() {
        let config = Config::builder().topic_filter("State/*").build();

        let store = SqliteStore::new_in_memory().unwrap();
        let store = Arc::new(RwLock::new(store));

        let dds = Arc::new(MockDdsInterface::new());

        let publisher = LateJoinerPublisher::new(config, store, dds);
        assert_eq!(publisher.stats.readers_discovered, 0);
    }

    // A persistent reader with stored history gets all samples replayed.
    #[tokio::test]
    async fn test_discover_and_replay() {
        let config = Config::builder().topic_filter("State/*").build();

        let store = SqliteStore::new_in_memory().unwrap();

        // Add some historical samples
        for i in 0..5 {
            let sample = Sample {
                topic: "State/Temperature".to_string(),
                type_name: "Temperature".to_string(),
                payload: vec![i as u8],
                timestamp_ns: i * 1000,
                sequence: i,
                source_guid: [0xAA; 16],
            };
            store.save(&sample).unwrap();
        }

        let store = Arc::new(RwLock::new(store));
        let dds = Arc::new(MockDdsInterface::new());

        // Add a mock reader (Persistent durability, so replay applies)
        dds.add_reader(DiscoveredReader {
            guid: [0x01; 16],
            topic: "State/Temperature".to_string(),
            type_name: "Temperature".to_string(),
            durability: DurabilityKind::Persistent,
        });

        let mut publisher = LateJoinerPublisher::new(config, store, dds);

        // Run discovery
        publisher.discover_and_replay().await.unwrap();

        assert_eq!(publisher.stats.readers_discovered, 1);
        assert_eq!(publisher.stats.readers_replayed, 1);
        assert_eq!(publisher.stats.samples_replayed, 5);
    }

    // Volatile readers are counted as discovered but never replayed to.
    #[tokio::test]
    async fn test_skip_volatile_readers() {
        let config = Config::builder().topic_filter("*").build();

        let store = SqliteStore::new_in_memory().unwrap();
        let store = Arc::new(RwLock::new(store));
        let dds = Arc::new(MockDdsInterface::new());

        // Add a volatile reader
        dds.add_reader(DiscoveredReader {
            guid: [0x02; 16],
            topic: "test/topic".to_string(),
            type_name: "Test".to_string(),
            durability: DurabilityKind::Volatile,
        });

        let mut publisher = LateJoinerPublisher::new(config, store, dds);
        publisher.discover_and_replay().await.unwrap();

        // Reader was discovered but not replayed (because it's volatile)
        assert_eq!(publisher.stats.readers_discovered, 1);
        assert_eq!(publisher.stats.readers_replayed, 0);
    }

    // StandalonePublisher routes every stored sample through the callback.
    #[tokio::test]
    async fn test_standalone_publisher() {
        let config = Config::builder().topic_filter("*").build();

        let store = SqliteStore::new_in_memory().unwrap();

        // Add samples
        for i in 0..3 {
            let sample = Sample {
                topic: "test/topic".to_string(),
                type_name: "Test".to_string(),
                payload: vec![i as u8],
                timestamp_ns: i * 1000,
                sequence: i,
                source_guid: [0xBB; 16],
            };
            store.save(&sample).unwrap();
        }

        let store = Arc::new(RwLock::new(store));

        // Collect replayed sequence numbers via the callback.
        let replayed_samples = Arc::new(std::sync::Mutex::new(Vec::new()));
        let replayed_clone = Arc::clone(&replayed_samples);

        let mut publisher = StandalonePublisher::new(config, store).on_replay(move |sample| {
            replayed_clone.lock().unwrap().push(sample.sequence);
        });

        let count = publisher.replay_all().await.unwrap();
        assert_eq!(count, 3);

        let replayed = replayed_samples.lock().unwrap();
        assert_eq!(replayed.len(), 3);
        assert!(replayed.contains(&0));
        assert!(replayed.contains(&1));
        assert!(replayed.contains(&2));
    }

    // Pins the store's query_range semantics: both bounds inclusive.
    #[tokio::test]
    async fn test_replay_range() {
        let config = Config::builder().topic_filter("*").build();

        let store = SqliteStore::new_in_memory().unwrap();

        // Add samples at different times
        for i in 0..10 {
            let sample = Sample {
                topic: "test/topic".to_string(),
                type_name: "Test".to_string(),
                payload: vec![i as u8],
                timestamp_ns: i * 1000,
                sequence: i,
                source_guid: [0xCC; 16],
            };
            store.save(&sample).unwrap();
        }

        let store = Arc::new(RwLock::new(store));

        let mut publisher = StandalonePublisher::new(config, store);

        // Replay only samples in range [2000, 5000]
        let count = publisher
            .replay_range("test/topic", 2000, 5000)
            .await
            .unwrap();
        assert_eq!(count, 4); // Samples at 2000, 3000, 4000, 5000
    }

    // hex() emits two lowercase digits per byte, no separators.
    #[test]
    fn test_hex() {
        assert_eq!(hex(&[0xde, 0xad, 0xbe, 0xef]), "deadbeef");
        assert_eq!(hex(&[0x01, 0x02]), "0102");
    }
}