distributed_topic_tracker/gossip/merge/
bubble.rs

1//! Bubble detection: merge isolated peer groups in the same topic.
2//!
3//! If local peer count < 4, extract suggested peers from DHT records and join them.
4
5use actor_helper::{Action, Actor, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12/// Detects and merges small isolated peer groups (bubbles).
13///
14/// Triggers when local peer count < 4 and DHT records exist. Extracts `active_peers`
15/// from DHT records and joins them.
16#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18    _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23    rx: Receiver<Action<BubbleMergeActor>>,
24
25    record_publisher: RecordPublisher,
26    gossip_receiver: GossipReceiver,
27    gossip_sender: GossipSender,
28    ticker: tokio::time::Interval,
29}
30
31impl BubbleMerge {
32    /// Create a new bubble merge detector.
33    ///
34    /// Spawns a background task that periodically checks cluster size.
35    pub fn new(
36        record_publisher: RecordPublisher,
37        gossip_sender: GossipSender,
38        gossip_receiver: GossipReceiver,
39    ) -> Result<Self> {
40        let (api, rx) = Handle::channel();
41
42        let mut ticker = tokio::time::interval(Duration::from_secs(10));
43        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
44
45        tokio::spawn(async move {
46            let mut actor = BubbleMergeActor {
47                rx,
48                record_publisher,
49                gossip_receiver,
50                gossip_sender,
51                ticker,
52            };
53            let _ = actor.run().await;
54        });
55
56        Ok(Self { _api: api })
57    }
58}
59
60impl Actor<anyhow::Error> for BubbleMergeActor {
61    async fn run(&mut self) -> Result<()> {
62        tracing::debug!("BubbleMerge: starting bubble merge actor");
63        loop {
64            tokio::select! {
65                Ok(action) = self.rx.recv_async() => {
66                    action(self).await;
67                }
68                _ = self.ticker.tick() => {
69                    tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
70                    let _ = self.merge().await;
71                    let next_interval = rand::random::<u64>() % 50;
72                    tracing::debug!("BubbleMerge: next check in {}s", next_interval);
73                    self.ticker.reset_after(Duration::from_secs(next_interval));
74                }
75                _ = tokio::signal::ctrl_c() => break,
76            }
77        }
78        Ok(())
79    }
80}
81
82impl BubbleMergeActor {
83    // Cluster size as bubble indicator
84    async fn merge(&mut self) -> Result<()> {
85        let unix_minute = crate::unix_minute(0);
86        let mut records = self.record_publisher.get_records(unix_minute - 1).await;
87        records.extend(self.record_publisher.get_records(unix_minute).await);
88
89        let neighbors = self.gossip_receiver.neighbors().await;
90        tracing::debug!(
91            "BubbleMerge: checking with {} neighbors and {} records",
92            neighbors.len(),
93            records.len()
94        );
95
96        if neighbors.len() < 4 && !records.is_empty() {
97            tracing::debug!(
98                "BubbleMerge: detected small bubble ({} neighbors < 4), attempting merge",
99                neighbors.len()
100            );
101            let node_ids = records
102                .iter()
103                .flat_map(|record| {
104                    let mut endpoint_ids = if let Ok(content) =
105                        record.content::<GossipRecordContent>()
106                    {
107                        content
108                            .active_peers
109                            .iter()
110                            .filter_map(|&active_peer| {
111                                if active_peer == [0; 32]
112                                    || neighbors.contains(&active_peer)
113                                    || active_peer.eq(record.node_id().to_vec().as_slice())
114                                    || active_peer.eq(self.record_publisher.pub_key().as_bytes())
115                                {
116                                    None
117                                } else {
118                                    iroh::EndpointId::from_bytes(&active_peer).ok()
119                                }
120                            })
121                            .collect::<Vec<_>>()
122                    } else {
123                        vec![]
124                    };
125                    if let Ok(endpoint_id) = EndpointId::from_bytes(&record.node_id()) {
126                        if endpoint_id
127                            != EndpointId::from_verifying_key(self.record_publisher.pub_key())
128                        {
129                            endpoint_ids.push(endpoint_id);
130                        }
131                    }
132                    endpoint_ids
133                })
134                .collect::<HashSet<_>>();
135
136            tracing::debug!(
137                "BubbleMerge: found {} potential peers to join",
138                node_ids.len()
139            );
140
141            if !node_ids.is_empty() {
142                self.gossip_sender
143                    .join_peers(
144                        node_ids.iter().cloned().collect::<Vec<_>>(),
145                        Some(super::MAX_JOIN_PEERS_COUNT),
146                    )
147                    .await?;
148                tracing::debug!("BubbleMerge: join_peers request sent");
149            }
150        } else {
151            tracing::debug!(
152                "BubbleMerge: no merge needed (neighbors={}, records={})",
153                neighbors.len(),
154                records.len()
155            );
156        }
157        Ok(())
158    }
159}