Skip to main content

distributed_topic_tracker/crypto/
record.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4};
5
6use anyhow::{Result, bail};
7use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
8use getrandom::{SysRng, rand_core::UnwrapErr};
9
10use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
11use serde::{Deserialize, Serialize};
12use sha2::Digest;
13use tokio_util::sync::CancellationToken;
14
15use crate::{Config, Dht, RotationHandle};
16
/// Stable 32-byte identifier of a topic.
///
/// Usually produced by [`TopicId::new`], which hashes a topic name with
/// SHA-512 and keeps the first 32 bytes of the digest. An already computed
/// hash can be wrapped unchanged with [`TopicId::from_hash`].
///
/// Used as the stable identifier for gossip subscriptions and DHT records.
///
/// # Example
///
/// ```ignore
/// let topic_id = TopicId::new("chat-room-1".to_string());
/// ```
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct TopicId([u8; 32]);
28
29impl FromStr for TopicId {
30    type Err = anyhow::Error;
31
32    fn from_str(topic_name: &str) -> std::result::Result<Self, Self::Err> {
33        Ok(Self::new(topic_name.to_string()))
34    }
35}
36
37impl From<&str> for TopicId {
38    fn from(topic_name: &str) -> Self {
39        Self::new(topic_name.to_string())
40    }
41}
42
43impl From<String> for TopicId {
44    fn from(topic_name: String) -> Self {
45        Self::new(topic_name)
46    }
47}
48/// Treats `bytes` as a topic *name* and SHA-512 hashes them.
49/// For a pre-computed 32-byte hash, use [`TopicId::from_hash`] instead.
50impl From<Vec<u8>> for TopicId {
51    fn from(topic_name: Vec<u8>) -> Self {
52        Self::new(topic_name)
53    }
54}
55
56impl TopicId {
57    /// Create a new topic ID from a string.
58    ///
59    /// String is hashed with SHA512; the first 32 bytes produce the identifier.
60    pub fn new(topic_name: impl Into<Vec<u8>>) -> Self {
61        let mut topic_name_hash = sha2::Sha512::new();
62        topic_name_hash.update(topic_name.into());
63
64        Self(
65            topic_name_hash.finalize()[..32]
66                .try_into()
67                .expect("hashing 'topic_name' failed"),
68        )
69    }
70
71    /// Create from a pre-computed 32-byte hash.
72    pub fn from_hash(bytes: &[u8; 32]) -> Self {
73        Self(*bytes)
74    }
75
76    /// Get the hash bytes.
77    pub fn hash(&self) -> [u8; 32] {
78        self.0
79    }
80}
81
/// Encrypted form of a DHT record.
///
/// Holds two ciphertexts: the encrypted record itself and the encrypted
/// key needed to decrypt it. Recovering the record requires the private key
/// matching the one the decryption key was encrypted for.
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    // Ciphertext of the serialized record.
    encrypted_record: Vec<u8>,
    // Ciphertext of the key that decrypts `encrypted_record`.
    encrypted_decryption_key: Vec<u8>,
}
91
92/// A signed DHT record containing peer discovery information.
93///
94/// Records are timestamped, signed, and include content about active peers
95/// and recent messages for bubble detection and message overlap merging.
96#[derive(Debug, Clone, PartialEq, Eq, Hash)]
97pub struct Record {
98    topic: [u8; 32],
99    unix_minute: u64,
100    pub_key: [u8; 32],
101    content: RecordContent,
102    signature: [u8; 64],
103}
104
105/// Serializable content of a DHT record.
106#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
107pub struct RecordContent(pub Vec<u8>);
108
109impl RecordContent {
110    /// Deserialize using postcard codec.
111    ///
112    /// # Example
113    ///
114    /// ```ignore
115    /// let content: GossipRecordContent = record_content.to()?;
116    /// ```
117    pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
118        postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
119    }
120
121    /// Serialize from an arbitrary type using postcard.
122    pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
123        Ok(Self(
124            postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
125        ))
126    }
127}
128
129/// Publisher for creating and distributing signed DHT records.
130///
131/// Checks existing DHT record count before publishing to respect capacity limits.
132#[derive(Debug, Clone)]
133pub struct RecordPublisher {
134    dht: Dht,
135
136    config: Config,
137
138    topic_id: TopicId,
139    pub_key: VerifyingKey,
140    signing_key: SigningKey,
141    secret_rotation: Option<RotationHandle>,
142    initial_secret_hash: [u8; 32],
143}
144
145/// Builder for `RecordPublisher`.
146#[derive(Debug)]
147pub struct RecordPublisherBuilder {
148    topic_id: TopicId,
149    signing_key: SigningKey,
150    secret_rotation: Option<RotationHandle>,
151    initial_secret: Vec<u8>,
152    config: Config,
153}
154
155impl RecordPublisherBuilder {
156    /// Set a custom secret rotation strategy.
157    pub fn secret_rotation(mut self, secret_rotation: RotationHandle) -> Self {
158        self.secret_rotation = Some(secret_rotation);
159        self
160    }
161
162    /// Set the configuration.
163    pub fn config(mut self, config: Config) -> Self {
164        self.config = config;
165        self
166    }
167
168    /// Build the `RecordPublisher`.
169    pub fn build(self) -> RecordPublisher {
170        RecordPublisher::new(
171            self.topic_id,
172            self.signing_key,
173            self.secret_rotation,
174            self.initial_secret,
175            self.config,
176        )
177    }
178}
179
180impl RecordPublisher {
181    /// Create a new `RecordPublisherBuilder`.
182    pub fn builder(
183        topic_id: impl Into<TopicId>,
184        signing_key: SigningKey,
185        initial_secret: impl Into<Vec<u8>>,
186    ) -> RecordPublisherBuilder {
187        RecordPublisherBuilder {
188            topic_id: topic_id.into(),
189            signing_key,
190            secret_rotation: None,
191            initial_secret: initial_secret.into(),
192            config: Config::default(),
193        }
194    }
195
196    /// Create a new record publisher.
197    ///
198    /// # Arguments
199    ///
200    /// * `topic_id` - Topic identifier
201    /// * `signing_key` - Ed25519 secret key (signing key)
202    /// * `secret_rotation` - Optional custom key rotation strategy
203    /// * `initial_secret` - Initial secret for key derivation
204    /// * `config` - Configuration settings
205    pub fn new(
206        topic_id: impl Into<TopicId>,
207        signing_key: SigningKey,
208        secret_rotation: Option<RotationHandle>,
209        initial_secret: impl Into<Vec<u8>>,
210        config: Config,
211    ) -> Self {
212        let mut initial_secret_hash = sha2::Sha512::new();
213        initial_secret_hash.update(initial_secret.into());
214        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
215            .try_into()
216            .expect("hashing failed");
217
218        Self {
219            dht: Dht::new(config.dht_config()),
220            config,
221            topic_id: topic_id.into(),
222            pub_key: signing_key.verifying_key(),
223            signing_key,
224            secret_rotation,
225            initial_secret_hash,
226        }
227    }
228
229    /// Create a new signed record with content.
230    ///
231    /// # Arguments
232    ///
233    /// * `unix_minute` - Time slot for this record
234    /// * `record_content` - Serializable content
235    pub fn new_record<'a>(
236        &'a self,
237        unix_minute: u64,
238        record_content: impl Serialize + Deserialize<'a>,
239    ) -> Result<Record> {
240        Record::sign(
241            self.topic_id.hash(),
242            unix_minute,
243            record_content,
244            &self.signing_key,
245        )
246    }
247
248    /// Get this publisher's Ed25519 verifying key.
249    pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
250        self.pub_key
251    }
252
253    /// Get TopicId.
254    pub fn topic_id(&self) -> &TopicId {
255        &self.topic_id
256    }
257
258    /// Get the signing key.
259    pub fn signing_key(&self) -> &ed25519_dalek::SigningKey {
260        &self.signing_key
261    }
262
263    /// Get the secret rotation handle if set.
264    pub fn secret_rotation(&self) -> Option<RotationHandle> {
265        self.secret_rotation.clone()
266    }
267
268    /// Get the initial secret hash.
269    pub fn initial_secret_hash(&self) -> [u8; 32] {
270        self.initial_secret_hash
271    }
272
273    /// Get the configuration.
274    pub fn config(&self) -> &Config {
275        &self.config
276    }
277}
278
279impl RecordPublisher {
280    /// Publish a record to the DHT if slot capacity allows.
281    ///
282    /// Checks existing record count for this time slot and skips publishing if
283    /// `self.config.bootstrap_config().max_bootstrap_records()` limit reached.
284    pub async fn publish_record(&self, record: Record, cancel_token: CancellationToken) -> Result<()> {
285        self.publish_record_cached_records(record, None, cancel_token).await
286    }
287
288    /// Publish a record to the DHT (using cached get_records) if slot capacity allows.
289    ///
290    /// Checks existing record count for this time slot and skips publishing if
291    /// `self.config.bootstrap_config().max_bootstrap_records()` limit reached.
292    pub async fn publish_record_cached_records(
293        &self,
294        record: Record,
295        cached_records: Option<HashSet<Record>>,
296        cancel_token: CancellationToken,
297    ) -> Result<()> {
298        let publish_fut = async {
299            let records = match cached_records {
300                Some(records) => records,
301                None => self.get_records(record.unix_minute(), cancel_token.clone()).await?,
302            };
303
304            tracing::debug!(
305                "RecordPublisher: found {} existing records for unix_minute {}",
306                records.len(),
307                record.unix_minute()
308            );
309
310            if records.len() >= self.config.bootstrap_config().max_bootstrap_records() {
311                tracing::debug!(
312                    "RecordPublisher: max records reached ({}), skipping publish",
313                    self.config.bootstrap_config().max_bootstrap_records()
314                );
315                return Ok(());
316            }
317
318            // Publish own records
319            let sign_key = crate::crypto::keys::signing_keypair(self.topic_id(), record.unix_minute);
320            let salt = crate::crypto::keys::salt(self.topic_id(), record.unix_minute);
321            let encryption_key = crate::crypto::keys::encryption_keypair(
322                self.topic_id(),
323                &self.secret_rotation.clone().unwrap_or_default(),
324                self.initial_secret_hash,
325                record.unix_minute,
326            );
327            let encrypted_record = record.encrypt(&encryption_key);
328            let next_seq_num = i64::MAX;
329
330            tracing::debug!(
331                "RecordPublisher: publishing record to DHT for unix_minute {}",
332                record.unix_minute()
333            );
334
335            self.dht
336                .put_mutable(
337                    sign_key.clone(),
338                    Some(salt.to_vec()),
339                    encrypted_record.to_bytes()?,
340                    next_seq_num,
341                )
342                .await?;
343
344            tracing::debug!("RecordPublisher: successfully published to DHT");
345            Ok(())
346        };
347
348        tokio::select! {
349            _ = cancel_token.cancelled() => {
350                anyhow::bail!("publish cancelled");
351            }
352            res = publish_fut => {
353                res
354            }
355        }
356    }
357
358    /// Retrieve all verified records for a given time slot from the DHT.
359    ///
360    /// Filters out records from this publisher's own node ID.
361    /// Dedup's records based on pub_key, keeping the highest sequence number per pub_key.
362    pub async fn get_records(&self, unix_minute: u64, cancel_token: CancellationToken) -> Result<HashSet<Record>> {
363        let get_fut = async {
364            tracing::debug!(
365                "RecordPublisher: fetching records from DHT for unix_minute {}",
366                unix_minute
367            );
368
369            let topic_sign = crate::crypto::keys::signing_keypair(self.topic_id(), unix_minute);
370            let encryption_key = crate::crypto::keys::encryption_keypair(
371                self.topic_id(),
372                &self.secret_rotation.clone().unwrap_or_default(),
373                self.initial_secret_hash,
374                unix_minute,
375            );
376            let salt = crate::crypto::keys::salt(self.topic_id(), unix_minute);
377
378            // Get records, decrypt and verify
379            let records_iter = self
380                .dht
381                .get(topic_sign.verifying_key(), Some(salt.to_vec()), None)
382                .await?;
383
384            tracing::debug!(
385                "RecordPublisher: received {} raw records from DHT",
386                records_iter.len()
387            );
388
389            let mut dedubed_records = HashMap::new();
390            for item in records_iter {
391                if let Ok(encrypted_record) = EncryptedRecord::from_bytes(item.value().to_vec())
392                    && let Ok(record) = encrypted_record.decrypt(&encryption_key)
393                    && record.verify(&self.topic_id.hash(), unix_minute).is_ok()
394                    && !record.pub_key().eq(self.pub_key.as_bytes())
395                {
396                    let pub_key = record.pub_key();
397                    match dedubed_records.get(&pub_key) {
398                        Some((seq, _)) if *seq >= item.seq() => {}
399                        _ => {
400                            dedubed_records.insert(pub_key, (item.seq(), record));
401                        }
402                    }
403                }
404            }
405            tracing::debug!(
406                "RecordPublisher: verified {} records (filtered self)",
407                dedubed_records.len()
408            );
409
410            Ok(dedubed_records
411                .into_values()
412                .map(|(_, record)| record)
413                .collect::<HashSet<_>>())
414        };
415
416        tokio::select! {
417            _ = cancel_token.cancelled() => {
418                anyhow::bail!("get_records cancelled");
419            }
420            res = get_fut => {
421                res
422            }
423        }
424    }
425}
426
427impl EncryptedRecord {
428    const MAX_SIZE: usize = 2048;
429
430    /// Decrypt using an Ed25519 HPKE private key.
431    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
432        let one_time_key_bytes: [u8; 32] = decryption_key
433            .decrypt(&self.encrypted_decryption_key)?
434            .as_slice()
435            .try_into()?;
436        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
437
438        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
439        let record = Record::from_bytes(decrypted_record)?;
440        Ok(record)
441    }
442
443    /// Serialize to bytes (length-prefixed format).
444    pub fn to_bytes(&self) -> Result<Vec<u8>> {
445        let mut buf = Vec::new();
446        let encrypted_record_len = self.encrypted_record.len() as u32;
447        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
448        buf.extend_from_slice(&self.encrypted_record);
449        buf.extend_from_slice(&self.encrypted_decryption_key);
450
451        if buf.len() > Self::MAX_SIZE {
452            bail!(
453                "EncryptedRecord serialization exceeds maximum size, the max is set generously so this should never happen, if so your code is using it manually"
454            );
455        }
456        Ok(buf)
457    }
458
459    /// Deserialize from bytes.
460    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
461        if buf.len() < 4 {
462            bail!("buffer too short for EncryptedRecord deserialization")
463        }
464        let (encrypted_record_len, buf) = buf.split_at(4);
465        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
466        const ENCRYPTED_KEY_LENGTH: usize = 88;
467        let expected_payload_len = encrypted_record_len
468            .checked_add(ENCRYPTED_KEY_LENGTH as u32)
469            .ok_or_else(|| anyhow::anyhow!("encrypted record length overflow"))?;
470        if encrypted_record_len > Self::MAX_SIZE as u32 {
471            bail!("encrypted record length exceeds maximum allowed size")
472        }
473        if buf.len() != expected_payload_len as usize {
474            bail!("buffer length does not match expected encrypted record length")
475        }
476        let (encrypted_record, encrypted_decryption_key) =
477            buf.split_at(encrypted_record_len as usize);
478
479        Ok(Self {
480            encrypted_record: encrypted_record.to_vec(),
481            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
482        })
483    }
484}
485
486impl Record {
487    /// Create and sign a new record.
488    pub fn sign<'a>(
489        topic: [u8; 32],
490        unix_minute: u64,
491        record_content: impl Serialize + Deserialize<'a>,
492        signing_key: &ed25519_dalek::SigningKey,
493    ) -> anyhow::Result<Self> {
494        let record_content = RecordContent::from_arbitrary(&record_content)?;
495        let mut signature_data = Vec::new();
496        signature_data.extend_from_slice(&topic);
497        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
498        signature_data.extend_from_slice(&signing_key.verifying_key().to_bytes());
499        signature_data.extend(&record_content.clone().0);
500        let signature = signing_key.sign(&signature_data);
501        Ok(Self {
502            topic,
503            unix_minute,
504            pub_key: signing_key.verifying_key().to_bytes(),
505            content: record_content,
506            signature: signature.to_bytes(),
507        })
508    }
509
510    /// Deserialize from bytes.
511    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
512        if buf.len() < 32 + 8 + 32 + 64 {
513            bail!("buffer too short for Record deserialization")
514        }
515        let (topic, buf) = buf.split_at(32);
516        let (unix_minute, buf) = buf.split_at(8);
517        let (pub_key, buf) = buf.split_at(32);
518        let (record_content, buf) = buf.split_at(buf.len() - 64);
519
520        let (signature, buf) = buf.split_at(64);
521
522        if !buf.is_empty() {
523            bail!("buffer not empty after reconstruction")
524        }
525
526        Ok(Self {
527            topic: topic.try_into()?,
528            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
529            pub_key: pub_key.try_into()?,
530            content: RecordContent(record_content.to_vec()),
531            signature: signature.try_into()?,
532        })
533    }
534
535    /// Serialize to bytes.
536    pub fn to_bytes(&self) -> Vec<u8> {
537        let mut buf = Vec::new();
538        buf.extend_from_slice(&self.topic);
539        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
540        buf.extend_from_slice(&self.pub_key);
541        buf.extend(&self.content.0);
542        buf.extend_from_slice(&self.signature);
543        buf
544    }
545
546    /// Verify signature against topic and timestamp.
547    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
548        if self.topic != *actual_topic {
549            bail!("topic mismatch")
550        }
551        if self.unix_minute != actual_unix_minute {
552            bail!("unix minute mismatch")
553        }
554
555        let record_bytes = self.to_bytes();
556        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
557        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
558        let pub_key = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
559
560        pub_key.verify_strict(signature_data.as_slice(), &signature)?;
561
562        Ok(())
563    }
564
565    /// Encrypt record with HPKE.
566    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
567        let mut csprng = UnwrapErr(SysRng);
568        let one_time_key = ed25519_dalek::SigningKey::generate(&mut csprng);
569        let p_key = one_time_key.verifying_key();
570        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
571        let key_enc = encryption_key
572            .verifying_key()
573            .encrypt(&one_time_key.to_bytes())
574            .expect("encryption failed");
575
576        EncryptedRecord {
577            encrypted_record: data_enc,
578            encrypted_decryption_key: key_enc,
579        }
580    }
581}
582
583// Field accessors
584impl Record {
585    /// Get the topic hash.
586    pub fn topic(&self) -> [u8; 32] {
587        self.topic
588    }
589
590    /// Get the Unix minute timestamp.
591    pub fn unix_minute(&self) -> u64 {
592        self.unix_minute
593    }
594
595    /// Get the pub_key (publisher's public key).
596    pub fn pub_key(&self) -> [u8; 32] {
597        self.pub_key
598    }
599
600    /// Deserialize the record content.
601    pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
602        self.content.to::<T>()
603    }
604
605    /// Get the raw signature bytes.
606    pub fn signature(&self) -> [u8; 64] {
607        self.signature
608    }
609}