Skip to main content

distributed_topic_tracker/crypto/
record.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4};
5
6use anyhow::{Result, bail};
7use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
8
9use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
10use serde::{Deserialize, Serialize};
11use sha2::Digest;
12use tokio_util::sync::CancellationToken;
13
14use crate::Config;
15
16/// Topic identifier derived from a string via SHA512 hashing.
17///
18/// Used as the stable identifier for gossip subscriptions and DHT records.
19///
20/// # Example
21///
22/// ```ignore
23/// let topic_id = TopicId::new("chat-room-1".to_string());
24/// ```
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub struct TopicId([u8; 32]);
27
28impl FromStr for TopicId {
29    type Err = anyhow::Error;
30
31    fn from_str(topic_name: &str) -> std::result::Result<Self, Self::Err> {
32        Ok(Self::new(topic_name.to_string()))
33    }
34}
35
36impl From<&str> for TopicId {
37    fn from(topic_name: &str) -> Self {
38        Self::new(topic_name.to_string())
39    }
40}
41
42impl From<String> for TopicId {
43    fn from(topic_name: String) -> Self {
44        Self::new(topic_name)
45    }
46}
47/// Treats `bytes` as a topic *name* and SHA-512 hashes them.
48/// For a pre-computed 32-byte hash, use [`TopicId::from_hash`] instead.
49impl From<Vec<u8>> for TopicId {
50    fn from(topic_name: Vec<u8>) -> Self {
51        Self::new(topic_name)
52    }
53}
54
55impl TopicId {
56    /// Create a new topic ID from a string.
57    ///
58    /// String is hashed with SHA512; the first 32 bytes produce the identifier.
59    pub fn new(topic_name: impl Into<Vec<u8>>) -> Self {
60        let mut topic_name_hash = sha2::Sha512::new();
61        topic_name_hash.update(topic_name.into());
62
63        Self(
64            topic_name_hash.finalize()[..32]
65                .try_into()
66                .expect("hashing 'topic_name' failed"),
67        )
68    }
69
70    /// Create from a pre-computed 32-byte hash.
71    pub fn from_hash(bytes: &[u8; 32]) -> Self {
72        Self(*bytes)
73    }
74
75    /// Get the hash bytes.
76    pub fn hash(&self) -> [u8; 32] {
77        self.0
78    }
79}
80
81/// DHT record encrypted with HPKE.
82///
83/// Contains encrypted record data and encrypted decryption key.
84/// Decryption requires the corresponding private key.
85#[derive(Debug, Clone)]
86pub struct EncryptedRecord {
87    encrypted_record: Vec<u8>,
88    encrypted_decryption_key: Vec<u8>,
89}
90
91/// A signed DHT record containing peer discovery information.
92///
93/// Records are timestamped, signed, and include content about active peers
94/// and recent messages for bubble detection and message overlap merging.
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
96pub struct Record {
97    topic: [u8; 32],
98    unix_minute: u64,
99    pub_key: [u8; 32],
100    content: RecordContent,
101    signature: [u8; 64],
102}
103
104/// Serializable content of a DHT record.
105#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
106pub struct RecordContent(pub Vec<u8>);
107
108impl RecordContent {
109    /// Deserialize using postcard codec.
110    ///
111    /// # Example
112    ///
113    /// ```ignore
114    /// let content: GossipRecordContent = record_content.to()?;
115    /// ```
116    pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
117        postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
118    }
119
120    /// Serialize from an arbitrary type using postcard.
121    pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
122        Ok(Self(
123            postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
124        ))
125    }
126}
127
128/// Publisher for creating and distributing signed DHT records.
129///
130/// Checks existing DHT record count before publishing to respect capacity limits.
131#[derive(Debug, Clone)]
132pub struct RecordPublisher {
133    dht: crate::dht::Dht,
134
135    config: crate::config::Config,
136
137    topic_id: TopicId,
138    pub_key: VerifyingKey,
139    signing_key: SigningKey,
140    secret_rotation: Option<crate::crypto::keys::RotationHandle>,
141    initial_secret_hash: [u8; 32],
142}
143
144/// Builder for `RecordPublisher`.
145#[derive(Debug)]
146pub struct RecordPublisherBuilder {
147    topic_id: TopicId,
148    signing_key: SigningKey,
149    secret_rotation: Option<crate::crypto::keys::RotationHandle>,
150    initial_secret: Vec<u8>,
151    config: crate::config::Config,
152}
153
154impl RecordPublisherBuilder {
155    /// Set a custom secret rotation strategy.
156    pub fn secret_rotation(mut self, secret_rotation: crate::crypto::keys::RotationHandle) -> Self {
157        self.secret_rotation = Some(secret_rotation);
158        self
159    }
160
161    /// Set the configuration.
162    pub fn config(mut self, config: crate::config::Config) -> Self {
163        self.config = config;
164        self
165    }
166
167    /// Build the `RecordPublisher`.
168    pub fn build(self) -> RecordPublisher {
169        RecordPublisher::new(
170            self.topic_id,
171            self.signing_key,
172            self.secret_rotation,
173            self.initial_secret,
174            self.config,
175        )
176    }
177}
178
179impl RecordPublisher {
180    /// Create a new `RecordPublisherBuilder`.
181    pub fn builder(
182        topic_id: impl Into<TopicId>,
183        signing_key: SigningKey,
184        initial_secret: impl Into<Vec<u8>>,
185    ) -> RecordPublisherBuilder {
186        RecordPublisherBuilder {
187            topic_id: topic_id.into(),
188            signing_key,
189            secret_rotation: None,
190            initial_secret: initial_secret.into(),
191            config: crate::config::Config::default(),
192        }
193    }
194
195    /// Create a new record publisher.
196    ///
197    /// # Arguments
198    ///
199    /// * `topic_id` - Topic identifier
200    /// * `signing_key` - Ed25519 secret key (signing key)
201    /// * `secret_rotation` - Optional custom key rotation strategy
202    /// * `initial_secret` - Initial secret for key derivation
203    /// * `config` - Configuration settings
204    pub fn new(
205        topic_id: impl Into<TopicId>,
206        signing_key: SigningKey,
207        secret_rotation: Option<crate::crypto::keys::RotationHandle>,
208        initial_secret: impl Into<Vec<u8>>,
209        config: crate::config::Config,
210    ) -> Self {
211        let mut initial_secret_hash = sha2::Sha512::new();
212        initial_secret_hash.update(initial_secret.into());
213        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
214            .try_into()
215            .expect("hashing failed");
216
217        Self {
218            dht: crate::dht::Dht::new(config.dht_config()),
219            config,
220            topic_id: topic_id.into(),
221            pub_key: signing_key.verifying_key(),
222            signing_key,
223            secret_rotation,
224            initial_secret_hash,
225        }
226    }
227
228    /// Create a new signed record with content.
229    ///
230    /// # Arguments
231    ///
232    /// * `unix_minute` - Time slot for this record
233    /// * `record_content` - Serializable content
234    pub fn new_record<'a>(
235        &'a self,
236        unix_minute: u64,
237        record_content: impl Serialize + Deserialize<'a>,
238    ) -> Result<Record> {
239        Record::sign(
240            self.topic_id.hash(),
241            unix_minute,
242            record_content,
243            &self.signing_key,
244        )
245    }
246
247    /// Get this publisher's Ed25519 verifying key.
248    pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
249        self.pub_key
250    }
251
252    /// Get TopicId.
253    pub fn topic_id(&self) -> &TopicId {
254        &self.topic_id
255    }
256
257    /// Get the signing key.
258    pub fn signing_key(&self) -> &ed25519_dalek::SigningKey {
259        &self.signing_key
260    }
261
262    /// Get the secret rotation handle if set.
263    pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
264        self.secret_rotation.clone()
265    }
266
267    /// Get the initial secret hash.
268    pub fn initial_secret_hash(&self) -> [u8; 32] {
269        self.initial_secret_hash
270    }
271
272    /// Get the configuration.
273    pub fn config(&self) -> &Config {
274        &self.config
275    }
276}
277
278impl RecordPublisher {
279    /// Publish a record to the DHT if slot capacity allows.
280    ///
281    /// Checks existing record count for this time slot and skips publishing if
282    /// `self.config.bootstrap_config().max_bootstrap_records()` limit reached.
283    pub async fn publish_record(&self, record: Record, cancel_token: CancellationToken) -> Result<()> {
284        self.publish_record_cached_records(record, None, cancel_token).await
285    }
286
287    /// Publish a record to the DHT (using cached get_records) if slot capacity allows.
288    ///
289    /// Checks existing record count for this time slot and skips publishing if
290    /// `self.config.bootstrap_config().max_bootstrap_records()` limit reached.
291    pub async fn publish_record_cached_records(
292        &self,
293        record: Record,
294        cached_records: Option<HashSet<Record>>,
295        cancel_token: CancellationToken,
296    ) -> Result<()> {
297        let publish_fut = async {
298            let records = match cached_records {
299                Some(records) => records,
300                None => self.get_records(record.unix_minute(), cancel_token.clone()).await?,
301            };
302
303            tracing::debug!(
304                "RecordPublisher: found {} existing records for unix_minute {}",
305                records.len(),
306                record.unix_minute()
307            );
308
309            if records.len() >= self.config.bootstrap_config().max_bootstrap_records() {
310                tracing::debug!(
311                    "RecordPublisher: max records reached ({}), skipping publish",
312                    self.config.bootstrap_config().max_bootstrap_records()
313                );
314                return Ok(());
315            }
316
317            // Publish own records
318            let sign_key = crate::crypto::keys::signing_keypair(self.topic_id(), record.unix_minute);
319            let salt = crate::crypto::keys::salt(self.topic_id(), record.unix_minute);
320            let encryption_key = crate::crypto::keys::encryption_keypair(
321                self.topic_id(),
322                &self.secret_rotation.clone().unwrap_or_default(),
323                self.initial_secret_hash,
324                record.unix_minute,
325            );
326            let encrypted_record = record.encrypt(&encryption_key);
327            let next_seq_num = i64::MAX;
328
329            tracing::debug!(
330                "RecordPublisher: publishing record to DHT for unix_minute {}",
331                record.unix_minute()
332            );
333
334            self.dht
335                .put_mutable(
336                    sign_key.clone(),
337                    Some(salt.to_vec()),
338                    encrypted_record.to_bytes()?,
339                    next_seq_num,
340                )
341                .await?;
342
343            tracing::debug!("RecordPublisher: successfully published to DHT");
344            Ok(())
345        };
346
347        tokio::select! {
348            _ = cancel_token.cancelled() => {
349                anyhow::bail!("publish cancelled");
350            }
351            res = publish_fut => {
352                res
353            }
354        }
355    }
356
357    /// Retrieve all verified records for a given time slot from the DHT.
358    ///
359    /// Filters out records from this publisher's own node ID.
360    /// Dedup's records based on pub_key, keeping the highest sequence number per pub_key.
361    pub async fn get_records(&self, unix_minute: u64, cancel_token: CancellationToken) -> Result<HashSet<Record>> {
362        let get_fut = async {
363            tracing::debug!(
364                "RecordPublisher: fetching records from DHT for unix_minute {}",
365                unix_minute
366            );
367
368            let topic_sign = crate::crypto::keys::signing_keypair(self.topic_id(), unix_minute);
369            let encryption_key = crate::crypto::keys::encryption_keypair(
370                self.topic_id(),
371                &self.secret_rotation.clone().unwrap_or_default(),
372                self.initial_secret_hash,
373                unix_minute,
374            );
375            let salt = crate::crypto::keys::salt(self.topic_id(), unix_minute);
376
377            // Get records, decrypt and verify
378            let records_iter = self
379                .dht
380                .get(topic_sign.verifying_key(), Some(salt.to_vec()), None)
381                .await?;
382
383            tracing::debug!(
384                "RecordPublisher: received {} raw records from DHT",
385                records_iter.len()
386            );
387
388            let mut dedubed_records = HashMap::new();
389            for item in records_iter {
390                if let Ok(encrypted_record) = EncryptedRecord::from_bytes(item.value().to_vec())
391                    && let Ok(record) = encrypted_record.decrypt(&encryption_key)
392                    && record.verify(&self.topic_id.hash(), unix_minute).is_ok()
393                    && !record.pub_key().eq(self.pub_key.as_bytes())
394                {
395                    let pub_key = record.pub_key();
396                    match dedubed_records.get(&pub_key) {
397                        Some((seq, _)) if *seq >= item.seq() => {}
398                        _ => {
399                            dedubed_records.insert(pub_key, (item.seq(), record));
400                        }
401                    }
402                }
403            }
404            tracing::debug!(
405                "RecordPublisher: verified {} records (filtered self)",
406                dedubed_records.len()
407            );
408
409            Ok(dedubed_records
410                .into_values()
411                .map(|(_, record)| record)
412                .collect::<HashSet<_>>())
413        };
414
415        tokio::select! {
416            _ = cancel_token.cancelled() => {
417                anyhow::bail!("get_records cancelled");
418            }
419            res = get_fut => {
420                res
421            }
422        }
423    }
424}
425
426impl EncryptedRecord {
427    const MAX_SIZE: usize = 2048;
428
429    /// Decrypt using an Ed25519 HPKE private key.
430    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
431        let one_time_key_bytes: [u8; 32] = decryption_key
432            .decrypt(&self.encrypted_decryption_key)?
433            .as_slice()
434            .try_into()?;
435        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
436
437        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
438        let record = Record::from_bytes(decrypted_record)?;
439        Ok(record)
440    }
441
442    /// Serialize to bytes (length-prefixed format).
443    pub fn to_bytes(&self) -> Result<Vec<u8>> {
444        let mut buf = Vec::new();
445        let encrypted_record_len = self.encrypted_record.len() as u32;
446        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
447        buf.extend_from_slice(&self.encrypted_record);
448        buf.extend_from_slice(&self.encrypted_decryption_key);
449
450        if buf.len() > Self::MAX_SIZE {
451            bail!(
452                "EncryptedRecord serialization exceeds maximum size, the max is set generously so this should never happen, if so your code is using it manually"
453            );
454        }
455        Ok(buf)
456    }
457
458    /// Deserialize from bytes.
459    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
460        if buf.len() < 4 {
461            bail!("buffer too short for EncryptedRecord deserialization")
462        }
463        let (encrypted_record_len, buf) = buf.split_at(4);
464        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
465        const ENCRYPTED_KEY_LENGTH: usize = 88;
466        let expected_payload_len = encrypted_record_len
467            .checked_add(ENCRYPTED_KEY_LENGTH as u32)
468            .ok_or_else(|| anyhow::anyhow!("encrypted record length overflow"))?;
469        if encrypted_record_len > Self::MAX_SIZE as u32 {
470            bail!("encrypted record length exceeds maximum allowed size")
471        }
472        if buf.len() != expected_payload_len as usize {
473            bail!("buffer length does not match expected encrypted record length")
474        }
475        let (encrypted_record, encrypted_decryption_key) =
476            buf.split_at(encrypted_record_len as usize);
477
478        Ok(Self {
479            encrypted_record: encrypted_record.to_vec(),
480            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
481        })
482    }
483}
484
485impl Record {
486    /// Create and sign a new record.
487    pub fn sign<'a>(
488        topic: [u8; 32],
489        unix_minute: u64,
490        record_content: impl Serialize + Deserialize<'a>,
491        signing_key: &ed25519_dalek::SigningKey,
492    ) -> anyhow::Result<Self> {
493        let record_content = RecordContent::from_arbitrary(&record_content)?;
494        let mut signature_data = Vec::new();
495        signature_data.extend_from_slice(&topic);
496        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
497        signature_data.extend_from_slice(&signing_key.verifying_key().to_bytes());
498        signature_data.extend(&record_content.clone().0);
499        let signature = signing_key.sign(&signature_data);
500        Ok(Self {
501            topic,
502            unix_minute,
503            pub_key: signing_key.verifying_key().to_bytes(),
504            content: record_content,
505            signature: signature.to_bytes(),
506        })
507    }
508
509    /// Deserialize from bytes.
510    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
511        if buf.len() < 32 + 8 + 32 + 64 {
512            bail!("buffer too short for Record deserialization")
513        }
514        let (topic, buf) = buf.split_at(32);
515        let (unix_minute, buf) = buf.split_at(8);
516        let (pub_key, buf) = buf.split_at(32);
517        let (record_content, buf) = buf.split_at(buf.len() - 64);
518
519        let (signature, buf) = buf.split_at(64);
520
521        if !buf.is_empty() {
522            bail!("buffer not empty after reconstruction")
523        }
524
525        Ok(Self {
526            topic: topic.try_into()?,
527            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
528            pub_key: pub_key.try_into()?,
529            content: RecordContent(record_content.to_vec()),
530            signature: signature.try_into()?,
531        })
532    }
533
534    /// Serialize to bytes.
535    pub fn to_bytes(&self) -> Vec<u8> {
536        let mut buf = Vec::new();
537        buf.extend_from_slice(&self.topic);
538        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
539        buf.extend_from_slice(&self.pub_key);
540        buf.extend(&self.content.0);
541        buf.extend_from_slice(&self.signature);
542        buf
543    }
544
545    /// Verify signature against topic and timestamp.
546    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
547        if self.topic != *actual_topic {
548            bail!("topic mismatch")
549        }
550        if self.unix_minute != actual_unix_minute {
551            bail!("unix minute mismatch")
552        }
553
554        let record_bytes = self.to_bytes();
555        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
556        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
557        let pub_key = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
558
559        pub_key.verify_strict(signature_data.as_slice(), &signature)?;
560
561        Ok(())
562    }
563
564    /// Encrypt record with HPKE.
565    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
566        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
567        let p_key = one_time_key.verifying_key();
568        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
569        let key_enc = encryption_key
570            .verifying_key()
571            .encrypt(&one_time_key.to_bytes())
572            .expect("encryption failed");
573
574        EncryptedRecord {
575            encrypted_record: data_enc,
576            encrypted_decryption_key: key_enc,
577        }
578    }
579}
580
581// Field accessors
582impl Record {
583    /// Get the topic hash.
584    pub fn topic(&self) -> [u8; 32] {
585        self.topic
586    }
587
588    /// Get the Unix minute timestamp.
589    pub fn unix_minute(&self) -> u64 {
590        self.unix_minute
591    }
592
593    /// Get the pub_key (publisher's public key).
594    pub fn pub_key(&self) -> [u8; 32] {
595        self.pub_key
596    }
597
598    /// Deserialize the record content.
599    pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
600        self.content.to::<T>()
601    }
602
603    /// Get the raw signature bytes.
604    pub fn signature(&self) -> [u8; 64] {
605        self.signature
606    }
607}