distributed_topic_tracker/gossip/topic/
bootstrap.rs

1use std::{collections::HashSet, time::Duration};
2
3use anyhow::Result;
4use tokio::time::sleep;
5use actor_helper::{act, act_ok, Action, Actor, Handle};
6
7use crate::{
8    GossipSender,
9    crypto::Record,
10    gossip::receiver::GossipReceiver,
11};
12
13#[derive(Debug, Clone)]
14pub struct Bootstrap {
15    api: Handle<BootstrapActor>,
16}
17
18#[derive(Debug)]
19struct BootstrapActor {
20    rx: tokio::sync::mpsc::Receiver<Action<Self>>,
21
22    record_publisher: crate::crypto::RecordPublisher,
23
24    gossip_sender: GossipSender,
25    gossip_receiver: GossipReceiver,
26}
27
28impl Bootstrap {
29    pub async fn new(
30        record_publisher: crate::crypto::RecordPublisher,
31        gossip: iroh_gossip::net::Gossip,
32    ) -> Result<Self> {
33        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
34            .subscribe(
35                iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
36                vec![],
37            )
38            .await?;
39        let (gossip_sender, gossip_receiver) = gossip_topic.split();
40        let (gossip_sender, gossip_receiver) = (
41            GossipSender::new(gossip_sender, gossip.clone()),
42            GossipReceiver::new(gossip_receiver, gossip.clone()),
43        );
44
45        let (api, rx) = Handle::channel(32);
46
47        tokio::spawn(async move {
48            let mut actor = BootstrapActor {
49                rx,
50                record_publisher,
51                gossip_sender,
52                gossip_receiver,
53            };
54            let _ = actor.run().await;
55        });
56
57        Ok(Self { api: api })
58    }
59
60    pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
61        self.api.call(act!(actor=> actor.start_bootstrap())).await        
62    }
63
64    pub async fn gossip_sender(&self) -> Result<GossipSender> {
65        self.api
66            .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
67            .await
68    }
69
70    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
71        self.api
72            .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
73            .await
74    }
75}
76
77impl Actor for BootstrapActor {
78    async fn run(&mut self) -> Result<()> {
79        loop {
80            tokio::select! {
81                Some(action) = self.rx.recv() => {
82                    action(self).await;
83                }
84                _ = tokio::signal::ctrl_c() => {
85                    break;
86                }
87            }
88        }
89        Ok(())
90    }
91}
92
93impl BootstrapActor {
94    pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
95        let (sender, receiver) = tokio::sync::oneshot::channel();
96        tokio::spawn({
97            let mut last_published_unix_minute = 0;
98            let (gossip_sender, gossip_receiver) =
99                    (self.gossip_sender.clone(), self.gossip_receiver.clone());
100            let record_publisher = self.record_publisher.clone();
101            async move {
102                
103                loop {
104                    // Check if we are connected to at least one node
105                    if gossip_receiver.is_joined().await {
106                        break;
107                    }
108
109                    // On the first try we check the prev unix minute, after that the current one
110                    let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
111                        -1
112                    } else {
113                        0
114                    });
115
116                    // Unique, verified records for the unix minute
117                    let records = record_publisher.get_records(unix_minute).await;
118
119                    // If there are no records, invoke the publish_proc (the publishing procedure)
120                    // continue the loop after
121                    if records.is_empty() {
122                        if unix_minute != last_published_unix_minute {
123                            last_published_unix_minute = unix_minute;
124                            tokio::spawn({
125                                let record_creator = record_publisher.clone();
126                                let record = Record::sign(
127                                    record_publisher.record_topic().hash(),
128                                    unix_minute,
129                                    record_publisher.pub_key().to_bytes(),
130                                    [[0; 32]; 5],
131                                    [[0; 32]; 5],
132                                    &record_publisher.signing_key(),
133                                );
134                                async move {
135                                    let _ = record_creator.publish_record(record).await;
136                                }
137                            });
138                        }
139                        sleep(Duration::from_millis(100)).await;
140                        continue;
141                    }
142
143                    // We found records
144
145                    // Collect node ids from active_peers and record.node_id (of publisher)
146                    let bootstrap_nodes = records
147                        .iter()
148                        .flat_map(|record| {
149                            let mut v = vec![record.node_id()];
150                            for peer in record.active_peers() {
151                                if peer != [0; 32] {
152                                    v.push(peer);
153                                }
154                            }
155                            v
156                        })
157                        .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
158                        .collect::<HashSet<_>>();
159
160                    // Maybe in the meantime someone connected to us via one of our published records
161                    // we don't want to disrup the gossip rotations any more then we have to
162                    // so we check again before joining new peers
163                    /*
164                    println!(
165                        "checking if joined before joining peers: {}",
166                        gossip_receiver.neighbors().await.len()
167                    );
168                    println!("bootstrap_records: {}", bootstrap_nodes.len());
169                    */
170                    if gossip_receiver.is_joined().await {
171                        break;
172                    }
173
174                    // Instead of throwing everything into join_peers() at once we go node_id by node_id
175                    // again to disrupt as little nodes peer neighborhoods as possible.
176                    for node_id in bootstrap_nodes.iter() {
177                        match gossip_sender.join_peers(vec![*node_id], None).await {
178                            Ok(_) => {
179                                sleep(Duration::from_millis(100)).await;
180                                //println!("joined peer: {}", node_id);
181                                if gossip_receiver.is_joined().await {
182                                    break;
183                                }
184                            }
185                            Err(_) => {
186                                //println!("failed to join peers");
187                                continue;
188                            }
189                        }
190                    }
191
192                    // If we are still not connected to anyone:
193                    // give it the default iroh-gossip connection timeout before the final is_joined() check
194                    if !gossip_receiver.is_joined().await {
195                        sleep(Duration::from_millis(500)).await;
196                    }
197
198                    // If we are connected: return
199                    if gossip_receiver.is_joined().await {
200                        break;
201                    } else {
202                        // If we are not connected: check if we should publish a record this minute
203                        if unix_minute != last_published_unix_minute {
204                            last_published_unix_minute = unix_minute;
205                            tokio::spawn({
206                                let record_creator = record_publisher.clone();
207                                let record = Record::sign(
208                                    record_publisher.record_topic().hash(),
209                                    unix_minute,
210                                    record_publisher.pub_key().to_bytes(),
211                                    [[0; 32]; 5],
212                                    [[0; 32]; 5],
213                                    &record_publisher.signing_key(),
214                                );
215                                async move {
216                                    let _ = record_creator.publish_record(record).await;
217                                }
218                            });
219                        }
220                        sleep(Duration::from_millis(100)).await;
221                        continue;
222                    }
223                }
224                let _ = sender.send(());
225            }
226        });
227
228        Ok(receiver)
229    }
230}