distributed_topic_tracker/
lib.rs

1use std::{collections::HashSet, sync::Arc, time::Duration};
2
3use anyhow::{Result, bail};
4use arc_swap::ArcSwap;
5use ed25519_dalek::ed25519::signature::SignerMut;
6use futures::StreamExt as _;
7use iroh::Endpoint;
8use iroh_gossip::{api::Event, proto::DeliveryScope};
9use mainline::{MutableItem, async_dht::AsyncDht};
10use once_cell::sync::Lazy;
11use rand::seq::SliceRandom;
12use sha2::Digest;
13
14use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
15use tokio::time::{sleep, timeout};
16
17pub const MAX_JOIN_PEERS_COUNT: usize = 30;
18pub const MAX_BOOTSTRAP_RECORDS: usize = 10;
19pub const SECRET_ROTATION: DefaultSecretRotation = DefaultSecretRotation;
20
21static DHT: Lazy<ArcSwap<mainline::async_dht::AsyncDht>> = Lazy::new(|| {
22    ArcSwap::from_pointee(
23        mainline::Dht::builder()
24            .build()
25            .expect("failed to create dht")
26            .as_async(),
27    )
28});
29
30fn get_dht() -> Arc<AsyncDht> {
31    DHT.load_full()
32}
33
34async fn reset_dht() {
35    let n_dht = mainline::Dht::builder()
36        .build()
37        .expect("failed to create dht");
38    DHT.store(Arc::new(n_dht.as_async()));
39}
40
41#[derive(Debug, Clone)]
42pub struct EncryptedRecord {
43    encrypted_record: Vec<u8>,
44    encrypted_decryption_key: Vec<u8>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Hash)]
48pub struct Record {
49    topic: [u8; 32],
50    unix_minute: u64,
51    node_id: [u8; 32],
52    active_peers: [[u8; 32]; 5],
53    last_message_hashes: [[u8; 32]; 5],
54    signature: [u8; 64],
55}
56
57pub struct Gossip<R: SecretRotation + Default + Clone + Send + 'static> {
58    pub gossip: iroh_gossip::net::Gossip,
59    endpoint: iroh::Endpoint,
60    secret_rotation_function: R,
61}
62
63#[derive(Debug)]
64pub struct Topic<R: SecretRotation + Default + Clone + Send + 'static> {
65    topic_id: TopicId,
66    gossip_sender: GossipSender,
67    gossip_receiver: GossipReceiver,
68    _gossip: iroh_gossip::net::Gossip,
69    initial_secret_hash: [u8; 32],
70    secret_rotation_function: R,
71    node_id: iroh::NodeId,
72}
73
74#[derive(Debug, Clone)]
75pub struct TopicId {
76    _raw: String,
77    hash: [u8; 32], // sha512( raw )[..32]
78}
79
80#[derive(Debug)]
81pub struct GossipReceiver {
82    gossip_event_forwarder: tokio::sync::broadcast::Sender<iroh_gossip::api::Event>,
83    action_req: tokio::sync::mpsc::Sender<InnerActionRecv>,
84    last_message_hashes: Vec<[u8; 32]>,
85    _keep_alive_rx: tokio::sync::broadcast::Receiver<iroh_gossip::api::Event>,
86    _gossip: iroh_gossip::net::Gossip,
87}
88
89impl Clone for GossipReceiver {
90    fn clone(&self) -> Self {
91        Self {
92            gossip_event_forwarder: self.gossip_event_forwarder.clone(),
93            action_req: self.action_req.clone(),
94            last_message_hashes: self.last_message_hashes.clone(),
95            _keep_alive_rx: self.gossip_event_forwarder.subscribe(),
96            _gossip: self._gossip.clone(),
97        }
98    }
99}
100
101#[derive(Debug)]
102pub struct GossipSender {
103    action_req: tokio::sync::mpsc::Sender<InnerActionSend>,
104    _gossip: iroh_gossip::net::Gossip,
105}
106
107impl Clone for GossipSender {
108    fn clone(&self) -> Self {
109        Self {
110            action_req: self.action_req.clone(),
111            _gossip: self._gossip.clone(),
112        }
113    }
114}
115
116
117#[derive(Debug)]
118enum InnerActionRecv {
119    ReqNeighbors(tokio::sync::oneshot::Sender<HashSet<iroh::NodeId>>),
120    ReqIsJoined(tokio::sync::oneshot::Sender<bool>),
121}
122
123#[derive(Debug)]
124enum InnerActionSend {
125    ReqSend(Vec<u8>, tokio::sync::oneshot::Sender<bool>),
126    ReqJoinPeers(Vec<iroh::NodeId>, tokio::sync::oneshot::Sender<bool>),
127}
128
129impl EncryptedRecord {
130    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
131        let one_time_key_bytes: [u8; 32] = decryption_key
132            .decrypt(&self.encrypted_decryption_key)?
133            .as_slice()
134            .try_into()?;
135        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
136
137        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
138        let record = Record::from_bytes(decrypted_record)?;
139        Ok(record)
140    }
141
142    pub fn to_bytes(&self) -> Vec<u8> {
143        let mut buf = Vec::new();
144        let encrypted_record_len = self.encrypted_record.len() as u32;
145        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
146        buf.extend_from_slice(&self.encrypted_record);
147        buf.extend_from_slice(&self.encrypted_decryption_key);
148        buf
149    }
150
151    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
152        let (encrypted_record_len, buf) = buf.split_at(4);
153        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
154        let (encrypted_record, encrypted_decryption_key) =
155            buf.split_at(encrypted_record_len as usize);
156
157        Ok(Self {
158            encrypted_record: encrypted_record.to_vec(),
159            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
160        })
161    }
162}
163
164impl Record {
165    pub fn sign(
166        topic: [u8; 32],
167        unix_minute: u64,
168        node_id: [u8; 32],
169        active_peers: [[u8; 32]; 5],
170        last_message_hashes: [[u8; 32]; 5],
171        signing_key: &ed25519_dalek::SigningKey,
172    ) -> Self {
173        let mut signature_data = Vec::new();
174        signature_data.extend_from_slice(&topic);
175        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
176        signature_data.extend_from_slice(&node_id);
177        for active_peer in active_peers {
178            signature_data.extend_from_slice(&active_peer);
179        }
180        for last_message_hash in last_message_hashes {
181            signature_data.extend_from_slice(&last_message_hash);
182        }
183        let mut signing_key = signing_key.clone();
184        let signature = signing_key.sign(&signature_data);
185        Self {
186            topic,
187            unix_minute,
188            node_id,
189            active_peers,
190            last_message_hashes,
191            signature: signature.to_bytes(),
192        }
193    }
194
195    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
196        let (topic, buf) = buf.split_at(32);
197        let (unix_minute, buf) = buf.split_at(8);
198        let (node_id, mut buf) = buf.split_at(32);
199
200        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
201        #[allow(clippy::needless_range_loop)]
202        for i in 0..active_peers.len() {
203            let (active_peer, _buf) = buf.split_at(32);
204            active_peers[i] = active_peer.try_into()?;
205            buf = _buf;
206        }
207        let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
208        #[allow(clippy::needless_range_loop)]
209        for i in 0..last_message_hashes.len() {
210            let (last_message_hash, _buf) = buf.split_at(32);
211            last_message_hashes[i] = last_message_hash.try_into()?;
212            buf = _buf;
213        }
214
215        let (signature, buf) = buf.split_at(64);
216
217        if !buf.is_empty() {
218            bail!("buffer not empty after reconstruction")
219        }
220
221        Ok(Self {
222            topic: topic.try_into()?,
223            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
224            node_id: node_id.try_into()?,
225            active_peers,
226            last_message_hashes,
227            signature: signature.try_into()?,
228        })
229    }
230
231    pub fn to_bytes(&self) -> Vec<u8> {
232        let mut buf = Vec::new();
233        buf.extend_from_slice(&self.topic);
234        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
235        buf.extend_from_slice(&self.node_id);
236        for active_peer in self.active_peers {
237            buf.extend_from_slice(&active_peer);
238        }
239        for last_message_hash in self.last_message_hashes {
240            buf.extend_from_slice(&last_message_hash);
241        }
242        buf.extend_from_slice(&self.signature);
243        buf
244    }
245
246    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
247        if self.topic != *actual_topic {
248            bail!("topic mismatch")
249        }
250        if self.unix_minute != actual_unix_minute {
251            bail!("unix minute mismatch")
252        }
253
254        let record_bytes = self.to_bytes();
255        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
256        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
257        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
258
259        node_id.verify_strict(signature_data.as_slice(), &signature)?;
260
261        Ok(())
262    }
263
264    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
265        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
266        let p_key = one_time_key.verifying_key();
267        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
268        let key_enc = encryption_key
269            .verifying_key()
270            .encrypt(&one_time_key.to_bytes())
271            .expect("encryption failed");
272
273        EncryptedRecord {
274            encrypted_record: data_enc,
275            encrypted_decryption_key: key_enc,
276        }
277    }
278}
279
280impl GossipSender {
281    pub fn new(gossip_sender: iroh_gossip::api::GossipSender, gossip: iroh_gossip::net::Gossip) -> Self {
282        let (action_req_tx, mut action_req_rx) =
283            tokio::sync::mpsc::channel::<InnerActionSend>(1024);
284
285        tokio::spawn({
286            let gossip_sender = gossip_sender;
287            async move {
288                while let Some(inner_action) = action_req_rx.recv().await {
289                    match inner_action {
290                        InnerActionSend::ReqSend(data, tx) => {
291                            let res = gossip_sender.broadcast(data.into()).await;
292                            tx.send(res.is_ok()).expect("broadcast failed");
293                        }
294                        InnerActionSend::ReqJoinPeers(peers, tx) => {
295                            let res = gossip_sender.join_peers(peers).await;                     
296                            tx.send(res.is_ok()).expect("broadcast failed");
297                        }
298                    }
299                }
300            }
301        });
302
303        Self {
304            action_req: action_req_tx,
305            _gossip: gossip,
306        }
307    }
308
309    pub async fn broadcast(&self, data: Vec<u8>) -> Result<()> {
310        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
311        let _ = self
312            .action_req
313            .send(InnerActionSend::ReqSend(data, tx))
314            .await;
315
316        match rx.await {
317            Ok(true) => Ok(()),
318            Ok(false) => bail!("broadcast failed"),
319            Err(_) => panic!("broadcast failed"),
320        }
321    }
322
323    pub async fn join_peers(
324        &self,
325        peers: Vec<iroh::NodeId>,
326        max_peers: Option<usize>,
327    ) -> Result<()> {
328        let mut peers = peers;
329        if let Some(max_peers) = max_peers {
330            peers.shuffle(&mut rand::thread_rng());
331            peers.truncate(max_peers);
332        }
333
334        let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
335        let _ = self
336            .action_req
337            .send(InnerActionSend::ReqJoinPeers(peers, tx))
338            .await;
339
340        match rx.await {
341            Ok(true) => Ok(()),
342            Ok(false) => bail!("join peers failed"),
343            Err(_) => panic!("broadcast failed"),
344        }
345    }
346}
347
348impl GossipReceiver {
349    pub fn new(gossip_receiver: iroh_gossip::api::GossipReceiver, gossip: iroh_gossip::net::Gossip) -> Self {
350        let (gossip_forward_tx, _) =
351            tokio::sync::broadcast::channel::<iroh_gossip::api::Event>(1024);
352        let (action_req_tx, mut action_req_rx) =
353            tokio::sync::mpsc::channel::<InnerActionRecv>(1024);
354
355        let keep_alive_rx = gossip_forward_tx.subscribe();
356
357        let self_ref = Self {
358            gossip_event_forwarder: gossip_forward_tx.clone(),
359            action_req: action_req_tx.clone(),
360            last_message_hashes: vec![],
361            _keep_alive_rx: keep_alive_rx,
362            _gossip: gossip,
363        };
364
365        tokio::spawn({
366            let mut self_ref = self_ref.clone();
367            async move {
368                let mut gossip_receiver = gossip_receiver;
369                loop {
370                    tokio::select! {
371                        Some(inner_action) = action_req_rx.recv() => {
372                            match inner_action {
373                                InnerActionRecv::ReqNeighbors(tx) => {
374                                    let neighbors = gossip_receiver.neighbors().collect::<HashSet<iroh::NodeId>>();
375                                    let _ = tx.send(neighbors);
376                                },
377                                InnerActionRecv::ReqIsJoined(tx) => {
378                                    let is_joined = gossip_receiver.is_joined();
379                                    let _ = tx.send(is_joined);
380                                }
381                            }
382                        }
383                        gossip_event_res = gossip_receiver.next() => {
384                            if let Some(Ok(gossip_event)) = gossip_event_res {
385                                if let Event::Received(msg) = gossip_event.clone() {
386                                    if let DeliveryScope::Swarm(_) = msg.scope {
387                                        let hash = sha2::Sha512::digest(&msg.content);
388                                        self_ref.last_message_hashes.push(hash[..32].try_into().expect("hashing failed"));
389                                        while self_ref.last_message_hashes.len() > 5 {
390                                            self_ref.last_message_hashes.pop();
391                                        }
392                                    }
393                                }
394                                let _ = self_ref.gossip_event_forwarder.send(gossip_event.clone());
395                            } else {
396                                break;
397                            }
398                        }
399                    }
400                }
401            }
402        });
403
404        self_ref
405    }
406
407    pub async fn neighbors(&mut self) -> Result<HashSet<iroh::NodeId>> {
408        let (neighbors_tx, neighbors_rx) = tokio::sync::oneshot::channel::<HashSet<iroh::NodeId>>();
409
410        let _ = self
411            .action_req
412            .send(InnerActionRecv::ReqNeighbors(neighbors_tx))
413            .await;
414
415        neighbors_rx.await.map_err(|err| anyhow::anyhow!(err))
416    }
417
418    pub async fn is_joined(&mut self) -> Result<bool> {
419        let (is_joined_tx, is_joined_rx) = tokio::sync::oneshot::channel::<bool>();
420
421        let _ = self
422            .action_req
423            .send(InnerActionRecv::ReqIsJoined(is_joined_tx))
424            .await;
425
426        is_joined_rx.await.map_err(|err| anyhow::anyhow!(err))
427    }
428
429    pub async fn subscribe(&mut self) -> Result<tokio::sync::broadcast::Receiver<Event>> {
430        Ok(self.gossip_event_forwarder.subscribe())
431    }
432
433    pub fn last_message_hashes(&self) -> Vec<[u8; 32]> {
434        self.last_message_hashes.clone()
435    }
436}
437
438impl TopicId {
439    pub fn new(raw: String) -> Self {
440        let mut raw_hash = sha2::Sha512::new();
441        raw_hash.update(raw.as_bytes());
442
443        Self {
444            _raw: raw,
445            hash: raw_hash.finalize()[..32]
446                .try_into()
447                .expect("hashing 'raw' failed"),
448        }
449    }
450}
451
452// State: new, split, spawn_publisher
453impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
454    pub async fn new(
455        topic_id: TopicId,
456        endpoint: &iroh::Endpoint,
457        node_signing_key: &ed25519_dalek::SigningKey,
458        gossip: iroh_gossip::net::Gossip,
459        initial_secret: &Vec<u8>,
460        secret_rotation_function: Option<R>,
461        async_bootstrap: bool,
462    ) -> Result<Self> {
463        // Create secret_hash
464        let mut initial_secret_hash = sha2::Sha512::new();
465        initial_secret_hash.update(initial_secret);
466        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
467            .try_into()
468            .expect("hashing failed");
469
470        // Bootstrap to get gossip tx/rx
471        let (gossip_tx, gossip_rx) = if async_bootstrap {
472            Self::bootstrap_no_wait(
473                topic_id.clone(),
474                endpoint,
475                node_signing_key,
476                gossip.clone(),
477                initial_secret_hash,
478                secret_rotation_function.clone(),
479            )
480            .await?
481        } else {
482            Self::bootstrap(
483                topic_id.clone(),
484                endpoint,
485                node_signing_key,
486                gossip.clone(),
487                initial_secret_hash,
488                secret_rotation_function.clone(),
489            )
490            .await?
491        };
492
493        // Spawn publisher
494        let _join_handler = Self::spawn_publisher(
495            topic_id.clone(),
496            secret_rotation_function.clone(),
497            initial_secret_hash,
498            endpoint.node_id(),
499            gossip_rx.clone(),
500            gossip_tx.clone(),
501            node_signing_key.clone(),
502        );
503
504        Ok(Self {
505            topic_id,
506            gossip_sender: gossip_tx,
507            gossip_receiver: gossip_rx,
508            _gossip: gossip,
509            initial_secret_hash,
510            secret_rotation_function: secret_rotation_function.unwrap_or_default(),
511            node_id: endpoint.node_id(),
512        })
513    }
514
515    pub fn split(&self) -> (GossipSender, GossipReceiver) {
516        (self.gossip_sender.clone(), self.gossip_receiver.clone())
517    }
518
519    pub fn topic_id(&self) -> &TopicId {
520        &self.topic_id
521    }
522
523    pub fn node_id(&self) -> &iroh::NodeId {
524        &self.node_id
525    }
526
527    pub fn gossip_sender(&self) -> GossipSender {
528        self.gossip_sender.clone()
529    }
530
531    pub fn gossip_receiver(&self) -> GossipReceiver {
532        self.gossip_receiver.clone()
533    }
534
535    pub fn secret_rotation_function(&self) -> R {
536        self.secret_rotation_function.clone()
537    }
538
539    pub fn initial_secret_hash(&self) -> [u8; 32] {
540        self.initial_secret_hash
541    }
542
543    pub fn set_initial_secret_hash(&mut self, initial_secret_hash: [u8; 32]) {
544        self.initial_secret_hash = initial_secret_hash;
545    }
546}
547
548// Procedures: Bootstrap, Publishing, Publisher
549impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
550    pub async fn bootstrap_no_wait(
551        topic_id: TopicId,
552        endpoint: &iroh::Endpoint,
553        node_signing_key: &ed25519_dalek::SigningKey,
554        gossip: iroh_gossip::net::Gossip,
555        initial_secret_hash: [u8; 32],
556        secret_rotation_function: Option<R>,
557    ) -> Result<(GossipSender, GossipReceiver)> {
558        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
559            .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
560            .await?;
561        let (gossip_sender, gossip_receiver) = gossip_topic.split();
562        let (gossip_sender, gossip_receiver) = (
563            GossipSender::new(gossip_sender,gossip.clone()),
564            GossipReceiver::new(gossip_receiver,gossip.clone()),
565        );
566
567        tokio::spawn({
568            let gossip_sender = gossip_sender.clone();
569            let gossip_receiver = gossip_receiver.clone();
570            let endpoint = endpoint.clone();
571            let node_signing_key = node_signing_key.clone();
572            async move {
573                Self::bootstrap_from_gossip(
574                    gossip_sender,
575                    gossip_receiver,
576                    topic_id,
577                    &endpoint,
578                    &node_signing_key,
579                    initial_secret_hash,
580                    secret_rotation_function,
581                )
582                .await
583            }
584        });
585
586        Ok((gossip_sender, gossip_receiver))
587    }
588
589    pub async fn bootstrap(
590        topic_id: TopicId,
591        endpoint: &iroh::Endpoint,
592        node_signing_key: &ed25519_dalek::SigningKey,
593        gossip: iroh_gossip::net::Gossip,
594        initial_secret_hash: [u8; 32],
595        secret_rotation_function: Option<R>,
596    ) -> Result<(GossipSender, GossipReceiver)> {
597        let gossip_topic: iroh_gossip::api::GossipTopic = gossip
598            .subscribe(iroh_gossip::proto::TopicId::from(topic_id.hash), vec![])
599            .await?;
600        let (gossip_sender, gossip_receiver) = gossip_topic.split();
601        let (gossip_sender, gossip_receiver) = (
602            GossipSender::new(gossip_sender,gossip.clone()),
603            GossipReceiver::new(gossip_receiver,gossip.clone()),
604        );
605        Self::bootstrap_from_gossip(
606            gossip_sender,
607            gossip_receiver,
608            topic_id,
609            endpoint,
610            node_signing_key,
611            initial_secret_hash,
612            secret_rotation_function,
613        )
614        .await
615    }
616
617    async fn bootstrap_from_gossip(
618        gossip_sender: GossipSender,
619        mut gossip_receiver: GossipReceiver,
620        topic_id: TopicId,
621        endpoint: &iroh::Endpoint,
622        node_signing_key: &ed25519_dalek::SigningKey,
623        initial_secret_hash: [u8; 32],
624        secret_rotation_function: Option<R>,
625    ) -> Result<(GossipSender, GossipReceiver)> {
626        let mut last_published_unix_minute = 0;
627        loop {
628            // Check if we are connected to at least one node
629            if let Ok(joined) = gossip_receiver.is_joined().await {
630                if joined {
631                    return Ok((gossip_sender, gossip_receiver));
632                }
633            }
634
635            // On the first try we check the prev unix minute, after that the current one
636            let unix_minute = crate::unix_minute(if last_published_unix_minute == 0 {
637                -1
638            } else {
639                0
640            });
641
642            // Unique, verified records for the unix minute
643            let records = Topic::get_unix_minute_records(
644                &topic_id.clone(),
645                unix_minute,
646                secret_rotation_function.clone(),
647                initial_secret_hash,
648                &endpoint.node_id(),
649            )
650            .await;
651
652            // If there are no records, invoke the publish_proc (the publishing procedure)
653            // continue the loop after
654            if records.is_empty() {
655                if unix_minute != last_published_unix_minute {
656                    last_published_unix_minute = unix_minute;
657                    tokio::spawn({
658                        let topic_id = topic_id.clone();
659                        let node_id = endpoint.node_id();
660                        let node_signing_key = node_signing_key.clone();
661                        let secret_rotation_function = secret_rotation_function.clone();
662                        async move {
663                            let _ = Self::publish_proc(
664                                unix_minute,
665                                &topic_id,
666                                secret_rotation_function,
667                                initial_secret_hash,
668                                node_id,
669                                &node_signing_key,
670                                HashSet::new(),
671                                vec![],
672                            )
673                            .await;
674                        }
675                    });
676                }
677                sleep(Duration::from_millis(100)).await;
678                continue;
679            }
680
681            // We found records
682
683            // Collect node ids from active_peers and record.node_id (of publisher)
684            let bootstrap_nodes = records
685                .iter()
686                .flat_map(|record| {
687                    let mut v = vec![record.node_id];
688                    for peer in record.active_peers {
689                        if peer != [0; 32] {
690                            v.push(peer);
691                        }
692                    }
693                    v
694                })
695                .filter_map(|node_id| iroh::NodeId::from_bytes(&node_id).ok())
696                .collect::<HashSet<_>>();
697
698            // Maybe in the meantime someone connected to us via one of our published records
699            // we don't want to disrup the gossip rotations any more then we have to
700            // so we check again before joining new peers
701            if let Ok(joined) = gossip_receiver.is_joined().await {
702                if joined {
703                    return Ok((gossip_sender, gossip_receiver));
704                }
705            }
706
707            // Instead of throwing everything into join_peers() at once we go node_id by node_id
708            // again to disrupt as little nodes peer neighborhoods as possible.
709            for node_id in bootstrap_nodes.iter() {
710                match gossip_sender.join_peers(vec![*node_id], None).await {
711                    Ok(_) => {
712                        sleep(Duration::from_millis(100)).await;
713                        if let Ok(joined) = gossip_receiver.is_joined().await {
714                            if joined {
715                                break;
716                            }
717                        }
718                    }
719                    Err(_) => {
720                        continue;
721                    }
722                }
723            }
724
725            // If we are still not connected to anyone:
726            // give it the default iroh-gossip connection timeout before the final is_joined() check
727            if let Ok(joined) = gossip_receiver.is_joined().await {
728                if !joined {
729                    sleep(Duration::from_millis(500)).await;
730                }
731            }
732
733            // If we are connected: return
734            if let Ok(joined) = gossip_receiver.is_joined().await {
735                if joined {
736                    return Ok((gossip_sender, gossip_receiver));
737                }
738            } else {
739                // If we are not connected: check if we should publish a record this minute
740                if unix_minute != last_published_unix_minute {
741                    last_published_unix_minute = unix_minute;
742                    tokio::spawn({
743                        let topic_id = topic_id.clone();
744                        let node_id = endpoint.node_id();
745                        let node_signing_key = node_signing_key.clone();
746                        let secret_rotation_function = secret_rotation_function.clone();
747                        async move {
748                            let _ = Self::publish_proc(
749                                unix_minute,
750                                &topic_id,
751                                secret_rotation_function,
752                                initial_secret_hash,
753                                node_id,
754                                &node_signing_key,
755                                HashSet::new(),
756                                vec![],
757                            )
758                            .await;
759                        }
760                    });
761                }
762                sleep(Duration::from_millis(100)).await;
763                continue;
764            }
765        }
766    }
767
768    // publishing procedure: if more then MAX_BOOTSTRAP_RECORDS are written, don't write.
769    // returns all valid records found from nodes already connected to the iroh-gossip network.
770    #[allow(clippy::too_many_arguments)]
771    async fn publish_proc(
772        unix_minute: u64,
773        topic_id: &TopicId,
774        secret_rotation_function: Option<R>,
775        initial_secret_hash: [u8; 32],
776        node_id: iroh::NodeId,
777        node_signing_key: &ed25519_dalek::SigningKey,
778        neighbors: HashSet<iroh::NodeId>,
779        last_message_hashes: Vec<[u8; 32]>,
780    ) -> Result<HashSet<Record>> {
781        // Get verified records that have active_peers or last_message_hashes set (active participants)
782        let records = Topic::<R>::get_unix_minute_records(
783            &topic_id.clone(),
784            unix_minute,
785            secret_rotation_function.clone(),
786            initial_secret_hash,
787            &node_id,
788        )
789        .await
790        .iter()
791        .filter(|&record| {
792            record
793                .active_peers
794                .iter()
795                .filter(|&peer| peer.eq(&[0u8; 32]))
796                .count()
797                > 0
798                || record
799                    .last_message_hashes
800                    .iter()
801                    .filter(|&hash| hash.eq(&[0u8; 32]))
802                    .count()
803                    > 0
804        })
805        .cloned()
806        .collect::<HashSet<_>>();
807
808        // Don't publish if there are more then MAX_BOOTSTRAP_RECORDS already written
809        // that either have active_peers or last_message_hashes set (active participants)
810        if records.len() >= MAX_BOOTSTRAP_RECORDS {
811            return Ok(records);
812        }
813
814        // Publish own records
815        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
816        for (i, peer) in neighbors.iter().take(5).enumerate() {
817            active_peers[i] = *peer.as_bytes()
818        }
819
820        let mut last_message_hashes_array = [[0u8; 32]; 5];
821        for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
822            last_message_hashes_array[i] = *hash;
823        }
824
825        let record = Record::sign(
826            topic_id.hash,
827            unix_minute,
828            *node_id.as_bytes(),
829            active_peers,
830            last_message_hashes_array,
831            node_signing_key,
832        );
833        Topic::<R>::publish_unix_minute_record(
834            unix_minute,
835            &topic_id.clone(),
836            secret_rotation_function.clone(),
837            initial_secret_hash,
838            record,
839            Some(3),
840        )
841        .await?;
842
843        Ok(records)
844    }
845
846    // Runs after bootstrap to keep anouncing the topic on mainline and help identify and merge network bubbles
847    fn spawn_publisher(
848        topic_id: TopicId,
849        secret_rotation_function: Option<R>,
850        initial_secret_hash: [u8; 32],
851        node_id: iroh::NodeId,
852        gossip_receiver: GossipReceiver,
853        gossip_sender: GossipSender,
854        node_signing_key: ed25519_dalek::SigningKey,
855    ) -> tokio::task::JoinHandle<()> {
856        let mut gossip_receiver = gossip_receiver;
857
858        tokio::spawn(async move {
859            let mut backoff = 1;
860            loop {
861                let unix_minute = crate::unix_minute(0);
862
863                // Run publish_proc() (publishing procedure that is aware of MAX_BOOTSTRAP_RECORDS already written)
864                if let Ok(records) = Topic::<R>::publish_proc(
865                    unix_minute,
866                    &topic_id.clone(),
867                    Some(secret_rotation_function.clone().unwrap_or_default()),
868                    initial_secret_hash,
869                    node_id,
870                    &node_signing_key,
871                    gossip_receiver.neighbors().await.unwrap_or_default(),
872                    gossip_receiver.last_message_hashes(),
873                )
874                .await
875                {
876                    // Cluster size as bubble indicator
877                    let neighbors = gossip_receiver.neighbors().await.unwrap_or_default();
878                    if neighbors.len() < 4 && !records.is_empty() {
879                        let node_ids = records
880                            .iter()
881                            .flat_map(|record| {
882                                record
883                                    .active_peers
884                                    .iter()
885                                    .filter_map(|&active_peer| {
886                                        if active_peer == [0; 32]
887                                            || neighbors.contains(&active_peer)
888                                            || active_peer.eq(record.node_id.to_vec().as_slice())
889                                            || active_peer.eq(node_id.as_bytes())
890                                        {
891                                            None
892                                        } else {
893                                            iroh::NodeId::from_bytes(&active_peer).ok()
894                                        }
895                                    })
896                                    .collect::<Vec<_>>()
897                            })
898                            .collect::<HashSet<_>>();
899                        if gossip_sender
900                            .join_peers(
901                                node_ids.iter().cloned().collect::<Vec<_>>(),
902                                Some(MAX_JOIN_PEERS_COUNT),
903                            )
904                            .await
905                            .is_ok()
906                        {
907                            //println!("group-merger -> joined peer {}", node_id);
908                        }
909                    }
910
911                    // Message overlap indicator
912                    if !gossip_receiver.last_message_hashes().is_empty() {
913                        let peers_to_join = records
914                            .iter()
915                            .filter(|record| {
916                                !record.last_message_hashes.iter().all(|last_message_hash| {
917                                    *last_message_hash != [0; 32]
918                                        && gossip_receiver
919                                            .last_message_hashes()
920                                            .contains(last_message_hash)
921                                })
922                            })
923                            .collect::<Vec<_>>();
924                        if !peers_to_join.is_empty() {
925                            let node_ids = peers_to_join
926                                .iter()
927                                .flat_map(|&record| {
928                                    let mut peers = vec![];
929                                    if let Ok(node_id) = iroh::NodeId::from_bytes(&record.node_id) {
930                                        peers.push(node_id);
931                                    }
932                                    for active_peer in record.active_peers {
933                                        if active_peer == [0; 32] {
934                                            continue;
935                                        }
936                                        if let Ok(node_id) = iroh::NodeId::from_bytes(&active_peer)
937                                        {
938                                            peers.push(node_id);
939                                        }
940                                    }
941                                    peers
942                                })
943                                .collect::<HashSet<_>>();
944
945                            if gossip_sender
946                                .join_peers(
947                                    node_ids.iter().cloned().collect::<Vec<_>>(),
948                                    Some(MAX_JOIN_PEERS_COUNT),
949                                )
950                                .await
951                                .is_ok()
952                            {
953                                /*println!(
954                                    "bouble detected: no-message-overlap -> joined {} peers",
955                                    node_ids.len()
956                                );*/
957                            }
958                        }
959                    }
960                } else {
961                    sleep(Duration::from_secs(backoff)).await;
962                    backoff = (backoff * 2).max(60);
963                    continue;
964                }
965
966                backoff = 1;
967                sleep(Duration::from_secs(rand::random::<u64>() % 60)).await;
968            }
969        })
970    }
971}
972
973// Basic building blocks
974impl<R: SecretRotation + Default + Clone + Send + 'static> Topic<R> {
975    fn signing_keypair(topic_id: &TopicId, unix_minute: u64) -> ed25519_dalek::SigningKey {
976        let mut sign_keypair_hash = sha2::Sha512::new();
977        sign_keypair_hash.update(topic_id.hash);
978        sign_keypair_hash.update(unix_minute.to_le_bytes());
979        let sign_keypair_seed: [u8; 32] = sign_keypair_hash.finalize()[..32]
980            .try_into()
981            .expect("hashing failed");
982        ed25519_dalek::SigningKey::from_bytes(&sign_keypair_seed)
983    }
984
985    fn encryption_keypair(
986        topic_id: &TopicId,
987        secret_rotation_function: &R,
988        initial_secret_hash: [u8; 32],
989        unix_minute: u64,
990    ) -> ed25519_dalek::SigningKey {
991        let enc_keypair_seed = secret_rotation_function.get_unix_minute_secret(
992            topic_id.hash,
993            unix_minute,
994            initial_secret_hash,
995        );
996        ed25519_dalek::SigningKey::from_bytes(&enc_keypair_seed)
997    }
998
999    // salt = hash (topic + unix_minute)
1000    fn salt(topic_id: &TopicId, unix_minute: u64) -> [u8; 32] {
1001        let mut slot_hash = sha2::Sha512::new();
1002        slot_hash.update(topic_id.hash);
1003        slot_hash.update(unix_minute.to_le_bytes());
1004        slot_hash.finalize()[..32]
1005            .try_into()
1006            .expect("hashing failed")
1007    }
1008
1009    async fn get_unix_minute_records(
1010        topic_id: &TopicId,
1011        unix_minute: u64,
1012        secret_rotation_function: Option<R>,
1013        initial_secret_hash: [u8; 32],
1014        node_id: &iroh::NodeId,
1015    ) -> HashSet<Record> {
1016        let topic_sign = Topic::<R>::signing_keypair(topic_id, unix_minute);
1017        let encryption_key = Topic::<R>::encryption_keypair(
1018            topic_id,
1019            &secret_rotation_function.clone().unwrap_or_default(),
1020            initial_secret_hash,
1021            unix_minute,
1022        );
1023        let salt = Topic::<R>::salt(topic_id, unix_minute);
1024
1025        // Get records, decrypt and verify
1026        let dht = get_dht();
1027
1028        let records_iter = timeout(
1029            Duration::from_secs(10),
1030            dht.get_mutable(topic_sign.verifying_key().as_bytes(), Some(&salt), None)
1031                .collect::<Vec<_>>(),
1032        )
1033        .await
1034        .unwrap_or_default();
1035
1036        records_iter
1037            .iter()
1038            .filter_map(
1039                |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
1040                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
1041                        Ok(record) => match record.verify(&topic_id.hash, unix_minute) {
1042                            Ok(_) => match record.node_id.eq(node_id.as_bytes()) {
1043                                true => None,
1044                                false => Some(record),
1045                            },
1046                            Err(_) => None,
1047                        },
1048                        Err(_) => None,
1049                    },
1050                    Err(_) => None,
1051                },
1052            )
1053            .collect::<HashSet<_>>()
1054    }
1055
1056    async fn publish_unix_minute_record(
1057        unix_minute: u64,
1058        topic_id: &TopicId,
1059        secret_rotation_function: Option<R>,
1060        initial_secret_hash: [u8; 32],
1061        record: Record,
1062        retry_count: Option<usize>,
1063    ) -> Result<()> {
1064        let sign_key = Topic::<R>::signing_keypair(&topic_id.clone(), unix_minute);
1065        let salt = Topic::<R>::salt(topic_id, unix_minute);
1066        let encryption_key = Topic::<R>::encryption_keypair(
1067            &topic_id.clone(),
1068            &secret_rotation_function.clone().unwrap_or_default(),
1069            initial_secret_hash,
1070            unix_minute,
1071        );
1072        let encrypted_record = record.encrypt(&encryption_key);
1073
1074        for i in 0..retry_count.unwrap_or(3) {
1075            let dht = get_dht();
1076
1077            let most_recent_result = timeout(
1078                Duration::from_secs(10),
1079                dht.get_mutable_most_recent(
1080                    sign_key.clone().verifying_key().as_bytes(),
1081                    Some(&salt),
1082                ),
1083            )
1084            .await
1085            .unwrap_or_default();
1086
1087            let item = if let Some(mut_item) = most_recent_result {
1088                MutableItem::new(
1089                    sign_key.clone(),
1090                    &encrypted_record.to_bytes(),
1091                    mut_item.seq() + 1,
1092                    Some(&salt),
1093                )
1094            } else {
1095                MutableItem::new(
1096                    sign_key.clone(),
1097                    &encrypted_record.to_bytes(),
1098                    0,
1099                    Some(&salt),
1100                )
1101            };
1102
1103            let put_result = match timeout(
1104                Duration::from_secs(10),
1105                dht.put_mutable(item.clone(), Some(item.seq())),
1106            )
1107            .await
1108            {
1109                Ok(result) => result.ok(),
1110                Err(_) => None,
1111            };
1112
1113            if put_result.is_some() {
1114                break;
1115            } else if i == retry_count.unwrap_or(3) - 1 {
1116                bail!("failed to publish record")
1117            }
1118
1119            reset_dht().await;
1120
1121            sleep(Duration::from_millis(rand::random::<u64>() % 2000)).await;
1122        }
1123        Ok(())
1124    }
1125}
1126
1127pub trait AutoDiscoveryBuilder {
1128    #[allow(async_fn_in_trait)]
1129    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1130        self,
1131        endpoint: Endpoint,
1132        secret_rotation_function: Option<R>,
1133    ) -> Result<Gossip<R>>;
1134}
1135
1136impl AutoDiscoveryBuilder for iroh_gossip::net::Builder {
1137    async fn spawn_with_auto_discovery<R: SecretRotation + Default + Clone + Send + 'static>(
1138        self,
1139        endpoint: Endpoint,
1140        secret_rotation_function: Option<R>,
1141    ) -> Result<Gossip<R>> {
1142        Ok(Gossip {
1143            gossip: self.spawn(endpoint.clone()),
1144            endpoint: endpoint.clone(),
1145            secret_rotation_function: secret_rotation_function.unwrap_or_default(),
1146        })
1147    }
1148}
1149
1150pub trait AutoDiscoveryGossip<R: SecretRotation + Default + Clone + Send + 'static> {
1151    #[allow(async_fn_in_trait)]
1152    async fn subscribe_and_join_with_auto_discovery(
1153        &self,
1154        topic_id: TopicId,
1155        initial_secret: Vec<u8>,
1156    ) -> Result<Topic<R>>;
1157
1158    #[allow(async_fn_in_trait)]
1159    async fn subscribe_and_join_with_auto_discovery_no_wait(
1160        &self,
1161        topic_id: TopicId,
1162        initial_secret: Vec<u8>,
1163    ) -> Result<Topic<R>>;
1164}
1165
1166// Default secret rotation function
1167#[derive(Debug, Clone, Copy, Default)]
1168pub struct DefaultSecretRotation;
1169
1170pub trait SecretRotation {
1171    fn get_unix_minute_secret(
1172        &self,
1173        topic_hash: [u8; 32],
1174        unix_minute: u64,
1175        initial_secret_hash: [u8; 32],
1176    ) -> [u8; 32];
1177}
1178
1179impl<R: SecretRotation + Default + Clone + Send + 'static> AutoDiscoveryGossip<R> for Gossip<R> {
1180    async fn subscribe_and_join_with_auto_discovery(
1181        &self,
1182        topic_id: TopicId,
1183        initial_secret: Vec<u8>,
1184    ) -> Result<Topic<R>> {
1185        Topic::new(
1186            topic_id,
1187            &self.endpoint,
1188            self.endpoint.secret_key().secret(),
1189            self.gossip.clone(),
1190            &initial_secret,
1191            Some(self.secret_rotation_function.clone()),
1192            false,
1193        )
1194        .await
1195    }
1196
1197    async fn subscribe_and_join_with_auto_discovery_no_wait(
1198        &self,
1199        topic_id: TopicId,
1200        initial_secret: Vec<u8>,
1201    ) -> Result<Topic<R>> {
1202        Topic::new(
1203            topic_id,
1204            &self.endpoint,
1205            self.endpoint.secret_key().secret(),
1206            self.gossip.clone(),
1207            &initial_secret,
1208            Some(self.secret_rotation_function.clone()),
1209            true,
1210        )
1211        .await
1212    }
1213}
1214
1215impl SecretRotation for DefaultSecretRotation {
1216    fn get_unix_minute_secret(
1217        &self,
1218        topic_hash: [u8; 32],
1219        unix_minute: u64,
1220        initial_secret_hash: [u8; 32],
1221    ) -> [u8; 32] {
1222        let mut hash = sha2::Sha512::new();
1223        hash.update(topic_hash);
1224        hash.update(unix_minute.to_be_bytes());
1225        hash.update(initial_secret_hash);
1226        hash.finalize()[..32].try_into().expect("hashing failed")
1227    }
1228}
1229
1230pub fn unix_minute(minute_offset: i64) -> u64 {
1231    ((chrono::Utc::now().timestamp() as f64 / 60.0f64).floor() as i64 + minute_offset) as u64
1232}
1233
1234#[cfg(test)]
1235mod tests {
1236    use super::*;
1237    use ed25519_dalek::SigningKey;
1238    use rand::rngs::OsRng;
1239
1240    #[test]
1241    fn test_topic_id_creation() {
1242        let topic_id = TopicId::new("test-topic".to_string());
1243        assert_eq!(topic_id._raw, "test-topic");
1244        assert_eq!(topic_id.hash.len(), 32);
1245
1246        // Same input should produce same hash
1247        let topic_id2 = TopicId::new("test-topic".to_string());
1248        assert_eq!(topic_id.hash, topic_id2.hash);
1249
1250        // Different input should produce different hash
1251        let topic_id3 = TopicId::new("different-topic".to_string());
1252        assert_ne!(topic_id.hash, topic_id3.hash);
1253    }
1254
1255    #[test]
1256    fn test_record_serialization_roundtrip() {
1257        let signing_key = SigningKey::generate(&mut OsRng);
1258        let topic = [1u8; 32];
1259        let unix_minute = 12345u64;
1260        let node_id = [2u8; 32];
1261        let active_peers = [[3u8; 32]; 5];
1262        let last_message_hashes = [[4u8; 32]; 5];
1263
1264        let record = Record::sign(
1265            topic,
1266            unix_minute,
1267            node_id,
1268            active_peers,
1269            last_message_hashes,
1270            &signing_key,
1271        );
1272
1273        // Test serialization roundtrip
1274        let bytes = record.to_bytes();
1275        let deserialized = Record::from_bytes(bytes).unwrap();
1276
1277        assert_eq!(record.topic, deserialized.topic);
1278        assert_eq!(record.unix_minute, deserialized.unix_minute);
1279        assert_eq!(record.node_id, deserialized.node_id);
1280        assert_eq!(record.active_peers, deserialized.active_peers);
1281        assert_eq!(record.last_message_hashes, deserialized.last_message_hashes);
1282        assert_eq!(record.signature, deserialized.signature);
1283    }
1284
1285    #[test]
1286    fn test_record_verification() {
1287        let signing_key = SigningKey::generate(&mut OsRng);
1288        let topic = [1u8; 32];
1289        let unix_minute = 12345u64;
1290        let node_id = signing_key.verifying_key().to_bytes();
1291        let active_peers = [[3u8; 32]; 5];
1292        let last_message_hashes = [[4u8; 32]; 5];
1293
1294        let record = Record::sign(
1295            topic,
1296            unix_minute,
1297            node_id,
1298            active_peers,
1299            last_message_hashes,
1300            &signing_key,
1301        );
1302
1303        // Valid verification should pass
1304        assert!(record.verify(&topic, unix_minute).is_ok());
1305
1306        // Wrong topic should fail
1307        let wrong_topic = [99u8; 32];
1308        assert!(record.verify(&wrong_topic, unix_minute).is_err());
1309
1310        // Wrong unix_minute should fail
1311        assert!(record.verify(&topic, unix_minute + 1).is_err());
1312    }
1313
1314    #[test]
1315    fn test_encrypted_record_roundtrip() {
1316        let signing_key = SigningKey::generate(&mut OsRng);
1317        let encryption_key = SigningKey::generate(&mut OsRng);
1318        let topic = [1u8; 32];
1319        let unix_minute = 12345u64;
1320        let node_id = signing_key.verifying_key().to_bytes();
1321        let active_peers = [[3u8; 32]; 5];
1322        let last_message_hashes = [[4u8; 32]; 5];
1323
1324        let record = Record::sign(
1325            topic,
1326            unix_minute,
1327            node_id,
1328            active_peers,
1329            last_message_hashes,
1330            &signing_key,
1331        );
1332
1333        // Test encryption/decryption roundtrip
1334        let encrypted = record.encrypt(&encryption_key);
1335        let decrypted = encrypted.decrypt(&encryption_key).unwrap();
1336
1337        assert_eq!(record.topic, decrypted.topic);
1338        assert_eq!(record.unix_minute, decrypted.unix_minute);
1339        assert_eq!(record.node_id, decrypted.node_id);
1340        assert_eq!(record.active_peers, decrypted.active_peers);
1341        assert_eq!(record.last_message_hashes, decrypted.last_message_hashes);
1342        assert_eq!(record.signature, decrypted.signature);
1343    }
1344
1345    #[test]
1346    fn test_encrypted_record_serialization() {
1347        let signing_key = SigningKey::generate(&mut OsRng);
1348        let encryption_key = SigningKey::generate(&mut OsRng);
1349        let topic = [1u8; 32];
1350        let unix_minute = 12345u64;
1351        let node_id = signing_key.verifying_key().to_bytes();
1352        let active_peers = [[3u8; 32]; 5];
1353        let last_message_hashes = [[4u8; 32]; 5];
1354
1355        let record = Record::sign(
1356            topic,
1357            unix_minute,
1358            node_id,
1359            active_peers,
1360            last_message_hashes,
1361            &signing_key,
1362        );
1363
1364        let encrypted = record.encrypt(&encryption_key);
1365
1366        // Test serialization roundtrip
1367        let bytes = encrypted.to_bytes();
1368        let deserialized = EncryptedRecord::from_bytes(bytes).unwrap();
1369
1370        // Should be able to decrypt the deserialized version
1371        let decrypted = deserialized.decrypt(&encryption_key).unwrap();
1372        assert_eq!(record.topic, decrypted.topic);
1373        assert_eq!(record.unix_minute, decrypted.unix_minute);
1374    }
1375
1376    #[test]
1377    fn test_default_secret_rotation() {
1378        let rotation = DefaultSecretRotation;
1379        let topic_hash = [1u8; 32];
1380        let unix_minute = 12345u64;
1381        let initial_secret_hash = [2u8; 32];
1382
1383        let secret1 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
1384        let secret2 = rotation.get_unix_minute_secret(topic_hash, unix_minute, initial_secret_hash);
1385
1386        // Same inputs should produce same secret
1387        assert_eq!(secret1, secret2);
1388
1389        // Different unix_minute should produce different secret
1390        let secret3 =
1391            rotation.get_unix_minute_secret(topic_hash, unix_minute + 1, initial_secret_hash);
1392        assert_ne!(secret1, secret3);
1393
1394        // Different topic should produce different secret
1395        let different_topic = [99u8; 32];
1396        let secret4 =
1397            rotation.get_unix_minute_secret(different_topic, unix_minute, initial_secret_hash);
1398        assert_ne!(secret1, secret4);
1399    }
1400
1401    #[test]
1402    fn test_unix_minute_function() {
1403        let current = unix_minute(0);
1404        let prev = unix_minute(-1);
1405        let next = unix_minute(1);
1406
1407        assert_eq!(current, prev + 1);
1408        assert_eq!(next, current + 1);
1409
1410        // Should be deterministic
1411        let current2 = unix_minute(0);
1412        assert_eq!(current, current2);
1413    }
1414
1415    #[test]
1416    fn test_topic_signing_keypair_deterministic() {
1417        let topic_id = TopicId::new("test-topic".to_string());
1418        let unix_minute = 12345u64;
1419
1420        let key1 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
1421        let key2 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute);
1422
1423        // Same inputs should produce same keypair
1424        assert_eq!(key1.to_bytes(), key2.to_bytes());
1425
1426        // Different unix_minute should produce different keypair
1427        let key3 = Topic::<DefaultSecretRotation>::signing_keypair(&topic_id, unix_minute + 1);
1428        assert_ne!(key1.to_bytes(), key3.to_bytes());
1429    }
1430
1431    #[test]
1432    fn test_topic_encryption_keypair_deterministic() {
1433        let topic_id = TopicId::new("test-topic".to_string());
1434        let unix_minute = 12345u64;
1435        let initial_secret_hash = [1u8; 32];
1436        let rotation = DefaultSecretRotation;
1437
1438        let key1 = Topic::<DefaultSecretRotation>::encryption_keypair(
1439            &topic_id,
1440            &rotation,
1441            initial_secret_hash,
1442            unix_minute,
1443        );
1444        let key2 = Topic::<DefaultSecretRotation>::encryption_keypair(
1445            &topic_id,
1446            &rotation,
1447            initial_secret_hash,
1448            unix_minute,
1449        );
1450
1451        // Same inputs should produce same keypair
1452        assert_eq!(key1.to_bytes(), key2.to_bytes());
1453
1454        // Different unix_minute should produce different keypair
1455        let key3 = Topic::<DefaultSecretRotation>::encryption_keypair(
1456            &topic_id,
1457            &rotation,
1458            initial_secret_hash,
1459            unix_minute + 1,
1460        );
1461        assert_ne!(key1.to_bytes(), key3.to_bytes());
1462    }
1463
1464    #[test]
1465    fn test_topic_salt_deterministic() {
1466        let topic_id = TopicId::new("test-topic".to_string());
1467        let unix_minute = 12345u64;
1468
1469        let salt1 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
1470        let salt2 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute);
1471
1472        // Same inputs should produce same salt
1473        assert_eq!(salt1, salt2);
1474
1475        // Different unix_minute should produce different salt
1476        let salt3 = Topic::<DefaultSecretRotation>::salt(&topic_id, unix_minute + 1);
1477        assert_ne!(salt1, salt3);
1478    }
1479}