Skip to main content

distributed_topic_tracker/gossip/topic/
publisher.rs

1//! Background publisher that updates DHT records with active peer info.
2
3use actor_helper::{Action, Actor, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, RecordPublisher};
7use anyhow::Result;
8
9/// Periodically publishes node state to DHT for peer discovery.
10///
11/// Publishes a record after an initial 10s delay, then repeatedly with
12/// randomized 0-49s intervals, containing this node's active neighbor list
13/// and message hashes for bubble detection and merging.
14#[derive(Debug, Clone)]
15pub struct Publisher {
16    _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21    rx: Receiver<Action<PublisherActor>>,
22
23    record_publisher: RecordPublisher,
24    gossip_receiver: GossipReceiver,
25    ticker: tokio::time::Interval,
26}
27
28impl Publisher {
29    /// Create a new background publisher.
30    ///
31    /// Spawns a background task that periodically publishes records.
32    pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
33        let (api, rx) = Handle::channel();
34
35        tokio::spawn(async move {
36            let mut ticker = tokio::time::interval(Duration::from_secs(10));
37            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38            let mut actor = PublisherActor {
39                rx,
40                record_publisher,
41                gossip_receiver,
42                ticker,
43            };
44            let _ = actor.run().await;
45        });
46
47        Ok(Self { _api: api })
48    }
49}
50
51impl Actor<anyhow::Error> for PublisherActor {
52    async fn run(&mut self) -> Result<()> {
53        tracing::debug!("Publisher: starting publisher actor");
54        loop {
55            tokio::select! {
56                Ok(action) = self.rx.recv_async() => {
57                    action(self).await;
58                }
59                _ = self.ticker.tick() => {
60                    tracing::debug!("Publisher: tick fired, attempting to publish");
61                    let _ = self.publish().await;
62                    let next_interval = rand::random::<u64>() % 50;
63                    tracing::debug!("Publisher: next publish in {}s", next_interval);
64                    self.ticker.reset_after(Duration::from_secs(next_interval));
65                }
66                else => break Ok(()),
67            }
68        }
69    }
70}
71
72impl PublisherActor {
73    async fn publish(&mut self) -> Result<()> {
74        let unix_minute = crate::unix_minute(0);
75
76        let active_peers = self
77            .gossip_receiver
78            .neighbors()
79            .await
80            .iter()
81            .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
82            .collect::<Vec<_>>();
83
84        let last_message_hashes = self
85            .gossip_receiver
86            .last_message_hashes()
87            .await
88            .iter()
89            .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
90            .collect::<Vec<_>>();
91
92        tracing::debug!(
93            "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
94            unix_minute,
95            active_peers.len(),
96            last_message_hashes.len()
97        );
98
99        let record_content = crate::gossip::GossipRecordContent {
100            active_peers: active_peers.as_slice().try_into()?,
101            last_message_hashes: last_message_hashes.as_slice().try_into()?,
102        };
103
104        tracing::debug!("Publisher: created record content: {:?}", record_content);
105
106        let res = self
107            .record_publisher
108            .new_record(unix_minute, record_content);
109        tracing::debug!("Publisher: created new record: {:?}", res);
110        let record = res?;
111        let result = self.record_publisher.publish_record(record).await;
112
113        if result.is_ok() {
114            tracing::debug!("Publisher: successfully published record");
115        } else {
116            tracing::debug!("Publisher: failed to publish record: {:?}", result);
117        }
118
119        result
120    }
121}