distributed_topic_tracker/crypto/
record.rs1use std::{collections::HashSet, time::Duration};
2
3use anyhow::{Result, bail};
4use ed25519_dalek::ed25519::signature::SignerMut;
5use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
6use iroh::NodeId;
7use sha2::Digest;
8
9#[derive(Debug, Clone)]
10pub struct EncryptedRecord {
11 encrypted_record: Vec<u8>,
12 encrypted_decryption_key: Vec<u8>,
13}
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub struct Record {
17 topic: [u8; 32],
18 unix_minute: u64,
19 node_id: [u8; 32],
20 active_peers: [[u8; 32]; 5],
21 last_message_hashes: [[u8; 32]; 5],
22 signature: [u8; 64],
23}
24
25#[derive(Debug,Clone)]
26pub struct RecordPublisher {
27 dht: crate::dht::Dht,
28
29 topic_id: crate::topic::topic::TopicId,
30 node_id: iroh::NodeId,
31 signing_key: mainline::SigningKey,
32 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
33 initial_secret_hash: [u8; 32],
34}
35
36impl RecordPublisher {
37 pub fn new(
38 topic_id: crate::topic::topic::TopicId,
39 node_id: iroh::NodeId,
40 signing_key: mainline::SigningKey,
41 secret_rotation: Option<crate::crypto::keys::RotationHandle>,
42 initial_secret: Vec<u8>,
43 ) -> Self {
44 let mut initial_secret_hash = sha2::Sha512::new();
45 initial_secret_hash.update(initial_secret);
46 let initial_secret_hash: [u8; 32] = initial_secret_hash.finalize()[..32]
47 .try_into()
48 .expect("hashing failed");
49
50 Self {
51 dht: crate::dht::Dht::new(),
52 topic_id,
53 node_id,
54 signing_key,
55 secret_rotation,
56 initial_secret_hash,
57 }
58 }
59
60 pub fn new_record(
61 &self,
62 unix_minute: u64,
63 neighbors: Vec<NodeId>,
64 last_message_hashes: Vec<[u8; 32]>,
65 ) -> Record {
66 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
67 for (i, peer) in neighbors.iter().take(5).enumerate() {
68 active_peers[i] = *peer.as_bytes()
69 }
70
71 let mut last_message_hashes_array = [[0u8; 32]; 5];
72 for (i, hash) in last_message_hashes.iter().take(5).enumerate() {
73 last_message_hashes_array[i] = *hash;
74 }
75
76 Record::sign(
77 self.topic_id.hash(),
78 unix_minute,
79 self.node_id.public().to_bytes(),
80 active_peers,
81 last_message_hashes_array,
82 &self.signing_key,
83 )
84 }
85
86 pub fn node_id(&self) -> iroh::NodeId {
87 self.node_id.clone()
88 }
89
90 pub fn topic_id(&self) -> crate::topic::topic::TopicId {
91 self.topic_id.clone()
92 }
93
94 pub fn signing_key(&self) -> mainline::SigningKey {
95 self.signing_key.clone()
96 }
97
98 pub fn secret_rotation(&self) -> Option<crate::crypto::keys::RotationHandle> {
99 self.secret_rotation.clone()
100 }
101
102 pub fn initial_secret_hash(&self) -> [u8; 32] {
103 self.initial_secret_hash
104 }
105}
106
107impl RecordPublisher {
108
109 pub async fn publish_record(&self, record: Record) -> Result<()> {
111 let records = self.get_records(record.unix_minute())
113 .await
114 .iter()
115 .filter(|&record| {
116 record
117 .active_peers()
118 .iter()
119 .filter(|&peer| peer.eq(&[0u8; 32]))
120 .count()
121 > 0
122 || record
123 .last_message_hashes()
124 .iter()
125 .filter(|&hash| hash.eq(&[0u8; 32]))
126 .count()
127 > 0
128 })
129 .cloned()
130 .collect::<HashSet<_>>();
131
132 if records.len() >= crate::MAX_BOOTSTRAP_RECORDS {
135 return Ok(());
136 }
137
138 let sign_key = crate::crypto::keys::signing_keypair(&self.topic_id.clone(), record.unix_minute);
140 let salt = crate::crypto::keys::salt(&self.topic_id, record.unix_minute);
141 let encryption_key = crate::crypto::keys::encryption_keypair(
142 &self.topic_id.clone(),
143 &self.secret_rotation.clone().unwrap_or_default(),
144 self.initial_secret_hash,
145 record.unix_minute,
146 );
147 let encrypted_record = record.encrypt(&encryption_key);
148
149 self.dht.put_mutable(
150 sign_key.clone(),
151 sign_key.verifying_key().into(),
152 Some(salt.to_vec()),
153 encrypted_record.to_bytes().to_vec(),
154 Some(3),
155 Duration::from_secs(10),
156 )
157 .await?;
158
159 Ok(())
160 }
161
162 pub async fn get_records(&self, unix_minute: u64) -> HashSet<Record> {
163 let topic_sign = crate::crypto::keys::signing_keypair(&self.topic_id, unix_minute);
164 let encryption_key = crate::crypto::keys::encryption_keypair(
165 &self.topic_id,
166 &self.secret_rotation.clone().unwrap_or_default(),
167 self.initial_secret_hash,
168 unix_minute,
169 );
170 let salt = crate::crypto::keys::salt(&self.topic_id, unix_minute);
171
172 let records_iter = self.dht
174 .get(
175 topic_sign.verifying_key().into(),
176 Some(salt.to_vec()),
177 None,
178 Duration::from_secs(10),
179 )
180 .await
181 .unwrap_or_default();
182
183 records_iter
184 .iter()
185 .filter_map(
186 |record| match EncryptedRecord::from_bytes(record.value().to_vec()) {
187 Ok(encrypted_record) => match encrypted_record.decrypt(&encryption_key) {
188 Ok(record) => match record.verify(&self.topic_id.hash(), unix_minute) {
189 Ok(_) => match record.node_id().eq(self.node_id.as_bytes()) {
190 true => None,
191 false => Some(record),
192 },
193 Err(_) => None,
194 },
195 Err(_) => None,
196 },
197 Err(_) => None,
198 },
199 )
200 .collect::<HashSet<_>>()
201 }
202}
203
204impl EncryptedRecord {
205 pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
206 let one_time_key_bytes: [u8; 32] = decryption_key
207 .decrypt(&self.encrypted_decryption_key)?
208 .as_slice()
209 .try_into()?;
210 let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
211
212 let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
213 let record = Record::from_bytes(decrypted_record)?;
214 Ok(record)
215 }
216
217 pub fn to_bytes(&self) -> Vec<u8> {
218 let mut buf = Vec::new();
219 let encrypted_record_len = self.encrypted_record.len() as u32;
220 buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
221 buf.extend_from_slice(&self.encrypted_record);
222 buf.extend_from_slice(&self.encrypted_decryption_key);
223 buf
224 }
225
226 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
227 let (encrypted_record_len, buf) = buf.split_at(4);
228 let encrypted_record_len = u32::from_le_bytes(encrypted_record_len.try_into()?);
229 let (encrypted_record, encrypted_decryption_key) =
230 buf.split_at(encrypted_record_len as usize);
231
232 Ok(Self {
233 encrypted_record: encrypted_record.to_vec(),
234 encrypted_decryption_key: encrypted_decryption_key.to_vec(),
235 })
236 }
237}
238
239impl Record {
240 pub fn sign(
241 topic: [u8; 32],
242 unix_minute: u64,
243 node_id: [u8; 32],
244 active_peers: [[u8; 32]; 5],
245 last_message_hashes: [[u8; 32]; 5],
246 signing_key: &ed25519_dalek::SigningKey,
247 ) -> Self {
248 let mut signature_data = Vec::new();
249 signature_data.extend_from_slice(&topic);
250 signature_data.extend_from_slice(&unix_minute.to_le_bytes());
251 signature_data.extend_from_slice(&node_id);
252 for active_peer in active_peers {
253 signature_data.extend_from_slice(&active_peer);
254 }
255 for last_message_hash in last_message_hashes {
256 signature_data.extend_from_slice(&last_message_hash);
257 }
258 let mut signing_key = signing_key.clone();
259 let signature = signing_key.sign(&signature_data);
260 Self {
261 topic,
262 unix_minute,
263 node_id,
264 active_peers,
265 last_message_hashes,
266 signature: signature.to_bytes(),
267 }
268 }
269
270 pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
271 let (topic, buf) = buf.split_at(32);
272 let (unix_minute, buf) = buf.split_at(8);
273 let (node_id, mut buf) = buf.split_at(32);
274
275 let mut active_peers: [[u8; 32]; 5] = [[0; 32]; 5];
276 #[allow(clippy::needless_range_loop)]
277 for i in 0..active_peers.len() {
278 let (active_peer, _buf) = buf.split_at(32);
279 active_peers[i] = active_peer.try_into()?;
280 buf = _buf;
281 }
282 let mut last_message_hashes: [[u8; 32]; 5] = [[0; 32]; 5];
283 #[allow(clippy::needless_range_loop)]
284 for i in 0..last_message_hashes.len() {
285 let (last_message_hash, _buf) = buf.split_at(32);
286 last_message_hashes[i] = last_message_hash.try_into()?;
287 buf = _buf;
288 }
289
290 let (signature, buf) = buf.split_at(64);
291
292 if !buf.is_empty() {
293 bail!("buffer not empty after reconstruction")
294 }
295
296 Ok(Self {
297 topic: topic.try_into()?,
298 unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
299 node_id: node_id.try_into()?,
300 active_peers,
301 last_message_hashes,
302 signature: signature.try_into()?,
303 })
304 }
305
306 pub fn to_bytes(&self) -> Vec<u8> {
307 let mut buf = Vec::new();
308 buf.extend_from_slice(&self.topic);
309 buf.extend_from_slice(&self.unix_minute.to_le_bytes());
310 buf.extend_from_slice(&self.node_id);
311 for active_peer in self.active_peers {
312 buf.extend_from_slice(&active_peer);
313 }
314 for last_message_hash in self.last_message_hashes {
315 buf.extend_from_slice(&last_message_hash);
316 }
317 buf.extend_from_slice(&self.signature);
318 buf
319 }
320
321 pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
322 if self.topic != *actual_topic {
323 bail!("topic mismatch")
324 }
325 if self.unix_minute != actual_unix_minute {
326 bail!("unix minute mismatch")
327 }
328
329 let record_bytes = self.to_bytes();
330 let signature_data = record_bytes[..record_bytes.len() - 64].to_vec();
331 let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
332 let node_id = ed25519_dalek::VerifyingKey::from_bytes(&self.node_id)?;
333
334 node_id.verify_strict(signature_data.as_slice(), &signature)?;
335
336 Ok(())
337 }
338
339 pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
340 let one_time_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
341 let p_key = one_time_key.verifying_key();
342 let data_enc = p_key.encrypt(&self.to_bytes()).expect("encryption failed");
343 let key_enc = encryption_key
344 .verifying_key()
345 .encrypt(&one_time_key.to_bytes())
346 .expect("encryption failed");
347
348 EncryptedRecord {
349 encrypted_record: data_enc,
350 encrypted_decryption_key: key_enc,
351 }
352 }
353}
354
355impl Record {
357 pub fn topic(&self) -> [u8; 32] {
358 self.topic
359 }
360
361 pub fn unix_minute(&self) -> u64 {
362 self.unix_minute
363 }
364
365 pub fn node_id(&self) -> [u8; 32] {
366 self.node_id
367 }
368
369 pub fn active_peers(&self) -> [[u8; 32]; 5] {
370 self.active_peers
371 }
372
373 pub fn last_message_hashes(&self) -> [[u8; 32]; 5] {
374 self.last_message_hashes
375 }
376
377 pub fn signature(&self) -> [u8; 64] {
378 self.signature
379 }
380}