use bc_components::ARID;
use bc_envelope::Envelope;
use bc_ur::UREncodable;
use dcbor::CBOREncodable;
use mainline::{Dht, MutableItem, SigningKey};
use super::error::Error as MainlineError;
use crate::{
Error, KvStore, Result,
arid_derivation::{derive_mainline_key, obfuscate_with_arid},
};
/// Key-value store that keeps envelopes in the Mainline DHT.
///
/// Values are published as mutable DHT items signed with a key derived from
/// the caller's `ARID`, and are obfuscated with that same `ARID` before they
/// leave the process.
pub struct MainlineDhtKv {
/// Async DHT handle used for every lookup and publish.
dht: mainline::async_dht::AsyncDht,
/// Largest obfuscated payload, in bytes, that `put` will accept; `new` sets
/// this to 1000.
max_value_size: usize,
/// Optional salt passed alongside the public key on lookups and when
/// building the mutable item; `None` unless `with_salt` is called.
salt: Option<Vec<u8>>,
}
impl MainlineDhtKv {
    /// Payload limit, in bytes, applied by [`MainlineDhtKv::new`].
    const DEFAULT_MAX_VALUE_SIZE: usize = 1000;

    /// Builds a store on top of a fresh `Dht::client()` handle.
    ///
    /// The handle is switched to its async form and the call waits on
    /// `bootstrapped()` before returning. The store starts with a 1000-byte
    /// payload limit and no salt.
    ///
    /// # Errors
    ///
    /// Returns the converted `MainlineError` if `Dht::client()` fails.
    pub async fn new() -> Result<Self> {
        let client = Dht::client().map_err(MainlineError::from)?;
        let dht = client.as_async();
        // NOTE(review): whatever `bootstrapped()` resolves to is discarded
        // here — confirm whether a failed bootstrap should be surfaced.
        dht.bootstrapped().await;
        Ok(Self {
            dht,
            max_value_size: Self::DEFAULT_MAX_VALUE_SIZE,
            salt: None,
        })
    }

    /// Returns the store with its maximum accepted payload size replaced by
    /// `size` (in bytes).
    pub fn with_max_size(self, size: usize) -> Self {
        Self { max_value_size: size, ..self }
    }

    /// Returns the store configured to use `salt` for every DHT lookup and
    /// publish.
    pub fn with_salt(self, salt: Vec<u8>) -> Self {
        Self { salt: Some(salt), ..self }
    }

    /// Deterministically derives the signing key for `arid`.
    ///
    /// The first 20 bytes of `derive_mainline_key(arid)` become the first 20
    /// seed bytes. Seed bytes 20..32 are filled with
    /// `key_bytes[i % 20].wrapping_mul(i)` for their own index `i`.
    ///
    /// The exact byte layout must stay stable: changing it changes the key
    /// under which existing values were published.
    fn derive_signing_key(arid: &ARID) -> SigningKey {
        let key_bytes = derive_mainline_key(arid);
        let mut seed = [0u8; 32];
        seed[..20].copy_from_slice(&key_bytes[..20]);
        // Walk the 12 remaining seed slots together with their absolute
        // indices 20, 21, …, 31.
        for (slot, i) in seed[20..].iter_mut().zip(20u8..) {
            *slot = key_bytes[usize::from(i) % 20].wrapping_mul(i);
        }
        SigningKey::from_bytes(&seed)
    }
}
// `KvStore` implementation for the Mainline DHT backend. Every trait method
// forwards its arguments unchanged to the inherent `*_impl` method of the
// same name.
#[async_trait::async_trait(?Send)]
impl KvStore for MainlineDhtKv {
/// Stores `envelope` under `arid`; forwards to `put_impl`.
async fn put(
&self,
arid: &ARID,
envelope: &Envelope,
ttl_seconds: Option<u64>,
verbose: bool,
) -> Result<String> {
self.put_impl(arid, envelope, ttl_seconds, verbose).await
}
/// Fetches the envelope stored under `arid`, if any; forwards to
/// `get_impl`.
async fn get(
&self,
arid: &ARID,
timeout_seconds: Option<u64>,
verbose: bool,
) -> Result<Option<Envelope>> {
self.get_impl(arid, timeout_seconds, verbose).await
}
/// Reports whether a value is stored under `arid`; forwards to
/// `exists_impl`.
async fn exists(&self, arid: &ARID) -> Result<bool> {
self.exists_impl(arid).await
}
}
impl MainlineDhtKv {
    /// Publishes `envelope` to the DHT under a key derived from `arid`,
    /// refusing to overwrite an existing value.
    ///
    /// The envelope is CBOR-encoded, obfuscated with `obfuscate_with_arid`,
    /// checked against `max_value_size`, and then published as a mutable item
    /// with sequence number 1, signed by the key derived from `arid`.
    ///
    /// `_ttl_seconds` is accepted for interface compatibility and is not
    /// used. When `verbose` is set, progress messages are emitted through
    /// `verbose_println`.
    ///
    /// The existence check and the publish are two separate DHT operations,
    /// so the write-once guarantee is best-effort.
    ///
    /// # Errors
    ///
    /// - `MainlineError::ValueTooLarge` if the obfuscated payload is larger
    ///   than `max_value_size`.
    /// - `Error::AlreadyExists` if a lookup finds an item under the derived
    ///   key.
    /// - The converted `MainlineError` if `put_mutable` fails.
    ///
    /// # Returns
    ///
    /// A `dht://<hex public key>` locator string for the stored item.
    async fn put_impl(
        &self,
        arid: &ARID,
        envelope: &Envelope,
        _ttl_seconds: Option<u64>,
        verbose: bool,
    ) -> Result<String> {
        use crate::logging::verbose_println;

        if verbose {
            verbose_println("Starting Mainline DHT put operation");
        }
        let bytes = envelope.to_cbor_data();
        if verbose {
            verbose_println(&format!("Envelope size: {} bytes", bytes.len()));
        }

        // The limit applies to what is actually sent: the obfuscated bytes.
        let obfuscated = obfuscate_with_arid(arid, &bytes);
        if obfuscated.len() > self.max_value_size {
            return Err(MainlineError::ValueTooLarge {
                size: obfuscated.len(),
            }
            .into());
        }
        if verbose {
            verbose_println("Obfuscated envelope data");
            verbose_println("Deriving DHT signing key from ARID");
        }

        let signing_key = Self::derive_signing_key(arid);
        let pubkey = signing_key.verifying_key().to_bytes();
        let salt_opt = self.salt.as_deref();

        if verbose {
            verbose_println("Checking for existing value (write-once check)");
        }
        if self
            .dht
            .get_mutable_most_recent(&pubkey, salt_opt)
            .await
            .is_some()
        {
            return Err(Error::AlreadyExists { arid: arid.ur_string() });
        }

        if verbose {
            verbose_println("Creating mutable DHT item");
        }
        let item = MutableItem::new(signing_key, &obfuscated, 1, salt_opt);

        if verbose {
            verbose_println("Putting value to DHT");
        }
        self.dht
            .put_mutable(item, None)
            .await
            .map_err(MainlineError::from)?;

        if verbose {
            verbose_println("Mainline DHT put operation completed");
        }
        Ok(format!("dht://{}", hex::encode(pubkey)))
    }

