distributed_topic_tracker/gossip/topic/
publisher.rs1use std::time::Duration;
2use actor_helper::{Action, Actor, Handle};
3
4use crate::{
5 GossipReceiver, RecordPublisher,
6};
7use anyhow::Result;
8
9#[derive(Debug,Clone)]
10pub struct Publisher {
11 _api: Handle<PublisherActor>,
12}
13
14#[derive(Debug)]
15struct PublisherActor {
16 rx: tokio::sync::mpsc::Receiver<Action<PublisherActor>>,
17
18 record_publisher: RecordPublisher,
19 gossip_receiver: GossipReceiver,
20 ticker: tokio::time::Interval,
21}
22
23impl Publisher {
24 pub fn new(record_publisher: RecordPublisher, gossip_receiver: GossipReceiver) -> Result<Self> {
25 let (api, rx) = Handle::channel(32);
26
27 tokio::spawn(async move {
28 let mut ticker = tokio::time::interval(Duration::from_secs(10));
29 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
30 let mut actor = PublisherActor {
31 rx,
32 record_publisher,
33 gossip_receiver,
34 ticker,
35 };
36 let _ = actor.run().await;
37 });
38
39 Ok(Self { _api: api })
40 }
41}
42
43impl Actor for PublisherActor {
44 async fn run(&mut self) -> Result<()> {
45 loop {
46 tokio::select! {
47 Some(action) = self.rx.recv() => {
48 action(self).await;
49 }
50 _ = self.ticker.tick() => {
51 let _ = self.publish().await;
52 self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
53 }
54 _ = tokio::signal::ctrl_c() => break,
55 }
56 }
57 Ok(())
58 }
59}
60
61impl PublisherActor {
62 async fn publish(&mut self) -> Result<()> {
63 let unix_minute = crate::unix_minute(0);
64 let record = self.record_publisher.new_record(
65 unix_minute,
66 self.gossip_receiver
67 .neighbors()
68 .await
69 .iter()
70 .map(|n| n.public())
71 .collect(),
72 self.gossip_receiver.last_message_hashes().await,
73 );
74 self.record_publisher.publish_record(record).await
75 }
76}