Skip to main content

distributed_topic_tracker/gossip/merge/
message_overlap.rs

1//! Split-brain detection via message hash overlap in DHT records.
2
3use actor_helper::{Action, Actor, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
10/// Detects network partitions by comparing message hashes across DHT records.
11///
12/// Joins peers when their published hashes match local message history.
13#[derive(Debug, Clone)]
14pub struct MessageOverlapMerge {
15    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
16}
17
18#[derive(Debug)]
19struct MessageOverlapMergeActor {
20    rx: Receiver<Action<MessageOverlapMergeActor>>,
21
22    record_publisher: RecordPublisher,
23    gossip_receiver: GossipReceiver,
24    gossip_sender: GossipSender,
25    ticker: tokio::time::Interval,
26}
27
28impl MessageOverlapMerge {
29    /// Create a new split-brain detector.
30    pub fn new(
31        record_publisher: RecordPublisher,
32        gossip_sender: GossipSender,
33        gossip_receiver: GossipReceiver,
34    ) -> Result<Self> {
35        let (api, rx) = Handle::channel();
36
37        let mut ticker = tokio::time::interval(Duration::from_secs(10));
38        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
39
40        tokio::spawn(async move {
41            let mut actor = MessageOverlapMergeActor {
42                rx,
43                record_publisher,
44                gossip_receiver,
45                gossip_sender,
46                ticker,
47            };
48            let _ = actor.run().await;
49        });
50
51        Ok(Self { _api: api })
52    }
53}
54
55impl Actor<anyhow::Error> for MessageOverlapMergeActor {
56    async fn run(&mut self) -> Result<()> {
57        tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
58        loop {
59            tokio::select! {
60                Ok(action) = self.rx.recv_async() => {
61                    action(self).await;
62                }
63                _ = self.ticker.tick() => {
64                    tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
65                    let _ = self.merge().await;
66                    let next_interval = rand::random::<u64>() % 50;
67                    tracing::debug!("MessageOverlapMerge: next check in {}s", next_interval);
68                    self.ticker.reset_after(Duration::from_secs(next_interval));
69                }
70                else => break Ok(()),
71            }
72        }
73    }
74}
75
76impl MessageOverlapMergeActor {
77    async fn merge(&mut self) -> Result<()> {
78        let unix_minute = crate::unix_minute(0);
79        let mut records = self.record_publisher.get_records(unix_minute - 1).await;
80        records.extend(self.record_publisher.get_records(unix_minute).await);
81
82        let local_hashes = self.gossip_receiver.last_message_hashes().await;
83        tracing::debug!(
84            "MessageOverlapMerge: checking {} records with {} local message hashes",
85            records.len(),
86            local_hashes.len()
87        );
88
89        if !local_hashes.is_empty() {
90            let last_message_hashes = local_hashes;
91            let peers_to_join = records
92                .iter()
93                .filter(|record| {
94                    if let Ok(content) = record.content::<GossipRecordContent>() {
95                        content.last_message_hashes.iter().any(|last_message_hash| {
96                            *last_message_hash != [0; 32]
97                                && last_message_hashes.contains(last_message_hash)
98                        })
99                    } else {
100                        false
101                    }
102                })
103                .collect::<Vec<_>>();
104
105            tracing::debug!(
106                "MessageOverlapMerge: found {} peers with overlapping message hashes",
107                peers_to_join.len()
108            );
109
110            if !peers_to_join.is_empty() {
111                let node_ids = peers_to_join
112                    .iter()
113                    .flat_map(|&record| {
114                        let mut peers = vec![];
115                        if let Ok(node_id) = EndpointId::from_bytes(&record.node_id()) {
116                            peers.push(node_id);
117                        }
118                        if let Ok(content) = record.content::<GossipRecordContent>() {
119                            for active_peer in content.active_peers {
120                                if active_peer == [0; 32] {
121                                    continue;
122                                }
123                                if let Ok(node_id) = EndpointId::from_bytes(&active_peer) {
124                                    peers.push(node_id);
125                                }
126                            }
127                        }
128                        peers
129                    })
130                    .collect::<HashSet<_>>();
131
132                tracing::debug!(
133                    "MessageOverlapMerge: attempting to join {} node_ids with overlapping messages",
134                    node_ids.len()
135                );
136
137                self.gossip_sender
138                    .join_peers(
139                        node_ids.iter().cloned().collect::<Vec<_>>(),
140                        Some(super::MAX_JOIN_PEERS_COUNT),
141                    )
142                    .await?;
143
144                tracing::debug!(
145                    "MessageOverlapMerge: join_peers request sent for split-brain recovery"
146                );
147            }
148        } else {
149            tracing::debug!(
150                "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
151            );
152        }
153        Ok(())
154    }
155}