distributed_topic_tracker/gossip/topic/
publisher.rs1use actor_helper::{Action, Actor, Handle};
2use std::time::Duration;
3
4use crate::{GossipReceiver, RecordPublisher};
5use anyhow::Result;
6
7#[derive(Debug, Clone)]
8pub struct Publisher {
9 _api: Handle<PublisherActor>,
10}
11
12#[derive(Debug)]
13struct PublisherActor {
14 rx: tokio::sync::mpsc::Receiver<Action<PublisherActor>>,
15
16 record_publisher: RecordPublisher,
17 gossip_receiver: GossipReceiver,
18 ticker: tokio::time::Interval,
19}
20
21impl Publisher {
22 pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
23 let (api, rx) = Handle::channel(32);
24
25 tokio::spawn(async move {
26 let mut ticker = tokio::time::interval(Duration::from_secs(10));
27 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
28 let mut actor = PublisherActor {
29 rx,
30 record_publisher,
31 gossip_receiver,
32 ticker,
33 };
34 let _ = actor.run().await;
35 });
36
37 Ok(Self { _api: api })
38 }
39}
40
41impl Actor for PublisherActor {
42 async fn run(&mut self) -> Result<()> {
43 loop {
44 tokio::select! {
45 Some(action) = self.rx.recv() => {
46 action(self).await;
47 }
48 _ = self.ticker.tick() => {
49 let _ = self.publish().await;
50 self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
51 }
52 _ = tokio::signal::ctrl_c() => break,
53 }
54 }
55 Ok(())
56 }
57}
58
59impl PublisherActor {
60 async fn publish(&mut self) -> Result<()> {
61 let unix_minute = crate::unix_minute(0);
62
63 let active_peers = self
64 .gossip_receiver
65 .neighbors()
66 .await
67 .iter()
68 .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
69 .collect::<Vec<_>>();
70
71 let last_message_hashes = self
72 .gossip_receiver
73 .last_message_hashes()
74 .await
75 .iter()
76 .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
77 .collect::<Vec<_>>();
78
79 let record_content = crate::gossip::GossipRecordContent {
80 active_peers: active_peers.as_slice().try_into()?,
81 last_message_hashes: last_message_hashes.as_slice().try_into()?,
82 };
83
84 let record = self
85 .record_publisher
86 .new_record(unix_minute, record_content)?;
87 self.record_publisher.publish_record(record).await
88 }
89}