distributed_topic_tracker/gossip/topic/
publisher.rs1use actor_helper::{Action, Actor, Handle, Receiver};
4use std::time::Duration;
5
6use crate::{GossipReceiver, RecordPublisher};
7use anyhow::Result;
8
9#[derive(Debug, Clone)]
15pub struct Publisher {
16 _api: Handle<PublisherActor, anyhow::Error>,
17}
18
19#[derive(Debug)]
20struct PublisherActor {
21 rx: Receiver<Action<PublisherActor>>,
22
23 record_publisher: RecordPublisher,
24 gossip_receiver: GossipReceiver,
25 ticker: tokio::time::Interval,
26}
27
28impl Publisher {
29 pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
33 let (api, rx) = Handle::channel();
34
35 tokio::spawn(async move {
36 let mut ticker = tokio::time::interval(Duration::from_secs(10));
37 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38 let mut actor = PublisherActor {
39 rx,
40 record_publisher,
41 gossip_receiver,
42 ticker,
43 };
44 let _ = actor.run().await;
45 });
46
47 Ok(Self { _api: api })
48 }
49}
50
51impl Actor<anyhow::Error> for PublisherActor {
52 async fn run(&mut self) -> Result<()> {
53 tracing::debug!("Publisher: starting publisher actor");
54 loop {
55 tokio::select! {
56 Ok(action) = self.rx.recv_async() => {
57 action(self).await;
58 }
59 _ = self.ticker.tick() => {
60 tracing::debug!("Publisher: tick fired, attempting to publish");
61 let _ = self.publish().await;
62 let next_interval = rand::random::<u64>() % 50;
63 tracing::debug!("Publisher: next publish in {}s", next_interval);
64 self.ticker.reset_after(Duration::from_secs(next_interval));
65 }
66 _ = tokio::signal::ctrl_c() => break,
67 }
68 }
69 Ok(())
70 }
71}
72
73impl PublisherActor {
74 async fn publish(&mut self) -> Result<()> {
75 let unix_minute = crate::unix_minute(0);
76
77 let active_peers = self
78 .gossip_receiver
79 .neighbors()
80 .await
81 .iter()
82 .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
83 .collect::<Vec<_>>();
84
85 let last_message_hashes = self
86 .gossip_receiver
87 .last_message_hashes()
88 .await
89 .iter()
90 .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
91 .collect::<Vec<_>>();
92
93 tracing::debug!(
94 "Publisher: publishing record for unix_minute {} with {} active_peers and {} message_hashes",
95 unix_minute, active_peers.len(), last_message_hashes.len()
96 );
97
98 let record_content = crate::gossip::GossipRecordContent {
99 active_peers: active_peers.as_slice().try_into()?,
100 last_message_hashes: last_message_hashes.as_slice().try_into()?,
101 };
102
103 tracing::debug!("Publisher: created record content: {:?}", record_content);
104
105 let res = self
106 .record_publisher
107 .new_record(unix_minute, record_content);
108 tracing::debug!("Publisher: created new record: {:?}", res);
109 let record = res?;
110 let result = self.record_publisher.publish_record(record).await;
111
112 if result.is_ok() {
113 tracing::debug!("Publisher: successfully published record");
114 } else {
115 tracing::debug!("Publisher: failed to publish record: {:?}", result);
116 }
117
118 result
119 }
120}