use crate::append_only_zks::Azks;
use crate::ecvrf::{VRFKeyStorage, VRFPublicKey};
use crate::errors::{AkdError, DirectoryError, StorageError};
use crate::proof_structs::*;
use crate::storage::manager::StorageManager;
use crate::storage::types::{AkdLabel, AkdValue, DbRecord, ValueState, ValueStateRetrievalFlag};
use crate::storage::Database;
use crate::{helper_structs::LookupInfo, EpochHash, Node};
use log::{debug, error, info};
#[cfg(feature = "rand")]
use rand::{distributions::Alphanumeric, CryptoRng, Rng};
use std::collections::HashMap;
use std::marker::{PhantomData, Send, Sync};
use std::sync::Arc;
use winter_crypto::{Digest, Hasher};
#[cfg(feature = "rand")]
impl AkdValue {
    /// Produces a random 32-character alphanumeric value (test/bench helper).
    pub fn random<R: CryptoRng + Rng>(rng: &mut R) -> Self {
        let generated = get_random_str(rng);
        Self::from_utf8_str(generated.as_str())
    }
}
#[cfg(feature = "rand")]
impl AkdLabel {
    /// Produces a random 32-character alphanumeric label (test/bench helper).
    pub fn random<R: CryptoRng + Rng>(rng: &mut R) -> Self {
        let generated = get_random_str(rng);
        Self::from_utf8_str(generated.as_str())
    }
}
/// The main directory object: ties together the storage layer, the VRF key
/// storage used to derive node labels, and the hash function driving the
/// append-only tree (AZKS).
pub struct Directory<S: Database + Sync + Send, V, H> {
    /// Storage manager wrapping the backing database (provides caching and transactions).
    storage: StorageManager<S>,
    /// VRF key storage; used to compute node labels and label proofs.
    vrf: V,
    /// Zero-sized marker tying the directory to one hasher type.
    hasher: PhantomData<H>,
    /// When true, mutating operations (`publish`) are rejected.
    read_only: bool,
    /// Coordinates cache flushes: readers (lookup/publish/audit/history) take
    /// the `read` side; `poll_for_azks_changes` takes `write` while flushing.
    cache_lock: Arc<tokio::sync::RwLock<()>>,
}
impl<S: Database + Sync + Send, V: VRFKeyStorage, H: Hasher> Clone for Directory<S, V, H> {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
vrf: self.vrf.clone(),
hasher: self.hasher,
read_only: self.read_only,
cache_lock: self.cache_lock.clone(),
}
}
}
impl<S: Database + Sync + Send, V: VRFKeyStorage, H: Hasher> Directory<S, V, H> {
pub async fn new(
storage: &StorageManager<S>,
vrf: &V,
read_only: bool,
) -> Result<Self, AkdError> {
let azks = Directory::<S, V, H>::get_azks_from_storage(storage, false).await;
if read_only && azks.is_err() {
return Err(AkdError::Directory(DirectoryError::ReadOnlyDirectory(
format!(
"Cannot start directory in read-only mode when AZKS is missing, error: {:?}",
azks.err().take()
),
)));
} else if azks.is_err() {
let azks = Azks::new::<_, H>(storage).await?;
storage.set(DbRecord::Azks(azks.clone())).await?;
}
Ok(Directory {
storage: storage.clone(),
read_only,
hasher: PhantomData,
cache_lock: Arc::new(tokio::sync::RwLock::new(())),
vrf: vrf.clone(),
})
}
/// Publishes a batch of `(label, value)` updates as one new epoch.
///
/// For each label: a brand-new user gets version 1; an existing user whose
/// value changed gets its previous version marked stale plus a fresh leaf for
/// the new version; an unchanged value is skipped. All tree and value-state
/// writes happen inside a single storage transaction.
///
/// Returns the new epoch number and root hash (or the current ones if every
/// update was a no-op). Errors if the directory is read-only or a transaction
/// is already active.
pub async fn publish(
    &self,
    updates: Vec<(AkdLabel, AkdValue)>,
) -> Result<EpochHash<H>, AkdError> {
    // Writes are disallowed on read-only replicas.
    if self.read_only {
        return Err(AkdError::Directory(DirectoryError::ReadOnlyDirectory(
            "Cannot publish while in read-only mode".to_string(),
        )));
    }
    // Shared lock: allows concurrent readers but blocks the cache-flush
    // writer in `poll_for_azks_changes` for the duration of the publish.
    let _guard = self.cache_lock.read().await;
    let mut update_set = Vec::<Node<H>>::new();
    let mut user_data_update_set = Vec::<ValueState>::new();
    let mut current_azks = self.retrieve_current_azks().await?;
    let current_epoch = current_azks.get_latest_epoch();
    let next_epoch = current_epoch + 1;
    // Batch-fetch the most recent stored version for every label.
    let mut keys: Vec<AkdLabel> = updates.iter().map(|(uname, _val)| uname.clone()).collect();
    // Sort for a deterministic retrieval order.
    keys.sort_by(|a, b| a.cmp(b));
    let all_user_versions_retrieved = self
        .storage
        .get_user_state_versions(&keys, ValueStateRetrievalFlag::LeqEpoch(current_epoch))
        .await?;
    info!(
        "Retrieved {} previous user versions of {} requested",
        all_user_versions_retrieved.len(),
        keys.len()
    );
    let commitment_key = self.derive_commitment_key().await?;
    for (uname, val) in updates {
        match all_user_versions_retrieved.get(&uname) {
            None => {
                // Brand-new user: insert version 1 under a fresh VRF label.
                let latest_version = 1;
                let label = self
                    .vrf
                    .get_node_label::<H>(&uname, false, latest_version)
                    .await?;
                let value_to_add =
                    crate::utils::commit_value::<H>(&commitment_key.as_bytes(), &label, &val);
                update_set.push(Node::<H> {
                    label,
                    hash: value_to_add,
                });
                let latest_state =
                    ValueState::new(uname, val, latest_version, label, next_epoch);
                user_data_update_set.push(latest_state);
            }
            Some((_, previous_value)) if val == *previous_value => {
                // Value unchanged: deliberate no-op (filtered out of the epoch).
            }
            Some((previous_version, _)) => {
                // Existing user with a new value: mark the previous version
                // stale and insert the new version as fresh.
                let latest_version = *previous_version + 1;
                let stale_label = self
                    .vrf
                    .get_node_label::<H>(&uname, true, *previous_version)
                    .await?;
                let fresh_label = self
                    .vrf
                    .get_node_label::<H>(&uname, false, latest_version)
                    .await?;
                // Stale leaves carry the hash of the empty value.
                let stale_value_to_add = H::hash(&crate::EMPTY_VALUE);
                let fresh_value_to_add = crate::utils::commit_value::<H>(
                    &commitment_key.as_bytes(),
                    &fresh_label,
                    &val,
                );
                update_set.push(Node::<H> {
                    label: stale_label,
                    hash: stale_value_to_add,
                });
                update_set.push(Node::<H> {
                    label: fresh_label,
                    hash: fresh_value_to_add,
                });
                let new_state =
                    ValueState::new(uname, val, latest_version, fresh_label, next_epoch);
                user_data_update_set.push(new_state);
            }
        }
    }
    // NOTE(review): `to_vec()` clones `update_set`, which is not used again —
    // the copy looks unnecessary.
    let insertion_set: Vec<Node<H>> = update_set.to_vec();
    if insertion_set.is_empty() {
        // Every update was a no-op: return the current epoch/root unchanged.
        info!("After filtering for duplicated user information, there is no publish which is necessary (0 updates)");
        let root_hash = current_azks.get_root_hash::<_, H>(&self.storage).await?;
        return Ok(EpochHash(current_epoch, root_hash));
    }
    // All writes for this epoch go through one storage transaction.
    if let false = self.storage.begin_transaction().await {
        error!("Transaction is already active");
        return Err(AkdError::Storage(StorageError::Transaction(
            "Transaction is already active".to_string(),
        )));
    }
    info!("Starting database insertion");
    current_azks
        .batch_insert_leaves::<_, H>(&self.storage, insertion_set)
        .await?;
    // Persist the updated AZKS alongside every new user value state.
    let mut updates = vec![DbRecord::Azks(current_azks.clone())];
    for update in user_data_update_set.into_iter() {
        updates.push(DbRecord::ValueState(update));
    }
    self.storage.batch_set(updates).await?;
    debug!("Committing transaction");
    if let Err(err) = self.storage.commit_transaction().await {
        // Best-effort rollback on commit failure; its own result is ignored.
        let _ = self.storage.rollback_transaction().await;
        return Err(AkdError::Storage(err));
    } else {
        debug!("Transaction committed");
    }
    let root_hash = current_azks
        .get_root_hash_at_epoch::<_, H>(&self.storage, next_epoch)
        .await?;
    Ok(EpochHash(next_epoch, root_hash))
}
pub async fn lookup(&self, uname: AkdLabel) -> Result<LookupProof<H>, AkdError> {
let _guard = self.cache_lock.read().await;
let current_azks = self.retrieve_current_azks().await?;
let current_epoch = current_azks.get_latest_epoch();
let lookup_info = self.get_lookup_info(uname.clone(), current_epoch).await?;
self.lookup_with_info(uname, ¤t_azks, current_epoch, lookup_info)
.await
}
/// Assembles a `LookupProof` for `uname` from already-resolved `LookupInfo`:
/// existence proof for the current version, marker-version membership proof,
/// staleness (non-membership) proof, and the value-commitment opening.
///
/// Fix: the original passed `existence_vrf` by value into
/// `get_node_label_from_vrf_pf` and *then* called `existence_vrf.to_bytes()`
/// — a use-after-move. Serialize the proof bytes first, matching the ordering
/// used in `create_single_update_proof`.
async fn lookup_with_info(
    &self,
    uname: AkdLabel,
    current_azks: &Azks,
    current_epoch: u64,
    lookup_info: LookupInfo,
) -> Result<LookupProof<H>, AkdError> {
    let current_version = lookup_info.value_state.version;
    let commitment_key = self.derive_commitment_key().await?;
    let plaintext_value = lookup_info.value_state.plaintext_val;
    let existence_vrf = self
        .vrf
        .get_label_proof::<H>(&uname, false, current_version)
        .await?;
    // Capture the serialized proof before the proof value is consumed below.
    let existence_vrf_proof = existence_vrf.to_bytes().to_vec();
    let commitment_label = self
        .vrf
        .get_node_label_from_vrf_pf::<H>(existence_vrf)
        .await?;
    let lookup_proof = LookupProof {
        epoch: lookup_info.value_state.epoch,
        plaintext_value: plaintext_value.clone(),
        version: current_version,
        existence_vrf_proof,
        // Membership proof for the current (fresh) version's leaf.
        existence_proof: current_azks
            .get_membership_proof(&self.storage, lookup_info.existent_label, current_epoch)
            .await?,
        marker_vrf_proof: self
            .vrf
            .get_label_proof::<H>(&uname, false, lookup_info.marker_version)
            .await?
            .to_bytes()
            .to_vec(),
        // Membership proof at the marker (power-of-two) version.
        marker_proof: current_azks
            .get_membership_proof(&self.storage, lookup_info.marker_label, current_epoch)
            .await?,
        freshness_vrf_proof: self
            .vrf
            .get_label_proof::<H>(&uname, true, current_version)
            .await?
            .to_bytes()
            .to_vec(),
        // Non-membership of the stale label proves the version is still fresh.
        freshness_proof: current_azks
            .get_non_membership_proof(&self.storage, lookup_info.non_existent_label)
            .await?,
        // Opening of the value commitment bound to the commitment label.
        commitment_proof: crate::utils::get_commitment_proof::<H>(
            &commitment_key.as_bytes(),
            &commitment_label,
            &plaintext_value,
        )
        .as_bytes()
        .to_vec(),
    };
    Ok(lookup_proof)
}
/// Batch form of `lookup`: resolves all lookup metadata first, preloads every
/// tree node the proofs will touch in one BFS pass, then builds each proof.
///
/// Fix: the call argument had been corrupted by mis-encoding — `¤t_azks`
/// in place of `&current_azks`, which does not compile. Also preallocates the
/// result vectors.
pub async fn batch_lookup(&self, unames: &[AkdLabel]) -> Result<Vec<LookupProof<H>>, AkdError> {
    let current_azks = self.retrieve_current_azks().await?;
    let current_epoch = current_azks.get_latest_epoch();
    // Gather lookup metadata and every label the proofs will need.
    let mut lookup_labels = Vec::with_capacity(unames.len() * 3);
    let mut lookup_infos = Vec::with_capacity(unames.len());
    for uname in unames {
        let lookup_info = self.get_lookup_info(uname.clone(), current_epoch).await?;
        lookup_infos.push(lookup_info.clone());
        lookup_labels.push(lookup_info.existent_label);
        lookup_labels.push(lookup_info.marker_label);
        lookup_labels.push(lookup_info.non_existent_label);
    }
    // Warm the node cache for all proof paths in a single traversal.
    let lookup_prefixes_set = crate::utils::build_lookup_prefixes_set(&lookup_labels);
    current_azks
        .bfs_preload_nodes::<_, H>(&self.storage, lookup_prefixes_set)
        .await?;
    // Internal invariant: one LookupInfo was collected per requested label.
    assert_eq!(unames.len(), lookup_infos.len());
    let mut lookup_proofs = Vec::with_capacity(unames.len());
    for (uname, lookup_info) in unames.iter().zip(lookup_infos.into_iter()) {
        lookup_proofs.push(
            self.lookup_with_info(uname.clone(), &current_azks, current_epoch, lookup_info)
                .await?,
        );
    }
    Ok(lookup_proofs)
}
/// Resolves the metadata needed to build a lookup proof for `uname` at
/// `epoch`: the latest value state plus the fresh, marker, and stale labels.
async fn get_lookup_info(&self, uname: AkdLabel, epoch: u64) -> Result<LookupInfo, AkdError> {
    // Most recent value state at or before `epoch`.
    let retrieval = self
        .storage
        .get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(epoch))
        .await;
    let latest_st = match retrieval {
        Ok(state) => state,
        Err(_) => {
            // Render the label as UTF-8 when possible for a readable error.
            let msg = match std::str::from_utf8(&uname) {
                Ok(name) => format!("User {} at epoch {}", name, epoch),
                Err(_) => format!("User {:?} at epoch {}", uname, epoch),
            };
            return Err(AkdError::Storage(StorageError::NotFound(msg)));
        }
    };
    let version = latest_st.version;
    // Marker version: the largest power of two not exceeding `version`.
    let marker_version = 1 << get_marker_version(version);
    let existent_label = self.vrf.get_node_label::<H>(&uname, false, version).await?;
    let marker_label = self
        .vrf
        .get_node_label::<H>(&uname, false, marker_version)
        .await?;
    let non_existent_label =
        self.vrf.get_node_label::<H>(&uname, true, version).await?;
    Ok(LookupInfo {
        value_state: latest_st,
        marker_version,
        existent_label,
        marker_label,
        non_existent_label,
    })
}
/// Builds a `HistoryProof` for `uname` covering its recorded updates,
/// filtered by `params`, plus non-membership proofs showing that no versions
/// beyond the last one exist (up to the final marker for the current epoch).
pub async fn key_history(
    &self,
    uname: &AkdLabel,
    params: HistoryParams,
) -> Result<HistoryProof<H>, AkdError> {
    // Shared lock: prevents a concurrent cache flush from racing these reads.
    let _guard = self.cache_lock.read().await;
    let current_azks = self.retrieve_current_azks().await?;
    let current_epoch = current_azks.get_latest_epoch();
    let mut user_data = self.storage.get_user_data(uname).await?.states;
    // Order most-recent-first (descending epoch).
    user_data.sort_by(|a, b| b.epoch.cmp(&a.epoch));
    user_data = match params {
        HistoryParams::Complete => user_data,
        HistoryParams::MostRecent(n) => user_data.into_iter().take(n).collect::<Vec<_>>(),
        HistoryParams::SinceEpoch(epoch) => {
            user_data = user_data
                .into_iter()
                .filter(|val| val.epoch >= epoch)
                .collect::<Vec<_>>();
            // NOTE(review): this repeats the identical descending sort done
            // above on already-sorted data; redundant but harmless.
            user_data.sort_by(|a, b| b.epoch.cmp(&a.epoch));
            user_data
        }
    };
    if user_data.is_empty() {
        // Nothing to prove: report not-found with a readable name if possible.
        let msg = if let Ok(username_str) = std::str::from_utf8(uname) {
            format!("User {}", username_str)
        } else {
            format!("User {:?}", uname)
        };
        return Err(AkdError::Storage(StorageError::NotFound(msg)));
    }
    let mut update_proofs = Vec::<UpdateProof<H>>::new();
    let mut last_version = 0;
    for user_state in user_data {
        // Ignore any states recorded past the current epoch.
        if user_state.epoch <= current_epoch {
            let proof = self.create_single_update_proof(uname, &user_state).await?;
            update_proofs.push(proof);
            // Track the highest version included in the proof set.
            last_version = if user_state.version > last_version {
                user_state.version
            } else {
                last_version
            };
        }
    }
    // NOTE(review): if every state's epoch exceeded current_epoch,
    // last_version stays 0 and get_marker_version(0) underflows — confirm
    // upstream guarantees at least one qualifying state.
    let next_marker = get_marker_version(last_version) + 1;
    let final_marker = get_marker_version(current_epoch);
    // Non-membership for each version between the last one and the next
    // marker (power of two): shows no hidden updates exist in that gap.
    let mut next_few_vrf_proofs = Vec::<Vec<u8>>::new();
    let mut non_existence_of_next_few = Vec::<NonMembershipProof<H>>::new();
    for ver in last_version + 1..(1 << next_marker) {
        let label_for_ver = self.vrf.get_node_label::<H>(uname, false, ver).await?;
        let non_existence_of_ver = current_azks
            .get_non_membership_proof(&self.storage, label_for_ver)
            .await?;
        non_existence_of_next_few.push(non_existence_of_ver);
        next_few_vrf_proofs.push(
            self.vrf
                .get_label_proof::<H>(uname, false, ver)
                .await?
                .to_bytes()
                .to_vec(),
        );
    }
    // Non-membership at each future marker (power-of-two) version up to the
    // final marker for the current epoch.
    let mut future_marker_vrf_proofs = Vec::<Vec<u8>>::new();
    let mut non_existence_of_future_markers = Vec::<NonMembershipProof<H>>::new();
    for marker_power in next_marker..final_marker + 1 {
        let ver = 1 << marker_power;
        let label_for_ver = self.vrf.get_node_label::<H>(uname, false, ver).await?;
        let non_existence_of_ver = current_azks
            .get_non_membership_proof(&self.storage, label_for_ver)
            .await?;
        non_existence_of_future_markers.push(non_existence_of_ver);
        future_marker_vrf_proofs.push(
            self.vrf
                .get_label_proof::<H>(uname, false, ver)
                .await?
                .to_bytes()
                .to_vec(),
        );
    }
    Ok(HistoryProof {
        update_proofs,
        next_few_vrf_proofs,
        non_existence_of_next_few,
        future_marker_vrf_proofs,
        non_existence_of_future_markers,
    })
}
/// Polls storage every `period` for a newer AZKS epoch (e.g. written by
/// another process). When one is found, takes the cache lock exclusively,
/// flushes the cache, re-reads the AZKS, and optionally notifies
/// `change_detected`. Loops forever; only returns via an error (`?`).
pub async fn poll_for_azks_changes(
    &self,
    period: tokio::time::Duration,
    change_detected: Option<tokio::sync::mpsc::Sender<()>>,
) -> Result<(), AkdError> {
    // Baseline AZKS (cached read) to compare epochs against.
    let mut last = Directory::<S, V, H>::get_azks_from_storage(&self.storage, false).await?;
    loop {
        tokio::time::sleep(period).await;
        // Bypass the cache so writes made elsewhere are observed.
        let latest = Directory::<S, V, H>::get_azks_from_storage(&self.storage, true).await?;
        if latest.latest_epoch > last.latest_epoch {
            {
                // Exclusive lock: block all readers while the cache is flushed.
                let _guard = self.cache_lock.write().await;
                self.storage.flush_cache().await;
                // Re-read through the (now empty) cache to repopulate it.
                last =
                    Directory::<S, V, H>::get_azks_from_storage(&self.storage, false).await?;
                if let Some(channel) = &change_detected {
                    channel.send(()).await.map_err(|send_err| {
                        AkdError::Storage(StorageError::Connection(format!(
                            "Tokio MPSC sender failed to publish notification with error {:?}",
                            send_err
                        )))
                    })?;
                }
            }
        }
    }
    // Unreachable: the loop above never breaks; kept to satisfy the signature.
    #[allow(unreachable_code)]
    Ok(())
}
/// Produces an append-only proof covering epochs
/// `audit_start_ep..audit_end_ep`. The range must be non-empty and must not
/// extend past the current epoch.
pub async fn audit(
    &self,
    audit_start_ep: u64,
    audit_end_ep: u64,
) -> Result<AppendOnlyProof<H>, AkdError> {
    // Shared lock: prevents a concurrent cache flush from racing this read.
    let _guard = self.cache_lock.read().await;
    let current_azks = self.retrieve_current_azks().await?;
    let current_epoch = current_azks.get_latest_epoch();
    // Guard: the requested range must be strictly increasing.
    if audit_start_ep >= audit_end_ep {
        return Err(AkdError::Directory(DirectoryError::InvalidEpoch(format!(
            "Start epoch {} is greater than or equal the end epoch {}",
            audit_start_ep, audit_end_ep
        ))));
    }
    // Guard: cannot audit into the future.
    if current_epoch < audit_end_ep {
        return Err(AkdError::Directory(DirectoryError::InvalidEpoch(format!(
            "End epoch {} is greater than the current epoch {}",
            audit_end_ep, current_epoch
        ))));
    }
    current_azks
        .get_append_only_proof::<_, H>(&self.storage, audit_start_ep, audit_end_ep)
        .await
}
pub async fn retrieve_current_azks(&self) -> Result<Azks, crate::errors::AkdError> {
Directory::<S, V, H>::get_azks_from_storage(&self.storage, false).await
}
/// Loads the AZKS record from storage; `ignore_cache` forces a direct read
/// that bypasses the cache layer.
async fn get_azks_from_storage(
    storage: &StorageManager<S>,
    ignore_cache: bool,
) -> Result<Azks, crate::errors::AkdError> {
    let record = if ignore_cache {
        storage
            .get_direct::<Azks>(&crate::append_only_zks::DEFAULT_AZKS_KEY)
            .await?
    } else {
        storage
            .get::<Azks>(&crate::append_only_zks::DEFAULT_AZKS_KEY)
            .await?
    };
    if let DbRecord::Azks(azks) = record {
        Ok(azks)
    } else {
        // Storage returned a record of the wrong variant — treat as missing.
        error!("No AZKS can be found. You should re-initialize the directory to create a new one");
        Err(AkdError::Storage(StorageError::NotFound(
            "AZKS not found".to_string(),
        )))
    }
}
/// Returns the VRF public key from the underlying key storage; its error
/// type converts into `AkdError` via `?`.
pub async fn get_public_key(&self) -> Result<VRFPublicKey, AkdError> {
    let public_key = self.vrf.get_vrf_public_key().await?;
    Ok(public_key)
}
/// Builds the `UpdateProof` for one historical value state of `uname`:
/// membership of the fresh leaf at its epoch, (for version > 1) membership of
/// the previous version's stale leaf, the associated VRF proofs, and the
/// commitment opening for the plaintext value.
async fn create_single_update_proof(
    &self,
    uname: &AkdLabel,
    user_state: &ValueState,
) -> Result<UpdateProof<H>, AkdError> {
    let epoch = user_state.epoch;
    let plaintext_value = &user_state.plaintext_val;
    let version = user_state.version;
    let label_at_ep = self.vrf.get_node_label::<H>(uname, false, version).await?;
    let current_azks = self.retrieve_current_azks().await?;
    // Serialize the VRF proof bytes before the proof value is consumed below.
    let existence_vrf = self.vrf.get_label_proof::<H>(uname, false, version).await?;
    let existence_vrf_proof = existence_vrf.to_bytes().to_vec();
    let existence_label = self
        .vrf
        .get_node_label_from_vrf_pf::<H>(existence_vrf)
        .await?;
    let existence_at_ep = current_azks
        .get_membership_proof(&self.storage, label_at_ep, epoch)
        .await?;
    // For version > 1, also prove the previous version was marked stale.
    let mut previous_version_stale_at_ep = Option::None;
    let mut previous_version_vrf_proof = Option::None;
    if version > 1 {
        let prev_label_at_ep = self
            .vrf
            .get_node_label::<H>(uname, true, version - 1)
            .await?;
        previous_version_stale_at_ep = Option::Some(
            current_azks
                .get_membership_proof(&self.storage, prev_label_at_ep, epoch)
                .await?,
        );
        previous_version_vrf_proof = Option::Some(
            self.vrf
                .get_label_proof::<H>(uname, true, version - 1)
                .await?
                .to_bytes()
                .to_vec(),
        );
    }
    // Commitment opening: binds the plaintext value to the committed leaf.
    let commitment_key = self.derive_commitment_key().await?;
    let commitment_proof = crate::utils::get_commitment_proof::<H>(
        &commitment_key.as_bytes(),
        &existence_label,
        plaintext_value,
    )
    .as_bytes()
    .to_vec();
    Ok(UpdateProof {
        epoch,
        version,
        plaintext_value: plaintext_value.clone(),
        existence_vrf_proof,
        existence_at_ep,
        previous_version_vrf_proof,
        previous_version_stale_at_ep,
        commitment_proof,
    })
}
/// Returns the AZKS root hash at the given `epoch`.
pub async fn get_root_hash_at_epoch(
    &self,
    current_azks: &Azks,
    epoch: u64,
) -> Result<H::Digest, AkdError> {
    // Shared lock: a concurrent cache flush cannot invalidate mid-read.
    let _guard = self.cache_lock.read().await;
    let hash = current_azks
        .get_root_hash_at_epoch::<_, H>(&self.storage, epoch)
        .await?;
    Ok(hash)
}
/// Returns the AZKS root hash at its latest epoch.
pub async fn get_root_hash(&self, current_azks: &Azks) -> Result<H::Digest, AkdError> {
    let latest_epoch = current_azks.get_latest_epoch();
    self.get_root_hash_at_epoch(current_azks, latest_epoch).await
}
/// Derives the value-commitment key by hashing the raw VRF key material.
async fn derive_commitment_key(&self) -> Result<H::Digest, AkdError> {
    let raw_key = self.vrf.retrieve().await?;
    Ok(H::hash(&raw_key))
}
}
/// Controls how much of a user's history `key_history` returns.
#[derive(Copy, Clone)]
pub enum HistoryParams {
    /// Include every recorded update.
    Complete,
    /// Include only the `n` most recent updates (by descending epoch).
    MostRecent(usize),
    /// Include only updates at or after the given epoch.
    SinceEpoch(u64),
}

