1use std::{collections::HashSet, sync::Arc, time::Duration};
2
3use anyhow::{Result, bail};
4use arc_swap::ArcSwap;
5use ed25519_dalek::ed25519::signature::SignerMut;
6use futures::StreamExt as _;
7use iroh::Endpoint;
8use iroh_gossip::{api::Event, proto::DeliveryScope};
9use mainline::{MutableItem, async_dht::AsyncDht};
10use once_cell::sync::Lazy;
11use rand::seq::SliceRandom;
12use sha2::Digest;
13
14use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
15use tokio::time::{sleep, timeout};
16
/// Cap handed to `GossipSender::join_peers` as `max_peers` by the background publisher task.
pub const MAX_JOIN_PEERS_COUNT: usize = 30;
/// Number of already-published (filtered) records at which `publish_proc` skips publishing
/// this node's own record.
pub const MAX_BOOTSTRAP_RECORDS: usize = 10;
/// Ready-made value of the default [`SecretRotation`] implementation.
pub const SECRET_ROTATION: DefaultSecretRotation = DefaultSecretRotation;
20
21static DHT: Lazy<ArcSwap<mainline::async_dht::AsyncDht>> = Lazy::new(|| {
22 ArcSwap::from_pointee(
23 mainline::Dht::builder()
24 .build()
25 .expect("failed to create dht")
26 .as_async(),
27 )
28});
29
30fn get_dht() -> Arc<AsyncDht> {
31 DHT.load_full()
32}
33
34async fn reset_dht() {
35 let n_dht = mainline::Dht::builder()
36 .build()
37 .expect("failed to create dht");
38 DHT.store(Arc::new(n_dht.as_async()));
39}
40
/// A [`Record`] in its encrypted, wire-ready form (see `Record::encrypt`).
#[derive(Debug, Clone)]
pub struct EncryptedRecord {
    /// The serialized record, encrypted to a freshly generated one-time key.
    encrypted_record: Vec<u8>,
    /// The one-time key's 32 secret bytes, encrypted to the caller-supplied encryption key.
    encrypted_decryption_key: Vec<u8>,
}
46
/// A signed, fixed-size announcement published by a node for one topic and one unix minute.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Record {
    /// Topic hash this record belongs to; checked by `verify`.
    topic: [u8; 32],
    /// Unix minute the record was created for; checked by `verify`.
    unix_minute: u64,
    /// Publisher's id; `verify` interprets these bytes as the ed25519 verifying key.
    node_id: [u8; 32],
    /// Up to five peer ids; an all-zero entry marks an unused slot.
    active_peers: [[u8; 32]; 5],
    /// Up to five message hashes; an all-zero entry marks an unused slot.
    last_message_hashes: [[u8; 32]; 5],
    /// Ed25519 signature over all preceding fields in serialization order.
    signature: [u8; 64],
}
56
/// An `iroh_gossip` instance bundled with the endpoint and secret-rotation strategy needed
/// for auto-discovery (built by `AutoDiscoveryBuilder::spawn_with_auto_discovery`).
pub struct Gossip<R: SecretRotation + Default + Clone + Send + 'static> {
    /// The underlying gossip handle.
    pub gossip: iroh_gossip::net::Gossip,
    /// Endpoint the gossip instance was spawned on; also supplies the node's secret key.
    endpoint: iroh::Endpoint,
    /// Strategy used to derive the per-minute encryption secret.
    secret_rotation_function: R,
}
62
/// A subscribed gossip topic together with the state used for DHT-based peer discovery.
#[derive(Debug)]
pub struct Topic<R: SecretRotation + Default + Clone + Send + 'static> {
    /// Identifier (raw name and hash) of the topic.
    topic_id: TopicId,
    /// Handle for broadcasting and joining peers.
    gossip_sender: GossipSender,
    /// Handle for querying neighbors/join state and subscribing to events.
    gossip_receiver: GossipReceiver,
    /// Gossip handle stored with the topic; not read by any method in this file.
    _gossip: iroh_gossip::net::Gossip,
    /// First 32 bytes of the SHA-512 of the caller's initial secret.
    initial_secret_hash: [u8; 32],
    /// Strategy used to derive the per-minute encryption secret.
    secret_rotation_function: R,
    /// Id of the local node (taken from the endpoint at construction).
    node_id: iroh::NodeId,
}
73
/// A topic name paired with its 32-byte hash (see `TopicId::new`).
#[derive(Debug, Clone)]
pub struct TopicId {
    /// The original topic string.
    _raw: String,
    /// First 32 bytes of the SHA-512 digest of `_raw`.
    hash: [u8; 32],
}
79
/// Cloneable front-end to a background task that owns the raw `iroh_gossip` receiver.
#[derive(Debug)]
pub struct GossipReceiver {
    /// Broadcast channel onto which the background task forwards every gossip event.
    gossip_event_forwarder: tokio::sync::broadcast::Sender<iroh_gossip::api::Event>,
    /// Request channel to the background task (neighbors / is-joined queries).
    action_req: tokio::sync::mpsc::Sender<InnerActionRecv>,
    /// Truncated SHA-512 hashes of recently received swarm-scope messages.
    // NOTE(review): this is a plain `Vec` copied on `clone()`; the background task updates
    // only its own copy, so other handles do not observe those updates — confirm intent.
    last_message_hashes: Vec<[u8; 32]>,
    /// A subscribed receiver held for the lifetime of this handle.
    // NOTE(review): presumably keeps at least one receiver alive on the broadcast channel — confirm.
    _keep_alive_rx: tokio::sync::broadcast::Receiver<iroh_gossip::api::Event>,
    /// Gossip handle stored with the receiver; not read by any method in this file.
    _gossip: iroh_gossip::net::Gossip,
}
88
89impl Clone for GossipReceiver {
90 fn clone(&self) -> Self {
91 Self {
92 gossip_event_forwarder: self.gossip_event_forwarder.clone(),
93 action_req: self.action_req.clone(),
94 last_message_hashes: self.last_message_hashes.clone(),
95 _keep_alive_rx: self.gossip_event_forwarder.subscribe(),
96 _gossip: self._gossip.clone(),
97 }
98 }
99}
100
/// Cloneable front-end to a background task that owns the raw `iroh_gossip` sender.
#[derive(Debug)]
pub struct GossipSender {
    /// Request channel to the background task (broadcast / join-peers requests).
    action_req: tokio::sync::mpsc::Sender<InnerActionSend>,
    /// Gossip handle stored with the sender; not read by any method in this file.
    _gossip: iroh_gossip::net::Gossip,
}
106
107impl Clone for GossipSender {
108 fn clone(&self) -> Self {
109 Self {
110 action_req: self.action_req.clone(),
111 _gossip: self._gossip.clone(),
112 }
113 }
114}
115
116
/// Requests handled by the `GossipReceiver` background task; each carries the one-shot
/// channel on which the answer is returned.
#[derive(Debug)]
enum InnerActionRecv {
    /// Ask for the current set of gossip neighbors.
    ReqNeighbors(tokio::sync::oneshot::Sender<HashSet<iroh::NodeId>>),
    /// Ask whether the underlying receiver reports itself as joined.
    ReqIsJoined(tokio::sync::oneshot::Sender<bool>),
}
122
/// Requests handled by the `GossipSender` background task; the one-shot channel reports
/// whether the underlying operation succeeded.
#[derive(Debug)]
enum InnerActionSend {
    /// Broadcast the given payload.
    ReqSend(Vec<u8>, tokio::sync::oneshot::Sender<bool>),
    /// Join the given peers.
    ReqJoinPeers(Vec<iroh::NodeId>, tokio::sync::oneshot::Sender<bool>),
}
128
129impl EncryptedRecord {
130 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
131 let one_time_key_bytes: [u8; 32] = decryption_key
132 .decrypt(&self.encrypted_decryption_key)?
133 .as_slice()
134 .try_into()?;
135 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
136
137 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
138 let record = Record::from_bytes(decrypted_record)?;
139 Ok(record)
140 }
141
142 pub fn to_bytes(&self) -> Vec<u8> {
143 let mut buf = Vec::new();
144 let encrypted_record_len = self.encrypted_record.len() as u32;
145 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
146 buf.extend_from_slice(&self.encrypted_record);
147 buf.extend_from_slice(&self.encrypted_decryption_key);
148 buf
149 }
150
151 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
152 let (encrypted_record_len, buf) = buf.split_at(4);
153 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
154 let (encrypted_record, encrypted_decryption_key) =
155 buf.split_at(encrypted_record_len as usize);
156
157 Ok(Self {
158 encrypted_record: encrypted_record.to_vec(),
159 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
160 })
161 }
162}
163
164impl Record {
165 pub fn sign(
166 topic: [u8; 32],
167 unix_minute: u64,
168 node_id: [u8; 32],
169 active_peers: [[u8; 32]; 5],
170 last_message_hashes: [[u8; 32]; 5],
171 signing_key: &ed25519_dalek::SigningKey,
172 ) -> Self {
173 let mut signature_data = Vec::new();
174 signature_data.extend_from_slice(&topic);
175 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
176 signature_data.extend_from_slice(&node_id);
177 for active_peer in active_peers {
178 signature_data.extend_from_slice(&active_peer);
179 }
180 for last_message_hash in last_message_hashes {
181 signature_data.extend_from_slice(&last_message_hash);
182 }
183 let mut signing_key = signing_key.clone();
184 let signature = signing_key.sign(&signature_data);
185 Self {
186 topic,
187 unix_minute,
188 node_id,
189 active_peers,
190 last_message_hashes,
191 signature: signature.to_bytes(),
192 }
193 }
194
195 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
196 let (topic, buf) = buf.split_at(32);
197 let (unix_minute, buf) = buf.split_at(8);
198 let (node_id, mut buf) = buf.split_at(32);
199
200 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
201 #[allow(clippy::needless_range_loop)]
202 for i in 0..active_peers.len() {
203 let (active_peer, _buf) = buf.split_at(32);
204 active_peers[i] = active_peer.try_into()?;
205 buf = _buf;
206 }
207 let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
208 #[allow(clippy::needless_range_loop)]
209 for i in 0..last_message_hashes.len() {
210 let (last_message_hash, _buf) = buf.split_at(32);
211 last_message_hashes[i] = last_message_hash.try_into()?;
212 buf = _buf;
213 }
214
215 let (signature, buf) = buf.split_at(64);
216
217 if !buf.is_empty() {
218 bail!("buffer not empty after reconstruction")
219 }
220
221 Ok(Self {
222 topic: topic.try_into()?,
223 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
224 node_id: node_id.try_into()?,
225 active_peers,
226 last_message_hashes,
227 signature: signature.try_into()?,
228 })
229 }
230
231 pub fn to_bytes(&self) -> Vec<u8> {
232 let mut buf = Vec::new();
233 buf.extend_from_slice(&self.topic);
234 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
235 buf.extend_from_slice(&self.node_id);
236 for active_peer in self.active_peers {
237 buf.extend_from_slice(&active_peer);
238 }
239 for last_message_hash in self.last_message_hashes {
240 buf.extend_from_slice(&last_message_hash);
241 }
242 buf.extend_from_slice(&self.signature);
243 buf
244 }
245
246 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
247 if self.topic != *actual_topic {
248 bail!("topic mismatch")
249 }
250 if self.unix_minute != actual_unix_minute {
251 bail!("unix minute mismatch")
252 }
253
254 let record_bytes = self.to_bytes();
255 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
256 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
257 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
258
259 node_id.verify_strict(signature_data.as_slice(), &signature)?;
260
261 Ok(())
262 }
263
264 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
265 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
266 let p_key = one_time_key.verifying_key();
267 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
268 let key_enc = encryption_key
269 .verifying_key()
270 .encrypt(&one_time_key.to_bytes())
271 .expect("encryption failed");
272
273 EncryptedRecord {
274 encrypted_record: data_enc,
275 encrypted_decryption_key: key_enc,
276 }
277 }
278}
279
280impl GossipSender {
281 pub fn new(gossip_sender: iroh_gossip::api::GossipSender, gossip: iroh_gossip::net::Gossip) -> Self {
282 let (action_req_tx, mut action_req_rx) =
283 tokio::sync::mpsc::channel::<InnerActionSend>(1024);
284
285 tokio::spawn({
286 let gossip_sender = gossip_sender;
287 async move {
288 while let Some(inner_action) = action_req_rx.recv().await {
289 match inner_action {
290 InnerActionSend::ReqSend(data, tx) => {
291 let res = gossip_sender.broadcast(data.into()).await;
292 tx.send(res.is_ok()).expect("broadcast failed");
293 }
294 InnerActionSend::ReqJoinPeers(peers, tx) => {
295 let res = gossip_sender.join_peers(peers).await;
296 tx.send(res.is_ok()).expect("broadcast failed");
297 }
298 }
299 }
300 }
301 });
302
303 Self {
304 action_req: action_req_tx,
305 _gossip: gossip,
306 }
307 }
308
309 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
310 let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
311 let _ = self
312 .action_req
313 .send(InnerActionSend::ReqSend(data, tx))
314 .await;
315
316 match rx.await {
317 Ok(true) => Ok(()),
318 Ok(false) => bail!("broadcast failed"),
319 Err(_) => panic!("broadcast failed"),
320 }
321 }
322
323 pub async fn join_peers(
324 &self,
325 peers: Vec<iroh::NodeId>,
326 max_peers: Option<usize>,
327 ) -> Result<()> {
328 let mut peers = peers;
329 if let Some(max_peers) = max_peers {
330 peers.shuffle(&mut rand::thread_rng());
331 peers.truncate(max_peers);
332 }
333
334 let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
335 let _ = self
336 .action_req
337 .send(InnerActionSend::ReqJoinPeers(peers, tx))
338 .await;
339
340 match rx.await {
341 Ok(true) => Ok(()),
342 Ok(false) => bail!("join peers failed"),
343 Err(_) => panic!("broadcast failed"),
344 }
345 }
346}
347
348impl GossipReceiver {
349 pub fn new(gossip_receiver: iroh_gossip::api::GossipReceiver, gossip: iroh_gossip::net::Gossip) -> Self {
350 let (gossip_forward_tx, _) =
351 tokio::sync::broadcast::channel::<iroh_gossip::api::Event>(1024);
352 let (action_req_tx, mut action_req_rx) =
353 tokio::sync::mpsc::channel::<InnerActionRecv>(1024);
354
355 let keep_alive_rx = gossip_forward_tx.subscribe();
356
357 let self_ref = Self {
358 gossip_event_forwarder: gossip_forward_tx.clone(),
359 action_req: action_req_tx.clone(),
360 last_message_hashes: vec![],
361 _keep_alive_rx: keep_alive_rx,
362 _gossip: gossip,
363 };
364
365 tokio::spawn({
366 let mut self_ref = self_ref.clone();
367 async move {
368 let mut gossip_receiver = gossip_receiver;
369 loop {
370 tokio::select! {
371 Some(inner_action) = action_req_rx.recv() => {
372 match inner_action {
373 InnerActionRecv::ReqNeighbors(tx) => {
374 let neighbors = gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>();
375 let _ = tx.send(neighbors);
376 },
377 InnerActionRecv::ReqIsJoined(tx) => {
378 let is_joined = gossip_receiver.is_joined();
379 let _ = tx.send(is_joined);
380 }
381 }
382 }
383 gossip_event_res = gossip_receiver.next() => {
384 if let Some(Ok(gossip_event)) = gossip_event_res {
385 if let Event::Received(msg) = gossip_event.clone() {
386 if let DeliveryScope::Swarm(_) = msg.scope {
387 let hash = sha2::Sha512::digest(&msg.content);
388 self_ref.last_message_hashes.push(hash[..32].try_into().expect("hashing failed"));
389 while self_ref.last_message_hashes.len() > 5 {
390 self_ref.last_message_hashes.pop();
391 }
392 }
393 }
394 let _ = self_ref.gossip_event_forwarder.send(gossip_event.clone());
395 } else {
396 break;
397 }
398 }
399 }
400 }
401 }
402 });
403
404 self_ref
405 }
406
407 pub async fn neighbors(&mut self) -> Result<HashSet<iroh::NodeId>> {
408 let (neighbors_tx, neighbors_rx) = tokio::sync::oneshot::channel::<HashSet<iroh::NodeId>>();
409
410 let _ = self
411 .action_req
412 .send(InnerActionRecv::ReqNeighbors(neighbors_tx))
413 .await;
414
415 neighbors_rx.await.map_err(|err| anyhow::anyhow!(err))
416 }
417
418 pub async fn is_joined(&mut self) -> Result<bool> {
419 let (is_joined_tx, is_joined_rx) = tokio::sync::oneshot::channel::<bool>();
420
421 let _ = self
422 .action_req
423 .send(InnerActionRecv::ReqIsJoined(is_joined_tx))
424 .await;
425
426 is_joined_rx.await.map_err(|err| anyhow::anyhow!(err))
427 }
428
429 pub async fn subscribe(&mut self) -> Result<tokio::sync::broadcast::Receiver<Event>> {
430 Ok(self.gossip_event_forwarder.subscribe())
431 }
432
433 pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
434 self.last_message_hashes.clone()
435 }
436}
437
438impl TopicId {
439 pub fn new(raw: String) -> Self {
440 let mut raw_hash = sha2::Sha512::new();
441 raw_hash.update(raw.as_bytes());
442
443 Self {
444 _raw: raw,
445 hash: raw_hash.finalize()[..32]
446 .try_into()
447 .expect("hashing 'raw' failed"),
448 }
449 }
450}
451
452impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
454 pub async fn new(
455 topic_id: TopicId,
456 endpoint: &iroh::Endpoint,
457 node_signing_key: &ed25519_dalek::SigningKey,
458 gossip: iroh_gossip::net::Gossip,
459 initial_secret: &Vec<u8>,
460 secret_rotation_function: Option<R>,
461 async_bootstrap: bool,
462 ) -> Result<Self> {
463 let mut initial_secret_hash = sha2::Sha512::new();
465 initial_secret_hash.update(initial_secret);
466 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
467 .try_into()
468 .expect("hashing failed");
469
470 let (gossip_tx, gossip_rx) = if async_bootstrap {
472 Self::bootstrap_no_wait(
473 topic_id.clone(),
474 endpoint,
475 node_signing_key,
476 gossip.clone(),
477 initial_secret_hash,
478 secret_rotation_function.clone(),
479 )
480 .await?
481 } else {
482 Self::bootstrap(
483 topic_id.clone(),
484 endpoint,
485 node_signing_key,
486 gossip.clone(),
487 initial_secret_hash,
488 secret_rotation_function.clone(),
489 )
490 .await?
491 };
492
493 let _join_handler = Self::spawn_publisher(
495 topic_id.clone(),
496 secret_rotation_function.clone(),
497 initial_secret_hash,
498 endpoint.node_id(),
499 gossip_rx.clone(),
500 gossip_tx.clone(),
501 node_signing_key.clone(),
502 );
503
504 Ok(Self {
505 topic_id,
506 gossip_sender: gossip_tx,
507 gossip_receiver: gossip_rx,
508 _gossip: gossip,
509 initial_secret_hash,
510 secret_rotation_function: secret_rotation_function.unwrap_or_default(),
511 node_id: endpoint.node_id(),
512 })
513 }
514
515 pub fn split(&self) -> (GossipSender, GossipReceiver) {
516 (self.gossip_sender.clone(), self.gossip_receiver.clone())
517 }
518
519 pub fn topic_id(&self) -> &TopicId {
520 &self.topic_id
521 }
522
523 pub fn node_id(&self) -> &iroh::NodeId {
524 &self.node_id
525 }
526
527 pub fn gossip_sender(&self) -> GossipSender {
528 self.gossip_sender.clone()
529 }
530
531 pub fn gossip_receiver(&self) -> GossipReceiver {
532 self.gossip_receiver.clone()
533 }
534
535 pub fn secret_rotation_function(&self) -> R {
536 self.secret_rotation_function.clone()
537 }
538
539 pub fn initial_secret_hash(&self) -> [u8; 32] {
540 self.initial_secret_hash
541 }
542
543 pub fn set_initial_secret_hash(&mut self, initial_secret_hash: [u8; 32]) {
544 self.initial_secret_hash = initial_secret_hash;
545 }
546}
547
548impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
550 pub async fn bootstrap_no_wait(
551 topic_id: TopicId,
552 endpoint: &iroh::Endpoint,
553 node_signing_key: &ed25519_dalek::SigningKey,
554 gossip: iroh_gossip::net::Gossip,
555 initial_secret_hash: [u8; 32],
556 secret_rotation_function: Option<R>,
557 ) -> Result<(GossipSender, GossipReceiver)> {
558 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
559 .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
560 .await?;
561 let (gossip_sender, gossip_receiver) = gossip_topic.split();
562 let (gossip_sender, gossip_receiver) = (
563 GossipSender::new(gossip_sender,gossip.clone()),
564 GossipReceiver::new(gossip_receiver,gossip.clone()),
565 );
566
567 tokio::spawn({
568 let gossip_sender = gossip_sender.clone();
569 let gossip_receiver = gossip_receiver.clone();
570 let endpoint = endpoint.clone();
571 let node_signing_key = node_signing_key.clone();
572 async move {
573 Self::bootstrap_from_gossip(
574 gossip_sender,
575 gossip_receiver,
576 topic_id,
577 &endpoint,
578 &node_signing_key,
579 initial_secret_hash,
580 secret_rotation_function,
581 )
582 .await
583 }
584 });
585
586 Ok((gossip_sender, gossip_receiver))
587 }
588
589 pub async fn bootstrap(
590 topic_id: TopicId,
591 endpoint: &iroh::Endpoint,
592 node_signing_key: &ed25519_dalek::SigningKey,
593 gossip: iroh_gossip::net::Gossip,
594 initial_secret_hash: [u8; 32],
595 secret_rotation_function: Option<R>,
596 ) -> Result<(GossipSender, GossipReceiver)> {
597 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
598 .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
599 .await?;
600 let (gossip_sender, gossip_receiver) = gossip_topic.split();
601 let (gossip_sender, gossip_receiver) = (
602 GossipSender::new(gossip_sender,gossip.clone()),
603 GossipReceiver::new(gossip_receiver,gossip.clone()),
604 );
605 Self::bootstrap_from_gossip(
606 gossip_sender,
607 gossip_receiver,
608 topic_id,
609 endpoint,
610 node_signing_key,
611 initial_secret_hash,
612 secret_rotation_function,
613 )
614 .await
615 }
616
617 async fn bootstrap_from_gossip(
618 gossip_sender: GossipSender,
619 mut gossip_receiver: GossipReceiver,
620 topic_id: TopicId,
621 endpoint: &iroh::Endpoint,
622 node_signing_key: &ed25519_dalek::SigningKey,
623 initial_secret_hash: [u8; 32],
624 secret_rotation_function: Option<R>,
625 ) -> Result<(GossipSender, GossipReceiver)> {
626 let mut last_published_unix_minute = 0;
627 loop {
628 if let Ok(joined) = gossip_receiver.is_joined().await {
630 if joined {
631 return Ok((gossip_sender, gossip_receiver));
632 }
633 }
634
635 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
637 -1
638 } else {
639 0
640 });
641
642 let records = Topic::get_unix_minute_records(
644 &topic_id.clone(),
645 unix_minute,
646 secret_rotation_function.clone(),
647 initial_secret_hash,
648 &endpoint.node_id(),
649 )
650 .await;
651
652 if records.is_empty() {
655 if unix_minute != last_published_unix_minute {
656 last_published_unix_minute = unix_minute;
657 tokio::spawn({
658 let topic_id = topic_id.clone();
659 let node_id = endpoint.node_id();
660 let node_signing_key = node_signing_key.clone();
661 let secret_rotation_function = secret_rotation_function.clone();
662 async move {
663 let _ = Self::publish_proc(
664 unix_minute,
665 &topic_id,
666 secret_rotation_function,
667 initial_secret_hash,
668 node_id,
669 &node_signing_key,
670 HashSet::new(),
671 vec![],
672 )
673 .await;
674 }
675 });
676 }
677 sleep(Duration::from_millis(100)).await;
678 continue;
679 }
680
681 let bootstrap_nodes = records
685 .iter()
686 .flat_map(|record| {
687 let mut v = vec![record.node_id];
688 for peer in record.active_peers {
689 if peer != [0; 32] {
690 v.push(peer);
691 }
692 }
693 v
694 })
695 .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
696 .collect::<HashSet<_>>();
697
698 if let Ok(joined) = gossip_receiver.is_joined().await {
702 if joined {
703 return Ok((gossip_sender, gossip_receiver));
704 }
705 }
706
707 for node_id in bootstrap_nodes.iter() {
710 match gossip_sender.join_peers(vec![*node_id], None).await {
711 Ok(_) => {
712 sleep(Duration::from_millis(100)).await;
713 if let Ok(joined) = gossip_receiver.is_joined().await {
714 if joined {
715 break;
716 }
717 }
718 }
719 Err(_) => {
720 continue;
721 }
722 }
723 }
724
725 if let Ok(joined) = gossip_receiver.is_joined().await {
728 if !joined {
729 sleep(Duration::from_millis(500)).await;
730 }
731 }
732
733 if let Ok(joined) = gossip_receiver.is_joined().await {
735 if joined {
736 return Ok((gossip_sender, gossip_receiver));
737 }
738 } else {
739 if unix_minute != last_published_unix_minute {
741 last_published_unix_minute = unix_minute;
742 tokio::spawn({
743 let topic_id = topic_id.clone();
744 let node_id = endpoint.node_id();
745 let node_signing_key = node_signing_key.clone();
746 let secret_rotation_function = secret_rotation_function.clone();
747 async move {
748 let _ = Self::publish_proc(
749 unix_minute,
750 &topic_id,
751 secret_rotation_function,
752 initial_secret_hash,
753 node_id,
754 &node_signing_key,
755 HashSet::new(),
756 vec![],
757 )
758 .await;
759 }
760 });
761 }
762 sleep(Duration::from_millis(100)).await;
763 continue;
764 }
765 }
766 }
767
768 #[allow(clippy::too_many_arguments)]
771 async fn publish_proc(
772 unix_minute: u64,
773 topic_id: &TopicId,
774 secret_rotation_function: Option<R>,
775 initial_secret_hash: [u8; 32],
776 node_id: iroh::NodeId,
777 node_signing_key: &ed25519_dalek::SigningKey,
778 neighbors: HashSet<iroh::NodeId>,
779 last_message_hashes: Vec<[u8; 32]>,
780 ) -> Result<HashSet<Record>> {
781 let records = Topic::<R>::get_unix_minute_records(
783 &topic_id.clone(),
784 unix_minute,
785 secret_rotation_function.clone(),
786 initial_secret_hash,
787 &node_id,
788 )
789 .await
790 .iter()
791 .filter(|&record| {
792 record
793 .active_peers
794 .iter()
795 .filter(|&peer| peer.eq(&[0u8; 32]))
796 .count()
797 > 0
798 || record
799 .last_message_hashes
800 .iter()
801 .filter(|&hash| hash.eq(&[0u8; 32]))
802 .count()
803 > 0
804 })
805 .cloned()
806 .collect::<HashSet<_>>();
807
808 if records.len() >= MAX_BOOTSTRAP_RECORDS {
811 return Ok(records);
812 }
813
814 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
816 for (i, peer) in neighbors.iter().take(5).enumerate() {
817 active_peers[i] = *peer.as_bytes()
818 }
819
820 let mut last_message_hashes_array = [[0u8; 32]; 5];
821 for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
822 last_message_hashes_array[i] = *hash;
823 }
824
825 let record = Record::sign(
826 topic_id.hash,
827 unix_minute,
828 *node_id.as_bytes(),
829 active_peers,
830 last_message_hashes_array,
831 node_signing_key,
832 );
833 Topic::<R>::publish_unix_minute_record(
834 unix_minute,
835 &topic_id.clone(),
836 secret_rotation_function.clone(),
837 initial_secret_hash,
838 record,
839 Some(3),
840 )
841 .await?;
842
843 Ok(records)
844 }
845
846 fn spawn_publisher(
848 topic_id: TopicId,
849 secret_rotation_function: Option<R>,
850 initial_secret_hash: [u8; 32],
851 node_id: iroh::NodeId,
852 gossip_receiver: GossipReceiver,
853 gossip_sender: GossipSender,
854 node_signing_key: ed25519_dalek::SigningKey,
855 ) -> tokio::task::JoinHandle<()> {
856 let mut gossip_receiver = gossip_receiver;
857
858 tokio::spawn(async move {
859 let mut backoff = 1;
860 loop {
861 let unix_minute = crate::unix_minute(0);
862
863 if let Ok(records) = Topic::<R>::publish_proc(
865 unix_minute,
866 &topic_id.clone(),
867 Some(secret_rotation_function.clone().unwrap_or_default()),
868 initial_secret_hash,
869 node_id,
870 &node_signing_key,
871 gossip_receiver.neighbors().await.unwrap_or_default(),
872 gossip_receiver.last_message_hashes(),
873 )
874 .await
875 {
876 let neighbors = gossip_receiver.neighbors().await.unwrap_or_default();
878 if neighbors.len() < 4 && !records.is_empty() {
879 let node_ids = records
880 .iter()
881 .flat_map(|record| {
882 record
883 .active_peers
884 .iter()
885 .filter_map(|&active_peer| {
886 if active_peer == [0; 32]
887 || neighbors.contains(&active_peer)
888 || active_peer.eq(record.node_id.to_vec().as_slice())
889 || active_peer.eq(node_id.as_bytes())
890 {
891 None
892 } else {
893 iroh::NodeId::from_bytes(&active_peer).ok()
894 }
895 })
896 .collect::<Vec<_>>()
897 })
898 .collect::<HashSet<_>>();
899 if gossip_sender
900 .join_peers(
901 node_ids.iter().cloned().collect::<Vec<_>>(),
902 Some(MAX_JOIN_PEERS_COUNT),
903 )
904 .await
905 .is_ok()
906 {
907 }
909 }
910
911 if !gossip_receiver.last_message_hashes().is_empty() {
913 let peers_to_join = records
914 .iter()
915 .filter(|record| {
916 !record.last_message_hashes.iter().all(|last_message_hash| {
917 *last_message_hash != [0; 32]
918 && gossip_receiver
919 .last_message_hashes()
920 .contains(last_message_hash)
921 })
922 })
923 .collect::<Vec<_>>();
924 if !peers_to_join.is_empty() {
925 let node_ids = peers_to_join
926 .iter()
927 .flat_map(|&record| {
928 let mut peers = vec![];
929 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id) {
930 peers.push(node_id);
931 }
932 for active_peer in record.active_peers {
933 if active_peer == [0; 32] {
934 continue;
935 }
936 if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer)
937 {
938 peers.push(node_id);
939 }
940 }
941 peers
942 })
943 .collect::<HashSet<_>>();
944
945 if gossip_sender
946 .join_peers(
947 node_ids.iter().cloned().collect::<Vec<_>>(),
948 Some(MAX_JOIN_PEERS_COUNT),
949 )
950 .await
951 .is_ok()
952 {
953 }
958 }
959 }
960 } else {
961 sleep(Duration::from_secs(backoff)).await;
962 backoff = (backoff * 2).max(60);
963 continue;
964 }
965
966 backoff = 1;
967 sleep(Duration::from_secs(rand::random::<u64>() % 60)).await;
968 }
969 })
970 }
971}
972
973impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
975 fn signing_keypair(topic_id: &TopicId, unix_minute: u64) -> ed25519_dalek::SigningKey {
976 let mut sign_keypair_hash = sha2::Sha512::new();
977 sign_keypair_hash.update(topic_id.hash);
978 sign_keypair_hash.update(unix_minute.to_le_bytes());
979 let sign_keypair_seed: [u8; 32] = sign_keypair_hash.finalize()[..32]
980 .try_into()
981 .expect("hashing failed");
982 ed25519_dalek::SigningKey::from_bytes(&sign_keypair_seed)
983 }
984
985 fn encryption_keypair(
986 topic_id: &TopicId,
987 secret_rotation_function: &R,
988 initial_secret_hash: [u8; 32],
989 unix_minute: u64,
990 ) -> ed25519_dalek::SigningKey {
991 let enc_keypair_seed = secret_rotation_function.get_unix_minute_secret(
992 topic_id.hash,
993 unix_minute,
994 initial_secret_hash,
995 );
996 ed25519_dalek::SigningKey::from_bytes(&enc_keypair_seed)
997 }
998
999 fn salt(topic_id: &TopicId, unix_minute: u64) -> [u8; 32] {
1001 let mut slot_hash = sha2::Sha512::new();
1002 slot_hash.update(topic_id.hash);
1003 slot_hash.update(unix_minute.to_le_bytes());
1004 slot_hash.finalize()[..32]
1005 .try_into()
1006 .expect("hashing failed")
1007 }
1008
1009 async fn get_unix_minute_records(
1010 topic_id: &TopicId,
1011 unix_minute: u64,
1012 secret_rotation_function: Option<R>,
1013 initial_secret_hash: [u8; 32],
1014 node_id: &iroh::NodeId,
1015 ) -> HashSet<Record> {
1016 let topic_sign = Topic::<R>::signing_keypair(topic_id, unix_minute);
1017 let encryption_key = Topic::<R>::encryption_keypair(
1018 topic_id,
1019 &secret_rotation_function.clone().unwrap_or_default(),
1020 initial_secret_hash,
1021 unix_minute,
1022 );
1023 let salt = Topic::<R>::salt(topic_id, unix_minute);
1024
1025 let dht = get_dht();
1027
1028 let records_iter = timeout(
1029 Duration::from_secs(10),
1030 dht.get_mutable(topic_sign.verifying_key().as_bytes(), Some(&salt), None)
1031 .collect::<Vec<_>>(),
1032 )
1033 .await
1034 .unwrap_or_default();
1035
1036 records_iter
1037 .iter()
1038 .filter_map(
1039 |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
1040 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
1041 Ok(record) => match record.verify(&topic_id.hash, unix_minute) {
1042 Ok(_) => match record.node_id.eq(node_id.as_bytes()) {
1043 true => None,
1044 false => Some(record),
1045 },
1046 Err(_) => None,
1047 },
1048 Err(_) => None,
1049 },
1050 Err(_) => None,
1051 },
1052 )
1053 .collect::<HashSet<_>>()
1054 }
1055
1056 async fn publish_unix_minute_record(
1057 unix_minute: u64,
1058 topic_id: &TopicId,
1059 secret_rotation_function: Option<R>,
1060 initial_secret_hash: [u8; 32],
1061 record: Record,
1062 retry_count: Option<usize>,
1063 ) -> Result<()> {
1064 let sign_key = Topic::<R>::signing_keypair(&topic_id.clone(), unix_minute);
1065 let salt = Topic::<R>::salt(topic_id, unix_minute);
1066 let encryption_key = Topic::<R>::encryption_keypair(
1067 &topic_id.clone(),
1068 &secret_rotation_function.clone().unwrap_or_default(),
1069 initial_secret_hash,
1070 unix_minute,
1071 );
1072 let encrypted_record = record.encrypt(&encryption_key);
1073
1074 for i in 0..retry_count.unwrap_or(3) {
1075 let dht = get_dht();
1076
1077 let most_recent_result = timeout(
1078 Duration::from_secs(10),
1079 dht.get_mutable_most_recent(
1080 sign_key.clone().verifying_key().as_bytes(),
1081 Some(&salt),
1082 ),
1083 )
1084 .await
1085 .unwrap_or_default();
1086
1087 let item = if let Some(mut_item) = most_recent_result {
1088 MutableItem::new(
1089 sign_key.clone(),
1090 &encrypted_record.to_bytes(),
1091 mut_item.seq() + 1,
1092 Some(&salt),
1093 )
1094 } else {
1095 MutableItem::new(
1096 sign_key.clone(),
1097 &encrypted_record.to_bytes(),
1098 0,
1099 Some(&salt),
1100 )
1101 };
1102
1103 let put_result = match timeout(
1104 Duration::from_secs(10),
1105 dht.put_mutable(item.clone(), Some(item.seq())),
1106 )
1107 .await
1108 {
1109 Ok(result) => result.ok(),
1110 Err(_) => None,
1111 };
1112
1113 if put_result.is_some() {
1114 break;
1115 } else if i == retry_count.unwrap_or(3) - 1 {
1116 bail!("failed to publish record")
1117 }
1118
1119 reset_dht().await;
1120
1121 sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
1122 }
1123 Ok(())
1124 }
1125}
1126
/// Extension trait for a gossip builder: spawn the gossip instance and wrap it, together
/// with the endpoint and a secret-rotation strategy, in a [`Gossip`].
pub trait AutoDiscoveryBuilder {
    /// Consumes the builder and returns the wrapped gossip instance.
    ///
    /// `secret_rotation_function` is optional; the implementation in this file falls back
    /// to `R::default()` when it is `None`.
    #[allow(async_fn_in_trait)]
    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
        self,
        endpoint: Endpoint,
        secret_rotation_function: Option<R>,
    ) -> Result<Gossip<R>>;
}
1135
1136impl AutoDiscoveryBuilder for iroh_gossip::net::Builder {
1137 async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1138 self,
1139 endpoint: Endpoint,
1140 secret_rotation_function: Option<R>,
1141 ) -> Result<Gossip<R>> {
1142 Ok(Gossip {
1143 gossip: self.spawn(endpoint.clone()),
1144 endpoint: endpoint.clone(),
1145 secret_rotation_function: secret_rotation_function.unwrap_or_default(),
1146 })
1147 }
1148}
1149
/// Topic subscription with DHT-based peer discovery.
pub trait AutoDiscoveryGossip<R: SecretRotation + Default + Clone + Send + 'static> {
    /// Subscribes to `topic_id` using `initial_secret`; the implementation in this file
    /// completes the bootstrap before returning.
    #[allow(async_fn_in_trait)]
    async fn subscribe_and_join_with_auto_discovery(
        &self,
        topic_id: TopicId,
        initial_secret: Vec<u8>,
    ) -> Result<Topic<R>>;

    /// Subscribes to `topic_id` using `initial_secret`; the implementation in this file
    /// runs the bootstrap in the background and returns without waiting for it.
    #[allow(async_fn_in_trait)]
    async fn subscribe_and_join_with_auto_discovery_no_wait(
        &self,
        topic_id: TopicId,
        initial_secret: Vec<u8>,
    ) -> Result<Topic<R>>;
}
1165
/// The built-in, stateless [`SecretRotation`] strategy.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultSecretRotation;
1169
/// Strategy for deriving the per-minute 32-byte secret from which the record encryption
/// key is built.
pub trait SecretRotation {
    /// Returns the secret for the given topic hash, unix minute and initial-secret hash.
    fn get_unix_minute_secret(
        &self,
        topic_hash: [u8; 32],
        unix_minute: u64,
        initial_secret_hash: [u8; 32],
    ) -> [u8; 32];
}
1178
1179impl<R: SecretRotation + Default + Clone + Send + 'static> AutoDiscoveryGossip<R> for Gossip<R> {
1180 async fn subscribe_and_join_with_auto_discovery(
1181 &self,
1182 topic_id: TopicId,
1183 initial_secret: Vec<u8>,
1184 ) -> Result<Topic<R>> {
1185 Topic::new(
1186 topic_id,
1187 &self.endpoint,
1188 self.endpoint.secret_key().secret(),
1189 self.gossip.clone(),
1190 &initial_secret,
1191 Some(self.secret_rotation_function.clone()),
1192 false,
1193 )
1194 .await
1195 }
1196
1197 async fn subscribe_and_join_with_auto_discovery_no_wait(
1198 &self,
1199 topic_id: TopicId,
1200 initial_secret: Vec<u8>,
1201 ) -> Result<Topic<R>> {
1202 Topic::new(
1203 topic_id,
1204 &self.endpoint,
1205 self.endpoint.secret_key().secret(),
1206 self.gossip.clone(),
1207 &initial_secret,
1208 Some(self.secret_rotation_function.clone()),
1209 true,
1210 )
1211 .await
1212 }
1213}
1214
1215impl SecretRotation for DefaultSecretRotation {
1216 fn get_unix_minute_secret(
1217 &self,
1218 topic_hash: [u8; 32],
1219 unix_minute: u64,
1220 initial_secret_hash: [u8; 32],
1221 ) -> [u8; 32] {
1222 let mut hash = sha2::Sha512::new();
1223 hash.update(topic_hash);
1224 hash.update(unix_minute.to_be_bytes());
1225 hash.update(initial_secret_hash);
1226 hash.finalize()[..32].try_into().expect("hashing failed")
1227 }
1228}
1229
1230pub fn unix_minute(minute_offset: i64) -> u64 {
1231 ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
1232}
1233
#[cfg(test)]
mod tests {
    use super::*;
    use ed25519_dalek::SigningKey;
    use rand::rngs::OsRng;

