distributed_topic_tracker/gossip/merge/
message_overlap.rs

1use std::{collections::HashSet, time::Duration};
2use actor_helper::{Action, Actor, Handle};
3
4use crate::{
5    GossipReceiver, GossipSender, RecordPublisher,
6};
7use anyhow::Result;
8
9#[derive(Debug, Clone)]
10pub struct MessageOverlapMerge {
11    _api: Handle<MessageOverlapMergeActor>,
12}
13
14#[derive(Debug)]
15struct MessageOverlapMergeActor {
16    rx: tokio::sync::mpsc::Receiver<Action<MessageOverlapMergeActor>>,
17
18    record_publisher: RecordPublisher,
19    gossip_receiver: GossipReceiver,
20    gossip_sender: GossipSender,
21    ticker: tokio::time::Interval,
22}
23
24impl MessageOverlapMerge {
25    pub fn new(
26        record_publisher: RecordPublisher,
27        gossip_sender: GossipSender,
28        gossip_receiver: GossipReceiver,
29    ) -> Result<Self> {
30        let (api, rx) = Handle::channel(32);
31
32        let mut ticker = tokio::time::interval(Duration::from_secs(10));
33        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
34
35        tokio::spawn(async move {
36            let mut actor = MessageOverlapMergeActor {
37                rx,
38                record_publisher,
39                gossip_receiver,
40                gossip_sender,
41                ticker,
42            };
43            let _ = actor.run().await;
44        });
45
46        Ok(Self { _api: api })
47    }
48}
49
50impl Actor for MessageOverlapMergeActor {
51    async fn run(&mut self) -> Result<()> {
52        loop {
53            tokio::select! {
54                Some(action) = self.rx.recv() => {
55                    action(self).await;
56                }
57                _ = self.ticker.tick() => {
58                    let _ = self.merge().await;
59                    self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
60                }
61                _ = tokio::signal::ctrl_c() => break,
62            }
63        }
64        Ok(())
65    }
66}
67
68impl MessageOverlapMergeActor {
69    // Message overlap indicator
70    async fn merge(&mut self) -> Result<()> {
71        let unix_minute = crate::unix_minute(0);
72        let records = self.record_publisher.get_records(unix_minute).await;
73        if !self.gossip_receiver.last_message_hashes().await.is_empty() {
74            let last_message_hashes = self.gossip_receiver.last_message_hashes().await;
75            let peers_to_join = records
76                .iter()
77                .filter(|record| {
78                    !record
79                        .last_message_hashes()
80                        .iter()
81                        .all(|last_message_hash| {
82                            *last_message_hash != [0; 32]
83                                && last_message_hashes.contains(last_message_hash)
84                        })
85                })
86                .collect::<Vec<_>>();
87            if !peers_to_join.is_empty() {
88                let node_ids = peers_to_join
89                    .iter()
90                    .flat_map(|&record| {
91                        let mut peers = vec![];
92                        if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
93                            peers.push(node_id);
94                        }
95                        for active_peer in record.active_peers() {
96                            if active_peer == [0; 32] {
97                                continue;
98                            }
99                            if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer) {
100                                peers.push(node_id);
101                            }
102                        }
103                        peers
104                    })
105                    .collect::<HashSet<_>>();
106
107                self.gossip_sender
108                    .join_peers(
109                        node_ids.iter().cloned().collect::<Vec<_>>(),
110                        Some(super::MAX_JOIN_PEERS_COUNT),
111                    )
112                    .await?;
113            }
114        }
115        Ok(())
116    }
117}