impl Default for HistoryParams {
    /// The default is to return the complete history.
    fn default() -> Self {
        Self::Complete
    }
}
/// Returns `floor(log2(version))` — the exponent of the largest power of two
/// not exceeding `version`. Used to compute marker versions for proofs.
///
/// `version` is expected to be non-zero. Fix: the original expression
/// `64 - leading_zeros - 1` underflows for `version == 0` (panics in debug
/// builds, wraps to a huge value in release); this version saturates to 0
/// instead, with a debug assertion flagging the invalid input.
pub(crate) fn get_marker_version(version: u64) -> u64 {
    debug_assert!(version != 0, "get_marker_version called with version 0");
    u64::from(63u32.saturating_sub(version.leading_zeros()))
}
/// Builds a random 32-character alphanumeric string from `rng`.
#[cfg(feature = "rand")]
fn get_random_str<R: CryptoRng + Rng>(rng: &mut R) -> String {
    let mut out = String::with_capacity(32);
    for _ in 0..32 {
        out.push(char::from(rng.sample(Alphanumeric)));
    }
    out
}
/// Pair of (per-update-proof root hashes, per-update-proof previous root hashes).
type KeyHistoryHelper<D> = (Vec<D>, Vec<Option<D>>);

/// For each update proof in `history_proof`, fetches the root hash at its
/// epoch and at the preceding epoch (None for epoch 1, which has no
/// predecessor). A map caches hashes already fetched in the first pass.
///
/// Fix: two call arguments had been corrupted by mis-encoding — `¤t_azks`
/// in place of `&current_azks`, which does not compile.
pub async fn get_key_history_hashes<S: Database + Sync + Send, H: Hasher, V: VRFKeyStorage>(
    akd_dir: &Directory<S, V, H>,
    history_proof: &HistoryProof<H>,
) -> Result<KeyHistoryHelper<H::Digest>, AkdError> {
    let mut epoch_hash_map: HashMap<u64, H::Digest> = HashMap::new();
    let mut root_hashes = Vec::<H::Digest>::new();
    let mut previous_root_hashes = Vec::<Option<H::Digest>>::new();
    let current_azks = akd_dir.retrieve_current_azks().await?;
    // First pass: root hash at each proof's epoch, cached by epoch.
    for proof in &history_proof.update_proofs {
        let hash = akd_dir
            .get_root_hash_at_epoch(&current_azks, proof.epoch)
            .await?;
        epoch_hash_map.insert(proof.epoch, hash);
        root_hashes.push(hash);
    }
    // Second pass: root hash at each proof's preceding epoch, reusing the
    // cache where possible.
    for proof in &history_proof.update_proofs {
        let epoch_in_question = proof.epoch - 1;
        if epoch_in_question == 0 {
            // Epoch 1 has no predecessor.
            previous_root_hashes.push(None);
        } else if let Some(hash) = epoch_hash_map.get(&epoch_in_question) {
            previous_root_hashes.push(Some(*hash));
        } else {
            let hash = akd_dir
                .get_root_hash_at_epoch(&current_azks, epoch_in_question)
                .await?;
            previous_root_hashes.push(Some(hash));
        }
    }
    Ok((root_hashes, previous_root_hashes))
}
/// Convenience helper: returns the directory's current root hash together
/// with its latest epoch.
///
/// Fix: the call argument had been corrupted by mis-encoding — `¤t_azks`
/// in place of `&current_azks`, which does not compile.
pub async fn get_directory_root_hash_and_ep<
    S: Database + Sync + Send,
    H: Hasher,
    V: VRFKeyStorage,
