distributed_topic_tracker/gossip/merge/
message_overlap.rs

1//! Split-brain detection via message hash overlap in DHT records.
2
3use actor_helper::{Action, Actor, Handle, Receiver};
4use iroh::EndpointId;
5use std::{collections::HashSet, time::Duration};
6
7use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
8use anyhow::Result;
9
/// Detects network partitions by comparing message hashes across DHT records.
///
/// Joins peers when their published hashes match local message history.
///
/// Constructing one with [`MessageOverlapMerge::new`] spawns a background actor on the
/// Tokio runtime; this value only holds the handle to that actor.
#[derive(Debug, Clone)]
pub struct MessageOverlapMerge {
    // Handle to the spawned actor. It is never read; it is only stored here.
    // NOTE(review): presumably kept so the actor's action channel stays open for as long as
    // this value (or a clone) is alive — confirm against `actor_helper::Handle` semantics.
    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
}
17
/// State owned by the background task spawned in [`MessageOverlapMerge::new`].
#[derive(Debug)]
struct MessageOverlapMergeActor {
    /// Receiving side of the channel whose [`Handle`] is stored in [`MessageOverlapMerge`].
    rx: Receiver<Action<MessageOverlapMergeActor>>,

    /// Source of the DHT records inspected on every merge check.
    record_publisher: RecordPublisher,
    /// Provides the hashes of recently seen local messages.
    gossip_receiver: GossipReceiver,
    /// Used to ask the gossip layer to join peers discovered via hash overlap.
    gossip_sender: GossipSender,
    /// Drives merge checks: created with a 10s period and re-armed with a random delay
    /// after every check.
    ticker: tokio::time::Interval,
}
27
28impl MessageOverlapMerge {
29    /// Create a new split-brain detector.
30    pub fn new(
31        record_publisher: RecordPublisher,
32        gossip_sender: GossipSender,
33        gossip_receiver: GossipReceiver,
34    ) -> Result<Self> {
35        let (api, rx) = Handle::channel();
36
37        let mut ticker = tokio::time::interval(Duration::from_secs(10));
38        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
39
40        tokio::spawn(async move {
41            let mut actor = MessageOverlapMergeActor {
42                rx,
43                record_publisher,
44                gossip_receiver,
45                gossip_sender,
46                ticker,
47            };
48            let _ = actor.run().await;
49        });
50
51        Ok(Self { _api: api })
52    }
53}
54
55impl Actor<anyhow::Error> for MessageOverlapMergeActor {
56    async fn run(&mut self) -> Result<()> {
57        tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
58        loop {
59            tokio::select! {
60                Ok(action) = self.rx.recv_async() => {
61                    action(self).await;
62                }
63                _ = self.ticker.tick() => {
64                    tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
65                    let _ = self.merge().await;
66                    let next_interval = rand::random::<u64>() % 50;
67                    tracing::debug!("MessageOverlapMerge: next check in {}s", next_interval);
68                    self.ticker.reset_after(Duration::from_secs(next_interval));
69                }
70                _ = tokio::signal::ctrl_c() => break,
71            }
72        }
73        Ok(())
74    }
75}
76
77impl MessageOverlapMergeActor {
78    async fn merge(&mut self) -> Result<()> {
79        let unix_minute = crate::unix_minute(0);
80        let mut records = self.record_publisher.get_records(unix_minute - 1).await;
81        records.extend(self.record_publisher.get_records(unix_minute).await);
82
83        let local_hashes = self.gossip_receiver.last_message_hashes().await;
84        tracing::debug!(
85            "MessageOverlapMerge: checking {} records with {} local message hashes",
86            records.len(),
87            local_hashes.len()
88        );
89
90        if !local_hashes.is_empty() {
91            let last_message_hashes = local_hashes;
92            let peers_to_join = records
93                .iter()
94                .filter(|record| {
95                    if let Ok(content) = record.content::<GossipRecordContent>() {
96                        content.last_message_hashes.iter().any(|last_message_hash| {
97                            *last_message_hash != [0; 32]
98                                && last_message_hashes.contains(last_message_hash)
99                        })
100                    } else {
101                        false
102                    }
103                })
104                .collect::<Vec<_>>();
105
106            tracing::debug!(
107                "MessageOverlapMerge: found {} peers with overlapping message hashes",
108                peers_to_join.len()
109            );
110
111            if !peers_to_join.is_empty() {
112                let node_ids = peers_to_join
113                    .iter()
114                    .flat_map(|&record| {
115                        let mut peers = vec![];
116                        if let Ok(node_id) = EndpointId::from_bytes(&record.node_id()) {
117                            peers.push(node_id);
118                        }
119                        if let Ok(content) = record.content::<GossipRecordContent>() {
120                            for active_peer in content.active_peers {
121                                if active_peer == [0; 32] {
122                                    continue;
123                                }
124                                if let Ok(node_id) = EndpointId::from_bytes(&active_peer) {
125                                    peers.push(node_id);
126                                }
127                            }
128                        }
129                        peers
130                    })
131                    .collect::<HashSet<_>>();
132
133                tracing::debug!(
134                    "MessageOverlapMerge: attempting to join {} node_ids with overlapping messages",
135                    node_ids.len()
136                );
137
138                self.gossip_sender
139                    .join_peers(
140                        node_ids.iter().cloned().collect::<Vec<_>>(),
141                        Some(super::MAX_JOIN_PEERS_COUNT),
142                    )
143                    .await?;
144
145                tracing::debug!(
146                    "MessageOverlapMerge: join_peers request sent for split-brain recovery"
147                );
148            }
149        } else {
150            tracing::debug!(
151                "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
152            );
153        }
154        Ok(())
155    }
156}