use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use anyhow::{Result, bail};
use ed25519_dalek::{Signer, SigningKey, VerifyingKey};
use getrandom::{SysRng, rand_core::UnwrapErr};
use ed25519_dalek_hpke::{Ed25519hpkeDecryption, Ed25519hpkeEncryption};
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tokio_util::sync::CancellationToken;
use crate::{Config, Dht, RotationHandle};
/// 32-byte identifier of a topic.
///
/// [`TopicId::new`] derives the value from the first 32 bytes of the SHA-512
/// digest of the topic name; [`TopicId::from_hash`] wraps an already computed
/// 32-byte value unchanged.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicId([u8; 32]);
impl FromStr for TopicId {
type Err = anyhow::Error;
fn from_str(topic_name: &str) -> std::result::Result<Self, Self::Err> {
Ok(Self::new(topic_name.to_string()))
}
}
impl From<&str> for TopicId {
fn from(topic_name: &str) -> Self {
Self::new(topic_name.to_string())
}
}
impl From<String> for TopicId {
fn from(topic_name: String) -> Self {
Self::new(topic_name)
}
}
impl From<Vec<u8>> for TopicId {
fn from(topic_name: Vec<u8>) -> Self {
Self::new(topic_name)
}
}
impl TopicId {
pub fn new(topic_name: impl Into<Vec<u8>>) -> Self {
let mut topic_name_hash = sha2::Sha512::new();
topic_name_hash.update(topic_name.into());
Self(
topic_name_hash.finalize()[..32]
.try_into()
.expect("hashing 'topic_name' failed"),
)
}
pub fn from_hash(bytes: &[u8; 32]) -> Self {
Self(*bytes)
}
pub fn hash(&self) -> [u8; 32] {
self.0
}
}
/// Encrypted form of a [`Record`], as produced by [`Record::encrypt`].
#[derive(Debug, Clone)]
pub struct EncryptedRecord {
    // Serialized record, encrypted to a freshly generated one-time key.
    encrypted_record: Vec<u8>,
    // The one-time key's secret bytes, encrypted to the caller-supplied encryption key.
    encrypted_decryption_key: Vec<u8>,
}
/// A signed record published under a topic for a specific unix minute.
///
/// The signature covers `topic`, `unix_minute` (little-endian), `pub_key` and
/// the content bytes, in that order (see [`Record::sign`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Record {
    // 32-byte topic hash this record belongs to.
    topic: [u8; 32],
    // Minute slot the record was created for.
    unix_minute: u64,
    // Ed25519 verifying key bytes of the signer.
    pub_key: [u8; 32],
    // Serialized payload.
    content: RecordContent,
    // Ed25519 signature bytes over the fields above.
    signature: [u8; 64],
}
/// Opaque payload bytes of a [`Record`].
///
/// [`RecordContent::from_arbitrary`] fills it with the postcard encoding of a
/// value and [`RecordContent::to`] decodes it again; because the field is
/// public, callers may also store arbitrary bytes directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RecordContent(pub Vec<u8>);
impl RecordContent {
    /// Decodes the stored bytes into `T` using postcard.
    ///
    /// # Errors
    /// Returns the postcard error, wrapped in `anyhow`, when the bytes do not
    /// decode as `T`.
    pub fn to<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
        match postcard::from_bytes::<T>(&self.0) {
            Ok(value) => Ok(value),
            Err(err) => Err(anyhow::anyhow!(err)),
        }
    }

    /// Encodes `from` with postcard and wraps the resulting bytes.
    ///
    /// # Errors
    /// Returns the postcard error, wrapped in `anyhow`, when serialization fails.
    pub fn from_arbitrary<T: Serialize>(from: &T) -> anyhow::Result<Self> {
        let encoded = postcard::to_allocvec(from).map_err(|err| anyhow::anyhow!(err))?;
        Ok(Self(encoded))
    }
}
/// Publishes signed, encrypted records for one topic to the DHT and reads
/// back the records published by others.
#[derive(Debug, Clone)]
pub struct RecordPublisher {
    // DHT client, created from `config.dht_config()` in `RecordPublisher::new`.
    dht: Dht,
    // Configuration; the bootstrap section supplies the per-minute record limit.
    config: Config,
    // Topic all records of this publisher belong to.
    topic_id: TopicId,
    // Verifying key derived from `signing_key`; used to filter out own records.
    pub_key: VerifyingKey,
    // Key used to sign new records.
    signing_key: SigningKey,
    // Optional rotation handle passed to encryption-key derivation; a default
    // handle is used when this is `None`.
    secret_rotation: Option<RotationHandle>,
    // First 32 bytes of the SHA-512 digest of the initial secret.
    initial_secret_hash: [u8; 32],
}
/// Builder for [`RecordPublisher`], created through `RecordPublisher::builder`.
///
/// `Debug` is implemented by hand so that the raw `initial_secret` bytes are
/// never written to logs or panic messages.
pub struct RecordPublisherBuilder {
    // Topic the publisher will operate on.
    topic_id: TopicId,
    // Key used to sign records.
    signing_key: SigningKey,
    // Optional secret rotation handle; `None` unless set via `secret_rotation`.
    secret_rotation: Option<RotationHandle>,
    // Raw shared secret; hashed when the publisher is built.
    initial_secret: Vec<u8>,
    // Configuration; `Config::default()` unless overridden via `config`.
    config: Config,
}

