// distributed_topic_tracker/gossip/topic/bootstrap.rs
use std::{collections::HashSet, time::Duration};

use actor_helper::{act, act_ok, Action, Actor, Handle};
use anyhow::Result;
use tokio::time::sleep;

use crate::{
    GossipSender,
    crypto::Record,
    gossip::receiver::GossipReceiver,
};
12
13#[derive(Debug, Clone)]
14pub struct Bootstrap {
15 api: Handle<BootstrapActor>,
16}
17
18#[derive(Debug)]
19struct BootstrapActor {
20 rx: tokio::sync::mpsc::Receiver<Action<Self>>,
21
22 record_publisher: crate::crypto::RecordPublisher,
23
24 gossip_sender: GossipSender,
25 gossip_receiver: GossipReceiver,
26}
27
28impl Bootstrap {
29 pub async fn new(
30 record_publisher: crate::crypto::RecordPublisher,
31 gossip: iroh_gossip::net::Gossip,
32 ) -> Result<Self> {
33 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
34 .subscribe(
35 iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
36 vec![],
37 )
38 .await?;
39 let (gossip_sender, gossip_receiver) = gossip_topic.split();
40 let (gossip_sender, gossip_receiver) = (
41 GossipSender::new(gossip_sender, gossip.clone()),
42 GossipReceiver::new(gossip_receiver, gossip.clone()),
43 );
44
45 let (api, rx) = Handle::channel(32);
46
47 tokio::spawn(async move {
48 let mut actor = BootstrapActor {
49 rx,
50 record_publisher,
51 gossip_sender,
52 gossip_receiver,
53 };
54 let _ = actor.run().await;
55 });
56
57 Ok(Self { api: api })
58 }
59
60 pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
61 self.api.call(act!(actor=> actor.start_bootstrap())).await
62 }
63
64 pub async fn gossip_sender(&self) -> Result<GossipSender> {
65 self.api
66 .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
67 .await
68 }
69
70 pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
71 self.api
72 .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
73 .await
74 }
75}
76
77impl Actor for BootstrapActor {
78 async fn run(&mut self) -> Result<()> {
79 loop {
80 tokio::select! {
81 Some(action) = self.rx.recv() => {
82 action(self).await;
83 }
84 _ = tokio::signal::ctrl_c() => {
85 break;
86 }
87 }
88 }
89 Ok(())
90 }
91}
92
93impl BootstrapActor {
94 pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
95 let (sender, receiver) = tokio::sync::oneshot::channel();
96 tokio::spawn({
97 let mut last_published_unix_minute = 0;
98 let (gossip_sender, gossip_receiver) =
99 (self.gossip_sender.clone(), self.gossip_receiver.clone());
100 let record_publisher = self.record_publisher.clone();
101 async move {
102
103 loop {
104 if gossip_receiver.is_joined().await {
106 break;
107 }
108
109 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
111 -1
112 } else {
113 0
114 });
115
116 let records = record_publisher.get_records(unix_minute).await;
118
119 if records.is_empty() {
122 if unix_minute != last_published_unix_minute {
123 last_published_unix_minute = unix_minute;
124 tokio::spawn({
125 let record_creator = record_publisher.clone();
126 let record = Record::sign(
127 record_publisher.record_topic().hash(),
128 unix_minute,
129 record_publisher.pub_key().to_bytes(),
130 [[0; 32]; 5],
131 [[0; 32]; 5],
132 &record_publisher.signing_key(),
133 );
134 async move {
135 let _ = record_creator.publish_record(record).await;
136 }
137 });
138 }
139 sleep(Duration::from_millis(100)).await;
140 continue;
141 }
142
143 let bootstrap_nodes = records
147 .iter()
148 .flat_map(|record| {
149 let mut v = vec![record.node_id()];
150 for peer in record.active_peers() {
151 if peer != [0; 32] {
152 v.push(peer);
153 }
154 }
155 v
156 })
157 .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
158 .collect::<HashSet<_>>();
159
160 if gossip_receiver.is_joined().await {
171 break;
172 }
173
174 for node_id in bootstrap_nodes.iter() {
177 match gossip_sender.join_peers(vec![*node_id], None).await {
178 Ok(_) => {
179 sleep(Duration::from_millis(100)).await;
180 if gossip_receiver.is_joined().await {
182 break;
183 }
184 }
185 Err(_) => {
186 continue;
188 }
189 }
190 }
191
192 if !gossip_receiver.is_joined().await {
195 sleep(Duration::from_millis(500)).await;
196 }
197
198 if gossip_receiver.is_joined().await {
200 break;
201 } else {
202 if unix_minute != last_published_unix_minute {
204 last_published_unix_minute = unix_minute;
205 tokio::spawn({
206 let record_creator = record_publisher.clone();
207 let record = Record::sign(
208 record_publisher.record_topic().hash(),
209 unix_minute,
210 record_publisher.pub_key().to_bytes(),
211 [[0; 32]; 5],
212 [[0; 32]; 5],
213 &record_publisher.signing_key(),
214 );
215 async move {
216 let _ = record_creator.publish_record(record).await;
217 }
218 });
219 }
220 sleep(Duration::from_millis(100)).await;
221 continue;
222 }
223 }
224 let _ = sender.send(());
225 }
226 });
227
228 Ok(receiver)
229 }
230}