// distributed_topic_tracker/gossip/topic/bootstrap.rs
use std::collections::HashSet;
4
5use actor_helper::{Handle, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12 GossipSender, MAX_MESSAGE_HASHES, MAX_RECORD_PEERS, RecordPublisher,
13 config::BootstrapConfig,
14 crypto::Record,
15 gossip::{GossipRecordContent, receiver::GossipReceiver},
16};
17
18#[derive(Debug, Clone)]
23pub struct Bootstrap {
24 api: Handle<BootstrapActor, anyhow::Error>,
25}
26
27#[derive(Debug)]
28struct BootstrapActor {
29 record_publisher: crate::crypto::RecordPublisher,
30 gossip_sender: GossipSender,
31 gossip_receiver: GossipReceiver,
32 cancel_token: tokio_util::sync::CancellationToken,
33 config: BootstrapConfig,
34}
35
36impl Bootstrap {
37 pub async fn new(
39 record_publisher: crate::crypto::RecordPublisher,
40 gossip: iroh_gossip::net::Gossip,
41 cancel_token: tokio_util::sync::CancellationToken,
42 timeout_config: crate::config::TimeoutConfig,
43 bootstrap_config: BootstrapConfig,
44 ) -> Result<Self> {
45 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
46 .subscribe(
47 iroh_gossip::proto::TopicId::from(record_publisher.topic_id().hash()),
48 vec![],
49 )
50 .await?;
51 let (gossip_sender, gossip_receiver) = gossip_topic.split();
52 let (gossip_sender, gossip_receiver) = (
53 GossipSender::new(gossip_sender, timeout_config),
54 GossipReceiver::new(gossip_receiver, cancel_token.clone()),
55 );
56
57 let api = Handle::spawn(BootstrapActor {
58 record_publisher,
59 gossip_sender,
60 gossip_receiver,
61 cancel_token,
62 config: bootstrap_config,
63 })
64 .0;
65
66 Ok(Self { api })
67 }
68
69 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
73 self.api.call(act!(actor=> actor.start_bootstrap())).await
74 }
75
76 pub async fn gossip_sender(&self) -> Result<GossipSender> {
78 self.api
79 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
80 .await
81 }
82
83 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
85 self.api
86 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
87 .await
88 }
89}
90
91impl BootstrapActor {
92 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
93 let (sender, receiver) = tokio::sync::oneshot::channel();
94 tokio::spawn({
95 let mut last_published_unix_minute = 0;
96 let (gossip_sender, mut gossip_receiver) =
97 (self.gossip_sender.clone(), self.gossip_receiver.clone());
98 let record_publisher = self.record_publisher.clone();
99 let cancel_token = self.cancel_token.clone();
100 let bootstrap_config = self.config.clone();
101 let mut is_joined_ret = false;
102
103 if self.config.publish_record_on_startup() {
104 let unix_minute = crate::unix_minute(0);
105 tracing::debug!("Bootstrap: initial startup record publish {}", unix_minute);
106 last_published_unix_minute = if self.config.check_older_records_first_on_startup() {
107 0
108 } else {
109 unix_minute
110 };
111 let record_creator = record_publisher.clone();
112 let record_content = GossipRecordContent {
113 active_peers: [[0; 32]; MAX_RECORD_PEERS],
114 last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
115 };
116 if let Ok(record) = Record::sign(
117 record_publisher.topic_id().hash(),
118 unix_minute,
119 record_content,
120 record_publisher.signing_key(),
121 ) {
122 publish_record_fire_and_forget(
123 record_creator,
124 record,
125 None,
126 cancel_token.clone(),
127 );
128 }
129 }
130
131 async move {
132 tracing::debug!("Bootstrap: starting bootstrap process");
133 'bootstrap: while !cancel_token.is_cancelled() {
134 let is_joined = gossip_receiver.is_joined().await;
136 if let Ok(is_joined) = is_joined
137 && is_joined
138 {
139 tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
140 is_joined_ret = true;
141 break;
142 } else if let Err(e) = is_joined {
143 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
144 break;
145 }
146
147 let current_unix_minute = crate::unix_minute(0);
148
149 let mut use_cached_next = true;
150 let unix_minute_offset = if last_published_unix_minute == 0
152 && bootstrap_config.check_older_records_first_on_startup()
153 {
154 use_cached_next = false;
155 1
156 } else {
157 0
158 };
159
160 let mut records = record_publisher
162 .get_records(
163 current_unix_minute.saturating_sub(unix_minute_offset + 1),
164 cancel_token.clone(),
165 )
166 .await
167 .unwrap_or_default();
168 let current_records = record_publisher
169 .get_records(
170 current_unix_minute.saturating_sub(unix_minute_offset),
171 cancel_token.clone(),
172 )
173 .await
174 .unwrap_or_default();
175 records.extend(current_records.clone());
176
177 tracing::debug!(
178 "Bootstrap: fetched {} records for unix_minute {}",
179 records.len(),
180 current_unix_minute
181 );
182
183 if records.is_empty() {
186 if current_unix_minute != last_published_unix_minute {
187 tracing::debug!(
188 "Bootstrap: no records found, publishing own record for unix_minute {}",
189 current_unix_minute
190 );
191 last_published_unix_minute = current_unix_minute;
192 let record_creator = record_publisher.clone();
193 let record_content = GossipRecordContent {
194 active_peers: [[0; 32]; MAX_RECORD_PEERS],
195 last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
196 };
197 if let Ok(record) = Record::sign(
198 record_publisher.topic_id().hash(),
199 current_unix_minute,
200 record_content,
201 record_publisher.signing_key(),
202 ) {
203 publish_record_fire_and_forget(
204 record_creator,
205 record,
206 if use_cached_next {
207 Some(current_records.clone())
208 } else {
209 None
210 },
211 cancel_token.clone(),
212 );
213 }
214 }
215 tokio::select! {
216 _ = sleep(bootstrap_config.no_peers_retry_interval()) => {}
217 _ = gossip_receiver.joined() => continue,
218 _ = cancel_token.cancelled() => break,
219 }
220 continue;
221 }
222
223 let bootstrap_nodes = records
227 .iter()
228 .flat_map(|record| {
229 let mut v = vec![record.pub_key()];
231
232 if let Ok(record_content) = record.content::<GossipRecordContent>() {
233 for peer in record_content.active_peers {
234 if peer != [0; 32]
235 && !peer.eq(record_publisher.pub_key().as_bytes())
236 {
237 v.push(peer);
238 }
239 }
240 }
241 v
242 })
243 .filter_map(|pub_key| EndpointId::from_bytes(&pub_key).ok())
244 .collect::<HashSet<_>>();
245
246 tracing::debug!(
247 "Bootstrap: extracted {} potential bootstrap nodes",
248 bootstrap_nodes.len()
249 );
250
251 let is_joined = gossip_receiver.is_joined().await;
255 if let Ok(is_joined) = is_joined
256 && is_joined
257 {
258 tracing::debug!("Bootstrap: joined while processing records, exiting");
259 is_joined_ret = true;
260 break;
261 } else if let Err(e) = is_joined {
262 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
263 break;
264 }
265
266 for pub_key in bootstrap_nodes.iter() {
269 match gossip_sender.join_peers(vec![*pub_key], None).await {
270 Ok(_) => {
271 tracing::debug!("Bootstrap: attempted to join peer {}", pub_key);
272
273 tokio::select! {
274 _ = sleep(bootstrap_config.per_peer_join_settle_time()) => {}
275 _ = gossip_receiver.joined() => {},
276 _ = cancel_token.cancelled() => break 'bootstrap,
277 }
278 let is_joined = gossip_receiver.is_joined().await;
279 if let Ok(is_joined) = is_joined
280 && is_joined
281 {
282 tracing::debug!(
283 "Bootstrap: successfully joined via peer {}",
284 pub_key
285 );
286 is_joined_ret = true;
287 break;
288 } else if let Err(e) = is_joined {
289 tracing::debug!(
290 "Bootstrap: error checking join status: {:?}",
291 e
292 );
293 break;
294 }
295 }
296 Err(e) => {
297 tracing::debug!(
298 "Bootstrap: failed to join peer {}: {:?}",
299 pub_key,
300 e
301 );
302 continue;
303 }
304 }
305 }
306
307 let is_joined = gossip_receiver.is_joined().await;
310 if let Ok(is_joined) = is_joined
311 && !is_joined
312 {
313 tracing::debug!(
314 "Bootstrap: not joined yet, waiting {:?} before final check",
315 bootstrap_config.join_confirmation_wait_time()
316 );
317 tokio::select! {
318 _ = sleep(bootstrap_config.join_confirmation_wait_time()) => {}
319 _ = gossip_receiver.joined() => {},
320 _ = cancel_token.cancelled() => break,
321 }
322 } else if let Err(e) = is_joined {
323 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
324 break;
325 }
326
327 let is_joined = gossip_receiver.is_joined().await;
329 if let Ok(is_joined) = is_joined
330 && is_joined
331 {
332 tracing::debug!("Bootstrap: successfully joined after final wait");
333 is_joined_ret = true;
334 break;
335 } else if let Err(e) = is_joined {
336 tracing::debug!("Bootstrap: error checking join status: {:?}", e);
337 break;
338 } else {
339 tracing::debug!("Bootstrap: still not joined after attempting all peers");
340 if current_unix_minute != last_published_unix_minute {
342 tracing::debug!(
343 "Bootstrap: publishing fallback record for unix_minute {}",
344 current_unix_minute
345 );
346 last_published_unix_minute = current_unix_minute;
347 let record_creator = record_publisher.clone();
348 if let Ok(record) = Record::sign(
349 record_publisher.topic_id().hash(),
350 current_unix_minute,
351 GossipRecordContent {
352 active_peers: [[0; 32]; MAX_RECORD_PEERS],
353 last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
354 },
355 record_publisher.signing_key(),
356 ) {
357 publish_record_fire_and_forget(
358 record_creator,
359 record,
360 if use_cached_next {
361 Some(current_records)
362 } else {
363 None
364 },
365 cancel_token.clone(),
366 );
367 }
368 }
369 tokio::select! {
370 _ = sleep(bootstrap_config.discovery_poll_interval()) => continue,
371 _ = gossip_receiver.joined() => continue,
372 _ = cancel_token.cancelled() => break,
373 }
374 }
375 }
376 tracing::debug!("Bootstrap: exited");
377
378 if is_joined_ret {
379 let _ = sender.send(Ok(()));
380 } else {
381 let _ = sender.send(Err(anyhow::anyhow!(
382 "Bootstrap process failed or was cancelled"
383 )));
384 }
385 }
386 });
387
388 Ok(receiver)
389 }
390}
391
392fn publish_record_fire_and_forget(
393 record_publisher: RecordPublisher,
394 record: Record,
395 cached_records: Option<HashSet<Record>>,
396 cancel_token: CancellationToken,
397) {
398 tokio::spawn(async move {
399 if let Err(err) = record_publisher
400 .publish_record_cached_records(record, cached_records, cancel_token)
401 .await
402 {
403 tracing::warn!("Failed to publish record: {:?}", err);
404 }
405 });
406}