// distributed_topic_tracker/gossip/merge/message_overlap.rs
use actor_helper::{Action, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
10#[derive(Debug, Clone)]
14pub struct MessageOverlapMerge {
15 _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
16}
17
18#[derive(Debug)]
19struct MessageOverlapMergeActor {
20 record_publisher: RecordPublisher,
21 gossip_receiver: GossipReceiver,
22 gossip_sender: GossipSender,
23 ticker: tokio::time::Interval,
24 cancel_token: tokio_util::sync::CancellationToken,
25
26 max_join_peers: usize,
27 base_interval: Duration,
28 max_jitter: Duration,
29}
30
31impl MessageOverlapMerge {
32 pub fn new(
34 record_publisher: RecordPublisher,
35 gossip_sender: GossipSender,
36 gossip_receiver: GossipReceiver,
37 cancel_token: tokio_util::sync::CancellationToken,
38 max_join_peers: usize,
39 base_interval: Duration,
40 max_jitter: Duration,
41 ) -> Result<Self> {
42 let base_interval = base_interval.max(Duration::from_secs(1));
43 let mut ticker = tokio::time::interval(base_interval);
44 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
45
46 let api = Handle::spawn_with(
47 MessageOverlapMergeActor {
48 record_publisher,
49 gossip_receiver,
50 gossip_sender,
51 ticker,
52 cancel_token,
53 max_join_peers,
54 base_interval,
55 max_jitter,
56 },
57 |mut actor, rx| async move { actor.run(rx).await },
58 )
59 .0;
60 Ok(Self { _api: api })
61 }
62}
63
64impl MessageOverlapMergeActor {
65 async fn run(&mut self, rx: Receiver<Action<MessageOverlapMergeActor>>) -> Result<()> {
66 tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
67 loop {
68 tokio::select! {
69 result = rx.recv_async() => {
70 match result {
71 Ok(action) => action(self).await,
72 Err(_) => break Ok(()),
73 }
74 }
75 _ = self.ticker.tick() => {
76 tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
77 if let Err(e) = self.merge().await {
78 tracing::warn!("MessageOverlapMerge: error during merge: {:?}", e);
79 }
80 let jitter = if self.max_jitter > Duration::ZERO {
81 Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
82 } else {
83 Duration::ZERO
84 };
85 let next_interval = self.base_interval + jitter;
86 tracing::debug!("MessageOverlapMerge: next check in {}ms", next_interval.as_millis());
87 self.ticker.reset_after(next_interval);
88 }
89 _ = self.cancel_token.cancelled() => {
90 break Ok(());
91 }
92 else => break Ok(()),
93 }
94 }
95 }
96}
97
98impl MessageOverlapMergeActor {
99 async fn merge(&mut self) -> Result<()> {
100 let unix_minute = crate::unix_minute(0);
101 let mut records = self
102 .record_publisher
103 .get_records(unix_minute - 1, self.cancel_token.clone())
104 .await?;
105 records.extend(
106 self.record_publisher
107 .get_records(unix_minute, self.cancel_token.clone())
108 .await?,
109 );
110
111 let local_hashes = self.gossip_receiver.last_message_hashes().await?;
112 tracing::debug!(
113 "MessageOverlapMerge: checking {} records with {} local message hashes",
114 records.len(),
115 local_hashes.len()
116 );
117
118 if !local_hashes.is_empty() {
119 let last_message_hashes = local_hashes;
120 let peers_to_join = records
121 .iter()
122 .filter(|record| {
123 if let Ok(content) = record.content::<GossipRecordContent>() {
124 let remote_hashes = content
125 .last_message_hashes
126 .iter()
127 .filter(|last_message_hash| **last_message_hash != [0; 32])
128 .collect::<Vec<_>>();
129
130 !remote_hashes.is_empty()
131 && remote_hashes.iter().all(|last_message_hash| {
132 !last_message_hashes.contains(*last_message_hash)
133 })
134 } else {
135 false
136 }
137 })
138 .collect::<Vec<_>>();
139
140 tracing::debug!(
141 "MessageOverlapMerge: found {} peers with no overlapping message hashes",
142 peers_to_join.len()
143 );
144
145 if !peers_to_join.is_empty() {
146 let active_neighbors = self.gossip_receiver.neighbors().await?;
147 let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
148 let pub_keys = peers_to_join
149 .iter()
150 .flat_map(|&record| {
151 let mut peers = vec![];
152 if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
153 && pub_key != self_pub_key
154 {
155 peers.push(pub_key);
156 }
157 if let Ok(content) = record.content::<GossipRecordContent>() {
158 for active_peer in content.active_peers {
159 if active_peer == [0; 32] {
160 continue;
161 }
162 if let Ok(pub_key) = EndpointId::from_bytes(&active_peer)
163 && pub_key != self_pub_key
164 {
165 peers.push(pub_key);
166 }
167 }
168 }
169 peers
170 })
171 .filter(|pub_key| !active_neighbors.contains(pub_key))
172 .collect::<HashSet<_>>();
173
174 if !pub_keys.is_empty() {
175 tracing::debug!(
176 "MessageOverlapMerge: attempting to join {} pub_keys with no overlapping messages",
177 pub_keys.len()
178 );
179
180 self.gossip_sender
181 .join_peers(
182 pub_keys.iter().cloned().collect::<Vec<_>>(),
183 Some(self.max_join_peers),
184 )
185 .await?;
186
187 tracing::debug!(
188 "MessageOverlapMerge: join_peers request sent for split-brain recovery"
189 );
190 }
191 }
192 } else {
193 tracing::debug!(
194 "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
195 );
196 }
197 Ok(())
198 }
199}