distributed_topic_tracker/gossip/topic/
publisher.rs1use actor_helper::{Action, Actor, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, RecordPublisher};
7use anyhow::Result;
8
9#[derive(Debug, Clone)]
15pub struct Publisher {
16 _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21 rx: Receiver<Action<PublisherActor>>,
22
23 record_publisher: RecordPublisher,
24 gossip_receiver: GossipReceiver,
25 ticker: tokio::time::Interval,
26}
27
28impl Publisher {
29 pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
33 let (api, rx) = Handle::channel();
34
35 tokio::spawn(async move {
36 let mut ticker = tokio::time::interval(Duration::from_secs(10));
37 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38 let mut actor = PublisherActor {
39 rx,
40 record_publisher,
41 gossip_receiver,
42 ticker,
43 };
44 let _ = actor.run().await;
45 });
46
47 Ok(Self { _api: api })
48 }
49}
50
51impl Actor<anyhow::Error> for PublisherActor {
52 async fn run(&mut self) -> Result<()> {
53 tracing::debug!("Publisher: starting publisher actor");
54 loop {
55 tokio::select! {
56 Ok(action) = self.rx.recv_async() => {
57 action(self).await;
58 }
59 _ = self.ticker.tick() => {
60 tracing::debug!("Publisher: tick fired, attempting to publish");
61 let _ = self.publish().await;
62 let next_interval = rand::random::<u64>() % 50;
63 tracing::debug!("Publisher: next publish in {}s", next_interval);
64 self.ticker.reset_after(Duration::from_secs(next_interval));
65 }
66 else => break Ok(()),
67 }
68 }
69 }
70}
71
72impl PublisherActor {
73 async fn publish(&mut self) -> Result<()> {
74 let unix_minute = crate::unix_minute(0);
75
76 let active_peers = self
77 .gossip_receiver
78 .neighbors()
79 .await
80 .iter()
81 .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
82 .collect::<Vec<_>>();
83
84 let last_message_hashes = self
85 .gossip_receiver
86 .last_message_hashes()
87 .await
88 .iter()
89 .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
90 .collect::<Vec<_>>();
91
92 tracing::debug!(
93 "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
94 unix_minute,
95 active_peers.len(),
96 last_message_hashes.len()
97 );
98
99 let record_content = crate::gossip::GossipRecordContent {
100 active_peers: active_peers.as_slice().try_into()?,
101 last_message_hashes: last_message_hashes.as_slice().try_into()?,
102 };
103
104 tracing::debug!("Publisher: created record content: {:?}", record_content);
105
106 let res = self
107 .record_publisher
108 .new_record(unix_minute, record_content);
109 tracing::debug!("Publisher: created new record: {:?}", res);
110 let record = res?;
111 let result = self.record_publisher.publish_record(record).await;
112
113 if result.is_ok() {
114 tracing::debug!("Publisher: successfully published record");
115 } else {
116 tracing::debug!("Publisher: failed to publish record: {:?}", result);
117 }
118
119 result
120 }
121}