distributed_topic_tracker/crypto/
record.rs1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
5
6use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
7use serde::{Deserialize, Serialize};
8use sha2::Digest;
9
/// A 32-byte topic identifier.
///
/// When built through [`FromStr`], the value is the first 32 bytes of the
/// SHA-512 digest of the topic string; [`RecordTopic::from_bytes`] wraps raw
/// bytes without hashing them.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct RecordTopic([u8; 32]);
21
22impl FromStr for RecordTopic {
23 type Err = anyhow::Error;
24
25 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
26 let mut hasher = sha2::Sha512::new();
27 hasher.update(s.as_bytes());
28 let hash: [u8; 32] = hasher.finalize()[..32]
29 .try_into()
30 .map_err(|_| anyhow::anyhow!("hashing failed"))?;
31 Ok(RecordTopic(hash))
32 }
33}
34
35impl RecordTopic {
36 pub fn from_bytes(bytes: &[u8; 32]) -> Self {
38 Self(*bytes)
39 }
40
41 pub fn hash(&self) -> [u8; 32] {
43 self.0
44 }
45}
46
/// The encrypted wire form of a [`Record`].
///
/// Produced by `Record::encrypt` and serialized with
/// `EncryptedRecord::to_bytes`.
#[derive(Clone, Debug)]
pub struct EncryptedRecord {
    /// Serialized record, encrypted to a freshly generated one-time key.
    encrypted_record: Vec<u8>,
    /// Bytes of that one-time key, encrypted to the shared encryption key.
    encrypted_decryption_key: Vec<u8>,
}
56
57#[derive(Debug, Clone, PartialEq, Eq, Hash)]
62pub struct Record {
63 topic: [u8; 32],
64 unix_minute: u64,
65 pub_key: [u8; 32],
66 content: RecordContent,
67 signature: [u8; 64],
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
72pub struct RecordContent(pub Vec<u8>);
73
74impl RecordContent {
75 pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
83 postcard::from_bytes::<T>(&self.0).map_err(|e| anyhow::anyhow!(e))
84 }
85
86 pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
88 Ok(Self(
89 postcard::to_allocvec(from).map_err(|e| anyhow::anyhow!(e))?,
90 ))
91 }
92}
93
94#[derive(Debug, Clone)]
98pub struct RecordPublisher {
99 dht: crate::dht::Dht,
100
101 record_topic: RecordTopic,
102 pub_key: VerifyingKey,
103 signing_key: SigningKey,
104 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
105 initial_secret_hash: [u8; 32],
106}
107
108impl RecordPublisher {
109 pub fn new(
119 record_topic: impl Into<RecordTopic>,
120 pub_key: VerifyingKey,
121 signing_key: SigningKey,
122 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
123 initial_secret: Vec<u8>,
124 ) -> Self {
125 let mut initial_secret_hash = sha2::Sha512::new();
126 initial_secret_hash.update(initial_secret);
127 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
128 .try_into()
129 .expect("hashing failed");
130
131 Self {
132 dht: crate::dht::Dht::new(),
133 record_topic: record_topic.into(),
134 pub_key,
135 signing_key,
136 secret_rotation,
137 initial_secret_hash,
138 }
139 }
140
141 pub fn new_record<'a>(
148 &'a self,
149 unix_minute: u64,
150 record_content: impl Serialize + Deserialize<'a>,
151 ) -> Result<Record> {
152 Record::sign(
153 self.record_topic.hash(),
154 unix_minute,
155 self.pub_key.to_bytes(),
156 record_content,
157 &self.signing_key,
158 )
159 }
160
161 pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
163 self.pub_key
164 }
165
166 pub fn record_topic(&self) -> RecordTopic {
168 self.record_topic
169 }
170
171 pub fn signing_key(&self) -> ed25519_dalek::SigningKey {
173 self.signing_key.clone()
174 }
175
176 pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
178 self.secret_rotation.clone()
179 }
180
181 pub fn initial_secret_hash(&self) -> [u8; 32] {
183 self.initial_secret_hash
184 }
185}
186
187impl RecordPublisher {
188 pub async fn publish_record(&self, record: Record) -> Result<()> {
193 let records = self
194 .get_records(record.unix_minute())
195 .await
196 .iter()
197 .cloned()
198 .collect::<HashSet<_>>();
199
200 tracing::debug!("RecordPublisher: found {} existing records for unix_minute {}", records.len(), record.unix_minute());
201
202 if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
203 tracing::debug!("RecordPublisher: max records reached ({}), skipping publish", crate::MAX_BOOTSTRAP_RECORDS);
204 return Ok(());
205 }
206
207 let sign_key = crate::crypto::keys::signing_keypair(self.record_topic, record.unix_minute);
209 let salt = crate::crypto::keys::salt(self.record_topic, record.unix_minute);
210 let encryption_key = crate::crypto::keys::encryption_keypair(
211 self.record_topic,
212 &self.secret_rotation.clone().unwrap_or_default(),
213 self.initial_secret_hash,
214 record.unix_minute,
215 );
216 let encrypted_record = record.encrypt(&encryption_key);
217
218 tracing::debug!("RecordPublisher: publishing record to DHT for unix_minute {}", record.unix_minute());
219
220 self.dht
221 .put_mutable(
222 sign_key.clone(),
223 sign_key.verifying_key(),
224 Some(salt.to_vec()),
225 encrypted_record.to_bytes().to_vec(),
226 Some(3),
227 Duration::from_secs(10),
228 )
229 .await?;
230
231 tracing::debug!("RecordPublisher: successfully published to DHT");
232 Ok(())
233 }
234
235 pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
239 tracing::debug!("RecordPublisher: fetching records from DHT for unix_minute {}", unix_minute);
240
241 let topic_sign = crate::crypto::keys::signing_keypair(self.record_topic, unix_minute);
242 let encryption_key = crate::crypto::keys::encryption_keypair(
243 self.record_topic,
244 &self.secret_rotation.clone().unwrap_or_default(),
245 self.initial_secret_hash,
246 unix_minute,
247 );
248 let salt = crate::crypto::keys::salt(self.record_topic, unix_minute);
249
250 let records_iter = self
252 .dht
253 .get(
254 topic_sign.verifying_key(),
255 Some(salt.to_vec()),
256 None,
257 Duration::from_secs(10),
258 )
259 .await
260 .unwrap_or_default();
261
262 tracing::debug!("RecordPublisher: received {} raw records from DHT", records_iter.len());
263
264 let verified_records = records_iter
265 .iter()
266 .filter_map(
267 |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
268 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
269 Ok(record) => match record.verify(&self.record_topic.hash(), unix_minute) {
270 Ok(_) => match record.node_id().eq(self.pub_key.as_bytes()) {
271 true => {
272 tracing::debug!("RecordPublisher: filtered out self");
273 None
274 },
275 false => Some(record),
276 },
277 Err(_) => None,
278 },
279 Err(_) => None,
280 },
281 Err(_) => None,
282 },
283 )
284 .collect::<HashSet<_>>();
285
286 tracing::debug!("RecordPublisher: verified {} records (filtered self)", verified_records.len());
287 verified_records
288 }
289}
290
291impl EncryptedRecord {
292 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
294 let one_time_key_bytes: [u8; 32] = decryption_key
295 .decrypt(&self.encrypted_decryption_key)?
296 .as_slice()
297 .try_into()?;
298 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
299
300 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
301 let record = Record::from_bytes(decrypted_record)?;
302 Ok(record)
303 }
304
305 pub fn to_bytes(&self) -> Vec<u8> {
307 let mut buf = Vec::new();
308 let encrypted_record_len = self.encrypted_record.len() as u32;
309 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
310 buf.extend_from_slice(&self.encrypted_record);
311 buf.extend_from_slice(&self.encrypted_decryption_key);
312 buf
313 }
314
315 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
317 let (encrypted_record_len, buf) = buf.split_at(4);
318 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
319 let (encrypted_record, encrypted_decryption_key) =
320 buf.split_at(encrypted_record_len as usize);
321
322 Ok(Self {
323 encrypted_record: encrypted_record.to_vec(),
324 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
325 })
326 }
327}
328
329impl Record {
330 pub fn sign<'a>(
332 topic: [u8; 32],
333 unix_minute: u64,
334 node_id: [u8; 32],
335 record_content: impl Serialize + Deserialize<'a>,
336 signing_key: &ed25519_dalek::SigningKey,
337 ) -> anyhow::Result<Self> {
338 let record_content = RecordContent::from_arbitrary(&record_content)?;
339 let mut signature_data = Vec::new();
340 signature_data.extend_from_slice(&topic);
341 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
342 signature_data.extend_from_slice(&node_id);
343 signature_data.extend(&record_content.clone().0);
344 let signing_key = signing_key.clone();
345 let signature = signing_key.sign(&signature_data);
346 Ok(Self {
347 topic,
348 unix_minute,
349 pub_key: node_id,
350 content: record_content,
351 signature: signature.to_bytes(),
352 })
353 }
354
355 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
357 let (topic, buf) = buf.split_at(32);
358 let (unix_minute, buf) = buf.split_at(8);
359 let (node_id, buf) = buf.split_at(32);
360 let (record_content, buf) = buf.split_at(buf.len() - 64);
361
362 let (signature, buf) = buf.split_at(64);
363
364 if !buf.is_empty() {
365 bail!("buffer not empty after reconstruction")
366 }
367
368 Ok(Self {
369 topic: topic.try_into()?,
370 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
371 pub_key: node_id.try_into()?,
372 content: RecordContent(record_content.to_vec()),
373 signature: signature.try_into()?,
374 })
375 }
376
377 pub fn to_bytes(&self) -> Vec<u8> {
379 let mut buf = Vec::new();
380 buf.extend_from_slice(&self.topic);
381 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
382 buf.extend_from_slice(&self.pub_key);
383 buf.extend(&self.content.0);
384 buf.extend_from_slice(&self.signature);
385 buf
386 }
387
388 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
390 if self.topic != *actual_topic {
391 bail!("topic mismatch")
392 }
393 if self.unix_minute != actual_unix_minute {
394 bail!("unix minute mismatch")
395 }
396
397 let record_bytes = self.to_bytes();
398 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
399 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
400 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
401
402 node_id.verify_strict(signature_data.as_slice(), &signature)?;
403
404 Ok(())
405 }
406
407 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
409 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
410 let p_key = one_time_key.verifying_key();
411 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
412 let key_enc = encryption_key
413 .verifying_key()
414 .encrypt(&one_time_key.to_bytes())
415 .expect("encryption failed");
416
417 EncryptedRecord {
418 encrypted_record: data_enc,
419 encrypted_decryption_key: key_enc,
420 }
421 }
422}
423
424impl Record {
426 pub fn topic(&self) -> [u8; 32] {
428 self.topic
429 }
430
431 pub fn unix_minute(&self) -> u64 {
433 self.unix_minute
434 }
435
436 pub fn node_id(&self) -> [u8; 32] {
438 self.pub_key
439 }
440
441 pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
443 self.content.to::<T>()
444 }
445
446 pub fn signature(&self) -> [u8; 64] {
448 self.signature
449 }
450}