Skip to main content

distributed_topic_tracker/gossip/merge/
bubble.rs

1//! Bubble detection: merge isolated peer groups in the same topic.
2//!
3//! If local peer count < 4, extract suggested peers from DHT records and join them.
4
5use actor_helper::{Action, Actor, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12/// Detects and merges small isolated peer groups (bubbles).
13///
14/// Triggers when local peer count < 4 and DHT records exist. Extracts `active_peers`
15/// from DHT records and joins them.
16#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18    _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23    rx: Receiver<Action<BubbleMergeActor>>,
24
25    record_publisher: RecordPublisher,
26    gossip_receiver: GossipReceiver,
27    gossip_sender: GossipSender,
28    ticker: tokio::time::Interval,
29}
30
31impl BubbleMerge {
32    /// Create a new bubble merge detector.
33    ///
34    /// Spawns a background task that periodically checks cluster size.
35    pub fn new(
36        record_publisher: RecordPublisher,
37        gossip_sender: GossipSender,
38        gossip_receiver: GossipReceiver,
39    ) -> Result<Self> {
40        let (api, rx) = Handle::channel();
41
42        let mut ticker = tokio::time::interval(Duration::from_secs(10));
43        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
44
45        tokio::spawn(async move {
46            let mut actor = BubbleMergeActor {
47                rx,
48                record_publisher,
49                gossip_receiver,
50                gossip_sender,
51                ticker,
52            };
53            let _ = actor.run().await;
54        });
55
56        Ok(Self { _api: api })
57    }
58}
59
60impl Actor<anyhow::Error> for BubbleMergeActor {
61    async fn run(&mut self) -> Result<()> {
62        tracing::debug!("BubbleMerge: starting bubble merge actor");
63        loop {
64            tokio::select! {
65                Ok(action) = self.rx.recv_async() => {
66                    action(self).await;
67                }
68                _ = self.ticker.tick() => {
69                    tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
70                    let _ = self.merge().await;
71                    let next_interval = rand::random::<u64>() % 50;
72                    tracing::debug!("BubbleMerge: next check in {}s", next_interval);
73                    self.ticker.reset_after(Duration::from_secs(next_interval));
74                }
75                else => break Ok(()),
76            }
77        }
78    }
79}
80
81impl BubbleMergeActor {
82    // Cluster size as bubble indicator
83    async fn merge(&mut self) -> Result<()> {
84        let unix_minute = crate::unix_minute(0);
85        let mut records = self.record_publisher.get_records(unix_minute - 1).await;
86        records.extend(self.record_publisher.get_records(unix_minute).await);
87
88        let neighbors = self.gossip_receiver.neighbors().await;
89        tracing::debug!(
90            "BubbleMerge: checking with {} neighbors and {} records",
91            neighbors.len(),
92            records.len()
93        );
94
95        if neighbors.len() < 4 && !records.is_empty() {
96            tracing::debug!(
97                "BubbleMerge: detected small bubble ({} neighbors < 4), attempting merge",
98                neighbors.len()
99            );
100            let node_ids = records
101                .iter()
102                .flat_map(|record| {
103                    let mut endpoint_ids = if let Ok(content) =
104                        record.content::<GossipRecordContent>()
105                    {
106                        content
107                            .active_peers
108                            .iter()
109                            .filter_map(|&active_peer| {
110                                if active_peer == [0; 32]
111                                    || neighbors.contains(&active_peer)
112                                    || active_peer.eq(record.node_id().to_vec().as_slice())
113                                    || active_peer.eq(self.record_publisher.pub_key().as_bytes())
114                                {
115                                    None
116                                } else {
117                                    iroh::EndpointId::from_bytes(&active_peer).ok()
118                                }
119                            })
120                            .collect::<Vec<_>>()
121                    } else {
122                        vec![]
123                    };
124                    if let Ok(endpoint_id) = EndpointId::from_bytes(&record.node_id())
125                        && endpoint_id
126                            != EndpointId::from_verifying_key(self.record_publisher.pub_key())
127                    {
128                        endpoint_ids.push(endpoint_id);
129                    }
130                    endpoint_ids
131                })
132                .collect::<HashSet<_>>();
133
134            tracing::debug!(
135                "BubbleMerge: found {} potential peers to join",
136                node_ids.len()
137            );
138
139            if !node_ids.is_empty() {
140                self.gossip_sender
141                    .join_peers(
142                        node_ids.iter().cloned().collect::<Vec<_>>(),
143                        Some(super::MAX_JOIN_PEERS_COUNT),
144                    )
145                    .await?;
146                tracing::debug!("BubbleMerge: join_peers request sent");
147            }
148        } else {
149            tracing::debug!(
150                "BubbleMerge: no merge needed (neighbors={}, records={})",
151                neighbors.len(),
152                records.len()
153            );
154        }
155        Ok(())
156    }
157}