distributed_topic_tracker/crypto/
record.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
5
6use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9
/// A 32-byte topic identifier for peer discovery records.
///
/// When parsed from a string (see the `FromStr` implementation) the
/// identifier is the first 32 bytes of the SHA-512 digest of that string, so
/// the same string always maps to the same topic.
///
/// # Example
///
/// ```ignore
/// let topic = RecordTopic::from_str("chat-app-v1")?;
/// ```
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct RecordTopic([u8; 32]);
21
22impl FromStr for RecordTopic {
23    type Err = anyhow::Error;
24
25    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
26        let mut hasher = sha2::Sha512::new();
27        hasher.update(s.as_bytes());
28        let hash: [u8; 32] = hasher.finalize()[..32]
29            .try_into()
30            .map_err(|_| anyhow::anyhow!("hashing failed"))?;
31        Ok(RecordTopic(hash))
32    }
33}
34
35impl RecordTopic {
36    /// Create from a pre-computed 32-byte hash.
37    pub fn from_bytes(bytes: &[u8; 32]) -> Self {
38        Self(*bytes)
39    }
40
41    /// Get the raw 32-byte hash.
42    pub fn hash(&self) -> [u8; 32] {
43        self.0
44    }
45}
46
/// HPKE-encrypted form of a DHT record.
///
/// Holds two ciphertexts: the record itself and the one-time key required to
/// decrypt it. Recovering the record requires the private key matching the
/// public key the one-time key was encrypted to.
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    /// Serialized record, encrypted to a one-time key.
    encrypted_record: Vec<u8>,
    /// The one-time key, itself encrypted to the recipient's key.
    encrypted_decryption_key: Vec<u8>,
}
56
57/// A signed DHT record containing peer discovery information.
58///
59/// Records are timestamped, signed, and include content about active peers
60/// and recent messages for bubble detection and message overlap merging.
61#[derive(Debug, Clone, PartialEq, Eq, Hash)]
62pub struct Record {
63    topic: [u8; 32],
64    unix_minute: u64,
65    pub_key: [u8; 32],
66    content: RecordContent,
67    signature: [u8; 64],
68}
69
70/// Serializable content of a DHT record.
71#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
72pub struct RecordContent(pub Vec<u8>);
73
74impl RecordContent {
75    /// Deserialize using postcard codec.
76    ///
77    /// # Example
78    ///
79    /// ```ignore
80    /// let content: GossipRecordContent = record_content.to()?;
81    /// ```
82    pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
83        postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
84    }
85    
86    /// Serialize from an arbitrary type using postcard.
87    pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
88        Ok(Self(
89            postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
90        ))
91    }
92}
93
94/// Publisher for creating and distributing signed DHT records.
95///
96/// Checks existing DHT record count before publishing to respect capacity limits.
97#[derive(Debug, Clone)]
98pub struct RecordPublisher {
99    dht: crate::dht::Dht,
100
101    record_topic: RecordTopic,
102    pub_key: VerifyingKey,
103    signing_key: SigningKey,
104    secret_rotation: Option<crate::crypto::keys::RotationHandle>,
105    initial_secret_hash: [u8; 32],
106}
107
108impl RecordPublisher {
109    /// Create a new record publisher.
110    ///
111    /// # Arguments
112    ///
113    /// * `record_topic` - Topic identifier
114    /// * `pub_key` - Ed25519 public key (verifying key)
115    /// * `signing_key` - Ed25519 secret key (signing key)
116    /// * `secret_rotation` - Optional custom key rotation strategy
117    /// * `initial_secret` - Initial secret for key derivation
118    pub fn new(
119        record_topic: impl Into<RecordTopic>,
120        pub_key: VerifyingKey,
121        signing_key: SigningKey,
122        secret_rotation: Option<crate::crypto::keys::RotationHandle>,
123        initial_secret: Vec<u8>,
124    ) -> Self {
125        let mut initial_secret_hash = sha2::Sha512::new();
126        initial_secret_hash.update(initial_secret);
127        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
128            .try_into()
129            .expect("hashing failed");
130
131        Self {
132            dht: crate::dht::Dht::new(),
133            record_topic: record_topic.into(),
134            pub_key,
135            signing_key,
136            secret_rotation,
137            initial_secret_hash,
138        }
139    }
140
141    /// Create a new signed record with content.
142    ///
143    /// # Arguments
144    ///
145    /// * `unix_minute` - Time slot for this record
146    /// * `record_content` - Serializable content
147    pub fn new_record<'a>(
148        &'a self,
149        unix_minute: u64,
150        record_content: impl Serialize + Deserialize<'a>,
151    ) -> Result<Record> {
152        Record::sign(
153            self.record_topic.hash(),
154            unix_minute,
155            self.pub_key.to_bytes(),
156            record_content,
157            &self.signing_key,
158        )
159    }
160
161    /// Get this publisher's Ed25519 verifying key.
162    pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
163        self.pub_key
164    }
165
166    /// Get the record topic.
167    pub fn record_topic(&self) -> RecordTopic {
168        self.record_topic
169    }
170
171    /// Get the signing key.
172    pub fn signing_key(&self) -> ed25519_dalek::SigningKey {
173        self.signing_key.clone()
174    }
175
176    /// Get the secret rotation handle if set.
177    pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
178        self.secret_rotation.clone()
179    }
180
181    /// Get the initial secret hash.
182    pub fn initial_secret_hash(&self) -> [u8; 32] {
183        self.initial_secret_hash
184    }
185}
186
187impl RecordPublisher {
188    /// Publish a record to the DHT if slot capacity allows.
189    ///
190    /// Checks existing record count for this time slot and skips publishing if
191    /// `MAX_BOOTSTRAP_RECORDS` limit reached.
192    pub async fn publish_record(&self, record: Record) -> Result<()> {
193        let records = self
194            .get_records(record.unix_minute())
195            .await
196            .iter()
197            .cloned()
198            .collect::<HashSet<_>>();
199
200        tracing::debug!("RecordPublisher: found {} existing records for unix_minute {}", records.len(), record.unix_minute());
201
202        if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
203            tracing::debug!("RecordPublisher: max records reached ({}), skipping publish", crate::MAX_BOOTSTRAP_RECORDS);
204            return Ok(());
205        }
206
207        // Publish own records
208        let sign_key = crate::crypto::keys::signing_keypair(self.record_topic, record.unix_minute);
209        let salt = crate::crypto::keys::salt(self.record_topic, record.unix_minute);
210        let encryption_key = crate::crypto::keys::encryption_keypair(
211            self.record_topic,
212            &self.secret_rotation.clone().unwrap_or_default(),
213            self.initial_secret_hash,
214            record.unix_minute,
215        );
216        let encrypted_record = record.encrypt(&encryption_key);
217
218        tracing::debug!("RecordPublisher: publishing record to DHT for unix_minute {}", record.unix_minute());
219        
220        self.dht
221            .put_mutable(
222                sign_key.clone(),
223                sign_key.verifying_key(),
224                Some(salt.to_vec()),
225                encrypted_record.to_bytes().to_vec(),
226                Some(3),
227                Duration::from_secs(10),
228            )
229            .await?;
230
231        tracing::debug!("RecordPublisher: successfully published to DHT");
232        Ok(())
233    }
234
235    /// Retrieve all verified records for a given time slot from the DHT.
236    ///
237    /// Filters out records from this publisher's own node ID.
238    pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
239        tracing::debug!("RecordPublisher: fetching records from DHT for unix_minute {}", unix_minute);
240        
241        let topic_sign = crate::crypto::keys::signing_keypair(self.record_topic, unix_minute);
242        let encryption_key = crate::crypto::keys::encryption_keypair(
243            self.record_topic,
244            &self.secret_rotation.clone().unwrap_or_default(),
245            self.initial_secret_hash,
246            unix_minute,
247        );
248        let salt = crate::crypto::keys::salt(self.record_topic, unix_minute);
249
250        // Get records, decrypt and verify
251        let records_iter = self
252            .dht
253            .get(
254                topic_sign.verifying_key(),
255                Some(salt.to_vec()),
256                None,
257                Duration::from_secs(10),
258            )
259            .await
260            .unwrap_or_default();
261        
262        tracing::debug!("RecordPublisher: received {} raw records from DHT", records_iter.len());
263
264        let verified_records = records_iter
265            .iter()
266            .filter_map(
267                |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
268                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
269                        Ok(record) => match record.verify(&self.record_topic.hash(), unix_minute) {
270                            Ok(_) => match record.node_id().eq(self.pub_key.as_bytes()) {
271                                true => {
272                                    tracing::debug!("RecordPublisher: filtered out self");
273                                    None
274                                },
275                                false => Some(record),
276                            },
277                            Err(_) => None,
278                        },
279                        Err(_) => None,
280                    },
281                    Err(_) => None,
282                },
283            )
284            .collect::<HashSet<_>>();
285        
286        tracing::debug!("RecordPublisher: verified {} records (filtered self)", verified_records.len());
287        verified_records
288    }
289}
290
291impl EncryptedRecord {
292    /// Decrypt using an Ed25519 HPKE private key.
293    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
294        let one_time_key_bytes: [u8; 32] = decryption_key
295            .decrypt(&self.encrypted_decryption_key)?
296            .as_slice()
297            .try_into()?;
298        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
299
300        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
301        let record = Record::from_bytes(decrypted_record)?;
302        Ok(record)
303    }
304
305    /// Serialize to bytes (length-prefixed format).
306    pub fn to_bytes(&self) -> Vec<u8> {
307        let mut buf = Vec::new();
308        let encrypted_record_len = self.encrypted_record.len() as u32;
309        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
310        buf.extend_from_slice(&self.encrypted_record);
311        buf.extend_from_slice(&self.encrypted_decryption_key);
312        buf
313    }
314
315    /// Deserialize from bytes.
316    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
317        let (encrypted_record_len, buf) = buf.split_at(4);
318        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
319        let (encrypted_record, encrypted_decryption_key) =
320            buf.split_at(encrypted_record_len as usize);
321
322        Ok(Self {
323            encrypted_record: encrypted_record.to_vec(),
324            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
325        })
326    }
327}
328
329impl Record {
330    /// Create and sign a new record.
331    pub fn sign<'a>(
332        topic: [u8; 32],
333        unix_minute: u64,
334        node_id: [u8; 32],
335        record_content: impl Serialize + Deserialize<'a>,
336        signing_key: &ed25519_dalek::SigningKey,
337    ) -> anyhow::Result<Self> {
338        let record_content = RecordContent::from_arbitrary(&record_content)?;
339        let mut signature_data = Vec::new();
340        signature_data.extend_from_slice(&topic);
341        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
342        signature_data.extend_from_slice(&node_id);
343        signature_data.extend(&record_content.clone().0);
344        let signing_key = signing_key.clone();
345        let signature = signing_key.sign(&signature_data);
346        Ok(Self {
347            topic,
348            unix_minute,
349            pub_key: node_id,
350            content: record_content,
351            signature: signature.to_bytes(),
352        })
353    }
354
355    /// Deserialize from bytes.
356    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
357        let (topic, buf) = buf.split_at(32);
358        let (unix_minute, buf) = buf.split_at(8);
359        let (node_id, buf) = buf.split_at(32);
360        let (record_content, buf) = buf.split_at(buf.len() - 64);
361
362        let (signature, buf) = buf.split_at(64);
363
364        if !buf.is_empty() {
365            bail!("buffer not empty after reconstruction")
366        }
367
368        Ok(Self {
369            topic: topic.try_into()?,
370            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
371            pub_key: node_id.try_into()?,
372            content: RecordContent(record_content.to_vec()),
373            signature: signature.try_into()?,
374        })
375    }
376
377    /// Serialize to bytes.
378    pub fn to_bytes(&self) -> Vec<u8> {
379        let mut buf = Vec::new();
380        buf.extend_from_slice(&self.topic);
381        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
382        buf.extend_from_slice(&self.pub_key);
383        buf.extend(&self.content.0);
384        buf.extend_from_slice(&self.signature);
385        buf
386    }
387
388    /// Verify signature against topic and timestamp.
389    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
390        if self.topic != *actual_topic {
391            bail!("topic mismatch")
392        }
393        if self.unix_minute != actual_unix_minute {
394            bail!("unix minute mismatch")
395        }
396
397        let record_bytes = self.to_bytes();
398        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
399        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
400        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
401
402        node_id.verify_strict(signature_data.as_slice(), &signature)?;
403
404        Ok(())
405    }
406
407    /// Encrypt record with HPKE.
408    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
409        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
410        let p_key = one_time_key.verifying_key();
411        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
412        let key_enc = encryption_key
413            .verifying_key()
414            .encrypt(&one_time_key.to_bytes())
415            .expect("encryption failed");
416
417        EncryptedRecord {
418            encrypted_record: data_enc,
419            encrypted_decryption_key: key_enc,
420        }
421    }
422}
423
424// Field accessors
425impl Record {
426    /// Get the topic hash.
427    pub fn topic(&self) -> [u8; 32] {
428        self.topic
429    }
430
431    /// Get the Unix minute timestamp.
432    pub fn unix_minute(&self) -> u64 {
433        self.unix_minute
434    }
435
436    /// Get the node ID (publisher's public key).
437    pub fn node_id(&self) -> [u8; 32] {
438        self.pub_key
439    }
440
441    /// Deserialize the record content.
442    pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
443        self.content.to::<T>()
444    }
445
446    /// Get the raw signature bytes.
447    pub fn signature(&self) -> [u8; 64] {
448        self.signature
449    }
450}