1use std::{collections::HashSet, sync::Arc, time::Duration};
2
3use anyhow::{Result, bail};
4use arc_swap::ArcSwap;
5use ed25519_dalek::ed25519::signature::SignerMut;
6use futures::StreamExt as _;
7use iroh::Endpoint;
8use iroh_gossip::{api::Event, proto::DeliveryScope};
9use mainline::{MutableItem, async_dht::AsyncDht};
10use once_cell::sync::Lazy;
11use rand::seq::SliceRandom;
12use sha2::Digest;
13
14use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
15use tokio::time::{sleep, timeout};
16
17pub const MAX_JOIN_PEERS_COUNT: usize = 30;
18pub const MAX_BOOTSTRAP_RECORDS: usize = 10;
19pub const SECRET_ROTATION: DefaultSecretRotation = DefaultSecretRotation;
20
21static DHT: Lazy<ArcSwap<mainline::async_dht::AsyncDht>> = Lazy::new(|| {
22 ArcSwap::from_pointee(
23 mainline::Dht::builder()
24 .build()
25 .expect("failed to create dht")
26 .as_async(),
27 )
28});
29
30fn get_dht() -> Arc<AsyncDht> {
31 DHT.load_full()
32}
33
34async fn reset_dht() {
35 let n_dht = mainline::Dht::builder()
36 .build()
37 .expect("failed to create dht");
38 DHT.store(Arc::new(n_dht.as_async()));
39}
40
/// Encrypted envelope around a [`Record`].
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    /// Record bytes, encrypted with a one-time key.
    encrypted_record: Vec<u8>,
    /// The one-time key, itself encrypted for the holder of the shared key.
    encrypted_decryption_key: Vec<u8>,
}
46
/// Signed, fixed-size announcement a node publishes for one topic and one unix minute.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Record {
    /// Hash of the topic the record belongs to.
    topic: [u8; 32],
    /// Minute the record was created for.
    unix_minute: u64,
    /// Public key bytes of the publishing node; also used to verify `signature`.
    node_id: [u8; 32],
    /// Up to five peers of the publisher; unused slots are all zero.
    active_peers: [[u8; 32]; 5],
    /// Up to five message hashes seen by the publisher; unused slots are all zero.
    last_message_hashes: [[u8; 32]; 5],
    /// Signature over every field above, in declaration order.
    signature: [u8; 64],
}
56
57pub struct Gossip<R: SecretRotation + Default + Clone + Send + 'static> {
58 pub gossip: iroh_gossip::net::Gossip,
59 endpoint: iroh::Endpoint,
60 secret_rotation_function: R,
61}
62
63#[derive(Debug)]
64pub struct Topic<R: SecretRotation + Default + Clone + Send + 'static> {
65 topic_id: TopicId,
66 gossip_sender: GossipSender,
67 gossip_receiver: GossipReceiver,
68 _gossip: iroh_gossip::net::Gossip,
69 initial_secret_hash: [u8; 32],
70 secret_rotation_function: R,
71 node_id: iroh::NodeId,
72}
73
/// Topic identifier: the raw topic string plus a 32-byte hash derived from it.
#[derive(Clone, Debug)]
pub struct TopicId {
    /// The string the identifier was created from.
    _raw: String,
    /// First 32 bytes of the SHA-512 digest of `_raw` when built through `TopicId::new`.
    hash: [u8; 32],
}
79
80#[derive(Debug)]
81pub struct GossipReceiver {
82 gossip_event_forwarder: tokio::sync::broadcast::Sender<iroh_gossip::api::Event>,
83 action_req: tokio::sync::mpsc::Sender<InnerActionRecv>,
84 last_message_hashes: Vec<[u8; 32]>,
85 _keep_alive_rx: tokio::sync::broadcast::Receiver<iroh_gossip::api::Event>,
86 _gossip: iroh_gossip::net::Gossip,
87}
88
89impl Clone for GossipReceiver {
90 fn clone(&self) -> Self {
91 Self {
92 gossip_event_forwarder: self.gossip_event_forwarder.clone(),
93 action_req: self.action_req.clone(),
94 last_message_hashes: self.last_message_hashes.clone(),
95 _keep_alive_rx: self.gossip_event_forwarder.subscribe(),
96 _gossip: self._gossip.clone(),
97 }
98 }
99}
100
101#[derive(Debug)]
102pub struct GossipSender {
103 action_req: tokio::sync::mpsc::Sender<InnerActionSend>,
104 _gossip: iroh_gossip::net::Gossip,
105}
106
107impl Clone for GossipSender {
108 fn clone(&self) -> Self {
109 Self {
110 action_req: self.action_req.clone(),
111 _gossip: self._gossip.clone(),
112 }
113 }
114}
115
116
117#[derive(Debug)]
118enum InnerActionRecv {
119 ReqNeighbors(tokio::sync::oneshot::Sender<HashSet<iroh::NodeId>>),
120 ReqIsJoined(tokio::sync::oneshot::Sender<bool>),
121}
122
123#[derive(Debug)]
124enum InnerActionSend {
125 ReqSend(Vec<u8>, tokio::sync::oneshot::Sender<bool>),
126 ReqJoinPeers(Vec<iroh::NodeId>, tokio::sync::oneshot::Sender<bool>),
127}
128
129impl EncryptedRecord {
130 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
131 let one_time_key_bytes: [u8; 32] = decryption_key
132 .decrypt(&self.encrypted_decryption_key)?
133 .as_slice()
134 .try_into()?;
135 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
136
137 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
138 let record = Record::from_bytes(decrypted_record)?;
139 Ok(record)
140 }
141
142 pub fn to_bytes(&self) -> Vec<u8> {
143 let mut buf = Vec::new();
144 let encrypted_record_len = self.encrypted_record.len() as u32;
145 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
146 buf.extend_from_slice(&self.encrypted_record);
147 buf.extend_from_slice(&self.encrypted_decryption_key);
148 buf
149 }
150
151 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
152 let (encrypted_record_len, buf) = buf.split_at(4);
153 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
154 let (encrypted_record, encrypted_decryption_key) =
155 buf.split_at(encrypted_record_len as usize);
156
157 Ok(Self {
158 encrypted_record: encrypted_record.to_vec(),
159 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
160 })
161 }
162}
163
164impl Record {
165 pub fn sign(
166 topic: [u8; 32],
167 unix_minute: u64,
168 node_id: [u8; 32],
169 active_peers: [[u8; 32]; 5],
170 last_message_hashes: [[u8; 32]; 5],
171 signing_key: &ed25519_dalek::SigningKey,
172 ) -> Self {
173 let mut signature_data = Vec::new();
174 signature_data.extend_from_slice(&topic);
175 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
176 signature_data.extend_from_slice(&node_id);
177 for active_peer in active_peers {
178 signature_data.extend_from_slice(&active_peer);
179 }
180 for last_message_hash in last_message_hashes {
181 signature_data.extend_from_slice(&last_message_hash);
182 }
183 let mut signing_key = signing_key.clone();
184 let signature = signing_key.sign(&signature_data);
185 Self {
186 topic,
187 unix_minute,
188 node_id,
189 active_peers,
190 last_message_hashes,
191 signature: signature.to_bytes(),
192 }
193 }
194
195 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
196 let (topic, buf) = buf.split_at(32);
197 let (unix_minute, buf) = buf.split_at(8);
198 let (node_id, mut buf) = buf.split_at(32);
199
200 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
201 #[allow(clippy::needless_range_loop)]
202 for i in 0..active_peers.len() {
203 let (active_peer, _buf) = buf.split_at(32);
204 active_peers[i] = active_peer.try_into()?;
205 buf = _buf;
206 }
207 let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
208 #[allow(clippy::needless_range_loop)]
209 for i in 0..last_message_hashes.len() {
210 let (last_message_hash, _buf) = buf.split_at(32);
211 last_message_hashes[i] = last_message_hash.try_into()?;
212 buf = _buf;
213 }
214
215 let (signature, buf) = buf.split_at(64);
216
217 if !buf.is_empty() {
218 bail!("buffer not empty after reconstruction")
219 }
220
221 Ok(Self {
222 topic: topic.try_into()?,
223 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
224 node_id: node_id.try_into()?,
225 active_peers,
226 last_message_hashes,
227 signature: signature.try_into()?,
228 })
229 }
230
231 pub fn to_bytes(&self) -> Vec<u8> {
232 let mut buf = Vec::new();
233 buf.extend_from_slice(&self.topic);
234 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
235 buf.extend_from_slice(&self.node_id);
236 for active_peer in self.active_peers {
237 buf.extend_from_slice(&active_peer);
238 }
239 for last_message_hash in self.last_message_hashes {
240 buf.extend_from_slice(&last_message_hash);
241 }
242 buf.extend_from_slice(&self.signature);
243 buf
244 }
245
246 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
247 if self.topic != *actual_topic {
248 bail!("topic mismatch")
249 }
250 if self.unix_minute != actual_unix_minute {
251 bail!("unix minute mismatch")
252 }
253
254 let record_bytes = self.to_bytes();
255 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
256 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
257 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
258
259 node_id.verify_strict(signature_data.as_slice(), &signature)?;
260
261 Ok(())
262 }
263
264 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
265 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
266 let p_key = one_time_key.verifying_key();
267 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
268 let key_enc = encryption_key
269 .verifying_key()
270 .encrypt(&one_time_key.to_bytes())
271 .expect("encryption failed");
272
273 EncryptedRecord {
274 encrypted_record: data_enc,
275 encrypted_decryption_key: key_enc,
276 }
277 }
278}
279
280impl GossipSender {
281 pub fn new(gossip_sender: iroh_gossip::api::GossipSender, gossip: iroh_gossip::net::Gossip) -> Self {
282 let (action_req_tx, mut action_req_rx) =
283 tokio::sync::mpsc::channel::<InnerActionSend>(1024);
284
285 tokio::spawn({
286 let gossip_sender = gossip_sender;
287 async move {
288 while let Some(inner_action) = action_req_rx.recv().await {
289 match inner_action {
290 InnerActionSend::ReqSend(data, tx) => {
291 let res = gossip_sender.broadcast(data.into()).await;
292 tx.send(res.is_ok()).expect("broadcast failed");
293 }
294 InnerActionSend::ReqJoinPeers(peers, tx) => {
295 let res = gossip_sender.join_peers(peers).await;
296 tx.send(res.is_ok()).expect("broadcast failed");
297 }
298 }
299 }
300 }
301 });
302
303 Self {
304 action_req: action_req_tx,
305 _gossip: gossip,
306 }
307 }
308
309 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
310 let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
311 let _ = self
312 .action_req
313 .send(InnerActionSend::ReqSend(data, tx))
314 .await;
315
316 match rx.await {
317 Ok(true) => Ok(()),
318 Ok(false) => bail!("broadcast failed"),
319 Err(_) => panic!("broadcast failed"),
320 }
321 }
322
323 pub async fn join_peers(
324 &self,
325 peers: Vec<iroh::NodeId>,
326 max_peers: Option<usize>,
327 ) -> Result<()> {
328 let mut peers = peers;
329 if let Some(max_peers) = max_peers {
330 peers.shuffle(&mut rand::thread_rng());
331 peers.truncate(max_peers);
332 }
333
334 let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
335 let _ = self
336 .action_req
337 .send(InnerActionSend::ReqJoinPeers(peers, tx))
338 .await;
339
340 match rx.await {
341 Ok(true) => Ok(()),
342 Ok(false) => bail!("join peers failed"),
343 Err(_) => panic!("broadcast failed"),
344 }
345 }
346}
347
348impl GossipReceiver {
349 pub fn new(gossip_receiver: iroh_gossip::api::GossipReceiver, gossip: iroh_gossip::net::Gossip) -> Self {
350 let (gossip_forward_tx, _) =
351 tokio::sync::broadcast::channel::<iroh_gossip::api::Event>(1024);
352 let (action_req_tx, mut action_req_rx) =
353 tokio::sync::mpsc::channel::<InnerActionRecv>(1024);
354
355 let keep_alive_rx = gossip_forward_tx.subscribe();
356
357 let self_ref = Self {
358 gossip_event_forwarder: gossip_forward_tx.clone(),
359 action_req: action_req_tx.clone(),
360 last_message_hashes: vec![],
361 _keep_alive_rx: keep_alive_rx,
362 _gossip: gossip,
363 };
364
365 tokio::spawn({
366 let mut self_ref = self_ref.clone();
367 async move {
368 let mut gossip_receiver = gossip_receiver;
369 loop {
370 tokio::select! {
371 Some(inner_action) = action_req_rx.recv() => {
372 match inner_action {
373 InnerActionRecv::ReqNeighbors(tx) => {
374 let neighbors = gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>();
375 let _ = tx.send(neighbors);
376 },
377 InnerActionRecv::ReqIsJoined(tx) => {
378 let is_joined = gossip_receiver.is_joined();
379 let _ = tx.send(is_joined);
380 }
381 }
382 }
383 gossip_event_res = gossip_receiver.next() => {
384 if let Some(Ok(gossip_event)) = gossip_event_res {
385 if let Event::Received(msg) = gossip_event.clone() {
386 if let DeliveryScope::Swarm(_) = msg.scope {
387 let hash = sha2::Sha512::digest(&msg.content);
388 self_ref.last_message_hashes.push(hash[..32].try_into().expect("hashing failed"));
389 while self_ref.last_message_hashes.len() > 5 {
390 self_ref.last_message_hashes.pop();
391 }
392 }
393 }
394 let _ = self_ref.gossip_event_forwarder.send(gossip_event.clone());
395 } else {
396 break;
397 }
398 }
399 }
400 }
401 }
402 });
403
404 self_ref
405 }
406
407 pub async fn neighbors(&mut self) -> Result<HashSet<iroh::NodeId>> {
408 let (neighbors_tx, neighbors_rx) = tokio::sync::oneshot::channel::<HashSet<iroh::NodeId>>();
409
410 let _ = self
411 .action_req
412 .send(InnerActionRecv::ReqNeighbors(neighbors_tx))
413 .await;
414
415 neighbors_rx.await.map_err(|err| anyhow::anyhow!(err))
416 }
417
418 pub async fn is_joined(&mut self) -> Result<bool> {
419 let (is_joined_tx, is_joined_rx) = tokio::sync::oneshot::channel::<bool>();
420
421 let _ = self
422 .action_req
423 .send(InnerActionRecv::ReqIsJoined(is_joined_tx))
424 .await;
425
426 is_joined_rx.await.map_err(|err| anyhow::anyhow!(err))
427 }
428
429 pub async fn subscribe(&mut self) -> Result<tokio::sync::broadcast::Receiver<Event>> {
430 Ok(self.gossip_event_forwarder.subscribe())
431 }
432
433 pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
434 self.last_message_hashes.clone()
435 }
436}
437
438impl TopicId {
439 pub fn new(raw: String) -> Self {
440 let mut raw_hash = sha2::Sha512::new();
441 raw_hash.update(raw.as_bytes());
442
443 Self {
444 _raw: raw,
445 hash: raw_hash.finalize()[..32]
446 .try_into()
447 .expect("hashing 'raw' failed"),
448 }
449 }
450}
451
452impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
454 pub async fn new(
455 topic_id: TopicId,
456 endpoint: &iroh::Endpoint,
457 node_signing_key: &ed25519_dalek::SigningKey,
458 gossip: iroh_gossip::net::Gossip,
459 initial_secret: &Vec<u8>,
460 secret_rotation_function: Option<R>,
461 async_bootstrap: bool,
462 ) -> Result<Self> {
463 let mut initial_secret_hash = sha2::Sha512::new();
465 initial_secret_hash.update(initial_secret);
466 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
467 .try_into()
468 .expect("hashing failed");
469
470 let (gossip_tx, gossip_rx) = if async_bootstrap {
472 Self::bootstrap_no_wait(
473 topic_id.clone(),
474 endpoint,
475 node_signing_key,
476 gossip.clone(),
477 initial_secret_hash,
478 secret_rotation_function.clone(),
479 )
480 .await?
481 } else {
482 Self::bootstrap(
483 topic_id.clone(),
484 endpoint,
485 node_signing_key,
486 gossip.clone(),
487 initial_secret_hash,
488 secret_rotation_function.clone(),
489 )
490 .await?
491 };
492
493 let _join_handler = Self::spawn_publisher(
495 topic_id.clone(),
496 secret_rotation_function.clone(),
497 initial_secret_hash,
498 endpoint.node_id(),
499 gossip_rx.clone(),
500 gossip_tx.clone(),
501 node_signing_key.clone(),
502 );
503
504 Ok(Self {
505 topic_id,
506 gossip_sender: gossip_tx,
507 gossip_receiver: gossip_rx,
508 _gossip: gossip,
509 initial_secret_hash,
510 secret_rotation_function: secret_rotation_function.unwrap_or_default(),
511 node_id: endpoint.node_id(),
512 })
513 }
514
515 pub fn split(&self) -> (GossipSender, GossipReceiver) {
516 (self.gossip_sender.clone(), self.gossip_receiver.clone())
517 }
518
519 pub fn topic_id(&self) -> &TopicId {
520 &self.topic_id
521 }
522
523 pub fn node_id(&self) -> &iroh::NodeId {
524 &self.node_id
525 }
526
527 pub fn gossip_sender(&self) -> GossipSender {
528 self.gossip_sender.clone()
529 }
530
531 pub fn gossip_receiver(&self) -> GossipReceiver {
532 self.gossip_receiver.clone()
533 }
534
535 pub fn secret_rotation_function(&self) -> R {
536 self.secret_rotation_function.clone()
537 }
538
539 pub fn initial_secret_hash(&self) -> [u8; 32] {
540 self.initial_secret_hash
541 }
542
543 pub fn set_initial_secret_hash(&mut self, initial_secret_hash: [u8; 32]) {
544 self.initial_secret_hash = initial_secret_hash;
545 }
546}
547
548impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
550 pub async fn bootstrap_no_wait(
551 topic_id: TopicId,
552 endpoint: &iroh::Endpoint,
553 node_signing_key: &ed25519_dalek::SigningKey,
554 gossip: iroh_gossip::net::Gossip,
555 initial_secret_hash: [u8; 32],
556 secret_rotation_function: Option<R>,
557 ) -> Result<(GossipSender, GossipReceiver)> {
558 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
559 .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
560 .await?;
561 let (gossip_sender, gossip_receiver) = gossip_topic.split();
562 let (gossip_sender, gossip_receiver) = (
563 GossipSender::new(gossip_sender,gossip.clone()),
564 GossipReceiver::new(gossip_receiver,gossip.clone()),
565 );
566
567 tokio::spawn({
568 let gossip_sender = gossip_sender.clone();
569 let gossip_receiver = gossip_receiver.clone();
570 let endpoint = endpoint.clone();
571 let node_signing_key = node_signing_key.clone();
572 async move {
573 Self::bootstrap_from_gossip(
574 gossip_sender,
575 gossip_receiver,
576 topic_id,
577 &endpoint,
578 &node_signing_key,
579 initial_secret_hash,
580 secret_rotation_function,
581 )
582 .await
583 }
584 });
585
586 Ok((gossip_sender, gossip_receiver))
587 }
588
589 pub async fn bootstrap(
590 topic_id: TopicId,
591 endpoint: &iroh::Endpoint,
592 node_signing_key: &ed25519_dalek::SigningKey,
593 gossip: iroh_gossip::net::Gossip,
594 initial_secret_hash: [u8; 32],
595 secret_rotation_function: Option<R>,
596 ) -> Result<(GossipSender, GossipReceiver)> {
597 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
598 .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
599 .await?;
600 let (gossip_sender, gossip_receiver) = gossip_topic.split();
601 let (gossip_sender, gossip_receiver) = (
602 GossipSender::new(gossip_sender,gossip.clone()),
603 GossipReceiver::new(gossip_receiver,gossip.clone()),
604 );
605 Self::bootstrap_from_gossip(
606 gossip_sender,
607 gossip_receiver,
608 topic_id,
609 endpoint,
610 node_signing_key,
611 initial_secret_hash,
612 secret_rotation_function,
613 )
614 .await
615 }
616
617 async fn bootstrap_from_gossip(
618 gossip_sender: GossipSender,
619 mut gossip_receiver: GossipReceiver,
620 topic_id: TopicId,
621 endpoint: &iroh::Endpoint,
622 node_signing_key: &ed25519_dalek::SigningKey,
623 initial_secret_hash: [u8; 32],
624 secret_rotation_function: Option<R>,
625 ) -> Result<(GossipSender, GossipReceiver)> {
626 let mut last_published_unix_minute = 0;
627 loop {
628 if let Ok(joined) = gossip_receiver.is_joined().await {
630 if joined {
631 return Ok((gossip_sender, gossip_receiver));
632 }
633 }
634
635 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
637 -1
638 } else {
639 0
640 });
641
642 let records = Topic::get_unix_minute_records(
644 &topic_id.clone(),
645 unix_minute,
646 secret_rotation_function.clone(),
647 initial_secret_hash,
648 &endpoint.node_id(),
649 )
650 .await;
651
652 if records.is_empty() {
655 if unix_minute != last_published_unix_minute
656 && Self::publish_proc(
657 unix_minute,
658 &topic_id,
659 secret_rotation_function.clone(),
660 initial_secret_hash,
661 endpoint.node_id(),
662 node_signing_key,
663 HashSet::new(),
664 vec![],
665 )
666 .await
667 .is_ok()
668 {
669 last_published_unix_minute = unix_minute;
670 }
671 sleep(Duration::from_millis(100)).await;
672 continue;
673 }
674
675 let bootstrap_nodes = records
679 .iter()
680 .flat_map(|record| {
681 let mut v = vec![record.node_id];
682 for peer in record.active_peers {
683 if peer != [0; 32] {
684 v.push(peer);
685 }
686 }
687 v
688 })
689 .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
690 .collect::<HashSet<_>>();
691
692 if let Ok(joined) = gossip_receiver.is_joined().await {
696 if joined {
697 return Ok((gossip_sender, gossip_receiver));
698 }
699 }
700
701 for node_id in bootstrap_nodes.iter() {
704 match gossip_sender.join_peers(vec![*node_id], None).await {
705 Ok(_) => {
706 sleep(Duration::from_millis(100)).await;
707 if let Ok(joined) = gossip_receiver.is_joined().await {
708 if joined {
709 break;
710 }
711 }
712 }
713 Err(_) => {
714 continue;
715 }
716 }
717 }
718
719 if let Ok(joined) = gossip_receiver.is_joined().await {
722 if !joined {
723 sleep(Duration::from_millis(500)).await;
724 }
725 }
726
727 if let Ok(joined) = gossip_receiver.is_joined().await {
729 if joined {
730 return Ok((gossip_sender, gossip_receiver));
731 }
732 } else {
733 if unix_minute != last_published_unix_minute
735 && Self::publish_proc(
736 unix_minute,
737 &topic_id,
738 secret_rotation_function.clone(),
739 initial_secret_hash,
740 endpoint.node_id(),
741 node_signing_key,
742 HashSet::new(),
743 vec![],
744 )
745 .await
746 .is_ok()
747 {
748 last_published_unix_minute = unix_minute;
749 }
750 sleep(Duration::from_millis(100)).await;
751 continue;
752 }
753 }
754 }
755
756 #[allow(clippy::too_many_arguments)]
759 async fn publish_proc(
760 unix_minute: u64,
761 topic_id: &TopicId,
762 secret_rotation_function: Option<R>,
763 initial_secret_hash: [u8; 32],
764 node_id: iroh::NodeId,
765 node_signing_key: &ed25519_dalek::SigningKey,
766 neighbors: HashSet<iroh::NodeId>,
767 last_message_hashes: Vec<[u8; 32]>,
768 ) -> Result<HashSet<Record>> {
769 let records = Topic::<R>::get_unix_minute_records(
771 &topic_id.clone(),
772 unix_minute,
773 secret_rotation_function.clone(),
774 initial_secret_hash,
775 &node_id,
776 )
777 .await
778 .iter()
779 .filter(|&record| {
780 record
781 .active_peers
782 .iter()
783 .filter(|&peer| peer.eq(&[0u8; 32]))
784 .count()
785 > 0
786 || record
787 .last_message_hashes
788 .iter()
789 .filter(|&hash| hash.eq(&[0u8; 32]))
790 .count()
791 > 0
792 })
793 .cloned()
794 .collect::<HashSet<_>>();
795
796 if records.len() >= MAX_BOOTSTRAP_RECORDS {
799 return Ok(records);
800 }
801
802 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
804 for (i, peer) in neighbors.iter().take(5).enumerate() {
805 active_peers[i] = *peer.as_bytes()
806 }
807
808 let mut last_message_hashes_array = [[0u8; 32]; 5];
809 for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
810 last_message_hashes_array[i] = *hash;
811 }
812
813 let record = Record::sign(
814 topic_id.hash,
815 unix_minute,
816 *node_id.as_bytes(),
817 active_peers,
818 last_message_hashes_array,
819 node_signing_key,
820 );
821 Topic::<R>::publish_unix_minute_record(
822 unix_minute,
823 &topic_id.clone(),
824 secret_rotation_function.clone(),
825 initial_secret_hash,
826 record,
827 Some(3),
828 )
829 .await?;
830
831 Ok(records)
832 }
833
834 fn spawn_publisher(
836 topic_id: TopicId,
837 secret_rotation_function: Option<R>,
838 initial_secret_hash: [u8; 32],
839 node_id: iroh::NodeId,
840 gossip_receiver: GossipReceiver,
841 gossip_sender: GossipSender,
842 node_signing_key: ed25519_dalek::SigningKey,
843 ) -> tokio::task::JoinHandle<()> {
844 let mut gossip_receiver = gossip_receiver;
845
846 tokio::spawn(async move {
847 let mut backoff = 1;
848 loop {
849 let unix_minute = crate::unix_minute(0);
850
851 if let Ok(records) = Topic::<R>::publish_proc(
853 unix_minute,
854 &topic_id.clone(),
855 Some(secret_rotation_function.clone().unwrap_or_default()),
856 initial_secret_hash,
857 node_id,
858 &node_signing_key,
859 gossip_receiver.neighbors().await.unwrap_or_default(),
860 gossip_receiver.last_message_hashes(),
861 )
862 .await
863 {
864 let neighbors = gossip_receiver.neighbors().await.unwrap_or_default();
866 if neighbors.len() < 4 && !records.is_empty() {
867 let node_ids = records
868 .iter()
869 .flat_map(|record| {
870 record
871 .active_peers
872 .iter()
873 .filter_map(|&active_peer| {
874 if active_peer == [0; 32]
875 || neighbors.contains(&active_peer)
876 || active_peer.eq(record.node_id.to_vec().as_slice())
877 || active_peer.eq(node_id.as_bytes())
878 {
879 None
880 } else {
881 iroh::NodeId::from_bytes(&active_peer).ok()
882 }
883 })
884 .collect::<Vec<_>>()
885 })
886 .collect::<HashSet<_>>();
887 if gossip_sender
888 .join_peers(
889 node_ids.iter().cloned().collect::<Vec<_>>(),
890 Some(MAX_JOIN_PEERS_COUNT),
891 )
892 .await
893 .is_ok()
894 {
895 }
897 }
898
899 if !gossip_receiver.last_message_hashes().is_empty() {
901 let peers_to_join = records
902 .iter()
903 .filter(|record| {
904 !record.last_message_hashes.iter().all(|last_message_hash| {
905 *last_message_hash != [0; 32]
906 && gossip_receiver
907 .last_message_hashes()
908 .contains(last_message_hash)
909 })
910 })
911 .collect::<Vec<_>>();
912 if !peers_to_join.is_empty() {
913 let node_ids = peers_to_join
914 .iter()
915 .flat_map(|&record| {
916 let mut peers = vec![];
917 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id) {
918 peers.push(node_id);
919 }
920 for active_peer in record.active_peers {
921 if active_peer == [0; 32] {
922 continue;
923 }
924 if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer)
925 {
926 peers.push(node_id);
927 }
928 }
929 peers
930 })
931 .collect::<HashSet<_>>();
932
933 if gossip_sender
934 .join_peers(
935 node_ids.iter().cloned().collect::<Vec<_>>(),
936 Some(MAX_JOIN_PEERS_COUNT),
937 )
938 .await
939 .is_ok()
940 {
941 }
946 }
947 }
948 } else {
949 sleep(Duration::from_secs(backoff)).await;
950 backoff = (backoff * 2).max(60);
951 continue;
952 }
953
954 backoff = 1;
955 sleep(Duration::from_secs(rand::random::<u64>() % 60)).await;
956 }
957 })
958 }
959}
960
961impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
963 fn signing_keypair(topic_id: &TopicId, unix_minute: u64) -> ed25519_dalek::SigningKey {
964 let mut sign_keypair_hash = sha2::Sha512::new();
965 sign_keypair_hash.update(topic_id.hash);
966 sign_keypair_hash.update(unix_minute.to_le_bytes());
967 let sign_keypair_seed: [u8; 32] = sign_keypair_hash.finalize()[..32]
968 .try_into()
969 .expect("hashing failed");
970 ed25519_dalek::SigningKey::from_bytes(&sign_keypair_seed)
971 }
972
973 fn encryption_keypair(
974 topic_id: &TopicId,
975 secret_rotation_function: &R,
976 initial_secret_hash: [u8; 32],
977 unix_minute: u64,
978 ) -> ed25519_dalek::SigningKey {
979 let enc_keypair_seed = secret_rotation_function.get_unix_minute_secret(
980 topic_id.hash,
981 unix_minute,
982 initial_secret_hash,
983 );
984 ed25519_dalek::SigningKey::from_bytes(&enc_keypair_seed)
985 }
986
987 fn salt(topic_id: &TopicId, unix_minute: u64) -> [u8; 32] {
989 let mut slot_hash = sha2::Sha512::new();
990 slot_hash.update(topic_id.hash);
991 slot_hash.update(unix_minute.to_le_bytes());
992 slot_hash.finalize()[..32]
993 .try_into()
994 .expect("hashing failed")
995 }
996
997 async fn get_unix_minute_records(
998 topic_id: &TopicId,
999 unix_minute: u64,
1000 secret_rotation_function: Option<R>,
1001 initial_secret_hash: [u8; 32],
1002 node_id: &iroh::NodeId,
1003 ) -> HashSet<Record> {
1004 let topic_sign = Topic::<R>::signing_keypair(topic_id, unix_minute);
1005 let encryption_key = Topic::<R>::encryption_keypair(
1006 topic_id,
1007 &secret_rotation_function.clone().unwrap_or_default(),
1008 initial_secret_hash,
1009 unix_minute,
1010 );
1011 let salt = Topic::<R>::salt(topic_id, unix_minute);
1012
1013 let dht = get_dht();
1015
1016 let records_iter = timeout(
1017 Duration::from_secs(10),
1018 dht.get_mutable(topic_sign.verifying_key().as_bytes(), Some(&salt), None)
1019 .collect::<Vec<_>>(),
1020 )
1021 .await
1022 .unwrap_or_default();
1023
1024 records_iter
1025 .iter()
1026 .filter_map(
1027 |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
1028 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
1029 Ok(record) => match record.verify(&topic_id.hash, unix_minute) {
1030 Ok(_) => match record.node_id.eq(node_id.as_bytes()) {
1031 true => None,
1032 false => Some(record),
1033 },
1034 Err(_) => None,
1035 },
1036 Err(_) => None,
1037 },
1038 Err(_) => None,
1039 },
1040 )
1041 .collect::<HashSet<_>>()
1042 }
1043
1044 async fn publish_unix_minute_record(
1045 unix_minute: u64,
1046 topic_id: &TopicId,
1047 secret_rotation_function: Option<R>,
1048 initial_secret_hash: [u8; 32],
1049 record: Record,
1050 retry_count: Option<usize>,
1051 ) -> Result<()> {
1052 let sign_key = Topic::<R>::signing_keypair(&topic_id.clone(), unix_minute);
1053 let salt = Topic::<R>::salt(topic_id, unix_minute);
1054 let encryption_key = Topic::<R>::encryption_keypair(
1055 &topic_id.clone(),
1056 &secret_rotation_function.clone().unwrap_or_default(),
1057 initial_secret_hash,
1058 unix_minute,
1059 );
1060 let encrypted_record = record.encrypt(&encryption_key);
1061
1062 for i in 0..retry_count.unwrap_or(3) {
1063 let dht = get_dht();
1064
1065 let most_recent_result = timeout(
1066 Duration::from_secs(10),
1067 dht.get_mutable_most_recent(
1068 sign_key.clone().verifying_key().as_bytes(),
1069 Some(&salt),
1070 ),
1071 )
1072 .await
1073 .unwrap_or_default();
1074
1075 let item = if let Some(mut_item) = most_recent_result {
1076 MutableItem::new(
1077 sign_key.clone(),
1078 &encrypted_record.to_bytes(),
1079 mut_item.seq() + 1,
1080 Some(&salt),
1081 )
1082 } else {
1083 MutableItem::new(
1084 sign_key.clone(),
1085 &encrypted_record.to_bytes(),
1086 0,
1087 Some(&salt),
1088 )
1089 };
1090
1091 let put_result = match timeout(
1092 Duration::from_secs(10),
1093 dht.put_mutable(item.clone(), Some(item.seq())),
1094 )
1095 .await
1096 {
1097 Ok(result) => result.ok(),
1098 Err(_) => None,
1099 };
1100
1101 if put_result.is_some() {
1102 break;
1103 } else if i == retry_count.unwrap_or(3) - 1 {
1104 bail!("failed to publish record")
1105 }
1106
1107 reset_dht().await;
1108
1109 sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
1110 }
1111 Ok(())
1112 }
1113}
1114
1115pub trait AutoDiscoveryBuilder {
1116 #[allow(async_fn_in_trait)]
1117 async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1118 self,
1119 endpoint: Endpoint,
1120 secret_rotation_function: Option<R>,
1121 ) -> Result<Gossip<R>>;
1122}
1123
1124impl AutoDiscoveryBuilder for iroh_gossip::net::Builder {
1125 async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1126 self,
1127 endpoint: Endpoint,
1128 secret_rotation_function: Option<R>,
1129 ) -> Result<Gossip<R>> {
1130 Ok(Gossip {
1131 gossip: self.spawn(endpoint.clone()),
1132 endpoint: endpoint.clone(),
1133 secret_rotation_function: secret_rotation_function.unwrap_or_default(),
1134 })
1135 }
1136}
1137
1138pub trait AutoDiscoveryGossip<R: SecretRotation + Default + Clone + Send + 'static> {
1139 #[allow(async_fn_in_trait)]
1140 async fn subscribe_and_join_with_auto_discovery(
1141 &self,
1142 topic_id: TopicId,
1143 initial_secret: Vec<u8>,
1144 ) -> Result<Topic<R>>;
1145
1146 #[allow(async_fn_in_trait)]
1147 async fn subscribe_and_join_with_auto_discovery_no_wait(
1148 &self,
1149 topic_id: TopicId,
1150 initial_secret: Vec<u8>,
1151 ) -> Result<Topic<R>>;
1152}
1153
/// Stateless marker type for the built-in secret rotation strategy.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultSecretRotation;
1157
/// Strategy that turns the shared secret into a per-minute secret.
pub trait SecretRotation {
    /// Returns the 32-byte secret for `topic_hash` at `unix_minute`.
    ///
    /// `initial_secret_hash` is the hash of the secret the topic was created with.
    fn get_unix_minute_secret(
        &self,
        topic_hash: [u8; 32],
        unix_minute: u64,
        initial_secret_hash: [u8; 32],
    ) -> [u8; 32];
}
1166
1167impl<R: SecretRotation + Default + Clone + Send + 'static> AutoDiscoveryGossip<R> for Gossip<R> {
1168 async fn subscribe_and_join_with_auto_discovery(
1169 &self,
1170 topic_id: TopicId,
1171 initial_secret: Vec<u8>,
1172 ) -> Result<Topic<R>> {
1173 Topic::new(
1174 topic_id,
1175 &self.endpoint,
1176 self.endpoint.secret_key().secret(),
1177 self.gossip.clone(),
1178 &initial_secret,
1179 Some(self.secret_rotation_function.clone()),
1180 false,
1181 )
1182 .await
1183 }
1184
1185 async fn subscribe_and_join_with_auto_discovery_no_wait(
1186 &self,
1187 topic_id: TopicId,
1188 initial_secret: Vec<u8>,
1189 ) -> Result<Topic<R>> {
1190 Topic::new(
1191 topic_id,
1192 &self.endpoint,
1193 self.endpoint.secret_key().secret(),
1194 self.gossip.clone(),
1195 &initial_secret,
1196 Some(self.secret_rotation_function.clone()),
1197 true,
1198 )
1199 .await
1200 }
1201}
1202
1203impl SecretRotation for DefaultSecretRotation {
1204 fn get_unix_minute_secret(
1205 &self,
1206 topic_hash: [u8; 32],
1207 unix_minute: u64,
1208 initial_secret_hash: [u8; 32],
1209 ) -> [u8; 32] {
1210 let mut hash = sha2::Sha512::new();
1211 hash.update(topic_hash);
1212 hash.update(unix_minute.to_be_bytes());
1213 hash.update(initial_secret_hash);
1214 hash.finalize()[..32].try_into().expect("hashing failed")
1215 }
1216}
1217
1218pub fn unix_minute(minute_offset: i64) -> u64 {
1219 ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
1220}
1221
#[cfg(test)]
mod tests {
    use super::*;
    use ed25519_dalek::SigningKey;
    use rand::rngs::OsRng;