impl std::fmt::Debug for RecordPublisherBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RecordPublisherBuilder")
            .field("topic_id", &self.topic_id)
            .field("signing_key", &self.signing_key)
            .field("secret_rotation", &self.secret_rotation)
            // The shared secret must not leak through debug output.
            .field("initial_secret", &"<redacted>")
            .field("config", &self.config)
            .finish()
    }
}
impl RecordPublisherBuilder {
    /// Sets the secret rotation handle the publisher will use.
    pub fn secret_rotation(self, secret_rotation: RotationHandle) -> Self {
        Self {
            secret_rotation: Some(secret_rotation),
            ..self
        }
    }

    /// Replaces the configuration used by the publisher.
    pub fn config(self, config: Config) -> Self {
        Self { config, ..self }
    }

    /// Consumes the builder and constructs the [`RecordPublisher`].
    pub fn build(self) -> RecordPublisher {
        let Self {
            topic_id,
            signing_key,
            secret_rotation,
            initial_secret,
            config,
        } = self;
        RecordPublisher::new(topic_id, signing_key, secret_rotation, initial_secret, config)
    }
}
impl RecordPublisher {
    /// Starts a [`RecordPublisherBuilder`] with no secret rotation and the
    /// default [`Config`].
    pub fn builder(
        topic_id: impl Into<TopicId>,
        signing_key: SigningKey,
        initial_secret: impl Into<Vec<u8>>,
    ) -> RecordPublisherBuilder {
        let topic_id: TopicId = topic_id.into();
        let initial_secret: Vec<u8> = initial_secret.into();
        RecordPublisherBuilder {
            topic_id,
            signing_key,
            secret_rotation: None,
            initial_secret,
            config: Config::default(),
        }
    }

    /// Creates a publisher for `topic_id`.
    ///
    /// Only the first 32 bytes of the SHA-512 digest of `initial_secret` are
    /// kept; the DHT client is created from `config.dht_config()`.
    pub fn new(
        topic_id: impl Into<TopicId>,
        signing_key: SigningKey,
        secret_rotation: Option<RotationHandle>,
        initial_secret: impl Into<Vec<u8>>,
        config: Config,
    ) -> Self {
        let secret_bytes: Vec<u8> = initial_secret.into();
        let digest = sha2::Sha512::digest(&secret_bytes);
        // A SHA-512 digest is 64 bytes long, so taking the first 32 always succeeds.
        let initial_secret_hash: [u8; 32] = digest[..32].try_into().expect("hashing failed");

        let dht = Dht::new(config.dht_config());
        let topic_id: TopicId = topic_id.into();
        let pub_key = signing_key.verifying_key();
        Self {
            dht,
            config,
            topic_id,
            pub_key,
            signing_key,
            secret_rotation,
            initial_secret_hash,
        }
    }

