distributed_topic_tracker/gossip/merge/
bubble.rs

1//! Bubble detection: merge isolated peer groups in the same topic.
2//!
3//! If local peer count < 4, extract suggested peers from DHT records and join them.
4
5use actor_helper::{Action, Actor, Handle, Receiver};
6use std::{collections::HashSet, time::Duration};
7
8use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
9use anyhow::Result;
10
11/// Detects and merges small isolated peer groups (bubbles).
12///
13/// Triggers when local peer count < 4 and DHT records exist. Extracts `active_peers`
14/// from DHT records and joins them.
15#[derive(Debug, Clone)]
16pub struct BubbleMerge {
17    _api: Handle<BubbleMergeActor, anyhow::Error>,
18}
19
20#[derive(Debug)]
21struct BubbleMergeActor {
22    rx: Receiver<Action<BubbleMergeActor>>,
23
24    record_publisher: RecordPublisher,
25    gossip_receiver: GossipReceiver,
26    gossip_sender: GossipSender,
27    ticker: tokio::time::Interval,
28}
29
30impl BubbleMerge {
31    /// Create a new bubble merge detector.
32    ///
33    /// Spawns a background task that periodically checks cluster size.
34    pub fn new(
35        record_publisher: RecordPublisher,
36        gossip_sender: GossipSender,
37        gossip_receiver: GossipReceiver,
38    ) -> Result<Self> {
39        let (api, rx) = Handle::channel();
40
41        let mut ticker = tokio::time::interval(Duration::from_secs(10));
42        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
43
44        tokio::spawn(async move {
45            let mut actor = BubbleMergeActor {
46                rx,
47                record_publisher,
48                gossip_receiver,
49                gossip_sender,
50                ticker,
51            };
52            let _ = actor.run().await;
53        });
54
55        Ok(Self { _api: api })
56    }
57}
58
59impl Actor<anyhow::Error> for BubbleMergeActor {
60    async fn run(&mut self) -> Result<()> {
61        tracing::debug!("BubbleMerge: starting bubble merge actor");
62        loop {
63            tokio::select! {
64                Ok(action) = self.rx.recv_async() => {
65                    action(self).await;
66                }
67                _ = self.ticker.tick() => {
68                    tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
69                    let _ = self.merge().await;
70                    let next_interval = rand::random::<u64>() % 50;
71                    tracing::debug!("BubbleMerge: next check in {}s", next_interval);
72                    self.ticker.reset_after(Duration::from_secs(next_interval));
73                }
74                _ = tokio::signal::ctrl_c() => break,
75            }
76        }
77        Ok(())
78    }
79}
80
81impl BubbleMergeActor {
82    // Cluster size as bubble indicator
83    async fn merge(&mut self) -> Result<()> {
84        let unix_minute = crate::unix_minute(0);
85        let mut records = self.record_publisher.get_records(unix_minute - 1).await;
86        records.extend(self.record_publisher.get_records(unix_minute).await);
87
88        let neighbors = self.gossip_receiver.neighbors().await;
89        tracing::debug!(
90            "BubbleMerge: checking with {} neighbors and {} records",
91            neighbors.len(),
92            records.len()
93        );
94
95        if neighbors.len() < 4 && !records.is_empty() {
96            tracing::debug!(
97                "BubbleMerge: detected small bubble ({} neighbors < 4), attempting merge",
98                neighbors.len()
99            );
100            let node_ids = records
101                .iter()
102                .flat_map(|record| {
103                    let mut node_ids = if let Ok(content) = record.content::<GossipRecordContent>()
104                    {
105                        content
106                            .active_peers
107                            .iter()
108                            .filter_map(|&active_peer| {
109                                if active_peer == [0; 32]
110                                    || neighbors.contains(&active_peer)
111                                    || active_peer.eq(record.node_id().to_vec().as_slice())
112                                    || active_peer.eq(self.record_publisher.pub_key().as_bytes())
113                                {
114                                    None
115                                } else {
116                                    iroh::NodeId::from_bytes(&active_peer).ok()
117                                }
118                            })
119                            .collect::<Vec<_>>()
120                    } else {
121                        vec![]
122                    };
123                    if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id()) {
124                        if node_id != self.record_publisher.pub_key().into() {
125                            node_ids.push(node_id);
126                        }
127                    }
128                    node_ids
129                })
130                .collect::<HashSet<_>>();
131
132            tracing::debug!(
133                "BubbleMerge: found {} potential peers to join",
134                node_ids.len()
135            );
136
137            if !node_ids.is_empty() {
138                self.gossip_sender
139                    .join_peers(
140                        node_ids.iter().cloned().collect::<Vec<_>>(),
141                        Some(super::MAX_JOIN_PEERS_COUNT),
142                    )
143                    .await?;
144                tracing::debug!("BubbleMerge: join_peers request sent");
145            }
146        } else {
147            tracing::debug!(
148                "BubbleMerge: no merge needed (neighbors={}, records={})",
149                neighbors.len(),
150                records.len()
151            );
152        }
153        Ok(())
154    }
155}