distributed_topic_tracker/gossip/topic/
publisher.rs1use actor_helper::{Action, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, MAX_MESSAGE_HASHES, MAX_RECORD_PEERS, RecordPublisher};
7use anyhow::Result;
8
9#[derive(Debug, Clone)]
15pub struct Publisher {
16 _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21 record_publisher: RecordPublisher,
22 gossip_receiver: GossipReceiver,
23 ticker: tokio::time::Interval,
24 cancel_token: tokio_util::sync::CancellationToken,
25 base_interval: Duration,
26 max_jitter: Duration,
27}
28
29impl Publisher {
30 pub fn new(
34 record_publisher: RecordPublisher,
35 gossip_receiver: GossipReceiver,
36 cancel_token: tokio_util::sync::CancellationToken,
37 initial_delay: Duration,
38 base_interval: Duration,
39 max_jitter: Duration,
40 ) -> Result<Self> {
41 let base_interval = base_interval.max(Duration::from_secs(1));
42 let mut ticker =
43 tokio::time::interval_at(tokio::time::Instant::now() + initial_delay, base_interval);
44 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
45 let api = Handle::spawn_with(
46 PublisherActor {
47 record_publisher,
48 gossip_receiver,
49 ticker,
50 cancel_token,
51 base_interval,
52 max_jitter,
53 },
54 |mut actor, rx| async move { actor.run(rx).await },
55 )
56 .0;
57
58 Ok(Self { _api: api })
59 }
60}
61
62impl PublisherActor {
63 async fn run(&mut self, rx: Receiver<Action<PublisherActor>>) -> Result<()> {
64 tracing::debug!("Publisher: starting publisher actor");
65 loop {
66 tokio::select! {
67 result = rx.recv_async() => {
68 match result {
69 Ok(action) => action(self).await,
70 Err(_) => break Ok(()),
71 }
72 }
73 _ = self.ticker.tick() => {
74 tracing::debug!("Publisher: tick fired, attempting to publish");
75 if let Err(e) = self.publish().await {
76 tracing::warn!("Publisher: failed to publish record: {:?}", e);
77 }
78 let jitter = if self.max_jitter > Duration::ZERO {
79 Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
80 } else {
81 Duration::ZERO
82 };
83 let next_interval = self.base_interval + jitter;
84 tracing::debug!("Publisher: next publish in {}ms", next_interval.as_millis());
85 self.ticker.reset_after(next_interval);
86 }
87 _ = self.cancel_token.cancelled() => {
88 break Ok(());
89 }
90 else => break Ok(()),
91 }
92 }
93 }
94}
95
96impl PublisherActor {
97 async fn publish(&mut self) -> Result<()> {
98 let unix_minute = crate::unix_minute(0);
99
100 let active_peers = self
101 .gossip_receiver
102 .neighbors()
103 .await?
104 .iter()
105 .filter_map(|pub_key| pub_key.as_bytes().as_ref().try_into().ok())
106 .take(MAX_RECORD_PEERS)
107 .collect::<Vec<_>>();
108
109 let last_message_hashes = self
110 .gossip_receiver
111 .last_message_hashes()
112 .await?
113 .iter()
114 .filter_map(|hash| hash.as_ref().try_into().ok())
115 .take(MAX_MESSAGE_HASHES)
116 .collect::<Vec<_>>();
117
118 tracing::debug!(
119 "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
120 unix_minute,
121 active_peers.len(),
122 last_message_hashes.len()
123 );
124
125 let record_content = crate::gossip::GossipRecordContent {
126 active_peers: {
127 let mut peers = [Default::default(); MAX_RECORD_PEERS];
128 peers[..active_peers.len()].copy_from_slice(&active_peers);
129 peers
130 },
131 last_message_hashes: {
132 let mut hashes = [Default::default(); MAX_MESSAGE_HASHES];
133 hashes[..last_message_hashes.len()].copy_from_slice(&last_message_hashes);
134 hashes
135 },
136 };
137
138 tracing::debug!("Publisher: created record content: {:?}", record_content);
139
140 let res = self
141 .record_publisher
142 .new_record(unix_minute, record_content);
143 tracing::debug!("Publisher: created new record: {:?}", res);
144 let record = res?;
145 let result = self
146 .record_publisher
147 .publish_record(record, self.cancel_token.clone())
148 .await;
149
150 if let Err(ref e) = result {
151 tracing::debug!("Publisher: failed to publish record: {:?}", e);
152 } else {
153 tracing::debug!("Publisher: successfully published record");
154 }
155
156 result
157 }
158}