Skip to main content

distributed_topic_tracker/gossip/merge/
bubble.rs

//! Bubble detection: merge isolated peer groups in the same topic.
//!
//! When the local neighbor count falls below `min_neighbors`, collect candidate peers from
//! recent DHT records (each record's publisher plus the `active_peers` it lists) and join them.
4
5use actor_helper::{Action, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
/// Detects and merges small isolated peer groups ("bubbles") within a topic.
///
/// A background actor periodically compares the local gossip neighbor count against
/// `min_neighbors`. When the count is lower and DHT records exist, the publishers of those
/// records and the `active_peers` they list are handed to the gossip layer as peers to join.
#[derive(Debug, Clone)]
pub struct BubbleMerge {
    /// Handle to the spawned actor. The actor loop exits once its action channel closes, so this
    /// field presumably keeps the actor alive for as long as a `BubbleMerge` exists — TODO
    /// confirm against `actor_helper::Handle`.
    _api: Handle<BubbleMergeActor, anyhow::Error>,
}
20
/// State of the background task spawned by `BubbleMerge::new`.
#[derive(Debug)]
struct BubbleMergeActor {
    /// Source of DHT records and of this node's own public key.
    record_publisher: RecordPublisher,
    /// Queried for the current set of gossip neighbors.
    gossip_receiver: GossipReceiver,
    /// Used to ask the gossip layer to join candidate peers.
    gossip_sender: GossipSender,
    /// Schedules bubble checks; re-armed with `base_interval` plus jitter after every check.
    ticker: tokio::time::Interval,
    /// Stops the actor loop and is forwarded to every DHT record lookup.
    cancel_token: tokio_util::sync::CancellationToken,
    /// Forwarded to `join_peers` as `Some(max_join_peers)`; presumably caps how many
    /// candidates are joined per merge attempt.
    max_join_peers: usize,
    /// Fixed part of the delay between checks (raised to at least 1 s at construction).
    base_interval: Duration,
    /// Exclusive upper bound of the random delay added to `base_interval`; zero disables it.
    max_jitter: Duration,
    /// Neighbor count below which this node considers itself to be in a bubble.
    min_neighbors: usize,
}
33
34impl BubbleMerge {
35    /// Create a new bubble merge detector.
36    ///
37    /// Spawns a background task that periodically checks cluster size.
38    #[allow(clippy::too_many_arguments)]
39    pub fn new(
40        record_publisher: RecordPublisher,
41        gossip_sender: GossipSender,
42        gossip_receiver: GossipReceiver,
43        cancel_token: tokio_util::sync::CancellationToken,
44        max_join_peers: usize,
45        initial_interval: Duration,
46        base_interval: Duration,
47        max_jitter: Duration,
48        min_neighbors: usize,
49    ) -> Result<Self> {
50        let base_interval = base_interval.max(Duration::from_secs(1));
51        let mut ticker = tokio::time::interval_at(tokio::time::Instant::now() + initial_interval, base_interval);
52        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
53
54        let api = Handle::spawn_with(
55            BubbleMergeActor {
56                record_publisher,
57                gossip_receiver,
58                gossip_sender,
59                ticker,
60                cancel_token,
61                max_join_peers,
62                base_interval,
63                max_jitter,
64                min_neighbors,
65            },
66            |mut actor, rx| async move { actor.run(rx).await },
67        )
68        .0;
69
70        Ok(Self { _api: api })
71    }
72}
73
74impl BubbleMergeActor {
75    async fn run(&mut self, rx: Receiver<Action<BubbleMergeActor>>) -> Result<()> {
76        tracing::debug!("BubbleMerge: starting bubble merge actor");
77        loop {
78            tokio::select! {
79                result = rx.recv_async() => {
80                    match result {
81                        Ok(action) => action(self).await,
82                        Err(_) => break Ok(()),
83                    }
84                }
85                _ = self.ticker.tick() => {
86                    tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
87                    if let Err(e) = self.merge().await {
88                        tracing::warn!("BubbleMerge: error during merge: {:?}", e);
89                    }
90                    let jitter = if self.max_jitter > Duration::ZERO {
91                        Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
92                    } else {
93                        Duration::ZERO
94                    };
95                    let next_interval = self.base_interval + jitter;
96                    tracing::debug!("BubbleMerge: next check in {}ms", next_interval.as_millis());
97                    self.ticker.reset_after(next_interval);
98                }
99                _ = self.cancel_token.cancelled() => {
100                    break Ok(());
101                }
102                else => break Ok(()),
103            }
104        }
105    }
106}
107
108impl BubbleMergeActor {
109    // Cluster size as bubble indicator
110    async fn merge(&mut self) -> Result<()> {
111        let unix_minute = crate::unix_minute(0);
112        let mut records = self
113            .record_publisher
114            .get_records(unix_minute - 1, self.cancel_token.clone())
115            .await?;
116        records.extend(
117            self.record_publisher
118                .get_records(unix_minute, self.cancel_token.clone())
119                .await?,
120        );
121
122        let neighbors = self.gossip_receiver.neighbors().await?;
123        tracing::debug!(
124            "BubbleMerge: checking with {} neighbors and {} records",
125            neighbors.len(),
126            records.len()
127        );
128
129        if neighbors.len() < self.min_neighbors && !records.is_empty() {
130            tracing::debug!(
131                "BubbleMerge: detected small bubble ({} neighbors < {}), attempting merge",
132                neighbors.len(),
133                self.min_neighbors
134            );
135            let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
136            let pub_keys = records
137                .iter()
138                .flat_map(|record| {
139                    let mut pub_keys = if let Ok(content) = record.content::<GossipRecordContent>()
140                    {
141                        content
142                            .active_peers
143                            .iter()
144                            .filter_map(|&active_peer| {
145                                if active_peer == [0; 32]
146                                    || neighbors.contains(&active_peer)
147                                    || active_peer == record.pub_key()
148                                    || active_peer.eq(self_pub_key.as_bytes())
149                                {
150                                    None
151                                } else {
152                                    iroh::EndpointId::from_bytes(&active_peer).ok()
153                                }
154                            })
155                            .collect::<Vec<_>>()
156                    } else {
157                        vec![]
158                    };
159                    if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
160                        && !pub_key.eq(&self_pub_key)
161                        && !neighbors.contains(&pub_key)
162                    {
163                        pub_keys.push(pub_key);
164                    }
165                    pub_keys
166                })
167                .collect::<HashSet<_>>();
168
169            if !pub_keys.is_empty() {
170                tracing::debug!(
171                    "BubbleMerge: found {} potential peers to join",
172                    pub_keys.len()
173                );
174
175                self.gossip_sender
176                    .join_peers(
177                        pub_keys.iter().cloned().collect::<Vec<_>>(),
178                        Some(self.max_join_peers),
179                    )
180                    .await?;
181
182                tracing::debug!("BubbleMerge: join_peers request sent");
183            }
184        } else {
185            tracing::debug!(
186                "BubbleMerge: no merge needed (neighbors={}, records={})",
187                neighbors.len(),
188                records.len()
189            );
190        }
191        Ok(())
192    }
193}