Skip to main content

distributed_topic_tracker/gossip/topic/
publisher.rs

1//! Background publisher that updates DHT records with active peer info.
2
3use actor_helper::{Action, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, MAX_MESSAGE_HASHES, MAX_RECORD_PEERS, RecordPublisher};
7use anyhow::Result;
8
9/// Periodically publishes node state to DHT for peer discovery.
10///
11/// Publishes a record after an initial delay initial_delay, then repeatedly with
12/// randomized base_interval + rand(0 to max_jitter) intervals, containing this node's active neighbor list
13/// and message hashes for bubble detection and merging.
14#[derive(Debug, Clone)]
15pub struct Publisher {
16    _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21    record_publisher: RecordPublisher,
22    gossip_receiver: GossipReceiver,
23    ticker: tokio::time::Interval,
24    cancel_token: tokio_util::sync::CancellationToken,
25    base_interval: Duration,
26    max_jitter: Duration,
27}
28
29impl Publisher {
30    /// Create a new background publisher.
31    ///
32    /// Spawns a background task that periodically publishes records.
33    pub fn new(
34        record_publisher: RecordPublisher,
35        gossip_receiver: GossipReceiver,
36        cancel_token: tokio_util::sync::CancellationToken,
37        initial_delay: Duration,
38        base_interval: Duration,
39        max_jitter: Duration,
40    ) -> Result<Self> {
41        let base_interval = base_interval.max(Duration::from_secs(1));
42        let mut ticker =
43            tokio::time::interval_at(tokio::time::Instant::now() + initial_delay, base_interval);
44        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
45        let api = Handle::spawn_with(
46            PublisherActor {
47                record_publisher,
48                gossip_receiver,
49                ticker,
50                cancel_token,
51                base_interval,
52                max_jitter,
53            },
54            |mut actor, rx| async move { actor.run(rx).await },
55        )
56        .0;
57
58        Ok(Self { _api: api })
59    }
60}
61
62impl PublisherActor {
63    async fn run(&mut self, rx: Receiver<Action<PublisherActor>>) -> Result<()> {
64        tracing::debug!("Publisher: starting publisher actor");
65        loop {
66            tokio::select! {
67                result = rx.recv_async() => {
68                    match result {
69                        Ok(action) => action(self).await,
70                        Err(_) => break Ok(()),
71                    }
72                }
73                _ = self.ticker.tick() => {
74                    tracing::debug!("Publisher: tick fired, attempting to publish");
75                    if let Err(e) = self.publish().await {
76                        tracing::warn!("Publisher: failed to publish record: {:?}", e);
77                    }
78                    let jitter = if self.max_jitter > Duration::ZERO {
79                        Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
80                    } else {
81                        Duration::ZERO
82                    };
83                    let next_interval = self.base_interval + jitter;
84                    tracing::debug!("Publisher: next publish in {}ms", next_interval.as_millis());
85                    self.ticker.reset_after(next_interval);
86                }
87                _ = self.cancel_token.cancelled() => {
88                    break Ok(());
89                }
90                else => break Ok(()),
91            }
92        }
93    }
94}
95
96impl PublisherActor {
97    async fn publish(&mut self) -> Result<()> {
98        let unix_minute = crate::unix_minute(0);
99
100        let active_peers = self
101            .gossip_receiver
102            .neighbors()
103            .await?
104            .iter()
105            .filter_map(|pub_key| pub_key.as_bytes().as_ref().try_into().ok())
106            .take(MAX_RECORD_PEERS)
107            .collect::<Vec<_>>();
108
109        let last_message_hashes = self
110            .gossip_receiver
111            .last_message_hashes()
112            .await?
113            .iter()
114            .filter_map(|hash| hash.as_ref().try_into().ok())
115            .take(MAX_MESSAGE_HASHES)
116            .collect::<Vec<_>>();
117
118        tracing::debug!(
119            "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
120            unix_minute,
121            active_peers.len(),
122            last_message_hashes.len()
123        );
124
125        let record_content = crate::gossip::GossipRecordContent {
126            active_peers: {
127                let mut peers = [Default::default(); MAX_RECORD_PEERS];
128                peers[..active_peers.len()].copy_from_slice(&active_peers);
129                peers
130            },
131            last_message_hashes: {
132                let mut hashes = [Default::default(); MAX_MESSAGE_HASHES];
133                hashes[..last_message_hashes.len()].copy_from_slice(&last_message_hashes);
134                hashes
135            },
136        };
137
138        tracing::debug!("Publisher: created record content: {:?}", record_content);
139
140        let res = self
141            .record_publisher
142            .new_record(unix_minute, record_content);
143        tracing::debug!("Publisher: created new record: {:?}", res);
144        let record = res?;
145        let result = self
146            .record_publisher
147            .publish_record(record, self.cancel_token.clone())
148            .await;
149
150        if let Err(ref e) = result {
151            tracing::debug!("Publisher: failed to publish record: {:?}", e);
152        } else {
153            tracing::debug!("Publisher: successfully published record");
154        }
155
156        result
157    }
158}