distributed_topic_tracker/gossip/merge/
message_overlap.rs1use actor_helper::{Action, Actor, Handle, Receiver};
4use std::{collections::HashSet, time::Duration};
5
6use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
7use anyhow::Result;
8
/// Merge strategy that looks for gossip peers whose recently seen messages overlap with
/// this node's, and asks the gossip layer to join them (split-brain recovery).
///
/// The actual work is done by a background actor spawned in [`MessageOverlapMerge::new`];
/// this value only stores the actor's API handle.
#[derive(Debug, Clone)]
pub struct MessageOverlapMerge {
    // Handle to the background actor's action channel; it is stored but not otherwise
    // used in this file.
    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
}
16
/// State of the background actor behind [`MessageOverlapMerge`], driven by `run` inside a
/// task spawned from `MessageOverlapMerge::new`.
#[derive(Debug)]
struct MessageOverlapMergeActor {
    // Actions sent through the public API handle; each is executed against this actor.
    rx: Receiver<Action<MessageOverlapMergeActor>>,

    // Queried for the records of the current and the previous unix minute.
    record_publisher: RecordPublisher,
    // Supplies this node's recent message hashes for the overlap comparison.
    gossip_receiver: GossipReceiver,
    // Used to request joining the peers found through message overlap.
    gossip_sender: GossipSender,
    // Schedules overlap checks; re-armed with a random delay after every check.
    ticker: tokio::time::Interval,
}
26
27impl MessageOverlapMerge {
28 pub fn new(
30 record_publisher: RecordPublisher,
31 gossip_sender: GossipSender,
32 gossip_receiver: GossipReceiver,
33 ) -> Result<Self> {
34 let (api, rx) = Handle::channel();
35
36 let mut ticker = tokio::time::interval(Duration::from_secs(10));
37 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38
39 tokio::spawn(async move {
40 let mut actor = MessageOverlapMergeActor {
41 rx,
42 record_publisher,
43 gossip_receiver,
44 gossip_sender,
45 ticker,
46 };
47 let _ = actor.run().await;
48 });
49
50 Ok(Self { _api: api })
51 }
52}
53
54impl Actor<anyhow::Error> for MessageOverlapMergeActor {
55 async fn run(&mut self) -> Result<()> {
56 tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
57 loop {
58 tokio::select! {
59 Ok(action) = self.rx.recv_async() => {
60 action(self).await;
61 }
62 _ = self.ticker.tick() => {
63 tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
64 let _ = self.merge().await;
65 let next_interval = rand::random::<u64>() % 50;
66 tracing::debug!("MessageOverlapMerge: next check in {}s", next_interval);
67 self.ticker.reset_after(Duration::from_secs(next_interval));
68 }
69 _ = tokio::signal::ctrl_c() => break,
70 }
71 }
72 Ok(())
73 }
74}
75
76impl MessageOverlapMergeActor {
77 async fn merge(&mut self) -> Result<()> {
78 let unix_minute = crate::unix_minute(0);
79 let mut records = self.record_publisher.get_records(unix_minute-1).await;
80 records.extend(self.record_publisher.get_records(unix_minute).await);
81
82 let local_hashes = self.gossip_receiver.last_message_hashes().await;
83 tracing::debug!("MessageOverlapMerge: checking {} records with {} local message hashes", records.len(), local_hashes.len());
84
85 if !local_hashes.is_empty() {
86 let last_message_hashes = local_hashes;
87 let peers_to_join = records
88 .iter()
89 .filter(|record| {
90 if let Ok(content) = record.content::<GossipRecordContent>() {
91 content.last_message_hashes.iter().any(|last_message_hash| {
92 *last_message_hash != [0; 32]
93 && last_message_hashes.contains(last_message_hash)
94 })
95 } else {
96 false
97 }
98 })
99 .collect::<Vec<_>>();
100
101 tracing::debug!("MessageOverlapMerge: found {} peers with overlapping message hashes", peers_to_join.len());
102
103 if !peers_to_join.is_empty() {
104 let node_ids = peers_to_join
105 .iter()
106 .flat_map(|&record| {
107 let mut peers = vec![];
108 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
109 peers.push(node_id);
110 }
111 if let Ok(content) = record.content::<GossipRecordContent>() {
112 for active_peer in content.active_peers {
113 if active_peer == [0; 32] {
114 continue;
115 }
116 if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer) {
117 peers.push(node_id);
118 }
119 }
120 }
121 peers
122 })
123 .collect::<HashSet<_>>();
124
125 tracing::debug!("MessageOverlapMerge: attempting to join {} node_ids with overlapping messages", node_ids.len());
126
127 self.gossip_sender
128 .join_peers(
129 node_ids.iter().cloned().collect::<Vec<_>>(),
130 Some(super::MAX_JOIN_PEERS_COUNT),
131 )
132 .await?;
133
134 tracing::debug!("MessageOverlapMerge: join_peers request sent for split-brain recovery");
135 }
136 } else {
137 tracing::debug!("MessageOverlapMerge: no local message hashes yet, skipping overlap detection");
138 }
139 Ok(())
140 }
141}