distributed_topic_tracker/gossip/topic/
publisher.rs1use actor_helper::{Action, Actor, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, RecordPublisher};
7use anyhow::Result;
8
9#[derive(Debug, Clone)]
15pub struct Publisher {
16 _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21 rx: Receiver<Action<PublisherActor>>,
22
23 record_publisher: RecordPublisher,
24 gossip_receiver: GossipReceiver,
25 ticker: tokio::time::Interval,
26}
27
28impl Publisher {
29 pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
33 let (api, rx) = Handle::channel();
34
35 tokio::spawn(async move {
36 let mut ticker = tokio::time::interval(Duration::from_secs(10));
37 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38 let mut actor = PublisherActor {
39 rx,
40 record_publisher,
41 gossip_receiver,
42 ticker,
43 };
44 let _ = actor.run().await;
45 });
46
47 Ok(Self { _api: api })
48 }
49}
50
51impl Actor<anyhow::Error> for PublisherActor {
52 async fn run(&mut self) -> Result<()> {
53 tracing::debug!("Publisher: starting publisher actor");
54 loop {
55 tokio::select! {
56 Ok(action) = self.rx.recv_async() => {
57 action(self).await;
58 }
59 _ = self.ticker.tick() => {
60 tracing::debug!("Publisher: tick fired, attempting to publish");
61 let _ = self.publish().await;
62 let next_interval = rand::random::<u64>() % 50;
63 tracing::debug!("Publisher: next publish in {}s", next_interval);
64 self.ticker.reset_after(Duration::from_secs(next_interval));
65 }
66 _ = tokio::signal::ctrl_c() => break,
67 }
68 }
69 Ok(())
70 }
71}
72
73impl PublisherActor {
74 async fn publish(&mut self) -> Result<()> {
75 let unix_minute = crate::unix_minute(0);
76
77 let active_peers = self
78 .gossip_receiver
79 .neighbors()
80 .await
81 .iter()
82 .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
83 .collect::<Vec<_>>();
84
85 let last_message_hashes = self
86 .gossip_receiver
87 .last_message_hashes()
88 .await
89 .iter()
90 .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
91 .collect::<Vec<_>>();
92
93 tracing::debug!(
94 "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
95 unix_minute,
96 active_peers.len(),
97 last_message_hashes.len()
98 );
99
100 let record_content = crate::gossip::GossipRecordContent {
101 active_peers: active_peers.as_slice().try_into()?,
102 last_message_hashes: last_message_hashes.as_slice().try_into()?,
103 };
104
105 tracing::debug!("Publisher: created record content: {:?}", record_content);
106
107 let res = self
108 .record_publisher
109 .new_record(unix_minute, record_content);
110 tracing::debug!("Publisher: created new record: {:?}", res);
111 let record = res?;
112 let result = self.record_publisher.publish_record(record).await;
113
114 if result.is_ok() {
115 tracing::debug!("Publisher: successfully published record");
116 } else {
117 tracing::debug!("Publisher: failed to publish record: {:?}", result);
118 }
119
120 result
121 }
122}