distributed_topic_tracker/crypto/
record.rs1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4};
5
6use anyhow::{Result, bail};
7use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
8
9use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
10use serde::{Deserialize, Serialize};
11use sha2::Digest;
12use tokio_util::sync::CancellationToken;
13
14use crate::Config;
15
16#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub struct TopicId([u8; 32]);
27
28impl FromStr for TopicId {
29 type Err = anyhow::Error;
30
31 fn from_str(topic_name: &str) -> std::result::Result<Self, Self::Err> {
32 Ok(Self::new(topic_name.to_string()))
33 }
34}
35
36impl From<&str> for TopicId {
37 fn from(topic_name: &str) -> Self {
38 Self::new(topic_name.to_string())
39 }
40}
41
42impl From<String> for TopicId {
43 fn from(topic_name: String) -> Self {
44 Self::new(topic_name)
45 }
46}
47impl From<Vec<u8>> for TopicId {
50 fn from(topic_name: Vec<u8>) -> Self {
51 Self::new(topic_name)
52 }
53}
54
55impl TopicId {
56 pub fn new(topic_name: impl Into<Vec<u8>>) -> Self {
60 let mut topic_name_hash = sha2::Sha512::new();
61 topic_name_hash.update(topic_name.into());
62
63 Self(
64 topic_name_hash.finalize()[..32]
65 .try_into()
66 .expect("hashing 'topic_name' failed"),
67 )
68 }
69
70 pub fn from_hash(bytes: &[u8; 32]) -> Self {
72 Self(*bytes)
73 }
74
75 pub fn hash(&self) -> [u8; 32] {
77 self.0
78 }
79}
80
81#[derive(Debug, Clone)]
86pub struct EncryptedRecord {
87 encrypted_record: Vec<u8>,
88 encrypted_decryption_key: Vec<u8>,
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Hash)]
96pub struct Record {
97 topic: [u8; 32],
98 unix_minute: u64,
99 pub_key: [u8; 32],
100 content: RecordContent,
101 signature: [u8; 64],
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
106pub struct RecordContent(pub Vec<u8>);
107
108impl RecordContent {
109 pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
117 postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
118 }
119
120 pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
122 Ok(Self(
123 postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
124 ))
125 }
126}
127
128#[derive(Debug, Clone)]
132pub struct RecordPublisher {
133 dht: crate::dht::Dht,
134
135 config: crate::config::Config,
136
137 topic_id: TopicId,
138 pub_key: VerifyingKey,
139 signing_key: SigningKey,
140 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
141 initial_secret_hash: [u8; 32],
142}
143
144#[derive(Debug)]
146pub struct RecordPublisherBuilder {
147 topic_id: TopicId,
148 signing_key: SigningKey,
149 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
150 initial_secret: Vec<u8>,
151 config: crate::config::Config,
152}
153
154impl RecordPublisherBuilder {
155 pub fn secret_rotation(mut self, secret_rotation: crate::crypto::keys::RotationHandle) -> Self {
157 self.secret_rotation = Some(secret_rotation);
158 self
159 }
160
161 pub fn config(mut self, config: crate::config::Config) -> Self {
163 self.config = config;
164 self
165 }
166
167 pub fn build(self) -> RecordPublisher {
169 RecordPublisher::new(
170 self.topic_id,
171 self.signing_key,
172 self.secret_rotation,
173 self.initial_secret,
174 self.config,
175 )
176 }
177}
178
179impl RecordPublisher {
180 pub fn builder(
182 topic_id: impl Into<TopicId>,
183 signing_key: SigningKey,
184 initial_secret: impl Into<Vec<u8>>,
185 ) -> RecordPublisherBuilder {
186 RecordPublisherBuilder {
187 topic_id: topic_id.into(),
188 signing_key,
189 secret_rotation: None,
190 initial_secret: initial_secret.into(),
191 config: crate::config::Config::default(),
192 }
193 }
194
195 pub fn new(
205 topic_id: impl Into<TopicId>,
206 signing_key: SigningKey,
207 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
208 initial_secret: impl Into<Vec<u8>>,
209 config: crate::config::Config,
210 ) -> Self {
211 let mut initial_secret_hash = sha2::Sha512::new();
212 initial_secret_hash.update(initial_secret.into());
213 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
214 .try_into()
215 .expect("hashing failed");
216
217 Self {
218 dht: crate::dht::Dht::new(config.dht_config()),
219 config,
220 topic_id: topic_id.into(),
221 pub_key: signing_key.verifying_key(),
222 signing_key,
223 secret_rotation,
224 initial_secret_hash,
225 }
226 }
227
228 pub fn new_record<'a>(
235 &'a self,
236 unix_minute: u64,
237 record_content: impl Serialize + Deserialize<'a>,
238 ) -> Result<Record> {
239 Record::sign(
240 self.topic_id.hash(),
241 unix_minute,
242 record_content,
243 &self.signing_key,
244 )
245 }
246
247 pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
249 self.pub_key
250 }
251
252 pub fn topic_id(&self) -> &TopicId {
254 &self.topic_id
255 }
256
257 pub fn signing_key(&self) -> &ed25519_dalek::SigningKey {
259 &self.signing_key
260 }
261
262 pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
264 self.secret_rotation.clone()
265 }
266
267 pub fn initial_secret_hash(&self) -> [u8; 32] {
269 self.initial_secret_hash
270 }
271
272 pub fn config(&self) -> &Config {
274 &self.config
275 }
276}
277
278impl RecordPublisher {
279 pub async fn publish_record(&self, record: Record, cancel_token: CancellationToken) -> Result<()> {
284 self.publish_record_cached_records(record, None, cancel_token).await
285 }
286
287 pub async fn publish_record_cached_records(
292 &self,
293 record: Record,
294 cached_records: Option<HashSet<Record>>,
295 cancel_token: CancellationToken,
296 ) -> Result<()> {
297 let publish_fut = async {
298 let records = match cached_records {
299 Some(records) => records,
300 None => self.get_records(record.unix_minute(), cancel_token.clone()).await?,
301 };
302
303 tracing::debug!(
304 "RecordPublisher: found {} existing records for unix_minute {}",
305 records.len(),
306 record.unix_minute()
307 );
308
309 if records.len() >= self.config.bootstrap_config().max_bootstrap_records() {
310 tracing::debug!(
311 "RecordPublisher: max records reached ({}), skipping publish",
312 self.config.bootstrap_config().max_bootstrap_records()
313 );
314 return Ok(());
315 }
316
317 let sign_key = crate::crypto::keys::signing_keypair(self.topic_id(), record.unix_minute);
319 let salt = crate::crypto::keys::salt(self.topic_id(), record.unix_minute);
320 let encryption_key = crate::crypto::keys::encryption_keypair(
321 self.topic_id(),
322 &self.secret_rotation.clone().unwrap_or_default(),
323 self.initial_secret_hash,
324 record.unix_minute,
325 );
326 let encrypted_record = record.encrypt(&encryption_key);
327 let next_seq_num = i64::MAX;
328
329 tracing::debug!(
330 "RecordPublisher: publishing record to DHT for unix_minute {}",
331 record.unix_minute()
332 );
333
334 self.dht
335 .put_mutable(
336 sign_key.clone(),
337 Some(salt.to_vec()),
338 encrypted_record.to_bytes()?,
339 next_seq_num,
340 )
341 .await?;
342
343 tracing::debug!("RecordPublisher: successfully published to DHT");
344 Ok(())
345 };
346
347 tokio::select! {
348 _ = cancel_token.cancelled() => {
349 anyhow::bail!("publish cancelled");
350 }
351 res = publish_fut => {
352 res
353 }
354 }
355 }
356
357 pub async fn get_records(&self, unix_minute: u64, cancel_token: CancellationToken) -> Result<HashSet<Record>> {
362 let get_fut = async {
363 tracing::debug!(
364 "RecordPublisher: fetching records from DHT for unix_minute {}",
365 unix_minute
366 );
367
368 let topic_sign = crate::crypto::keys::signing_keypair(self.topic_id(), unix_minute);
369 let encryption_key = crate::crypto::keys::encryption_keypair(
370 self.topic_id(),
371 &self.secret_rotation.clone().unwrap_or_default(),
372 self.initial_secret_hash,
373 unix_minute,
374 );
375 let salt = crate::crypto::keys::salt(self.topic_id(), unix_minute);
376
377 let records_iter = self
379 .dht
380 .get(topic_sign.verifying_key(), Some(salt.to_vec()), None)
381 .await?;
382
383 tracing::debug!(
384 "RecordPublisher: received {} raw records from DHT",
385 records_iter.len()
386 );
387
388 let mut dedubed_records = HashMap::new();
389 for item in records_iter {
390 if let Ok(encrypted_record) = EncryptedRecord::from_bytes(item.value().to_vec())
391 && let Ok(record) = encrypted_record.decrypt(&encryption_key)
392 && record.verify(&self.topic_id.hash(), unix_minute).is_ok()
393 && !record.pub_key().eq(self.pub_key.as_bytes())
394 {
395 let pub_key = record.pub_key();
396 match dedubed_records.get(&pub_key) {
397 Some((seq, _)) if *seq >= item.seq() => {}
398 _ => {
399 dedubed_records.insert(pub_key, (item.seq(), record));
400 }
401 }
402 }
403 }
404 tracing::debug!(
405 "RecordPublisher: verified {} records (filtered self)",
406 dedubed_records.len()
407 );
408
409 Ok(dedubed_records
410 .into_values()
411 .map(|(_, record)| record)
412 .collect::<HashSet<_>>())
413 };
414
415 tokio::select! {
416 _ = cancel_token.cancelled() => {
417 anyhow::bail!("get_records cancelled");
418 }
419 res = get_fut => {
420 res
421 }
422 }
423 }
424}
425
426impl EncryptedRecord {
427 const MAX_SIZE: usize = 2048;
428
429 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
431 let one_time_key_bytes: [u8; 32] = decryption_key
432 .decrypt(&self.encrypted_decryption_key)?
433 .as_slice()
434 .try_into()?;
435 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
436
437 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
438 let record = Record::from_bytes(decrypted_record)?;
439 Ok(record)
440 }
441
442 pub fn to_bytes(&self) -> Result<Vec<u8>> {
444 let mut buf = Vec::new();
445 let encrypted_record_len = self.encrypted_record.len() as u32;
446 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
447 buf.extend_from_slice(&self.encrypted_record);
448 buf.extend_from_slice(&self.encrypted_decryption_key);
449
450 if buf.len() > Self::MAX_SIZE {
451 bail!(
452 "EncryptedRecord serialization exceeds maximum size, the max is set generously so this should never happen, if so your code is using it manually"
453 );
454 }
455 Ok(buf)
456 }
457
458 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
460 if buf.len() < 4 {
461 bail!("buffer too short for EncryptedRecord deserialization")
462 }
463 let (encrypted_record_len, buf) = buf.split_at(4);
464 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
465 const ENCRYPTED_KEY_LENGTH: usize = 88;
466 let expected_payload_len = encrypted_record_len
467 .checked_add(ENCRYPTED_KEY_LENGTH as u32)
468 .ok_or_else(|| anyhow::anyhow!("encrypted record length overflow"))?;
469 if encrypted_record_len > Self::MAX_SIZE as u32 {
470 bail!("encrypted record length exceeds maximum allowed size")
471 }
472 if buf.len() != expected_payload_len as usize {
473 bail!("buffer length does not match expected encrypted record length")
474 }
475 let (encrypted_record, encrypted_decryption_key) =
476 buf.split_at(encrypted_record_len as usize);
477
478 Ok(Self {
479 encrypted_record: encrypted_record.to_vec(),
480 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
481 })
482 }
483}
484
485impl Record {
486 pub fn sign<'a>(
488 topic: [u8; 32],
489 unix_minute: u64,
490 record_content: impl Serialize + Deserialize<'a>,
491 signing_key: &ed25519_dalek::SigningKey,
492 ) -> anyhow::Result<Self> {
493 let record_content = RecordContent::from_arbitrary(&record_content)?;
494 let mut signature_data = Vec::new();
495 signature_data.extend_from_slice(&topic);
496 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
497 signature_data.extend_from_slice(&signing_key.verifying_key().to_bytes());
498 signature_data.extend(&record_content.clone().0);
499 let signature = signing_key.sign(&signature_data);
500 Ok(Self {
501 topic,
502 unix_minute,
503 pub_key: signing_key.verifying_key().to_bytes(),
504 content: record_content,
505 signature: signature.to_bytes(),
506 })
507 }
508
509 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
511 if buf.len() < 32 + 8 + 32 + 64 {
512 bail!("buffer too short for Record deserialization")
513 }
514 let (topic, buf) = buf.split_at(32);
515 let (unix_minute, buf) = buf.split_at(8);
516 let (pub_key, buf) = buf.split_at(32);
517 let (record_content, buf) = buf.split_at(buf.len() - 64);
518
519 let (signature, buf) = buf.split_at(64);
520
521 if !buf.is_empty() {
522 bail!("buffer not empty after reconstruction")
523 }
524
525 Ok(Self {
526 topic: topic.try_into()?,
527 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
528 pub_key: pub_key.try_into()?,
529 content: RecordContent(record_content.to_vec()),
530 signature: signature.try_into()?,
531 })
532 }
533
534 pub fn to_bytes(&self) -> Vec<u8> {
536 let mut buf = Vec::new();
537 buf.extend_from_slice(&self.topic);
538 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
539 buf.extend_from_slice(&self.pub_key);
540 buf.extend(&self.content.0);
541 buf.extend_from_slice(&self.signature);
542 buf
543 }
544
545 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
547 if self.topic != *actual_topic {
548 bail!("topic mismatch")
549 }
550 if self.unix_minute != actual_unix_minute {
551 bail!("unix minute mismatch")
552 }
553
554 let record_bytes = self.to_bytes();
555 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
556 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
557 let pub_key = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
558
559 pub_key.verify_strict(signature_data.as_slice(), &signature)?;
560
561 Ok(())
562 }
563
564 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
566 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
567 let p_key = one_time_key.verifying_key();
568 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
569 let key_enc = encryption_key
570 .verifying_key()
571 .encrypt(&one_time_key.to_bytes())
572 .expect("encryption failed");
573
574 EncryptedRecord {
575 encrypted_record: data_enc,
576 encrypted_decryption_key: key_enc,
577 }
578 }
579}
580
581impl Record {
583 pub fn topic(&self) -> [u8; 32] {
585 self.topic
586 }
587
588 pub fn unix_minute(&self) -> u64 {
590 self.unix_minute
591 }
592
593 pub fn pub_key(&self) -> [u8; 32] {
595 self.pub_key
596 }
597
598 pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
600 self.content.to::<T>()
601 }
602
603 pub fn signature(&self) -> [u8; 64] {
605 self.signature
606 }
607}