// distributed_topic_tracker/gossip/merge/message_overlap.rs
use actor_helper::{Action, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
/// Public handle for the message-overlap merge actor.
///
/// Constructing it via `new` spawns a background `MessageOverlapMergeActor`
/// that periodically compares the message hashes seen locally with the hashes
/// advertised in published records, and asks the gossip layer to join peers
/// whose hashes do not overlap with ours.
#[derive(Debug, Clone)]
pub struct MessageOverlapMerge {
    /// Handle to the spawned actor. It is never read in this file (hence the
    /// underscore prefix); presumably it is held only to keep the actor
    /// alive — TODO confirm against `actor_helper` semantics.
    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
}
17
/// State owned by the background task that performs the periodic
/// message-overlap (split-brain) check.
#[derive(Debug)]
struct MessageOverlapMergeActor {
    /// Source of published records (queried for the previous and the current
    /// unix minute) and of the local public key.
    record_publisher: RecordPublisher,
    /// Queried for the locally seen message hashes and the current neighbors.
    gossip_receiver: GossipReceiver,
    /// Used to issue the `join_peers` request for peers that look disconnected.
    gossip_sender: GossipSender,
    /// Timer that triggers each check; re-armed after every check with
    /// `base_interval` plus a random jitter.
    ticker: tokio::time::Interval,
    /// Stops the run loop when cancelled; also handed to `get_records`.
    cancel_token: tokio_util::sync::CancellationToken,

    /// Passed as `Some(max_join_peers)` to `join_peers`; presumably an upper
    /// bound on peers joined per attempt — TODO confirm.
    max_join_peers: usize,
    /// Fixed part of the delay between checks (clamped to at least one second
    /// by the constructor).
    base_interval: Duration,
    /// Exclusive upper bound of the random extra delay added to
    /// `base_interval`; zero disables jitter.
    max_jitter: Duration,
}
30
31impl MessageOverlapMerge {
32 #[allow(clippy::too_many_arguments)]
34 pub fn new(
35 record_publisher: RecordPublisher,
36 gossip_sender: GossipSender,
37 gossip_receiver: GossipReceiver,
38 cancel_token: tokio_util::sync::CancellationToken,
39 max_join_peers: usize,
40 initial_interval: Duration,
41 base_interval: Duration,
42 max_jitter: Duration,
43 ) -> Result<Self> {
44 let base_interval = base_interval.max(Duration::from_secs(1));
45 let mut ticker = tokio::time::interval_at(tokio::time::Instant::now() + initial_interval, base_interval);
46 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
47
48 let api = Handle::spawn_with(
49 MessageOverlapMergeActor {
50 record_publisher,
51 gossip_receiver,
52 gossip_sender,
53 ticker,
54 cancel_token,
55 max_join_peers,
56 base_interval,
57 max_jitter,
58 },
59 |mut actor, rx| async move { actor.run(rx).await },
60 )
61 .0;
62 Ok(Self { _api: api })
63 }
64}
65
66impl MessageOverlapMergeActor {
67 async fn run(&mut self, rx: Receiver<Action<MessageOverlapMergeActor>>) -> Result<()> {
68 tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
69 loop {
70 tokio::select! {
71 result = rx.recv_async() => {
72 match result {
73 Ok(action) => action(self).await,
74 Err(_) => break Ok(()),
75 }
76 }
77 _ = self.ticker.tick() => {
78 tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
79 if let Err(e) = self.merge().await {
80 tracing::warn!("MessageOverlapMerge: error during merge: {:?}", e);
81 }
82 let jitter = if self.max_jitter > Duration::ZERO {
83 Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
84 } else {
85 Duration::ZERO
86 };
87 let next_interval = self.base_interval + jitter;
88 tracing::debug!("MessageOverlapMerge: next check in {}ms", next_interval.as_millis());
89 self.ticker.reset_after(next_interval);
90 }
91 _ = self.cancel_token.cancelled() => {
92 break Ok(());
93 }
94 else => break Ok(()),
95 }
96 }
97 }
98}
99
100impl MessageOverlapMergeActor {
101 async fn merge(&mut self) -> Result<()> {
102 let unix_minute = crate::unix_minute(0);
103 let mut records = self
104 .record_publisher
105 .get_records(unix_minute - 1, self.cancel_token.clone())
106 .await?;
107 records.extend(
108 self.record_publisher
109 .get_records(unix_minute, self.cancel_token.clone())
110 .await?,
111 );
112
113 let local_hashes = self.gossip_receiver.last_message_hashes().await?;
114 tracing::debug!(
115 "MessageOverlapMerge: checking {} records with {} local message hashes",
116 records.len(),
117 local_hashes.len()
118 );
119
120 if !local_hashes.is_empty() {
121 let last_message_hashes = local_hashes;
122 let peers_to_join = records
123 .iter()
124 .filter(|record| {
125 if let Ok(content) = record.content::<GossipRecordContent>() {
126 let remote_hashes = content
127 .last_message_hashes
128 .iter()
129 .filter(|last_message_hash| **last_message_hash != [0; 32])
130 .collect::<Vec<_>>();
131
132 !remote_hashes.is_empty()
133 && remote_hashes.iter().all(|last_message_hash| {
134 !last_message_hashes.contains(*last_message_hash)
135 })
136 } else {
137 false
138 }
139 })
140 .collect::<Vec<_>>();
141
142 tracing::debug!(
143 "MessageOverlapMerge: found {} peers with no overlapping message hashes",
144 peers_to_join.len()
145 );
146
147 if !peers_to_join.is_empty() {
148 let active_neighbors = self.gossip_receiver.neighbors().await?;
149 let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
150 let pub_keys = peers_to_join
151 .iter()
152 .flat_map(|&record| {
153 let mut peers = vec![];
154 if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
155 && pub_key != self_pub_key
156 {
157 peers.push(pub_key);
158 }
159 if let Ok(content) = record.content::<GossipRecordContent>() {
160 for active_peer in content.active_peers {
161 if active_peer == [0; 32] {
162 continue;
163 }
164 if let Ok(pub_key) = EndpointId::from_bytes(&active_peer)
165 && pub_key != self_pub_key
166 {
167 peers.push(pub_key);
168 }
169 }
170 }
171 peers
172 })
173 .filter(|pub_key| !active_neighbors.contains(pub_key))
174 .collect::<HashSet<_>>();
175
176 if !pub_keys.is_empty() {
177 tracing::debug!(
178 "MessageOverlapMerge: attempting to join {} pub_keys with no overlapping messages",
179 pub_keys.len()
180 );
181
182 self.gossip_sender
183 .join_peers(
184 pub_keys.iter().cloned().collect::<Vec<_>>(),
185 Some(self.max_join_peers),
186 )
187 .await?;
188
189 tracing::debug!(
190 "MessageOverlapMerge: join_peers request sent for split-brain recovery"
191 );
192 }
193 }
194 } else {
195 tracing::debug!(
196 "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
197 );
198 }
199 Ok(())
200 }
201}