Skip to main content

distributed_topic_tracker/gossip/topic/
bootstrap.rs

1//! Bootstrap process for discovering and joining peers via DHT.
2
3use std::collections::HashSet;
4
5use actor_helper::{Handle, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12    GossipSender, MAX_MESSAGE_HASHES, MAX_RECORD_PEERS, RecordPublisher, TimeoutConfig, config::BootstrapConfig, crypto::Record, gossip::{GossipRecordContent, receiver::GossipReceiver}
13};
14
15/// Manages the peer discovery and joining process.
16///
17/// Queries DHT for bootstrap records, extracts node IDs, and progressively
18/// joins peers until the local node is connected to the topic.
19#[derive(Debug, Clone)]
20pub struct Bootstrap {
21    api: Handle<BootstrapActor, anyhow::Error>,
22}
23
24#[derive(Debug)]
25struct BootstrapActor {
26    record_publisher: RecordPublisher,
27    gossip_sender: GossipSender,
28    gossip_receiver: GossipReceiver,
29    cancel_token: tokio_util::sync::CancellationToken,
30    config: BootstrapConfig,
31}
32
33impl Bootstrap {
34    /// Create a new bootstrap process for a topic.
35    pub async fn new(
36        record_publisher: RecordPublisher,
37        gossip: iroh_gossip::net::Gossip,
38        cancel_token: tokio_util::sync::CancellationToken,
39        timeout_config: TimeoutConfig,
40        bootstrap_config: BootstrapConfig,
41    ) -> Result<Self> {
42        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
43            .subscribe(
44                iroh_gossip::proto::TopicId::from(record_publisher.topic_id().hash()),
45                vec![],
46            )
47            .await?;
48        let (gossip_sender, gossip_receiver) = gossip_topic.split();
49        let (gossip_sender, gossip_receiver) = (
50            GossipSender::new(gossip_sender, timeout_config),
51            GossipReceiver::new(gossip_receiver, cancel_token.clone()),
52        );
53
54        let api = Handle::spawn(BootstrapActor {
55            record_publisher,
56            gossip_sender,
57            gossip_receiver,
58            cancel_token,
59            config: bootstrap_config,
60        })
61        .0;
62
63        Ok(Self { api })
64    }
65
66    /// Start the bootstrap process.
67    ///
68    /// Returns a receiver that signals completion when the node has joined the topic (has at least one neighbor).
69    pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
70        self.api.call(act!(actor=> actor.start_bootstrap())).await
71    }
72
73    /// Get the gossip sender for this topic.
74    pub async fn gossip_sender(&self) -> Result<GossipSender> {
75        self.api
76            .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
77            .await
78    }
79
80    /// Get the gossip receiver for this topic.
81    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
82        self.api
83            .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
84            .await
85    }
86}
87
88impl BootstrapActor {
89    pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
90        let (sender, receiver) = tokio::sync::oneshot::channel();
91        tokio::spawn({
92            let mut last_published_unix_minute = 0;
93            let (gossip_sender, mut gossip_receiver) =
94                (self.gossip_sender.clone(), self.gossip_receiver.clone());
95            let record_publisher = self.record_publisher.clone();
96            let cancel_token = self.cancel_token.clone();
97            let bootstrap_config = self.config.clone();
98            let mut is_joined_ret = false;
99
100            if self.config.publish_record_on_startup() {
101                let unix_minute = crate::unix_minute(0);
102                tracing::debug!("Bootstrap: initial startup record publish {}", unix_minute);
103                last_published_unix_minute = if self.config.check_older_records_first_on_startup() {
104                    0
105                } else {
106                    unix_minute
107                };
108                let record_creator = record_publisher.clone();
109                let record_content = GossipRecordContent {
110                    active_peers: [[0; 32]; MAX_RECORD_PEERS],
111                    last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
112                };
113                if let Ok(record) = Record::sign(
114                    record_publisher.topic_id().hash(),
115                    unix_minute,
116                    record_content,
117                    record_publisher.signing_key(),
118                ) {
119                    publish_record_fire_and_forget(
120                        record_creator,
121                        record,
122                        None,
123                        cancel_token.clone(),
124                    );
125                }
126            }
127
128            async move {
129                tracing::debug!("Bootstrap: starting bootstrap process");
130                'bootstrap: while !cancel_token.is_cancelled() {
131                    // Check if we are connected to at least one node
132                    let is_joined = gossip_receiver.is_joined().await;
133                    if let Ok(is_joined) = is_joined
134                        && is_joined
135                    {
136                        tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
137                        is_joined_ret = true;
138                        break;
139                    } else if let Err(e) = is_joined {
140                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
141                        break;
142                    }
143
144                    let current_unix_minute = crate::unix_minute(0);
145
146                    let mut use_cached_next = true;
147                    // last_published_unix_minute == 0 means first run
148                    let unix_minute_offset = if last_published_unix_minute == 0
149                        && bootstrap_config.check_older_records_first_on_startup()
150                    {
151                        use_cached_next = false;
152                        1
153                    } else {
154                        0
155                    };
156
157                    // Unique, verified records for the unix minute
158                    let mut records = record_publisher
159                        .get_records(
160                            current_unix_minute.saturating_sub(unix_minute_offset + 1),
161                            cancel_token.clone(),
162                        )
163                        .await
164                        .unwrap_or_default();
165                    let current_records = record_publisher
166                        .get_records(
167                            current_unix_minute.saturating_sub(unix_minute_offset),
168                            cancel_token.clone(),
169                        )
170                        .await
171                        .unwrap_or_default();
172                    records.extend(current_records.clone());
173
174                    tracing::debug!(
175                        "Bootstrap: fetched {} records for unix_minute {}",
176                        records.len(),
177                        current_unix_minute
178                    );
179
180                    // If there are no records, invoke the publish_proc (the publishing procedure)
181                    // continue the loop after
182                    if records.is_empty() {
183                        if current_unix_minute != last_published_unix_minute {
184                            tracing::debug!(
185                                "Bootstrap: no records found, publishing own record for unix_minute {}",
186                                current_unix_minute
187                            );
188                            last_published_unix_minute = current_unix_minute;
189                            let record_creator = record_publisher.clone();
190                            let record_content = GossipRecordContent {
191                                active_peers: [[0; 32]; MAX_RECORD_PEERS],
192                                last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
193                            };
194                            if let Ok(record) = Record::sign(
195                                record_publisher.topic_id().hash(),
196                                current_unix_minute,
197                                record_content,
198                                record_publisher.signing_key(),
199                            ) {
200                                publish_record_fire_and_forget(
201                                    record_creator,
202                                    record,
203                                    if use_cached_next {
204                                        Some(current_records.clone())
205                                    } else {
206                                        None
207                                    },
208                                    cancel_token.clone(),
209                                );
210                            }
211                        }
212                        tokio::select! {
213                            _ = sleep(bootstrap_config.no_peers_retry_interval()) => {}
214                            _ = gossip_receiver.joined() => continue,
215                            _ = cancel_token.cancelled() => break,
216                        }
217                        continue;
218                    }
219
220                    // We found records
221
222                    // Collect node ids from active_peers and record.pub_key (of publisher)
223                    let bootstrap_nodes = records
224                        .iter()
225                        .flat_map(|record| {
226                            // records are already filtered by self entry
227                            let mut v = vec![record.pub_key()];
228
229                            if let Ok(record_content) = record.content::<GossipRecordContent>() {
230                                for peer in record_content.active_peers {
231                                    if peer != [0; 32]
232                                        && !peer.eq(record_publisher.pub_key().as_bytes())
233                                    {
234                                        v.push(peer);
235                                    }
236                                }
237                            }
238                            v
239                        })
240                        .filter_map(|pub_key| EndpointId::from_bytes(&pub_key).ok())
241                        .collect::<HashSet<_>>();
242
243                    tracing::debug!(
244                        "Bootstrap: extracted {} potential bootstrap nodes",
245                        bootstrap_nodes.len()
246                    );
247
248                    // Maybe in the meantime someone connected to us via one of our published records
249                    // we don't want to disrup the gossip rotations any more then we have to
250                    // so we check again before joining new peers
251                    let is_joined = gossip_receiver.is_joined().await;
252                    if let Ok(is_joined) = is_joined
253                        && is_joined
254                    {
255                        tracing::debug!("Bootstrap: joined while processing records, exiting");
256                        is_joined_ret = true;
257                        break;
258                    } else if let Err(e) = is_joined {
259                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
260                        break;
261                    }
262
263                    // Instead of throwing everything into join_peers() at once we go pub_key by pub_key
264                    // again to disrupt as little nodes peer neighborhoods as possible.
265                    for pub_key in bootstrap_nodes.iter() {
266                        match gossip_sender.join_peers(vec![*pub_key], None).await {
267                            Ok(_) => {
268                                tracing::debug!("Bootstrap: attempted to join peer {}", pub_key);
269
270                                tokio::select! {
271                                    _ = sleep(bootstrap_config.per_peer_join_settle_time()) => {}
272                                    _ = gossip_receiver.joined() => {},
273                                    _ = cancel_token.cancelled() => break 'bootstrap,
274                                }
275                                let is_joined = gossip_receiver.is_joined().await;
276                                if let Ok(is_joined) = is_joined
277                                    && is_joined
278                                {
279                                    tracing::debug!(
280                                        "Bootstrap: successfully joined via peer {}",
281                                        pub_key
282                                    );
283                                    is_joined_ret = true;
284                                    break;
285                                } else if let Err(e) = is_joined {
286                                    tracing::debug!(
287                                        "Bootstrap: error checking join status: {:?}",
288                                        e
289                                    );
290                                    break;
291                                }
292                            }
293                            Err(e) => {
294                                tracing::debug!(
295                                    "Bootstrap: failed to join peer {}: {:?}",
296                                    pub_key,
297                                    e
298                                );
299                                continue;
300                            }
301                        }
302                    }
303
304                    // If we are still not connected to anyone:
305                    // give it the default iroh-gossip connection timeout before the final is_joined() check
306                    let is_joined = gossip_receiver.is_joined().await;
307                    if let Ok(is_joined) = is_joined
308                        && !is_joined
309                    {
310                        tracing::debug!(
311                            "Bootstrap: not joined yet, waiting {:?} before final check",
312                            bootstrap_config.join_confirmation_wait_time()
313                        );
314                        tokio::select! {
315                            _ = sleep(bootstrap_config.join_confirmation_wait_time()) => {}
316                            _ = gossip_receiver.joined() => {},
317                            _ = cancel_token.cancelled() => break,
318                        }
319                    } else if let Err(e) = is_joined {
320                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
321                        break;
322                    }
323
324                    // If we are connected: return
325                    let is_joined = gossip_receiver.is_joined().await;
326                    if let Ok(is_joined) = is_joined
327                        && is_joined
328                    {
329                        tracing::debug!("Bootstrap: successfully joined after final wait");
330                        is_joined_ret = true;
331                        break;
332                    } else if let Err(e) = is_joined {
333                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
334                        break;
335                    } else {
336                        tracing::debug!("Bootstrap: still not joined after attempting all peers");
337                        // If we are not connected: check if we should publish a record this minute
338                        if current_unix_minute != last_published_unix_minute {
339                            tracing::debug!(
340                                "Bootstrap: publishing fallback record for unix_minute {}",
341                                current_unix_minute
342                            );
343                            last_published_unix_minute = current_unix_minute;
344                            let record_creator = record_publisher.clone();
345                            if let Ok(record) = Record::sign(
346                                record_publisher.topic_id().hash(),
347                                current_unix_minute,
348                                GossipRecordContent {
349                                    active_peers: [[0; 32]; MAX_RECORD_PEERS],
350                                    last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
351                                },
352                                record_publisher.signing_key(),
353                            ) {
354                                publish_record_fire_and_forget(
355                                    record_creator,
356                                    record,
357                                    if use_cached_next {
358                                        Some(current_records)
359                                    } else {
360                                        None
361                                    },
362                                    cancel_token.clone(),
363                                );
364                            }
365                        }
366                        tokio::select! {
367                            _ = sleep(bootstrap_config.discovery_poll_interval()) => continue,
368                            _ = gossip_receiver.joined() => continue,
369                            _ = cancel_token.cancelled() => break,
370                        }
371                    }
372                }
373                tracing::debug!("Bootstrap: exited");
374
375                if is_joined_ret {
376                    let _ = sender.send(Ok(()));
377                } else {
378                    let _ = sender.send(Err(anyhow::anyhow!(
379                        "Bootstrap process failed or was cancelled"
380                    )));
381                }
382            }
383        });
384
385        Ok(receiver)
386    }
387}
388
389fn publish_record_fire_and_forget(
390    record_publisher: RecordPublisher,
391    record: Record,
392    cached_records: Option<HashSet<Record>>,
393    cancel_token: CancellationToken,
394) {
395    tokio::spawn(async move {
396        if let Err(err) = record_publisher
397            .publish_record_cached_records(record, cached_records, cancel_token)
398            .await
399        {
400            tracing::warn!("Failed to publish record: {:?}", err);
401        }
402    });
403}