distributed_topic_tracker/gossip/merge/
message_overlap.rs

1use actor_helper::{Action, Actor, Handle};
2use std::{collections::HashSet, time::Duration};
3
4use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
5use anyhow::Result;
6
/// Public handle to the background message-overlap merge task.
///
/// Creating one via [`MessageOverlapMerge::new`] spawns a
/// `MessageOverlapMergeActor` on the Tokio runtime; this struct only keeps the
/// actor's `Handle`. Cloning it clones that handle and does not spawn a second
/// actor.
///
/// NOTE(review): the actor's loop in this file only exits on Ctrl-C, so
/// dropping every clone of this handle does not appear to stop the task —
/// confirm that this is intended.
#[derive(Debug, Clone)]
pub struct MessageOverlapMerge {
    /// Handle to the spawned actor. Not used by any method visible in this
    /// file (hence the leading underscore); it is only stored.
    _api: Handle<MessageOverlapMergeActor>,
}
11
/// State owned by the spawned merge task.
///
/// The actor periodically compares the message hashes this node has seen with
/// the hashes advertised in published records and joins the peers behind any
/// record that overlaps (see `merge`).
#[derive(Debug)]
struct MessageOverlapMergeActor {
    /// Incoming actions sent through the public `Handle`; each one is executed
    /// against `self` inside `run`.
    rx: tokio::sync::mpsc::Receiver<Action<MessageOverlapMergeActor>>,

    /// Source of the published records inspected on every merge pass.
    record_publisher: RecordPublisher,
    /// Provides this node's own `last_message_hashes` for the overlap check.
    gossip_receiver: GossipReceiver,
    /// Used to join the peers discovered by a merge pass.
    gossip_sender: GossipSender,
    /// Timer that triggers a merge pass; re-armed with a random delay after
    /// every pass.
    ticker: tokio::time::Interval,
}
21
22impl MessageOverlapMerge {
23    pub fn new(
24        record_publisher: RecordPublisher,
25        gossip_sender: GossipSender,
26        gossip_receiver: GossipReceiver,
27    ) -> Result<Self> {
28        let (api, rx) = Handle::channel(32);
29
30        let mut ticker = tokio::time::interval(Duration::from_secs(10));
31        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
32
33        tokio::spawn(async move {
34            let mut actor = MessageOverlapMergeActor {
35                rx,
36                record_publisher,
37                gossip_receiver,
38                gossip_sender,
39                ticker,
40            };
41            let _ = actor.run().await;
42        });
43
44        Ok(Self { _api: api })
45    }
46}
47
48impl Actor for MessageOverlapMergeActor {
49    async fn run(&mut self) -> Result<()> {
50        loop {
51            tokio::select! {
52                Some(action) = self.rx.recv() => {
53                    action(self).await;
54                }
55                _ = self.ticker.tick() => {
56                    let _ = self.merge().await;
57                    self.ticker.reset_after(Duration::from_secs(rand::random::<u64>() % 50));
58                }
59                _ = tokio::signal::ctrl_c() => break,
60            }
61        }
62        Ok(())
63    }
64}
65
66impl MessageOverlapMergeActor {
67    // Message overlap indicator
68    async fn merge(&mut self) -> Result<()> {
69        let unix_minute = crate::unix_minute(0);
70        let records = self.record_publisher.get_records(unix_minute).await;
71        if !self.gossip_receiver.last_message_hashes().await.is_empty() {
72            let last_message_hashes = self.gossip_receiver.last_message_hashes().await;
73            let peers_to_join = records
74                .iter()
75                .filter(|record| {
76                    if let Ok(content) = record.content::<GossipRecordContent>() {
77                        content.last_message_hashes.iter().any(|last_message_hash| {
78                            *last_message_hash != [0; 32]
79                                && last_message_hashes.contains(last_message_hash)
80                        })
81                    } else {
82                        false
83                    }
84                })
85                .collect::<Vec<_>>();
86            if !peers_to_join.is_empty() {
87                let node_ids = peers_to_join
88                    .iter()
89                    .flat_map(|&record| {
90                        let mut peers = vec![];
91                        if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
92                            peers.push(node_id);
93                        }
94                        if let Ok(content) = record.content::<GossipRecordContent>() {
95                            for active_peer in content.active_peers {
96                                if active_peer == [0; 32] {
97                                    continue;
98                                }
99                                if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer) {
100                                    peers.push(node_id);
101                                }
102                            }
103                        }
104                        peers
105                    })
106                    .collect::<HashSet<_>>();
107
108                self.gossip_sender
109                    .join_peers(
110                        node_ids.iter().cloned().collect::<Vec<_>>(),
111                        Some(super::MAX_JOIN_PEERS_COUNT),
112                    )
113                    .await?;
114            }
115        }
116        Ok(())
117    }
118}