Skip to main content

distributed_topic_tracker/gossip/topic/
bootstrap.rs

1//! Bootstrap process for discovering and joining peers via DHT.
2
3use std::collections::HashSet;
4
5use actor_helper::{Handle, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9use tokio_util::sync::CancellationToken;
10
11use crate::{
12    GossipSender, MAX_MESSAGE_HASHES, MAX_RECORD_PEERS, RecordPublisher,
13    config::BootstrapConfig,
14    crypto::Record,
15    gossip::{GossipRecordContent, receiver::GossipReceiver},
16};
17
18/// Manages the peer discovery and joining process.
19///
20/// Queries DHT for bootstrap records, extracts node IDs, and progressively
21/// joins peers until the local node is connected to the topic.
22#[derive(Debug, Clone)]
23pub struct Bootstrap {
24    api: Handle<BootstrapActor, anyhow::Error>,
25}
26
27#[derive(Debug)]
28struct BootstrapActor {
29    record_publisher: crate::crypto::RecordPublisher,
30    gossip_sender: GossipSender,
31    gossip_receiver: GossipReceiver,
32    cancel_token: tokio_util::sync::CancellationToken,
33    config: BootstrapConfig,
34}
35
36impl Bootstrap {
37    /// Create a new bootstrap process for a topic.
38    pub async fn new(
39        record_publisher: crate::crypto::RecordPublisher,
40        gossip: iroh_gossip::net::Gossip,
41        cancel_token: tokio_util::sync::CancellationToken,
42        timeout_config: crate::config::TimeoutConfig,
43        bootstrap_config: BootstrapConfig,
44    ) -> Result<Self> {
45        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
46            .subscribe(
47                iroh_gossip::proto::TopicId::from(record_publisher.topic_id().hash()),
48                vec![],
49            )
50            .await?;
51        let (gossip_sender, gossip_receiver) = gossip_topic.split();
52        let (gossip_sender, gossip_receiver) = (
53            GossipSender::new(gossip_sender, timeout_config),
54            GossipReceiver::new(gossip_receiver, cancel_token.clone()),
55        );
56
57        let api = Handle::spawn(BootstrapActor {
58            record_publisher,
59            gossip_sender,
60            gossip_receiver,
61            cancel_token,
62            config: bootstrap_config,
63        })
64        .0;
65
66        Ok(Self { api })
67    }
68
69    /// Start the bootstrap process.
70    ///
71    /// Returns a receiver that signals completion when the node has joined the topic (has at least one neighbor).
72    pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
73        self.api.call(act!(actor=> actor.start_bootstrap())).await
74    }
75
76    /// Get the gossip sender for this topic.
77    pub async fn gossip_sender(&self) -> Result<GossipSender> {
78        self.api
79            .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
80            .await
81    }
82
83    /// Get the gossip receiver for this topic.
84    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
85        self.api
86            .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
87            .await
88    }
89}
90
91impl BootstrapActor {
92    pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<Result<()>>> {
93        let (sender, receiver) = tokio::sync::oneshot::channel();
94        tokio::spawn({
95            let mut last_published_unix_minute = 0;
96            let (gossip_sender, mut gossip_receiver) =
97                (self.gossip_sender.clone(), self.gossip_receiver.clone());
98            let record_publisher = self.record_publisher.clone();
99            let cancel_token = self.cancel_token.clone();
100            let bootstrap_config = self.config.clone();
101            let mut is_joined_ret = false;
102
103            if self.config.publish_record_on_startup() {
104                let unix_minute = crate::unix_minute(0);
105                tracing::debug!("Bootstrap: initial startup record publish {}", unix_minute);
106                last_published_unix_minute = if self.config.check_older_records_first_on_startup() {
107                    0
108                } else {
109                    unix_minute
110                };
111                let record_creator = record_publisher.clone();
112                let record_content = GossipRecordContent {
113                    active_peers: [[0; 32]; MAX_RECORD_PEERS],
114                    last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
115                };
116                if let Ok(record) = Record::sign(
117                    record_publisher.topic_id().hash(),
118                    unix_minute,
119                    record_content,
120                    record_publisher.signing_key(),
121                ) {
122                    publish_record_fire_and_forget(
123                        record_creator,
124                        record,
125                        None,
126                        cancel_token.clone(),
127                    );
128                }
129            }
130
131            async move {
132                tracing::debug!("Bootstrap: starting bootstrap process");
133                'bootstrap: while !cancel_token.is_cancelled() {
134                    // Check if we are connected to at least one node
135                    let is_joined = gossip_receiver.is_joined().await;
136                    if let Ok(is_joined) = is_joined
137                        && is_joined
138                    {
139                        tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
140                        is_joined_ret = true;
141                        break;
142                    } else if let Err(e) = is_joined {
143                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
144                        break;
145                    }
146
147                    let current_unix_minute = crate::unix_minute(0);
148
149                    let mut use_cached_next = true;
150                    // last_published_unix_minute == 0 means first run
151                    let unix_minute_offset = if last_published_unix_minute == 0
152                        && bootstrap_config.check_older_records_first_on_startup()
153                    {
154                        use_cached_next = false;
155                        1
156                    } else {
157                        0
158                    };
159
160                    // Unique, verified records for the unix minute
161                    let mut records = record_publisher
162                        .get_records(
163                            current_unix_minute.saturating_sub(unix_minute_offset + 1),
164                            cancel_token.clone(),
165                        )
166                        .await
167                        .unwrap_or_default();
168                    let current_records = record_publisher
169                        .get_records(
170                            current_unix_minute.saturating_sub(unix_minute_offset),
171                            cancel_token.clone(),
172                        )
173                        .await
174                        .unwrap_or_default();
175                    records.extend(current_records.clone());
176
177                    tracing::debug!(
178                        "Bootstrap: fetched {} records for unix_minute {}",
179                        records.len(),
180                        current_unix_minute
181                    );
182
183                    // If there are no records, invoke the publish_proc (the publishing procedure)
184                    // continue the loop after
185                    if records.is_empty() {
186                        if current_unix_minute != last_published_unix_minute {
187                            tracing::debug!(
188                                "Bootstrap: no records found, publishing own record for unix_minute {}",
189                                current_unix_minute
190                            );
191                            last_published_unix_minute = current_unix_minute;
192                            let record_creator = record_publisher.clone();
193                            let record_content = GossipRecordContent {
194                                active_peers: [[0; 32]; MAX_RECORD_PEERS],
195                                last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
196                            };
197                            if let Ok(record) = Record::sign(
198                                record_publisher.topic_id().hash(),
199                                current_unix_minute,
200                                record_content,
201                                record_publisher.signing_key(),
202                            ) {
203                                publish_record_fire_and_forget(
204                                    record_creator,
205                                    record,
206                                    if use_cached_next {
207                                        Some(current_records.clone())
208                                    } else {
209                                        None
210                                    },
211                                    cancel_token.clone(),
212                                );
213                            }
214                        }
215                        tokio::select! {
216                            _ = sleep(bootstrap_config.no_peers_retry_interval()) => {}
217                            _ = gossip_receiver.joined() => continue,
218                            _ = cancel_token.cancelled() => break,
219                        }
220                        continue;
221                    }
222
223                    // We found records
224
225                    // Collect node ids from active_peers and record.pub_key (of publisher)
226                    let bootstrap_nodes = records
227                        .iter()
228                        .flat_map(|record| {
229                            // records are already filtered by self entry
230                            let mut v = vec![record.pub_key()];
231
232                            if let Ok(record_content) = record.content::<GossipRecordContent>() {
233                                for peer in record_content.active_peers {
234                                    if peer != [0; 32]
235                                        && !peer.eq(record_publisher.pub_key().as_bytes())
236                                    {
237                                        v.push(peer);
238                                    }
239                                }
240                            }
241                            v
242                        })
243                        .filter_map(|pub_key| EndpointId::from_bytes(&pub_key).ok())
244                        .collect::<HashSet<_>>();
245
246                    tracing::debug!(
247                        "Bootstrap: extracted {} potential bootstrap nodes",
248                        bootstrap_nodes.len()
249                    );
250
251                    // Maybe in the meantime someone connected to us via one of our published records
252                    // we don't want to disrup the gossip rotations any more then we have to
253                    // so we check again before joining new peers
254                    let is_joined = gossip_receiver.is_joined().await;
255                    if let Ok(is_joined) = is_joined
256                        && is_joined
257                    {
258                        tracing::debug!("Bootstrap: joined while processing records, exiting");
259                        is_joined_ret = true;
260                        break;
261                    } else if let Err(e) = is_joined {
262                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
263                        break;
264                    }
265
266                    // Instead of throwing everything into join_peers() at once we go pub_key by pub_key
267                    // again to disrupt as little nodes peer neighborhoods as possible.
268                    for pub_key in bootstrap_nodes.iter() {
269                        match gossip_sender.join_peers(vec![*pub_key], None).await {
270                            Ok(_) => {
271                                tracing::debug!("Bootstrap: attempted to join peer {}", pub_key);
272
273                                tokio::select! {
274                                    _ = sleep(bootstrap_config.per_peer_join_settle_time()) => {}
275                                    _ = gossip_receiver.joined() => {},
276                                    _ = cancel_token.cancelled() => break 'bootstrap,
277                                }
278                                let is_joined = gossip_receiver.is_joined().await;
279                                if let Ok(is_joined) = is_joined
280                                    && is_joined
281                                {
282                                    tracing::debug!(
283                                        "Bootstrap: successfully joined via peer {}",
284                                        pub_key
285                                    );
286                                    is_joined_ret = true;
287                                    break;
288                                } else if let Err(e) = is_joined {
289                                    tracing::debug!(
290                                        "Bootstrap: error checking join status: {:?}",
291                                        e
292                                    );
293                                    break;
294                                }
295                            }
296                            Err(e) => {
297                                tracing::debug!(
298                                    "Bootstrap: failed to join peer {}: {:?}",
299                                    pub_key,
300                                    e
301                                );
302                                continue;
303                            }
304                        }
305                    }
306
307                    // If we are still not connected to anyone:
308                    // give it the default iroh-gossip connection timeout before the final is_joined() check
309                    let is_joined = gossip_receiver.is_joined().await;
310                    if let Ok(is_joined) = is_joined
311                        && !is_joined
312                    {
313                        tracing::debug!(
314                            "Bootstrap: not joined yet, waiting {:?} before final check",
315                            bootstrap_config.join_confirmation_wait_time()
316                        );
317                        tokio::select! {
318                            _ = sleep(bootstrap_config.join_confirmation_wait_time()) => {}
319                            _ = gossip_receiver.joined() => {},
320                            _ = cancel_token.cancelled() => break,
321                        }
322                    } else if let Err(e) = is_joined {
323                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
324                        break;
325                    }
326
327                    // If we are connected: return
328                    let is_joined = gossip_receiver.is_joined().await;
329                    if let Ok(is_joined) = is_joined
330                        && is_joined
331                    {
332                        tracing::debug!("Bootstrap: successfully joined after final wait");
333                        is_joined_ret = true;
334                        break;
335                    } else if let Err(e) = is_joined {
336                        tracing::debug!("Bootstrap: error checking join status: {:?}", e);
337                        break;
338                    } else {
339                        tracing::debug!("Bootstrap: still not joined after attempting all peers");
340                        // If we are not connected: check if we should publish a record this minute
341                        if current_unix_minute != last_published_unix_minute {
342                            tracing::debug!(
343                                "Bootstrap: publishing fallback record for unix_minute {}",
344                                current_unix_minute
345                            );
346                            last_published_unix_minute = current_unix_minute;
347                            let record_creator = record_publisher.clone();
348                            if let Ok(record) = Record::sign(
349                                record_publisher.topic_id().hash(),
350                                current_unix_minute,
351                                GossipRecordContent {
352                                    active_peers: [[0; 32]; MAX_RECORD_PEERS],
353                                    last_message_hashes: [[0; 32]; MAX_MESSAGE_HASHES],
354                                },
355                                record_publisher.signing_key(),
356                            ) {
357                                publish_record_fire_and_forget(
358                                    record_creator,
359                                    record,
360                                    if use_cached_next {
361                                        Some(current_records)
362                                    } else {
363                                        None
364                                    },
365                                    cancel_token.clone(),
366                                );
367                            }
368                        }
369                        tokio::select! {
370                            _ = sleep(bootstrap_config.discovery_poll_interval()) => continue,
371                            _ = gossip_receiver.joined() => continue,
372                            _ = cancel_token.cancelled() => break,
373                        }
374                    }
375                }
376                tracing::debug!("Bootstrap: exited");
377
378                if is_joined_ret {
379                    let _ = sender.send(Ok(()));
380                } else {
381                    let _ = sender.send(Err(anyhow::anyhow!(
382                        "Bootstrap process failed or was cancelled"
383                    )));
384                }
385            }
386        });
387
388        Ok(receiver)
389    }
390}
391
392fn publish_record_fire_and_forget(
393    record_publisher: RecordPublisher,
394    record: Record,
395    cached_records: Option<HashSet<Record>>,
396    cancel_token: CancellationToken,
397) {
398    tokio::spawn(async move {
399        if let Err(err) = record_publisher
400            .publish_record_cached_records(record, cached_records, cancel_token)
401            .await
402        {
403            tracing::warn!("Failed to publish record: {:?}", err);
404        }
405    });
406}