// distributed_topic_tracker/gossip/merge/message_overlap.rs
use actor_helper::{Action, Actor, Handle, Receiver};
use iroh::EndpointId;
use std::{collections::HashSet, time::Duration};

use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
use anyhow::Result;
9
10#[derive(Debug, Clone)]
14pub struct MessageOverlapMerge {
15 _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
16}
17
18#[derive(Debug)]
19struct MessageOverlapMergeActor {
20 rx: Receiver<Action<MessageOverlapMergeActor>>,
21
22 record_publisher: RecordPublisher,
23 gossip_receiver: GossipReceiver,
24 gossip_sender: GossipSender,
25 ticker: tokio::time::Interval,
26}
27
28impl MessageOverlapMerge {
29 pub fn new(
31 record_publisher: RecordPublisher,
32 gossip_sender: GossipSender,
33 gossip_receiver: GossipReceiver,
34 ) -> Result<Self> {
35 let (api, rx) = Handle::channel();
36
37 let mut ticker = tokio::time::interval(Duration::from_secs(10));
38 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
39
40 tokio::spawn(async move {
41 let mut actor = MessageOverlapMergeActor {
42 rx,
43 record_publisher,
44 gossip_receiver,
45 gossip_sender,
46 ticker,
47 };
48 let _ = actor.run().await;
49 });
50
51 Ok(Self { _api: api })
52 }
53}
54
55impl Actor<anyhow::Error> for MessageOverlapMergeActor {
56 async fn run(&mut self) -> Result<()> {
57 tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
58 loop {
59 tokio::select! {
60 Ok(action) = self.rx.recv_async() => {
61 action(self).await;
62 }
63 _ = self.ticker.tick() => {
64 tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
65 let _ = self.merge().await;
66 let next_interval = rand::random::<u64>() % 50;
67 tracing::debug!("MessageOverlapMerge: next check in {}s", next_interval);
68 self.ticker.reset_after(Duration::from_secs(next_interval));
69 }
70 _ = tokio::signal::ctrl_c() => break,
71 }
72 }
73 Ok(())
74 }
75}
76
77impl MessageOverlapMergeActor {
78 async fn merge(&mut self) -> Result<()> {
79 let unix_minute = crate::unix_minute(0);
80 let mut records = self.record_publisher.get_records(unix_minute - 1).await;
81 records.extend(self.record_publisher.get_records(unix_minute).await);
82
83 let local_hashes = self.gossip_receiver.last_message_hashes().await;
84 tracing::debug!(
85 "MessageOverlapMerge: checking {} records with {} local message hashes",
86 records.len(),
87 local_hashes.len()
88 );
89
90 if !local_hashes.is_empty() {
91 let last_message_hashes = local_hashes;
92 let peers_to_join = records
93 .iter()
94 .filter(|record| {
95 if let Ok(content) = record.content::<GossipRecordContent>() {
96 content.last_message_hashes.iter().any(|last_message_hash| {
97 *last_message_hash != [0; 32]
98 && last_message_hashes.contains(last_message_hash)
99 })
100 } else {
101 false
102 }
103 })
104 .collect::<Vec<_>>();
105
106 tracing::debug!(
107 "MessageOverlapMerge: found {} peers with overlapping message hashes",
108 peers_to_join.len()
109 );
110
111 if !peers_to_join.is_empty() {
112 let node_ids = peers_to_join
113 .iter()
114 .flat_map(|&record| {
115 let mut peers = vec![];
116 if let Ok(node_id) = EndpointId::from_bytes(&record.node_id()) {
117 peers.push(node_id);
118 }
119 if let Ok(content) = record.content::<GossipRecordContent>() {
120 for active_peer in content.active_peers {
121 if active_peer == [0; 32] {
122 continue;
123 }
124 if let Ok(node_id) = EndpointId::from_bytes(&active_peer) {
125 peers.push(node_id);
126 }
127 }
128 }
129 peers
130 })
131 .collect::<HashSet<_>>();
132
133 tracing::debug!(
134 "MessageOverlapMerge: attempting to join {} node_ids with overlapping messages",
135 node_ids.len()
136 );
137
138 self.gossip_sender
139 .join_peers(
140 node_ids.iter().cloned().collect::<Vec<_>>(),
141 Some(super::MAX_JOIN_PEERS_COUNT),
142 )
143 .await?;
144
145 tracing::debug!(
146 "MessageOverlapMerge: join_peers request sent for split-brain recovery"
147 );
148 }
149 } else {
150 tracing::debug!(
151 "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
152 );
153 }
154 Ok(())
155 }
156}