distributed_topic_tracker/crypto/
record.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
5
6use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9
/// A 32-byte topic identifier.
///
/// Topics are normally obtained from a string via `FromStr`, which hashes the
/// string with SHA-512 and keeps the first 32 bytes. The resulting value is
/// used as a stable identifier for peer discovery records.
///
/// # Example
///
/// ```ignore
/// let topic = RecordTopic::from_str("chat-app-v1")?;
/// ```
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct RecordTopic(
    /// The raw 32-byte topic hash.
    [u8; 32],
);
21
22impl FromStr for RecordTopic {
23    type Err = anyhow::Error;
24
25    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
26        let mut hasher = sha2::Sha512::new();
27        hasher.update(s.as_bytes());
28        let hash: [u8; 32] = hasher.finalize()[..32]
29            .try_into()
30            .map_err(|_| anyhow::anyhow!("hashing failed"))?;
31        Ok(RecordTopic(hash))
32    }
33}
34
35impl RecordTopic {
36    /// Create from a pre-computed 32-byte hash.
37    pub fn from_bytes(bytes: &[u8; 32]) -> Self {
38        Self(*bytes)
39    }
40
41    /// Get the raw 32-byte hash.
42    pub fn hash(&self) -> [u8; 32] {
43        self.0
44    }
45}
46
/// A DHT record in its HPKE-encrypted form.
///
/// Holds two ciphertexts: the record itself, encrypted to a one-time key, and
/// that one-time key, encrypted to the shared encryption key. Recovering the
/// record therefore requires the private half of the shared encryption key.
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    /// Ciphertext of the serialized record, encrypted to the one-time key.
    encrypted_record: Vec<u8>,
    /// Ciphertext of the one-time key bytes, encrypted to the shared key.
    encrypted_decryption_key: Vec<u8>,
}
56
57/// A signed DHT record containing peer discovery information.
58///
59/// Records are timestamped, signed, and include content about active peers
60/// and recent messages for bubble detection and message overlap merging.
61#[derive(Debug, Clone, PartialEq, Eq, Hash)]
62pub struct Record {
63    topic: [u8; 32],
64    unix_minute: u64,
65    pub_key: [u8; 32],
66    content: RecordContent,
67    signature: [u8; 64],
68}
69
70/// Serializable content of a DHT record.
71#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
72pub struct RecordContent(pub Vec<u8>);
73
74impl RecordContent {
75    /// Deserialize using postcard codec.
76    ///
77    /// # Example
78    ///
79    /// ```ignore
80    /// let content: GossipRecordContent = record_content.to()?;
81    /// ```
82    pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
83        postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
84    }
85
86    /// Serialize from an arbitrary type using postcard.
87    pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
88        Ok(Self(
89            postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
90        ))
91    }
92}
93
94/// Publisher for creating and distributing signed DHT records.
95///
96/// Checks existing DHT record count before publishing to respect capacity limits.
97#[derive(Debug, Clone)]
98pub struct RecordPublisher {
99    dht: crate::dht::Dht,
100
101    record_topic: RecordTopic,
102    pub_key: VerifyingKey,
103    signing_key: SigningKey,
104    secret_rotation: Option<crate::crypto::keys::RotationHandle>,
105    initial_secret_hash: [u8; 32],
106}
107
108impl RecordPublisher {
109    /// Create a new record publisher.
110    ///
111    /// # Arguments
112    ///
113    /// * `record_topic` - Topic identifier
114    /// * `pub_key` - Ed25519 public key (verifying key)
115    /// * `signing_key` - Ed25519 secret key (signing key)
116    /// * `secret_rotation` - Optional custom key rotation strategy
117    /// * `initial_secret` - Initial secret for key derivation
118    pub fn new(
119        record_topic: impl Into<RecordTopic>,
120        pub_key: VerifyingKey,
121        signing_key: SigningKey,
122        secret_rotation: Option<crate::crypto::keys::RotationHandle>,
123        initial_secret: Vec<u8>,
124    ) -> Self {
125        let mut initial_secret_hash = sha2::Sha512::new();
126        initial_secret_hash.update(initial_secret);
127        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
128            .try_into()
129            .expect("hashing failed");
130
131        Self {
132            dht: crate::dht::Dht::new(),
133            record_topic: record_topic.into(),
134            pub_key,
135            signing_key,
136            secret_rotation,
137            initial_secret_hash,
138        }
139    }
140
141    /// Create a new signed record with content.
142    ///
143    /// # Arguments
144    ///
145    /// * `unix_minute` - Time slot for this record
146    /// * `record_content` - Serializable content
147    pub fn new_record<'a>(
148        &'a self,
149        unix_minute: u64,
150        record_content: impl Serialize + Deserialize<'a>,
151    ) -> Result<Record> {
152        Record::sign(
153            self.record_topic.hash(),
154            unix_minute,
155            self.pub_key.to_bytes(),
156            record_content,
157            &self.signing_key,
158        )
159    }
160
161    /// Get this publisher's Ed25519 verifying key.
162    pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
163        self.pub_key
164    }
165
166    /// Get the record topic.
167    pub fn record_topic(&self) -> RecordTopic {
168        self.record_topic
169    }
170
171    /// Get the signing key.
172    pub fn signing_key(&self) -> ed25519_dalek::SigningKey {
173        self.signing_key.clone()
174    }
175
176    /// Get the secret rotation handle if set.
177    pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
178        self.secret_rotation.clone()
179    }
180
181    /// Get the initial secret hash.
182    pub fn initial_secret_hash(&self) -> [u8; 32] {
183        self.initial_secret_hash
184    }
185}
186
187impl RecordPublisher {
188    /// Publish a record to the DHT if slot capacity allows.
189    ///
190    /// Checks existing record count for this time slot and skips publishing if
191    /// `MAX_BOOTSTRAP_RECORDS` limit reached.
192    pub async fn publish_record(&self, record: Record) -> Result<()> {
193        let records = self
194            .get_records(record.unix_minute())
195            .await
196            .iter()
197            .cloned()
198            .collect::<HashSet<_>>();
199
200        tracing::debug!(
201            "RecordPublisher: found {} existing records for unix_minute {}",
202            records.len(),
203            record.unix_minute()
204        );
205
206        if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
207            tracing::debug!(
208                "RecordPublisher: max records reached ({}), skipping publish",
209                crate::MAX_BOOTSTRAP_RECORDS
210            );
211            return Ok(());
212        }
213
214        // Publish own records
215        let sign_key = crate::crypto::keys::signing_keypair(self.record_topic, record.unix_minute);
216        let salt = crate::crypto::keys::salt(self.record_topic, record.unix_minute);
217        let encryption_key = crate::crypto::keys::encryption_keypair(
218            self.record_topic,
219            &self.secret_rotation.clone().unwrap_or_default(),
220            self.initial_secret_hash,
221            record.unix_minute,
222        );
223        let encrypted_record = record.encrypt(&encryption_key);
224
225        tracing::debug!(
226            "RecordPublisher: publishing record to DHT for unix_minute {}",
227            record.unix_minute()
228        );
229
230        self.dht
231            .put_mutable(
232                sign_key.clone(),
233                sign_key.verifying_key(),
234                Some(salt.to_vec()),
235                encrypted_record.to_bytes().to_vec(),
236                Some(3),
237                Duration::from_secs(10),
238            )
239            .await?;
240
241        tracing::debug!("RecordPublisher: successfully published to DHT");
242        Ok(())
243    }
244
245    /// Retrieve all verified records for a given time slot from the DHT.
246    ///
247    /// Filters out records from this publisher's own node ID.
248    pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
249        tracing::debug!(
250            "RecordPublisher: fetching records from DHT for unix_minute {}",
251            unix_minute
252        );
253
254        let topic_sign = crate::crypto::keys::signing_keypair(self.record_topic, unix_minute);
255        let encryption_key = crate::crypto::keys::encryption_keypair(
256            self.record_topic,
257            &self.secret_rotation.clone().unwrap_or_default(),
258            self.initial_secret_hash,
259            unix_minute,
260        );
261        let salt = crate::crypto::keys::salt(self.record_topic, unix_minute);
262
263        // Get records, decrypt and verify
264        let records_iter = self
265            .dht
266            .get(
267                topic_sign.verifying_key(),
268                Some(salt.to_vec()),
269                None,
270                Duration::from_secs(10),
271            )
272            .await
273            .unwrap_or_default();
274
275        tracing::debug!(
276            "RecordPublisher: received {} raw records from DHT",
277            records_iter.len()
278        );
279
280        let verified_records = records_iter
281            .iter()
282            .filter_map(
283                |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
284                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
285                        Ok(record) => match record.verify(&self.record_topic.hash(), unix_minute) {
286                            Ok(_) => match record.node_id().eq(self.pub_key.as_bytes()) {
287                                true => {
288                                    tracing::debug!("RecordPublisher: filtered out self");
289                                    None
290                                }
291                                false => Some(record),
292                            },
293                            Err(_) => None,
294                        },
295                        Err(_) => None,
296                    },
297                    Err(_) => None,
298                },
299            )
300            .collect::<HashSet<_>>();
301
302        tracing::debug!(
303            "RecordPublisher: verified {} records (filtered self)",
304            verified_records.len()
305        );
306        verified_records
307    }
308}
309
310impl EncryptedRecord {
311    /// Decrypt using an Ed25519 HPKE private key.
312    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
313        let one_time_key_bytes: [u8; 32] = decryption_key
314            .decrypt(&self.encrypted_decryption_key)?
315            .as_slice()
316            .try_into()?;
317        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
318
319        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
320        let record = Record::from_bytes(decrypted_record)?;
321        Ok(record)
322    }
323
324    /// Serialize to bytes (length-prefixed format).
325    pub fn to_bytes(&self) -> Vec<u8> {
326        let mut buf = Vec::new();
327        let encrypted_record_len = self.encrypted_record.len() as u32;
328        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
329        buf.extend_from_slice(&self.encrypted_record);
330        buf.extend_from_slice(&self.encrypted_decryption_key);
331        buf
332    }
333
334    /// Deserialize from bytes.
335    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
336        let (encrypted_record_len, buf) = buf.split_at(4);
337        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
338        let (encrypted_record, encrypted_decryption_key) =
339            buf.split_at(encrypted_record_len as usize);
340
341        Ok(Self {
342            encrypted_record: encrypted_record.to_vec(),
343            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
344        })
345    }
346}
347
348impl Record {
349    /// Create and sign a new record.
350    pub fn sign<'a>(
351        topic: [u8; 32],
352        unix_minute: u64,
353        node_id: [u8; 32],
354        record_content: impl Serialize + Deserialize<'a>,
355        signing_key: &ed25519_dalek::SigningKey,
356    ) -> anyhow::Result<Self> {
357        let record_content = RecordContent::from_arbitrary(&record_content)?;
358        let mut signature_data = Vec::new();
359        signature_data.extend_from_slice(&topic);
360        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
361        signature_data.extend_from_slice(&node_id);
362        signature_data.extend(&record_content.clone().0);
363        let signing_key = signing_key.clone();
364        let signature = signing_key.sign(&signature_data);
365        Ok(Self {
366            topic,
367            unix_minute,
368            pub_key: node_id,
369            content: record_content,
370            signature: signature.to_bytes(),
371        })
372    }
373
374    /// Deserialize from bytes.
375    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
376        let (topic, buf) = buf.split_at(32);
377        let (unix_minute, buf) = buf.split_at(8);
378        let (node_id, buf) = buf.split_at(32);
379        let (record_content, buf) = buf.split_at(buf.len() - 64);
380
381        let (signature, buf) = buf.split_at(64);
382
383        if !buf.is_empty() {
384            bail!("buffer not empty after reconstruction")
385        }
386
387        Ok(Self {
388            topic: topic.try_into()?,
389            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
390            pub_key: node_id.try_into()?,
391            content: RecordContent(record_content.to_vec()),
392            signature: signature.try_into()?,
393        })
394    }
395
396    /// Serialize to bytes.
397    pub fn to_bytes(&self) -> Vec<u8> {
398        let mut buf = Vec::new();
399        buf.extend_from_slice(&self.topic);
400        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
401        buf.extend_from_slice(&self.pub_key);
402        buf.extend(&self.content.0);
403        buf.extend_from_slice(&self.signature);
404        buf
405    }
406
407    /// Verify signature against topic and timestamp.
408    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
409        if self.topic != *actual_topic {
410            bail!("topic mismatch")
411        }
412        if self.unix_minute != actual_unix_minute {
413            bail!("unix minute mismatch")
414        }
415
416        let record_bytes = self.to_bytes();
417        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
418        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
419        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
420
421        node_id.verify_strict(signature_data.as_slice(), &signature)?;
422
423        Ok(())
424    }
425
426    /// Encrypt record with HPKE.
427    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
428        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
429        let p_key = one_time_key.verifying_key();
430        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
431        let key_enc = encryption_key
432            .verifying_key()
433            .encrypt(&one_time_key.to_bytes())
434            .expect("encryption failed");
435
436        EncryptedRecord {
437            encrypted_record: data_enc,
438            encrypted_decryption_key: key_enc,
439        }
440    }
441}
442
443// Field accessors
444impl Record {
445    /// Get the topic hash.
446    pub fn topic(&self) -> [u8; 32] {
447        self.topic
448    }
449
450    /// Get the Unix minute timestamp.
451    pub fn unix_minute(&self) -> u64 {
452        self.unix_minute
453    }
454
455    /// Get the node ID (publisher's public key).
456    pub fn node_id(&self) -> [u8; 32] {
457        self.pub_key
458    }
459
460    /// Deserialize the record content.
461    pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
462        self.content.to::<T>()
463    }
464
465    /// Get the raw signature bytes.
466    pub fn signature(&self) -> [u8; 64] {
467        self.signature
468    }
469}