distributed_topic_tracker/
lib.rs

1use std::{collections::HashSet, sync::Arc, time::Duration};
2
3use anyhow::{Result, bail};
4use arc_swap::ArcSwap;
5use ed25519_dalek::ed25519::signature::SignerMut;
6use futures::StreamExt as _;
7use iroh::Endpoint;
8use iroh_gossip::{api::Event, proto::DeliveryScope};
9use mainline::{MutableItem, async_dht::AsyncDht};
10use once_cell::sync::Lazy;
11use rand::seq::SliceRandom;
12use sha2::Digest;
13
14use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
15use tokio::time::{sleep, timeout};
16
/// Cap passed as `max_peers` when the publisher joins a batch of peers at once.
pub const MAX_JOIN_PEERS_COUNT: usize = 30;
/// Number of already-published records for a unix minute at which `publish_proc`
/// stops publishing this node's own record.
pub const MAX_BOOTSTRAP_RECORDS: usize = 10;
/// Ready-made instance of the default secret-rotation strategy.
/// NOTE(review): `DefaultSecretRotation` is defined outside this section — confirm its semantics there.
pub const SECRET_ROTATION: DefaultSecretRotation = DefaultSecretRotation;
20
21static DHT: Lazy<ArcSwap<mainline::async_dht::AsyncDht>> = Lazy::new(|| {
22    ArcSwap::from_pointee(
23        mainline::Dht::builder()
24            .build()
25            .expect("failed to create dht")
26            .as_async(),
27    )
28});
29
/// Returns a shared handle to the DHT client currently stored in `DHT`.
fn get_dht() -> Arc<AsyncDht> {
    DHT.load_full()
}
33
34async fn reset_dht() {
35    let n_dht = mainline::Dht::builder()
36        .build()
37        .expect("failed to create dht");
38    DHT.store(Arc::new(n_dht.as_async()));
39}
40
/// A [`Record`] in encrypted form, as produced by `Record::encrypt`.
#[derive(Debug, Clone)]
pub struct EncryptedRecord {
    /// Serialized record, encrypted to a freshly generated one-time key.
    encrypted_record: Vec<u8>,
    /// Bytes of that one-time key, encrypted to the caller-supplied encryption key.
    encrypted_decryption_key: Vec<u8>,
}
46
/// A signed announcement of one node's participation in a topic for one unix minute.
///
/// The fields are serialized in declaration order; the signature covers every
/// field that precedes it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Record {
    /// Hash identifying the topic.
    topic: [u8; 32],
    /// Unix minute the record was published for.
    unix_minute: u64,
    /// Public key bytes of the publishing node; also used to verify `signature`.
    node_id: [u8; 32],
    /// Up to five neighbor node ids; unused slots are all-zero.
    active_peers: [[u8; 32]; 5],
    /// Up to five message hashes; unused slots are all-zero.
    last_message_hashes: [[u8; 32]; 5],
    /// Ed25519 signature over the preceding fields.
    signature: [u8; 64],
}
56
/// Bundles an `iroh_gossip` instance with an endpoint and a secret-rotation strategy.
///
/// Its methods are implemented outside this section.
pub struct Gossip<R: SecretRotation + Default + Clone + Send + 'static> {
    /// Underlying gossip protocol handle, exposed publicly.
    pub gossip: iroh_gossip::net::Gossip,
    /// iroh endpoint. NOTE(review): presumably the endpoint `gossip` runs on — confirm at the construction site.
    endpoint: iroh::Endpoint,
    /// Secret-rotation strategy of type `R`.
    secret_rotation_function: R,
}
62
/// A joined (or joining) gossip topic together with the state needed to keep announcing it.
#[derive(Debug)]
pub struct Topic<R: SecretRotation + Default + Clone + Send + 'static> {
    /// Identifier of the topic (raw name plus its hash).
    topic_id: TopicId,
    /// Handle used to broadcast messages and join additional peers.
    gossip_sender: GossipSender,
    /// Handle used to query neighbors / join state and to subscribe to events.
    gossip_receiver: GossipReceiver,
    /// Gossip instance handed to `Topic::new`; not read in this section.
    _gossip: iroh_gossip::net::Gossip,
    /// First 32 bytes of the SHA-512 of the initial secret.
    initial_secret_hash: [u8; 32],
    /// Secret-rotation strategy; `R::default()` when none was supplied to `Topic::new`.
    secret_rotation_function: R,
    /// Node id of the local endpoint.
    node_id: iroh::NodeId,
}
73
/// Topic identifier derived from a human-readable string.
#[derive(Debug, Clone)]
pub struct TopicId {
    /// Original topic string; kept but not read in this section.
    _raw: String,
    /// First 32 bytes of the SHA-512 digest of the raw string.
    hash: [u8; 32], // sha512( raw )[..32]
}
79
/// Cloneable handle onto the background task that owns the raw gossip receiver.
#[derive(Debug)]
pub struct GossipReceiver {
    /// Broadcast sender on which the background task re-publishes gossip events;
    /// `subscribe()` hands out receivers from it.
    gossip_event_forwarder: tokio::sync::broadcast::Sender<iroh_gossip::api::Event>,
    /// Request channel to the background task (neighbor and join-state queries).
    action_req: tokio::sync::mpsc::Sender<InnerActionRecv>,
    /// Message hashes held by this particular handle.
    /// NOTE(review): the background task updates its own clone of this vector, so
    /// handles do not observe those updates — confirm whether sharing was intended.
    last_message_hashes: Vec<[u8; 32]>,
    /// Receiver held for the lifetime of the handle and never read.
    /// NOTE(review): presumably keeps the broadcast channel from having zero receivers — confirm.
    _keep_alive_rx: tokio::sync::broadcast::Receiver<iroh_gossip::api::Event>,
    /// Gossip instance held alongside the handle; not read in this section.
    _gossip: iroh_gossip::net::Gossip,
}
88
89impl Clone for GossipReceiver {
90    fn clone(&self) -> Self {
91        Self {
92            gossip_event_forwarder: self.gossip_event_forwarder.clone(),
93            action_req: self.action_req.clone(),
94            last_message_hashes: self.last_message_hashes.clone(),
95            _keep_alive_rx: self.gossip_event_forwarder.subscribe(),
96            _gossip: self._gossip.clone(),
97        }
98    }
99}
100
/// Cloneable handle onto the background task that owns the raw gossip sender.
#[derive(Debug)]
pub struct GossipSender {
    /// Request channel to the background task (broadcast and join-peers requests).
    action_req: tokio::sync::mpsc::Sender<InnerActionSend>,
    /// Gossip instance held alongside the handle; not read in this section.
    _gossip: iroh_gossip::net::Gossip,
}
106
107impl Clone for GossipSender {
108    fn clone(&self) -> Self {
109        Self {
110            action_req: self.action_req.clone(),
111            _gossip: self._gossip.clone(),
112        }
113    }
114}
115
116
/// Requests a `GossipReceiver` handle sends to its background task.
#[derive(Debug)]
enum InnerActionRecv {
    /// Ask for the current neighbor set; the answer is sent back on the oneshot channel.
    ReqNeighbors(tokio::sync::oneshot::Sender<HashSet<iroh::NodeId>>),
    /// Ask whether the topic is joined; the answer is sent back on the oneshot channel.
    ReqIsJoined(tokio::sync::oneshot::Sender<bool>),
}
122
/// Requests a `GossipSender` handle sends to its background task.
///
/// The `bool` sent back on the oneshot channel is `true` when the underlying
/// gossip call succeeded.
#[derive(Debug)]
enum InnerActionSend {
    /// Broadcast the given payload.
    ReqSend(Vec<u8>, tokio::sync::oneshot::Sender<bool>),
    /// Join the given peers.
    ReqJoinPeers(Vec<iroh::NodeId>, tokio::sync::oneshot::Sender<bool>),
}
128
129impl EncryptedRecord {
130    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
131        let one_time_key_bytes: [u8; 32] = decryption_key
132            .decrypt(&self.encrypted_decryption_key)?
133            .as_slice()
134            .try_into()?;
135        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
136
137        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
138        let record = Record::from_bytes(decrypted_record)?;
139        Ok(record)
140    }
141
142    pub fn to_bytes(&self) -> Vec<u8> {
143        let mut buf = Vec::new();
144        let encrypted_record_len = self.encrypted_record.len() as u32;
145        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
146        buf.extend_from_slice(&self.encrypted_record);
147        buf.extend_from_slice(&self.encrypted_decryption_key);
148        buf
149    }
150
151    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
152        let (encrypted_record_len, buf) = buf.split_at(4);
153        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
154        let (encrypted_record, encrypted_decryption_key) =
155            buf.split_at(encrypted_record_len as usize);
156
157        Ok(Self {
158            encrypted_record: encrypted_record.to_vec(),
159            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
160        })
161    }
162}
163
164impl Record {
165    pub fn sign(
166        topic: [u8; 32],
167        unix_minute: u64,
168        node_id: [u8; 32],
169        active_peers: [[u8; 32]; 5],
170        last_message_hashes: [[u8; 32]; 5],
171        signing_key: &ed25519_dalek::SigningKey,
172    ) -> Self {
173        let mut signature_data = Vec::new();
174        signature_data.extend_from_slice(&topic);
175        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
176        signature_data.extend_from_slice(&node_id);
177        for active_peer in active_peers {
178            signature_data.extend_from_slice(&active_peer);
179        }
180        for last_message_hash in last_message_hashes {
181            signature_data.extend_from_slice(&last_message_hash);
182        }
183        let mut signing_key = signing_key.clone();
184        let signature = signing_key.sign(&signature_data);
185        Self {
186            topic,
187            unix_minute,
188            node_id,
189            active_peers,
190            last_message_hashes,
191            signature: signature.to_bytes(),
192        }
193    }
194
195    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
196        let (topic, buf) = buf.split_at(32);
197        let (unix_minute, buf) = buf.split_at(8);
198        let (node_id, mut buf) = buf.split_at(32);
199
200        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
201        #[allow(clippy::needless_range_loop)]
202        for i in 0..active_peers.len() {
203            let (active_peer, _buf) = buf.split_at(32);
204            active_peers[i] = active_peer.try_into()?;
205            buf = _buf;
206        }
207        let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
208        #[allow(clippy::needless_range_loop)]
209        for i in 0..last_message_hashes.len() {
210            let (last_message_hash, _buf) = buf.split_at(32);
211            last_message_hashes[i] = last_message_hash.try_into()?;
212            buf = _buf;
213        }
214
215        let (signature, buf) = buf.split_at(64);
216
217        if !buf.is_empty() {
218            bail!("buffer not empty after reconstruction")
219        }
220
221        Ok(Self {
222            topic: topic.try_into()?,
223            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
224            node_id: node_id.try_into()?,
225            active_peers,
226            last_message_hashes,
227            signature: signature.try_into()?,
228        })
229    }
230
231    pub fn to_bytes(&self) -> Vec<u8> {
232        let mut buf = Vec::new();
233        buf.extend_from_slice(&self.topic);
234        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
235        buf.extend_from_slice(&self.node_id);
236        for active_peer in self.active_peers {
237            buf.extend_from_slice(&active_peer);
238        }
239        for last_message_hash in self.last_message_hashes {
240            buf.extend_from_slice(&last_message_hash);
241        }
242        buf.extend_from_slice(&self.signature);
243        buf
244    }
245
246    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
247        if self.topic != *actual_topic {
248            bail!("topic mismatch")
249        }
250        if self.unix_minute != actual_unix_minute {
251            bail!("unix minute mismatch")
252        }
253
254        let record_bytes = self.to_bytes();
255        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
256        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
257        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
258
259        node_id.verify_strict(signature_data.as_slice(), &signature)?;
260
261        Ok(())
262    }
263
264    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
265        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
266        let p_key = one_time_key.verifying_key();
267        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
268        let key_enc = encryption_key
269            .verifying_key()
270            .encrypt(&one_time_key.to_bytes())
271            .expect("encryption failed");
272
273        EncryptedRecord {
274            encrypted_record: data_enc,
275            encrypted_decryption_key: key_enc,
276        }
277    }
278}
279
280impl GossipSender {
281    pub fn new(gossip_sender: iroh_gossip::api::GossipSender, gossip: iroh_gossip::net::Gossip) -> Self {
282        let (action_req_tx, mut action_req_rx) =
283            tokio::sync::mpsc::channel::<InnerActionSend>(1024);
284
285        tokio::spawn({
286            let gossip_sender = gossip_sender;
287            async move {
288                while let Some(inner_action) = action_req_rx.recv().await {
289                    match inner_action {
290                        InnerActionSend::ReqSend(data, tx) => {
291                            let res = gossip_sender.broadcast(data.into()).await;
292                            tx.send(res.is_ok()).expect("broadcast failed");
293                        }
294                        InnerActionSend::ReqJoinPeers(peers, tx) => {
295                            let res = gossip_sender.join_peers(peers).await;                     
296                            tx.send(res.is_ok()).expect("broadcast failed");
297                        }
298                    }
299                }
300            }
301        });
302
303        Self {
304            action_req: action_req_tx,
305            _gossip: gossip,
306        }
307    }
308
309    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
310        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
311        let _ = self
312            .action_req
313            .send(InnerActionSend::ReqSend(data, tx))
314            .await;
315
316        match rx.await {
317            Ok(true) => Ok(()),
318            Ok(false) => bail!("broadcast failed"),
319            Err(_) => panic!("broadcast failed"),
320        }
321    }
322
323    pub async fn join_peers(
324        &self,
325        peers: Vec<iroh::NodeId>,
326        max_peers: Option<usize>,
327    ) -> Result<()> {
328        let mut peers = peers;
329        if let Some(max_peers) = max_peers {
330            peers.shuffle(&mut rand::thread_rng());
331            peers.truncate(max_peers);
332        }
333
334        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
335        let _ = self
336            .action_req
337            .send(InnerActionSend::ReqJoinPeers(peers, tx))
338            .await;
339
340        match rx.await {
341            Ok(true) => Ok(()),
342            Ok(false) => bail!("join peers failed"),
343            Err(_) => panic!("broadcast failed"),
344        }
345    }
346}
347
348impl GossipReceiver {
349    pub fn new(gossip_receiver: iroh_gossip::api::GossipReceiver, gossip: iroh_gossip::net::Gossip) -> Self {
350        let (gossip_forward_tx, _) =
351            tokio::sync::broadcast::channel::<iroh_gossip::api::Event>(1024);
352        let (action_req_tx, mut action_req_rx) =
353            tokio::sync::mpsc::channel::<InnerActionRecv>(1024);
354
355        let keep_alive_rx = gossip_forward_tx.subscribe();
356
357        let self_ref = Self {
358            gossip_event_forwarder: gossip_forward_tx.clone(),
359            action_req: action_req_tx.clone(),
360            last_message_hashes: vec![],
361            _keep_alive_rx: keep_alive_rx,
362            _gossip: gossip,
363        };
364
365        tokio::spawn({
366            let mut self_ref = self_ref.clone();
367            async move {
368                let mut gossip_receiver = gossip_receiver;
369                loop {
370                    tokio::select! {
371                        Some(inner_action) = action_req_rx.recv() => {
372                            match inner_action {
373                                InnerActionRecv::ReqNeighbors(tx) => {
374                                    let neighbors = gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>();
375                                    let _ = tx.send(neighbors);
376                                },
377                                InnerActionRecv::ReqIsJoined(tx) => {
378                                    let is_joined = gossip_receiver.is_joined();
379                                    let _ = tx.send(is_joined);
380                                }
381                            }
382                        }
383                        gossip_event_res = gossip_receiver.next() => {
384                            if let Some(Ok(gossip_event)) = gossip_event_res {
385                                if let Event::Received(msg) = gossip_event.clone() {
386                                    if let DeliveryScope::Swarm(_) = msg.scope {
387                                        let hash = sha2::Sha512::digest(&msg.content);
388                                        self_ref.last_message_hashes.push(hash[..32].try_into().expect("hashing failed"));
389                                        while self_ref.last_message_hashes.len() > 5 {
390                                            self_ref.last_message_hashes.pop();
391                                        }
392                                    }
393                                }
394                                let _ = self_ref.gossip_event_forwarder.send(gossip_event.clone());
395                            } else {
396                                break;
397                            }
398                        }
399                    }
400                }
401            }
402        });
403
404        self_ref
405    }
406
407    pub async fn neighbors(&mut self) -> Result<HashSet<iroh::NodeId>> {
408        let (neighbors_tx, neighbors_rx) = tokio::sync::oneshot::channel::<HashSet<iroh::NodeId>>();
409
410        let _ = self
411            .action_req
412            .send(InnerActionRecv::ReqNeighbors(neighbors_tx))
413            .await;
414
415        neighbors_rx.await.map_err(|err| anyhow::anyhow!(err))
416    }
417
418    pub async fn is_joined(&mut self) -> Result<bool> {
419        let (is_joined_tx, is_joined_rx) = tokio::sync::oneshot::channel::<bool>();
420
421        let _ = self
422            .action_req
423            .send(InnerActionRecv::ReqIsJoined(is_joined_tx))
424            .await;
425
426        is_joined_rx.await.map_err(|err| anyhow::anyhow!(err))
427    }
428
429    pub async fn subscribe(&mut self) -> Result<tokio::sync::broadcast::Receiver<Event>> {
430        Ok(self.gossip_event_forwarder.subscribe())
431    }
432
433    pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
434        self.last_message_hashes.clone()
435    }
436}
437
438impl TopicId {
439    pub fn new(raw: String) -> Self {
440        let mut raw_hash = sha2::Sha512::new();
441        raw_hash.update(raw.as_bytes());
442
443        Self {
444            _raw: raw,
445            hash: raw_hash.finalize()[..32]
446                .try_into()
447                .expect("hashing 'raw' failed"),
448        }
449    }
450}
451
452// State: new, split, spawn_publisher
453impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
454    pub async fn new(
455        topic_id: TopicId,
456        endpoint: &iroh::Endpoint,
457        node_signing_key: &ed25519_dalek::SigningKey,
458        gossip: iroh_gossip::net::Gossip,
459        initial_secret: &Vec<u8>,
460        secret_rotation_function: Option<R>,
461        async_bootstrap: bool,
462    ) -> Result<Self> {
463        // Create secret_hash
464        let mut initial_secret_hash = sha2::Sha512::new();
465        initial_secret_hash.update(initial_secret);
466        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
467            .try_into()
468            .expect("hashing failed");
469
470        // Bootstrap to get gossip tx/rx
471        let (gossip_tx, gossip_rx) = if async_bootstrap {
472            Self::bootstrap_no_wait(
473                topic_id.clone(),
474                endpoint,
475                node_signing_key,
476                gossip.clone(),
477                initial_secret_hash,
478                secret_rotation_function.clone(),
479            )
480            .await?
481        } else {
482            Self::bootstrap(
483                topic_id.clone(),
484                endpoint,
485                node_signing_key,
486                gossip.clone(),
487                initial_secret_hash,
488                secret_rotation_function.clone(),
489            )
490            .await?
491        };
492
493        // Spawn publisher
494        let _join_handler = Self::spawn_publisher(
495            topic_id.clone(),
496            secret_rotation_function.clone(),
497            initial_secret_hash,
498            endpoint.node_id(),
499            gossip_rx.clone(),
500            gossip_tx.clone(),
501            node_signing_key.clone(),
502        );
503
504        Ok(Self {
505            topic_id,
506            gossip_sender: gossip_tx,
507            gossip_receiver: gossip_rx,
508            _gossip: gossip,
509            initial_secret_hash,
510            secret_rotation_function: secret_rotation_function.unwrap_or_default(),
511            node_id: endpoint.node_id(),
512        })
513    }
514
515    pub fn split(&self) -> (GossipSender, GossipReceiver) {
516        (self.gossip_sender.clone(), self.gossip_receiver.clone())
517    }
518
519    pub fn topic_id(&self) -> &TopicId {
520        &self.topic_id
521    }
522
523    pub fn node_id(&self) -> &iroh::NodeId {
524        &self.node_id
525    }
526
527    pub fn gossip_sender(&self) -> GossipSender {
528        self.gossip_sender.clone()
529    }
530
531    pub fn gossip_receiver(&self) -> GossipReceiver {
532        self.gossip_receiver.clone()
533    }
534
535    pub fn secret_rotation_function(&self) -> R {
536        self.secret_rotation_function.clone()
537    }
538
539    pub fn initial_secret_hash(&self) -> [u8; 32] {
540        self.initial_secret_hash
541    }
542
543    pub fn set_initial_secret_hash(&mut self, initial_secret_hash: [u8; 32]) {
544        self.initial_secret_hash = initial_secret_hash;
545    }
546}
547
548// Procedures: Bootstrap, Publishing, Publisher
549impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
550    pub async fn bootstrap_no_wait(
551        topic_id: TopicId,
552        endpoint: &iroh::Endpoint,
553        node_signing_key: &ed25519_dalek::SigningKey,
554        gossip: iroh_gossip::net::Gossip,
555        initial_secret_hash: [u8; 32],
556        secret_rotation_function: Option<R>,
557    ) -> Result<(GossipSender, GossipReceiver)> {
558        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
559            .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
560            .await?;
561        let (gossip_sender, gossip_receiver) = gossip_topic.split();
562        let (gossip_sender, gossip_receiver) = (
563            GossipSender::new(gossip_sender,gossip.clone()),
564            GossipReceiver::new(gossip_receiver,gossip.clone()),
565        );
566
567        tokio::spawn({
568            let gossip_sender = gossip_sender.clone();
569            let gossip_receiver = gossip_receiver.clone();
570            let endpoint = endpoint.clone();
571            let node_signing_key = node_signing_key.clone();
572            async move {
573                Self::bootstrap_from_gossip(
574                    gossip_sender,
575                    gossip_receiver,
576                    topic_id,
577                    &endpoint,
578                    &node_signing_key,
579                    initial_secret_hash,
580                    secret_rotation_function,
581                )
582                .await
583            }
584        });
585
586        Ok((gossip_sender, gossip_receiver))
587    }
588
589    pub async fn bootstrap(
590        topic_id: TopicId,
591        endpoint: &iroh::Endpoint,
592        node_signing_key: &ed25519_dalek::SigningKey,
593        gossip: iroh_gossip::net::Gossip,
594        initial_secret_hash: [u8; 32],
595        secret_rotation_function: Option<R>,
596    ) -> Result<(GossipSender, GossipReceiver)> {
597        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
598            .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
599            .await?;
600        let (gossip_sender, gossip_receiver) = gossip_topic.split();
601        let (gossip_sender, gossip_receiver) = (
602            GossipSender::new(gossip_sender,gossip.clone()),
603            GossipReceiver::new(gossip_receiver,gossip.clone()),
604        );
605        Self::bootstrap_from_gossip(
606            gossip_sender,
607            gossip_receiver,
608            topic_id,
609            endpoint,
610            node_signing_key,
611            initial_secret_hash,
612            secret_rotation_function,
613        )
614        .await
615    }
616
617    async fn bootstrap_from_gossip(
618        gossip_sender: GossipSender,
619        mut gossip_receiver: GossipReceiver,
620        topic_id: TopicId,
621        endpoint: &iroh::Endpoint,
622        node_signing_key: &ed25519_dalek::SigningKey,
623        initial_secret_hash: [u8; 32],
624        secret_rotation_function: Option<R>,
625    ) -> Result<(GossipSender, GossipReceiver)> {
626        let mut last_published_unix_minute = 0;
627        loop {
628            // Check if we are connected to at least one node
629            if let Ok(joined) = gossip_receiver.is_joined().await {
630                if joined {
631                    return Ok((gossip_sender, gossip_receiver));
632                }
633            }
634
635            // On the first try we check the prev unix minute, after that the current one
636            let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
637                -1
638            } else {
639                0
640            });
641
642            // Unique, verified records for the unix minute
643            let records = Topic::get_unix_minute_records(
644                &topic_id.clone(),
645                unix_minute,
646                secret_rotation_function.clone(),
647                initial_secret_hash,
648                &endpoint.node_id(),
649            )
650            .await;
651
652            // If there are no records, invoke the publish_proc (the publishing procedure)
653            // continue the loop after
654            if records.is_empty() {
655                if unix_minute != last_published_unix_minute
656                    && Self::publish_proc(
657                        unix_minute,
658                        &topic_id,
659                        secret_rotation_function.clone(),
660                        initial_secret_hash,
661                        endpoint.node_id(),
662                        node_signing_key,
663                        HashSet::new(),
664                        vec![],
665                    )
666                    .await
667                    .is_ok()
668                {
669                    last_published_unix_minute = unix_minute;
670                }
671                sleep(Duration::from_millis(100)).await;
672                continue;
673            }
674
675            // We found records
676
677            // Collect node ids from active_peers and record.node_id (of publisher)
678            let bootstrap_nodes = records
679                .iter()
680                .flat_map(|record| {
681                    let mut v = vec![record.node_id];
682                    for peer in record.active_peers {
683                        if peer != [0; 32] {
684                            v.push(peer);
685                        }
686                    }
687                    v
688                })
689                .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
690                .collect::<HashSet<_>>();
691
692            // Maybe in the meantime someone connected to us via one of our published records
693            // we don't want to disrup the gossip rotations any more then we have to
694            // so we check again before joining new peers
695            if let Ok(joined) = gossip_receiver.is_joined().await {
696                if joined {
697                    return Ok((gossip_sender, gossip_receiver));
698                }
699            }
700
701            // Instead of throwing everything into join_peers() at once we go node_id by node_id
702            // again to disrupt as little nodes peer neighborhoods as possible.
703            for node_id in bootstrap_nodes.iter() {
704                match gossip_sender.join_peers(vec![*node_id], None).await {
705                    Ok(_) => {
706                        sleep(Duration::from_millis(100)).await;
707                        if let Ok(joined) = gossip_receiver.is_joined().await {
708                            if joined {
709                                break;
710                            }
711                        }
712                    }
713                    Err(_) => {
714                        continue;
715                    }
716                }
717            }
718
719            // If we are still not connected to anyone:
720            // give it the default iroh-gossip connection timeout before the final is_joined() check
721            if let Ok(joined) = gossip_receiver.is_joined().await {
722                if !joined {
723                    sleep(Duration::from_millis(500)).await;
724                }
725            }
726
727            // If we are connected: return
728            if let Ok(joined) = gossip_receiver.is_joined().await {
729                if joined {
730                    return Ok((gossip_sender, gossip_receiver));
731                }
732            } else {
733                // If we are not connected: check if we should publish a record this minute
734                if unix_minute != last_published_unix_minute
735                    && Self::publish_proc(
736                        unix_minute,
737                        &topic_id,
738                        secret_rotation_function.clone(),
739                        initial_secret_hash,
740                        endpoint.node_id(),
741                        node_signing_key,
742                        HashSet::new(),
743                        vec![],
744                    )
745                    .await
746                    .is_ok()
747                {
748                    last_published_unix_minute = unix_minute;
749                }
750                sleep(Duration::from_millis(100)).await;
751                continue;
752            }
753        }
754    }
755
756    // publishing procedure: if more then MAX_BOOTSTRAP_RECORDS are written, don't write.
757    // returns all valid records found from nodes already connected to the iroh-gossip network.
758    #[allow(clippy::too_many_arguments)]
759    async fn publish_proc(
760        unix_minute: u64,
761        topic_id: &TopicId,
762        secret_rotation_function: Option<R>,
763        initial_secret_hash: [u8; 32],
764        node_id: iroh::NodeId,
765        node_signing_key: &ed25519_dalek::SigningKey,
766        neighbors: HashSet<iroh::NodeId>,
767        last_message_hashes: Vec<[u8; 32]>,
768    ) -> Result<HashSet<Record>> {
769        // Get verified records that have active_peers or last_message_hashes set (active participants)
770        let records = Topic::<R>::get_unix_minute_records(
771            &topic_id.clone(),
772            unix_minute,
773            secret_rotation_function.clone(),
774            initial_secret_hash,
775            &node_id,
776        )
777        .await
778        .iter()
779        .filter(|&record| {
780            record
781                .active_peers
782                .iter()
783                .filter(|&peer| peer.eq(&[0u8; 32]))
784                .count()
785                > 0
786                || record
787                    .last_message_hashes
788                    .iter()
789                    .filter(|&hash| hash.eq(&[0u8; 32]))
790                    .count()
791                    > 0
792        })
793        .cloned()
794        .collect::<HashSet<_>>();
795
796        // Don't publish if there are more then MAX_BOOTSTRAP_RECORDS already written
797        // that either have active_peers or last_message_hashes set (active participants)
798        if records.len() >= MAX_BOOTSTRAP_RECORDS {
799            return Ok(records);
800        }
801
802        // Publish own records
803        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
804        for (i, peer) in neighbors.iter().take(5).enumerate() {
805            active_peers[i] = *peer.as_bytes()
806        }
807
808        let mut last_message_hashes_array = [[0u8; 32]; 5];
809        for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
810            last_message_hashes_array[i] = *hash;
811        }
812
813        let record = Record::sign(
814            topic_id.hash,
815            unix_minute,
816            *node_id.as_bytes(),
817            active_peers,
818            last_message_hashes_array,
819            node_signing_key,
820        );
821        Topic::<R>::publish_unix_minute_record(
822            unix_minute,
823            &topic_id.clone(),
824            secret_rotation_function.clone(),
825            initial_secret_hash,
826            record,
827            Some(3),
828        )
829        .await?;
830
831        Ok(records)
832    }
833
    // Runs after bootstrap to keep announcing the topic on mainline and to help identify and merge network bubbles
