distributed_topic_tracker/crypto/
record.rs1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4};
5
6use anyhow::{Result, bail};
7use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
8use getrandom::{SysRng, rand_core::UnwrapErr};
9
10use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
11use serde::{Deserialize, Serialize};
12use sha2::Digest;
13use tokio_util::sync::CancellationToken;
14
15use crate::{Config, Dht, RotationHandle};
16
/// A 32-byte topic identifier.
///
/// The wrapped array is private; equality, ordering-free hashing and cloning
/// are all derived from the raw bytes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicId([u8; 32]);
28
29impl FromStr for TopicId {
30 type Err = anyhow::Error;
31
32 fn from_str(topic_name: &str) -> std::result::Result<Self, Self::Err> {
33 Ok(Self::new(topic_name.to_string()))
34 }
35}
36
37impl From<&str> for TopicId {
38 fn from(topic_name: &str) -> Self {
39 Self::new(topic_name.to_string())
40 }
41}
42
43impl From<String> for TopicId {
44 fn from(topic_name: String) -> Self {
45 Self::new(topic_name)
46 }
47}
48impl From<Vec<u8>> for TopicId {
51 fn from(topic_name: Vec<u8>) -> Self {
52 Self::new(topic_name)
53 }
54}
55
56impl TopicId {
57 pub fn new(topic_name: impl Into<Vec<u8>>) -> Self {
61 let mut topic_name_hash = sha2::Sha512::new();
62 topic_name_hash.update(topic_name.into());
63
64 Self(
65 topic_name_hash.finalize()[..32]
66 .try_into()
67 .expect("hashing 'topic_name' failed"),
68 )
69 }
70
71 pub fn from_hash(bytes: &[u8; 32]) -> Self {
73 Self(*bytes)
74 }
75
76 pub fn hash(&self) -> [u8; 32] {
78 self.0
79 }
80}
81
/// The encrypted form of a record.
///
/// Holds two ciphertexts: the record itself and the key needed to decrypt it.
#[derive(Debug, Clone)]
pub struct EncryptedRecord {
    /// The serialized record, encrypted with a one-time key.
    encrypted_record: Vec<u8>,
    /// The one-time key, itself encrypted so that only the holder of the
    /// matching decryption key can recover it.
    encrypted_decryption_key: Vec<u8>,
}
91
92#[derive(Debug, Clone, PartialEq, Eq, Hash)]
97pub struct Record {
98 topic: [u8; 32],
99 unix_minute: u64,
100 pub_key: [u8; 32],
101 content: RecordContent,
102 signature: [u8; 64],
103}
104
105#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
107pub struct RecordContent(pub Vec<u8>);
108
109impl RecordContent {
110 pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
118 postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
119 }
120
121 pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
123 Ok(Self(
124 postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
125 ))
126 }
127}
128
129#[derive(Debug, Clone)]
133pub struct RecordPublisher {
134 dht: Dht,
135
136 config: Config,
137
138 topic_id: TopicId,
139 pub_key: VerifyingKey,
140 signing_key: SigningKey,
141 secret_rotation: Option<RotationHandle>,
142 initial_secret_hash: [u8; 32],
143}
144
145#[derive(Debug)]
147pub struct RecordPublisherBuilder {
148 topic_id: TopicId,
149 signing_key: SigningKey,
150 secret_rotation: Option<RotationHandle>,
151 initial_secret: Vec<u8>,
152 config: Config,
153}
154
155impl RecordPublisherBuilder {
156 pub fn secret_rotation(mut self, secret_rotation: RotationHandle) -> Self {
158 self.secret_rotation = Some(secret_rotation);
159 self
160 }
161
162 pub fn config(mut self, config: Config) -> Self {
164 self.config = config;
165 self
166 }
167
168 pub fn build(self) -> RecordPublisher {
170 RecordPublisher::new(
171 self.topic_id,
172 self.signing_key,
173 self.secret_rotation,
174 self.initial_secret,
175 self.config,
176 )
177 }
178}
179
180impl RecordPublisher {
181 pub fn builder(
183 topic_id: impl Into<TopicId>,
184 signing_key: SigningKey,
185 initial_secret: impl Into<Vec<u8>>,
186 ) -> RecordPublisherBuilder {
187 RecordPublisherBuilder {
188 topic_id: topic_id.into(),
189 signing_key,
190 secret_rotation: None,
191 initial_secret: initial_secret.into(),
192 config: Config::default(),
193 }
194 }
195
196 pub fn new(
206 topic_id: impl Into<TopicId>,
207 signing_key: SigningKey,
208 secret_rotation: Option<RotationHandle>,
209 initial_secret: impl Into<Vec<u8>>,
210 config: Config,
211 ) -> Self {
212 let mut initial_secret_hash = sha2::Sha512::new();
213 initial_secret_hash.update(initial_secret.into());
214 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
215 .try_into()
216 .expect("hashing failed");
217
218 Self {
219 dht: Dht::new(config.dht_config()),
220 config,
221 topic_id: topic_id.into(),
222 pub_key: signing_key.verifying_key(),
223 signing_key,
224 secret_rotation,
225 initial_secret_hash,
226 }
227 }
228
229 pub fn new_record<'a>(
236 &'a self,
237 unix_minute: u64,
238 record_content: impl Serialize + Deserialize<'a>,
239 ) -> Result<Record> {
240 Record::sign(
241 self.topic_id.hash(),
242 unix_minute,
243 record_content,
244 &self.signing_key,
245 )
246 }
247
248 pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
250 self.pub_key
251 }
252
253 pub fn topic_id(&self) -> &TopicId {
255 &self.topic_id
256 }
257
258 pub fn signing_key(&self) -> &ed25519_dalek::SigningKey {
260 &self.signing_key
261 }
262
263 pub fn secret_rotation(&self) -> Option<RotationHandle> {
265 self.secret_rotation.clone()
266 }
267
268 pub fn initial_secret_hash(&self) -> [u8; 32] {
270 self.initial_secret_hash
271 }
272
273 pub fn config(&self) -> &Config {
275 &self.config
276 }
277}
278
279impl RecordPublisher {
280 pub async fn publish_record(&self, record: Record, cancel_token: CancellationToken) -> Result<()> {
285 self.publish_record_cached_records(record, None, cancel_token).await
286 }
287
288 pub async fn publish_record_cached_records(
293 &self,
294 record: Record,
295 cached_records: Option<HashSet<Record>>,
296 cancel_token: CancellationToken,
297 ) -> Result<()> {
298 let publish_fut = async {
299 let records = match cached_records {
300 Some(records) => records,
301 None => self.get_records(record.unix_minute(), cancel_token.clone()).await?,
302 };
303
304 tracing::debug!(
305 "RecordPublisher: found {} existing records for unix_minute {}",
306 records.len(),
307 record.unix_minute()
308 );
309
310 if records.len() >= self.config.bootstrap_config().max_bootstrap_records() {
311 tracing::debug!(
312 "RecordPublisher: max records reached ({}), skipping publish",
313 self.config.bootstrap_config().max_bootstrap_records()
314 );
315 return Ok(());
316 }
317
318 let sign_key = crate::crypto::keys::signing_keypair(self.topic_id(), record.unix_minute);
320 let salt = crate::crypto::keys::salt(self.topic_id(), record.unix_minute);
321 let encryption_key = crate::crypto::keys::encryption_keypair(
322 self.topic_id(),
323 &self.secret_rotation.clone().unwrap_or_default(),
324 self.initial_secret_hash,
325 record.unix_minute,
326 );
327 let encrypted_record = record.encrypt(&encryption_key);
328 let next_seq_num = i64::MAX;
329
330 tracing::debug!(
331 "RecordPublisher: publishing record to DHT for unix_minute {}",
332 record.unix_minute()
333 );
334
335 self.dht
336 .put_mutable(
337 sign_key.clone(),
338 Some(salt.to_vec()),
339 encrypted_record.to_bytes()?,
340 next_seq_num,
341 )
342 .await?;
343
344 tracing::debug!("RecordPublisher: successfully published to DHT");
345 Ok(())
346 };
347
348 tokio::select! {
349 _ = cancel_token.cancelled() => {
350 anyhow::bail!("publish cancelled");
351 }
352 res = publish_fut => {
353 res
354 }
355 }
356 }
357
358 pub async fn get_records(&self, unix_minute: u64, cancel_token: CancellationToken) -> Result<HashSet<Record>> {
363 let get_fut = async {
364 tracing::debug!(
365 "RecordPublisher: fetching records from DHT for unix_minute {}",
366 unix_minute
367 );
368
369 let topic_sign = crate::crypto::keys::signing_keypair(self.topic_id(), unix_minute);
370 let encryption_key = crate::crypto::keys::encryption_keypair(
371 self.topic_id(),
372 &self.secret_rotation.clone().unwrap_or_default(),
373 self.initial_secret_hash,
374 unix_minute,
375 );
376 let salt = crate::crypto::keys::salt(self.topic_id(), unix_minute);
377
378 let records_iter = self
380 .dht
381 .get(topic_sign.verifying_key(), Some(salt.to_vec()), None)
382 .await?;
383
384 tracing::debug!(
385 "RecordPublisher: received {} raw records from DHT",
386 records_iter.len()
387 );
388
389 let mut dedubed_records = HashMap::new();
390 for item in records_iter {
391 if let Ok(encrypted_record) = EncryptedRecord::from_bytes(item.value().to_vec())
392 && let Ok(record) = encrypted_record.decrypt(&encryption_key)
393 && record.verify(&self.topic_id.hash(), unix_minute).is_ok()
394 && !record.pub_key().eq(self.pub_key.as_bytes())
395 {
396 let pub_key = record.pub_key();
397 match dedubed_records.get(&pub_key) {
398 Some((seq, _)) if *seq >= item.seq() => {}
399 _ => {
400 dedubed_records.insert(pub_key, (item.seq(), record));
401 }
402 }
403 }
404 }
405 tracing::debug!(
406 "RecordPublisher: verified {} records (filtered self)",
407 dedubed_records.len()
408 );
409
410 Ok(dedubed_records
411 .into_values()
412 .map(|(_, record)| record)
413 .collect::<HashSet<_>>())
414 };
415
416 tokio::select! {
417 _ = cancel_token.cancelled() => {
418 anyhow::bail!("get_records cancelled");
419 }
420 res = get_fut => {
421 res
422 }
423 }
424 }
425}
426
427impl EncryptedRecord {
428 const MAX_SIZE: usize = 2048;
429
430 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
432 let one_time_key_bytes: [u8; 32] = decryption_key
433 .decrypt(&self.encrypted_decryption_key)?
434 .as_slice()
435 .try_into()?;
436 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
437
438 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
439 let record = Record::from_bytes(decrypted_record)?;
440 Ok(record)
441 }
442
443 pub fn to_bytes(&self) -> Result<Vec<u8>> {
445 let mut buf = Vec::new();
446 let encrypted_record_len = self.encrypted_record.len() as u32;
447 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
448 buf.extend_from_slice(&self.encrypted_record);
449 buf.extend_from_slice(&self.encrypted_decryption_key);
450
451 if buf.len() > Self::MAX_SIZE {
452 bail!(
453 "EncryptedRecord serialization exceeds maximum size, the max is set generously so this should never happen, if so your code is using it manually"
454 );
455 }
456 Ok(buf)
457 }
458
459 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
461 if buf.len() < 4 {
462 bail!("buffer too short for EncryptedRecord deserialization")
463 }
464 let (encrypted_record_len, buf) = buf.split_at(4);
465 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
466 const ENCRYPTED_KEY_LENGTH: usize = 88;
467 let expected_payload_len = encrypted_record_len
468 .checked_add(ENCRYPTED_KEY_LENGTH as u32)
469 .ok_or_else(|| anyhow::anyhow!("encrypted record length overflow"))?;
470 if encrypted_record_len > Self::MAX_SIZE as u32 {
471 bail!("encrypted record length exceeds maximum allowed size")
472 }
473 if buf.len() != expected_payload_len as usize {
474 bail!("buffer length does not match expected encrypted record length")
475 }
476 let (encrypted_record, encrypted_decryption_key) =
477 buf.split_at(encrypted_record_len as usize);
478
479 Ok(Self {
480 encrypted_record: encrypted_record.to_vec(),
481 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
482 })
483 }
484}
485
486impl Record {
487 pub fn sign<'a>(
489 topic: [u8; 32],
490 unix_minute: u64,
491 record_content: impl Serialize + Deserialize<'a>,
492 signing_key: &ed25519_dalek::SigningKey,
493 ) -> anyhow::Result<Self> {
494 let record_content = RecordContent::from_arbitrary(&record_content)?;
495 let mut signature_data = Vec::new();
496 signature_data.extend_from_slice(&topic);
497 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
498 signature_data.extend_from_slice(&signing_key.verifying_key().to_bytes());
499 signature_data.extend(&record_content.clone().0);
500 let signature = signing_key.sign(&signature_data);
501 Ok(Self {
502 topic,
503 unix_minute,
504 pub_key: signing_key.verifying_key().to_bytes(),
505 content: record_content,
506 signature: signature.to_bytes(),
507 })
508 }
509
510 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
512 if buf.len() < 32 + 8 + 32 + 64 {
513 bail!("buffer too short for Record deserialization")
514 }
515 let (topic, buf) = buf.split_at(32);
516 let (unix_minute, buf) = buf.split_at(8);
517 let (pub_key, buf) = buf.split_at(32);
518 let (record_content, buf) = buf.split_at(buf.len() - 64);
519
520 let (signature, buf) = buf.split_at(64);
521
522 if !buf.is_empty() {
523 bail!("buffer not empty after reconstruction")
524 }
525
526 Ok(Self {
527 topic: topic.try_into()?,
528 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
529 pub_key: pub_key.try_into()?,
530 content: RecordContent(record_content.to_vec()),
531 signature: signature.try_into()?,
532 })
533 }
534
535 pub fn to_bytes(&self) -> Vec<u8> {
537 let mut buf = Vec::new();
538 buf.extend_from_slice(&self.topic);
539 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
540 buf.extend_from_slice(&self.pub_key);
541 buf.extend(&self.content.0);
542 buf.extend_from_slice(&self.signature);
543 buf
544 }
545
546 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
548 if self.topic != *actual_topic {
549 bail!("topic mismatch")
550 }
551 if self.unix_minute != actual_unix_minute {
552 bail!("unix minute mismatch")
553 }
554
555 let record_bytes = self.to_bytes();
556 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
557 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
558 let pub_key = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
559
560 pub_key.verify_strict(signature_data.as_slice(), &signature)?;
561
562 Ok(())
563 }
564
565 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
567 let mut csprng = UnwrapErr(SysRng);
568 let one_time_key = ed25519_dalek::SigningKey::generate(&mut csprng);
569 let p_key = one_time_key.verifying_key();
570 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
571 let key_enc = encryption_key
572 .verifying_key()
573 .encrypt(&one_time_key.to_bytes())
574 .expect("encryption failed");
575
576 EncryptedRecord {
577 encrypted_record: data_enc,
578 encrypted_decryption_key: key_enc,
579 }
580 }
581}
582
583impl Record {
585 pub fn topic(&self) -> [u8; 32] {
587 self.topic
588 }
589
590 pub fn unix_minute(&self) -> u64 {
592 self.unix_minute
593 }
594
595 pub fn pub_key(&self) -> [u8; 32] {
597 self.pub_key
598 }
599
600 pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
602 self.content.to::<T>()
603 }
604
605 pub fn signature(&self) -> [u8; 64] {
607 self.signature
608 }
609}