distributed_topic_tracker/gossip/merge/
message_overlap.rs1use actor_helper::{Action, Actor, Handle};
2use std::{collections::HashSet, time::Duration};
3
4use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
5use anyhow::Result;
6
7#[derive(Debug, Clone)]
8pub struct MessageOverlapMerge {
9 _api: Handle<MessageOverlapMergeActor>,
10}
11
12#[derive(Debug)]
13struct MessageOverlapMergeActor {
14 rx: tokio::sync::mpsc::Receiver<Action<MessageOverlapMergeActor>>,
15
16 record_publisher: RecordPublisher,
17 gossip_receiver: GossipReceiver,
18 gossip_sender: GossipSender,
19 ticker: tokio::time::Interval,
20}
21
22impl MessageOverlapMerge {
23 pub fn new(
24 record_publisher: RecordPublisher,
25 gossip_sender: GossipSender,
26 gossip_receiver: GossipReceiver,
27 ) -> Result<Self> {
28 let (api, rx) = Handle::channel(32);
29
30 let mut ticker = tokio::time::interval(Duration::from_secs(10));
31 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
32
33 tokio::spawn(async move {
34 let mut actor = MessageOverlapMergeActor {
35 rx,
36 record_publisher,
37 gossip_receiver,
38 gossip_sender,
39 ticker,
40 };
41 let _ = actor.run().await;
42 });
43
44 Ok(Self { _api: api })
45 }
46}
47
48impl Actor for MessageOverlapMergeActor {
49 async fn run(&mut self) -> Result<()> {
50 loop {
51 tokio::select! {
52 Some(action) = self.rx.recv() => {
53 action(self).await;
54 }
55 _ = self.ticker.tick() => {
56 let _ = self.merge().await;
57 self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
58 }
59 _ = tokio::signal::ctrl_c() => break,
60 }
61 }
62 Ok(())
63 }
64}
65
66impl MessageOverlapMergeActor {
67 async fn merge(&mut self) -> Result<()> {
69 let unix_minute = crate::unix_minute(0);
70 let records = self.record_publisher.get_records(unix_minute).await;
71 if !self.gossip_receiver.last_message_hashes().await.is_empty() {
72 let last_message_hashes = self.gossip_receiver.last_message_hashes().await;
73 let peers_to_join = records
74 .iter()
75 .filter(|record| {
76 if let Ok(content) = record.content::<GossipRecordContent>() {
77 content.last_message_hashes.iter().any(|last_message_hash| {
78 *last_message_hash != [0; 32]
79 && last_message_hashes.contains(last_message_hash)
80 })
81 } else {
82 false
83 }
84 })
85 .collect::<Vec<_>>();
86 if !peers_to_join.is_empty() {
87 let node_ids = peers_to_join
88 .iter()
89 .flat_map(|&record| {
90 let mut peers = vec![];
91 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
92 peers.push(node_id);
93 }
94 if let Ok(content) = record.content::<GossipRecordContent>() {
95 for active_peer in content.active_peers {
96 if active_peer == [0; 32] {
97 continue;
98 }
99 if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer) {
100 peers.push(node_id);
101 }
102 }
103 }
104 peers
105 })
106 .collect::<HashSet<_>>();
107
108 self.gossip_sender
109 .join_peers(
110 node_ids.iter().cloned().collect::<Vec<_>>(),
111 Some(super::MAX_JOIN_PEERS_COUNT),
112 )
113 .await?;
114 }
115 }
116 Ok(())
117 }
118}