distributed_topic_tracker/gossip/topic/
publisher.rs

1use std::time::Duration;
2use actor_helper::{Action, Actor, Handle};
3
4use crate::{
5    GossipReceiver, RecordPublisher,
6};
7use anyhow::Result;
8
9#[derive(Debug,Clone)]
10pub struct Publisher {
11    _api: Handle<PublisherActor>,
12}
13
14#[derive(Debug)]
15struct PublisherActor {
16    rx: tokio::sync::mpsc::Receiver<Action<PublisherActor>>,
17
18    record_publisher: RecordPublisher,
19    gossip_receiver: GossipReceiver,
20    ticker: tokio::time::Interval,
21}
22
23impl Publisher {
24    pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
25        let (api, rx) = Handle::channel(32);
26
27        tokio::spawn(async move {
28            let mut ticker = tokio::time::interval(Duration::from_secs(10));
29            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
30            let mut actor = PublisherActor {
31                rx,
32                record_publisher,
33                gossip_receiver,
34                ticker,
35            };
36            let _ = actor.run().await;
37        });
38
39        Ok(Self { _api: api })
40    }
41}
42
43impl Actor for PublisherActor {
44    async fn run(&mut self) -> Result<()> {
45        loop {
46            tokio::select! {
47                Some(action) = self.rx.recv() => {
48                    action(self).await;
49                }
50                _ = self.ticker.tick() => {
51                    let _ = self.publish().await;
52                    self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
53                }
54                _ = tokio::signal::ctrl_c() => break,
55            }
56        }
57        Ok(())
58    }
59}
60
61impl PublisherActor {
62    async fn publish(&mut self) -> Result<()> {
63        let unix_minute = crate::unix_minute(0);
64        let record = self.record_publisher.new_record(
65            unix_minute,
66            self.gossip_receiver
67                .neighbors()
68                .await
69                .iter()
70                .map(|n| n.public())
71                .collect(),
72            self.gossip_receiver.last_message_hashes().await,
73        );
74        self.record_publisher.publish_record(record).await
75    }
76}