    /// Creates a record for this publisher's topic, signed with its signing key.
    ///
    /// # Errors
    /// Propagates any error from [`Record::sign`].
    pub fn new_record<'a>(
        &'a self,
        unix_minute: u64,
        record_content: impl Serialize + Deserialize<'a>,
    ) -> Result<Record> {
        let topic_hash = self.topic_id.hash();
        Record::sign(topic_hash, unix_minute, record_content, &self.signing_key)
    }

    /// Verifying key matching the publisher's signing key.
    pub fn pub_key(&self) -> ed25519_dalek::VerifyingKey {
        self.pub_key
    }

    /// Topic this publisher operates on.
    pub fn topic_id(&self) -> &TopicId {
        &self.topic_id
    }

    /// Key used to sign records.
    pub fn signing_key(&self) -> &ed25519_dalek::SigningKey {
        &self.signing_key
    }

    /// Clone of the configured secret rotation handle, if any.
    pub fn secret_rotation(&self) -> Option<RotationHandle> {
        self.secret_rotation.as_ref().cloned()
    }

    /// First 32 bytes of the SHA-512 digest of the initial secret.
    pub fn initial_secret_hash(&self) -> [u8; 32] {
        self.initial_secret_hash
    }

    /// Configuration this publisher was created with.
    pub fn config(&self) -> &Config {
        &self.config
    }
}
impl RecordPublisher {
    /// Publishes `record`, first fetching the records already present for its
    /// unix minute from the DHT.
    ///
    /// # Errors
    /// See [`RecordPublisher::publish_record_cached_records`].
    pub async fn publish_record(&self, record: Record, cancel_token: CancellationToken) -> Result<()> {
        self.publish_record_cached_records(record, None, cancel_token)
            .await
    }

    /// Publishes `record` to the DHT unless the slot for its unix minute is full.
    ///
    /// When `cached_records` is `None` the existing records are fetched with
    /// [`RecordPublisher::get_records`]. If their number has reached
    /// `max_bootstrap_records`, nothing is published and `Ok(())` is returned.
    ///
    /// # Errors
    /// Fails when `cancel_token` is cancelled before publishing completes, or
    /// when fetching, serializing or storing the record fails.
    pub async fn publish_record_cached_records(
        &self,
        record: Record,
        cached_records: Option<HashSet<Record>>,
        cancel_token: CancellationToken,
    ) -> Result<()> {
        let publish = async {
            let unix_minute = record.unix_minute();

            let existing = if let Some(records) = cached_records {
                records
            } else {
                self.get_records(unix_minute, cancel_token.clone()).await?
            };
            tracing::debug!(
                "RecordPublisher: found {} existing records for unix_minute {}",
                existing.len(),
                unix_minute
            );

            let max_records = self.config.bootstrap_config().max_bootstrap_records();
            if existing.len() >= max_records {
                tracing::debug!(
                    "RecordPublisher: max records reached ({}), skipping publish",
                    max_records
                );
                return Ok(());
            }

            // All keys are derived from the topic and the minute slot.
            let sign_key = crate::crypto::keys::signing_keypair(self.topic_id(), unix_minute);
            let salt = crate::crypto::keys::salt(self.topic_id(), unix_minute);
            let encryption_key = crate::crypto::keys::encryption_keypair(
                self.topic_id(),
                &self.secret_rotation.clone().unwrap_or_default(),
                self.initial_secret_hash,
                unix_minute,
            );
            let encrypted = record.encrypt(&encryption_key);
            let seq = i64::MAX;

            tracing::debug!(
                "RecordPublisher: publishing record to DHT for unix_minute {}",
                unix_minute
            );
            let payload = encrypted.to_bytes()?;
            self.dht
                .put_mutable(sign_key.clone(), Some(salt.to_vec()), payload, seq)
                .await?;
            tracing::debug!("RecordPublisher: successfully published to DHT");
            Ok(())
        };

        tokio::select! {
            _ = cancel_token.cancelled() => {
                bail!("publish cancelled")
            }
            outcome = publish => {
                outcome
            }
        }
    }