    // Fixed inputs shared by the record and key-derivation tests.
    const TOPIC: [u8; 32] = [1u8; 32];
    const UNIX_MINUTE: u64 = 12345;
    const ACTIVE_PEERS: [[u8; 32]; 5] = [[3u8; 32]; 5];
    const LAST_MESSAGE_HASHES: [[u8; 32]; 5] = [[4u8; 32]; 5];

    type DefaultTopic = Topic<DefaultSecretRotation>;

    /// Signs a record over the shared fixture values for `node_id`.
    fn signed_record(node_id: [u8; 32], signing_key: &SigningKey) -> Record {
        Record::sign(
            TOPIC,
            UNIX_MINUTE,
            node_id,
            ACTIVE_PEERS,
            LAST_MESSAGE_HASHES,
            signing_key,
        )
    }

    /// Signs a record whose node id is the verifying key of `signing_key`.
    fn self_signed_record(signing_key: &SigningKey) -> Record {
        signed_record(signing_key.verifying_key().to_bytes(), signing_key)
    }

    /// Compares two records field by field so a failure names the field.
    fn assert_same_record(expected: &Record, actual: &Record) {
        assert_eq!(expected.topic, actual.topic);
        assert_eq!(expected.unix_minute, actual.unix_minute);
        assert_eq!(expected.node_id, actual.node_id);
        assert_eq!(expected.active_peers, actual.active_peers);
        assert_eq!(expected.last_message_hashes, actual.last_message_hashes);
        assert_eq!(expected.signature, actual.signature);
    }