    /// Polls the DHT for the value stored under `arid` until it appears or
    /// the timeout elapses.
    ///
    /// A lookup is issued immediately and then roughly once per second. When
    /// an item is found, its bytes are passed back through
    /// `obfuscate_with_arid` (the same call used when storing) and decoded as
    /// an `Envelope`.
    ///
    /// `timeout_seconds` defaults to 30 when `None`. The wait between polls
    /// is shortened so that it never runs past the deadline; a lookup that is
    /// already in flight is not interrupted. If the timeout is so large that
    /// the deadline cannot be represented, the call polls without a deadline
    /// instead of panicking.
    ///
    /// When `verbose` is set, progress messages and one dot per unsuccessful
    /// poll are emitted.
    ///
    /// # Errors
    ///
    /// Returns an error if the retrieved bytes cannot be decoded as an
    /// `Envelope`.
    ///
    /// # Returns
    ///
    /// `Ok(Some(envelope))` when a value is found, `Ok(None)` when the
    /// deadline passes without one.
    async fn get_impl(
        &self,
        arid: &ARID,
        timeout_seconds: Option<u64>,
        verbose: bool,
    ) -> Result<Option<Envelope>> {
        use tokio::time::{Duration, Instant, sleep};

        use crate::logging::{
            verbose_newline, verbose_print_dot, verbose_println,
        };

        const DEFAULT_TIMEOUT_SECS: u64 = 30;
        const POLL_INTERVAL: Duration = Duration::from_millis(1000);

        if verbose {
            verbose_println("Starting Mainline DHT get operation");
            verbose_println("Deriving DHT public key from ARID");
        }
        let signing_key = Self::derive_signing_key(arid);
        let pubkey = signing_key.verifying_key().to_bytes();
        let salt_opt = self.salt.as_deref();

        // `Instant + Duration` panics on overflow; `checked_add` yields
        // `None` instead, which is treated below as "no deadline".
        let timeout =
            Duration::from_secs(timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
        let deadline = Instant::now().checked_add(timeout);

        if verbose {
            verbose_println("Polling DHT for value");
        }
        loop {
            let item =
                self.dht.get_mutable_most_recent(&pubkey, salt_opt).await;
            if let Some(mutable_item) = item {
                if verbose {
                    verbose_newline();
                    verbose_println("Value found in DHT");
                }
                let obfuscated_bytes = mutable_item.value().to_vec();
                let deobfuscated = obfuscate_with_arid(arid, &obfuscated_bytes);
                if verbose {
                    verbose_println("Deobfuscated envelope data");
                }
                let envelope = Envelope::try_from_cbor_data(deobfuscated)?;
                if verbose {
                    verbose_println("Mainline DHT get operation completed");
                }
                return Ok(Some(envelope));
            }

            // Decide how long to wait before the next poll, never sleeping
            // past the deadline.
            let now = Instant::now();
            let pause = match deadline {
                Some(limit) if now >= limit => {
                    if verbose {
                        verbose_newline();
                        verbose_println("Timeout reached, value not found");
                    }
                    return Ok(None);
                }
                Some(limit) => {
                    POLL_INTERVAL.min(limit.saturating_duration_since(now))
                }
                None => POLL_INTERVAL,
            };

            if verbose {
                verbose_print_dot();
            }
            sleep(pause).await;
        }
    }

    /// Reports whether a single DHT lookup finds an item under the key
    /// derived from `arid` (and the configured salt, if any).
    ///
    /// Unlike `get_impl`, this performs exactly one lookup and does not poll.
    async fn exists_impl(&self, arid: &ARID) -> Result<bool> {
        let signing_key = Self::derive_signing_key(arid);
        let pubkey = signing_key.verifying_key().to_bytes();
        let salt_opt = self.salt.as_deref();
        let item = self.dht.get_mutable_most_recent(&pubkey, salt_opt).await;
        Ok(item.is_some())
    }
}