distributed_topic_tracker/gossip/topic/
bootstrap.rs1use std::{collections::HashSet, time::Duration};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9
10use crate::{
11 GossipSender,
12 crypto::Record,
13 gossip::{GossipRecordContent, receiver::GossipReceiver},
14};
15
16#[derive(Debug, Clone)]
21pub struct Bootstrap {
22 api: Handle<BootstrapActor, anyhow::Error>,
23}
24
25#[derive(Debug)]
26struct BootstrapActor {
27 rx: Receiver<Action<Self>>,
28
29 record_publisher: crate::crypto::RecordPublisher,
30
31 gossip_sender: GossipSender,
32 gossip_receiver: GossipReceiver,
33}
34
35impl Bootstrap {
36 pub async fn new(
38 record_publisher: crate::crypto::RecordPublisher,
39 gossip: iroh_gossip::net::Gossip,
40 ) -> Result<Self> {
41 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
42 .subscribe(
43 iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
44 vec![],
45 )
46 .await?;
47 let (gossip_sender, gossip_receiver) = gossip_topic.split();
48 let (gossip_sender, gossip_receiver) = (
49 GossipSender::new(gossip_sender, gossip.clone()),
50 GossipReceiver::new(gossip_receiver, gossip.clone()),
51 );
52
53 let (api, rx) = Handle::channel();
54
55 tokio::spawn(async move {
56 let mut actor = BootstrapActor {
57 rx,
58 record_publisher,
59 gossip_sender,
60 gossip_receiver,
61 };
62 let _ = actor.run().await;
63 });
64
65 Ok(Self { api })
66 }
67
68 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
72 self.api.call(act!(actor=> actor.start_bootstrap())).await
73 }
74
75 pub async fn gossip_sender(&self) -> Result<GossipSender> {
77 self.api
78 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
79 .await
80 }
81
82 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
84 self.api
85 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
86 .await
87 }
88}
89
90impl Actor<anyhow::Error> for BootstrapActor {
91 async fn run(&mut self) -> Result<()> {
92 loop {
93 tokio::select! {
94 Ok(action) = self.rx.recv_async() => {
95 action(self).await;
96 }
97 _ = tokio::signal::ctrl_c() => {
98 break;
99 }
100 }
101 }
102 Ok(())
103 }
104}
105
106impl BootstrapActor {
107 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
108 let (sender, receiver) = tokio::sync::oneshot::channel();
109 tokio::spawn({
110 let mut last_published_unix_minute = 0;
111 let (gossip_sender, gossip_receiver) =
112 (self.gossip_sender.clone(), self.gossip_receiver.clone());
113 let record_publisher = self.record_publisher.clone();
114 async move {
115 tracing::debug!("Bootstrap: starting bootstrap process");
116 loop {
117 if gossip_receiver.is_joined().await {
119 tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
120 break;
121 }
122
123 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
125 -1
126 } else {
127 0
128 });
129
130 let mut records = record_publisher.get_records(unix_minute - 1).await;
132 records.extend(record_publisher.get_records(unix_minute).await);
133
134 tracing::debug!(
135 "Bootstrap: fetched {} records for unix_minute {}",
136 records.len(),
137 unix_minute
138 );
139
140 if records.is_empty() {
143 if unix_minute != last_published_unix_minute {
144 tracing::debug!(
145 "Bootstrap: no records found, publishing own record for unix_minute {}",
146 unix_minute
147 );
148 last_published_unix_minute = unix_minute;
149 let record_creator = record_publisher.clone();
150 let record_content = GossipRecordContent {
151 active_peers: [[0; 32]; 5],
152 last_message_hashes: [[0; 32]; 5],
153 };
154 if let Ok(record) = Record::sign(
155 record_publisher.record_topic().hash(),
156 unix_minute,
157 record_publisher.pub_key().to_bytes(),
158 record_content,
159 &record_publisher.signing_key(),
160 ) {
161 tokio::spawn(async move {
162 let _ = record_creator.publish_record(record).await;
163 });
164 }
165 }
166 sleep(Duration::from_millis(100)).await;
167 continue;
168 }
169
170 let bootstrap_nodes = records
174 .iter()
175 .flat_map(|record| {
176 let mut v = vec![record.node_id()];
177 if let Ok(record_content) = record.content::<GossipRecordContent>() {
178 for peer in record_content.active_peers {
179 if peer != [0; 32] {
180 v.push(peer);
181 }
182 }
183 }
184 v
185 })
186 .filter_map(|node_id| EndpointId::from_bytes(&node_id).ok())
187 .collect::<HashSet<_>>();
188
189 tracing::debug!(
190 "Bootstrap: extracted {} potential bootstrap nodes",
191 bootstrap_nodes.len()
192 );
193
194 if gossip_receiver.is_joined().await {
198 tracing::debug!("Bootstrap: joined while processing records, exiting");
199 break;
200 }
201
202 for node_id in bootstrap_nodes.iter() {
205 match gossip_sender.join_peers(vec![*node_id], None).await {
206 Ok(_) => {
207 tracing::debug!("Bootstrap: attempted to join peer {}", node_id);
208 sleep(Duration::from_millis(100)).await;
209 if gossip_receiver.is_joined().await {
210 tracing::debug!(
211 "Bootstrap: successfully joined via peer {}",
212 node_id
213 );
214 break;
215 }
216 }
217 Err(e) => {
218 tracing::debug!(
219 "Bootstrap: failed to join peer {}: {:?}",
220 node_id,
221 e
222 );
223 continue;
224 }
225 }
226 }
227
228 if !gossip_receiver.is_joined().await {
231 tracing::debug!(
232 "Bootstrap: not joined yet, waiting 500ms before final check"
233 );
234 sleep(Duration::from_millis(500)).await;
235 }
236
237 if gossip_receiver.is_joined().await {
239 tracing::debug!("Bootstrap: successfully joined after final wait");
240 break;
241 } else {
242 tracing::debug!("Bootstrap: still not joined after attempting all peers");
243 if unix_minute != last_published_unix_minute {
245 tracing::debug!(
246 "Bootstrap: publishing fallback record for unix_minute {}",
247 unix_minute
248 );
249 last_published_unix_minute = unix_minute;
250 let record_creator = record_publisher.clone();
251 if let Ok(record) = Record::sign(
252 record_publisher.record_topic().hash(),
253 unix_minute,
254 record_publisher.pub_key().to_bytes(),
255 GossipRecordContent {
256 active_peers: [[0; 32]; 5],
257 last_message_hashes: [[0; 32]; 5],
258 },
259 &record_publisher.signing_key(),
260 ) {
261 tokio::spawn(async move {
262 let _ = record_creator.publish_record(record).await;
263 });
264 }
265 }
266 sleep(Duration::from_millis(100)).await;
267 continue;
268 }
269 }
270 tracing::debug!("Bootstrap: completed successfully");
271 let _ = sender.send(());
272 }
273 });
274
275 Ok(receiver)
276 }
277}