distributed_topic_tracker/gossip/topic/
bootstrap.rs1use std::{collections::HashSet, time::Duration};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use tokio::time::sleep;
8
9use crate::{
10 GossipSender,
11 crypto::Record,
12 gossip::{GossipRecordContent, receiver::GossipReceiver},
13};
14
15#[derive(Debug, Clone)]
20pub struct Bootstrap {
21 api: Handle<BootstrapActor, anyhow::Error>,
22}
23
24#[derive(Debug)]
25struct BootstrapActor {
26 rx: Receiver<Action<Self>>,
27
28 record_publisher: crate::crypto::RecordPublisher,
29
30 gossip_sender: GossipSender,
31 gossip_receiver: GossipReceiver,
32}
33
34impl Bootstrap {
35 pub async fn new(
37 record_publisher: crate::crypto::RecordPublisher,
38 gossip: iroh_gossip::net::Gossip,
39 ) -> Result<Self> {
40 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
41 .subscribe(
42 iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
43 vec![],
44 )
45 .await?;
46 let (gossip_sender, gossip_receiver) = gossip_topic.split();
47 let (gossip_sender, gossip_receiver) = (
48 GossipSender::new(gossip_sender, gossip.clone()),
49 GossipReceiver::new(gossip_receiver, gossip.clone()),
50 );
51
52 let (api, rx) = Handle::channel();
53
54 tokio::spawn(async move {
55 let mut actor = BootstrapActor {
56 rx,
57 record_publisher,
58 gossip_sender,
59 gossip_receiver,
60 };
61 let _ = actor.run().await;
62 });
63
64 Ok(Self { api: api })
65 }
66
67 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
71 self.api.call(act!(actor=> actor.start_bootstrap())).await
72 }
73
74 pub async fn gossip_sender(&self) -> Result<GossipSender> {
76 self.api
77 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
78 .await
79 }
80
81 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
83 self.api
84 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
85 .await
86 }
87}
88
89impl Actor<anyhow::Error> for BootstrapActor {
90 async fn run(&mut self) -> Result<()> {
91 loop {
92 tokio::select! {
93 Ok(action) = self.rx.recv_async() => {
94 action(self).await;
95 }
96 _ = tokio::signal::ctrl_c() => {
97 break;
98 }
99 }
100 }
101 Ok(())
102 }
103}
104
105impl BootstrapActor {
106 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
107 let (sender, receiver) = tokio::sync::oneshot::channel();
108 tokio::spawn({
109 let mut last_published_unix_minute = 0;
110 let (gossip_sender, gossip_receiver) =
111 (self.gossip_sender.clone(), self.gossip_receiver.clone());
112 let record_publisher = self.record_publisher.clone();
113 async move {
114 tracing::debug!("Bootstrap: starting bootstrap process");
115 loop {
116 if gossip_receiver.is_joined().await {
118 tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
119 break;
120 }
121
122 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
124 -1
125 } else {
126 0
127 });
128
129 let mut records = record_publisher.get_records(unix_minute-1).await;
131 records.extend(record_publisher.get_records(unix_minute).await);
132
133 tracing::debug!("Bootstrap: fetched {} records for unix_minute {}", records.len(), unix_minute);
134
135 if records.is_empty() {
138 if unix_minute != last_published_unix_minute {
139 tracing::debug!("Bootstrap: no records found, publishing own record for unix_minute {}", unix_minute);
140 last_published_unix_minute = unix_minute;
141 let record_creator = record_publisher.clone();
142 let record_content = GossipRecordContent {
143 active_peers: [[0; 32]; 5],
144 last_message_hashes: [[0; 32]; 5],
145 };
146 if let Ok(record) = Record::sign(
147 record_publisher.record_topic().hash(),
148 unix_minute,
149 record_publisher.pub_key().to_bytes(),
150 record_content,
151 &record_publisher.signing_key(),
152 ) {
153 tokio::spawn(async move {
154 let _ = record_creator.publish_record(record).await;
155 });
156 }
157 }
158 sleep(Duration::from_millis(100)).await;
159 continue;
160 }
161
162 let bootstrap_nodes = records
166 .iter()
167 .flat_map(|record| {
168 let mut v = vec![record.node_id()];
169 if let Ok(record_content) = record.content::<GossipRecordContent>() {
170 for peer in record_content.active_peers {
171 if peer != [0; 32] {
172 v.push(peer);
173 }
174 }
175 }
176 v
177 })
178 .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
179 .collect::<HashSet<_>>();
180
181 tracing::debug!("Bootstrap: extracted {} potential bootstrap nodes", bootstrap_nodes.len());
182
183 if gossip_receiver.is_joined().await {
187 tracing::debug!("Bootstrap: joined while processing records, exiting");
188 break;
189 }
190
191 for node_id in bootstrap_nodes.iter() {
194 match gossip_sender.join_peers(vec![*node_id], None).await {
195 Ok(_) => {
196 tracing::debug!("Bootstrap: attempted to join peer {}", node_id);
197 sleep(Duration::from_millis(100)).await;
198 if gossip_receiver.is_joined().await {
199 tracing::debug!("Bootstrap: successfully joined via peer {}", node_id);
200 break;
201 }
202 }
203 Err(e) => {
204 tracing::debug!("Bootstrap: failed to join peer {}: {:?}", node_id, e);
205 continue;
206 }
207 }
208 }
209
210 if !gossip_receiver.is_joined().await {
213 tracing::debug!("Bootstrap: not joined yet, waiting 500ms before final check");
214 sleep(Duration::from_millis(500)).await;
215 }
216
217 if gossip_receiver.is_joined().await {
219 tracing::debug!("Bootstrap: successfully joined after final wait");
220 break;
221 } else {
222 tracing::debug!("Bootstrap: still not joined after attempting all peers");
223 if unix_minute != last_published_unix_minute {
225 tracing::debug!("Bootstrap: publishing fallback record for unix_minute {}", unix_minute);
226 last_published_unix_minute = unix_minute;
227 let record_creator = record_publisher.clone();
228 if let Ok(record) = Record::sign(
229 record_publisher.record_topic().hash(),
230 unix_minute,
231 record_publisher.pub_key().to_bytes(),
232 GossipRecordContent {
233 active_peers: [[0; 32]; 5],
234 last_message_hashes: [[0; 32]; 5],
235 },
236 &record_publisher.signing_key(),
237 ) {
238 tokio::spawn(async move {
239 let _ = record_creator.publish_record(record).await;
240 });
241 }
242 }
243 sleep(Duration::from_millis(100)).await;
244 continue;
245 }
246 }
247 tracing::debug!("Bootstrap: completed successfully");
248 let _ = sender.send(());
249 }
250 });
251
252 Ok(receiver)
253 }
254}