distributed_topic_tracker/gossip/topic/
bootstrap.rs

1use std::{collections::HashSet, time::Duration};
2
3use actor_helper::{Action, Actor, Handle, act, act_ok};
4use anyhow::Result;
5use tokio::time::sleep;
6
7use crate::{
8    GossipSender,
9    crypto::Record,
10    gossip::{GossipRecordContent, receiver::GossipReceiver},
11};
12
13#[derive(Debug, Clone)]
14pub struct Bootstrap {
15    api: Handle<BootstrapActor>,
16}
17
18#[derive(Debug)]
19struct BootstrapActor {
20    rx: tokio::sync::mpsc::Receiver<Action<Self>>,
21
22    record_publisher: crate::crypto::RecordPublisher,
23
24    gossip_sender: GossipSender,
25    gossip_receiver: GossipReceiver,
26}
27
28impl Bootstrap {
29    pub async fn new(
30        record_publisher: crate::crypto::RecordPublisher,
31        gossip: iroh_gossip::net::Gossip,
32    ) -> Result<Self> {
33        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
34            .subscribe(
35                iroh_gossip::proto::TopicId::from(record_publisher.record_topic().hash()),
36                vec![],
37            )
38            .await?;
39        let (gossip_sender, gossip_receiver) = gossip_topic.split();
40        let (gossip_sender, gossip_receiver) = (
41            GossipSender::new(gossip_sender, gossip.clone()),
42            GossipReceiver::new(gossip_receiver, gossip.clone()),
43        );
44
45        let (api, rx) = Handle::channel(32);
46
47        tokio::spawn(async move {
48            let mut actor = BootstrapActor {
49                rx,
50                record_publisher,
51                gossip_sender,
52                gossip_receiver,
53            };
54            let _ = actor.run().await;
55        });
56
57        Ok(Self { api: api })
58    }
59
60    pub async fn bootstrap(&self) -> Result<tokio::sync::oneshot::Receiver<()>> {
61        self.api.call(act!(actor=> actor.start_bootstrap())).await
62    }
63
64    pub async fn gossip_sender(&self) -> Result<GossipSender> {
65        self.api
66            .call(act_ok!(actor => async move { actor.gossip_sender.clone() }))
67            .await
68    }
69
70    pub async fn gossip_receiver(&self) -> Result<GossipReceiver> {
71        self.api
72            .call(act_ok!(actor => async move { actor.gossip_receiver.clone() }))
73            .await
74    }
75}
76
77impl Actor for BootstrapActor {
78    async fn run(&mut self) -> Result<()> {
79        loop {
80            tokio::select! {
81                Some(action) = self.rx.recv() => {
82                    action(self).await;
83                }
84                _ = tokio::signal::ctrl_c() => {
85                    break;
86                }
87            }
88        }
89        Ok(())
90    }
91}
92
93impl BootstrapActor {
94    pub async fn start_bootstrap(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
95        let (sender, receiver) = tokio::sync::oneshot::channel();
96        tokio::spawn({
97            let mut last_published_unix_minute = 0;
98            let (gossip_sender, gossip_receiver) =
99                (self.gossip_sender.clone(), self.gossip_receiver.clone());
100            let record_publisher = self.record_publisher.clone();
101            async move {
102                loop {
103                    // Check if we are connected to at least one node
104                    if gossip_receiver.is_joined().await {
105                        break;
106                    }
107
108                    // On the first try we check the prev unix minute, after that the current one
109                    let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
110                        -1
111                    } else {
112                        0
113                    });
114
115                    // Unique, verified records for the unix minute
116                    let records = record_publisher.get_records(unix_minute).await;
117
118                    // If there are no records, invoke the publish_proc (the publishing procedure)
119                    // continue the loop after
120                    if records.is_empty() {
121                        if unix_minute != last_published_unix_minute {
122                            last_published_unix_minute = unix_minute;
123                            let record_creator = record_publisher.clone();
124                            let record_content = GossipRecordContent {
125                                active_peers: [[0; 32]; 5],
126                                last_message_hashes: [[0; 32]; 5],
127                            };
128                            if let Ok(record) = Record::sign(
129                                record_publisher.record_topic().hash(),
130                                unix_minute,
131                                record_publisher.pub_key().to_bytes(),
132                                record_content,
133                                &record_publisher.signing_key(),
134                            ) {
135                                tokio::spawn(async move {
136                                    let _ = record_creator.publish_record(record).await;
137                                });
138                            }
139                        }
140                        sleep(Duration::from_millis(100)).await;
141                        continue;
142                    }
143
144                    // We found records
145
146                    // Collect node ids from active_peers and record.node_id (of publisher)
147                    let bootstrap_nodes = records
148                        .iter()
149                        .flat_map(|record| {
150                            let mut v = vec![record.node_id()];
151                            if let Ok(record_content) = record.content::<GossipRecordContent>() {
152                                for peer in record_content.active_peers {
153                                    if peer != [0; 32] {
154                                        v.push(peer);
155                                    }
156                                }
157                            }
158                            v
159                        })
160                        .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
161                        .collect::<HashSet<_>>();
162
163                    // Maybe in the meantime someone connected to us via one of our published records
164                    // we don't want to disrup the gossip rotations any more then we have to
165                    // so we check again before joining new peers
166                    /*
167                    println!(
168                        "checking if joined before joining peers: {}",
169                        gossip_receiver.neighbors().await.len()
170                    );
171                    println!("bootstrap_records: {}", bootstrap_nodes.len());
172                    */
173                    if gossip_receiver.is_joined().await {
174                        break;
175                    }
176
177                    // Instead of throwing everything into join_peers() at once we go node_id by node_id
178                    // again to disrupt as little nodes peer neighborhoods as possible.
179                    for node_id in bootstrap_nodes.iter() {
180                        match gossip_sender.join_peers(vec![*node_id], None).await {
181                            Ok(_) => {
182                                sleep(Duration::from_millis(100)).await;
183                                //println!("joined peer: {}", node_id);
184                                if gossip_receiver.is_joined().await {
185                                    break;
186                                }
187                            }
188                            Err(_) => {
189                                //println!("failed to join peers");
190                                continue;
191                            }
192                        }
193                    }
194
195                    // If we are still not connected to anyone:
196                    // give it the default iroh-gossip connection timeout before the final is_joined() check
197                    if !gossip_receiver.is_joined().await {
198                        sleep(Duration::from_millis(500)).await;
199                    }
200
201                    // If we are connected: return
202                    if gossip_receiver.is_joined().await {
203                        break;
204                    } else {
205                        // If we are not connected: check if we should publish a record this minute
206                        if unix_minute != last_published_unix_minute {
207                            last_published_unix_minute = unix_minute;
208                            let record_creator = record_publisher.clone();
209                            if let Ok(record) = Record::sign(
210                                record_publisher.record_topic().hash(),
211                                unix_minute,
212                                record_publisher.pub_key().to_bytes(),
213                                GossipRecordContent {
214                                    active_peers: [[0; 32]; 5],
215                                    last_message_hashes: [[0; 32]; 5],
216                                },
217                                &record_publisher.signing_key(),
218                            ) {
219                                tokio::spawn(async move {
220                                    let _ = record_creator.publish_record(record).await;
221                                });
222                            }
223                        }
224                        sleep(Duration::from_millis(100)).await;
225                        continue;
226                    }
227                }
228                let _ = sender.send(());
229            }
230        });
231
232        Ok(receiver)
233    }
234}