835    fn spawn_publisher(
836        topic_id: TopicId,
837        secret_rotation_function: Option<R>,
838        initial_secret_hash: [u8; 32],
839        node_id: iroh::NodeId,
840        gossip_receiver: GossipReceiver,
841        gossip_sender: GossipSender,
842        node_signing_key: ed25519_dalek::SigningKey,
843    ) -> tokio::task::JoinHandle<()> {
844        let mut gossip_receiver = gossip_receiver;
845
846        tokio::spawn(async move {
847            let mut backoff = 1;
848            loop {
849                let unix_minute = crate::unix_minute(0);
850
851                // Run publish_proc() (publishing procedure that is aware of MAX_BOOTSTRAP_RECORDS already written)
852                if let Ok(records) = Topic::<R>::publish_proc(
853                    unix_minute,
854                    &topic_id.clone(),
855                    Some(secret_rotation_function.clone().unwrap_or_default()),
856                    initial_secret_hash,
857                    node_id,
858                    &node_signing_key,
859                    gossip_receiver.neighbors().await.unwrap_or_default(),
860                    gossip_receiver.last_message_hashes(),
861                )
862                .await
863                {
864                    // Cluster size as bubble indicator
865                    let neighbors = gossip_receiver.neighbors().await.unwrap_or_default();
866                    if neighbors.len() < 4 && !records.is_empty() {
867                        let node_ids = records
868                            .iter()
869                            .flat_map(|record| {
870                                record
871                                    .active_peers
872                                    .iter()
873                                    .filter_map(|&active_peer| {
874                                        if active_peer == [0; 32]
875                                            || neighbors.contains(&active_peer)
876                                            || active_peer.eq(record.node_id.to_vec().as_slice())
877                                            || active_peer.eq(node_id.as_bytes())
878                                        {
879                                            None
880                                        } else {
881                                            iroh::NodeId::from_bytes(&active_peer).ok()
882                                        }
883                                    })
884                                    .collect::<Vec<_>>()
885                            })
886                            .collect::<HashSet<_>>();
887                        if gossip_sender
888                            .join_peers(
889                                node_ids.iter().cloned().collect::<Vec<_>>(),
890                                Some(MAX_JOIN_PEERS_COUNT),
891                            )
892                            .await
893                            .is_ok()
894                        {
895                            //println!("group-merger -> joined peer {}", node_id);
896                        }
897                    }
898
899                    // Message overlap indicator
900                    if !gossip_receiver.last_message_hashes().is_empty() {
901                        let peers_to_join = records
902                            .iter()
903                            .filter(|record| {
904                                !record.last_message_hashes.iter().all(|last_message_hash| {
905                                    *last_message_hash != [0; 32]
906                                        && gossip_receiver
907                                            .last_message_hashes()
908                                            .contains(last_message_hash)
909                                })
910                            })
911                            .collect::<Vec<_>>();
912                        if !peers_to_join.is_empty() {
913                            let node_ids = peers_to_join
914                                .iter()
915                                .flat_map(|&record| {
916                                    let mut peers = vec![];
917                                    if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id) {
918                                        peers.push(node_id);
919                                    }
920                                    for active_peer in record.active_peers {
921                                        if active_peer == [0; 32] {
922                                            continue;
923                                        }
924                                        if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer)
925                                        {
926                                            peers.push(node_id);
927                                        }
928                                    }
929                                    peers
930                                })
931                                .collect::<HashSet<_>>();
932
933                            if gossip_sender
934                                .join_peers(
935                                    node_ids.iter().cloned().collect::<Vec<_>>(),
936                                    Some(MAX_JOIN_PEERS_COUNT),
937                                )
938                                .await
939                                .is_ok()
940                            {
941                                /*println!(
942                                    "bouble detected: no-message-overlap -> joined {} peers",
943                                    node_ids.len()
944                                );*/
945                            }
946                        }
947                    }
948                } else {
949                    sleep(Duration::from_secs(backoff)).await;
950                    backoff = (backoff * 2).max(60);
951                    continue;
952                }
953
954                backoff = 1;
955                sleep(Duration::from_secs(rand::random::<u64>() % 60)).await;
956            }
957        })
958    }
959}
960
961// Basic building blocks
962impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
963    fn signing_keypair(topic_id: &TopicId, unix_minute: u64) -> ed25519_dalek::SigningKey {
964        let mut sign_keypair_hash = sha2::Sha512::new();
965        sign_keypair_hash.update(topic_id.hash);
966        sign_keypair_hash.update(unix_minute.to_le_bytes());
967        let sign_keypair_seed: [u8; 32] = sign_keypair_hash.finalize()[..32]
968            .try_into()
969            .expect("hashing failed");
970        ed25519_dalek::SigningKey::from_bytes(&sign_keypair_seed)
971    }
972
973    fn encryption_keypair(
974        topic_id: &TopicId,
975        secret_rotation_function: &R,
976        initial_secret_hash: [u8; 32],
977        unix_minute: u64,
978    ) -> ed25519_dalek::SigningKey {
979        let enc_keypair_seed = secret_rotation_function.get_unix_minute_secret(
980            topic_id.hash,
981            unix_minute,
982            initial_secret_hash,
983        );
984        ed25519_dalek::SigningKey::from_bytes(&enc_keypair_seed)
985    }
986
987    // salt = hash (topic + unix_minute)
988    fn salt(topic_id: &TopicId, unix_minute: u64) -> [u8; 32] {
989        let mut slot_hash = sha2::Sha512::new();
990        slot_hash.update(topic_id.hash);
991        slot_hash.update(unix_minute.to_le_bytes());
992        slot_hash.finalize()[..32]
993            .try_into()
994            .expect("hashing failed")
995    }
996
997    async fn get_unix_minute_records(
998        topic_id: &TopicId,
999        unix_minute: u64,
1000        secret_rotation_function: Option<R>,
1001        initial_secret_hash: [u8; 32],
1002        node_id: &iroh::NodeId,
1003    ) -> HashSet<Record> {
1004        let topic_sign = Topic::<R>::signing_keypair(topic_id, unix_minute);
1005        let encryption_key = Topic::<R>::encryption_keypair(
1006            topic_id,
1007            &secret_rotation_function.clone().unwrap_or_default(),
1008            initial_secret_hash,
1009            unix_minute,
1010        );
1011        let salt = Topic::<R>::salt(topic_id, unix_minute);
1012
1013        // Get records, decrypt and verify
1014        let dht = get_dht();
1015
1016        let records_iter = timeout(
1017            Duration::from_secs(10),
1018            dht.get_mutable(topic_sign.verifying_key().as_bytes(), Some(&salt), None)
1019                .collect::<Vec<_>>(),
1020        )
1021        .await
1022        .unwrap_or_default();
1023
1024        records_iter
1025            .iter()
1026            .filter_map(
1027                |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
1028                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
1029                        Ok(record) => match record.verify(&topic_id.hash, unix_minute) {
1030                            Ok(_) => match record.node_id.eq(node_id.as_bytes()) {
1031                                true => None,
1032                                false => Some(record),
1033                            },
1034                            Err(_) => None,
1035                        },
1036                        Err(_) => None,
1037                    },
1038                    Err(_) => None,
1039                },
1040            )
1041            .collect::<HashSet<_>>()
1042    }
1043
1044    async fn publish_unix_minute_record(
1045        unix_minute: u64,
1046        topic_id: &TopicId,
1047        secret_rotation_function: Option<R>,
1048        initial_secret_hash: [u8; 32],
1049        record: Record,
1050        retry_count: Option<usize>,
1051    ) -> Result<()> {
1052        let sign_key = Topic::<R>::signing_keypair(&topic_id.clone(), unix_minute);
1053        let salt = Topic::<R>::salt(topic_id, unix_minute);
1054        let encryption_key = Topic::<R>::encryption_keypair(
1055            &topic_id.clone(),
1056            &secret_rotation_function.clone().unwrap_or_default(),
1057            initial_secret_hash,
1058            unix_minute,
1059        );
1060        let encrypted_record = record.encrypt(&encryption_key);
1061
1062        for i in 0..retry_count.unwrap_or(3) {
1063            let dht = get_dht();
1064
1065            let most_recent_result = timeout(
1066                Duration::from_secs(10),
1067                dht.get_mutable_most_recent(
1068                    sign_key.clone().verifying_key().as_bytes(),
1069                    Some(&salt),
1070                ),
1071            )
1072            .await
1073            .unwrap_or_default();
1074
1075            let item = if let Some(mut_item) = most_recent_result {
1076                MutableItem::new(
1077                    sign_key.clone(),
1078                    &encrypted_record.to_bytes(),
1079                    mut_item.seq() + 1,
1080                    Some(&salt),
1081                )
1082            } else {
1083                MutableItem::new(
1084                    sign_key.clone(),
1085                    &encrypted_record.to_bytes(),
1086                    0,
1087                    Some(&salt),
1088                )
1089            };
1090
1091            let put_result = match timeout(
1092                Duration::from_secs(10),
1093                dht.put_mutable(item.clone(), Some(item.seq())),
1094            )
1095            .await
1096            {
1097                Ok(result) => result.ok(),
1098                Err(_) => None,
1099            };
1100
1101            if put_result.is_some() {
1102                break;
1103            } else if i == retry_count.unwrap_or(3) - 1 {
1104                bail!("failed to publish record")
1105            }
1106
1107            reset_dht().await;
1108
1109            sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
1110        }
1111        Ok(())
1112    }
1113}
1114
1115pub trait AutoDiscoveryBuilder {
1116    #[allow(async_fn_in_trait)]
1117    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1118        self,
1119        endpoint: Endpoint,
1120        secret_rotation_function: Option<R>,
1121    ) -> Result<Gossip<R>>;
1122}
1123
1124impl AutoDiscoveryBuilder for iroh_gossip::net::Builder {
1125    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1126        self,
1127        endpoint: Endpoint,
1128        secret_rotation_function: Option<R>,
1129    ) -> Result<Gossip<R>> {
1130        Ok(Gossip {
1131            gossip: self.spawn(endpoint.clone()),
1132            endpoint: endpoint.clone(),
1133            secret_rotation_function: secret_rotation_function.unwrap_or_default(),
1134        })
1135    }
1136}
1137
1138pub trait AutoDiscoveryGossip<R: SecretRotation + Default + Clone + Send + 'static> {
1139    #[allow(async_fn_in_trait)]
1140    async fn subscribe_and_join_with_auto_discovery(
1141        &self,
1142        topic_id: TopicId,
1143        initial_secret: Vec<u8>,
1144    ) -> Result<Topic<R>>;
1145
1146    #[allow(async_fn_in_trait)]
1147    async fn subscribe_and_join_with_auto_discovery_no_wait(
1148        &self,
1149        topic_id: TopicId,
1150        initial_secret: Vec<u8>,
1151    ) -> Result<Topic<R>>;
1152}
1153
/// Default secret rotation function.
///
/// Stateless unit type; its behavior lives entirely in its `SecretRotation`
/// implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultSecretRotation;
1157
/// Strategy for deriving the per-minute secret of a topic.
pub trait SecretRotation {
    /// Returns the 32-byte secret for `unix_minute`.
    ///
    /// Inputs are the 32-byte topic hash, the minute and the 32-byte hash of
    /// the initial secret. Within this file the result is used as the seed of
    /// the record encryption key.
    fn get_unix_minute_secret(
        &self,
        topic_hash: [u8; 32],
        unix_minute: u64,
        initial_secret_hash: [u8; 32],
    ) -> [u8; 32];
}
1166
1167impl<R: SecretRotation + Default + Clone + Send + 'static> AutoDiscoveryGossip<R> for Gossip<R> {
1168    async fn subscribe_and_join_with_auto_discovery(
1169        &self,
1170        topic_id: TopicId,
1171        initial_secret: Vec<u8>,
1172    ) -> Result<Topic<R>> {
1173        Topic::new(
1174            topic_id,
1175            &self.endpoint,
1176            self.endpoint.secret_key().secret(),
1177            self.gossip.clone(),
1178            &initial_secret,
1179            Some(self.secret_rotation_function.clone()),
1180            false,
1181        )
1182        .await
1183    }
1184
1185    async fn subscribe_and_join_with_auto_discovery_no_wait(
1186        &self,
1187        topic_id: TopicId,
1188        initial_secret: Vec<u8>,
1189    ) -> Result<Topic<R>> {
1190        Topic::new(
1191            topic_id,
1192            &self.endpoint,
1193            self.endpoint.secret_key().secret(),
1194            self.gossip.clone(),
1195            &initial_secret,
1196            Some(self.secret_rotation_function.clone()),
1197            true,
1198        )
1199        .await
1200    }
1201}
1202
1203impl SecretRotation for DefaultSecretRotation {
1204    fn get_unix_minute_secret(
1205        &self,
1206        topic_hash: [u8; 32],
1207        unix_minute: u64,
1208        initial_secret_hash: [u8; 32],
1209    ) -> [u8; 32] {
1210        let mut hash = sha2::Sha512::new();
1211        hash.update(topic_hash);
1212        hash.update(unix_minute.to_be_bytes());
1213        hash.update(initial_secret_hash);
1214        hash.finalize()[..32].try_into().expect("hashing failed")
1215    }
1216}
1217
1218pub fn unix_minute(minute_offset: i64) -> u64 {
1219    ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
1220}
1221
#[cfg(test)]
mod tests {
    use super::*;
    use ed25519_dalek::SigningKey;
    use rand::rngs::OsRng;

