1use std::{collections::HashSet, sync::Arc, time::Duration};
2
3use anyhow::{Result, bail};
4use arc_swap::ArcSwap;
5use ed25519_dalek::ed25519::signature::SignerMut;
6use futures::StreamExt as _;
7use iroh::Endpoint;
8use iroh_gossip::{
9 api::{Event},
10 proto::DeliveryScope,
11};
12use mainline::{async_dht::AsyncDht, MutableItem};
13use once_cell::sync::Lazy;
14use rand::seq::SliceRandom;
15use sha2::Digest;
16
17use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
18use tokio::{
19 time::{sleep, timeout},
20};
21
22pub const MAX_JOIN_PEERS_COUNT: usize = 30;
23pub const MAX_BOOTSTRAP_RECORDS: usize = 10;
24pub const SECRET_ROTATION: DefaultSecretRotation = DefaultSecretRotation;
25
26static DHT: Lazy<ArcSwap<mainline::async_dht::AsyncDht>> = Lazy::new(|| {
27 ArcSwap::from_pointee(
28 mainline::Dht::builder().build().expect("failed to create dht").as_async()
29 )
30});
31
32fn get_dht() -> Arc<AsyncDht> {
33 DHT.load_full()
34}
35
36async fn reset_dht() {
37 let n_dht = mainline::Dht::builder()
38 .build()
39 .expect("failed to create dht");
40 DHT.store(Arc::new(n_dht.as_async()));
41}
42
43#[derive(Debug, Clone)]
44pub struct EncryptedRecord {
45 encrypted_record: Vec<u8>,
46 encrypted_decryption_key: Vec<u8>,
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50pub struct Record {
51 topic: [u8; 32],
52 unix_minute: u64,
53 node_id: [u8; 32],
54 active_peers: [[u8; 32]; 5],
55 last_message_hashes: [[u8; 32]; 5],
56 signature: [u8; 64],
57}
58
59pub struct Gossip<R: SecretRotation + Default + Clone + Send + 'static> {
60 pub gossip: iroh_gossip::net::Gossip,
61 endpoint: iroh::Endpoint,
62 secret_rotation_function: R,
63}
64
65#[derive(Debug)]
66pub struct Topic<R: SecretRotation + Default + Clone + Send + 'static> {
67 topic_id: TopicId,
68 gossip_sender: GossipSender,
69 gossip_receiver: GossipReceiver,
70 initial_secret_hash: [u8; 32],
71 secret_rotation_function: R,
72 node_id: iroh::NodeId,
73}
74
75#[derive(Debug, Clone)]
76pub struct TopicId {
77 _raw: String,
78 hash: [u8; 32], }
80
81#[derive(Debug, Clone)]
82pub struct GossipReceiver {
83 gossip_event_forwarder: tokio::sync::broadcast::Sender<iroh_gossip::api::Event>,
84 action_req: tokio::sync::broadcast::Sender<InnerActionRecv>,
85 last_message_hashes: Vec<[u8; 32]>,
86}
87
88#[derive(Debug, Clone)]
89pub struct GossipSender {
90 action_req: tokio::sync::broadcast::Sender<InnerActionSend>,
91}
92
93#[derive(Debug, Clone)]
94enum InnerActionRecv {
95 ReqNeighbors(tokio::sync::broadcast::Sender<HashSet<iroh::NodeId>>),
96 ReqIsJoined(tokio::sync::broadcast::Sender<bool>),
97}
98
99#[derive(Debug, Clone)]
100enum InnerActionSend {
101 ReqSend(Vec<u8>, tokio::sync::broadcast::Sender<bool>),
102 ReqJoinPeers(Vec<iroh::NodeId>, tokio::sync::broadcast::Sender<bool>),
103}
104
105impl EncryptedRecord {
106 pub fn decrypt(
107 &self,
108 decryption_key: &ed25519_dalek::SigningKey
109 ) -> Result<Record> {
110 let one_time_key_bytes: [u8; 32] = decryption_key
111 .decrypt(&self.encrypted_decryption_key)?.as_slice()
112 .try_into()?;
113 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
114
115 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
116 let record = Record::from_bytes(decrypted_record)?;
117 Ok(record)
118 }
119
120 pub fn to_bytes(&self) -> Vec<u8> {
121 let mut buf = Vec::new();
122 let encrypted_record_len = self.encrypted_record.len() as u32;
123 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
124 buf.extend_from_slice(&self.encrypted_record);
125 buf.extend_from_slice(&self.encrypted_decryption_key);
126 buf
127 }
128
129 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
130 let (encrypted_record_len, buf) = buf.split_at(4);
131 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
132 let (encrypted_record, encrypted_decryption_key) =
133 buf.split_at(encrypted_record_len as usize);
134
135 Ok(Self {
136 encrypted_record: encrypted_record.to_vec(),
137 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
138 })
139 }
140}
141
142impl Record {
143 pub fn sign(
144 topic: [u8; 32],
145 unix_minute: u64,
146 node_id: [u8; 32],
147 active_peers: [[u8; 32]; 5],
148 last_message_hashes: [[u8; 32]; 5],
149 signing_key: &ed25519_dalek::SigningKey,
150 ) -> Self {
151 let mut signature_data = Vec::new();
152 signature_data.extend_from_slice(&topic);
153 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
154 signature_data.extend_from_slice(&node_id);
155 for active_peer in active_peers {
156 signature_data.extend_from_slice(&active_peer);
157 }
158 for last_message_hash in last_message_hashes {
159 signature_data.extend_from_slice(&last_message_hash);
160 }
161 let mut signing_key = signing_key.clone();
162 let signature = signing_key.sign(&signature_data);
163 Self {
164 topic,
165 unix_minute,
166 node_id,
167 active_peers,
168 last_message_hashes,
169 signature: signature.to_bytes(),
170 }
171 }
172
173 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
174 let (topic, buf) = buf.split_at(32);
175 let (unix_minute, buf) = buf.split_at(8);
176 let (node_id, mut buf) = buf.split_at(32);
177
178 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
179 for i in 0..active_peers.len() {
180 let (active_peer, _buf) = buf.split_at(32);
181 active_peers[i] = active_peer.try_into()?;
182 buf = _buf;
183 }
184 let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
185 for i in 0..last_message_hashes.len() {
186 let (last_message_hash, _buf) = buf.split_at(32);
187 last_message_hashes[i] = last_message_hash.try_into()?;
188 buf = _buf;
189 }
190
191 let (signature, buf) = buf.split_at(64);
192
193 if !buf.is_empty() {
194 bail!("buffer not empty after reconstruction")
195 }
196
197 Ok(Self {
198 topic: topic.try_into()?,
199 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
200 node_id: node_id.try_into()?,
201 active_peers: active_peers.try_into()?,
202 last_message_hashes: last_message_hashes.try_into()?,
203 signature: signature.try_into()?,
204 })
205 }
206
207 pub fn to_bytes(&self) -> Vec<u8> {
208 let mut buf = Vec::new();
209 buf.extend_from_slice(&self.topic);
210 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
211 buf.extend_from_slice(&self.node_id);
212 for active_peer in self.active_peers {
213 buf.extend_from_slice(&active_peer);
214 }
215 for last_message_hash in self.last_message_hashes {
216 buf.extend_from_slice(&last_message_hash);
217 }
218 buf.extend_from_slice(&self.signature);
219 buf
220 }
221
222 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
223 if self.topic != *actual_topic {
224 bail!("topic mismatch")
225 }
226 if self.unix_minute != actual_unix_minute {
227 bail!("unix minute mismatch")
228 }
229
230 let record_bytes = self.to_bytes();
231 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
232 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
233 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
234
235 node_id.verify_strict(signature_data.as_slice(), &signature)?;
236
237 Ok(())
238 }
239
240 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
241 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
242 let p_key = one_time_key.verifying_key();
243 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
244 let key_enc = encryption_key
245 .verifying_key()
246 .encrypt(&one_time_key.to_bytes())
247 .expect("encryption failed");
248
249 EncryptedRecord {
250 encrypted_record: data_enc,
251 encrypted_decryption_key: key_enc,
252 }
253 }
254}
255
256impl GossipSender {
257 pub fn new(gossip_sender: iroh_gossip::api::GossipSender) -> Self {
258 let (action_req_tx, mut action_req_rx) =
259 tokio::sync::broadcast::channel::<InnerActionSend>(1024);
260
261 tokio::spawn({
262 let gossip_sender = gossip_sender;
263 async move {
264 loop {
265 match action_req_rx.recv().await {
266 Ok(inner_action) => match inner_action {
267 InnerActionSend::ReqSend(data, tx) => {
268 let res = gossip_sender.broadcast(data.into()).await;
269 tx.send(res.is_ok()).expect("broadcast failed");
270 }
271 InnerActionSend::ReqJoinPeers(peers, tx) => {
272 let res = gossip_sender.join_peers(peers).await;
273 tx.send(res.is_ok()).expect("broadcast failed");
274 }
275 },
276 Err(_) => break,
277 }
278 }
279 }
280 });
281
282 Self {
283 action_req: action_req_tx,
284 }
285 }
286
287 pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
288 let (tx, mut rx) = tokio::sync::broadcast::channel::<bool>(1);
289 self.action_req
290 .send(InnerActionSend::ReqSend(data, tx))
291 .expect("broadcast failed");
292
293 match rx.recv().await {
294 Ok(true) => Ok(()),
295 Ok(false) => bail!("broadcast failed"),
296 Err(_) => panic!("broadcast failed"),
297 }
298 }
299
300 pub async fn join_peers(
301 &self,
302 peers: Vec<iroh::NodeId>,
303 max_peers: Option<usize>,
304 ) -> Result<()> {
305 let mut peers = peers;
306 if let Some(max_peers) = max_peers {
307 peers.shuffle(&mut rand::thread_rng());
308 peers.truncate(max_peers);
309 }
310
311 let (tx, mut rx) = tokio::sync::broadcast::channel::<bool>(1);
312 self.action_req
313 .send(InnerActionSend::ReqJoinPeers(peers, tx))
314 .expect("broadcast failed");
315
316 match rx.recv().await {
317 Ok(true) => Ok(()),
318 Ok(false) => bail!("join peers failed"),
319 Err(_) => panic!("broadcast failed"),
320 }
321 }
322}
323
324impl GossipReceiver {
325 pub fn new(gossip_receiver: iroh_gossip::api::GossipReceiver) -> Self {
326 let (gossip_forward_tx, mut gossip_forward_rx) =
327 tokio::sync::broadcast::channel::<iroh_gossip::api::Event>(1024);
328 let (action_req_tx, mut action_req_rx) =
329 tokio::sync::broadcast::channel::<InnerActionRecv>(1024);
330
331 tokio::spawn({
332 async move {
333 while let Ok(_) = gossip_forward_rx.recv().await {}
334 }
335 });
336
337 let self_ref = Self {
338 gossip_event_forwarder: gossip_forward_tx,
339 action_req: action_req_tx,
340 last_message_hashes: vec![],
341 };
342 tokio::spawn({
343 let mut self_ref = self_ref.clone();
344 async move {
345 let mut gossip_receiver = gossip_receiver;
346 loop {
347 tokio::select! {
348 Ok(inner_action) = action_req_rx.recv() => {
349 match inner_action {
350 InnerActionRecv::ReqNeighbors(tx) => {
351 let neighbors = gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>();
352 tx.send(neighbors).expect("broadcast failed");
353 },
354 InnerActionRecv::ReqIsJoined(tx) => {
355 let is_joined = gossip_receiver.is_joined();
356 tx.send(is_joined).expect("broadcast failed");
357 }
358 }
359 }
360 gossip_event_res = gossip_receiver.next() => {
361 if let Some(Ok(gossip_event)) = gossip_event_res {
362 if let Event::Received(msg) = gossip_event.clone() {
363 if let DeliveryScope::Swarm(_) = msg.scope {
364 let hash = sha2::Sha512::digest(&msg.content);
365 self_ref.last_message_hashes.push(hash[..32].try_into().expect("hashing failed"));
366 while self_ref.last_message_hashes.len() > 5 {
367 self_ref.last_message_hashes.pop();
368 }
369 }
370 }
371 self_ref.gossip_event_forwarder.send(gossip_event).expect("broadcast failed");
372 } else {
373 break;
374 }
375 }
376 }
377 }
378 }
379 });
380
381 self_ref
382 }
383
384 pub async fn neighbors(&mut self) -> HashSet<iroh::NodeId> {
385 let (neighbors_tx, mut neighbors_rx) =
386 tokio::sync::broadcast::channel::<HashSet<iroh::NodeId>>(1);
387 tokio::spawn({
388 let action_req = self.action_req.clone();
389 async move {
390 action_req
391 .send(InnerActionRecv::ReqNeighbors(neighbors_tx))
392 .expect("broadcast failed");
393 }
394 });
395
396 match neighbors_rx.recv().await {
397 Ok(neighbors) => neighbors,
398 Err(_) => panic!("broadcast failed"),
399 }
400 }
401
402 pub async fn is_joined(&mut self) -> bool {
403 let (is_joined_tx, mut is_joined_rx) = tokio::sync::broadcast::channel::<bool>(1);
404 self.action_req
405 .send(InnerActionRecv::ReqIsJoined(is_joined_tx))
406 .expect("broadcast failed");
407 match is_joined_rx.recv().await {
408 Ok(is_joined) => is_joined,
409 Err(_) => panic!("broadcast failed"),
410 }
411 }
412
413 pub async fn recv(&mut self) -> Result<Event> {
414 self.gossip_event_forwarder
415 .subscribe()
416 .recv()
417 .await
418 .map_err(|err| anyhow::anyhow!(err))
419 }
420
421 pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
422 self.last_message_hashes.clone()
423 }
424}
425
426impl TopicId {
427 pub fn new(raw: String) -> Self {
428 let mut raw_hash = sha2::Sha512::new();
429 raw_hash.update(raw.as_bytes());
430
431 Self {
432 _raw: raw,
433 hash: raw_hash.finalize()[..32]
434 .try_into()
435 .expect("hashing 'raw' failed"),
436 }
437 }
438}
439
440impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
442 pub async fn new(
443 topic_id: TopicId,
444 endpoint: &iroh::Endpoint,
445 node_signing_key: &ed25519_dalek::SigningKey,
446 gossip: iroh_gossip::net::Gossip,
447 initial_secret: &Vec<u8>,
448 secret_rotation_function: Option<R>,
449 ) -> Result<Self> {
450
451 let mut initial_secret_hash = sha2::Sha512::new();
453 initial_secret_hash.update(initial_secret);
454 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
455 .try_into()
456 .expect("hashing failed");
457
458 let (gossip_tx, gossip_rx) = Self::bootstrap(
460 topic_id.clone(),
461 endpoint,
462 node_signing_key,
463 gossip,
464 initial_secret_hash,
465 secret_rotation_function.clone(),
466 )
467 .await?;
468
469 let _join_handler = Self::spawn_publisher(
471 topic_id.clone(),
472 secret_rotation_function.clone(),
473 initial_secret_hash,
474 endpoint.node_id().clone(),
475 gossip_rx.clone(),
476 gossip_tx.clone(),
477 node_signing_key.clone(),
478 );
479
480 Ok(Self {
481 topic_id,
482 gossip_sender: gossip_tx,
483 gossip_receiver: gossip_rx,
484 initial_secret_hash: initial_secret_hash,
485 secret_rotation_function: secret_rotation_function.unwrap_or_default(),
486 node_id: endpoint.node_id().clone(),
487 })
488 }
489
490 pub fn split(&self) -> (GossipSender, GossipReceiver) {
491 (self.gossip_sender.clone(), self.gossip_receiver.clone())
492 }
493
494 pub fn topic_id(&self) -> &TopicId {
495 &self.topic_id
496 }
497
498 pub fn node_id(&self) -> &iroh::NodeId {
499 &self.node_id
500 }
501
502 pub fn gossip_sender(&self) -> GossipSender {
503 self.gossip_sender.clone()
504 }
505
506 pub fn gossip_receiver(&self) -> GossipReceiver {
507 self.gossip_receiver.clone()
508 }
509
510 pub fn secret_rotation_function(&self) -> R {
511 self.secret_rotation_function.clone()
512 }
513
514 pub fn initial_secret_hash(&self) -> [u8; 32] {
515 self.initial_secret_hash
516 }
517
518 pub fn set_initial_secret_hash(&mut self, initial_secret_hash: [u8; 32]) {
519 self.initial_secret_hash = initial_secret_hash;
520 }
521}
522
523impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
525 pub async fn bootstrap(
526 topic_id: TopicId,
527 endpoint: &iroh::Endpoint,
528 node_signing_key: &ed25519_dalek::SigningKey,
529 gossip: iroh_gossip::net::Gossip,
530 initial_secret_hash: [u8; 32],
531 secret_rotation_function: Option<R>,
532 ) -> Result<(GossipSender, GossipReceiver)> {
533 let gossip_topic: iroh_gossip::api::GossipTopic = gossip
534 .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
535 .await?;
536 let (gossip_sender, gossip_receiver) = gossip_topic.split();
537 let (gossip_sender, mut gossip_receiver) = (
538 GossipSender::new(gossip_sender),
539 GossipReceiver::new(gossip_receiver),
540 );
541
542 let mut last_published_unix_minute = 0;
543 loop {
544 if gossip_receiver.is_joined().await {
546 return Ok((gossip_sender, gossip_receiver));
547 }
548
549 let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
551 -1
552 } else {
553 0
554 });
555
556 let records = Topic::get_unix_minute_records(
558 &topic_id.clone(),
559 unix_minute,
560 secret_rotation_function.clone(),
561 initial_secret_hash,
562 &endpoint.node_id(),
563 )
564 .await;
565
566 if records.is_empty() {
569 if unix_minute != last_published_unix_minute {
570 if Self::publish_proc(
571 unix_minute,
572 &topic_id,
573 secret_rotation_function.clone(),
574 initial_secret_hash,
575 endpoint.node_id().clone(),
576 node_signing_key,
577 HashSet::new(),
578 vec![],
579 )
580 .await
581 .is_ok()
582 {
583 last_published_unix_minute = unix_minute;
584 }
585 }
586 sleep(Duration::from_millis(100)).await;
587 continue;
588 }
589
590 let bootstrap_nodes = records
594 .iter()
595 .map(|record| {
596 let mut v = vec![record.node_id];
597 for peer in record.active_peers {
598 if peer != [0; 32] {
599 v.push(peer);
600 }
601 }
602 v
603 })
604 .flatten()
605 .filter_map(|node_id| match iroh::NodeId::from_bytes(&node_id) {
606 Ok(node_id) => Some(node_id),
607 Err(_) => None,
608 })
609 .collect::<HashSet<_>>();
610
611
612
613 if gossip_receiver.is_joined().await {
617 return Ok((gossip_sender, gossip_receiver));
618 }
619
620 for node_id in bootstrap_nodes.iter() {
623 match gossip_sender.join_peers(vec![node_id.clone()], None).await {
624 Ok(_) => {
625 sleep(Duration::from_millis(100)).await;
626 if gossip_receiver.is_joined().await {
627 break;
628 }
629 }
630 Err(_) => {
631 continue;
632 }
633 }
634 }
635
636 if !gossip_receiver.is_joined().await {
639 sleep(Duration::from_millis(500)).await;
640 }
641
642 if gossip_receiver.is_joined().await {
644 return Ok((gossip_sender, gossip_receiver));
645 } else {
646 if unix_minute != last_published_unix_minute {
648 if Self::publish_proc(
649 unix_minute,
650 &topic_id,
651 secret_rotation_function.clone(),
652 initial_secret_hash,
653 endpoint.node_id().clone(),
654 node_signing_key,
655 HashSet::new(),
656 vec![],
657 )
658 .await
659 .is_ok()
660 {
661 last_published_unix_minute = unix_minute;
662 }
663 }
664 sleep(Duration::from_millis(100)).await;
665 continue;
666 }
667 }
668 }
669
670 async fn publish_proc(
673 unix_minute: u64,
674 topic_id: &TopicId,
675 secret_rotation_function: Option<R>,
676 initial_secret_hash: [u8; 32],
677 node_id: iroh::NodeId,
678 node_signing_key: &ed25519_dalek::SigningKey,
679 neighbors: HashSet<iroh::NodeId>,
680 last_message_hashes: Vec<[u8; 32]>,
681 ) -> Result<HashSet<Record>> {
682 let records = Topic::<R>::get_unix_minute_records(
684 &topic_id.clone(),
685 unix_minute,
686 secret_rotation_function.clone(),
687 initial_secret_hash,
688 &node_id,
689 )
690 .await
691 .iter()
692 .filter(|&record| {
693 record
694 .active_peers
695 .iter()
696 .filter(|&peer| peer.eq(&[0u8; 32]))
697 .count()
698 > 0
699 || record
700 .last_message_hashes
701 .iter()
702 .filter(|&hash| hash.eq(&[0u8; 32]))
703 .count()
704 > 0
705 })
706 .cloned()
707 .collect::<HashSet<_>>();
708
709 if records.len() >= MAX_BOOTSTRAP_RECORDS {
712 return Ok(records);
713 }
714
715 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
717 for (i, peer) in neighbors.iter().take(5).enumerate() {
718 active_peers[i] = peer.as_bytes().clone();
719 }
720
721 let mut last_message_hashes_array = [[0u8; 32]; 5];
722 for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
723 last_message_hashes_array[i] = *hash;
724 }
725
726 let record = Record::sign(
727 topic_id.hash,
728 unix_minute,
729 node_id.as_bytes().clone(),
730 active_peers,
731 last_message_hashes_array,
732 &node_signing_key,
733 );
734 Topic::<R>::publish_unix_minute_record(
735 unix_minute,
736 &topic_id.clone(),
737 secret_rotation_function.clone(),
738 initial_secret_hash,
739 record,
740 Some(3),
741 )
742 .await?;
743
744 Ok(records)
745 }
746
747 fn spawn_publisher(
749 topic_id: TopicId,
750 secret_rotation_function: Option<R>,
751 initial_secret_hash: [u8; 32],
752 node_id: iroh::NodeId,
753 gossip_receiver: GossipReceiver,
754 gossip_sender: GossipSender,
755 node_signing_key: ed25519_dalek::SigningKey,
756 ) -> tokio::task::JoinHandle<()> {
757 let mut gossip_receiver = gossip_receiver;
758
759 tokio::spawn(async move {
760 let mut backoff = 1;
761 loop {
762 let unix_minute = crate::unix_minute(0);
763
764 if let Ok(records) = Topic::<R>::publish_proc(
766 unix_minute,
767 &topic_id.clone(),
768 Some(secret_rotation_function.clone().unwrap_or_default()),
769 initial_secret_hash,
770 node_id.clone(),
771 &node_signing_key,
772 gossip_receiver.neighbors().await,
773 gossip_receiver.last_message_hashes(),
774 )
775 .await
776 {
777 let neighbors = gossip_receiver.neighbors().await;
779 if neighbors.len() < 4 && !records.is_empty() {
780 let node_ids = records
781 .iter()
782 .map(|record| {
783 record
784 .active_peers
785 .iter()
786 .filter_map(|&active_peer| {
787 if active_peer == [0; 32]
788 || neighbors.contains(&active_peer)
789 || active_peer.eq(record.node_id.to_vec().as_slice())
790 || active_peer.eq(node_id.as_bytes())
791 {
792 None
793 } else if let Ok(node_id) =
794 iroh::NodeId::from_bytes(&active_peer)
795 {
796 Some(node_id)
797 } else {
798 None
799 }
800 })
801 .collect::<Vec<_>>()
802 })
803 .flatten()
804 .collect::<HashSet<_>>();
805 if gossip_sender
806 .join_peers(
807 node_ids.iter().cloned().collect::<Vec<_>>(),
808 Some(MAX_JOIN_PEERS_COUNT),
809 )
810 .await
811 .is_ok()
812 {
813 }
815 }
816
817 if gossip_receiver.last_message_hashes().len() >= 1 {
819 let peers_to_join = records
820 .iter()
821 .filter_map(|record| {
822 if !record.last_message_hashes.iter().all(|last_message_hash| {
823 *last_message_hash != [0; 32]
824 && gossip_receiver
825 .last_message_hashes()
826 .contains(&last_message_hash)
827 }) {
828 Some(record)
829 } else {
830 None
831 }
832 })
833 .collect::<Vec<_>>();
834 if !peers_to_join.is_empty() {
835 let node_ids = peers_to_join
836 .iter()
837 .filter_map(|&record| {
838 let mut peers = vec![];
839 if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id) {
840 peers.push(node_id);
841 }
842 for active_peer in record.active_peers {
843 if active_peer == [0; 32] {
844 continue;
845 }
846 if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer)
847 {
848 peers.push(node_id);
849 }
850 }
851 Some(peers)
852 })
853 .flatten()
854 .collect::<HashSet<_>>();
855
856 if gossip_sender
857 .join_peers(
858 node_ids.iter().cloned().collect::<Vec<_>>(),
859 Some(MAX_JOIN_PEERS_COUNT),
860 )
861 .await
862 .is_ok()
863 {
864 }
869 }
870 }
871 } else {
872 sleep(Duration::from_secs(backoff)).await;
873 backoff = (backoff * 2).max(60);
874 continue;
875 }
876
877 backoff = 1;
878 sleep(Duration::from_secs(rand::random::<u64>() % 60)).await;
879 }
880 })
881 }
882}
883
884impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
886 fn signing_keypair(topic_id: &TopicId, unix_minute: u64) -> ed25519_dalek::SigningKey {
887 let mut sign_keypair_hash = sha2::Sha512::new();
888 sign_keypair_hash.update(topic_id.hash);
889 sign_keypair_hash.update(unix_minute.to_le_bytes());
890 let sign_keypair_seed: [u8; 32] = sign_keypair_hash.finalize()[..32]
891 .try_into()
892 .expect("hashing failed");
893 ed25519_dalek::SigningKey::from_bytes(&sign_keypair_seed)
894 }
895
896 fn encryption_keypair(
897 topic_id: &TopicId,
898 secret_rotation_function: &R,
899 initial_secret_hash: [u8; 32],
900 unix_minute: u64,
901 ) -> ed25519_dalek::SigningKey {
902 let enc_keypair_seed = secret_rotation_function.get_unix_minute_secret(
903 topic_id.hash,
904 unix_minute,
905 initial_secret_hash,
906 );
907 ed25519_dalek::SigningKey::from_bytes(&enc_keypair_seed)
908 }
909
910 fn salt(topic_id: &TopicId, unix_minute: u64) -> [u8; 32] {
912 let mut slot_hash = sha2::Sha512::new();
913 slot_hash.update(topic_id.hash);
914 slot_hash.update(unix_minute.to_le_bytes());
915 slot_hash.finalize()[..32]
916 .try_into()
917 .expect("hashing failed")
918 }
919
920 async fn get_unix_minute_records(
921 topic_id: &TopicId,
922 unix_minute: u64,
923 secret_rotation_function: Option<R>,
924 initial_secret_hash: [u8; 32],
925 node_id: &iroh::NodeId,
926 ) -> HashSet<Record> {
927 let topic_sign = Topic::<R>::signing_keypair(&topic_id, unix_minute);
928 let encryption_key = Topic::<R>::encryption_keypair(
929 &topic_id,
930 &secret_rotation_function.clone().unwrap_or_default(),
931 initial_secret_hash,
932 unix_minute,
933 );
934 let salt = Topic::<R>::salt(&topic_id, unix_minute);
935
936 let dht = get_dht();
938
939 let records_iter = match timeout(
940 Duration::from_secs(10),
941 dht.get_mutable(topic_sign.verifying_key().as_bytes(), Some(&salt), None)
942 .collect::<Vec<_>>(),
943 )
944 .await
945 {
946 Ok(records) => records,
947 Err(_) => vec![],
948 };
949
950 let records = records_iter
951 .iter()
952 .filter_map(|record| {
953 match EncryptedRecord::from_bytes(record.value().to_vec()) {
954 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
955 Ok(record) => match record.verify(&topic_id.hash, unix_minute) {
956 Ok(_) => match record.node_id.eq(node_id.as_bytes()) {
957 true => {
958 None
959 }
960 false => Some(record),
961 },
962 Err(_) => None,
963 },
964 Err(_) => None,
965 },
966 Err(_) => None,
967 }
968 })
969 .collect::<HashSet<_>>();
970 records
971 }
972
973 async fn publish_unix_minute_record(
974 unix_minute: u64,
975 topic_id: &TopicId,
976 secret_rotation_function: Option<R>,
977 initial_secret_hash: [u8; 32],
978 record: Record,
979 retry_count: Option<usize>,
980 ) -> Result<()> {
981 let sign_key = Topic::<R>::signing_keypair(&topic_id.clone(), unix_minute);
982 let salt = Topic::<R>::salt(&topic_id, unix_minute);
983 let encryption_key = Topic::<R>::encryption_keypair(
984 &topic_id.clone(),
985 &secret_rotation_function.clone().unwrap_or_default(),
986 initial_secret_hash,
987 unix_minute,
988 );
989 let encrypted_record = record.encrypt(&encryption_key);
990
991 for i in 0..retry_count.unwrap_or(3) {
992
993 let dht = get_dht();
994
995 let most_recent_result = match timeout(
996 Duration::from_secs(10),
997 dht.get_mutable_most_recent(
998 sign_key.clone().verifying_key().as_bytes(),
999 Some(&salt),
1000 ),
1001 )
1002 .await
1003 {
1004 Ok(result) => result,
1005 Err(_) => None,
1006 };
1007
1008 let item = if let Some(mut_item) = most_recent_result {
1009 MutableItem::new(
1010 sign_key.clone(),
1011 &encrypted_record.to_bytes(),
1012 mut_item.seq() + 1,
1013 Some(&salt),
1014 )
1015 } else {
1016 MutableItem::new(
1017 sign_key.clone(),
1018 &encrypted_record.to_bytes(),
1019 0,
1020 Some(&salt),
1021 )
1022 };
1023
1024 let put_result = match timeout(
1025 Duration::from_secs(10),
1026 dht.put_mutable(item.clone(), Some(item.seq())),
1027 )
1028 .await
1029 {
1030 Ok(result) => result.ok(),
1031 Err(_) => None
1032 };
1033
1034 if put_result.is_some() {
1035 break;
1036 } else if i == retry_count.unwrap_or(3) - 1 {
1037 bail!("failed to publish record")
1038 }
1039
1040 reset_dht().await;
1041
1042 sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
1043 }
1044 Ok(())
1045 }
1046}
1047
1048pub trait AutoDiscoveryBuilder {
1049 #[allow(async_fn_in_trait)]
1050 async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1051 self,
1052 endpoint: Endpoint,
1053 secret_rotation_function: Option<R>,
1054 ) -> Result<Gossip<R>>;
1055}
1056
1057impl AutoDiscoveryBuilder for iroh_gossip::net::Builder {
1058 async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1059 self,
1060 endpoint: Endpoint,
1061 secret_rotation_function: Option<R>,
1062 ) -> Result<Gossip<R>> {
1063 Ok(Gossip {
1064 gossip: self.spawn(endpoint.clone()),
1065 endpoint: endpoint.clone(),
1066 secret_rotation_function: secret_rotation_function.unwrap_or_default(),
1067 })
1068 }
1069}
1070
1071pub trait AutoDiscoveryGossip<R: SecretRotation + Default + Clone + Send + 'static> {
1072 #[allow(async_fn_in_trait)]
1073 async fn subscribe_and_join_with_auto_discovery(
1074 &self,
1075 topic_id: TopicId,
1076 initial_secret: &Vec<u8>,
1077 ) -> Result<Topic<R>>;
1078}
1079
1080#[derive(Debug, Clone, Copy, Default)]
1082pub struct DefaultSecretRotation;
1083
1084pub trait SecretRotation {
1085 fn get_unix_minute_secret(
1086 &self,
1087 topic_hash: [u8; 32],
1088 unix_minute: u64,
1089 initial_secret_hash: [u8; 32],
1090 ) -> [u8; 32];
1091}
1092
1093impl<R: SecretRotation + Default + Clone + Send + 'static> AutoDiscoveryGossip<R> for Gossip<R> {
1094 async fn subscribe_and_join_with_auto_discovery(
1095 &self,
1096 topic_id: TopicId,
1097 initial_secret: &Vec<u8>,
1098 ) -> Result<Topic<R>> {
1099 Topic::new(
1100 topic_id,
1101 &self.endpoint,
1102 self.endpoint.secret_key().secret(),
1103 self.gossip.clone(),
1104 initial_secret,
1105 Some(self.secret_rotation_function.clone()),
1106 )
1107 .await
1108 }
1109}
1110
1111impl SecretRotation for DefaultSecretRotation {
1112 fn get_unix_minute_secret(
1113 &self,
1114 topic_hash: [u8; 32],
1115 unix_minute: u64,
1116 initial_secret_hash: [u8; 32],
1117 ) -> [u8; 32] {
1118 let mut hash = sha2::Sha512::new();
1119 hash.update(topic_hash);
1120 hash.update(unix_minute.to_be_bytes());
1121 hash.update(initial_secret_hash);
1122 hash.finalize()[..32].try_into().expect("hashing failed")
1123 }
1124}
1125
1126pub fn unix_minute(minute_offset: i64) -> u64 {
1127 ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132 use super::*;
1133 use ed25519_dalek::SigningKey;
1134 use rand::rngs::OsRng;
1135
1136 #[test]
1137 fn test_topic_id_creation() {
1138 let topic_id = TopicId::new("test-topic".to_string());
1139 assert_eq!(topic_id._raw, "test-topic");
1140 assert_eq!(topic_id.hash.len(), 32);
1141
1142 let topic_id2 = TopicId::new("test-topic".to_string());
1144 assert_eq!(topic_id.hash, topic_id2.hash);
1145
1146 let topic_id3 = TopicId::new("different-topic".to_string());
1148 assert_ne!(topic_id.hash, topic_id3.hash);
1149 }
1150
1151 #[test]
1152 fn test_record_serialization_roundtrip() {
1153 let signing_key = SigningKey::generate(&mut OsRng);
1154 let topic = [1u8; 32];
1155 let unix_minute = 12345u64;
1156 let node_id = [2u8; 32];
1157 let active_peers = [[3u8; 32]; 5];
1158 let last_message_hashes = [[4u8; 32]; 5];
1159
1160 let record = Record::sign(
1161 topic,
1162 unix_minute,
1163 node_id,
1164 active_peers,
1165 last_message_hashes,
1166 &signing_key,
1167 );
1168
1169 let bytes = record.to_bytes();
1171 let deserialized = Record::from_bytes(bytes).unwrap();
1172
1173 assert_eq!(record.topic, deserialized.topic);
1174 assert_eq!(record.unix_minute, deserialized.unix_minute);
1175 assert_eq!(record.node_id, deserialized.node_id);
1176 assert_eq!(record.active_peers, deserialized.active_peers);
1177 assert_eq!(record.last_message_hashes, deserialized.last_message_hashes);
1178 assert_eq!(record.signature, deserialized.signature);
1179 }
1180
1181 #[test]
1182 fn test_record_verification() {
1183 let signing_key = SigningKey::generate(&mut OsRng);
1184 let topic = [1u8; 32];
1185 let unix_minute = 12345u64;
1186 let node_id = signing_key.verifying_key().to_bytes();
1187 let active_peers = [[3u8; 32]; 5];
1188 let last_message_hashes = [[4u8; 32]; 5];
1189
1190 let record = Record::sign(
1191 topic,
1192 unix_minute,
1193 node_id,
1194 active_peers,
1195 last_message_hashes,
1196 &signing_key,
1197 );
1198
1199 assert!(record.verify(&topic, unix_minute).is_ok());
1201
1202 let wrong_topic = [99u8; 32];
1204 assert!(record.verify(&wrong_topic, unix_minute).is_err());
1205
1206 assert!(record.verify(&topic, unix_minute + 1).is_err());
1208 }
1209
1210 #[test]
1211 fn test_encrypted_record_roundtrip() {
1212 let signing_key = SigningKey::generate(&mut OsRng);
1213 let encryption_key = SigningKey::generate(&mut OsRng);
1214 let topic = [1u8; 32];
1215 let unix_minute = 12345u64;
1216 let node_id = signing_key.verifying_key().to_bytes();
1217 let active_peers = [[3u8; 32]; 5];
1218 let last_message_hashes = [[4u8; 32]; 5];
1219
1220 let record = Record::sign(
1221 topic,
1222 unix_minute,
1223 node_id,
1224 active_peers,
1225 last_message_hashes,
1226 &signing_key,
1227 );
1228
1229 let encrypted = record.encrypt(&encryption_key);
1231 let decrypted = encrypted.decrypt(&encryption_key).unwrap();
1232
1233 assert_eq!(record.topic, decrypted.topic);
1234 assert_eq!(record.unix_minute, decrypted.unix_minute);
1235 assert_eq!(record.node_id, decrypted.node_id);
1236 assert_eq!(record.active_peers, decrypted.active_peers);
1237 assert_eq!(record.last_message_hashes, decrypted.last_message_hashes);
1238 assert_eq!(record.signature, decrypted.signature);
1239 }
1240
1241 #[test]
1242 fn test_encrypted_record_serialization() {
1243 let signing_key = SigningKey::generate(&mut OsRng);
1244 let encryption_key = SigningKey::generate(&mut OsRng);
1245 let topic = [1u8; 32];
1246 let unix_minute = 12345u64;
1247 let node_id = signing_key.verifying_key().to_bytes();
1248 let active_peers = [[3u8; 32]; 5];
1249 let last_message_hashes = [[4u8; 32]; 5];
1250
1251 let record = Record::sign(
1252 topic,
1253 unix_minute,
1254 node_id,
1255 active_peers,
1256 last_message_hashes,
1257 &signing_key,
1258 );
1259
1260 let encrypted = record.encrypt(&encryption_key);
1261
1262 let bytes = encrypted.to_bytes();
1264 let deserialized = EncryptedRecord::from_bytes(bytes).unwrap();
1265
1266 let decrypted = deserialized.decrypt(&encryption_key).unwrap();
1268 assert_eq!(record.topic, decrypted.topic);
1269 assert_eq!(record.unix_minute, decrypted.unix_minute);
1270 }
1271
1272 #[test]
1273 fn test_default_secret_rotation() {
1274 let rotation = DefaultSecretRotation;
1275 let topic_hash = [1u8; 32];
1276 let unix_minute = 12345u64;
1277 let initial_secret_hash = [2u8; 32];
1278
1279 let secret1 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
1280 let secret2 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
1281
1282 assert_eq!(secret1, secret2);
1284
1285 let secret3 = rotation.get_unix_minute_secret(topic_hash, unix_minute + 1, initial_secret_hash);
1287 assert_ne!(secret1, secret3);
1288
1289 let different_topic = [99u8; 32];
1291 let secret4 = rotation.get_unix_minute_secret(different_topic, unix_minute, initial_secret_hash);
1292 assert_ne!(secret1, secret4);
1293 }
1294
1295 #[test]
1296 fn test_unix_minute_function() {
1297 let current = unix_minute(0);
1298 let prev = unix_minute(-1);
1299 let next = unix_minute(1);
1300
1301 assert_eq!(current, prev + 1);
1302 assert_eq!(next, current + 1);
1303
1304 let current2 = unix_minute(0);
1306 assert_eq!(current, current2);
1307 }
1308
1309 #[test]
1310 fn test_topic_signing_keypair_deterministic() {
1311 let topic_id = TopicId::new("test-topic".to_string());
1312 let unix_minute = 12345u64;
1313
1314 let key1 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
1315 let key2 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
1316
1317 assert_eq!(key1.to_bytes(), key2.to_bytes());
1319
1320 let key3 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute + 1);
1322 assert_ne!(key1.to_bytes(), key3.to_bytes());
1323 }
1324
1325 #[test]
1326 fn test_topic_encryption_keypair_deterministic() {
1327 let topic_id = TopicId::new("test-topic".to_string());
1328 let unix_minute = 12345u64;
1329 let initial_secret_hash = [1u8; 32];
1330 let rotation = DefaultSecretRotation;
1331
1332 let key1 = Topic::<DefaultSecretRotation>::encryption_keypair(&topic_id, &rotation, initial_secret_hash, unix_minute);
1333 let key2 = Topic::<DefaultSecretRotation>::encryption_keypair(&topic_id, &rotation, initial_secret_hash, unix_minute);
1334
1335 assert_eq!(key1.to_bytes(), key2.to_bytes());
1337
1338 let key3 = Topic::<DefaultSecretRotation>::encryption_keypair(&topic_id, &rotation, initial_secret_hash, unix_minute + 1);
1340 assert_ne!(key1.to_bytes(), key3.to_bytes());
1341 }
1342
1343 #[test]
1344 fn test_topic_salt_deterministic() {
1345 let topic_id = TopicId::new("test-topic".to_string());
1346 let unix_minute = 12345u64;
1347
1348 let salt1 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
1349 let salt2 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
1350
1351 assert_eq!(salt1, salt2);
1353
1354 let salt3 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute + 1);
1356 assert_ne!(salt1, salt3);
1357 }
1358}