distributed_topic_tracker/gossip/topic/
bootstrap.rs

1//! Bootstrap process for discovering and joining peers via DHT.
2
3use std::{collections::HashSet, time::Duration};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9
10use crate::{
11    GossipSender,
12    crypto::Record,
13    gossip::{GossipRecordContent, receiver::GossipReceiver},
14};
15
16/// Manages the peer discovery and joining process.
17///
18/// Queries DHT for bootstrap records, extracts node IDs, and progressively
19/// joins peers until the local node is connected to the topic.
20#[derive(Debug, Clone)]
21pub struct Bootstrap {
22    api: Handle<BootstrapActor, anyhow::Error>,
23}
24
25#[derive(Debug)]
26struct BootstrapActor {
27    rx: Receiver<Action<Self>>,
28
29    record_publisher: crate::crypto::RecordPublisher,
30
31    gossip_sender: GossipSender,
32    gossip_receiver: GossipReceiver,
33}
34
35impl Bootstrap {
36    /// Create a new bootstrap process for a topic.
37    pub async fn new(
38        record_publisher: crate::crypto::RecordPublisher,
39        gossip: iroh_gossip::net::Gossip,
40    ) -> Result<Self> {
41        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
42            .subscribe(
43                iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
44                vec![],
45            )
46            .await?;
47        let (gossip_sender, gossip_receiver) = gossip_topic.split();
48        let (gossip_sender, gossip_receiver) = (
49            GossipSender::new(gossip_sender, gossip.clone()),
50            GossipReceiver::new(gossip_receiver, gossip.clone()),
51        );
52
53        let (api, rx) = Handle::channel();
54
55        tokio::spawn(async move {
56            let mut actor = BootstrapActor {
57                rx,
58                record_publisher,
59                gossip_sender,
60                gossip_receiver,
61            };
62            let _ = actor.run().await;
63        });
64
65        Ok(Self { api })
66    }
67
68    /// Start the bootstrap process.
69    ///
70    /// Returns a receiver that signals completion when the node has joined the topic (has at least one neighbor).
71    pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
72        self.api.call(act!(actor=> actor.start_bootstrap())).await
73    }
74
75    /// Get the gossip sender for this topic.
76    pub async fn gossip_sender(&self) -> Result<GossipSender> {
77        self.api
78            .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
79            .await
80    }
81
82    /// Get the gossip receiver for this topic.
83    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
84        self.api
85            .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
86            .await
87    }
88}
89
90impl Actor<anyhow::Error> for BootstrapActor {
91    async fn run(&mut self) -> Result<()> {
92        loop {
93            tokio::select! {
94                Ok(action) = self.rx.recv_async() => {
95                    action(self).await;
96                }
97                _ = tokio::signal::ctrl_c() => {
98                    break;
99                }
100            }
101        }
102        Ok(())
103    }
104}
105
106impl BootstrapActor {
107    pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
108        let (sender, receiver) = tokio::sync::oneshot::channel();
109        tokio::spawn({
110            let mut last_published_unix_minute = 0;
111            let (gossip_sender, gossip_receiver) =
112                (self.gossip_sender.clone(), self.gossip_receiver.clone());
113            let record_publisher = self.record_publisher.clone();
114            async move {
115                tracing::debug!("Bootstrap: starting bootstrap process");
116                loop {
117                    // Check if we are connected to at least one node
118                    if gossip_receiver.is_joined().await {
119                        tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
120                        break;
121                    }
122
123                    // On the first try we check the prev unix minute, after that the current one
124                    let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
125                        -1
126                    } else {
127                        0
128                    });
129
130                    // Unique, verified records for the unix minute
131                    let mut records = record_publisher.get_records(unix_minute - 1).await;
132                    records.extend(record_publisher.get_records(unix_minute).await);
133
134                    tracing::debug!(
135                        "Bootstrap: fetched {} records for unix_minute {}",
136                        records.len(),
137                        unix_minute
138                    );
139
140                    // If there are no records, invoke the publish_proc (the publishing procedure)
141                    // continue the loop after
142                    if records.is_empty() {
143                        if unix_minute != last_published_unix_minute {
144                            tracing::debug!(
145                                "Bootstrap: no records found, publishing own record for unix_minute {}",
146                                unix_minute
147                            );
148                            last_published_unix_minute = unix_minute;
149                            let record_creator = record_publisher.clone();
150                            let record_content = GossipRecordContent {
151                                active_peers: [[0; 32]; 5],
152                                last_message_hashes: [[0; 32]; 5],
153                            };
154                            if let Ok(record) = Record::sign(
155                                record_publisher.record_topic().hash(),
156                                unix_minute,
157                                record_publisher.pub_key().to_bytes(),
158                                record_content,
159                                &record_publisher.signing_key(),
160                            ) {
161                                tokio::spawn(async move {
162                                    let _ = record_creator.publish_record(record).await;
163                                });
164                            }
165                        }
166                        sleep(Duration::from_millis(100)).await;
167                        continue;
168                    }
169
170                    // We found records
171
172                    // Collect node ids from active_peers and record.node_id (of publisher)
173                    let bootstrap_nodes = records
174                        .iter()
175                        .flat_map(|record| {
176                            let mut v = vec![record.node_id()];
177                            if let Ok(record_content) = record.content::<GossipRecordContent>() {
178                                for peer in record_content.active_peers {
179                                    if peer != [0; 32] {
180                                        v.push(peer);
181                                    }
182                                }
183                            }
184                            v
185                        })
186                        .filter_map(|node_id| EndpointId::from_bytes(&node_id).ok())
187                        .collect::<HashSet<_>>();
188
189                    tracing::debug!(
190                        "Bootstrap: extracted {} potential bootstrap nodes",
191                        bootstrap_nodes.len()
192                    );
193
194                    // Maybe in the meantime someone connected to us via one of our published records
195                    // we don't want to disrup the gossip rotations any more then we have to
196                    // so we check again before joining new peers
197                    if gossip_receiver.is_joined().await {
198                        tracing::debug!("Bootstrap: joined while processing records, exiting");
199                        break;
200                    }
201
202                    // Instead of throwing everything into join_peers() at once we go node_id by node_id
203                    // again to disrupt as little nodes peer neighborhoods as possible.
204                    for node_id in bootstrap_nodes.iter() {
205                        match gossip_sender.join_peers(vec![*node_id], None).await {
206                            Ok(_) => {
207                                tracing::debug!("Bootstrap: attempted to join peer {}", node_id);
208                                sleep(Duration::from_millis(100)).await;
209                                if gossip_receiver.is_joined().await {
210                                    tracing::debug!(
211                                        "Bootstrap: successfully joined via peer {}",
212                                        node_id
213                                    );
214                                    break;
215                                }
216                            }
217                            Err(e) => {
218                                tracing::debug!(
219                                    "Bootstrap: failed to join peer {}: {:?}",
220                                    node_id,
221                                    e
222                                );
223                                continue;
224                            }
225                        }
226                    }
227
228                    // If we are still not connected to anyone:
229                    // give it the default iroh-gossip connection timeout before the final is_joined() check
230                    if !gossip_receiver.is_joined().await {
231                        tracing::debug!(
232                            "Bootstrap: not joined yet, waiting 500ms before final check"
233                        );
234                        sleep(Duration::from_millis(500)).await;
235                    }
236
237                    // If we are connected: return
238                    if gossip_receiver.is_joined().await {
239                        tracing::debug!("Bootstrap: successfully joined after final wait");
240                        break;
241                    } else {
242                        tracing::debug!("Bootstrap: still not joined after attempting all peers");
243                        // If we are not connected: check if we should publish a record this minute
244                        if unix_minute != last_published_unix_minute {
245                            tracing::debug!(
246                                "Bootstrap: publishing fallback record for unix_minute {}",
247                                unix_minute
248                            );
249                            last_published_unix_minute = unix_minute;
250                            let record_creator = record_publisher.clone();
251                            if let Ok(record) = Record::sign(
252                                record_publisher.record_topic().hash(),
253                                unix_minute,
254                                record_publisher.pub_key().to_bytes(),
255                                GossipRecordContent {
256                                    active_peers: [[0; 32]; 5],
257                                    last_message_hashes: [[0; 32]; 5],
258                                },
259                                &record_publisher.signing_key(),
260                            ) {
261                                tokio::spawn(async move {
262                                    let _ = record_creator.publish_record(record).await;
263                                });
264                            }
265                        }
266                        sleep(Duration::from_millis(100)).await;
267                        continue;
268                    }
269                }
270                tracing::debug!("Bootstrap: completed successfully");
271                let _ = sender.send(());
272            }
273        });
274
275        Ok(receiver)
276    }
277}