// distributed_topic_tracker/gossip/topic/bootstrap.rs

//! Bootstrap process for discovering and joining peers via DHT.
2
3use std::{collections::HashSet, time::Duration};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use tokio::time::sleep;
8
9use crate::{
10    GossipSender,
11    crypto::Record,
12    gossip::{GossipRecordContent, receiver::GossipReceiver},
13};
14
15/// Manages the peer discovery and joining process.
16///
17/// Queries DHT for bootstrap records, extracts node IDs, and progressively
18/// joins peers until the local node is connected to the topic.
19#[derive(Debug, Clone)]
20pub struct Bootstrap {
21    api: Handle<BootstrapActor, anyhow::Error>,
22}
23
24#[derive(Debug)]
25struct BootstrapActor {
26    rx: Receiver<Action<Self>>,
27
28    record_publisher: crate::crypto::RecordPublisher,
29
30    gossip_sender: GossipSender,
31    gossip_receiver: GossipReceiver,
32}
33
34impl Bootstrap {
35    /// Create a new bootstrap process for a topic.
36    pub async fn new(
37        record_publisher: crate::crypto::RecordPublisher,
38        gossip: iroh_gossip::net::Gossip,
39    ) -> Result<Self> {
40        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
41            .subscribe(
42                iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
43                vec![],
44            )
45            .await?;
46        let (gossip_sender, gossip_receiver) = gossip_topic.split();
47        let (gossip_sender, gossip_receiver) = (
48            GossipSender::new(gossip_sender, gossip.clone()),
49            GossipReceiver::new(gossip_receiver, gossip.clone()),
50        );
51
52        let (api, rx) = Handle::channel();
53
54        tokio::spawn(async move {
55            let mut actor = BootstrapActor {
56                rx,
57                record_publisher,
58                gossip_sender,
59                gossip_receiver,
60            };
61            let _ = actor.run().await;
62        });
63
64        Ok(Self { api: api })
65    }
66
67    /// Start the bootstrap process.
68    ///
69    /// Returns a receiver that signals completion when the node has joined the topic (has at least one neighbor).
70    pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
71        self.api.call(act!(actor=> actor.start_bootstrap())).await
72    }
73
74    /// Get the gossip sender for this topic.
75    pub async fn gossip_sender(&self) -> Result<GossipSender> {
76        self.api
77            .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
78            .await
79    }
80
81    /// Get the gossip receiver for this topic.
82    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
83        self.api
84            .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
85            .await
86    }
87}
88
89impl Actor<anyhow::Error> for BootstrapActor {
90    async fn run(&mut self) -> Result<()> {
91        loop {
92            tokio::select! {
93                Ok(action) = self.rx.recv_async() => {
94                    action(self).await;
95                }
96                _ = tokio::signal::ctrl_c() => {
97                    break;
98                }
99            }
100        }
101        Ok(())
102    }
103}
104
105impl BootstrapActor {
106    pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
107        let (sender, receiver) = tokio::sync::oneshot::channel();
108        tokio::spawn({
109            let mut last_published_unix_minute = 0;
110            let (gossip_sender, gossip_receiver) =
111                (self.gossip_sender.clone(), self.gossip_receiver.clone());
112            let record_publisher = self.record_publisher.clone();
113            async move {
114                tracing::debug!("Bootstrap: starting bootstrap process");
115                loop {
116                    // Check if we are connected to at least one node
117                    if gossip_receiver.is_joined().await {
118                        tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
119                        break;
120                    }
121
122                    // On the first try we check the prev unix minute, after that the current one
123                    let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
124                        -1
125                    } else {
126                        0
127                    });
128
129                    // Unique, verified records for the unix minute
130                    let mut records = record_publisher.get_records(unix_minute-1).await;
131                    records.extend(record_publisher.get_records(unix_minute).await);
132
133                    tracing::debug!("Bootstrap: fetched {} records for unix_minute {}", records.len(), unix_minute);
134
135                    // If there are no records, invoke the publish_proc (the publishing procedure)
136                    // continue the loop after
137                    if records.is_empty() {
138                        if unix_minute != last_published_unix_minute {
139                            tracing::debug!("Bootstrap: no records found, publishing own record for unix_minute {}", unix_minute);
140                            last_published_unix_minute = unix_minute;
141                            let record_creator = record_publisher.clone();
142                            let record_content = GossipRecordContent {
143                                active_peers: [[0; 32]; 5],
144                                last_message_hashes: [[0; 32]; 5],
145                            };
146                            if let Ok(record) = Record::sign(
147                                record_publisher.record_topic().hash(),
148                                unix_minute,
149                                record_publisher.pub_key().to_bytes(),
150                                record_content,
151                                &record_publisher.signing_key(),
152                            ) {
153                                tokio::spawn(async move {
154                                    let _ = record_creator.publish_record(record).await;
155                                });
156                            }
157                        }
158                        sleep(Duration::from_millis(100)).await;
159                        continue;
160                    }
161
162                    // We found records
163
164                    // Collect node ids from active_peers and record.node_id (of publisher)
165                    let bootstrap_nodes = records
166                        .iter()
167                        .flat_map(|record| {
168                            let mut v = vec![record.node_id()];
169                            if let Ok(record_content) = record.content::<GossipRecordContent>() {
170                                for peer in record_content.active_peers {
171                                    if peer != [0; 32] {
172                                        v.push(peer);
173                                    }
174                                }
175                            }
176                            v
177                        })
178                        .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
179                        .collect::<HashSet<_>>();
180
181                    tracing::debug!("Bootstrap: extracted {} potential bootstrap nodes", bootstrap_nodes.len());
182
183                    // Maybe in the meantime someone connected to us via one of our published records
184                    // we don't want to disrup the gossip rotations any more then we have to
185                    // so we check again before joining new peers
186                    if gossip_receiver.is_joined().await {
187                        tracing::debug!("Bootstrap: joined while processing records, exiting");
188                        break;
189                    }
190
191                    // Instead of throwing everything into join_peers() at once we go node_id by node_id
192                    // again to disrupt as little nodes peer neighborhoods as possible.
193                    for node_id in bootstrap_nodes.iter() {
194                        match gossip_sender.join_peers(vec![*node_id], None).await {
195                            Ok(_) => {
196                                tracing::debug!("Bootstrap: attempted to join peer {}", node_id);
197                                sleep(Duration::from_millis(100)).await;
198                                if gossip_receiver.is_joined().await {
199                                    tracing::debug!("Bootstrap: successfully joined via peer {}", node_id);
200                                    break;
201                                }
202                            }
203                            Err(e) => {
204                                tracing::debug!("Bootstrap: failed to join peer {}: {:?}", node_id, e);
205                                continue;
206                            }
207                        }
208                    }
209
210                    // If we are still not connected to anyone:
211                    // give it the default iroh-gossip connection timeout before the final is_joined() check
212                    if !gossip_receiver.is_joined().await {
213                        tracing::debug!("Bootstrap: not joined yet, waiting 500ms before final check");
214                        sleep(Duration::from_millis(500)).await;
215                    }
216
217                    // If we are connected: return
218                    if gossip_receiver.is_joined().await {
219                        tracing::debug!("Bootstrap: successfully joined after final wait");
220                        break;
221                    } else {
222                        tracing::debug!("Bootstrap: still not joined after attempting all peers");
223                        // If we are not connected: check if we should publish a record this minute
224                        if unix_minute != last_published_unix_minute {
225                            tracing::debug!("Bootstrap: publishing fallback record for unix_minute {}", unix_minute);
226                            last_published_unix_minute = unix_minute;
227                            let record_creator = record_publisher.clone();
228                            if let Ok(record) = Record::sign(
229                                record_publisher.record_topic().hash(),
230                                unix_minute,
231                                record_publisher.pub_key().to_bytes(),
232                                GossipRecordContent {
233                                    active_peers: [[0; 32]; 5],
234                                    last_message_hashes: [[0; 32]; 5],
235                                },
236                                &record_publisher.signing_key(),
237                            ) {
238                                tokio::spawn(async move {
239                                    let _ = record_creator.publish_record(record).await;
240                                });
241                            }
242                        }
243                        sleep(Duration::from_millis(100)).await;
244                        continue;
245                    }
246                }
247                tracing::debug!("Bootstrap: completed successfully");
248                let _ = sender.send(());
249            }
250        });
251
252        Ok(receiver)
253    }
254}