distributed_topic_tracker/crypto/
record.rs

1use std::{collections::HashSet, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::ed25519::signature::SignerMut;
5use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
6use iroh::NodeId;
7use sha2::Digest;
8
9#[derive(Debug, Clone)]
10pub struct EncryptedRecord {
11    encrypted_record: Vec<u8>,
12    encrypted_decryption_key: Vec<u8>,
13}
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub struct Record {
17    topic: [u8; 32],
18    unix_minute: u64,
19    node_id: [u8; 32],
20    active_peers: [[u8; 32]; 5],
21    last_message_hashes: [[u8; 32]; 5],
22    signature: [u8; 64],
23}
24
25#[derive(Debug,Clone)]
26pub struct RecordPublisher {
27    dht: crate::dht::Dht,
28    
29    topic_id: crate::topic::topic::TopicId,
30    node_id: iroh::NodeId,
31    signing_key: mainline::SigningKey,
32    secret_rotation: Option<crate::crypto::keys::RotationHandle>,
33    initial_secret_hash: [u8; 32],
34}
35
36impl RecordPublisher {
37    pub fn new(
38        topic_id: crate::topic::topic::TopicId,
39        node_id: iroh::NodeId,
40        signing_key: mainline::SigningKey,
41        secret_rotation: Option<crate::crypto::keys::RotationHandle>,
42        initial_secret: Vec<u8>,
43    ) -> Self {
44        let mut initial_secret_hash = sha2::Sha512::new();
45        initial_secret_hash.update(initial_secret);
46        let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
47            .try_into()
48            .expect("hashing failed");
49
50        Self {
51            dht: crate::dht::Dht::new(),
52            topic_id,
53            node_id,
54            signing_key,
55            secret_rotation,
56            initial_secret_hash,
57        }
58    }
59
60    pub fn new_record(
61        &self,
62        unix_minute: u64,
63        neighbors: Vec<NodeId>,
64        last_message_hashes: Vec<[u8; 32]>,
65    ) -> Record {
66        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
67        for (i, peer) in neighbors.iter().take(5).enumerate() {
68            active_peers[i] = *peer.as_bytes()
69        }
70
71        let mut last_message_hashes_array = [[0u8; 32]; 5];
72        for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
73            last_message_hashes_array[i] = *hash;
74        } 
75
76        Record::sign(
77            self.topic_id.hash(),
78            unix_minute,
79            self.node_id.public().to_bytes(),
80            active_peers,
81            last_message_hashes_array,
82            &self.signing_key,
83        )
84    }
85
86    pub fn node_id(&self) -> iroh::NodeId {
87        self.node_id.clone()
88    }
89
90    pub fn topic_id(&self) -> crate::topic::topic::TopicId {
91        self.topic_id.clone()
92    }
93
94    pub fn signing_key(&self) -> mainline::SigningKey {
95        self.signing_key.clone()
96    }
97
98    pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
99        self.secret_rotation.clone()
100    }
101
102    pub fn initial_secret_hash(&self) -> [u8; 32] {
103        self.initial_secret_hash
104    }
105}
106
107impl RecordPublisher {
108
109    // returns records it checked before publishing so we don't have to get twice
110    pub async fn publish_record(&self, record: Record) -> Result<()> {
111        // Get verified records that have active_peers or last_message_hashes set (active participants)
112        let records = self.get_records(record.unix_minute())
113        .await
114        .iter()
115        .filter(|&record| {
116            record
117                .active_peers()
118                .iter()
119                .filter(|&peer| peer.eq(&[0u8; 32]))
120                .count()
121                > 0
122                || record
123                    .last_message_hashes()
124                    .iter()
125                    .filter(|&hash| hash.eq(&[0u8; 32]))
126                    .count()
127                    > 0
128        })
129        .cloned()
130        .collect::<HashSet<_>>();
131
132        // Don't publish if there are more then MAX_BOOTSTRAP_RECORDS already written
133        // that either have active_peers or last_message_hashes set (active participants)
134        if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
135            return Ok(());
136        }
137
138        // Publish own records
139        let sign_key = crate::crypto::keys::signing_keypair(&self.topic_id.clone(), record.unix_minute);
140        let salt = crate::crypto::keys::salt(&self.topic_id, record.unix_minute);
141        let encryption_key = crate::crypto::keys::encryption_keypair(
142            &self.topic_id.clone(),
143            &self.secret_rotation.clone().unwrap_or_default(),
144            self.initial_secret_hash,
145            record.unix_minute,
146        );
147        let encrypted_record = record.encrypt(&encryption_key);
148        
149        self.dht.put_mutable(
150            sign_key.clone(),
151            sign_key.verifying_key().into(),
152            Some(salt.to_vec()),
153            encrypted_record.to_bytes().to_vec(),
154            Some(3),
155            Duration::from_secs(10),
156        )
157        .await?;
158
159        Ok(())
160    }
161
162    pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
163        let topic_sign = crate::crypto::keys::signing_keypair(&self.topic_id, unix_minute);
164        let encryption_key = crate::crypto::keys::encryption_keypair(
165            &self.topic_id,
166            &self.secret_rotation.clone().unwrap_or_default(),
167            self.initial_secret_hash,
168            unix_minute,
169        );
170        let salt = crate::crypto::keys::salt(&self.topic_id, unix_minute);
171
172        // Get records, decrypt and verify
173        let records_iter = self.dht
174            .get(
175                topic_sign.verifying_key().into(),
176                Some(salt.to_vec()),
177                None,
178                Duration::from_secs(10),
179            )
180            .await
181            .unwrap_or_default();
182
183        records_iter
184            .iter()
185            .filter_map(
186                |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
187                    Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
188                        Ok(record) => match record.verify(&self.topic_id.hash(), unix_minute) {
189                            Ok(_) => match record.node_id().eq(self.node_id.as_bytes()) {
190                                true => None,
191                                false => Some(record),
192                            },
193                            Err(_) => None,
194                        },
195                        Err(_) => None,
196                    },
197                    Err(_) => None,
198                },
199            )
200            .collect::<HashSet<_>>()
201    }
202}
203
204impl EncryptedRecord {
205    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
206        let one_time_key_bytes: [u8; 32] = decryption_key
207            .decrypt(&self.encrypted_decryption_key)?
208            .as_slice()
209            .try_into()?;
210        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
211
212        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
213        let record = Record::from_bytes(decrypted_record)?;
214        Ok(record)
215    }
216
217    pub fn to_bytes(&self) -> Vec<u8> {
218        let mut buf = Vec::new();
219        let encrypted_record_len = self.encrypted_record.len() as u32;
220        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
221        buf.extend_from_slice(&self.encrypted_record);
222        buf.extend_from_slice(&self.encrypted_decryption_key);
223        buf
224    }
225
226    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
227        let (encrypted_record_len, buf) = buf.split_at(4);
228        let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
229        let (encrypted_record, encrypted_decryption_key) =
230            buf.split_at(encrypted_record_len as usize);
231
232        Ok(Self {
233            encrypted_record: encrypted_record.to_vec(),
234            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
235        })
236    }
237}
238
239impl Record {
240    pub fn sign(
241        topic: [u8; 32],
242        unix_minute: u64,
243        node_id: [u8; 32],
244        active_peers: [[u8; 32]; 5],
245        last_message_hashes: [[u8; 32]; 5],
246        signing_key: &ed25519_dalek::SigningKey,
247    ) -> Self {
248        let mut signature_data = Vec::new();
249        signature_data.extend_from_slice(&topic);
250        signature_data.extend_from_slice(&unix_minute.to_le_bytes());
251        signature_data.extend_from_slice(&node_id);
252        for active_peer in active_peers {
253            signature_data.extend_from_slice(&active_peer);
254        }
255        for last_message_hash in last_message_hashes {
256            signature_data.extend_from_slice(&last_message_hash);
257        }
258        let mut signing_key = signing_key.clone();
259        let signature = signing_key.sign(&signature_data);
260        Self {
261            topic,
262            unix_minute,
263            node_id,
264            active_peers,
265            last_message_hashes,
266            signature: signature.to_bytes(),
267        }
268    }
269
270    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
271        let (topic, buf) = buf.split_at(32);
272        let (unix_minute, buf) = buf.split_at(8);
273        let (node_id, mut buf) = buf.split_at(32);
274
275        let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
276        #[allow(clippy::needless_range_loop)]
277        for i in 0..active_peers.len() {
278            let (active_peer, _buf) = buf.split_at(32);
279            active_peers[i] = active_peer.try_into()?;
280            buf = _buf;
281        }
282        let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
283        #[allow(clippy::needless_range_loop)]
284        for i in 0..last_message_hashes.len() {
285            let (last_message_hash, _buf) = buf.split_at(32);
286            last_message_hashes[i] = last_message_hash.try_into()?;
287            buf = _buf;
288        }
289
290        let (signature, buf) = buf.split_at(64);
291
292        if !buf.is_empty() {
293            bail!("buffer not empty after reconstruction")
294        }
295
296        Ok(Self {
297            topic: topic.try_into()?,
298            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
299            node_id: node_id.try_into()?,
300            active_peers,
301            last_message_hashes,
302            signature: signature.try_into()?,
303        })
304    }
305
306    pub fn to_bytes(&self) -> Vec<u8> {
307        let mut buf = Vec::new();
308        buf.extend_from_slice(&self.topic);
309        buf.extend_from_slice(&self.unix_minute.to_le_bytes());
310        buf.extend_from_slice(&self.node_id);
311        for active_peer in self.active_peers {
312            buf.extend_from_slice(&active_peer);
313        }
314        for last_message_hash in self.last_message_hashes {
315            buf.extend_from_slice(&last_message_hash);
316        }
317        buf.extend_from_slice(&self.signature);
318        buf
319    }
320
321    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
322        if self.topic != *actual_topic {
323            bail!("topic mismatch")
324        }
325        if self.unix_minute != actual_unix_minute {
326            bail!("unix minute mismatch")
327        }
328
329        let record_bytes = self.to_bytes();
330        let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
331        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
332        let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
333
334        node_id.verify_strict(signature_data.as_slice(), &signature)?;
335
336        Ok(())
337    }
338
339    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
340        let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
341        let p_key = one_time_key.verifying_key();
342        let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
343        let key_enc = encryption_key
344            .verifying_key()
345            .encrypt(&one_time_key.to_bytes())
346            .expect("encryption failed");
347
348        EncryptedRecord {
349            encrypted_record: data_enc,
350            encrypted_decryption_key: key_enc,
351        }
352    }
353}
354
355// fields only
356impl Record {
357    pub fn topic(&self) -> [u8; 32] {
358        self.topic
359    }
360
361    pub fn unix_minute(&self) -> u64 {
362        self.unix_minute
363    }
364
365    pub fn node_id(&self) -> [u8; 32] {
366        self.node_id
367    }
368
369    pub fn active_peers(&self) -> [[u8; 32]; 5] {
370        self.active_peers
371    }
372
373    pub fn last_message_hashes(&self) -> [[u8; 32]; 5] {
374        self.last_message_hashes
375    }
376
377    pub fn signature(&self) -> [u8; 64] {
378        self.signature
379    }
380}