distributed_topic_tracker/crypto/
record.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::{ed25519::signature::SignerMut, VerifyingKey, SigningKey};
5use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
6use sha2::Digest;
7
/// 32-byte identifier of a topic.
///
/// The wrapped bytes are opaque to this type; they are either supplied
/// directly through `RecordTopic::from_bytes` or derived from a string via the
/// `FromStr` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RecordTopic([u8; 32]);
10
11impl FromStr for RecordTopic {
12    type Err = anyhow::Error;
13
14    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
15        let mut hasher = sha2::Sha512::new();
16        hasher.update(s.as_bytes());
17        let hash: [u8; 32] = hasher.finalize()[..32]
18            .try_into()
19            .map_err(|_| anyhow::anyhow!("hashing failed"))?;
20        Ok(RecordTopic(hash))
21
22    }
23}
24
25impl RecordTopic {
26    pub fn from_bytes(bytes: &[u8; 32]) -> Self {
27        Self(*bytes)
28    }
29
30    pub fn hash(&self) -> [u8; 32] {
31        self.0
32    }
33}
34
/// A [`Record`] in its encrypted, wire-ready form.
///
/// Produced by `Record::encrypt` and turned back into a [`Record`] by
/// `EncryptedRecord::decrypt`.
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    /// The serialized record, encrypted to a freshly generated one-time key.
    encrypted_record: Vec<u8>,
    /// The one-time key's secret bytes, encrypted to the shared encryption key.
    encrypted_decryption_key: Vec<u8>,
}
40
/// A signed, fixed-size record describing one node's view of a topic during
/// a given minute.
///
/// Unused `active_peers` / `last_message_hashes` slots are left as all-zero
/// entries.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Record {
    /// Hash of the topic this record belongs to.
    topic: [u8; 32],
    /// Minute the record was created for.
    unix_minute: u64,
    /// Public key of the publishing node; the signature is checked against it.
    pub_key: [u8; 32],
    /// Up to five public keys of the publisher's neighbors.
    active_peers: [[u8; 32]; 5],
    /// Up to five message hashes supplied by the publisher.
    last_message_hashes: [[u8; 32]; 5],
    /// Signature over all preceding fields in their serialized order.
    signature: [u8; 64],
}
50
51#[derive(Debug,Clone)]
52pub struct RecordPublisher {
53    dht: crate::dht::Dht,
54
55    record_topic: RecordTopic,
56    pub_key: VerifyingKey,
57    signing_key: SigningKey,
58    secret_rotation: Option<crate::crypto::keys::RotationHandle>,
59    initial_secret_hash: [u8; 32],
60}
61
62impl RecordPublisher {
63    pub fn new(
64        record_topic: impl Into<RecordTopic>,
65        node_id: VerifyingKey,
66        signing_key: SigningKey,
67        secret_rotation: Option<crate::crypto::keys::RotationHandle>,
68        initial_secret: Vec<u8>,
69    ) -> Self {
70        let mut initial_secret_hash = sha2::Sha512::new();
71        initial_secret_hash.update(initial_secret);
72        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
73            .try_into()
74            .expect("hashing failed");
75
76        Self {
77            dht: crate::dht::Dht::new(),
78            record_topic: record_topic.into(),
79            pub_key: node_id,
80            signing_key,
81            secret_rotation,
82            initial_secret_hash,
83        }
84    }
85
86    pub fn new_record(
87        &self,
88        unix_minute: u64,
89        neighbors: Vec<ed25519_dalek::VerifyingKey>,
90        last_message_hashes: Vec<[u8; 32]>,
91    ) -> Record {
92        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
93        for (i, peer) in neighbors.iter().take(5).enumerate() {
94            active_peers[i] = *peer.as_bytes()
95        }
96
97        let mut last_message_hashes_array = [[0u8; 32]; 5];
98        for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
99            last_message_hashes_array[i] = *hash;
100        } 
101
102        Record::sign(
103            self.record_topic.hash(),
104            unix_minute,
105            self.pub_key.to_bytes(),
106            active_peers,
107            last_message_hashes_array,
108            &self.signing_key,
109        )
110    }
111
112    pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
113        self.pub_key.clone()
114    }
115
116    pub fn record_topic(&self) -> RecordTopic {
117        self.record_topic.clone()
118    }
119
120    pub fn signing_key(&self) -> ed25519_dalek::SigningKey {
121        self.signing_key.clone()
122    }
123
124    pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
125        self.secret_rotation.clone()
126    }
127
128    pub fn initial_secret_hash(&self) -> [u8; 32] {
129        self.initial_secret_hash
130    }
131}
132
133impl RecordPublisher {
134
135    // returns records it checked before publishing so we don't have to get twice
136    pub async fn publish_record(&self, record: Record) -> Result<()> {
137        // Get verified records that have active_peers or last_message_hashes set (active participants)
138        let records = self.get_records(record.unix_minute())
139        .await
140        .iter()
141        .filter(|&record| {
142            record
143                .active_peers()
144                .iter()
145                .filter(|&peer| peer.eq(&[0u8; 32]))
146                .count()
147                > 0
148                || record
149                    .last_message_hashes()
150                    .iter()
151                    .filter(|&hash| hash.eq(&[0u8; 32]))
152                    .count()
153                    > 0
154        })
155        .cloned()
156        .collect::<HashSet<_>>();
157
158        // Don't publish if there are more then MAX_BOOTSTRAP_RECORDS already written
159        // that either have active_peers or last_message_hashes set (active participants)
160        if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
161            return Ok(());
162        }
163
164        // Publish own records
165        let sign_key = crate::crypto::keys::signing_keypair(self.record_topic, record.unix_minute);
166        let salt = crate::crypto::keys::salt(self.record_topic, record.unix_minute);
167        let encryption_key = crate::crypto::keys::encryption_keypair(
168            self.record_topic.clone(),
169            &self.secret_rotation.clone().unwrap_or_default(),
170            self.initial_secret_hash,
171            record.unix_minute,
172        );
173        let encrypted_record = record.encrypt(&encryption_key);
174        
175        self.dht.put_mutable(
176            sign_key.clone(),
177            sign_key.verifying_key().into(),
178            Some(salt.to_vec()),
179            encrypted_record.to_bytes().to_vec(),
180            Some(3),
181            Duration::from_secs(10),
182        )
183        .await?;
184
185        Ok(())
186    }
187
188    pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
189        let topic_sign = crate::crypto::keys::signing_keypair(self.record_topic, unix_minute);
190        let encryption_key = crate::crypto::keys::encryption_keypair(
191            self.record_topic.clone(),
192            &self.secret_rotation.clone().unwrap_or_default(),
193            self.initial_secret_hash,
194            unix_minute,
195        );
196        let salt = crate::crypto::keys::salt(self.record_topic, unix_minute);
197
198        // Get records, decrypt and verify
199        let records_iter = self.dht
200            .get(
201                topic_sign.verifying_key().into(),
202                Some(salt.to_vec()),
203                None,
204                Duration::from_secs(10),
205            )
206            .await
207            .unwrap_or_default();
208
209        records_iter
210            .iter()
211            .filter_map(
212                |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
213                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
214                        Ok(record) => match record.verify(&self.record_topic.hash(), unix_minute) {
215                            Ok(_) => match record.node_id().eq(self.pub_key.as_bytes()) {
216                                true => None,
217                                false => Some(record),
218                            },
219                            Err(_) => None,
220                        },
221                        Err(_) => None,
222                    },
223                    Err(_) => None,
224                },
225            )
226            .collect::<HashSet<_>>()
227    }
228}
229
230impl EncryptedRecord {
231    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
232        let one_time_key_bytes: [u8; 32] = decryption_key
233            .decrypt(&self.encrypted_decryption_key)?
234            .as_slice()
235            .try_into()?;
236        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
237
238        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
239        let record = Record::from_bytes(decrypted_record)?;
240        Ok(record)
241    }
242
243    pub fn to_bytes(&self) -> Vec<u8> {
244        let mut buf = Vec::new();
245        let encrypted_record_len = self.encrypted_record.len() as u32;
246        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
247        buf.extend_from_slice(&self.encrypted_record);
248        buf.extend_from_slice(&self.encrypted_decryption_key);
249        buf
250    }
251
252    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
253        let (encrypted_record_len, buf) = buf.split_at(4);
254        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
255        let (encrypted_record, encrypted_decryption_key) =
256            buf.split_at(encrypted_record_len as usize);
257
258        Ok(Self {
259            encrypted_record: encrypted_record.to_vec(),
260            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
261        })
262    }
263}
264
265impl Record {
266    pub fn sign(
267        topic: [u8; 32],
268        unix_minute: u64,
269        node_id: [u8; 32],
270        active_peers: [[u8; 32]; 5],
271        last_message_hashes: [[u8; 32]; 5],
272        signing_key: &ed25519_dalek::SigningKey,
273    ) -> Self {
274        let mut signature_data = Vec::new();
275        signature_data.extend_from_slice(&topic);
276        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
277        signature_data.extend_from_slice(&node_id);
278        for active_peer in active_peers {
279            signature_data.extend_from_slice(&active_peer);
280        }
281        for last_message_hash in last_message_hashes {
282            signature_data.extend_from_slice(&last_message_hash);
283        }
284        let mut signing_key = signing_key.clone();
285        let signature = signing_key.sign(&signature_data);
286        Self {
287            topic,
288            unix_minute,
289            pub_key: node_id,
290            active_peers,
291            last_message_hashes,
292            signature: signature.to_bytes(),
293        }
294    }
295
296    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
297        let (topic, buf) = buf.split_at(32);
298        let (unix_minute, buf) = buf.split_at(8);
299        let (node_id, mut buf) = buf.split_at(32);
300
301        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
302        #[allow(clippy::needless_range_loop)]
303        for i in 0..active_peers.len() {
304            let (active_peer, _buf) = buf.split_at(32);
305            active_peers[i] = active_peer.try_into()?;
306            buf = _buf;
307        }
308        let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
309        #[allow(clippy::needless_range_loop)]
310        for i in 0..last_message_hashes.len() {
311            let (last_message_hash, _buf) = buf.split_at(32);
312            last_message_hashes[i] = last_message_hash.try_into()?;
313            buf = _buf;
314        }
315
316        let (signature, buf) = buf.split_at(64);
317
318        if !buf.is_empty() {
319            bail!("buffer not empty after reconstruction")
320        }
321
322        Ok(Self {
323            topic: topic.try_into()?,
324            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
325            pub_key: node_id.try_into()?,
326            active_peers,
327            last_message_hashes,
328            signature: signature.try_into()?,
329        })
330    }
331
332    pub fn to_bytes(&self) -> Vec<u8> {
333        let mut buf = Vec::new();
334        buf.extend_from_slice(&self.topic);
335        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
336        buf.extend_from_slice(&self.pub_key);
337        for active_peer in self.active_peers {
338            buf.extend_from_slice(&active_peer);
339        }
340        for last_message_hash in self.last_message_hashes {
341            buf.extend_from_slice(&last_message_hash);
342        }
343        buf.extend_from_slice(&self.signature);
344        buf
345    }
346
347    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
348        if self.topic != *actual_topic {
349            bail!("topic mismatch")
350        }
351        if self.unix_minute != actual_unix_minute {
352            bail!("unix minute mismatch")
353        }
354
355        let record_bytes = self.to_bytes();
356        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
357        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
358        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
359
360        node_id.verify_strict(signature_data.as_slice(), &signature)?;
361
362        Ok(())
363    }
364
365    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
366        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
367        let p_key = one_time_key.verifying_key();
368        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
369        let key_enc = encryption_key
370            .verifying_key()
371            .encrypt(&one_time_key.to_bytes())
372            .expect("encryption failed");
373
374        EncryptedRecord {
375            encrypted_record: data_enc,
376            encrypted_decryption_key: key_enc,
377        }
378    }
379}
380
381// fields only
382impl Record {
383    pub fn topic(&self) -> [u8; 32] {
384        self.topic
385    }
386
387    pub fn unix_minute(&self) -> u64 {
388        self.unix_minute
389    }
390
391    pub fn node_id(&self) -> [u8; 32] {
392        self.pub_key
393    }
394
395    pub fn active_peers(&self) -> [[u8; 32]; 5] {
396        self.active_peers
397    }
398
399    pub fn last_message_hashes(&self) -> [[u8; 32]; 5] {
400        self.last_message_hashes
401    }
402
403    pub fn signature(&self) -> [u8; 64] {
404        self.signature
405    }
406}