distributed_topic_tracker/crypto/
record.rs1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::{ed25519::signature::SignerMut, VerifyingKey, SigningKey};
5use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
6use sha2::Digest;
7
/// 32-byte identifier of a topic.
///
/// Built either from a string (via `FromStr`, which hashes it) or from raw
/// bytes; the wrapped array is what gets embedded into every record.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct RecordTopic([u8; 32]);
10
11impl FromStr for RecordTopic {
12 type Err = anyhow::Error;
13
14 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
15 let mut hasher = sha2::Sha512::new();
16 hasher.update(s.as_bytes());
17 let hash: [u8; 32] = hasher.finalize()[..32]
18 .try_into()
19 .map_err(|_| anyhow::anyhow!("hashing failed"))?;
20 Ok(RecordTopic(hash))
21
22 }
23}
24
25impl RecordTopic {
26 pub fn from_bytes(bytes: &[u8; 32]) -> Self {
27 Self(*bytes)
28 }
29
30 pub fn hash(&self) -> [u8; 32] {
31 self.0
32 }
33}
34
/// A [`Record`] in its encrypted, wire-ready form.
///
/// Holds two ciphertexts: the record itself, and the one-time key needed to
/// decrypt it.
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    /// Ciphertext of the serialized record.
    encrypted_record: Vec<u8>,
    /// Ciphertext of the one-time key that decrypts `encrypted_record`.
    encrypted_decryption_key: Vec<u8>,
}
40
/// A signed, fixed-size announcement published for a topic.
///
/// Every field has a fixed width, so the serialized form is always the same
/// length. Unused peer / message-hash slots are left as all-zero arrays.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq, Hash)]
pub struct Record {
    /// 32-byte hash of the topic this record belongs to.
    topic: [u8; 32],
    /// Time slot of the record (presumably minutes since the Unix epoch —
    /// confirm against the callers that produce this value).
    unix_minute: u64,
    /// Ed25519 public key bytes of the node that signed the record.
    pub_key: [u8; 32],
    /// Up to five peer public keys, zero-padded.
    active_peers: [[u8; 32]; 5],
    /// Up to five message hashes, zero-padded.
    last_message_hashes: [[u8; 32]; 5],
    /// Ed25519 signature over all preceding fields.
    signature: [u8; 64],
}
50
51#[derive(Debug,Clone)]
52pub struct RecordPublisher {
53 dht: crate::dht::Dht,
54
55 record_topic: RecordTopic,
56 pub_key: VerifyingKey,
57 signing_key: SigningKey,
58 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
59 initial_secret_hash: [u8; 32],
60}
61
62impl RecordPublisher {
63 pub fn new(
64 record_topic: impl Into<RecordTopic>,
65 node_id: VerifyingKey,
66 signing_key: SigningKey,
67 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
68 initial_secret: Vec<u8>,
69 ) -> Self {
70 let mut initial_secret_hash = sha2::Sha512::new();
71 initial_secret_hash.update(initial_secret);
72 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
73 .try_into()
74 .expect("hashing failed");
75
76 Self {
77 dht: crate::dht::Dht::new(),
78 record_topic: record_topic.into(),
79 pub_key: node_id,
80 signing_key,
81 secret_rotation,
82 initial_secret_hash,
83 }
84 }
85
86 pub fn new_record(
87 &self,
88 unix_minute: u64,
89 neighbors: Vec<ed25519_dalek::VerifyingKey>,
90 last_message_hashes: Vec<[u8; 32]>,
91 ) -> Record {
92 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
93 for (i, peer) in neighbors.iter().take(5).enumerate() {
94 active_peers[i] = *peer.as_bytes()
95 }
96
97 let mut last_message_hashes_array = [[0u8; 32]; 5];
98 for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
99 last_message_hashes_array[i] = *hash;
100 }
101
102 Record::sign(
103 self.record_topic.hash(),
104 unix_minute,
105 self.pub_key.to_bytes(),
106 active_peers,
107 last_message_hashes_array,
108 &self.signing_key,
109 )
110 }
111
112 pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
113 self.pub_key.clone()
114 }
115
116 pub fn record_topic(&self) -> RecordTopic {
117 self.record_topic.clone()
118 }
119
120 pub fn signing_key(&self) -> ed25519_dalek::SigningKey {
121 self.signing_key.clone()
122 }
123
124 pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
125 self.secret_rotation.clone()
126 }
127
128 pub fn initial_secret_hash(&self) -> [u8; 32] {
129 self.initial_secret_hash
130 }
131}
132
133impl RecordPublisher {
134
135 pub async fn publish_record(&self, record: Record) -> Result<()> {
137 let records = self.get_records(record.unix_minute())
139 .await
140 .iter()
141 .filter(|&record| {
142 record
143 .active_peers()
144 .iter()
145 .filter(|&peer| peer.eq(&[0u8; 32]))
146 .count()
147 > 0
148 || record
149 .last_message_hashes()
150 .iter()
151 .filter(|&hash| hash.eq(&[0u8; 32]))
152 .count()
153 > 0
154 })
155 .cloned()
156 .collect::<HashSet<_>>();
157
158 if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
161 return Ok(());
162 }
163
164 let sign_key = crate::crypto::keys::signing_keypair(self.record_topic, record.unix_minute);
166 let salt = crate::crypto::keys::salt(self.record_topic, record.unix_minute);
167 let encryption_key = crate::crypto::keys::encryption_keypair(
168 self.record_topic.clone(),
169 &self.secret_rotation.clone().unwrap_or_default(),
170 self.initial_secret_hash,
171 record.unix_minute,
172 );
173 let encrypted_record = record.encrypt(&encryption_key);
174
175 self.dht.put_mutable(
176 sign_key.clone(),
177 sign_key.verifying_key().into(),
178 Some(salt.to_vec()),
179 encrypted_record.to_bytes().to_vec(),
180 Some(3),
181 Duration::from_secs(10),
182 )
183 .await?;
184
185 Ok(())
186 }
187
188 pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
189 let topic_sign = crate::crypto::keys::signing_keypair(self.record_topic, unix_minute);
190 let encryption_key = crate::crypto::keys::encryption_keypair(
191 self.record_topic.clone(),
192 &self.secret_rotation.clone().unwrap_or_default(),
193 self.initial_secret_hash,
194 unix_minute,
195 );
196 let salt = crate::crypto::keys::salt(self.record_topic, unix_minute);
197
198 let records_iter = self.dht
200 .get(
201 topic_sign.verifying_key().into(),
202 Some(salt.to_vec()),
203 None,
204 Duration::from_secs(10),
205 )
206 .await
207 .unwrap_or_default();
208
209 records_iter
210 .iter()
211 .filter_map(
212 |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
213 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
214 Ok(record) => match record.verify(&self.record_topic.hash(), unix_minute) {
215 Ok(_) => match record.node_id().eq(self.pub_key.as_bytes()) {
216 true => None,
217 false => Some(record),
218 },
219 Err(_) => None,
220 },
221 Err(_) => None,
222 },
223 Err(_) => None,
224 },
225 )
226 .collect::<HashSet<_>>()
227 }
228}
229
230impl EncryptedRecord {
231 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
232 let one_time_key_bytes: [u8; 32] = decryption_key
233 .decrypt(&self.encrypted_decryption_key)?
234 .as_slice()
235 .try_into()?;
236 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
237
238 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
239 let record = Record::from_bytes(decrypted_record)?;
240 Ok(record)
241 }
242
243 pub fn to_bytes(&self) -> Vec<u8> {
244 let mut buf = Vec::new();
245 let encrypted_record_len = self.encrypted_record.len() as u32;
246 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
247 buf.extend_from_slice(&self.encrypted_record);
248 buf.extend_from_slice(&self.encrypted_decryption_key);
249 buf
250 }
251
252 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
253 let (encrypted_record_len, buf) = buf.split_at(4);
254 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
255 let (encrypted_record, encrypted_decryption_key) =
256 buf.split_at(encrypted_record_len as usize);
257
258 Ok(Self {
259 encrypted_record: encrypted_record.to_vec(),
260 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
261 })
262 }
263}
264
265impl Record {
266 pub fn sign(
267 topic: [u8; 32],
268 unix_minute: u64,
269 node_id: [u8; 32],
270 active_peers: [[u8; 32]; 5],
271 last_message_hashes: [[u8; 32]; 5],
272 signing_key: &ed25519_dalek::SigningKey,
273 ) -> Self {
274 let mut signature_data = Vec::new();
275 signature_data.extend_from_slice(&topic);
276 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
277 signature_data.extend_from_slice(&node_id);
278 for active_peer in active_peers {
279 signature_data.extend_from_slice(&active_peer);
280 }
281 for last_message_hash in last_message_hashes {
282 signature_data.extend_from_slice(&last_message_hash);
283 }
284 let mut signing_key = signing_key.clone();
285 let signature = signing_key.sign(&signature_data);
286 Self {
287 topic,
288 unix_minute,
289 pub_key: node_id,
290 active_peers,
291 last_message_hashes,
292 signature: signature.to_bytes(),
293 }
294 }
295
296 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
297 let (topic, buf) = buf.split_at(32);
298 let (unix_minute, buf) = buf.split_at(8);
299 let (node_id, mut buf) = buf.split_at(32);
300
301 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
302 #[allow(clippy::needless_range_loop)]
303 for i in 0..active_peers.len() {
304 let (active_peer, _buf) = buf.split_at(32);
305 active_peers[i] = active_peer.try_into()?;
306 buf = _buf;
307 }
308 let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
309 #[allow(clippy::needless_range_loop)]
310 for i in 0..last_message_hashes.len() {
311 let (last_message_hash, _buf) = buf.split_at(32);
312 last_message_hashes[i] = last_message_hash.try_into()?;
313 buf = _buf;
314 }
315
316 let (signature, buf) = buf.split_at(64);
317
318 if !buf.is_empty() {
319 bail!("buffer not empty after reconstruction")
320 }
321
322 Ok(Self {
323 topic: topic.try_into()?,
324 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
325 pub_key: node_id.try_into()?,
326 active_peers,
327 last_message_hashes,
328 signature: signature.try_into()?,
329 })
330 }
331
332 pub fn to_bytes(&self) -> Vec<u8> {
333 let mut buf = Vec::new();
334 buf.extend_from_slice(&self.topic);
335 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
336 buf.extend_from_slice(&self.pub_key);
337 for active_peer in self.active_peers {
338 buf.extend_from_slice(&active_peer);
339 }
340 for last_message_hash in self.last_message_hashes {
341 buf.extend_from_slice(&last_message_hash);
342 }
343 buf.extend_from_slice(&self.signature);
344 buf
345 }
346
347 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
348 if self.topic != *actual_topic {
349 bail!("topic mismatch")
350 }
351 if self.unix_minute != actual_unix_minute {
352 bail!("unix minute mismatch")
353 }
354
355 let record_bytes = self.to_bytes();
356 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
357 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
358 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
359
360 node_id.verify_strict(signature_data.as_slice(), &signature)?;
361
362 Ok(())
363 }
364
365 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
366 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
367 let p_key = one_time_key.verifying_key();
368 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
369 let key_enc = encryption_key
370 .verifying_key()
371 .encrypt(&one_time_key.to_bytes())
372 .expect("encryption failed");
373
374 EncryptedRecord {
375 encrypted_record: data_enc,
376 encrypted_decryption_key: key_enc,
377 }
378 }
379}
380
381impl Record {
383 pub fn topic(&self) -> [u8; 32] {
384 self.topic
385 }
386
387 pub fn unix_minute(&self) -> u64 {
388 self.unix_minute
389 }
390
391 pub fn node_id(&self) -> [u8; 32] {
392 self.pub_key
393 }
394
395 pub fn active_peers(&self) -> [[u8; 32]; 5] {
396 self.active_peers
397 }
398
399 pub fn last_message_hashes(&self) -> [[u8; 32]; 5] {
400 self.last_message_hashes
401 }
402
403 pub fn signature(&self) -> [u8; 64] {
404 self.signature
405 }
406}