    const TOPIC: [u8; 32] = [1u8; 32];
    const UNIX_MINUTE: u64 = 12345;

    /// Builds a record for `TOPIC` / `UNIX_MINUTE` with fixed peers and message
    /// hashes, signed with `signing_key`.
    fn sample_record(node_id: [u8; 32], signing_key: &SigningKey) -> Record {
        let active_peers = [[3u8; 32]; 5];
        let last_message_hashes = [[4u8; 32]; 5];
        Record::sign(
            TOPIC,
            UNIX_MINUTE,
            node_id,
            active_peers,
            last_message_hashes,
            signing_key,
        )
    }

    /// Asserts field-by-field equality of two records.
    fn assert_same_record(expected: &Record, actual: &Record) {
        assert_eq!(expected.topic, actual.topic);
        assert_eq!(expected.unix_minute, actual.unix_minute);
        assert_eq!(expected.node_id, actual.node_id);
        assert_eq!(expected.active_peers, actual.active_peers);
        assert_eq!(expected.last_message_hashes, actual.last_message_hashes);
        assert_eq!(expected.signature, actual.signature);
    }

    #[test]
    fn test_topic_id_creation() {
        let topic_id = TopicId::new("test-topic".to_string());
        assert_eq!(topic_id._raw, "test-topic");
        assert_eq!(topic_id.hash.len(), 32);

        // Same input should produce same hash
        let topic_id2 = TopicId::new("test-topic".to_string());
        assert_eq!(topic_id.hash, topic_id2.hash);

        // Different input should produce different hash
        let topic_id3 = TopicId::new("different-topic".to_string());
        assert_ne!(topic_id.hash, topic_id3.hash);
    }

