distributed_topic_tracker/gossip/topic/
bootstrap.rs1use std::{collections::HashSet, time::Duration};
2
3use actor_helper::{Action, Actor, Handle, act, act_ok};
4use anyhow::Result;
5use tokio::time::sleep;
6
7use crate::{
8 GossipSender,
9 crypto::Record,
10 gossip::{GossipRecordContent, receiver::GossipReceiver},
11};
12
13#[derive(Debug, Clone)]
14pub struct Bootstrap {
15 api: Handle<BootstrapActor>,
16}
17
18#[derive(Debug)]
19struct BootstrapActor {
20 rx: tokio::sync::mpsc::Receiver<Action<Self>>,
21
22 record_publisher: crate::crypto::RecordPublisher,
23
24 gossip_sender: GossipSender,
25 gossip_receiver: GossipReceiver,
26}
27
28impl Bootstrap {
29 pub async fn new(
30 record_publisher: crate::crypto::RecordPublisher,
31 gossip: iroh_gossip::net::Gossip,
32 ) -> Result<Self> {
33 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
34 .subscribe(
35 iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
36 vec![],
37 )
38 .await?;
39 let (gossip_sender, gossip_receiver) = gossip_topic.split();
40 let (gossip_sender, gossip_receiver) = (
41 GossipSender::new(gossip_sender, gossip.clone()),
42 GossipReceiver::new(gossip_receiver, gossip.clone()),
43 );
44
45 let (api, rx) = Handle::channel(32);
46
47 tokio::spawn(async move {
48 let mut actor = BootstrapActor {
49 rx,
50 record_publisher,
51 gossip_sender,
52 gossip_receiver,
53 };
54 let _ = actor.run().await;
55 });
56
57 Ok(Self { api: api })
58 }
59
60 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
61 self.api.call(act!(actor=> actor.start_bootstrap())).await
62 }
63
64 pub async fn gossip_sender(&self) -> Result<GossipSender> {
65 self.api
66 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
67 .await
68 }
69
70 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
71 self.api
72 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
73 .await
74 }
75}
76
77impl Actor for BootstrapActor {
78 async fn run(&mut self) -> Result<()> {
79 loop {
80 tokio::select! {
81 Some(action) = self.rx.recv() => {
82 action(self).await;
83 }
84 _ = tokio::signal::ctrl_c() => {
85 break;
86 }
87 }
88 }
89 Ok(())
90 }
91}
92
93impl BootstrapActor {
94 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
95 let (sender, receiver) = tokio::sync::oneshot::channel();
96 tokio::spawn({
97 let mut last_published_unix_minute = 0;
98 let (gossip_sender, gossip_receiver) =
99 (self.gossip_sender.clone(), self.gossip_receiver.clone());
100 let record_publisher = self.record_publisher.clone();
101 async move {
102 loop {
103 if gossip_receiver.is_joined().await {
105 break;
106 }
107
108 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
110 -1
111 } else {
112 0
113 });
114
115 let records = record_publisher.get_records(unix_minute).await;
117
118 if records.is_empty() {
121 if unix_minute != last_published_unix_minute {
122 last_published_unix_minute = unix_minute;
123 let record_creator = record_publisher.clone();
124 let record_content = GossipRecordContent {
125 active_peers: [[0; 32]; 5],
126 last_message_hashes: [[0; 32]; 5],
127 };
128 if let Ok(record) = Record::sign(
129 record_publisher.record_topic().hash(),
130 unix_minute,
131 record_publisher.pub_key().to_bytes(),
132 record_content,
133 &record_publisher.signing_key(),
134 ) {
135 tokio::spawn(async move {
136 let _ = record_creator.publish_record(record).await;
137 });
138 }
139 }
140 sleep(Duration::from_millis(100)).await;
141 continue;
142 }
143
144 let bootstrap_nodes = records
148 .iter()
149 .flat_map(|record| {
150 let mut v = vec![record.node_id()];
151 if let Ok(record_content) = record.content::<GossipRecordContent>() {
152 for peer in record_content.active_peers {
153 if peer != [0; 32] {
154 v.push(peer);
155 }
156 }
157 }
158 v
159 })
160 .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
161 .collect::<HashSet<_>>();
162
163 if gossip_receiver.is_joined().await {
174 break;
175 }
176
177 for node_id in bootstrap_nodes.iter() {
180 match gossip_sender.join_peers(vec![*node_id], None).await {
181 Ok(_) => {
182 sleep(Duration::from_millis(100)).await;
183 if gossip_receiver.is_joined().await {
185 break;
186 }
187 }
188 Err(_) => {
189 continue;
191 }
192 }
193 }
194
195 if !gossip_receiver.is_joined().await {
198 sleep(Duration::from_millis(500)).await;
199 }
200
201 if gossip_receiver.is_joined().await {
203 break;
204 } else {
205 if unix_minute != last_published_unix_minute {
207 last_published_unix_minute = unix_minute;
208 let record_creator = record_publisher.clone();
209 if let Ok(record) = Record::sign(
210 record_publisher.record_topic().hash(),
211 unix_minute,
212 record_publisher.pub_key().to_bytes(),
213 GossipRecordContent {
214 active_peers: [[0; 32]; 5],
215 last_message_hashes: [[0; 32]; 5],
216 },
217 &record_publisher.signing_key(),
218 ) {
219 tokio::spawn(async move {
220 let _ = record_creator.publish_record(record).await;
221 });
222 }
223 }
224 sleep(Duration::from_millis(100)).await;
225 continue;
226 }
227 }
228 let _ = sender.send(());
229 }
230 });
231
232 Ok(receiver)
233 }
234}