distributed_topic_tracker/
lib.rs

1use std::{collections::HashSet, sync::Arc, time::Duration};
2
3use anyhow::{Result, bail};
4use arc_swap::ArcSwap;
5use ed25519_dalek::ed25519::signature::SignerMut;
6use futures::StreamExt as _;
7use iroh::Endpoint;
8use iroh_gossip::{
9    api::{Event},
10    proto::DeliveryScope,
11};
12use mainline::{async_dht::AsyncDht, MutableItem};
13use once_cell::sync::Lazy;
14use rand::seq::SliceRandom;
15use sha2::Digest;
16
17use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
18use tokio::{
19    time::{sleep, timeout},
20};
21
22pub const MAX_JOIN_PEERS_COUNT: usize = 30;
23pub const MAX_BOOTSTRAP_RECORDS: usize = 10;
24pub const SECRET_ROTATION: DefaultSecretRotation = DefaultSecretRotation;
25
26static DHT: Lazy<ArcSwap<mainline::async_dht::AsyncDht>> = Lazy::new(|| {
27    ArcSwap::from_pointee(
28        mainline::Dht::builder().build().expect("failed to create dht").as_async()
29    )
30});
31
32fn get_dht() -> Arc<AsyncDht> {
33    DHT.load_full()
34}
35
36async fn reset_dht() {
37    let n_dht = mainline::Dht::builder()
38                .build()
39                .expect("failed to create dht");
40    DHT.store(Arc::new(n_dht.as_async()));
41}
42
43#[derive(Debug, Clone)]
44pub struct EncryptedRecord {
45    encrypted_record: Vec<u8>,
46    encrypted_decryption_key: Vec<u8>,
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Hash)]
50pub struct Record {
51    topic: [u8; 32],
52    unix_minute: u64,
53    node_id: [u8; 32],
54    active_peers: [[u8; 32]; 5],
55    last_message_hashes: [[u8; 32]; 5],
56    signature: [u8; 64],
57}
58
59pub struct Gossip<R: SecretRotation + Default + Clone + Send + 'static> {
60    pub gossip: iroh_gossip::net::Gossip,
61    endpoint: iroh::Endpoint,
62    secret_rotation_function: R,
63}
64
65#[derive(Debug)]
66pub struct Topic<R: SecretRotation + Default + Clone + Send + 'static> {
67    topic_id: TopicId,
68    gossip_sender: GossipSender,
69    gossip_receiver: GossipReceiver,
70    initial_secret_hash: [u8; 32],
71    secret_rotation_function: R,
72    node_id: iroh::NodeId,
73}
74
75#[derive(Debug, Clone)]
76pub struct TopicId {
77    _raw: String,
78    hash: [u8; 32], // sha512( raw )[..32]
79}
80
81#[derive(Debug, Clone)]
82pub struct GossipReceiver {
83    gossip_event_forwarder: tokio::sync::broadcast::Sender<iroh_gossip::api::Event>,
84    action_req: tokio::sync::broadcast::Sender<InnerActionRecv>,
85    last_message_hashes: Vec<[u8; 32]>,
86}
87
88#[derive(Debug, Clone)]
89pub struct GossipSender {
90    action_req: tokio::sync::broadcast::Sender<InnerActionSend>,
91}
92
93#[derive(Debug, Clone)]
94enum InnerActionRecv {
95    ReqNeighbors(tokio::sync::broadcast::Sender<HashSet<iroh::NodeId>>),
96    ReqIsJoined(tokio::sync::broadcast::Sender<bool>),
97}
98
99#[derive(Debug, Clone)]
100enum InnerActionSend {
101    ReqSend(Vec<u8>, tokio::sync::broadcast::Sender<bool>),
102    ReqJoinPeers(Vec<iroh::NodeId>, tokio::sync::broadcast::Sender<bool>),
103}
104
105impl EncryptedRecord {
106    pub fn decrypt(
107        &self,
108        decryption_key: &ed25519_dalek::SigningKey
109    ) -> Result<Record> {
110        let one_time_key_bytes: [u8; 32] = decryption_key
111            .decrypt(&self.encrypted_decryption_key)?.as_slice()
112            .try_into()?;
113        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
114
115        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
116        let record = Record::from_bytes(decrypted_record)?;
117        Ok(record)
118    }
119
120    pub fn to_bytes(&self) -> Vec<u8> {
121        let mut buf = Vec::new();
122        let encrypted_record_len = self.encrypted_record.len() as u32;
123        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
124        buf.extend_from_slice(&self.encrypted_record);
125        buf.extend_from_slice(&self.encrypted_decryption_key);
126        buf
127    }
128
129    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
130        let (encrypted_record_len, buf) = buf.split_at(4);
131        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
132        let (encrypted_record, encrypted_decryption_key) =
133            buf.split_at(encrypted_record_len as usize);
134
135        Ok(Self {
136            encrypted_record: encrypted_record.to_vec(),
137            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
138        })
139    }
140}
141
142impl Record {
143    pub fn sign(
144        topic: [u8; 32],
145        unix_minute: u64,
146        node_id: [u8; 32],
147        active_peers: [[u8; 32]; 5],
148        last_message_hashes: [[u8; 32]; 5],
149        signing_key: &ed25519_dalek::SigningKey,
150    ) -> Self {
151        let mut signature_data = Vec::new();
152        signature_data.extend_from_slice(&topic);
153        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
154        signature_data.extend_from_slice(&node_id);
155        for active_peer in active_peers {
156            signature_data.extend_from_slice(&active_peer);
157        }
158        for last_message_hash in last_message_hashes {
159            signature_data.extend_from_slice(&last_message_hash);
160        }
161        let mut signing_key = signing_key.clone();
162        let signature = signing_key.sign(&signature_data);
163        Self {
164            topic,
165            unix_minute,
166            node_id,
167            active_peers,
168            last_message_hashes,
169            signature: signature.to_bytes(),
170        }
171    }
172
173    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
174        let (topic, buf) = buf.split_at(32);
175        let (unix_minute, buf) = buf.split_at(8);
176        let (node_id, mut buf) = buf.split_at(32);
177
178        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
179        for i in 0..active_peers.len() {
180            let (active_peer, _buf) = buf.split_at(32);
181            active_peers[i] = active_peer.try_into()?;
182            buf = _buf;
183        }
184        let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
185        for i in 0..last_message_hashes.len() {
186            let (last_message_hash, _buf) = buf.split_at(32);
187            last_message_hashes[i] = last_message_hash.try_into()?;
188            buf = _buf;
189        }
190
191        let (signature, buf) = buf.split_at(64);
192
193        if !buf.is_empty() {
194            bail!("buffer not empty after reconstruction")
195        }
196
197        Ok(Self {
198            topic: topic.try_into()?,
199            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
200            node_id: node_id.try_into()?,
201            active_peers: active_peers.try_into()?,
202            last_message_hashes: last_message_hashes.try_into()?,
203            signature: signature.try_into()?,
204        })
205    }
206
207    pub fn to_bytes(&self) -> Vec<u8> {
208        let mut buf = Vec::new();
209        buf.extend_from_slice(&self.topic);
210        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
211        buf.extend_from_slice(&self.node_id);
212        for active_peer in self.active_peers {
213            buf.extend_from_slice(&active_peer);
214        }
215        for last_message_hash in self.last_message_hashes {
216            buf.extend_from_slice(&last_message_hash);
217        }
218        buf.extend_from_slice(&self.signature);
219        buf
220    }
221
222    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
223        if self.topic != *actual_topic {
224            bail!("topic mismatch")
225        }
226        if self.unix_minute != actual_unix_minute {
227            bail!("unix minute mismatch")
228        }
229
230        let record_bytes = self.to_bytes();
231        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
232        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
233        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
234
235        node_id.verify_strict(signature_data.as_slice(), &signature)?;
236
237        Ok(())
238    }
239
240    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
241        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
242        let p_key = one_time_key.verifying_key();
243        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
244        let key_enc = encryption_key
245            .verifying_key()
246            .encrypt(&one_time_key.to_bytes())
247            .expect("encryption failed");
248
249        EncryptedRecord {
250            encrypted_record: data_enc,
251            encrypted_decryption_key: key_enc,
252        }
253    }
254}
255
256impl GossipSender {
257    pub fn new(gossip_sender: iroh_gossip::api::GossipSender) -> Self {
258        let (action_req_tx, mut action_req_rx) =
259            tokio::sync::broadcast::channel::<InnerActionSend>(1024);
260
261        tokio::spawn({
262            let gossip_sender = gossip_sender;
263            async move {
264                loop {
265                    match action_req_rx.recv().await {
266                        Ok(inner_action) => match inner_action {
267                            InnerActionSend::ReqSend(data, tx) => {
268                                let res = gossip_sender.broadcast(data.into()).await;
269                                tx.send(res.is_ok()).expect("broadcast failed");
270                            }
271                            InnerActionSend::ReqJoinPeers(peers, tx) => {
272                                let res = gossip_sender.join_peers(peers).await;
273                                tx.send(res.is_ok()).expect("broadcast failed");
274                            }
275                        },
276                        Err(_) => break,
277                    }
278                }
279            }
280        });
281
282        Self {
283            action_req: action_req_tx,
284        }
285    }
286
287    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
288        let (tx, mut rx) = tokio::sync::broadcast::channel::<bool>(1);
289        self.action_req
290            .send(InnerActionSend::ReqSend(data, tx))
291            .expect("broadcast failed");
292
293        match rx.recv().await {
294            Ok(true) => Ok(()),
295            Ok(false) => bail!("broadcast failed"),
296            Err(_) => panic!("broadcast failed"),
297        }
298    }
299
300    pub async fn join_peers(
301        &self,
302        peers: Vec<iroh::NodeId>,
303        max_peers: Option<usize>,
304    ) -> Result<()> {
305        let mut peers = peers;
306        if let Some(max_peers) = max_peers {
307            peers.shuffle(&mut rand::thread_rng());
308            peers.truncate(max_peers);
309        }
310
311        let (tx, mut rx) = tokio::sync::broadcast::channel::<bool>(1);
312        self.action_req
313            .send(InnerActionSend::ReqJoinPeers(peers, tx))
314            .expect("broadcast failed");
315
316        match rx.recv().await {
317            Ok(true) => Ok(()),
318            Ok(false) => bail!("join peers failed"),
319            Err(_) => panic!("broadcast failed"),
320        }
321    }
322}
323
324impl GossipReceiver {
325    pub fn new(gossip_receiver: iroh_gossip::api::GossipReceiver) -> Self {
326        let (gossip_forward_tx, mut gossip_forward_rx) =
327            tokio::sync::broadcast::channel::<iroh_gossip::api::Event>(1024);
328        let (action_req_tx, mut action_req_rx) =
329            tokio::sync::broadcast::channel::<InnerActionRecv>(1024);
330
331        tokio::spawn({
332            async move {
333                while let Ok(_) = gossip_forward_rx.recv().await {}
334            }
335        });
336
337        let self_ref = Self {
338            gossip_event_forwarder: gossip_forward_tx,
339            action_req: action_req_tx,
340            last_message_hashes: vec![],
341        };
342        tokio::spawn({
343            let mut self_ref = self_ref.clone();
344            async move {
345                let mut gossip_receiver = gossip_receiver;
346                loop {
347                    tokio::select! {
348                        Ok(inner_action) = action_req_rx.recv() => {
349                            match inner_action {
350                                InnerActionRecv::ReqNeighbors(tx) => {
351                                    let neighbors = gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>();
352                                    tx.send(neighbors).expect("broadcast failed");
353                                },
354                                InnerActionRecv::ReqIsJoined(tx) => {
355                                    let is_joined = gossip_receiver.is_joined();
356                                    tx.send(is_joined).expect("broadcast failed");
357                                }
358                            }
359                        }
360                        gossip_event_res = gossip_receiver.next() => {
361                            if let Some(Ok(gossip_event)) = gossip_event_res {
362                                if let Event::Received(msg) = gossip_event.clone() {
363                                    if let DeliveryScope::Swarm(_) = msg.scope {
364                                        let hash = sha2::Sha512::digest(&msg.content);
365                                        self_ref.last_message_hashes.push(hash[..32].try_into().expect("hashing failed"));
366                                        while self_ref.last_message_hashes.len() > 5 {
367                                            self_ref.last_message_hashes.pop();
368                                        }
369                                    }
370                                }
371                                self_ref.gossip_event_forwarder.send(gossip_event).expect("broadcast failed");
372                            } else {
373                                break;
374                            }
375                        }
376                    }
377                }
378            }
379        });
380
381        self_ref
382    }
383
384    pub async fn neighbors(&mut self) -> HashSet<iroh::NodeId> {
385        let (neighbors_tx, mut neighbors_rx) =
386            tokio::sync::broadcast::channel::<HashSet<iroh::NodeId>>(1);
387        tokio::spawn({
388            let action_req = self.action_req.clone();
389            async move {
390                action_req
391                    .send(InnerActionRecv::ReqNeighbors(neighbors_tx))
392                    .expect("broadcast failed");
393            }
394        });
395
396        match neighbors_rx.recv().await {
397            Ok(neighbors) => neighbors,
398            Err(_) => panic!("broadcast failed"),
399        }
400    }
401
402    pub async fn is_joined(&mut self) -> bool {
403        let (is_joined_tx, mut is_joined_rx) = tokio::sync::broadcast::channel::<bool>(1);
404        self.action_req
405            .send(InnerActionRecv::ReqIsJoined(is_joined_tx))
406            .expect("broadcast failed");
407        match is_joined_rx.recv().await {
408            Ok(is_joined) => is_joined,
409            Err(_) => panic!("broadcast failed"),
410        }
411    }
412
413    pub async fn recv(&mut self) -> Result<Event> {
414        self.gossip_event_forwarder
415            .subscribe()
416            .recv()
417            .await
418            .map_err(|err| anyhow::anyhow!(err))
419    }
420
421    pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
422        self.last_message_hashes.clone()
423    }
424}
425
426impl TopicId {
427    pub fn new(raw: String) -> Self {
428        let mut raw_hash = sha2::Sha512::new();
429        raw_hash.update(raw.as_bytes());
430
431        Self {
432            _raw: raw,
433            hash: raw_hash.finalize()[..32]
434                .try_into()
435                .expect("hashing 'raw' failed"),
436        }
437    }
438}
439
440// State: new, split, spawn_publisher
441impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
442    pub async fn new(
443        topic_id: TopicId,
444        endpoint: &iroh::Endpoint,
445        node_signing_key: &ed25519_dalek::SigningKey,
446        gossip: iroh_gossip::net::Gossip,
447        initial_secret: &Vec<u8>,
448        secret_rotation_function: Option<R>,
449    ) -> Result<Self> {
450
451        // Create secret_hash
452        let mut initial_secret_hash = sha2::Sha512::new();
453        initial_secret_hash.update(initial_secret);
454        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
455            .try_into()
456            .expect("hashing failed");
457
458        // Bootstrap to get gossip tx/rx
459        let (gossip_tx, gossip_rx) = Self::bootstrap(
460            topic_id.clone(),
461            endpoint,
462            node_signing_key,
463            gossip,
464            initial_secret_hash,
465            secret_rotation_function.clone(),
466        )
467        .await?;
468
469        // Spawn publisher
470        let _join_handler = Self::spawn_publisher(
471            topic_id.clone(),
472            secret_rotation_function.clone(),
473            initial_secret_hash,
474            endpoint.node_id().clone(),
475            gossip_rx.clone(),
476            gossip_tx.clone(),
477            node_signing_key.clone(),
478        );
479
480        Ok(Self {
481            topic_id,
482            gossip_sender: gossip_tx,
483            gossip_receiver: gossip_rx,
484            initial_secret_hash: initial_secret_hash,
485            secret_rotation_function: secret_rotation_function.unwrap_or_default(),
486            node_id: endpoint.node_id().clone(),
487        })
488    }
489
490    pub fn split(&self) -> (GossipSender, GossipReceiver) {
491        (self.gossip_sender.clone(), self.gossip_receiver.clone())
492    }
493
494    pub fn topic_id(&self) -> &TopicId {
495        &self.topic_id
496    }
497
498    pub fn node_id(&self) -> &iroh::NodeId {
499        &self.node_id
500    }
501
502    pub fn gossip_sender(&self) -> GossipSender {
503        self.gossip_sender.clone()
504    }
505
506    pub fn gossip_receiver(&self) -> GossipReceiver {
507        self.gossip_receiver.clone()
508    }
509
510    pub fn secret_rotation_function(&self) -> R {
511        self.secret_rotation_function.clone()
512    }
513
514    pub fn initial_secret_hash(&self) -> [u8; 32] {
515        self.initial_secret_hash
516    }
517
518    pub fn set_initial_secret_hash(&mut self, initial_secret_hash: [u8; 32]) {
519        self.initial_secret_hash = initial_secret_hash;
520    }
521}
522
523// Procedures: Bootstrap, Publishing, Publisher
524impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
525    pub async fn bootstrap(
526        topic_id: TopicId,
527        endpoint: &iroh::Endpoint,
528        node_signing_key: &ed25519_dalek::SigningKey,
529        gossip: iroh_gossip::net::Gossip,
530        initial_secret_hash: [u8; 32],
531        secret_rotation_function: Option<R>,
532    ) -> Result<(GossipSender, GossipReceiver)> {
533        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
534            .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
535            .await?;
536        let (gossip_sender, gossip_receiver) = gossip_topic.split();
537        let (gossip_sender, mut gossip_receiver) = (
538            GossipSender::new(gossip_sender),
539            GossipReceiver::new(gossip_receiver),
540        );
541
542        let mut last_published_unix_minute = 0;
543        loop {
544            // Check if we are connected to at least one node
545            if gossip_receiver.is_joined().await {
546                return Ok((gossip_sender, gossip_receiver));
547            }
548
549            // On the first try we check the prev unix minute, after that the current one
550            let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
551                -1
552            } else {
553                0
554            });
555
556            // Unique, verified records for the unix minute
557            let records = Topic::get_unix_minute_records(
558                &topic_id.clone(),
559                unix_minute,
560                secret_rotation_function.clone(),
561                initial_secret_hash,
562                &endpoint.node_id(),
563            )
564            .await;
565
566            // If there are no records, invoke the publish_proc (the publishing procedure)
567            // continue the loop after
568            if records.is_empty() {
569                if unix_minute != last_published_unix_minute {
570                    if Self::publish_proc(
571                        unix_minute,
572                        &topic_id,
573                        secret_rotation_function.clone(),
574                        initial_secret_hash,
575                        endpoint.node_id().clone(),
576                        node_signing_key,
577                        HashSet::new(),
578                        vec![],
579                    )
580                    .await
581                    .is_ok()
582                    {
583                        last_published_unix_minute = unix_minute;
584                    }
585                }
586                sleep(Duration::from_millis(100)).await;
587                continue;
588            }
589
590            // We found records
591
592            // Collect node ids from active_peers and record.node_id (of publisher)
593            let bootstrap_nodes = records
594                .iter()
595                .map(|record| {
596                    let mut v = vec![record.node_id];
597                    for peer in record.active_peers {
598                        if peer != [0; 32] {
599                            v.push(peer);
600                        }
601                    }
602                    v
603                })
604                .flatten()
605                .filter_map(|node_id| match iroh::NodeId::from_bytes(&node_id) {
606                    Ok(node_id) => Some(node_id),
607                    Err(_) => None,
608                })
609                .collect::<HashSet<_>>();
610
611            
612
613            // Maybe in the meantime someone connected to us via one of our published records
614            // we don't want to disrup the gossip rotations any more then we have to
615            // so we check again before joining new peers
616            if gossip_receiver.is_joined().await {
617                return Ok((gossip_sender, gossip_receiver));
618            }
619
620            // Instead of throwing everything into join_peers() at once we go node_id by node_id
621            // again to disrupt as little nodes peer neighborhoods as possible.
622            for node_id in bootstrap_nodes.iter() {
623                match gossip_sender.join_peers(vec![node_id.clone()], None).await {
624                    Ok(_) => {
625                        sleep(Duration::from_millis(100)).await;
626                        if gossip_receiver.is_joined().await {
627                            break;
628                        }
629                    }
630                    Err(_) => {
631                        continue;
632                    }
633                }
634            }
635
636            // If we are still not connected to anyone:
637            // give it the default iroh-gossip connection timeout before the final is_joined() check
638            if !gossip_receiver.is_joined().await {
639                sleep(Duration::from_millis(500)).await;
640            }
641
642            // If we are connected: return
643            if gossip_receiver.is_joined().await {
644                return Ok((gossip_sender, gossip_receiver));
645            } else {
646                // If we are not connected: check if we should publish a record this minute
647                if unix_minute != last_published_unix_minute {
648                    if Self::publish_proc(
649                        unix_minute,
650                        &topic_id,
651                        secret_rotation_function.clone(),
652                        initial_secret_hash,
653                        endpoint.node_id().clone(),
654                        node_signing_key,
655                        HashSet::new(),
656                        vec![],
657                    )
658                    .await
659                    .is_ok()
660                    {
661                        last_published_unix_minute = unix_minute;
662                    }
663                }
664                sleep(Duration::from_millis(100)).await;
665                continue;
666            }
667        }
668    }
669
670    // publishing procedure: if more then MAX_BOOTSTRAP_RECORDS are written, don't write.
671    // returns all valid records found from nodes already connected to the iroh-gossip network.
672    async fn publish_proc(
673        unix_minute: u64,
674        topic_id: &TopicId,
675        secret_rotation_function: Option<R>,
676        initial_secret_hash: [u8; 32],
677        node_id: iroh::NodeId,
678        node_signing_key: &ed25519_dalek::SigningKey,
679        neighbors: HashSet<iroh::NodeId>,
680        last_message_hashes: Vec<[u8; 32]>,
681    ) -> Result<HashSet<Record>> {
682        // Get verified records that have active_peers or last_message_hashes set (active participants)
683        let records = Topic::<R>::get_unix_minute_records(
684            &topic_id.clone(),
685            unix_minute,
686            secret_rotation_function.clone(),
687            initial_secret_hash,
688            &node_id,
689        )
690        .await
691        .iter()
692        .filter(|&record| {
693            record
694                .active_peers
695                .iter()
696                .filter(|&peer| peer.eq(&[0u8; 32]))
697                .count()
698                > 0
699                || record
700                    .last_message_hashes
701                    .iter()
702                    .filter(|&hash| hash.eq(&[0u8; 32]))
703                    .count()
704                    > 0
705        })
706        .cloned()
707        .collect::<HashSet<_>>();
708
709        // Don't publish if there are more then MAX_BOOTSTRAP_RECORDS already written
710        // that either have active_peers or last_message_hashes set (active participants)
711        if records.len() >= MAX_BOOTSTRAP_RECORDS {
712            return Ok(records);
713        }
714
715        // Publish own records
716        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
717        for (i, peer) in neighbors.iter().take(5).enumerate() {
718            active_peers[i] = peer.as_bytes().clone();
719        }
720
721        let mut last_message_hashes_array = [[0u8; 32]; 5];
722        for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
723            last_message_hashes_array[i] = *hash;
724        }
725
726        let record = Record::sign(
727            topic_id.hash,
728            unix_minute,
729            node_id.as_bytes().clone(),
730            active_peers,
731            last_message_hashes_array,
732            &node_signing_key,
733        );
734        Topic::<R>::publish_unix_minute_record(
735            unix_minute,
736            &topic_id.clone(),
737            secret_rotation_function.clone(),
738            initial_secret_hash,
739            record,
740            Some(3),
741        )
742        .await?;
743
744        Ok(records)
745    }
746
747    // Runs after bootstrap to keep anouncing the topic on mainline and help identify and merge network bubbles
748    fn spawn_publisher(
749        topic_id: TopicId,
750        secret_rotation_function: Option<R>,
751        initial_secret_hash: [u8; 32],
752        node_id: iroh::NodeId,
753        gossip_receiver: GossipReceiver,
754        gossip_sender: GossipSender,
755        node_signing_key: ed25519_dalek::SigningKey,
756    ) -> tokio::task::JoinHandle<()> {
757        let mut gossip_receiver = gossip_receiver;
758
759        tokio::spawn(async move {
760            let mut backoff = 1;
761            loop {
762                let unix_minute = crate::unix_minute(0);
763
764                // Run publish_proc() (publishing procedure that is aware of MAX_BOOTSTRAP_RECORDS already written)
765                if let Ok(records) = Topic::<R>::publish_proc(
766                    unix_minute,
767                    &topic_id.clone(),
768                    Some(secret_rotation_function.clone().unwrap_or_default()),
769                    initial_secret_hash,
770                    node_id.clone(),
771                    &node_signing_key,
772                    gossip_receiver.neighbors().await,
773                    gossip_receiver.last_message_hashes(),
774                )
775                .await
776                {
777                    // Cluster size as bubble indicator
778                    let neighbors = gossip_receiver.neighbors().await;
779                    if neighbors.len() < 4 && !records.is_empty() {
780                        let node_ids = records
781                            .iter()
782                            .map(|record| {
783                                record
784                                    .active_peers
785                                    .iter()
786                                    .filter_map(|&active_peer| {
787                                        if active_peer == [0; 32]
788                                            || neighbors.contains(&active_peer)
789                                            || active_peer.eq(record.node_id.to_vec().as_slice())
790                                            || active_peer.eq(node_id.as_bytes())
791                                        {
792                                            None
793                                        } else if let Ok(node_id) =
794                                            iroh::NodeId::from_bytes(&active_peer)
795                                        {
796                                            Some(node_id)
797                                        } else {
798                                            None
799                                        }
800                                    })
801                                    .collect::<Vec<_>>()
802                            })
803                            .flatten()
804                            .collect::<HashSet<_>>();
805                        if gossip_sender
806                            .join_peers(
807                                node_ids.iter().cloned().collect::<Vec<_>>(),
808                                Some(MAX_JOIN_PEERS_COUNT),
809                            )
810                            .await
811                            .is_ok()
812                        {
813                            //println!("group-merger -> joined peer {}", node_id);
814                        }
815                    }
816
817                    // Message overlap indicator
818                    if gossip_receiver.last_message_hashes().len() >= 1 {
819                        let peers_to_join = records
820                            .iter()
821                            .filter_map(|record| {
822                                if !record.last_message_hashes.iter().all(|last_message_hash| {
823                                    *last_message_hash != [0; 32]
824                                        && gossip_receiver
825                                            .last_message_hashes()
826                                            .contains(&last_message_hash)
827                                }) {
828                                    Some(record)
829                                } else {
830                                    None
831                                }
832                            })
833                            .collect::<Vec<_>>();
834                        if !peers_to_join.is_empty() {
835                            let node_ids = peers_to_join
836                                .iter()
837                                .filter_map(|&record| {
838                                    let mut peers = vec![];
839                                    if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id) {
840                                        peers.push(node_id);
841                                    }
842                                    for active_peer in record.active_peers {
843                                        if active_peer == [0; 32] {
844                                            continue;
845                                        }
846                                        if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer)
847                                        {
848                                            peers.push(node_id);
849                                        }
850                                    }
851                                    Some(peers)
852                                })
853                                .flatten()
854                                .collect::<HashSet<_>>();
855
856                            if gossip_sender
857                                .join_peers(
858                                    node_ids.iter().cloned().collect::<Vec<_>>(),
859                                    Some(MAX_JOIN_PEERS_COUNT),
860                                )
861                                .await
862                                .is_ok()
863                            {
864                                /*println!(
865                                    "bouble detected: no-message-overlap -> joined {} peers",
866                                    node_ids.len()
867                                );*/
868                            }
869                        }
870                    }
871                } else {
872                    sleep(Duration::from_secs(backoff)).await;
873                    backoff = (backoff * 2).max(60);
874                    continue;
875                }
876                
877                backoff = 1;
878                sleep(Duration::from_secs(rand::random::<u64>() % 60)).await;
879            }
880        })
881    }
882}
883
884// Basic building blocks
885impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
886    fn signing_keypair(topic_id: &TopicId, unix_minute: u64) -> ed25519_dalek::SigningKey {
887        let mut sign_keypair_hash = sha2::Sha512::new();
888        sign_keypair_hash.update(topic_id.hash);
889        sign_keypair_hash.update(unix_minute.to_le_bytes());
890        let sign_keypair_seed: [u8; 32] = sign_keypair_hash.finalize()[..32]
891            .try_into()
892            .expect("hashing failed");
893        ed25519_dalek::SigningKey::from_bytes(&sign_keypair_seed)
894    }
895
896    fn encryption_keypair(
897        topic_id: &TopicId,
898        secret_rotation_function: &R,
899        initial_secret_hash: [u8; 32],
900        unix_minute: u64,
901    ) -> ed25519_dalek::SigningKey {
902        let enc_keypair_seed = secret_rotation_function.get_unix_minute_secret(
903            topic_id.hash,
904            unix_minute,
905            initial_secret_hash,
906        );
907        ed25519_dalek::SigningKey::from_bytes(&enc_keypair_seed)
908    }
909
910    // salt = hash (topic + unix_minute)
911    fn salt(topic_id: &TopicId, unix_minute: u64) -> [u8; 32] {
912        let mut slot_hash = sha2::Sha512::new();
913        slot_hash.update(topic_id.hash);
914        slot_hash.update(unix_minute.to_le_bytes());
915        slot_hash.finalize()[..32]
916            .try_into()
917            .expect("hashing failed")
918    }
919
920    async fn get_unix_minute_records(
921        topic_id: &TopicId,
922        unix_minute: u64,
923        secret_rotation_function: Option<R>,
924        initial_secret_hash: [u8; 32],
925        node_id: &iroh::NodeId,
926    ) -> HashSet<Record> {
927        let topic_sign = Topic::<R>::signing_keypair(&topic_id, unix_minute);
928        let encryption_key = Topic::<R>::encryption_keypair(
929            &topic_id,
930            &secret_rotation_function.clone().unwrap_or_default(),
931            initial_secret_hash,
932            unix_minute,
933        );
934        let salt = Topic::<R>::salt(&topic_id, unix_minute);
935
936        // Get records, decrypt and verify
937        let dht = get_dht();
938
939        let records_iter = match timeout(
940            Duration::from_secs(10),
941            dht.get_mutable(topic_sign.verifying_key().as_bytes(), Some(&salt), None)
942                .collect::<Vec<_>>(),
943        )
944        .await
945        {
946            Ok(records) => records,
947            Err(_) => vec![],
948        };
949
950        let records = records_iter
951            .iter()
952            .filter_map(|record| {
953                match EncryptedRecord::from_bytes(record.value().to_vec()) {
954                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
955                        Ok(record) => match record.verify(&topic_id.hash, unix_minute) {
956                            Ok(_) => match record.node_id.eq(node_id.as_bytes()) {
957                                true => {
958                                    None
959                                }
960                                false => Some(record),
961                            },
962                            Err(_) => None,
963                        },
964                        Err(_) => None,
965                    },
966                    Err(_) => None,
967                }
968            })
969            .collect::<HashSet<_>>();
970        records
971    }
972
973    async fn publish_unix_minute_record(
974        unix_minute: u64,
975        topic_id: &TopicId,
976        secret_rotation_function: Option<R>,
977        initial_secret_hash: [u8; 32],
978        record: Record,
979        retry_count: Option<usize>,
980    ) -> Result<()> {
981        let sign_key = Topic::<R>::signing_keypair(&topic_id.clone(), unix_minute);
982        let salt = Topic::<R>::salt(&topic_id, unix_minute);
983        let encryption_key = Topic::<R>::encryption_keypair(
984            &topic_id.clone(),
985            &secret_rotation_function.clone().unwrap_or_default(),
986            initial_secret_hash,
987            unix_minute,
988        );
989        let encrypted_record = record.encrypt(&encryption_key);
990
991        for i in 0..retry_count.unwrap_or(3) {
992            
993            let dht = get_dht();
994
995            let most_recent_result = match timeout(
996                Duration::from_secs(10),
997                dht.get_mutable_most_recent(
998                    sign_key.clone().verifying_key().as_bytes(),
999                    Some(&salt),
1000                ),
1001            )
1002            .await
1003            {
1004                Ok(result) => result,
1005                Err(_) => None,
1006            };
1007
1008            let item = if let Some(mut_item) = most_recent_result {
1009                MutableItem::new(
1010                    sign_key.clone(),
1011                    &encrypted_record.to_bytes(),
1012                    mut_item.seq() + 1,
1013                    Some(&salt),
1014                )
1015            } else {
1016                MutableItem::new(
1017                    sign_key.clone(),
1018                    &encrypted_record.to_bytes(),
1019                    0,
1020                    Some(&salt),
1021                )
1022            };
1023
1024            let put_result = match timeout(
1025                Duration::from_secs(10),
1026                dht.put_mutable(item.clone(), Some(item.seq())),
1027            )
1028            .await
1029            {
1030                Ok(result) => result.ok(),
1031                Err(_) => None
1032            };
1033
1034            if put_result.is_some() {
1035                break;
1036            } else if i == retry_count.unwrap_or(3) - 1 {
1037                bail!("failed to publish record")
1038            }
1039
1040            reset_dht().await;
1041
1042            sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
1043        }
1044        Ok(())
1045    }
1046}
1047
1048pub trait AutoDiscoveryBuilder {
1049    #[allow(async_fn_in_trait)]
1050    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1051        self,
1052        endpoint: Endpoint,
1053        secret_rotation_function: Option<R>,
1054    ) -> Result<Gossip<R>>;
1055}
1056
1057impl AutoDiscoveryBuilder for iroh_gossip::net::Builder {
1058    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1059        self,
1060        endpoint: Endpoint,
1061        secret_rotation_function: Option<R>,
1062    ) -> Result<Gossip<R>> {
1063        Ok(Gossip {
1064            gossip: self.spawn(endpoint.clone()),
1065            endpoint: endpoint.clone(),
1066            secret_rotation_function: secret_rotation_function.unwrap_or_default(),
1067        })
1068    }
1069}
1070
1071pub trait AutoDiscoveryGossip<R: SecretRotation + Default + Clone + Send + 'static> {
1072    #[allow(async_fn_in_trait)]
1073    async fn subscribe_and_join_with_auto_discovery(
1074        &self,
1075        topic_id: TopicId,
1076        initial_secret: &Vec<u8>,
1077    ) -> Result<Topic<R>>;
1078}
1079
1080// Default secret rotation function
1081#[derive(Debug, Clone, Copy, Default)]
1082pub struct DefaultSecretRotation;
1083
1084pub trait SecretRotation {
1085    fn get_unix_minute_secret(
1086        &self,
1087        topic_hash: [u8; 32],
1088        unix_minute: u64,
1089        initial_secret_hash: [u8; 32],
1090    ) -> [u8; 32];
1091}
1092
1093impl<R: SecretRotation + Default + Clone + Send + 'static> AutoDiscoveryGossip<R> for Gossip<R> {
1094    async fn subscribe_and_join_with_auto_discovery(
1095        &self,
1096        topic_id: TopicId,
1097        initial_secret: &Vec<u8>,
1098    ) -> Result<Topic<R>> {
1099        Topic::new(
1100            topic_id,
1101            &self.endpoint,
1102            self.endpoint.secret_key().secret(),
1103            self.gossip.clone(),
1104            initial_secret,
1105            Some(self.secret_rotation_function.clone()),
1106        )
1107        .await
1108    }
1109}
1110
1111impl SecretRotation for DefaultSecretRotation {
1112    fn get_unix_minute_secret(
1113        &self,
1114        topic_hash: [u8; 32],
1115        unix_minute: u64,
1116        initial_secret_hash: [u8; 32],
1117    ) -> [u8; 32] {
1118        let mut hash = sha2::Sha512::new();
1119        hash.update(topic_hash);
1120        hash.update(unix_minute.to_be_bytes());
1121        hash.update(initial_secret_hash);
1122        hash.finalize()[..32].try_into().expect("hashing failed")
1123    }
1124}
1125
1126pub fn unix_minute(minute_offset: i64) -> u64 {
1127    ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use super::*;
1133    use ed25519_dalek::SigningKey;
1134    use rand::rngs::OsRng;
1135
1136    #[test]
1137    fn test_topic_id_creation() {
1138        let topic_id = TopicId::new("test-topic".to_string());
1139        assert_eq!(topic_id._raw, "test-topic");
1140        assert_eq!(topic_id.hash.len(), 32);
1141
1142        // Same input should produce same hash
1143        let topic_id2 = TopicId::new("test-topic".to_string());
1144        assert_eq!(topic_id.hash, topic_id2.hash);
1145
1146        // Different input should produce different hash
1147        let topic_id3 = TopicId::new("different-topic".to_string());
1148        assert_ne!(topic_id.hash, topic_id3.hash);
1149    }
1150
1151    #[test]
1152    fn test_record_serialization_roundtrip() {
1153        let signing_key = SigningKey::generate(&mut OsRng);
1154        let topic = [1u8; 32];
1155        let unix_minute = 12345u64;
1156        let node_id = [2u8; 32];
1157        let active_peers = [[3u8; 32]; 5];
1158        let last_message_hashes = [[4u8; 32]; 5];
1159
1160        let record = Record::sign(
1161            topic,
1162            unix_minute,
1163            node_id,
1164            active_peers,
1165            last_message_hashes,
1166            &signing_key,
1167        );
1168
1169        // Test serialization roundtrip
1170        let bytes = record.to_bytes();
1171        let deserialized = Record::from_bytes(bytes).unwrap();
1172
1173        assert_eq!(record.topic, deserialized.topic);
1174        assert_eq!(record.unix_minute, deserialized.unix_minute);
1175        assert_eq!(record.node_id, deserialized.node_id);
1176        assert_eq!(record.active_peers, deserialized.active_peers);
1177        assert_eq!(record.last_message_hashes, deserialized.last_message_hashes);
1178        assert_eq!(record.signature, deserialized.signature);
1179    }
1180
1181    #[test]
1182    fn test_record_verification() {
1183        let signing_key = SigningKey::generate(&mut OsRng);
1184        let topic = [1u8; 32];
1185        let unix_minute = 12345u64;
1186        let node_id = signing_key.verifying_key().to_bytes();
1187        let active_peers = [[3u8; 32]; 5];
1188        let last_message_hashes = [[4u8; 32]; 5];
1189
1190        let record = Record::sign(
1191            topic,
1192            unix_minute,
1193            node_id,
1194            active_peers,
1195            last_message_hashes,
1196            &signing_key,
1197        );
1198
1199        // Valid verification should pass
1200        assert!(record.verify(&topic, unix_minute).is_ok());
1201
1202        // Wrong topic should fail
1203        let wrong_topic = [99u8; 32];
1204        assert!(record.verify(&wrong_topic, unix_minute).is_err());
1205
1206        // Wrong unix_minute should fail
1207        assert!(record.verify(&topic, unix_minute + 1).is_err());
1208    }
1209
1210    #[test]
1211    fn test_encrypted_record_roundtrip() {
1212        let signing_key = SigningKey::generate(&mut OsRng);
1213        let encryption_key = SigningKey::generate(&mut OsRng);
1214        let topic = [1u8; 32];
1215        let unix_minute = 12345u64;
1216        let node_id = signing_key.verifying_key().to_bytes();
1217        let active_peers = [[3u8; 32]; 5];
1218        let last_message_hashes = [[4u8; 32]; 5];
1219
1220        let record = Record::sign(
1221            topic,
1222            unix_minute,
1223            node_id,
1224            active_peers,
1225            last_message_hashes,
1226            &signing_key,
1227        );
1228
1229        // Test encryption/decryption roundtrip
1230        let encrypted = record.encrypt(&encryption_key);
1231        let decrypted = encrypted.decrypt(&encryption_key).unwrap();
1232
1233        assert_eq!(record.topic, decrypted.topic);
1234        assert_eq!(record.unix_minute, decrypted.unix_minute);
1235        assert_eq!(record.node_id, decrypted.node_id);
1236        assert_eq!(record.active_peers, decrypted.active_peers);
1237        assert_eq!(record.last_message_hashes, decrypted.last_message_hashes);
1238        assert_eq!(record.signature, decrypted.signature);
1239    }
1240
1241    #[test]
1242    fn test_encrypted_record_serialization() {
1243        let signing_key = SigningKey::generate(&mut OsRng);
1244        let encryption_key = SigningKey::generate(&mut OsRng);
1245        let topic = [1u8; 32];
1246        let unix_minute = 12345u64;
1247        let node_id = signing_key.verifying_key().to_bytes();
1248        let active_peers = [[3u8; 32]; 5];
1249        let last_message_hashes = [[4u8; 32]; 5];
1250
1251        let record = Record::sign(
1252            topic,
1253            unix_minute,
1254            node_id,
1255            active_peers,
1256            last_message_hashes,
1257            &signing_key,
1258        );
1259
1260        let encrypted = record.encrypt(&encryption_key);
1261
1262        // Test serialization roundtrip
1263        let bytes = encrypted.to_bytes();
1264        let deserialized = EncryptedRecord::from_bytes(bytes).unwrap();
1265
1266        // Should be able to decrypt the deserialized version
1267        let decrypted = deserialized.decrypt(&encryption_key).unwrap();
1268        assert_eq!(record.topic, decrypted.topic);
1269        assert_eq!(record.unix_minute, decrypted.unix_minute);
1270    }
1271
1272    #[test]
1273    fn test_default_secret_rotation() {
1274        let rotation = DefaultSecretRotation;
1275        let topic_hash = [1u8; 32];
1276        let unix_minute = 12345u64;
1277        let initial_secret_hash = [2u8; 32];
1278
1279        let secret1 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
1280        let secret2 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
1281
1282        // Same inputs should produce same secret
1283        assert_eq!(secret1, secret2);
1284
1285        // Different unix_minute should produce different secret
1286        let secret3 = rotation.get_unix_minute_secret(topic_hash, unix_minute + 1, initial_secret_hash);
1287        assert_ne!(secret1, secret3);
1288
1289        // Different topic should produce different secret
1290        let different_topic = [99u8; 32];
1291        let secret4 = rotation.get_unix_minute_secret(different_topic, unix_minute, initial_secret_hash);
1292        assert_ne!(secret1, secret4);
1293    }
1294
1295    #[test]
1296    fn test_unix_minute_function() {
1297        let current = unix_minute(0);
1298        let prev = unix_minute(-1);
1299        let next = unix_minute(1);
1300
1301        assert_eq!(current, prev + 1);
1302        assert_eq!(next, current + 1);
1303
1304        // Should be deterministic
1305        let current2 = unix_minute(0);
1306        assert_eq!(current, current2);
1307    }
1308
1309    #[test]
1310    fn test_topic_signing_keypair_deterministic() {
1311        let topic_id = TopicId::new("test-topic".to_string());
1312        let unix_minute = 12345u64;
1313
1314        let key1 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
1315        let key2 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
1316
1317        // Same inputs should produce same keypair
1318        assert_eq!(key1.to_bytes(), key2.to_bytes());
1319
1320        // Different unix_minute should produce different keypair
1321        let key3 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute + 1);
1322        assert_ne!(key1.to_bytes(), key3.to_bytes());
1323    }
1324
1325    #[test]
1326    fn test_topic_encryption_keypair_deterministic() {
1327        let topic_id = TopicId::new("test-topic".to_string());
1328        let unix_minute = 12345u64;
1329        let initial_secret_hash = [1u8; 32];
1330        let rotation = DefaultSecretRotation;
1331
1332        let key1 = Topic::<DefaultSecretRotation>::encryption_keypair(&topic_id, &rotation, initial_secret_hash, unix_minute);
1333        let key2 = Topic::<DefaultSecretRotation>::encryption_keypair(&topic_id, &rotation, initial_secret_hash, unix_minute);
1334
1335        // Same inputs should produce same keypair
1336        assert_eq!(key1.to_bytes(), key2.to_bytes());
1337
1338        // Different unix_minute should produce different keypair
1339        let key3 = Topic::<DefaultSecretRotation>::encryption_keypair(&topic_id, &rotation, initial_secret_hash, unix_minute + 1);
1340        assert_ne!(key1.to_bytes(), key3.to_bytes());
1341    }
1342
1343    #[test]
1344    fn test_topic_salt_deterministic() {
1345        let topic_id = TopicId::new("test-topic".to_string());
1346        let unix_minute = 12345u64;
1347
1348        let salt1 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
1349        let salt2 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
1350
1351        // Same inputs should produce same salt
1352        assert_eq!(salt1, salt2);
1353
1354        // Different unix_minute should produce different salt
1355        let salt3 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute + 1);
1356        assert_ne!(salt1, salt3);
1357    }
1358}