distributed_topic_tracker/gossip/topic/
publisher.rs

1//! Background publisher that updates DHT records with active peer info.
2
3use actor_helper::{Action, Actor, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, RecordPublisher};
7use anyhow::Result;
8
9/// Periodically publishes node state to DHT for peer discovery.
10///
11/// Publishes a record after an initial 10s delay, then repeatedly with
12/// randomized 0-49s intervals, containing this node's active neighbor list
13/// and message hashes for bubble detection and merging.
14#[derive(Debug, Clone)]
15pub struct Publisher {
16    _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21    rx: Receiver<Action<PublisherActor>>,
22
23    record_publisher: RecordPublisher,
24    gossip_receiver: GossipReceiver,
25    ticker: tokio::time::Interval,
26}
27
28impl Publisher {
29    /// Create a new background publisher.
30    ///
31    /// Spawns a background task that periodically publishes records.
32    pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
33        let (api, rx) = Handle::channel();
34
35        tokio::spawn(async move {
36            let mut ticker = tokio::time::interval(Duration::from_secs(10));
37            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38            let mut actor = PublisherActor {
39                rx,
40                record_publisher,
41                gossip_receiver,
42                ticker,
43            };
44            let _ = actor.run().await;
45        });
46
47        Ok(Self { _api: api })
48    }
49}
50
51impl Actor<anyhow::Error> for PublisherActor {
52    async fn run(&mut self) -> Result<()> {
53        tracing::debug!("Publisher: starting publisher actor");
54        loop {
55            tokio::select! {
56                Ok(action) = self.rx.recv_async() => {
57                    action(self).await;
58                }
59                _ = self.ticker.tick() => {
60                    tracing::debug!("Publisher: tick fired, attempting to publish");
61                    let _ = self.publish().await;
62                    let next_interval = rand::random::<u64>() % 50;
63                    tracing::debug!("Publisher: next publish in {}s", next_interval);
64                    self.ticker.reset_after(Duration::from_secs(next_interval));
65                }
66                _ = tokio::signal::ctrl_c() => break,
67            }
68        }
69        Ok(())
70    }
71}
72
73impl PublisherActor {
74    async fn publish(&mut self) -> Result<()> {
75        let unix_minute = crate::unix_minute(0);
76
77        let active_peers = self
78            .gossip_receiver
79            .neighbors()
80            .await
81            .iter()
82            .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
83            .collect::<Vec<_>>();
84
85        let last_message_hashes = self
86            .gossip_receiver
87            .last_message_hashes()
88            .await
89            .iter()
90            .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
91            .collect::<Vec<_>>();
92
93        tracing::debug!(
94            "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
95            unix_minute,
96            active_peers.len(),
97            last_message_hashes.len()
98        );
99
100        let record_content = crate::gossip::GossipRecordContent {
101            active_peers: active_peers.as_slice().try_into()?,
102            last_message_hashes: last_message_hashes.as_slice().try_into()?,
103        };
104
105        tracing::debug!("Publisher: created record content: {:?}", record_content);
106
107        let res = self
108            .record_publisher
109            .new_record(unix_minute, record_content);
110        tracing::debug!("Publisher: created new record: {:?}", res);
111        let record = res?;
112        let result = self.record_publisher.publish_record(record).await;
113
114        if result.is_ok() {
115            tracing::debug!("Publisher: successfully published record");
116        } else {
117            tracing::debug!("Publisher: failed to publish record: {:?}", result);
118        }
119
120        result
121    }
122}