distributed_topic_tracker/gossip/merge/
message_overlap.rs

1//! Split-brain detection via message hash overlap in DHT records.
2
3use actor_helper::{Action, Actor, Handle, Receiver};
4use std::{collections::HashSet, time::Duration};
5
6use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
7use anyhow::Result;
8
9/// Detects network partitions by comparing message hashes across DHT records.
10///
11/// Joins peers when their published hashes match local message history.
12#[derive(Debug, Clone)]
13pub struct MessageOverlapMerge {
14    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
15}
16
17#[derive(Debug)]
18struct MessageOverlapMergeActor {
19    rx: Receiver<Action<MessageOverlapMergeActor>>,
20
21    record_publisher: RecordPublisher,
22    gossip_receiver: GossipReceiver,
23    gossip_sender: GossipSender,
24    ticker: tokio::time::Interval,
25}
26
27impl MessageOverlapMerge {
28    /// Create a new split-brain detector.
29    pub fn new(
30        record_publisher: RecordPublisher,
31        gossip_sender: GossipSender,
32        gossip_receiver: GossipReceiver,
33    ) -> Result<Self> {
34        let (api, rx) = Handle::channel();
35
36        let mut ticker = tokio::time::interval(Duration::from_secs(10));
37        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
38
39        tokio::spawn(async move {
40            let mut actor = MessageOverlapMergeActor {
41                rx,
42                record_publisher,
43                gossip_receiver,
44                gossip_sender,
45                ticker,
46            };
47            let _ = actor.run().await;
48        });
49
50        Ok(Self { _api: api })
51    }
52}
53
54impl Actor<anyhow::Error> for MessageOverlapMergeActor {
55    async fn run(&mut self) -> Result<()> {
56        tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
57        loop {
58            tokio::select! {
59                Ok(action) = self.rx.recv_async() => {
60                    action(self).await;
61                }
62                _ = self.ticker.tick() => {
63                    tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
64                    let _ = self.merge().await;
65                    let next_interval = rand::random::<u64>() % 50;
66                    tracing::debug!("MessageOverlapMerge: next check in {}s", next_interval);
67                    self.ticker.reset_after(Duration::from_secs(next_interval));
68                }
69                _ = tokio::signal::ctrl_c() => break,
70            }
71        }
72        Ok(())
73    }
74}
75
76impl MessageOverlapMergeActor {
77    async fn merge(&mut self) -> Result<()> {
78        let unix_minute = crate::unix_minute(0);
79        let mut records = self.record_publisher.get_records(unix_minute-1).await;
80        records.extend(self.record_publisher.get_records(unix_minute).await);
81        
82        let local_hashes = self.gossip_receiver.last_message_hashes().await;
83        tracing::debug!("MessageOverlapMerge: checking {} records with {} local message hashes", records.len(), local_hashes.len());
84        
85        if !local_hashes.is_empty() {
86            let last_message_hashes = local_hashes;
87            let peers_to_join = records
88                .iter()
89                .filter(|record| {
90                    if let Ok(content) = record.content::<GossipRecordContent>() {
91                        content.last_message_hashes.iter().any(|last_message_hash| {
92                            *last_message_hash != [0; 32]
93                                && last_message_hashes.contains(last_message_hash)
94                        })
95                    } else {
96                        false
97                    }
98                })
99                .collect::<Vec<_>>();
100            
101            tracing::debug!("MessageOverlapMerge: found {} peers with overlapping message hashes", peers_to_join.len());
102            
103            if !peers_to_join.is_empty() {
104                let node_ids = peers_to_join
105                    .iter()
106                    .flat_map(|&record| {
107                        let mut peers = vec![];
108                        if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
109                            peers.push(node_id);
110                        }
111                        if let Ok(content) = record.content::<GossipRecordContent>() {
112                            for active_peer in content.active_peers {
113                                if active_peer == [0; 32] {
114                                    continue;
115                                }
116                                if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer) {
117                                    peers.push(node_id);
118                                }
119                            }
120                        }
121                        peers
122                    })
123                    .collect::<HashSet<_>>();
124
125                tracing::debug!("MessageOverlapMerge: attempting to join {} node_ids with overlapping messages", node_ids.len());
126
127                self.gossip_sender
128                    .join_peers(
129                        node_ids.iter().cloned().collect::<Vec<_>>(),
130                        Some(super::MAX_JOIN_PEERS_COUNT),
131                    )
132                    .await?;
133                
134                tracing::debug!("MessageOverlapMerge: join_peers request sent for split-brain recovery");
135            }
136        } else {
137            tracing::debug!("MessageOverlapMerge: no local message hashes yet, skipping overlap detection");
138        }
139        Ok(())
140    }
141}