Skip to main content

distributed_topic_tracker/gossip/merge/
message_overlap.rs

1//! Split-brain detection via message hash overlap in DHT records.
2
3use actor_helper::{Action, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
10/// Detects network partitions by comparing message hashes across DHT records.
11///
12/// Joins peers whose published hashes do not overlap with local message history.
13#[derive(Debug, Clone)]
14pub struct MessageOverlapMerge {
15    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
16}
17
18#[derive(Debug)]
19struct MessageOverlapMergeActor {
20    record_publisher: RecordPublisher,
21    gossip_receiver: GossipReceiver,
22    gossip_sender: GossipSender,
23    ticker: tokio::time::Interval,
24    cancel_token: tokio_util::sync::CancellationToken,
25
26    max_join_peers: usize,
27    base_interval: Duration,
28    max_jitter: Duration,
29}
30
31impl MessageOverlapMerge {
32    /// Create a new split-brain detector.
33    pub fn new(
34        record_publisher: RecordPublisher,
35        gossip_sender: GossipSender,
36        gossip_receiver: GossipReceiver,
37        cancel_token: tokio_util::sync::CancellationToken,
38        max_join_peers: usize,
39        base_interval: Duration,
40        max_jitter: Duration,
41    ) -> Result<Self> {
42        let base_interval = base_interval.max(Duration::from_secs(1));
43        let mut ticker = tokio::time::interval(base_interval);
44        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
45
46        let api = Handle::spawn_with(
47            MessageOverlapMergeActor {
48                record_publisher,
49                gossip_receiver,
50                gossip_sender,
51                ticker,
52                cancel_token,
53                max_join_peers,
54                base_interval,
55                max_jitter,
56            },
57            |mut actor, rx| async move { actor.run(rx).await },
58        )
59        .0;
60        Ok(Self { _api: api })
61    }
62}
63
64impl MessageOverlapMergeActor {
65    async fn run(&mut self, rx: Receiver<Action<MessageOverlapMergeActor>>) -> Result<()> {
66        tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
67        loop {
68            tokio::select! {
69                result = rx.recv_async() => {
70                    match result {
71                        Ok(action) => action(self).await,
72                        Err(_) => break Ok(()),
73                    }
74                }
75                _ = self.ticker.tick() => {
76                    tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
77                    if let Err(e) = self.merge().await {
78                        tracing::warn!("MessageOverlapMerge: error during merge: {:?}", e);
79                    }
80                    let jitter = if self.max_jitter > Duration::ZERO {
81                        Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
82                    } else {
83                        Duration::ZERO
84                    };
85                    let next_interval = self.base_interval + jitter;
86                    tracing::debug!("MessageOverlapMerge: next check in {}ms", next_interval.as_millis());
87                    self.ticker.reset_after(next_interval);
88                }
89                _ = self.cancel_token.cancelled() => {
90                    break Ok(());
91                }
92                else => break Ok(()),
93            }
94        }
95    }
96}
97
98impl MessageOverlapMergeActor {
99    async fn merge(&mut self) -> Result<()> {
100        let unix_minute = crate::unix_minute(0);
101        let mut records = self
102            .record_publisher
103            .get_records(unix_minute - 1, self.cancel_token.clone())
104            .await?;
105        records.extend(
106            self.record_publisher
107                .get_records(unix_minute, self.cancel_token.clone())
108                .await?,
109        );
110
111        let local_hashes = self.gossip_receiver.last_message_hashes().await?;
112        tracing::debug!(
113            "MessageOverlapMerge: checking {} records with {} local message hashes",
114            records.len(),
115            local_hashes.len()
116        );
117
118        if !local_hashes.is_empty() {
119            let last_message_hashes = local_hashes;
120            let peers_to_join = records
121                .iter()
122                .filter(|record| {
123                    if let Ok(content) = record.content::<GossipRecordContent>() {
124                        let remote_hashes = content
125                            .last_message_hashes
126                            .iter()
127                            .filter(|last_message_hash| **last_message_hash != [0; 32])
128                            .collect::<Vec<_>>();
129
130                        !remote_hashes.is_empty()
131                            && remote_hashes.iter().all(|last_message_hash| {
132                                !last_message_hashes.contains(*last_message_hash)
133                            })
134                    } else {
135                        false
136                    }
137                })
138                .collect::<Vec<_>>();
139
140            tracing::debug!(
141                "MessageOverlapMerge: found {} peers with no overlapping message hashes",
142                peers_to_join.len()
143            );
144
145            if !peers_to_join.is_empty() {
146                let active_neighbors = self.gossip_receiver.neighbors().await?;
147                let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
148                let pub_keys = peers_to_join
149                    .iter()
150                    .flat_map(|&record| {
151                        let mut peers = vec![];
152                        if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
153                            && pub_key != self_pub_key
154                        {
155                            peers.push(pub_key);
156                        }
157                        if let Ok(content) = record.content::<GossipRecordContent>() {
158                            for active_peer in content.active_peers {
159                                if active_peer == [0; 32] {
160                                    continue;
161                                }
162                                if let Ok(pub_key) = EndpointId::from_bytes(&active_peer)
163                                    && pub_key != self_pub_key
164                                {
165                                    peers.push(pub_key);
166                                }
167                            }
168                        }
169                        peers
170                    })
171                    .filter(|pub_key| !active_neighbors.contains(pub_key))
172                    .collect::<HashSet<_>>();
173
174                if !pub_keys.is_empty() {
175                    tracing::debug!(
176                        "MessageOverlapMerge: attempting to join {} pub_keys with no overlapping messages",
177                        pub_keys.len()
178                    );
179
180                    self.gossip_sender
181                        .join_peers(
182                            pub_keys.iter().cloned().collect::<Vec<_>>(),
183                            Some(self.max_join_peers),
184                        )
185                        .await?;
186
187                    tracing::debug!(
188                        "MessageOverlapMerge: join_peers request sent for split-brain recovery"
189                    );
190                }
191            }
192        } else {
193            tracing::debug!(
194                "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
195            );
196        }
197        Ok(())
198    }
199}