    /// Fetches the records other publishers stored for `unix_minute`.
    ///
    /// Items that fail to parse, decrypt or verify are skipped, as are records
    /// signed with this publisher's own key. For each signer only the item
    /// with the highest sequence number is kept.
    ///
    /// # Errors
    /// Fails when `cancel_token` is cancelled before the lookup completes or
    /// when the DHT lookup itself fails.
    pub async fn get_records(&self, unix_minute: u64, cancel_token: CancellationToken) -> Result<HashSet<Record>> {
        let fetch = async {
            tracing::debug!(
                "RecordPublisher: fetching records from DHT for unix_minute {}",
                unix_minute
            );

            let topic_sign = crate::crypto::keys::signing_keypair(self.topic_id(), unix_minute);
            let encryption_key = crate::crypto::keys::encryption_keypair(
                self.topic_id(),
                &self.secret_rotation.clone().unwrap_or_default(),
                self.initial_secret_hash,
                unix_minute,
            );
            let salt = crate::crypto::keys::salt(self.topic_id(), unix_minute);

            let raw_items = self
                .dht
                .get(topic_sign.verifying_key(), Some(salt.to_vec()), None)
                .await?;
            tracing::debug!(
                "RecordPublisher: received {} raw records from DHT",
                raw_items.len()
            );

            let topic_hash = self.topic_id.hash();
            let mut newest_by_signer = HashMap::new();
            for item in raw_items {
                let Ok(encrypted) = EncryptedRecord::from_bytes(item.value().to_vec()) else {
                    continue;
                };
                let Ok(record) = encrypted.decrypt(&encryption_key) else {
                    continue;
                };
                if record.verify(&topic_hash, unix_minute).is_err() {
                    continue;
                }
                let signer = record.pub_key();
                // Our own record does not count towards the slot.
                if signer.eq(self.pub_key.as_bytes()) {
                    continue;
                }

                let seq = item.seq();
                let superseded = match newest_by_signer.get(&signer) {
                    Some((known_seq, _)) => *known_seq >= seq,
                    None => false,
                };
                if !superseded {
                    newest_by_signer.insert(signer, (seq, record));
                }
            }
            tracing::debug!(
                "RecordPublisher: verified {} records (filtered self)",
                newest_by_signer.len()
            );

            let mut records = HashSet::new();
            for (_, record) in newest_by_signer.into_values() {
                records.insert(record);
            }
            Ok(records)
        };

        tokio::select! {
            _ = cancel_token.cancelled() => {
                bail!("get_records cancelled")
            }
            outcome = fetch => {
                outcome
            }
        }
    }
}
impl EncryptedRecord {
    /// Upper bound for a serialized [`EncryptedRecord`] and for the encrypted
    /// record length accepted while parsing.
    const MAX_SIZE: usize = 2048;
    /// Size of the little-endian `u32` length prefix of the wire format.
    const LEN_PREFIX_SIZE: usize = 4;
    /// Length of the encrypted one-time key expected while parsing.
    const ENCRYPTED_KEY_LENGTH: usize = 88;

    /// Decrypts the one-time key with `decryption_key`, then uses it to
    /// decrypt and parse the contained [`Record`].
    ///
    /// # Errors
    /// Fails when either decryption step fails, when the decrypted one-time
    /// key is not exactly 32 bytes, or when the record bytes do not parse.
    pub fn decrypt(&self, decryption_key: &ed25519_dalek::SigningKey) -> Result<Record> {
        let one_time_key_bytes: [u8; 32] = decryption_key
            .decrypt(&self.encrypted_decryption_key)?
            .as_slice()
            .try_into()?;
        let one_time_key = ed25519_dalek::SigningKey::from_bytes(&one_time_key_bytes);
        let decrypted_record = one_time_key.decrypt(&self.encrypted_record)?;
        Record::from_bytes(decrypted_record)
    }