>(
    akd_dir: &Directory<S, V, H>,
) -> Result<(H::Digest, u64), AkdError> {
    let current_azks = akd_dir.retrieve_current_azks().await?;
    let latest_epoch = current_azks.get_latest_epoch();
    let root_hash = akd_dir.get_root_hash(&current_azks).await?;
    Ok((root_hash, latest_epoch))
}
/// Deliberate corruptions that `publish_malicious_update` injects into an
/// epoch's update set, for exercising audit/verification failure paths.
#[derive(Debug, Clone)]
pub enum PublishCorruption {
    /// Skip the stale-marker insertion for this label's previous version.
    UnmarkedStaleVersion(AkdLabel),
    /// Additionally insert a stale marker for this (label, version) pair.
    MarkVersionStale(AkdLabel, u64),
}
// Test-support impl: publishing with a deliberately injected corruption.
impl<S: Database + Sync + Send, V: VRFKeyStorage, H: Hasher> Directory<S, V, H> {
    /// Like `publish`, but injects the given `corruption` into the epoch's
    /// update set (see `PublishCorruption`). Intended for tests that need a
    /// misbehaving directory; returns the new epoch and root hash exactly as
    /// `publish` does.
    pub async fn publish_malicious_update(
        &self,
        updates: Vec<(AkdLabel, AkdValue)>,
        corruption: PublishCorruption,
    ) -> Result<EpochHash<H>, AkdError> {
        // Writes are disallowed on read-only replicas.
        if self.read_only {
            return Err(AkdError::Directory(DirectoryError::ReadOnlyDirectory(
                "Cannot publish while in read-only mode".to_string(),
            )));
        }
        // Shared lock: blocks the cache-flush writer during the publish.
        let _guard = self.cache_lock.read().await;
        let mut update_set = Vec::<Node<H>>::new();
        // MarkVersionStale corruption: pre-seed the update set with a stale
        // marker for a (label, version) that should not be marked stale.
        if let PublishCorruption::MarkVersionStale(ref uname, version_number) = corruption {
            let stale_label = self
                .vrf
                .get_node_label::<H>(uname, true, version_number)
                .await?;
            let stale_value_to_add = H::hash(&crate::EMPTY_VALUE);
            update_set.push(Node::<H> {
                label: stale_label,
                hash: stale_value_to_add,
            })
        };
        let mut user_data_update_set = Vec::<ValueState>::new();
        let mut current_azks = self.retrieve_current_azks().await?;
        let current_epoch = current_azks.get_latest_epoch();
        let next_epoch = current_epoch + 1;
        // Batch-fetch the most recent stored version for every label.
        let mut keys: Vec<AkdLabel> = updates.iter().map(|(uname, _val)| uname.clone()).collect();
        // Sort for a deterministic retrieval order.
        keys.sort_by(|a, b| a.cmp(b));
        let all_user_versions_retrieved = self
            .storage
            .get_user_state_versions(&keys, ValueStateRetrievalFlag::LeqEpoch(current_epoch))
            .await?;
        info!(
            "Retrieved {} previous user versions of {} requested",
            all_user_versions_retrieved.len(),
            keys.len()
        );
        let commitment_key = self.derive_commitment_key().await?;
        for (uname, val) in updates {
            match all_user_versions_retrieved.get(&uname) {
                None => {
                    // Brand-new user: insert version 1 under a fresh VRF label.
                    let latest_version = 1;
                    let label = self
                        .vrf
                        .get_node_label::<H>(&uname, false, latest_version)
                        .await?;
                    let value_to_add =
                        crate::utils::commit_value::<H>(&commitment_key.as_bytes(), &label, &val);
                    update_set.push(Node::<H> {
                        label,
                        hash: value_to_add,
                    });
                    let latest_state =
                        ValueState::new(uname, val, latest_version, label, next_epoch);
                    user_data_update_set.push(latest_state);
                }
                Some((_, previous_value)) if val == *previous_value => {
                    // Value unchanged: deliberate no-op.
                }
                Some((previous_version, _)) => {
                    // Existing user with a new value: mark the previous
                    // version stale (unless corrupted) and add a fresh leaf.
                    let latest_version = *previous_version + 1;
                    let stale_label = self
                        .vrf
                        .get_node_label::<H>(&uname, true, *previous_version)
                        .await?;
                    let fresh_label = self
                        .vrf
                        .get_node_label::<H>(&uname, false, latest_version)
                        .await?;
                    let stale_value_to_add = H::hash(&crate::EMPTY_VALUE);
                    let fresh_value_to_add = crate::utils::commit_value::<H>(
                        &commitment_key.as_bytes(),
                        &fresh_label,
                        &val,
                    );
                    match &corruption {
                        PublishCorruption::UnmarkedStaleVersion(target_uname) => {
                            // UnmarkedStaleVersion corruption: skip the stale
                            // marker for the targeted label only.
                            if *target_uname != uname {
                                update_set.push(Node::<H> {
                                    label: stale_label,
                                    hash: stale_value_to_add,
                                })
                            }
                        }
                        _ => update_set.push(Node::<H> {
                            label: stale_label,
                            hash: stale_value_to_add,
                        }),
                    };
                    update_set.push(Node::<H> {
                        label: fresh_label,
                        hash: fresh_value_to_add,
                    });
                    let new_state =
                        ValueState::new(uname, val, latest_version, fresh_label, next_epoch);
                    user_data_update_set.push(new_state);
                }
            }
        }
        // NOTE(review): `to_vec()` clones `update_set`, which is not used again.
        let insertion_set: Vec<Node<H>> = update_set.to_vec();
        if insertion_set.is_empty() {
            // Every update was a no-op: return the current epoch/root unchanged.
            info!("After filtering for duplicated user information, there is no publish which is necessary (0 updates)");
            let root_hash = current_azks.get_root_hash::<_, H>(&self.storage).await?;
            return Ok(EpochHash(current_epoch, root_hash));
        }
        // All writes for this epoch go through one storage transaction.
        if let false = self.storage.begin_transaction().await {
            error!("Transaction is already active");
            return Err(AkdError::Storage(StorageError::Transaction(
                "Transaction is already active".to_string(),
            )));
        }
        info!("Starting database insertion");
        current_azks
            .batch_insert_leaves::<_, H>(&self.storage, insertion_set)
            .await?;
        // Persist the updated AZKS alongside every new user value state.
        let mut updates = vec![DbRecord::Azks(current_azks.clone())];
        for update in user_data_update_set.into_iter() {
            updates.push(DbRecord::ValueState(update));
        }
        self.storage.batch_set(updates).await?;
        debug!("Committing transaction");
        if let Err(err) = self.storage.commit_transaction().await {
            // Best-effort rollback on commit failure.
            let _ = self.storage.rollback_transaction().await;
            return Err(AkdError::Storage(err));
        } else {
            debug!("Transaction committed");
        }
        let root_hash = current_azks
            .get_root_hash_at_epoch::<_, H>(&self.storage, next_epoch)
            .await?;
        Ok(EpochHash(next_epoch, root_hash))
    }
}