distributed_topic_tracker/gossip/topic/
bootstrap.rs1use std::collections::HashSet;
4
5use actor_helper::{Handle, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12 GossipSender, MAX_MESSAGE_HASHES, MAX_RECORD_PEERS, RecordPublisher, TimeoutConfig, config::BootstrapConfig, crypto::Record, gossip::{GossipRecordContent, receiver::GossipReceiver}
13};
14
15#[derive(Debug, Clone)]
20pub struct Bootstrap {
21 api: Handle<BootstrapActor, anyhow::Error>,
22}
23
24#[derive(Debug)]
25struct BootstrapActor {
26 record_publisher: RecordPublisher,
27 gossip_sender: GossipSender,
28 gossip_receiver: GossipReceiver,
29 cancel_token: tokio_util::sync::CancellationToken,
30 config: BootstrapConfig,
31}
32
33impl Bootstrap {
34 pub async fn new(
36 record_publisher: RecordPublisher,
37 gossip: iroh_gossip::net::Gossip,
38 cancel_token: tokio_util::sync::CancellationToken,
39 timeout_config: TimeoutConfig,
40 bootstrap_config: BootstrapConfig,
41 ) -> Result<Self> {
42 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
43 .subscribe(
44 iroh_gossip::proto::TopicId::from(record_publisher.topic_id().hash()),
45 vec![],
46 )
47 .await?;
48 let (gossip_sender, gossip_receiver) = gossip_topic.split();
49 let (gossip_sender, gossip_receiver) = (
50 GossipSender::new(gossip_sender, timeout_config),
51 GossipReceiver::new(gossip_receiver, cancel_token.clone()),
52 );
53
54 let api = Handle::spawn(BootstrapActor {
55 record_publisher,
56 gossip_sender,
57 gossip_receiver,
58 cancel_token,
59 config: bootstrap_config,
60 })
61 .0;
62
63 Ok(Self { api })
64 }
65
66 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
70 self.api.call(act!(actor=> actor.start_bootstrap())).await
71 }
72
73 pub async fn gossip_sender(&self) -> Result<GossipSender> {
75 self.api
76 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
77 .await
78 }
79
80 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
82 self.api
83 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
84 .await
85 }
86}
87
88impl BootstrapActor {
89 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
90 let (sender, receiver) = tokio::sync::oneshot::channel();
91 tokio::spawn({
92 let mut last_published_unix_minute = 0;
93 let (gossip_sender, mut gossip_receiver) =
94 (self.gossip_sender.clone(), self.gossip_receiver.clone());
95 let record_publisher = self.record_publisher.clone();
96 let cancel_token = self.cancel_token.clone();
97 let bootstrap_config = self.config.clone();
98 let mut is_joined_ret = false;
99
100 if self.config.publish_record_on_startup() {
101 let unix_minute = crate::unix_minute(0);
102 tracing::debug!("Bootstrap: initial startup record publish {}", unix_minute);
103 last_published_unix_minute = if self.config.check_older_records_first_on_startup() {
104 0
105 } else {
106 unix_minute
107 };
108 let record_creator = record_publisher.clone();
109 let record_content = GossipRecordContent {
110 active_peers: [[0; 32]; MAX_RECORD_PEERS],
111 last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
112 };
113 if let Ok(record) = Record::sign(
114 record_publisher.topic_id().hash(),
115 unix_minute,
116 record_content,
117 record_publisher.signing_key(),
118 ) {
119 publish_record_fire_and_forget(
120 record_creator,
121 record,
122 None,
123 cancel_token.clone(),
124 );
125 }
126 }
127
128 async move {
129 tracing::debug!("Bootstrap: starting bootstrap process");
130 'bootstrap: while !cancel_token.is_cancelled() {
131 let is_joined = gossip_receiver.is_joined().await;
133 if let Ok(is_joined) = is_joined
134 && is_joined
135 {
136 tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
137 is_joined_ret = true;
138 break;
139 } else if let Err(e) = is_joined {
140 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
141 break;
142 }
143
144 let current_unix_minute = crate::unix_minute(0);
145
146 let mut use_cached_next = true;
147 let unix_minute_offset = if last_published_unix_minute == 0
149 && bootstrap_config.check_older_records_first_on_startup()
150 {
151 use_cached_next = false;
152 1
153 } else {
154 0
155 };
156
157 let mut records = record_publisher
159 .get_records(
160 current_unix_minute.saturating_sub(unix_minute_offset + 1),
161 cancel_token.clone(),
162 )
163 .await
164 .unwrap_or_default();
165 let current_records = record_publisher
166 .get_records(
167 current_unix_minute.saturating_sub(unix_minute_offset),
168 cancel_token.clone(),
169 )
170 .await
171 .unwrap_or_default();
172 records.extend(current_records.clone());
173
174 tracing::debug!(
175 "Bootstrap: fetched {} records for unix_minute {}",
176 records.len(),
177 current_unix_minute
178 );
179
180 if records.is_empty() {
183 if current_unix_minute != last_published_unix_minute {
184 tracing::debug!(
185 "Bootstrap: no records found, publishing own record for unix_minute {}",
186 current_unix_minute
187 );
188 last_published_unix_minute = current_unix_minute;
189 let record_creator = record_publisher.clone();
190 let record_content = GossipRecordContent {
191 active_peers: [[0; 32]; MAX_RECORD_PEERS],
192 last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
193 };
194 if let Ok(record) = Record::sign(
195 record_publisher.topic_id().hash(),
196 current_unix_minute,
197 record_content,
198 record_publisher.signing_key(),
199 ) {
200 publish_record_fire_and_forget(
201 record_creator,
202 record,
203 if use_cached_next {
204 Some(current_records.clone())
205 } else {
206 None
207 },
208 cancel_token.clone(),
209 );
210 }
211 }
212 tokio::select! {
213 _ = sleep(bootstrap_config.no_peers_retry_interval()) => {}
214 _ = gossip_receiver.joined() => continue,
215 _ = cancel_token.cancelled() => break,
216 }
217 continue;
218 }
219
220 let bootstrap_nodes = records
224 .iter()
225 .flat_map(|record| {
226 let mut v = vec![record.pub_key()];
228
229 if let Ok(record_content) = record.content::<GossipRecordContent>() {
230 for peer in record_content.active_peers {
231 if peer != [0; 32]
232 && !peer.eq(record_publisher.pub_key().as_bytes())
233 {
234 v.push(peer);
235 }
236 }
237 }
238 v
239 })
240 .filter_map(|pub_key| EndpointId::from_bytes(&pub_key).ok())
241 .collect::<HashSet<_>>();
242
243 tracing::debug!(
244 "Bootstrap: extracted {} potential bootstrap nodes",
245 bootstrap_nodes.len()
246 );
247
248 let is_joined = gossip_receiver.is_joined().await;
252 if let Ok(is_joined) = is_joined
253 && is_joined
254 {
255 tracing::debug!("Bootstrap: joined while processing records, exiting");
256 is_joined_ret = true;
257 break;
258 } else if let Err(e) = is_joined {
259 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
260 break;
261 }
262
263 for pub_key in bootstrap_nodes.iter() {
266 match gossip_sender.join_peers(vec![*pub_key], None).await {
267 Ok(_) => {
268 tracing::debug!("Bootstrap: attempted to join peer {}", pub_key);
269
270 tokio::select! {
271 _ = sleep(bootstrap_config.per_peer_join_settle_time()) => {}
272 _ = gossip_receiver.joined() => {},
273 _ = cancel_token.cancelled() => break 'bootstrap,
274 }
275 let is_joined = gossip_receiver.is_joined().await;
276 if let Ok(is_joined) = is_joined
277 && is_joined
278 {
279 tracing::debug!(
280 "Bootstrap: successfully joined via peer {}",
281 pub_key
282 );
283 is_joined_ret = true;
284 break;
285 } else if let Err(e) = is_joined {
286 tracing::debug!(
287 "Bootstrap: error checking join status: {:?}",
288 e
289 );
290 break;
291 }
292 }
293 Err(e) => {
294 tracing::debug!(
295 "Bootstrap: failed to join peer {}: {:?}",
296 pub_key,
297 e
298 );
299 continue;
300 }
301 }
302 }
303
304 let is_joined = gossip_receiver.is_joined().await;
307 if let Ok(is_joined) = is_joined
308 && !is_joined
309 {
310 tracing::debug!(
311 "Bootstrap: not joined yet, waiting {:?} before final check",
312 bootstrap_config.join_confirmation_wait_time()
313 );
314 tokio::select! {
315 _ = sleep(bootstrap_config.join_confirmation_wait_time()) => {}
316 _ = gossip_receiver.joined() => {},
317 _ = cancel_token.cancelled() => break,
318 }
319 } else if let Err(e) = is_joined {
320 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
321 break;
322 }
323
324 let is_joined = gossip_receiver.is_joined().await;
326 if let Ok(is_joined) = is_joined
327 && is_joined
328 {
329 tracing::debug!("Bootstrap: successfully joined after final wait");
330 is_joined_ret = true;
331 break;
332 } else if let Err(e) = is_joined {
333 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
334 break;
335 } else {
336 tracing::debug!("Bootstrap: still not joined after attempting all peers");
337 if current_unix_minute != last_published_unix_minute {
339 tracing::debug!(
340 "Bootstrap: publishing fallback record for unix_minute {}",
341 current_unix_minute
342 );
343 last_published_unix_minute = current_unix_minute;
344 let record_creator = record_publisher.clone();
345 if let Ok(record) = Record::sign(
346 record_publisher.topic_id().hash(),
347 current_unix_minute,
348 GossipRecordContent {
349 active_peers: [[0; 32]; MAX_RECORD_PEERS],
350 last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
351 },
352 record_publisher.signing_key(),
353 ) {
354 publish_record_fire_and_forget(
355 record_creator,
356 record,
357 if use_cached_next {
358 Some(current_records)
359 } else {
360 None
361 },
362 cancel_token.clone(),
363 );
364 }
365 }
366 tokio::select! {
367 _ = sleep(bootstrap_config.discovery_poll_interval()) => continue,
368 _ = gossip_receiver.joined() => continue,
369 _ = cancel_token.cancelled() => break,
370 }
371 }
372 }
373 tracing::debug!("Bootstrap: exited");
374
375 if is_joined_ret {
376 let _ = sender.send(Ok(()));
377 } else {
378 let _ = sender.send(Err(anyhow::anyhow!(
379 "Bootstrap process failed or was cancelled"
380 )));
381 }
382 }
383 });
384
385 Ok(receiver)
386 }
387}
388
389fn publish_record_fire_and_forget(
390 record_publisher: RecordPublisher,
391 record: Record,
392 cached_records: Option<HashSet<Record>>,
393 cancel_token: CancellationToken,
394) {
395 tokio::spawn(async move {
396 if let Err(err) = record_publisher
397 .publish_record_cached_records(record, cached_records, cancel_token)
398 .await
399 {
400 tracing::warn!("Failed to publish record: {:?}", err);
401 }
402 });
403}