    /// Serializes as `u32 LE length of encrypted record | encrypted record |
    /// encrypted one-time key`.
    ///
    /// # Errors
    /// Fails when the serialized form would exceed `MAX_SIZE` bytes.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        // Validate the size up front so that nothing is allocated for an
        // oversized record and the length prefix can never be truncated.
        let total_len = self
            .encrypted_record
            .len()
            .checked_add(self.encrypted_decryption_key.len())
            .and_then(|len| len.checked_add(Self::LEN_PREFIX_SIZE));
        let total_len = match total_len {
            Some(len) if len <= Self::MAX_SIZE => len,
            _ => bail!(
                "EncryptedRecord serialization exceeds maximum size, the max is set generously so this should never happen, if so your code is using it manually"
            ),
        };
        let encrypted_record_len = u32::try_from(self.encrypted_record.len())?;

        let mut buf = Vec::with_capacity(total_len);
        buf.extend_from_slice(&encrypted_record_len.to_le_bytes());
        buf.extend_from_slice(&self.encrypted_record);
        buf.extend_from_slice(&self.encrypted_decryption_key);
        Ok(buf)
    }

    /// Parses the wire format written by [`EncryptedRecord::to_bytes`].
    ///
    /// # Errors
    /// Fails when the buffer is shorter than the length prefix, when the
    /// declared record length exceeds `MAX_SIZE`, or when the remaining bytes
    /// are not exactly the declared record followed by an 88-byte encrypted key.
    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
        if buf.len() < Self::LEN_PREFIX_SIZE {
            bail!("buffer too short for EncryptedRecord deserialization")
        }
        let (len_prefix, payload) = buf.split_at(Self::LEN_PREFIX_SIZE);
        let encrypted_record_len = usize::try_from(u32::from_le_bytes(len_prefix.try_into()?))?;
        // Bounding the length first makes the addition below overflow-free.
        if encrypted_record_len > Self::MAX_SIZE {
            bail!("encrypted record length exceeds maximum allowed size")
        }
        if payload.len() != encrypted_record_len + Self::ENCRYPTED_KEY_LENGTH {
            bail!("buffer length does not match expected encrypted record length")
        }
        let (encrypted_record, encrypted_decryption_key) = payload.split_at(encrypted_record_len);
        Ok(Self {
            encrypted_record: encrypted_record.to_vec(),
            encrypted_decryption_key: encrypted_decryption_key.to_vec(),
        })
    }
}
impl Record {
    /// Combined length of the fixed-width fields preceding the content:
    /// topic (32) + unix minute (8) + public key (32).
    const HEADER_LEN: usize = 32 + 8 + 32;
    /// Length of the trailing Ed25519 signature.
    const SIGNATURE_LEN: usize = 64;

    /// Builds the exact byte string that is signed and verified:
    /// `topic | unix_minute (LE) | pub_key | content`.
    ///
    /// Signing, verification and serialization all go through this helper so
    /// the three can never disagree about the layout.
    fn signed_payload(
        topic: &[u8; 32],
        unix_minute: u64,
        pub_key: &[u8; 32],
        content: &[u8],
    ) -> Vec<u8> {
        let mut payload = Vec::with_capacity(Self::HEADER_LEN + content.len());
        payload.extend_from_slice(topic);
        payload.extend_from_slice(&unix_minute.to_le_bytes());
        payload.extend_from_slice(pub_key);
        payload.extend_from_slice(content);
        payload
    }