    /// Topic hash used by every record fixture.
    const TOPIC: [u8; 32] = [1u8; 32];
    /// Unix minute used by every fixture.
    const MINUTE: u64 = 12345;

    /// Signs a record over the shared fixture values with `signing_key`,
    /// using `node_id` as the record's node id.
    fn signed_record(node_id: [u8; 32], signing_key: &SigningKey) -> Record {
        Record::sign(
            TOPIC,
            MINUTE,
            node_id,
            [[3u8; 32]; 5],
            [[4u8; 32]; 5],
            signing_key,
        )
    }

    /// The raw name is stored verbatim and the hash is a deterministic
    /// 32-byte function of it.
    #[test]
    fn test_topic_id_creation() {
        let first = TopicId::new("test-topic".to_string());
        assert_eq!(first._raw, "test-topic");
        assert_eq!(first.hash.len(), 32);

        let same_name = TopicId::new("test-topic".to_string());
        assert_eq!(first.hash, same_name.hash);

        let other_name = TopicId::new("different-topic".to_string());
        assert_ne!(first.hash, other_name.hash);
    }

    /// `to_bytes` followed by `from_bytes` reproduces every field.
    #[test]
    fn test_record_serialization_roundtrip() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let record = signed_record([2u8; 32], &signing_key);

