distributed_topic_tracker/crypto/
record.rs1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
5
6use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9
/// A 32-byte topic identifier.
///
/// The wrapped bytes are either supplied directly (`RecordTopic::from_bytes`)
/// or derived from a string through the `FromStr` implementation, which keeps
/// the first 32 bytes of the string's SHA-512 digest.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RecordTopic([u8; 32]);
21
22impl FromStr for RecordTopic {
23 type Err = anyhow::Error;
24
25 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
26 let mut hasher = sha2::Sha512::new();
27 hasher.update(s.as_bytes());
28 let hash: [u8; 32] = hasher.finalize()[..32]
29 .try_into()
30 .map_err(|_| anyhow::anyhow!("hashing failed"))?;
31 Ok(RecordTopic(hash))
32 }
33}
34
35impl RecordTopic {
36 pub fn from_bytes(bytes: &[u8; 32]) -> Self {
38 Self(*bytes)
39 }
40
41 pub fn hash(&self) -> [u8; 32] {
43 self.0
44 }
45}
46
/// The encrypted form of a [`Record`].
///
/// As produced by `Record::encrypt`, it carries two ciphertexts: the record
/// itself, encrypted to a freshly generated one-time key, and that one-time
/// key, encrypted to the caller-supplied encryption key.
#[derive(Debug, Clone)]
pub struct EncryptedRecord {
    /// Ciphertext of the serialized record.
    encrypted_record: Vec<u8>,
    /// Ciphertext of the one-time key needed to decrypt `encrypted_record`.
    encrypted_decryption_key: Vec<u8>,
}
56
57#[derive(Debug, Clone, PartialEq, Eq, Hash)]
62pub struct Record {
63 topic: [u8; 32],
64 unix_minute: u64,
65 pub_key: [u8; 32],
66 content: RecordContent,
67 signature: [u8; 64],
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
72pub struct RecordContent(pub Vec<u8>);
73
74impl RecordContent {
75 pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
83 postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
84 }
85
86 pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
88 Ok(Self(
89 postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
90 ))
91 }
92}
93
94#[derive(Debug, Clone)]
98pub struct RecordPublisher {
99 dht: crate::dht::Dht,
100
101 record_topic: RecordTopic,
102 pub_key: VerifyingKey,
103 signing_key: SigningKey,
104 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
105 initial_secret_hash: [u8; 32],
106}
107
108impl RecordPublisher {
109 pub fn new(
119 record_topic: impl Into<RecordTopic>,
120 pub_key: VerifyingKey,
121 signing_key: SigningKey,
122 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
123 initial_secret: Vec<u8>,
124 ) -> Self {
125 let mut initial_secret_hash = sha2::Sha512::new();
126 initial_secret_hash.update(initial_secret);
127 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
128 .try_into()
129 .expect("hashing failed");
130
131 Self {
132 dht: crate::dht::Dht::new(),
133 record_topic: record_topic.into(),
134 pub_key,
135 signing_key,
136 secret_rotation,
137 initial_secret_hash,
138 }
139 }
140
141 pub fn new_record<'a>(
148 &'a self,
149 unix_minute: u64,
150 record_content: impl Serialize + Deserialize<'a>,
151 ) -> Result<Record> {
152 Record::sign(
153 self.record_topic.hash(),
154 unix_minute,
155 self.pub_key.to_bytes(),
156 record_content,
157 &self.signing_key,
158 )
159 }
160
161 pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
163 self.pub_key
164 }
165
166 pub fn record_topic(&self) -> RecordTopic {
168 self.record_topic
169 }
170
171 pub fn signing_key(&self) -> ed25519_dalek::SigningKey {
173 self.signing_key.clone()
174 }
175
176 pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
178 self.secret_rotation.clone()
179 }
180
181 pub fn initial_secret_hash(&self) -> [u8; 32] {
183 self.initial_secret_hash
184 }
185}
186
187impl RecordPublisher {
188 pub async fn publish_record(&self, record: Record) -> Result<()> {
193 let records = self
194 .get_records(record.unix_minute())
195 .await
196 .iter()
197 .cloned()
198 .collect::<HashSet<_>>();
199
200 tracing::debug!(
201 "RecordPublisher: found {} existing records for unix_minute {}",
202 records.len(),
203 record.unix_minute()
204 );
205
206 if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
207 tracing::debug!(
208 "RecordPublisher: max records reached ({}), skipping publish",
209 crate::MAX_BOOTSTRAP_RECORDS
210 );
211 return Ok(());
212 }
213
214 let sign_key = crate::crypto::keys::signing_keypair(self.record_topic, record.unix_minute);
216 let salt = crate::crypto::keys::salt(self.record_topic, record.unix_minute);
217 let encryption_key = crate::crypto::keys::encryption_keypair(
218 self.record_topic,
219 &self.secret_rotation.clone().unwrap_or_default(),
220 self.initial_secret_hash,
221 record.unix_minute,
222 );
223 let encrypted_record = record.encrypt(&encryption_key);
224
225 tracing::debug!(
226 "RecordPublisher: publishing record to DHT for unix_minute {}",
227 record.unix_minute()
228 );
229
230 self.dht
231 .put_mutable(
232 sign_key.clone(),
233 sign_key.verifying_key(),
234 Some(salt.to_vec()),
235 encrypted_record.to_bytes().to_vec(),
236 Some(3),
237 Duration::from_secs(10),
238 )
239 .await?;
240
241 tracing::debug!("RecordPublisher: successfully published to DHT");
242 Ok(())
243 }
244
245 pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
249 tracing::debug!(
250 "RecordPublisher: fetching records from DHT for unix_minute {}",
251 unix_minute
252 );
253
254 let topic_sign = crate::crypto::keys::signing_keypair(self.record_topic, unix_minute);
255 let encryption_key = crate::crypto::keys::encryption_keypair(
256 self.record_topic,
257 &self.secret_rotation.clone().unwrap_or_default(),
258 self.initial_secret_hash,
259 unix_minute,
260 );
261 let salt = crate::crypto::keys::salt(self.record_topic, unix_minute);
262
263 let records_iter = self
265 .dht
266 .get(
267 topic_sign.verifying_key(),
268 Some(salt.to_vec()),
269 None,
270 Duration::from_secs(10),
271 )
272 .await
273 .unwrap_or_default();
274
275 tracing::debug!(
276 "RecordPublisher: received {} raw records from DHT",
277 records_iter.len()
278 );
279
280 let verified_records = records_iter
281 .iter()
282 .filter_map(
283 |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
284 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
285 Ok(record) => match record.verify(&self.record_topic.hash(), unix_minute) {
286 Ok(_) => match record.node_id().eq(self.pub_key.as_bytes()) {
287 true => {
288 tracing::debug!("RecordPublisher: filtered out self");
289 None
290 }
291 false => Some(record),
292 },
293 Err(_) => None,
294 },
295 Err(_) => None,
296 },
297 Err(_) => None,
298 },
299 )
300 .collect::<HashSet<_>>();
301
302 tracing::debug!(
303 "RecordPublisher: verified {} records (filtered self)",
304 verified_records.len()
305 );
306 verified_records
307 }
308}
309
310impl EncryptedRecord {
311 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
313 let one_time_key_bytes: [u8; 32] = decryption_key
314 .decrypt(&self.encrypted_decryption_key)?
315 .as_slice()
316 .try_into()?;
317 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
318
319 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
320 let record = Record::from_bytes(decrypted_record)?;
321 Ok(record)
322 }
323
324 pub fn to_bytes(&self) -> Vec<u8> {
326 let mut buf = Vec::new();
327 let encrypted_record_len = self.encrypted_record.len() as u32;
328 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
329 buf.extend_from_slice(&self.encrypted_record);
330 buf.extend_from_slice(&self.encrypted_decryption_key);
331 buf
332 }
333
334 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
336 let (encrypted_record_len, buf) = buf.split_at(4);
337 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
338 let (encrypted_record, encrypted_decryption_key) =
339 buf.split_at(encrypted_record_len as usize);
340
341 Ok(Self {
342 encrypted_record: encrypted_record.to_vec(),
343 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
344 })
345 }
346}
347
348impl Record {
349 pub fn sign<'a>(
351 topic: [u8; 32],
352 unix_minute: u64,
353 node_id: [u8; 32],
354 record_content: impl Serialize + Deserialize<'a>,
355 signing_key: &ed25519_dalek::SigningKey,
356 ) -> anyhow::Result<Self> {
357 let record_content = RecordContent::from_arbitrary(&record_content)?;
358 let mut signature_data = Vec::new();
359 signature_data.extend_from_slice(&topic);
360 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
361 signature_data.extend_from_slice(&node_id);
362 signature_data.extend(&record_content.clone().0);
363 let signing_key = signing_key.clone();
364 let signature = signing_key.sign(&signature_data);
365 Ok(Self {
366 topic,
367 unix_minute,
368 pub_key: node_id,
369 content: record_content,
370 signature: signature.to_bytes(),
371 })
372 }
373
374 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
376 let (topic, buf) = buf.split_at(32);
377 let (unix_minute, buf) = buf.split_at(8);
378 let (node_id, buf) = buf.split_at(32);
379 let (record_content, buf) = buf.split_at(buf.len() - 64);
380
381 let (signature, buf) = buf.split_at(64);
382
383 if !buf.is_empty() {
384 bail!("buffer not empty after reconstruction")
385 }
386
387 Ok(Self {
388 topic: topic.try_into()?,
389 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
390 pub_key: node_id.try_into()?,
391 content: RecordContent(record_content.to_vec()),
392 signature: signature.try_into()?,
393 })
394 }
395
396 pub fn to_bytes(&self) -> Vec<u8> {
398 let mut buf = Vec::new();
399 buf.extend_from_slice(&self.topic);
400 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
401 buf.extend_from_slice(&self.pub_key);
402 buf.extend(&self.content.0);
403 buf.extend_from_slice(&self.signature);
404 buf
405 }
406
407 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
409 if self.topic != *actual_topic {
410 bail!("topic mismatch")
411 }
412 if self.unix_minute != actual_unix_minute {
413 bail!("unix minute mismatch")
414 }
415
416 let record_bytes = self.to_bytes();
417 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
418 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
419 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
420
421 node_id.verify_strict(signature_data.as_slice(), &signature)?;
422
423 Ok(())
424 }
425
426 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
428 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
429 let p_key = one_time_key.verifying_key();
430 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
431 let key_enc = encryption_key
432 .verifying_key()
433 .encrypt(&one_time_key.to_bytes())
434 .expect("encryption failed");
435
436 EncryptedRecord {
437 encrypted_record: data_enc,
438 encrypted_decryption_key: key_enc,
439 }
440 }
441}
442
443impl Record {
445 pub fn topic(&self) -> [u8; 32] {
447 self.topic
448 }
449
450 pub fn unix_minute(&self) -> u64 {
452 self.unix_minute
453 }
454
455 pub fn node_id(&self) -> [u8; 32] {
457 self.pub_key
458 }
459
460 pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
462 self.content.to::<T>()
463 }
464
465 pub fn signature(&self) -> [u8; 64] {
467 self.signature
468 }
469}