Skip to main content

distributed_topic_tracker/gossip/merge/
bubble.rs

1//! Bubble detection: merge isolated peer groups in the same topic.
2//!
3//! If local peer count < `min_neighbors`, extract suggested peers from DHT records and join them.
4
5use actor_helper::{Action, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12/// Detects and merges small isolated peer groups (bubbles).
13///
14/// Triggers when local peer count < `min_neighbors` and DHT records exist. Extracts `active_peers`
15/// from DHT records and joins them.
16#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18    _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23    record_publisher: RecordPublisher,
24    gossip_receiver: GossipReceiver,
25    gossip_sender: GossipSender,
26    ticker: tokio::time::Interval,
27    cancel_token: tokio_util::sync::CancellationToken,
28    max_join_peers: usize,
29    base_interval: Duration,
30    max_jitter: Duration,
31    min_neighbors: usize,
32}
33
34impl BubbleMerge {
35    /// Create a new bubble merge detector.
36    ///
37    /// Spawns a background task that periodically checks cluster size.
38    #[allow(clippy::too_many_arguments)]
39    pub fn new(
40        record_publisher: RecordPublisher,
41        gossip_sender: GossipSender,
42        gossip_receiver: GossipReceiver,
43        cancel_token: tokio_util::sync::CancellationToken,
44        max_join_peers: usize,
45        base_interval: Duration,
46        max_jitter: Duration,
47        min_neighbors: usize,
48    ) -> Result<Self> {
49        let base_interval = base_interval.max(Duration::from_secs(1));
50        let mut ticker = tokio::time::interval(base_interval);
51        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
52
53        let api = Handle::spawn_with(
54            BubbleMergeActor {
55                record_publisher,
56                gossip_receiver,
57                gossip_sender,
58                ticker,
59                cancel_token,
60                max_join_peers,
61                base_interval,
62                max_jitter,
63                min_neighbors,
64            },
65            |mut actor, rx| async move { actor.run(rx).await },
66        )
67        .0;
68
69        Ok(Self { _api: api })
70    }
71}
72
73impl BubbleMergeActor {
74    async fn run(&mut self, rx: Receiver<Action<BubbleMergeActor>>) -> Result<()> {
75        tracing::debug!("BubbleMerge: starting bubble merge actor");
76        loop {
77            tokio::select! {
78                result = rx.recv_async() => {
79                    match result {
80                        Ok(action) => action(self).await,
81                        Err(_) => break Ok(()),
82                    }
83                }
84                _ = self.ticker.tick() => {
85                    tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
86                    if let Err(e) = self.merge().await {
87                        tracing::warn!("BubbleMerge: error during merge: {:?}", e);
88                    }
89                    let jitter = if self.max_jitter > Duration::ZERO {
90                        Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
91                    } else {
92                        Duration::ZERO
93                    };
94                    let next_interval = self.base_interval + jitter;
95                    tracing::debug!("BubbleMerge: next check in {}ms", next_interval.as_millis());
96                    self.ticker.reset_after(next_interval);
97                }
98                _ = self.cancel_token.cancelled() => {
99                    break Ok(());
100                }
101                else => break Ok(()),
102            }
103        }
104    }
105}
106
107impl BubbleMergeActor {
108    // Cluster size as bubble indicator
109    async fn merge(&mut self) -> Result<()> {
110        let unix_minute = crate::unix_minute(0);
111        let mut records = self
112            .record_publisher
113            .get_records(unix_minute - 1, self.cancel_token.clone())
114            .await?;
115        records.extend(
116            self.record_publisher
117                .get_records(unix_minute, self.cancel_token.clone())
118                .await?,
119        );
120
121        let neighbors = self.gossip_receiver.neighbors().await?;
122        tracing::debug!(
123            "BubbleMerge: checking with {} neighbors and {} records",
124            neighbors.len(),
125            records.len()
126        );
127
128        if neighbors.len() < self.min_neighbors && !records.is_empty() {
129            tracing::debug!(
130                "BubbleMerge: detected small bubble ({} neighbors < {}), attempting merge",
131                neighbors.len(),
132                self.min_neighbors
133            );
134            let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
135            let pub_keys = records
136                .iter()
137                .flat_map(|record| {
138                    let mut pub_keys = if let Ok(content) = record.content::<GossipRecordContent>()
139                    {
140                        content
141                            .active_peers
142                            .iter()
143                            .filter_map(|&active_peer| {
144                                if active_peer == [0; 32]
145                                    || neighbors.contains(&active_peer)
146                                    || active_peer == record.pub_key()
147                                    || active_peer.eq(self_pub_key.as_bytes())
148                                {
149                                    None
150                                } else {
151                                    iroh::EndpointId::from_bytes(&active_peer).ok()
152                                }
153                            })
154                            .collect::<Vec<_>>()
155                    } else {
156                        vec![]
157                    };
158                    if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
159                        && !pub_key.eq(&self_pub_key)
160                        && !neighbors.contains(&pub_key)
161                    {
162                        pub_keys.push(pub_key);
163                    }
164                    pub_keys
165                })
166                .collect::<HashSet<_>>();
167
168            if !pub_keys.is_empty() {
169                tracing::debug!(
170                    "BubbleMerge: found {} potential peers to join",
171                    pub_keys.len()
172                );
173
174                self.gossip_sender
175                    .join_peers(
176                        pub_keys.iter().cloned().collect::<Vec<_>>(),
177                        Some(self.max_join_peers),
178                    )
179                    .await?;
180
181                tracing::debug!("BubbleMerge: join_peers request sent");
182            }
183        } else {
184            tracing::debug!(
185                "BubbleMerge: no merge needed (neighbors={}, records={})",
186                neighbors.len(),
187                records.len()
188            );
189        }
190        Ok(())
191    }
192}