distributed_topic_tracker/gossip/merge/
message_overlap.rs1use std::{collections::HashSet, time::Duration};
2use actor_helper::{Action, Actor, Handle};
3
4use crate::{
5 GossipReceiver, GossipSender, RecordPublisher,
6};
7use anyhow::Result;
8
9#[derive(Debug, Clone)]
10pub struct MessageOverlapMerge {
11 _api: Handle<MessageOverlapMergeActor>,
12}
13
14#[derive(Debug)]
15struct MessageOverlapMergeActor {
16 rx: tokio::sync::mpsc::Receiver<Action<MessageOverlapMergeActor>>,
17
18 record_publisher: RecordPublisher,
19 gossip_receiver: GossipReceiver,
20 gossip_sender: GossipSender,
21 ticker: tokio::time::Interval,
22}
23
24impl MessageOverlapMerge {
25 pub fn new(
26 record_publisher: RecordPublisher,
27 gossip_sender: GossipSender,
28 gossip_receiver: GossipReceiver,
29 ) -> Result<Self> {
30 let (api, rx) = Handle::channel(32);
31
32 let mut ticker = tokio::time::interval(Duration::from_secs(10));
33 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
34
35 tokio::spawn(async move {
36 let mut actor = MessageOverlapMergeActor {
37 rx,
38 record_publisher,
39 gossip_receiver,
40 gossip_sender,
41 ticker,
42 };
43 let _ = actor.run().await;
44 });
45
46 Ok(Self { _api: api })
47 }
48}
49
50impl Actor for MessageOverlapMergeActor {
51 async fn run(&mut self) -> Result<()> {
52 loop {
53 tokio::select! {
54 Some(action) = self.rx.recv() => {
55 action(self).await;
56 }
57 _ = self.ticker.tick() => {
58 let _ = self.merge().await;
59 self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
60 }
61 _ = tokio::signal::ctrl_c() => break,
62 }
63 }
64 Ok(())
65 }
66}
67
68impl MessageOverlapMergeActor {
69 async fn merge(&mut self) -> Result<()> {
71 let unix_minute = crate::unix_minute(0);
72 let records = self.record_publisher.get_records(unix_minute).await;
73 if !self.gossip_receiver.last_message_hashes().await.is_empty() {
74 let last_message_hashes = self.gossip_receiver.last_message_hashes().await;
75 let peers_to_join = records
76 .iter()
77 .filter(|record| {
78 !record
79 .last_message_hashes()
80 .iter()
81 .all(|last_message_hash| {
82 *last_message_hash != [0; 32]
83 && last_message_hashes.contains(last_message_hash)
84 })
85 })
86 .collect::<Vec<_>>();
87 if !peers_to_join.is_empty() {
88 let node_ids = peers_to_join
89 .iter()
90 .flat_map(|&record| {
91 let mut peers = vec![];
92 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
93 peers.push(node_id);
94 }
95 for active_peer in record.active_peers() {
96 if active_peer == [0; 32] {
97 continue;
98 }
99 if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer) {
100 peers.push(node_id);
101 }
102 }
103 peers
104 })
105 .collect::<HashSet<_>>();
106
107 self.gossip_sender
108 .join_peers(
109 node_ids.iter().cloned().collect::<Vec<_>>(),
110 Some(super::MAX_JOIN_PEERS_COUNT),
111 )
112 .await?;
113 }
114 }
115 Ok(())
116 }
117}