distributed_topic_tracker/gossip/merge/
bubble.rs1use actor_helper::{Action, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18 _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23 record_publisher: RecordPublisher,
24 gossip_receiver: GossipReceiver,
25 gossip_sender: GossipSender,
26 ticker: tokio::time::Interval,
27 cancel_token: tokio_util::sync::CancellationToken,
28 max_join_peers: usize,
29 base_interval: Duration,
30 max_jitter: Duration,
31 min_neighbors: usize,
32}
33
34impl BubbleMerge {
35 #[allow(clippy::too_many_arguments)]
39 pub fn new(
40 record_publisher: RecordPublisher,
41 gossip_sender: GossipSender,
42 gossip_receiver: GossipReceiver,
43 cancel_token: tokio_util::sync::CancellationToken,
44 max_join_peers: usize,
45 initial_interval: Duration,
46 base_interval: Duration,
47 max_jitter: Duration,
48 min_neighbors: usize,
49 ) -> Result<Self> {
50 let base_interval = base_interval.max(Duration::from_secs(1));
51 let mut ticker = tokio::time::interval_at(tokio::time::Instant::now() + initial_interval, base_interval);
52 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
53
54 let api = Handle::spawn_with(
55 BubbleMergeActor {
56 record_publisher,
57 gossip_receiver,
58 gossip_sender,
59 ticker,
60 cancel_token,
61 max_join_peers,
62 base_interval,
63 max_jitter,
64 min_neighbors,
65 },
66 |mut actor, rx| async move { actor.run(rx).await },
67 )
68 .0;
69
70 Ok(Self { _api: api })
71 }
72}
73
74impl BubbleMergeActor {
75 async fn run(&mut self, rx: Receiver<Action<BubbleMergeActor>>) -> Result<()> {
76 tracing::debug!("BubbleMerge: starting bubble merge actor");
77 loop {
78 tokio::select! {
79 result = rx.recv_async() => {
80 match result {
81 Ok(action) => action(self).await,
82 Err(_) => break Ok(()),
83 }
84 }
85 _ = self.ticker.tick() => {
86 tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
87 if let Err(e) = self.merge().await {
88 tracing::warn!("BubbleMerge: error during merge: {:?}", e);
89 }
90 let jitter = if self.max_jitter > Duration::ZERO {
91 Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
92 } else {
93 Duration::ZERO
94 };
95 let next_interval = self.base_interval + jitter;
96 tracing::debug!("BubbleMerge: next check in {}ms", next_interval.as_millis());
97 self.ticker.reset_after(next_interval);
98 }
99 _ = self.cancel_token.cancelled() => {
100 break Ok(());
101 }
102 else => break Ok(()),
103 }
104 }
105 }
106}
107
108impl BubbleMergeActor {
109 async fn merge(&mut self) -> Result<()> {
111 let unix_minute = crate::unix_minute(0);
112 let mut records = self
113 .record_publisher
114 .get_records(unix_minute - 1, self.cancel_token.clone())
115 .await?;
116 records.extend(
117 self.record_publisher
118 .get_records(unix_minute, self.cancel_token.clone())
119 .await?,
120 );
121
122 let neighbors = self.gossip_receiver.neighbors().await?;
123 tracing::debug!(
124 "BubbleMerge: checking with {} neighbors and {} records",
125 neighbors.len(),
126 records.len()
127 );
128
129 if neighbors.len() < self.min_neighbors && !records.is_empty() {
130 tracing::debug!(
131 "BubbleMerge: detected small bubble ({} neighbors < {}), attempting merge",
132 neighbors.len(),
133 self.min_neighbors
134 );
135 let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
136 let pub_keys = records
137 .iter()
138 .flat_map(|record| {
139 let mut pub_keys = if let Ok(content) = record.content::<GossipRecordContent>()
140 {
141 content
142 .active_peers
143 .iter()
144 .filter_map(|&active_peer| {
145 if active_peer == [0; 32]
146 || neighbors.contains(&active_peer)
147 || active_peer == record.pub_key()
148 || active_peer.eq(self_pub_key.as_bytes())
149 {
150 None
151 } else {
152 iroh::EndpointId::from_bytes(&active_peer).ok()
153 }
154 })
155 .collect::<Vec<_>>()
156 } else {
157 vec![]
158 };
159 if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
160 && !pub_key.eq(&self_pub_key)
161 && !neighbors.contains(&pub_key)
162 {
163 pub_keys.push(pub_key);
164 }
165 pub_keys
166 })
167 .collect::<HashSet<_>>();
168
169 if !pub_keys.is_empty() {
170 tracing::debug!(
171 "BubbleMerge: found {} potential peers to join",
172 pub_keys.len()
173 );
174
175 self.gossip_sender
176 .join_peers(
177 pub_keys.iter().cloned().collect::<Vec<_>>(),
178 Some(self.max_join_peers),
179 )
180 .await?;
181
182 tracing::debug!("BubbleMerge: join_peers request sent");
183 }
184 } else {
185 tracing::debug!(
186 "BubbleMerge: no merge needed (neighbors={}, records={})",
187 neighbors.len(),
188 records.len()
189 );
190 }
191 Ok(())
192 }
193}