Skip to main content

distributed_topic_tracker/gossip/merge/
message_overlap.rs

1//! Split-brain detection via message hash overlap in DHT records.
2
3use actor_helper::{Action, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
10/// Detects network partitions by comparing message hashes across DHT records.
11///
12/// Joins peers whose published hashes do not overlap with local message history.
13#[derive(Debug, Clone)]
14pub struct MessageOverlapMerge {
15    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
16}
17
18#[derive(Debug)]
19struct MessageOverlapMergeActor {
20    record_publisher: RecordPublisher,
21    gossip_receiver: GossipReceiver,
22    gossip_sender: GossipSender,
23    ticker: tokio::time::Interval,
24    cancel_token: tokio_util::sync::CancellationToken,
25
26    max_join_peers: usize,
27    base_interval: Duration,
28    max_jitter: Duration,
29}
30
31impl MessageOverlapMerge {
32    /// Create a new split-brain detector.
33    #[allow(clippy::too_many_arguments)]
34    pub fn new(
35        record_publisher: RecordPublisher,
36        gossip_sender: GossipSender,
37        gossip_receiver: GossipReceiver,
38        cancel_token: tokio_util::sync::CancellationToken,
39        max_join_peers: usize,
40        initial_interval: Duration,
41        base_interval: Duration,
42        max_jitter: Duration,
43    ) -> Result<Self> {
44        let base_interval = base_interval.max(Duration::from_secs(1));
45        let mut ticker = tokio::time::interval_at(tokio::time::Instant::now() + initial_interval, base_interval);
46        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
47
48        let api = Handle::spawn_with(
49            MessageOverlapMergeActor {
50                record_publisher,
51                gossip_receiver,
52                gossip_sender,
53                ticker,
54                cancel_token,
55                max_join_peers,
56                base_interval,
57                max_jitter,
58            },
59            |mut actor, rx| async move { actor.run(rx).await },
60        )
61        .0;
62        Ok(Self { _api: api })
63    }
64}
65
66impl MessageOverlapMergeActor {
67    async fn run(&mut self, rx: Receiver<Action<MessageOverlapMergeActor>>) -> Result<()> {
68        tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
69        loop {
70            tokio::select! {
71                result = rx.recv_async() => {
72                    match result {
73                        Ok(action) => action(self).await,
74                        Err(_) => break Ok(()),
75                    }
76                }
77                _ = self.ticker.tick() => {
78                    tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
79                    if let Err(e) = self.merge().await {
80                        tracing::warn!("MessageOverlapMerge: error during merge: {:?}", e);
81                    }
82                    let jitter = if self.max_jitter > Duration::ZERO {
83                        Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
84                    } else {
85                        Duration::ZERO
86                    };
87                    let next_interval = self.base_interval + jitter;
88                    tracing::debug!("MessageOverlapMerge: next check in {}ms", next_interval.as_millis());
89                    self.ticker.reset_after(next_interval);
90                }
91                _ = self.cancel_token.cancelled() => {
92                    break Ok(());
93                }
94                else => break Ok(()),
95            }
96        }
97    }
98}
99
100impl MessageOverlapMergeActor {
101    async fn merge(&mut self) -> Result<()> {
102        let unix_minute = crate::unix_minute(0);
103        let mut records = self
104            .record_publisher
105            .get_records(unix_minute - 1, self.cancel_token.clone())
106            .await?;
107        records.extend(
108            self.record_publisher
109                .get_records(unix_minute, self.cancel_token.clone())
110                .await?,
111        );
112
113        let local_hashes = self.gossip_receiver.last_message_hashes().await?;
114        tracing::debug!(
115            "MessageOverlapMerge: checking {} records with {} local message hashes",
116            records.len(),
117            local_hashes.len()
118        );
119
120        if !local_hashes.is_empty() {
121            let last_message_hashes = local_hashes;
122            let peers_to_join = records
123                .iter()
124                .filter(|record| {
125                    if let Ok(content) = record.content::<GossipRecordContent>() {
126                        let remote_hashes = content
127                            .last_message_hashes
128                            .iter()
129                            .filter(|last_message_hash| **last_message_hash != [0; 32])
130                            .collect::<Vec<_>>();
131
132                        !remote_hashes.is_empty()
133                            && remote_hashes.iter().all(|last_message_hash| {
134                                !last_message_hashes.contains(*last_message_hash)
135                            })
136                    } else {
137                        false
138                    }
139                })
140                .collect::<Vec<_>>();
141
142            tracing::debug!(
143                "MessageOverlapMerge: found {} peers with no overlapping message hashes",
144                peers_to_join.len()
145            );
146
147            if !peers_to_join.is_empty() {
148                let active_neighbors = self.gossip_receiver.neighbors().await?;
149                let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
150                let pub_keys = peers_to_join
151                    .iter()
152                    .flat_map(|&record| {
153                        let mut peers = vec![];
154                        if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
155                            && pub_key != self_pub_key
156                        {
157                            peers.push(pub_key);
158                        }
159                        if let Ok(content) = record.content::<GossipRecordContent>() {
160                            for active_peer in content.active_peers {
161                                if active_peer == [0; 32] {
162                                    continue;
163                                }
164                                if let Ok(pub_key) = EndpointId::from_bytes(&active_peer)
165                                    && pub_key != self_pub_key
166                                {
167                                    peers.push(pub_key);
168                                }
169                            }
170                        }
171                        peers
172                    })
173                    .filter(|pub_key| !active_neighbors.contains(pub_key))
174                    .collect::<HashSet<_>>();
175
176                if !pub_keys.is_empty() {
177                    tracing::debug!(
178                        "MessageOverlapMerge: attempting to join {} pub_keys with no overlapping messages",
179                        pub_keys.len()
180                    );
181
182                    self.gossip_sender
183                        .join_peers(
184                            pub_keys.iter().cloned().collect::<Vec<_>>(),
185                            Some(self.max_join_peers),
186                        )
187                        .await?;
188
189                    tracing::debug!(
190                        "MessageOverlapMerge: join_peers request sent for split-brain recovery"
191                    );
192                }
193            }
194        } else {
195            tracing::debug!(
196                "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
197            );
198        }
199        Ok(())
200    }
201}