        let decoded = Record::from_bytes(record.to_bytes()).unwrap();

        // `Record` derives `PartialEq`, which compares all six fields.
        assert_eq!(record, decoded);
    }

    /// A record verifies only against the topic and minute it was signed for.
    #[test]
    fn test_record_verification() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let record = signed_record(signing_key.verifying_key().to_bytes(), &signing_key);

        assert!(record.verify(&TOPIC, MINUTE).is_ok());

        let wrong_topic = [99u8; 32];
        assert!(record.verify(&wrong_topic, MINUTE).is_err());

        assert!(record.verify(&TOPIC, MINUTE + 1).is_err());
    }

    /// Encrypting and decrypting with the same key reproduces every field.
    #[test]
    fn test_encrypted_record_roundtrip() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let encryption_key = SigningKey::generate(&mut OsRng);
        let record = signed_record(signing_key.verifying_key().to_bytes(), &signing_key);

        let encrypted = record.encrypt(&encryption_key);
        let decrypted = encrypted.decrypt(&encryption_key).unwrap();

        assert_eq!(record, decrypted);
    }

    /// An encrypted record still decrypts after a byte-level round trip.
    #[test]
    fn test_encrypted_record_serialization() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let encryption_key = SigningKey::generate(&mut OsRng);
        let record = signed_record(signing_key.verifying_key().to_bytes(), &signing_key);

        let wire_bytes = record.encrypt(&encryption_key).to_bytes();
        let restored = EncryptedRecord::from_bytes(wire_bytes).unwrap();

        let decrypted = restored.decrypt(&encryption_key).unwrap();
        assert_eq!(record.topic, decrypted.topic);
        assert_eq!(record.unix_minute, decrypted.unix_minute);
    }

    /// The default rotation is deterministic and sensitive to both the
    /// minute and the topic hash.
    #[test]
    fn test_default_secret_rotation() {
        let rotation = DefaultSecretRotation;
        let initial_secret_hash = [2u8; 32];
        let secret_for = |topic_hash: [u8; 32], minute: u64| {
            rotation.get_unix_minute_secret(topic_hash, minute, initial_secret_hash)
        };

        let baseline = secret_for(TOPIC, MINUTE);
        assert_eq!(baseline, secret_for(TOPIC, MINUTE));

        assert_ne!(baseline, secret_for(TOPIC, MINUTE + 1));

        let different_topic = [99u8; 32];
        assert_ne!(baseline, secret_for(different_topic, MINUTE));
    }

    /// Offsets shift the returned minute by exactly the requested amount.
    ///
    /// `unix_minute` reads the wall clock on every call, so the samples are
    /// bracketed by two `unix_minute(0)` reads and only checked when both
    /// agree; otherwise a minute boundary was crossed and the sample is
    /// retried.
    #[test]
    fn test_unix_minute_function() {
        for _ in 0..3 {
            let before = unix_minute(0);
            let prev = unix_minute(-1);
            let next = unix_minute(1);
            let after = unix_minute(0);

            if before != after {
                // The clock rolled over mid-sample; take a fresh sample.
                continue;
            }

            assert_eq!(before, prev + 1);
            assert_eq!(next, before + 1);
            return;
        }
        panic!("clock crossed a minute boundary on every attempt");
    }

    /// The signing keypair depends only on the topic and the minute.
    #[test]
    fn test_topic_signing_keypair_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let key_bytes_at = |minute: u64| {
            Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, minute).to_bytes()
        };

        let baseline = key_bytes_at(MINUTE);
        assert_eq!(baseline, key_bytes_at(MINUTE));

        assert_ne!(baseline, key_bytes_at(MINUTE + 1));
    }

    /// The encryption keypair is deterministic for fixed inputs and changes
    /// with the minute.
    #[test]
    fn test_topic_encryption_keypair_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let initial_secret_hash = [1u8; 32];
        let rotation = DefaultSecretRotation;
        let key_bytes_at = |minute: u64| {
            Topic::<DefaultSecretRotation>::encryption_keypair(
                &topic_id,
                &rotation,
                initial_secret_hash,
                minute,
            )
            .to_bytes()
        };

        let baseline = key_bytes_at(MINUTE);
        assert_eq!(baseline, key_bytes_at(MINUTE));

        assert_ne!(baseline, key_bytes_at(MINUTE + 1));
    }

    /// The salt is deterministic for fixed inputs and changes with the minute.
    #[test]
    fn test_topic_salt_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let salt_at = |minute: u64| Topic::<DefaultSecretRotation>::salt(&topic_id, minute);

        let baseline = salt_at(MINUTE);
        assert_eq!(baseline, salt_at(MINUTE));

        assert_ne!(baseline, salt_at(MINUTE + 1));
    }
}