distributed_topic_tracker/gossip/merge/
message_overlap.rs1use actor_helper::{Action, Actor, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
10#[derive(Debug, Clone)]
14pub struct MessageOverlapMerge {
15 _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
16}
17
18#[derive(Debug)]
19struct MessageOverlapMergeActor {
20 rx: Receiver<Action<MessageOverlapMergeActor>>,
21
22 record_publisher: RecordPublisher,
23 gossip_receiver: GossipReceiver,
24 gossip_sender: GossipSender,
25 ticker: tokio::time::Interval,
26}
27
28impl MessageOverlapMerge {
29 pub fn new(
31 record_publisher: RecordPublisher,
32 gossip_sender: GossipSender,
33 gossip_receiver: GossipReceiver,
34 ) -> Result<Self> {
35 let (api, rx) = Handle::channel();
36
37 let mut ticker = tokio::time::interval(Duration::from_secs(10));
38 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
39
40 tokio::spawn(async move {
41 let mut actor = MessageOverlapMergeActor {
42 rx,
43 record_publisher,
44 gossip_receiver,
45 gossip_sender,
46 ticker,
47 };
48 let _ = actor.run().await;
49 });
50
51 Ok(Self { _api: api })
52 }
53}
54
55impl Actor<anyhow::Error> for MessageOverlapMergeActor {
56 async fn run(&mut self) -> Result<()> {
57 tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
58 loop {
59 tokio::select! {
60 Ok(action) = self.rx.recv_async() => {
61 action(self).await;
62 }
63 _ = self.ticker.tick() => {
64 tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
65 let _ = self.merge().await;
66 let next_interval = rand::random::<u64>() % 50;
67 tracing::debug!("MessageOverlapMerge: next check in {}s", next_interval);
68 self.ticker.reset_after(Duration::from_secs(next_interval));
69 }
70 else => break Ok(()),
71 }
72 }
73 }
74}
75
76impl MessageOverlapMergeActor {
77 async fn merge(&mut self) -> Result<()> {
78 let unix_minute = crate::unix_minute(0);
79 let mut records = self.record_publisher.get_records(unix_minute - 1).await;
80 records.extend(self.record_publisher.get_records(unix_minute).await);
81
82 let local_hashes = self.gossip_receiver.last_message_hashes().await;
83 tracing::debug!(
84 "MessageOverlapMerge: checking {} records with {} local message hashes",
85 records.len(),
86 local_hashes.len()
87 );
88
89 if !local_hashes.is_empty() {
90 let last_message_hashes = local_hashes;
91 let peers_to_join = records
92 .iter()
93 .filter(|record| {
94 if let Ok(content) = record.content::<GossipRecordContent>() {
95 content.last_message_hashes.iter().any(|last_message_hash| {
96 *last_message_hash != [0; 32]
97 && last_message_hashes.contains(last_message_hash)
98 })
99 } else {
100 false
101 }
102 })
103 .collect::<Vec<_>>();
104
105 tracing::debug!(
106 "MessageOverlapMerge: found {} peers with overlapping message hashes",
107 peers_to_join.len()
108 );
109
110 if !peers_to_join.is_empty() {
111 let node_ids = peers_to_join
112 .iter()
113 .flat_map(|&record| {
114 let mut peers = vec![];
115 if let Ok(node_id) = EndpointId::from_bytes(&record.node_id()) {
116 peers.push(node_id);
117 }
118 if let Ok(content) = record.content::<GossipRecordContent>() {
119 for active_peer in content.active_peers {
120 if active_peer == [0; 32] {
121 continue;
122 }
123 if let Ok(node_id) = EndpointId::from_bytes(&active_peer) {
124 peers.push(node_id);
125 }
126 }
127 }
128 peers
129 })
130 .collect::<HashSet<_>>();
131
132 tracing::debug!(
133 "MessageOverlapMerge: attempting to join {} node_ids with overlapping messages",
134 node_ids.len()
135 );
136
137 self.gossip_sender
138 .join_peers(
139 node_ids.iter().cloned().collect::<Vec<_>>(),
140 Some(super::MAX_JOIN_PEERS_COUNT),
141 )
142 .await?;
143
144 tracing::debug!(
145 "MessageOverlapMerge: join_peers request sent for split-brain recovery"
146 );
147 }
148 } else {
149 tracing::debug!(
150 "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
151 );
152 }
153 Ok(())
154 }
155}