    #[test]
    fn test_topic_id_creation() {
        let topic_id = TopicId::new("test-topic".to_string());
        assert_eq!(topic_id._raw, "test-topic");
        assert_eq!(topic_id.hash.len(), 32);

        // Same name, same hash.
        let same_name = TopicId::new("test-topic".to_string());
        assert_eq!(topic_id.hash, same_name.hash);

        // Different name, different hash.
        let other_name = TopicId::new("different-topic".to_string());
        assert_ne!(topic_id.hash, other_name.hash);
    }

    #[test]
    fn test_record_serialization_roundtrip() {
        let signing_key = SigningKey::generate(&mut OsRng);
        // The node id is deliberately unrelated to the key: only the byte
        // round trip is under test here, not signature verification.
        let record = signed_record([2u8; 32], &signing_key);

        let bytes = record.to_bytes();
        let deserialized = Record::from_bytes(bytes).unwrap();

        assert_same_record(&record, &deserialized);
    }

    #[test]
    fn test_record_verification() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let record = self_signed_record(&signing_key);

        // Matching topic and minute verify.
        assert!(record.verify(&TOPIC, UNIX_MINUTE).is_ok());

        // A different topic is rejected.
        let wrong_topic = [99u8; 32];
        assert!(record.verify(&wrong_topic, UNIX_MINUTE).is_err());