    /// Serializes `record_content` with postcard and signs it together with
    /// `topic`, `unix_minute` and the signer's public key.
    ///
    /// # Errors
    /// Fails when `record_content` cannot be serialized.
    pub fn sign<'a>(
        topic: [u8; 32],
        unix_minute: u64,
        record_content: impl Serialize + Deserialize<'a>,
        signing_key: &ed25519_dalek::SigningKey,
    ) -> anyhow::Result<Self> {
        let content = RecordContent::from_arbitrary(&record_content)?;
        let pub_key = signing_key.verifying_key().to_bytes();
        let payload = Self::signed_payload(&topic, unix_minute, &pub_key, &content.0);
        let signature = signing_key.sign(&payload).to_bytes();
        Ok(Self {
            topic,
            unix_minute,
            pub_key,
            content,
            signature,
        })
    }

    /// Parses the layout written by [`Record::to_bytes`]:
    /// `topic | unix_minute (LE) | pub_key | content | signature`.
    ///
    /// The signature is not checked here; call [`Record::verify`] for that.
    ///
    /// # Errors
    /// Fails when `buf` is shorter than the fixed-width fields plus signature.
    pub fn from_bytes(buf: Vec<u8>) -> Result<Self> {
        if buf.len() < Self::HEADER_LEN + Self::SIGNATURE_LEN {
            bail!("buffer too short for Record deserialization")
        }
        let (topic, rest) = buf.split_at(32);
        let (unix_minute, rest) = rest.split_at(8);
        let (pub_key, rest) = rest.split_at(32);
        // The length check above guarantees at least SIGNATURE_LEN bytes remain;
        // everything before the trailing signature is content.
        let (content, signature) = rest.split_at(rest.len() - Self::SIGNATURE_LEN);
        Ok(Self {
            topic: topic.try_into()?,
            unix_minute: u64::from_le_bytes(unix_minute.try_into()?),
            pub_key: pub_key.try_into()?,
            content: RecordContent(content.to_vec()),
            signature: signature.try_into()?,
        })
    }

    /// Serializes as `topic | unix_minute (LE) | pub_key | content | signature`.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Self::signed_payload(
            &self.topic,
            self.unix_minute,
            &self.pub_key,
            &self.content.0,
        );
        buf.extend_from_slice(&self.signature);
        buf
    }

    /// Checks that the record belongs to `actual_topic` and
    /// `actual_unix_minute` and that its signature is valid for its own
    /// public key.
    ///
    /// # Errors
    /// Fails on a topic or minute mismatch, when the public key bytes are not
    /// a valid verifying key, or when strict signature verification fails.
    pub fn verify(&self, actual_topic: &[u8; 32], actual_unix_minute: u64) -> Result<()> {
        if self.topic != *actual_topic {
            bail!("topic mismatch")
        }
        if self.unix_minute != actual_unix_minute {
            bail!("unix minute mismatch")
        }
        let payload = Self::signed_payload(
            &self.topic,
            self.unix_minute,
            &self.pub_key,
            &self.content.0,
        );
        let signature = ed25519_dalek::Signature::from_bytes(&self.signature);
        let pub_key = ed25519_dalek::VerifyingKey::from_bytes(&self.pub_key)?;
        pub_key.verify_strict(&payload, &signature)?;
        Ok(())
    }

    /// Encrypts the serialized record to a freshly generated one-time key and
    /// encrypts that one-time key to `encryption_key`'s public key.
    ///
    /// # Panics
    /// Panics when the system random source fails or when either encryption
    /// step returns an error.
    pub fn encrypt(&self, encryption_key: &ed25519_dalek::SigningKey) -> EncryptedRecord {
        let mut csprng = UnwrapErr(SysRng);
        let one_time_key = ed25519_dalek::SigningKey::generate(&mut csprng);
        let encrypted_record = one_time_key
            .verifying_key()
            .encrypt(&self.to_bytes())
            .expect("encryption failed");
        let encrypted_decryption_key = encryption_key
            .verifying_key()
            .encrypt(&one_time_key.to_bytes())
            .expect("encryption failed");
        EncryptedRecord {
            encrypted_record,
            encrypted_decryption_key,
        }
    }
}
impl Record {
pub fn topic(&self) -> [u8; 32] {
self.topic
}
pub fn unix_minute(&self) -> u64 {
self.unix_minute
}
pub fn pub_key(&self) -> [u8; 32] {
self.pub_key
}
pub fn content<'a, T: Deserialize<'a>>(&'a self) -> anyhow::Result<T> {
self.content.to::<T>()
}
pub fn signature(&self) -> [u8; 64] {
self.signature
}
}