distributed_topic_tracker/gossip/merge/
bubble.rs1use actor_helper::{Action, Handle, Receiver};
6use iroh::EndpointId;
7use std::{collections::HashSet, time::Duration};
8
9use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
10use anyhow::Result;
11
12#[derive(Debug, Clone)]
17pub struct BubbleMerge {
18 _api: Handle<BubbleMergeActor, anyhow::Error>,
19}
20
21#[derive(Debug)]
22struct BubbleMergeActor {
23 record_publisher: RecordPublisher,
24 gossip_receiver: GossipReceiver,
25 gossip_sender: GossipSender,
26 ticker: tokio::time::Interval,
27 cancel_token: tokio_util::sync::CancellationToken,
28 max_join_peers: usize,
29 base_interval: Duration,
30 max_jitter: Duration,
31 min_neighbors: usize,
32}
33
34impl BubbleMerge {
35 #[allow(clippy::too_many_arguments)]
39 pub fn new(
40 record_publisher: RecordPublisher,
41 gossip_sender: GossipSender,
42 gossip_receiver: GossipReceiver,
43 cancel_token: tokio_util::sync::CancellationToken,
44 max_join_peers: usize,
45 base_interval: Duration,
46 max_jitter: Duration,
47 min_neighbors: usize,
48 ) -> Result<Self> {
49 let base_interval = base_interval.max(Duration::from_secs(1));
50 let mut ticker = tokio::time::interval(base_interval);
51 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
52
53 let api = Handle::spawn_with(
54 BubbleMergeActor {
55 record_publisher,
56 gossip_receiver,
57 gossip_sender,
58 ticker,
59 cancel_token,
60 max_join_peers,
61 base_interval,
62 max_jitter,
63 min_neighbors,
64 },
65 |mut actor, rx| async move { actor.run(rx).await },
66 )
67 .0;
68
69 Ok(Self { _api: api })
70 }
71}
72
73impl BubbleMergeActor {
74 async fn run(&mut self, rx: Receiver<Action<BubbleMergeActor>>) -> Result<()> {
75 tracing::debug!("BubbleMerge: starting bubble merge actor");
76 loop {
77 tokio::select! {
78 result = rx.recv_async() => {
79 match result {
80 Ok(action) => action(self).await,
81 Err(_) => break Ok(()),
82 }
83 }
84 _ = self.ticker.tick() => {
85 tracing::debug!("BubbleMerge: tick fired, checking for bubbles");
86 if let Err(e) = self.merge().await {
87 tracing::warn!("BubbleMerge: error during merge: {:?}", e);
88 }
89 let jitter = if self.max_jitter > Duration::ZERO {
90 Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
91 } else {
92 Duration::ZERO
93 };
94 let next_interval = self.base_interval + jitter;
95 tracing::debug!("BubbleMerge: next check in {}ms", next_interval.as_millis());
96 self.ticker.reset_after(next_interval);
97 }
98 _ = self.cancel_token.cancelled() => {
99 break Ok(());
100 }
101 else => break Ok(()),
102 }
103 }
104 }
105}
106
107impl BubbleMergeActor {
108 async fn merge(&mut self) -> Result<()> {
110 let unix_minute = crate::unix_minute(0);
111 let mut records = self
112 .record_publisher
113 .get_records(unix_minute - 1, self.cancel_token.clone())
114 .await?;
115 records.extend(
116 self.record_publisher
117 .get_records(unix_minute, self.cancel_token.clone())
118 .await?,
119 );
120
121 let neighbors = self.gossip_receiver.neighbors().await?;
122 tracing::debug!(
123 "BubbleMerge: checking with {} neighbors and {} records",
124 neighbors.len(),
125 records.len()
126 );
127
128 if neighbors.len() < self.min_neighbors && !records.is_empty() {
129 tracing::debug!(
130 "BubbleMerge: detected small bubble ({} neighbors < {}), attempting merge",
131 neighbors.len(),
132 self.min_neighbors
133 );
134 let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
135 let pub_keys = records
136 .iter()
137 .flat_map(|record| {
138 let mut pub_keys = if let Ok(content) = record.content::<GossipRecordContent>()
139 {
140 content
141 .active_peers
142 .iter()
143 .filter_map(|&active_peer| {
144 if active_peer == [0; 32]
145 || neighbors.contains(&active_peer)
146 || active_peer == record.pub_key()
147 || active_peer.eq(self_pub_key.as_bytes())
148 {
149 None
150 } else {
151 iroh::EndpointId::from_bytes(&active_peer).ok()
152 }
153 })
154 .collect::<Vec<_>>()
155 } else {
156 vec![]
157 };
158 if let Ok(pub_key) = EndpointId::from_bytes(&record.pub_key())
159 && !pub_key.eq(&self_pub_key)
160 && !neighbors.contains(&pub_key)
161 {
162 pub_keys.push(pub_key);
163 }
164 pub_keys
165 })
166 .collect::<HashSet<_>>();
167
168 if !pub_keys.is_empty() {
169 tracing::debug!(
170 "BubbleMerge: found {} potential peers to join",
171 pub_keys.len()
172 );
173
174 self.gossip_sender
175 .join_peers(
176 pub_keys.iter().cloned().collect::<Vec<_>>(),
177 Some(self.max_join_peers),
178 )
179 .await?;
180
181 tracing::debug!("BubbleMerge: join_peers request sent");
182 }
183 } else {
184 tracing::debug!(
185 "BubbleMerge: no merge needed (neighbors={}, records={})",
186 neighbors.len(),
187 records.len()
188 );
189 }
190 Ok(())
191 }
192}