        // A different minute is rejected.
        assert!(record.verify(&TOPIC, UNIX_MINUTE + 1).is_err());
    }

    #[test]
    fn test_encrypted_record_roundtrip() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let encryption_key = SigningKey::generate(&mut OsRng);
        let record = self_signed_record(&signing_key);

        let encrypted = record.encrypt(&encryption_key);
        let decrypted = encrypted.decrypt(&encryption_key).unwrap();

        assert_same_record(&record, &decrypted);
    }

    #[test]
    fn test_encrypted_record_serialization() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let encryption_key = SigningKey::generate(&mut OsRng);
        let record = self_signed_record(&signing_key);

        let encrypted = record.encrypt(&encryption_key);

        let bytes = encrypted.to_bytes();
        let deserialized = EncryptedRecord::from_bytes(bytes).unwrap();

        // The deserialized envelope must still decrypt to the same record.
        let decrypted = deserialized.decrypt(&encryption_key).unwrap();
        assert_eq!(record.topic, decrypted.topic);
        assert_eq!(record.unix_minute, decrypted.unix_minute);
    }

    #[test]
    fn test_default_secret_rotation() {
        let rotation = DefaultSecretRotation;
        let topic_hash = [1u8; 32];
        let initial_secret_hash = [2u8; 32];

        // Same inputs, same secret.
        let first = rotation.get_unix_minute_secret(topic_hash, UNIX_MINUTE, initial_secret_hash);
        let again = rotation.get_unix_minute_secret(topic_hash, UNIX_MINUTE, initial_secret_hash);
        assert_eq!(first, again);

        // A different minute changes the secret.
        let next_minute =
            rotation.get_unix_minute_secret(topic_hash, UNIX_MINUTE + 1, initial_secret_hash);
        assert_ne!(first, next_minute);

        // A different topic changes the secret.
        let different_topic = [99u8; 32];
        let other_topic =
            rotation.get_unix_minute_secret(different_topic, UNIX_MINUTE, initial_secret_hash);
        assert_ne!(first, other_topic);
    }

    #[test]
    fn test_unix_minute_function() {
        // Every call reads the wall clock, so the samples are only comparable
        // when no minute boundary was crossed while taking them. Bracket the
        // samples with two reads and retry on a crossing instead of failing.
        for _ in 0..3 {
            let current = unix_minute(0);
            let prev = unix_minute(-1);
            let next = unix_minute(1);
            let current_again = unix_minute(0);

            if current != current_again {
                continue;
            }

            assert_eq!(current, prev + 1);
            assert_eq!(next, current + 1);
            return;
        }
        panic!("wall clock crossed a minute boundary on every attempt");
    }

    #[test]
    fn test_topic_signing_keypair_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());

        let first = DefaultTopic::signing_keypair(&topic_id, UNIX_MINUTE);
        let again = DefaultTopic::signing_keypair(&topic_id, UNIX_MINUTE);
        assert_eq!(first.to_bytes(), again.to_bytes());

        // A different minute yields a different key.
        let next_minute = DefaultTopic::signing_keypair(&topic_id, UNIX_MINUTE + 1);
        assert_ne!(first.to_bytes(), next_minute.to_bytes());
    }

    #[test]
    fn test_topic_encryption_keypair_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let initial_secret_hash = [1u8; 32];
        let rotation = DefaultSecretRotation;

        let derive = |minute: u64| {
            DefaultTopic::encryption_keypair(&topic_id, &rotation, initial_secret_hash, minute)
        };

        let first = derive(UNIX_MINUTE);
        let again = derive(UNIX_MINUTE);
        assert_eq!(first.to_bytes(), again.to_bytes());

        // A different minute yields a different key.
        let next_minute = derive(UNIX_MINUTE + 1);
        assert_ne!(first.to_bytes(), next_minute.to_bytes());
    }

    #[test]
    fn test_topic_salt_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());

        let first = DefaultTopic::salt(&topic_id, UNIX_MINUTE);
        let again = DefaultTopic::salt(&topic_id, UNIX_MINUTE);
        assert_eq!(first, again);

        // A different minute yields a different salt.
        let next_minute = DefaultTopic::salt(&topic_id, UNIX_MINUTE + 1);
        assert_ne!(first, next_minute);
    }
}