distributed_topic_tracker/gossip/topic/
publisher.rs

1use actor_helper::{Action, Actor, Handle};
2use std::time::Duration;
3
4use crate::{GossipReceiver, RecordPublisher};
5use anyhow::Result;
6
7#[derive(Debug, Clone)]
8pub struct Publisher {
9    _api: Handle<PublisherActor>,
10}
11
12#[derive(Debug)]
13struct PublisherActor {
14    rx: tokio::sync::mpsc::Receiver<Action<PublisherActor>>,
15
16    record_publisher: RecordPublisher,
17    gossip_receiver: GossipReceiver,
18    ticker: tokio::time::Interval,
19}
20
21impl Publisher {
22    pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
23        let (api, rx) = Handle::channel(32);
24
25        tokio::spawn(async move {
26            let mut ticker = tokio::time::interval(Duration::from_secs(10));
27            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
28            let mut actor = PublisherActor {
29                rx,
30                record_publisher,
31                gossip_receiver,
32                ticker,
33            };
34            let _ = actor.run().await;
35        });
36
37        Ok(Self { _api: api })
38    }
39}
40
41impl Actor for PublisherActor {
42    async fn run(&mut self) -> Result<()> {
43        loop {
44            tokio::select! {
45                Some(action) = self.rx.recv() => {
46                    action(self).await;
47                }
48                _ = self.ticker.tick() => {
49                    let _ = self.publish().await;
50                    self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
51                }
52                _ = tokio::signal::ctrl_c() => break,
53            }
54        }
55        Ok(())
56    }
57}
58
59impl PublisherActor {
60    async fn publish(&mut self) -> Result<()> {
61        let unix_minute = crate::unix_minute(0);
62
63        let active_peers = self
64            .gossip_receiver
65            .neighbors()
66            .await
67            .iter()
68            .filter_map(|pub_key| TryInto::<[u8; 32]>::try_into(pub_key.as_slice()).ok())
69            .collect::<Vec<_>>();
70
71        let last_message_hashes = self
72            .gossip_receiver
73            .last_message_hashes()
74            .await
75            .iter()
76            .filter_map(|hash| TryInto::<[u8; 32]>::try_into(hash.as_slice()).ok())
77            .collect::<Vec<_>>();
78
79        let record_content = crate::gossip::GossipRecordContent {
80            active_peers: active_peers.as_slice().try_into()?,
81            last_message_hashes: last_message_hashes.as_slice().try_into()?,
82        };
83
84        let record = self
85            .record_publisher
86            .new_record(unix_minute, record_content)?;
87        self.record_publisher.publish_record(record).await
88    }
89}