distributed_topic_tracker/gossip/topic/
bootstrap.rs1use std::{collections::HashSet, time::Duration};
4
5use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
6use anyhow::Result;
7use iroh::EndpointId;
8use tokio::time::sleep;
9
10use crate::{
11 GossipSender,
12 crypto::Record,
13 gossip::{GossipRecordContent, receiver::GossipReceiver},
14};
15
16#[derive(Debug, Clone)]
21pub struct Bootstrap {
22 api: Handle<BootstrapActor, anyhow::Error>,
23}
24
25#[derive(Debug)]
26struct BootstrapActor {
27 rx: Receiver<Action<Self>>,
28
29 record_publisher: crate::crypto::RecordPublisher,
30
31 gossip_sender: GossipSender,
32 gossip_receiver: GossipReceiver,
33}
34
35impl Bootstrap {
36 pub async fn new(
38 record_publisher: crate::crypto::RecordPublisher,
39 gossip: iroh_gossip::net::Gossip,
40 ) -> Result<Self> {
41 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
42 .subscribe(
43 iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
44 vec![],
45 )
46 .await?;
47 let (gossip_sender, gossip_receiver) = gossip_topic.split();
48 let (gossip_sender, gossip_receiver) = (
49 GossipSender::new(gossip_sender, gossip.clone()),
50 GossipReceiver::new(gossip_receiver, gossip.clone()),
51 );
52
53 let (api, rx) = Handle::channel();
54
55 tokio::spawn(async move {
56 let mut actor = BootstrapActor {
57 rx,
58 record_publisher,
59 gossip_sender,
60 gossip_receiver,
61 };
62 let _ = actor.run().await;
63 });
64
65 Ok(Self { api })
66 }
67
68 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
72 self.api.call(act!(actor=> actor.start_bootstrap())).await
73 }
74
75 pub async fn gossip_sender(&self) -> Result<GossipSender> {
77 self.api
78 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
79 .await
80 }
81
82 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
84 self.api
85 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
86 .await
87 }
88}
89
90impl Actor<anyhow::Error> for BootstrapActor {
91 async fn run(&mut self) -> Result<()> {
92 loop {
93 tokio::select! {
94 Ok(action) = self.rx.recv_async() => {
95 action(self).await;
96 }
97 else => break Ok(()),
98 }
99 }
100 }
101}
102
103impl BootstrapActor {
104 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
105 let (sender, receiver) = tokio::sync::oneshot::channel();
106 tokio::spawn({
107 let mut last_published_unix_minute = 0;
108 let (gossip_sender, gossip_receiver) =
109 (self.gossip_sender.clone(), self.gossip_receiver.clone());
110 let record_publisher = self.record_publisher.clone();
111 async move {
112 tracing::debug!("Bootstrap: starting bootstrap process");
113 loop {
114 if gossip_receiver.is_joined().await {
116 tracing::debug!("Bootstrap: already joined, exiting bootstrap loop");
117 break;
118 }
119
120 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
122 -1
123 } else {
124 0
125 });
126
127 let mut records = record_publisher.get_records(unix_minute - 1).await;
129 records.extend(record_publisher.get_records(unix_minute).await);
130
131 tracing::debug!(
132 "Bootstrap: fetched {} records for unix_minute {}",
133 records.len(),
134 unix_minute
135 );
136
137 if records.is_empty() {
140 if unix_minute != last_published_unix_minute {
141 tracing::debug!(
142 "Bootstrap: no records found, publishing own record for unix_minute {}",
143 unix_minute
144 );
145 last_published_unix_minute = unix_minute;
146 let record_creator = record_publisher.clone();
147 let record_content = GossipRecordContent {
148 active_peers: [[0; 32]; 5],
149 last_message_hashes: [[0; 32]; 5],
150 };
151 if let Ok(record) = Record::sign(
152 record_publisher.record_topic().hash(),
153 unix_minute,
154 record_publisher.pub_key().to_bytes(),
155 record_content,
156 &record_publisher.signing_key(),
157 ) {
158 tokio::spawn(async move {
159 let _ = record_creator.publish_record(record).await;
160 });
161 }
162 }
163 sleep(Duration::from_millis(100)).await;
164 continue;
165 }
166
167 let bootstrap_nodes = records
171 .iter()
172 .flat_map(|record| {
173 let mut v = vec![record.node_id()];
174 if let Ok(record_content) = record.content::<GossipRecordContent>() {
175 for peer in record_content.active_peers {
176 if peer != [0; 32] {
177 v.push(peer);
178 }
179 }
180 }
181 v
182 })
183 .filter_map(|node_id| EndpointId::from_bytes(&node_id).ok())
184 .collect::<HashSet<_>>();
185
186 tracing::debug!(
187 "Bootstrap: extracted {} potential bootstrap nodes",
188 bootstrap_nodes.len()
189 );
190
191 if gossip_receiver.is_joined().await {
195 tracing::debug!("Bootstrap: joined while processing records, exiting");
196 break;
197 }
198
199 for node_id in bootstrap_nodes.iter() {
202 match gossip_sender.join_peers(vec![*node_id], None).await {
203 Ok(_) => {
204 tracing::debug!("Bootstrap: attempted to join peer {}", node_id);
205 sleep(Duration::from_millis(100)).await;
206 if gossip_receiver.is_joined().await {
207 tracing::debug!(
208 "Bootstrap: successfully joined via peer {}",
209 node_id
210 );
211 break;
212 }
213 }
214 Err(e) => {
215 tracing::debug!(
216 "Bootstrap: failed to join peer {}: {:?}",
217 node_id,
218 e
219 );
220 continue;
221 }
222 }
223 }
224
225 if !gossip_receiver.is_joined().await {
228 tracing::debug!(
229 "Bootstrap: not joined yet, waiting 500ms before final check"
230 );
231 sleep(Duration::from_millis(500)).await;
232 }
233
234 if gossip_receiver.is_joined().await {
236 tracing::debug!("Bootstrap: successfully joined after final wait");
237 break;
238 } else {
239 tracing::debug!("Bootstrap: still not joined after attempting all peers");
240 if unix_minute != last_published_unix_minute {
242 tracing::debug!(
243 "Bootstrap: publishing fallback record for unix_minute {}",
244 unix_minute
245 );
246 last_published_unix_minute = unix_minute;
247 let record_creator = record_publisher.clone();
248 if let Ok(record) = Record::sign(
249 record_publisher.record_topic().hash(),
250 unix_minute,
251 record_publisher.pub_key().to_bytes(),
252 GossipRecordContent {
253 active_peers: [[0; 32]; 5],
254 last_message_hashes: [[0; 32]; 5],
255 },
256 &record_publisher.signing_key(),
257 ) {
258 tokio::spawn(async move {
259 let _ = record_creator.publish_record(record).await;
260 });
261 }
262 }
263 sleep(Duration::from_millis(100)).await;
264 continue;
265 }
266 }
267 tracing::debug!("Bootstrap: completed successfully");
268 let _ = sender.send(());
269 }
270 });
271
272 Ok(receiver)
273 }
274}