    #[test]
    fn test_record_serialization_roundtrip() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let record = sample_record([2u8; 32], &signing_key);

        // Test serialization roundtrip
        let bytes = record.to_bytes();
        let deserialized = Record::from_bytes(bytes).unwrap();

        assert_same_record(&record, &deserialized);
    }

    #[test]
    fn test_record_verification() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let record = sample_record(signing_key.verifying_key().to_bytes(), &signing_key);

        // Valid verification should pass
        assert!(record.verify(&TOPIC, UNIX_MINUTE).is_ok());

        // Wrong topic should fail
        let wrong_topic = [99u8; 32];
        assert!(record.verify(&wrong_topic, UNIX_MINUTE).is_err());

        // Wrong unix_minute should fail
        assert!(record.verify(&TOPIC, UNIX_MINUTE + 1).is_err());
    }

    #[test]
    fn test_encrypted_record_roundtrip() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let encryption_key = SigningKey::generate(&mut OsRng);
        let record = sample_record(signing_key.verifying_key().to_bytes(), &signing_key);

        // Test encryption/decryption roundtrip
        let encrypted = record.encrypt(&encryption_key);
        let decrypted = encrypted.decrypt(&encryption_key).unwrap();

        assert_same_record(&record, &decrypted);
    }

    #[test]
    fn test_encrypted_record_serialization() {
        let signing_key = SigningKey::generate(&mut OsRng);
        let encryption_key = SigningKey::generate(&mut OsRng);
        let record = sample_record(signing_key.verifying_key().to_bytes(), &signing_key);

        let encrypted = record.encrypt(&encryption_key);

        // Test serialization roundtrip
        let bytes = encrypted.to_bytes();
        let deserialized = EncryptedRecord::from_bytes(bytes).unwrap();

        // Should be able to decrypt the deserialized version
        let decrypted = deserialized.decrypt(&encryption_key).unwrap();
        assert_eq!(record.topic, decrypted.topic);
        assert_eq!(record.unix_minute, decrypted.unix_minute);
    }

    #[test]
    fn test_default_secret_rotation() {
        let rotation = DefaultSecretRotation;
        let topic_hash = [1u8; 32];
        let unix_minute = 12345u64;
        let initial_secret_hash = [2u8; 32];

        let secret1 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
        let secret2 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);

        // Same inputs should produce same secret
        assert_eq!(secret1, secret2);

        // Different unix_minute should produce different secret
        let secret3 =
            rotation.get_unix_minute_secret(topic_hash, unix_minute + 1, initial_secret_hash);
        assert_ne!(secret1, secret3);

        // Different topic should produce different secret
        let different_topic = [99u8; 32];
        let secret4 =
            rotation.get_unix_minute_secret(different_topic, unix_minute, initial_secret_hash);
        assert_ne!(secret1, secret4);
    }

    #[test]
    fn test_unix_minute_function() {
        // Every call reads the wall clock, so the minute can roll over between
        // two calls. Resample until all reads fall into the same minute instead
        // of failing spuriously at a minute boundary.
        for _ in 0..5 {
            let current = unix_minute(0);
            let prev = unix_minute(-1);
            let next = unix_minute(1);
            if unix_minute(0) != current {
                continue;
            }

            assert_eq!(current, prev + 1);
            assert_eq!(next, current + 1);
            return;
        }
        panic!("clock crossed a minute boundary on every sampling attempt");
    }

    #[test]
    fn test_topic_signing_keypair_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let unix_minute = 12345u64;

        let key1 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
        let key2 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);

        // Same inputs should produce same keypair
        assert_eq!(key1.to_bytes(), key2.to_bytes());

        // Different unix_minute should produce different keypair
        let key3 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute + 1);
        assert_ne!(key1.to_bytes(), key3.to_bytes());
    }

    #[test]
    fn test_topic_encryption_keypair_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let unix_minute = 12345u64;
        let initial_secret_hash = [1u8; 32];
        let rotation = DefaultSecretRotation;

        let key_for = |minute: u64| {
            Topic::<DefaultSecretRotation>::encryption_keypair(
                &topic_id,
                &rotation,
                initial_secret_hash,
                minute,
            )
        };

        // Same inputs should produce same keypair
        assert_eq!(key_for(unix_minute).to_bytes(), key_for(unix_minute).to_bytes());

        // Different unix_minute should produce different keypair
        assert_ne!(
            key_for(unix_minute).to_bytes(),
            key_for(unix_minute + 1).to_bytes()
        );
    }

    #[test]
    fn test_topic_salt_deterministic() {
        let topic_id = TopicId::new("test-topic".to_string());
        let unix_minute = 12345u64;

        let salt1 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
        let salt2 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);

        // Same inputs should produce same salt
        assert_eq!(salt1, salt2);

        // Different unix_minute should produce different salt
        let salt3 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute + 1);
        assert_ne!(salt1, salt3);
    }
}